seaweedfs/weed/mq/client/cmd/weed_pub/publisher.go

59 lines
1.3 KiB
Go
Raw Normal View History

2023-08-29 00:02:12 +08:00
package main
import (
2023-09-05 12:43:30 +08:00
"flag"
2023-08-29 00:02:12 +08:00
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
2023-09-05 12:43:30 +08:00
"log"
"sync"
"time"
2023-08-29 00:02:12 +08:00
)
2023-09-05 12:43:30 +08:00
// Command-line flags controlling the publishing run.
var (
	// concurrency is the value of -c (default 4).
	concurrency = flag.Int("c", 4, "concurrency count")
	// messageCount is the value of -n (default 1000).
	messageCount = flag.Int("n", 1000, "message count")
)
// doPublish sends one worker's share of the *messageCount messages through
// publisher and logs how long that share took.
//
// id is the worker index; it is embedded in every key ("key-<id>-<i>") and
// value ("value-<id>-<i>"). The total is divided evenly by *concurrency, and
// workers whose id is below *messageCount % *concurrency send one extra
// message, so that workers numbered 0..*concurrency-1 together send exactly
// *messageCount messages instead of silently dropping the remainder.
func doPublish(publisher *pub_client.TopicPublisher, id int) {
	// Work out the quota once rather than re-dividing on every loop check.
	total, workers := *messageCount, *concurrency
	quota := total / workers
	if id < total%workers {
		quota++
	}

	startTime := time.Now()
	for i := 0; i < quota; i++ {
		key := []byte(fmt.Sprintf("key-%d-%d", id, i))
		value := []byte(fmt.Sprintf("value-%d-%d", id, i))
		// NOTE(review): whatever Publish returns is discarded here — confirm
		// whether it reports an error that should stop this worker.
		publisher.Publish(key, value)
	}
	elapsed := time.Since(startTime)
	log.Printf("Publisher %d finished in %s", id, elapsed)
}
2023-08-29 00:02:12 +08:00
2023-09-05 12:43:30 +08:00
func main() {
flag.Parse()
2023-08-29 00:02:12 +08:00
publisher := pub_client.NewTopicPublisher(
"test", "test")
if err := publisher.Connect("localhost:17777"); err != nil {
fmt.Println(err)
return
}
2023-09-05 12:43:30 +08:00
startTime := time.Now()
// Start multiple publishers
var wg sync.WaitGroup
for i := 0; i < *concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
doPublish(publisher, id)
}(i)
2023-08-29 00:02:12 +08:00
}
2023-09-05 12:43:30 +08:00
// Wait for all publishers to finish
wg.Wait()
elapsed := time.Since(startTime)
2023-09-08 14:55:19 +08:00
publisher.Shutdown()
2023-09-05 12:43:30 +08:00
log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
2023-08-29 00:02:12 +08:00
}