package messaging import ( "time" "git.espin.casa/albert/whippet/proto" flatbuffers "github.com/google/flatbuffers/go" "github.com/google/uuid" ) type MetaData map[string]string var builder *flatbuffers.Builder func Init() { builder = flatbuffers.NewBuilder(1024) } func BuildDataMessage(topic string, payload []byte, metadata MetaData) []byte { // builder.Reset() // auto generate message id id := uuid.New().String() idOffset := builder.CreateString(id) topicOffset := builder.CreateString(topic) payloadOffset := builder.CreateByteVector(payload) var kvOffsets []flatbuffers.UOffsetT for k, v := range metadata { key := builder.CreateString(k) val := builder.CreateString(v) proto.KeyValueStart(builder) proto.KeyValueAddKey(builder, key) proto.KeyValueAddValue(builder, val) kvOffsets = append(kvOffsets, proto.KeyValueEnd(builder)) } proto.DataMessageStartMetadataVector(builder, len(kvOffsets)) for i := len(kvOffsets) - 1; i >= 0; i-- { builder.PrependUOffsetT(kvOffsets[i]) } metadataVec := builder.EndVector(len(kvOffsets)) proto.DataMessageStart(builder) proto.DataMessageAddId(builder, idOffset) proto.DataMessageAddTopic(builder, topicOffset) proto.DataMessageAddTimestamp(builder, uint64(time.Now().UnixNano())) proto.DataMessageAddPayload(builder, payloadOffset) proto.DataMessageAddMetadata(builder, metadataVec) dataMsg := proto.DataMessageEnd(builder) proto.EnvelopeStart(builder) proto.EnvelopeAddType(builder, proto.MessageTypeDATA) proto.EnvelopeAddMessage(builder, dataMsg) envelope := proto.EnvelopeEnd(builder) builder.Finish(envelope) return builder.FinishedBytes() } func BuildAckMessage(ackedID string) []byte { builder.Reset() ackedIDOffset := builder.CreateString(ackedID) proto.AckMessageStart(builder) proto.AckMessageAddAckedId(builder, ackedIDOffset) proto.AckMessageAddTimestamp(builder, uint64(time.Now().UnixNano())) ackMsg := proto.AckMessageEnd(builder) proto.EnvelopeStart(builder) proto.EnvelopeAddType(builder, proto.MessageTypeACK) proto.EnvelopeAddMessage(builder, ackMsg) envelope := proto.EnvelopeEnd(builder) builder.Finish(envelope) return builder.FinishedBytes() } func BuildHeartBeatMessage(clientID string) []byte { builder.Reset() clientIDOffset := builder.CreateString(clientID) proto.HeartBeatMessageStart(builder) proto.HeartBeatMessageAddClientId(builder, clientIDOffset) proto.HeartBeatMessageAddTimestamp(builder, uint64(time.Now().UnixNano())) heartMsg := proto.HeartBeatMessageEnd(builder) proto.EnvelopeStart(builder) proto.EnvelopeAddType(builder, proto.MessageTypeHEARTBEAT) proto.EnvelopeAddMessage(builder, heartMsg) envelope := proto.EnvelopeEnd(builder) builder.Finish(envelope) return builder.FinishedBytes() }