This commit is contained in:
aespin 2024-08-20 10:08:32 +02:00
commit ab38428d66
12 changed files with 777 additions and 0 deletions

20
Makefile Normal file
View File

@ -0,0 +1,20 @@
# Variables
IMAGE_NAME = registry.espin.casa/artifactory/docker-local
CONTAINER_NAME = cml04-l2-rsm
# Construir la imagen Docker
build:
docker build -t $(IMAGE_NAME) -f docker/Dockerfile .
# Ejecutar el contenedor Docker
run:
docker run --name $(CONTAINER_NAME) -d $(IMAGE_NAME)
# Detener y eliminar el contenedor Docker
stop:
docker stop $(CONTAINER_NAME)
docker rm $(CONTAINER_NAME)
# Eliminar la imagen Docker
clean:
docker rmi $(IMAGE_NAME)

9
cmd/main.go Normal file
View File

@ -0,0 +1,9 @@
package main
import "git.espin.casa/albert/cml04-l2-rsm-in/internal/app"
func main() {
if err := app.Run(); err != nil {
panic(err)
}
}

28
docker/Dockerfile Normal file
View File

@ -0,0 +1,28 @@
# Etapa de compilación
FROM golang:1.22-alpine3.18 AS builder
WORKDIR /app
COPY . .
RUN go get -d -v ./...
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -mod=mod -ldflags "-s -w" -o myapp cmd/main.go
# Etapa de producción
FROM alpine:3.18.6
RUN apk --no-cache add tzdata ca-certificates && apk add libaio
RUN cp /usr/share/zoneinfo/Europe/Madrid /etc/localtime
RUN echo "Europe/Madrid" > /etc/timezone
ENV TZ Europe/Madrid
WORKDIR /root/
COPY --from=builder /app/myapp .
CMD ["./myapp"]

24
go.mod Normal file
View File

@ -0,0 +1,24 @@
module git.espin.casa/albert/cml04-l2-rsm-in
go 1.22.0
require (
git.espin.casa/albert/cml04-eventer v0.0.0-20240222031945-f20c9e4ddc36
git.espin.casa/albert/cml04-gdm-int v0.0.0-20240221200353-854fa2d29d83
git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3
google.golang.org/grpc v1.62.0
google.golang.org/protobuf v1.32.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/nsqio/go-nsq v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
)

55
go.sum Normal file
View File

@ -0,0 +1,55 @@
git.espin.casa/albert/cml04-eventer v0.0.0-20240219141409-1aa896b4231e h1:b23pgexcFOMOyhEC04OSvyq0yUYa3tLfHxwtHVIPKBg=
git.espin.casa/albert/cml04-eventer v0.0.0-20240219141409-1aa896b4231e/go.mod h1:0bQIowhHHtVvDNpmHt5ed6tclhHgZHeH0FQlxQETHDw=
git.espin.casa/albert/cml04-eventer v0.0.0-20240222031945-f20c9e4ddc36 h1:HbF07y7ExWmkXVzQeKoo1t/u7rduuvn93IGJFCBCKAg=
git.espin.casa/albert/cml04-eventer v0.0.0-20240222031945-f20c9e4ddc36/go.mod h1:/fj0cTIFEdeYmzL2r4WLEAN2q3N3T48ZPOT1hqusuSI=
git.espin.casa/albert/cml04-gdm-int v0.0.0-20240221200353-854fa2d29d83 h1:M7Tpn26MPG+4PIFEBFwatfajLaEcaGCSeigtfzutGiM=
git.espin.casa/albert/cml04-gdm-int v0.0.0-20240221200353-854fa2d29d83/go.mod h1:ixkbymSPizICKrPse8rBi/WiVDrVZ7PXOKYa7XvB1VM=
git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3 h1:HUEESn4o8sVXAUJWGJAATeitiNRxVzzwBU8RiIW4Wzc=
git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3/go.mod h1:P1yAUiotJurq7j/wZt6Cnel17HChplkz0E40WD8a5to=
git.espin.ovh/albert/logger v0.0.0-20230415214151-6b29487a3a90 h1:+780Bc/gaN8bi8FD5JJLX4t5IaYk5ABnVAvJu7vgw3c=
git.espin.ovh/albert/logger v0.0.0-20230415214151-6b29487a3a90/go.mod h1:qv5K1umvHSzZoedQtlycCSfeqMRk7yFyVx4BhiwZ6HU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk=
google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

