Compare commits

...

7 Commits
main ... dev

Author SHA1 Message Date
merdan 47fc31ec73 arassa 2025-09-29 12:27:22 +05:00
merdan 23e44ec39f try fixing badkey 2024-10-09 15:48:27 +05:00
merdan 8f9bafef28 proper logging 2024-10-09 15:42:33 +05:00
merdan 343a02c0e0 gitignore for build folder 2024-10-09 14:23:20 +05:00
merdan 857b9e75a2 remove old files 2024-10-09 14:18:22 +05:00
merdan 26595f76d5 refactor structure 2024-10-09 14:17:17 +05:00
merdan ca35971f14 refactored main branch 2024-10-09 13:03:24 +05:00
15 changed files with 361 additions and 273 deletions

View File

@ -1,5 +1,5 @@
RABBITMQ_URL=amqp://admin:admin@localhost:5672/smstv RABBITMQ_URL=amqp://admin:admin@localhost:5672/queue
MYSQL_DSN="sms:AkNOPeiyeJYj@tcp(localhost:3306)/sms" MYSQL_DSN="user:password@tcp(localhost:3306)/db"
QUEUE_NAME=sms_reply QUEUE_NAME=sms_reply
EXCHABGE_NAME=sms_reply EXCHABGE_NAME=sms_reply
ROUTE_KEY=sms_reply ROUTE_KEY=sms_reply

1
.gitignore vendored
View File

@ -4,3 +4,4 @@ cs.exe
ts ts
ts.exe ts.exe
logs logs
build

View File

@ -5,76 +5,103 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"smpp-transmitter/pkg/data" "smpp-transmitter/config"
"smpp-transmitter/internal/models"
"smpp-transmitter/pkg/database"
fl "smpp-transmitter/pkg/logger" fl "smpp-transmitter/pkg/logger"
"smpp-transmitter/pkg/mq" "smpp-transmitter/pkg/mq"
"smpp-transmitter/pkg/mq/rabbitmq"
"syscall" "syscall"
"github.com/joho/godotenv" "github.com/joho/godotenv"
"gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/logger"
) )
func main() { func main() {
err := godotenv.Load() if err := run(); err != nil {
log.Fatalf("Application error: %v", err)
}
}
func run() error {
// Load configuration
cfg, err := loadConfig()
if err != nil { if err != nil {
log.Fatal("Error loading .env file") return err
} }
// Load environment variables // Setup logger
mysqlDSN := os.Getenv("MYSQL_DSN") logger, err := fl.SetupLogger(cfg.AppEnv)
appEnv := os.Getenv("APP_ENV")
if mysqlDSN == "" {
log.Fatal("MYSQL_DSN environment variable must be set")
}
Log, err := fl.SetupLogger(appEnv)
if err != nil { if err != nil {
log.Fatalf("Failed to set up logger: %v", err) return err
} }
// Initialize MySQL database connection // Initialize database
db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{ db, err := database.InitMYSQL(cfg.MysqlDSN)
Logger: logger.Default.LogMode(logger.Info), if err != nil {
logger.Error("Failed to connect to MySQL:", "err", err)
return err
}
// Initialize RabbitMQ consumer
consumer, err := initRabbitMQConsumer(cfg)
if err != nil {
logger.Error("Failed to initialize RabbitMQ consumer:", "err", err)
return err
}
// Start consuming messages
go startConsuming(consumer, db, logger)
// Handle graceful shutdown
waitForShutdown(logger)
return nil
}
func loadConfig() (*config.Config, error) {
if err := godotenv.Load(); err != nil {
return nil, err
}
return config.NewConfig()
}
func initRabbitMQConsumer(cfg *config.Config) (mq.Consumer, error) {
return rabbitmq.NewRabbitMQConsumer(
cfg.RabbitMQURL,
cfg.ExchangeName,
cfg.QueueName,
cfg.RoutingKey,
1,
), nil
}
func startConsuming(consumer mq.Consumer, db *gorm.DB, logger fl.LogService) {
consumer.Consume(func(ctx context.Context, body []byte) {
handleMessage(ctx, body, db, logger)
}) })
if err != nil { }
Log.Error("Failed to connect to MySQL:", err)
panic("Failed to connect to MySQL") func handleMessage(ctx context.Context, body []byte, db *gorm.DB, logger fl.LogService) {
message := &models.Message{}
if err := message.Convert(body); err != nil {
logger.Error("Message cannot be converted", "err", err)
return
} }
//initilize rabbitmq logger.Info(message.Msg, "src", message.Src, "dst", message.Dst)
messageQueue, err := mq.NewRabbitMQ()
if err != nil { if err := message.Insert(ctx, db); err != nil {
log.Fatalf("Failed to set up Message Broker: %v", err) logger.Error("Message cannot be Inserted", "err", err)
return
} }
}
// Run RabbitMQ consumer func waitForShutdown(logger fl.LogService) {
go messageQueue.Consume(func(ctx context.Context, body []byte) {
message := &data.Message{}
Log.Info("Received a raw message", "body", string(body))
if err := message.Convert(body); err != nil {
Log.Error("Message cannot be converted", err)
return
}
if err := message.Insert(ctx, db); err != nil {
Log.Error("Message cannot be Inserted", err)
return
}
log.Printf("Message handled %v", string(body))
})
// Handle signals for graceful shutdown
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs <-sigs
log.Println("Shutting down...") logger.Info("Shutting down...")
Log.Info("Shutting down...")
os.Exit(0)
} }

