2020-09-10 02:21:23 +08:00
package command
import (
"context"
"errors"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication"
2021-03-01 08:22:27 +08:00
"github.com/chrislusf/seaweedfs/weed/replication/sink"
2020-09-10 02:21:23 +08:00
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
2020-09-10 02:33:52 +08:00
"github.com/chrislusf/seaweedfs/weed/util/grace"
2020-09-10 02:21:23 +08:00
"google.golang.org/grpc"
"strings"
"time"
)
// SyncOptions holds the command-line flag values for the filer.sync command.
// The a*/b* pairs describe the two filers being synchronized; the flags are
// registered in init() and read in runFilerSynchronize.
type SyncOptions struct {
	isActivePassive *bool // if true, only follow A -> B; otherwise sync in both directions
	filerA          *string
	filerB          *string
	aPath           *string // directory on filer A to watch and sync
	bPath           *string // directory on filer B to watch and sync
	aReplication    *string
	bReplication    *string
	aCollection     *string
	bCollection     *string
	aTtlSec         *int
	bTtlSec         *int
	aDiskType       *string
	bDiskType       *string
	aDebug          *bool // print out files received from filer A
	bDebug          *bool // print out files received from filer B
	aProxyByFiler   *bool // read/write chunks through filer A instead of volume servers
	bProxyByFiler   *bool // read/write chunks through filer B instead of volume servers
	clientId        int32 // random id identifying this sync client; set in init()
}
var (
	syncOptions    SyncOptions
	syncCpuProfile *string // cpu profile output file path; empty disables profiling
	syncMemProfile *string // memory profile output file path; empty disables profiling
)
// init wires up the command entry point and registers all filer.sync flags.
func init() {
	cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle

	syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
	syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
	syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
	syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
	syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
	syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
	syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
	syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
	syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
	syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
	syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
	syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
	syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
	syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
	syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
	syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
	syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
	syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
	syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")

	// random id so the filers can distinguish this sync client from others
	syncOptions.clientId = util.RandomInt32()
}
// cmdFilerSynchronize describes the `weed filer.sync` command. Its Run hook
// is assigned in init() to break an initialization cycle.
var cmdFilerSynchronize = &Command{
	UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
	Short:     "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
	Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers

	filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
	and write to the other destination. Different from filer.replicate:

	* filer.sync only works between two filers.
	* filer.sync does not need any special message queue setup.
	* filer.sync supports both active-active and active-passive modes.

	If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
	A fresh sync will start from the earliest metadata logs.

`,
}
func runFilerSynchronize ( cmd * Command , args [ ] string ) bool {
2021-08-03 16:30:35 +08:00
util . LoadConfiguration ( "security" , false )
2020-09-10 02:21:23 +08:00
grpcDialOption := security . LoadClientTLS ( util . GetViper ( ) , "grpc.client" )
2020-09-10 02:33:52 +08:00
grace . SetupProfiling ( * syncCpuProfile , * syncMemProfile )
2021-09-13 13:47:52 +08:00
filerA := pb . ServerAddress ( * syncOptions . filerA )
filerB := pb . ServerAddress ( * syncOptions . filerB )
2020-09-10 02:21:23 +08:00
go func ( ) {
for {
2021-12-30 16:23:57 +08:00
err := doSubscribeFilerMetaChanges ( syncOptions . clientId , grpcDialOption , filerA , * syncOptions . aPath , * syncOptions . aProxyByFiler , filerB ,
2021-02-10 03:37:07 +08:00
* syncOptions . bPath , * syncOptions . bReplication , * syncOptions . bCollection , * syncOptions . bTtlSec , * syncOptions . bProxyByFiler , * syncOptions . bDiskType , * syncOptions . bDebug )
2020-09-10 02:21:23 +08:00
if err != nil {
glog . Errorf ( "sync from %s to %s: %v" , * syncOptions . filerA , * syncOptions . filerB , err )
time . Sleep ( 1747 * time . Millisecond )
}
}
} ( )
if ! * syncOptions . isActivePassive {
go func ( ) {
for {
2021-12-30 16:23:57 +08:00
err := doSubscribeFilerMetaChanges ( syncOptions . clientId , grpcDialOption , filerB , * syncOptions . bPath , * syncOptions . bProxyByFiler , filerA ,
2021-02-10 03:37:07 +08:00
* syncOptions . aPath , * syncOptions . aReplication , * syncOptions . aCollection , * syncOptions . aTtlSec , * syncOptions . aProxyByFiler , * syncOptions . aDiskType , * syncOptions . aDebug )
2020-09-10 02:21:23 +08:00
if err != nil {
glog . Errorf ( "sync from %s to %s: %v" , * syncOptions . filerB , * syncOptions . filerA , err )
time . Sleep ( 2147 * time . Millisecond )
}
}
} ( )
}
select { }
return true
}
2021-12-30 16:23:57 +08:00
func doSubscribeFilerMetaChanges ( clientId int32 , grpcDialOption grpc . DialOption , sourceFiler pb . ServerAddress , sourcePath string , sourceReadChunkFromFiler bool , targetFiler pb . ServerAddress , targetPath string ,
2021-02-10 03:37:07 +08:00
replicationStr , collection string , ttlSec int , sinkWriteChunkByFiler bool , diskType string , debug bool ) error {
2020-09-10 02:21:23 +08:00
// read source filer signature
sourceFilerSignature , sourceErr := replication . ReadFilerSignature ( grpcDialOption , sourceFiler )
if sourceErr != nil {
return sourceErr
}
// read target filer signature
targetFilerSignature , targetErr := replication . ReadFilerSignature ( grpcDialOption , targetFiler )
if targetErr != nil {
return targetErr
}
// if first time, start from now
// if has previously synced, resume from that point of time
2021-03-01 08:22:27 +08:00
sourceFilerOffsetTsNs , err := getOffset ( grpcDialOption , targetFiler , SyncKeyPrefix , sourceFilerSignature )
2020-09-10 02:21:23 +08:00
if err != nil {
return err
}
glog . V ( 0 ) . Infof ( "start sync %s(%d) => %s(%d) from %v(%d)" , sourceFiler , sourceFilerSignature , targetFiler , targetFilerSignature , time . Unix ( 0 , sourceFilerOffsetTsNs ) , sourceFilerOffsetTsNs )
// create filer sink
filerSource := & source . FilerSource { }
2021-09-13 13:47:52 +08:00
filerSource . DoInitialize ( sourceFiler . ToHttpAddress ( ) , sourceFiler . ToGrpcAddress ( ) , sourcePath , sourceReadChunkFromFiler )
2020-09-10 02:21:23 +08:00
filerSink := & filersink . FilerSink { }
2021-09-13 13:47:52 +08:00
filerSink . DoInitialize ( targetFiler . ToHttpAddress ( ) , targetFiler . ToGrpcAddress ( ) , targetPath , replicationStr , collection , ttlSec , diskType , grpcDialOption , sinkWriteChunkByFiler )
2020-09-10 02:21:23 +08:00
filerSink . SetSourceFiler ( filerSource )
2021-03-01 08:22:27 +08:00
persistEventFn := genProcessFunction ( sourcePath , targetPath , filerSink , debug )
2020-09-10 02:21:23 +08:00
processEventFn := func ( resp * filer_pb . SubscribeMetadataResponse ) error {
message := resp . EventNotification
for _ , sig := range message . Signatures {
if sig == targetFilerSignature && targetFilerSignature != 0 {
fmt . Printf ( "%s skipping %s change to %v\n" , targetFiler , sourceFiler , message )
return nil
}
}
2021-03-01 08:22:27 +08:00
return persistEventFn ( resp )
2020-09-10 02:21:23 +08:00
}
2022-01-10 17:00:11 +08:00
var lastLogTsNs = time . Now ( ) . Nanosecond ( )
2021-08-09 13:30:36 +08:00
processEventFnWithOffset := pb . AddOffsetFunc ( processEventFn , 3 * time . Second , func ( counter int64 , lastTsNs int64 ) error {
2022-01-10 17:00:11 +08:00
now := time . Now ( ) . Nanosecond ( )
glog . V ( 0 ) . Infof ( "sync %s to %s progressed to %v %0.2f/sec" , sourceFiler , targetFiler , time . Unix ( 0 , lastTsNs ) , float64 ( counter ) / ( float64 ( now - lastLogTsNs ) / 1e9 ) )
lastLogTsNs = now
2021-08-05 07:25:46 +08:00
return setOffset ( grpcDialOption , targetFiler , SyncKeyPrefix , sourceFilerSignature , lastTsNs )
2020-09-10 02:21:23 +08:00
} )
2021-12-30 16:23:57 +08:00
return pb . FollowMetadata ( sourceFiler , grpcDialOption , "syncTo_" + string ( targetFiler ) , clientId ,
2021-09-01 14:30:28 +08:00
sourcePath , nil , sourceFilerOffsetTsNs , targetFilerSignature , processEventFnWithOffset , false )
2021-08-05 07:25:46 +08:00
2020-09-10 02:21:23 +08:00
}
const (
	// SyncKeyPrefix prefixes the KV key under which each source filer's
	// sync offset checkpoint is stored on the target filer.
	SyncKeyPrefix = "sync."
)
2021-09-13 13:47:52 +08:00
func getOffset ( grpcDialOption grpc . DialOption , filer pb . ServerAddress , signaturePrefix string , signature int32 ) ( lastOffsetTsNs int64 , readErr error ) {
2020-09-10 02:21:23 +08:00
2021-12-26 16:15:03 +08:00
readErr = pb . WithFilerClient ( false , filer , grpcDialOption , func ( client filer_pb . SeaweedFilerClient ) error {
2021-03-01 08:22:27 +08:00
syncKey := [ ] byte ( signaturePrefix + "____" )
util . Uint32toBytes ( syncKey [ len ( signaturePrefix ) : len ( signaturePrefix ) + 4 ] , uint32 ( signature ) )
2020-09-10 02:21:23 +08:00
resp , err := client . KvGet ( context . Background ( ) , & filer_pb . KvGetRequest { Key : syncKey } )
if err != nil {
return err
}
if len ( resp . Error ) != 0 {
return errors . New ( resp . Error )
}
if len ( resp . Value ) < 8 {
return nil
}
lastOffsetTsNs = int64 ( util . BytesToUint64 ( resp . Value ) )
return nil
} )
return
}
2021-09-13 13:47:52 +08:00
func setOffset ( grpcDialOption grpc . DialOption , filer pb . ServerAddress , signaturePrefix string , signature int32 , offsetTsNs int64 ) error {
2021-12-26 16:15:03 +08:00
return pb . WithFilerClient ( false , filer , grpcDialOption , func ( client filer_pb . SeaweedFilerClient ) error {
2020-09-10 02:21:23 +08:00
2021-03-01 08:22:27 +08:00
syncKey := [ ] byte ( signaturePrefix + "____" )
util . Uint32toBytes ( syncKey [ len ( signaturePrefix ) : len ( signaturePrefix ) + 4 ] , uint32 ( signature ) )
2020-09-10 02:21:23 +08:00
valueBuf := make ( [ ] byte , 8 )
util . Uint64toBytes ( valueBuf , uint64 ( offsetTsNs ) )
resp , err := client . KvPut ( context . Background ( ) , & filer_pb . KvPutRequest {
Key : syncKey ,
Value : valueBuf ,
} )
if err != nil {
return err
}
if len ( resp . Error ) != 0 {
return errors . New ( resp . Error )
}
return nil
} )
}
2021-03-01 08:22:27 +08:00
// genProcessFunction builds the handler that replays one filer metadata event
// onto dataSink. Events outside sourcePath are ignored. Paths are remapped
// from sourcePath on the source filer to targetPath on the sink (via
// buildKey, which also adds a date folder for incremental sinks). The
// old/new entry combination decides the action: delete, create, or update.
func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
	// process function
	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification

		// full source paths of the entry before and after the event
		var sourceOldKey, sourceNewKey util.FullPath
		if message.OldEntry != nil {
			sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
		}
		if message.NewEntry != nil {
			sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
		}

		if debug {
			glog.V(0).Infof("received %v", resp)
		}

		if !strings.HasPrefix(resp.Directory, sourcePath) {
			return nil
		}

		// handle deletions (old entry exists, no new entry)
		if message.OldEntry != nil && message.NewEntry == nil {
			if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
				return nil
			}
			key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
			return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
		}

		// handle new entries (no old entry, new entry exists)
		if message.OldEntry == nil && message.NewEntry != nil {
			if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
				return nil
			}
			key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
			return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
		}

		// this is something special?
		if message.OldEntry == nil && message.NewEntry == nil {
			return nil
		}

		// handle updates (both old and new entries present); behavior
		// depends on whether each side falls inside the watched directory
		if strings.HasPrefix(string(sourceOldKey), sourcePath) {
			// old key is in the watched directory
			if strings.HasPrefix(string(sourceNewKey), sourcePath) {
				// new key is also in the watched directory
				if !dataSink.IsIncremental() {
					oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
					// NOTE: NewParentPath is rewritten in place to the target
					// namespace before being passed to UpdateEntry below
					message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
					foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
					if foundExisting {
						return err
					}

					// not able to find old entry: fall back to delete + create
					if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
						return fmt.Errorf("delete old entry %v: %v", oldKey, err)
					}
				}
				// create the new entry
				newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
				return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)

			} else {
				// new key is outside of the watched directory: entry moved
				// away, so remove it from the sink (non-incremental only)
				if !dataSink.IsIncremental() {
					key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
					return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
				}
			}
		} else {
			// old key is outside of the watched directory
			if strings.HasPrefix(string(sourceNewKey), sourcePath) {
				// new key is in the watched directory: entry moved in,
				// treat as a create
				key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
				return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
			} else {
				// new key is also outside of the watched directory
				// skip
			}
		}

		return nil
	}
	return processEventFn
}
2021-05-29 21:45:23 +08:00
func buildKey ( dataSink sink . ReplicationSink , message * filer_pb . EventNotification , targetPath string , sourceKey util . FullPath , sourcePath string ) ( key string ) {
2021-03-01 08:22:27 +08:00
if ! dataSink . IsIncremental ( ) {
2021-05-29 21:45:23 +08:00
key = util . Join ( targetPath , string ( sourceKey ) [ len ( sourcePath ) : ] )
} else {
var mTime int64
if message . NewEntry != nil {
mTime = message . NewEntry . Attributes . Mtime
} else if message . OldEntry != nil {
mTime = message . OldEntry . Attributes . Mtime
}
dateKey := time . Unix ( mTime , 0 ) . Format ( "2006-01-02" )
key = util . Join ( targetPath , dateKey , string ( sourceKey ) [ len ( sourcePath ) : ] )
2021-03-01 08:22:27 +08:00
}
2021-05-29 21:45:23 +08:00
return escapeKey ( key )
2021-03-01 08:22:27 +08:00
}