syntax = "proto3";

package messaging_pb;

import "schema.proto";

option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb";
option java_package = "seaweedfs.mq";
option java_outer_classname = "MessageQueueProto";

//////////////////////////////////////////////////

// gRPC service exposing the message-queue control plane and data plane.
service SeaweedMessaging {
  // control plane
  rpc FindBrokerLeader (FindBrokerLeaderRequest) returns (FindBrokerLeaderResponse) {
  }

  // control plane for balancer
  // Bidirectional stream.
  rpc PublisherToPubBalancer (stream PublisherToPubBalancerRequest) returns (stream PublisherToPubBalancerResponse) {
  }
  rpc BalanceTopics (BalanceTopicsRequest) returns (BalanceTopicsResponse) {
  }

  // control plane for topic partitions
  rpc ListTopics (ListTopicsRequest) returns (ListTopicsResponse) {
  }
  rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) {
  }
  rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) {
  }

  // invoked by the balancer, running on each broker
  rpc AssignTopicPartitions (AssignTopicPartitionsRequest) returns (AssignTopicPartitionsResponse) {
  }
  rpc ClosePublishers (ClosePublishersRequest) returns (ClosePublishersResponse) {
  }
  rpc CloseSubscribers (CloseSubscribersRequest) returns (CloseSubscribersResponse) {
  }

  // subscriber connects to broker balancer, which coordinates with the subscribers
  // Bidirectional stream.
  rpc SubscriberToSubCoordinator (stream SubscriberToSubCoordinatorRequest) returns (stream SubscriberToSubCoordinatorResponse) {
  }

  // data plane for each topic partition
  // Both RPCs are bidirectional streams.
  rpc PublishMessage (stream PublishMessageRequest) returns (stream PublishMessageResponse) {
  }
  rpc SubscribeMessage (stream SubscribeMessageRequest) returns (stream SubscribeMessageResponse) {
  }

  // The lead broker asks a follower broker to follow itself
  // PublishFollowMe is a bidirectional stream; SubscribeFollowMe streams
  // requests and returns a single response.
  rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) {
  }
  rpc SubscribeFollowMe (stream SubscribeFollowMeRequest) returns (SubscribeFollowMeResponse) {
  }
}

//////////////////////////////////////////////////

// Request of the FindBrokerLeader RPC.
message FindBrokerLeaderRequest {
  string filer_group = 1;
}

// Response of the FindBrokerLeader RPC.
message FindBrokerLeaderResponse {
  string broker = 1;
}

// A topic, identified by a namespace and a name.
message Topic {
  string namespace = 1;
  string name = 2;
}

// A partition, described by a ring size, a [range_start, range_stop] pair and
// a timestamp.
// NOTE(review): whether range_stop is inclusive is not visible here - confirm.
message Partition {
  int32 ring_size = 1;
  int32 range_start = 2;
  int32 range_stop = 3;
  int64 unix_time_ns = 4;
}

// Per-partition offsets of one topic.
message Offset {
  Topic topic = 1;
  repeated PartitionOffset partition_offsets = 2;
}

// Start position selector used by PartitionOffset.
// EARLIEST is the zero value and therefore the proto3 default when
// start_type is not set.
enum PartitionOffsetStartType {
  EARLIEST = 0;
  EARLIEST_IN_MEMORY = 1;
  LATEST = 2;
}

// A start/stop position within a single partition.
message PartitionOffset {
  Partition partition = 1;
  int64 start_ts_ns = 2;
  int64 stop_ts_ns = 3;
  PartitionOffsetStartType start_type = 4;
}

//////////////////////////////////////////////////

// Broker statistics, carried in PublisherToPubBalancerRequest.
message BrokerStats {
  int32 cpu_usage_percent = 1;
  // NOTE(review): the map type arguments were missing in the reviewed text
  // ("map stats = 2;" is not valid proto3). TopicPartitionStats is used as
  // the value type because it is otherwise unreferenced in this file; the
  // string key type is presumed - confirm against the generated code before
  // relying on it.
  map<string, TopicPartitionStats> stats = 2;
}

// Statistics of one topic partition.
message TopicPartitionStats {
  Topic topic = 1;
  Partition partition = 2;
  int32 publisher_count = 3;
  int32 subscriber_count = 4;
  string follower = 5;
}

// Stream element of the PublisherToPubBalancer request stream.
message PublisherToPubBalancerRequest {
  message InitMessage {
    string broker = 1;
  }
  // At most one of the following is set per stream element.
  oneof message {
    InitMessage init = 1;
    BrokerStats stats = 2;
  }
}

// Stream element of the PublisherToPubBalancer response stream; currently
// has no fields.
message PublisherToPubBalancerResponse {
}

// Request of the BalanceTopics RPC; currently has no fields.
message BalanceTopicsRequest {
}

// Response of the BalanceTopics RPC; currently has no fields.
message BalanceTopicsResponse {
}

//////////////////////////////////////////////////

// Request of the ConfigureTopic RPC.
message ConfigureTopicRequest {
  Topic topic = 1;
  int32 partition_count = 2;
  schema_pb.RecordType record_type = 3;
}

// Response of the ConfigureTopic RPC.
// Field number 1 is not used in this message.
// NOTE(review): if 1 belonged to a removed field, add `reserved 1;`.
message ConfigureTopicResponse {
  repeated BrokerPartitionAssignment broker_partition_assignments = 2;
  schema_pb.RecordType record_type = 3;
}

// Request of the ListTopics RPC; currently has no fields.
message ListTopicsRequest {
}

// Response of the ListTopics RPC.
message ListTopicsResponse {
  repeated Topic topics = 1;
}

