Compare commits

..

No commits in common. "dev" and "main" have entirely different histories.
dev ... main

15 changed files with 273 additions and 361 deletions

View File

@ -1,5 +1,5 @@
RABBITMQ_URL=amqp://admin:admin@localhost:5672/queue
MYSQL_DSN="user:password@tcp(localhost:3306)/db"
RABBITMQ_URL=amqp://admin:admin@localhost:5672/smstv
MYSQL_DSN="sms:AkNOPeiyeJYj@tcp(localhost:3306)/sms"
QUEUE_NAME=sms_reply
EXCHABGE_NAME=sms_reply
ROUTE_KEY=sms_reply

1
.gitignore vendored
View File

@ -4,4 +4,3 @@ cs.exe
ts
ts.exe
logs
build

View File

@ -5,103 +5,76 @@ import (
"log"
"os"
"os/signal"
"smpp-transmitter/config"
"smpp-transmitter/internal/models"
"smpp-transmitter/pkg/database"
"smpp-transmitter/pkg/data"
fl "smpp-transmitter/pkg/logger"
"smpp-transmitter/pkg/mq"
"smpp-transmitter/pkg/mq/rabbitmq"
"syscall"
"github.com/joho/godotenv"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
func main() {
if err := run(); err != nil {
log.Fatalf("Application error: %v", err)
}
}
func run() error {
// Load configuration
cfg, err := loadConfig()
err := godotenv.Load()
if err != nil {
return err
log.Fatal("Error loading .env file")
}
// Setup logger
logger, err := fl.SetupLogger(cfg.AppEnv)
// Load environment variables
mysqlDSN := os.Getenv("MYSQL_DSN")
appEnv := os.Getenv("APP_ENV")
if mysqlDSN == "" {
log.Fatal("MYSQL_DSN environment variable must be set")
}
Log, err := fl.SetupLogger(appEnv)
if err != nil {
return err
log.Fatalf("Failed to set up logger: %v", err)
}
// Initialize database
db, err := database.InitMYSQL(cfg.MysqlDSN)
if err != nil {
logger.Error("Failed to connect to MySQL:", "err", err)
return err
}
// Initialize RabbitMQ consumer
consumer, err := initRabbitMQConsumer(cfg)
if err != nil {
logger.Error("Failed to initialize RabbitMQ consumer:", "err", err)
return err
}
// Start consuming messages
go startConsuming(consumer, db, logger)
// Handle graceful shutdown
waitForShutdown(logger)
return nil
}
func loadConfig() (*config.Config, error) {
if err := godotenv.Load(); err != nil {
return nil, err
}
return config.NewConfig()
}
func initRabbitMQConsumer(cfg *config.Config) (mq.Consumer, error) {
return rabbitmq.NewRabbitMQConsumer(
cfg.RabbitMQURL,
cfg.ExchangeName,
cfg.QueueName,
cfg.RoutingKey,
1,
), nil
}
func startConsuming(consumer mq.Consumer, db *gorm.DB, logger fl.LogService) {
consumer.Consume(func(ctx context.Context, body []byte) {
handleMessage(ctx, body, db, logger)
// Initialize MySQL database connection
db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
})
}
func handleMessage(ctx context.Context, body []byte, db *gorm.DB, logger fl.LogService) {
message := &models.Message{}
if err := message.Convert(body); err != nil {
logger.Error("Message cannot be converted", "err", err)
return
if err != nil {
Log.Error("Failed to connect to MySQL:", err)
panic("Failed to connect to MySQL")
}
logger.Info(message.Msg, "src", message.Src, "dst", message.Dst)
if err := message.Insert(ctx, db); err != nil {
logger.Error("Message cannot be Inserted", "err", err)
return
//initilize rabbitmq
messageQueue, err := mq.NewRabbitMQ()
if err != nil {
log.Fatalf("Failed to set up Message Broker: %v", err)
}
}
func waitForShutdown(logger fl.LogService) {
// Run RabbitMQ consumer
go messageQueue.Consume(func(ctx context.Context, body []byte) {
message := &data.Message{}
Log.Info("Received a raw message", "body", string(body))
if err := message.Convert(body); err != nil {
Log.Error("Message cannot be converted", err)
return
}
if err := message.Insert(ctx, db); err != nil {
Log.Error("Message cannot be Inserted", err)
return
}
log.Printf("Message handled %v", string(body))
})
// Handle signals for graceful shutdown
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
logger.Info("Shutting down...")
log.Println("Shutting down...")
Log.Info("Shutting down...")
os.Exit(0)
}

1
cmd/transmitter/ts.go Normal file
View File

@ -0,0 +1 @@
package main

View File

@ -1,32 +0,0 @@
package config
import (
"errors"
"os"
)
type Config struct {
AppEnv string
MysqlDSN string
RabbitMQURL string
ExchangeName string
QueueName string
RoutingKey string
}
func NewConfig() (*Config, error) {
cfg := &Config{
AppEnv: os.Getenv("APP_ENV"),
MysqlDSN: os.Getenv("MYSQL_DSN"),
RabbitMQURL: os.Getenv("RABBITMQ_URL"),
ExchangeName: os.Getenv("EXCHANGE_NAME"),
QueueName: os.Getenv("QUEUE_NAME"),
RoutingKey: os.Getenv("ROUTING_KEY"),
}
if cfg.MysqlDSN == "" || cfg.RabbitMQURL == "" || cfg.ExchangeName == "" || cfg.QueueName == "" || cfg.RoutingKey == "" {
return nil, errors.New("missing required environment variables")
}
return cfg, nil
}

31
pkg/data/db.go Normal file
View File

@ -0,0 +1,31 @@
package data
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
type DBService interface {
}
type mysqlDb struct {
db *gorm.DB
}
func NewMysqlDB(mysqlDSN string) (DBService, error) {
// Initialize MySQL database connection
db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
})
if err != nil {
return nil, err
}
mdb := &mysqlDb{
db: db,
}
return mdb, nil
}

View File

@ -1,4 +1,4 @@
package models
package data
import (
"context"

View File

@ -1,13 +0,0 @@
package database
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
func InitMYSQL(dsn string) (*gorm.DB, error) {
return gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Error),
})
}

