diff --git a/cmd/labeler.go b/cmd/labeler.go index ab13941..0a3af60 100644 --- a/cmd/labeler.go +++ b/cmd/labeler.go @@ -18,14 +18,13 @@ var labelerCmd = &cobra.Command{ func init() { rootCmd.AddCommand(labelerCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // labelerCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // labelerCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") + // flags + labelerCmd.Flags().String("db-username", "postgres", "database username") + labelerCmd.Flags().String("db-password", "Me8140@01", "database user password") + labelerCmd.Flags().String("db-host", "db", "database host address") + labelerCmd.Flags().Int("db-port", 5432, "database tcp port") + labelerCmd.Flags().String("db-name", "falcon", "database user password") + labelerCmd.Flags().String("nats-host", "nats", "nats.io broker host address") + labelerCmd.Flags().Int("nats-port", 4222, "nats.io broker tcp port") + labelerCmd.Flags().String("log-level", "debug", "log level trace") } diff --git a/labeler/app/app.go b/labeler/app/app.go index 3a6471a..2910f49 100644 --- a/labeler/app/app.go +++ b/labeler/app/app.go @@ -1,10 +1,12 @@ package app import ( + "context" "os" "os/signal" "syscall" + "git.espin.casa/albert/cml04-falcon-system/labeler/server" "git.espin.casa/albert/cml04-falcon-system/labeler/service" "git.espin.casa/albert/logger" "github.com/spf13/cobra" @@ -20,6 +22,9 @@ func Run(cmd *cobra.Command, args []string) { dbName, _ := cmd.Flags().GetString("db-name") natsHost, _ := cmd.Flags().GetString("nats-host") natsPort, _ := cmd.Flags().GetInt("nats-port") + // create context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // setup logger log := logger.New(os.Stdout, logLevel) // log fields @@ -39,10 +44,15 @@ func Run(cmd *cobra.Command, args []string) { Host: natsHost, Port: natsPort, }) + // check error if err != nil { log.Error("create service failed", err, logFields) return } + defer svc.Close(ctx) + // create server + server := server.New("", svc, log) + server.Start() // info banner log.Info("started cml04-falcon-labeler service", logFields) // wait signal to finish diff --git a/labeler/server/server.go b/labeler/server/server.go index e1ed3d0..c931653 100644 --- a/labeler/server/server.go +++ b/labeler/server/server.go @@ -2,13 +2,17 @@ package server import ( "encoding/binary" + "fmt" "io" + "net" + "sync" "git.espin.casa/albert/cml04-falcon-system/labeler/service" + "git.espin.casa/albert/logger" ) type Header struct { - MessageLenght int16 + MessageLength int16 TelegramID int16 SequenceCounter int16 Flags int16 @@ -37,20 +41,100 @@ type LabelData struct { FreeTxtLp [180]byte } -func (l *LabelData) FillTelegram(reader io.Reader) error { - if err := binary.Read(reader, binary.LittleEndian, &l); err != nil { - return err +func (l *LabelData) FillTelegram(reader io.Reader) (int, error) { + // Read into the LabelData struct and return the number of bytes read + err := binary.Read(reader, binary.LittleEndian, l) + if err != nil { + return 0, err } - return nil + // Calculate total number of bytes read based on struct size + labelDataSize := binary.Size(l) + return labelDataSize, nil } -type Server struct{} +type Server struct { + address string + svc service.Service + mu sync.Mutex + log logger.LoggerAdapter +} +// Start begins the TCP server and listens for incoming connections func (s *Server) Start() error { - - return nil + listener, err := net.Listen("tcp", s.address) + if err != nil { + return fmt.Errorf("failed to start server: %v", err) + } + defer listener.Close() + s.log.Info("server is listening", logger.LogFields{ + "address": s.address, + }) + for { + conn, err := listener.Accept() + if err != nil { + s.log.Error("accepting connections failed", err, logger.LogFields{}) + continue + } + // Handle the connection concurrently + go s.handleConnection(conn) + } } -func New(svc service.Service) *Server { - return &Server{} +// handleConnection reads telegram data from the TCP connection and keeps the connection open +func (s *Server) handleConnection(conn net.Conn) { + defer conn.Close() + s.log.Info("new connection", logger.LogFields{ + "from": conn.RemoteAddr(), + }) + + for { + // Initialize LabelData struct to hold the incoming telegram + var labelData LabelData + + // Read the telegram from the connection and track the number of bytes read + bytesRead, err := labelData.FillTelegram(conn) + if err != nil { + // If the client disconnects or there's a read error, exit the loop + if err == io.EOF { + s.log.Info("client disconnected", logger.LogFields{"client": conn.RemoteAddr()}) + break + } + s.log.Error("reading telegram failed", err, logger.LogFields{}) + continue // Keep reading from the connection + } + + // Check if TelegramID is 6500, if not discard but keep connection + if labelData.Header.TelegramID != 6500 { + s.log.Error("invalid telegram id", fmt.Errorf("invalid telegram"), logger.LogFields{ + "id": labelData.Header.TelegramID, + "from": conn.RemoteAddr(), + }) + continue // Discard the content but keep reading from the connection + } + + // Check if MessageLength matches the data received + if labelData.Header.MessageLength != int16(bytesRead) { + fmt.Printf("MessageLength mismatch: expected %d, got %d bytes from %s\n", labelData.Header.MessageLength, bytesRead, conn.RemoteAddr()) + continue // Discard the content but keep reading from the connection + } + + // Locking if necessary when interacting with shared resources (svc, etc.) + s.mu.Lock() + // Process the valid telegram + s.log.Info("received valid label data", logger.LogFields{"from": conn.RemoteAddr()}) + fmt.Printf("label: %+v\n", labelData) + s.mu.Unlock() + + // Continue reading next telegrams from this connection + } +} + +// New creates a new Server instance +func New(address string, svc service.Service, log logger.LoggerAdapter) *Server { + return &Server{ + address: address, + svc: svc, + mu: sync.Mutex{}, + log: log, + } }