diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index b5ba657e1..c38518664 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -559,24 +559,24 @@ type ecBalancer struct { applyBalancing bool parallelize bool - wg *sync.WaitGroup - // TODO: Maybe accumulate all errors instead of just the last one. - wgError error + wg *sync.WaitGroup + wgErrors []error } type ecBalancerTask func() error func (ecb *ecBalancer) wgInit() { - if ecb.wg == nil { - ecb.wg = &sync.WaitGroup{} - ecb.wgError = nil + if ecb.wg != nil { + return } + ecb.wg = &sync.WaitGroup{} + ecb.wgErrors = nil } func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { if ecb.wg == nil || !ecb.parallelize { if err := f(); err != nil { - ecb.wgError = err + ecb.wgErrors = append(ecb.wgErrors, err) } return } @@ -584,7 +584,7 @@ func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { ecb.wg.Add(1) go func() { if err := f(); err != nil { - ecb.wgError = err + ecb.wgErrors = append(ecb.wgErrors, err) } ecb.wg.Done() }() @@ -594,9 +594,9 @@ func (ecb *ecBalancer) wgWait() error { if ecb.wg != nil { ecb.wg.Wait() } - err := ecb.wgError + err := errors.Join(ecb.wgErrors...) ecb.wg = nil - ecb.wgError = nil + ecb.wgErrors = nil return err } @@ -846,15 +846,15 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int func (ecb *ecBalancer) balanceEcRacks() error { // balance one rack for all ec shards + ecb.wgInit() for _, ecRack := range ecb.racks() { - if err := ecb.doBalanceEcRack(ecRack); err != nil { - return err - } + ecb.wgAdd(func() error { + return ecb.doBalanceEcRack(ecRack) + }) } - return nil + return ecb.wgWait() } -// TODO: enable parallelization func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { if len(ecRack.ecNodes) <= 1 { return nil