From ab38428d66414f31c230dc82d25543ce84cbdbf8 Mon Sep 17 00:00:00 2001 From: aespin Date: Tue, 20 Aug 2024 10:08:32 +0200 Subject: [PATCH] wip --- Makefile | 20 ++++ cmd/main.go | 9 ++ docker/Dockerfile | 28 +++++ go.mod | 24 ++++ go.sum | 55 +++++++++ internal/app/app.go | 93 +++++++++++++++ internal/helpers/helper.go | 9 ++ internal/logging/logging.go | 147 +++++++++++++++++++++++ internal/server/server.go | 224 ++++++++++++++++++++++++++++++++++++ internal/service/service.go | 126 ++++++++++++++++++++ internal/types/l2_header.go | 11 ++ internal/types/request.go | 31 +++++ 12 files changed, 777 insertions(+) create mode 100644 Makefile create mode 100644 cmd/main.go create mode 100644 docker/Dockerfile create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/app/app.go create mode 100644 internal/helpers/helper.go create mode 100644 internal/logging/logging.go create mode 100644 internal/server/server.go create mode 100644 internal/service/service.go create mode 100644 internal/types/l2_header.go create mode 100644 internal/types/request.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..45c7156 --- /dev/null +++ b/Makefile @@ -0,0 +1,20 @@ +# Variables +IMAGE_NAME = registry.espin.casa/artifactory/docker-local +CONTAINER_NAME = cml04-l2-rsm + +# Construir la imagen Docker +build: + docker build -t $(IMAGE_NAME) -f docker/Dockerfile . + +# Ejecutar el contenedor Docker +run: + docker run --name $(CONTAINER_NAME) -d $(IMAGE_NAME) + +# Detener y eliminar el contenedor Docker +stop: + docker stop $(CONTAINER_NAME) + docker rm $(CONTAINER_NAME) + +# Eliminar la imagen Docker +clean: + docker rmi $(IMAGE_NAME) \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..02c725b --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,9 @@ +package main + +import "git.espin.casa/albert/cml04-l2-rsm-in/internal/app" + +func main() { + if err := app.Run(); err != nil { + panic(err) + } +} diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..56c999a --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,28 @@ +# Etapa de compilación +FROM golang:1.22-alpine3.18 AS builder + +WORKDIR /app + +COPY . . + +RUN go get -d -v ./... + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -mod=mod -ldflags "-s -w" -o myapp cmd/main.go + +# Etapa de producción + +FROM alpine:3.18.6 + +RUN apk --no-cache add tzdata ca-certificates && apk add libaio + +RUN cp /usr/share/zoneinfo/Europe/Madrid /etc/localtime + +RUN echo "Europe/Madrid" > /etc/timezone +ENV TZ Europe/Madrid + + +WORKDIR /root/ + +COPY --from=builder /app/myapp . + +CMD ["./myapp"] \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..15260a5 --- /dev/null +++ b/go.mod @@ -0,0 +1,24 @@ +module git.espin.casa/albert/cml04-l2-rsm-in + +go 1.22.0 + +require ( + git.espin.casa/albert/cml04-eventer v0.0.0-20240222031945-f20c9e4ddc36 + git.espin.casa/albert/cml04-gdm-int v0.0.0-20240221200353-854fa2d29d83 + git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3 + google.golang.org/grpc v1.62.0 + google.golang.org/protobuf v1.32.0 +) + +require ( + github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/nsqio/go-nsq v1.1.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + golang.org/x/net v0.20.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..bcc60db --- /dev/null +++ b/go.sum @@ -0,0 +1,55 @@ +git.espin.casa/albert/cml04-eventer v0.0.0-20240219141409-1aa896b4231e h1:b23pgexcFOMOyhEC04OSvyq0yUYa3tLfHxwtHVIPKBg= +git.espin.casa/albert/cml04-eventer v0.0.0-20240219141409-1aa896b4231e/go.mod h1:0bQIowhHHtVvDNpmHt5ed6tclhHgZHeH0FQlxQETHDw= +git.espin.casa/albert/cml04-eventer v0.0.0-20240222031945-f20c9e4ddc36 h1:HbF07y7ExWmkXVzQeKoo1t/u7rduuvn93IGJFCBCKAg= +git.espin.casa/albert/cml04-eventer v0.0.0-20240222031945-f20c9e4ddc36/go.mod h1:/fj0cTIFEdeYmzL2r4WLEAN2q3N3T48ZPOT1hqusuSI= +git.espin.casa/albert/cml04-gdm-int v0.0.0-20240221200353-854fa2d29d83 h1:M7Tpn26MPG+4PIFEBFwatfajLaEcaGCSeigtfzutGiM= +git.espin.casa/albert/cml04-gdm-int v0.0.0-20240221200353-854fa2d29d83/go.mod h1:ixkbymSPizICKrPse8rBi/WiVDrVZ7PXOKYa7XvB1VM= +git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3 h1:HUEESn4o8sVXAUJWGJAATeitiNRxVzzwBU8RiIW4Wzc= +git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3/go.mod h1:P1yAUiotJurq7j/wZt6Cnel17HChplkz0E40WD8a5to= +git.espin.ovh/albert/logger v0.0.0-20230415214151-6b29487a3a90 h1:+780Bc/gaN8bi8FD5JJLX4t5IaYk5ABnVAvJu7vgw3c= +git.espin.ovh/albert/logger v0.0.0-20230415214151-6b29487a3a90/go.mod h1:qv5K1umvHSzZoedQtlycCSfeqMRk7yFyVx4BhiwZ6HU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= +google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk= +google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/app/app.go b/internal/app/app.go new file mode 100644 index 0000000..479e53b --- /dev/null +++ b/internal/app/app.go @@ -0,0 +1,93 @@ +package app + +import ( + "flag" + "os" + "os/signal" + "syscall" + + cml04eventer "git.espin.casa/albert/cml04-eventer" + "git.espin.casa/albert/cml04-gdm-int/pkg/api" + "git.espin.casa/albert/cml04-l2-rsm-in/internal/logging" + "git.espin.casa/albert/cml04-l2-rsm-in/internal/server" + "git.espin.casa/albert/cml04-l2-rsm-in/internal/service" + "git.espin.casa/albert/logger" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func Run() error { + // flags + l2InAddr := flag.String("l2-in-addr", "0.0.0.0:5012", "tcp l2 server address") + apiAddr := flag.String("l2-out-addr", "localhost:3333", "tcp adress") + nsqAddr := flag.String("nsq-addr", "10.1.152.13", "nsq server address") + nsqPort := flag.Int("nsq-port", 4150, "nsq server port") + logLevel := flag.String("log-level", "debug", "trace debug level") + flag.Parse() + // setup logger + log := logger.New(os.Stdout, *logLevel) + // log fields + logFields := logger.LogFields{ + "log_level": *logLevel, + "l2_in_address": *l2InAddr, + "api_adress": *apiAddr, + "nsq_address": *nsqAddr, + "nsq_port": *nsqPort, + } + // create event publisher + publisher, err := cml04eventer.NewPublisher(&cml04eventer.PublisherImplConfig{ + NSQAddress: *nsqAddr, + NSQPort: *nsqPort, + Marshaler: cml04eventer.JSONMarshaler{}, + }, log) + if err != nil { + log.Error("failed to create event publisher", err, logFields) + return err + } + defer publisher.Close() + // api dial + conn, err := grpc.Dial(*apiAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Error("failed to dial api server", err, logger.LogFields{}) + } + // create api client + client := api.NewRollDataServiceClient(conn) + defer conn.Close() + // create service + svc := service.NewService(client, publisher) + svc = logging.NewLoggingService(log, svc) + // create server + server := server.NewServer(*l2InAddr, log, svc) + // start tcp server (telegrams) + go func() { + if err := server.Run(); err != nil { + log.Error("server error", err, logFields) + } + }() + // info banner + log.Info("started cml04-l2-rsm-in telegram service", logFields) + // wait signal to finish + signal := WaitSignal() + log.Info("signal received", logFields.Add(logger.LogFields{ + "signal": signal, + })) + return nil +} + +// WaitSignal catching exit signal +func WaitSignal() os.Signal { + ch := make(chan os.Signal, 2) + signal.Notify( + ch, + syscall.SIGINT, + syscall.SIGQUIT, + syscall.SIGTERM, + ) + for { + sig := <-ch + switch sig { + case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM: + return sig + } + } +} diff --git a/internal/helpers/helper.go b/internal/helpers/helper.go new file mode 100644 index 0000000..890c795 --- /dev/null +++ b/internal/helpers/helper.go @@ -0,0 +1,9 @@ +package helpers + +import "regexp" + +func CleanString(s string) string { + // reg expression only alfanumeric characters + reg := regexp.MustCompile("[^a-zA-Z0-9]+") + return reg.ReplaceAllString(s, "") +} diff --git a/internal/logging/logging.go b/internal/logging/logging.go new file mode 100644 index 0000000..473804e --- /dev/null +++ b/internal/logging/logging.go @@ -0,0 +1,147 @@ +package logging + +import ( + "context" + "time" + + "git.espin.casa/albert/cml04-gdm-int/pkg/api" + "git.espin.casa/albert/cml04-l2-rsm-in/internal/service" + "git.espin.casa/albert/logger" +) + +type Logging struct { + log logger.LoggerAdapter + next service.IService +} + +// PublishBdRollData implements service.IService. +func (svc *Logging) PublishBdRollData(ctx context.Context, telegramData *api.BdRollData) (err error) { + defer func(start time.Time) { + logFields := logger.LogFields{ + "took": time.Since(start), + "bd_roll_id": telegramData.GetBdRollId(), + } + if err != nil { + svc.log.Error("BD publish roll data failed", err, logFields) + } else { + svc.log.Info("BD publish roll data success", logFields) + } + }(time.Now()) + return svc.next.PublishBdRollData(ctx, telegramData) +} + +// PublishEdRollData implements service.IService. +func (svc *Logging) PublishEdRollData(ctx context.Context, telegramData *api.EdRollData) (err error) { + defer func(start time.Time) { + logFields := logger.LogFields{ + "took": time.Since(start), + "ed_roll_id": telegramData.GetEdRollId(), + } + if err != nil { + svc.log.Error("ED publish roll data failed", err, logFields) + } else { + svc.log.Info("ED publish roll data success", logFields) + } + }(time.Now()) + return svc.next.PublishEdRollData(ctx, telegramData) +} + +// PublishUfRollData implements service.IService. +func (svc *Logging) PublishUfRollData(ctx context.Context, telegramData *api.UfRollData) (err error) { + defer func(start time.Time) { + logFields := logger.LogFields{ + "took": time.Since(start), + "uf_roll_id": telegramData.GetUfRollId(), + } + if err != nil { + svc.log.Error("UF publish roll data failed", err, logFields) + } else { + svc.log.Info("UF publish roll data success", logFields) + } + }(time.Now()) + return svc.next.PublishUfRollData(ctx, telegramData) +} + +// PublishUrRollData implements service.IService. +func (svc *Logging) PublishUrRollData(ctx context.Context, telegramData *api.UrRollData) (err error) { + defer func(start time.Time) { + logFields := logger.LogFields{ + "took": time.Since(start), + "ur_roll_id": telegramData.GetUrRollId(), + } + if err != nil { + svc.log.Error("UR publish roll data failed", err, logFields) + } else { + svc.log.Info("UR publish roll data success", logFields) + } + }(time.Now()) + return svc.next.PublishUrRollData(ctx, telegramData) +} + +// BdReqTelegram implements service.IService. +func (svc *Logging) BdReqTelegram(ctx context.Context, req *api.BdRollDataReq) (res *api.BdRollDataRes, err error) { + defer func(start time.Time) { + logFields := logger.LogFields{ + "took": time.Since(start), + "request_roll_id": req.GetRollId(), + } + if err != nil { + svc.log.Error("BD request roll data failed", err, logFields) + } else { + svc.log.Info("BD request roll data success", logFields) + } + }(time.Now()) + return svc.next.BdReqTelegram(ctx, req) +} + +// EdReqTelegram implements service.IService. +func (svc *Logging) EdReqTelegram(ctx context.Context, req *api.EdRollDataReq) (res *api.EdRollDataRes, err error) { + defer func(start time.Time) { + logFields := logger.LogFields{ + "took": time.Since(start), + "request_roll_id": req.GetRollId(), + } + if err != nil { + svc.log.Error("ED request roll data failed", err, logFields) + } else { + svc.log.Info("ED request roll data success", logFields) + } + }(time.Now()) + return svc.next.EdReqTelegram(ctx, req) +} + +// UfReqTelegram implements service.IService. +func (svc *Logging) UfReqTelegram(ctx context.Context, req *api.UfRollDataReq) (res *api.UfRollDataRes, err error) { + defer func(start time.Time) { + logFields := logger.LogFields{ + "took": time.Since(start), + "request_roll_id": req.GetRollId(), + } + if err != nil { + svc.log.Error("UF request roll data failed", err, logFields) + } else { + svc.log.Info("UF request roll data success", logFields) + } + }(time.Now()) + return svc.next.UfReqTelegram(ctx, req) +} + +// UrReqTelegram implements service.IService. +func (svc *Logging) UrReqTelegram(ctx context.Context, req *api.UrRollDataReq) (res *api.UrRollDataRes, err error) { + defer func(start time.Time) { + logFields := logger.LogFields{ + "took": time.Since(start), + "request_roll_id": req.GetRollId(), + } + if err != nil { + svc.log.Error("UR request roll data failed", err, logFields) + } else { + svc.log.Info("UR request roll data success", logFields) + } + }(time.Now()) + return svc.next.UrReqTelegram(ctx, req) +} + +func NewLoggingService(log logger.LoggerAdapter, next service.IService) service.IService { + return &Logging{log, next} +} diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000..01a379d --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1,224 @@ +package server + +import ( + "bytes" + "context" + "encoding/binary" + "net" + "time" + + "git.espin.casa/albert/cml04-gdm-int/pkg/api" + "git.espin.casa/albert/cml04-l2-rsm-in/internal/helpers" + "git.espin.casa/albert/cml04-l2-rsm-in/internal/service" + "git.espin.casa/albert/cml04-l2-rsm-in/internal/types" + "git.espin.casa/albert/logger" +) + +type Server struct { + address string + svc service.IService + log logger.LoggerAdapter +} + +type Client struct { + conn net.Conn + svc service.IService + log logger.LoggerAdapter +} + +func (client *Client) handleRequest() { + // telegram bytes buffer holder 4096 maximun telegram size defined by SMS + telegramBytes := bytes.NewBuffer(make([]byte, 4096)) + for { + _, err := client.conn.Read(telegramBytes.Bytes()) + if err != nil { + client.log.Error("reading telegram failed", err, logger.LogFields{}) + return + } + // interpret first 24 bytes as header + header := &types.TelegramHeader{} + headerReader := bytes.NewReader(telegramBytes.Bytes()[:24]) + if err := binary.Read(headerReader, binary.LittleEndian, header); err != nil { + client.log.Error("interpreting telegram header failed", err, logger.LogFields{}) + return + } + // switch telegram id + switch header.TelegramID { + // handle telegram data + case types.BdReqTelegramID: + // read telegram data + telegramData := &types.BdRequestTelegram{} + if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil { + client.log.Error("reading telegram content failed", err, logger.LogFields{}) + return + } + // clean roll set id + id := helpers.CleanString(string(telegramData.BdRollSetId[:])) + // api request roll + req := &api.BdRollDataReq{ + Sender: "cml04-l2-rsm", + RollId: id, + TimeStamp: time.Now().UTC().Format(time.RFC3339), + } + // create context + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + // handle telegram data + res, err := client.handleBdDataRequest(ctx, req) + if err != nil { + client.log.Error("processing telegram failed", err, logger.LogFields{}) + return + } + // publish response + if err := client.handlePublishBdRollData(ctx, res); err != nil { + client.log.Error("publishing BD roll data failed", err, logger.LogFields{}) + return + } + + case types.UrReqTelegramID: + // read telegram data + telegramData := &types.UrRequestTelegram{} + if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil { + client.log.Error("reading telegram content failed", err, logger.LogFields{}) + return + } + // clean roll set id + id := helpers.CleanString(string(telegramData.UrRollSetId[:])) + // api request roll + req := &api.UrRollDataReq{ + Sender: "cml04-l2-rsm", + RollId: id, + TimeStamp: time.Now().UTC().Format(time.RFC3339), + } + // handle telegram data + res, err := client.handleUrReqTelegram(context.Background(), req) + if err != nil { + client.log.Error("processing telegram failed", err, logger.LogFields{}) + return + } + // publish response + if err := client.handlePublishUrRollData(context.Background(), res); err != nil { + client.log.Error("publishing UR roll data failed", err, logger.LogFields{}) + return + } + case types.EdReqTelegramID: + // read telegram data + telegramData := &types.EdRequestTelegram{} + if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil { + client.log.Error("reading telegram content failed", err, logger.LogFields{}) + return + } + // clean roll set id + id := helpers.CleanString(string(telegramData.EdRollSetId[:])) + // api request roll + req := &api.EdRollDataReq{ + Sender: "cml04-l2-rsm", + RollId: id, + TimeStamp: time.Now().UTC().Format(time.RFC3339), + } + // handle telegram data + res, err := client.handleEdReqTelegram(context.Background(), req) + if err != nil { + client.log.Error("processing telegram failed", err, logger.LogFields{}) + return + } + // publish response + if err := client.handlePublishEdRollData(context.Background(), res); err != nil { + client.log.Error("publishing ED roll data failed", err, logger.LogFields{}) + return + } + case types.UfReqTelegramID: + // read telegram data + telegramData := &types.UfRequestTelegram{} + if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil { + client.log.Error("reading telegram content failed", err, logger.LogFields{}) + return + } + // clean roll set id + id := helpers.CleanString(string(telegramData.UfRollSetId[:])) + // api request roll + req := &api.UfRollDataReq{ + Sender: "cml04-l2-rsm", + RollId: id, + TimeStamp: time.Now().UTC().Format(time.RFC3339), + } + // handle telegram data + res, err := client.handleUfReqTelegram(context.Background(), req) + if err != nil { + client.log.Error("processing telegram failed", err, logger.LogFields{}) + return + } + // publish response + if err := client.handlePublishUfRollData(context.Background(), res); err != nil { + client.log.Error("publishing UF roll data failed", err, logger.LogFields{}) + return + } + } + } +} + +func (client *Client) handlePublishBdRollData(ctx context.Context, data *api.BdRollDataRes) error { + return client.svc.PublishBdRollData(ctx, data.GetRollData()) +} + +func (client *Client) handlePublishUrRollData(ctx context.Context, data *api.UrRollDataRes) error { + return client.svc.PublishUrRollData(ctx, data.GetRollData()) +} + +func (client *Client) handlePublishEdRollData(ctx context.Context, data *api.EdRollDataRes) error { + return client.svc.PublishEdRollData(ctx, data.GetRollData()) +} + +func (client *Client) handlePublishUfRollData(ctx context.Context, data *api.UfRollDataRes) error { + return client.svc.PublishUfRollData(ctx, data.GetRollData()) +} + +func (client *Client) handleBdDataRequest(ctx context.Context, req *api.BdRollDataReq) (*api.BdRollDataRes, error) { + return client.svc.BdReqTelegram(ctx, req) +} + +func (client *Client) handleUrReqTelegram(ctx context.Context, req *api.UrRollDataReq) (*api.UrRollDataRes, error) { + return client.svc.UrReqTelegram(ctx, req) +} + +func (client *Client) handleEdReqTelegram(ctx context.Context, req *api.EdRollDataReq) (*api.EdRollDataRes, error) { + return client.svc.EdReqTelegram(ctx, req) +} + +func (client *Client) handleUfReqTelegram(ctx context.Context, telegramData *api.UfRollDataReq) (*api.UfRollDataRes, error) { + return client.svc.UfReqTelegram(ctx, telegramData) +} + +func (server *Server) Run() error { + // create tcp listener + listener, err := net.Listen("tcp", server.address) + if err != nil { + return err + } + // close on exit + defer listener.Close() + // accept incoming connections + for { + // accept connection + conn, err := listener.Accept() + if err != nil { + return err + } + // create new client + client := &Client{ + conn: conn, + svc: server.svc, + log: server.log, + } + // handle client + go client.handleRequest() + } +} + +func NewServer(address string, log logger.LoggerAdapter, svc service.IService) *Server { + return &Server{ + address: address, + svc: svc, + log: log, + } +} diff --git a/internal/service/service.go b/internal/service/service.go new file mode 100644 index 0000000..48b6fa2 --- /dev/null +++ b/internal/service/service.go @@ -0,0 +1,126 @@ +package service + +import ( + "context" + + cml04eventer "git.espin.casa/albert/cml04-eventer" + "git.espin.casa/albert/cml04-gdm-int/pkg/api" + "google.golang.org/protobuf/proto" +) + +const ( + NSQTopic string = "RSM" + NSQChannel string = "cml04-l2-rsm-in" +) + +type IService interface { + BdReqTelegram(ctx context.Context, telegramData *api.BdRollDataReq) (res *api.BdRollDataRes, err error) + UrReqTelegram(ctx context.Context, telegramData *api.UrRollDataReq) (res *api.UrRollDataRes, err error) + EdReqTelegram(ctx context.Context, telegramData *api.EdRollDataReq) (res *api.EdRollDataRes, err error) + UfReqTelegram(ctx context.Context, telegramData *api.UfRollDataReq) (res *api.UfRollDataRes, err error) + PublishBdRollData(ctx context.Context, telegramData *api.BdRollData) (err error) + PublishUrRollData(ctx context.Context, telegramData *api.UrRollData) (err error) + PublishEdRollData(ctx context.Context, telegramData *api.EdRollData) (err error) + PublishUfRollData(ctx context.Context, telegramData *api.UfRollData) (err error) +} + +type service struct { + client api.RollDataServiceClient + publisher cml04eventer.Publisher +} + +// PublishBdRollData implements IService. +func (s *service) PublishBdRollData(ctx context.Context, telegramData *api.BdRollData) (err error) { + // create meta data + meta := cml04eventer.NewMetaData() + // set telegram id + meta.Set("telegram_id", 9000) + // marshal telegram data + data, err := proto.Marshal(telegramData) + if err != nil { + return err + } + // set event data + evt := cml04eventer.NewEvent(NSQChannel, NSQTopic, meta, data) + // publish event data + return s.publisher.PublishEvent(evt) +} + +// PublishUrRollData implements IService. +func (s *service) PublishUrRollData(ctx context.Context, telegramData *api.UrRollData) (err error) { + // create meta data + meta := cml04eventer.NewMetaData() + // set telegram id + meta.Set("telegram_id", 9001) + // marshal telegram data + data, err := proto.Marshal(telegramData) + if err != nil { + return err + } + // set event data + evt := cml04eventer.NewEvent(NSQChannel, NSQTopic, meta, data) + // publish event data + return s.publisher.PublishEvent(evt) +} + +// PublishEdRollData implements IService. +func (s *service) PublishEdRollData(ctx context.Context, telegramData *api.EdRollData) (err error) { + // create meta data + meta := cml04eventer.NewMetaData() + // set telegram id + meta.Set("telegram_id", 9002) + // marshal telegram data + data, err := proto.Marshal(telegramData) + if err != nil { + return err + } + // set event data + evt := cml04eventer.NewEvent(NSQChannel, NSQTopic, meta, data) + // publish event data + return s.publisher.PublishEvent(evt) + +} + +// PublishUfRollData implements IService. +func (s *service) PublishUfRollData(ctx context.Context, telegramData *api.UfRollData) (err error) { + // create meta data + meta := cml04eventer.NewMetaData() + // set telegram id + meta.Set("telegram_id", 9003) + // marshal telegram data + data, err := proto.Marshal(telegramData) + if err != nil { + return err + } + // set event data + evt := cml04eventer.NewEvent(NSQChannel, NSQTopic, meta, data) + // publish event data + return s.publisher.PublishEvent(evt) +} + +// EdReqTelegram implements IService. +func (svc *service) EdReqTelegram(ctx context.Context, req *api.EdRollDataReq) (res *api.EdRollDataRes, err error) { + return svc.client.EdRollData(ctx, req) +} + +// UfReqTelegram implements IService. +func (svc *service) UfReqTelegram(ctx context.Context, req *api.UfRollDataReq) (res *api.UfRollDataRes, err error) { + return svc.client.UfRollData(ctx, req) +} + +// UrReqTelegram implements IService. +func (svc *service) UrReqTelegram(ctx context.Context, req *api.UrRollDataReq) (res *api.UrRollDataRes, err error) { + return svc.client.UrRollData(ctx, req) +} + +// BdReqTelegram implements IService. +func (svc *service) BdReqTelegram(ctx context.Context, req *api.BdRollDataReq) (res *api.BdRollDataRes, err error) { + return svc.client.BdRollData(ctx, req) +} + +func NewService(client api.RollDataServiceClient, publisher cml04eventer.Publisher) IService { + return &service{ + client: client, + publisher: publisher, + } +} diff --git a/internal/types/l2_header.go b/internal/types/l2_header.go new file mode 100644 index 0000000..4800c20 --- /dev/null +++ b/internal/types/l2_header.go @@ -0,0 +1,11 @@ +package types + +type TelegramID int16 + +type TelegramHeader struct { + MessageLength int16 `json:"message_length"` + TelegramID TelegramID `json:"telegram_id"` + SequenceCounter int16 `json:"sequence_counter"` + Flags int16 `json:"flags"` + TimeStamp [8]int16 `json:"timestamp"` +} diff --git a/internal/types/request.go b/internal/types/request.go new file mode 100644 index 0000000..15034f4 --- /dev/null +++ b/internal/types/request.go @@ -0,0 +1,31 @@ +package types + +const ( + BdReqTelegramID TelegramID = 9500 + UrReqTelegramID TelegramID = 9501 + EdReqTelegramID TelegramID = 9502 + UfReqTelegramID TelegramID = 9503 +) + +type BdRequestTelegram struct { + TelegramHeader TelegramHeader + BdRollSetId [20]byte +} + +type UrRequestTelegram struct { + TelegramHeader TelegramHeader + UrRollSetId [20]byte + UrRollSetType int32 +} + +type EdRequestTelegram struct { + TelegramHeader TelegramHeader + EdRollSetId [20]byte + EdRollSetType int32 +} + +type UfRequestTelegram struct { + TelegramHeader TelegramHeader + UfRollSetId [20]byte + UfRollSetType int32 +}