commit 4c45dd216f98f81aea07d192b3abb3735325d34e Author: aespinro Date: Sun Aug 18 22:44:06 2024 +0200 wip diff --git a/README.md b/README.md new file mode 100644 index 0000000..7279f87 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# CML04 Eventer \ No newline at end of file diff --git a/event.go b/event.go new file mode 100644 index 0000000..d0a8b68 --- /dev/null +++ b/event.go @@ -0,0 +1,34 @@ +package cml04eventer + +import ( + "time" + + "github.com/google/uuid" +) + +// Event struct +type Event struct { + EventID string `json:"event_id"` + EventTopic string `json:"event_topic"` + EventSender string `json:"event_sender"` + EventMeta MetaData `json:"event_meta"` + EventData []byte `json:"event_data"` + EventTime string `json:"event_time"` +} + +// NewEvent creates a new event +func NewEvent(eventSender, eventTopic string, eventMeta MetaData, eventData []byte) *Event { + // check if eventMeta is nil + if eventMeta == nil { + eventMeta = NewMetaData() + } + // return event + return &Event{ + EventID: uuid.NewString(), + EventTopic: eventTopic, + EventSender: eventSender, + EventMeta: eventMeta, + EventData: eventData, + EventTime: time.Now().Format(time.RFC3339), + } +} diff --git a/event_test.go b/event_test.go new file mode 100644 index 0000000..44605be --- /dev/null +++ b/event_test.go @@ -0,0 +1,48 @@ +package cml04eventer + +import ( + "testing" + + "github.com/nsqio/go-nsq" + "github.com/stretchr/testify/assert" +) + +func TestJSONMarshaler(t *testing.T) { + // Crear una instancia de JSONMarshaler + marshaler := JSONMarshaler{} + + // Crear un evento para probar la serialización + eventSender := "test-sender" + eventTopic := "test-topic" + eventMeta := MetaData{"key": "value"} + eventData := []byte("test-data") + event := NewEvent(eventSender, eventTopic, eventMeta, eventData) + + // Serializar el evento utilizando el Marshaler + serialized, err := marshaler.Marshal(eventTopic, event) + assert.NoError(t, err) + + // Deserializar el mensaje serializado + nsqMsg := &nsq.Message{Body: serialized} + decodedEvent, err := marshaler.Unmarshal(nsqMsg) + assert.NoError(t, err) + + // Comparar el evento original con el evento deserializado + assert.Equal(t, event, decodedEvent) +} + +func TestJSONMarshaler_ErrorOnInvalidJSON(t *testing.T) { + // Crear una instancia de JSONMarshaler + marshaler := JSONMarshaler{} + + // Crear un mensaje JSON inválido (por ejemplo, falta una llave) + invalidJSON := []byte(`{"key": "value"`) + + // Intentar deserializar el mensaje JSON inválido + nsqMsg := &nsq.Message{Body: invalidJSON} + decodedEvent, err := marshaler.Unmarshal(nsqMsg) + + // Debe devolver un error + assert.Error(t, err) + assert.Nil(t, decodedEvent) // El evento deserializado debe ser nulo +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c0b2d47 --- /dev/null +++ b/go.mod @@ -0,0 +1,20 @@ +module git.espin.casa/albert/cml04-eventer + +go 1.22.0 + +require ( + git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3 + github.com/google/uuid v1.6.0 + github.com/nsqio/go-nsq v1.1.0 + github.com/pkg/errors v0.9.1 + github.com/stretchr/testify v1.8.4 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..de9830b --- /dev/null +++ b/go.sum @@ -0,0 +1,28 @@ +git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3 h1:HUEESn4o8sVXAUJWGJAATeitiNRxVzzwBU8RiIW4Wzc= +git.espin.casa/albert/logger v0.0.0-20240221100041-dc3cb01119a3/go.mod h1:P1yAUiotJurq7j/wZt6Cnel17HChplkz0E40WD8a5to= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/marshal.go b/marshal.go new file mode 100644 index 0000000..0f2c6ef --- /dev/null +++ b/marshal.go @@ -0,0 +1,88 @@ +package cml04eventer + +import ( + "bytes" + "encoding/gob" + "encoding/json" + + "github.com/nsqio/go-nsq" + "github.com/pkg/errors" +) + +type Marshaler interface { + Marshal(topic string, msg *Event) ([]byte, error) +} + +type Unmarshaler interface { + Unmarshal(*nsq.Message) (*Event, error) +} + +type JSONMarshaler struct{} + +func (j JSONMarshaler) Marshal(topic string, msg *Event) ([]byte, error) { + // buffer holder + buf := new(bytes.Buffer) + // create encoder + encoder := json.NewEncoder(buf) + // encode message + if err := encoder.Encode(msg); err != nil { + return nil, errors.Wrap(err, "cannot encode message") + } + // return encoded message + return buf.Bytes(), nil +} + +func (j JSONMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) { + // buffer holder + buf := new(bytes.Buffer) + // write message data to buffer + _, err := buf.Write(nsqMsg.Body) + if err != nil { + return nil, errors.Wrap(err, "cannot write nsq message data to buffer") + } + // create decoder + decoder := json.NewDecoder(buf) + // decode event from buffer message + var decodedEvent Event + // decode message + if err := decoder.Decode(&decodedEvent); err != nil { + return nil, errors.Wrap(err, "cannot decode message") + } + // return decoded message + return &decodedEvent, nil +} + +type GobMarshaler struct{} + +func (g GobMarshaler) Marshal(topic string, msg *Event) ([]byte, error) { + // buffer holder + buf := new(bytes.Buffer) + // create encoder + encoder := gob.NewEncoder(buf) + // encode message + if err := encoder.Encode(msg); err != nil { + return nil, errors.Wrap(err, "cannot encode message") + } + // return encoded message + return buf.Bytes(), nil +} + +func (g GobMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) { + // buffer holder + buf := new(bytes.Buffer) + // write message data to buffer + _, err := buf.Write(nsqMsg.Body) + if err != nil { + return nil, errors.Wrap(err, "cannot write nsq message data to buffer") + } + // create decoder + decoder := gob.NewDecoder(buf) + // decode event from buffer message + var decodedEvent Event + // decode message + if err := decoder.Decode(&decodedEvent); err != nil { + return nil, errors.Wrap(err, "cannot decode message") + } + // return decoded message + return &decodedEvent, nil +} diff --git a/marshal_test.go b/marshal_test.go new file mode 100644 index 0000000..f6e9783 --- /dev/null +++ b/marshal_test.go @@ -0,0 +1,56 @@ +package cml04eventer + +import ( + "testing" + + "github.com/nsqio/go-nsq" + "github.com/stretchr/testify/assert" +) + +func TestJSONMarshaler_MarshalUnmarshal(t *testing.T) { + // Crear una instancia de JSONMarshaler + marshaler := JSONMarshaler{} + + // Crear un evento para probar la serialización + eventSender := "test-sender" + eventTopic := "test-topic" + eventMeta := MetaData{"key": "value"} + eventData := []byte("test-data") + event := NewEvent(eventSender, eventTopic, eventMeta, eventData) + + // Serializar el evento utilizando el Marshaler + serialized, err := marshaler.Marshal(eventTopic, event) + assert.NoError(t, err) + + // Deserializar el mensaje serializado + nsqMsg := &nsq.Message{Body: serialized} + decodedEvent, err := marshaler.Unmarshal(nsqMsg) + assert.NoError(t, err) + + // Comparar el evento original con el evento deserializado + assert.Equal(t, event, decodedEvent) +} + +func TestGobMarshaler_MarshalUnmarshal(t *testing.T) { + // Crear una instancia de GobMarshaler + marshaler := GobMarshaler{} + + // Crear un evento para probar la serialización + eventSender := "test-sender" + eventTopic := "test-topic" + eventMeta := MetaData{"key": "value"} + eventData := []byte("test-data") + event := NewEvent(eventSender, eventTopic, eventMeta, eventData) + + // Serializar el evento utilizando el Marshaler + serialized, err := marshaler.Marshal(eventTopic, event) + assert.NoError(t, err) + + // Deserializar el mensaje serializado + nsqMsg := &nsq.Message{Body: serialized} + decodedEvent, err := marshaler.Unmarshal(nsqMsg) + assert.NoError(t, err) + + // Comparar el evento original con el evento deserializado + assert.Equal(t, event, decodedEvent) +} diff --git a/metada_test.go b/metada_test.go new file mode 100644 index 0000000..b7a1c15 --- /dev/null +++ b/metada_test.go @@ -0,0 +1,58 @@ +package cml04eventer + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMetaData_Get(t *testing.T) { + // Crear una instancia de MetaData + meta := NewMetaData() + + // Agregar un valor a MetaData + key := "test-key" + value := "test-value" + meta.Set(key, value) + + // Obtener el valor y verificar que sea correcto + result, err := meta.Get(key) + assert.NoError(t, err) + assert.Equal(t, value, result) + + // Intentar obtener un valor inexistente y verificar que se devuelva un error + _, err = meta.Get("non-existent-key") + assert.Error(t, err) +} + +func TestMetaData_Set(t *testing.T) { + // Crear una instancia de MetaData + meta := NewMetaData() + + // Establecer un valor en MetaData + key := "test-key" + value := "test-value" + meta.Set(key, value) + + // Obtener el valor y verificar que sea correcto + result, err := meta.Get(key) + assert.NoError(t, err) + assert.Equal(t, value, result) +} + +func TestMetaData_Delete(t *testing.T) { + // Crear una instancia de MetaData + meta := NewMetaData() + + // Agregar un valor a MetaData + key := "test-key" + value := "test-value" + meta.Set(key, value) + + // Eliminar el valor y verificar que se haya eliminado correctamente + meta.Delete(key) + + // Intentar obtener el valor eliminado y verificar que se devuelva un error + _, err := meta.Get(key) + assert.Error(t, err) +} diff --git a/metadata.go b/metadata.go new file mode 100644 index 0000000..0c7d5a7 --- /dev/null +++ b/metadata.go @@ -0,0 +1,25 @@ +package cml04eventer + +import "errors" + +type MetaData map[string]interface{} + +func (m MetaData) Get(key string) (interface{}, error) { + value, ok := m[key] + if !ok { + return nil, errors.New("key not found") + } + return value, nil +} + +func (m MetaData) Set(key string, value interface{}) { + m[key] = value +} + +func (m MetaData) Delete(key string) { + delete(m, key) +} + +func NewMetaData() MetaData { + return make(map[string]interface{}) +} diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..9447701 --- /dev/null +++ b/publisher.go @@ -0,0 +1,88 @@ +package cml04eventer + +import ( + "fmt" + "sync" + + "git.espin.casa/albert/logger" + "github.com/nsqio/go-nsq" +) + +type Publisher interface { + PublishEvent(event *Event) error + Close() error +} + +type PublisherImpl struct { + config *PublisherImplConfig + log logger.LoggerAdapter + producer *nsq.Producer + lock *sync.Mutex +} + +type PublisherImplConfig struct { + NSQAddress string + NSQPort int + Marshaler Marshaler +} + +// PublishEvent publishes an event +func (p *PublisherImpl) PublishEvent(event *Event) error { + p.lock.Lock() + defer p.lock.Unlock() + // log fields + logFields := logger.LogFields{ + "event_id": event.EventID, + "event_sender": event.EventSender, + "event_data_length": len(event.EventData), + } + // log event + p.log.Info("new event published", logFields) + // marshal event + b, err := p.config.Marshaler.Marshal(event.EventTopic, event) + if err != nil { + p.log.Error("error marshalling event", err, logFields) + return err + } + // publish event + if err := p.producer.Publish(event.EventTopic, b); err != nil { + p.log.Error("error publishing event", err, logFields) + return err + } + return nil +} + +func (p *PublisherImpl) Close() error { + p.lock.Lock() + defer func() { + p.lock.Unlock() + p.log.Info("publisher closed", logger.LogFields{}) + }() + // log + p.log.Info("closing publisher", logger.LogFields{}) + // stop producer + p.producer.Stop() + return nil +} + +// NewPublisher creates a new publisher +func NewPublisher(config *PublisherImplConfig, log logger.LoggerAdapter) (Publisher, error) { + // create nsq producer address + addr := fmt.Sprintf("%s:%d", config.NSQAddress, config.NSQPort) + // create nsq producer + producer, err := nsq.NewProducer(addr, nsq.NewConfig()) + // handle error + if err != nil { + log.Error("error creating nsq producer", err, logger.LogFields{ + "nsq_address": addr, + }) + return nil, err + } + // return publishe + return &PublisherImpl{ + config: config, + log: log, + producer: producer, + lock: &sync.Mutex{}, + }, nil +} diff --git a/publisher_test.go b/publisher_test.go new file mode 100644 index 0000000..98319fb --- /dev/null +++ b/publisher_test.go @@ -0,0 +1,89 @@ +package cml04eventer + +import ( + "encoding/json" + "testing" + + "git.espin.casa/albert/logger" + "github.com/nsqio/go-nsq" + "github.com/stretchr/testify/assert" +) + +// MockLoggerAdapter is an implementation of LoggerAdapter for testing +type MockLoggerAdapter struct{} +type LoggerFields map[string]interface{} + +func (m *MockLoggerAdapter) Info(msg string, fields logger.LogFields) {} +func (m *MockLoggerAdapter) Error(msg string, err error, fields logger.LogFields) {} +func (m *MockLoggerAdapter) Debug(msg string, fields logger.LogFields) {} +func (m *MockLoggerAdapter) Warn(msg string, fields logger.LogFields) {} +func (m *MockLoggerAdapter) Trace(msg string, fields logger.LogFields) {} +func (m *MockLoggerAdapter) Fatal(msg string, err error, fields logger.LogFields) {} +func (m *MockLoggerAdapter) Panic(msg string, err error, fields logger.LogFields) {} + +// MockMarshaler is an implementation of Marshaler for testing +type MockMarshaler struct{} + +func (m MockMarshaler) Marshal(topic string, msg *Event) ([]byte, error) { + // Simply convert the event to JSON for testing + return json.Marshal(msg) +} + +func (m MockMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) { + // Simply decode the JSON message for testing + var event Event + err := json.Unmarshal(nsqMsg.Body, &event) + return &event, err +} + +func TestPublisherImpl_PublishEvent(t *testing.T) { + // Create an instance of MockLoggerAdapter + mockLogger := &MockLoggerAdapter{} + + // Test configuration for the Publisher + config := &PublisherImplConfig{ + NSQAddress: "localhost", + NSQPort: 4150, + Marshaler: MockMarshaler{}, + } + + // Create an instance of PublisherImpl for testing + publisher, err := NewPublisher(config, mockLogger) + assert.NoError(t, err) + + // Create a test event + event := &Event{ + EventID: "123", + EventSender: "test-sender", + EventTopic: "test-topic", + EventData: []byte("test-data"), + } + + // Publish the event + err = publisher.PublishEvent(event) + assert.NoError(t, err) + + // Close the publisher + err = publisher.Close() + assert.NoError(t, err) +} + +func TestPublisherImpl_Close(t *testing.T) { + // Create an instance of MockLoggerAdapter + mockLogger := &MockLoggerAdapter{} + + // Test configuration for the Publisher + config := &PublisherImplConfig{ + NSQAddress: "localhost", + NSQPort: 4150, + Marshaler: MockMarshaler{}, + } + + // Create an instance of PublisherImpl for testing + publisher, err := NewPublisher(config, mockLogger) + assert.NoError(t, err) + + // Close the publisher + err = publisher.Close() + assert.NoError(t, err) +} diff --git a/subscriber.go b/subscriber.go new file mode 100644 index 0000000..286ec51 --- /dev/null +++ b/subscriber.go @@ -0,0 +1,129 @@ +package cml04eventer + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "git.espin.casa/albert/logger" + "github.com/nsqio/go-nsq" +) + +type Subscriber interface { + SubscribeEvent(ctx context.Context, topic string) (<-chan *Event, error) + Close() error +} + +type SubscriberImpl struct { + log logger.LoggerAdapter + config *SubscriberConfig + consumer *nsq.Consumer + logger logger.LoggerAdapter + subsLock sync.RWMutex + closed bool + closing chan struct{} + outputsWg sync.WaitGroup + processingMessagesWg sync.WaitGroup +} + +type SubscriberConfig struct { + NSQAddress string + NSQPort int + Unmarshaler Unmarshaler + Channel string +} + +func (c *SubscriberImpl) SubscribeEvent(ctx context.Context, topic string) (<-chan *Event, error) { + // default config + nsqCfg := nsq.NewConfig() + // maximum number of attempts + nsqCfg.MaxAttempts = 10 + // maximum number of in flight messages + nsqCfg.MaxInFlight = 5 + // Maximum duration when REQueueing + nsqCfg.MaxRequeueDelay = time.Second * 900 + nsqCfg.DefaultRequeueDelay = time.Second * 0 + // events channel + output := make(chan *Event) + // wait group + c.outputsWg.Add(1) + // create consumer + consumer, err := nsq.NewConsumer(topic, c.config.Channel, nsqCfg) + // handle error + if err != nil { + return nil, err + } + // set consumer + c.consumer = consumer + // consume handler + consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { + // wait group + c.processingMessagesWg.Add(1) + // wait for close + defer c.processingMessagesWg.Done() + // check if sub is closed + if c.isClosed() { + return errors.New("connection is already closed") + } + // unmarshal message + event, err := c.config.Unmarshaler.Unmarshal(message) + if err != nil { + return err + } + output <- event + return nil + })) + // set nsq address + addr := fmt.Sprintf("%s:%d", c.config.NSQAddress, c.config.NSQPort) + // connect to nsqd + if err := consumer.ConnectToNSQD(addr); err != nil { + return nil, err + } + // return channel + return output, err +} + +func (c *SubscriberImpl) Close() error { + // check if already closed + if c.closed { + return nil + } + // close consumer + c.consumer.Stop() + // set closed + c.closed = true + // + c.log.Info("closing subscriber", logger.LogFields{}) + defer c.log.Info("subscriber closed", logger.LogFields{}) + // close channel + close(c.closing) + // return + return nil +} + +func (c *SubscriberImpl) isClosed() bool { + c.subsLock.RLock() + defer c.subsLock.RUnlock() + return c.closed +} + +func NewSubscriber(config *SubscriberConfig, log logger.LoggerAdapter) (Subscriber, error) { + return &SubscriberImpl{ + log: log, + config: &SubscriberConfig{ + NSQAddress: config.NSQAddress, + NSQPort: config.NSQPort, + Unmarshaler: config.Unmarshaler, + Channel: config.Channel, + }, + consumer: &nsq.Consumer{}, + logger: log, + subsLock: sync.RWMutex{}, + closed: false, + closing: make(chan struct{}), + outputsWg: sync.WaitGroup{}, + processingMessagesWg: sync.WaitGroup{}, + }, nil +} diff --git a/subscriber_test.go b/subscriber_test.go new file mode 100644 index 0000000..888945f --- /dev/null +++ b/subscriber_test.go @@ -0,0 +1,78 @@ +package cml04eventer + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/nsqio/go-nsq" + "github.com/stretchr/testify/assert" +) + +// MockUnmarshaler is an implementation of Unmarshaler for testing +type MockUnmarshaler struct{} + +func (m MockUnmarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) { + // Simply decode the JSON message for testing + var event Event + err := json.Unmarshal(nsqMsg.Body, &event) + return &event, err +} + +func TestSubscriberImpl_SubscribeEvent(t *testing.T) { + //eventChan := make(chan *Event) + // Create an instance of MockLoggerAdapter + mockLogger := &MockLoggerAdapter{} + + // Test configuration for the Subscriber + config := &SubscriberConfig{ + NSQAddress: "localhost", + NSQPort: 4150, + Unmarshaler: MockUnmarshaler{}, + Channel: "test-channel", + } + + // Create an instance of SubscriberImpl for testing + subscriber, err := NewSubscriber(config, mockLogger) + assert.NoError(t, err) + + // Create a context for testing + ctx := context.Background() + + // Subscribe to the event + eventChan, err := subscriber.SubscribeEvent(ctx, "test-topic") + assert.NoError(t, err) + + t.Log("Subscribed to event") + + t.Log(eventChan) + + // Wait for the message to be processed (you can adapt this as needed) + time.Sleep(1 * time.Second) + + // Close the subscriber + err = subscriber.Close() + assert.NoError(t, err) +} + +func TestSubscriberImpl_Close(t *testing.T) { + // Create an instance of MockLoggerAdapter + mockLogger := &MockLoggerAdapter{} + + // Test configuration for the Subscriber + config := &SubscriberConfig{ + NSQAddress: "localhost", + NSQPort: 4150, + Unmarshaler: MockUnmarshaler{}, + Channel: "test-channel", + } + + // Create an instance of SubscriberImpl for testing + subscriber, err := NewSubscriber(config, mockLogger) + assert.NoError(t, err) + + // Close the subscriber + err = subscriber.Close() + assert.NoError(t, err) +}