mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-12-02 00:19:15 +08:00
7413d59750
This fixes the calculation of the amount of EC shards a node holds. Previously a global counter was increased, but also used inside the loop to apply disk usage deltas. This led to wrong absolute numbers. The fix is to apply only deltas of single EC shards per iteration.
143 lines
3.7 KiB
Go
143 lines
3.7 KiB
Go
package topology
|
|
|
|
import (
|
|
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
|
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
|
)
|
|
|
|
func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
|
|
dn.RLock()
|
|
for _, c := range dn.children {
|
|
disk := c.(*Disk)
|
|
ret = append(ret, disk.GetEcShards()...)
|
|
}
|
|
dn.RUnlock()
|
|
return ret
|
|
}
|
|
|
|
func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
|
|
// prepare the new ec shard map
|
|
actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
|
|
for _, ecShards := range actualShards {
|
|
actualEcShardMap[ecShards.VolumeId] = ecShards
|
|
}
|
|
|
|
existingEcShards := dn.GetEcShards()
|
|
|
|
// find out the newShards and deletedShards
|
|
var newShardCount, deletedShardCount int
|
|
for _, ecShards := range existingEcShards {
|
|
|
|
disk := dn.getOrCreateDisk(ecShards.DiskType)
|
|
deltaDiskUsages := newDiskUsages()
|
|
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
|
|
|
|
vid := ecShards.VolumeId
|
|
if actualEcShards, ok := actualEcShardMap[vid]; !ok {
|
|
// dn registered ec shards not found in the new set of ec shards
|
|
deletedShards = append(deletedShards, ecShards)
|
|
deletedShardCount += ecShards.ShardIdCount()
|
|
} else {
|
|
// found, but maybe the actual shard could be missing
|
|
a := actualEcShards.Minus(ecShards)
|
|
if a.ShardIdCount() > 0 {
|
|
newShards = append(newShards, a)
|
|
newShardCount += a.ShardIdCount()
|
|
}
|
|
d := ecShards.Minus(actualEcShards)
|
|
if d.ShardIdCount() > 0 {
|
|
deletedShards = append(deletedShards, d)
|
|
deletedShardCount += d.ShardIdCount()
|
|
}
|
|
}
|
|
|
|
deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount)
|
|
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
|
|
|
|
}
|
|
|
|
for _, ecShards := range actualShards {
|
|
if dn.hasEcShards(ecShards.VolumeId) {
|
|
continue
|
|
}
|
|
|
|
newShards = append(newShards, ecShards)
|
|
|
|
disk := dn.getOrCreateDisk(ecShards.DiskType)
|
|
deltaDiskUsages := newDiskUsages()
|
|
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
|
|
deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount())
|
|
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
|
|
}
|
|
|
|
if len(newShards) > 0 || len(deletedShards) > 0 {
|
|
// if changed, set to the new ec shard map
|
|
dn.doUpdateEcShards(actualShards)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
|
|
dn.RLock()
|
|
defer dn.RUnlock()
|
|
for _, c := range dn.children {
|
|
disk := c.(*Disk)
|
|
_, found = disk.ecShards[volumeId]
|
|
if found {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) {
|
|
dn.Lock()
|
|
for _, c := range dn.children {
|
|
disk := c.(*Disk)
|
|
disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
|
|
}
|
|
for _, shard := range actualShards {
|
|
disk := dn.getOrCreateDisk(shard.DiskType)
|
|
disk.ecShards[shard.VolumeId] = shard
|
|
}
|
|
dn.Unlock()
|
|
}
|
|
|
|
func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
|
|
|
|
for _, newShard := range newShards {
|
|
dn.AddOrUpdateEcShard(newShard)
|
|
}
|
|
|
|
for _, deletedShard := range deletedShards {
|
|
dn.DeleteEcShard(deletedShard)
|
|
}
|
|
|
|
}
|
|
|
|
func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
|
|
disk := dn.getOrCreateDisk(s.DiskType)
|
|
disk.AddOrUpdateEcShard(s)
|
|
}
|
|
|
|
func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
|
|
disk := dn.getOrCreateDisk(s.DiskType)
|
|
disk.DeleteEcShard(s)
|
|
}
|
|
|
|
func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) {
|
|
|
|
dn.RLock()
|
|
defer dn.RUnlock()
|
|
for _, c := range dn.children {
|
|
disk := c.(*Disk)
|
|
if disk.HasVolumesById(volumeId) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
|
|
}
|