// Request of the LookupTopicBrokers RPC.
message LookupTopicBrokersRequest {
  Topic topic = 1;
}

// Response of the LookupTopicBrokers RPC.
message LookupTopicBrokersResponse {
  Topic topic = 1;
  repeated BrokerPartitionAssignment broker_partition_assignments = 2;
}

// Associates one partition with a leader broker and a follower broker.
message BrokerPartitionAssignment {
  Partition partition = 1;
  string leader_broker = 2;
  string follower_broker = 3;
}

// Request of the AssignTopicPartitions RPC.
message AssignTopicPartitionsRequest {
  Topic topic = 1;
  repeated BrokerPartitionAssignment broker_partition_assignments = 2;
  bool is_leader = 3;
  bool is_draining = 4;
}

// Response of the AssignTopicPartitions RPC; currently has no fields.
message AssignTopicPartitionsResponse {
}

// Stream element of the SubscriberToSubCoordinator request stream.
message SubscriberToSubCoordinatorRequest {
  message InitMessage {
    string consumer_group = 1;
    string consumer_group_instance_id = 2;
    Topic topic = 3;
    // The consumer group instance will be assigned at most max_partition_count partitions.
    // If the number of partitions is less than the sum of max_partition_count,
    // the consumer group instance may be assigned partitions less than max_partition_count.
    // Default is 1.
    int32 max_partition_count = 4;
    // If consumer group instance changes, wait for rebalance_seconds before reassigning partitions
    // Exception: if adding a new consumer group instance and sum of max_partition_count equals the number of partitions,
    // the rebalance will happen immediately.
    // Default is 10 seconds.
    int32 rebalance_seconds = 5;
  }
  oneof message {
    InitMessage init = 1;
  }
}

// Stream element of the SubscriberToSubCoordinator response stream.
message SubscriberToSubCoordinatorResponse {
  message Assignment {
    int64 generation = 1;
    repeated BrokerPartitionAssignment partition_assignments = 2;
  }
  oneof message {
    Assignment assignment = 1;
  }
}

//////////////////////////////////////////////////

// Control payload that can accompany a DataMessage.
message ControlMessage {
  bool is_close = 1;
}

// A single key/value message with a timestamp and an optional control payload.
message DataMessage {
  bytes key = 1;
  bytes value = 2;
  int64 ts_ns = 3;
  ControlMessage ctrl = 4;
}

// Stream element of the PublishMessage request stream.
message PublishMessageRequest {
  message InitMessage {
    Topic topic = 1;
    Partition partition = 2;
    int32 ack_interval = 3;
    string follower_broker = 4;
    string publisher_name = 5; // for debugging
  }
  // At most one of the following is set per stream element.
  oneof message {
    InitMessage init = 1;
    DataMessage data = 2;
  }
  // Declared outside the oneof, so it can be set alongside init or data.
  int64 sequence = 3;
}

// Stream element of the PublishMessage response stream.
message PublishMessageResponse {
  int64 ack_sequence = 1;
  string error = 2;
  bool should_close = 3;
}

// Stream element of the PublishFollowMe request stream.
message PublishFollowMeRequest {
  message InitMessage {
    Topic topic = 1;
    Partition partition = 2;
  }
  message FlushMessage {
    int64 ts_ns = 1;
  }
  message CloseMessage {
  }
  // At most one of the following is set per stream element.
  oneof message {
    InitMessage init = 1;
    DataMessage data = 2;
    FlushMessage flush = 3;
    CloseMessage close = 4;
  }
}

// Stream element of the PublishFollowMe response stream.
message PublishFollowMeResponse {
  int64 ack_ts_ns = 1;
}

// Stream element of the SubscribeMessage request stream.
message SubscribeMessageRequest {
  message InitMessage {
    string consumer_group = 1;
    string consumer_id = 2;
    string client_id = 3;
    Topic topic = 4;
    PartitionOffset partition_offset = 5;
    string filter = 6;
    string follower_broker = 7;
    int32 concurrency = 8;
  }
  message AckMessage {
    int64 sequence = 1;
    bytes key = 2;
  }
  // At most one of the following is set per stream element.
  oneof message {
    InitMessage init = 1;
    AckMessage ack = 2;
  }
}

// Stream element of the SubscribeMessage response stream.
message SubscribeMessageResponse {
  message SubscribeCtrlMessage {
    string error = 1;
    bool is_end_of_stream = 2;
    bool is_end_of_topic = 3;
  }
  // At most one of the following is set per stream element.
  oneof message {
    SubscribeCtrlMessage ctrl = 1;
    DataMessage data = 2;
  }
}

// Stream element of the SubscribeFollowMe request stream.
message SubscribeFollowMeRequest {
  message InitMessage {
    Topic topic = 1;
    Partition partition = 2;
    string consumer_group = 3;
  }
  message AckMessage {
    int64 ts_ns = 1;
  }
  message CloseMessage {
  }
  // At most one of the following is set per stream element.
  oneof message {
    InitMessage init = 1;
    AckMessage ack = 2;
    CloseMessage close = 3;
  }
}

// Single response of the SubscribeFollowMe RPC.
message SubscribeFollowMeResponse {
  int64 ack_ts_ns = 1;
}

// Request of the ClosePublishers RPC.
message ClosePublishersRequest {
  Topic topic = 1;
  int64 unix_time_ns = 2;
}

// Response of the ClosePublishers RPC; currently has no fields.
message ClosePublishersResponse {
}

// Request of the CloseSubscribers RPC.
message CloseSubscribersRequest {
  Topic topic = 1;
  int64 unix_time_ns = 2;
}

// Response of the CloseSubscribers RPC; currently has no fields.
message CloseSubscribersResponse {
}