From 0cfed8c3cb8cd4e9e35310a0ce7621945e10ef70 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Wed, 10 Feb 2021 15:29:05 +0500 Subject: [PATCH] restart replication if RabbitMQ connection closed https://github.com/google/go-cloud/issues/2958 --- docker/{ => compose}/notification.toml | 0 docker/{ => compose}/replication.toml | 0 weed/replication/sub/notification_gocdk_pub_sub.go | 4 ++++ 3 files changed, 4 insertions(+) rename docker/{ => compose}/notification.toml (100%) rename docker/{ => compose}/replication.toml (100%) diff --git a/docker/notification.toml b/docker/compose/notification.toml similarity index 100% rename from docker/notification.toml rename to docker/compose/notification.toml diff --git a/docker/replication.toml b/docker/compose/replication.toml similarity index 100% rename from docker/replication.toml rename to docker/compose/replication.toml diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 413c0e3cf..1d7fe520b 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -113,6 +113,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { msg, err := k.sub.Receive(context.Background()) if err != nil { + var conn *amqp.Connection + if k.sub.As(&conn) && conn.IsClosed() { + glog.Fatalln(err) + } return } onFailureFn = func() {