package cml04eventer import ( "context" "errors" "fmt" "sync" "time" "git.espin.casa/albert/logger" "github.com/nsqio/go-nsq" ) type Subscriber interface { SubscribeEvent(ctx context.Context, topic string) (<-chan *Event, error) Close() error } type SubscriberImpl struct { log logger.LoggerAdapter config *SubscriberConfig consumer *nsq.Consumer logger logger.LoggerAdapter subsLock sync.RWMutex closed bool closing chan struct{} outputsWg sync.WaitGroup processingMessagesWg sync.WaitGroup } type SubscriberConfig struct { NSQAddress string NSQPort int Unmarshaler Unmarshaler Channel string } func (c *SubscriberImpl) SubscribeEvent(ctx context.Context, topic string) (<-chan *Event, error) { // default config nsqCfg := nsq.NewConfig() // maximum number of attempts nsqCfg.MaxAttempts = 10 // maximum number of in flight messages nsqCfg.MaxInFlight = 5 // Maximum duration when REQueueing nsqCfg.MaxRequeueDelay = time.Second * 900 nsqCfg.DefaultRequeueDelay = time.Second * 0 // events channel output := make(chan *Event) // wait group c.outputsWg.Add(1) // create consumer consumer, err := nsq.NewConsumer(topic, c.config.Channel, nsqCfg) // handle error if err != nil { return nil, err } // set consumer c.consumer = consumer // consume handler consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { // wait group c.processingMessagesWg.Add(1) // wait for close defer c.processingMessagesWg.Done() // check if sub is closed if c.isClosed() { return errors.New("connection is already closed") } // unmarshal message event, err := c.config.Unmarshaler.Unmarshal(message) if err != nil { return err } output <- event return nil })) // set nsq address addr := fmt.Sprintf("%s:%d", c.config.NSQAddress, c.config.NSQPort) // connect to nsqd if err := consumer.ConnectToNSQD(addr); err != nil { return nil, err } // return channel return output, err } func (c *SubscriberImpl) Close() error { // check if already closed if c.closed { return nil } // close consumer c.consumer.Stop() // set closed c.closed = true // c.log.Info("closing subscriber", logger.LogFields{}) defer c.log.Info("subscriber closed", logger.LogFields{}) // close channel close(c.closing) // return return nil } func (c *SubscriberImpl) isClosed() bool { c.subsLock.RLock() defer c.subsLock.RUnlock() return c.closed } func NewSubscriber(config *SubscriberConfig, log logger.LoggerAdapter) (Subscriber, error) { return &SubscriberImpl{ log: log, config: &SubscriberConfig{ NSQAddress: config.NSQAddress, NSQPort: config.NSQPort, Unmarshaler: config.Unmarshaler, Channel: config.Channel, }, consumer: &nsq.Consumer{}, logger: log, subsLock: sync.RWMutex{}, closed: false, closing: make(chan struct{}), outputsWg: sync.WaitGroup{}, processingMessagesWg: sync.WaitGroup{}, }, nil }