View File

@ -1 +0,0 @@
package main

32
config/config.go Normal file
View File

@ -0,0 +1,32 @@
package config
import (
"errors"
"os"
)
type Config struct {
AppEnv string
MysqlDSN string
RabbitMQURL string
ExchangeName string
QueueName string
RoutingKey string
}
func NewConfig() (*Config, error) {
cfg := &Config{
AppEnv: os.Getenv("APP_ENV"),
MysqlDSN: os.Getenv("MYSQL_DSN"),
RabbitMQURL: os.Getenv("RABBITMQ_URL"),
ExchangeName: os.Getenv("EXCHANGE_NAME"),
QueueName: os.Getenv("QUEUE_NAME"),
RoutingKey: os.Getenv("ROUTING_KEY"),
}
if cfg.MysqlDSN == "" || cfg.RabbitMQURL == "" || cfg.ExchangeName == "" || cfg.QueueName == "" || cfg.RoutingKey == "" {
return nil, errors.New("missing required environment variables")
}
return cfg, nil
}

View File

@ -1,4 +1,4 @@
package data package models
import ( import (
"context" "context"

View File

@ -1,31 +0,0 @@
package data
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
type DBService interface {
}
type mysqlDb struct {
db *gorm.DB
}
func NewMysqlDB(mysqlDSN string) (DBService, error) {
// Initialize MySQL database connection
db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
})
if err != nil {
return nil, err
}
mdb := &mysqlDb{
db: db,
}
return mdb, nil
}

13
pkg/database/mysql.go Normal file
View File

@ -0,0 +1,13 @@
package database
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
func InitMYSQL(dsn string) (*gorm.DB, error) {
return gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Error),
})
}

View File

