shell: volume.fix.replication also purge over replicated volumes

This commit is contained in:
Chris Lu 2020-09-07 16:00:10 -07:00
parent c18ea21f7a
commit 64a621bcc8

View File

@ -3,8 +3,8 @@ package shell
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
"math/rand"
"sort"
"github.com/chrislusf/seaweedfs/weed/operation"
@ -27,11 +27,13 @@ func (c *commandVolumeFixReplication) Name() string {
func (c *commandVolumeFixReplication) Help() string {
return `add replicas to volumes that are missing replicas
This command finds all under-replicated volumes, and finds volume servers with free slots.
This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
This command also finds all under-replicated volumes, and finds volume servers with free slots.
If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
volume.fix.replication -n # do not take action
volume.fix.replication # actually copying the volume files and mount the volume
volume.fix.replication # actually deleting or copying the volume files and mount the volume
Note:
* each time this will only add back one replica for one volume id. If there are multiple replicas
@ -69,20 +71,18 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, v := range dn.VolumeInfos {
if v.ReplicaPlacement > 0 {
volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
location: &loc,
info: v,
})
}
}
allLocations = append(allLocations, loc)
})
// find all under replicated volumes
var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
for vid, replicas := range volumeReplicas {
replica := replicas[rand.Intn(len(replicas))]
replica := replicas[0]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
if replicaPlacement.GetCopyCount() > len(replicas) {
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
@ -92,6 +92,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
}
if len(overReplicatedVolumeIds) > 0 {
return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
}
if len(underReplicatedVolumeIds) == 0 {
return fmt.Errorf("no under replicated volumes")
}
@ -107,10 +111,31 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
for _, vid := range overReplicatedVolumeIds {
replicas := volumeReplicas[vid]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
replica := pickOneReplicaToDelete(replicas, replicaPlacement)
fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
if !takeAction {
break
}
if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
}
}
return nil
}
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
for _, vid := range underReplicatedVolumeIds {
replicas := volumeReplicas[vid]
replica := replicas[rand.Intn(len(replicas))]
replica := pickOneReplicaToCopyFrom(replicas)
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false
for _, dst := range allLocations {
@ -191,20 +216,13 @@ func keepDataNodesSorted(dataNodes []location) {
*/
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
existingDataNodes := make(map[string]int)
for _, replica := range replicas {
existingDataNodes[replica.location.String()] += 1
}
sameDataNodeCount := existingDataNodes[possibleLocation.String()]
existingDataCenters, _, existingDataNodes := countReplicas(replicas)
if _, found := existingDataNodes[possibleLocation.String()]; found {
// avoid duplicated volume on the same data node
if sameDataNodeCount > 0 {
return false
}
existingDataCenters := make(map[string]int)
for _, replica := range replicas {
existingDataCenters[replica.location.DataCenter()] += 1
}
primaryDataCenters, _ := findTopKeys(existingDataCenters)
// ensure data center count is within limit
@ -225,20 +243,20 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, rep
}
// now this is one of the primary dcs
existingRacks := make(map[string]int)
primaryDcRacks := make(map[string]int)
for _, replica := range replicas {
if replica.location.DataCenter() != possibleLocation.DataCenter() {
continue
}
existingRacks[replica.location.Rack()] += 1
primaryDcRacks[replica.location.Rack()] += 1
}
primaryRacks, _ := findTopKeys(existingRacks)
sameRackCount := existingRacks[possibleLocation.Rack()]
primaryRacks, _ := findTopKeys(primaryDcRacks)
sameRackCount := primaryDcRacks[possibleLocation.Rack()]
// ensure rack count is within limit
if _, found := existingRacks[possibleLocation.Rack()]; !found {
if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
// different from existing racks
if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
// lack on different racks
return true
} else {
@ -317,3 +335,43 @@ func (l location) Rack() string {
func (l location) DataCenter() string {
return l.dc
}
func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
mostRecent := replicas[0]
for _, replica := range replicas {
if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
mostRecent = replica
}
}
return mostRecent
}
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
diffDc = make(map[string]int)
diffRack = make(map[string]int)
diffNode = make(map[string]int)
for _, replica := range replicas {
diffDc[replica.location.DataCenter()] += 1
diffRack[replica.location.Rack()] += 1
diffNode[replica.location.String()] += 1
}
return
}
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
allSame := true
oldest := replicas[0]
for _, replica := range replicas {
if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond {
oldest = replica
allSame = false
}
}
if !allSame {
return oldest
}
// TODO what if all the replicas have the same timestamp?
return oldest
}