2013-08-14 14:26:51 +08:00
|
|
|
package operation
|
|
|
|
|
|
|
|
import (
|
2018-11-23 16:26:15 +08:00
|
|
|
"context"
|
2015-03-09 16:10:01 +08:00
|
|
|
"fmt"
|
2022-07-29 15:17:28 +08:00
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
2024-03-29 15:38:27 +08:00
|
|
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
2023-08-23 15:31:33 +08:00
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
|
|
"google.golang.org/grpc"
|
2023-09-11 13:03:44 +08:00
|
|
|
"sync"
|
2013-08-14 14:26:51 +08:00
|
|
|
)
|
|
|
|
|
2016-06-26 10:50:18 +08:00
|
|
|
type VolumeAssignRequest struct {
|
2019-11-10 20:11:03 +08:00
|
|
|
Count uint64
|
|
|
|
Replication string
|
|
|
|
Collection string
|
|
|
|
Ttl string
|
2020-12-17 01:14:05 +08:00
|
|
|
DiskType string
|
2019-11-10 20:11:03 +08:00
|
|
|
DataCenter string
|
|
|
|
Rack string
|
|
|
|
DataNode string
|
|
|
|
WritableVolumeCount uint32
|
2016-06-26 10:50:18 +08:00
|
|
|
}
|
|
|
|
|
2013-08-14 14:26:51 +08:00
|
|
|
type AssignResult struct {
|
2021-09-13 13:47:52 +08:00
|
|
|
Fid string `json:"fid,omitempty"`
|
|
|
|
Url string `json:"url,omitempty"`
|
|
|
|
PublicUrl string `json:"publicUrl,omitempty"`
|
|
|
|
GrpcPort int `json:"grpcPort,omitempty"`
|
|
|
|
Count uint64 `json:"count,omitempty"`
|
|
|
|
Error string `json:"error,omitempty"`
|
|
|
|
Auth security.EncodedJwt `json:"auth,omitempty"`
|
|
|
|
Replicas []Location `json:"replicas,omitempty"`
|
2013-08-14 14:26:51 +08:00
|
|
|
}
|
|
|
|
|
2023-08-23 15:31:33 +08:00
|
|
|
// This is a proxy to the master server, only for assigning volume ids.
|
|
|
|
// It runs via grpc to the master server in streaming mode.
|
|
|
|
// The connection to the master would only be re-established when the last connection has error.
|
|
|
|
type AssignProxy struct {
|
|
|
|
grpcConnection *grpc.ClientConn
|
|
|
|
pool chan *singleThreadAssignProxy
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) {
|
|
|
|
ap = &AssignProxy{
|
|
|
|
pool: make(chan *singleThreadAssignProxy, concurrency),
|
|
|
|
}
|
2024-06-15 02:40:34 +08:00
|
|
|
ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn(context.Background()).ToGrpcAddress(), true, grpcDialOption)
|
2023-08-23 15:31:33 +08:00
|
|
|
if err != nil {
|
2024-06-15 02:40:34 +08:00
|
|
|
return nil, fmt.Errorf("fail to dial %s: %v", masterFn(context.Background()).ToGrpcAddress(), err)
|
2023-08-23 15:31:33 +08:00
|
|
|
}
|
|
|
|
for i := 0; i < concurrency; i++ {
|
|
|
|
ap.pool <- &singleThreadAssignProxy{}
|
|
|
|
}
|
|
|
|
return ap, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
|
|
|
|
p := <-ap.pool
|
|
|
|
defer func() {
|
|
|
|
ap.pool <- p
|
|
|
|
}()
|
|
|
|
|
|
|
|
return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...)
|
|
|
|
}
|
|
|
|
|
|
|
|
type singleThreadAssignProxy struct {
|
|
|
|
assignClient master_pb.Seaweed_StreamAssignClient
|
2023-09-11 13:03:44 +08:00
|
|
|
sync.Mutex
|
2023-08-23 15:31:33 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
|
2023-09-11 13:03:44 +08:00
|
|
|
ap.Lock()
|
|
|
|
defer ap.Unlock()
|
|
|
|
|
2023-08-23 15:31:33 +08:00
|
|
|
if ap.assignClient == nil {
|
|
|
|
client := master_pb.NewSeaweedClient(grpcConnection)
|
|
|
|
ap.assignClient, err = client.StreamAssign(context.Background())
|
|
|
|
if err != nil {
|
|
|
|
ap.assignClient = nil
|
|
|
|
return nil, fmt.Errorf("fail to create stream assign client: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var requests []*VolumeAssignRequest
|
|
|
|
requests = append(requests, primaryRequest)
|
|
|
|
requests = append(requests, alternativeRequests...)
|
|
|
|
ret = &AssignResult{}
|
|
|
|
|
|
|
|
for _, request := range requests {
|
|
|
|
if request == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
req := &master_pb.AssignRequest{
|
|
|
|
Count: request.Count,
|
|
|
|
Replication: request.Replication,
|
|
|
|
Collection: request.Collection,
|
|
|
|
Ttl: request.Ttl,
|
|
|
|
DiskType: request.DiskType,
|
|
|
|
DataCenter: request.DataCenter,
|
|
|
|
Rack: request.Rack,
|
|
|
|
DataNode: request.DataNode,
|
|
|
|
WritableVolumeCount: request.WritableVolumeCount,
|
|
|
|
}
|
|
|
|
if err = ap.assignClient.Send(req); err != nil {
|
|
|
|
return nil, fmt.Errorf("StreamAssignSend: %v", err)
|
|
|
|
}
|
|
|
|
resp, grpcErr := ap.assignClient.Recv()
|
|
|
|
if grpcErr != nil {
|
|
|
|
return nil, grpcErr
|
|
|
|
}
|
|
|
|
if resp.Error != "" {
|
|
|
|
return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error)
|
|
|
|
}
|
|
|
|
|
|
|
|
ret.Count = resp.Count
|
|
|
|
ret.Fid = resp.Fid
|
|
|
|
ret.Url = resp.Location.Url
|
|
|
|
ret.PublicUrl = resp.Location.PublicUrl
|
|
|
|
ret.GrpcPort = int(resp.Location.GrpcPort)
|
|
|
|
ret.Error = resp.Error
|
|
|
|
ret.Auth = security.EncodedJwt(resp.Auth)
|
|
|
|
for _, r := range resp.Replicas {
|
|
|
|
ret.Replicas = append(ret.Replicas, Location{
|
|
|
|
Url: r.Url,
|
|
|
|
PublicUrl: r.PublicUrl,
|
|
|
|
DataCenter: r.DataCenter,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
if ret.Count <= 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-02-18 12:55:55 +08:00
|
|
|
func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
|
2018-11-21 03:35:45 +08:00
|
|
|
|
2018-07-10 14:18:20 +08:00
|
|
|
var requests []*VolumeAssignRequest
|
|
|
|
requests = append(requests, primaryRequest)
|
|
|
|
requests = append(requests, alternativeRequests...)
|
2016-06-23 11:19:09 +08:00
|
|
|
|
2018-07-10 14:18:20 +08:00
|
|
|
var lastError error
|
2018-11-21 03:35:45 +08:00
|
|
|
ret := &AssignResult{}
|
|
|
|
|
2018-07-10 14:18:20 +08:00
|
|
|
for i, request := range requests {
|
|
|
|
if request == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2024-06-15 02:40:34 +08:00
|
|
|
lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
2018-11-21 03:35:45 +08:00
|
|
|
req := &master_pb.AssignRequest{
|
2020-11-17 16:36:21 +08:00
|
|
|
Count: request.Count,
|
|
|
|
Replication: request.Replication,
|
|
|
|
Collection: request.Collection,
|
|
|
|
Ttl: request.Ttl,
|
2020-12-17 01:14:05 +08:00
|
|
|
DiskType: request.DiskType,
|
2020-11-17 16:36:21 +08:00
|
|
|
DataCenter: request.DataCenter,
|
|
|
|
Rack: request.Rack,
|
|
|
|
DataNode: request.DataNode,
|
|
|
|
WritableVolumeCount: request.WritableVolumeCount,
|
2018-11-21 03:35:45 +08:00
|
|
|
}
|
2019-02-20 17:01:01 +08:00
|
|
|
resp, grpcErr := masterClient.Assign(context.Background(), req)
|
2018-11-21 03:35:45 +08:00
|
|
|
if grpcErr != nil {
|
|
|
|
return grpcErr
|
|
|
|
}
|
|
|
|
|
2021-10-11 16:24:30 +08:00
|
|
|
if resp.Error != "" {
|
|
|
|
return fmt.Errorf("assignRequest: %v", resp.Error)
|
|
|
|
}
|
|
|
|
|
2018-11-21 03:35:45 +08:00
|
|
|
ret.Count = resp.Count
|
|
|
|
ret.Fid = resp.Fid
|
2021-09-13 13:47:52 +08:00
|
|
|
ret.Url = resp.Location.Url
|
|
|
|
ret.PublicUrl = resp.Location.PublicUrl
|
|
|
|
ret.GrpcPort = int(resp.Location.GrpcPort)
|
2018-11-21 03:35:45 +08:00
|
|
|
ret.Error = resp.Error
|
2019-02-15 16:09:19 +08:00
|
|
|
ret.Auth = security.EncodedJwt(resp.Auth)
|
2021-09-09 06:54:55 +08:00
|
|
|
for _, r := range resp.Replicas {
|
2021-09-13 13:47:52 +08:00
|
|
|
ret.Replicas = append(ret.Replicas, Location{
|
2022-08-05 08:35:00 +08:00
|
|
|
Url: r.Url,
|
|
|
|
PublicUrl: r.PublicUrl,
|
|
|
|
DataCenter: r.DataCenter,
|
2021-09-06 14:32:25 +08:00
|
|
|
})
|
|
|
|
}
|
2018-11-21 03:35:45 +08:00
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
if lastError != nil {
|
2024-03-29 15:38:27 +08:00
|
|
|
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorChunkAssign).Inc()
|
2018-11-21 03:35:45 +08:00
|
|
|
continue
|
2018-07-10 14:18:20 +08:00
|
|
|
}
|
2018-11-21 03:35:45 +08:00
|
|
|
|
2018-07-10 14:18:20 +08:00
|
|
|
if ret.Count <= 0 {
|
|
|
|
lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
|
|
|
|
continue
|
|
|
|
}
|
2018-11-21 03:35:45 +08:00
|
|
|
|
2021-04-14 18:29:28 +08:00
|
|
|
break
|
2013-08-14 14:26:51 +08:00
|
|
|
}
|
2018-11-21 03:35:45 +08:00
|
|
|
|
|
|
|
return ret, lastError
|
2013-08-14 14:26:51 +08:00
|
|
|
}
|
2019-02-15 16:09:19 +08:00
|
|
|
|
2021-09-13 13:47:52 +08:00
|
|
|
func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
|
2019-02-15 16:09:19 +08:00
|
|
|
|
2021-12-26 16:15:03 +08:00
|
|
|
WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
|
2019-02-15 16:09:19 +08:00
|
|
|
|
2021-08-13 12:40:33 +08:00
|
|
|
resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
|
|
|
|
VolumeOrFileIds: []string{fileId},
|
|
|
|
})
|
|
|
|
if grpcErr != nil {
|
|
|
|
return grpcErr
|
2019-02-15 16:09:19 +08:00
|
|
|
}
|
|
|
|
|
2021-08-13 12:40:33 +08:00
|
|
|
if len(resp.VolumeIdLocations) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
token = security.EncodedJwt(resp.VolumeIdLocations[0].Auth)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
return
|
2019-02-15 16:09:19 +08:00
|
|
|
}
|
2020-11-16 08:58:48 +08:00
|
|
|
|
|
|
|
type StorageOption struct {
|
2020-11-17 17:00:02 +08:00
|
|
|
Replication string
|
2020-12-17 01:14:05 +08:00
|
|
|
DiskType string
|
2020-11-17 17:00:02 +08:00
|
|
|
Collection string
|
|
|
|
DataCenter string
|
|
|
|
Rack string
|
2021-12-22 21:57:26 +08:00
|
|
|
DataNode string
|
2020-11-17 17:00:02 +08:00
|
|
|
TtlSeconds int32
|
|
|
|
VolumeGrowthCount uint32
|
2023-10-13 05:29:55 +08:00
|
|
|
MaxFileNameLength uint32
|
|
|
|
Fsync bool
|
2023-04-29 23:31:05 +08:00
|
|
|
SaveInside bool
|
2020-11-16 08:58:48 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (so *StorageOption) TtlString() string {
|
|
|
|
return needle.SecondsToTTL(so.TtlSeconds)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
|
|
|
|
ar = &VolumeAssignRequest{
|
2020-11-17 17:00:02 +08:00
|
|
|
Count: uint64(count),
|
|
|
|
Replication: so.Replication,
|
|
|
|
Collection: so.Collection,
|
|
|
|
Ttl: so.TtlString(),
|
2020-12-17 01:14:05 +08:00
|
|
|
DiskType: so.DiskType,
|
2020-11-17 17:00:02 +08:00
|
|
|
DataCenter: so.DataCenter,
|
|
|
|
Rack: so.Rack,
|
2021-12-22 21:57:26 +08:00
|
|
|
DataNode: so.DataNode,
|
2020-11-17 17:00:02 +08:00
|
|
|
WritableVolumeCount: so.VolumeGrowthCount,
|
2020-11-16 08:58:48 +08:00
|
|
|
}
|
2021-12-22 21:57:26 +08:00
|
|
|
if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" {
|
2020-11-16 08:58:48 +08:00
|
|
|
altRequest = &VolumeAssignRequest{
|
2020-11-17 17:00:02 +08:00
|
|
|
Count: uint64(count),
|
|
|
|
Replication: so.Replication,
|
|
|
|
Collection: so.Collection,
|
|
|
|
Ttl: so.TtlString(),
|
2020-12-17 01:14:05 +08:00
|
|
|
DiskType: so.DiskType,
|
2020-11-17 17:00:02 +08:00
|
|
|
DataCenter: "",
|
|
|
|
Rack: "",
|
2021-12-22 21:57:26 +08:00
|
|
|
DataNode: "",
|
2020-11-17 17:00:02 +08:00
|
|
|
WritableVolumeCount: so.VolumeGrowthCount,
|
2020-11-16 08:58:48 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|