diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go index fdfc8bcb1..6e0939948 100644 --- a/weed/filer/ydb/ydb_queries.go +++ b/weed/filer/ydb/ydb_queries.go @@ -1,23 +1,15 @@ package ydb -const ( - createQuery = ` - PRAGMA TablePathPrefix("%s"); - CREATE TABLE file_meta ( - dir_hash int64, - name Utf8, - directory Utf8, - meta String, - PRIMARY KEY (dir_hash, name) - );` +import asql "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" +const ( insertQuery = ` DECLARE $dir_hash int64; DECLARE $name AS Utf8; DECLARE $directory AS Utf8; DECLARE $meta AS String; - UPSERT INTO file_meta + UPSERT INTO ` + asql.DEFAULT_TABLE + ` (dir_hash, name, directory, meta) VALUES ($dir_hash, $name, $directory, $meta);` @@ -28,7 +20,7 @@ const ( DECLARE $directory AS Utf8; DECLARE $meta AS String; - REPLACE INTO file_meta + REPLACE INTO ` + asql.DEFAULT_TABLE + ` (dir_hash, name, directory, meta) VALUES ($dir_hash, $name, $directory, $meta) @@ -38,7 +30,7 @@ const ( DECLARE $dir_hash int64; DECLARE $name AS Utf8; - DELETE FROM file_meta + DELETE FROM ` + asql.DEFAULT_TABLE + ` WHERE dir_hash == $dir_hash AND name == $name; COMMIT;` @@ -54,11 +46,11 @@ const ( DECLARE $dir_hash int64; DECLARE $directory AS Utf8; - DELETE FROM file_meta + DELETE FROM ` + asql.DEFAULT_TABLE + ` WHERE dir_hash == $dir_hash AND directory == $directory; COMMIT;` - ListDirectoryQuery = ` + listDirectoryQuery = ` DECLARE $dir_hash int64; DECLARE $directory AS Utf8; DECLARE $start_name AS Utf8; @@ -66,7 +58,7 @@ const ( DECLARE $limit AS int64; SELECT name, meta - FROM file_meta + FROM ` + asql.DEFAULT_TABLE + ` WHERE dir_hash == $dir_hash AND directory == $directory and name %s $start_name and name LIKE '$prefix%%' ORDER BY name ASC LIMIT $limit;` ) diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 4f3477471..9f136703b 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -4,17 +4,20 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - environ "github.com/ydb-platform/ydb-go-sdk-auth-environ" - ydb "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk-auth-environ" + "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/sugar" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "os" "path" "strings" + "sync" "time" ) @@ -31,10 +34,12 @@ var ( ) type YdbStore struct { - SupportBucketTable bool DB ydb.Connection dirBuckets string tablePathPrefix string + SupportBucketTable bool + dbs map[string]bool + dbsLock sync.Mutex } func init() { @@ -60,6 +65,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix store.dirBuckets = dirBuckets store.tablePathPrefix = tablePathPrefix store.SupportBucketTable = useBucketPrefix + store.dbs = make(map[string]bool) ctx, cancel := context.WithCancel(context.Background()) defer cancel() if connectionTimeOut == 0 { @@ -72,6 +78,9 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix if poolSizeLimit > 0 { opts = append(opts, ydb.WithSessionPoolSizeLimit(poolSizeLimit)) } + if dsn == "" { + dsn = os.Getenv("YDB_CONNECTION_STRING") + } store.DB, err = ydb.Open(ctx, dsn, opts...) if err != nil { _ = store.DB.Close(ctx) @@ -79,6 +88,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix return fmt.Errorf("can not connect to %s error:%v", dsn, err) } defer func() { _ = store.DB.Close(ctx) }() + store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix) if err = sugar.RemoveRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { return fmt.Errorf("RemoveRecursive %s : %v", store.tablePathPrefix, err) @@ -86,6 +96,18 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err) } + + whoAmI, err := store.DB.Discovery().WhoAmI(ctx) + if err != nil { + return fmt.Errorf("connect to %s error:%v", dsn, err) + } + glog.V(0).Infof("connected to ydb: %s", whoAmI.String()) + + tablePath := path.Join(store.tablePathPrefix, abstract_sql.DEFAULT_TABLE) + if err := store.createTable(ctx, tablePath); err != nil { + glog.Errorf("createTable %s: %v", tablePath, err) + } + return nil } @@ -102,11 +124,11 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta} return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), query)) + stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), query)) if err != nil { return fmt.Errorf("Prepare %s : %v", dir, err) } - _, _, err = stmt.Execute(ctx, rwTX, fileMeta.QueryParameters()) + _, _, err = stmt.Execute(ctx, rwTX, fileMeta.queryParameters()) return err }) } @@ -124,7 +146,7 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e var data []byte entryFound := false err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), findQuery)) + stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), findQuery)) if err != nil { return fmt.Errorf("Prepare %s : %v", entry.FullPath, err) } @@ -163,7 +185,7 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { dir, name := fullpath.DirAndName() return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), deleteQuery)) + stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), deleteQuery)) if err != nil { return fmt.Errorf("Prepare %s : %v", dir, err) } @@ -177,7 +199,7 @@ func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { dir, _ := fullpath.DirAndName() return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), deleteFolderChildrenQuery)) + stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), deleteFolderChildrenQuery)) if err != nil { return fmt.Errorf("Prepare %s : %v", dir, err) } @@ -199,7 +221,7 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath startFileCompOp = ">=" } err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), fmt.Sprintf(ListDirectoryQuery, startFileCompOp))) + stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), fmt.Sprintf(listDirectoryQuery, startFileCompOp))) if err != nil { return fmt.Errorf("Prepare %s : %v", dir, err) } @@ -277,22 +299,91 @@ func (store *YdbStore) Shutdown() { _ = store.DB.Close(context.Background()) } -func (store *YdbStore) getPrefix(dir string) string { +func (store *YdbStore) CanDropWholeBucket() bool { + return store.SupportBucketTable +} + +func (store *YdbStore) OnBucketCreation(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if err := store.createTable(context.Background(), bucket); err != nil { + glog.Errorf("createTable %s: %v", bucket, err) + } + + if store.dbs == nil { + return + } + store.dbs[bucket] = true +} + +func (store *YdbStore) OnBucketDeletion(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if err := store.deleteTable(context.Background(), bucket); err != nil { + glog.Errorf("deleteTable %s: %v", bucket, err) + } + + if store.dbs == nil { + return + } + delete(store.dbs, bucket) +} + +func (store *YdbStore) createTable(ctx context.Context, prefix string) error { + e, err := store.DB.Scheme().DescribePath(ctx, prefix) + if err != nil { + return fmt.Errorf("describe path %s error:%v", prefix, err) + } + if e.IsTable() { + return nil + } + return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + return s.CreateTable(ctx, prefix, createTableOptions()...) + }) +} + +func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error { if !store.SupportBucketTable { - return store.tablePathPrefix + return nil + } + return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE)) + }) +} + +func (store *YdbStore) getPrefix(ctx context.Context, dir string) (tablePathPrefix string) { + tablePathPrefix = store.tablePathPrefix + if !store.SupportBucketTable { + return } prefixBuckets := store.dirBuckets + "/" if strings.HasPrefix(dir, prefixBuckets) { // detect bucket bucketAndDir := dir[len(prefixBuckets):] - if t := strings.Index(bucketAndDir, "/"); t > 0 { - return path.Join(bucketAndDir[:t], store.tablePathPrefix) + t := strings.Index(bucketAndDir, "/") + if t < 0 { + return } - } - return store.tablePathPrefix -} + bucket := bucketAndDir[:t] -func (store *YdbStore) withPragma(prefix, query string) string { - return `PRAGMA TablePathPrefix("` + prefix + `");` + query + if bucket != "" { + return + } + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if _, found := store.dbs[bucket]; !found { + if err := store.createTable(ctx, + path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err == nil { + store.dbs[bucket] = true + } else { + glog.Errorf("createTable %s: %v", bucket, err) + } + } + tablePathPrefix = path.Join(store.tablePathPrefix, bucket) + } + return } diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go index b2a7af74f..81ac6446b 100644 --- a/weed/filer/ydb/ydb_store_kv.go +++ b/weed/filer/ydb/ydb_store_kv.go @@ -15,11 +15,11 @@ func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err dirStr, dirHash, name := abstract_sql.GenDirAndName(key) fileMeta := FileMeta{dirHash, name, dirStr, value} return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dirStr), insertQuery)) + stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), insertQuery)) if err != nil { return fmt.Errorf("Prepare %s: %v", util.NewFullPath(dirStr, name), err) } - _, _, err = stmt.Execute(ctx, rwTX, fileMeta.QueryParameters()) + _, _, err = stmt.Execute(ctx, rwTX, fileMeta.queryParameters()) if err != nil { return fmt.Errorf("kv put %s: %v", util.NewFullPath(dirStr, name), err) } @@ -31,7 +31,7 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err dirStr, dirHash, name := abstract_sql.GenDirAndName(key) valueFound := false err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dirStr), findQuery)) + stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), findQuery)) if err != nil { return fmt.Errorf("Prepare %s: %v", util.NewFullPath(dirStr, name), err) } @@ -62,7 +62,7 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { dirStr, dirHash, name := abstract_sql.GenDirAndName(key) return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dirStr), insertQuery)) + stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), insertQuery)) if err != nil { return fmt.Errorf("Prepare %s: %v", util.NewFullPath(dirStr, name), err) } diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go index fb1df3eee..3c381797a 100644 --- a/weed/filer/ydb/ydb_types.go +++ b/weed/filer/ydb/ydb_types.go @@ -2,6 +2,7 @@ package ydb import ( "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" ) @@ -18,10 +19,23 @@ type FileMeta struct { //ydb:gen scan,value type FileMetas []FileMeta -func (fm *FileMeta) QueryParameters() *table.QueryParameters { +func (fm *FileMeta) queryParameters() *table.QueryParameters { return table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)), table.ValueParam("$name", types.UTF8Value(fm.Name)), table.ValueParam("$directory", types.UTF8Value(fm.Directory)), table.ValueParam("$meta", types.StringValue(fm.Meta))) } + +func createTableOptions() []options.CreateTableOption { + return []options.CreateTableOption{ + options.WithColumn("dir_hash", types.TypeUint64), + options.WithColumn("name", types.TypeUTF8), + options.WithColumn("directory", types.TypeUTF8), + options.WithColumn("meta", types.TypeString), + options.WithPrimaryKeyColumn("dir_hash", "name"), + } +} +func withPragma(prefix, query string) string { + return `PRAGMA TablePathPrefix("` + prefix + `");` + query +}