// Package app wires up and runs the falcon barcoder service.
package app

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"os"
	"os/signal"
	"strings"
	"syscall"

	cml04eventer "git.espin.casa/albert/cml04-eventer"
	"git.espin.casa/albert/cml04-falcon-system/internal/storage"
	"git.espin.casa/albert/cml04-falcon-system/internal/types"
	"git.espin.casa/albert/logger"
	"github.com/nats-io/nats.go"
	"github.com/spf13/cobra"
)

// Run is the command entry point. It reads the command flags, creates the
// storage backend, connects to NATS, subscribes to the "barcode.new" subject
// and then blocks until an exit signal is received.
//
// Every failure during start-up is logged and makes Run return early.
func Run(cmd *cobra.Command, args []string) {
	// Read flags. Lookup errors are ignored here; presumably the flags are
	// registered with matching types where the command is defined (not
	// visible in this file) — TODO confirm.
	logLevel, _ := cmd.Flags().GetString("log-level")
	userName, _ := cmd.Flags().GetString("db-username")
	userPass, _ := cmd.Flags().GetString("db-password")
	dbHost, _ := cmd.Flags().GetString("db-host")
	dbPort, _ := cmd.Flags().GetInt("db-port")
	dbName, _ := cmd.Flags().GetString("db-name")
	natsHost, _ := cmd.Flags().GetString("nats-host")
	natsPort, _ := cmd.Flags().GetInt("nats-port")

	// Set up the logger.
	log := logger.New(os.Stdout, logLevel)

	// Fields attached to the start-up and shutdown log entries. The password
	// is masked on purpose so it never reaches the log output.
	logFields := logger.LogFields{
		"db_username": userName,
		"db_password": "*****",
		"db_host":     dbHost,
		"db_port":     dbPort,
		"db_name":     dbName,
		"nats_host":   natsHost,
		"nats_port":   natsPort,
		"log_level":   logLevel,
	}

	// Create the storage backend. The local is named store so that it does
	// not shadow the imported storage package.
	store, err := storage.New(&storage.DBConfig{
		Username: userName,
		Password: userPass,
		Host:     dbHost,
		Port:     dbPort,
		Name:     dbName,
	})
	if err != nil {
		log.Error("create storage failed", err, logFields)
		return
	}

	// Connect to NATS.
	nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", natsHost, natsPort))
	if err != nil {
		log.Error("create nats connection failed", err, logFields)
		return
	}
	defer nc.Close()

	// Context handed to every message handler; cancelled when Run returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Subscribe to new barcode events. Each message is handled in its own
	// goroutine so the subscription callback returns immediately.
	_, err = nc.Subscribe("barcode.new", func(msg *nats.Msg) {
		go func() {
			if err := ProcessMessage(ctx, msg, store, log); err != nil {
				log.Error("process message failed", err, logger.LogFields{})
			}
		}()
	})
	// Without a subscription the service would sit idle and never receive a
	// message, so treat a subscribe failure like any other start-up failure.
	if err != nil {
		log.Error("subscribe nats subject failed", err, logFields)
		return
	}

	// Info banner.
	log.Info("started falcon barcoder service", logFields)

	// Block until an exit signal arrives. The local is named sig so that it
	// does not shadow the imported signal package.
	sig := WaitSignal()
	log.Info("signal received", logFields.Add(logger.LogFields{
		"signal": sig,
	}))
}

// LoadingBed returns the loading bed for a barcode: types.ATA12 when the
// barcode starts with "940", types.ATA345 otherwise.
func LoadingBed(barcode string) types.LoadingBed {
	if strings.HasPrefix(barcode, "940") {
		return types.ATA12
	}
	return types.ATA345
}

// ProcessMessage decodes the JSON event carried in msg.Data, logs its ID and
// topic, builds a types.Barcode from the event data and stores it through
// storage.
//
// It returns a wrapped error when the payload cannot be decoded or when the
// barcode cannot be stored; the underlying error stays reachable through
// errors.Is / errors.As.
func ProcessMessage(ctx context.Context, msg *nats.Msg, storage storage.Storager, log logger.LoggerAdapter) error {
	// Decode the event from the raw message payload.
	event := &cml04eventer.Event{}
	if err := json.NewDecoder(bytes.NewReader(msg.Data)).Decode(event); err != nil {
		return fmt.Errorf("decode event: %w", err)
	}

	// Info banner.
	log.Info("received event bus message", logger.LogFields{
		"event_id":    event.EventID,
		"event_topic": event.EventTopic,
	})

	// The event data is used as the barcode text.
	bar := string(event.EventData)
	barcode := &types.Barcode{
		Barcode:    bar,
		LoadingBed: LoadingBed(bar),
	}

	// Store the barcode.
	if err := storage.StoreBarcode(ctx, barcode); err != nil {
		return fmt.Errorf("store barcode %q: %w", bar, err)
	}
	return nil
}

// WaitSignal blocks until the process receives SIGINT, SIGQUIT or SIGTERM and
// returns the signal that arrived.
func WaitSignal() os.Signal {
	// Buffered so that a signal delivered before the receive below is not
	// dropped.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)

	// Only the three signals registered above are relayed to ch, so the
	// first value received is always one of them.
	return <-ch
}