From 8ed490164e117f7bbf5a1fd59c867b1c820dbae2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 28 Apr 2020 02:05:44 -0700 Subject: [PATCH] refactoring --- .../broker/broker_grpc_server_subscribe.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index c358eccf6..290c84e34 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -70,12 +70,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } - messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { - lock.Mutex.Lock() - lock.cond.Wait() - lock.Mutex.Unlock() - return true - }, func(logEntry *filer_pb.LogEntry) error { + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { m := &messaging_pb.Message{} if err = proto.Unmarshal(logEntry.Data, m); err != nil { glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) @@ -87,7 +82,14 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } return nil - }) + } + + messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { + lock.Mutex.Lock() + lock.cond.Wait() + lock.Mutex.Unlock() + return true + }, eachLogEntryFn) return err