cml04-eventer/subscriber.go

130 lines
3.0 KiB
Go

package cml04eventer
import (
"context"
"errors"
"fmt"
"sync"
"time"
"git.espin.casa/albert/logger"
"github.com/nsqio/go-nsq"
)
type Subscriber interface {
SubscribeEvent(ctx context.Context, topic string) (<-chan *Event, error)
Close() error
}
type SubscriberImpl struct {
log logger.LoggerAdapter
config *SubscriberConfig
consumer *nsq.Consumer
logger logger.LoggerAdapter
subsLock sync.RWMutex
closed bool
closing chan struct{}
outputsWg sync.WaitGroup
processingMessagesWg sync.WaitGroup
}
type SubscriberConfig struct {
NSQAddress string
NSQPort int
Unmarshaler Unmarshaler
Channel string
}
func (c *SubscriberImpl) SubscribeEvent(ctx context.Context, topic string) (<-chan *Event, error) {
// default config
nsqCfg := nsq.NewConfig()
// maximum number of attempts
nsqCfg.MaxAttempts = 10
// maximum number of in flight messages
nsqCfg.MaxInFlight = 5
// Maximum duration when REQueueing
nsqCfg.MaxRequeueDelay = time.Second * 900
nsqCfg.DefaultRequeueDelay = time.Second * 0
// events channel
output := make(chan *Event)
// wait group
c.outputsWg.Add(1)
// create consumer
consumer, err := nsq.NewConsumer(topic, c.config.Channel, nsqCfg)
// handle error
if err != nil {
return nil, err
}
// set consumer
c.consumer = consumer
// consume handler
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
// wait group
c.processingMessagesWg.Add(1)
// wait for close
defer c.processingMessagesWg.Done()
// check if sub is closed
if c.isClosed() {
return errors.New("connection is already closed")
}
// unmarshal message
event, err := c.config.Unmarshaler.Unmarshal(message)
if err != nil {
return err
}
output <- event
return nil
}))
// set nsq address
addr := fmt.Sprintf("%s:%d", c.config.NSQAddress, c.config.NSQPort)
// connect to nsqd
if err := consumer.ConnectToNSQD(addr); err != nil {
return nil, err
}
// return channel
return output, err
}
func (c *SubscriberImpl) Close() error {
// check if already closed
if c.closed {
return nil
}
// close consumer
c.consumer.Stop()
// set closed
c.closed = true
//
c.log.Info("closing subscriber", logger.LogFields{})
defer c.log.Info("subscriber closed", logger.LogFields{})
// close channel
close(c.closing)
// return
return nil
}
func (c *SubscriberImpl) isClosed() bool {
c.subsLock.RLock()
defer c.subsLock.RUnlock()
return c.closed
}
func NewSubscriber(config *SubscriberConfig, log logger.LoggerAdapter) (Subscriber, error) {
return &SubscriberImpl{
log: log,
config: &SubscriberConfig{
NSQAddress: config.NSQAddress,
NSQPort: config.NSQPort,
Unmarshaler: config.Unmarshaler,
Channel: config.Channel,
},
consumer: &nsq.Consumer{},
logger: log,
subsLock: sync.RWMutex{},
closed: false,
closing: make(chan struct{}),
outputsWg: sync.WaitGroup{},
processingMessagesWg: sync.WaitGroup{},
}, nil
}