mirror of https://github.com/seaweedfs/seaweedfs.git
handle large file copy when write request is larger than buffer
parent 44acf4b756
commit 430eb67489
@@ -26,11 +26,39 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
 	}
 }
 
-func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunk *filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
 
+	var chunk *filer_pb.FileChunk
+
 	if len(data) > len(pages.Data) {
 		// this is more than what we can hold.
-		panic("not prepared if buffer is smaller than each system write!")
+		glog.V(0).Infof("not prepared if buffer is smaller than each system write! file %s [%d,%d)", pages.f.Name, offset, int64(len(data))+offset)
+
+		// flush existing
+		if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
+			if chunk != nil {
+				glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+			}
+			chunks = append(chunks, chunk)
+		} else {
+			glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+			return
+		}
+		pages.Size = 0
+
+		// flush the big page
+		if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
+			if chunk != nil {
+				glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+				chunks = append(chunks, chunk)
+			}
+		} else {
+			glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+			return
+		}
+
+		return
 	}
 
 	if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
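The hunk above is the substance of the fix: AddPage now returns a slice of chunks, and a write larger than the page buffer no longer panics; instead the already-buffered pages are flushed first and the oversized request is saved as its own chunk. Below is a minimal, self-contained sketch of that control flow, not the SeaweedFS implementation; dirtyPages, chunk, and save are illustrative stand-ins.

package main

import "fmt"

// chunk is a stand-in for filer_pb.FileChunk.
type chunk struct {
	Offset int64
	Size   uint64
}

// dirtyPages is a stand-in for ContinuousDirtyPages: a fixed-size write-back buffer.
type dirtyPages struct {
	Data   []byte // fixed-size buffer
	Offset int64  // file offset of Data[0]
	Size   int    // bytes currently buffered
}

// save pretends to upload buf at the given file offset and returns its chunk.
func (p *dirtyPages) save(buf []byte, offset int64) chunk {
	return chunk{Offset: offset, Size: uint64(len(buf))}
}

// addPage mirrors the patched control flow: a single write may yield zero, one, or two chunks.
func (p *dirtyPages) addPage(offset int64, data []byte) (chunks []chunk) {
	if len(data) > len(p.Data) {
		// The request cannot fit in the buffer at all.
		if p.Size > 0 {
			chunks = append(chunks, p.save(p.Data[:p.Size], p.Offset)) // flush existing pages
			p.Size = 0
		}
		chunks = append(chunks, p.save(data, offset)) // flush the big request as its own chunk
		return
	}
	// Normal path (simplified): buffer the data for a later flush.
	copy(p.Data, data)
	p.Offset, p.Size = offset, len(data)
	return
}

func main() {
	p := &dirtyPages{Data: make([]byte, 8)}
	fmt.Println(p.addPage(0, []byte("hello")))  // []             (fits, buffered)
	fmt.Println(p.addPage(5, make([]byte, 32))) // [{0 5} {5 32}] (flush existing, then the big write)
}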
@@ -41,9 +69,10 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
 
 		// println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size)
 
-		if chunk, err = pages.saveToStorage(ctx); err == nil {
+		if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
 			if chunk != nil {
 				glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+				chunks = append(chunks, chunk)
 			}
 		} else {
 			glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
@@ -67,7 +96,7 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f
 		return nil, nil
 	}
 
-	if chunk, err = pages.saveToStorage(ctx); err == nil {
+	if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
 		pages.Size = 0
 		if chunk != nil {
 			glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
@@ -76,7 +105,11 @@ func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *f
 	return
 }
 
-func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
+func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
+	return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
+}
+
+func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
 
 	if pages.Size == 0 {
 		return nil, nil
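The hunk above splits the old buffer-only saveToStorage into a parameterized upload routine plus a thin wrapper, so the same path can persist either the buffered pages or an oversized write. A hedged sketch of that shape, with placeholder types and a faked upload rather than the real filer/volume calls:

package main

import "fmt"

type fileChunk struct {
	Offset int64
	Size   uint64
}

type continuousDirtyPages struct {
	Data   []byte
	Offset int64
	Size   int64
}

// saveToStorage now uploads an arbitrary byte range (the upload itself is faked here).
func (pages *continuousDirtyPages) saveToStorage(buf []byte, offset int64) *fileChunk {
	if len(buf) == 0 {
		return nil
	}
	return &fileChunk{Offset: offset, Size: uint64(len(buf))}
}

// saveExistingPagesToStorage is the thin wrapper over the buffered region,
// keeping the existing call sites (AddPage's normal path, FlushToStorage) unchanged.
func (pages *continuousDirtyPages) saveExistingPagesToStorage() *fileChunk {
	return pages.saveToStorage(pages.Data[:pages.Size], pages.Offset)
}

func main() {
	pages := &continuousDirtyPages{Data: make([]byte, 8), Offset: 100, Size: 5}
	fmt.Println(*pages.saveExistingPagesToStorage())         // {100 5}
	fmt.Println(*pages.saveToStorage(make([]byte, 32), 105)) // {105 32}
}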
@@ -119,8 +152,8 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context) (*filer_pb
 
 	return &filer_pb.FileChunk{
 		FileId: fileId,
-		Offset: pages.Offset,
-		Size:   uint64(pages.Size),
+		Offset: offset,
+		Size:   uint64(len(buf)),
 		Mtime:  time.Now().UnixNano(),
 	}, nil
 
@@ -125,7 +125,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
 
 	glog.V(4).Infof("%+v/%v write fh: [%d,%d)", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)))
 
-	chunk, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
+	chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
 	if err != nil {
 		return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
 	}
@@ -137,7 +137,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
 		fh.dirtyMetadata = true
 	}
 
-	if chunk != nil {
+	for _, chunk := range chunks {
 		fh.f.Chunks = append(fh.f.Chunks, chunk)
 		glog.V(1).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
 		fh.dirtyMetadata = true
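The last two hunks adapt the FUSE write path to the new return type: one WriteRequest may now produce several chunks (the flushed buffer plus the big request itself), so the single nil-check becomes a loop. A small stand-in sketch of that caller-side change, not the real fuse/filer_pb types:

package main

import "fmt"

// fileChunk stands in for *filer_pb.FileChunk.
type fileChunk struct {
	FileId string
	Offset int64
	Size   uint64
}

// fileHandle stands in for the FUSE FileHandle with its chunk list.
type fileHandle struct {
	Chunks        []fileChunk
	dirtyMetadata bool
}

// recordChunks mirrors the changed loop in Write: every chunk returned by
// AddPage is appended to the file's chunk list and marks the metadata dirty.
func (fh *fileHandle) recordChunks(chunks []fileChunk) {
	for _, chunk := range chunks { // previously: if chunk != nil { ... }
		fh.Chunks = append(fh.Chunks, chunk)
		fh.dirtyMetadata = true
	}
}

func main() {
	fh := &fileHandle{}
	// An oversized write may yield two chunks: the flushed buffer and the big request.
	fh.recordChunks([]fileChunk{{"fid-1", 0, 5}, {"fid-2", 5, 32}})
	fmt.Println(len(fh.Chunks), fh.dirtyMetadata) // 2 true
}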