mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-12 07:42:16 +08:00
952afd810c
* reduce locks to avoid dead lock Flush->FlushData->uplloadPipeline.FluahAll uploaderCount>0 goroutine 1 [sync.Cond.Wait, 71 minutes]: sync.runtime_notifyListWait(0xc0007ae4d0, 0x0) /usr/local/go/src/runtime/sema.go:569 +0x159 sync.(*Cond).Wait(0xc001a59290?) /usr/local/go/src/sync/cond.go:70 +0x85 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).waitForCurrentWritersToComplete(0xc0002ee4d0) /github/workspace/weed/mount/page_writer/upload_pipeline_lock.go:58 +0x32 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).FlushAll(0xc0002ee4d0) /github/workspace/weed/mount/page_writer/upload_pipeline.go:151 +0x25 github.com/seaweedfs/seaweedfs/weed/mount.(*ChunkedDirtyPages).FlushData(0xc00087e840) /github/workspace/weed/mount/dirty_pages_chunked.go:54 +0x29 github.com/seaweedfs/seaweedfs/weed/mount.(*PageWriter).FlushData(...) /github/workspace/weed/mount/page_writer.go:50 github.com/seaweedfs/seaweedfs/weed/mount.(*WFS).doFlush(0xc0006ad600, 0xc00030d380, 0x0, 0x0) /github/workspace/weed/mount/weedfs_file_sync.go:101 +0x169 github.com/seaweedfs/seaweedfs/weed/mount.(*WFS).Flush(0xc0006ad600, 0xc001a594a8?, 0xc0004c1ca0) /github/workspace/weed/mount/weedfs_file_sync.go:59 +0x48 github.com/hanwen/go-fuse/v2/fuse.doFlush(0xc0000da870?, 0xc0004c1b08) SaveContent -> MemChunk.RLock -> ChunkedDirtyPages.saveChunkedFileIntervalToStorage pages.fh.AddChunks([]*filer_pb.FileChunk{chunk}) fh.entryLock.Lock() sync.(*RWMutex).Lock(0x0?) /usr/local/go/src/sync/rwmutex.go:146 +0x31 github.com/seaweedfs/seaweedfs/weed/mount.(*FileHandle).AddChunks(0xc00030d380, {0xc00028bdc8, 0x1, 0x1}) /github/workspace/weed/mount/filehandle.go:93 +0x45 github.com/seaweedfs/seaweedfs/weed/mount.(*ChunkedDirtyPages).saveChunkedFileIntervalToStorage(0xc00087e840, {0x2be7ac0, 0xc00018d9e0}, 0x0, 0x121, 0x17e3c624565ace45, 0x1?) /github/workspace/weed/mount/dirty_pages_chunked.go:80 +0x2d4 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*MemChunk).SaveContent(0xc0008d9130, 0xc0008093e0) /github/workspace/weed/mount/page_writer/page_chunk_mem.go:115 +0x112 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).moveToSealed.func1() /github/workspace/weed/mount/page_writer/upload_pipeline.go:187 +0x55 github.com/seaweedfs/seaweedfs/weed/util.(*LimitedConcurrentExecutor).Execute.func1() /github/workspace/weed/util/limited_executor.go:38 +0x62 created by github.com/seaweedfs/seaweedfs/weed/util.(*LimitedConcurrentExecutor).Execute in goroutine 1 /github/workspace/weed/util/limited_executor.go:33 +0x97 On metadata update fh.entryLock.Lock() fh.dirtyPages.Destroy() up.chunksLock.Lock => each sealed chunk.FreeReference => MemChunk.Lock goroutine 134 [sync.RWMutex.Lock, 71 minutes]: sync.runtime_SemacquireRWMutex(0xc0007c3558?, 0xea?, 0x3fb0800?) /usr/local/go/src/runtime/sema.go:87 +0x25 sync.(*RWMutex).Lock(0xc0007c35a8?) /usr/local/go/src/sync/rwmutex.go:151 +0x6a github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*MemChunk).FreeResource(0xc0008d9130) /github/workspace/weed/mount/page_writer/page_chunk_mem.go:38 +0x2a github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*SealedChunk).FreeReference(0xc00071cdb0, {0xc0006ba1a0, 0x20}) /github/workspace/weed/mount/page_writer/upload_pipeline.go:38 +0xb7 github.com/seaweedfs/seaweedfs/weed/mount/page_writer.(*UploadPipeline).Shutdown(0xc0002ee4d0) /github/workspace/weed/mount/page_writer/upload_pipeline.go:220 +0x185 github.com/seaweedfs/seaweedfs/weed/mount.(*ChunkedDirtyPages).Destroy(0xc0008cea40?) /github/workspace/weed/mount/dirty_pages_chunked.go:87 +0x17 github.com/seaweedfs/seaweedfs/weed/mount.(*PageWriter).Destroy(...) /github/workspace/weed/mount/page_writer.go:78 github.com/seaweedfs/seaweedfs/weed/mount.NewSeaweedFileSystem.func3({0xc00069a6c0, 0x30}, 0x6?) /github/workspace/weed/mount/weedfs.go:119 +0x17a github.com/seaweedfs/seaweedfs/weed/mount/meta_cache.NewMetaCache.func1({0xc00069a6c0?, 0xc00069a480?}, 0x4015b40?) /github/workspace/weed/mount/meta_cache/meta_cache.go:37 +0x1c github.com/seaweedfs/seaweedfs/weed/mount/meta_cache.SubscribeMetaEvents.func1(0xc000661810) /github/workspace/weed/mount/meta_cache/meta_cache_subscribe.go:43 +0x570 * use locked entry everywhere * modifiable remote entry * skip locking after getting lock from fhLockTable
231 lines
7.2 KiB
Go
231 lines
7.2 KiB
Go
package mount
|
|
|
|
import (
|
|
"context"
|
|
"math/rand"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/hanwen/go-fuse/v2/fuse"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/grace"
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
|
|
|
"github.com/hanwen/go-fuse/v2/fs"
|
|
)
|
|
|
|
type Option struct {
|
|
filerIndex int32 // align memory for atomic read/write
|
|
FilerAddresses []pb.ServerAddress
|
|
MountDirectory string
|
|
GrpcDialOption grpc.DialOption
|
|
FilerMountRootPath string
|
|
Collection string
|
|
Replication string
|
|
TtlSec int32
|
|
DiskType types.DiskType
|
|
ChunkSizeLimit int64
|
|
ConcurrentWriters int
|
|
CacheDirForRead string
|
|
CacheSizeMBForRead int64
|
|
CacheDirForWrite string
|
|
DataCenter string
|
|
Umask os.FileMode
|
|
Quota int64
|
|
DisableXAttr bool
|
|
|
|
MountUid uint32
|
|
MountGid uint32
|
|
MountMode os.FileMode
|
|
MountCtime time.Time
|
|
MountMtime time.Time
|
|
MountParentInode uint64
|
|
|
|
VolumeServerAccess string // how to access volume servers
|
|
Cipher bool // whether encrypt data on volume server
|
|
UidGidMapper *meta_cache.UidGidMapper
|
|
|
|
uniqueCacheDirForRead string
|
|
uniqueCacheDirForWrite string
|
|
}
|
|
|
|
type WFS struct {
|
|
// https://dl.acm.org/doi/fullHtml/10.1145/3310148
|
|
// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
|
|
fuse.RawFileSystem
|
|
mount_pb.UnimplementedSeaweedMountServer
|
|
fs.Inode
|
|
option *Option
|
|
metaCache *meta_cache.MetaCache
|
|
stats statsCache
|
|
chunkCache *chunk_cache.TieredChunkCache
|
|
signature int32
|
|
concurrentWriters *util.LimitedConcurrentExecutor
|
|
inodeToPath *InodeToPath
|
|
fhmap *FileHandleToInode
|
|
dhmap *DirectoryHandleToInode
|
|
fuseServer *fuse.Server
|
|
IsOverQuota bool
|
|
fhLockTable *util.LockTable[FileHandleId]
|
|
}
|
|
|
|
func NewSeaweedFileSystem(option *Option) *WFS {
|
|
wfs := &WFS{
|
|
RawFileSystem: fuse.NewDefaultRawFileSystem(),
|
|
option: option,
|
|
signature: util.RandomInt32(),
|
|
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
|
|
fhmap: NewFileHandleToInode(),
|
|
dhmap: NewDirectoryHandleToInode(),
|
|
fhLockTable: util.NewLockTable[FileHandleId](),
|
|
}
|
|
|
|
wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
|
|
wfs.option.setupUniqueCacheDirectory()
|
|
if option.CacheSizeMBForRead > 0 {
|
|
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
|
|
}
|
|
|
|
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
|
|
util.FullPath(option.FilerMountRootPath),
|
|
func(path util.FullPath) {
|
|
wfs.inodeToPath.MarkChildrenCached(path)
|
|
}, func(path util.FullPath) bool {
|
|
return wfs.inodeToPath.IsChildrenCached(path)
|
|
}, func(filePath util.FullPath, entry *filer_pb.Entry) {
|
|
// Find inode if it is not a deleted path
|
|
if inode, inode_found := wfs.inodeToPath.GetInode(filePath); inode_found {
|
|
// Find open file handle
|
|
if fh, fh_found := wfs.fhmap.FindFileHandle(inode); fh_found {
|
|
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
|
|
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
|
|
|
|
// Recreate dirty pages
|
|
fh.dirtyPages.Destroy()
|
|
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
|
|
|
|
// Update handle entry
|
|
newentry, status := wfs.maybeLoadEntry(filePath)
|
|
if status == fuse.OK {
|
|
if fh.GetEntry().GetEntry() != newentry {
|
|
fh.SetEntry(newentry)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
grace.OnInterrupt(func() {
|
|
wfs.metaCache.Shutdown()
|
|
os.RemoveAll(option.getUniqueCacheDirForWrite())
|
|
os.RemoveAll(option.getUniqueCacheDirForRead())
|
|
})
|
|
|
|
if wfs.option.ConcurrentWriters > 0 {
|
|
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
|
|
}
|
|
return wfs
|
|
}
|
|
|
|
func (wfs *WFS) StartBackgroundTasks() {
|
|
startTime := time.Now()
|
|
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
|
|
go wfs.loopCheckQuota()
|
|
}
|
|
|
|
func (wfs *WFS) String() string {
|
|
return "seaweedfs"
|
|
}
|
|
|
|
func (wfs *WFS) Init(server *fuse.Server) {
|
|
wfs.fuseServer = server
|
|
}
|
|
|
|
func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
|
|
path, status = wfs.inodeToPath.GetPath(inode)
|
|
if status != fuse.OK {
|
|
return
|
|
}
|
|
var found bool
|
|
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
|
|
entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
|
|
if entry != nil && fh.entry.Attributes == nil {
|
|
entry.Attributes = &filer_pb.FuseAttributes{}
|
|
}
|
|
})
|
|
} else {
|
|
entry, status = wfs.maybeLoadEntry(path)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
|
|
|
|
// glog.V(3).Infof("read entry cache miss %s", fullpath)
|
|
dir, name := fullpath.DirAndName()
|
|
|
|
// return a valid entry for the mount root
|
|
if string(fullpath) == wfs.option.FilerMountRootPath {
|
|
return &filer_pb.Entry{
|
|
Name: name,
|
|
IsDirectory: true,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
Mtime: wfs.option.MountMtime.Unix(),
|
|
FileMode: uint32(wfs.option.MountMode),
|
|
Uid: wfs.option.MountUid,
|
|
Gid: wfs.option.MountGid,
|
|
Crtime: wfs.option.MountCtime.Unix(),
|
|
},
|
|
}, fuse.OK
|
|
}
|
|
|
|
// read from async meta cache
|
|
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
|
|
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
|
|
if cacheErr == filer_pb.ErrNotFound {
|
|
return nil, fuse.ENOENT
|
|
}
|
|
return cachedEntry.ToProtoEntry(), fuse.OK
|
|
}
|
|
|
|
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
|
|
if wfs.option.VolumeServerAccess == "filerProxy" {
|
|
return func(fileId string) (targetUrls []string, err error) {
|
|
return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
|
|
}
|
|
}
|
|
return filer.LookupFn(wfs)
|
|
}
|
|
|
|
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
|
|
i := atomic.LoadInt32(&wfs.option.filerIndex)
|
|
return wfs.option.FilerAddresses[i]
|
|
}
|
|
|
|
func (option *Option) setupUniqueCacheDirectory() {
|
|
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
|
|
option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
|
|
os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
|
|
option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
|
|
os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
|
|
}
|
|
|
|
func (option *Option) getUniqueCacheDirForWrite() string {
|
|
return option.uniqueCacheDirForWrite
|
|
}
|
|
|
|
func (option *Option) getUniqueCacheDirForRead() string {
|
|
return option.uniqueCacheDirForRead
|
|
}
|