From 90bf09c1b50929be9bb98371e6d209d9c614fe08 Mon Sep 17 00:00:00 2001 From: merdan Date: Mon, 16 Sep 2024 12:07:05 +0500 Subject: [PATCH] fixed reconnection logic --- cmd/consumer/cs.go | 31 ++++---- pkg/mq/rabbitmq.go | 173 +++++++++++++++++++++++++++++---------------- 2 files changed, 123 insertions(+), 81 deletions(-) diff --git a/cmd/consumer/cs.go b/cmd/consumer/cs.go index 0409dcd..248bcfd 100644 --- a/cmd/consumer/cs.go +++ b/cmd/consumer/cs.go @@ -2,15 +2,13 @@ package main import ( "context" - "fmt" "log" "os" "os/signal" - "syscall" - "smpp-transmitter/pkg/data" fl "smpp-transmitter/pkg/logger" "smpp-transmitter/pkg/mq" + "syscall" "github.com/joho/godotenv" "gorm.io/driver/mysql" @@ -25,41 +23,35 @@ func main() { } // Load environment variables - rabbitMQURL := os.Getenv("RABBITMQ_URL") mysqlDSN := os.Getenv("MYSQL_DSN") - queueName := os.Getenv("QUEUE_NAME") - exchangeName := os.Getenv("EXCHABGE_NAME") appEnv := os.Getenv("APP_ENV") - routeKey := os.Getenv("ROUTE_KEY") - if rabbitMQURL == "" || mysqlDSN == "" || queueName == "" || exchangeName == "" || routeKey == "" { - log.Fatal("RABBITMQ_URL, MYSQL_DSN, and QUEUE_NAME environment variables must be set") + if mysqlDSN == "" { + log.Fatal("MYSQL_DSN environment variable must be set") } Log, err := fl.SetupLogger(appEnv) if err != nil { log.Fatalf("Failed to set up logger: %v", err) } - fmt.Printf("DB: %s", mysqlDSN) + // Initialize MySQL database connection db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{ Logger: logger.Default.LogMode(logger.Info), }) - if err != nil { Log.Error("Failed to connect to MySQL:", err) - log.Fatalf("Failed to connect to MySQL: %v", err) + panic("Failed to connect to MySQL") } //initilize rabbitmq - messageQueue := mq.NewRabbitMQ(rabbitMQURL, exchangeName, queueName, routeKey) - - // Start message processing - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + messageQueue, err := mq.NewRabbitMQ() + if err != nil { + log.Fatalf("Failed to set up Message Broker: %v", err) + } // Run RabbitMQ consumer - go messageQueue.Consume(ctx, func(ctx context.Context, body []byte) { + go messageQueue.Consume(func(ctx context.Context, body []byte) { message := &data.Message{} Log.Info("Received a raw message", "body", string(body)) @@ -74,7 +66,7 @@ func main() { return } - fmt.Printf("Message handled %v", string(body)) + log.Printf("Message handled %v", string(body)) }) @@ -84,4 +76,5 @@ func main() { <-sigs log.Println("Shutting down...") Log.Info("Shutting down...") + os.Exit(0) } diff --git a/pkg/mq/rabbitmq.go b/pkg/mq/rabbitmq.go index cd928df..368e671 100644 --- a/pkg/mq/rabbitmq.go +++ b/pkg/mq/rabbitmq.go @@ -2,14 +2,16 @@ package mq import ( "context" + "fmt" "log" + "os" "time" "github.com/streadway/amqp" ) type MessageQueue interface { - Consume(ctx context.Context, processMessage func(ctx context.Context, body []byte)) + Consume(processMessage func(ctx context.Context, body []byte)) } type rabbitMQ struct { @@ -17,93 +19,140 @@ type rabbitMQ struct { exchangeName string queueName string routingKey string + conn *amqp.Connection + channel *amqp.Channel } -func NewRabbitMQ(rabbitMQURL, exchangeName, queueName, routingKey string) MessageQueue { +func NewRabbitMQ() (MessageQueue, error) { + rabbitMQURL := os.Getenv("RABBITMQ_URL") + routeKey := os.Getenv("ROUTE_KEY") + queueName := os.Getenv("QUEUE_NAME") + exchangeName := os.Getenv("EXCHABGE_NAME") + + if rabbitMQURL == "" || queueName == "" || exchangeName == "" || routeKey == "" { + return nil, fmt.Errorf("RABBITMQ_URL, ROUTE_KEY, EXCHABGE_NAME and QUEUE_NAME environment variables must be set") + } + return &rabbitMQ{ rabbitMQURL: rabbitMQURL, exchangeName: exchangeName, queueName: queueName, - routingKey: routingKey, - } + routingKey: routeKey, + }, nil } -func (r *rabbitMQ) Consume(ctx context.Context, messageHandler func(ctx context.Context, body []byte)) { +func (r *rabbitMQ) connect() error { + var err error + r.conn, err = amqp.Dial(r.rabbitMQURL) + if err != nil { + log.Printf("Failed to connect to RabbitMQ: %v", err) + return err + } + + // Open a channel + r.channel, err = r.conn.Channel() + if err != nil { + r.conn.Close() + log.Printf("Failed to open a channel: %v", err) + return err + } + + // Declare exchange + err = r.channel.ExchangeDeclare( + r.exchangeName, + "direct", + true, + false, + false, + false, + nil, + ) + if err != nil { + r.channel.Close() + r.conn.Close() + log.Printf("Failed to declare exchange: %v", err) + return err + } + + // Declare queue + _, err = r.channel.QueueDeclare( + r.queueName, + true, + false, + false, + false, + nil, + ) + if err != nil { + r.channel.Close() + r.conn.Close() + log.Printf("Failed to declare a queue: %v", err) + return err + } + + // Bind queue + err = r.channel.QueueBind( + r.queueName, + r.routingKey, + r.exchangeName, + false, + nil, + ) + if err != nil { + r.channel.Close() + r.conn.Close() + log.Printf("Failed to bind queue: %v", err) + return err + } + + return nil +} + +func (r *rabbitMQ) Consume(messageHandler func(ctx context.Context, body []byte)) { + for { - conn, err := amqp.Dial(r.rabbitMQURL) - if err != nil { - log.Printf("Failed to connect to RabbitMQ: %v", err) - time.Sleep(5 * time.Second) // Wait before retrying - continue - } - defer conn.Close() - - ch, err := conn.Channel() - if err != nil { - log.Printf("Failed to open a channel: %v", err) - time.Sleep(5 * time.Second) // Wait before retrying - continue - } - defer ch.Close() - - // Declare the exchange - err = ch.ExchangeDeclare( - r.exchangeName, // name of the exchange - "direct", // type of exchange - true, // durable - false, // auto-delete - false, // internal - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Printf("Failed to declare exchange: %v", err) + // Establish initial connection and setup + if err := r.connect(); err != nil { + log.Printf("Error setting up RabbitMQ: %v. Retrying in 5 seconds...", err) time.Sleep(5 * time.Second) // Wait before retrying continue } - q, err := ch.QueueDeclare( - r.queueName, // name - true, // durable - false, // delete when unused + // Consume messages + msgs, err := r.channel.Consume( + r.queueName, // queue + "", // consumer + true, // auto-ack false, // exclusive + false, // no-local false, // no-wait - nil, // arguments - ) - if err != nil { - log.Printf("Failed to declare a queue: %v", err) - time.Sleep(5 * time.Second) // Wait before retrying - continue - } - err = ch.QueueBind( - r.queueName, - r.routingKey, - r.exchangeName, - false, - nil, - ) - - msgs, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args + nil, // args ) if err != nil { log.Printf("Failed to register a consumer: %v", err) + r.channel.Close() + r.conn.Close() time.Sleep(5 * time.Second) // Wait before retrying continue } + // Start message processing + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { select { - case msg := <-msgs: + case msg, ok := <-msgs: + if !ok { + log.Println("Message channel closed, reconnecting...") + r.channel.Close() + r.conn.Close() + break // Break inner loop to reconnect + } messageHandler(ctx, msg.Body) case <-ctx.Done(): - log.Println("Consumer stopped.") + log.Println("Context canceled, shutting down...") + r.channel.Close() + r.conn.Close() return } }