mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-18 06:30:07 +08:00
Merge pull request #1427 from levenlabs/writable
Added VolumeMarkWritable and VolumeStatus grpc methods
This commit is contained in:
commit
e455eb8891
@ -37,8 +37,12 @@ service VolumeServer {
|
||||
}
|
||||
rpc VolumeMarkReadonly (VolumeMarkReadonlyRequest) returns (VolumeMarkReadonlyResponse) {
|
||||
}
|
||||
rpc VolumeMarkWritable (VolumeMarkWritableRequest) returns (VolumeMarkWritableResponse) {
|
||||
}
|
||||
rpc VolumeConfigure (VolumeConfigureRequest) returns (VolumeConfigureResponse) {
|
||||
}
|
||||
rpc VolumeStatus (VolumeStatusRequest) returns (VolumeStatusResponse) {
|
||||
}
|
||||
|
||||
// copy the .idx .dat files, and mount this volume
|
||||
rpc VolumeCopy (VolumeCopyRequest) returns (VolumeCopyResponse) {
|
||||
@ -200,6 +204,12 @@ message VolumeMarkReadonlyRequest {
|
||||
message VolumeMarkReadonlyResponse {
|
||||
}
|
||||
|
||||
message VolumeMarkWritableRequest {
|
||||
uint32 volume_id = 1;
|
||||
}
|
||||
message VolumeMarkWritableResponse {
|
||||
}
|
||||
|
||||
message VolumeConfigureRequest {
|
||||
uint32 volume_id = 1;
|
||||
string replication = 2;
|
||||
@ -208,6 +218,13 @@ message VolumeConfigureResponse {
|
||||
string error = 1;
|
||||
}
|
||||
|
||||
message VolumeStatusRequest {
|
||||
uint32 volume_id = 1;
|
||||
}
|
||||
message VolumeStatusResponse {
|
||||
bool is_read_only = 1;
|
||||
}
|
||||
|
||||
message VolumeCopyRequest {
|
||||
uint32 volume_id = 1;
|
||||
string collection = 2;
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -149,7 +149,35 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.VolumeMarkWritableResponse{}
|
||||
|
||||
err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("volume mark writable %v: %v", req, err)
|
||||
} else {
|
||||
glog.V(2).Infof("volume mark writable %v", req)
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.VolumeStatusResponse{}
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
|
||||
}
|
||||
|
||||
resp.IsReadOnly = v.IsReadOnly()
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
|
||||
|
@ -91,6 +91,43 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, so
|
||||
|
||||
func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
|
||||
|
||||
// check to see if the volume is already read-only and if its not then we need
|
||||
// to mark it as read-only and then before we return we need to undo what we
|
||||
// did
|
||||
var shouldMarkWritable bool
|
||||
defer func() {
|
||||
if !shouldMarkWritable {
|
||||
return
|
||||
}
|
||||
|
||||
clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
_, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
})
|
||||
return writableErr
|
||||
})
|
||||
if clientErr != nil {
|
||||
log.Printf("failed to mark volume %d as writable after copy from %s: %v", volumeId, sourceVolumeServer, clientErr)
|
||||
}
|
||||
}()
|
||||
|
||||
err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
})
|
||||
if statusErr == nil && !resp.IsReadOnly {
|
||||
shouldMarkWritable = true
|
||||
_, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
})
|
||||
return readonlyErr
|
||||
}
|
||||
return statusErr
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
|
@ -307,7 +307,20 @@ func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
|
||||
if v == nil {
|
||||
return fmt.Errorf("volume %d not found", i)
|
||||
}
|
||||
v.noWriteLock.Lock()
|
||||
v.noWriteOrDelete = true
|
||||
v.noWriteLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) MarkVolumeWritable(i needle.VolumeId) error {
|
||||
v := s.findVolume(i)
|
||||
if v == nil {
|
||||
return fmt.Errorf("volume %d not found", i)
|
||||
}
|
||||
v.noWriteLock.Lock()
|
||||
v.noWriteOrDelete = false
|
||||
v.noWriteLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@ type Volume struct {
|
||||
needleMapKind NeedleMapType
|
||||
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
||||
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
|
||||
noWriteLock sync.RWMutex
|
||||
hasRemoteFile bool // if the volume has a remote file
|
||||
MemoryMapMaxSizeMb uint32
|
||||
|
||||
@ -58,6 +59,8 @@ func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapK
|
||||
}
|
||||
|
||||
func (v *Volume) String() string {
|
||||
v.noWriteLock.RLock()
|
||||
defer v.noWriteLock.RUnlock()
|
||||
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
|
||||
}
|
||||
|
||||
@ -245,5 +248,7 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
|
||||
}
|
||||
|
||||
func (v *Volume) IsReadOnly() bool {
|
||||
v.noWriteLock.RLock()
|
||||
defer v.noWriteLock.RUnlock()
|
||||
return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow
|
||||
}
|
||||
|
@ -26,8 +26,10 @@ func (v *Volume) maybeWriteSuperBlock() error {
|
||||
if dataFile, e = os.Create(v.DataBackend.Name()); e == nil {
|
||||
v.DataBackend = backend.NewDiskFile(dataFile)
|
||||
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
|
||||
v.noWriteLock.Lock()
|
||||
v.noWriteOrDelete = false
|
||||
v.noWriteCanDelete = false
|
||||
v.noWriteLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user