seaweedfs/weed/server/filer_server.go

188 lines
5.8 KiB
Go
Raw Normal View History

2014-03-31 02:28:04 +08:00
package weed_server
import (
2019-06-24 06:29:49 +08:00
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2018-08-13 16:22:32 +08:00
"net/http"
"os"
2020-03-30 16:19:33 +08:00
"sync"
2019-06-24 06:29:49 +08:00
"time"
2018-08-13 16:22:32 +08:00
"github.com/chrislusf/seaweedfs/weed/stats"
"google.golang.org/grpc"
2020-05-05 17:05:28 +08:00
"github.com/chrislusf/seaweedfs/weed/util/grace"
2019-06-24 06:29:49 +08:00
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
2019-06-24 06:29:49 +08:00
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
2020-09-01 15:21:19 +08:00
"github.com/chrislusf/seaweedfs/weed/filer"
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
2020-09-03 17:05:26 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
2020-09-01 15:21:19 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
2020-12-24 13:49:01 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
2020-09-01 15:21:19 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
2020-09-01 15:21:19 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
2021-01-20 09:21:50 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
2020-09-01 15:21:19 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
2021-01-20 10:07:29 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
2020-09-01 15:21:19 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
2021-10-04 16:01:31 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
2021-07-01 16:21:14 +08:00
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
2018-08-13 16:22:32 +08:00
"github.com/chrislusf/seaweedfs/weed/glog"
2018-09-16 16:18:30 +08:00
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
2019-07-17 16:24:20 +08:00
_ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
2018-11-01 16:12:21 +08:00
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
2018-09-16 16:18:30 +08:00
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
2018-07-22 08:39:10 +08:00
"github.com/chrislusf/seaweedfs/weed/security"
2014-03-31 02:28:04 +08:00
)
2018-07-07 17:18:47 +08:00
type FilerOption struct {
Masters []pb.ServerAddress
Collection string
DefaultReplication string
DisableDirListing bool
MaxMB int
DirListingLimit int
DataCenter string
Rack string
DataNode string
DefaultLevelDbDir string
DisableHttp bool
Host pb.ServerAddress
recursiveDelete bool
Cipher bool
SaveToFilerLimit int64
ConcurrentUploadLimit int64
2018-07-07 17:18:47 +08:00
}
2014-03-31 02:28:04 +08:00
type FilerServer struct {
filer_pb.UnimplementedSeaweedFilerServer
2019-02-19 04:11:52 +08:00
option *FilerOption
secret security.SigningKey
2020-09-01 15:21:19 +08:00
filer *filer.Filer
2019-02-19 04:11:52 +08:00
grpcDialOption grpc.DialOption
2020-03-30 16:19:33 +08:00
2020-09-17 21:43:54 +08:00
// metrics read from the master
metricsAddress string
metricsIntervalSec int
2020-03-30 16:19:33 +08:00
// notifying clients
2020-04-05 15:51:16 +08:00
listenersLock sync.Mutex
listenersCond *sync.Cond
2020-05-05 17:05:28 +08:00
2020-05-08 17:47:22 +08:00
brokers map[string]map[string]bool
2020-05-05 17:05:28 +08:00
brokersLock sync.Mutex
inFlightDataSize int64
inFlightDataLimitCond *sync.Cond
2014-03-31 02:28:04 +08:00
}
2018-07-07 17:18:47 +08:00
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
2018-10-08 01:54:05 +08:00
2014-03-31 02:28:04 +08:00
fs = &FilerServer{
option: option,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
brokers: make(map[string]map[string]bool),
inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
2014-03-31 02:28:04 +08:00
}
2020-04-05 15:51:16 +08:00
fs.listenersCond = sync.NewCond(&fs.listenersLock)
2018-07-07 17:18:47 +08:00
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
fs.checkWithMaster()
2020-04-03 15:40:54 +08:00
go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
2021-11-07 05:23:35 +08:00
go fs.filer.KeepMasterClientConnected()
v := util.GetViper()
if !util.LoadConfiguration("filer", false) {
2019-06-30 15:44:57 +08:00
v.Set("leveldb2.enabled", true)
v.Set("leveldb2.dir", option.DefaultLevelDbDir)
_, err := os.Stat(option.DefaultLevelDbDir)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
} else {
glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
}
util.LoadConfiguration("notification", false)
2018-05-14 14:56:16 +08:00
fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
2020-04-07 16:58:48 +08:00
v.SetDefault("filer.options.buckets_folder", "/buckets")
2020-04-07 16:30:53 +08:00
fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
// TODO deprecated, will be be removed after 2020-12-31
2020-11-17 08:57:31 +08:00
// replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
// fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")
2018-08-13 16:20:49 +08:00
2018-10-08 01:54:05 +08:00
handleStaticResources(defaultMux)
if !option.DisableHttp {
defaultMux.HandleFunc("/", fs.filerHandler)
}
2017-05-28 11:14:22 +08:00
if defaultMux != readonlyMux {
handleStaticResources(readonlyMux)
2017-05-28 11:14:22 +08:00
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
2014-03-31 02:28:04 +08:00
2021-11-07 05:23:35 +08:00
fs.filer.AggregateFromPeers(option.Host)
2020-04-12 14:37:10 +08:00
fs.filer.LoadBuckets()
2020-11-16 06:06:03 +08:00
fs.filer.LoadFilerConf()
fs.filer.LoadRemoteStorageConfAndMapping()
grace.OnInterrupt(func() {
2020-03-15 11:30:26 +08:00
fs.filer.Shutdown()
})
2019-06-24 06:29:49 +08:00
return fs, nil
}
func (fs *FilerServer) checkWithMaster() {
2019-06-24 06:29:49 +08:00
isConnected := false
for !isConnected {
2020-09-17 21:43:54 +08:00
for _, master := range fs.option.Masters {
readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", master, err)
}
fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
if fs.option.DefaultReplication == "" {
fs.option.DefaultReplication = resp.DefaultReplication
}
return nil
})
2020-04-03 15:47:48 +08:00
if readErr == nil {
isConnected = true
} else {
time.Sleep(7 * time.Second)
}
2019-06-24 06:29:49 +08:00
}
}
2014-03-31 02:28:04 +08:00
}