mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-11-24 02:59:13 +08:00
mount: file handle locks entry better
related to https://github.com/chrislusf/seaweedfs/issues/2952
This commit is contained in:
parent
746092a60b
commit
d65bb2c6df
@ -84,7 +84,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader
|
||||
}
|
||||
chunk.Mtime = mtime
|
||||
pages.collection, pages.replication = collection, replication
|
||||
pages.fh.addChunks([]*filer_pb.FileChunk{chunk})
|
||||
pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
|
||||
pages.fh.entryViewCache = nil
|
||||
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
|
||||
|
||||
|
@ -13,12 +13,12 @@ import (
|
||||
type FileHandleId uint64
|
||||
|
||||
type FileHandle struct {
|
||||
fh FileHandleId
|
||||
counter int64
|
||||
entry *filer_pb.Entry
|
||||
chunkAddLock sync.Mutex
|
||||
inode uint64
|
||||
wfs *WFS
|
||||
fh FileHandleId
|
||||
counter int64
|
||||
entry *filer_pb.Entry
|
||||
entryLock sync.Mutex
|
||||
inode uint64
|
||||
wfs *WFS
|
||||
|
||||
// cache file has been written to
|
||||
dirtyMetadata bool
|
||||
@ -53,7 +53,20 @@ func (fh *FileHandle) FullPath() util.FullPath {
|
||||
return fp
|
||||
}
|
||||
|
||||
func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) {
|
||||
func (fh *FileHandle) GetEntry() *filer_pb.Entry {
|
||||
fh.entryLock.Lock()
|
||||
defer fh.entryLock.Unlock()
|
||||
return fh.entry
|
||||
}
|
||||
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
|
||||
fh.entryLock.Lock()
|
||||
defer fh.entryLock.Unlock()
|
||||
fh.entry = entry
|
||||
}
|
||||
|
||||
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
|
||||
fh.entryLock.Lock()
|
||||
defer fh.entryLock.Unlock()
|
||||
|
||||
// find the earliest incoming chunk
|
||||
newChunks := chunks
|
||||
@ -82,10 +95,8 @@ func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) {
|
||||
|
||||
glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks))
|
||||
|
||||
fh.chunkAddLock.Lock()
|
||||
fh.entry.Chunks = append(fh.entry.Chunks, newChunks...)
|
||||
fh.entryViewCache = nil
|
||||
fh.chunkAddLock.Unlock()
|
||||
}
|
||||
|
||||
func (fh *FileHandle) Release() {
|
||||
|
@ -25,6 +25,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
|
||||
|
||||
fileFullPath := fh.FullPath()
|
||||
|
||||
fh.entryLock.Lock()
|
||||
defer fh.entryLock.Unlock()
|
||||
|
||||
entry := fh.entry
|
||||
if entry == nil {
|
||||
return 0, io.EOF
|
||||
|
@ -131,10 +131,11 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
|
||||
}
|
||||
var found bool
|
||||
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
|
||||
if fh.entry.Attributes == nil {
|
||||
fh.entry.Attributes = &filer_pb.FuseAttributes{}
|
||||
entry = fh.GetEntry()
|
||||
if entry != nil && fh.entry.Attributes == nil {
|
||||
entry.Attributes = &filer_pb.FuseAttributes{}
|
||||
}
|
||||
return path, fh, fh.entry, fuse.OK
|
||||
return path, fh, entry, fuse.OK
|
||||
}
|
||||
entry, status = wfs.maybeLoadEntry(path)
|
||||
return
|
||||
|
@ -43,6 +43,10 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
|
||||
if status != fuse.OK {
|
||||
return status
|
||||
}
|
||||
if fh != nil {
|
||||
fh.entryLock.Lock()
|
||||
defer fh.entryLock.Unlock()
|
||||
}
|
||||
|
||||
if size, ok := input.GetSize(); ok {
|
||||
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.Chunks))
|
||||
|
@ -118,6 +118,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
|
||||
|
||||
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
fh.entryLock.Lock()
|
||||
defer fh.entryLock.Unlock()
|
||||
|
||||
entry := fh.entry
|
||||
if entry == nil {
|
||||
return nil
|
||||
|
@ -94,10 +94,15 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
|
||||
}
|
||||
}
|
||||
|
||||
path, _, entry, status := wfs.maybeReadEntry(input.NodeId)
|
||||
path, fh, entry, status := wfs.maybeReadEntry(input.NodeId)
|
||||
if status != fuse.OK {
|
||||
return status
|
||||
}
|
||||
if fh != nil {
|
||||
fh.entryLock.Lock()
|
||||
defer fh.entryLock.Unlock()
|
||||
}
|
||||
|
||||
if entry.Extended == nil {
|
||||
entry.Extended = make(map[string][]byte)
|
||||
}
|
||||
@ -154,10 +159,15 @@ func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr
|
||||
if len(attr) == 0 {
|
||||
return fuse.EINVAL
|
||||
}
|
||||
path, _, entry, status := wfs.maybeReadEntry(header.NodeId)
|
||||
path, fh, entry, status := wfs.maybeReadEntry(header.NodeId)
|
||||
if status != fuse.OK {
|
||||
return status
|
||||
}
|
||||
if fh != nil {
|
||||
fh.entryLock.Lock()
|
||||
defer fh.entryLock.Unlock()
|
||||
}
|
||||
|
||||
if entry.Extended == nil {
|
||||
return fuse.ENOATTR
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user