avoid fail on tail error

Chris Lu 2021-08-10 02:50:28 -07:00
parent 18228f3044
commit 69a6da7969
4 changed files with 47 additions and 28 deletions

weed/operation/tail_volume.go

@@ -71,7 +71,10 @@ func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, v
 			n := new(needle.Needle)
 			n.ParseNeedleHeader(needleHeader)
-			n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
+			err = n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
+			if err != nil {
+				return err
+			}
 			err = fn(n)
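
With the return value of ReadNeedleBodyBytes now checked, a truncated or corrupt needle body aborts the tail instead of handing a half-parsed needle to the callback. A minimal caller sketch in Go; the trailing parameters and surrounding variables are assumptions abridged from the hunk header, not code from this commit:

	// hypothetical caller: vid and grpcDialOption assumed in scope
	err := operation.TailVolumeFromSource("127.0.0.1:8080", grpcDialOption,
		vid, 0, 30, func(n *needle.Needle) error {
			// the callback no longer sees needles whose body failed to parse;
			// that error now surfaces as the return value of TailVolumeFromSource
			fmt.Printf("replicated needle %d\n", n.Id)
			return nil
		})
	if err != nil {
		log.Printf("tail volume %d: %v", vid, err)
	}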

weed/shell/command_volume_balance.go

@@ -340,7 +340,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
 	}
 	fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
 	if applyChange {
-		return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType)
+		return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType, false)
 	}
 	return nil
 }
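
volume.balance keeps its fail-fast behavior: it passes false for the new skipTailError flag, and os.Stderr as the writer that would receive tail errors if the flag were true. For any other caller of LiveMoveVolume the migration is mechanical; a hedged sketch with assumed identifiers (vid, src, dst, diskType):

	// before: LiveMoveVolume(grpcDialOption, vid, src, dst, 5*time.Second, diskType)
	// after: a writer follows the dial option, and skipTailError is appended;
	// false preserves the old behavior of failing the move on a tail error
	err := LiveMoveVolume(grpcDialOption, os.Stderr, vid, src, dst, 5*time.Second, diskType, false)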

weed/shell/command_volume_move.go

@@ -69,11 +69,11 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
 		return fmt.Errorf("source and target volume servers are the same!")
 	}
-	return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr)
+	return LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr, false)
 }

 // LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
-func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string) (err error) {
+func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
 	log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
 	lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
@@ -83,7 +83,11 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, so
 	log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
 	if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
-		return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
+		if skipTailError {
+			fmt.Fprintf(writer, "tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
+		} else {
+			return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
+		}
 	}
 	log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)

weed/shell/command_volume_tier_move.go

@@ -8,7 +8,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
 	"io"
 	"path/filepath"
-	"strings"
+	"sync"
 	"time"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -19,6 +19,9 @@ func init() {
 }

 type commandVolumeTierMove struct {
+	activeServers     map[string]struct{}
+	activeServersLock sync.Mutex
+	activeServersCond *sync.Cond
 }

 func (c *commandVolumeTierMove) Name() string {
@@ -38,6 +41,9 @@ func (c *commandVolumeTierMove) Help() string {
 func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

+	c.activeServers = make(map[string]struct{})
+	c.activeServersCond = sync.NewCond(new(sync.Mutex))
+
 	if err = commandEnv.confirmIsLocked(); err != nil {
 		return
 	}
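
This commit only adds and initializes the activeServers bookkeeping; none of these hunks call Wait or Signal on the condition variable yet. For orientation, a generic sketch of the per-server guard that a map plus sync.Cond typically implements (purely illustrative, not code from this commit; server assumed in scope):

	// wait until no move is active on this server, then claim it
	c.activeServersCond.L.Lock()
	for {
		if _, busy := c.activeServers[server]; !busy {
			break
		}
		c.activeServersCond.Wait()
	}
	c.activeServers[server] = struct{}{}
	c.activeServersCond.L.Unlock()

	// ... perform the move ...

	// release the server and wake any waiters
	c.activeServersCond.L.Lock()
	delete(c.activeServers, server)
	c.activeServersCond.L.Unlock()
	c.activeServersCond.Broadcast()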
@@ -75,7 +81,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
 	_, allLocations := collectVolumeReplicaLocations(topologyInfo)

 	for _, vid := range volumeIds {
-		if err = doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations, *applyChange); err != nil {
+		if err = c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations, *applyChange); err != nil {
 			fmt.Printf("tier move volume %d: %v\n", vid, err)
 		}
 	}
@@ -92,7 +98,7 @@ func isOneOf(server string, locations []wdclient.Location) bool {
 	return false
 }

-func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
+func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
 	// find volume location
 	locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
 	if !found {
@@ -127,26 +133,8 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.Volum
 				break
 			}

-			// mark all replicas as read only
-			if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil {
-				return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
-			}
-			if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString()); err != nil {
-				return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
-			}
-			// adjust volume count
-			dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
-			// remove the remaining replicas
-			for _, loc := range locations {
-				if loc.Url != dst.dataNode.Id {
-					if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
-						if !strings.Contains(err.Error(), "not found") {
-							fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
-						}
-					}
-				}
-			}
+			if err := c.doMoveOneVolume(commandEnv, writer, vid, toDiskType, locations, sourceVolumeServer, dst); err != nil {
+				return err
+			}
 		}
 	}
@@ -158,6 +146,30 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.Volum
 	return nil
 }

+func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer string, dst location) (err error) {
+
+	// mark all replicas as read only
+	if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil {
+		return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
+	}
+	if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString(), true); err != nil {
+		return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
+	}
+
+	// adjust volume count
+	dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
+
+	// remove the remaining replicas
+	for _, loc := range locations {
+		if loc.Url != dst.dataNode.Id && loc.Url != sourceVolumeServer {
+			if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
+				fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
+			}
+		}
+	}
+	return nil
+}
+
 func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
 	quietSeconds := int64(quietPeriod / time.Second)
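
Two details worth noting in doMoveOneVolume. First, it calls LiveMoveVolume with skipTailError set to true, so, per the commit title, a tail error no longer aborts the tier move. Second, the replica cleanup now also skips sourceVolumeServer, since LiveMoveVolume's final step already deletes the source copy; that is why the old strings.Contains(err.Error(), "not found") special case, and the strings import with it, could be dropped. A worked example of the new cleanup predicate, with hypothetical servers:

	// replicas of vid live on A (the copy source), B, and C; the new copy is on D.
	// A was already deleted by LiveMoveVolume and D is the destination,
	// so only B and C are deleted here:
	for _, loc := range locations { // locations = [A, B, C]
		if loc.Url != dst.dataNode.Id && loc.Url != sourceVolumeServer {
			if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
				fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
			}
		}
	}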