seaweedfs/weed/pb/filer_pb/filer_client.go

304 lines
7.9 KiB
Go
Raw Normal View History

2020-03-23 15:01:34 +08:00
package filer_pb
2019-05-03 15:24:35 +08:00
import (
"context"
2020-03-26 17:21:30 +08:00
"errors"
2019-05-03 15:24:35 +08:00
"fmt"
"io"
"math"
2020-03-23 15:30:02 +08:00
"os"
2020-08-14 15:22:21 +08:00
"strings"
2020-03-23 15:30:02 +08:00
"time"
2019-05-03 15:24:35 +08:00
"github.com/chrislusf/seaweedfs/weed/glog"
2020-03-23 15:01:34 +08:00
"github.com/chrislusf/seaweedfs/weed/util"
2019-05-03 15:24:35 +08:00
)
2020-03-23 15:30:02 +08:00
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
)
2019-05-03 15:24:35 +08:00
type FilerClient interface {
2020-03-23 15:01:34 +08:00
WithFilerClient(fn func(SeaweedFilerClient) error) error
2021-01-29 06:36:29 +08:00
AdjustedUrl(location *Location) string
2019-05-03 15:24:35 +08:00
}
2020-03-23 15:01:34 +08:00
func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
2019-05-03 15:24:35 +08:00
2020-01-20 15:59:46 +08:00
dir, name := fullFilePath.DirAndName()
2019-05-03 15:24:35 +08:00
2020-03-23 15:01:34 +08:00
err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
2019-05-03 15:24:35 +08:00
2020-03-23 15:01:34 +08:00
request := &LookupDirectoryEntryRequest{
2019-05-03 15:24:35 +08:00
Directory: dir,
Name: name,
}
2020-01-25 16:31:53 +08:00
// glog.V(3).Infof("read %s request: %v", fullFilePath, request)
2020-03-23 15:01:34 +08:00
resp, err := LookupEntry(client, request)
2019-05-03 15:24:35 +08:00
if err != nil {
2020-03-23 15:01:34 +08:00
if err == ErrNotFound {
2019-05-04 05:12:51 +08:00
return nil
2019-05-03 15:24:35 +08:00
}
2020-01-25 16:31:53 +08:00
glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
2019-05-03 15:24:35 +08:00
return err
}
2019-12-11 15:13:14 +08:00
if resp.Entry == nil {
2020-01-25 16:31:53 +08:00
// glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
2019-12-11 15:13:14 +08:00
return nil
2019-05-03 15:24:35 +08:00
}
2019-12-11 15:13:14 +08:00
entry = resp.Entry
2019-05-03 15:24:35 +08:00
return nil
})
return
}
type EachEntryFunciton func(entry *Entry, isLast bool) error
func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
2019-05-03 15:24:35 +08:00
2020-12-10 15:23:38 +08:00
var counter uint32
var startFrom string
var counterFunc = func(entry *Entry, isLast bool) error {
counter++
startFrom = entry.Name
return fn(entry, isLast)
}
2020-03-23 15:30:02 +08:00
2020-12-10 15:23:38 +08:00
var paginationLimit uint32 = 10000
if err = doList(filerClient, fullDirPath, prefix, counterFunc, "", false, paginationLimit); err != nil {
return err
}
for counter == paginationLimit {
counter = 0
if err = doList(filerClient, fullDirPath, prefix, counterFunc, startFrom, false, paginationLimit); err != nil {
return err
}
}
return nil
2020-03-23 15:30:02 +08:00
}
func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
2020-12-13 04:42:53 +08:00
return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
})
2020-03-23 15:30:02 +08:00
}
2019-05-03 15:24:35 +08:00
func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
2020-12-13 04:42:53 +08:00
return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
return doSeaweedList(client, fullDirPath, prefix, fn, startFrom, inclusive, limit)
})
}
2020-03-23 15:30:02 +08:00
2020-12-13 04:42:53 +08:00
func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
}
2019-05-03 15:24:35 +08:00
2020-12-13 04:42:53 +08:00
func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
// Redundancy limit to make it correctly judge whether it is the last file.
redLimit := limit
2021-03-15 04:21:02 +08:00
if limit != math.MaxInt32 && limit != 0 {
redLimit = limit + 1
}
2020-12-13 04:42:53 +08:00
request := &ListEntriesRequest{
Directory: string(fullDirPath),
Prefix: prefix,
StartFromFileName: startFrom,
Limit: redLimit,
2020-12-13 04:42:53 +08:00
InclusiveStartFrom: inclusive,
}
glog.V(4).Infof("read directory: %v", request)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.ListEntries(ctx, request)
if err != nil {
return fmt.Errorf("list %s: %v", fullDirPath, err)
}
2019-05-03 15:24:35 +08:00
2020-12-13 04:42:53 +08:00
var prevEntry *Entry
count := 0
2020-12-13 04:42:53 +08:00
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
if prevEntry != nil {
if err := fn(prevEntry, true); err != nil {
return err
}
}
2020-12-13 04:42:53 +08:00
break
} else {
return recvErr
2019-05-03 15:24:35 +08:00
}
2020-12-13 04:42:53 +08:00
}
if prevEntry != nil {
if err := fn(prevEntry, false); err != nil {
return err
2019-05-03 15:24:35 +08:00
}
}
2020-12-13 04:42:53 +08:00
prevEntry = resp.Entry
count++
if count > int(limit) && limit != 0 {
prevEntry = nil
}
2020-12-13 04:42:53 +08:00
}
2019-05-03 15:24:35 +08:00
2020-12-13 04:42:53 +08:00
return nil
2019-05-03 15:24:35 +08:00
}
2020-03-23 15:06:24 +08:00
func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
request := &LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
Name: entryName,
}
glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
resp, err := LookupEntry(client, request)
if err != nil {
if err == ErrNotFound {
exists = false
return nil
}
glog.V(0).Infof("exists entry %v: %v", request, err)
return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
}
exists = resp.Entry.IsDirectory == isDirectory
return nil
})
return
}
2020-03-23 15:30:02 +08:00
func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string, entry *Entry) (err error) {
return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
request := &UpdateEntryRequest{
Directory: parentDirectoryPath,
Entry: entry,
}
glog.V(4).Infof("touch entry %v/%v: %v", parentDirectoryPath, entryName, request)
if err := UpdateEntry(client, request); err != nil {
glog.V(0).Infof("touch exists entry %v: %v", request, err)
return fmt.Errorf("touch exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
}
return nil
})
}
2020-03-23 15:30:02 +08:00
func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
entry := &Entry{
Name: dirName,
IsDirectory: true,
Attributes: &FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0777 | os.ModeDir),
Uid: OS_UID,
Gid: OS_GID,
},
}
if fn != nil {
fn(entry)
}
request := &CreateEntryRequest{
Directory: parentDirectoryPath,
Entry: entry,
}
glog.V(1).Infof("mkdir: %v", request)
if err := CreateEntry(client, request); err != nil {
glog.V(0).Infof("mkdir %v: %v", request, err)
return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
}
return nil
})
}
func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error {
2020-03-23 15:30:02 +08:00
return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
entry := &Entry{
Name: fileName,
IsDirectory: false,
Attributes: &FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0770),
Uid: OS_UID,
Gid: OS_GID,
},
Chunks: chunks,
}
if fn != nil {
fn(entry)
}
2020-03-23 15:30:02 +08:00
request := &CreateEntryRequest{
Directory: parentDirectoryPath,
Entry: entry,
}
glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
if err := CreateEntry(client, request); err != nil {
glog.V(0).Infof("create file %v:%v", request, err)
return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
}
return nil
})
}
2020-03-23 16:14:21 +08:00
2020-09-10 02:21:23 +08:00
func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
2020-03-23 16:14:21 +08:00
return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
deleteEntryRequest := &DeleteEntryRequest{
2020-03-23 16:14:21 +08:00
Directory: parentDirectoryPath,
Name: name,
IsDeleteData: isDeleteData,
IsRecursive: isRecursive,
IgnoreRecursiveError: ignoreRecursiveErr,
IsFromOtherCluster: isFromOtherCluster,
2020-09-10 02:21:23 +08:00
Signatures: signatures,
}
if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil {
if strings.Contains(err.Error(), ErrNotFound.Error()) {
2020-08-14 15:23:01 +08:00
return nil
}
2020-03-23 16:14:21 +08:00
return err
2020-03-26 17:21:30 +08:00
} else {
if resp.Error != "" {
if strings.Contains(resp.Error, ErrNotFound.Error()) {
2020-08-14 15:23:01 +08:00
return nil
}
2020-03-26 17:21:30 +08:00
return errors.New(resp.Error)
}
2020-03-23 16:14:21 +08:00
}
return nil
})
}