package weed_server import ( "context" "fmt" "net/http" "os" "sync" "time" "github.com/chrislusf/seaweedfs/weed/stats" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/filer" _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" _ "github.com/chrislusf/seaweedfs/weed/notification/log" "github.com/chrislusf/seaweedfs/weed/security" ) type FilerOption struct { Masters []string Collection string DefaultReplication string DisableDirListing bool MaxMB int DirListingLimit int DataCenter string Rack string DefaultLevelDbDir string DisableHttp bool Host string Port uint32 recursiveDelete bool Cipher bool SaveToFilerLimit int64 Filers []string ConcurrentUploadLimit int64 } type FilerServer struct { option *FilerOption secret security.SigningKey filer *filer.Filer grpcDialOption grpc.DialOption // metrics read from the master metricsAddress string metricsIntervalSec int // notifying clients listenersLock sync.Mutex listenersCond *sync.Cond brokers map[string]map[string]bool brokersLock sync.Mutex inFlightDataSize int64 inFlightDataLimitCond *sync.Cond } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { fs = &FilerServer{ option: option, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), brokers: make(map[string]map[string]bool), inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), } fs.listenersCond = sync.NewCond(&fs.listenersLock) if len(option.Masters) == 0 { glog.Fatal("master list is required!") } fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher fs.checkWithMaster() go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec) go fs.filer.KeepConnectedToMaster() v := util.GetViper() if !util.LoadConfiguration("filer", false) { v.Set("leveldb2.enabled", true) v.Set("leveldb2.dir", option.DefaultLevelDbDir) _, err := os.Stat(option.DefaultLevelDbDir) if os.IsNotExist(err) { os.MkdirAll(option.DefaultLevelDbDir, 0755) } glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir) } else { glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir) } util.LoadConfiguration("notification", false) fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete") v.SetDefault("filer.options.buckets_folder", "/buckets") fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder") // TODO deprecated, will be be removed after 2020-12-31 // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") fs.filer.LoadConfiguration(v) notification.LoadConfiguration(v, "notification.") handleStaticResources(defaultMux) if !option.DisableHttp { defaultMux.HandleFunc("/", fs.filerHandler) } if defaultMux != readonlyMux { handleStaticResources(readonlyMux) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers) fs.filer.LoadBuckets() fs.filer.LoadFilerConf() grace.OnInterrupt(func() { fs.filer.Shutdown() }) return fs, nil } func (fs *FilerServer) checkWithMaster() { for _, master := range fs.option.Masters { _, err := pb.ParseServerToGrpcAddress(master) if err != nil { glog.Fatalf("invalid master address %s: %v", master, err) } } isConnected := false for !isConnected { for _, master := range fs.option.Masters { readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) } fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) if fs.option.DefaultReplication == "" { fs.option.DefaultReplication = resp.DefaultReplication } return nil }) if readErr == nil { isConnected = true } else { time.Sleep(7 * time.Second) } } } }