diff --git a/cmd/consumer/cs.go b/cmd/consumer/cs.go index 248bcfd..a3ba0e1 100644 --- a/cmd/consumer/cs.go +++ b/cmd/consumer/cs.go @@ -5,76 +5,102 @@ import ( "log" "os" "os/signal" + "smpp-transmitter/config" "smpp-transmitter/pkg/data" fl "smpp-transmitter/pkg/logger" "smpp-transmitter/pkg/mq" + "smpp-transmitter/pkg/mq/rabbitmq" "syscall" "github.com/joho/godotenv" - "gorm.io/driver/mysql" "gorm.io/gorm" - "gorm.io/gorm/logger" ) func main() { - err := godotenv.Load() + if err := run(); err != nil { + log.Fatalf("Application error: %v", err) + } +} + +func run() error { + // Load configuration + cfg, err := loadConfig() if err != nil { - log.Fatal("Error loading .env file") + return err } - // Load environment variables - mysqlDSN := os.Getenv("MYSQL_DSN") - appEnv := os.Getenv("APP_ENV") - - if mysqlDSN == "" { - log.Fatal("MYSQL_DSN environment variable must be set") - } - - Log, err := fl.SetupLogger(appEnv) + // Setup logger + logger, err := fl.SetupLogger(cfg.AppEnv) if err != nil { - log.Fatalf("Failed to set up logger: %v", err) + return err } - // Initialize MySQL database connection - db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{ - Logger: logger.Default.LogMode(logger.Info), + // Initialize database + db, err := data.InitDB(cfg.MysqlDSN) + if err != nil { + logger.Error("Failed to connect to MySQL:", err) + return err + } + + // Initialize RabbitMQ consumer + consumer, err := initRabbitMQConsumer(cfg) + if err != nil { + logger.Error("Failed to initialize RabbitMQ consumer:", err) + return err + } + + // Start consuming messages + go startConsuming(consumer, db, logger) + + // Handle graceful shutdown + waitForShutdown(logger) + + return nil +} + +func loadConfig() (*config.Config, error) { + if err := godotenv.Load(); err != nil { + return nil, err + } + + return config.NewConfig() +} + +func initRabbitMQConsumer(cfg *config.Config) (mq.Consumer, error) { + return rabbitmq.NewRabbitMQConsumer( + cfg.RabbitMQURL, + cfg.ExchangeName, + cfg.QueueName, + cfg.RoutingKey, + 1, + ), nil +} + +func startConsuming(consumer mq.Consumer, db *gorm.DB, logger fl.LogService) { + consumer.Consume(func(ctx context.Context, body []byte) { + handleMessage(ctx, body, db, logger) }) - if err != nil { - Log.Error("Failed to connect to MySQL:", err) - panic("Failed to connect to MySQL") +} + +func handleMessage(ctx context.Context, body []byte, db *gorm.DB, logger fl.LogService) { + message := &data.Message{} + + if err := message.Convert(body); err != nil { + logger.Error("Message cannot be converted", err) + return } - //initilize rabbitmq - messageQueue, err := mq.NewRabbitMQ() - if err != nil { - log.Fatalf("Failed to set up Message Broker: %v", err) + logger.Info("Received message", message.Src, message.Dst, message.Msg) + + if err := message.Insert(ctx, db); err != nil { + logger.Error("Message cannot be Inserted", err) + return } +} - // Run RabbitMQ consumer - go messageQueue.Consume(func(ctx context.Context, body []byte) { - message := &data.Message{} - - Log.Info("Received a raw message", "body", string(body)) - - if err := message.Convert(body); err != nil { - Log.Error("Message cannot be converted", err) - return - } - - if err := message.Insert(ctx, db); err != nil { - Log.Error("Message cannot be Inserted", err) - return - } - - log.Printf("Message handled %v", string(body)) - - }) - - // Handle signals for graceful shutdown +func waitForShutdown(logger fl.LogService) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) <-sigs - log.Println("Shutting down...") - Log.Info("Shutting down...") - os.Exit(0) + logger.Info("Shutting down...") } diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..451d37b --- /dev/null +++ b/config/config.go @@ -0,0 +1,32 @@ +package config + +import ( + "errors" + "os" +) + +type Config struct { + AppEnv string + MysqlDSN string + RabbitMQURL string + ExchangeName string + QueueName string + RoutingKey string +} + +func NewConfig() (*Config, error) { + cfg := &Config{ + AppEnv: os.Getenv("APP_ENV"), + MysqlDSN: os.Getenv("MYSQL_DSN"), + RabbitMQURL: os.Getenv("RABBITMQ_URL"), + ExchangeName: os.Getenv("EXCHANGE_NAME"), + QueueName: os.Getenv("QUEUE_NAME"), + RoutingKey: os.Getenv("ROUTING_KEY"), + } + + if cfg.MysqlDSN == "" || cfg.RabbitMQURL == "" || cfg.ExchangeName == "" || cfg.QueueName == "" || cfg.RoutingKey == "" { + return nil, errors.New("missing required environment variables") + } + + return cfg, nil +} diff --git a/pkg/data/db.go b/pkg/data/db.go index 686dfde..4b6d2cf 100644 --- a/pkg/data/db.go +++ b/pkg/data/db.go @@ -6,26 +6,8 @@ import ( "gorm.io/gorm/logger" ) -type DBService interface { -} - -type mysqlDb struct { - db *gorm.DB -} - -func NewMysqlDB(mysqlDSN string) (DBService, error) { - // Initialize MySQL database connection - db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{ - Logger: logger.Default.LogMode(logger.Info), +func InitDB(dsn string) (*gorm.DB, error) { + return gorm.Open(mysql.Open(dsn), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Error), }) - - if err != nil { - return nil, err - } - - mdb := &mysqlDb{ - db: db, - } - - return mdb, nil } diff --git a/pkg/mq/interface.go b/pkg/mq/interface.go new file mode 100644 index 0000000..55ed4ad --- /dev/null +++ b/pkg/mq/interface.go @@ -0,0 +1,41 @@ +package mq + +import "context" + +type MessageHandler func(ctx context.Context, body []byte) + +type Consumer interface { + Consume(handler MessageHandler) +} + +type Connection interface { + Connect() error + Close() error + Channel() (Channel, error) + NotifyClose(chan *Error) chan *Error +} + +type Channel interface { + ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error + QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) + QueueBind(name, key, exchange string, noWait bool, args Table) error + Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) + Close() error +} + +type Queue struct { + Name string +} + +type Delivery struct { + Body []byte +} + +type Error struct { + Code int + Reason string + Server bool + Recover bool +} + +type Table map[string]interface{} diff --git a/pkg/mq/rabbitmq/channel.go b/pkg/mq/rabbitmq/channel.go new file mode 100644 index 0000000..fe59ba4 --- /dev/null +++ b/pkg/mq/rabbitmq/channel.go @@ -0,0 +1,45 @@ +package rabbitmq + +import ( + "smpp-transmitter/pkg/mq" + + "github.com/streadway/amqp" +) + +type rabbitMQChannel struct { + ch *amqp.Channel +} + +func (r *rabbitMQChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args mq.Table) error { + return r.ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, amqp.Table(args)) +} + +func (r *rabbitMQChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args mq.Table) (mq.Queue, error) { + q, err := r.ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, amqp.Table(args)) + return mq.Queue{Name: q.Name}, err +} + +func (r *rabbitMQChannel) QueueBind(name, key, exchange string, noWait bool, args mq.Table) error { + return r.ch.QueueBind(name, key, exchange, noWait, amqp.Table(args)) +} + +func (r *rabbitMQChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args mq.Table) (<-chan mq.Delivery, error) { + deliveries, err := r.ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, amqp.Table(args)) + if err != nil { + return nil, err + } + + ch := make(chan mq.Delivery) + go func() { + for d := range deliveries { + ch <- mq.Delivery{Body: d.Body} + } + close(ch) + }() + + return ch, nil +} + +func (r *rabbitMQChannel) Close() error { + return r.ch.Close() +} diff --git a/pkg/mq/rabbitmq/connection.go b/pkg/mq/rabbitmq/connection.go new file mode 100644 index 0000000..4b2b3c9 --- /dev/null +++ b/pkg/mq/rabbitmq/connection.go @@ -0,0 +1,49 @@ +package rabbitmq + +import ( + "smpp-transmitter/pkg/mq" + + "github.com/streadway/amqp" +) + +type rabbitMQConnection struct { + conn *amqp.Connection + url string +} + +func NewRabbitMQConnection(url string) mq.Connection { + return &rabbitMQConnection{url: url} +} + +func (r *rabbitMQConnection) Connect() error { + var err error + r.conn, err = amqp.Dial(r.url) + return err +} + +func (r *rabbitMQConnection) Close() error { + return r.conn.Close() +} + +func (r *rabbitMQConnection) Channel() (mq.Channel, error) { + ch, err := r.conn.Channel() + if err != nil { + return nil, err + } + return &rabbitMQChannel{ch: ch}, nil +} + +func (r *rabbitMQConnection) NotifyClose(c chan *mq.Error) chan *mq.Error { + ch := r.conn.NotifyClose(make(chan *amqp.Error)) + go func() { + for err := range ch { + c <- &mq.Error{ + Code: err.Code, + Reason: err.Reason, + Server: err.Server, + Recover: err.Recover, + } + } + }() + return c +} diff --git a/pkg/mq/rabbitmq/consumer.go b/pkg/mq/rabbitmq/consumer.go new file mode 100644 index 0000000..e10b6fe --- /dev/null +++ b/pkg/mq/rabbitmq/consumer.go @@ -0,0 +1,97 @@ +package rabbitmq + +import ( + "context" + "log" + "smpp-transmitter/pkg/mq" + "time" +) + +type RabbitMQConsumer struct { + connection mq.Connection + exchangeName string + queueName string + routingKey string + prefetchCount int +} + +func NewRabbitMQConsumer(url, exchangeName, queueName, routingKey string, prefetchCount int) mq.Consumer { + return &RabbitMQConsumer{ + connection: NewRabbitMQConnection(url), + exchangeName: exchangeName, + queueName: queueName, + routingKey: routingKey, + prefetchCount: prefetchCount, + } +} + +func (r *RabbitMQConsumer) Consume(handler mq.MessageHandler) { + for { + if err := r.connect(); err != nil { + log.Printf("Error connecting to RabbitMQ: %v. Retrying in 5 seconds...", err) + time.Sleep(5 * time.Second) + continue + } + + if err := r.consumeMessages(handler); err != nil { + log.Printf("Error consuming messages: %v. Reconnecting...", err) + r.connection.Close() + } + + time.Sleep(5 * time.Second) + } +} + +func (r *RabbitMQConsumer) connect() error { + if err := r.connection.Connect(); err != nil { + return err + } + + channel, err := r.connection.Channel() + if err != nil { + r.connection.Close() + return err + } + defer channel.Close() + + if err := channel.ExchangeDeclare(r.exchangeName, "direct", true, false, false, false, nil); err != nil { + return err + } + + if _, err := channel.QueueDeclare(r.queueName, true, false, false, false, nil); err != nil { + return err + } + + if err := channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, false, nil); err != nil { + return err + } + + return nil +} + +func (r *RabbitMQConsumer) consumeMessages(handler mq.MessageHandler) error { + channel, err := r.connection.Channel() + if err != nil { + return err + } + defer channel.Close() + + msgs, err := channel.Consume(r.queueName, "", true, false, false, false, nil) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for msg := range msgs { + handler(ctx, msg.Body) + } + }() + + // Wait for connection to close + <-r.connection.NotifyClose(make(chan *mq.Error)) + + return nil +}