package app import ( "context" "flag" "fmt" "os" "os/signal" "syscall" cml04eventer "git.espin.casa/albert/cml04-eventer" "git.espin.casa/albert/cml04-gdm-int/pkg/api" "git.espin.casa/albert/cml04-l2-rsm-out/internal/logging" "git.espin.casa/albert/cml04-l2-rsm-out/internal/service" "git.espin.casa/albert/logger" "google.golang.org/protobuf/proto" ) func Run() error { // flags l2OutAddr := flag.String("l2-in-addr", "0.0.0.0:5013", "tcp l2 server address") nsqAddr := flag.String("nsq-addr", "10.1.152.13", "nsq server address") nsqPort := flag.Int("nsq-port", 4150, "nsq server port") logLevel := flag.String("log-level", "debug", "trace debug level") flag.Parse() // setup logger log := logger.New(os.Stdout, *logLevel) // log fields logFields := logger.LogFields{ "log_level": *logLevel, "l2_out_address": *l2OutAddr, "nsq_address": *nsqAddr, "nsq_port": *nsqPort, } // create service svc := service.NewService() svc = logging.NewLogginService(log, svc) // create context ctx := context.Background() // subscribe events subscriber, err := cml04eventer.NewSubscriber(&cml04eventer.SubscriberConfig{ NSQAddress: *nsqAddr, NSQPort: *nsqPort, Unmarshaler: cml04eventer.JSONMarshaler{}, Channel: "cml04-l2-rsm-out", }, log) // check error if err != nil { log.Error("create subscriber failed", err, logFields) return err } // subscribe events eventChan, err := subscriber.SubscribeEvent(ctx, "RSM") if err != nil { log.Error("subscribe event failed", err, logFields) return err } // go func() { for event := range eventChan { if err := ProcessEvent(event, svc); err != nil { log.Error("", err, logFields) } } }() // info banner log.Info("started cml04-l2-rsm-out telegram service", logFields) // wait signal to finish signal := WaitSignal() log.Info("signal received", logFields.Add(logger.LogFields{ "signal": signal, })) return nil } func ProcessEvent(evt *cml04eventer.Event, svc service.IService) error { telegramID, err := evt.EventMeta.Get("telegram_id") if err != nil { return err } fmt.Println(telegramID) switch telegramID.(float64) { case 9000: // bd roll data holder bdRollData := &api.BdRollData{} // unmarshal proto message if err := proto.Unmarshal(evt.EventData, bdRollData); err != nil { return err } if err := svc.SendBdRollData(context.Background(), bdRollData); err != nil { return err } case 9001: // ur roll data holder urRollData := &api.UrRollData{} // unmarshal proto message if err := proto.Unmarshal(evt.EventData, urRollData); err != nil { return err } if err := svc.SendUrRollData(context.Background(), urRollData); err != nil { return err } case 9002: // ed roll data holder edRollData := &api.EdRollData{} // unmarshal proto message if err := proto.Unmarshal(evt.EventData, edRollData); err != nil { return err } if err := svc.SendEdRollData(context.Background(), edRollData); err != nil { return err } case 9003: // uf roll data holder ufRollData := &api.UfRollData{} // unmarshal proto message if err := proto.Unmarshal(evt.EventData, ufRollData); err != nil { return err } if err := svc.SendUfRollData(context.Background(), ufRollData); err != nil { return err } } return nil } // WaitSignal catching exit signal func WaitSignal() os.Signal { ch := make(chan os.Signal, 2) signal.Notify( ch, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, ) for { sig := <-ch switch sig { case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM: return sig } } }