rasrush/marshal.go

53 lines
1.2 KiB
Go
Raw Permalink Normal View History

2024-10-20 11:59:18 +02:00
package rasrush
import (
"bytes"
"github.com/nsqio/go-nsq"
"github.com/pkg/errors"
"github.com/vmihailenco/msgpack/v5"
)
type Marshaler interface {
Marshal(topic string, msg *Event) ([]byte, error)
}
type Unmarshaler interface {
Unmarshal(*nsq.Message) (*Event, error)
}
type MsgPackMarshaler struct{}
func (MsgPackMarshaler) Marshal(topic string, msg *Event) ([]byte, error) {
// buffer holder
buf := bytes.Buffer{}
// create encoder
encoder := msgpack.NewEncoder(&buf)
// encode message
if err := encoder.Encode(msg); err != nil {
return nil, errors.Wrap(err, "cannot encode message")
}
// return encoded message
return buf.Bytes(), nil
}
func (MsgPackMarshaler) Unmarshal(data []byte) (*Event, error) {
// buffer holder
buf := new(bytes.Buffer)
// write message data to buffer
_, err := buf.Write(data)
if err != nil {
return nil, errors.Wrap(err, "cannot write data buffer")
}
// create decoder
decoder := msgpack.NewDecoder(buf)
// decode event from buffer message
var decodedEvent Event
// decode message
if err := decoder.Decode(&decodedEvent); err != nil {
return nil, errors.Wrap(err, "cannot decode message")
}
// return decoded message
return &decodedEvent, nil
}