@ -1,7 +1,6 @@
package logger package logger
import ( import (
"io"
"log/slog" "log/slog"
"os" "os"
) )
@ -21,8 +20,8 @@ func SetupLogger(env string) (LogService, error) {
var errorHandler slog.Handler var errorHandler slog.Handler
if env == "test" { if env == "test" {
infoHandler = slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelInfo}) infoHandler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
errorHandler = slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}) errorHandler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})
} else { } else {
err := os.MkdirAll("logs", 0755) err := os.MkdirAll("logs", 0755)
if err != nil { if err != nil {
@ -68,9 +67,9 @@ func SetupLogger(env string) (LogService, error) {
} }
func (l *loggers) Error(msg string, args ...any) { func (l *loggers) Error(msg string, args ...any) {
l.errorLogger.Error(msg, args) l.errorLogger.Error(msg, args...)
} }
func (l *loggers) Info(msg string, args ...any) { func (l *loggers) Info(msg string, args ...any) {
l.infoLogger.Info(msg, args) l.infoLogger.Info(msg, args...)
} }

41
pkg/mq/interface.go Normal file
View File

@ -0,0 +1,41 @@
package mq
import "context"
type MessageHandler func(ctx context.Context, body []byte)
type Consumer interface {
Consume(handler MessageHandler)
}
type Connection interface {
Connect() error
Close() error
Channel() (Channel, error)
NotifyClose(chan *Error) chan *Error
}
type Channel interface {
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
QueueBind(name, key, exchange string, noWait bool, args Table) error
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
Close() error
}
type Queue struct {
Name string
}
type Delivery struct {
Body []byte
}
type Error struct {
Code int
Reason string
Server bool
Recover bool
}
type Table map[string]interface{}

View File

@ -1,183 +0,0 @@
package mq
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/streadway/amqp"
)
type MessageQueue interface {
Consume(processMessage func(ctx context.Context, body []byte))
}
type rabbitMQ struct {
rabbitMQURL string
exchangeName string
queueName string
routingKey string
conn *amqp.Connection
channel *amqp.Channel
}
func NewRabbitMQ() (MessageQueue, error) {
rabbitMQURL := os.Getenv("RABBITMQ_URL")
routeKey := os.Getenv("ROUTE_KEY")
queueName := os.Getenv("QUEUE_NAME")
exchangeName := os.Getenv("EXCHABGE_NAME")
if rabbitMQURL == "" || queueName == "" || exchangeName == "" || routeKey == "" {
return nil, fmt.Errorf("RABBITMQ_URL, ROUTE_KEY, EXCHABGE_NAME and QUEUE_NAME environment variables must be set")
}
return &rabbitMQ{
rabbitMQURL: rabbitMQURL,
exchangeName: exchangeName,
queueName: queueName,
routingKey: routeKey,
}, nil
}
func (r *rabbitMQ) connect() error {
var err error
r.conn, err = amqp.Dial(r.rabbitMQURL)
if err != nil {
log.Printf("Failed to connect to RabbitMQ: %v", err)
return err
}
// Open a channel
r.channel, err = r.conn.Channel()
if err != nil {
r.conn.Close()
log.Printf("Failed to open a channel: %v", err)
return err
}
// Declare exchange
err = r.channel.ExchangeDeclare(
r.exchangeName,
"direct",
true,
false,
false,
false,
nil,
)
if err != nil {
r.channel.Close()
r.conn.Close()
log.Printf("Failed to declare exchange: %v", err)
return err
}
// Declare queue
_, err = r.channel.QueueDeclare(
r.queueName,
true,
false,
false,
false,
nil,
)
if err != nil {
r.channel.Close()
r.conn.Close()
log.Printf("Failed to declare a queue: %v", err)
return err
}
// Bind queue
err = r.channel.QueueBind(
r.queueName,
r.routingKey,
r.exchangeName,
false,
nil,
)
if err != nil {
r.channel.Close()
r.conn.Close()
log.Printf("Failed to bind queue: %v", err)
return err
}
return nil
}
func (r *rabbitMQ) Consume(messageHandler func(ctx context.Context, body []byte)) {
for {
// Establish initial connection and setup
if err := r.connect(); err != nil {
log.Printf("Error setting up RabbitMQ: %v. Retrying in 5 seconds...", err)
time.Sleep(5 * time.Second) // Wait before retrying
continue
}
// Consume messages
msgs, err := r.channel.Consume(
r.queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Printf("Failed to register a consumer: %v", err)
r.channel.Close()
r.conn.Close()
time.Sleep(5 * time.Second) // Wait before retrying
continue
}
// Start message processing
ctx, cancel := context.WithCancel(context.Background())
// Create a channel to signal when message processing is done
done := make(chan bool)
go func() {
defer close(done)
for {
select {
case msg, ok := <-msgs:
if !ok {
log.Println("Message channel closed, reconnecting...")
return
}
messageHandler(ctx, msg.Body)
case <-ctx.Done():
log.Println("Context canceled, shutting down...")
return
}
}
}()
// Wait for either the done signal or a connection error
select {
case <-done:
// Message processing stopped, attempt to reconnect
case <-r.conn.NotifyClose(make(chan *amqp.Error)):
// Connection was closed, attempt to reconnect
}
cancel() // Cancel the context to stop message processing
r.cleanup()
log.Println("Reconnecting in 5 seconds...")
time.Sleep(5 * time.Second)
}
}
func (r *rabbitMQ) cleanup() {
if r.channel != nil {
r.channel.Close()
}
if r.conn != nil {
r.conn.Close()
}
}

View File

@ -0,0 +1,45 @@
package rabbitmq
import (
"smpp-transmitter/pkg/mq"
"github.com/streadway/amqp"
)
type rabbitMQChannel struct {
ch *amqp.Channel
}
func (r *rabbitMQChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args mq.Table) error {
return r.ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, amqp.Table(args))
}
func (r *rabbitMQChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args mq.Table) (mq.Queue, error) {
q, err := r.ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, amqp.Table(args))
return mq.Queue{Name: q.Name}, err
}
func (r *rabbitMQChannel) QueueBind(name, key, exchange string, noWait bool, args mq.Table) error {
return r.ch.QueueBind(name, key, exchange, noWait, amqp.Table(args))
}
func (r *rabbitMQChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args mq.Table) (<-chan mq.Delivery, error) {
deliveries, err := r.ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, amqp.Table(args))
if err != nil {
return nil, err
}
ch := make(chan mq.Delivery)
go func() {
for d := range deliveries {
ch <- mq.Delivery{Body: d.Body}
}
close(ch)
}()
return ch, nil
}
func (r *rabbitMQChannel) Close() error {
return r.ch.Close()
}

