Compare commits
7 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
47fc31ec73 | |
|
|
23e44ec39f | |
|
|
8f9bafef28 | |
|
|
343a02c0e0 | |
|
|
857b9e75a2 | |
|
|
26595f76d5 | |
|
|
ca35971f14 |
|
|
@ -1,5 +1,5 @@
|
||||||
RABBITMQ_URL=amqp://admin:admin@localhost:5672/smstv
|
RABBITMQ_URL=amqp://admin:admin@localhost:5672/queue
|
||||||
MYSQL_DSN="sms:AkNOPeiyeJYj@tcp(localhost:3306)/sms"
|
MYSQL_DSN="user:password@tcp(localhost:3306)/db"
|
||||||
QUEUE_NAME=sms_reply
|
QUEUE_NAME=sms_reply
|
||||||
EXCHABGE_NAME=sms_reply
|
EXCHABGE_NAME=sms_reply
|
||||||
ROUTE_KEY=sms_reply
|
ROUTE_KEY=sms_reply
|
||||||
|
|
|
||||||
|
|
@ -4,3 +4,4 @@ cs.exe
|
||||||
ts
|
ts
|
||||||
ts.exe
|
ts.exe
|
||||||
logs
|
logs
|
||||||
|
build
|
||||||
|
|
@ -5,76 +5,103 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"smpp-transmitter/pkg/data"
|
"smpp-transmitter/config"
|
||||||
|
"smpp-transmitter/internal/models"
|
||||||
|
"smpp-transmitter/pkg/database"
|
||||||
fl "smpp-transmitter/pkg/logger"
|
fl "smpp-transmitter/pkg/logger"
|
||||||
"smpp-transmitter/pkg/mq"
|
"smpp-transmitter/pkg/mq"
|
||||||
|
"smpp-transmitter/pkg/mq/rabbitmq"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"gorm.io/driver/mysql"
|
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"gorm.io/gorm/logger"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
err := godotenv.Load()
|
if err := run(); err != nil {
|
||||||
|
log.Fatalf("Application error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func run() error {
|
||||||
|
// Load configuration
|
||||||
|
cfg, err := loadConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Error loading .env file")
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load environment variables
|
// Setup logger
|
||||||
mysqlDSN := os.Getenv("MYSQL_DSN")
|
logger, err := fl.SetupLogger(cfg.AppEnv)
|
||||||
appEnv := os.Getenv("APP_ENV")
|
|
||||||
|
|
||||||
if mysqlDSN == "" {
|
|
||||||
log.Fatal("MYSQL_DSN environment variable must be set")
|
|
||||||
}
|
|
||||||
|
|
||||||
Log, err := fl.SetupLogger(appEnv)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to set up logger: %v", err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize MySQL database connection
|
// Initialize database
|
||||||
db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{
|
db, err := database.InitMYSQL(cfg.MysqlDSN)
|
||||||
Logger: logger.Default.LogMode(logger.Info),
|
if err != nil {
|
||||||
|
logger.Error("Failed to connect to MySQL:", "err", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize RabbitMQ consumer
|
||||||
|
consumer, err := initRabbitMQConsumer(cfg)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("Failed to initialize RabbitMQ consumer:", "err", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start consuming messages
|
||||||
|
go startConsuming(consumer, db, logger)
|
||||||
|
|
||||||
|
// Handle graceful shutdown
|
||||||
|
waitForShutdown(logger)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadConfig() (*config.Config, error) {
|
||||||
|
if err := godotenv.Load(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return config.NewConfig()
|
||||||
|
}
|
||||||
|
|
||||||
|
func initRabbitMQConsumer(cfg *config.Config) (mq.Consumer, error) {
|
||||||
|
return rabbitmq.NewRabbitMQConsumer(
|
||||||
|
cfg.RabbitMQURL,
|
||||||
|
cfg.ExchangeName,
|
||||||
|
cfg.QueueName,
|
||||||
|
cfg.RoutingKey,
|
||||||
|
1,
|
||||||
|
), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func startConsuming(consumer mq.Consumer, db *gorm.DB, logger fl.LogService) {
|
||||||
|
consumer.Consume(func(ctx context.Context, body []byte) {
|
||||||
|
handleMessage(ctx, body, db, logger)
|
||||||
})
|
})
|
||||||
if err != nil {
|
}
|
||||||
Log.Error("Failed to connect to MySQL:", err)
|
|
||||||
panic("Failed to connect to MySQL")
|
func handleMessage(ctx context.Context, body []byte, db *gorm.DB, logger fl.LogService) {
|
||||||
|
message := &models.Message{}
|
||||||
|
|
||||||
|
if err := message.Convert(body); err != nil {
|
||||||
|
logger.Error("Message cannot be converted", "err", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
//initilize rabbitmq
|
logger.Info(message.Msg, "src", message.Src, "dst", message.Dst)
|
||||||
messageQueue, err := mq.NewRabbitMQ()
|
|
||||||
if err != nil {
|
if err := message.Insert(ctx, db); err != nil {
|
||||||
log.Fatalf("Failed to set up Message Broker: %v", err)
|
logger.Error("Message cannot be Inserted", "err", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Run RabbitMQ consumer
|
func waitForShutdown(logger fl.LogService) {
|
||||||
go messageQueue.Consume(func(ctx context.Context, body []byte) {
|
|
||||||
message := &data.Message{}
|
|
||||||
|
|
||||||
Log.Info("Received a raw message", "body", string(body))
|
|
||||||
|
|
||||||
if err := message.Convert(body); err != nil {
|
|
||||||
Log.Error("Message cannot be converted", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := message.Insert(ctx, db); err != nil {
|
|
||||||
Log.Error("Message cannot be Inserted", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Message handled %v", string(body))
|
|
||||||
|
|
||||||
})
|
|
||||||
|
|
||||||
// Handle signals for graceful shutdown
|
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||||
<-sigs
|
<-sigs
|
||||||
log.Println("Shutting down...")
|
logger.Info("Shutting down...")
|
||||||
Log.Info("Shutting down...")
|
|
||||||
os.Exit(0)
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
AppEnv string
|
||||||
|
MysqlDSN string
|
||||||
|
RabbitMQURL string
|
||||||
|
ExchangeName string
|
||||||
|
QueueName string
|
||||||
|
RoutingKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConfig() (*Config, error) {
|
||||||
|
cfg := &Config{
|
||||||
|
AppEnv: os.Getenv("APP_ENV"),
|
||||||
|
MysqlDSN: os.Getenv("MYSQL_DSN"),
|
||||||
|
RabbitMQURL: os.Getenv("RABBITMQ_URL"),
|
||||||
|
ExchangeName: os.Getenv("EXCHANGE_NAME"),
|
||||||
|
QueueName: os.Getenv("QUEUE_NAME"),
|
||||||
|
RoutingKey: os.Getenv("ROUTING_KEY"),
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MysqlDSN == "" || cfg.RabbitMQURL == "" || cfg.ExchangeName == "" || cfg.QueueName == "" || cfg.RoutingKey == "" {
|
||||||
|
return nil, errors.New("missing required environment variables")
|
||||||
|
}
|
||||||
|
|
||||||
|
return cfg, nil
|
||||||
|
}
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package data
|
package models
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
package data
|
|
||||||
|
|
||||||
import (
|
|
||||||
"gorm.io/driver/mysql"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
"gorm.io/gorm/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
type DBService interface {
|
|
||||||
}
|
|
||||||
|
|
||||||
type mysqlDb struct {
|
|
||||||
db *gorm.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMysqlDB(mysqlDSN string) (DBService, error) {
|
|
||||||
// Initialize MySQL database connection
|
|
||||||
db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{
|
|
||||||
Logger: logger.Default.LogMode(logger.Info),
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
mdb := &mysqlDb{
|
|
||||||
db: db,
|
|
||||||
}
|
|
||||||
|
|
||||||
return mdb, nil
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gorm.io/driver/mysql"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
"gorm.io/gorm/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func InitMYSQL(dsn string) (*gorm.DB, error) {
|
||||||
|
return gorm.Open(mysql.Open(dsn), &gorm.Config{
|
||||||
|
Logger: logger.Default.LogMode(logger.Error),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package logger
|
package logger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
@ -21,8 +20,8 @@ func SetupLogger(env string) (LogService, error) {
|
||||||
var errorHandler slog.Handler
|
var errorHandler slog.Handler
|
||||||
|
|
||||||
if env == "test" {
|
if env == "test" {
|
||||||
infoHandler = slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelInfo})
|
infoHandler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
|
||||||
errorHandler = slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})
|
errorHandler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})
|
||||||
} else {
|
} else {
|
||||||
err := os.MkdirAll("logs", 0755)
|
err := os.MkdirAll("logs", 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -68,9 +67,9 @@ func SetupLogger(env string) (LogService, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *loggers) Error(msg string, args ...any) {
|
func (l *loggers) Error(msg string, args ...any) {
|
||||||
l.errorLogger.Error(msg, args)
|
l.errorLogger.Error(msg, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *loggers) Info(msg string, args ...any) {
|
func (l *loggers) Info(msg string, args ...any) {
|
||||||
l.infoLogger.Info(msg, args)
|
l.infoLogger.Info(msg, args...)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,41 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type MessageHandler func(ctx context.Context, body []byte)
|
||||||
|
|
||||||
|
type Consumer interface {
|
||||||
|
Consume(handler MessageHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Connection interface {
|
||||||
|
Connect() error
|
||||||
|
Close() error
|
||||||
|
Channel() (Channel, error)
|
||||||
|
NotifyClose(chan *Error) chan *Error
|
||||||
|
}
|
||||||
|
|
||||||
|
type Channel interface {
|
||||||
|
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
|
||||||
|
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
|
||||||
|
QueueBind(name, key, exchange string, noWait bool, args Table) error
|
||||||
|
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type Queue struct {
|
||||||
|
Name string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Delivery struct {
|
||||||
|
Body []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type Error struct {
|
||||||
|
Code int
|
||||||
|
Reason string
|
||||||
|
Server bool
|
||||||
|
Recover bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type Table map[string]interface{}
|
||||||
|
|
@ -1,183 +0,0 @@
|
||||||
package mq
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/streadway/amqp"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MessageQueue interface {
|
|
||||||
Consume(processMessage func(ctx context.Context, body []byte))
|
|
||||||
}
|
|
||||||
|
|
||||||
type rabbitMQ struct {
|
|
||||||
rabbitMQURL string
|
|
||||||
exchangeName string
|
|
||||||
queueName string
|
|
||||||
routingKey string
|
|
||||||
conn *amqp.Connection
|
|
||||||
channel *amqp.Channel
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRabbitMQ() (MessageQueue, error) {
|
|
||||||
rabbitMQURL := os.Getenv("RABBITMQ_URL")
|
|
||||||
routeKey := os.Getenv("ROUTE_KEY")
|
|
||||||
queueName := os.Getenv("QUEUE_NAME")
|
|
||||||
exchangeName := os.Getenv("EXCHABGE_NAME")
|
|
||||||
|
|
||||||
if rabbitMQURL == "" || queueName == "" || exchangeName == "" || routeKey == "" {
|
|
||||||
return nil, fmt.Errorf("RABBITMQ_URL, ROUTE_KEY, EXCHABGE_NAME and QUEUE_NAME environment variables must be set")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &rabbitMQ{
|
|
||||||
rabbitMQURL: rabbitMQURL,
|
|
||||||
exchangeName: exchangeName,
|
|
||||||
queueName: queueName,
|
|
||||||
routingKey: routeKey,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rabbitMQ) connect() error {
|
|
||||||
var err error
|
|
||||||
r.conn, err = amqp.Dial(r.rabbitMQURL)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to connect to RabbitMQ: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open a channel
|
|
||||||
r.channel, err = r.conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
r.conn.Close()
|
|
||||||
log.Printf("Failed to open a channel: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Declare exchange
|
|
||||||
err = r.channel.ExchangeDeclare(
|
|
||||||
r.exchangeName,
|
|
||||||
"direct",
|
|
||||||
true,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
r.channel.Close()
|
|
||||||
r.conn.Close()
|
|
||||||
log.Printf("Failed to declare exchange: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Declare queue
|
|
||||||
_, err = r.channel.QueueDeclare(
|
|
||||||
r.queueName,
|
|
||||||
true,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
r.channel.Close()
|
|
||||||
r.conn.Close()
|
|
||||||
log.Printf("Failed to declare a queue: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Bind queue
|
|
||||||
err = r.channel.QueueBind(
|
|
||||||
r.queueName,
|
|
||||||
r.routingKey,
|
|
||||||
r.exchangeName,
|
|
||||||
false,
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
r.channel.Close()
|
|
||||||
r.conn.Close()
|
|
||||||
log.Printf("Failed to bind queue: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rabbitMQ) Consume(messageHandler func(ctx context.Context, body []byte)) {
|
|
||||||
|
|
||||||
for {
|
|
||||||
// Establish initial connection and setup
|
|
||||||
if err := r.connect(); err != nil {
|
|
||||||
log.Printf("Error setting up RabbitMQ: %v. Retrying in 5 seconds...", err)
|
|
||||||
time.Sleep(5 * time.Second) // Wait before retrying
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Consume messages
|
|
||||||
msgs, err := r.channel.Consume(
|
|
||||||
r.queueName, // queue
|
|
||||||
"", // consumer
|
|
||||||
true, // auto-ack
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-local
|
|
||||||
false, // no-wait
|
|
||||||
nil, // args
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to register a consumer: %v", err)
|
|
||||||
r.channel.Close()
|
|
||||||
r.conn.Close()
|
|
||||||
time.Sleep(5 * time.Second) // Wait before retrying
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Start message processing
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
// Create a channel to signal when message processing is done
|
|
||||||
done := make(chan bool)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer close(done)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case msg, ok := <-msgs:
|
|
||||||
if !ok {
|
|
||||||
log.Println("Message channel closed, reconnecting...")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
messageHandler(ctx, msg.Body)
|
|
||||||
case <-ctx.Done():
|
|
||||||
log.Println("Context canceled, shutting down...")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for either the done signal or a connection error
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Message processing stopped, attempt to reconnect
|
|
||||||
case <-r.conn.NotifyClose(make(chan *amqp.Error)):
|
|
||||||
// Connection was closed, attempt to reconnect
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel() // Cancel the context to stop message processing
|
|
||||||
r.cleanup()
|
|
||||||
log.Println("Reconnecting in 5 seconds...")
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rabbitMQ) cleanup() {
|
|
||||||
if r.channel != nil {
|
|
||||||
r.channel.Close()
|
|
||||||
}
|
|
||||||
if r.conn != nil {
|
|
||||||
r.conn.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,45 @@
|
||||||
|
package rabbitmq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"smpp-transmitter/pkg/mq"
|
||||||
|
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rabbitMQChannel struct {
|
||||||
|
ch *amqp.Channel
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args mq.Table) error {
|
||||||
|
return r.ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, amqp.Table(args))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args mq.Table) (mq.Queue, error) {
|
||||||
|
q, err := r.ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, amqp.Table(args))
|
||||||
|
return mq.Queue{Name: q.Name}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQChannel) QueueBind(name, key, exchange string, noWait bool, args mq.Table) error {
|
||||||
|
return r.ch.QueueBind(name, key, exchange, noWait, amqp.Table(args))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args mq.Table) (<-chan mq.Delivery, error) {
|
||||||
|
deliveries, err := r.ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, amqp.Table(args))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan mq.Delivery)
|
||||||
|
go func() {
|
||||||
|
for d := range deliveries {
|
||||||
|
ch <- mq.Delivery{Body: d.Body}
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ch, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQChannel) Close() error {
|
||||||
|
return r.ch.Close()
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,49 @@
|
||||||
|
package rabbitmq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"smpp-transmitter/pkg/mq"
|
||||||
|
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rabbitMQConnection struct {
|
||||||
|
conn *amqp.Connection
|
||||||
|
url string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRabbitMQConnection(url string) mq.Connection {
|
||||||
|
return &rabbitMQConnection{url: url}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQConnection) Connect() error {
|
||||||
|
var err error
|
||||||
|
r.conn, err = amqp.Dial(r.url)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQConnection) Close() error {
|
||||||
|
return r.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQConnection) Channel() (mq.Channel, error) {
|
||||||
|
ch, err := r.conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &rabbitMQChannel{ch: ch}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQConnection) NotifyClose(c chan *mq.Error) chan *mq.Error {
|
||||||
|
ch := r.conn.NotifyClose(make(chan *amqp.Error))
|
||||||
|
go func() {
|
||||||
|
for err := range ch {
|
||||||
|
c <- &mq.Error{
|
||||||
|
Code: err.Code,
|
||||||
|
Reason: err.Reason,
|
||||||
|
Server: err.Server,
|
||||||
|
Recover: err.Recover,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,97 @@
|
||||||
|
package rabbitmq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"smpp-transmitter/pkg/mq"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rabbitMQConsumer struct {
|
||||||
|
connection mq.Connection
|
||||||
|
exchangeName string
|
||||||
|
queueName string
|
||||||
|
routingKey string
|
||||||
|
prefetchCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRabbitMQConsumer(url, exchangeName, queueName, routingKey string, prefetchCount int) mq.Consumer {
|
||||||
|
return &rabbitMQConsumer{
|
||||||
|
connection: NewRabbitMQConnection(url),
|
||||||
|
exchangeName: exchangeName,
|
||||||
|
queueName: queueName,
|
||||||
|
routingKey: routingKey,
|
||||||
|
prefetchCount: prefetchCount,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQConsumer) Consume(handler mq.MessageHandler) {
|
||||||
|
for {
|
||||||
|
if err := r.connect(); err != nil {
|
||||||
|
log.Printf("Error connecting to RabbitMQ: %v. Retrying in 5 seconds...", err)
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.consumeMessages(handler); err != nil {
|
||||||
|
log.Printf("Error consuming messages: %v. Reconnecting...", err)
|
||||||
|
r.connection.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQConsumer) connect() error {
|
||||||
|
if err := r.connection.Connect(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
channel, err := r.connection.Channel()
|
||||||
|
if err != nil {
|
||||||
|
r.connection.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer channel.Close()
|
||||||
|
|
||||||
|
if err := channel.ExchangeDeclare(r.exchangeName, "direct", true, false, false, false, nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := channel.QueueDeclare(r.queueName, true, false, false, false, nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, false, nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQConsumer) consumeMessages(handler mq.MessageHandler) error {
|
||||||
|
channel, err := r.connection.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer channel.Close()
|
||||||
|
|
||||||
|
msgs, err := channel.Consume(r.queueName, "", true, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for msg := range msgs {
|
||||||
|
handler(ctx, msg.Body)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for connection to close
|
||||||
|
<-r.connection.NotifyClose(make(chan *mq.Error))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
package smpp
|
|
||||||
Loading…
Reference in New Issue