seaweedfs/weed/server/filer_server_handlers_write.go

165 lines
4.7 KiB
Go
Raw Normal View History

2016-06-03 11:05:34 +08:00
package weed_server
import (
2019-03-16 06:26:09 +08:00
"context"
2016-06-03 11:05:34 +08:00
"net/http"
2019-02-15 16:09:19 +08:00
"os"
"strings"
2018-05-28 14:59:49 +08:00
"time"
2016-06-03 11:05:34 +08:00
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
2018-05-28 02:52:26 +08:00
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2019-02-15 16:09:19 +08:00
"github.com/chrislusf/seaweedfs/weed/security"
2019-06-16 03:21:44 +08:00
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
2016-06-03 11:05:34 +08:00
"github.com/chrislusf/seaweedfs/weed/util"
)
// OS_UID and OS_GID are the numeric user id and group id of the calling
// process, read once at package initialization and converted to uint32.
//
// NOTE: os.Getuid and os.Getgid return -1 on Windows, which wraps to the
// maximum uint32 value after the conversion.
var (
	OS_UID = uint32(os.Getuid())
	OS_GID = uint32(os.Getgid())
)
// FilerPostResult is a JSON-serializable result record. Every field is tagged
// omitempty, so zero-valued fields are left out of the encoded output.
type FilerPostResult struct {
	// Name is encoded as "name".
	Name string `json:"name,omitempty"`
	// Size is encoded as "size".
	Size int64 `json:"size,omitempty"`
	// Error is encoded as "error".
	Error string `json:"error,omitempty"`
	// Fid is encoded as "fid".
	Fid string `json:"fid,omitempty"`
	// Url is encoded as "url".
	Url string `json:"url,omitempty"`
}
func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, rack, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
2019-06-23 13:53:52 +08:00
stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now()
defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
2016-06-26 10:50:18 +08:00
ar := &operation.VolumeAssignRequest{
Count: 1,
Replication: replication,
Collection: collection,
Ttl: ttlString,
DataCenter: dataCenter,
2016-06-26 10:50:18 +08:00
}
2018-07-10 15:20:50 +08:00
var altRequest *operation.VolumeAssignRequest
if dataCenter != "" || rack != "" {
2018-07-10 15:20:50 +08:00
altRequest = &operation.VolumeAssignRequest{
Count: 1,
Replication: replication,
Collection: collection,
Ttl: ttlString,
2018-07-10 15:20:50 +08:00
DataCenter: "",
Rack: "",
2018-07-10 15:20:50 +08:00
}
}
2019-02-19 04:11:52 +08:00
assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
2016-06-08 15:46:14 +08:00
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
2016-06-08 15:46:14 +08:00
err = ae
return
}
fileId = assignResult.Fid
urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
2020-04-12 14:37:10 +08:00
if fsync {
urlLocation += "?fsync=true"
}
2019-02-15 16:09:19 +08:00
auth = assignResult.Auth
2016-06-08 15:46:14 +08:00
return
}
2016-06-03 11:05:34 +08:00
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
2019-03-16 06:55:34 +08:00
ctx := context.Background()
2016-06-03 11:05:34 +08:00
query := r.URL.Query()
2020-04-12 14:37:10 +08:00
collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
dataCenter := query.Get("dataCenter")
if dataCenter == "" {
dataCenter = fs.option.DataCenter
}
rack := query.Get("rack")
if dataCenter == "" {
rack = fs.option.Rack
}
ttlString := r.URL.Query().Get("ttl")
2016-06-03 11:05:34 +08:00
// read ttl in seconds
ttl, err := needle.ReadTTL(ttlString)
ttlSeconds := int32(0)
if err == nil {
ttlSeconds = int32(ttl.Minutes()) * 60
}
fs.autoChunk(ctx, w, r, replication, collection, dataCenter, rack, ttlSeconds, ttlString, fsync)
2019-06-23 13:53:52 +08:00
2016-06-03 11:05:34 +08:00
}
// curl -X DELETE http://localhost:8888/path/to
2018-07-25 13:33:26 +08:00
// curl -X DELETE http://localhost:8888/path/to?recursive=true
// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
2019-12-12 06:58:22 +08:00
// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
2016-06-03 11:05:34 +08:00
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
2018-05-14 14:56:16 +08:00
2018-07-25 13:33:26 +08:00
isRecursive := r.FormValue("recursive") == "true"
if !isRecursive && fs.option.recursiveDelete {
if r.FormValue("recursive") != "false" {
isRecursive = true
}
}
ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
2019-12-12 06:58:22 +08:00
skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
2018-07-25 13:33:26 +08:00
objectPath := r.URL.Path
if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
objectPath = objectPath[0 : len(objectPath)-1]
}
err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
2018-05-14 14:56:16 +08:00
if err != nil {
glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
httpStatus := http.StatusInternalServerError
2020-03-08 09:01:39 +08:00
if err == filer_pb.ErrNotFound {
httpStatus = http.StatusNotFound
}
writeJsonError(w, r, httpStatus, err)
2018-05-14 14:56:16 +08:00
return
}
2018-07-22 09:49:47 +08:00
w.WriteHeader(http.StatusNoContent)
2016-06-03 11:05:34 +08:00
}
2020-04-12 14:37:10 +08:00
// detectCollection resolves the collection, replication and fsync settings
// for a request.
//
// The filer's configured collection and default replication are the starting
// point; non-empty qCollection / qReplication values override them. When
// requestURI lies under the buckets directory (fs.filer.DirBucketsPath + "/"),
// the first path segment after that prefix becomes the collection, and
// fs.filer.ReadBucketOption is consulted for that collection: its fsync value
// is returned, and its replication is used only if no replication has been
// chosen yet.
func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
	// Filer-wide defaults, overridden by explicit query values.
	collection = fs.option.Collection
	if qCollection != "" {
		collection = qCollection
	}
	replication = fs.option.DefaultReplication
	if qReplication != "" {
		replication = qReplication
	}

	// Outside the buckets folder there is nothing more to look up.
	bucketsPrefix := fs.filer.DirBucketsPath + "/"
	if !strings.HasPrefix(requestURI, bucketsPrefix) {
		return
	}

	// required by buckets folder: the bucket name is the collection.
	remainder := requestURI[len(bucketsPrefix):]
	switch slash := strings.Index(remainder, "/"); {
	case slash < 0:
		collection = remainder
	case slash > 0:
		collection = remainder[:slash]
	}

	var bucketReplication string
	bucketReplication, fsync = fs.filer.ReadBucketOption(collection)
	if replication == "" {
		replication = bucketReplication
	}
	return
}