mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-11-28 21:39:02 +08:00
9f9ef1340c
streaming mode would create separate grpc connections for each call. this is to ensure the long poll connections are properly closed.
182 lines
5.8 KiB
Go
182 lines
5.8 KiB
Go
package filer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
|
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
|
|
"github.com/chrislusf/seaweedfs/weed/remote_storage"
|
|
"github.com/chrislusf/seaweedfs/weed/util"
|
|
"github.com/golang/protobuf/proto"
|
|
"google.golang.org/grpc"
|
|
"math"
|
|
"strings"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/viant/ptrie"
|
|
)
|
|
|
|
const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
|
|
const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
|
|
|
|
type FilerRemoteStorage struct {
|
|
rules ptrie.Trie
|
|
storageNameToConf map[string]*remote_pb.RemoteConf
|
|
}
|
|
|
|
func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
|
|
rs = &FilerRemoteStorage{
|
|
rules: ptrie.New(),
|
|
storageNameToConf: make(map[string]*remote_pb.RemoteConf),
|
|
}
|
|
return rs
|
|
}
|
|
|
|
func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
|
|
// execute this on filer
|
|
|
|
limit := int64(math.MaxInt32)
|
|
|
|
entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, limit, "", "", "")
|
|
if err != nil {
|
|
if err == filer_pb.ErrNotFound {
|
|
return nil
|
|
}
|
|
glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err)
|
|
return
|
|
}
|
|
|
|
for _, entry := range entries {
|
|
if entry.Name() == REMOTE_STORAGE_MOUNT_FILE {
|
|
if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
|
|
return nil
|
|
}
|
|
conf := &remote_pb.RemoteConf{}
|
|
if err := proto.Unmarshal(entry.Content, conf); err != nil {
|
|
return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
|
|
}
|
|
rs.storageNameToConf[conf.Name] = conf
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) {
|
|
mappings := &remote_pb.RemoteStorageMapping{}
|
|
if err := proto.Unmarshal(data, mappings); err != nil {
|
|
return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err)
|
|
}
|
|
for dir, storageLocation := range mappings.Mappings {
|
|
rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *remote_pb.RemoteStorageLocation) {
|
|
rs.rules.Put([]byte(dir+"/"), loc)
|
|
}
|
|
|
|
func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation) {
|
|
rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
|
|
mountDir = util.FullPath(string(key[:len(key)-1]))
|
|
remoteLocation = value.(*remote_pb.RemoteStorageLocation)
|
|
return true
|
|
})
|
|
return
|
|
}
|
|
|
|
func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
|
|
var storageLocation *remote_pb.RemoteStorageLocation
|
|
rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
|
|
storageLocation = value.(*remote_pb.RemoteStorageLocation)
|
|
return true
|
|
})
|
|
|
|
if storageLocation == nil {
|
|
found = false
|
|
return
|
|
}
|
|
|
|
return rs.GetRemoteStorageClient(storageLocation.Name)
|
|
}
|
|
|
|
func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
|
|
remoteConf, found = rs.storageNameToConf[storageName]
|
|
if !found {
|
|
return
|
|
}
|
|
|
|
var err error
|
|
if client, err = remote_storage.GetRemoteStorage(remoteConf); err == nil {
|
|
found = true
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.RemoteStorageMapping, err error) {
|
|
mappings = &remote_pb.RemoteStorageMapping{
|
|
Mappings: make(map[string]*remote_pb.RemoteStorageLocation),
|
|
}
|
|
if len(oldContent) > 0 {
|
|
if err = proto.Unmarshal(oldContent, mappings); err != nil {
|
|
glog.Warningf("unmarshal existing mappings: %v", err)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
|
|
var oldContent []byte
|
|
if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
|
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
|
|
return readErr
|
|
}); readErr != nil {
|
|
return nil, readErr
|
|
}
|
|
|
|
// unmarshal storage configuration
|
|
conf = &remote_pb.RemoteConf{}
|
|
if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
|
|
readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) {
|
|
|
|
mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress)
|
|
if listErr != nil {
|
|
return nil, "", nil, nil, listErr
|
|
}
|
|
if dir == "" {
|
|
return mappings, "", nil, nil, fmt.Errorf("need to specify '-dir' option")
|
|
}
|
|
|
|
var localMountedDir string
|
|
var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
|
|
for k, loc := range mappings.Mappings {
|
|
if strings.HasPrefix(dir, k) {
|
|
localMountedDir, remoteStorageMountedLocation = k, loc
|
|
}
|
|
}
|
|
if localMountedDir == "" {
|
|
return mappings, localMountedDir, remoteStorageMountedLocation, nil, fmt.Errorf("%s is not mounted", dir)
|
|
}
|
|
|
|
// find remote storage configuration
|
|
remoteStorageConf, err := ReadRemoteStorageConf(grpcDialOption, filerAddress, remoteStorageMountedLocation.Name)
|
|
if err != nil {
|
|
return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, err
|
|
}
|
|
|
|
return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil
|
|
}
|