remove old files
This commit is contained in:
parent
26595f76d5
commit
857b9e75a2
|
|
@ -1,183 +0,0 @@
|
|||
package mq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type MessageQueue interface {
|
||||
Consume(processMessage func(ctx context.Context, body []byte))
|
||||
}
|
||||
|
||||
type rabbitMQ struct {
|
||||
rabbitMQURL string
|
||||
exchangeName string
|
||||
queueName string
|
||||
routingKey string
|
||||
conn *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
}
|
||||
|
||||
func NewRabbitMQ() (MessageQueue, error) {
|
||||
rabbitMQURL := os.Getenv("RABBITMQ_URL")
|
||||
routeKey := os.Getenv("ROUTE_KEY")
|
||||
queueName := os.Getenv("QUEUE_NAME")
|
||||
exchangeName := os.Getenv("EXCHABGE_NAME")
|
||||
|
||||
if rabbitMQURL == "" || queueName == "" || exchangeName == "" || routeKey == "" {
|
||||
return nil, fmt.Errorf("RABBITMQ_URL, ROUTE_KEY, EXCHABGE_NAME and QUEUE_NAME environment variables must be set")
|
||||
}
|
||||
|
||||
return &rabbitMQ{
|
||||
rabbitMQURL: rabbitMQURL,
|
||||
exchangeName: exchangeName,
|
||||
queueName: queueName,
|
||||
routingKey: routeKey,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) connect() error {
|
||||
var err error
|
||||
r.conn, err = amqp.Dial(r.rabbitMQURL)
|
||||
if err != nil {
|
||||
log.Printf("Failed to connect to RabbitMQ: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Open a channel
|
||||
r.channel, err = r.conn.Channel()
|
||||
if err != nil {
|
||||
r.conn.Close()
|
||||
log.Printf("Failed to open a channel: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Declare exchange
|
||||
err = r.channel.ExchangeDeclare(
|
||||
r.exchangeName,
|
||||
"direct",
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
r.channel.Close()
|
||||
r.conn.Close()
|
||||
log.Printf("Failed to declare exchange: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Declare queue
|
||||
_, err = r.channel.QueueDeclare(
|
||||
r.queueName,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
r.channel.Close()
|
||||
r.conn.Close()
|
||||
log.Printf("Failed to declare a queue: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Bind queue
|
||||
err = r.channel.QueueBind(
|
||||
r.queueName,
|
||||
r.routingKey,
|
||||
r.exchangeName,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
r.channel.Close()
|
||||
r.conn.Close()
|
||||
log.Printf("Failed to bind queue: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) Consume(messageHandler func(ctx context.Context, body []byte)) {
|
||||
|
||||
for {
|
||||
// Establish initial connection and setup
|
||||
if err := r.connect(); err != nil {
|
||||
log.Printf("Error setting up RabbitMQ: %v. Retrying in 5 seconds...", err)
|
||||
time.Sleep(5 * time.Second) // Wait before retrying
|
||||
continue
|
||||
}
|
||||
|
||||
// Consume messages
|
||||
msgs, err := r.channel.Consume(
|
||||
r.queueName, // queue
|
||||
"", // consumer
|
||||
true, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Failed to register a consumer: %v", err)
|
||||
r.channel.Close()
|
||||
r.conn.Close()
|
||||
time.Sleep(5 * time.Second) // Wait before retrying
|
||||
continue
|
||||
}
|
||||
// Start message processing
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
// Create a channel to signal when message processing is done
|
||||
done := make(chan bool)
|
||||
|
||||
go func() {
|
||||
defer close(done)
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-msgs:
|
||||
if !ok {
|
||||
log.Println("Message channel closed, reconnecting...")
|
||||
return
|
||||
}
|
||||
messageHandler(ctx, msg.Body)
|
||||
case <-ctx.Done():
|
||||
log.Println("Context canceled, shutting down...")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for either the done signal or a connection error
|
||||
select {
|
||||
case <-done:
|
||||
// Message processing stopped, attempt to reconnect
|
||||
case <-r.conn.NotifyClose(make(chan *amqp.Error)):
|
||||
// Connection was closed, attempt to reconnect
|
||||
}
|
||||
|
||||
cancel() // Cancel the context to stop message processing
|
||||
r.cleanup()
|
||||
log.Println("Reconnecting in 5 seconds...")
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) cleanup() {
|
||||
if r.channel != nil {
|
||||
r.channel.Close()
|
||||
}
|
||||
if r.conn != nil {
|
||||
r.conn.Close()
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue