refactoring

This commit is contained in:
Chris Lu 2020-08-18 18:01:37 -07:00
parent c026eb0592
commit 6ccd7f0a4d
9 changed files with 18 additions and 15 deletions

View File

@ -130,7 +130,7 @@ func (cs *CompactSection) Delete(key NeedleId) Size {
cs.Lock() cs.Lock()
ret := Size(0) ret := Size(0)
if i := cs.binarySearchValues(skey); i >= 0 { if i := cs.binarySearchValues(skey); i >= 0 {
if cs.values[i].Size > 0 && cs.values[i].Size != TombstoneFileSize { if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() {
ret = cs.values[i].Size ret = cs.values[i].Size
cs.values[i].Size = TombstoneFileSize cs.values[i].Size = TombstoneFileSize
} }

View File

@ -76,7 +76,7 @@ func TestCompactMap(t *testing.T) {
t.Fatal("key", i, "size", v.Size) t.Fatal("key", i, "size", v.Size)
} }
} else if i%37 == 0 { } else if i%37 == 0 {
if ok && v.Size != TombstoneFileSize { if ok && v.Size.IsValid() {
t.Fatal("key", i, "should have been deleted needle value", v) t.Fatal("key", i, "should have been deleted needle value", v)
} }
} else if i%2 == 0 { } else if i%2 == 0 {
@ -89,7 +89,7 @@ func TestCompactMap(t *testing.T) {
for i := uint32(10 * batch); i < 100*batch; i++ { for i := uint32(10 * batch); i < 100*batch; i++ {
v, ok := m.Get(NeedleId(i)) v, ok := m.Get(NeedleId(i))
if i%37 == 0 { if i%37 == 0 {
if ok && v.Size != TombstoneFileSize { if ok && v.Size.IsValid() {
t.Fatal("key", i, "should have been deleted needle value", v) t.Fatal("key", i, "should have been deleted needle value", v)
} }
} else if i%2 == 0 { } else if i%2 == 0 {

View File

@ -74,7 +74,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
} }
defer db.Close() defer db.Close()
return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error { return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error {
if !offset.IsZero() && size != TombstoneFileSize { if !offset.IsZero() && size.IsValid() {
levelDbWrite(db, key, offset, size) levelDbWrite(db, key, offset, size)
} else { } else {
levelDbDelete(db, key) levelDbDelete(db, key)

View File

@ -30,11 +30,11 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error { e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error {
nm.MaybeSetMaxFileKey(key) nm.MaybeSetMaxFileKey(key)
if !offset.IsZero() && size != TombstoneFileSize { if !offset.IsZero() && size.IsValid() {
nm.FileCounter++ nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size) nm.FileByteCounter = nm.FileByteCounter + uint64(size)
oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
if !oldOffset.IsZero() && oldSize != TombstoneFileSize { if !oldOffset.IsZero() && oldSize.IsValid() {
nm.DeletionCounter++ nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
} }

View File

@ -31,7 +31,7 @@ func (mm *mapMetric) logPut(key NeedleId, oldSize Size, newSize Size) {
} }
mm.MaybeSetMaxFileKey(key) mm.MaybeSetMaxFileKey(key)
mm.LogFileCounter(newSize) mm.LogFileCounter(newSize)
if oldSize > 0 && oldSize != TombstoneFileSize { if oldSize > 0 && oldSize.IsValid() {
mm.LogDeletionCounter(oldSize) mm.LogDeletionCounter(oldSize)
} }
} }
@ -101,7 +101,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
mm.MaybeSetMaxFileKey(key) mm.MaybeSetMaxFileKey(key)
NeedleIdToBytes(buf, key) NeedleIdToBytes(buf, key)
if size != TombstoneFileSize { if size.IsValid() {
mm.FileByteCounter += uint64(size) mm.FileByteCounter += uint64(size)
} }
@ -111,7 +111,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
} else { } else {
// deleted file // deleted file
mm.DeletionCounter++ mm.DeletionCounter++
if size != TombstoneFileSize { if size.IsValid() {
// previously already deleted file // previously already deleted file
mm.DeletionByteCounter += uint64(size) mm.DeletionByteCounter += uint64(size)
} }

View File

@ -18,6 +18,9 @@ type Size uint32
func (s Size) IsDeleted() bool { func (s Size) IsDeleted() bool {
return s == TombstoneFileSize return s == TombstoneFileSize
} }
func (s Size) IsValid() bool {
return s != TombstoneFileSize
}
type OffsetLower struct { type OffsetLower struct {
b3 byte b3 byte

View File

@ -253,7 +253,7 @@ func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
} }
func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
if n.Size > 0 && n.Size != TombstoneFileSize { if n.Size > 0 && n.Size.IsValid() {
return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size) return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
} }
return scanner.v.nm.Delete(n.Id, ToOffset(offset)) return scanner.v.nm.Delete(n.Id, ToOffset(offset))

View File

@ -25,7 +25,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
} }
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize { if ok && !nv.Offset.IsZero() && nv.Size.IsValid() {
oldNeedle := new(needle.Needle) oldNeedle := new(needle.Needle)
err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
if err != nil { if err != nil {
@ -196,7 +196,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok && nv.Size != TombstoneFileSize { if ok && nv.Size.IsValid() {
size := nv.Size size := nv.Size
n.Data = nil n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano()) n.AppendAtNs = uint64(time.Now().UnixNano())
@ -234,7 +234,7 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok && nv.Size != TombstoneFileSize { if ok && nv.Size.IsValid() {
size := nv.Size size := nv.Size
n.Data = nil n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano()) n.AppendAtNs = uint64(time.Now().UnixNano())

View File

@ -274,7 +274,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
} }
//updated needle //updated needle
if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size != TombstoneFileSize { if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() {
//even the needle cache in memory is hit, the need_bytes is correct //even the needle cache in memory is hit, the need_bytes is correct
glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size) glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size)
var needleBytes []byte var needleBytes []byte
@ -335,7 +335,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
} }
nv, ok := scanner.v.nm.Get(n.Id) nv, ok := scanner.v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize { if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil { if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err) return fmt.Errorf("cannot put needle: %s", err)
} }