diff --git a/eventer/server/server.go b/eventer/server/server.go index f93eb3f..c5a5b0b 100644 --- a/eventer/server/server.go +++ b/eventer/server/server.go @@ -10,27 +10,8 @@ type Server struct { svc micro.Service } -func AddEndPoints(svc micro.Service) (err error) { - // - err = svc.AddEndpoint("svc.eventer.new.bundle", micro.HandlerFunc(SvcEventerNewBundleHandle)) - if err != nil { - return err - } - - err = svc.AddEndpoint("svc.eventer.new.barcode", micro.HandlerFunc(SvcEventerNewBarcodeHandle)) - if err != nil { - return err - } - - return -} - -func SvcEventerNewBundleHandle(req micro.Request) { - req.Respond(req.Data()) -} - -func SvcEventerNewBarcodeHandle(req micro.Request) { - req.Respond(req.Data()) +func eventHandler(req micro.Request) { + req.Data() } func New(nc *nats.Conn) (*Server, error) { @@ -38,6 +19,10 @@ func New(nc *nats.Conn) (*Server, error) { Name: "EventService", Version: "1.0.0", Description: "falcon event service", + Endpoint: µ.EndpointConfig{ + Subject: "svc.event", + Handler: micro.HandlerFunc(eventHandler), + }, }) if err != nil { return nil, err diff --git a/labeler/app/app.go b/labeler/app/app.go index 9790c3b..a24cc20 100644 --- a/labeler/app/app.go +++ b/labeler/app/app.go @@ -53,7 +53,7 @@ func Run(cmd *cobra.Command, args []string) { defer svc.Close(ctx) // create server server := server.New(bindAddr, svc, log) - go server.Start() + go server.Run() // info banner log.Info("started cml04-falcon-labeler service", logFields) // wait signal to finish diff --git a/labeler/server/server.go b/labeler/server/server.go index 1386635..1e30fe8 100644 --- a/labeler/server/server.go +++ b/labeler/server/server.go @@ -1,11 +1,17 @@ package server import ( + "bytes" + "context" "encoding/binary" - "fmt" - "io" + "errors" + "net" - "sync" + "strconv" + "strings" + + "git.espin.casa/albert/cml04-falcon-system/internal/types" + "github.com/google/uuid" "git.espin.casa/albert/cml04-falcon-system/labeler/service" "git.espin.casa/albert/logger" @@ -21,119 +27,207 @@ type Header struct { type LabelData struct { Header Header - DateTime [23]byte - L2PackageId int16 - L3PackageId [6]byte - ProductionOrderNo int32 - CustomerOrderNo int32 - CustomerName [30]byte - LogoCode [3]byte - SteelGrade [15]byte - MaterialCode [18]byte - HeatId [10]byte - SectionType [20]byte - PackageDimensions [3]float32 - PackageWeight float32 - SectionDimensions [3]float32 - NumberSections int32 - NumberLayers int32 - NumberSectionsInLayer int32 - FreeTxtLp [180]byte + DateTime [23]byte `json:"date_time"` + L2PackageId int32 `json:"l2_package_id"` + L3PackageId [6]byte `json:"l3_package_id"` + ProductionOrderNo int32 `json:"production_order_no"` + CustomerOrderNo int32 `json:"customer_order_no"` + CustomerName [30]byte `json:"customer_name"` + LogoCode [3]byte `json:"logo_code"` + SteelGrade [15]byte `json:"steel_grade"` + MaterialCode [18]byte `json:"material_code"` + HeatId [10]byte `json:"heat_id"` + SectionType [20]byte `json:"section_type"` + PackageDimensions [3]float32 `json:"package_dimensions"` + PackageWeight float32 `json:"package_weight"` + SectionDimensions [3]float32 `json:"section_dimensions"` + NumberSections int32 `json:"number_sections"` + NumberLayers int32 `json:"number_layers"` + NumberSectionsInLayer int32 `json:"number_sections_last_layer"` + FreeTxtLp [180]byte `json:"free_txt_lp"` } -func (l *LabelData) FillTelegram(reader io.Reader) (int, error) { - // Read into the LabelData struct and return the number of bytes read - err := binary.Read(reader, binary.LittleEndian, l) - if err != nil { - return 0, err +func cleanNonPrintable(input []byte) string { + var result strings.Builder + + for _, b := range input { + if strconv.IsPrint(rune(b)) { + result.WriteByte(b) + } } - // Calculate total number of bytes read based on struct size - labelDataSize := binary.Size(l) - return labelDataSize, nil + + return result.String() +} + +func (pmd *LabelData) DateTimeAsString() string { + s := cleanNonPrintable(pmd.DateTime[:]) + return strings.TrimRight(string(s), "\x00") +} + +func (pmd *LabelData) L3PackageIdAsString() string { + s := cleanNonPrintable(pmd.L3PackageId[:]) + return strings.TrimRight(string(s), "\x00") +} + +func (pmd *LabelData) CustomerNameAsString() string { + s := cleanNonPrintable(pmd.CustomerName[:]) + return strings.TrimRight(string(s), "\x00") +} + +func (pmd *LabelData) LogoCodeAsString() string { + s := cleanNonPrintable(pmd.LogoCode[:]) + return strings.TrimRight(string(s), "\x00") +} + +func (pmd *LabelData) SteelGradeAsString() string { + s := cleanNonPrintable(pmd.SteelGrade[:]) + return strings.TrimRight(string(s), "\x00") +} + +func (pmd *LabelData) MaterialCodeAsString() string { + s := cleanNonPrintable(pmd.MaterialCode[:]) + return strings.TrimRight(string(s), "\x00") +} + +func (pmd *LabelData) HeatIdAsString() string { + s := cleanNonPrintable(pmd.HeatId[:]) + return strings.TrimRight(string(s), "\x00") +} + +func (pmd *LabelData) SectionTypeAsString() string { + s := cleanNonPrintable(pmd.SectionType[:]) + return strings.TrimRight(string(s), "\x00") +} + +func (pmd *LabelData) FreeTxtLpAsString() string { + s := cleanNonPrintable(pmd.FreeTxtLp[:]) + return strings.TrimRight(string(s), "\x00") } type Server struct { address string - svc service.Service - mu sync.Mutex log logger.LoggerAdapter + svc service.Service } -// Start begins the TCP server and listens for incoming connections -func (s *Server) Start() error { - listener, err := net.Listen("tcp", s.address) - if err != nil { - return fmt.Errorf("failed to start server: %v", err) - } - defer listener.Close() - s.log.Info("server is listening", logger.LogFields{ - "address": s.address, - }) +type Client struct { + conn net.Conn + log logger.LoggerAdapter + svc service.Service +} + +func (client *Client) handleRequest() { + // telegram bytes buffer holder 4096 maximun telegram size defined by SMS + telegramBytes := bytes.NewBuffer(make([]byte, 4096)) for { + pl, err := client.conn.Read(telegramBytes.Bytes()) + if err != nil { + client.log.Error("reading telegram failed", err, logger.LogFields{}) + return + } + // interpret first 24 bytes as header + header := &Header{} + headerReader := bytes.NewReader(telegramBytes.Bytes()[:24]) + if err := binary.Read(headerReader, binary.LittleEndian, header); err != nil { + client.log.Error("interpreting telegram header failed", err, logger.LogFields{}) + return + } + // switch telegram id + switch header.TelegramID { + // handle telegram data + case 6500: + if header.MessageLength != int16(pl) { + client.log.Error("message length mismatch telegram defined length", errors.New(""), logger.LogFields{ + "telegram_defined_length": header.MessageLength, + "received_length": pl, + }) + return + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // read telegram data + telegramData := &LabelData{} + if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil { + client.log.Error("reading telegram content failed", err, logger.LogFields{}) + return + } + if err := client.handlePublishPrintData(ctx, telegramData); err != nil { + client.log.Error("publish print data failed", err, logger.LogFields{}) + return + } + default: + } + } +} + +func (client *Client) handlePublishPrintData(ctx context.Context, data *LabelData) error { + // set label data + label := &types.Label{ + ID: uuid.NewString(), + DateTime: data.DateTimeAsString(), + L2PackageId: int16(data.L2PackageId), + L3PackageId: data.L3PackageIdAsString(), + ProductionOrderNo: data.ProductionOrderNo, + CustomerOrderNo: data.CustomerOrderNo, + CustomerName: data.CustomerNameAsString(), + LogoCode: data.LogoCodeAsString(), + SteelGrade: data.SteelGradeAsString(), + MaterialCode: data.MaterialCodeAsString(), + HeatId: data.HeatIdAsString(), + SectionType: data.SectionTypeAsString(), + PackageDimenA: data.PackageDimensions[0], + PackageDimenB: data.PackageDimensions[1], + PackageDimenC: data.PackageDimensions[2], + PackageWeight: data.PackageWeight, + SectionDimenA: data.SectionDimensions[0], + SectionDimenB: data.SectionDimensions[1], + SectionDimenC: data.SectionDimensions[2], + NumberSections: data.NumberSections, + NumberLayers: data.NumberLayers, + NumberSectionsInLayer: data.NumberSectionsInLayer, + FreeTxtLp: data.FreeTxtLpAsString(), + } + // store new label data received + if err := client.svc.StoreLabelData(ctx, label); err != nil { + return err + } + // publish new label data event + if err := client.svc.PublishNewLabelData(ctx, label.ID); err != nil { + return err + } + return nil +} + +func (server *Server) Run() error { + // create tcp listener + listener, err := net.Listen("tcp", server.address) + if err != nil { + return err + } + // close on exit + defer listener.Close() + // accept incoming connections + for { + // accept connection conn, err := listener.Accept() if err != nil { - s.log.Error("accepting connections failed", err, logger.LogFields{}) - continue + return err } - // Handle the connection concurrently - go s.handleConnection(conn) + // create new client + client := &Client{ + conn: conn, + log: server.log, + svc: server.svc, + } + // handle client + go client.handleRequest() } } -// handleConnection reads telegram data from the TCP connection and keeps the connection open -func (s *Server) handleConnection(conn net.Conn) { - defer conn.Close() - s.log.Info("new connection", logger.LogFields{ - "from": conn.RemoteAddr(), - }) - - for { - // Initialize LabelData struct to hold the incoming telegram - var labelData LabelData - - // Read the telegram from the connection and track the number of bytes read - bytesRead, err := labelData.FillTelegram(conn) - if err != nil { - // If the client disconnects or there's a read error, exit the loop - if err == io.EOF { - s.log.Info("client disconnected", logger.LogFields{"client": conn.RemoteAddr()}) - break - } - s.log.Error("reading telegram failed", err, logger.LogFields{}) - return // Keep reading from the connection - } - - // Check if TelegramID is 6500, if not discard but keep connection - if labelData.Header.TelegramID != 6500 { - s.log.Error("invalid telegram id", fmt.Errorf("invalid telegram"), logger.LogFields{ - "id": labelData.Header.TelegramID, - "from": conn.RemoteAddr(), - }) - return // Discard the content but keep reading from the connection - } - - // Check if MessageLength matches the data received - if labelData.Header.MessageLength != int16(bytesRead) { - fmt.Printf("MessageLength mismatch: expected %d, got %d bytes from %s\n", labelData.Header.MessageLength, bytesRead, conn.RemoteAddr()) - return // Discard the content but keep reading from the connection - } - - // Locking if necessary when interacting with shared resources (svc, etc.) - s.mu.Lock() - // Process the valid telegram - s.log.Info("received valid label data", logger.LogFields{"from": conn.RemoteAddr()}) - fmt.Printf("label: %+v\n", labelData) - s.mu.Unlock() - // Continue reading next telegrams from this connection - } -} - -// New creates a new Server instance func New(address string, svc service.Service, log logger.LoggerAdapter) *Server { return &Server{ address: address, svc: svc, - mu: sync.Mutex{}, log: log, } }