From 9fbc4ea417d7af358270a53fdd4f7f37a37d82d0 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Tue, 17 Dec 2024 18:39:51 +0100 Subject: [PATCH] Rework `shell.EcBalance()`'s waitgroup code into a standalone type. (#6373) Rework `shell.EcBalance()`'s waitgroup with errors code into a standalone type. We'll re-use this for other EC jobs - for example, volume creation. Also fixes potential concurrency issues when collecting error results. --- weed/shell/command_ec_common.go | 115 +++++++++++++++++--------------- 1 file changed, 61 insertions(+), 54 deletions(-) diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index c73e342db..a09e2ad62 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -113,6 +113,50 @@ var ( getDefaultReplicaPlacement = _getDefaultReplicaPlacement ) +type ErrorWaitGroup struct { + parallelize bool + wg *sync.WaitGroup + errors []error + errorsMu sync.Mutex +} +type ErrorWaitGroupTask func() error + +func (ewg *ErrorWaitGroup) Init() { + if ewg.wg != nil { + return + } + ewg.wg = &sync.WaitGroup{} + ewg.errors = nil +} + +func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) { + if ewg.wg == nil || !ewg.parallelize { + ewg.errors = append(ewg.errors, f()) + return + } + + ewg.wg.Add(1) + go func() { + err := f() + ewg.errorsMu.Lock() + ewg.errors = append(ewg.errors, err) + ewg.errorsMu.Unlock() + ewg.wg.Done() + }() +} + +func (ewg *ErrorWaitGroup) Wait() error { + if ewg.wg != nil { + ewg.wg.Wait() + } + + err := errors.Join(ewg.errors...) + ewg.wg = nil + ewg.errors = nil + + return err +} + func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) { var resp *master_pb.GetMasterConfigurationResponse var err error @@ -557,48 +601,8 @@ type ecBalancer struct { ecNodes []*EcNode replicaPlacement *super_block.ReplicaPlacement applyBalancing bool - parallelize bool - wg *sync.WaitGroup - wgErrors []error -} - -type ecBalancerTask func() error - -func (ecb *ecBalancer) wgInit() { - if ecb.wg != nil { - return - } - ecb.wg = &sync.WaitGroup{} - ecb.wgErrors = nil -} - -func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { - if ecb.wg == nil || !ecb.parallelize { - if err := f(); err != nil { - ecb.wgErrors = append(ecb.wgErrors, err) - } - return - } - - ecb.wg.Add(1) - go func() { - if err := f(); err != nil { - ecb.wgErrors = append(ecb.wgErrors, err) - } - ecb.wg.Done() - }() -} - -func (ecb *ecBalancer) wgWait() error { - if ecb.wg != nil { - ecb.wg.Wait() - } - err := errors.Join(ecb.wgErrors...) 
- ecb.wg = nil - ecb.wgErrors = nil - - return err + ewg ErrorWaitGroup } func (ecb *ecBalancer) racks() map[RackId]*EcRack { @@ -637,13 +641,13 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error { func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error { vidLocations := ecb.collectVolumeIdToEcNodes(collection) - ecb.wgInit() + ecb.ewg.Init() for vid, locations := range vidLocations { - ecb.wgAdd(func() error { + ecb.ewg.Add(func() error { return ecb.doDeduplicateEcShards(collection, vid, locations) }) } - return ecb.wgWait() + return ecb.ewg.Wait() } func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error { @@ -684,13 +688,13 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error { vidLocations := ecb.collectVolumeIdToEcNodes(collection) // spread the ec shards evenly - ecb.wgInit() + ecb.ewg.Init() for vid, locations := range vidLocations { - ecb.wgAdd(func() error { + ecb.ewg.Add(func() error { return ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations) }) } - return ecb.wgWait() + return ecb.ewg.Wait() } func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int { @@ -792,7 +796,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { racks := ecb.racks() // spread the ec shards evenly - ecb.wgInit() + ecb.ewg.Init() for vid, locations := range vidLocations { // see the volume's shards are in how many racks, and how many in each rack @@ -811,12 +815,12 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { } sourceEcNodes := rackEcNodesWithVid[rackId] averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes)) - ecb.wgAdd(func() error { + ecb.ewg.Add(func() error { return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes) }) } } - return ecb.wgWait() + return ecb.ewg.Wait() } func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error { @@ -847,13 +851,13 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int func (ecb *ecBalancer) balanceEcRacks() error { // balance one rack for all ec shards - ecb.wgInit() + ecb.ewg.Init() for _, ecRack := range ecb.racks() { - ecb.wgAdd(func() error { + ecb.ewg.Add(func() error { return ecb.doBalanceEcRack(ecRack) }) } - return ecb.wgWait() + return ecb.ewg.Wait() } func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { @@ -1067,7 +1071,10 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic ecNodes: allEcNodes, replicaPlacement: ecReplicaPlacement, applyBalancing: applyBalancing, - parallelize: parallelize, + + ewg: ErrorWaitGroup{ + parallelize: parallelize, + }, } for _, c := range collections {
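
For illustration, a minimal, self-contained sketch of how the new `ErrorWaitGroup` is meant to be driven, mirroring the `deleteDuplicatedEcShards` / `balanceEcRacks` call sites above: `Init()` once per batch, `Add()` one task per work item, `Wait()` for the joined error. The type is an abridged copy of the one added by this patch; the `processVolume` task and the volume IDs are hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Abridged copy of the ErrorWaitGroup type added by this patch: it runs tasks
// either inline (serial mode) or in goroutines, and collects every task's
// error result under a mutex.
type ErrorWaitGroup struct {
	parallelize bool
	wg          *sync.WaitGroup
	errors      []error
	errorsMu    sync.Mutex
}

type ErrorWaitGroupTask func() error

func (ewg *ErrorWaitGroup) Init() {
	if ewg.wg != nil {
		return
	}
	ewg.wg = &sync.WaitGroup{}
	ewg.errors = nil
}

func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
	if ewg.wg == nil || !ewg.parallelize {
		// Serial mode: run the task inline and record its result.
		ewg.errors = append(ewg.errors, f())
		return
	}
	ewg.wg.Add(1)
	go func() {
		err := f()
		ewg.errorsMu.Lock()
		ewg.errors = append(ewg.errors, err)
		ewg.errorsMu.Unlock()
		ewg.wg.Done()
	}()
}

func (ewg *ErrorWaitGroup) Wait() error {
	if ewg.wg != nil {
		ewg.wg.Wait()
	}
	err := errors.Join(ewg.errors...)
	ewg.wg = nil
	ewg.errors = nil
	return err
}

// processVolume is a hypothetical per-volume task standing in for the real EC
// jobs (shard deduplication, rack balancing, ...).
func processVolume(vid int) error {
	if vid%3 == 0 {
		return fmt.Errorf("volume %d: shard move failed", vid)
	}
	return nil
}

func main() {
	ewg := ErrorWaitGroup{parallelize: true}

	// Same shape as the balancer call sites: Init once, Add one task per
	// item, then Wait for the joined error.
	ewg.Init()
	for _, vid := range []int{1, 2, 3, 4, 5, 6} {
		vid := vid // per-iteration copy; a no-op on Go 1.22+
		ewg.Add(func() error { return processVolume(vid) })
	}
	if err := ewg.Wait(); err != nil {
		// errors.Join drops the nil results, so only failures are reported.
		fmt.Println(err)
	}
}
```

Note that `Add()` records `nil` results too and lets `errors.Join` filter out the successes, whereas the old `wgAdd` skipped `nil` errors before appending.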
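
The "potential concurrency issues" mentioned in the commit message come from the old `wgAdd`, which appended to `ecb.wgErrors` from multiple goroutines without a lock; the new `errorsMu` mutex closes that gap. A stripped-down reproduction of the unguarded pattern (hypothetical code, not taken from SeaweedFS) that the race detector flags:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	var errs []error // shared slice with no mutex, like the old wgErrors handling

	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Unsynchronized concurrent append to a shared slice: the race
			// detector (`go run -race`) reports a data race here, and
			// interleaved appends can silently drop results.
			errs = append(errs, fmt.Errorf("task %d failed", n))
		}(i)
	}
	wg.Wait()

	// May print fewer than 16 entries when appends interleave.
	fmt.Println("collected", len(errs), "errors")
}
```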