113 lines
2.8 KiB
Go
113 lines
2.8 KiB
Go
![]() |
package app
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"os"
|
||
|
"os/signal"
|
||
|
"syscall"
|
||
|
|
||
|
"git.espin.casa/albert/cml04-falcon-system/internal/storage"
|
||
|
"git.espin.casa/albert/logger"
|
||
|
"github.com/nats-io/nats.go"
|
||
|
"github.com/spf13/cobra"
|
||
|
)
|
||
|
|
||
|
func Run(cmd *cobra.Command, args []string) {
|
||
|
// read flags
|
||
|
logLevel, _ := cmd.Flags().GetString("log-level")
|
||
|
userName, _ := cmd.Flags().GetString("db-username")
|
||
|
userPass, _ := cmd.Flags().GetString("db-password")
|
||
|
dbHost, _ := cmd.Flags().GetString("db-host")
|
||
|
dbPort, _ := cmd.Flags().GetInt("db-port")
|
||
|
dbName, _ := cmd.Flags().GetString("db-name")
|
||
|
natsHost, _ := cmd.Flags().GetString("nats-host")
|
||
|
natsPort, _ := cmd.Flags().GetInt("nats-port")
|
||
|
// setup logger
|
||
|
log := logger.New(os.Stdout, logLevel)
|
||
|
// log fields
|
||
|
logFields := logger.LogFields{
|
||
|
"db_username": userName,
|
||
|
"db_password": "*****",
|
||
|
"db_host": dbHost,
|
||
|
"db_port": dbPort,
|
||
|
"db_name": dbName,
|
||
|
"nats_host": natsHost,
|
||
|
"nats_port": natsPort,
|
||
|
"log_level": logLevel,
|
||
|
}
|
||
|
// create storager
|
||
|
storage, err := storage.New(&storage.DBConfig{
|
||
|
Username: userName,
|
||
|
Password: userPass,
|
||
|
Host: dbHost,
|
||
|
Port: dbPort,
|
||
|
Name: dbName,
|
||
|
})
|
||
|
// handler error
|
||
|
if err != nil {
|
||
|
log.Error("create storage failed", err, logFields)
|
||
|
return
|
||
|
}
|
||
|
// NATS connect
|
||
|
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", natsHost, natsPort))
|
||
|
if err != nil {
|
||
|
log.Error("create nats connection failed", err, logFields)
|
||
|
return
|
||
|
}
|
||
|
defer nc.Close()
|
||
|
// create context
|
||
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
defer cancel()
|
||
|
// NATS subscribe subject
|
||
|
nc.Subscribe("barcode.new", func(msg *nats.Msg) {
|
||
|
go func(log logger.LoggerAdapter) {
|
||
|
if err := ProcessBarcodeMessage(ctx, msg, storage, log); err != nil {
|
||
|
log.Error("process new barcode message failed", err, logger.LogFields{})
|
||
|
return
|
||
|
}
|
||
|
}(log)
|
||
|
})
|
||
|
nc.Subscribe("bundle.new", func(msg *nats.Msg) {
|
||
|
go func(log logger.LoggerAdapter) {
|
||
|
if err := ProcessBundleMessage(ctx, msg, storage, log); err != nil {
|
||
|
log.Error("process new bundle message failed", err, logger.LogFields{})
|
||
|
return
|
||
|
}
|
||
|
}(log)
|
||
|
})
|
||
|
// info banner
|
||
|
log.Info("started falcon barcoder service", logFields)
|
||
|
// wait signal to finish
|
||
|
signal := WaitSignal()
|
||
|
log.Info("signal received", logFields.Add(logger.LogFields{
|
||
|
"signal": signal,
|
||
|
}))
|
||
|
}
|
||
|
|
||
|
func ProcessBarcodeMessage(ctx context.Context, msg *nats.Msg, storage storage.Storager, log logger.LoggerAdapter) error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func ProcessBundleMessage(ctx context.Context, msg *nats.Msg, storage storage.Storager, log logger.LoggerAdapter) error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// WaitSignal catching exit signal
|
||
|
func WaitSignal() os.Signal {
|
||
|
ch := make(chan os.Signal, 2)
|
||
|
signal.Notify(
|
||
|
ch,
|
||
|
syscall.SIGINT,
|
||
|
syscall.SIGQUIT,
|
||
|
syscall.SIGTERM,
|
||
|
)
|
||
|
for {
|
||
|
sig := <-ch
|
||
|
switch sig {
|
||
|
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM:
|
||
|
return sig
|
||
|
}
|
||
|
}
|
||
|
}
|