diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go index ca6391386..9e058a96d 100644 --- a/weed/filer/rocksdb/rocksdb_store.go +++ b/weed/filer/rocksdb/rocksdb_store.go @@ -7,12 +7,14 @@ import ( "context" "crypto/md5" "fmt" + "io" + + "github.com/tecbot/gorocksdb" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" weed_util "github.com/chrislusf/seaweedfs/weed/util" - rocksdb "github.com/tecbot/gorocksdb" - "io" ) func init() { @@ -20,15 +22,15 @@ func init() { } type options struct { - opt *rocksdb.Options - ro *rocksdb.ReadOptions - wo *rocksdb.WriteOptions + opt *gorocksdb.Options + ro *gorocksdb.ReadOptions + wo *gorocksdb.WriteOptions } func (opt *options) init() { - opt.opt = rocksdb.NewDefaultOptions() - opt.ro = rocksdb.NewDefaultReadOptions() - opt.wo = rocksdb.NewDefaultWriteOptions() + opt.opt = gorocksdb.NewDefaultOptions() + opt.ro = gorocksdb.NewDefaultReadOptions() + opt.wo = gorocksdb.NewDefaultWriteOptions() } func (opt *options) close() { @@ -39,7 +41,7 @@ func (opt *options) close() { type RocksDBStore struct { path string - db *rocksdb.DB + db *gorocksdb.DB options } @@ -59,7 +61,13 @@ func (store *RocksDBStore) initialize(dir string) (err error) { } store.options.init() store.opt.SetCreateIfMissing(true) - store.db, err = rocksdb.OpenDb(store.opt, dir) + // reduce write amplification + // also avoid expired data stored in highest level never get compacted + store.opt.SetLevelCompactionDynamicLevelBytes(true) + store.opt.SetCompactionFilter(NewTTLFilter()) + // store.opt.SetMaxBackgroundCompactions(2) + + store.db, err = gorocksdb.OpenDb(store.opt, dir) return } @@ -116,7 +124,7 @@ func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data.Data())) + err = 
entry.DecodeAttributesAndChunks(data.Data()) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -141,10 +149,10 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - batch := rocksdb.NewWriteBatch() + batch := gorocksdb.NewWriteBatch() defer batch.Destroy() - ro := rocksdb.NewDefaultReadOptions() + ro := gorocksdb.NewDefaultReadOptions() defer ro.Destroy() ro.SetFillCache(false) @@ -167,7 +175,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we return nil } -func enumerate(iter *rocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error { +func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error { if len(lastKey) == 0 { iter.Seek(prefix) @@ -231,7 +239,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName) } - ro := rocksdb.NewDefaultReadOptions() + ro := gorocksdb.NewDefaultReadOptions() defer ro.Destroy() ro.SetFillCache(false) @@ -251,7 +259,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful } // println("list", entry.FullPath, "chunks", len(entry.Chunks)) - if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(value)); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) return false diff --git a/weed/filer/rocksdb/rocksdb_ttl.go b/weed/filer/rocksdb/rocksdb_ttl.go new file mode 100644 index 000000000..98918b5d7 --- /dev/null +++ b/weed/filer/rocksdb/rocksdb_ttl.go @@ -0,0 +1,41 @@ +//+build rocksdb + 
+package rocksdb
+
+import (
+	"time"
+
+	"github.com/tecbot/gorocksdb"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+)
+
+// TTLFilter is a compaction filter that drops filer entries whose TTL has elapsed.
+type TTLFilter struct {
+	skipLevel0 bool // when set, level-0 values are kept without being decoded
+}
+
+// NewTTLFilter returns a TTLFilter that leaves level-0 values untouched.
+func NewTTLFilter() gorocksdb.CompactionFilter { return &TTLFilter{skipLevel0: true} }
+
+// Filter reports whether val should be removed during compaction. A value is
+// removed only when it decodes as a filer entry with a non-zero TtlSec and
+// Crtime+TtlSec is not after the current time; a value that fails to decode is kept.
+func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []byte) {
+	// decode could be slow, causing write stall
+	// level >0 sst can run compaction in parallel
+	if t.skipLevel0 && level == 0 {
+		return false, val
+	}
+	entry := filer.Entry{}
+	if err := entry.DecodeAttributesAndChunks(val); err != nil || entry.TtlSec == 0 {
+		return false, val // undecodable or no TTL: never drop data we cannot interpret
+	}
+	if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).After(time.Now()) {
+		return false, val // not expired yet
+	}
+	return true, nil
+}
+
+// Name returns the filter's name.
+func (t *TTLFilter) Name() string { return "TTLFilter" }