View File

@ -0,0 +1,49 @@
package rabbitmq
import (
"smpp-transmitter/pkg/mq"
"github.com/streadway/amqp"
)
type rabbitMQConnection struct {
conn *amqp.Connection
url string
}
func NewRabbitMQConnection(url string) mq.Connection {
return &rabbitMQConnection{url: url}
}
func (r *rabbitMQConnection) Connect() error {
var err error
r.conn, err = amqp.Dial(r.url)
return err
}
func (r *rabbitMQConnection) Close() error {
return r.conn.Close()
}
func (r *rabbitMQConnection) Channel() (mq.Channel, error) {
ch, err := r.conn.Channel()
if err != nil {
return nil, err
}
return &rabbitMQChannel{ch: ch}, nil
}
func (r *rabbitMQConnection) NotifyClose(c chan *mq.Error) chan *mq.Error {
ch := r.conn.NotifyClose(make(chan *amqp.Error))
go func() {
for err := range ch {
c <- &mq.Error{
Code: err.Code,
Reason: err.Reason,
Server: err.Server,
Recover: err.Recover,
}
}
}()
return c
}

View File

@ -0,0 +1,97 @@
package rabbitmq
import (
"context"
"log"
"smpp-transmitter/pkg/mq"
"time"
)
type rabbitMQConsumer struct {
connection mq.Connection
exchangeName string
queueName string
routingKey string
prefetchCount int
}
func NewRabbitMQConsumer(url, exchangeName, queueName, routingKey string, prefetchCount int) mq.Consumer {
return &rabbitMQConsumer{
connection: NewRabbitMQConnection(url),
exchangeName: exchangeName,
queueName: queueName,
routingKey: routingKey,
prefetchCount: prefetchCount,
}
}
func (r *rabbitMQConsumer) Consume(handler mq.MessageHandler) {
for {
if err := r.connect(); err != nil {
log.Printf("Error connecting to RabbitMQ: %v. Retrying in 5 seconds...", err)
time.Sleep(5 * time.Second)
continue
}
if err := r.consumeMessages(handler); err != nil {
log.Printf("Error consuming messages: %v. Reconnecting...", err)
r.connection.Close()
}
time.Sleep(5 * time.Second)
}
}
func (r *rabbitMQConsumer) connect() error {
if err := r.connection.Connect(); err != nil {
return err
}
channel, err := r.connection.Channel()
if err != nil {
r.connection.Close()
return err
}
defer channel.Close()
if err := channel.ExchangeDeclare(r.exchangeName, "direct", true, false, false, false, nil); err != nil {
return err
}
if _, err := channel.QueueDeclare(r.queueName, true, false, false, false, nil); err != nil {
return err
}
if err := channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, false, nil); err != nil {
return err
}
return nil
}
func (r *rabbitMQConsumer) consumeMessages(handler mq.MessageHandler) error {
channel, err := r.connection.Channel()
if err != nil {
return err
}
defer channel.Close()
msgs, err := channel.Consume(r.queueName, "", true, false, false, false, nil)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for msg := range msgs {
handler(ctx, msg.Body)
}
}()
// Wait for connection to close
<-r.connection.NotifyClose(make(chan *mq.Error))
return nil
}

View File

@ -1 +0,0 @@
package smpp