diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go index 364d41560..4ff12737f 100644 --- a/weed/mq/client/pub_client/connect.go +++ b/weed/mq/client/pub_client/connect.go @@ -21,12 +21,12 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err) } brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - stream, err := brokerClient.SubscribeMessage(context.Background()) + stream, err := brokerClient.PublishMessage(context.Background()) if err != nil { return publishClient, fmt.Errorf("create publish client: %v", err) } publishClient = &PublishClient{ - SeaweedMessaging_PublishClient: stream, + SeaweedMessaging_PublishMessageClient: stream, Broker: brokerAddress, } if err = publishClient.Send(&mq_pb.PublishMessageRequest{ diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index a0c26db36..d5176f21b 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -17,7 +17,7 @@ type PublisherConfiguration struct { } type PublishClient struct { - mq_pb.SeaweedMessaging_PublishClient + mq_pb.SeaweedMessaging_PublishMessageClient Broker string Err error }