commit 4e10b796085e9e389b3378de89437fc7ddfe45e2 Author: albert Date: Sun Oct 20 11:59:18 2024 +0200 first commit diff --git a/event.go b/event.go new file mode 100644 index 0000000..54d5176 --- /dev/null +++ b/event.go @@ -0,0 +1,34 @@ +package rasrush + +import ( + "time" + + "github.com/google/uuid" +) + +// Event struct +type Event struct { + EventID string `json:"event_id"` + EventTopic string `json:"event_topic"` + EventSender string `json:"event_sender"` + EventMeta MetaData `json:"event_meta"` + EventData interface{} `json:"event_data"` + EventTime string `json:"event_time"` +} + +// NewEvent creates a new event +func NewEvent(eventSender, eventTopic string, eventMeta MetaData, eventData []byte) *Event { + // check if eventMeta is nil + if eventMeta == nil { + eventMeta = NewMetaData() + } + // return event + return &Event{ + EventID: uuid.NewString(), + EventTopic: eventTopic, + EventSender: eventSender, + EventMeta: eventMeta, + EventData: eventData, + EventTime: time.Now().UTC().Format(time.RFC3339), + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2d5e691 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module git.espin.casa/albert/rasrush + +go 1.23.1 + +require ( + github.com/google/uuid v1.6.0 + github.com/nsqio/go-nsq v1.1.0 + github.com/pkg/errors v0.9.1 + github.com/vmihailenco/msgpack/v5 v5.4.1 +) + +require ( + github.com/golang/snappy v0.0.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..067e3f4 --- /dev/null +++ b/go.sum @@ -0,0 +1,20 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/marshal.go b/marshal.go new file mode 100644 index 0000000..2a8965e --- /dev/null +++ b/marshal.go @@ -0,0 +1,52 @@ +package rasrush + +import ( + "bytes" + + "github.com/nsqio/go-nsq" + "github.com/pkg/errors" + "github.com/vmihailenco/msgpack/v5" +) + +type Marshaler interface { + Marshal(topic string, msg *Event) ([]byte, error) +} + +type Unmarshaler interface { + Unmarshal(*nsq.Message) (*Event, error) +} + +type MsgPackMarshaler struct{} + +func (MsgPackMarshaler) Marshal(topic string, msg *Event) ([]byte, error) { + // buffer holder + buf := bytes.Buffer{} + // create encoder + encoder := msgpack.NewEncoder(&buf) + // encode message + if err := encoder.Encode(msg); err != nil { + return nil, errors.Wrap(err, "cannot encode message") + } + // return encoded message + return buf.Bytes(), nil +} + +func (MsgPackMarshaler) Unmarshal(data []byte) (*Event, error) { + // buffer holder + buf := new(bytes.Buffer) + // write message data to buffer + _, err := buf.Write(data) + if err != nil { + return nil, errors.Wrap(err, "cannot write data buffer") + } + // create decoder + decoder := msgpack.NewDecoder(buf) + // decode event from buffer message + var decodedEvent Event + // decode message + if err := decoder.Decode(&decodedEvent); err != nil { + return nil, errors.Wrap(err, "cannot decode message") + } + // return decoded message + return &decodedEvent, nil +} diff --git a/metadata.go b/metadata.go new file mode 100644 index 0000000..89caf1f --- /dev/null +++ b/metadata.go @@ -0,0 +1,25 @@ +package rasrush + +import "errors" + +type MetaData map[string]interface{} + +func (m MetaData) Get(key string) (interface{}, error) { + value, ok := m[key] + if !ok { + return nil, errors.New("key not found") + } + return value, nil +} + +func (m MetaData) Set(key string, value interface{}) { + m[key] = value +} + +func (m MetaData) Delete(key string) { + delete(m, key) +} + +func NewMetaData() MetaData { + return make(map[string]interface{}) +}