mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-11-28 13:31:27 +08:00
volume: use sorted index map for readonly volumes
This commit is contained in:
parent
2f21beaccd
commit
58f88e530c
@ -47,8 +47,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
|
|||||||
}
|
}
|
||||||
|
|
||||||
// write .ecx file
|
// write .ecx file
|
||||||
if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
|
if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
|
||||||
return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
|
return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// write .ec01 ~ .ec14 files
|
// write .ec01 ~ .ec14 files
|
||||||
|
@ -21,16 +21,16 @@ const (
|
|||||||
ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
|
ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
|
||||||
)
|
)
|
||||||
|
|
||||||
// WriteSortedEcxFile generates .ecx file from existing .idx file
|
// WriteSortedFileFromIdx generates .ecx file from existing .idx file
|
||||||
// all keys are sorted in ascending order
|
// all keys are sorted in ascending order
|
||||||
func WriteSortedEcxFile(baseFileName string) (e error) {
|
func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
|
||||||
|
|
||||||
cm, err := readCompactMap(baseFileName)
|
cm, err := readCompactMap(baseFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("readCompactMap: %v", err)
|
return fmt.Errorf("readCompactMap: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
|
ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open ecx file: %v", err)
|
return fmt.Errorf("failed to open ecx file: %v", err)
|
||||||
}
|
}
|
||||||
@ -43,7 +43,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to visit ecx file: %v", err)
|
return fmt.Errorf("failed to visit idx file: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -26,14 +26,14 @@ func TestEncodingDecoding(t *testing.T) {
|
|||||||
t.Logf("generateEcFiles: %v", err)
|
t.Logf("generateEcFiles: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = WriteSortedEcxFile(baseFileName)
|
err = WriteSortedFileFromIdx(baseFileName, ".ecx")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("WriteSortedEcxFile: %v", err)
|
t.Logf("WriteSortedFileFromIdx: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = validateFiles(baseFileName)
|
err = validateFiles(baseFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("WriteSortedEcxFile: %v", err)
|
t.Logf("WriteSortedFileFromIdx: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
removeGeneratedFiles(baseFileName)
|
removeGeneratedFiles(baseFileName)
|
||||||
|
@ -186,10 +186,10 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
|
func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
|
||||||
return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil)
|
return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
|
func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
|
||||||
var key types.NeedleId
|
var key types.NeedleId
|
||||||
buf := make([]byte, types.NeedleMapEntrySize)
|
buf := make([]byte, types.NeedleMapEntrySize)
|
||||||
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
|
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
|
||||||
|
@ -10,15 +10,15 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
markNeedleDeleted = func(file *os.File, offset int64) error {
|
MarkNeedleDeleted = func(file *os.File, offset int64) error {
|
||||||
b := make([]byte, types.SizeSize)
|
b := make([]byte, types.SizeSize)
|
||||||
util.Uint32toBytes(b, types.TombstoneFileSize)
|
util.Uint32toBytes(b, types.TombstoneFileSize)
|
||||||
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
|
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("ecx write error: %v", err)
|
return fmt.Errorf("sorted needle write error: %v", err)
|
||||||
}
|
}
|
||||||
if n != types.SizeSize {
|
if n != types.SizeSize {
|
||||||
return fmt.Errorf("ecx written %d bytes, expecting %d", n, types.SizeSize)
|
return fmt.Errorf("sorted needle written %d bytes, expecting %d", n, types.SizeSize)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -26,7 +26,7 @@ var (
|
|||||||
|
|
||||||
func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
|
func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
|
||||||
|
|
||||||
_, _, err = searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, markNeedleDeleted)
|
_, _, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, MarkNeedleDeleted)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == NotFoundError {
|
if err == NotFoundError {
|
||||||
@ -81,7 +81,7 @@ func RebuildEcxFile(baseFileName string) error {
|
|||||||
|
|
||||||
needleId := types.BytesToNeedleId(buf)
|
needleId := types.BytesToNeedleId(buf)
|
||||||
|
|
||||||
_, _, err = searchNeedleFromEcx(ecxFile, ecxFileSize, needleId, markNeedleDeleted)
|
_, _, err = SearchNeedleFromSortedIndex(ecxFile, ecxFileSize, needleId, MarkNeedleDeleted)
|
||||||
|
|
||||||
if err != nil && err != NotFoundError {
|
if err != nil && err != NotFoundError {
|
||||||
ecxFile.Close()
|
ecxFile.Close()
|
||||||
|
105
weed/storage/needle_map_sorted_file.go
Normal file
105
weed/storage/needle_map_sorted_file.go
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
|
||||||
|
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SortedFileNeedleMap struct {
|
||||||
|
baseNeedleMapper
|
||||||
|
baseFileName string
|
||||||
|
dbFile *os.File
|
||||||
|
dbFileSize int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
|
||||||
|
m = &SortedFileNeedleMap{baseFileName: baseFileName}
|
||||||
|
m.indexFile = indexFile
|
||||||
|
fileName := baseFileName+".sdb"
|
||||||
|
if !isSortedFileFresh(fileName, indexFile) {
|
||||||
|
glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name())
|
||||||
|
erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdb")
|
||||||
|
glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name())
|
||||||
|
}
|
||||||
|
glog.V(1).Infof("Opening %s...", fileName)
|
||||||
|
|
||||||
|
if m.dbFile, err = os.Open(baseFileName + ".sdb"); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dbStat, _ := m.dbFile.Stat()
|
||||||
|
m.dbFileSize = dbStat.Size()
|
||||||
|
glog.V(1).Infof("Loading %s...", indexFile.Name())
|
||||||
|
mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
|
||||||
|
if indexLoadError != nil {
|
||||||
|
return nil, indexLoadError
|
||||||
|
}
|
||||||
|
m.mapMetric = *mm
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func isSortedFileFresh(dbFileName string, indexFile *os.File) bool {
|
||||||
|
// normally we always write to index file first
|
||||||
|
dbFile, err := os.Open(dbFileName)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defer dbFile.Close()
|
||||||
|
dbStat, dbStatErr := dbFile.Stat()
|
||||||
|
indexStat, indexStatErr := indexFile.Stat()
|
||||||
|
if dbStatErr != nil || indexStatErr != nil {
|
||||||
|
glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return dbStat.ModTime().After(indexStat.ModTime())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
|
||||||
|
offset, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
|
||||||
|
ok = err == nil
|
||||||
|
return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, ok
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
|
||||||
|
return os.ErrInvalid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
|
||||||
|
|
||||||
|
_, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err == erasure_coding.NotFoundError {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if size == TombstoneFileSize {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// write to index file first
|
||||||
|
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SortedFileNeedleMap) Close() {
|
||||||
|
m.indexFile.Close()
|
||||||
|
m.dbFile.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SortedFileNeedleMap) Destroy() error {
|
||||||
|
m.Close()
|
||||||
|
os.Remove(m.indexFile.Name())
|
||||||
|
return os.Remove(m.baseFileName + ".sdb")
|
||||||
|
}
|
@ -5,12 +5,12 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/stats"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/backend"
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
|
||||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/stats"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/storage/backend"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||||
)
|
)
|
||||||
|
|
||||||
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
|
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
|
||||||
@ -88,41 +88,48 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
|||||||
v.readOnly = true
|
v.readOnly = true
|
||||||
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
|
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
|
||||||
}
|
}
|
||||||
switch needleMapKind {
|
|
||||||
case NeedleMapInMemory:
|
if v.readOnly {
|
||||||
glog.V(0).Infoln("loading index", fileName+".idx", "to memory readonly", v.readOnly)
|
if v.nm, e = NewSortedFileNeedleMap(fileName, indexFile); e != nil {
|
||||||
if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
|
glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdb", e)
|
||||||
glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
|
|
||||||
}
|
}
|
||||||
case NeedleMapLevelDb:
|
} else {
|
||||||
glog.V(0).Infoln("loading leveldb", fileName+".ldb")
|
switch needleMapKind {
|
||||||
opts := &opt.Options{
|
case NeedleMapInMemory:
|
||||||
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
|
glog.V(0).Infoln("loading index", fileName+".idx", "to memory")
|
||||||
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
|
if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
|
||||||
CompactionTableSizeMultiplier: 10, // default value is 1
|
glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
|
||||||
}
|
}
|
||||||
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
|
case NeedleMapLevelDb:
|
||||||
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
|
glog.V(0).Infoln("loading leveldb", fileName+".ldb")
|
||||||
}
|
opts := &opt.Options{
|
||||||
case NeedleMapLevelDbMedium:
|
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
|
||||||
glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
|
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
|
||||||
opts := &opt.Options{
|
CompactionTableSizeMultiplier: 10, // default value is 1
|
||||||
BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
|
}
|
||||||
WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
|
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
|
||||||
CompactionTableSizeMultiplier: 10, // default value is 1
|
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
|
||||||
}
|
}
|
||||||
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
|
case NeedleMapLevelDbMedium:
|
||||||
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
|
glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
|
||||||
}
|
opts := &opt.Options{
|
||||||
case NeedleMapLevelDbLarge:
|
BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
|
||||||
glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
|
WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
|
||||||
opts := &opt.Options{
|
CompactionTableSizeMultiplier: 10, // default value is 1
|
||||||
BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
|
}
|
||||||
WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
|
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
|
||||||
CompactionTableSizeMultiplier: 10, // default value is 1
|
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
|
||||||
}
|
}
|
||||||
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
|
case NeedleMapLevelDbLarge:
|
||||||
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
|
glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
|
||||||
|
opts := &opt.Options{
|
||||||
|
BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
|
||||||
|
WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
|
||||||
|
CompactionTableSizeMultiplier: 10, // default value is 1
|
||||||
|
}
|
||||||
|
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
|
||||||
|
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user