89 lines
1.9 KiB
Go
89 lines
1.9 KiB
Go
|
package cml04eventer
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
|
||
|
"git.espin.casa/albert/logger"
|
||
|
"github.com/nsqio/go-nsq"
|
||
|
)
|
||
|
|
||
|
type Publisher interface {
|
||
|
PublishEvent(event *Event) error
|
||
|
Close() error
|
||
|
}
|
||
|
|
||
|
type PublisherImpl struct {
|
||
|
config *PublisherImplConfig
|
||
|
log logger.LoggerAdapter
|
||
|
producer *nsq.Producer
|
||
|
lock *sync.Mutex
|
||
|
}
|
||
|
|
||
|
type PublisherImplConfig struct {
|
||
|
NSQAddress string
|
||
|
NSQPort int
|
||
|
Marshaler Marshaler
|
||
|
}
|
||
|
|
||
|
// PublishEvent publishes an event
|
||
|
func (p *PublisherImpl) PublishEvent(event *Event) error {
|
||
|
p.lock.Lock()
|
||
|
defer p.lock.Unlock()
|
||
|
// log fields
|
||
|
logFields := logger.LogFields{
|
||
|
"event_id": event.EventID,
|
||
|
"event_sender": event.EventSender,
|
||
|
"event_data_length": len(event.EventData),
|
||
|
}
|
||
|
// log event
|
||
|
p.log.Info("new event published", logFields)
|
||
|
// marshal event
|
||
|
b, err := p.config.Marshaler.Marshal(event.EventTopic, event)
|
||
|
if err != nil {
|
||
|
p.log.Error("error marshalling event", err, logFields)
|
||
|
return err
|
||
|
}
|
||
|
// publish event
|
||
|
if err := p.producer.Publish(event.EventTopic, b); err != nil {
|
||
|
p.log.Error("error publishing event", err, logFields)
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *PublisherImpl) Close() error {
|
||
|
p.lock.Lock()
|
||
|
defer func() {
|
||
|
p.lock.Unlock()
|
||
|
p.log.Info("publisher closed", logger.LogFields{})
|
||
|
}()
|
||
|
// log
|
||
|
p.log.Info("closing publisher", logger.LogFields{})
|
||
|
// stop producer
|
||
|
p.producer.Stop()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// NewPublisher creates a new publisher
|
||
|
func NewPublisher(config *PublisherImplConfig, log logger.LoggerAdapter) (Publisher, error) {
|
||
|
// create nsq producer address
|
||
|
addr := fmt.Sprintf("%s:%d", config.NSQAddress, config.NSQPort)
|
||
|
// create nsq producer
|
||
|
producer, err := nsq.NewProducer(addr, nsq.NewConfig())
|
||
|
// handle error
|
||
|
if err != nil {
|
||
|
log.Error("error creating nsq producer", err, logger.LogFields{
|
||
|
"nsq_address": addr,
|
||
|
})
|
||
|
return nil, err
|
||
|
}
|
||
|
// return publishe
|
||
|
return &PublisherImpl{
|
||
|
config: config,
|
||
|
log: log,
|
||
|
producer: producer,
|
||
|
lock: &sync.Mutex{},
|
||
|
}, nil
|
||
|
}
|