2016-06-03 11:05:34 +08:00
package weed_server
import (
2019-03-16 06:26:09 +08:00
"context"
2020-03-09 06:42:44 +08:00
"crypto/md5"
2016-06-03 11:05:34 +08:00
"encoding/json"
"errors"
2019-06-23 13:53:52 +08:00
"fmt"
2019-04-15 14:28:24 +08:00
"io"
2016-06-03 11:05:34 +08:00
"io/ioutil"
2019-03-28 05:25:18 +08:00
"mime"
2016-06-03 11:05:34 +08:00
"net/http"
"net/url"
2019-02-15 16:09:19 +08:00
"os"
2019-03-28 05:25:18 +08:00
filenamePath "path"
2017-01-08 09:16:29 +08:00
"strconv"
2017-01-08 22:34:26 +08:00
"strings"
2018-05-28 14:59:49 +08:00
"time"
2017-01-08 09:16:29 +08:00
2018-05-28 02:52:26 +08:00
"github.com/chrislusf/seaweedfs/weed/filer2"
2016-06-03 11:05:34 +08:00
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
2018-05-28 02:52:26 +08:00
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2019-02-15 16:09:19 +08:00
"github.com/chrislusf/seaweedfs/weed/security"
2019-06-16 03:21:44 +08:00
"github.com/chrislusf/seaweedfs/weed/stats"
2020-03-09 15:16:10 +08:00
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2016-06-03 11:05:34 +08:00
"github.com/chrislusf/seaweedfs/weed/util"
2018-07-22 08:47:59 +08:00
)
// OS_UID is the numeric user id of the running process, read once when the
// package is initialized. updateFilerStore stamps it on every entry it creates.
var OS_UID = uint32(os.Getuid())

