mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-12-25 10:17:55 +08:00
commit
6a573d62e0
@ -24,7 +24,7 @@ import (
|
|||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"gocloud.dev/pubsub"
|
"gocloud.dev/pubsub"
|
||||||
_ "gocloud.dev/pubsub/awssnssqs"
|
_ "gocloud.dev/pubsub/awssnssqs"
|
||||||
_ "gocloud.dev/pubsub/azuresb"
|
// _ "gocloud.dev/pubsub/azuresb"
|
||||||
_ "gocloud.dev/pubsub/gcppubsub"
|
_ "gocloud.dev/pubsub/gcppubsub"
|
||||||
_ "gocloud.dev/pubsub/natspubsub"
|
_ "gocloud.dev/pubsub/natspubsub"
|
||||||
_ "gocloud.dev/pubsub/rabbitpubsub"
|
_ "gocloud.dev/pubsub/rabbitpubsub"
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"gocloud.dev/pubsub"
|
"gocloud.dev/pubsub"
|
||||||
_ "gocloud.dev/pubsub/awssnssqs"
|
_ "gocloud.dev/pubsub/awssnssqs"
|
||||||
_ "gocloud.dev/pubsub/azuresb"
|
// _ "gocloud.dev/pubsub/azuresb"
|
||||||
_ "gocloud.dev/pubsub/gcppubsub"
|
_ "gocloud.dev/pubsub/gcppubsub"
|
||||||
_ "gocloud.dev/pubsub/natspubsub"
|
_ "gocloud.dev/pubsub/natspubsub"
|
||||||
_ "gocloud.dev/pubsub/rabbitpubsub"
|
_ "gocloud.dev/pubsub/rabbitpubsub"
|
||||||
|
@ -52,8 +52,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.Sequence.SetMax(heartbeat.MaxFileKey)
|
||||||
|
|
||||||
if dn == nil {
|
if dn == nil {
|
||||||
t.Sequence.SetMax(heartbeat.MaxFileKey)
|
|
||||||
if heartbeat.Ip == "" {
|
if heartbeat.Ip == "" {
|
||||||
if pr, ok := peer.FromContext(stream.Context()); ok {
|
if pr, ok := peer.FromContext(stream.Context()); ok {
|
||||||
if pr.Addr != net.Addr(nil) {
|
if pr.Addr != net.Addr(nil) {
|
||||||
|
@ -58,7 +58,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
n.LastModified = now
|
n.LastModified = now
|
||||||
if size, err := vs.store.Delete(volumeId, n); err != nil {
|
if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil {
|
||||||
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
|
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
|
||||||
FileId: fid,
|
FileId: fid,
|
||||||
Status: http.StatusInternalServerError,
|
Status: http.StatusInternalServerError,
|
||||||
|
@ -110,7 +110,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
|
|||||||
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
|
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
|
||||||
|
|
||||||
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
|
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
|
||||||
_, _, err := vs.store.Write(v.Id, n)
|
_, _, err := vs.store.WriteVolumeNeedle(v.Id, n)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ func (shard *EcVolumeShard) Close() {
|
|||||||
|
|
||||||
func (shard *EcVolumeShard) Destroy() {
|
func (shard *EcVolumeShard) Destroy() {
|
||||||
os.Remove(shard.FileName() + ToExt(int(shard.ShardId)))
|
os.Remove(shard.FileName() + ToExt(int(shard.ShardId)))
|
||||||
stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Inc()
|
stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Dec()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {
|
func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {
|
||||||
|
@ -211,7 +211,7 @@ func (s *Store) Close() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
|
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
|
||||||
if v := s.findVolume(i); v != nil {
|
if v := s.findVolume(i); v != nil {
|
||||||
if v.readOnly {
|
if v.readOnly {
|
||||||
err = fmt.Errorf("volume %d is read only", i)
|
err = fmt.Errorf("volume %d is read only", i)
|
||||||
@ -230,7 +230,7 @@ func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUncha
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) Delete(i needle.VolumeId, n *needle.Needle) (uint32, error) {
|
func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
|
||||||
if v := s.findVolume(i); v != nil && !v.readOnly {
|
if v := s.findVolume(i); v != nil && !v.readOnly {
|
||||||
return v.deleteNeedle(n)
|
return v.deleteNeedle(n)
|
||||||
}
|
}
|
||||||
|
@ -96,13 +96,29 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn
|
|||||||
n.Ttl = v.Ttl
|
n.Ttl = v.Ttl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check whether existing needle cookie matches
|
||||||
|
nv, ok := v.nm.Get(n.Id)
|
||||||
|
if ok {
|
||||||
|
existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.dataFile, v.Version(), nv.Offset.ToAcutalOffset())
|
||||||
|
if existingNeedleReadErr != nil {
|
||||||
|
err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if existingNeedle.Cookie != n.Cookie {
|
||||||
|
glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
|
||||||
|
err = fmt.Errorf("mismatching cookie %x", n.Cookie)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// append to dat file
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil {
|
if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
v.lastAppendAtNs = n.AppendAtNs
|
v.lastAppendAtNs = n.AppendAtNs
|
||||||
|
|
||||||
nv, ok := v.nm.Get(n.Id)
|
// add to needle map
|
||||||
if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
|
if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
|
||||||
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
|
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
|
||||||
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
|
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
|
||||||
@ -224,6 +240,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64,
|
|||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("visit needle error: %v", err)
|
glog.V(0).Infof("visit needle error: %v", err)
|
||||||
|
return fmt.Errorf("visit needle error: %v", err)
|
||||||
}
|
}
|
||||||
offset += NeedleHeaderSize + rest
|
offset += NeedleHeaderSize + rest
|
||||||
glog.V(4).Infof("==> new entry offset %d", offset)
|
glog.V(4).Infof("==> new entry offset %d", offset)
|
||||||
|
@ -53,7 +53,7 @@ func (v *Volume) CommitCompact() error {
|
|||||||
glog.V(0).Infof("fail to close volume %d", v.Id)
|
glog.V(0).Infof("fail to close volume %d", v.Id)
|
||||||
}
|
}
|
||||||
v.dataFile = nil
|
v.dataFile = nil
|
||||||
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
|
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
|
||||||
|
|
||||||
var e error
|
var e error
|
||||||
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
|
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
|
||||||
|
@ -25,7 +25,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
|
|||||||
//check JWT
|
//check JWT
|
||||||
jwt := security.GetJwt(r)
|
jwt := security.GetJwt(r)
|
||||||
|
|
||||||
size, isUnchanged, err = s.Write(volumeId, n)
|
size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to write to local disk: %v", err)
|
err = fmt.Errorf("failed to write to local disk: %v", err)
|
||||||
return
|
return
|
||||||
@ -89,7 +89,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
|
|||||||
//check JWT
|
//check JWT
|
||||||
jwt := security.GetJwt(r)
|
jwt := security.GetJwt(r)
|
||||||
|
|
||||||
ret, err := store.Delete(volumeId, n)
|
ret, err := store.DeleteVolumeNeedle(volumeId, n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln("delete error:", err)
|
glog.V(0).Infoln("delete error:", err)
|
||||||
return ret, err
|
return ret, err
|
||||||
|
Loading…
Reference in New Issue
Block a user