cml04-l2-rsm-out/internal/app/app.go
2024-08-20 10:07:14 +02:00

144 lines
3.5 KiB
Go

package app
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
cml04eventer "git.espin.casa/albert/cml04-eventer"
"git.espin.casa/albert/cml04-gdm-int/pkg/api"
"git.espin.casa/albert/cml04-l2-rsm-out/internal/logging"
"git.espin.casa/albert/cml04-l2-rsm-out/internal/service"
"git.espin.casa/albert/logger"
"google.golang.org/protobuf/proto"
)
func Run() error {
// flags
l2OutAddr := flag.String("l2-in-addr", "0.0.0.0:5013", "tcp l2 server address")
nsqAddr := flag.String("nsq-addr", "10.1.152.13", "nsq server address")
nsqPort := flag.Int("nsq-port", 4150, "nsq server port")
logLevel := flag.String("log-level", "debug", "trace debug level")
flag.Parse()
// setup logger
log := logger.New(os.Stdout, *logLevel)
// log fields
logFields := logger.LogFields{
"log_level": *logLevel,
"l2_out_address": *l2OutAddr,
"nsq_address": *nsqAddr,
"nsq_port": *nsqPort,
}
// create service
svc := service.NewService()
svc = logging.NewLogginService(log, svc)
// create context
ctx := context.Background()
// subscribe events
subscriber, err := cml04eventer.NewSubscriber(&cml04eventer.SubscriberConfig{
NSQAddress: *nsqAddr,
NSQPort: *nsqPort,
Unmarshaler: cml04eventer.JSONMarshaler{},
Channel: "cml04-l2-rsm-out",
}, log)
// check error
if err != nil {
log.Error("create subscriber failed", err, logFields)
return err
}
// subscribe events
eventChan, err := subscriber.SubscribeEvent(ctx, "RSM")
if err != nil {
log.Error("subscribe event failed", err, logFields)
return err
}
//
go func() {
for event := range eventChan {
if err := ProcessEvent(event, svc); err != nil {
log.Error("", err, logFields)
}
}
}()
// info banner
log.Info("started cml04-l2-rsm-out telegram service", logFields)
// wait signal to finish
signal := WaitSignal()
log.Info("signal received", logFields.Add(logger.LogFields{
"signal": signal,
}))
return nil
}
func ProcessEvent(evt *cml04eventer.Event, svc service.IService) error {
telegramID, err := evt.EventMeta.Get("telegram_id")
if err != nil {
return err
}
fmt.Println(telegramID)
switch telegramID.(float64) {
case 9000:
// bd roll data holder
bdRollData := &api.BdRollData{}
// unmarshal proto message
if err := proto.Unmarshal(evt.EventData, bdRollData); err != nil {
return err
}
if err := svc.SendBdRollData(context.Background(), bdRollData); err != nil {
return err
}
case 9001:
// ur roll data holder
urRollData := &api.UrRollData{}
// unmarshal proto message
if err := proto.Unmarshal(evt.EventData, urRollData); err != nil {
return err
}
if err := svc.SendUrRollData(context.Background(), urRollData); err != nil {
return err
}
case 9002:
// ed roll data holder
edRollData := &api.EdRollData{}
// unmarshal proto message
if err := proto.Unmarshal(evt.EventData, edRollData); err != nil {
return err
}
if err := svc.SendEdRollData(context.Background(), edRollData); err != nil {
return err
}
case 9003:
// uf roll data holder
ufRollData := &api.UfRollData{}
// unmarshal proto message
if err := proto.Unmarshal(evt.EventData, ufRollData); err != nil {
return err
}
if err := svc.SendUfRollData(context.Background(), ufRollData); err != nil {
return err
}
}
return nil
}
// WaitSignal catching exit signal
func WaitSignal() os.Signal {
ch := make(chan os.Signal, 2)
signal.Notify(
ch,
syscall.SIGINT,
syscall.SIGQUIT,
syscall.SIGTERM,
)
for {
sig := <-ch
switch sig {
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM:
return sig
}
}
}