2024-08-29 10:14:19 +00:00
|
|
|
package mq
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"log"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/streadway/amqp"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type MessageQueue interface {
|
|
|
|
|
Consume(ctx context.Context, processMessage func(ctx context.Context, body []byte))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type rabbitMQ struct {
|
|
|
|
|
rabbitMQURL string
|
|
|
|
|
exchangeName string
|
|
|
|
|
queueName string
|
|
|
|
|
routingKey string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewRabbitMQ(rabbitMQURL, exchangeName, queueName, routingKey string) MessageQueue {
|
|
|
|
|
return &rabbitMQ{
|
|
|
|
|
rabbitMQURL: rabbitMQURL,
|
|
|
|
|
exchangeName: exchangeName,
|
|
|
|
|
queueName: queueName,
|
|
|
|
|
routingKey: routingKey,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *rabbitMQ) Consume(ctx context.Context, messageHandler func(ctx context.Context, body []byte)) {
|
|
|
|
|
for {
|
|
|
|
|
conn, err := amqp.Dial(r.rabbitMQURL)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("Failed to connect to RabbitMQ: %v", err)
|
|
|
|
|
time.Sleep(5 * time.Second) // Wait before retrying
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
|
|
ch, err := conn.Channel()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("Failed to open a channel: %v", err)
|
|
|
|
|
time.Sleep(5 * time.Second) // Wait before retrying
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
defer ch.Close()
|
|
|
|
|
|
|
|
|
|
// Declare the exchange
|
|
|
|
|
err = ch.ExchangeDeclare(
|
|
|
|
|
r.exchangeName, // name of the exchange
|
|
|
|
|
"direct", // type of exchange
|
|
|
|
|
true, // durable
|
|
|
|
|
false, // auto-delete
|
|
|
|
|
false, // internal
|
|
|
|
|
false, // no-wait
|
|
|
|
|
nil, // arguments
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("Failed to declare exchange: %v", err)
|
|
|
|
|
time.Sleep(5 * time.Second) // Wait before retrying
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
q, err := ch.QueueDeclare(
|
|
|
|
|
r.queueName, // name
|
2024-08-30 07:12:18 +00:00
|
|
|
true, // durable
|
2024-08-29 10:14:19 +00:00
|
|
|
false, // delete when unused
|
|
|
|
|
false, // exclusive
|
|
|
|
|
false, // no-wait
|
|
|
|
|
nil, // arguments
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("Failed to declare a queue: %v", err)
|
|
|
|
|
time.Sleep(5 * time.Second) // Wait before retrying
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
err = ch.QueueBind(
|
|
|
|
|
r.queueName,
|
|
|
|
|
r.routingKey,
|
|
|
|
|
r.exchangeName,
|
|
|
|
|
false,
|
|
|
|
|
nil,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
msgs, err := ch.Consume(
|
|
|
|
|
q.Name, // queue
|
|
|
|
|
"", // consumer
|
|
|
|
|
true, // auto-ack
|
|
|
|
|
false, // exclusive
|
|
|
|
|
false, // no-local
|
|
|
|
|
false, // no-wait
|
|
|
|
|
nil, // args
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("Failed to register a consumer: %v", err)
|
|
|
|
|
time.Sleep(5 * time.Second) // Wait before retrying
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case msg := <-msgs:
|
|
|
|
|
messageHandler(ctx, msg.Body)
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
log.Println("Consumer stopped.")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|