seaweedfs/weed/command/mq_broker.go

98 lines
3.4 KiB
Go
Raw Normal View History

2020-03-04 16:39:47 +08:00
package command
import (
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
2020-05-05 17:05:28 +08:00
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/broker"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
2020-03-04 16:39:47 +08:00
)
var (
2022-07-02 12:59:50 +08:00
mqBrokerStandaloneOptions MessageQueueBrokerOptions
2020-03-04 16:39:47 +08:00
)
2022-07-02 12:59:50 +08:00
type MessageQueueBrokerOptions struct {
2022-07-11 03:11:37 +08:00
masters map[string]pb.ServerAddress
mastersString *string
filerGroup *string
ip *string
port *int
dataCenter *string
rack *string
cpuprofile *string
memprofile *string
2020-03-04 16:39:47 +08:00
}
func init() {
2022-07-02 12:59:50 +08:00
cmdMqBroker.Run = runMqBroker // break init cycle
2022-07-11 03:11:37 +08:00
mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers")
2022-07-02 14:34:51 +08:00
mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
2022-07-02 12:59:50 +08:00
mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port")
2022-07-03 00:28:24 +08:00
mqBrokerStandaloneOptions.dataCenter = cmdMqBroker.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
mqBrokerStandaloneOptions.rack = cmdMqBroker.Flag.String("rack", "", "prefer to write to volumes in this rack")
2022-07-02 12:59:50 +08:00
mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file")
mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file")
2020-03-04 16:39:47 +08:00
}
2022-07-02 12:59:50 +08:00
var cmdMqBroker = &Command{
2022-07-11 03:11:37 +08:00
UsageLine: "mq.broker [-port=17777] [-master=<ip:port>]",
2022-07-31 00:36:27 +08:00
Short: "<WIP> start a message queue broker",
2020-03-04 16:39:47 +08:00
Long: `start a message queue broker
The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
The brokers are stateless. To scale up, just add more brokers.
`,
}
2022-07-02 12:59:50 +08:00
func runMqBroker(cmd *Command, args []string) bool {
2020-03-04 16:39:47 +08:00
2024-07-17 00:15:55 +08:00
util.LoadSecurityConfiguration()
2020-03-04 16:39:47 +08:00
2022-07-11 03:11:37 +08:00
mqBrokerStandaloneOptions.masters = pb.ServerAddresses(*mqBrokerStandaloneOptions.mastersString).ToAddressMap()
2022-07-02 12:59:50 +08:00
return mqBrokerStandaloneOptions.startQueueServer()
2020-03-04 16:39:47 +08:00
}
2022-07-02 12:59:50 +08:00
func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
2020-03-04 16:39:47 +08:00
2022-07-02 12:59:50 +08:00
grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile)
2020-04-19 18:03:40 +08:00
2020-04-17 17:29:00 +08:00
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
2020-03-04 16:39:47 +08:00
2022-07-02 14:34:51 +08:00
qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{
2022-07-11 03:11:37 +08:00
Masters: mqBrokerOpt.masters,
2022-07-02 14:34:51 +08:00
FilerGroup: *mqBrokerOpt.filerGroup,
2022-07-03 15:29:25 +08:00
DataCenter: *mqBrokerOpt.dataCenter,
Rack: *mqBrokerOpt.rack,
2020-03-04 16:39:47 +08:00
DefaultReplication: "",
MaxMB: 0,
2022-07-02 12:59:50 +08:00
Ip: *mqBrokerOpt.ip,
Port: *mqBrokerOpt.port,
2020-04-17 17:29:00 +08:00
}, grpcDialOption)
2023-11-20 20:57:38 +08:00
if err != nil {
glog.Fatalf("failed to create new message broker for queue server: %v", err)
}
2020-03-04 16:39:47 +08:00
// start grpc listener
2022-07-02 12:59:50 +08:00
grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0)
2020-03-04 16:39:47 +08:00
if err != nil {
2022-07-02 12:59:50 +08:00
glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err)
2020-03-04 16:39:47 +08:00
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
2022-07-02 13:43:25 +08:00
mq_pb.RegisterSeaweedMessagingServer(grpcS, qs)
2020-03-04 16:39:47 +08:00
reflection.Register(grpcS)
grpcS.Serve(grpcL)
return true
}