seaweedfs/weed/mount/weedfs.go

240 lines
7.4 KiB
Go
Raw Normal View History

2022-02-11 12:32:13 +08:00
package mount
import (
2022-02-12 17:54:16 +08:00
"context"
2024-09-13 13:45:30 +08:00
"errors"
"math/rand"
"os"
"path"
"path/filepath"
"sync/atomic"
"time"
"github.com/hanwen/go-fuse/v2/fuse"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
2022-02-11 12:32:13 +08:00
"github.com/hanwen/go-fuse/v2/fs"
)
2022-02-11 14:43:55 +08:00
type Option struct {
2022-08-22 03:20:27 +08:00
filerIndex int32 // align memory for atomic read/write
2022-02-11 14:43:55 +08:00
FilerAddresses []pb.ServerAddress
2022-08-22 03:20:27 +08:00
MountDirectory string
2022-02-11 14:43:55 +08:00
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
2023-08-17 14:47:43 +08:00
CacheDirForRead string
CacheSizeMBForRead int64
CacheDirForWrite string
2022-02-11 14:43:55 +08:00
DataCenter string
Umask os.FileMode
Quota int64
2022-06-06 11:27:12 +08:00
DisableXAttr bool
2022-02-11 14:43:55 +08:00
MountUid uint32
MountGid uint32
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
MountParentInode uint64
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
2023-08-17 14:47:43 +08:00
uniqueCacheDirForRead string
uniqueCacheDirForWrite string
2022-02-11 14:43:55 +08:00
}
type WFS struct {
2022-03-13 16:14:50 +08:00
// https://dl.acm.org/doi/fullHtml/10.1145/3310148
// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
fuse.RawFileSystem
2022-04-03 06:14:37 +08:00
mount_pb.UnimplementedSeaweedMountServer
2022-02-11 12:32:13 +08:00
fs.Inode
2022-02-14 14:50:44 +08:00
option *Option
metaCache *meta_cache.MetaCache
stats statsCache
chunkCache *chunk_cache.TieredChunkCache
signature int32
concurrentWriters *util.LimitedConcurrentExecutor
inodeToPath *InodeToPath
2024-09-13 13:45:30 +08:00
fhMap *FileHandleToInode
dhMap *DirectoryHandleToInode
fuseServer *fuse.Server
2022-03-06 14:10:43 +08:00
IsOverQuota bool
2023-09-22 02:08:26 +08:00
fhLockTable *util.LockTable[FileHandleId]
FilerConf *filer.FilerConf
2022-02-11 14:43:55 +08:00
}
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
RawFileSystem: fuse.NewDefaultRawFileSystem(),
option: option,
signature: util.RandomInt32(),
2022-03-01 04:16:53 +08:00
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
2024-09-13 13:45:30 +08:00
fhMap: NewFileHandleToInode(),
dhMap: NewDirectoryHandleToInode(),
2023-09-22 02:08:26 +08:00
fhLockTable: util.NewLockTable[FileHandleId](),
2022-02-11 14:43:55 +08:00
}
wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
2022-02-14 14:50:44 +08:00
wfs.option.setupUniqueCacheDirectory()
2023-08-17 14:47:43 +08:00
if option.CacheSizeMBForRead > 0 {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
2022-02-14 14:50:44 +08:00
}
2023-08-17 14:47:43 +08:00
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
2022-03-01 04:16:53 +08:00
util.FullPath(option.FilerMountRootPath),
func(path util.FullPath) {
wfs.inodeToPath.MarkChildrenCached(path)
}, func(path util.FullPath) bool {
return wfs.inodeToPath.IsChildrenCached(path)
}, func(filePath util.FullPath, entry *filer_pb.Entry) {
2024-06-07 03:49:33 +08:00
// Find inode if it is not a deleted path
2024-09-13 13:45:30 +08:00
if inode, inodeFound := wfs.inodeToPath.GetInode(filePath); inodeFound {
2024-06-07 03:49:33 +08:00
// Find open file handle
2024-09-13 13:45:30 +08:00
if fh, fhFound := wfs.fhMap.FindFileHandle(inode); fhFound {
2024-06-07 03:49:33 +08:00
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
// Recreate dirty pages
fh.dirtyPages.Destroy()
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
// Update handle entry
2024-09-13 13:45:30 +08:00
newEntry, status := wfs.maybeLoadEntry(filePath)
2024-06-07 03:49:33 +08:00
if status == fuse.OK {
2024-09-13 13:45:30 +08:00
if fh.GetEntry().GetEntry() != newEntry {
fh.SetEntry(newEntry)
2024-06-07 03:49:33 +08:00
}
}
}
}
2022-03-01 04:16:53 +08:00
})
2022-02-11 14:43:55 +08:00
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
2023-08-17 14:47:43 +08:00
os.RemoveAll(option.getUniqueCacheDirForWrite())
2023-08-17 14:54:23 +08:00
os.RemoveAll(option.getUniqueCacheDirForRead())
2022-02-11 14:43:55 +08:00
})
2022-02-14 14:50:44 +08:00
if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
}
2022-02-11 14:43:55 +08:00
return wfs
}
func (wfs *WFS) StartBackgroundTasks() error {
2024-10-07 03:55:19 +08:00
follower, err := wfs.subscribeFilerConfEvents()
if err != nil {
return err
}
2022-02-14 17:09:31 +08:00
startTime := time.Now()
2024-10-07 03:55:19 +08:00
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), follower)
go wfs.loopCheckQuota()
return nil
2022-02-14 17:09:31 +08:00
}
2022-02-12 13:35:09 +08:00
// String returns the fixed name "seaweedfs" for this file system.
func (wfs *WFS) String() string {
	const fsName = "seaweedfs"
	return fsName
}
// Init records the FUSE server this file system is attached to.
func (wfs *WFS) Init(server *fuse.Server) {
	wfs.fuseServer = server
}
2022-08-04 16:35:18 +08:00
func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
path, status = wfs.inodeToPath.GetPath(inode)
if status != fuse.OK {
return
}
2022-02-14 17:36:10 +08:00
var found bool
2024-09-13 13:45:30 +08:00
if fh, found = wfs.fhMap.FindFileHandle(inode); found {
entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
if entry != nil && fh.entry.Attributes == nil {
entry.Attributes = &filer_pb.FuseAttributes{}
}
})
2022-07-25 06:30:55 +08:00
} else {
entry, status = wfs.maybeLoadEntry(path)
2022-02-14 17:36:10 +08:00
}
return
2022-02-12 17:54:16 +08:00
}
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
// glog.V(3).Infof("read entry cache miss %s", fullpath)
dir, name := fullpath.DirAndName()
// return a valid entry for the mount root
if string(fullpath) == wfs.option.FilerMountRootPath {
return &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: wfs.option.MountMtime.Unix(),
FileMode: uint32(wfs.option.MountMode),
Uid: wfs.option.MountUid,
Gid: wfs.option.MountGid,
Crtime: wfs.option.MountCtime.Unix(),
},
}, fuse.OK
}
// read from async meta cache
2022-05-25 09:52:04 +08:00
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
2022-02-12 17:54:16 +08:00
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
2024-09-13 13:45:30 +08:00
if errors.Is(cacheErr, filer_pb.ErrNotFound) {
2022-02-12 17:54:16 +08:00
return nil, fuse.ENOENT
}
2022-02-13 14:41:45 +08:00
return cachedEntry.ToProtoEntry(), fuse.OK
2022-02-12 17:54:16 +08:00
}
2022-02-14 14:50:44 +08:00
// LookupFn returns the function used to turn a file id into download URLs.
//
// When option.VolumeServerAccess is "filerProxy", every file id maps to a
// single URL on the current filer using the proxyChunkId query parameter.
// Otherwise the lookup function from the filer package is used.
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
	if wfs.option.VolumeServerAccess != "filerProxy" {
		return filer.LookupFn(wfs)
	}

	return func(fileId string) ([]string, error) {
		proxyUrl := "http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId
		return []string{proxyUrl}, nil
	}
}
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
i := atomic.LoadInt32(&wfs.option.filerIndex)
return wfs.option.FilerAddresses[i]
2022-02-14 14:50:44 +08:00
}
2022-02-11 14:43:55 +08:00
func (option *Option) setupUniqueCacheDirectory() {
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
2023-08-17 14:47:43 +08:00
option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
}
2023-08-17 14:47:43 +08:00
func (option *Option) getUniqueCacheDirForWrite() string {
return option.uniqueCacheDirForWrite
2022-02-11 14:43:55 +08:00
}
2023-08-17 14:47:43 +08:00
func (option *Option) getUniqueCacheDirForRead() string {
return option.uniqueCacheDirForRead
2022-02-11 12:32:13 +08:00
}