From fd813f8b742361b616172b9d739bf655217b786f Mon Sep 17 00:00:00 2001 From: aespin Date: Wed, 23 Oct 2024 14:26:40 +0200 Subject: [PATCH] wip --- bundle/app/app.go | 107 +++----------------- bundle/service/service.go | 194 ++++++++++++++++++++++++++++++++++++ internal/storage/storage.go | 1 - labeler/service/service.go | 2 +- 4 files changed, 207 insertions(+), 97 deletions(-) create mode 100644 bundle/service/service.go diff --git a/bundle/app/app.go b/bundle/app/app.go index 969e09d..5bbcd38 100644 --- a/bundle/app/app.go +++ b/bundle/app/app.go @@ -1,20 +1,13 @@ package app import ( - "bytes" "context" - "encoding/json" - "fmt" "os" "os/signal" "syscall" - cml04eventer "git.espin.casa/albert/cml04-eventer" - "git.espin.casa/albert/cml04-falcon-system/internal/publisher" - "git.espin.casa/albert/cml04-falcon-system/internal/storage" - "git.espin.casa/albert/cml04-falcon-system/internal/types" + "git.espin.casa/albert/cml04-falcon-system/bundle/service" "git.espin.casa/albert/logger" - "github.com/nats-io/nats.go" "github.com/spf13/cobra" ) @@ -41,50 +34,30 @@ func Run(cmd *cobra.Command, args []string) { "nats_port": natsPort, "log_level": logLevel, } - // create storager - storage, err := storage.New(&storage.DBConfig{ + // create service + service, err := service.New(&service.DBConfig{ Username: userName, Password: userPass, Host: dbHost, Port: dbPort, - Name: dbName, + Name: userName, + }, &service.NATSConfig{ + Host: natsHost, + Port: natsPort, }) // handler error if err != nil { - log.Error("create storage failed", err, logFields) + log.Error("create service failed", err, logFields) return } - // NATS connect - nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", natsHost, natsPort)) - if err != nil { - log.Error("create nats connection failed", err, logFields) - return - } - defer nc.Close() - // create publisher - pub := publisher.New(nc) // create context ctx, cancel := context.WithCancel(context.Background()) + // cancel main context on exit defer cancel() - // NATS subscribe subject - nc.Subscribe("barcode.new", func(msg *nats.Msg) { - go func(log logger.LoggerAdapter) { - if err := ProcessNewBarcodeMessage(ctx, msg, storage, pub, log); err != nil { - log.Error("process new barcode message failed", err, logger.LogFields{}) - return - } - }(log) - }) - nc.Subscribe("bundle.new", func(msg *nats.Msg) { - go func(log logger.LoggerAdapter) { - if err := ProcessNewBundleMessage(ctx, msg, storage, log); err != nil { - log.Error("process new bundle message failed", err, logger.LogFields{}) - return - } - }(log) - }) + // on exit close service + defer service.Close(ctx) // info banner - log.Info("started falcon barcoder service", logFields) + log.Info("started falcon bundle service", logFields) // wait signal to finish signal := WaitSignal() log.Info("signal received", logFields.Add(logger.LogFields{ @@ -92,62 +65,6 @@ func Run(cmd *cobra.Command, args []string) { })) } -func ProcessNewBarcodeMessage(ctx context.Context, msg *nats.Msg, storage storage.Storager, pub publisher.Publisher, log logger.LoggerAdapter) error { - // event holder - event := &cml04eventer.Event{} - // create new buffer based on message data - buff := bytes.NewBuffer(msg.Data) - // decode message - if err := json.NewDecoder(buff).Decode(event); err != nil { - return err - } - // info banner - log.Info("received event bus message", logger.LogFields{ - "event_id": event.EventID, - "event_topic": event.EventTopic, - }) - // barcode - barcode := string(event.EventData) - // store barcode - if err := storage.ConfirmBundle(ctx, barcode); err != nil { - return err - } - // send bundle confirmation - if err := pub.ComfirmedBundle(ctx, barcode); err != nil { - return err - } - return nil -} - -func ProcessNewBundleMessage(ctx context.Context, msg *nats.Msg, storage storage.Storager, log logger.LoggerAdapter) error { - // event holder - event := &cml04eventer.Event{} - // create new buffer based on message data - buff := bytes.NewBuffer(msg.Data) - // decode message - if err := json.NewDecoder(buff).Decode(event); err != nil { - return err - } - // info banner - log.Info("received event bus message", logger.LogFields{ - "event_id": event.EventID, - "event_topic": event.EventTopic, - }) - // bundle holder - bundle := &types.BundleData{} - // create new buffer based on message event data - buff = bytes.NewBuffer(event.EventData) - // decode message - if err := json.NewDecoder(buff).Decode(bundle); err != nil { - return err - } - // store bundle - if err := storage.StoreBundle(ctx, bundle); err != nil { - return err - } - return nil -} - // WaitSignal catching exit signal func WaitSignal() os.Signal { ch := make(chan os.Signal, 2) diff --git a/bundle/service/service.go b/bundle/service/service.go new file mode 100644 index 0000000..21254b2 --- /dev/null +++ b/bundle/service/service.go @@ -0,0 +1,194 @@ +package service + +import ( + "context" + "fmt" + "strconv" + + "time" + + "git.espin.casa/albert/cml04-falcon-system/internal/types" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/micro" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +type Service interface { + Subscribe() error + Close(ctx context.Context) error +} + +const TimeOut time.Duration = time.Second * 2 + +type DBConfig struct { + Username string + Password string + Host string + Port int + Name string +} + +type NATSConfig struct { + Host string + Port int +} + +func Nats(conf *NATSConfig) (*nats.Conn, error) { + // NATS connect + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", conf.Host, conf.Port)) + if err != nil { + return nil, err + } + return nc, nil +} + +type service struct { + db *gorm.DB + nc *nats.Conn + srv micro.Service +} + +func (s *service) svcNewLabelHandler(req micro.Request) { + // label holder + var label types.Label + // get label identificator + labelID := string(req.Data()) + if err := s.db.First(&label, labelID).Error; err != nil { + return + } + // convert logo code + dibujo, err := strconv.Atoi(label.LogoCode) + if err != nil { + return + } + + // bundle data + bundle := &types.BundleData{ + Grupo6: "", + Po: int(label.ProductionOrderNo), + Co: int(label.CustomerOrderNo), + Colada: label.HeatId, + Calidad: label.SteelGrade, + Matnr: label.MaterialCode, + Dibujo: dibujo, + Operador: "AUT", // <-- + Serie: 0, + Nromatricula: "", // <-- + NroBulto: strconv.Itoa(int(label.L2PackageId)), + EtiquetaDoble: "S", + Fecha: 0, + Turno: "", // <-- + Observacion1: "obser.1", + Observacion2: "obser.2", + Observacion3: "obser.3", + PaqueteLongitud: 0, + PaqueteAncho: 0, + PaqueteAlto: 0, + PaquetePeso: 0, + PaqueteNroSecciones: 0, + PaqueteNroMantos: 0, + PaqueteNroSeccManto: 0, + SeccionTipo: label.SectionType, + SeccionLongitud: 0, + SeccionAncho: 0, + SeccionAlto: 0, + Idioma: "ES", + Destino: 0, + Hora: 0, + Horario: 0, + Inst: "EST", + Tren: 1, + Normed: "", + Norpro: "", + Nortol: "", + Spras: "", + Statu: 0, + Crlf: "", + Maquina: 0, + Padre: "", + Paqpadre: "", + RelevantTime: "", + Desvio: 0, + Pesoteorico: 0, + PesoteoricoReal: 0, + DesvioTeoricoReal: 0, + FechaImpresion: "", + PesoNivel1: 0, + L3Sended: false, + Confirmed: false, + SAP: false, + } +} + +// Subscribe implements Service. +func (s *service) Subscribe() error { + srv, err := micro.AddService(s.nc, micro.Config{ + Name: "BundleService", + Version: "1.0.0", + // base handler + Endpoint: µ.EndpointConfig{ + Subject: "svc.new.label", + Handler: micro.HandlerFunc(s.svcNewLabelHandler), + }, + }) + // handle error + if err != nil { + return err + } + // set micro + s.srv = srv + // done + return nil +} + +// Close implements Service. +func (s *service) Close(ctx context.Context) error { + if err := s.nc.Flush(); err != nil { + return err + } + s.nc.Close() + return nil +} + +func DataBase(conf *DBConfig) (*gorm.DB, error) { + // create dsn string + dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=disable TimeZone=Europe/Madrid", + conf.Host, + conf.Username, + conf.Password, + conf.Name, + conf.Port, + ) + // create open database connection + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) + if err != nil { + return nil, err + } + return db, nil +} + +func AutoMigrate(db *gorm.DB) error { + return db.AutoMigrate(&types.BundleData{}) +} + +func New(dbConf *DBConfig, natsConf *NATSConfig) (Service, error) { + // create data base + db, err := DataBase(dbConf) + if err != nil { + return nil, err + } + // auto migrate + if err := AutoMigrate(db); err != nil { + return nil, err + } + // create nc connection + nc, err := Nats(natsConf) + if err != nil { + return nil, err + } + return &service{ + db: db, + nc: nc, + }, nil +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index c696c70..49e00f0 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -98,7 +98,6 @@ func ProductionDataBase(conf *DBConfig) (*gorm.DB, error) { func Automigrate(db *gorm.DB) error { return db.AutoMigrate( &types.Barcode{}, - &types.BundleData{}, &types.ProductionOrder{}, &types.CustomerOrder{}, &types.BeamCutPattern{}, diff --git a/labeler/service/service.go b/labeler/service/service.go index c08b6cd..142170a 100644 --- a/labeler/service/service.go +++ b/labeler/service/service.go @@ -74,7 +74,7 @@ func (s *service) Close(ctx context.Context) error { // PublishNewLabelData implements Service. func (s *service) PublishNewLabelData(ctx context.Context, id string) error { - _, err := s.nc.Request("new.label.data", []byte(id), TimeOut) + _, err := s.nc.Request("svc.new.label", []byte(id), TimeOut) if err != nil { return err }