cml04-falcon-system/bundle/app/app.go
2024-10-02 15:36:58 +02:00

143 lines
3.7 KiB
Go

package app
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"syscall"
cml04eventer "git.espin.casa/albert/cml04-eventer"
"git.espin.casa/albert/cml04-falcon-system/internal/publisher"
"git.espin.casa/albert/cml04-falcon-system/internal/storage"
"git.espin.casa/albert/logger"
"github.com/nats-io/nats.go"
"github.com/spf13/cobra"
)
func Run(cmd *cobra.Command, args []string) {
// read flags
logLevel, _ := cmd.Flags().GetString("log-level")
userName, _ := cmd.Flags().GetString("db-username")
userPass, _ := cmd.Flags().GetString("db-password")
dbHost, _ := cmd.Flags().GetString("db-host")
dbPort, _ := cmd.Flags().GetInt("db-port")
dbName, _ := cmd.Flags().GetString("db-name")
natsHost, _ := cmd.Flags().GetString("nats-host")
natsPort, _ := cmd.Flags().GetInt("nats-port")
// setup logger
log := logger.New(os.Stdout, logLevel)
// log fields
logFields := logger.LogFields{
"db_username": userName,
"db_password": "*****",
"db_host": dbHost,
"db_port": dbPort,
"db_name": dbName,
"nats_host": natsHost,
"nats_port": natsPort,
"log_level": logLevel,
}
// create storager
storage, err := storage.New(&storage.DBConfig{
Username: userName,
Password: userPass,
Host: dbHost,
Port: dbPort,
Name: dbName,
})
// handler error
if err != nil {
log.Error("create storage failed", err, logFields)
return
}
// NATS connect
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", natsHost, natsPort))
if err != nil {
log.Error("create nats connection failed", err, logFields)
return
}
defer nc.Close()
// create publisher
pub := publisher.New(nc)
// create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// NATS subscribe subject
nc.Subscribe("barcode.new", func(msg *nats.Msg) {
go func(log logger.LoggerAdapter) {
if err := ProcessNewBarcodeMessage(ctx, msg, storage, pub, log); err != nil {
log.Error("process new barcode message failed", err, logger.LogFields{})
return
}
}(log)
})
nc.Subscribe("bundle.new", func(msg *nats.Msg) {
go func(log logger.LoggerAdapter) {
if err := ProcessNewBundleMessage(ctx, msg, storage, log); err != nil {
log.Error("process new bundle message failed", err, logger.LogFields{})
return
}
}(log)
})
// info banner
log.Info("started falcon barcoder service", logFields)
// wait signal to finish
signal := WaitSignal()
log.Info("signal received", logFields.Add(logger.LogFields{
"signal": signal,
}))
}
func ProcessNewBarcodeMessage(ctx context.Context, msg *nats.Msg, storage storage.Storager, pub publisher.Publisher, log logger.LoggerAdapter) error {
// event holder
event := &cml04eventer.Event{}
// create new buffer based on message data
buff := bytes.NewBuffer(msg.Data)
// decode message
if err := json.NewDecoder(buff).Decode(event); err != nil {
return err
}
// info banner
log.Info("received event bus message", logger.LogFields{
"event_id": event.EventID,
"event_topic": event.EventTopic,
})
// barcode
barcode := string(event.EventData)
// store barcode
if err := storage.ConfirmBundle(ctx, barcode); err != nil {
return err
}
// send bundle confirmation
if err := pub.ComfirmedBundle(ctx, barcode); err != nil {
return err
}
return nil
}
func ProcessNewBundleMessage(ctx context.Context, msg *nats.Msg, storage storage.Storager, log logger.LoggerAdapter) error {
return nil
}
// WaitSignal catching exit signal
func WaitSignal() os.Signal {
ch := make(chan os.Signal, 2)
signal.Notify(
ch,
syscall.SIGINT,
syscall.SIGQUIT,
syscall.SIGTERM,
)
for {
sig := <-ch
switch sig {
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM:
return sig
}
}
}