diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index cce64f74d..ba0260f04 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -99,7 +99,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
 		return
 	}
 
-	fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
+	fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -130,7 +130,8 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
 		return nil, nil, err
 	}
 
-	fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
+	fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
+
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go
index 672a82672..b3ccd4cc1 100644
--- a/weed/server/filer_server_handlers_write_merge.go
+++ b/weed/server/filer_server_handlers_write_merge.go
@@ -1,11 +1,65 @@
 package weed_server
 
 import (
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/stats"
+	"io"
+	"math"
 )
 
+const MergeChunkMinCount int = 1000
+
 func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
-	//TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks
-	return inputChunks, nil
+	// Only merge when there are at least MergeChunkMinCount small chunks and they make up at least half of the file's chunks
+	var chunkSize = fs.option.MaxMB * 1024 * 1024
+	var smallChunk, sumChunk int
+	var minOffset int64 = math.MaxInt64
+	for _, chunk := range inputChunks {
+		if chunk.IsChunkManifest {
+			continue
+		}
+		if chunk.Size < uint64(chunkSize/2) {
+			smallChunk++
+			if chunk.Offset < minOffset {
+				minOffset = chunk.Offset
+			}
+		}
+		sumChunk++
+	}
+	if smallChunk < MergeChunkMinCount || smallChunk < sumChunk/2 {
+		return inputChunks, nil
+	}
+
+	return fs.mergeChunks(so, inputChunks, minOffset)
+}
+
+func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
+	chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
+	_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
+	if mergeErr != nil {
+		return nil, mergeErr
+	}
+	mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
+	if mergeErr != nil {
+		return
+	}
+
+	stats.FilerHandlerCounter.WithLabelValues(stats.ChunkMerge).Inc()
+	for _, chunk := range inputChunks {
+		if chunk.Offset < chunkOffset || chunk.IsChunkManifest {
+			mergedChunks = append(mergedChunks, chunk)
+		}
+	}
+
+	garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
+	if err != nil {
+		glog.Errorf("Failed to resolve old entry chunks when deleting old entry chunks. new: %s, old: %s",
+			mergedChunks, inputChunks)
+		return
+	}
+	fs.filer.DeleteChunksNotRecursive(garbage)
+	return
 }
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index d0d1575cf..a08d5efe0 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -27,7 +27,7 @@ var bufPool = sync.Pool{
 	},
 }
 
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
 	query := r.URL.Query()
 
 	isAppend := isAppend(r)
@@ -45,7 +45,13 @@ func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Reque
 		chunkOffset = offsetInt
 	}
 
+	return fs.uploadReaderToChunks(reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
+}
+
+func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+
 	md5Hash = md5.New()
+	chunkOffset = startOffset
 	var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
 
 	var wg sync.WaitGroup
diff --git a/weed/stats/metrics_names.go b/weed/stats/metrics_names.go
index 15f0ad24f..13f491513 100644
--- a/weed/stats/metrics_names.go
+++ b/weed/stats/metrics_names.go
@@ -27,12 +27,14 @@ const (
 	Failed = "failed"
 
 	// filer handler
-	DirList = "dirList"
-	ContentSaveToFiler = "contentSaveToFiler"
-	AutoChunk = "autoChunk"
-	ChunkProxy = "chunkProxy"
-	ChunkAssign = "chunkAssign"
-	ChunkUpload = "chunkUpload"
+	DirList            = "dirList"
+	ContentSaveToFiler = "contentSaveToFiler"
+	AutoChunk          = "autoChunk"
+	ChunkProxy         = "chunkProxy"
+	ChunkAssign        = "chunkAssign"
+	ChunkUpload        = "chunkUpload"
+	ChunkMerge         = "chunkMerge"
+	ChunkDoUploadRetry = "chunkDoUploadRetry"
 	ChunkUploadRetry   = "chunkUploadRetry"
 	ChunkAssignRetry   = "chunkAssignRetry"
 
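
A note on the trigger condition in maybeMergeChunks above: a chunk counts as "small" when it is under half the configured chunk size (MaxMB), and the merge path runs only when there are at least MergeChunkMinCount (1000) such chunks and they account for at least half of all non-manifest chunks. A minimal standalone sketch of that heuristic follows; shouldMerge and its plain-slice inputs are illustrative and not part of the patch:

	package main

	import "fmt"

	const mergeChunkMinCount = 1000

	// shouldMerge mirrors the patch's heuristic: merge only when small chunks
	// are both numerous (>= mergeChunkMinCount) and dominant (>= half of all chunks).
	func shouldMerge(chunkSizes []uint64, chunkSizeLimit uint64) bool {
		var small, total int
		for _, size := range chunkSizes {
			if size < chunkSizeLimit/2 {
				small++
			}
			total++
		}
		return small >= mergeChunkMinCount && small >= total/2
	}

	func main() {
		// 1200 one-KiB chunks against a 4 MiB chunk size: all are small and
		// 1200 >= 1000, so the merge path would be taken.
		sizes := make([]uint64, 1200)
		for i := range sizes {
			sizes[i] = 1024
		}
		fmt.Println(shouldMerge(sizes, 4*1024*1024)) // true
	}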
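
The uploadRequestToChunks/uploadReaderToChunks split keeps HTTP concerns (the query offset, append mode) in the request-level wrapper so that mergeChunks can hand the reader-level function a ChunkStreamReader directly. The reader-level function still tees everything it reads through an MD5 hash, so chunking and digesting happen in one pass. A self-contained sketch of that pattern, with an illustrative input string:

	package main

	import (
		"crypto/md5"
		"fmt"
		"io"
		"strings"
	)

	func main() {
		// Same wrapping as uploadReaderToChunks: every byte consumed from
		// partReader is also fed to md5Hash.
		md5Hash := md5.New()
		partReader := io.NopCloser(io.TeeReader(strings.NewReader("hello chunks"), md5Hash))

		// Draining the reader (as the chunk uploader would) updates the hash.
		if _, err := io.Copy(io.Discard, partReader); err != nil {
			panic(err)
		}
		fmt.Printf("%x\n", md5Hash.Sum(nil))
	}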
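
Cleanup after a merge relies on filer.MinusChunks: whatever appears in inputChunks but not in mergedChunks is garbage and is passed to DeleteChunksNotRecursive. A toy version of that set difference over plain string file ids (the real function additionally resolves manifest chunks via the lookup callback; all ids below are made up):

	package main

	import "fmt"

	// minusChunks returns the ids present in oldChunks but absent from
	// newChunks, i.e. the chunks that became garbage after the merge.
	func minusChunks(oldChunks, newChunks []string) (garbage []string) {
		keep := make(map[string]bool, len(newChunks))
		for _, id := range newChunks {
			keep[id] = true
		}
		for _, id := range oldChunks {
			if !keep[id] {
				garbage = append(garbage, id)
			}
		}
		return
	}

	func main() {
		oldChunks := []string{"3,01637037d6", "4,02a7b8c9d0", "5,03e1f2a3b4"}
		merged := []string{"7,0aa1b2c3d4", "3,01637037d6"}
		fmt.Println(minusChunks(oldChunks, merged)) // [4,02a7b8c9d0 5,03e1f2a3b4]
	}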