seaweedfs/weed/topology/node.go

280 lines
7.9 KiB
Go
Raw Normal View History

2012-08-24 11:56:09 +08:00
package topology
2012-08-24 13:56:14 +08:00
import (
"errors"
"math/rand"
"strings"
"sync"
"sync/atomic"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
2012-08-24 13:56:14 +08:00
)
2012-08-24 11:56:09 +08:00
type NodeId string
type Node interface {
Id() NodeId
String() string
2020-12-18 05:25:05 +08:00
AvailableSpaceFor(option *VolumeGrowOption) int64
ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
2021-02-16 18:47:02 +08:00
UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages)
2019-04-19 12:43:36 +08:00
UpAdjustMaxVolumeId(vid needle.VolumeId)
2021-02-16 18:47:02 +08:00
GetDiskUsages() *DiskUsages
2012-10-10 11:53:31 +08:00
2019-04-19 12:43:36 +08:00
GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
2021-05-06 18:46:14 +08:00
CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64)
2012-09-03 16:50:04 +08:00
2012-09-09 07:25:44 +08:00
IsDataNode() bool
IsRack() bool
IsDataCenter() bool
2016-05-20 14:57:31 +08:00
Children() []Node
Parent() Node
2012-10-10 11:53:31 +08:00
GetValue() interface{} //get reference to the topology,dc,rack,datanode
2012-08-24 11:56:09 +08:00
}
type NodeImpl struct {
2021-02-16 18:47:02 +08:00
diskUsages *DiskUsages
id NodeId
parent Node
2021-02-18 12:57:08 +08:00
sync.RWMutex // lock children
2021-02-16 18:47:02 +08:00
children map[NodeId]Node
maxVolumeId needle.VolumeId
//for rack, data center, topology
nodeType string
2012-10-10 11:53:31 +08:00
value interface{}
2012-09-01 17:20:59 +08:00
}
2012-08-31 16:35:11 +08:00
2021-02-16 18:47:02 +08:00
func (n *NodeImpl) GetDiskUsages() *DiskUsages {
return n.diskUsages
}
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
2020-12-18 05:25:05 +08:00
func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
var totalWeights int64
2015-03-10 15:20:31 +08:00
var errs []string
2016-05-20 14:57:31 +08:00
n.RLock()
candidates := make([]Node, 0, len(n.children))
candidatesWeights := make([]int64, 0, len(n.children))
//pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
for _, node := range n.children {
2020-12-18 05:25:05 +08:00
if node.AvailableSpaceFor(option) <= 0 {
continue
}
2020-12-18 05:25:05 +08:00
totalWeights += node.AvailableSpaceFor(option)
candidates = append(candidates, node)
2020-12-18 05:25:05 +08:00
candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
}
2016-05-20 14:57:31 +08:00
n.RUnlock()
if len(candidates) < numberOfNodes {
2020-03-23 09:32:49 +08:00
glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
return nil, nil, errors.New("Not enough data nodes found!")
}
//pick nodes randomly by weights, the node picked earlier has higher final weights
sortedCandidates := make([]Node, 0, len(candidates))
2020-03-07 22:12:57 +08:00
for i := 0; i < len(candidates); i++ {
weightsInterval := rand.Int63n(totalWeights)
lastWeights := int64(0)
for k, weights := range candidatesWeights {
2020-03-07 22:12:57 +08:00
if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
sortedCandidates = append(sortedCandidates, candidates[k])
candidatesWeights[k] = 0
totalWeights -= weights
break
}
lastWeights += weights
}
}
restNodes = make([]Node, 0, numberOfNodes-1)
ret := false
n.RLock()
for k, node := range sortedCandidates {
if err := filterFirstNodeFn(node); err == nil {
firstNode = node
if k >= numberOfNodes-1 {
restNodes = sortedCandidates[:numberOfNodes-1]
} else {
restNodes = append(restNodes, sortedCandidates[:k]...)
restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
}
ret = true
break
} else {
errs = append(errs, string(node.Id())+":"+err.Error())
}
}
n.RUnlock()
if !ret {
return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
}
return
}
2012-09-09 07:25:44 +08:00
func (n *NodeImpl) IsDataNode() bool {
return n.nodeType == "DataNode"
}
func (n *NodeImpl) IsRack() bool {
return n.nodeType == "Rack"
}
func (n *NodeImpl) IsDataCenter() bool {
return n.nodeType == "DataCenter"
}
func (n *NodeImpl) String() string {
if n.parent != nil {
return n.parent.String() + ":" + string(n.id)
}
return string(n.id)
}
func (n *NodeImpl) Id() NodeId {
return n.id
}
2021-02-16 18:47:02 +08:00
func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
return n.diskUsages.getOrCreateDisk(diskType)
}
2021-02-16 18:47:02 +08:00
func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
t := n.getOrCreateDisk(option.DiskType)
freeVolumeSlotCount := atomic.LoadInt64(&t.maxVolumeCount) + atomic.LoadInt64(&t.remoteVolumeCount) - atomic.LoadInt64(&t.volumeCount)
ecShardCount := atomic.LoadInt64(&t.ecShardCount)
if ecShardCount > 0 {
freeVolumeSlotCount = freeVolumeSlotCount - ecShardCount/erasure_coding.DataShardsCount - 1
}
return freeVolumeSlotCount
}
func (n *NodeImpl) SetParent(node Node) {
n.parent = node
}
2016-05-20 14:57:31 +08:00
func (n *NodeImpl) Children() (ret []Node) {
n.RLock()
defer n.RUnlock()
for _, c := range n.children {
ret = append(ret, c)
}
return ret
2012-09-03 16:50:04 +08:00
}
func (n *NodeImpl) Parent() Node {
return n.parent
2012-09-03 16:50:04 +08:00
}
2012-10-10 11:53:31 +08:00
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
2020-12-18 05:25:05 +08:00
func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
2016-05-20 14:57:31 +08:00
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
2020-12-18 05:25:05 +08:00
freeSpace := node.AvailableSpaceFor(option)
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
}
2012-09-01 17:20:59 +08:00
if r >= freeSpace {
r -= freeSpace
} else {
2020-12-18 05:25:05 +08:00
if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
return node.(*DataNode), nil
2012-09-03 16:50:04 +08:00
}
2020-12-18 05:25:05 +08:00
assignedNode, err = node.ReserveOneVolume(r, option)
2018-01-17 19:53:41 +08:00
if err == nil {
return
}
}
}
2017-08-11 01:26:19 +08:00
return nil, errors.New("No free volume slot found!")
}
2021-02-16 18:47:02 +08:00
func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative
for diskType, diskUsage := range deltaDiskUsages.usages {
existingDisk := n.getOrCreateDisk(diskType)
existingDisk.addDiskUsageCounts(diskUsage)
2020-03-23 09:32:49 +08:00
}
if n.parent != nil {
2021-02-16 18:47:02 +08:00
n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
}
2019-04-19 12:43:36 +08:00
func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
if n.maxVolumeId < vid {
n.maxVolumeId = vid
if n.parent != nil {
n.parent.UpAdjustMaxVolumeId(vid)
}
}
2012-09-01 17:20:59 +08:00
}
2019-04-19 12:43:36 +08:00
func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
2012-08-31 16:35:11 +08:00
return n.maxVolumeId
2012-08-29 16:37:40 +08:00
}
func (n *NodeImpl) LinkChildNode(node Node) {
2016-05-20 14:57:31 +08:00
n.Lock()
defer n.Unlock()
2021-02-17 02:48:16 +08:00
n.doLinkChildNode(node)
}
func (n *NodeImpl) doLinkChildNode(node Node) {
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
2021-02-16 18:47:02 +08:00
n.UpAdjustDiskUsageDelta(node.GetDiskUsages())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id())
}
}
func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
2016-05-20 14:57:31 +08:00
n.Lock()
defer n.Unlock()
2012-09-01 17:20:59 +08:00
node := n.children[nodeId]
if node != nil {
node.SetParent(nil)
delete(n.children, node.Id())
2021-02-16 18:47:02 +08:00
n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative())
glog.V(0).Infoln(n, "removes", node.Id())
}
}
2021-05-06 18:46:14 +08:00
func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64) {
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
dn.RLock()
2016-05-20 14:32:56 +08:00
for _, v := range dn.GetVolumes() {
2021-05-06 18:46:14 +08:00
if v.Size >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- v
2021-07-01 16:21:14 +08:00
} else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
n.GetTopology().chanCrowdedVolumes <- v
}
copyCount := v.ReplicaPlacement.GetCopyCount()
if copyCount > 1 {
if copyCount > len(n.GetTopology().Lookup(v.Collection, v.Id)) {
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
} else {
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
}
}
}
dn.RUnlock()
}
} else {
for _, c := range n.Children() {
2021-05-06 18:46:14 +08:00
c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit, growThreshold)
}
}
2012-09-19 05:05:12 +08:00
}
2012-10-10 11:53:31 +08:00
func (n *NodeImpl) GetTopology() *Topology {
var p Node
p = n
for p.Parent() != nil {
p = p.Parent()
}
return p.GetValue().(*Topology)
}