FUSE mount: support multiple filers

fix https://github.com/chrislusf/seaweedfs/issues/2015
fix https://github.com/chrislusf/seaweedfs/issues/1531
This commit is contained in:
Chris Lu 2021-05-21 01:28:00 -07:00
parent 30c67e3652
commit dc1309f084
5 changed files with 72 additions and 27 deletions

View File

@ -51,9 +51,9 @@ func runMount(cmd *Command, args []string) bool {
func RunMount(option *MountOptions, umask os.FileMode) bool {
filer := *option.filer
filers := strings.Split(*option.filer, ",")
// parse filer grpc address
filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer)
filerGrpcAddresses, err := pb.ParseServersToGrpcAddresses(filers)
if err != nil {
glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
return true
@ -64,22 +64,22 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
for i := 0; i < 10; i++ {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithOneOfGrpcFilerClients(filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
return fmt.Errorf("get filer grpc address %v configuration: %v", filerGrpcAddresses, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
glog.V(0).Infof("failed to talk to filer %v: %v", filerGrpcAddresses, err)
glog.V(0).Infof("wait for %d seconds ...", i+1)
time.Sleep(time.Duration(i+1) * time.Second)
}
}
if err != nil {
glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err)
glog.Errorf("failed to talk to filer %v: %v", filerGrpcAddresses, err)
return true
}
@ -145,7 +145,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
options := []fuse.MountOption{
fuse.VolumeName(mountName),
fuse.FSName(filer + ":" + filerMountRootPath),
fuse.FSName(*option.filer + ":" + filerMountRootPath),
fuse.Subtype("seaweedfs"),
// fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders
fuse.NoAppleXattr(),
@ -181,8 +181,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
MountDirectory: dir,
FilerAddress: filer,
FilerGrpcAddress: filerGrpcAddress,
FilerAddresses: filers,
FilerGrpcAddresses: filerGrpcAddresses,
GrpcDialOption: grpcDialOption,
FilerMountRootPath: mountRoot,
Collection: *option.collection,
@ -218,7 +218,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
c.Close()
})
glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir)
glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
server := fs.New(c, nil)
seaweedFileSystem.Server = server
err = server.Serve(seaweedFileSystem)

View File

@ -28,8 +28,9 @@ import (
type Option struct {
MountDirectory string
FilerAddress string
FilerGrpcAddress string
FilerAddresses []string
filerIndex int
FilerGrpcAddresses []string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
@ -95,7 +96,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
},
signature: util.RandomInt32(),
}
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8]
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
if option.CacheSizeMB > 0 {
os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
@ -259,11 +260,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
if wfs.option.VolumeServerAccess == "filerProxy" {
return func(fileId string) (targetUrls []string, err error) {
return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil
}
}
return filer.LookupFn(wfs)
}
func (wfs *WFS) getCurrentFiler() string {
return wfs.option.FilerAddresses[wfs.option.filerIndex]
}
type NodeWithId uint64

View File

@ -1,6 +1,7 @@
package filesys
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
@ -10,20 +11,36 @@ import (
var _ = filer_pb.FilerClient(&WFS{})
func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
err := util.Retry("filer grpc "+wfs.option.FilerGrpcAddress, func() error {
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
return util.Retry("filer grpc", func() error {
i := wfs.option.filerIndex
n := len(wfs.option.FilerGrpcAddresses)
for x := 0; x < n; x++ {
filerGrpcAddress := wfs.option.FilerGrpcAddresses[i]
err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, wfs.option.GrpcDialOption)
if err != nil {
glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
} else {
wfs.option.filerIndex = i
return nil
}
i++
if i >= n {
i = 0
}
}
return err
})
if err == nil {
return nil
}
return err
}
func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {

View File

@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
if wfs.option.VolumeServerAccess == "filerProxy" {
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
}
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil {

View File

@ -111,6 +111,16 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts
func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
return ParseServerAddress(server, 10000)
}
func ParseServersToGrpcAddresses(servers []string) (serverGrpcAddresses []string, err error) {
for _, server := range servers {
if serverGrpcAddress, parseErr := ParseServerToGrpcAddress(server); parseErr == nil {
serverGrpcAddresses = append(serverGrpcAddresses, serverGrpcAddress)
} else {
return nil, parseErr
}
}
return
}
func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
@ -202,3 +212,18 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption
}, filerGrpcAddress, grpcDialOption)
}
func WithOneOfGrpcFilerClients(filerGrpcAddresses []string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
for _, filerGrpcAddress := range filerGrpcAddresses {
err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, grpcDialOption)
if err == nil {
return nil
}
}
return err
}