// Package app wires together the event subscriber, event publisher, storage,
// service and server of cml04-plb018 and runs them until an exit signal is
// received.
package app

import (
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	cml04eventer "git.espin.casa/albert/cml04-eventer"
	"git.espin.casa/albert/cml04-plb018/internal/logging"
	"git.espin.casa/albert/cml04-plb018/internal/server"
	"git.espin.casa/albert/cml04-plb018/internal/service"
	"git.espin.casa/albert/cml04-plb018/internal/storage"
	"git.espin.casa/albert/cml04-plb018/internal/types"
	"git.espin.casa/albert/logger"
)

// Run parses the command-line flags, creates the NSQ subscriber and publisher,
// subscribes to the "SAP-IN" events, starts the server and then blocks until
// SIGINT, SIGQUIT or SIGTERM is received.
//
// Flags (with their defaults):
//
//	-bind-addr  http bind address (":3000")
//	-nsq-addr   nsq address       ("10.1.152.13")
//	-nsq-port   nsq port          (4150)
//	-level      log level         ("debug")
//
// It returns the first setup error encountered, or nil after a signal has
// been received.
func Run() error {
	// flags
	httpBind := flag.String("bind-addr", ":3000", "http bind address")
	nsqAddr := flag.String("nsq-addr", "10.1.152.13", "nsq address")
	nsqPort := flag.Int("nsq-port", 4150, "nsq port")
	logLevel := flag.String("level", "debug", "Log level")
	flag.Parse()

	// setup logger
	log := logger.New(os.Stdout, *logLevel)

	// log fields attached to every message emitted by Run
	logFields := logger.LogFields{
		"http_bind": *httpBind,
		"nsq_addr":  *nsqAddr,
		"nsq_port":  *nsqPort,
		"log_level": *logLevel,
	}

	// create subscriber
	sub, err := cml04eventer.NewSubscriber(&cml04eventer.SubscriberConfig{
		NSQAddress:  *nsqAddr,
		NSQPort:     *nsqPort,
		Unmarshaler: cml04eventer.JSONMarshaler{},
		Channel:     "CML04-PLB018",
	}, log)
	if err != nil {
		log.Error("create subscriber failed", err, logFields)
		return err
	}
	defer sub.Close()

	// create publisher
	pub, err := cml04eventer.NewPublisher(&cml04eventer.PublisherImplConfig{
		NSQAddress: *nsqAddr,
		NSQPort:    *nsqPort,
		Marshaler:  cml04eventer.JSONMarshaler{},
	}, log)
	if err != nil {
		log.Error("create publisher failed", err, logFields)
		// Abort here: continuing would defer Close on, and hand to the
		// service, a publisher that was never created.
		return err
	}
	defer pub.Close()

	// create context; cancelled when Run returns
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// subscribe events
	sapEvents, err := sub.SubscribeEvent(ctx, "SAP-IN")
	if err != nil {
		log.Error("create subscribe event failed", err, logFields)
		return err
	}

	// create storage
	store, err := storage.NewStorage(log)
	if err != nil {
		log.Error("create storage event failed", err, logFields)
		return err
	}

	// process incoming events; a failed event is logged and skipped so that
	// one bad message does not stop the consumer. The goroutine ends when the
	// events channel is closed.
	go func() {
		for event := range sapEvents {
			if err := ProcessEvent(ctx, event, store, log); err != nil {
				log.Error("process event has been failed", err, logFields)
			}
		}
	}()

	// create service
	svc := service.NewService(store, pub)
	// wrap the service with logging
	svc = logging.NewLogging(svc, log)

	// create and start server
	// NOTE(review): assumes Start returns without blocking; otherwise the
	// banner and signal wait below are only reached after the server stops.
	srv := server.NewServer(*httpBind, svc)
	srv.Start()

	// info banner
	log.Info("started cml04-plb018", logFields)

	// wait signal to finish
	sig := WaitSignal()
	log.Info("signal received", logFields.Add(logger.LogFields{
		"signal": sig.String(),
	}))
	return nil
}

// ProcessEvent decodes the JSON payload of event according to the "subject"
// entry of its metadata and saves the decoded value through storage.
//
// Handled subjects:
//
//	sap.in.telegramas.z_sms_10001  production order -> SalvarOrdenProduccion
//	sap.in.telegramas.z_sms_10002  customer order   -> SalvarOrdenCliente
//	sap.in.calidades               normas/calidades -> SalvarNormas
//
// Events with any other subject are ignored and nil is returned. An error is
// returned when the subject is missing or is not a string, when the payload
// cannot be decoded, or when the storage call fails.
func ProcessEvent(ctx context.Context, event *cml04eventer.Event, storage storage.Storager, log logger.LoggerAdapter) error {
	subject, err := event.EventMeta.Get("subject")
	if err != nil {
		return fmt.Errorf("read event subject: %w", err)
	}
	// Checked assertion: an unexpected metadata type must surface as an error
	// instead of panicking inside the consumer goroutine.
	name, ok := subject.(string)
	if !ok {
		return fmt.Errorf("event subject has unexpected type %T", subject)
	}

	switch name {
	// production order telegram
	case "sap.in.telegramas.z_sms_10001":
		log.Info("received production order data from SAP", logger.LogFields{})
		po := &types.ProductionOrder{}
		if err := json.NewDecoder(bytes.NewReader(event.EventData)).Decode(po); err != nil {
			return fmt.Errorf("decode production order: %w", err)
		}
		return storage.SalvarOrdenProduccion(ctx, po)

	// customer order telegram
	case "sap.in.telegramas.z_sms_10002":
		log.Info("received customer order data from SAP", logger.LogFields{})
		co := &types.CustomerOrder{}
		if err := json.NewDecoder(bytes.NewReader(event.EventData)).Decode(co); err != nil {
			return fmt.Errorf("decode customer order: %w", err)
		}
		return storage.SalvarOrdenCliente(ctx, co)

	// normas/calidades list
	case "sap.in.calidades":
		log.Info("received normas/calidades data from SAP", logger.LogFields{})
		list := &types.NormaList{}
		if err := json.NewDecoder(bytes.NewReader(event.EventData)).Decode(list); err != nil {
			return fmt.Errorf("decode normas list: %w", err)
		}
		return storage.SalvarNormas(ctx, list)
	}

	// unknown subject: nothing to do
	return nil
}

// WaitSignal blocks until the process receives SIGINT, SIGQUIT or SIGTERM and
// returns the signal that was received.
func WaitSignal() os.Signal {
	// Buffered because package signal does not block when delivering to the
	// channel; an unbuffered channel could miss the signal.
	ch := make(chan os.Signal, 1)
	signal.Notify(
		ch,
		syscall.SIGINT,
		syscall.SIGQUIT,
		syscall.SIGTERM,
	)
	// Only the signals registered above are delivered to ch, so the first
	// value received is always one of them.
	return <-ch
}