seaweedfs/weed/mount/weedfs.go

206 lines
6.2 KiB
Go
Raw Normal View History

2022-02-11 12:32:13 +08:00
package mount
import (
2022-02-12 17:54:16 +08:00
"context"
"math/rand"
"os"
"path"
"path/filepath"
"sync/atomic"
"time"
"github.com/hanwen/go-fuse/v2/fuse"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
2022-02-11 12:32:13 +08:00
"github.com/hanwen/go-fuse/v2/fs"
)
2022-02-11 14:43:55 +08:00
type Option struct {
2022-08-22 03:20:27 +08:00
filerIndex int32 // align memory for atomic read/write
2022-02-11 14:43:55 +08:00
FilerAddresses []pb.ServerAddress
2022-08-22 03:20:27 +08:00
MountDirectory string
2022-02-11 14:43:55 +08:00
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
CacheDir string
CacheSizeMB int64
DataCenter string
Umask os.FileMode
Quota int64
2022-06-06 11:27:12 +08:00
DisableXAttr bool
2022-02-11 14:43:55 +08:00
MountUid uint32
MountGid uint32
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
MountParentInode uint64
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
uniqueCacheDir string
uniqueCacheTempPageDir string
2022-02-11 14:43:55 +08:00
}
type WFS struct {
2022-03-13 16:14:50 +08:00
// https://dl.acm.org/doi/fullHtml/10.1145/3310148
// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
fuse.RawFileSystem
2022-04-03 06:14:37 +08:00
mount_pb.UnimplementedSeaweedMountServer
2022-02-11 12:32:13 +08:00
fs.Inode
2022-02-14 14:50:44 +08:00
option *Option
metaCache *meta_cache.MetaCache
stats statsCache
chunkCache *chunk_cache.TieredChunkCache
signature int32
concurrentWriters *util.LimitedConcurrentExecutor
inodeToPath *InodeToPath
fhmap *FileHandleToInode
dhmap *DirectoryHandleToInode
fuseServer *fuse.Server
2022-03-06 14:10:43 +08:00
IsOverQuota bool
2022-02-11 14:43:55 +08:00
}
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
RawFileSystem: fuse.NewDefaultRawFileSystem(),
option: option,
signature: util.RandomInt32(),
2022-03-01 04:16:53 +08:00
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
2022-02-14 11:14:34 +08:00
fhmap: NewFileHandleToInode(),
dhmap: NewDirectoryHandleToInode(),
2022-02-11 14:43:55 +08:00
}
wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
2022-02-14 14:50:44 +08:00
wfs.option.setupUniqueCacheDirectory()
if option.CacheSizeMB > 0 {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
}
2022-03-01 04:16:53 +08:00
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper,
util.FullPath(option.FilerMountRootPath),
func(path util.FullPath) {
wfs.inodeToPath.MarkChildrenCached(path)
}, func(path util.FullPath) bool {
return wfs.inodeToPath.IsChildrenCached(path)
}, func(filePath util.FullPath, entry *filer_pb.Entry) {
})
2022-02-11 14:43:55 +08:00
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
2022-02-16 16:39:21 +08:00
os.RemoveAll(option.getUniqueCacheDir())
2022-02-11 14:43:55 +08:00
})
2022-02-14 14:50:44 +08:00
if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
}
2022-02-11 14:43:55 +08:00
return wfs
}
2022-02-14 17:09:31 +08:00
func (wfs *WFS) StartBackgroundTasks() {
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
go wfs.loopCheckQuota()
2022-02-14 17:09:31 +08:00
}
2022-02-12 13:35:09 +08:00
// String returns the fixed file system name "seaweedfs".
func (wfs *WFS) String() string {
	const fsName = "seaweedfs"
	return fsName
}
// Init records the given FUSE server in wfs.fuseServer.
func (wfs *WFS) Init(server *fuse.Server) {
	wfs.fuseServer = server
}
2022-08-04 16:35:18 +08:00
func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
path, status = wfs.inodeToPath.GetPath(inode)
if status != fuse.OK {
return
}
2022-02-14 17:36:10 +08:00
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
if entry != nil && fh.entry.Attributes == nil {
entry.Attributes = &filer_pb.FuseAttributes{}
}
})
2022-07-25 06:30:55 +08:00
} else {
entry, status = wfs.maybeLoadEntry(path)
2022-02-14 17:36:10 +08:00
}
return
2022-02-12 17:54:16 +08:00
}
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
// glog.V(3).Infof("read entry cache miss %s", fullpath)
dir, name := fullpath.DirAndName()
// return a valid entry for the mount root
if string(fullpath) == wfs.option.FilerMountRootPath {
return &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: wfs.option.MountMtime.Unix(),
FileMode: uint32(wfs.option.MountMode),
Uid: wfs.option.MountUid,
Gid: wfs.option.MountGid,
Crtime: wfs.option.MountCtime.Unix(),
},
}, fuse.OK
}
// read from async meta cache
2022-05-25 09:52:04 +08:00
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
2022-02-12 17:54:16 +08:00
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
}
2022-02-13 14:41:45 +08:00
return cachedEntry.ToProtoEntry(), fuse.OK
2022-02-12 17:54:16 +08:00
}
2022-02-14 14:50:44 +08:00
// LookupFn returns the function used to turn a file id into target URLs.
//
// When option.VolumeServerAccess is "filerProxy", the returned function
// produces a single URL that points at the current filer with the file id in
// the proxyChunkId query parameter. In every other case the lookup function
// from filer.LookupFn is returned.
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
	if wfs.option.VolumeServerAccess != "filerProxy" {
		return filer.LookupFn(wfs)
	}

	return func(fileId string) (targetUrls []string, err error) {
		proxyUrl := "http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId
		return []string{proxyUrl}, nil
	}
}
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
i := atomic.LoadInt32(&wfs.option.filerIndex)
return wfs.option.FilerAddresses[i]
2022-02-14 14:50:44 +08:00
}
2022-02-11 14:43:55 +08:00
// setupUniqueCacheDirectory derives the per-mount cache directories and
// creates them on disk.
//
// The directory name is the first 8 characters of the MD5 string of the mount
// directory, the first filer address, the filer mount root path and the
// program version. The result is stored in uniqueCacheDir, with a "swap"
// subdirectory stored in uniqueCacheTempPageDir. option.FilerAddresses must
// contain at least one address.
func (option *Option) setupUniqueCacheDirectory() {
	cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]

	// Both values are OS file system paths, so both are built with
	// filepath.Join.
	option.uniqueCacheDir = filepath.Join(option.CacheDir, cacheUniqueId)
	option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "swap")

	// Best effort: this function has no error return, so a creation failure
	// is deliberately discarded here.
	_ = os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
}
func (option *Option) getTempFilePageDir() string {
return option.uniqueCacheTempPageDir
2022-02-11 14:43:55 +08:00
}
func (option *Option) getUniqueCacheDir() string {
return option.uniqueCacheDir
2022-02-11 12:32:13 +08:00
}