package app import ( "bytes" "context" "encoding/json" "fmt" "os" "os/signal" "syscall" cml04eventer "git.espin.casa/albert/cml04-eventer" "git.espin.casa/albert/cml04-falcon-system/internal/storage" "git.espin.casa/albert/cml04-falcon-system/internal/types" "git.espin.casa/albert/logger" "github.com/nats-io/nats.go" "github.com/spf13/cobra" ) func Run(cmd *cobra.Command, args []string) { // read flags logLevel, _ := cmd.Flags().GetString("log-level") dbUser, _ := cmd.Flags().GetString("db-username") dbPass, _ := cmd.Flags().GetString("db-password") dbHost, _ := cmd.Flags().GetString("db-host") dbPort, _ := cmd.Flags().GetInt("db-port") dbName, _ := cmd.Flags().GetString("db-name") natsHost, _ := cmd.Flags().GetString("nats-host") natsPort, _ := cmd.Flags().GetInt("nats-port") nsqHost, _ := cmd.Flags().GetString("nsq-host") nsqPort, _ := cmd.Flags().GetInt("nsq-port") // setup logger log := logger.New(os.Stdout, logLevel) // log fields logFields := logger.LogFields{ "nsq_host": nsqHost, "nsq_port": nsqPort, "nats_host": natsHost, "nats_port": natsPort, "log_level": logLevel, } // NATS connect nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", natsHost, natsPort)) if err != nil { log.Error("create nats connection failed", err, logFields) return } // on close disconnects defer nc.Close() // create storager storage, err := storage.New(&storage.DBConfig{ Username: dbUser, Password: dbPass, Host: dbHost, Port: dbPort, Name: dbName, }) // handle error if err != nil { log.Error("create storage failed", err, logFields) return } // create nsq subscriber subscriber, err := cml04eventer.NewSubscriber(&cml04eventer.SubscriberConfig{ NSQAddress: nsqHost, NSQPort: nsqPort, Unmarshaler: cml04eventer.JSONMarshaler{}, Channel: "CML04-FALCON", }, log) // handle error if err != nil { log.Error("create nsq subscriber failed", err, logFields) return } // on exit close subscriber defer subscriber.Close() // create context with cancel ctx, cancel := context.WithCancel(context.Background()) defer cancel() eventChan, err := subscriber.SubscribeEvent(ctx, "SAP-IN") // handle error if err != nil { log.Error("nsq subscription failed", err, logFields) return } go func() { for event := range eventChan { // create buffer buffer := bytes.NewBuffer(event.EventData) // get event message subject subject, err := event.EventMeta.Get("subject") if err != nil { log.Error("get event metada data failed", err, logFields) } switch subject { case "sap.in.telegramas.z_sms_10001": po := &types.ProductionOrder{} if err := json.Unmarshal(buffer.Bytes(), po); err != nil { log.Error("decode event message data failed", err, logFields) continue } // log info banner log.Info("received new production data from SAP event", logFields.Add( logger.LogFields{ "event_id": event.EventID, "production_order": po.POrderNo, })) if err := storage.StoreProductionOrder(ctx, po); err != nil { log.Error("store production order data failed", err, logFields) continue } case "sap.in.telegramas.z_sms_10002": co := &types.CustomerOrder{} if err := json.Unmarshal(buffer.Bytes(), co); err != nil { log.Error("decode event message data failed", err, logFields) continue } // log info banner log.Info("received new production data from SAP event", logFields.Add( logger.LogFields{ "event_id": event.EventID, "customer_order": co.COrderNo, "production_order": co.POrderNo, })) if err := storage.StoreCustomerOrder(ctx, co); err != nil { log.Error("store customer order data failed", err, logFields) continue } case "sap.in.telegramas.z_sms_10003": co := &types.CustomerOrder{} if err := json.Unmarshal(buffer.Bytes(), co); err != nil { log.Error("decode event message data failed", err, logFields) continue } // log info banner log.Info("received new production data from SAP event", logFields.Add( logger.LogFields{ "event_id": event.EventID, "customer_order": co.COrderNo, "production_order": co.POrderNo, })) if err := storage.StoreCustomerOrder(ctx, co); err != nil { log.Error("store customer order data failed", err, logFields) continue } default: continue } } }() // info banner log.Info("started cml04-falcon-sap service", logFields) // wait signal to finish signal := WaitSignal() log.Info("signal received", logFields.Add(logger.LogFields{ "signal": signal, })) } // WaitSignal catching exit signal func WaitSignal() os.Signal { ch := make(chan os.Signal, 2) signal.Notify( ch, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, ) for { sig := <-ch switch sig { case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM: return sig } } }