This commit is contained in:
aespinro 2024-05-08 19:29:57 +02:00
commit 52ca86976f
13 changed files with 742 additions and 0 deletions

1
README.md Normal file
View File

@ -0,0 +1 @@
# CML04 Eventer

34
event.go Normal file
View File

@ -0,0 +1,34 @@
package cml04eventer
import (
"time"
"github.com/google/uuid"
)
// Event struct
type Event struct {
EventID string `json:"event_id"`
EventTopic string `json:"event_topic"`
EventSender string `json:"event_sender"`
EventMeta MetaData `json:"event_meta"`
EventData []byte `json:"event_data"`
EventTime string `json:"event_time"`
}
// NewEvent creates a new event
func NewEvent(eventSender, eventTopic string, eventMeta MetaData, eventData []byte) *Event {
// check if eventMeta is nil
if eventMeta == nil {
eventMeta = NewMetaData()
}
// return event
return &Event{
EventID: uuid.NewString(),
EventTopic: eventTopic,
EventSender: eventSender,
EventMeta: eventMeta,
EventData: eventData,
EventTime: time.Now().Format(time.RFC3339),
}
}

48
event_test.go Normal file
View File

@ -0,0 +1,48 @@
package cml04eventer
import (
"testing"
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/assert"
)
func TestJSONMarshaler(t *testing.T) {
// Crear una instancia de JSONMarshaler
marshaler := JSONMarshaler{}
// Crear un evento para probar la serialización
eventSender := "test-sender"
eventTopic := "test-topic"
eventMeta := MetaData{"key": "value"}
eventData := []byte("test-data")
event := NewEvent(eventSender, eventTopic, eventMeta, eventData)
// Serializar el evento utilizando el Marshaler
serialized, err := marshaler.Marshal(eventTopic, event)
assert.NoError(t, err)
// Deserializar el mensaje serializado
nsqMsg := &nsq.Message{Body: serialized}
decodedEvent, err := marshaler.Unmarshal(nsqMsg)
assert.NoError(t, err)
// Comparar el evento original con el evento deserializado
assert.Equal(t, event, decodedEvent)
}
func TestJSONMarshaler_ErrorOnInvalidJSON(t *testing.T) {
// Crear una instancia de JSONMarshaler
marshaler := JSONMarshaler{}
// Crear un mensaje JSON inválido (por ejemplo, falta una llave)
invalidJSON := []byte(`{"key": "value"`)
// Intentar deserializar el mensaje JSON inválido
nsqMsg := &nsq.Message{Body: invalidJSON}
decodedEvent, err := marshaler.Unmarshal(nsqMsg)
// Debe devolver un error
assert.Error(t, err)
assert.Nil(t, decodedEvent) // El evento deserializado debe ser nulo
}

20
go.mod Normal file
View File

