This commit is contained in:
aespin 2024-10-22 17:34:07 +02:00
parent cd4e313d06
commit c894099a26
3 changed files with 196 additions and 117 deletions

View File

@ -10,27 +10,8 @@ type Server struct {
svc micro.Service svc micro.Service
} }
func AddEndPoints(svc micro.Service) (err error) { func eventHandler(req micro.Request) {
// req.Data()
err = svc.AddEndpoint("svc.eventer.new.bundle", micro.HandlerFunc(SvcEventerNewBundleHandle))
if err != nil {
return err
}
err = svc.AddEndpoint("svc.eventer.new.barcode", micro.HandlerFunc(SvcEventerNewBarcodeHandle))
if err != nil {
return err
}
return
}
func SvcEventerNewBundleHandle(req micro.Request) {
req.Respond(req.Data())
}
func SvcEventerNewBarcodeHandle(req micro.Request) {
req.Respond(req.Data())
} }
func New(nc *nats.Conn) (*Server, error) { func New(nc *nats.Conn) (*Server, error) {
@ -38,6 +19,10 @@ func New(nc *nats.Conn) (*Server, error) {
Name: "EventService", Name: "EventService",
Version: "1.0.0", Version: "1.0.0",
Description: "falcon event service", Description: "falcon event service",
Endpoint: &micro.EndpointConfig{
Subject: "svc.event",
Handler: micro.HandlerFunc(eventHandler),
},
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -53,7 +53,7 @@ func Run(cmd *cobra.Command, args []string) {
defer svc.Close(ctx) defer svc.Close(ctx)
// create server // create server
server := server.New(bindAddr, svc, log) server := server.New(bindAddr, svc, log)
go server.Start() go server.Run()
// info banner // info banner
log.Info("started cml04-falcon-labeler service", logFields) log.Info("started cml04-falcon-labeler service", logFields)
// wait signal to finish // wait signal to finish

View File

@ -1,11 +1,17 @@
package server package server
import ( import (
"bytes"
"context"
"encoding/binary" "encoding/binary"
"fmt" "errors"
"io"
"net" "net"
"sync" "strconv"
"strings"
"git.espin.casa/albert/cml04-falcon-system/internal/types"
"github.com/google/uuid"
"git.espin.casa/albert/cml04-falcon-system/labeler/service" "git.espin.casa/albert/cml04-falcon-system/labeler/service"
"git.espin.casa/albert/logger" "git.espin.casa/albert/logger"
@ -21,119 +27,207 @@ type Header struct {
type LabelData struct { type LabelData struct {
Header Header Header Header
DateTime [23]byte DateTime [23]byte `json:"date_time"`
L2PackageId int16 L2PackageId int32 `json:"l2_package_id"`
L3PackageId [6]byte L3PackageId [6]byte `json:"l3_package_id"`
ProductionOrderNo int32 ProductionOrderNo int32 `json:"production_order_no"`
CustomerOrderNo int32 CustomerOrderNo int32 `json:"customer_order_no"`
CustomerName [30]byte CustomerName [30]byte `json:"customer_name"`
LogoCode [3]byte LogoCode [3]byte `json:"logo_code"`
SteelGrade [15]byte SteelGrade [15]byte `json:"steel_grade"`
MaterialCode [18]byte MaterialCode [18]byte `json:"material_code"`
HeatId [10]byte HeatId [10]byte `json:"heat_id"`
SectionType [20]byte SectionType [20]byte `json:"section_type"`
PackageDimensions [3]float32 PackageDimensions [3]float32 `json:"package_dimensions"`
PackageWeight float32 PackageWeight float32 `json:"package_weight"`
SectionDimensions [3]float32 SectionDimensions [3]float32 `json:"section_dimensions"`
NumberSections int32 NumberSections int32 `json:"number_sections"`
NumberLayers int32 NumberLayers int32 `json:"number_layers"`
NumberSectionsInLayer int32 NumberSectionsInLayer int32 `json:"number_sections_last_layer"`
FreeTxtLp [180]byte FreeTxtLp [180]byte `json:"free_txt_lp"`
} }
func (l *LabelData) FillTelegram(reader io.Reader) (int, error) { func cleanNonPrintable(input []byte) string {
// Read into the LabelData struct and return the number of bytes read var result strings.Builder
err := binary.Read(reader, binary.LittleEndian, l)
if err != nil { for _, b := range input {
return 0, err if strconv.IsPrint(rune(b)) {
result.WriteByte(b)
}
} }
// Calculate total number of bytes read based on struct size
labelDataSize := binary.Size(l) return result.String()
return labelDataSize, nil }
func (pmd *LabelData) DateTimeAsString() string {
s := cleanNonPrintable(pmd.DateTime[:])
return strings.TrimRight(string(s), "\x00")
}
func (pmd *LabelData) L3PackageIdAsString() string {
s := cleanNonPrintable(pmd.L3PackageId[:])
return strings.TrimRight(string(s), "\x00")
}
func (pmd *LabelData) CustomerNameAsString() string {
s := cleanNonPrintable(pmd.CustomerName[:])
return strings.TrimRight(string(s), "\x00")
}
func (pmd *LabelData) LogoCodeAsString() string {
s := cleanNonPrintable(pmd.LogoCode[:])
return strings.TrimRight(string(s), "\x00")
}
func (pmd *LabelData) SteelGradeAsString() string {
s := cleanNonPrintable(pmd.SteelGrade[:])
return strings.TrimRight(string(s), "\x00")
}
func (pmd *LabelData) MaterialCodeAsString() string {
s := cleanNonPrintable(pmd.MaterialCode[:])
return strings.TrimRight(string(s), "\x00")
}
func (pmd *LabelData) HeatIdAsString() string {
s := cleanNonPrintable(pmd.HeatId[:])
return strings.TrimRight(string(s), "\x00")
}
func (pmd *LabelData) SectionTypeAsString() string {
s := cleanNonPrintable(pmd.SectionType[:])
return strings.TrimRight(string(s), "\x00")
}
func (pmd *LabelData) FreeTxtLpAsString() string {
s := cleanNonPrintable(pmd.FreeTxtLp[:])
return strings.TrimRight(string(s), "\x00")
} }
type Server struct { type Server struct {
address string address string
svc service.Service
mu sync.Mutex
log logger.LoggerAdapter log logger.LoggerAdapter
svc service.Service
} }
// Start begins the TCP server and listens for incoming connections type Client struct {
func (s *Server) Start() error { conn net.Conn
listener, err := net.Listen("tcp", s.address) log logger.LoggerAdapter
if err != nil { svc service.Service
return fmt.Errorf("failed to start server: %v", err) }
}
defer listener.Close() func (client *Client) handleRequest() {
s.log.Info("server is listening", logger.LogFields{ // telegram bytes buffer holder 4096 maximun telegram size defined by SMS
"address": s.address, telegramBytes := bytes.NewBuffer(make([]byte, 4096))
})
for { for {
pl, err := client.conn.Read(telegramBytes.Bytes())
if err != nil {
client.log.Error("reading telegram failed", err, logger.LogFields{})
return
}
// interpret first 24 bytes as header
header := &Header{}
headerReader := bytes.NewReader(telegramBytes.Bytes()[:24])
if err := binary.Read(headerReader, binary.LittleEndian, header); err != nil {
client.log.Error("interpreting telegram header failed", err, logger.LogFields{})
return
}
// switch telegram id
switch header.TelegramID {
// handle telegram data
case 6500:
if header.MessageLength != int16(pl) {
client.log.Error("message length mismatch telegram defined length", errors.New(""), logger.LogFields{
"telegram_defined_length": header.MessageLength,
"received_length": pl,
})
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// read telegram data
telegramData := &LabelData{}
if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil {
client.log.Error("reading telegram content failed", err, logger.LogFields{})
return
}
if err := client.handlePublishPrintData(ctx, telegramData); err != nil {
client.log.Error("publish print data failed", err, logger.LogFields{})
return
}
default:
}
}
}
func (client *Client) handlePublishPrintData(ctx context.Context, data *LabelData) error {
// set label data
label := &types.Label{
ID: uuid.NewString(),
DateTime: data.DateTimeAsString(),
L2PackageId: int16(data.L2PackageId),
L3PackageId: data.L3PackageIdAsString(),
ProductionOrderNo: data.ProductionOrderNo,
CustomerOrderNo: data.CustomerOrderNo,
CustomerName: data.CustomerNameAsString(),
LogoCode: data.LogoCodeAsString(),
SteelGrade: data.SteelGradeAsString(),
MaterialCode: data.MaterialCodeAsString(),
HeatId: data.HeatIdAsString(),
SectionType: data.SectionTypeAsString(),
PackageDimenA: data.PackageDimensions[0],
PackageDimenB: data.PackageDimensions[1],
PackageDimenC: data.PackageDimensions[2],
PackageWeight: data.PackageWeight,
SectionDimenA: data.SectionDimensions[0],
SectionDimenB: data.SectionDimensions[1],
SectionDimenC: data.SectionDimensions[2],
NumberSections: data.NumberSections,
NumberLayers: data.NumberLayers,
NumberSectionsInLayer: data.NumberSectionsInLayer,
FreeTxtLp: data.FreeTxtLpAsString(),
}
// store new label data received
if err := client.svc.StoreLabelData(ctx, label); err != nil {
return err
}
// publish new label data event
if err := client.svc.PublishNewLabelData(ctx, label.ID); err != nil {
return err
}
return nil
}
func (server *Server) Run() error {
// create tcp listener
listener, err := net.Listen("tcp", server.address)
if err != nil {
return err
}
// close on exit
defer listener.Close()
// accept incoming connections
for {
// accept connection
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
s.log.Error("accepting connections failed", err, logger.LogFields{}) return err
continue
} }
// Handle the connection concurrently // create new client
go s.handleConnection(conn) client := &Client{
conn: conn,
log: server.log,
svc: server.svc,
}
// handle client
go client.handleRequest()
} }
} }
// handleConnection reads telegram data from the TCP connection and keeps the connection open
func (s *Server) handleConnection(conn net.Conn) {
defer conn.Close()
s.log.Info("new connection", logger.LogFields{
"from": conn.RemoteAddr(),
})
for {
// Initialize LabelData struct to hold the incoming telegram
var labelData LabelData
// Read the telegram from the connection and track the number of bytes read
bytesRead, err := labelData.FillTelegram(conn)
if err != nil {
// If the client disconnects or there's a read error, exit the loop
if err == io.EOF {
s.log.Info("client disconnected", logger.LogFields{"client": conn.RemoteAddr()})
break
}
s.log.Error("reading telegram failed", err, logger.LogFields{})
return // Keep reading from the connection
}
// Check if TelegramID is 6500, if not discard but keep connection
if labelData.Header.TelegramID != 6500 {
s.log.Error("invalid telegram id", fmt.Errorf("invalid telegram"), logger.LogFields{
"id": labelData.Header.TelegramID,
"from": conn.RemoteAddr(),
})
return // Discard the content but keep reading from the connection
}
// Check if MessageLength matches the data received
if labelData.Header.MessageLength != int16(bytesRead) {
fmt.Printf("MessageLength mismatch: expected %d, got %d bytes from %s\n", labelData.Header.MessageLength, bytesRead, conn.RemoteAddr())
return // Discard the content but keep reading from the connection
}
// Locking if necessary when interacting with shared resources (svc, etc.)
s.mu.Lock()
// Process the valid telegram
s.log.Info("received valid label data", logger.LogFields{"from": conn.RemoteAddr()})
fmt.Printf("label: %+v\n", labelData)
s.mu.Unlock()
// Continue reading next telegrams from this connection
}
}
// New creates a new Server instance
func New(address string, svc service.Service, log logger.LoggerAdapter) *Server { func New(address string, svc service.Service, log logger.LoggerAdapter) *Server {
return &Server{ return &Server{
address: address, address: address,
svc: svc, svc: svc,
mu: sync.Mutex{},
log: log, log: log,
} }
} }