mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-11-24 02:59:13 +08:00
Unify usage of shell.EcNode.dc as DataCenterId. (#6258)
This commit is contained in:
parent
2caa0e3741
commit
0d5393641e
@ -128,12 +128,11 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Make dc a DataCenterId instead of string.
|
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
|
||||||
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
|
|
||||||
for _, dc := range topo.DataCenterInfos {
|
for _, dc := range topo.DataCenterInfos {
|
||||||
for _, rack := range dc.RackInfos {
|
for _, rack := range dc.RackInfos {
|
||||||
for _, dn := range rack.DataNodeInfos {
|
for _, dn := range rack.DataNodeInfos {
|
||||||
fn(dc.Id, RackId(rack.Id), dn)
|
fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -223,15 +222,15 @@ func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes
|
|||||||
}
|
}
|
||||||
|
|
||||||
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
|
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
|
||||||
eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
if selectedDataCenter != "" && selectedDataCenter != dc {
|
if selectedDataCenter != "" && selectedDataCenter != string(dc) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
|
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
|
||||||
ecNodes = append(ecNodes, &EcNode{
|
ecNodes = append(ecNodes, &EcNode{
|
||||||
info: dn,
|
info: dn,
|
||||||
dc: DataCenterId(dc),
|
dc: dc,
|
||||||
rack: rack,
|
rack: rack,
|
||||||
freeEcSlot: int(freeEcSlots),
|
freeEcSlot: int(freeEcSlots),
|
||||||
})
|
})
|
||||||
|
@ -4,11 +4,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||||
@ -262,7 +263,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura
|
|||||||
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
|
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
|
||||||
|
|
||||||
vidMap := make(map[uint32]bool)
|
vidMap := make(map[uint32]bool)
|
||||||
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
|
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
|
||||||
for _, v := range diskInfo.EcShardInfos {
|
for _, v := range diskInfo.EcShardInfos {
|
||||||
if v.Collection == selectedCollection {
|
if v.Collection == selectedCollection {
|
||||||
@ -282,7 +283,7 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri
|
|||||||
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
|
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
|
||||||
|
|
||||||
nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
|
nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
|
||||||
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
|
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
|
||||||
for _, v := range diskInfo.EcShardInfos {
|
for _, v := range diskInfo.EcShardInfos {
|
||||||
if v.Id == uint32(vid) {
|
if v.Id == uint32(vid) {
|
||||||
|
@ -85,7 +85,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
|||||||
|
|
||||||
if !*forceChanges {
|
if !*forceChanges {
|
||||||
var nodeCount int
|
var nodeCount int
|
||||||
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
nodeCount++
|
nodeCount++
|
||||||
})
|
})
|
||||||
if nodeCount < erasure_coding.ParityShardsCount {
|
if nodeCount < erasure_coding.ParityShardsCount {
|
||||||
@ -309,7 +309,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
|
|||||||
fmt.Printf("collect volumes quiet for: %d seconds and %.1f%% full\n", quietSeconds, fullPercentage)
|
fmt.Printf("collect volumes quiet for: %d seconds and %.1f%% full\n", quietSeconds, fullPercentage)
|
||||||
|
|
||||||
vidMap := make(map[uint32]bool)
|
vidMap := make(map[uint32]bool)
|
||||||
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
for _, diskInfo := range dn.DiskInfos {
|
for _, diskInfo := range dn.DiskInfos {
|
||||||
for _, v := range diskInfo.VolumeInfos {
|
for _, v := range diskInfo.VolumeInfos {
|
||||||
// ignore remote volumes
|
// ignore remote volumes
|
||||||
|
@ -5,6 +5,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
@ -15,11 +21,6 @@ import (
|
|||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
"io"
|
|
||||||
"math"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -114,7 +115,7 @@ func (c *commandFsVerify) collectVolumeIds() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
eachDataNode(topologyInfo, func(dc string, rack RackId, nodeInfo *master_pb.DataNodeInfo) {
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, nodeInfo *master_pb.DataNodeInfo) {
|
||||||
for _, diskInfo := range nodeInfo.DiskInfos {
|
for _, diskInfo := range nodeInfo.DiskInfos {
|
||||||
for _, vi := range diskInfo.VolumeInfos {
|
for _, vi := range diskInfo.VolumeInfos {
|
||||||
volumeServer := pb.NewServerAddressFromDataNode(nodeInfo)
|
volumeServer := pb.NewServerAddressFromDataNode(nodeInfo)
|
||||||
|
@ -2,9 +2,10 @@ package shell
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||||
@ -275,7 +276,7 @@ func TestVolumeSelection(t *testing.T) {
|
|||||||
func TestDeleteEmptySelection(t *testing.T) {
|
func TestDeleteEmptySelection(t *testing.T) {
|
||||||
topologyInfo := parseOutput(topoData)
|
topologyInfo := parseOutput(topoData)
|
||||||
|
|
||||||
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
for _, diskInfo := range dn.DiskInfos {
|
for _, diskInfo := range dn.DiskInfos {
|
||||||
for _, v := range diskInfo.VolumeInfos {
|
for _, v := range diskInfo.VolumeInfos {
|
||||||
if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 {
|
if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 {
|
||||||
|
@ -5,10 +5,11 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
||||||
"io"
|
"io"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||||
@ -72,7 +73,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
|||||||
volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern)
|
volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern)
|
||||||
|
|
||||||
// find all data nodes with volumes that needs replication change
|
// find all data nodes with volumes that needs replication change
|
||||||
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
var targetVolumeIds []uint32
|
var targetVolumeIds []uint32
|
||||||
for _, diskInfo := range dn.DiskInfos {
|
for _, diskInfo := range dn.DiskInfos {
|
||||||
for _, v := range diskInfo.VolumeInfos {
|
for _, v := range diskInfo.VolumeInfos {
|
||||||
|
@ -2,13 +2,14 @@ package shell
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -59,7 +60,7 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri
|
|||||||
quietSeconds := int64(*quietPeriod / time.Second)
|
quietSeconds := int64(*quietPeriod / time.Second)
|
||||||
nowUnixSeconds := time.Now().Unix()
|
nowUnixSeconds := time.Now().Unix()
|
||||||
|
|
||||||
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
for _, diskInfo := range dn.DiskInfos {
|
for _, diskInfo := range dn.DiskInfos {
|
||||||
for _, v := range diskInfo.VolumeInfos {
|
for _, v := range diskInfo.VolumeInfos {
|
||||||
if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
|
if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
|
||||||
|
@ -179,8 +179,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
|
|||||||
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
|
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
|
||||||
volumeReplicas := make(map[uint32][]*VolumeReplica)
|
volumeReplicas := make(map[uint32][]*VolumeReplica)
|
||||||
var allLocations []location
|
var allLocations []location
|
||||||
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
loc := newLocation(dc, string(rack), dn)
|
loc := newLocation(string(dc), string(rack), dn)
|
||||||
for _, diskInfo := range dn.DiskInfos {
|
for _, diskInfo := range dn.DiskInfos {
|
||||||
for _, v := range diskInfo.VolumeInfos {
|
for _, v := range diskInfo.VolumeInfos {
|
||||||
volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
|
volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
|
||||||
|
@ -651,7 +651,7 @@ func (c *commandVolumeFsck) collectVolumeIds() (volumeIdToServer map[string]map[
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, t *master_pb.DataNodeInfo) {
|
||||||
var volumeCount, ecShardCount int
|
var volumeCount, ecShardCount int
|
||||||
dataNodeId := t.GetId()
|
dataNodeId := t.GetId()
|
||||||
for _, diskInfo := range t.DiskInfos {
|
for _, diskInfo := range t.DiskInfos {
|
||||||
|
@ -90,7 +90,7 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr
|
|||||||
func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
|
func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
|
||||||
|
|
||||||
vidMap := make(map[uint32]bool)
|
vidMap := make(map[uint32]bool)
|
||||||
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
for _, diskInfo := range dn.DiskInfos {
|
for _, diskInfo := range dn.DiskInfos {
|
||||||
for _, v := range diskInfo.VolumeInfos {
|
for _, v := range diskInfo.VolumeInfos {
|
||||||
if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" {
|
if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" {
|
||||||
|
@ -292,7 +292,7 @@ func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeS
|
|||||||
fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds)
|
fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds)
|
||||||
|
|
||||||
vidMap := make(map[uint32]bool)
|
vidMap := make(map[uint32]bool)
|
||||||
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||||
for _, diskInfo := range dn.DiskInfos {
|
for _, diskInfo := range dn.DiskInfos {
|
||||||
for _, v := range diskInfo.VolumeInfos {
|
for _, v := range diskInfo.VolumeInfos {
|
||||||
// check collection name pattern
|
// check collection name pattern
|
||||||
|
Loading…
Reference in New Issue
Block a user