// OS_GID is the numeric group id of the running process, read once when the
// package is initialized. updateFilerStore stamps it on every entry it creates.
var OS_GID = uint32(os.Getgid())
2016-06-03 11:44:50 +08:00
// FilerPostResult is the JSON document PostHandler sends back after an upload.
// Every field is omitted from the output when it holds its zero value.
type FilerPostResult struct {
	// Name is the file name reported by the volume server for the upload.
	Name string `json:"name,omitempty"`
	// Size is the size reported by the volume server for the upload.
	Size int64 `json:"size,omitempty"`
	// Error is the error text reported by the volume server, if any.
	Error string `json:"error,omitempty"`
	// Fid is the file id the content was stored under.
	Fid string `json:"fid,omitempty"`
	// Url is the volume server location the content was written to.
	Url string `json:"url,omitempty"`
}
2020-03-09 15:16:10 +08:00
func ( fs * FilerServer ) assignNewFileInfo ( w http . ResponseWriter , r * http . Request , replication , collection , dataCenter , ttlString string ) ( fileId , urlLocation string , auth security . EncodedJwt , err error ) {
2019-06-23 13:53:52 +08:00
stats . FilerRequestCounter . WithLabelValues ( "assign" ) . Inc ( )
start := time . Now ( )
defer func ( ) { stats . FilerRequestHistogram . WithLabelValues ( "assign" ) . Observe ( time . Since ( start ) . Seconds ( ) ) } ( )
2016-06-26 10:50:18 +08:00
ar := & operation . VolumeAssignRequest {
Count : 1 ,
Replication : replication ,
Collection : collection ,
2020-03-09 15:16:10 +08:00
Ttl : ttlString ,
2018-07-15 04:36:28 +08:00
DataCenter : dataCenter ,
2016-06-26 10:50:18 +08:00
}
2018-07-10 15:20:50 +08:00
var altRequest * operation . VolumeAssignRequest
2018-07-15 04:36:28 +08:00
if dataCenter != "" {
2018-07-10 15:20:50 +08:00
altRequest = & operation . VolumeAssignRequest {
Count : 1 ,
Replication : replication ,
Collection : collection ,
2020-03-09 15:16:10 +08:00
Ttl : ttlString ,
2018-07-10 15:20:50 +08:00
DataCenter : "" ,
}
}
2018-07-22 01:39:02 +08:00
2019-02-19 04:11:52 +08:00
assignResult , ae := operation . Assign ( fs . filer . GetMaster ( ) , fs . grpcDialOption , ar , altRequest )
2016-06-08 15:46:14 +08:00
if ae != nil {
2018-07-22 01:39:02 +08:00
glog . Errorf ( "failing to assign a file id: %v" , ae )
2016-06-08 15:46:14 +08:00
writeJsonError ( w , r , http . StatusInternalServerError , ae )
err = ae
return
}
fileId = assignResult . Fid
urlLocation = "http://" + assignResult . Url + "/" + assignResult . Fid
2019-02-15 16:09:19 +08:00
auth = assignResult . Auth
2016-06-08 15:46:14 +08:00
return
}
2016-06-03 11:05:34 +08:00
func ( fs * FilerServer ) PostHandler ( w http . ResponseWriter , r * http . Request ) {
2016-08-06 06:01:30 +08:00
2019-03-16 06:55:34 +08:00
ctx := context . Background ( )
2016-06-03 11:05:34 +08:00
query := r . URL . Query ( )
2020-02-25 14:28:45 +08:00
collection , replication := fs . detectCollection ( r . RequestURI , query . Get ( "collection" ) , query . Get ( "replication" ) )
2018-07-15 04:36:28 +08:00
dataCenter := query . Get ( "dataCenter" )
if dataCenter == "" {
dataCenter = fs . option . DataCenter
}
2020-03-09 15:16:10 +08:00
ttlString := r . URL . Query ( ) . Get ( "ttl" )
2016-06-03 11:05:34 +08:00
2020-03-09 15:16:10 +08:00
// read ttl in seconds
ttl , err := needle . ReadTTL ( ttlString )
ttlSeconds := int32 ( 0 )
if err == nil {
ttlSeconds = int32 ( ttl . Minutes ( ) ) * 60
}
if autoChunked := fs . autoChunk ( ctx , w , r , replication , collection , dataCenter , ttlSeconds , ttlString ) ; autoChunked {
2016-08-06 06:01:30 +08:00
return
}
2020-03-07 22:06:58 +08:00
if fs . option . Cipher {
2020-03-09 15:16:10 +08:00
reply , err := fs . encrypt ( ctx , w , r , replication , collection , dataCenter , ttlSeconds , ttlString )
2020-03-07 22:06:58 +08:00
if err != nil {
writeJsonError ( w , r , http . StatusInternalServerError , err )
} else if reply != nil {
writeJsonQuiet ( w , r , http . StatusCreated , reply )
}
return
}
2020-03-09 15:16:10 +08:00
fileId , urlLocation , auth , err := fs . assignNewFileInfo ( w , r , replication , collection , dataCenter , ttlString )
2019-01-03 04:58:26 +08:00
2018-07-22 01:39:02 +08:00
if err != nil || fileId == "" || urlLocation == "" {
2019-01-03 04:58:26 +08:00
glog . V ( 0 ) . Infof ( "fail to allocate volume for %s, collection:%s, datacenter:%s" , r . URL . Path , collection , dataCenter )
2020-03-07 22:06:58 +08:00
writeJsonError ( w , r , http . StatusInternalServerError , fmt . Errorf ( "fail to allocate volume for %s, collection:%s, datacenter:%s" , r . URL . Path , collection , dataCenter ) )
2018-07-22 01:39:02 +08:00
return
}
2018-09-12 15:46:12 +08:00
glog . V ( 4 ) . Infof ( "write %s to %v" , r . URL . Path , urlLocation )
2016-06-03 11:05:34 +08:00
u , _ := url . Parse ( urlLocation )
2019-06-23 13:53:52 +08:00
ret , err := fs . uploadToVolumeServer ( r , u , auth , w , fileId )
if err != nil {
2016-06-03 11:05:34 +08:00
return
}
2019-06-23 13:53:52 +08:00
2020-03-09 15:16:10 +08:00
if err = fs . updateFilerStore ( ctx , r , w , replication , collection , ret , fileId , ttlSeconds ) ; err != nil {
2016-06-03 11:05:34 +08:00
return
}
2018-07-22 08:54:14 +08:00
2019-06-23 13:53:52 +08:00
// send back post result
reply := FilerPostResult {
Name : ret . Name ,
2020-02-04 09:04:06 +08:00
Size : int64 ( ret . Size ) ,
2019-06-23 13:53:52 +08:00
Error : ret . Error ,
Fid : fileId ,
Url : urlLocation ,
2016-06-03 11:05:34 +08:00
}
2019-06-23 13:53:52 +08:00
setEtag ( w , ret . ETag )
writeJsonQuiet ( w , r , http . StatusCreated , reply )
}
// update metadata in filer store
func ( fs * FilerServer ) updateFilerStore ( ctx context . Context , r * http . Request , w http . ResponseWriter ,
2020-03-09 15:16:10 +08:00
replication string , collection string , ret * operation . UploadResult , fileId string , ttlSeconds int32 ) ( err error ) {
2019-06-23 13:53:52 +08:00
stats . FilerRequestCounter . WithLabelValues ( "postStoreWrite" ) . Inc ( )
start := time . Now ( )
2019-06-23 16:57:51 +08:00
defer func ( ) {
stats . FilerRequestHistogram . WithLabelValues ( "postStoreWrite" ) . Observe ( time . Since ( start ) . Seconds ( ) )
} ( )
2016-06-03 15:24:55 +08:00
2019-11-27 00:04:47 +08:00
modeStr := r . URL . Query ( ) . Get ( "mode" )
if modeStr == "" {
modeStr = "0660"
}
mode , err := strconv . ParseUint ( modeStr , 8 , 32 )
if err != nil {
glog . Errorf ( "Invalid mode format: %s, use 0660 by default" , modeStr )
mode = 0660
}
2019-06-23 13:53:52 +08:00
path := r . URL . Path
2019-07-11 19:20:03 +08:00
if strings . HasSuffix ( path , "/" ) {
if ret . Name != "" {
path += ret . Name
}
}
2020-03-23 15:01:34 +08:00
existingEntry , err := fs . filer . FindEntry ( ctx , util . FullPath ( path ) )
2018-12-17 16:20:00 +08:00
crTime := time . Now ( )
if err == nil && existingEntry != nil {
2019-07-11 19:20:03 +08:00
crTime = existingEntry . Crtime
2018-12-17 16:20:00 +08:00
}
2018-05-14 14:56:16 +08:00
entry := & filer2 . Entry {
2020-03-23 15:01:34 +08:00
FullPath : util . FullPath ( path ) ,
2018-05-14 14:56:16 +08:00
Attr : filer2 . Attr {
2018-06-11 07:57:32 +08:00
Mtime : time . Now ( ) ,
2018-12-17 16:20:00 +08:00
Crtime : crTime ,
2019-11-27 00:04:47 +08:00
Mode : os . FileMode ( mode ) ,
2018-07-22 08:47:59 +08:00
Uid : OS_UID ,
Gid : OS_GID ,
2018-06-11 07:57:32 +08:00
Replication : replication ,
Collection : collection ,
2020-03-09 15:16:10 +08:00
TtlSec : ttlSeconds ,
2020-03-07 23:25:15 +08:00
Mime : ret . Mime ,
2018-05-14 14:56:16 +08:00
} ,
2018-05-16 15:08:44 +08:00
Chunks : [ ] * filer_pb . FileChunk { {
FileId : fileId ,
2018-07-22 08:47:59 +08:00
Size : uint64 ( ret . Size ) ,
2018-05-16 15:54:44 +08:00
Mtime : time . Now ( ) . UnixNano ( ) ,
2019-06-23 13:53:52 +08:00
ETag : ret . ETag ,
2018-05-14 14:56:16 +08:00
} } ,
}
2020-03-07 23:25:15 +08:00
if entry . Attr . Mime == "" {
if ext := filenamePath . Ext ( path ) ; ext != "" {
entry . Attr . Mime = mime . TypeByExtension ( ext )
}
2019-03-28 05:25:18 +08:00
}
2019-01-03 04:58:26 +08:00
// glog.V(4).Infof("saving %s => %+v", path, entry)
2020-01-23 03:42:40 +08:00
if dbErr := fs . filer . CreateEntry ( ctx , entry , false ) ; dbErr != nil {
2019-12-13 16:23:05 +08:00
fs . filer . DeleteChunks ( entry . Chunks )
2019-06-23 13:53:52 +08:00
glog . V ( 0 ) . Infof ( "failing to write %s to filer server : %v" , path , dbErr )
writeJsonError ( w , r , http . StatusInternalServerError , dbErr )
err = dbErr
2016-06-03 11:05:34 +08:00
return
}
2016-06-03 11:44:50 +08:00
2019-06-23 13:53:52 +08:00
return nil
}
// send request to volume server
2020-03-07 22:06:58 +08:00
func ( fs * FilerServer ) uploadToVolumeServer ( r * http . Request , u * url . URL , auth security . EncodedJwt , w http . ResponseWriter , fileId string ) ( ret * operation . UploadResult , err error ) {
2019-06-23 13:53:52 +08:00
stats . FilerRequestCounter . WithLabelValues ( "postUpload" ) . Inc ( )
start := time . Now ( )
defer func ( ) { stats . FilerRequestHistogram . WithLabelValues ( "postUpload" ) . Observe ( time . Since ( start ) . Seconds ( ) ) } ( )
2020-03-07 22:06:58 +08:00
ret = & operation . UploadResult { }
2020-03-09 06:42:44 +08:00
hash := md5 . New ( )
var body = ioutil . NopCloser ( io . TeeReader ( r . Body , hash ) )
2020-03-07 22:06:58 +08:00
2019-06-23 13:53:52 +08:00
request := & http . Request {
Method : r . Method ,
URL : u ,
Proto : r . Proto ,
ProtoMajor : r . ProtoMajor ,
ProtoMinor : r . ProtoMinor ,
Header : r . Header ,
2020-03-09 06:42:44 +08:00
Body : body ,
2019-06-23 13:53:52 +08:00
Host : r . Host ,
ContentLength : r . ContentLength ,
2016-06-03 11:44:50 +08:00
}
2020-03-07 22:06:58 +08:00
2019-06-23 13:53:52 +08:00
if auth != "" {
request . Header . Set ( "Authorization" , "BEARER " + string ( auth ) )
}
resp , doErr := util . Do ( request )
if doErr != nil {
glog . Errorf ( "failing to connect to volume server %s: %v, %+v" , r . RequestURI , doErr , r . Method )
writeJsonError ( w , r , http . StatusInternalServerError , doErr )
err = doErr
return
}
defer func ( ) {
io . Copy ( ioutil . Discard , resp . Body )
resp . Body . Close ( )
} ( )
2020-03-09 06:42:44 +08:00
2019-06-23 13:53:52 +08:00
respBody , raErr := ioutil . ReadAll ( resp . Body )
if raErr != nil {
glog . V ( 0 ) . Infoln ( "failing to upload to volume server" , r . RequestURI , raErr . Error ( ) )
writeJsonError ( w , r , http . StatusInternalServerError , raErr )
err = raErr
return
}
2020-03-09 06:42:44 +08:00
2019-06-23 13:53:52 +08:00
glog . V ( 4 ) . Infoln ( "post result" , string ( respBody ) )
unmarshalErr := json . Unmarshal ( respBody , & ret )
if unmarshalErr != nil {
glog . V ( 0 ) . Infoln ( "failing to read upload resonse" , r . RequestURI , string ( respBody ) )
writeJsonError ( w , r , http . StatusInternalServerError , unmarshalErr )
err = unmarshalErr
return
}
if ret . Error != "" {
err = errors . New ( ret . Error )
glog . V ( 0 ) . Infoln ( "failing to post to volume server" , r . RequestURI , ret . Error )
writeJsonError ( w , r , http . StatusInternalServerError , err )
return
}
// find correct final path
path := r . URL . Path
if strings . HasSuffix ( path , "/" ) {
if ret . Name != "" {
path += ret . Name
} else {
err = fmt . Errorf ( "can not to write to folder %s without a file name" , path )
fs . filer . DeleteFileByFileId ( fileId )
glog . V ( 0 ) . Infoln ( "Can not to write to folder" , path , "without a file name!" )
writeJsonError ( w , r , http . StatusInternalServerError , err )
return
}
}
2020-03-09 06:42:44 +08:00
// use filer calculated md5 ETag, instead of the volume server crc ETag
ret . ETag = fmt . Sprintf ( "%x" , hash . Sum ( nil ) )
2019-06-23 13:53:52 +08:00
return
2016-06-03 11:05:34 +08:00
}
// curl -X DELETE http://localhost:8888/path/to
2018-07-25 13:33:26 +08:00
// curl -X DELETE http://localhost:8888/path/to?recursive=true
2019-09-12 11:26:20 +08:00
// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
2019-12-12 06:58:22 +08:00
// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
2016-06-03 11:05:34 +08:00
func ( fs * FilerServer ) DeleteHandler ( w http . ResponseWriter , r * http . Request ) {
2018-05-14 14:56:16 +08:00
2018-07-25 13:33:26 +08:00
isRecursive := r . FormValue ( "recursive" ) == "true"
2020-01-01 03:52:54 +08:00
if ! isRecursive && fs . option . recursiveDelete {
if r . FormValue ( "recursive" ) != "false" {
isRecursive = true
}
}
2019-09-12 11:26:20 +08:00
ignoreRecursiveError := r . FormValue ( "ignoreRecursiveError" ) == "true"
2019-12-12 06:58:22 +08:00
skipChunkDeletion := r . FormValue ( "skipChunkDeletion" ) == "true"
2018-07-25 13:33:26 +08:00
2020-03-23 15:01:34 +08:00
err := fs . filer . DeleteEntryMetaAndData ( context . Background ( ) , util . FullPath ( r . URL . Path ) , isRecursive , ignoreRecursiveError , ! skipChunkDeletion )
2018-05-14 14:56:16 +08:00
if err != nil {
2018-07-22 09:47:23 +08:00
glog . V ( 1 ) . Infoln ( "deleting" , r . URL . Path , ":" , err . Error ( ) )
2019-12-19 13:04:40 +08:00
httpStatus := http . StatusInternalServerError
2020-03-08 09:01:39 +08:00
if err == filer_pb . ErrNotFound {
2019-12-19 13:04:40 +08:00
httpStatus = http . StatusNotFound
}
writeJsonError ( w , r , httpStatus , err )
2018-05-14 14:56:16 +08:00
return
}
2018-07-22 09:49:47 +08:00
w . WriteHeader ( http . StatusNoContent )
2016-06-03 11:05:34 +08:00
}
2020-02-25 14:28:45 +08:00
// detectCollection chooses the collection and replication for a write.
//
// Both start from the filer options (fs.option.Collection and
// fs.option.DefaultReplication) and are overridden by qCollection and
// qReplication when those are non-empty.
//
// When the path part of requestURI lies under fs.filer.DirBucketsPath, the
// first path segment after that directory is used as the collection, and the
// replication is replaced by whatever fs.filer.ReadBucketOption returns for the
// resulting collection. If the remainder starts with "/", the collection is
// left as it was.
//
// requestURI may include a query string (PostHandler passes r.RequestURI);
// everything from the first '?' on is ignored so the query never ends up in
// the collection name.
func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string) {
	// default
	collection = fs.option.Collection
	replication = fs.option.DefaultReplication

	// get default collection settings
	if qCollection != "" {
		collection = qCollection
	}
	if qReplication != "" {
		replication = qReplication
	}

	// only the path part identifies the bucket
	if q := strings.IndexByte(requestURI, '?'); q >= 0 {
		requestURI = requestURI[:q]
	}

	// required by buckets folder
	bucketsPrefix := fs.filer.DirBucketsPath + "/"
	if strings.HasPrefix(requestURI, bucketsPrefix) {
		bucketAndObjectKey := requestURI[len(bucketsPrefix):]
		t := strings.Index(bucketAndObjectKey, "/")
		if t < 0 {
			collection = bucketAndObjectKey
		}
		if t > 0 {
			collection = bucketAndObjectKey[:t]
		}
		replication = fs.filer.ReadBucketOption(collection)
	}

	return
}