View File

@ -1,6 +1,7 @@
package logger
import (
"io"
"log/slog"
"os"
)
@ -20,8 +21,8 @@ func SetupLogger(env string) (LogService, error) {
var errorHandler slog.Handler
if env == "test" {
infoHandler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
errorHandler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})
infoHandler = slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelInfo})
errorHandler = slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})
} else {
err := os.MkdirAll("logs", 0755)
if err != nil {
@ -67,9 +68,9 @@ func SetupLogger(env string) (LogService, error) {
}
func (l *loggers) Error(msg string, args ...any) {
l.errorLogger.Error(msg, args...)
l.errorLogger.Error(msg, args)
}
func (l *loggers) Info(msg string, args ...any) {
l.infoLogger.Info(msg, args...)
l.infoLogger.Info(msg, args)
}

View File

@ -1,41 +0,0 @@
package mq
import "context"
type MessageHandler func(ctx context.Context, body []byte)
type Consumer interface {
Consume(handler MessageHandler)
}
type Connection interface {
Connect() error
Close() error
Channel() (Channel, error)
NotifyClose(chan *Error) chan *Error
}
type Channel interface {
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
QueueBind(name, key, exchange string, noWait bool, args Table) error
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
Close() error
}
type Queue struct {
Name string
}
type Delivery struct {
Body []byte
}
type Error struct {
Code int
Reason string
Server bool
Recover bool
}
type Table map[string]interface{}

183
pkg/mq/rabbitmq.go Normal file
View File

@ -0,0 +1,183 @@
package mq
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/streadway/amqp"
)
type MessageQueue interface {
Consume(processMessage func(ctx context.Context, body []byte))
}
type rabbitMQ struct {
rabbitMQURL string
exchangeName string
queueName string
routingKey string
conn *amqp.Connection
channel *amqp.Channel
}
func NewRabbitMQ() (MessageQueue, error) {
rabbitMQURL := os.Getenv("RABBITMQ_URL")
routeKey := os.Getenv("ROUTE_KEY")
queueName := os.Getenv("QUEUE_NAME")
exchangeName := os.Getenv("EXCHABGE_NAME")
if rabbitMQURL == "" || queueName == "" || exchangeName == "" || routeKey == "" {
return nil, fmt.Errorf("RABBITMQ_URL, ROUTE_KEY, EXCHABGE_NAME and QUEUE_NAME environment variables must be set")
}
return &rabbitMQ{
rabbitMQURL: rabbitMQURL,
exchangeName: exchangeName,
queueName: queueName,
routingKey: routeKey,
}, nil
}
func (r *rabbitMQ) connect() error {
var err error
r.conn, err = amqp.Dial(r.rabbitMQURL)
if err != nil {
log.Printf("Failed to connect to RabbitMQ: %v", err)
return err
}
// Open a channel
r.channel, err = r.conn.Channel()
if err != nil {
r.conn.Close()
log.Printf("Failed to open a channel: %v", err)
return err
}
// Declare exchange
err = r.channel.ExchangeDeclare(
r.exchangeName,
"direct",
true,
false,
false,
false,
nil,
)
if err != nil {
r.channel.Close()
r.conn.Close()
log.Printf("Failed to declare exchange: %v", err)
return err
}
// Declare queue
_, err = r.channel.QueueDeclare(
r.queueName,
true,
false,
false,
false,
nil,
)
if err != nil {
r.channel.Close()
r.conn.Close()
log.Printf("Failed to declare a queue: %v", err)
return err
}
// Bind queue
err = r.channel.QueueBind(
r.queueName,
r.routingKey,
r.exchangeName,
false,
nil,
)
if err != nil {
r.channel.Close()
r.conn.Close()
log.Printf("Failed to bind queue: %v", err)
return err
}
return nil
}
func (r *rabbitMQ) Consume(messageHandler func(ctx context.Context, body []byte)) {
for {
// Establish initial connection and setup
if err := r.connect(); err != nil {
log.Printf("Error setting up RabbitMQ: %v. Retrying in 5 seconds...", err)
time.Sleep(5 * time.Second) // Wait before retrying
continue
}
// Consume messages
msgs, err := r.channel.Consume(
r.queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Printf("Failed to register a consumer: %v", err)
r.channel.Close()
r.conn.Close()
time.Sleep(5 * time.Second) // Wait before retrying
continue
}
// Start message processing
ctx, cancel := context.WithCancel(context.Background())
// Create a channel to signal when message processing is done
done := make(chan bool)
go func() {
defer close(done)
for {
select {
case msg, ok := <-msgs:
if !ok {
log.Println("Message channel closed, reconnecting...")
return
}
messageHandler(ctx, msg.Body)
case <-ctx.Done():
log.Println("Context canceled, shutting down...")
return
}
}
}()
// Wait for either the done signal or a connection error
select {
case <-done:
// Message processing stopped, attempt to reconnect
case <-r.conn.NotifyClose(make(chan *amqp.Error)):
// Connection was closed, attempt to reconnect
}
cancel() // Cancel the context to stop message processing
r.cleanup()
log.Println("Reconnecting in 5 seconds...")
time.Sleep(5 * time.Second)
}
}
func (r *rabbitMQ) cleanup() {
if r.channel != nil {
r.channel.Close()
}
if r.conn != nil {
r.conn.Close()
}
}

View File

@ -1,45 +0,0 @@
package rabbitmq
import (
"smpp-transmitter/pkg/mq"
"github.com/streadway/amqp"
)
type rabbitMQChannel struct {
ch *amqp.Channel
}
func (r *rabbitMQChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args mq.Table) error {
return r.ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, amqp.Table(args))
}
func (r *rabbitMQChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args mq.Table) (mq.Queue, error) {
q, err := r.ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, amqp.Table(args))
return mq.Queue{Name: q.Name}, err
}
func (r *rabbitMQChannel) QueueBind(name, key, exchange string, noWait bool, args mq.Table) error {
return r.ch.QueueBind(name, key, exchange, noWait, amqp.Table(args))
}
func (r *rabbitMQChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args mq.Table) (<-chan mq.Delivery, error) {
deliveries, err := r.ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, amqp.Table(args))
if err != nil {
return nil, err
}
ch := make(chan mq.Delivery)
go func() {
for d := range deliveries {
ch <- mq.Delivery{Body: d.Body}
}
close(ch)
}()
return ch, nil
}
func (r *rabbitMQChannel) Close() error {
return r.ch.Close()
}

