From a7b1b23c58cbde798945be9caa6144c9e3399cf2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Jul 2019 00:03:05 -0700 Subject: [PATCH 1/7] fix wrong volume count fix https://github.com/chrislusf/seaweedfs/issues/1013 --- weed/storage/erasure_coding/ec_shard.go | 2 +- weed/storage/volume_vacuum.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index b280157b8..47e6d3d1e 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -81,7 +81,7 @@ func (shard *EcVolumeShard) Close() { func (shard *EcVolumeShard) Destroy() { os.Remove(shard.FileName() + ToExt(int(shard.ShardId))) - stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Inc() + stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Dec() } func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) { diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index cc7c7d6e6..3bb306649 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -53,7 +53,7 @@ func (v *Volume) CommitCompact() error { glog.V(0).Infof("fail to close volume %d", v.Id) } v.dataFile = nil - stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc() + stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() var e error if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { From 0ea98dc6a20d4914fece92a25815ed7a9f4555a2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Jul 2019 00:39:36 -0700 Subject: [PATCH 2/7] disable azure due to api changes --- weed/replication/sub/notification_gocdk_pub_sub.go | 2 +- weed/server/filer_server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 9c76e6918..eddba9ff8 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -9,7 +9,7 @@ import ( "github.com/golang/protobuf/proto" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" - _ "gocloud.dev/pubsub/azuresb" + // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/natspubsub" _ "gocloud.dev/pubsub/rabbitpubsub" diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index b9e6aa23d..2c2846ea9 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -24,7 +24,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" - _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" + // _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" _ "github.com/chrislusf/seaweedfs/weed/notification/log" From aff911c00d2934cf0841856b0aff05f909ab93c8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Jul 2019 01:24:20 -0700 Subject: [PATCH 3/7] skip all azuresb --- weed/notification/gocdk_pub_sub/gocdk_pub_sub.go | 2 +- weed/server/filer_server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go index 94a413ac0..ebf44ea6f 100644 --- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -24,7 +24,7 @@ import ( "github.com/golang/protobuf/proto" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" - _ "gocloud.dev/pubsub/azuresb" + // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/natspubsub" _ "gocloud.dev/pubsub/rabbitpubsub" diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 2c2846ea9..b9e6aa23d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -24,7 +24,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" - // _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" + _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" _ "github.com/chrislusf/seaweedfs/weed/notification/log" From c33f4239557f9031c3390a4bc38b7b4623c4941d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Jul 2019 23:22:01 -0700 Subject: [PATCH 4/7] stop early if compaction fails fix https://github.com/chrislusf/seaweedfs/issues/1015 --- weed/storage/volume_read_write.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 93ce1eab9..fbc818905 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -224,6 +224,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, } if err != nil { glog.V(0).Infof("visit needle error: %v", err) + return fmt.Errorf("visit needle error: %v", err) } offset += NeedleHeaderSize + rest glog.V(4).Infof("==> new entry offset %d", offset) From 0264a7f50afe61f22b9dcf8496a57a916e2568b7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Jul 2019 23:23:01 -0700 Subject: [PATCH 5/7] set the max file key when ever possible --- weed/server/master_grpc_server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 1a17327a0..e0d1fd174 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -52,8 +52,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ return err } + t.Sequence.SetMax(heartbeat.MaxFileKey) + if dn == nil { - t.Sequence.SetMax(heartbeat.MaxFileKey) if heartbeat.Ip == "" { if pr, ok := peer.FromContext(stream.Context()); ok { if pr.Addr != net.Addr(nil) { From 898d943b25975e83dff81bb532243cee823a47af Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Jul 2019 23:43:48 -0700 Subject: [PATCH 6/7] refactoring --- weed/server/volume_grpc_batch_delete.go | 2 +- weed/server/volume_grpc_tail.go | 2 +- weed/storage/store.go | 4 ++-- weed/topology/store_replicate.go | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index d7fbb6edf..fdb7937d2 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -58,7 +58,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B break } n.LastModified = now - if size, err := vs.store.Delete(volumeId, n); err != nil { + if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil { resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ FileId: fid, Status: http.StatusInternalServerError, diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 698bad5b8..34c55a599 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -110,7 +110,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { - _, _, err := vs.store.Write(v.Id, n) + _, _, err := vs.store.WriteVolumeNeedle(v.Id, n) return err }) diff --git a/weed/storage/store.go b/weed/storage/store.go index d2dd76d52..e99d77774 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -211,7 +211,7 @@ func (s *Store) Close() { } } -func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { +func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { if v := s.findVolume(i); v != nil { if v.readOnly { err = fmt.Errorf("volume %d is read only", i) @@ -230,7 +230,7 @@ func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUncha return } -func (s *Store) Delete(i needle.VolumeId, n *needle.Needle) (uint32, error) { +func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) { if v := s.findVolume(i); v != nil && !v.readOnly { return v.deleteNeedle(n) } diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index d4076d548..d21c4d210 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -25,7 +25,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, //check JWT jwt := security.GetJwt(r) - size, isUnchanged, err = s.Write(volumeId, n) + size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n) if err != nil { err = fmt.Errorf("failed to write to local disk: %v", err) return @@ -89,7 +89,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, //check JWT jwt := security.GetJwt(r) - ret, err := store.Delete(volumeId, n) + ret, err := store.DeleteVolumeNeedle(volumeId, n) if err != nil { glog.V(0).Infoln("delete error:", err) return ret, err From c54d9221b90676b499830f27064cc7ef8f6cfdfd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Jul 2019 23:57:34 -0700 Subject: [PATCH 7/7] write requests also checks cookie if overwrites protect against edge cases, avoid https://github.com/chrislusf/seaweedfs/issues/1014 --- weed/storage/volume_read_write.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index fbc818905..0327e5a1f 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -96,13 +96,29 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn n.Ttl = v.Ttl } + // check whether existing needle cookie matches + nv, ok := v.nm.Get(n.Id) + if ok { + existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.dataFile, v.Version(), nv.Offset.ToAcutalOffset()) + if existingNeedleReadErr != nil { + err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) + return + } + if existingNeedle.Cookie != n.Cookie { + glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie) + err = fmt.Errorf("mismatching cookie %x", n.Cookie) + return + } + } + + // append to dat file n.AppendAtNs = uint64(time.Now().UnixNano()) if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil { return } v.lastAppendAtNs = n.AppendAtNs - nv, ok := v.nm.Get(n.Id) + // add to needle map if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset { if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)