This commit is contained in:
aespinro 2024-10-21 21:49:06 +02:00
parent da7777c78b
commit 2e329e0bd3
3 changed files with 113 additions and 20 deletions

View File

@ -18,14 +18,13 @@ var labelerCmd = &cobra.Command{
func init() { func init() {
rootCmd.AddCommand(labelerCmd) rootCmd.AddCommand(labelerCmd)
// flags
// Here you will define your flags and configuration settings. labelerCmd.Flags().String("db-username", "postgres", "database username")
labelerCmd.Flags().String("db-password", "Me8140@01", "database user password")
// Cobra supports Persistent Flags which will work for this command labelerCmd.Flags().String("db-host", "db", "database host address")
// and all subcommands, e.g.: labelerCmd.Flags().Int("db-port", 5432, "database tcp port")
// labelerCmd.PersistentFlags().String("foo", "", "A help for foo") labelerCmd.Flags().String("db-name", "falcon", "database user password")
labelerCmd.Flags().String("nats-host", "nats", "nats.io broker host address")
// Cobra supports local flags which will only run when this command labelerCmd.Flags().Int("nats-port", 4222, "nats.io broker tcp port")
// is called directly, e.g.: labelerCmd.Flags().String("log-level", "debug", "log level trace")
// labelerCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
} }

View File

@ -1,10 +1,12 @@
package app package app
import ( import (
"context"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"git.espin.casa/albert/cml04-falcon-system/labeler/server"
"git.espin.casa/albert/cml04-falcon-system/labeler/service" "git.espin.casa/albert/cml04-falcon-system/labeler/service"
"git.espin.casa/albert/logger" "git.espin.casa/albert/logger"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -20,6 +22,9 @@ func Run(cmd *cobra.Command, args []string) {
dbName, _ := cmd.Flags().GetString("db-name") dbName, _ := cmd.Flags().GetString("db-name")
natsHost, _ := cmd.Flags().GetString("nats-host") natsHost, _ := cmd.Flags().GetString("nats-host")
natsPort, _ := cmd.Flags().GetInt("nats-port") natsPort, _ := cmd.Flags().GetInt("nats-port")
// create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// setup logger // setup logger
log := logger.New(os.Stdout, logLevel) log := logger.New(os.Stdout, logLevel)
// log fields // log fields
@ -39,10 +44,15 @@ func Run(cmd *cobra.Command, args []string) {
Host: natsHost, Host: natsHost,
Port: natsPort, Port: natsPort,
}) })
// check error
if err != nil { if err != nil {
log.Error("create service failed", err, logFields) log.Error("create service failed", err, logFields)
return return
} }
defer svc.Close(ctx)
// create server
server := server.New("", svc, log)
server.Start()
// info banner // info banner
log.Info("started cml04-falcon-labeler service", logFields) log.Info("started cml04-falcon-labeler service", logFields)
// wait signal to finish // wait signal to finish

View File

@ -2,13 +2,17 @@ package server
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
"io" "io"
"net"
"sync"
"git.espin.casa/albert/cml04-falcon-system/labeler/service" "git.espin.casa/albert/cml04-falcon-system/labeler/service"
"git.espin.casa/albert/logger"
) )
type Header struct { type Header struct {
MessageLenght int16 MessageLength int16
TelegramID int16 TelegramID int16
SequenceCounter int16 SequenceCounter int16
Flags int16 Flags int16
@ -37,20 +41,100 @@ type LabelData struct {
FreeTxtLp [180]byte FreeTxtLp [180]byte
} }
func (l *LabelData) FillTelegram(reader io.Reader) error { func (l *LabelData) FillTelegram(reader io.Reader) (int, error) {
if err := binary.Read(reader, binary.LittleEndian, &l); err != nil { // Read into the LabelData struct and return the number of bytes read
return err err := binary.Read(reader, binary.LittleEndian, l)
if err != nil {
return 0, err
} }
return nil // Calculate total number of bytes read based on struct size
labelDataSize := binary.Size(l)
return labelDataSize, nil
} }
type Server struct{} type Server struct {
address string
svc service.Service
mu sync.Mutex
log logger.LoggerAdapter
}
// Start begins the TCP server and listens for incoming connections
func (s *Server) Start() error { func (s *Server) Start() error {
listener, err := net.Listen("tcp", s.address)
return nil if err != nil {
return fmt.Errorf("failed to start server: %v", err)
}
defer listener.Close()
s.log.Info("server is listening", logger.LogFields{
"address": s.address,
})
for {
conn, err := listener.Accept()
if err != nil {
s.log.Error("accepting connections failed", err, logger.LogFields{})
continue
}
// Handle the connection concurrently
go s.handleConnection(conn)
}
} }
func New(svc service.Service) *Server { // handleConnection reads telegram data from the TCP connection and keeps the connection open
return &Server{} func (s *Server) handleConnection(conn net.Conn) {
defer conn.Close()
s.log.Info("new connection", logger.LogFields{
"from": conn.RemoteAddr(),
})
for {
// Initialize LabelData struct to hold the incoming telegram
var labelData LabelData
// Read the telegram from the connection and track the number of bytes read
bytesRead, err := labelData.FillTelegram(conn)
if err != nil {
// If the client disconnects or there's a read error, exit the loop
if err == io.EOF {
s.log.Info("client disconnected", logger.LogFields{"client": conn.RemoteAddr()})
break
}
s.log.Error("reading telegram failed", err, logger.LogFields{})
continue // Keep reading from the connection
}
// Check if TelegramID is 6500, if not discard but keep connection
if labelData.Header.TelegramID != 6500 {
s.log.Error("invalid telegram id", fmt.Errorf("invalid telegram"), logger.LogFields{
"id": labelData.Header.TelegramID,
"from": conn.RemoteAddr(),
})
continue // Discard the content but keep reading from the connection
}
// Check if MessageLength matches the data received
if labelData.Header.MessageLength != int16(bytesRead) {
fmt.Printf("MessageLength mismatch: expected %d, got %d bytes from %s\n", labelData.Header.MessageLength, bytesRead, conn.RemoteAddr())
continue // Discard the content but keep reading from the connection
}
// Locking if necessary when interacting with shared resources (svc, etc.)
s.mu.Lock()
// Process the valid telegram
s.log.Info("received valid label data", logger.LogFields{"from": conn.RemoteAddr()})
fmt.Printf("label: %+v\n", labelData)
s.mu.Unlock()
// Continue reading next telegrams from this connection
}
}
// New creates a new Server instance
func New(address string, svc service.Service, log logger.LoggerAdapter) *Server {
return &Server{
address: address,
svc: svc,
mu: sync.Mutex{},
log: log,
}
} }