mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-22 17:40:49 +08:00
70a4c98b00
for later locking on reading chunks
232 lines
5.7 KiB
Go
232 lines
5.7 KiB
Go
package hbase
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"github.com/tsuna/gohbase"
|
|
"github.com/tsuna/gohbase/hrpc"
|
|
"io"
|
|
)
|
|
|
|
func init() {
|
|
filer.Stores = append(filer.Stores, &HbaseStore{})
|
|
}
|
|
|
|
type HbaseStore struct {
|
|
Client gohbase.Client
|
|
table []byte
|
|
cfKv string
|
|
cfMetaDir string
|
|
column string
|
|
}
|
|
|
|
func (store *HbaseStore) GetName() string {
|
|
return "hbase"
|
|
}
|
|
|
|
func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) {
|
|
return store.initialize(
|
|
configuration.GetString(prefix+"zkquorum"),
|
|
configuration.GetString(prefix+"table"),
|
|
)
|
|
}
|
|
|
|
func (store *HbaseStore) initialize(zkquorum, table string) (err error) {
|
|
store.Client = gohbase.NewClient(zkquorum)
|
|
store.table = []byte(table)
|
|
store.cfKv = "kv"
|
|
store.cfMetaDir = "meta"
|
|
store.column = "a"
|
|
|
|
// check table exists
|
|
key := "whatever"
|
|
headers := map[string][]string{store.cfMetaDir: nil}
|
|
get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers))
|
|
if err != nil {
|
|
return fmt.Errorf("NewGet returned an error: %v", err)
|
|
}
|
|
_, err = store.Client.Get(get)
|
|
if err != gohbase.TableNotFound {
|
|
return nil
|
|
}
|
|
|
|
// create table
|
|
adminClient := gohbase.NewAdminClient(zkquorum)
|
|
cFamilies := []string{store.cfKv, store.cfMetaDir}
|
|
cf := make(map[string]map[string]string, len(cFamilies))
|
|
for _, f := range cFamilies {
|
|
cf[f] = nil
|
|
}
|
|
ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf)
|
|
if err := adminClient.CreateTable(ct); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
|
|
value, err := entry.EncodeAttributesAndChunks()
|
|
if err != nil {
|
|
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
|
|
}
|
|
if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
|
|
value = util.MaybeGzipData(value)
|
|
}
|
|
|
|
return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec)
|
|
}
|
|
|
|
func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
|
|
return store.InsertEntry(ctx, entry)
|
|
}
|
|
|
|
func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) {
|
|
value, err := store.doGet(ctx, store.cfMetaDir, []byte(path))
|
|
if err != nil {
|
|
if err == filer.ErrKvNotFound {
|
|
return nil, filer_pb.ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
entry = &filer.Entry{
|
|
FullPath: path,
|
|
}
|
|
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value))
|
|
if err != nil {
|
|
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
|
|
}
|
|
return entry, nil
|
|
}
|
|
|
|
func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) {
|
|
return store.doDelete(ctx, store.cfMetaDir, []byte(path))
|
|
}
|
|
|
|
func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) {
|
|
|
|
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
|
|
expectedPrefix := []byte(path.Child(""))
|
|
scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
scanner := store.Client.Scan(scan)
|
|
defer scanner.Close()
|
|
for {
|
|
res, err := scanner.Next()
|
|
if err != nil {
|
|
break
|
|
}
|
|
if len(res.Cells) == 0 {
|
|
continue
|
|
}
|
|
cell := res.Cells[0]
|
|
|
|
if !bytes.HasPrefix(cell.Row, expectedPrefix) {
|
|
break
|
|
}
|
|
fullpath := util.FullPath(cell.Row)
|
|
dir, _ := fullpath.DirAndName()
|
|
if dir != string(path) {
|
|
continue
|
|
}
|
|
|
|
err = store.doDelete(ctx, store.cfMetaDir, cell.Row)
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
}
|
|
return
|
|
}
|
|
|
|
func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
|
|
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
|
|
}
|
|
|
|
func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
|
|
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
|
|
expectedPrefix := []byte(dirPath.Child(prefix))
|
|
scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
|
|
if err != nil {
|
|
return lastFileName, err
|
|
}
|
|
|
|
scanner := store.Client.Scan(scan)
|
|
defer scanner.Close()
|
|
for {
|
|
res, err := scanner.Next()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return lastFileName, err
|
|
}
|
|
if len(res.Cells) == 0 {
|
|
continue
|
|
}
|
|
cell := res.Cells[0]
|
|
|
|
if !bytes.HasPrefix(cell.Row, expectedPrefix) {
|
|
break
|
|
}
|
|
|
|
fullpath := util.FullPath(cell.Row)
|
|
dir, fileName := fullpath.DirAndName()
|
|
if dir != string(dirPath) {
|
|
continue
|
|
}
|
|
|
|
value := cell.Value
|
|
|
|
if fileName == startFileName && !includeStartFile {
|
|
continue
|
|
}
|
|
|
|
limit--
|
|
if limit < 0 {
|
|
break
|
|
}
|
|
|
|
lastFileName = fileName
|
|
|
|
entry := &filer.Entry{
|
|
FullPath: fullpath,
|
|
}
|
|
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil {
|
|
err = decodeErr
|
|
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
|
|
break
|
|
}
|
|
if !eachEntryFunc(entry) {
|
|
break
|
|
}
|
|
}
|
|
|
|
return lastFileName, nil
|
|
}
|
|
|
|
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
|
|
return ctx, nil
|
|
}
|
|
|
|
func (store *HbaseStore) CommitTransaction(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (store *HbaseStore) RollbackTransaction(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (store *HbaseStore) Shutdown() {
|
|
store.Client.Close()
|
|
}
|