commit fb030000a11fe3f0990959afb6dfa5a990051bd0 Author: merdan Date: Thu Aug 29 15:14:19 2024 +0500 initial commit diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..37c2cfc --- /dev/null +++ b/.env.example @@ -0,0 +1,6 @@ +RABBITMQ_URL=amqp://admin:admin@localhost:5672/smstv +MYSQL_DSN="sms:AkNOPeiyeJYj@tcp(localhost:3306)/sms" +QUEUE_NAME=sms_reply +EXCHABGE_NAME=sms_reply +ROUTE_KEY=sms_reply +APP_ENV=test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9d924cc --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.env +cs +cs.exe +ts +ts.exe \ No newline at end of file diff --git a/cmd/consumer/cs.go b/cmd/consumer/cs.go new file mode 100644 index 0000000..991195f --- /dev/null +++ b/cmd/consumer/cs.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + + "smpp-transmitter/pkg/data" + fl "smpp-transmitter/pkg/logger" + "smpp-transmitter/pkg/mq" + + "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +func main() { + // Load environment variables + rabbitMQURL := os.Getenv("RABBITMQ_URL") + mysqlDSN := os.Getenv("MYSQL_DSN") + queueName := os.Getenv("QUEUE_NAME") + exchangeName := os.Getenv("EXCHABGE_NAME") + appEnv := os.Getenv("APP_ENV") + routeKey := os.Getenv("ROUTE_KEY") + + if rabbitMQURL == "" || mysqlDSN == "" || queueName == "" || exchangeName == "" || routeKey == "" { + log.Fatal("RABBITMQ_URL, MYSQL_DSN, and QUEUE_NAME environment variables must be set") + } + + Log, err := fl.SetupLogger(appEnv) + if err != nil { + log.Fatalf("Failed to set up logger: %v", err) + } + + // Initialize MySQL database connection + db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Info), + }) + + if err != nil { + Log.Error("Failed to connect to MySQL:", err) + log.Fatalf("Failed to connect to MySQL: %v", err) + } + + messageQueue := mq.NewRabbitMQ(rabbitMQURL, exchangeName, queueName, routeKey) + + // Start message processing + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Run RabbitMQ consumer + go messageQueue.Consume(ctx, func(ctx context.Context, body []byte) { + message := &data.Message{} + + Log.Info("Received a raw message", "body", string(body)) + + if err := message.Convert(body); err != nil { + Log.Error("Message cannot be converted", err) + return + } + + if err := message.Insert(ctx, db); err != nil { + Log.Error("Message cannot be Inserted", err) + return + } + Log.Info("Message handled successfully") + + }) + + // Handle signals for graceful shutdown + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs + log.Println("Shutting down...") + Log.Info("Shutting down...") +} diff --git a/cmd/transmitter/ts.go b/cmd/transmitter/ts.go new file mode 100644 index 0000000..06ab7d0 --- /dev/null +++ b/cmd/transmitter/ts.go @@ -0,0 +1 @@ +package main diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..790d220 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module smpp-transmitter + +go 1.23rc1 + +require ( + github.com/streadway/amqp v1.1.0 + gorm.io/driver/mysql v1.5.7 +) + +require ( + github.com/go-sql-driver/mysql v1.7.0 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + golang.org/x/text v0.14.0 // indirect + gorm.io/gorm v1.25.11 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..213ecae --- /dev/null +++ b/go.sum @@ -0,0 +1,16 @@ +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo= +gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM= +gorm.io/gorm v1.25.7 h1:VsD6acwRjz2zFxGO50gPO6AkNs7KKnvfzUjHQhZDz/A= +gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg= +gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= diff --git a/pkg/data/db.go b/pkg/data/db.go new file mode 100644 index 0000000..686dfde --- /dev/null +++ b/pkg/data/db.go @@ -0,0 +1,31 @@ +package data + +import ( + "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +type DBService interface { +} + +type mysqlDb struct { + db *gorm.DB +} + +func NewMysqlDB(mysqlDSN string) (DBService, error) { + // Initialize MySQL database connection + db, err := gorm.Open(mysql.Open(mysqlDSN), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Info), + }) + + if err != nil { + return nil, err + } + + mdb := &mysqlDb{ + db: db, + } + + return mdb, nil +} diff --git a/pkg/data/message.go b/pkg/data/message.go new file mode 100644 index 0000000..0d316f4 --- /dev/null +++ b/pkg/data/message.go @@ -0,0 +1,28 @@ +package data + +import ( + "context" + "encoding/json" + + "gorm.io/gorm" +) + +type Message struct { + Src string `json:"src"` + Dst string `json:"dst"` + Msg string `json:"msg"` +} + +func (m *Message) Convert(msgBody []byte) error { + if err := json.Unmarshal(msgBody, &m); err != nil { + return err + } + return nil +} + +func (m *Message) Insert(ctx context.Context, db *gorm.DB) error { + if err := db.WithContext(ctx).Create(&m).Error; err != nil { + return err + } + return nil +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go new file mode 100644 index 0000000..9ad5d0d --- /dev/null +++ b/pkg/logger/logger.go @@ -0,0 +1,76 @@ +package logger + +import ( + "io" + "log/slog" + "os" +) + +type LogService interface { + Error(msg string, args ...any) + Info(msg string, args ...any) +} + +type loggers struct { + infoLogger *slog.Logger + errorLogger *slog.Logger +} + +func SetupLogger(env string) (LogService, error) { + var infoHandler slog.Handler + var errorHandler slog.Handler + + if env == "test" { + infoHandler = slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelInfo}) + errorHandler = slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}) + } else { + err := os.MkdirAll("logs", 0755) + if err != nil { + return nil, err + } + + infoFile, err := os.OpenFile("logs/Info.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + if os.IsNotExist(err) { + infoFile, err = os.Create("logs/Info.log") + if err != nil { + return nil, err + } + } else { + return nil, err + } + } + + errorFile, err := os.OpenFile("logs/Error.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + infoFile.Close() + if os.IsNotExist(err) { + errorFile, err = os.Create("logs/Error.log") + if err != nil { + return nil, err + } + } else { + return nil, err + } + } + + infoHandler = slog.NewTextHandler(infoFile, &slog.HandlerOptions{Level: slog.LevelInfo}) + errorHandler = slog.NewTextHandler(errorFile, &slog.HandlerOptions{Level: slog.LevelError}) + } + + infoLogger := slog.New(infoHandler) + errorLogger := slog.New(errorHandler) + + return &loggers{ + infoLogger: infoLogger, + errorLogger: errorLogger, + }, nil +} + +func (l *loggers) Error(msg string, args ...any) { + l.errorLogger.Error(msg, args) +} + +func (l *loggers) Info(msg string, args ...any) { + l.infoLogger.Info(msg, args) +} diff --git a/pkg/mq/rabbitmq.go b/pkg/mq/rabbitmq.go new file mode 100644 index 0000000..8a1290b --- /dev/null +++ b/pkg/mq/rabbitmq.go @@ -0,0 +1,112 @@ +package mq + +import ( + "context" + "log" + "time" + + "github.com/streadway/amqp" +) + +type MessageQueue interface { + Consume(ctx context.Context, processMessage func(ctx context.Context, body []byte)) +} + +type rabbitMQ struct { + rabbitMQURL string + exchangeName string + queueName string + routingKey string +} + +func NewRabbitMQ(rabbitMQURL, exchangeName, queueName, routingKey string) MessageQueue { + return &rabbitMQ{ + rabbitMQURL: rabbitMQURL, + exchangeName: exchangeName, + queueName: queueName, + routingKey: routingKey, + } +} + +func (r *rabbitMQ) Consume(ctx context.Context, messageHandler func(ctx context.Context, body []byte)) { + for { + conn, err := amqp.Dial(r.rabbitMQURL) + if err != nil { + log.Printf("Failed to connect to RabbitMQ: %v", err) + time.Sleep(5 * time.Second) // Wait before retrying + continue + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + log.Printf("Failed to open a channel: %v", err) + time.Sleep(5 * time.Second) // Wait before retrying + continue + } + defer ch.Close() + + // Declare the exchange + err = ch.ExchangeDeclare( + r.exchangeName, // name of the exchange + "direct", // type of exchange + true, // durable + false, // auto-delete + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + log.Printf("Failed to declare exchange: %v", err) + time.Sleep(5 * time.Second) // Wait before retrying + continue + } + + q, err := ch.QueueDeclare( + r.queueName, // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + log.Printf("Failed to declare a queue: %v", err) + time.Sleep(5 * time.Second) // Wait before retrying + continue + } + err = ch.QueueBind( + r.queueName, + r.routingKey, + r.exchangeName, + false, + nil, + ) + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + log.Printf("Failed to register a consumer: %v", err) + time.Sleep(5 * time.Second) // Wait before retrying + continue + } + + for { + select { + case msg := <-msgs: + messageHandler(ctx, msg.Body) + case <-ctx.Done(): + log.Println("Consumer stopped.") + return + } + } + } + +} diff --git a/pkg/smpp/smpp.go b/pkg/smpp/smpp.go new file mode 100644 index 0000000..518bd6e --- /dev/null +++ b/pkg/smpp/smpp.go @@ -0,0 +1 @@ +package smpp