mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-21 16:43:23 +08:00
bc888226fc
If an EC shard is created but not spread to other servers, the masterclient would think this shard is not located here.
143 lines
3.7 KiB
Go
143 lines
3.7 KiB
Go
package topology
|
|
|
|
import (
|
|
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
|
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
|
)
|
|
|
|
func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
|
|
dn.RLock()
|
|
for _, c := range dn.children {
|
|
disk := c.(*Disk)
|
|
ret = append(ret, disk.GetEcShards()...)
|
|
}
|
|
dn.RUnlock()
|
|
return ret
|
|
}
|
|
|
|
func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
|
|
// prepare the new ec shard map
|
|
actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
|
|
for _, ecShards := range actualShards {
|
|
actualEcShardMap[ecShards.VolumeId] = ecShards
|
|
}
|
|
|
|
existingEcShards := dn.GetEcShards()
|
|
|
|
// find out the newShards and deletedShards
|
|
var newShardCount, deletedShardCount int
|
|
for _, ecShards := range existingEcShards {
|
|
|
|
disk := dn.getOrCreateDisk(ecShards.DiskType)
|
|
deltaDiskUsages := newDiskUsages()
|
|
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
|
|
|
|
vid := ecShards.VolumeId
|
|
if actualEcShards, ok := actualEcShardMap[vid]; !ok {
|
|
// dn registered ec shards not found in the new set of ec shards
|
|
deletedShards = append(deletedShards, ecShards)
|
|
deletedShardCount += ecShards.ShardIdCount()
|
|
} else {
|
|
// found, but maybe the actual shard could be missing
|
|
a := actualEcShards.Minus(ecShards)
|
|
if a.ShardIdCount() > 0 {
|
|
newShards = append(newShards, a)
|
|
newShardCount += a.ShardIdCount()
|
|
}
|
|
d := ecShards.Minus(actualEcShards)
|
|
if d.ShardIdCount() > 0 {
|
|
deletedShards = append(deletedShards, d)
|
|
deletedShardCount += d.ShardIdCount()
|
|
}
|
|
}
|
|
|
|
deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount)
|
|
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
|
|
|
|
}
|
|
|
|
for _, ecShards := range actualShards {
|
|
if dn.HasEcShards(ecShards.VolumeId) {
|
|
continue
|
|
}
|
|
|
|
newShards = append(newShards, ecShards)
|
|
|
|
disk := dn.getOrCreateDisk(ecShards.DiskType)
|
|
deltaDiskUsages := newDiskUsages()
|
|
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
|
|
deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount())
|
|
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
|
|
}
|
|
|
|
if len(newShards) > 0 || len(deletedShards) > 0 {
|
|
// if changed, set to the new ec shard map
|
|
dn.doUpdateEcShards(actualShards)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (dn *DataNode) HasEcShards(volumeId needle.VolumeId) (found bool) {
|
|
dn.RLock()
|
|
defer dn.RUnlock()
|
|
for _, c := range dn.children {
|
|
disk := c.(*Disk)
|
|
_, found = disk.ecShards[volumeId]
|
|
if found {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) {
|
|
dn.Lock()
|
|
for _, c := range dn.children {
|
|
disk := c.(*Disk)
|
|
disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
|
|
}
|
|
for _, shard := range actualShards {
|
|
disk := dn.getOrCreateDisk(shard.DiskType)
|
|
disk.ecShards[shard.VolumeId] = shard
|
|
}
|
|
dn.Unlock()
|
|
}
|
|
|
|
func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
|
|
|
|
for _, newShard := range newShards {
|
|
dn.AddOrUpdateEcShard(newShard)
|
|
}
|
|
|
|
for _, deletedShard := range deletedShards {
|
|
dn.DeleteEcShard(deletedShard)
|
|
}
|
|
|
|
}
|
|
|
|
func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
|
|
disk := dn.getOrCreateDisk(s.DiskType)
|
|
disk.AddOrUpdateEcShard(s)
|
|
}
|
|
|
|
func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
|
|
disk := dn.getOrCreateDisk(s.DiskType)
|
|
disk.DeleteEcShard(s)
|
|
}
|
|
|
|
func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) {
|
|
|
|
dn.RLock()
|
|
defer dn.RUnlock()
|
|
for _, c := range dn.children {
|
|
disk := c.(*Disk)
|
|
if disk.HasVolumesById(volumeId) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
|
|
}
|