This commit is contained in:
Albert Espín 2024-10-17 19:37:56 +02:00
parent a8265c7ae5
commit 759e924837
5 changed files with 61 additions and 9 deletions

View File

@ -12,7 +12,7 @@ type Event struct {
EventTopic string `json:"event_topic"`
EventSender string `json:"event_sender"`
EventMeta MetaData `json:"event_meta"`
EventData []byte `json:"event_data"`
EventData interface{} `json:"event_data"`
EventTime string `json:"event_time"`
}

3
go.mod
View File

@ -13,8 +13,11 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)

6
go.sum
View File

@ -7,6 +7,8 @@ github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@ -18,6 +20,10 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

View File

@ -7,6 +7,7 @@ import (
"github.com/nsqio/go-nsq"
"github.com/pkg/errors"
"github.com/vmihailenco/msgpack/v5"
)
type Marshaler interface {
@ -86,3 +87,38 @@ func (g GobMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) {
// return decoded message
return &decodedEvent, nil
}
type MsgPackMarshaler struct{}
func (MsgPackMarshaler) Marshal(topic string, msg *Event) ([]byte, error) {
// buffer holder
buf := bytes.Buffer{}
// create encoder
encoder := msgpack.NewEncoder(&buf)
// encode message
if err := encoder.Encode(msg); err != nil {
return nil, errors.Wrap(err, "cannot encode message")
}
// return encoded message
return buf.Bytes(), nil
}
func (MsgPackMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) {
// buffer holder
buf := new(bytes.Buffer)
// write message data to buffer
_, err := buf.Write(nsqMsg.Body)
if err != nil {
return nil, errors.Wrap(err, "cannot write nsq message data to buffer")
}
// create decoder
decoder := msgpack.NewDecoder(buf)
// decode event from buffer message
var decodedEvent Event
// decode message
if err := decoder.Decode(&decodedEvent); err != nil {
return nil, errors.Wrap(err, "cannot decode message")
}
// return decoded message
return &decodedEvent, nil
}

View File

@ -34,7 +34,6 @@ func (p *PublisherImpl) PublishEvent(event *Event) error {
logFields := logger.LogFields{
"event_id": event.EventID,
"event_sender": event.EventSender,
"event_data_length": len(event.EventData),
}
// log event
p.log.Info("new event published", logFields)
@ -65,6 +64,14 @@ func (p *PublisherImpl) Close() error {
return nil
}
func DefaultPublisherConfig() *PublisherImplConfig {
return &PublisherImplConfig{
NSQAddress: "",
NSQPort: 0,
Marshaler: MsgPackMarshaler{},
}
}
// NewPublisher creates a new publisher
func NewPublisher(config *PublisherImplConfig, log logger.LoggerAdapter) (Publisher, error) {
// create nsq producer address