From eae3f27c8021ef1903419de43c68d2dc62a09953 Mon Sep 17 00:00:00 2001 From: James Hartig Date: Wed, 1 Apr 2020 15:18:40 -0400 Subject: [PATCH] Added treat_replication_as_minimums master toml option --- weed/command/scaffold.go | 8 ++++++++ weed/server/master_server.go | 5 ++++- weed/topology/collection.go | 11 ++++++++--- weed/topology/topology.go | 8 +++++--- weed/topology/topology_test.go | 4 ++-- weed/topology/volume_growth_test.go | 2 +- weed/topology/volume_layout.go | 14 +++++++++++--- 7 files changed, 39 insertions(+), 13 deletions(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 9f119a638..e391c23ea 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -394,5 +394,13 @@ copy_2 = 6 # create 2 x 6 = 12 actual volumes copy_3 = 3 # create 3 x 3 = 9 actual volumes copy_other = 1 # create n x 1 = n actual volumes +# configuration flags for replication +[master.replication] +# any replication counts should be considered minimums. If you specify 010 and +# have 3 different racks, that's still considered writable. Writes will still +# try to replicate to all available volumes. You should only use this option +# if you are doing your own replication or periodic sync of volumes. +treat_replication_as_minimums = false + ` ) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index a9ae6b888..497990f29 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -77,6 +77,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste v.SetDefault("jwt.signing.read.expires_after_seconds", 60) readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") + v.SetDefault("master.replication.treat_replication_as_minimums", false) + replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums") + var preallocateSize int64 if option.VolumePreallocate { preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) @@ -96,7 +99,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste if nil == seq { glog.Fatalf("create sequencer failed.") } - ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) + ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds, replicationAsMin) ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") diff --git a/weed/topology/collection.go b/weed/topology/collection.go index 7a611d904..5b410d1eb 100644 --- a/weed/topology/collection.go +++ b/weed/topology/collection.go @@ -11,11 +11,16 @@ import ( type Collection struct { Name string volumeSizeLimit uint64 + replicationAsMin bool storageType2VolumeLayout *util.ConcurrentReadMap } -func NewCollection(name string, volumeSizeLimit uint64) *Collection { - c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} +func NewCollection(name string, volumeSizeLimit uint64, replicationAsMin bool) *Collection { + c := &Collection{ + Name: name, + volumeSizeLimit: volumeSizeLimit, + replicationAsMin: replicationAsMin, + } c.storageType2VolumeLayout = util.NewConcurrentReadMap() return c } @@ -30,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, t keyString += ttl.String() } vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { - return NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + return NewVolumeLayout(rp, ttl, c.volumeSizeLimit, c.replicationAsMin) }) return vl.(*VolumeLayout) } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index fbf998707..c24cab9d6 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -27,7 +27,8 @@ type Topology struct { pulse int64 - volumeSizeLimit uint64 + volumeSizeLimit uint64 + replicationAsMin bool Sequence sequence.Sequencer @@ -38,7 +39,7 @@ type Topology struct { RaftServer raft.Server } -func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology { +func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" @@ -48,6 +49,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations) t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit + t.replicationAsMin = replicationAsMin t.Sequence = seq @@ -138,7 +140,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { - return NewCollection(collectionName, t.volumeSizeLimit) + return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin) }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) } diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index e7676ccf7..2fe381ca2 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -23,7 +23,7 @@ func TestRemoveDataCenter(t *testing.T) { } func TestHandlingVolumeServerHeartbeat(t *testing.T) { - topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5) + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") @@ -140,7 +140,7 @@ func assert(t *testing.T, message string, actual, expected int) { func TestAddRemoveVolume(t *testing.T) { - topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5) + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index 6ff5be0eb..bc9083fd2 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -81,7 +81,7 @@ func setup(topologyLayout string) *Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5) + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := NewDataCenter(dcKey) diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 908bbb9e9..9e84fd2da 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -22,6 +22,7 @@ type VolumeLayout struct { readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes volumeSizeLimit uint64 + replicationAsMin bool accessLock sync.RWMutex } @@ -31,7 +32,7 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, @@ -40,6 +41,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi readonlyVolumes: make(map[needle.VolumeId]bool), oversizedVolumes: make(map[needle.VolumeId]bool), volumeSizeLimit: volumeSizeLimit, + replicationAsMin: replicationAsMin, } } @@ -107,7 +109,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { - if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { + if vl.enoughCopies(v.Id) && vl.isWritable(v) { if _, ok := vl.oversizedVolumes[v.Id]; !ok { vl.setVolumeWritable(v.Id) } @@ -272,12 +274,18 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, is return false } - if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() { + if vl.enoughCopies(vid) { return vl.setVolumeWritable(vid) } return false } +func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool { + locations := vl.vid2location[vid].Length() + desired := vl.rp.GetCopyCount() + return locations == desired || (vl.replicationAsMin && locations > desired) +} + func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock()