184 lines
3.7 KiB
Go
184 lines
3.7 KiB
Go
package mq
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/streadway/amqp"
|
|
)
|
|
|
|
type MessageQueue interface {
|
|
Consume(processMessage func(ctx context.Context, body []byte))
|
|
}
|
|
|
|
type rabbitMQ struct {
|
|
rabbitMQURL string
|
|
exchangeName string
|
|
queueName string
|
|
routingKey string
|
|
conn *amqp.Connection
|
|
channel *amqp.Channel
|
|
}
|
|
|
|
func NewRabbitMQ() (MessageQueue, error) {
|
|
rabbitMQURL := os.Getenv("RABBITMQ_URL")
|
|
routeKey := os.Getenv("ROUTE_KEY")
|
|
queueName := os.Getenv("QUEUE_NAME")
|
|
exchangeName := os.Getenv("EXCHABGE_NAME")
|
|
|
|
if rabbitMQURL == "" || queueName == "" || exchangeName == "" || routeKey == "" {
|
|
return nil, fmt.Errorf("RABBITMQ_URL, ROUTE_KEY, EXCHABGE_NAME and QUEUE_NAME environment variables must be set")
|
|
}
|
|
|
|
return &rabbitMQ{
|
|
rabbitMQURL: rabbitMQURL,
|
|
exchangeName: exchangeName,
|
|
queueName: queueName,
|
|
routingKey: routeKey,
|
|
}, nil
|
|
}
|
|
|
|
func (r *rabbitMQ) connect() error {
|
|
var err error
|
|
r.conn, err = amqp.Dial(r.rabbitMQURL)
|
|
if err != nil {
|
|
log.Printf("Failed to connect to RabbitMQ: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Open a channel
|
|
r.channel, err = r.conn.Channel()
|
|
if err != nil {
|
|
r.conn.Close()
|
|
log.Printf("Failed to open a channel: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Declare exchange
|
|
err = r.channel.ExchangeDeclare(
|
|
r.exchangeName,
|
|
"direct",
|
|
true,
|
|
false,
|
|
false,
|
|
false,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
r.channel.Close()
|
|
r.conn.Close()
|
|
log.Printf("Failed to declare exchange: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Declare queue
|
|
_, err = r.channel.QueueDeclare(
|
|
r.queueName,
|
|
true,
|
|
false,
|
|
false,
|
|
false,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
r.channel.Close()
|
|
r.conn.Close()
|
|
log.Printf("Failed to declare a queue: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Bind queue
|
|
err = r.channel.QueueBind(
|
|
r.queueName,
|
|
r.routingKey,
|
|
r.exchangeName,
|
|
false,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
r.channel.Close()
|
|
r.conn.Close()
|
|
log.Printf("Failed to bind queue: %v", err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *rabbitMQ) Consume(messageHandler func(ctx context.Context, body []byte)) {
|
|
|
|
for {
|
|
// Establish initial connection and setup
|
|
if err := r.connect(); err != nil {
|
|
log.Printf("Error setting up RabbitMQ: %v. Retrying in 5 seconds...", err)
|
|
time.Sleep(5 * time.Second) // Wait before retrying
|
|
continue
|
|
}
|
|
|
|
// Consume messages
|
|
msgs, err := r.channel.Consume(
|
|
r.queueName, // queue
|
|
"", // consumer
|
|
true, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
if err != nil {
|
|
log.Printf("Failed to register a consumer: %v", err)
|
|
r.channel.Close()
|
|
r.conn.Close()
|
|
time.Sleep(5 * time.Second) // Wait before retrying
|
|
continue
|
|
}
|
|
// Start message processing
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
// Create a channel to signal when message processing is done
|
|
done := make(chan bool)
|
|
|
|
go func() {
|
|
defer close(done)
|
|
for {
|
|
select {
|
|
case msg, ok := <-msgs:
|
|
if !ok {
|
|
log.Println("Message channel closed, reconnecting...")
|
|
return
|
|
}
|
|
messageHandler(ctx, msg.Body)
|
|
case <-ctx.Done():
|
|
log.Println("Context canceled, shutting down...")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait for either the done signal or a connection error
|
|
select {
|
|
case <-done:
|
|
// Message processing stopped, attempt to reconnect
|
|
case <-r.conn.NotifyClose(make(chan *amqp.Error)):
|
|
// Connection was closed, attempt to reconnect
|
|
}
|
|
|
|
cancel() // Cancel the context to stop message processing
|
|
r.cleanup()
|
|
log.Println("Reconnecting in 5 seconds...")
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
|
|
}
|
|
|
|
func (r *rabbitMQ) cleanup() {
|
|
if r.channel != nil {
|
|
r.channel.Close()
|
|
}
|
|
if r.conn != nil {
|
|
r.conn.Close()
|
|
}
|
|
}
|