154 lines
4.1 KiB
Go
154 lines
4.1 KiB
Go
![]() |
package app
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"flag"
|
||
|
"os"
|
||
|
"os/signal"
|
||
|
"syscall"
|
||
|
|
||
|
cml04eventer "git.espin.casa/albert/cml04-eventer"
|
||
|
"git.espin.casa/albert/cml04-plb018/internal/logging"
|
||
|
"git.espin.casa/albert/cml04-plb018/internal/server"
|
||
|
"git.espin.casa/albert/cml04-plb018/internal/service"
|
||
|
"git.espin.casa/albert/cml04-plb018/internal/storage"
|
||
|
"git.espin.casa/albert/cml04-plb018/internal/types"
|
||
|
"git.espin.casa/albert/logger"
|
||
|
)
|
||
|
|
||
|
func Run() error {
|
||
|
// flags
|
||
|
httpBind := flag.String("bind-addr", ":3000", "http bind address")
|
||
|
nsqAddr := flag.String("nsq-addr", "10.1.152.13", "nsq address")
|
||
|
nsqPort := flag.Int("nsq-port", 4150, "nsq port")
|
||
|
logLevel := flag.String("level", "debug", "Log level")
|
||
|
flag.Parse()
|
||
|
// setup logger
|
||
|
log := logger.New(os.Stdout, *logLevel)
|
||
|
// log fields
|
||
|
logFields := logger.LogFields{
|
||
|
"http_bind": *httpBind,
|
||
|
"nsq_addr": *nsqAddr,
|
||
|
"nsq_port": *nsqPort,
|
||
|
"log_level": *logLevel,
|
||
|
}
|
||
|
// create subscriber
|
||
|
sub, err := cml04eventer.NewSubscriber(&cml04eventer.SubscriberConfig{
|
||
|
NSQAddress: *nsqAddr,
|
||
|
NSQPort: *nsqPort,
|
||
|
Unmarshaler: cml04eventer.JSONMarshaler{},
|
||
|
Channel: "CML04-PLB018",
|
||
|
}, log)
|
||
|
if err != nil {
|
||
|
log.Error("create subscriber failed", err, logFields)
|
||
|
return err
|
||
|
}
|
||
|
defer sub.Close()
|
||
|
// create publisher
|
||
|
pub, err := cml04eventer.NewPublisher(&cml04eventer.PublisherImplConfig{
|
||
|
NSQAddress: *nsqAddr,
|
||
|
NSQPort: *nsqPort,
|
||
|
Marshaler: cml04eventer.JSONMarshaler{},
|
||
|
}, log)
|
||
|
if err != nil {
|
||
|
log.Error("create publisher failed", err, logFields)
|
||
|
}
|
||
|
defer pub.Close()
|
||
|
// create context
|
||
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
defer cancel()
|
||
|
// subscribe events
|
||
|
SapChan, err := sub.SubscribeEvent(ctx, "SAP-IN")
|
||
|
if err != nil {
|
||
|
log.Error("create subscribe event failed", err, logFields)
|
||
|
return err
|
||
|
}
|
||
|
// create storage
|
||
|
storage, err := storage.NewStorage(log)
|
||
|
if err != nil {
|
||
|
log.Error("create storage event failed", err, logFields)
|
||
|
return err
|
||
|
}
|
||
|
// process incoming events
|
||
|
go func() {
|
||
|
for event := range SapChan {
|
||
|
if err := ProcessEvent(ctx, event, storage, log); err != nil {
|
||
|
log.Error("process event has been failed", err, logFields)
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
// create service
|
||
|
svc := service.NewService(storage, pub)
|
||
|
// create logging service
|
||
|
svc = logging.NewLogging(svc, log)
|
||
|
// create server
|
||
|
server := server.NewServer(*httpBind, svc)
|
||
|
// start server
|
||
|
server.Start()
|
||
|
// info banner
|
||
|
log.Info("started cml04-pbl018", logFields)
|
||
|
// wait signal to finish
|
||
|
signal := WaitSignal()
|
||
|
log.Info("signal received", logFields.Add(logger.LogFields{
|
||
|
"signal": signal.String(),
|
||
|
}))
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func ProcessEvent(ctx context.Context, event *cml04eventer.Event, storage storage.Storager, log logger.LoggerAdapter) error {
|
||
|
subject, err := event.EventMeta.Get("subject")
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
switch subject.(string) {
|
||
|
// production order telegram
|
||
|
case "sap.in.telegramas.z_sms_10001":
|
||
|
log.Info("received production order data from SAP", logger.LogFields{})
|
||
|
po := &types.ProductionOrder{}
|
||
|
buf := bytes.NewBuffer(event.EventData)
|
||
|
if err := json.NewDecoder(buf).Decode(po); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return storage.SalvarOrdenProduccion(ctx, po)
|
||
|
// customer order telegram
|
||
|
case "sap.in.telegramas.z_sms_10002":
|
||
|
log.Info("received customer order data from SAP", logger.LogFields{})
|
||
|
co := &types.CustomerOrder{}
|
||
|
buf := bytes.NewBuffer(event.EventData)
|
||
|
if err := json.NewDecoder(buf).Decode(co); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return storage.SalvarOrdenCliente(ctx, co)
|
||
|
case "sap.in.calidades":
|
||
|
log.Info("received normas/calidades data from SAP", logger.LogFields{})
|
||
|
list := &types.NormaList{}
|
||
|
buf := bytes.NewBuffer(event.EventData)
|
||
|
if err := json.NewDecoder(buf).Decode(list); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return storage.SalvarNormas(ctx, list)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// WaitSignal catching exit signal
|
||
|
func WaitSignal() os.Signal {
|
||
|
ch := make(chan os.Signal, 2)
|
||
|
signal.Notify(
|
||
|
ch,
|
||
|
syscall.SIGINT,
|
||
|
syscall.SIGQUIT,
|
||
|
syscall.SIGTERM,
|
||
|
)
|
||
|
for {
|
||
|
sig := <-ch
|
||
|
switch sig {
|
||
|
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM:
|
||
|
return sig
|
||
|
}
|
||
|
}
|
||
|
}
|