This commit is contained in:
aespin 2024-10-23 14:26:40 +02:00
parent c894099a26
commit fd813f8b74
4 changed files with 207 additions and 97 deletions

View File

@ -1,20 +1,13 @@
package app package app
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
cml04eventer "git.espin.casa/albert/cml04-eventer" "git.espin.casa/albert/cml04-falcon-system/bundle/service"
"git.espin.casa/albert/cml04-falcon-system/internal/publisher"
"git.espin.casa/albert/cml04-falcon-system/internal/storage"
"git.espin.casa/albert/cml04-falcon-system/internal/types"
"git.espin.casa/albert/logger" "git.espin.casa/albert/logger"
"github.com/nats-io/nats.go"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -41,50 +34,30 @@ func Run(cmd *cobra.Command, args []string) {
"nats_port": natsPort, "nats_port": natsPort,
"log_level": logLevel, "log_level": logLevel,
} }
// create storager // create service
storage, err := storage.New(&storage.DBConfig{ service, err := service.New(&service.DBConfig{
Username: userName, Username: userName,
Password: userPass, Password: userPass,
Host: dbHost, Host: dbHost,
Port: dbPort, Port: dbPort,
Name: dbName, Name: userName,
}, &service.NATSConfig{
Host: natsHost,
Port: natsPort,
}) })
// handler error // handler error
if err != nil { if err != nil {
log.Error("create storage failed", err, logFields) log.Error("create service failed", err, logFields)
return return
} }
// NATS connect
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", natsHost, natsPort))
if err != nil {
log.Error("create nats connection failed", err, logFields)
return
}
defer nc.Close()
// create publisher
pub := publisher.New(nc)
// create context // create context
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// cancel main context on exit
defer cancel() defer cancel()
// NATS subscribe subject // on exit close service
nc.Subscribe("barcode.new", func(msg *nats.Msg) { defer service.Close(ctx)
go func(log logger.LoggerAdapter) {
if err := ProcessNewBarcodeMessage(ctx, msg, storage, pub, log); err != nil {
log.Error("process new barcode message failed", err, logger.LogFields{})
return
}
}(log)
})
nc.Subscribe("bundle.new", func(msg *nats.Msg) {
go func(log logger.LoggerAdapter) {
if err := ProcessNewBundleMessage(ctx, msg, storage, log); err != nil {
log.Error("process new bundle message failed", err, logger.LogFields{})
return
}
}(log)
})
// info banner // info banner
log.Info("started falcon barcoder service", logFields) log.Info("started falcon bundle service", logFields)
// wait signal to finish // wait signal to finish
signal := WaitSignal() signal := WaitSignal()
log.Info("signal received", logFields.Add(logger.LogFields{ log.Info("signal received", logFields.Add(logger.LogFields{
@ -92,62 +65,6 @@ func Run(cmd *cobra.Command, args []string) {
})) }))
} }
func ProcessNewBarcodeMessage(ctx context.Context, msg *nats.Msg, storage storage.Storager, pub publisher.Publisher, log logger.LoggerAdapter) error {
// event holder
event := &cml04eventer.Event{}
// create new buffer based on message data
buff := bytes.NewBuffer(msg.Data)
// decode message
if err := json.NewDecoder(buff).Decode(event); err != nil {
return err
}
// info banner
log.Info("received event bus message", logger.LogFields{
"event_id": event.EventID,
"event_topic": event.EventTopic,
})
// barcode
barcode := string(event.EventData)
// store barcode
if err := storage.ConfirmBundle(ctx, barcode); err != nil {
return err
}
// send bundle confirmation
if err := pub.ComfirmedBundle(ctx, barcode); err != nil {
return err
}
return nil
}
func ProcessNewBundleMessage(ctx context.Context, msg *nats.Msg, storage storage.Storager, log logger.LoggerAdapter) error {
// event holder
event := &cml04eventer.Event{}
// create new buffer based on message data
buff := bytes.NewBuffer(msg.Data)
// decode message
if err := json.NewDecoder(buff).Decode(event); err != nil {
return err
}
// info banner
log.Info("received event bus message", logger.LogFields{
"event_id": event.EventID,
"event_topic": event.EventTopic,
})
// bundle holder
bundle := &types.BundleData{}
// create new buffer based on message event data
buff = bytes.NewBuffer(event.EventData)
// decode message
if err := json.NewDecoder(buff).Decode(bundle); err != nil {
return err
}
// store bundle
if err := storage.StoreBundle(ctx, bundle); err != nil {
return err
}
return nil
}
// WaitSignal catching exit signal // WaitSignal catching exit signal
func WaitSignal() os.Signal { func WaitSignal() os.Signal {
ch := make(chan os.Signal, 2) ch := make(chan os.Signal, 2)

194
bundle/service/service.go Normal file
View File

@ -0,0 +1,194 @@
package service
import (
"context"
"fmt"
"strconv"
"time"
"git.espin.casa/albert/cml04-falcon-system/internal/types"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/micro"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
type Service interface {
Subscribe() error
Close(ctx context.Context) error
}
const TimeOut time.Duration = time.Second * 2
type DBConfig struct {
Username string
Password string
Host string
Port int
Name string
}
type NATSConfig struct {
Host string
Port int
}
func Nats(conf *NATSConfig) (*nats.Conn, error) {
// NATS connect
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", conf.Host, conf.Port))
if err != nil {
return nil, err
}
return nc, nil
}
type service struct {
db *gorm.DB
nc *nats.Conn
srv micro.Service
}
func (s *service) svcNewLabelHandler(req micro.Request) {
// label holder
var label types.Label
// get label identificator
labelID := string(req.Data())
if err := s.db.First(&label, labelID).Error; err != nil {
return
}
// convert logo code
dibujo, err := strconv.Atoi(label.LogoCode)
if err != nil {
return
}
// bundle data
bundle := &types.BundleData{
Grupo6: "",
Po: int(label.ProductionOrderNo),
Co: int(label.CustomerOrderNo),
Colada: label.HeatId,
Calidad: label.SteelGrade,
Matnr: label.MaterialCode,
Dibujo: dibujo,
Operador: "AUT", // <--
Serie: 0,
Nromatricula: "", // <--
NroBulto: strconv.Itoa(int(label.L2PackageId)),
EtiquetaDoble: "S",
Fecha: 0,
Turno: "", // <--
Observacion1: "obser.1",
Observacion2: "obser.2",
Observacion3: "obser.3",
PaqueteLongitud: 0,
PaqueteAncho: 0,
PaqueteAlto: 0,
PaquetePeso: 0,
PaqueteNroSecciones: 0,
PaqueteNroMantos: 0,
PaqueteNroSeccManto: 0,
SeccionTipo: label.SectionType,
SeccionLongitud: 0,
SeccionAncho: 0,
SeccionAlto: 0,
Idioma: "ES",
Destino: 0,
Hora: 0,
Horario: 0,
Inst: "EST",
Tren: 1,
Normed: "",
Norpro: "",
Nortol: "",
Spras: "",
Statu: 0,
Crlf: "",
Maquina: 0,
Padre: "",
Paqpadre: "",
RelevantTime: "",
Desvio: 0,
Pesoteorico: 0,
PesoteoricoReal: 0,
DesvioTeoricoReal: 0,
FechaImpresion: "",
PesoNivel1: 0,
L3Sended: false,
Confirmed: false,
SAP: false,
}
}
// Subscribe implements Service.
func (s *service) Subscribe() error {
srv, err := micro.AddService(s.nc, micro.Config{
Name: "BundleService",
Version: "1.0.0",
// base handler
Endpoint: &micro.EndpointConfig{
Subject: "svc.new.label",
Handler: micro.HandlerFunc(s.svcNewLabelHandler),
},
})
// handle error
if err != nil {
return err
}
// set micro
s.srv = srv
// done
return nil
}
// Close implements Service.
func (s *service) Close(ctx context.Context) error {
if err := s.nc.Flush(); err != nil {
return err
}
s.nc.Close()
return nil
}
func DataBase(conf *DBConfig) (*gorm.DB, error) {
// create dsn string
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=disable TimeZone=Europe/Madrid",
conf.Host,
conf.Username,
conf.Password,
conf.Name,
conf.Port,
)
// create open database connection
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
if err != nil {
return nil, err
}
return db, nil
}
func AutoMigrate(db *gorm.DB) error {
return db.AutoMigrate(&types.BundleData{})
}
func New(dbConf *DBConfig, natsConf *NATSConfig) (Service, error) {
// create data base
db, err := DataBase(dbConf)
if err != nil {
return nil, err
}
// auto migrate
if err := AutoMigrate(db); err != nil {
return nil, err
}
// create nc connection
nc, err := Nats(natsConf)
if err != nil {
return nil, err
}
return &service{
db: db,
nc: nc,
}, nil
}

View File

@ -98,7 +98,6 @@ func ProductionDataBase(conf *DBConfig) (*gorm.DB, error) {
func Automigrate(db *gorm.DB) error { func Automigrate(db *gorm.DB) error {
return db.AutoMigrate( return db.AutoMigrate(
&types.Barcode{}, &types.Barcode{},
&types.BundleData{},
&types.ProductionOrder{}, &types.ProductionOrder{},
&types.CustomerOrder{}, &types.CustomerOrder{},
&types.BeamCutPattern{}, &types.BeamCutPattern{},

View File

@ -74,7 +74,7 @@ func (s *service) Close(ctx context.Context) error {
// PublishNewLabelData implements Service. // PublishNewLabelData implements Service.
func (s *service) PublishNewLabelData(ctx context.Context, id string) error { func (s *service) PublishNewLabelData(ctx context.Context, id string) error {
_, err := s.nc.Request("new.label.data", []byte(id), TimeOut) _, err := s.nc.Request("svc.new.label", []byte(id), TimeOut)
if err != nil { if err != nil {
return err return err
} }