From 430eb67489372eea51c39cda1414411a9c96df8c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 30 May 2018 22:02:21 -0700 Subject: [PATCH] handle large file copy when write request is larger than buffer --- weed/filesys/dirty_page.go | 47 ++++++++++++++++++++++++++++++++------ weed/filesys/filehandle.go | 4 ++-- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index e3ee945a1..609af4181 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -26,11 +26,39 @@ func newDirtyPages(file *File) *ContinuousDirtyPages { } } -func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunk *filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { + + var chunk *filer_pb.FileChunk if len(data) > len(pages.Data) { // this is more than what we can hold. - panic("not prepared if buffer is smaller than each system write!") + + glog.V(0).Infof("not prepared if buffer is smaller than each system write! file %s [%d,%d)", pages.f.Name, offset, int64(len(data))+offset) + + // flush existing + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + chunks = append(chunks, chunk) + } else { + glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + pages.Size = 0 + + // flush the big page + if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + chunks = append(chunks, chunk) + } + } else { + glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + + return } if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) || @@ -41,9 +69,10 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da // println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size) - if chunk, err = pages.saveToStorage(ctx); err == nil { + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { if chunk != nil { glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + chunks = append(chunks, chunk) } } else { glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) @@ -67,7 +96,7 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f return nil, nil } - if chunk, err = pages.saveToStorage(ctx); err == nil { + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { pages.Size = 0 if chunk != nil { glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) @@ -76,7 +105,11 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f return } -func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context) (*filer_pb.FileChunk, error) { +func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) { + return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset) +} + +func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) { if pages.Size == 0 { return nil, nil @@ -119,8 +152,8 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context) (*filer_pb return &filer_pb.FileChunk{ FileId: fileId, - Offset: pages.Offset, - Size: uint64(pages.Size), + Offset: offset, + Size: uint64(len(buf)), Mtime: time.Now().UnixNano(), }, nil diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 32e4622d0..bec240de2 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -125,7 +125,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f glog.V(4).Infof("%+v/%v write fh: [%d,%d)", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data))) - chunk, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data) + chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data) if err != nil { return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err) } @@ -137,7 +137,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f fh.dirtyMetadata = true } - if chunk != nil { + for _, chunk := range chunks { fh.f.Chunks = append(fh.f.Chunks, chunk) glog.V(1).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) fh.dirtyMetadata = true