seaweedfs/weed/mount/weedfs.go

188 lines
5.5 KiB
Go
Raw Normal View History

2022-02-11 12:32:13 +08:00
package mount
import (
2022-02-12 17:54:16 +08:00
"context"
2022-02-14 14:50:44 +08:00
"github.com/chrislusf/seaweedfs/weed/filer"
2022-02-14 17:09:31 +08:00
"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
2022-02-11 14:43:55 +08:00
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
2022-02-14 14:50:44 +08:00
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
2022-02-11 14:43:55 +08:00
"github.com/chrislusf/seaweedfs/weed/util/grace"
2022-02-14 14:50:44 +08:00
"github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/hanwen/go-fuse/v2/fuse"
2022-02-11 14:43:55 +08:00
"google.golang.org/grpc"
2022-02-14 14:50:44 +08:00
"math/rand"
2022-02-11 14:43:55 +08:00
"os"
"path"
"path/filepath"
"time"
2022-02-11 12:32:13 +08:00
"github.com/hanwen/go-fuse/v2/fs"
)
2022-02-11 14:43:55 +08:00
// Option holds the settings for one mount. NewSeaweedFileSystem receives a
// pointer to it and fills in the unexported fields.
type Option struct {
	// MountDirectory is one of the inputs hashed into the unique cache directory name.
	MountDirectory string
	// FilerAddresses lists the filers; it is indexed by filerIndex, and its
	// first element is hashed into the unique cache directory name.
	FilerAddresses []pb.ServerAddress
	// filerIndex is the position in FilerAddresses returned by getCurrentFiler;
	// NewSeaweedFileSystem picks it at random.
	filerIndex int
	// GrpcDialOption is not referenced in this file — presumably consumed elsewhere in the package.
	GrpcDialOption grpc.DialOption
	// FilerMountRootPath is the path for which maybeLoadEntry synthesizes a
	// directory entry; it is also passed to the metadata subscription.
	FilerMountRootPath string
	// Collection is not referenced in this file — presumably consumed elsewhere in the package.
	Collection string
	// Replication is not referenced in this file — presumably consumed elsewhere in the package.
	Replication string
	// TtlSec is not referenced in this file — presumably consumed elsewhere in the package.
	TtlSec int32
	// DiskType is not referenced in this file — presumably consumed elsewhere in the package.
	DiskType types.DiskType
	// ChunkSizeLimit is not referenced in this file — presumably consumed elsewhere in the package.
	ChunkSizeLimit int64
	// ConcurrentWriters sizes the limited executor; no executor is created unless it is > 0.
	ConcurrentWriters int
	// CacheDir is the parent directory under which the unique cache directory is placed.
	CacheDir string
	// CacheSizeMB is passed to the tiered chunk cache; no chunk cache is created unless it is > 0.
	CacheSizeMB int64
	// DataCenter is not referenced in this file — presumably consumed elsewhere in the package.
	DataCenter string
	// Umask holds the permission bits cleared from 0777 when the cache directory is created.
	Umask os.FileMode
	// MountUid is the Uid reported for the synthesized mount-root entry.
	MountUid uint32
	// MountGid is the Gid reported for the synthesized mount-root entry.
	MountGid uint32
	// MountMode is the FileMode reported for the synthesized mount-root entry.
	MountMode os.FileMode
	// MountCtime is reported (as Unix seconds) as Crtime of the synthesized mount-root entry.
	MountCtime time.Time
	// MountMtime is reported (as Unix seconds) as Mtime of the synthesized mount-root entry.
	MountMtime time.Time
	// MountParentInode is not referenced in this file — presumably consumed elsewhere in the package.
	MountParentInode uint64
	// VolumeServerAccess set to "filerProxy" makes LookupFn resolve chunks to a
	// proxy URL on the current filer instead of using filer.LookupFn.
	VolumeServerAccess string // how to access volume servers
	// Cipher is not referenced in this file.
	Cipher bool // whether encrypt data on volume server
	// UidGidMapper is handed to meta_cache.NewMetaCache.
	UidGidMapper *meta_cache.UidGidMapper
	// uniqueCacheDir is CacheDir joined with an 8-character hash; set by setupUniqueCacheDirectory.
	uniqueCacheDir string
	// uniqueCacheTempPageDir is the "sw" subdirectory of uniqueCacheDir; set and
	// created by setupUniqueCacheDirectory.
	uniqueCacheTempPageDir string
}
type WFS struct {
// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
fuse.RawFileSystem
2022-02-11 12:32:13 +08:00
fs.Inode
2022-02-14 14:50:44 +08:00
option *Option
metaCache *meta_cache.MetaCache
stats statsCache
root Directory
chunkCache *chunk_cache.TieredChunkCache
signature int32
concurrentWriters *util.LimitedConcurrentExecutor
inodeToPath *InodeToPath
fhmap *FileHandleToInode
2022-02-11 14:43:55 +08:00
}
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
RawFileSystem: fuse.NewDefaultRawFileSystem(),
option: option,
signature: util.RandomInt32(),
2022-02-12 17:54:16 +08:00
inodeToPath: NewInodeToPath(),
2022-02-14 11:14:34 +08:00
fhmap: NewFileHandleToInode(),
2022-02-11 14:43:55 +08:00
}
wfs.root = Directory{
name: "/",
wfs: wfs,
entry: nil,
parent: nil,
}
2022-02-14 14:50:44 +08:00
wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
wfs.option.setupUniqueCacheDirectory()
if option.CacheSizeMB > 0 {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
}
2022-02-14 17:09:31 +08:00
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper, func(path util.FullPath) {
wfs.inodeToPath.MarkChildrenCached(path)
}, func(path util.FullPath) bool {
return wfs.inodeToPath.IsChildrenCached(path)
}, func(filePath util.FullPath, entry *filer_pb.Entry) {
2022-02-11 14:43:55 +08:00
})
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
})
2022-02-14 14:50:44 +08:00
if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
}
2022-02-11 14:43:55 +08:00
return wfs
}
2022-02-14 17:09:31 +08:00
// StartBackgroundTasks launches a goroutine running
// meta_cache.SubscribeMetaEvents for the mount root path, passing the current
// time in Unix nanoseconds as the last argument.
func (wfs *WFS) StartBackgroundTasks() {
	sinceNs := time.Now().UnixNano()
	go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, sinceNs)
}
// Root returns a pointer to the root Directory stored inside wfs (not a copy).
func (wfs *WFS) Root() *Directory {
	rootDir := &wfs.root
	return rootDir
}
2022-02-12 13:35:09 +08:00
// String returns the fixed file system name "seaweedfs".
func (wfs *WFS) String() string {
	const fsName = "seaweedfs"
	return fsName
}
2022-02-14 17:36:10 +08:00
func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
path = wfs.inodeToPath.GetPath(inode)
2022-02-14 17:36:10 +08:00
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
return path, fh, fh.entry, fuse.OK
}
entry, status = wfs.maybeLoadEntry(path)
return
2022-02-12 17:54:16 +08:00
}
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
// glog.V(3).Infof("read entry cache miss %s", fullpath)
dir, name := fullpath.DirAndName()
// return a valid entry for the mount root
if string(fullpath) == wfs.option.FilerMountRootPath {
return &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: wfs.option.MountMtime.Unix(),
FileMode: uint32(wfs.option.MountMode),
Uid: wfs.option.MountUid,
Gid: wfs.option.MountGid,
Crtime: wfs.option.MountCtime.Unix(),
},
}, fuse.OK
}
// read from async meta cache
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
}
2022-02-13 14:41:45 +08:00
return cachedEntry.ToProtoEntry(), fuse.OK
2022-02-12 17:54:16 +08:00
}
2022-02-14 14:50:44 +08:00
// LookupFn returns the function used to turn a file id into target URLs.
//
// With VolumeServerAccess set to "filerProxy" every file id maps to a single
// proxy URL on the filer that is current at the time of the lookup; otherwise
// filer.LookupFn(wfs) is returned.
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
	if wfs.option.VolumeServerAccess != "filerProxy" {
		return filer.LookupFn(wfs)
	}

	return func(fileId string) ([]string, error) {
		// The filer is resolved on each call, not captured at creation time.
		proxyURL := "http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId
		return []string{proxyURL}, nil
	}
}
// getCurrentFiler returns the filer address selected by option.filerIndex.
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
	opt := wfs.option
	return opt.FilerAddresses[opt.filerIndex]
}
2022-02-11 14:43:55 +08:00
// setupUniqueCacheDirectory derives the per-mount cache directories and
// creates them on disk.
//
// The directory name is the first 8 characters of the MD5 string of the mount
// directory, the first filer address, the filer mount root path and the
// program version. uniqueCacheDir is CacheDir joined with that name, and
// uniqueCacheTempPageDir is its "sw" subdirectory.
//
// option.FilerAddresses must be non-empty. A failure to create the directory
// is reported on stderr rather than aborting, since this function has no way
// to return an error.
func (option *Option) setupUniqueCacheDirectory() {
	hashInput := option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()
	cacheUniqueId := util.Md5String([]byte(hashInput))[0:8]

	// These are local file system paths, so use filepath rather than path.
	option.uniqueCacheDir = filepath.Join(option.CacheDir, cacheUniqueId)
	option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")

	if err := os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask); err != nil {
		// Best effort: nothing more can be done if stderr itself fails.
		_, _ = os.Stderr.WriteString("seaweedfs mount: create cache directory: " + err.Error() + "\n")
	}
}
// getTempFilePageDir returns the "sw" directory inside the unique cache
// directory, as set by setupUniqueCacheDirectory.
func (option *Option) getTempFilePageDir() string {
	pageDir := option.uniqueCacheTempPageDir
	return pageDir
}
func (option *Option) getUniqueCacheDir() string {
return option.uniqueCacheDir
2022-02-11 12:32:13 +08:00
}