diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 9e27f40ce..31782b279 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -17,7 +17,6 @@ func (c *commandEcBalance) Name() string { return "ec.balance" } -// TODO: Update help string and move to command_ec_common.go once shard replica placement logic is enabled. func (c *commandEcBalance) Help() string { return `balance all ec shards among all racks and volume servers @@ -36,6 +35,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") shardReplicaPlacement := balanceCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty") + parallelize := balanceCommand.Bool("parallelize", true, "parallelize operations whenever possible") applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan") if err = balanceCommand.Parse(args); err != nil { return nil @@ -62,5 +62,5 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W return err } - return EcBalance(commandEnv, collections, *dc, rp, *applyBalancing) + return EcBalance(commandEnv, collections, *dc, rp, *parallelize, *applyBalancing) } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 30667896a..b5ba657e1 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand/v2" "sort" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -556,6 +557,48 @@ type ecBalancer struct { ecNodes []*EcNode replicaPlacement *super_block.ReplicaPlacement applyBalancing bool + parallelize bool + + wg *sync.WaitGroup + // TODO: Maybe accumulate all errors instead of just the last one. + wgError error +} + +type ecBalancerTask func() error + +func (ecb *ecBalancer) wgInit() { + if ecb.wg == nil { + ecb.wg = &sync.WaitGroup{} + ecb.wgError = nil + } +} + +func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { + if ecb.wg == nil || !ecb.parallelize { + if err := f(); err != nil { + ecb.wgError = err + } + return + } + + ecb.wg.Add(1) + go func() { + if err := f(); err != nil { + ecb.wgError = err + } + ecb.wg.Done() + }() +} + +func (ecb *ecBalancer) wgWait() error { + if ecb.wg != nil { + ecb.wg.Wait() + } + err := ecb.wgError + ecb.wg = nil + ecb.wgError = nil + + return err } func (ecb *ecBalancer) racks() map[RackId]*EcRack { @@ -592,15 +635,15 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error { } func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error { - // vid => []ecNode vidLocations := ecb.collectVolumeIdToEcNodes(collection) - // deduplicate ec shards + + ecb.wgInit() for vid, locations := range vidLocations { - if err := ecb.doDeduplicateEcShards(collection, vid, locations); err != nil { - return err - } + ecb.wgAdd(func() error { + return ecb.doDeduplicateEcShards(collection, vid, locations) + }) } - return nil + return ecb.wgWait() } func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error { @@ -636,6 +679,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum return nil } +// TODO: enable parallelization func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error { // collect vid => []ecNode, since previous steps can change the locations vidLocations := ecb.collectVolumeIdToEcNodes(collection) @@ -741,6 +785,7 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR return targets[rand.IntN(len(targets))], nil } +// TODO: enable parallelization func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { // collect vid => []ecNode, since previous steps can change the locations vidLocations := ecb.collectVolumeIdToEcNodes(collection) @@ -809,6 +854,7 @@ func (ecb *ecBalancer) balanceEcRacks() error { return nil } +// TODO: enable parallelization func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { if len(ecRack.ecNodes) <= 1 { return nil @@ -1001,7 +1047,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo return vidLocations } -func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) { +func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, parallelize bool, applyBalancing bool) (err error) { if len(collections) == 0 { return fmt.Errorf("no collections to balance") } @@ -1020,6 +1066,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic ecNodes: allEcNodes, replicaPlacement: ecReplicaPlacement, applyBalancing: applyBalancing, + parallelize: parallelize, } for _, c := range collections { diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index f8b881d7c..62bf7fbbf 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -65,8 +65,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr collection := encodeCommand.String("collection", "", "the collection name") fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size") quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period") - // TODO: Add concurrency support to EcBalance and reenable this switch? - //parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel") + parallelize := encodeCommand.Bool("parallelize", true, "parallelize operations whenever possible") forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes") shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty") applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation") @@ -125,7 +124,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr } } // ...then re-balance ec shards. - if err := EcBalance(commandEnv, collections, "", rp, *applyBalancing); err != nil { + if err := EcBalance(commandEnv, collections, "", rp, *parallelize, *applyBalancing); err != nil { return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", collections, err) }