93
internal/app/app.go Normal file
View File

@ -0,0 +1,93 @@
package app
import (
"flag"
"os"
"os/signal"
"syscall"
cml04eventer "git.espin.casa/albert/cml04-eventer"
"git.espin.casa/albert/cml04-gdm-int/pkg/api"
"git.espin.casa/albert/cml04-l2-rsm-in/internal/logging"
"git.espin.casa/albert/cml04-l2-rsm-in/internal/server"
"git.espin.casa/albert/cml04-l2-rsm-in/internal/service"
"git.espin.casa/albert/logger"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func Run() error {
// flags
l2InAddr := flag.String("l2-in-addr", "0.0.0.0:5012", "tcp l2 server address")
apiAddr := flag.String("l2-out-addr", "localhost:3333", "tcp adress")
nsqAddr := flag.String("nsq-addr", "10.1.152.13", "nsq server address")
nsqPort := flag.Int("nsq-port", 4150, "nsq server port")
logLevel := flag.String("log-level", "debug", "trace debug level")
flag.Parse()
// setup logger
log := logger.New(os.Stdout, *logLevel)
// log fields
logFields := logger.LogFields{
"log_level": *logLevel,
"l2_in_address": *l2InAddr,
"api_adress": *apiAddr,
"nsq_address": *nsqAddr,
"nsq_port": *nsqPort,
}
// create event publisher
publisher, err := cml04eventer.NewPublisher(&cml04eventer.PublisherImplConfig{
NSQAddress: *nsqAddr,
NSQPort: *nsqPort,
Marshaler: cml04eventer.JSONMarshaler{},
}, log)
if err != nil {
log.Error("failed to create event publisher", err, logFields)
return err
}
defer publisher.Close()
// api dial
conn, err := grpc.Dial(*apiAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("failed to dial api server", err, logger.LogFields{})
}
// create api client
client := api.NewRollDataServiceClient(conn)
defer conn.Close()
// create service
svc := service.NewService(client, publisher)
svc = logging.NewLoggingService(log, svc)
// create server
server := server.NewServer(*l2InAddr, log, svc)
// start tcp server (telegrams)
go func() {
if err := server.Run(); err != nil {
log.Error("server error", err, logFields)
}
}()
// info banner
log.Info("started cml04-l2-rsm-in telegram service", logFields)
// wait signal to finish
signal := WaitSignal()
log.Info("signal received", logFields.Add(logger.LogFields{
"signal": signal,
}))
return nil
}
// WaitSignal catching exit signal
func WaitSignal() os.Signal {
ch := make(chan os.Signal, 2)
signal.Notify(
ch,
syscall.SIGINT,
syscall.SIGQUIT,
syscall.SIGTERM,
)
for {
sig := <-ch
switch sig {
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM:
return sig
}
}
}

View File

@ -0,0 +1,9 @@
package helpers
import "regexp"
func CleanString(s string) string {
// reg expression only alfanumeric characters
reg := regexp.MustCompile("[^a-zA-Z0-9]+")
return reg.ReplaceAllString(s, "")
}

147
internal/logging/logging.go Normal file
View File

@ -0,0 +1,147 @@
package logging
import (
"context"
"time"
"git.espin.casa/albert/cml04-gdm-int/pkg/api"
"git.espin.casa/albert/cml04-l2-rsm-in/internal/service"
"git.espin.casa/albert/logger"
)
type Logging struct {
log logger.LoggerAdapter
next service.IService
}
// PublishBdRollData implements service.IService.
func (svc *Logging) PublishBdRollData(ctx context.Context, telegramData *api.BdRollData) (err error) {
defer func(start time.Time) {
logFields := logger.LogFields{
"took": time.Since(start),
"bd_roll_id": telegramData.GetBdRollId(),
}
if err != nil {
svc.log.Error("BD publish roll data failed", err, logFields)
} else {
svc.log.Info("BD publish roll data success", logFields)
}
}(time.Now())
return svc.next.PublishBdRollData(ctx, telegramData)
}
// PublishEdRollData implements service.IService.
func (svc *Logging) PublishEdRollData(ctx context.Context, telegramData *api.EdRollData) (err error) {
defer func(start time.Time) {
logFields := logger.LogFields{
"took": time.Since(start),
"ed_roll_id": telegramData.GetEdRollId(),
}
if err != nil {
svc.log.Error("ED publish roll data failed", err, logFields)
} else {
svc.log.Info("ED publish roll data success", logFields)
}
}(time.Now())
return svc.next.PublishEdRollData(ctx, telegramData)
}
// PublishUfRollData implements service.IService.
func (svc *Logging) PublishUfRollData(ctx context.Context, telegramData *api.UfRollData) (err error) {
defer func(start time.Time) {
logFields := logger.LogFields{
"took": time.Since(start),
"uf_roll_id": telegramData.GetUfRollId(),
}
if err != nil {
svc.log.Error("UF publish roll data failed", err, logFields)
} else {
svc.log.Info("UF publish roll data success", logFields)
}
}(time.Now())
return svc.next.PublishUfRollData(ctx, telegramData)
}
// PublishUrRollData implements service.IService.
func (svc *Logging) PublishUrRollData(ctx context.Context, telegramData *api.UrRollData) (err error) {
defer func(start time.Time) {
logFields := logger.LogFields{
"took": time.Since(start),
"ur_roll_id": telegramData.GetUrRollId(),
}
if err != nil {
svc.log.Error("UR publish roll data failed", err, logFields)
} else {
svc.log.Info("UR publish roll data success", logFields)
}
}(time.Now())
return svc.next.PublishUrRollData(ctx, telegramData)
}
// BdReqTelegram implements service.IService.
func (svc *Logging) BdReqTelegram(ctx context.Context, req *api.BdRollDataReq) (res *api.BdRollDataRes, err error) {
defer func(start time.Time) {
logFields := logger.LogFields{
"took": time.Since(start),
"request_roll_id": req.GetRollId(),
}
if err != nil {
svc.log.Error("BD request roll data failed", err, logFields)
} else {
svc.log.Info("BD request roll data success", logFields)
}
}(time.Now())
return svc.next.BdReqTelegram(ctx, req)
}
// EdReqTelegram implements service.IService.
func (svc *Logging) EdReqTelegram(ctx context.Context, req *api.EdRollDataReq) (res *api.EdRollDataRes, err error) {
defer func(start time.Time) {
logFields := logger.LogFields{
"took": time.Since(start),
"request_roll_id": req.GetRollId(),
}
if err != nil {
svc.log.Error("ED request roll data failed", err, logFields)
} else {
svc.log.Info("ED request roll data success", logFields)
}
}(time.Now())
return svc.next.EdReqTelegram(ctx, req)
}
// UfReqTelegram implements service.IService.
func (svc *Logging) UfReqTelegram(ctx context.Context, req *api.UfRollDataReq) (res *api.UfRollDataRes, err error) {
defer func(start time.Time) {
logFields := logger.LogFields{
"took": time.Since(start),
"request_roll_id": req.GetRollId(),
}
if err != nil {
svc.log.Error("UF request roll data failed", err, logFields)
} else {
svc.log.Info("UF request roll data success", logFields)
}
}(time.Now())
return svc.next.UfReqTelegram(ctx, req)
}
// UrReqTelegram implements service.IService.
func (svc *Logging) UrReqTelegram(ctx context.Context, req *api.UrRollDataReq) (res *api.UrRollDataRes, err error) {
defer func(start time.Time) {
logFields := logger.LogFields{
"took": time.Since(start),
"request_roll_id": req.GetRollId(),
}
if err != nil {
svc.log.Error("UR request roll data failed", err, logFields)
} else {
svc.log.Info("UR request roll data success", logFields)
}
}(time.Now())
return svc.next.UrReqTelegram(ctx, req)
}
func NewLoggingService(log logger.LoggerAdapter, next service.IService) service.IService {
return &Logging{log, next}
}

224
internal/server/server.go Normal file
View File

@ -0,0 +1,224 @@
package server
import (
"bytes"
"context"
"encoding/binary"
"net"
"time"
"git.espin.casa/albert/cml04-gdm-int/pkg/api"
"git.espin.casa/albert/cml04-l2-rsm-in/internal/helpers"
"git.espin.casa/albert/cml04-l2-rsm-in/internal/service"
"git.espin.casa/albert/cml04-l2-rsm-in/internal/types"
"git.espin.casa/albert/logger"
)
type Server struct {
address string
svc service.IService
log logger.LoggerAdapter
}
type Client struct {
conn net.Conn
svc service.IService
log logger.LoggerAdapter
}
func (client *Client) handleRequest() {
// telegram bytes buffer holder 4096 maximun telegram size defined by SMS
telegramBytes := bytes.NewBuffer(make([]byte, 4096))
for {
_, err := client.conn.Read(telegramBytes.Bytes())
if err != nil {
client.log.Error("reading telegram failed", err, logger.LogFields{})
return
}
// interpret first 24 bytes as header
header := &types.TelegramHeader{}
headerReader := bytes.NewReader(telegramBytes.Bytes()[:24])
if err := binary.Read(headerReader, binary.LittleEndian, header); err != nil {
client.log.Error("interpreting telegram header failed", err, logger.LogFields{})
return
}
// switch telegram id
switch header.TelegramID {
// handle telegram data
case types.BdReqTelegramID:
// read telegram data
telegramData := &types.BdRequestTelegram{}
if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil {
client.log.Error("reading telegram content failed", err, logger.LogFields{})
return
}
// clean roll set id
id := helpers.CleanString(string(telegramData.BdRollSetId[:]))
// api request roll
req := &api.BdRollDataReq{
Sender: "cml04-l2-rsm",
RollId: id,
TimeStamp: time.Now().UTC().Format(time.RFC3339),
}
// create context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// handle telegram data
res, err := client.handleBdDataRequest(ctx, req)
if err != nil {
client.log.Error("processing telegram failed", err, logger.LogFields{})
return
}
// publish response
if err := client.handlePublishBdRollData(ctx, res); err != nil {
client.log.Error("publishing BD roll data failed", err, logger.LogFields{})
return
}
case types.UrReqTelegramID:
// read telegram data
telegramData := &types.UrRequestTelegram{}
if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil {
client.log.Error("reading telegram content failed", err, logger.LogFields{})
return
}
// clean roll set id
id := helpers.CleanString(string(telegramData.UrRollSetId[:]))
// api request roll
req := &api.UrRollDataReq{
Sender: "cml04-l2-rsm",
RollId: id,
TimeStamp: time.Now().UTC().Format(time.RFC3339),
}
// handle telegram data
res, err := client.handleUrReqTelegram(context.Background(), req)
if err != nil {
client.log.Error("processing telegram failed", err, logger.LogFields{})
return
}
// publish response
if err := client.handlePublishUrRollData(context.Background(), res); err != nil {
client.log.Error("publishing UR roll data failed", err, logger.LogFields{})
return
}
case types.EdReqTelegramID:
// read telegram data
telegramData := &types.EdRequestTelegram{}
if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil {
client.log.Error("reading telegram content failed", err, logger.LogFields{})
return
}
// clean roll set id
id := helpers.CleanString(string(telegramData.EdRollSetId[:]))
// api request roll
req := &api.EdRollDataReq{
Sender: "cml04-l2-rsm",
RollId: id,
TimeStamp: time.Now().UTC().Format(time.RFC3339),
}
// handle telegram data
res, err := client.handleEdReqTelegram(context.Background(), req)
if err != nil {
client.log.Error("processing telegram failed", err, logger.LogFields{})
return
}
// publish response
if err := client.handlePublishEdRollData(context.Background(), res); err != nil {
client.log.Error("publishing ED roll data failed", err, logger.LogFields{})
return
}
case types.UfReqTelegramID:
// read telegram data
telegramData := &types.UfRequestTelegram{}
if err := binary.Read(telegramBytes, binary.LittleEndian, telegramData); err != nil {
client.log.Error("reading telegram content failed", err, logger.LogFields{})
return
}
// clean roll set id
id := helpers.CleanString(string(telegramData.UfRollSetId[:]))
// api request roll
req := &api.UfRollDataReq{
Sender: "cml04-l2-rsm",
RollId: id,
TimeStamp: time.Now().UTC().Format(time.RFC3339),
}
// handle telegram data
res, err := client.handleUfReqTelegram(context.Background(), req)
if err != nil {
client.log.Error("processing telegram failed", err, logger.LogFields{})
return
}
// publish response
if err := client.handlePublishUfRollData(context.Background(), res); err != nil {
client.log.Error("publishing UF roll data failed", err, logger.LogFields{})
return
}
}
}
}
func (client *Client) handlePublishBdRollData(ctx context.Context, data *api.BdRollDataRes) error {
return client.svc.PublishBdRollData(ctx, data.GetRollData())
}
func (client *Client) handlePublishUrRollData(ctx context.Context, data *api.UrRollDataRes) error {
return client.svc.PublishUrRollData(ctx, data.GetRollData())
}
func (client *Client) handlePublishEdRollData(ctx context.Context, data *api.EdRollDataRes) error {
return client.svc.PublishEdRollData(ctx, data.GetRollData())
}
func (client *Client) handlePublishUfRollData(ctx context.Context, data *api.UfRollDataRes) error {
return client.svc.PublishUfRollData(ctx, data.GetRollData())
}
func (client *Client) handleBdDataRequest(ctx context.Context, req *api.BdRollDataReq) (*api.BdRollDataRes, error) {
return client.svc.BdReqTelegram(ctx, req)
}
func (client *Client) handleUrReqTelegram(ctx context.Context, req *api.UrRollDataReq) (*api.UrRollDataRes, error) {
return client.svc.UrReqTelegram(ctx, req)
}
func (client *Client) handleEdReqTelegram(ctx context.Context, req *api.EdRollDataReq) (*api.EdRollDataRes, error) {
return client.svc.EdReqTelegram(ctx, req)
}
func (client *Client) handleUfReqTelegram(ctx context.Context, telegramData *api.UfRollDataReq) (*api.UfRollDataRes, error) {
return client.svc.UfReqTelegram(ctx, telegramData)
}
func (server *Server) Run() error {
// create tcp listener
listener, err := net.Listen("tcp", server.address)
if err != nil {
return err
}
// close on exit
defer listener.Close()
// accept incoming connections
for {
// accept connection
conn, err := listener.Accept()
if err != nil {
return err
}
// create new client
client := &Client{
conn: conn,
svc: server.svc,
log: server.log,
}
// handle client
go client.handleRequest()
}
}
func NewServer(address string, log logger.LoggerAdapter, svc service.IService) *Server {
return &Server{
address: address,
svc: svc,
log: log,
}
}

126
internal/service/service.go Normal file
View File

@ -0,0 +1,126 @@
package service
import (
"context"
cml04eventer "git.espin.casa/albert/cml04-eventer"
"git.espin.casa/albert/cml04-gdm-int/pkg/api"
"google.golang.org/protobuf/proto"
)
const (
NSQTopic string = "RSM"
NSQChannel string = "cml04-l2-rsm-in"
)
type IService interface {
BdReqTelegram(ctx context.Context, telegramData *api.BdRollDataReq) (res *api.BdRollDataRes, err error)
UrReqTelegram(ctx context.Context, telegramData *api.UrRollDataReq) (res *api.UrRollDataRes, err error)
EdReqTelegram(ctx context.Context, telegramData *api.EdRollDataReq) (res *api.EdRollDataRes, err error)
UfReqTelegram(ctx context.Context, telegramData *api.UfRollDataReq) (res *api.UfRollDataRes, err error)
PublishBdRollData(ctx context.Context, telegramData *api.BdRollData) (err error)
PublishUrRollData(ctx context.Context, telegramData *api.UrRollData) (err error)
PublishEdRollData(ctx context.Context, telegramData *api.EdRollData) (err error)
PublishUfRollData(ctx context.Context, telegramData *api.UfRollData) (err error)
}
type service struct {
client api.RollDataServiceClient
publisher cml04eventer.Publisher
}
// PublishBdRollData implements IService.
func (s *service) PublishBdRollData(ctx context.Context, telegramData *api.BdRollData) (err error) {
// create meta data
meta := cml04eventer.NewMetaData()
// set telegram id
meta.Set("telegram_id", 9000)
// marshal telegram data
data, err := proto.Marshal(telegramData)
if err != nil {
return err
}
// set event data
evt := cml04eventer.NewEvent(NSQChannel, NSQTopic, meta, data)
// publish event data
return s.publisher.PublishEvent(evt)
}
// PublishUrRollData implements IService.
func (s *service) PublishUrRollData(ctx context.Context, telegramData *api.UrRollData) (err error) {
// create meta data
meta := cml04eventer.NewMetaData()
// set telegram id
meta.Set("telegram_id", 9001)
// marshal telegram data
data, err := proto.Marshal(telegramData)
if err != nil {
return err
}
// set event data
evt := cml04eventer.NewEvent(NSQChannel, NSQTopic, meta, data)
// publish event data
return s.publisher.PublishEvent(evt)
}
// PublishEdRollData implements IService.
func (s *service) PublishEdRollData(ctx context.Context, telegramData *api.EdRollData) (err error) {
// create meta data
meta := cml04eventer.NewMetaData()
// set telegram id
meta.Set("telegram_id", 9002)
// marshal telegram data
data, err := proto.Marshal(telegramData)
if err != nil {
return err
}
// set event data
evt := cml04eventer.NewEvent(NSQChannel, NSQTopic, meta, data)
// publish event data
return s.publisher.PublishEvent(evt)
}
// PublishUfRollData implements IService.
func (s *service) PublishUfRollData(ctx context.Context, telegramData *api.UfRollData) (err error) {
// create meta data
meta := cml04eventer.NewMetaData()
// set telegram id
meta.Set("telegram_id", 9003)
// marshal telegram data
data, err := proto.Marshal(telegramData)
if err != nil {
return err
}
// set event data
evt := cml04eventer.NewEvent(NSQChannel, NSQTopic, meta, data)
// publish event data
return s.publisher.PublishEvent(evt)
}
// EdReqTelegram implements IService.
func (svc *service) EdReqTelegram(ctx context.Context, req *api.EdRollDataReq) (res *api.EdRollDataRes, err error) {
return svc.client.EdRollData(ctx, req)
}
// UfReqTelegram implements IService.
func (svc *service) UfReqTelegram(ctx context.Context, req *api.UfRollDataReq) (res *api.UfRollDataRes, err error) {
return svc.client.UfRollData(ctx, req)
}
// UrReqTelegram implements IService.
func (svc *service) UrReqTelegram(ctx context.Context, req *api.UrRollDataReq) (res *api.UrRollDataRes, err error) {
return svc.client.UrRollData(ctx, req)
}
// BdReqTelegram implements IService.
func (svc *service) BdReqTelegram(ctx context.Context, req *api.BdRollDataReq) (res *api.BdRollDataRes, err error) {
return svc.client.BdRollData(ctx, req)
}
func NewService(client api.RollDataServiceClient, publisher cml04eventer.Publisher) IService {
return &service{
client: client,
publisher: publisher,
}
}

View File

@ -0,0 +1,11 @@
package types
type TelegramID int16
type TelegramHeader struct {
MessageLength int16 `json:"message_length"`
TelegramID TelegramID `json:"telegram_id"`
SequenceCounter int16 `json:"sequence_counter"`
Flags int16 `json:"flags"`
TimeStamp [8]int16 `json:"timestamp"`
}

31
internal/types/request.go Normal file
View File

@ -0,0 +1,31 @@
package types
const (
BdReqTelegramID TelegramID = 9500
UrReqTelegramID TelegramID = 9501
EdReqTelegramID TelegramID = 9502
UfReqTelegramID TelegramID = 9503
)
type BdRequestTelegram struct {
TelegramHeader TelegramHeader
BdRollSetId [20]byte
}
type UrRequestTelegram struct {
TelegramHeader TelegramHeader
UrRollSetId [20]byte
UrRollSetType int32
}
type EdRequestTelegram struct {
TelegramHeader TelegramHeader
EdRollSetId [20]byte
EdRollSetType int32
}
type UfRequestTelegram struct {
TelegramHeader TelegramHeader
UfRollSetId [20]byte
UfRollSetType int32
}