seaweedfs/weed/server/filer_server_handlers_write_merge.go

66 lines
2.0 KiB
Go
Raw Normal View History

package weed_server
import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"io"
"math"
)
const MergeChunkMinCount int = 1000
func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
// Only merge small chunks more than half of the file
var chunkSize = fs.option.MaxMB * 1024 * 1024
var smallChunk, sumChunk int
var minOffset int64 = math.MaxInt64
for _, chunk := range inputChunks {
if chunk.IsChunkManifest {
continue
}
if chunk.Size < uint64(chunkSize/2) {
smallChunk++
if chunk.Offset < minOffset {
minOffset = chunk.Offset
}
}
sumChunk++
}
if smallChunk < MergeChunkMinCount || smallChunk < sumChunk/2 {
return inputChunks, nil
}
return fs.mergeChunks(so, inputChunks, minOffset)
}
func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
if mergeErr != nil {
return nil, mergeErr
}
mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
if mergeErr != nil {
return
}
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkMerge).Inc()
for _, chunk := range inputChunks {
if chunk.Offset < chunkOffset || chunk.IsChunkManifest {
mergedChunks = append(mergedChunks, chunk)
}
}
garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
if err != nil {
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
mergedChunks, inputChunks)
return
}
fs.filer.DeleteChunksNotRecursive(garbage)
return
}