cml04-eventer/publisher.go

89 lines
1.9 KiB
Go

package cml04eventer
import (
"fmt"
"sync"
"git.espin.casa/albert/logger"
"github.com/nsqio/go-nsq"
)
// Publisher publishes events and can be closed once it is no longer needed.
// NewPublisher returns a *PublisherImpl as the implementation of this interface.
type Publisher interface {
// PublishEvent publishes the given event and returns an error on failure.
PublishEvent(event *Event) error
// Close shuts the publisher down.
Close() error
}
// PublisherImpl is the Publisher implementation built on top of an NSQ producer.
type PublisherImpl struct {
// config is the configuration handed to NewPublisher; PublishEvent reads its Marshaler.
config *PublisherImplConfig
// log receives the info and error messages emitted by the publisher.
log logger.LoggerAdapter
// producer is the NSQ producer used to publish messages; Close stops it.
producer *nsq.Producer
// lock is acquired by both PublishEvent and Close, serializing those calls.
lock *sync.Mutex
}
// PublisherImplConfig holds the settings NewPublisher needs to build a PublisherImpl.
type PublisherImplConfig struct {
// NSQAddress is the host part of the producer address ("NSQAddress:NSQPort").
NSQAddress string
// NSQPort is the port part of the producer address ("NSQAddress:NSQPort").
NSQPort int
// Marshaler encodes an event into the bytes that PublishEvent sends to NSQ.
Marshaler Marshaler
}
// PublishEvent marshals event with the configured Marshaler and publishes the
// resulting bytes to the NSQ topic named by event.EventTopic.
//
// The publisher lock is held while marshalling and publishing, so calls are
// serialized with each other and with Close. A nil event is rejected with an
// error instead of causing a nil pointer dereference. Marshal and publish
// errors are logged and returned unchanged.
func (p *PublisherImpl) PublishEvent(event *Event) error {
	// Reject a nil event up front: every step below dereferences it.
	if event == nil {
		err := fmt.Errorf("publish event: nil event")
		p.log.Error("error publishing event", err, logger.LogFields{})
		return err
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	// Fields attached to every log line emitted for this event.
	logFields := logger.LogFields{
		"event_id":          event.EventID,
		"event_sender":      event.EventSender,
		"event_data_length": len(event.EventData),
	}

	// Marshal the event into the payload sent to NSQ.
	b, err := p.config.Marshaler.Marshal(event.EventTopic, event)
	if err != nil {
		p.log.Error("error marshalling event", err, logFields)
		return err
	}

	// Publish the payload on the event's topic.
	if err := p.producer.Publish(event.EventTopic, b); err != nil {
		p.log.Error("error publishing event", err, logFields)
		return err
	}

	// Log success only after Publish returned without error, so the message
	// is never emitted for an event that failed to marshal or publish.
	p.log.Info("new event published", logFields)
	return nil
}
// Close stops the underlying NSQ producer while holding the publisher lock.
// It logs before stopping and again after the lock has been released, and it
// always returns nil.
func (p *PublisherImpl) Close() error {
	// Cleanup runs on every exit path: release the lock first, then report.
	release := func() {
		p.lock.Unlock()
		p.log.Info("publisher closed", logger.LogFields{})
	}

	p.lock.Lock()
	defer release()

	p.log.Info("closing publisher", logger.LogFields{})
	p.producer.Stop()

	return nil
}
// NewPublisher creates a Publisher backed by an NSQ producer.
//
// The producer address is built as "NSQAddress:NSQPort" from config, and the
// producer is created with the configuration returned by nsq.NewConfig(). A
// nil config is rejected with an error instead of causing a nil pointer
// dereference. Errors from nsq.NewProducer are logged and returned unchanged.
func NewPublisher(config *PublisherImplConfig, log logger.LoggerAdapter) (Publisher, error) {
	// Reject a nil config up front: the address below is read from it.
	if config == nil {
		err := fmt.Errorf("new publisher: nil config")
		log.Error("error creating nsq producer", err, logger.LogFields{})
		return nil, err
	}

	// Build the NSQ producer address.
	addr := fmt.Sprintf("%s:%d", config.NSQAddress, config.NSQPort)

	// Create the NSQ producer.
	producer, err := nsq.NewProducer(addr, nsq.NewConfig())
	if err != nil {
		log.Error("error creating nsq producer", err, logger.LogFields{
			"nsq_address": addr,
		})
		return nil, err
	}

	// Return the publisher.
	return &PublisherImpl{
		config:   config,
		log:      log,
		producer: producer,
		lock:     &sync.Mutex{},
	}, nil
}