mount: add mode to run external to SeaweedFS container cluster

This commit is contained in:
Chris Lu 2020-02-26 16:46:01 -08:00
parent 543cf1c80e
commit 0156e2975a
8 changed files with 71 additions and 35 deletions

View File

@ -18,6 +18,7 @@ type MountOptions struct {
dataCenter *string dataCenter *string
allowOthers *bool allowOthers *bool
umaskString *string umaskString *string
outsideContainerClusterMode *bool
} }
var ( var (
@ -41,6 +42,7 @@ func init() {
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
} }
var cmdMount = &Command{ var cmdMount = &Command{
@ -58,6 +60,12 @@ var cmdMount = &Command{
On OS X, it requires OSXFUSE (http://osxfuse.github.com/). On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
If the SeaweedFS systemm runs in a container cluster, e.g. managed by kubernetes or docker compose,
the volume servers are not accessible by their own ip addresses.
In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming:
* All volume server containers are accessible through the same hostname or IP address as the filer.
* All volume server container ports are open external to the cluster.
`, `,
} }

View File

@ -46,11 +46,12 @@ func runMount(cmd *Command, args []string) bool {
*mountOptions.ttlSec, *mountOptions.ttlSec,
*mountOptions.dirListCacheLimit, *mountOptions.dirListCacheLimit,
os.FileMode(umask), os.FileMode(umask),
*mountOptions.outsideContainerClusterMode,
) )
} }
func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int, func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int,
allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode) bool { allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode, outsideContainerClusterMode bool) bool {
util.LoadConfiguration("security", false) util.LoadConfiguration("security", false)
@ -180,6 +181,7 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
MountCtime: fileInfo.ModTime(), MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(), MountMtime: time.Now(),
Umask: umask, Umask: umask,
OutsideContainerClusterMode: outsideContainerClusterMode,
})) }))
if err != nil { if err != nil {
fuse.Unmount(dir) fuse.Unmount(dir)

View File

@ -23,6 +23,7 @@ func VolumeId(fileId string) string {
type FilerClient interface { type FilerClient interface {
WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
AdjustedUrl(hostAndPort string) string
} }
func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
@ -67,9 +68,10 @@ func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte,
return return
} }
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
var n int64 var n int64
n, err = util.ReadUrl( n, err = util.ReadUrl(
fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId),
chunkView.Offset, chunkView.Offset,
int(chunkView.Size), int(chunkView.Size),
buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)], buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
@ -77,10 +79,10 @@ func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte,
if err != nil { if err != nil {
glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err) glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err)
err = fmt.Errorf("failed to read http://%s/%s: %v", err = fmt.Errorf("failed to read http://%s/%s: %v",
locations.Locations[0].Url, chunkView.FileId, err) volumeServerAddress, chunkView.FileId, err)
return return
} }

View File

@ -165,6 +165,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
} }
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
host = pages.f.wfs.AdjustedUrl(host)
pages.collection, pages.replication = resp.Collection, resp.Replication pages.collection, pages.replication = resp.Collection, resp.Replication
return nil return nil

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"math" "math"
"os" "os"
"strings"
"sync" "sync"
"time" "time"
@ -37,6 +38,9 @@ type Option struct {
MountMode os.FileMode MountMode os.FileMode
MountCtime time.Time MountCtime time.Time
MountMtime time.Time MountMtime time.Time
// whether the mount runs outside SeaweedFS containers
OutsideContainerClusterMode bool
} }
var _ = fs.FS(&WFS{}) var _ = fs.FS(&WFS{})
@ -247,5 +251,17 @@ func (wfs *WFS) forgetNode(fullpath filer2.FullPath) {
defer wfs.nodesLock.Unlock() defer wfs.nodesLock.Unlock()
delete(wfs.nodes, fullpath.AsInode()) delete(wfs.nodes, fullpath.AsInode())
}
func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
if !wfs.option.OutsideContainerClusterMode {
return hostAndPort
}
commaIndex := strings.Index(hostAndPort, ":")
if commaIndex < 0 {
return hostAndPort
}
filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
} }

View File

@ -3,11 +3,12 @@ package filesys
import ( import (
"context" "context"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"google.golang.org/grpc"
) )
func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
@ -21,12 +22,12 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
} }
wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
deleteFileIds(wfs.option.GrpcDialOption, client, fileIds) wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds)
return nil return nil
}) })
} }
func deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
var vids []string var vids []string
for _, fileId := range fileIds { for _, fileId := range fileIds {
@ -56,7 +57,7 @@ func deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerC
} }
for _, loc := range locations.Locations { for _, loc := range locations.Locations {
lr.Locations = append(lr.Locations, operation.Location{ lr.Locations = append(lr.Locations, operation.Location{
Url: loc.Url, Url: wfs.AdjustedUrl(loc.Url),
PublicUrl: loc.PublicUrl, PublicUrl: loc.PublicUrl,
}) })
} }

View File

@ -104,6 +104,9 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
} }
func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
return hostAndPort
}
func clearName(name string) (string, error) { func clearName(name string) (string, error) {
slashed := strings.HasSuffix(name, "/") slashed := strings.HasSuffix(name, "/")

View File

@ -105,3 +105,6 @@ func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *comm
func (c *commandFilerClient) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { func (c *commandFilerClient) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
return c.env.withFilerClient(c.filerServer, c.filerPort, fn) return c.env.withFilerClient(c.filerServer, c.filerPort, fn)
} }
func (c *commandFilerClient) AdjustedUrl(hostAndPort string) string {
return hostAndPort
}