From 759e924837d09e34ce2cda5750179bc614af7626 Mon Sep 17 00:00:00 2001 From: albert Date: Thu, 17 Oct 2024 19:37:56 +0200 Subject: [PATCH] v2 --- event.go | 12 ++++++------ go.mod | 3 +++ go.sum | 6 ++++++ marshal.go | 36 ++++++++++++++++++++++++++++++++++++ publisher.go | 13 ++++++++++--- 5 files changed, 61 insertions(+), 9 deletions(-) diff --git a/event.go b/event.go index d0a8b68..a828616 100644 --- a/event.go +++ b/event.go @@ -8,12 +8,12 @@ import ( // Event struct type Event struct { - EventID string `json:"event_id"` - EventTopic string `json:"event_topic"` - EventSender string `json:"event_sender"` - EventMeta MetaData `json:"event_meta"` - EventData []byte `json:"event_data"` - EventTime string `json:"event_time"` + EventID string `json:"event_id"` + EventTopic string `json:"event_topic"` + EventSender string `json:"event_sender"` + EventMeta MetaData `json:"event_meta"` + EventData interface{} `json:"event_data"` + EventTime string `json:"event_time"` } // NewEvent creates a new event diff --git a/go.mod b/go.mod index c194598..40be6c9 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,11 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/snappy v0.0.1 // indirect + github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) diff --git a/go.sum b/go.sum index 581eac8..5585ab8 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= +github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -18,6 +20,10 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/marshal.go b/marshal.go index 0f2c6ef..f5e763f 100644 --- a/marshal.go +++ b/marshal.go @@ -7,6 +7,7 @@ import ( "github.com/nsqio/go-nsq" "github.com/pkg/errors" + "github.com/vmihailenco/msgpack/v5" ) type Marshaler interface { @@ -86,3 +87,38 @@ func (g GobMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) { // return decoded message return &decodedEvent, nil } + +type MsgPackMarshaler struct{} + +func (MsgPackMarshaler) Marshal(topic string, msg *Event) ([]byte, error) { + // buffer holder + buf := bytes.Buffer{} + // create encoder + encoder := msgpack.NewEncoder(&buf) + // encode message + if err := encoder.Encode(msg); err != nil { + return nil, errors.Wrap(err, "cannot encode message") + } + // return encoded message + return buf.Bytes(), nil +} + +func (MsgPackMarshaler) Unmarshal(nsqMsg *nsq.Message) (*Event, error) { + // buffer holder + buf := new(bytes.Buffer) + // write message data to buffer + _, err := buf.Write(nsqMsg.Body) + if err != nil { + return nil, errors.Wrap(err, "cannot write nsq message data to buffer") + } + // create decoder + decoder := msgpack.NewDecoder(buf) + // decode event from buffer message + var decodedEvent Event + // decode message + if err := decoder.Decode(&decodedEvent); err != nil { + return nil, errors.Wrap(err, "cannot decode message") + } + // return decoded message + return &decodedEvent, nil +} diff --git a/publisher.go b/publisher.go index 9447701..38d7d71 100644 --- a/publisher.go +++ b/publisher.go @@ -32,9 +32,8 @@ func (p *PublisherImpl) PublishEvent(event *Event) error { defer p.lock.Unlock() // log fields logFields := logger.LogFields{ - "event_id": event.EventID, - "event_sender": event.EventSender, - "event_data_length": len(event.EventData), + "event_id": event.EventID, + "event_sender": event.EventSender, } // log event p.log.Info("new event published", logFields) @@ -65,6 +64,14 @@ func (p *PublisherImpl) Close() error { return nil } +func DefaultPublisherConfig() *PublisherImplConfig { + return &PublisherImplConfig{ + NSQAddress: "", + NSQPort: 0, + Marshaler: MsgPackMarshaler{}, + } +} + // NewPublisher creates a new publisher func NewPublisher(config *PublisherImplConfig, log logger.LoggerAdapter) (Publisher, error) { // create nsq producer address