mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-05 18:05:12 +08:00
2e78a522ab
* remove old raft servers if they don't answer to pings for too long add ping durations as options rename ping fields fix some todos get masters through masterclient raft remove server from leader use raft servers to ping them CheckMastersAlive for hashicorp raft only * prepare blocking ping * pass waitForReady as param * pass waitForReady through all functions * waitForReady works * refactor * remove unneeded params * rollback unneeded changes * fix
153 lines
3.8 KiB
Go
153 lines
3.8 KiB
Go
package source
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
type ReplicationSource interface {
|
|
ReadPart(part string) io.ReadCloser
|
|
}
|
|
|
|
type FilerSource struct {
|
|
grpcAddress string
|
|
grpcDialOption grpc.DialOption
|
|
Dir string
|
|
address string
|
|
proxyByFiler bool
|
|
dataCenter string
|
|
}
|
|
|
|
func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
|
|
fs.dataCenter = configuration.GetString(prefix + "dataCenter")
|
|
return fs.DoInitialize(
|
|
"",
|
|
configuration.GetString(prefix+"grpcAddress"),
|
|
configuration.GetString(prefix+"directory"),
|
|
false,
|
|
)
|
|
}
|
|
|
|
func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, readChunkFromFiler bool) (err error) {
|
|
fs.address = address
|
|
if fs.address == "" {
|
|
fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
|
|
}
|
|
fs.grpcAddress = grpcAddress
|
|
fs.Dir = dir
|
|
fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
|
|
fs.proxyByFiler = readChunkFromFiler
|
|
return nil
|
|
}
|
|
|
|
func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) {
|
|
|
|
vid2Locations := make(map[string]*filer_pb.Locations)
|
|
|
|
vid := volumeId(part)
|
|
|
|
err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
|
VolumeIds: []string{vid},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
vid2Locations = resp.LocationsMap
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
|
|
return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
|
|
}
|
|
|
|
locations := vid2Locations[vid]
|
|
|
|
if locations == nil || len(locations.Locations) == 0 {
|
|
glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
|
|
return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
|
|
}
|
|
|
|
if !fs.proxyByFiler {
|
|
for _, loc := range locations.Locations {
|
|
fileUrl := fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part)
|
|
// Prefer same data center
|
|
if fs.dataCenter != "" && fs.dataCenter == loc.DataCenter {
|
|
fileUrls = append([]string{fileUrl}, fileUrls...)
|
|
} else {
|
|
fileUrls = append(fileUrls, fileUrl)
|
|
}
|
|
}
|
|
} else {
|
|
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) {
|
|
|
|
if fs.proxyByFiler {
|
|
return util.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "")
|
|
}
|
|
|
|
fileUrls, err := fs.LookupFileId(fileId)
|
|
if err != nil {
|
|
return "", nil, nil, err
|
|
}
|
|
|
|
for _, fileUrl := range fileUrls {
|
|
filename, header, resp, err = util.DownloadFile(fileUrl, "")
|
|
if err != nil {
|
|
glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
return filename, header, resp, err
|
|
}
|
|
|
|
var _ = filer_pb.FilerClient(&FilerSource{})
|
|
|
|
func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
|
|
|
|
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
|
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
|
return fn(client)
|
|
}, fs.grpcAddress, false, fs.grpcDialOption)
|
|
|
|
}
|
|
|
|
func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string {
|
|
return location.Url
|
|
}
|
|
|
|
func (fs *FilerSource) GetDataCenter() string {
|
|
return fs.dataCenter
|
|
}
|
|
|
|
func volumeId(fileId string) string {
|
|
lastCommaIndex := strings.LastIndex(fileId, ",")
|
|
if lastCommaIndex > 0 {
|
|
return fileId[:lastCommaIndex]
|
|
}
|
|
return fileId
|
|
}
|