View File

@ -1,49 +0,0 @@
package rabbitmq
import (
"smpp-transmitter/pkg/mq"
"github.com/streadway/amqp"
)
type rabbitMQConnection struct {
conn *amqp.Connection
url string
}
func NewRabbitMQConnection(url string) mq.Connection {
return &rabbitMQConnection{url: url}
}
func (r *rabbitMQConnection) Connect() error {
var err error
r.conn, err = amqp.Dial(r.url)
return err
}
func (r *rabbitMQConnection) Close() error {
return r.conn.Close()
}
func (r *rabbitMQConnection) Channel() (mq.Channel, error) {
ch, err := r.conn.Channel()
if err != nil {
return nil, err
}
return &rabbitMQChannel{ch: ch}, nil
}
func (r *rabbitMQConnection) NotifyClose(c chan *mq.Error) chan *mq.Error {
ch := r.conn.NotifyClose(make(chan *amqp.Error))
go func() {
for err := range ch {
c <- &mq.Error{
Code: err.Code,
Reason: err.Reason,
Server: err.Server,
Recover: err.Recover,
}
}
}()
return c
}

View File

@ -1,97 +0,0 @@
package rabbitmq
import (
"context"
"log"
"smpp-transmitter/pkg/mq"
"time"
)
type rabbitMQConsumer struct {
connection mq.Connection
exchangeName string
queueName string
routingKey string
prefetchCount int
}
func NewRabbitMQConsumer(url, exchangeName, queueName, routingKey string, prefetchCount int) mq.Consumer {
return &rabbitMQConsumer{
connection: NewRabbitMQConnection(url),
exchangeName: exchangeName,
queueName: queueName,
routingKey: routingKey,
prefetchCount: prefetchCount,
}
}
func (r *rabbitMQConsumer) Consume(handler mq.MessageHandler) {
for {
if err := r.connect(); err != nil {
log.Printf("Error connecting to RabbitMQ: %v. Retrying in 5 seconds...", err)
time.Sleep(5 * time.Second)
continue
}
if err := r.consumeMessages(handler); err != nil {
log.Printf("Error consuming messages: %v. Reconnecting...", err)
r.connection.Close()
}
time.Sleep(5 * time.Second)
}
}
func (r *rabbitMQConsumer) connect() error {
if err := r.connection.Connect(); err != nil {
return err
}
channel, err := r.connection.Channel()
if err != nil {
r.connection.Close()
return err
}
defer channel.Close()
if err := channel.ExchangeDeclare(r.exchangeName, "direct", true, false, false, false, nil); err != nil {
return err
}
if _, err := channel.QueueDeclare(r.queueName, true, false, false, false, nil); err != nil {
return err
}
if err := channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, false, nil); err != nil {
return err
}
return nil
}
func (r *rabbitMQConsumer) consumeMessages(handler mq.MessageHandler) error {
channel, err := r.connection.Channel()
if err != nil {
return err
}
defer channel.Close()
msgs, err := channel.Consume(r.queueName, "", true, false, false, false, nil)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for msg := range msgs {
handler(ctx, msg.Body)
}
}()
// Wait for connection to close
<-r.connection.NotifyClose(make(chan *mq.Error))
return nil
}

1
pkg/smpp/smpp.go Normal file
View File

@ -0,0 +1 @@
package smpp