From 6daa932f5c1571cc60cf89014cf17810d756dd6b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Feb 2021 20:55:55 -0800 Subject: [PATCH] refactoring to get master function, instead of passing master values directly this will enable retrying later --- unmaintained/volume_tailer/volume_tailer.go | 2 +- weed/command/backup.go | 2 +- weed/command/benchmark.go | 4 ++-- weed/command/download.go | 12 +++++------ weed/command/filer_copy.go | 4 +++- weed/command/upload.go | 4 ++-- weed/filer/filer_notify_append.go | 2 +- weed/operation/assign_file_id.go | 4 ++-- weed/operation/chunked_file.go | 8 ++++--- weed/operation/delete_content.go | 4 ++-- weed/operation/lookup.go | 15 +++++++------ weed/operation/submit.go | 24 +++++++++++---------- weed/operation/tail_volume.go | 4 ++-- weed/server/common.go | 4 ++-- weed/server/filer_grpc_server.go | 2 +- weed/server/filer_server_handlers_write.go | 2 +- weed/server/master_server_handlers_admin.go | 4 ++-- weed/server/volume_server_handlers_read.go | 2 +- weed/server/volume_server_handlers_write.go | 6 +++--- weed/topology/store_replicate.go | 12 +++++------ 20 files changed, 64 insertions(+), 57 deletions(-) diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index e93f1cc13..32da2e6ab 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -37,7 +37,7 @@ func main() { sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano() } - err := operation.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) { + err := operation.TailVolume(func()string{return *master}, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) { if n.Size == 0 { println("-", n.String()) return nil diff --git a/weed/command/backup.go b/weed/command/backup.go index 4c37c2763..b9973c6a9 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -72,7 +72,7 @@ func runBackup(cmd *Command, args []string) bool { vid := needle.VolumeId(*s.volumeId) // find volume location, replication, ttl info - lookup, err := operation.Lookup(*s.master, vid.String()) + lookup, err := operation.Lookup(func()string{return *s.master}, vid.String()) if err != nil { fmt.Printf("Error looking up volume %d: %v\n", vid, err) return true diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 370ec9eaf..e1b6d8d6c 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -238,12 +238,12 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { Replication: *b.replication, DiskType: *b.diskType, } - if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil { + if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection if !isSecure && assignResult.Auth != "" { isSecure = true } - if _, err := fp.Upload(0, b.masterClient.GetMaster(), false, assignResult.Auth, b.grpcDialOption); err == nil { + if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} diff --git a/weed/command/download.go b/weed/command/download.go index f7588fbf0..5da1f57d9 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -44,15 +44,15 @@ var cmdDownload = &Command{ func runDownload(cmd *Command, args []string) bool { for _, fid := range args { - if e := downloadToFile(*d.server, fid, util.ResolvePath(*d.dir)); e != nil { + if e := downloadToFile(func()string{return *d.server}, fid, util.ResolvePath(*d.dir)); e != nil { fmt.Println("Download Error: ", fid, e) } } return true } -func downloadToFile(server, fileId, saveDir string) error { - fileUrl, lookupError := operation.LookupFileId(server, fileId) +func downloadToFile(masterFn operation.GetMasterFn, fileId, saveDir string) error { + fileUrl, lookupError := operation.LookupFileId(masterFn, fileId) if lookupError != nil { return lookupError } @@ -83,7 +83,7 @@ func downloadToFile(server, fileId, saveDir string) error { fids := strings.Split(string(content), "\n") for _, partId := range fids { var n int - _, part, err := fetchContent(*d.server, partId) + _, part, err := fetchContent(masterFn, partId) if err == nil { n, err = f.Write(part) } @@ -103,8 +103,8 @@ func downloadToFile(server, fileId, saveDir string) error { return nil } -func fetchContent(server string, fileId string) (filename string, content []byte, e error) { - fileUrl, lookupError := operation.LookupFileId(server, fileId) +func fetchContent(masterFn operation.GetMasterFn, fileId string) (filename string, content []byte, e error) { + fileUrl, lookupError := operation.LookupFileId(masterFn, fileId) if lookupError != nil { return "", nil, lookupError } diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 20f00a3eb..bf64e72b3 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -463,7 +463,9 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, for _, chunk := range chunks { fileIds = append(fileIds, chunk.FileId) } - operation.DeleteFiles(copy.masters[0], false, worker.options.grpcDialOption, fileIds) + operation.DeleteFiles(func() string { + return copy.masters[0] + }, false, worker.options.grpcDialOption, fileIds) return uploadError } diff --git a/weed/command/upload.go b/weed/command/upload.go index 85d6e5c09..effcf8836 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -96,7 +96,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, e := operation.SubmitFiles(func()string {return *upload.master}, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -113,7 +113,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, _ := operation.SubmitFiles(func()string {return *upload.master}, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) } diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index 09c39dd89..d441bbbc9 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -56,7 +56,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi WritableVolumeCount: rule.VolumeGrowthCount, } - assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) + assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest) if err != nil { return nil, nil, fmt.Errorf("AssignVolume: %v", err) } diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index b2fcf0c96..cc1359961 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -34,7 +34,7 @@ type AssignResult struct { Auth security.EncodedJwt `json:"auth,omitempty"` } -func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { +func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest requests = append(requests, primaryRequest) @@ -48,7 +48,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum continue } - lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 1bac028ff..8506e0518 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) { return json.Marshal(cm) } -func (cm *ChunkManifest) DeleteChunks(master string, usePublicUrl bool, grpcDialOption grpc.DialOption) error { +func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error { var fileIds []string for _, ci := range cm.Chunks { fileIds = append(fileIds, ci.Fid) } - results, err := DeleteFiles(master, usePublicUrl, grpcDialOption, fileIds) + results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds) if err != nil { glog.V(0).Infof("delete %+v: %v", fileIds, err) return fmt.Errorf("chunk delete: %v", err) @@ -174,7 +174,9 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { for ; chunkIndex < len(cf.chunkList); chunkIndex++ { ci := cf.chunkList[chunkIndex] // if we need read date from local volume server first? - fileUrl, lookupError := LookupFileId(cf.master, ci.Fid) + fileUrl, lookupError := LookupFileId(func() string { + return cf.master + }, ci.Fid) if lookupError != nil { return n, lookupError } diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 65baaddf2..8f87882b1 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -28,10 +28,10 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { } // DeleteFiles batch deletes a list of fileIds -func DeleteFiles(master string, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { +func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { lookupFunc := func(vids []string) (results map[string]LookupResult, err error) { - results, err = LookupVolumeIds(master, grpcDialOption, vids) + results, err = LookupVolumeIds(masterFn, grpcDialOption, vids) if err == nil && usePublicUrl { for _, result := range results { for _, loc := range result.Locations { diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index d0773e7fd..0372e47b0 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -33,10 +33,10 @@ var ( vc VidCache // caching of volume locations, re-check if after 10 minutes ) -func Lookup(server string, vid string) (ret *LookupResult, err error) { +func Lookup(masterFn GetMasterFn, vid string) (ret *LookupResult, err error) { locations, cache_err := vc.Get(vid) if cache_err != nil { - if ret, err = do_lookup(server, vid); err == nil { + if ret, err = do_lookup(masterFn, vid); err == nil { vc.Set(vid, ret.Locations, 10*time.Minute) } } else { @@ -45,9 +45,10 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) { return } -func do_lookup(server string, vid string) (*LookupResult, error) { +func do_lookup(masterFn GetMasterFn, vid string) (*LookupResult, error) { values := make(url.Values) values.Add("volumeId", vid) + server := masterFn() jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) if err != nil { return nil, err @@ -63,12 +64,12 @@ func do_lookup(server string, vid string) (*LookupResult, error) { return &ret, nil } -func LookupFileId(server string, fileId string) (fullUrl string, err error) { +func LookupFileId(masterFn GetMasterFn, fileId string) (fullUrl string, err error) { parts := strings.Split(fileId, ",") if len(parts) != 2 { return "", errors.New("Invalid fileId " + fileId) } - lookup, lookupError := Lookup(server, parts[0]) + lookup, lookupError := Lookup(masterFn, parts[0]) if lookupError != nil { return "", lookupError } @@ -79,7 +80,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { } // LookupVolumeIds find volume locations by cache and actual lookup -func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) { +func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) { ret := make(map[string]LookupResult) var unknown_vids []string @@ -99,7 +100,7 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin //only query unknown_vids - err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeIds: unknown_vids, diff --git a/weed/operation/submit.go b/weed/operation/submit.go index c34b33577..87c5e4279 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -39,7 +39,9 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { +type GetMasterFn func() string + +func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName @@ -52,7 +54,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart Ttl: ttl, DiskType: diskType, } - ret, err := Assign(master, grpcDialOption, ar) + ret, err := Assign(masterFn, grpcDialOption, ar) if err != nil { for index := range files { results[index].Error = err.Error() @@ -73,7 +75,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart file.DataCenter = dataCenter file.Ttl = ttl file.DiskType = diskType - results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption) + results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption) if err != nil { results[index].Error = err.Error() } @@ -116,7 +118,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { +func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) @@ -148,7 +150,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur Ttl: fi.Ttl, DiskType: fi.DiskType, } - ret, err = Assign(master, grpcDialOption, ar) + ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { return } @@ -162,10 +164,10 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur Ttl: fi.Ttl, DiskType: fi.DiskType, } - ret, err = Assign(master, grpcDialOption, ar) + ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master, usePublicUrl, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) return } id = ret.Fid @@ -182,11 +184,11 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur count, e := upload_one_chunk( baseName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), - master, fileUrl, + masterFn, fileUrl, ret.Auth) if e != nil { // delete all uploaded chunks - cm.DeleteChunks(master, usePublicUrl, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) return 0, e } cm.Chunks = append(cm.Chunks, @@ -201,7 +203,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur err = upload_chunked_file_manifest(fileUrl, &cm, jwt) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master, usePublicUrl, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) } } else { ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) @@ -213,7 +215,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur return } -func upload_one_chunk(filename string, reader io.Reader, master, +func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index a15c21ae4..045948274 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -11,9 +11,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle" ) -func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { +func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { // find volume location, replication, ttl info - lookup, err := Lookup(master, vid.String()) + lookup, err := Lookup(masterFn, vid.String()) if err != nil { return fmt.Errorf("look up volume %d: %v", vid, err) } diff --git a/weed/server/common.go b/weed/server/common.go index 7d32d6a9a..9001a3b33 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -100,7 +100,7 @@ func debug(params ...interface{}) { glog.V(4).Infoln(params...) } -func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) { +func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) { m := make(map[string]interface{}) if r.Method != "POST" { writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) @@ -133,7 +133,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st Ttl: r.FormValue("ttl"), DiskType: r.FormValue("disk"), } - assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar) + assignResult, ae := operation.Assign(masterFn, grpcDialOption, ar) if ae != nil { writeJsonError(w, r, http.StatusInternalServerError, ae) return diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 04145f2f9..a4bb721ef 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -337,7 +337,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol assignRequest, altRequest := so.ToAssignRequests(int(req.Count)) - assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) + assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 28dff07c6..0ce3e4a58 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -37,7 +37,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u ar, altRequest := so.ToAssignRequests(1) - assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) + assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) err = ae diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index e68dafc5a..4a77d24b5 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -125,13 +125,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string { } func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { - submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOption) + submitForClientHandler(w, r, func()string{return ms.selfUrl(r)}, ms.grpcDialOption) } else { masterUrl, err := ms.Topo.Leader() if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else { - submitForClientHandler(w, r, masterUrl, ms.grpcDialOption) + submitForClientHandler(w, r, func()string{return masterUrl}, ms.grpcDialOption) } } } diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 1cd4ee21d..2a82fc1b1 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -63,7 +63,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusNotFound) return } - lookupResult, err := operation.Lookup(vs.GetMaster(), volumeId.String()) + lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err == nil && len(lookupResult.Locations) > 0 { u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 01a77b901..602b147e1 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -50,7 +50,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } ret := operation.UploadResult{} - isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, reqNeedle, r) + isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.store, volumeId, reqNeedle, r) // http 204 status code does not allow body if writeError == nil && isUnchanged { @@ -128,7 +128,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } // make sure all chunks had deleted before delete manifest - if e := chunkManifest.DeleteChunks(vs.GetMaster(), false, vs.grpcDialOption); e != nil { + if e := chunkManifest.DeleteChunks(vs.GetMaster, false, vs.grpcDialOption); e != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) return } @@ -143,7 +143,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } } - _, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r) + _, err := topology.ReplicatedDelete(vs.GetMaster, vs.store, volumeId, n, r) writeDeleteResult(err, count, w, r) diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 6b4076913..ea0a8c968 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -18,7 +18,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { +func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) @@ -27,7 +27,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { // this is the initial request - remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode) + remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterFn) if err != nil { glog.V(0).Infoln(err) return @@ -92,7 +92,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume return } -func ReplicatedDelete(masterNode string, store *storage.Store, +func ReplicatedDelete(masterFn operation.GetMasterFn, store *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (size types.Size, err error) { @@ -101,7 +101,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { - remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode) + remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterFn) if err != nil { glog.V(0).Infoln(err) return @@ -161,7 +161,7 @@ func distributedOperation(locations []operation.Location, store *storage.Store, return ret.Error() } -func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) ( +func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterFn operation.GetMasterFn) ( remoteLocations []operation.Location, err error) { v := s.GetVolume(volumeId) @@ -170,7 +170,7 @@ func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, m } // not on local store, or has replications - lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()) + lookupResult, lookupErr := operation.Lookup(masterFn, volumeId.String()) if lookupErr == nil { selfUrl := s.Ip + ":" + strconv.Itoa(s.Port) for _, location := range lookupResult.Locations {