mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-11-23 18:49:17 +08:00
feat(filer.backup): add ignore errors option (#6235)
* feat(filer.backup): add ignore errors option * feat(filer.backup): fix 404 error wrap * feat(filer.backup): fix wrapping function * feat(filer.backup): fix wrapping errors in genProcessFunction * Update weed/command/filer_backup.go * Update weed/command/filer_backup.go * Update weed/command/filer_backup.go --------- Co-authored-by: Max Denushev <denushev@tochka.com> Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
This commit is contained in:
parent
3003c9e17e
commit
a5fe6e21bc
@ -1,12 +1,15 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/replication/source"
|
||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/http"
|
||||
"google.golang.org/grpc"
|
||||
"regexp"
|
||||
"strings"
|
||||
@ -14,16 +17,18 @@ import (
|
||||
)
|
||||
|
||||
type FilerBackupOptions struct {
|
||||
isActivePassive *bool
|
||||
filer *string
|
||||
path *string
|
||||
excludePaths *string
|
||||
excludeFileName *string
|
||||
debug *bool
|
||||
proxyByFiler *bool
|
||||
doDeleteFiles *bool
|
||||
timeAgo *time.Duration
|
||||
retentionDays *int
|
||||
isActivePassive *bool
|
||||
filer *string
|
||||
path *string
|
||||
excludePaths *string
|
||||
excludeFileName *string
|
||||
debug *bool
|
||||
proxyByFiler *bool
|
||||
doDeleteFiles *bool
|
||||
disableErrorRetry *bool
|
||||
ignore404Error *bool
|
||||
timeAgo *time.Duration
|
||||
retentionDays *int
|
||||
}
|
||||
|
||||
var (
|
||||
@ -41,6 +46,8 @@ func init() {
|
||||
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
|
||||
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
||||
filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
|
||||
filerBackupOptions.disableErrorRetry = cmdFilerBackup.Flag.Bool("disableErrorRetry", false, "disables errors retry, only logs will print")
|
||||
filerBackupOptions.ignore404Error = cmdFilerBackup.Flag.Bool("ignore404Error", true, "ignore 404 errors from filer")
|
||||
}
|
||||
|
||||
var cmdFilerBackup = &Command{
|
||||
@ -130,7 +137,23 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
|
||||
*backupOption.proxyByFiler)
|
||||
dataSink.SetSourceFiler(filerSource)
|
||||
|
||||
processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
|
||||
var processEventFn func(*filer_pb.SubscribeMetadataResponse) error
|
||||
if *backupOption.ignore404Error {
|
||||
processEventFnGenerated := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
|
||||
processEventFn = func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||
err := processEventFnGenerated(resp)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if errors.Is(err, http.ErrNotFound) {
|
||||
glog.V(0).Infof("got 404 error, ignore it: %s", err.Error())
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
processEventFn = genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
|
||||
}
|
||||
|
||||
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
||||
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
|
||||
@ -154,6 +177,11 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
|
||||
prefix = prefix + "/"
|
||||
}
|
||||
|
||||
eventErrorType := pb.RetryForeverOnError
|
||||
if *backupOption.disableErrorRetry {
|
||||
eventErrorType = pb.TrivialOnError
|
||||
}
|
||||
|
||||
metadataFollowOption := &pb.MetadataFollowOption{
|
||||
ClientName: "backup_" + dataSink.GetName(),
|
||||
ClientId: clientId,
|
||||
@ -164,7 +192,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
|
||||
DirectoriesToWatch: nil,
|
||||
StartTsNs: startFrom.UnixNano(),
|
||||
StopTsNs: 0,
|
||||
EventErrorType: pb.RetryForeverOnError,
|
||||
EventErrorType: eventErrorType,
|
||||
}
|
||||
|
||||
return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
|
||||
|
@ -436,7 +436,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
|
||||
}
|
||||
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
|
||||
if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
|
||||
return fmt.Errorf("create entry1 : %v", err)
|
||||
return fmt.Errorf("create entry1 : %w", err)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
@ -462,13 +462,13 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
|
||||
|
||||
// not able to find old entry
|
||||
if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
|
||||
return fmt.Errorf("delete old entry %v: %v", oldKey, err)
|
||||
return fmt.Errorf("delete old entry %v: %w", oldKey, err)
|
||||
}
|
||||
}
|
||||
// create the new entry
|
||||
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
|
||||
if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
|
||||
return fmt.Errorf("create entry2 : %v", err)
|
||||
return fmt.Errorf("create entry2 : %w", err)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
@ -486,7 +486,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
|
||||
// new key is in the watched directory
|
||||
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
|
||||
if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
|
||||
return fmt.Errorf("create entry3 : %v", err)
|
||||
return fmt.Errorf("create entry3 : %w", err)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
|
@ -5,8 +5,8 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@ -16,6 +16,8 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
)
|
||||
|
||||
var ErrNotFound = fmt.Errorf("not found")
|
||||
|
||||
func Post(url string, values url.Values) ([]byte, error) {
|
||||
r, err := GetGlobalHttpClient().PostForm(url, values)
|
||||
if err != nil {
|
||||
@ -311,7 +313,10 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
|
||||
}
|
||||
defer CloseResponse(r)
|
||||
if r.StatusCode >= 400 {
|
||||
retryable = r.StatusCode == http.StatusNotFound || r.StatusCode >= 499
|
||||
if r.StatusCode == http.StatusNotFound {
|
||||
return true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
|
||||
}
|
||||
retryable = r.StatusCode >= 499
|
||||
return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
|
||||
}
|
||||
|
||||
@ -477,4 +482,4 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte,
|
||||
|
||||
return n, err
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user