mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-18 14:41:31 +08:00
lock node.children
This commit is contained in:
parent
57c85adc53
commit
4734efa9a1
@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/go/glog"
|
||||
"github.com/chrislusf/seaweedfs/go/storage"
|
||||
@ -32,7 +33,7 @@ type Node interface {
|
||||
IsDataNode() bool
|
||||
IsRack() bool
|
||||
IsDataCenter() bool
|
||||
Children() map[NodeId]Node
|
||||
Children() []Node
|
||||
Parent() Node
|
||||
|
||||
GetValue() interface{} //get reference to the topology,dc,rack,datanode
|
||||
@ -43,6 +44,7 @@ type NodeImpl struct {
|
||||
activeVolumeCount int
|
||||
maxVolumeCount int
|
||||
parent Node
|
||||
sync.RWMutex // lock children
|
||||
children map[NodeId]Node
|
||||
maxVolumeId storage.VolumeId
|
||||
|
||||
@ -55,6 +57,7 @@ type NodeImpl struct {
|
||||
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
|
||||
candidates := make([]Node, 0, len(n.children))
|
||||
var errs []string
|
||||
n.RLock()
|
||||
for _, node := range n.children {
|
||||
if err := filterFirstNodeFn(node); err == nil {
|
||||
candidates = append(candidates, node)
|
||||
@ -62,6 +65,7 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d
|
||||
errs = append(errs, string(node.Id())+":"+err.Error())
|
||||
}
|
||||
}
|
||||
n.RUnlock()
|
||||
if len(candidates) == 0 {
|
||||
return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
|
||||
}
|
||||
@ -70,6 +74,7 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d
|
||||
|
||||
restNodes = make([]Node, numberOfNodes-1)
|
||||
candidates = candidates[:0]
|
||||
n.RLock()
|
||||
for _, node := range n.children {
|
||||
if node.Id() == firstNode.Id() {
|
||||
continue
|
||||
@ -80,6 +85,7 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d
|
||||
glog.V(2).Infoln("select rest node candidate:", node.Id())
|
||||
candidates = append(candidates, node)
|
||||
}
|
||||
n.RUnlock()
|
||||
glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
|
||||
ret := len(restNodes) == 0
|
||||
for k, node := range candidates {
|
||||
@ -126,8 +132,13 @@ func (n *NodeImpl) FreeSpace() int {
|
||||
func (n *NodeImpl) SetParent(node Node) {
|
||||
n.parent = node
|
||||
}
|
||||
func (n *NodeImpl) Children() map[NodeId]Node {
|
||||
return n.children
|
||||
func (n *NodeImpl) Children() (ret []Node) {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
for _, c := range n.children {
|
||||
ret = append(ret, c)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
func (n *NodeImpl) Parent() Node {
|
||||
return n.parent
|
||||
@ -136,6 +147,8 @@ func (n *NodeImpl) GetValue() interface{} {
|
||||
return n.value
|
||||
}
|
||||
func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
for _, node := range n.children {
|
||||
freeSpace := node.FreeSpace()
|
||||
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
|
||||
@ -198,6 +211,8 @@ func (n *NodeImpl) GetMaxVolumeCount() int {
|
||||
}
|
||||
|
||||
func (n *NodeImpl) LinkChildNode(node Node) {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
if n.children[node.Id()] == nil {
|
||||
n.children[node.Id()] = node
|
||||
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
|
||||
@ -210,6 +225,8 @@ func (n *NodeImpl) LinkChildNode(node Node) {
|
||||
}
|
||||
|
||||
func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
node := n.children[nodeId]
|
||||
if node != nil {
|
||||
node.SetParent(nil)
|
||||
|
Loading…
Reference in New Issue
Block a user