// seaweedfs/weed/filer2/filer_client_util.go

package filer2
import (
"context"
"fmt"
"io"
"math"
2019-05-03 15:24:35 +08:00
"strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
// VolumeId returns the part of fileId that precedes its last comma.
//
// When fileId contains no comma, or its only comma is the very first
// character, fileId is returned unchanged.
func VolumeId(fileId string) string {
	if sep := strings.LastIndexByte(fileId, ','); sep > 0 {
		return fileId[:sep]
	}
	return fileId
}
type FilerClient interface {
WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
AdjustedUrl(hostAndPort string) string
2019-05-03 15:24:35 +08:00
}
func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
2019-05-03 15:24:35 +08:00
var vids []string
for _, chunkView := range chunkViews {
vids = append(vids, VolumeId(chunkView.FileId))
}
vid2Locations := make(map[string]*filer_pb.Locations)
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
2019-05-03 15:24:35 +08:00
glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
2019-05-03 15:24:35 +08:00
VolumeIds: vids,
})
if err != nil {
return err
}
vid2Locations = resp.LocationsMap
return nil
})
if err != nil {
return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
}
var wg sync.WaitGroup
for _, chunkView := range chunkViews {
wg.Add(1)
go func(chunkView *ChunkView) {
defer wg.Done()
glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
locations := vid2Locations[VolumeId(chunkView.FileId)]
if locations == nil || len(locations.Locations) == 0 {
glog.V(0).Infof("failed to locate %s", chunkView.FileId)
err = fmt.Errorf("failed to locate %s", chunkView.FileId)
return
}
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
2019-05-03 15:24:35 +08:00
var n int64
n, err = util.ReadUrl(
fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId),
2019-05-03 15:24:35 +08:00
chunkView.Offset,
int(chunkView.Size),
buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
!chunkView.IsFullChunk)
if err != nil {
glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err)
2019-05-03 15:24:35 +08:00
err = fmt.Errorf("failed to read http://%s/%s: %v",
volumeServerAddress, chunkView.FileId, err)
2019-05-03 15:24:35 +08:00
return
}
glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
totalRead += n
}(chunkView)
}
wg.Wait()
return
}
func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
2019-05-03 15:24:35 +08:00
2020-01-20 15:59:46 +08:00
dir, name := fullFilePath.DirAndName()
2019-05-03 15:24:35 +08:00
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
2019-05-03 15:24:35 +08:00
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
Name: name,
}
2020-01-25 16:31:53 +08:00
// glog.V(3).Infof("read %s request: %v", fullFilePath, request)
resp, err := client.LookupDirectoryEntry(context.Background(), request)
2019-05-03 15:24:35 +08:00
if err != nil {
2019-05-04 04:13:08 +08:00
if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
2019-05-04 05:12:51 +08:00
return nil
2019-05-03 15:24:35 +08:00
}
2020-01-25 16:31:53 +08:00
glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
2019-05-03 15:24:35 +08:00
return err
}
2019-12-11 15:13:14 +08:00
if resp.Entry == nil {
2020-01-25 16:31:53 +08:00
// glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
2019-12-11 15:13:14 +08:00
return nil
2019-05-03 15:24:35 +08:00
}
2019-12-11 15:13:14 +08:00
entry = resp.Entry
2019-05-03 15:24:35 +08:00
return nil
})
return
}
func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
2019-05-03 15:24:35 +08:00
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
2019-05-03 15:24:35 +08:00
lastEntryName := ""
request := &filer_pb.ListEntriesRequest{
2020-01-20 15:59:46 +08:00
Directory: string(fullDirPath),
Prefix: prefix,
StartFromFileName: lastEntryName,
Limit: math.MaxUint32,
}
2019-05-03 15:24:35 +08:00
glog.V(3).Infof("read directory: %v", request)
stream, err := client.ListEntries(context.Background(), request)
if err != nil {
return fmt.Errorf("list %s: %v", fullDirPath, err)
}
2019-05-03 15:24:35 +08:00
var prevEntry *filer_pb.Entry
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
if prevEntry != nil {
fn(prevEntry, true)
}
break
} else {
return recvErr
}
2019-05-03 15:24:35 +08:00
}
if prevEntry != nil {
fn(prevEntry, false)
2019-05-03 15:24:35 +08:00
}
prevEntry = resp.Entry
2019-05-03 15:24:35 +08:00
}
return nil
})
return
}