From bdefdee4e622058d9c2ebe9d2b495ff0625a420f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 4 Sep 2021 22:46:28 -0700 Subject: [PATCH] filer.remote.sync: add option to add randomized suffix to buckets to avoid conflicts --- weed/command/filer_remote_sync.go | 16 +++++---- weed/command/filer_remote_sync_buckets.go | 42 +++++++++++++++-------- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index ef023b9cb..6ca8477ce 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -15,13 +15,14 @@ import ( ) type RemoteSyncOptions struct { - filerAddress *string - grpcDialOption grpc.DialOption - readChunkFromFiler *bool - debug *bool - timeAgo *time.Duration - dir *string - createBucketAt *string + filerAddress *string + grpcDialOption grpc.DialOption + readChunkFromFiler *bool + debug *bool + timeAgo *time.Duration + dir *string + createBucketAt *string + createBucketRandomSuffix *bool mappings *remote_pb.RemoteStorageMapping remoteConfs map[string]*remote_pb.RemoteConf @@ -52,6 +53,7 @@ func init() { remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer") remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in") + remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", false, "add randomized suffix to bucket name to avoid conflicts") remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files") remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") diff --git a/weed/command/filer_remote_sync_buckets.go b/weed/command/filer_remote_sync_buckets.go index cbe112c22..4059fd228 100644 --- a/weed/command/filer_remote_sync_buckets.go +++ b/weed/command/filer_remote_sync_buckets.go @@ -12,6 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" "math" + "math/rand" "strings" "time" ) @@ -56,18 +57,30 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source. return err } - glog.V(0).Infof("create bucket %s", entry.Name) - if err := client.CreateBucket(entry.Name); err != nil { + bucketName := strings.ToLower(entry.Name) + if *option.createBucketRandomSuffix { + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html + if len(bucketName)+5 > 63 { + bucketName = bucketName[:58] + } + bucketName = fmt.Sprintf("%s-%4d", bucketName, rand.Uint32()%10000) + } + + glog.V(0).Infof("create bucket %s", bucketName) + if err := client.CreateBucket(bucketName); err != nil { return err } bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) remoteLocation := &remote_pb.RemoteStorageLocation{ Name: *option.createBucketAt, - Bucket: entry.Name, + Bucket: bucketName, Path: "/", } + // need to add new mapping here before getting upates from metadata tailing + option.mappings.Mappings[string(bucketPath)] = remoteLocation + return filer.InsertMountMapping(option, string(bucketPath), remoteLocation) } @@ -76,13 +89,13 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source. return nil } - client, err := option.findRemoteStorageClient(entry.Name) + client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name) if err != nil { return err } - glog.V(0).Infof("delete bucket %s", entry.Name) - if err := client.DeleteBucket(entry.Name); err != nil { + glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket) + if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil { return err } @@ -277,23 +290,24 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source. return eachEntryFunc, nil } -func (option *RemoteSyncOptions)findRemoteStorageClient(bucketName string) (remote_storage.RemoteStorageClient, error) { +func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) { bucket := util.FullPath(option.bucketsDir).Child(bucketName) - remoteStorageMountLocation, isMounted := option.mappings.Mappings[string(bucket)] + var isMounted bool + remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)] if !isMounted { - return nil, fmt.Errorf("%s is not mounted", bucket) + return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket) } remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name] if !hasClient { - return nil, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation) + return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation) } - client, err := remote_storage.GetRemoteStorage(remoteConf) + client, err = remote_storage.GetRemoteStorage(remoteConf) if err != nil { - return nil, err + return nil, remoteStorageMountLocation, err } - return client, nil + return client, remoteStorageMountLocation, nil } func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) { @@ -346,4 +360,4 @@ func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) { }, "", false, math.MaxUint32) return -} \ No newline at end of file +}