// Package mq provides a RabbitMQ-backed message consumer.
package mq

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/streadway/amqp"
)

// retryDelay is how long Consume waits before retrying after a failed
// connection setup or consumer registration.
const retryDelay = 5 * time.Second

// MessageQueue is a source of messages that are handed to a callback.
type MessageQueue interface {
	// Consume passes the body of every received message to processMessage.
	Consume(processMessage func(ctx context.Context, body []byte))
}

// rabbitMQ is the RabbitMQ implementation of MessageQueue. conn and channel
// are (re)established by connect and released by close.
type rabbitMQ struct {
	rabbitMQURL  string
	exchangeName string
	queueName    string
	routingKey   string
	conn         *amqp.Connection
	channel      *amqp.Channel
}

// NewRabbitMQ builds a MessageQueue from the RABBITMQ_URL, ROUTE_KEY,
// QUEUE_NAME and EXCHABGE_NAME environment variables. No connection is opened
// here; that happens inside Consume.
//
// The exchange variable has historically been spelled EXCHABGE_NAME. That
// spelling is still read first so existing deployments are unaffected; the
// correctly spelled EXCHANGE_NAME is used only when the legacy one is empty.
//
// An error is returned if any of the four settings is empty.
func NewRabbitMQ() (MessageQueue, error) {
	rabbitMQURL := os.Getenv("RABBITMQ_URL")
	routeKey := os.Getenv("ROUTE_KEY")
	queueName := os.Getenv("QUEUE_NAME")
	exchangeName := os.Getenv("EXCHABGE_NAME")
	if exchangeName == "" {
		exchangeName = os.Getenv("EXCHANGE_NAME")
	}

	if rabbitMQURL == "" || queueName == "" || exchangeName == "" || routeKey == "" {
		return nil, fmt.Errorf("RABBITMQ_URL, ROUTE_KEY, EXCHABGE_NAME (or EXCHANGE_NAME) and QUEUE_NAME environment variables must be set")
	}

	return &rabbitMQ{
		rabbitMQURL:  rabbitMQURL,
		exchangeName: exchangeName,
		queueName:    queueName,
		routingKey:   routeKey,
	}, nil
}

// close releases the current channel and connection, if any, and clears the
// fields. Close errors are deliberately ignored: this is only called when the
// session is being abandoned, so there is nothing useful to do with them.
func (r *rabbitMQ) close() {
	if r.channel != nil {
		_ = r.channel.Close()
		r.channel = nil
	}
	if r.conn != nil {
		_ = r.conn.Close()
		r.conn = nil
	}
}

// connect dials the broker, opens a channel, declares the exchange and the
// queue, and binds the queue to the exchange with the configured routing key.
//
// On success r.conn and r.channel hold the new session. On failure everything
// opened so far is closed and the returned error says which step failed; it is
// not logged here, the caller decides how to report it.
func (r *rabbitMQ) connect() error {
	var err error

	r.conn, err = amqp.Dial(r.rabbitMQURL)
	if err != nil {
		r.conn = nil
		return fmt.Errorf("connecting to RabbitMQ: %w", err)
	}

	r.channel, err = r.conn.Channel()
	if err != nil {
		r.channel = nil
		r.close()
		return fmt.Errorf("opening a channel: %w", err)
	}

	err = r.channel.ExchangeDeclare(
		r.exchangeName, // name
		"direct",       // kind
		true,           // durable
		false,          // auto-delete
		false,          // internal
		false,          // no-wait
		nil,            // args
	)
	if err != nil {
		r.close()
		return fmt.Errorf("declaring exchange %q: %w", r.exchangeName, err)
	}

	_, err = r.channel.QueueDeclare(
		r.queueName, // name
		true,        // durable
		false,       // auto-delete
		false,       // exclusive
		false,       // no-wait
		nil,         // args
	)
	if err != nil {
		r.close()
		return fmt.Errorf("declaring queue %q: %w", r.queueName, err)
	}

	err = r.channel.QueueBind(
		r.queueName,    // queue
		r.routingKey,   // routing key
		r.exchangeName, // exchange
		false,          // no-wait
		nil,            // args
	)
	if err != nil {
		r.close()
		return fmt.Errorf("binding queue %q: %w", r.queueName, err)
	}

	return nil
}

// Consume connects to RabbitMQ and calls messageHandler synchronously with the
// body of each delivery. It blocks for the lifetime of the process: whenever
// setup fails it retries after retryDelay, and whenever the delivery channel
// is closed it tears the session down and reconnects.
//
// Deliveries are consumed with auto-ack enabled, so a message is considered
// acknowledged as soon as it is delivered, before messageHandler runs.
//
// The context passed to messageHandler is created once per Consume call and is
// only cancelled if Consume unwinds (for example when the handler panics).
func (r *rabbitMQ) Consume(messageHandler func(ctx context.Context, body []byte)) {
	// One context for the whole call. Creating it (and deferring cancel)
	// inside the reconnect loop would pile up a deferred call per reconnect.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		if err := r.connect(); err != nil {
			log.Printf("Error setting up RabbitMQ: %v. Retrying in %s...", err, retryDelay)
			time.Sleep(retryDelay)
			continue
		}

		msgs, err := r.channel.Consume(
			r.queueName, // queue
			"",          // consumer
			true,        // auto-ack
			false,       // exclusive
			false,       // no-local
			false,       // no-wait
			nil,         // args
		)
		if err != nil {
			log.Printf("Failed to register a consumer: %v", err)
			r.close()
			time.Sleep(retryDelay)
			continue
		}

		// Ranging ends when the delivery channel is closed. A `break` placed
		// inside a `select` would only leave the select, never this loop, and
		// would spin forever on the closed channel.
		for msg := range msgs {
			messageHandler(ctx, msg.Body)
		}

		log.Println("Message channel closed, reconnecting...")
		r.close()
	}
}