diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index ca44df227..a76c6dd61 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -30,7 +30,10 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages { fh: fh, } - dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.wfs.option.ConcurrentWriters) + swapFileDir := fh.wfs.option.getTempFilePageDir() + + dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize, + dirtyPages.saveChunkedFileIntevalToStorage, fh.wfs.option.ConcurrentWriters, swapFileDir) return dirtyPages } diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go index 9da892f00..ffcaa6398 100644 --- a/weed/mount/page_writer.go +++ b/weed/mount/page_writer.go @@ -23,7 +23,6 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { fh: fh, chunkSize: chunkSize, randomWriter: newMemoryChunkPages(fh, chunkSize), - // randomWriter: newTempFileDirtyPages(fh.f, chunkSize), } return pw } diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index dfd54c19e..52db6d4f9 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -3,10 +3,13 @@ package page_writer import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/mem" + "sync/atomic" ) var ( _ = PageChunk(&MemChunk{}) + + memChunkCounter int64 ) type MemChunk struct { @@ -17,6 +20,7 @@ type MemChunk struct { } func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { + atomic.AddInt64(&memChunkCounter, 1) return &MemChunk{ logicChunkIndex: logicChunkIndex, chunkSize: chunkSize, @@ -26,6 +30,7 @@ func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { } func (mc *MemChunk) FreeResource() { + atomic.AddInt64(&memChunkCounter, -1) mem.Free(mc.buf) } diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 3ed966de4..190076a2b 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -25,6 +25,7 @@ type UploadPipeline struct { activeReadChunks map[LogicChunkIndex]int activeReadChunksLock sync.Mutex bufferChunkLimit int + swapFile *SwapFile } type SealedChunk struct { @@ -40,7 +41,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) { } } -func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline { +func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline { return &UploadPipeline{ ChunkSize: chunkSize, writableChunks: make(map[LogicChunkIndex]PageChunk), @@ -50,6 +51,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn: saveToStorageFn, activeReadChunks: make(map[LogicChunkIndex]int), bufferChunkLimit: bufferChunkLimit, + swapFile: NewSwapFile(swapFileDir, chunkSize), } } @@ -59,10 +61,14 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) - memChunk, found := up.writableChunks[logicChunkIndex] + pageChunk, found := up.writableChunks[logicChunkIndex] if !found { - if len(up.writableChunks) < up.bufferChunkLimit { - memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) + if atomic.LoadInt64(&memChunkCounter) > 4*int64(up.bufferChunkLimit) { + // if total number of chunks is over 4 times of per file buffer count limit + pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) + } else if len(up.writableChunks) < up.bufferChunkLimit { + // if current file chunks is still under the per file buffer count limit + pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } else { fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) for lci, mc := range up.writableChunks { @@ -75,12 +81,12 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) delete(up.writableChunks, fullestChunkIndex) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) - memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) + pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } - up.writableChunks[logicChunkIndex] = memChunk + up.writableChunks[logicChunkIndex] = pageChunk } - n = memChunk.WriteDataAt(p, off) - up.maybeMoveToSealed(memChunk, logicChunkIndex) + n = pageChunk.WriteDataAt(p, off) + up.maybeMoveToSealed(pageChunk, logicChunkIndex) return } @@ -179,6 +185,7 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic } func (up *UploadPipeline) Shutdown() { + up.swapFile.FreeResource() for logicChunkIndex, sealedChunk := range up.sealedChunks { sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex)) } diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go index 816fb228b..63b60faaf 100644 --- a/weed/mount/page_writer/upload_pipeline_test.go +++ b/weed/mount/page_writer/upload_pipeline_test.go @@ -7,7 +7,7 @@ import ( func TestUploadPipeline(t *testing.T) { - uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16) + uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16, "") writeRange(uploadPipeline, 0, 131072) writeRange(uploadPipeline, 131072, 262144) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 52eefab32..260a4e1da 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -16,6 +16,7 @@ import ( "math/rand" "os" "path" + "path/filepath" "time" "github.com/hanwen/go-fuse/v2/fs" @@ -50,7 +51,8 @@ type Option struct { Cipher bool // whether encrypt data on volume server UidGidMapper *meta_cache.UidGidMapper - uniqueCacheDir string + uniqueCacheDir string + uniqueCacheTempPageDir string } type WFS struct { @@ -177,7 +179,12 @@ func (wfs *WFS) getCurrentFiler() pb.ServerAddress { func (option *Option) setupUniqueCacheDirectory() { cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8] option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId) - os.MkdirAll(option.uniqueCacheDir, os.FileMode(0777)&^option.Umask) + option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "swap") + os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask) +} + +func (option *Option) getTempFilePageDir() string { + return option.uniqueCacheTempPageDir } func (option *Option) getUniqueCacheDir() string {