turkmentv_sms_transmitter/pkg/mq/rabbitmq/consumer.go

98 lines
2.0 KiB
Go

package rabbitmq
import (
"context"
"log"
"smpp-transmitter/pkg/mq"
"time"
)
type RabbitMQConsumer struct {
connection mq.Connection
exchangeName string
queueName string
routingKey string
prefetchCount int
}
func NewRabbitMQConsumer(url, exchangeName, queueName, routingKey string, prefetchCount int) mq.Consumer {
return &RabbitMQConsumer{
connection: NewRabbitMQConnection(url),
exchangeName: exchangeName,
queueName: queueName,
routingKey: routingKey,
prefetchCount: prefetchCount,
}
}
func (r *RabbitMQConsumer) Consume(handler mq.MessageHandler) {
for {
if err := r.connect(); err != nil {
log.Printf("Error connecting to RabbitMQ: %v. Retrying in 5 seconds...", err)
time.Sleep(5 * time.Second)
continue
}
if err := r.consumeMessages(handler); err != nil {
log.Printf("Error consuming messages: %v. Reconnecting...", err)
r.connection.Close()
}
time.Sleep(5 * time.Second)
}
}
func (r *RabbitMQConsumer) connect() error {
if err := r.connection.Connect(); err != nil {
return err
}
channel, err := r.connection.Channel()
if err != nil {
r.connection.Close()
return err
}
defer channel.Close()
if err := channel.ExchangeDeclare(r.exchangeName, "direct", true, false, false, false, nil); err != nil {
return err
}
if _, err := channel.QueueDeclare(r.queueName, true, false, false, false, nil); err != nil {
return err
}
if err := channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, false, nil); err != nil {
return err
}
return nil
}
func (r *RabbitMQConsumer) consumeMessages(handler mq.MessageHandler) error {
channel, err := r.connection.Channel()
if err != nil {
return err
}
defer channel.Close()
msgs, err := channel.Consume(r.queueName, "", true, false, false, false, nil)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for msg := range msgs {
handler(ctx, msg.Body)
}
}()
// Wait for connection to close
<-r.connection.NotifyClose(make(chan *mq.Error))
return nil
}