From f5a748d33c52a2874dbde92746a03140ef379296 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 16 Apr 2020 02:55:09 -0700 Subject: [PATCH] refactoring --- weed/command/msg_broker.go | 7 +- weed/messaging/msg_broker_grpc_server.go | 98 +++++++++++++++++++ .../msg_broker_server.go | 2 +- weed/server/msg_broker_grpc_server.go | 23 ----- weed/util/log_buffer/log_buffer.go | 4 +- 5 files changed, 104 insertions(+), 30 deletions(-) create mode 100644 weed/messaging/msg_broker_grpc_server.go rename weed/{server => messaging}/msg_broker_server.go (99%) delete mode 100644 weed/server/msg_broker_grpc_server.go diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index 3cb424298..f77582f03 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -8,13 +8,12 @@ import ( "google.golang.org/grpc/reflection" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/messaging" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" "github.com/chrislusf/seaweedfs/weed/security" - weed_server "github.com/chrislusf/seaweedfs/weed/server" - - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -79,7 +78,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool { } } - qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{ + qs, err := messaging.NewMessageBroker(&messaging.MessageBrokerOption{ Filers: []string{*msgBrokerOpt.filer}, DefaultReplication: "", MaxMB: 0, diff --git a/weed/messaging/msg_broker_grpc_server.go b/weed/messaging/msg_broker_grpc_server.go new file mode 100644 index 000000000..a29bc11b0 --- /dev/null +++ b/weed/messaging/msg_broker_grpc_server.go @@ -0,0 +1,98 @@ +package messaging + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" +) + +func (broker *MessageBroker) Subscribe(request *messaging_pb.SubscribeRequest, server messaging_pb.SeaweedMessaging_SubscribeServer) error { + panic("implement me") +} + +func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error { + + // process initial request + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + namespace, topic, partition := in.Namespace, in.Topic, in.Partition + + updatesChan := make(chan int32) + + go func() { + for update := range updatesChan { + if err := stream.Send(&messaging_pb.PublishResponse{ + PartitionCount: update, + }); err != nil { + glog.V(0).Infof("err sending publish response: %v", err) + return + } + } + }() + + logBuffer := log_buffer.NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) { + + //targetFile := + fmt.Sprintf("%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", + filer2.TopicsDir, namespace, topic, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), + partition, + ) + + /* + if err := f.appendToFile(targetFile, buf); err != nil { + glog.V(0).Infof("log write failed %s: %v", targetFile, err) + } + */ + + }, func() { + // notify subscribers + }) + + for { + in, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + m := &messaging_pb.Message{ + Timestamp: time.Now().UnixNano(), + Key: in.Key, + Value: in.Value, + Headers: in.Headers, + } + + data, err := proto.Marshal(m) + if err != nil { + glog.Errorf("marshall error: %v\n", err) + continue + } + + logBuffer.AddToBuffer(in.Key, data) + + } +} + +func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) { + panic("implement me") +} + +func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) { + panic("implement me") +} diff --git a/weed/server/msg_broker_server.go b/weed/messaging/msg_broker_server.go similarity index 99% rename from weed/server/msg_broker_server.go rename to weed/messaging/msg_broker_server.go index a9d908581..9174ca4cf 100644 --- a/weed/server/msg_broker_server.go +++ b/weed/messaging/msg_broker_server.go @@ -1,4 +1,4 @@ -package weed_server +package messaging import ( "context" diff --git a/weed/server/msg_broker_grpc_server.go b/weed/server/msg_broker_grpc_server.go deleted file mode 100644 index 6feaa0c63..000000000 --- a/weed/server/msg_broker_grpc_server.go +++ /dev/null @@ -1,23 +0,0 @@ -package weed_server - -import ( - "context" - - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -func (broker *MessageBroker) Subscribe(request *messaging_pb.SubscribeRequest, server messaging_pb.SeaweedMessaging_SubscribeServer) error { - panic("implement me") -} - -func (broker *MessageBroker) Publish(server messaging_pb.SeaweedMessaging_PublishServer) error { - panic("implement me") -} - -func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) { - panic("implement me") -} - -func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) { - panic("implement me") -} diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 04fabdf8c..c7cb90549 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -51,7 +51,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime return lb } -func (m *LogBuffer) AddToBuffer(key, data []byte) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { m.Lock() defer func() { @@ -65,7 +65,7 @@ func (m *LogBuffer) AddToBuffer(key, data []byte) { ts := time.Now() logEntry := &filer_pb.LogEntry{ TsNs: ts.UnixNano(), - PartitionKeyHash: util.HashToInt32(key), + PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, }