Merge pull request #1449 from taozix/master

ES backended filer support kv ops.
This commit is contained in:
Chris Lu 2020-09-04 11:22:52 -07:00 committed by GitHub
commit ca5d8e0d45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 104 additions and 31 deletions

1
go.mod
View File

@ -27,7 +27,6 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/gocql/gocql v0.0.0-20190829130954-e163eff7a8c6
github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/protobuf v1.4.2
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1

View File

@ -177,7 +177,7 @@ database = "seaweedfs"
[elastic7]
enabled = false
servers = "http://localhost:9200"
# increase the value is recommend, both here and in elastic cluster configuration
# increase the value is recommend, be sure the value in Elastic is greater or equal here
index.max_result_window = 10000
`

View File

@ -16,8 +16,14 @@ import (
)
var (
indexType = "_doc"
indexPrefix = ".seaweedfs_"
indexType = "_doc"
indexPrefix = ".seaweedfs_"
indexKV = ".seaweedfs_kv_entries"
mappingWithoutQuery = ` {
"mappings": {
"enabled": false
}
}`
)
type ESEntry struct {
@ -25,6 +31,10 @@ type ESEntry struct {
Entry *filer.Entry
}
type ESKVEntry struct {
Value string `json:Value`
}
func init() {
filer.Stores = append(filer.Stores, &ElasticStore{})
}
@ -55,6 +65,12 @@ func (store *ElasticStore) Initialize(configuration weed_util.Configuration, pre
if err != nil {
return fmt.Errorf("init elastic %s: %v.", servers, err)
}
if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok {
_, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background())
if err != nil {
return fmt.Errorf("create index(%s) %v.", indexKV, err)
}
}
return nil
}
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
@ -66,15 +82,6 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
return filer.ErrKvNotImplemented
}
func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
return []byte(""), filer.ErrKvNotImplemented
}
func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
return filer.ErrKvNotImplemented
}
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
return nil, filer.ErrUnsupportedListDirectoryPrefixed
}
@ -97,7 +104,7 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry)
Type(indexType).
Id(id).
BodyJson(string(value)).
Do(context.Background())
Do(ctx)
if err != nil {
glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
return fmt.Errorf("insert entry %v.", err)
@ -114,7 +121,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
Index(index).
Type(indexType).
Id(id).
Do(context.Background())
Do(ctx)
if elastic.IsNotFound(err) {
return nil, filer_pb.ErrNotFound
}
@ -133,24 +140,24 @@ func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
index := getIndex(fullpath)
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
if strings.Count(string(fullpath), "/") == 1 {
return store.deleteIndex(index)
return store.deleteIndex(ctx, index)
}
return store.deleteEntry(index, id)
return store.deleteEntry(ctx, index, id)
}
func (store *ElasticStore) deleteIndex(index string) (err error) {
deleteResult, err := store.client.DeleteIndex(index).Do(context.Background())
func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
return nil
}
glog.Errorf("delete index(%s) %v.", index, err)
return err
}
func (store *ElasticStore) deleteEntry(index, id string) (err error) {
func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
deleteResult, err := store.client.Delete().
Index(index).
Type(indexType).
Id(id).
Do(context.Background())
Do(ctx)
if err == nil {
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
return nil
@ -178,12 +185,15 @@ func (store *ElasticStore) ListDirectoryEntries(
}
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
indexResult, err := store.client.CatIndices().Do(context.Background())
indexResult, err := store.client.CatIndices().Do(ctx)
if err != nil {
glog.Errorf("list indices %v.", err)
return entries, err
}
for _, index := range indexResult {
if index.Index == indexKV {
continue
}
if strings.HasPrefix(index.Index, indexPrefix) {
if entry, err := store.FindEntry(ctx,
weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
@ -209,16 +219,16 @@ func (store *ElasticStore) listDirectoryEntries(
index := getIndex(fullpath)
nextStart := ""
parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
if _, err := store.client.Refresh(index).Do(context.Background()); err != nil {
if _, err := store.client.Refresh(index).Do(ctx); err != nil {
if elastic.IsNotFound(err) {
store.client.CreateIndex(index).Do(context.Background())
store.client.CreateIndex(index).Do(ctx)
return entries, nil
}
}
for {
result := &elastic.SearchResult{}
if (startFileName == "" && first) || inclusive {
if result, err = store.search(index, parentId); err != nil {
if result, err = store.search(ctx, index, parentId); err != nil {
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
return entries, err
}
@ -228,7 +238,7 @@ func (store *ElasticStore) listDirectoryEntries(
fullPath = nextStart
}
after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath)))
if result, err = store.searchAfter(index, parentId, after); err != nil {
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
return entries, err
}
@ -259,8 +269,8 @@ func (store *ElasticStore) listDirectoryEntries(
return entries, nil
}
func (store *ElasticStore) search(index, parentId string) (result *elastic.SearchResult, err error) {
if count, err := store.client.Count(index).Do(context.Background()); err == nil && count == 0 {
func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 {
return &elastic.SearchResult{
Hits: &elastic.SearchHits{
Hits: make([]*elastic.SearchHit, 0)},
@ -271,18 +281,18 @@ func (store *ElasticStore) search(index, parentId string) (result *elastic.Searc
Query(elastic.NewMatchQuery("ParentId", parentId)).
Size(store.maxPageSize).
Sort("_id", false).
Do(context.Background())
Do(ctx)
return queryResult, err
}
func (store *ElasticStore) searchAfter(index, parentId, after string) (result *elastic.SearchResult, err error) {
func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) {
queryResult, err := store.client.Search().
Index(index).
Query(elastic.NewMatchQuery("ParentId", parentId)).
SearchAfter(after).
Size(store.maxPageSize).
Sort("_id", false).
Do(context.Background())
Do(ctx)
return queryResult, err
}

View File

@ -0,0 +1,64 @@
package elastic
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
jsoniter "github.com/json-iterator/go"
elastic "github.com/olivere/elastic/v7"
)
func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
deleteResult, err := store.client.Delete().
Index(indexKV).
Type(indexType).
Id(string(key)).
Do(ctx)
if err == nil {
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
return nil
}
}
glog.Errorf("delete key(id:%s) %v.", string(key), err)
return fmt.Errorf("delete key %v.", err)
}
func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
searchResult, err := store.client.Get().
Index(indexKV).
Type(indexType).
Id(string(key)).
Do(ctx)
if elastic.IsNotFound(err) {
return nil, filer_pb.ErrNotFound
}
if searchResult != nil && searchResult.Found {
esEntry := &ESKVEntry{}
if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
return []byte(esEntry.Value), nil
}
}
glog.Errorf("find key(%s),%v.", string(key), err)
return nil, filer_pb.ErrNotFound
}
func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
esEntry := &ESKVEntry{string(value)}
val, err := jsoniter.Marshal(esEntry)
if err != nil {
glog.Errorf("insert key(%s) %v.", string(key), err)
return fmt.Errorf("insert key %v.", err)
}
_, err = store.client.Index().
Index(indexKV).
Type(indexType).
Id(string(key)).
BodyJson(string(val)).
Do(ctx)
if err != nil {
return fmt.Errorf("kv put: %v", err)
}
return nil
}