seaweedfs/weed/command/filer_backup.go
Max Denushev a5fe6e21bc
feat(filer.backup): add ignore errors option (#6235)
* feat(filer.backup): add ignore errors option

* feat(filer.backup): fix 404 error wrap

* feat(filer.backup): fix wrapping function

* feat(filer.backup): fix wrapping errors in genProcessFunction

* Update weed/command/filer_backup.go

* Update weed/command/filer_backup.go

* Update weed/command/filer_backup.go

---------

Co-authored-by: Max Denushev <denushev@tochka.com>
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
2024-11-14 08:40:55 -08:00

201 lines
7.5 KiB
Go

package command
import (
"errors"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/http"
"google.golang.org/grpc"
"regexp"
"strings"
"time"
)
type FilerBackupOptions struct {
isActivePassive *bool
filer *string
path *string
excludePaths *string
excludeFileName *string
debug *bool
proxyByFiler *bool
doDeleteFiles *bool
disableErrorRetry *bool
ignore404Error *bool
timeAgo *time.Duration
retentionDays *int
}
var (
filerBackupOptions FilerBackupOptions
)
func init() {
cmdFilerBackup.Run = runFilerBackup // break init cycle
filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
filerBackupOptions.excludeFileName = cmdFilerBackup.Flag.String("filerExcludeFileName", "", "exclude file names that match the regexp to sync on filer")
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
filerBackupOptions.doDeleteFiles = cmdFilerBackup.Flag.Bool("doDeleteFiles", false, "delete files on the destination")
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
filerBackupOptions.disableErrorRetry = cmdFilerBackup.Flag.Bool("disableErrorRetry", false, "disables errors retry, only logs will print")
filerBackupOptions.ignore404Error = cmdFilerBackup.Flag.Bool("ignore404Error", true, "ignore 404 errors from filer")
}
var cmdFilerBackup = &Command{
UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml
filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.
If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.
`,
}
func runFilerBackup(cmd *Command, args []string) bool {
util.LoadSecurityConfiguration()
util.LoadConfiguration("replication", true)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
clientId := util.RandomInt32()
var clientEpoch int32
for {
clientEpoch++
err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId, clientEpoch)
if err != nil {
glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
time.Sleep(1747 * time.Millisecond)
}
}
return true
}
const (
BackupKeyPrefix = "backup."
)
func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error {
// find data sink
dataSink := findSink(util.GetViper())
if dataSink == nil {
return fmt.Errorf("no data sink configured in replication.toml")
}
sourceFiler := pb.ServerAddress(*backupOption.filer)
sourcePath := *backupOption.path
excludePaths := util.StringSplit(*backupOption.excludePaths, ",")
var reExcludeFileName *regexp.Regexp
if *backupOption.excludeFileName != "" {
var err error
if reExcludeFileName, err = regexp.Compile(*backupOption.excludeFileName); err != nil {
return fmt.Errorf("error compile regexp %v for exclude file name: %+v", *backupOption.excludeFileName, err)
}
}
timeAgo := *backupOption.timeAgo
targetPath := dataSink.GetSinkToDirectory()
debug := *backupOption.debug
// get start time for the data sink
startFrom := time.Unix(0, 0)
sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
if timeAgo.Milliseconds() == 0 {
lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
if err != nil {
glog.V(0).Infof("starting from %v", startFrom)
} else {
startFrom = time.Unix(0, lastOffsetTsNs)
glog.V(0).Infof("resuming from %v", startFrom)
}
} else {
startFrom = time.Now().Add(-timeAgo)
glog.V(0).Infof("start time is set to %v", startFrom)
}
// create filer sink
filerSource := &source.FilerSource{}
filerSource.DoInitialize(
sourceFiler.ToHttpAddress(),
sourceFiler.ToGrpcAddress(),
sourcePath,
*backupOption.proxyByFiler)
dataSink.SetSourceFiler(filerSource)
var processEventFn func(*filer_pb.SubscribeMetadataResponse) error
if *backupOption.ignore404Error {
processEventFnGenerated := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
processEventFn = func(resp *filer_pb.SubscribeMetadataResponse) error {
err := processEventFnGenerated(resp)
if err == nil {
return nil
}
if errors.Is(err, http.ErrNotFound) {
glog.V(0).Infof("got 404 error, ignore it: %s", err.Error())
return nil
}
return err
}
} else {
processEventFn = genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
}
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
if dataSink.IsIncremental() && *filerBackupOptions.retentionDays > 0 {
go func() {
for {
now := time.Now()
time.Sleep(time.Hour * 24)
key := util.Join(targetPath, now.Add(-1*time.Hour*24*time.Duration(*filerBackupOptions.retentionDays)).Format("2006-01-02"))
_ = dataSink.DeleteEntry(util.Join(targetPath, key), true, true, nil)
glog.V(0).Infof("incremental backup delete directory:%s", key)
}
}()
}
prefix := sourcePath
if !strings.HasSuffix(prefix, "/") {
prefix = prefix + "/"
}
eventErrorType := pb.RetryForeverOnError
if *backupOption.disableErrorRetry {
eventErrorType = pb.TrivialOnError
}
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "backup_" + dataSink.GetName(),
ClientId: clientId,
ClientEpoch: clientEpoch,
SelfSignature: 0,
PathPrefix: prefix,
AdditionalPathPrefixes: nil,
DirectoriesToWatch: nil,
StartTsNs: startFrom.UnixNano(),
StopTsNs: 0,
EventErrorType: eventErrorType,
}
return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}