refactor: extract out the write throttler

This commit is contained in:
Chris Lu 2019-05-06 13:56:08 -07:00
parent 4e42e7b5e7
commit cf58fc0e63
2 changed files with 47 additions and 33 deletions

View File

@ -236,15 +236,13 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
} }
type VolumeFileScanner4Vacuum struct { type VolumeFileScanner4Vacuum struct {
version needle.Version version needle.Version
v *Volume v *Volume
dst *os.File dst *os.File
nm *NeedleMap nm *NeedleMap
newOffset int64 newOffset int64
now uint64 now uint64
compactionBytePerSecond int64 writeThrottler *util.WriteThrottler
lastSizeCounter int64
lastSizeCheckTime time.Time
} }
func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error { func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
@ -274,28 +272,11 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
} }
delta := n.DiskSize(scanner.version) delta := n.DiskSize(scanner.version)
scanner.newOffset += delta scanner.newOffset += delta
scanner.maybeSlowdown(delta) scanner.writeThrottler.MaybeSlowdown(delta)
glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", scanner.newOffset, "data_size", n.Size) glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", scanner.newOffset, "data_size", n.Size)
} }
return nil return nil
} }
func (scanner *VolumeFileScanner4Vacuum) maybeSlowdown(delta int64) {
if scanner.compactionBytePerSecond > 0 {
scanner.lastSizeCounter += delta
now := time.Now()
elapsedDuration := now.Sub(scanner.lastSizeCheckTime)
if elapsedDuration > 100*time.Millisecond {
overLimitBytes := scanner.lastSizeCounter - scanner.compactionBytePerSecond/10
if overLimitBytes > 0 {
overRatio := float64(overLimitBytes) / float64(scanner.compactionBytePerSecond)
sleepTime := time.Duration(overRatio*1000) * time.Millisecond
// glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", scanner.lastSizeCounter, scanner.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio)
time.Sleep(sleepTime)
}
scanner.lastSizeCounter, scanner.lastSizeCheckTime = 0, time.Now()
}
}
}
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var ( var (
@ -312,12 +293,11 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
defer idx.Close() defer idx.Close()
scanner := &VolumeFileScanner4Vacuum{ scanner := &VolumeFileScanner4Vacuum{
v: v, v: v,
now: uint64(time.Now().Unix()), now: uint64(time.Now().Unix()),
nm: NewBtreeNeedleMap(idx), nm: NewBtreeNeedleMap(idx),
dst: dst, dst: dst,
compactionBytePerSecond: compactionBytePerSecond, writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
lastSizeCheckTime: time.Now(),
} }
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner) err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
return return

34
weed/util/throttler.go Normal file
View File

@ -0,0 +1,34 @@
package util
import "time"
type WriteThrottler struct {
compactionBytePerSecond int64
lastSizeCounter int64
lastSizeCheckTime time.Time
}
func NewWriteThrottler(bytesPerSecond int64) *WriteThrottler {
return &WriteThrottler{
compactionBytePerSecond: bytesPerSecond,
lastSizeCheckTime: time.Now(),
}
}
func (wt *WriteThrottler) MaybeSlowdown(delta int64) {
if wt.compactionBytePerSecond > 0 {
wt.lastSizeCounter += delta
now := time.Now()
elapsedDuration := now.Sub(wt.lastSizeCheckTime)
if elapsedDuration > 100*time.Millisecond {
overLimitBytes := wt.lastSizeCounter - wt.compactionBytePerSecond/10
if overLimitBytes > 0 {
overRatio := float64(overLimitBytes) / float64(wt.compactionBytePerSecond)
sleepTime := time.Duration(overRatio*1000) * time.Millisecond
// glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", wt.lastSizeCounter, wt.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio)
time.Sleep(sleepTime)
}
wt.lastSizeCounter, wt.lastSizeCheckTime = 0, time.Now()
}
}
}