mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-11-30 23:29:02 +08:00
0122e022ea
* fix:mount deadlock * feat: concurrent read * fix * Remove useless code * fix --------- Co-authored-by: zemul <zhouzemiao@ihuman.com>
151 lines
4.1 KiB
Go
151 lines
4.1 KiB
Go
package filer
|
|
|
|
import (
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"sync"
|
|
)
|
|
|
|
// SectionSize is the number of bytes of a file covered by a single
// FileChunkSection: 64 MiB.
const SectionSize = 64 << 20
|
|
// SectionIndex is the ordinal of a section within a file. Section i covers
// the byte range [i*SectionSize, (i+1)*SectionSize).
type SectionIndex int64
|
|
type FileChunkSection struct {
|
|
sectionIndex SectionIndex
|
|
chunks []*filer_pb.FileChunk
|
|
visibleIntervals *IntervalList[*VisibleInterval]
|
|
chunkViews *IntervalList[*ChunkView]
|
|
reader *ChunkReadAt
|
|
lock sync.RWMutex
|
|
isPrepared bool
|
|
}
|
|
|
|
func NewFileChunkSection(si SectionIndex) *FileChunkSection {
|
|
return &FileChunkSection{
|
|
sectionIndex: si,
|
|
}
|
|
}
|
|
|
|
func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error {
|
|
section.lock.Lock()
|
|
defer section.lock.Unlock()
|
|
|
|
start, stop := max(int64(section.sectionIndex)*SectionSize, chunk.Offset), min(((int64(section.sectionIndex)+1)*SectionSize), chunk.Offset+int64(chunk.Size))
|
|
|
|
section.chunks = append(section.chunks, chunk)
|
|
|
|
if section.visibleIntervals == nil {
|
|
section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
|
|
} else {
|
|
MergeIntoVisibles(section.visibleIntervals, start, stop, chunk)
|
|
garbageFileIds := FindGarbageChunks(section.visibleIntervals, start, stop)
|
|
removeGarbageChunks(section, garbageFileIds)
|
|
}
|
|
|
|
if section.chunkViews != nil {
|
|
MergeIntoChunkViews(section.chunkViews, start, stop, chunk)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func removeGarbageChunks(section *FileChunkSection, garbageFileIds map[string]struct{}) {
|
|
for i := 0; i < len(section.chunks); {
|
|
t := section.chunks[i]
|
|
length := len(section.chunks)
|
|
if _, found := garbageFileIds[t.FileId]; found {
|
|
if i < length-1 {
|
|
section.chunks[i] = section.chunks[length-1]
|
|
}
|
|
section.chunks = section.chunks[:length-1]
|
|
} else {
|
|
i++
|
|
}
|
|
}
|
|
}
|
|
|
|
func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
|
|
if section.isPrepared {
|
|
section.reader.fileSize = fileSize
|
|
return
|
|
}
|
|
|
|
section.lock.Lock()
|
|
defer section.lock.Unlock()
|
|
|
|
if section.isPrepared {
|
|
section.reader.fileSize = fileSize
|
|
return
|
|
}
|
|
|
|
if section.visibleIntervals == nil {
|
|
section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
|
|
section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
|
|
if section.reader != nil {
|
|
_ = section.reader.Close()
|
|
section.reader = nil
|
|
}
|
|
}
|
|
if section.chunkViews == nil {
|
|
section.chunkViews = ViewFromVisibleIntervals(section.visibleIntervals, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
|
|
}
|
|
|
|
if section.reader == nil {
|
|
section.reader = NewChunkReaderAtFromClient(group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
|
|
}
|
|
|
|
section.isPrepared = true
|
|
section.reader.fileSize = fileSize
|
|
}
|
|
|
|
func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
|
|
|
|
section.setupForRead(group, fileSize)
|
|
section.lock.RLock()
|
|
defer section.lock.RUnlock()
|
|
|
|
return section.reader.ReadAtWithTime(buff, offset)
|
|
}
|
|
|
|
func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
|
|
|
|
section.setupForRead(group, fileSize)
|
|
section.lock.RLock()
|
|
defer section.lock.RUnlock()
|
|
|
|
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
|
|
visible := x.Value
|
|
if visible.stop <= offset {
|
|
continue
|
|
}
|
|
if offset < visible.start {
|
|
return offset
|
|
}
|
|
return offset
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
|
|
|
|
section.setupForRead(group, fileSize)
|
|
section.lock.RLock()
|
|
defer section.lock.RUnlock()
|
|
|
|
isAfterOffset := false
|
|
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
|
|
visible := x.Value
|
|
if !isAfterOffset {
|
|
if visible.stop <= offset {
|
|
continue
|
|
}
|
|
isAfterOffset = true
|
|
}
|
|
if offset < visible.start {
|
|
return offset
|
|
}
|
|
// now visible.start <= offset
|
|
if offset < visible.stop {
|
|
offset = visible.stop
|
|
}
|
|
}
|
|
return offset
|
|
}
|