@ -0,0 +1,20 @@
module git.espin.casa/albert/cml04-eventer
go 1.22.0
require (
git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3
github.com/google/uuid v1.6.0
github.com/nsqio/go-nsq v1.1.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

28
go.sum Normal file
View File

@ -0,0 +1,28 @@
git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3 h1:HUEESn4o8sVXAUJWGJAATeitiNRxVzzwBU8RiIW4Wzc=
git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3/go.mod h1:P1yAUiotJurq7j/wZt6Cnel17HChplkz0E40WD8a5to=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

88
marshal.go Normal file
View File

@ -0,0 +1,88 @@
package cml04eventer
import (
"bytes"
"encoding/gob"
"encoding/json"
"github.com/nsqio/go-nsq"
"github.com/pkg/errors"
)
type Marshaler interface {
Marshal(topic string, msg *Event) ([]byte, error)
}
type Unmarshaler interface {
Unmarshal(*nsq.Message) (*Event, error)
}
type JSONMarshaler struct{}
func (j JSONMarshaler) Marshal(topic string, msg *Event) ([]byte, error) {
// buffer holder
buf := new(bytes.Buffer)
// create encoder
encoder := json.NewEncoder(buf)
// encode message
if err := encoder.Encode(msg); err != nil {
return nil, errors.Wrap(err, "cannot encode message")
}
// return encoded message
return buf.Bytes(), nil
}
func (j JSONMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) {
// buffer holder
buf := new(bytes.Buffer)
// write message data to buffer
_, err := buf.Write(nsqMsg.Body)
if err != nil {
return nil, errors.Wrap(err, "cannot write nsq message data to buffer")
}
// create decoder
decoder := json.NewDecoder(buf)
// decode event from buffer message
var decodedEvent Event
// decode message
if err := decoder.Decode(&decodedEvent); err != nil {
return nil, errors.Wrap(err, "cannot decode message")
}
// return decoded message
return &decodedEvent, nil
}
type GobMarshaler struct{}
func (g GobMarshaler) Marshal(topic string, msg *Event) ([]byte, error) {
// buffer holder
buf := new(bytes.Buffer)
// create encoder
encoder := gob.NewEncoder(buf)
// encode message
if err := encoder.Encode(msg); err != nil {
return nil, errors.Wrap(err, "cannot encode message")
}
// return encoded message
return buf.Bytes(), nil
}
func (g GobMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) {
// buffer holder
buf := new(bytes.Buffer)
// write message data to buffer
_, err := buf.Write(nsqMsg.Body)
if err != nil {
return nil, errors.Wrap(err, "cannot write nsq message data to buffer")
}
// create decoder
decoder := gob.NewDecoder(buf)
// decode event from buffer message
var decodedEvent Event
// decode message
if err := decoder.Decode(&decodedEvent); err != nil {
return nil, errors.Wrap(err, "cannot decode message")
}
// return decoded message
return &decodedEvent, nil
}

56
marshal_test.go Normal file
View File

@ -0,0 +1,56 @@
package cml04eventer
import (
"testing"
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/assert"
)
func TestJSONMarshaler_MarshalUnmarshal(t *testing.T) {
// Crear una instancia de JSONMarshaler
marshaler := JSONMarshaler{}
// Crear un evento para probar la serialización
eventSender := "test-sender"
eventTopic := "test-topic"
eventMeta := MetaData{"key": "value"}
eventData := []byte("test-data")
event := NewEvent(eventSender, eventTopic, eventMeta, eventData)
// Serializar el evento utilizando el Marshaler
serialized, err := marshaler.Marshal(eventTopic, event)
assert.NoError(t, err)
// Deserializar el mensaje serializado
nsqMsg := &nsq.Message{Body: serialized}
decodedEvent, err := marshaler.Unmarshal(nsqMsg)
assert.NoError(t, err)
// Comparar el evento original con el evento deserializado
assert.Equal(t, event, decodedEvent)
}
func TestGobMarshaler_MarshalUnmarshal(t *testing.T) {
// Crear una instancia de GobMarshaler
marshaler := GobMarshaler{}
// Crear un evento para probar la serialización
eventSender := "test-sender"
eventTopic := "test-topic"
eventMeta := MetaData{"key": "value"}
eventData := []byte("test-data")
event := NewEvent(eventSender, eventTopic, eventMeta, eventData)
// Serializar el evento utilizando el Marshaler
serialized, err := marshaler.Marshal(eventTopic, event)
assert.NoError(t, err)
// Deserializar el mensaje serializado
nsqMsg := &nsq.Message{Body: serialized}
decodedEvent, err := marshaler.Unmarshal(nsqMsg)
assert.NoError(t, err)
// Comparar el evento original con el evento deserializado
assert.Equal(t, event, decodedEvent)
}

58
metada_test.go Normal file
View File

@ -0,0 +1,58 @@
package cml04eventer
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMetaData_Get(t *testing.T) {
// Crear una instancia de MetaData
meta := NewMetaData()
// Agregar un valor a MetaData
key := "test-key"
value := "test-value"
meta.Set(key, value)
// Obtener el valor y verificar que sea correcto
result, err := meta.Get(key)
assert.NoError(t, err)
assert.Equal(t, value, result)
// Intentar obtener un valor inexistente y verificar que se devuelva un error
_, err = meta.Get("non-existent-key")
assert.Error(t, err)
}
func TestMetaData_Set(t *testing.T) {
// Crear una instancia de MetaData
meta := NewMetaData()
// Establecer un valor en MetaData
key := "test-key"
value := "test-value"
meta.Set(key, value)
// Obtener el valor y verificar que sea correcto
result, err := meta.Get(key)
assert.NoError(t, err)
assert.Equal(t, value, result)
}
func TestMetaData_Delete(t *testing.T) {
// Crear una instancia de MetaData
meta := NewMetaData()
// Agregar un valor a MetaData
key := "test-key"
value := "test-value"
meta.Set(key, value)
// Eliminar el valor y verificar que se haya eliminado correctamente
meta.Delete(key)
// Intentar obtener el valor eliminado y verificar que se devuelva un error
_, err := meta.Get(key)
assert.Error(t, err)
}

25
metadata.go Normal file
View File

@ -0,0 +1,25 @@
package cml04eventer
import "errors"
type MetaData map[string]interface{}
func (m MetaData) Get(key string) (interface{}, error) {
value, ok := m[key]
if !ok {
return nil, errors.New("key not found")
}
return value, nil
}
func (m MetaData) Set(key string, value interface{}) {
m[key] = value
}
func (m MetaData) Delete(key string) {
delete(m, key)
}
func NewMetaData() MetaData {
return make(map[string]interface{})
}

88
publisher.go Normal file
View File

@ -0,0 +1,88 @@
package cml04eventer
import (
"fmt"
"sync"
"git.espin.casa/albert/logger"
"github.com/nsqio/go-nsq"
)
type Publisher interface {
PublishEvent(event *Event) error
Close() error
}
type PublisherImpl struct {
config *PublisherImplConfig
log logger.LoggerAdapter
producer *nsq.Producer
lock *sync.Mutex
}
type PublisherImplConfig struct {
NSQAddress string
NSQPort int
Marshaler Marshaler
}
// PublishEvent publishes an event
func (p *PublisherImpl) PublishEvent(event *Event) error {
p.lock.Lock()
defer p.lock.Unlock()
// log fields
logFields := logger.LogFields{
"event_id": event.EventID,
"event_sender": event.EventSender,
"event_data_length": len(event.EventData),
}
// log event
p.log.Info("new event published", logFields)
// marshal event
b, err := p.config.Marshaler.Marshal(event.EventTopic, event)
if err != nil {
p.log.Error("error marshalling event", err, logFields)
return err
}
// publish event
if err := p.producer.Publish(event.EventTopic, b); err != nil {
p.log.Error("error publishing event", err, logFields)
return err
}
return nil
}
func (p *PublisherImpl) Close() error {
p.lock.Lock()
defer func() {
p.lock.Unlock()
p.log.Info("publisher closed", logger.LogFields{})
}()
// log
p.log.Info("closing publisher", logger.LogFields{})
// stop producer
p.producer.Stop()
return nil
}
// NewPublisher creates a new publisher
func NewPublisher(config *PublisherImplConfig, log logger.LoggerAdapter) (Publisher, error) {
// create nsq producer address
addr := fmt.Sprintf("%s:%d", config.NSQAddress, config.NSQPort)
// create nsq producer
producer, err := nsq.NewProducer(addr, nsq.NewConfig())
// handle error
if err != nil {
log.Error("error creating nsq producer", err, logger.LogFields{
"nsq_address": addr,
})
return nil, err
}
// return publishe
return &PublisherImpl{
config: config,
log: log,
producer: producer,
lock: &sync.Mutex{},
}, nil
}

89
publisher_test.go Normal file
View File

@ -0,0 +1,89 @@
package cml04eventer
import (
"encoding/json"
"testing"
"git.espin.casa/albert/logger"
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/assert"
)
// MockLoggerAdapter is an implementation of LoggerAdapter for testing
type MockLoggerAdapter struct{}
type LoggerFields map[string]interface{}
func (m *MockLoggerAdapter) Info(msg string, fields logger.LogFields) {}
func (m *MockLoggerAdapter) Error(msg string, err error, fields logger.LogFields) {}
func (m *MockLoggerAdapter) Debug(msg string, fields logger.LogFields) {}
func (m *MockLoggerAdapter) Warn(msg string, fields logger.LogFields) {}
func (m *MockLoggerAdapter) Trace(msg string, fields logger.LogFields) {}
func (m *MockLoggerAdapter) Fatal(msg string, err error, fields logger.LogFields) {}
func (m *MockLoggerAdapter) Panic(msg string, err error, fields logger.LogFields) {}
// MockMarshaler is an implementation of Marshaler for testing
type MockMarshaler struct{}
func (m MockMarshaler) Marshal(topic string, msg *Event) ([]byte, error) {
// Simply convert the event to JSON for testing
return json.Marshal(msg)
}
func (m MockMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) {
// Simply decode the JSON message for testing
var event Event
err := json.Unmarshal(nsqMsg.Body, &event)
return &event, err
}
func TestPublisherImpl_PublishEvent(t *testing.T) {
// Create an instance of MockLoggerAdapter
mockLogger := &MockLoggerAdapter{}
// Test configuration for the Publisher
config := &PublisherImplConfig{
NSQAddress: "localhost",
NSQPort: 4150,
Marshaler: MockMarshaler{},
}
// Create an instance of PublisherImpl for testing
publisher, err := NewPublisher(config, mockLogger)
assert.NoError(t, err)
// Create a test event
event := &Event{
EventID: "123",
EventSender: "test-sender",
EventTopic: "test-topic",
EventData: []byte("test-data"),
}
// Publish the event
err = publisher.PublishEvent(event)
assert.NoError(t, err)
// Close the publisher
err = publisher.Close()
assert.NoError(t, err)
}
func TestPublisherImpl_Close(t *testing.T) {
// Create an instance of MockLoggerAdapter
mockLogger := &MockLoggerAdapter{}
// Test configuration for the Publisher
config := &PublisherImplConfig{
NSQAddress: "localhost",
NSQPort: 4150,
Marshaler: MockMarshaler{},
}
// Create an instance of PublisherImpl for testing
publisher, err := NewPublisher(config, mockLogger)
assert.NoError(t, err)
// Close the publisher
err = publisher.Close()
assert.NoError(t, err)
}

129
subscriber.go Normal file
View File

@ -0,0 +1,129 @@
package cml04eventer
import (
"context"
"errors"
"fmt"
"sync"
"time"
"git.espin.casa/albert/logger"
"github.com/nsqio/go-nsq"
)
type Subscriber interface {
SubscribeEvent(ctx context.Context, topic string) (<-chan *Event, error)
Close() error
}
type SubscriberImpl struct {
log logger.LoggerAdapter
config *SubscriberConfig
consumer *nsq.Consumer
logger logger.LoggerAdapter
subsLock sync.RWMutex
closed bool
closing chan struct{}
outputsWg sync.WaitGroup
processingMessagesWg sync.WaitGroup
}
type SubscriberConfig struct {
NSQAddress string
NSQPort int
Unmarshaler Unmarshaler
Channel string
}
func (c *SubscriberImpl) SubscribeEvent(ctx context.Context, topic string) (<-chan *Event, error) {
// default config
nsqCfg := nsq.NewConfig()
// maximum number of attempts
nsqCfg.MaxAttempts = 10
// maximum number of in flight messages
nsqCfg.MaxInFlight = 5
// Maximum duration when REQueueing
nsqCfg.MaxRequeueDelay = time.Second * 900
nsqCfg.DefaultRequeueDelay = time.Second * 0
// events channel
output := make(chan *Event)
// wait group
c.outputsWg.Add(1)
// create consumer
consumer, err := nsq.NewConsumer(topic, c.config.Channel, nsqCfg)
// handle error
if err != nil {
return nil, err
}
// set consumer
c.consumer = consumer
// consume handler
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
// wait group
c.processingMessagesWg.Add(1)
// wait for close
defer c.processingMessagesWg.Done()
// check if sub is closed
if c.isClosed() {
return errors.New("connection is already closed")
}
// unmarshal message
event, err := c.config.Unmarshaler.Unmarshal(message)
if err != nil {
return err
}
output <- event
return nil
}))
// set nsq address
addr := fmt.Sprintf("%s:%d", c.config.NSQAddress, c.config.NSQPort)
// connect to nsqd
if err := consumer.ConnectToNSQD(addr); err != nil {
return nil, err
}
// return channel
return output, err
}
func (c *SubscriberImpl) Close() error {
// check if already closed
if c.closed {
return nil
}
// close consumer
c.consumer.Stop()
// set closed
c.closed = true
//
c.log.Info("closing subscriber", logger.LogFields{})
defer c.log.Info("subscriber closed", logger.LogFields{})
// close channel
close(c.closing)
// return
return nil
}
func (c *SubscriberImpl) isClosed() bool {
c.subsLock.RLock()
defer c.subsLock.RUnlock()
return c.closed
}
func NewSubscriber(config *SubscriberConfig, log logger.LoggerAdapter) (Subscriber, error) {
return &SubscriberImpl{
log: log,
config: &SubscriberConfig{
NSQAddress: config.NSQAddress,
NSQPort: config.NSQPort,
Unmarshaler: config.Unmarshaler,
Channel: config.Channel,
},
consumer: &nsq.Consumer{},
logger: log,
subsLock: sync.RWMutex{},
closed: false,
closing: make(chan struct{}),
outputsWg: sync.WaitGroup{},
processingMessagesWg: sync.WaitGroup{},
}, nil
}

