Parallelize EC balancing for racks. (#6351)

This commit is contained in:
Lisandro Pin 2024-12-13 14:33:53 +01:00 committed by GitHub
parent d6f3e1970d
commit b81def5e5c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -559,24 +559,24 @@ type ecBalancer struct {
applyBalancing bool applyBalancing bool
parallelize bool parallelize bool
wg *sync.WaitGroup wg *sync.WaitGroup
// TODO: Maybe accumulate all errors instead of just the last one. wgErrors []error
wgError error
} }
type ecBalancerTask func() error type ecBalancerTask func() error
func (ecb *ecBalancer) wgInit() { func (ecb *ecBalancer) wgInit() {
if ecb.wg == nil { if ecb.wg != nil {
ecb.wg = &sync.WaitGroup{} return
ecb.wgError = nil
} }
ecb.wg = &sync.WaitGroup{}
ecb.wgErrors = nil
} }
func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
if ecb.wg == nil || !ecb.parallelize { if ecb.wg == nil || !ecb.parallelize {
if err := f(); err != nil { if err := f(); err != nil {
ecb.wgError = err ecb.wgErrors = append(ecb.wgErrors, err)
} }
return return
} }
@ -584,7 +584,7 @@ func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
ecb.wg.Add(1) ecb.wg.Add(1)
go func() { go func() {
if err := f(); err != nil { if err := f(); err != nil {
ecb.wgError = err ecb.wgErrors = append(ecb.wgErrors, err)
} }
ecb.wg.Done() ecb.wg.Done()
}() }()
@ -594,9 +594,9 @@ func (ecb *ecBalancer) wgWait() error {
if ecb.wg != nil { if ecb.wg != nil {
ecb.wg.Wait() ecb.wg.Wait()
} }
err := ecb.wgError err := errors.Join(ecb.wgErrors...)
ecb.wg = nil ecb.wg = nil
ecb.wgError = nil ecb.wgErrors = nil
return err return err
} }
@ -846,15 +846,15 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int
func (ecb *ecBalancer) balanceEcRacks() error { func (ecb *ecBalancer) balanceEcRacks() error {
// balance one rack for all ec shards // balance one rack for all ec shards
ecb.wgInit()
for _, ecRack := range ecb.racks() { for _, ecRack := range ecb.racks() {
if err := ecb.doBalanceEcRack(ecRack); err != nil { ecb.wgAdd(func() error {
return err return ecb.doBalanceEcRack(ecRack)
} })
} }
return nil return ecb.wgWait()
} }
// TODO: enable parallelization
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
if len(ecRack.ecNodes) <= 1 { if len(ecRack.ecNodes) <= 1 {
return nil return nil