53 lines
1.2 KiB
Go
53 lines
1.2 KiB
Go
package rasrush
|
|
|
|
import (
|
|
"bytes"
|
|
|
|
"github.com/nsqio/go-nsq"
|
|
"github.com/pkg/errors"
|
|
"github.com/vmihailenco/msgpack/v5"
|
|
)
|
|
|
|
type Marshaler interface {
|
|
Marshal(topic string, msg *Event) ([]byte, error)
|
|
}
|
|
|
|
type Unmarshaler interface {
|
|
Unmarshal(*nsq.Message) (*Event, error)
|
|
}
|
|
|
|
type MsgPackMarshaler struct{}
|
|
|
|
func (MsgPackMarshaler) Marshal(topic string, msg *Event) ([]byte, error) {
|
|
// buffer holder
|
|
buf := bytes.Buffer{}
|
|
// create encoder
|
|
encoder := msgpack.NewEncoder(&buf)
|
|
// encode message
|
|
if err := encoder.Encode(msg); err != nil {
|
|
return nil, errors.Wrap(err, "cannot encode message")
|
|
}
|
|
// return encoded message
|
|
return buf.Bytes(), nil
|
|
}
|
|
|
|
func (MsgPackMarshaler) Unmarshal(data []byte) (*Event, error) {
|
|
// buffer holder
|
|
buf := new(bytes.Buffer)
|
|
// write message data to buffer
|
|
_, err := buf.Write(data)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "cannot write data buffer")
|
|
}
|
|
// create decoder
|
|
decoder := msgpack.NewDecoder(buf)
|
|
// decode event from buffer message
|
|
var decodedEvent Event
|
|
// decode message
|
|
if err := decoder.Decode(&decodedEvent); err != nil {
|
|
return nil, errors.Wrap(err, "cannot decode message")
|
|
}
|
|
// return decoded message
|
|
return &decodedEvent, nil
|
|
}
|