seaweedfs/weed/command/filer_replication.go

149 lines
4.2 KiB
Go
Raw Normal View History

2018-09-17 15:27:56 +08:00
package command
import (
2019-03-16 08:20:24 +08:00
"context"
2018-10-07 04:01:38 +08:00
"strings"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/replication"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/sub"
"github.com/seaweedfs/seaweedfs/weed/util"
2018-09-17 15:27:56 +08:00
)
func init() {
cmdFilerReplicate.Run = runFilerReplicate // break init cycle
}
var cmdFilerReplicate = &Command{
UsageLine: "filer.replicate",
Short: "replicate file changes to another destination",
Long: `replicate file changes to another destination
filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination.
2019-02-10 13:07:12 +08:00
Run "weed scaffold -config=replication" to generate a replication.toml file and customize the parameters.
2018-09-17 15:27:56 +08:00
`,
}
func runFilerReplicate(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
util.LoadConfiguration("notification", true)
config := util.GetViper()
2018-09-17 15:27:56 +08:00
var notificationInput sub.NotificationInput
2018-09-17 15:27:56 +08:00
2018-12-06 16:44:41 +08:00
validateOneEnabledInput(config)
for _, input := range sub.NotificationInputs {
2018-09-17 15:27:56 +08:00
if config.GetBool("notification." + input.GetName() + ".enabled") {
if err := input.Initialize(config, "notification."+input.GetName()+"."); err != nil {
2018-09-17 15:27:56 +08:00
glog.Fatalf("Failed to initialize notification input for %s: %+v",
input.GetName(), err)
}
glog.V(0).Infof("Configure notification input to %s", input.GetName())
notificationInput = input
break
}
}
2018-10-30 17:29:11 +08:00
if notificationInput == nil {
println("No notification is defined in notification.toml file.")
println("Please follow 'weed scaffold -config=notification' to see example notification configurations.")
2018-10-30 17:29:11 +08:00
return true
}
2018-09-23 15:40:36 +08:00
// avoid recursive replication
if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
if config.GetString("source.filer.grpcAddress") == config.GetString("sink.filer.grpcAddress") {
fromDir := config.GetString("source.filer.directory")
toDir := config.GetString("sink.filer.directory")
2018-09-23 15:40:36 +08:00
if strings.HasPrefix(toDir, fromDir) {
glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
}
}
}
dataSink := findSink(config)
2018-10-04 14:36:52 +08:00
2018-10-07 04:01:38 +08:00
if dataSink == nil {
println("no data sink configured in replication.toml:")
2018-10-07 04:01:38 +08:00
for _, sk := range sink.Sinks {
println(" " + sk.GetName())
}
return true
}
replicator := replication.NewReplicator(config, "source.filer.", dataSink)
2018-09-17 15:27:56 +08:00
for {
2021-01-27 03:08:44 +08:00
key, m, onSuccessFn, onFailureFn, err := notificationInput.ReceiveMessage()
2018-09-17 15:27:56 +08:00
if err != nil {
2018-09-17 17:23:21 +08:00
glog.Errorf("receive %s: %+v", key, err)
2021-01-27 03:08:44 +08:00
if onFailureFn != nil {
onFailureFn()
}
2018-09-17 15:27:56 +08:00
continue
}
if key == "" {
// long poll received no messages
2021-01-27 03:08:44 +08:00
if onSuccessFn != nil {
onSuccessFn()
}
continue
}
2018-09-22 15:12:10 +08:00
if m.OldEntry != nil && m.NewEntry == nil {
glog.V(1).Infof("delete: %s", key)
2018-09-22 15:12:10 +08:00
} else if m.OldEntry == nil && m.NewEntry != nil {
glog.V(1).Infof("add: %s", key)
2018-09-22 15:12:10 +08:00
} else {
glog.V(1).Infof("modify: %s", key)
}
2019-03-16 08:20:24 +08:00
if err = replicator.Replicate(context.Background(), key, m); err != nil {
2018-09-17 17:23:21 +08:00
glog.Errorf("replicate %s: %+v", key, err)
2021-01-27 03:08:44 +08:00
if onFailureFn != nil {
onFailureFn()
}
2018-11-05 03:58:59 +08:00
} else {
2018-11-05 04:07:33 +08:00
glog.V(1).Infof("replicated %s", key)
2021-01-27 03:08:44 +08:00
if onSuccessFn != nil {
onSuccessFn()
}
2018-09-17 15:27:56 +08:00
}
}
}
2018-12-06 16:44:41 +08:00
func findSink(config *util.ViperProxy) sink.ReplicationSink {
var dataSink sink.ReplicationSink
for _, sk := range sink.Sinks {
if config.GetBool("sink." + sk.GetName() + ".enabled") {
if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize sink for %s: %+v",
sk.GetName(), err)
}
glog.V(0).Infof("Configure sink to %s", sk.GetName())
dataSink = sk
break
}
}
return dataSink
}
2021-01-12 18:28:13 +08:00
func validateOneEnabledInput(config *util.ViperProxy) {
2018-12-06 16:44:41 +08:00
enabledInput := ""
for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {
if enabledInput == "" {
enabledInput = input.GetName()
} else {
glog.Fatalf("Notification input is enabled for both %s and %s", enabledInput, input.GetName())
}
}
}
}