turkmentv_sms_transmitter/pkg/mq/rabbitmq.go

113 lines
2.4 KiB
Go

package mq
import (
"context"
"log"
"time"
"github.com/streadway/amqp"
)
// MessageQueue is the consumer-side abstraction over a message broker.
type MessageQueue interface {
	// Consume receives messages and passes the body of each one, together
	// with ctx, to processMessage.
	Consume(ctx context.Context, processMessage func(ctx context.Context, body []byte))
}
// rabbitMQ holds the settings used to consume from a RabbitMQ broker.
type rabbitMQ struct {
	// rabbitMQURL is the address passed to amqp.Dial.
	rabbitMQURL string
	// exchangeName is the exchange that gets declared, queueName the queue
	// that gets declared and consumed, and routingKey the key used to bind
	// the queue to the exchange.
	exchangeName, queueName, routingKey string
}
// NewRabbitMQ returns a MessageQueue backed by RabbitMQ. It only records the
// given settings; no connection is opened here.
func NewRabbitMQ(rabbitMQURL, exchangeName, queueName, routingKey string) MessageQueue {
	consumer := new(rabbitMQ)
	consumer.rabbitMQURL = rabbitMQURL
	consumer.exchangeName = exchangeName
	consumer.queueName = queueName
	consumer.routingKey = routingKey
	return consumer
}
// Consume connects to RabbitMQ, declares the exchange and queue, binds them,
// and passes the body of every delivery to messageHandler until ctx is
// cancelled.
//
// Whenever a session ends for a reason other than cancellation (a setup step
// fails, or the delivery channel is closed), Consume waits five seconds and
// starts a new session. The wait is interrupted by ctx cancellation. Consume
// blocks and returns only once ctx is done.
func (r *rabbitMQ) Consume(ctx context.Context, messageHandler func(ctx context.Context, body []byte)) {
	const retryDelay = 5 * time.Second

	for ctx.Err() == nil {
		if r.consumeOnce(ctx, messageHandler) {
			break
		}

		// Wait before retrying, but give up immediately on cancellation.
		timer := time.NewTimer(retryDelay)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
		}
	}
	log.Println("Consumer stopped.")
}

// consumeOnce runs one connect/declare/bind/consume session. It reports true
// when the session ended because ctx was cancelled and false when it ended
// because of an error or a closed delivery channel (the caller should retry).
//
// Keeping a session in its own function makes the deferred Close calls run at
// the end of every attempt rather than piling up until Consume returns.
func (r *rabbitMQ) consumeOnce(ctx context.Context, messageHandler func(ctx context.Context, body []byte)) bool {
	conn, err := amqp.Dial(r.rabbitMQURL)
	if err != nil {
		log.Printf("Failed to connect to RabbitMQ: %v", err)
		return false
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Printf("Failed to open a channel: %v", err)
		return false
	}
	defer ch.Close()

	// Declare the exchange.
	err = ch.ExchangeDeclare(
		r.exchangeName, // name of the exchange
		"direct",       // type of exchange
		true,           // durable
		false,          // auto-delete
		false,          // internal
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Printf("Failed to declare exchange: %v", err)
		return false
	}

	q, err := ch.QueueDeclare(
		r.queueName, // name
		false,       // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	if err != nil {
		log.Printf("Failed to declare a queue: %v", err)
		return false
	}

	// The bind result must be checked: without a binding the queue never
	// receives anything routed through the exchange.
	err = ch.QueueBind(
		r.queueName,
		r.routingKey,
		r.exchangeName,
		false,
		nil,
	)
	if err != nil {
		log.Printf("Failed to bind queue: %v", err)
		return false
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		log.Printf("Failed to register a consumer: %v", err)
		return false
	}

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				// A closed delivery channel would otherwise yield zero-value
				// deliveries forever; end the session so the caller reconnects.
				log.Println("RabbitMQ delivery channel closed, reconnecting")
				return false
			}
			messageHandler(ctx, msg.Body)
		case <-ctx.Done():
			return true
		}
	}
}