From 83080b5e034bdbc0ba58eb410d04fb78bebf08cf Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Fri, 4 Sep 2020 15:40:13 +0800 Subject: [PATCH] ES backended filer support kv ops. --- go.mod | 1 - weed/command/scaffold.go | 2 +- weed/filer/elastic/v7/elastic_store.go | 63 ++++++++++++++++++++++++-- 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index d404b9d52..98ac2b4e5 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/gocql/gocql v0.0.0-20190829130954-e163eff7a8c6 github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48 // indirect - github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/protobuf v1.4.2 github.com/google/btree v1.0.0 github.com/google/uuid v1.1.1 diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 479e0665f..68fe8e982 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -177,7 +177,7 @@ database = "seaweedfs" [elastic7] enabled = false servers = "http://localhost:9200" -# increase the value is recommend, both here and in elastic cluster configuration +# increase the value is recommend, be sure the value in Elastic is greater or equal here index.max_result_window = 10000 ` diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 5c57e352a..d263b5dae 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -18,6 +18,7 @@ import ( var ( indexType = "_doc" indexPrefix = ".seaweedfs_" + indexKV = ".seaweedfs_kv_entries" ) type ESEntry struct { @@ -34,6 +35,11 @@ type ElasticStore struct { maxPageSize int } +type ESKVEntry struct { + Key string `json:Key` + Value string `json:Value` +} + func (store *ElasticStore) GetName() string { return "elastic7" } @@ -66,15 +72,66 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error { func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { return nil } + func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) { - return filer.ErrKvNotImplemented + id := fmt.Sprintf("%x", md5.Sum(key)) + deleteResult, err := store.client.Delete(). + Index(indexKV). + Type(indexType). + Id(id). + Do(context.Background()) + if err == nil { + if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { + return nil + } + } + glog.Errorf("delete key(id:%s) %v.", string(key), err) + return fmt.Errorf("delete key %v.", err) } + func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - return []byte(""), filer.ErrKvNotImplemented + id := fmt.Sprintf("%x", md5.Sum(key)) + searchResult, err := store.client.Get(). + Index(indexKV). + Type(indexType). + Id(id). + Do(context.Background()) + if elastic.IsNotFound(err) { + return nil, filer_pb.ErrNotFound + } + if searchResult != nil && searchResult.Found { + esEntry := &ESKVEntry{} + if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil { + return []byte(esEntry.Value), nil + } + } + glog.Errorf("find key(%s),%v.", string(key), err) + return nil, filer_pb.ErrNotFound } + func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - return filer.ErrKvNotImplemented + id := fmt.Sprintf("%x", md5.Sum(key)) + esEntry := &ESKVEntry{ + string(key), + string(value), + } + val, err := jsoniter.Marshal(esEntry) + if err != nil { + glog.Errorf("insert key(%s) %v.", string(key), err) + return fmt.Errorf("insert key %v.", err) + } + _, err = store.client.Index(). + Index(indexKV). + Type(indexType). + Id(id). + BodyJson(string(val)). + Do(context.Background()) + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + return nil } + func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { return nil, filer.ErrUnsupportedListDirectoryPrefixed }