78
subscriber_test.go Normal file
View File

@ -0,0 +1,78 @@
package cml04eventer
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/assert"
)
// MockUnmarshaler is an implementation of Unmarshaler for testing
type MockUnmarshaler struct{}
func (m MockUnmarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) {
// Simply decode the JSON message for testing
var event Event
err := json.Unmarshal(nsqMsg.Body, &event)
return &event, err
}
func TestSubscriberImpl_SubscribeEvent(t *testing.T) {
//eventChan := make(chan *Event)
// Create an instance of MockLoggerAdapter
mockLogger := &MockLoggerAdapter{}
// Test configuration for the Subscriber
config := &SubscriberConfig{
NSQAddress: "localhost",
NSQPort: 4150,
Unmarshaler: MockUnmarshaler{},
Channel: "test-channel",
}
// Create an instance of SubscriberImpl for testing
subscriber, err := NewSubscriber(config, mockLogger)
assert.NoError(t, err)
// Create a context for testing
ctx := context.Background()
// Subscribe to the event
eventChan, err := subscriber.SubscribeEvent(ctx, "test-topic")
assert.NoError(t, err)
t.Log("Subscribed to event")
t.Log(eventChan)
// Wait for the message to be processed (you can adapt this as needed)
time.Sleep(1 * time.Second)
// Close the subscriber
err = subscriber.Close()
assert.NoError(t, err)
}
func TestSubscriberImpl_Close(t *testing.T) {
// Create an instance of MockLoggerAdapter
mockLogger := &MockLoggerAdapter{}
// Test configuration for the Subscriber
config := &SubscriberConfig{
NSQAddress: "localhost",
NSQPort: 4150,
Unmarshaler: MockUnmarshaler{},
Channel: "test-channel",
}
// Create an instance of SubscriberImpl for testing
subscriber, err := NewSubscriber(config, mockLogger)
assert.NoError(t, err)
// Close the subscriber
err = subscriber.Close()
assert.NoError(t, err)
}