From 9357e8387d0e11e9b311a4a5619b3e3127ae7b67 Mon Sep 17 00:00:00 2001 From: merdan Date: Tue, 8 Oct 2024 17:09:45 +0500 Subject: [PATCH] fix reconnection logic --- pkg/mq/rabbitmq.go | 53 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/pkg/mq/rabbitmq.go b/pkg/mq/rabbitmq.go index 357bbb6..282f6d5 100644 --- a/pkg/mq/rabbitmq.go +++ b/pkg/mq/rabbitmq.go @@ -137,26 +137,47 @@ func (r *rabbitMQ) Consume(messageHandler func(ctx context.Context, body []byte) } // Start message processing ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + // Create a channel to signal when message processing is done + done := make(chan bool) - for { - select { - case msg, ok := <-msgs: - if !ok { - log.Println("Message channel closed, reconnecting...") - r.channel.Close() - r.conn.Close() - time.Sleep(5 * time.Second) - return // Break inner loop to reconnect + go func() { + defer close(done) + for { + select { + case msg, ok := <-msgs: + if !ok { + log.Println("Message channel closed, reconnecting...") + return + } + messageHandler(ctx, msg.Body) + case <-ctx.Done(): + log.Println("Context canceled, shutting down...") + return } - messageHandler(ctx, msg.Body) - case <-ctx.Done(): - log.Println("Context canceled, shutting down...") - r.channel.Close() - r.conn.Close() - return } + }() + + // Wait for either the done signal or a connection error + select { + case <-done: + // Message processing stopped, attempt to reconnect + case <-r.conn.NotifyClose(make(chan *amqp.Error)): + // Connection was closed, attempt to reconnect } + + cancel() // Cancel the context to stop message processing + r.cleanup() + log.Println("Reconnecting in 5 seconds...") + time.Sleep(5 * time.Second) } } + +func (r *rabbitMQ) cleanup() { + if r.channel != nil { + r.channel.Close() + } + if r.conn != nil { + r.conn.Close() + } +}