commit 08ae1382ebea89632563238429e433b3e9958d00 Author: aespinro Date: Thu May 15 15:48:24 2025 +0200 first commit diff --git a/broker/server.go b/broker/server.go new file mode 100644 index 0000000..219a8cc --- /dev/null +++ b/broker/server.go @@ -0,0 +1,50 @@ +package broker + +import ( + "sync" + + "git.espin.casa/albert/whippet/proto" + "git.espin.casa/albert/whippet/store" +) + +type ConsumerFunc func(topic string, msg *proto.DataMessage) + +type Broker struct { + sync.RWMutex + subscribers map[string][]ConsumerFunc + storage *store.Storage +} + +func NewBroker(storage *store.Storage) *Broker { + return &Broker{ + subscribers: make(map[string][]ConsumerFunc), + storage: storage, + } +} + +func (b *Broker) Subscribe(topic string, fn ConsumerFunc, fetchPast bool) { + b.Lock() + b.subscribers[topic] = append(b.subscribers[topic], fn) + b.Unlock() + + if fetchPast { + msgs, err := b.storage.LoadAll(topic) + if err != nil { + b.Unlock() + return + } + for _, msg := range msgs { + fn(topic, msg) + } + } +} + +func (b *Broker) Publish(topic string, msg *proto.DataMessage) { + b.storage.Save(topic, msg) + + b.RLock() + defer b.RUnlock() + for _, fn := range b.subscribers[topic] { + go fn(topic, msg) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..825c2f7 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module git.espin.casa/albert/whippet + +go 1.23.1 + +require ( + github.com/google/flatbuffers v25.2.10+incompatible + github.com/google/uuid v1.6.0 + github.com/mattn/go-sqlite3 v1.14.28 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c4456d0 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q= +github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A= +github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/message.fbs b/message.fbs new file mode 100644 index 0000000..b4f1645 --- /dev/null +++ b/message.fbs @@ -0,0 +1,45 @@ +// file: message.fbs +namespace proto; + +enum MessageType : byte { + NONE = 0, + DATA = 1, + ACK = 2, + HEARTBEAT = 3 +} + +table DataMessage { + id: string; + topic: string; + timestamp: ulong; + payload: [ubyte]; + metadata: [KeyValue]; +} + +table AckMessage { + acked_id: string; + timestamp: ulong; +} + +table HeartBeatMessage { + client_id: string; + timestamp: ulong; +} + +table KeyValue { + key: string; + value: string; +} + +union Message { + DataMessage, + AckMessage, + HeartBeatMessage, +} + +table Envelope { + type: MessageType; + message: Message; +} + +root_type Envelope; \ No newline at end of file diff --git a/messaging/messaging.go b/messaging/messaging.go new file mode 100644 index 0000000..21cedf9 --- /dev/null +++ b/messaging/messaging.go @@ -0,0 +1,112 @@ +package messaging + +import ( + "time" + + "git.espin.casa/albert/whippet/proto" + flatbuffers "github.com/google/flatbuffers/go" + "github.com/google/uuid" +) + +type MetaData map[string]string + +var builder *flatbuffers.Builder + +func Init() { + builder = flatbuffers.NewBuilder(1024) +} + +func BuildDataMessage(topic string, payload []byte, metadata MetaData) []byte { + // + builder.Reset() + // auto generate message id + id := uuid.New().String() + + idOffset := builder.CreateString(id) + topicOffset := builder.CreateString(topic) + payloadOffset := builder.CreateByteVector(payload) + + var kvOffsets []flatbuffers.UOffsetT + + for k, v := range metadata { + key := builder.CreateString(k) + val := builder.CreateString(v) + + proto.KeyValueStart(builder) + proto.KeyValueAddKey(builder, key) + proto.KeyValueAddValue(builder, val) + + kvOffsets = append(kvOffsets, proto.KeyValueEnd(builder)) + } + + proto.DataMessageStartMetadataVector(builder, len(kvOffsets)) + + for i := len(kvOffsets) - 1; i >= 0; i-- { + builder.PrependUOffsetT(kvOffsets[i]) + } + + metadataVec := builder.EndVector(len(kvOffsets)) + + proto.DataMessageStart(builder) + proto.DataMessageAddId(builder, idOffset) + proto.DataMessageAddTopic(builder, topicOffset) + proto.DataMessageAddTimestamp(builder, uint64(time.Now().UnixNano())) + proto.DataMessageAddPayload(builder, payloadOffset) + proto.DataMessageAddMetadata(builder, metadataVec) + + dataMsg := proto.DataMessageEnd(builder) + + proto.EnvelopeStart(builder) + proto.EnvelopeAddType(builder, proto.MessageTypeDATA) + proto.EnvelopeAddMessage(builder, dataMsg) + envelope := proto.EnvelopeEnd(builder) + + builder.Finish(envelope) + + return builder.FinishedBytes() + +} + +func BuildAckMessage(ackedID string) []byte { + builder.Reset() + + ackedIDOffset := builder.CreateString(ackedID) + + proto.AckMessageStart(builder) + proto.AckMessageAddAckedId(builder, ackedIDOffset) + proto.AckMessageAddTimestamp(builder, uint64(time.Now().UnixNano())) + + ackMsg := proto.AckMessageEnd(builder) + + proto.EnvelopeStart(builder) + proto.EnvelopeAddType(builder, proto.MessageTypeACK) + proto.EnvelopeAddMessage(builder, ackMsg) + envelope := proto.EnvelopeEnd(builder) + + builder.Finish(envelope) + + return builder.FinishedBytes() + +} + +func BuildHeartBeatMessage(clientID string) []byte { + builder.Reset() + + clientIDOffset := builder.CreateString(clientID) + + proto.HeartBeatMessageStart(builder) + proto.HeartBeatMessageAddClientId(builder, clientIDOffset) + proto.HeartBeatMessageAddTimestamp(builder, uint64(time.Now().UnixNano())) + heartMsg := proto.HeartBeatMessageEnd(builder) + + proto.EnvelopeStart(builder) + proto.EnvelopeAddType(builder, proto.MessageTypeHEARTBEAT) + proto.EnvelopeAddMessage(builder, heartMsg) + + envelope := proto.EnvelopeEnd(builder) + + builder.Finish(envelope) + + return builder.FinishedBytes() + +} diff --git a/proto/AckMessage.go b/proto/AckMessage.go new file mode 100644 index 0000000..ad3a025 --- /dev/null +++ b/proto/AckMessage.go @@ -0,0 +1,75 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package proto + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type AckMessage struct { + _tab flatbuffers.Table +} + +func GetRootAsAckMessage(buf []byte, offset flatbuffers.UOffsetT) *AckMessage { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &AckMessage{} + x.Init(buf, n+offset) + return x +} + +func FinishAckMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsAckMessage(buf []byte, offset flatbuffers.UOffsetT) *AckMessage { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &AckMessage{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedAckMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *AckMessage) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *AckMessage) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *AckMessage) AckedId() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *AckMessage) Timestamp() uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.GetUint64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *AckMessage) MutateTimestamp(n uint64) bool { + return rcv._tab.MutateUint64Slot(6, n) +} + +func AckMessageStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func AckMessageAddAckedId(builder *flatbuffers.Builder, ackedId flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(ackedId), 0) +} +func AckMessageAddTimestamp(builder *flatbuffers.Builder, timestamp uint64) { + builder.PrependUint64Slot(1, timestamp, 0) +} +func AckMessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/proto/DataMessage.go b/proto/DataMessage.go new file mode 100644 index 0000000..81053e5 --- /dev/null +++ b/proto/DataMessage.go @@ -0,0 +1,152 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package proto + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type DataMessage struct { + _tab flatbuffers.Table +} + +func GetRootAsDataMessage(buf []byte, offset flatbuffers.UOffsetT) *DataMessage { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &DataMessage{} + x.Init(buf, n+offset) + return x +} + +func FinishDataMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsDataMessage(buf []byte, offset flatbuffers.UOffsetT) *DataMessage { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &DataMessage{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedDataMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *DataMessage) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *DataMessage) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *DataMessage) Id() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *DataMessage) Topic() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *DataMessage) Timestamp() uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.GetUint64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *DataMessage) MutateTimestamp(n uint64) bool { + return rcv._tab.MutateUint64Slot(8, n) +} + +func (rcv *DataMessage) Payload(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *DataMessage) PayloadLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *DataMessage) PayloadBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *DataMessage) MutatePayload(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func (rcv *DataMessage) Metadata(obj *KeyValue, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *DataMessage) MetadataLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func DataMessageStart(builder *flatbuffers.Builder) { + builder.StartObject(5) +} +func DataMessageAddId(builder *flatbuffers.Builder, id flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(id), 0) +} +func DataMessageAddTopic(builder *flatbuffers.Builder, topic flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(topic), 0) +} +func DataMessageAddTimestamp(builder *flatbuffers.Builder, timestamp uint64) { + builder.PrependUint64Slot(2, timestamp, 0) +} +func DataMessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0) +} +func DataMessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func DataMessageAddMetadata(builder *flatbuffers.Builder, metadata flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(metadata), 0) +} +func DataMessageStartMetadataVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func DataMessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/proto/Envelope.go b/proto/Envelope.go new file mode 100644 index 0000000..6fc3dfe --- /dev/null +++ b/proto/Envelope.go @@ -0,0 +1,91 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package proto + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Envelope struct { + _tab flatbuffers.Table +} + +func GetRootAsEnvelope(buf []byte, offset flatbuffers.UOffsetT) *Envelope { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Envelope{} + x.Init(buf, n+offset) + return x +} + +func FinishEnvelopeBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsEnvelope(buf []byte, offset flatbuffers.UOffsetT) *Envelope { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Envelope{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedEnvelopeBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *Envelope) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Envelope) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Envelope) Type() MessageType { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return MessageType(rcv._tab.GetInt8(o + rcv._tab.Pos)) + } + return 0 +} + +func (rcv *Envelope) MutateType(n MessageType) bool { + return rcv._tab.MutateInt8Slot(4, int8(n)) +} + +func (rcv *Envelope) MessageType() Message { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return Message(rcv._tab.GetByte(o + rcv._tab.Pos)) + } + return 0 +} + +func (rcv *Envelope) MutateMessageType(n Message) bool { + return rcv._tab.MutateByteSlot(6, byte(n)) +} + +func (rcv *Envelope) Message(obj *flatbuffers.Table) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + rcv._tab.Union(obj, o) + return true + } + return false +} + +func EnvelopeStart(builder *flatbuffers.Builder) { + builder.StartObject(3) +} +func EnvelopeAddType(builder *flatbuffers.Builder, type_ MessageType) { + builder.PrependInt8Slot(0, int8(type_), 0) +} +func EnvelopeAddMessageType(builder *flatbuffers.Builder, messageType Message) { + builder.PrependByteSlot(1, byte(messageType), 0) +} +func EnvelopeAddMessage(builder *flatbuffers.Builder, message flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(message), 0) +} +func EnvelopeEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/proto/HeartBeatMessage.go b/proto/HeartBeatMessage.go new file mode 100644 index 0000000..c18da5c --- /dev/null +++ b/proto/HeartBeatMessage.go @@ -0,0 +1,75 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package proto + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type HeartBeatMessage struct { + _tab flatbuffers.Table +} + +func GetRootAsHeartBeatMessage(buf []byte, offset flatbuffers.UOffsetT) *HeartBeatMessage { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &HeartBeatMessage{} + x.Init(buf, n+offset) + return x +} + +func FinishHeartBeatMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsHeartBeatMessage(buf []byte, offset flatbuffers.UOffsetT) *HeartBeatMessage { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &HeartBeatMessage{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedHeartBeatMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *HeartBeatMessage) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *HeartBeatMessage) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *HeartBeatMessage) ClientId() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *HeartBeatMessage) Timestamp() uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.GetUint64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *HeartBeatMessage) MutateTimestamp(n uint64) bool { + return rcv._tab.MutateUint64Slot(6, n) +} + +func HeartBeatMessageStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func HeartBeatMessageAddClientId(builder *flatbuffers.Builder, clientId flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(clientId), 0) +} +func HeartBeatMessageAddTimestamp(builder *flatbuffers.Builder, timestamp uint64) { + builder.PrependUint64Slot(1, timestamp, 0) +} +func HeartBeatMessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/proto/KeyValue.go b/proto/KeyValue.go new file mode 100644 index 0000000..940cf6d --- /dev/null +++ b/proto/KeyValue.go @@ -0,0 +1,71 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package proto + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type KeyValue struct { + _tab flatbuffers.Table +} + +func GetRootAsKeyValue(buf []byte, offset flatbuffers.UOffsetT) *KeyValue { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &KeyValue{} + x.Init(buf, n+offset) + return x +} + +func FinishKeyValueBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.Finish(offset) +} + +func GetSizePrefixedRootAsKeyValue(buf []byte, offset flatbuffers.UOffsetT) *KeyValue { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &KeyValue{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func FinishSizePrefixedKeyValueBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) { + builder.FinishSizePrefixed(offset) +} + +func (rcv *KeyValue) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *KeyValue) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *KeyValue) Key() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *KeyValue) Value() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func KeyValueStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func KeyValueAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(key), 0) +} +func KeyValueAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0) +} +func KeyValueEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/proto/Message.go b/proto/Message.go new file mode 100644 index 0000000..26e5b65 --- /dev/null +++ b/proto/Message.go @@ -0,0 +1,35 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package proto + +import "strconv" + +type Message byte + +const ( + MessageNONE Message = 0 + MessageDataMessage Message = 1 + MessageAckMessage Message = 2 + MessageHeartBeatMessage Message = 3 +) + +var EnumNamesMessage = map[Message]string{ + MessageNONE: "NONE", + MessageDataMessage: "DataMessage", + MessageAckMessage: "AckMessage", + MessageHeartBeatMessage: "HeartBeatMessage", +} + +var EnumValuesMessage = map[string]Message{ + "NONE": MessageNONE, + "DataMessage": MessageDataMessage, + "AckMessage": MessageAckMessage, + "HeartBeatMessage": MessageHeartBeatMessage, +} + +func (v Message) String() string { + if s, ok := EnumNamesMessage[v]; ok { + return s + } + return "Message(" + strconv.FormatInt(int64(v), 10) + ")" +} diff --git a/proto/MessageType.go b/proto/MessageType.go new file mode 100644 index 0000000..aa72824 --- /dev/null +++ b/proto/MessageType.go @@ -0,0 +1,35 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package proto + +import "strconv" + +type MessageType int8 + +const ( + MessageTypeNONE MessageType = 0 + MessageTypeDATA MessageType = 1 + MessageTypeACK MessageType = 2 + MessageTypeHEARTBEAT MessageType = 3 +) + +var EnumNamesMessageType = map[MessageType]string{ + MessageTypeNONE: "NONE", + MessageTypeDATA: "DATA", + MessageTypeACK: "ACK", + MessageTypeHEARTBEAT: "HEARTBEAT", +} + +var EnumValuesMessageType = map[string]MessageType{ + "NONE": MessageTypeNONE, + "DATA": MessageTypeDATA, + "ACK": MessageTypeACK, + "HEARTBEAT": MessageTypeHEARTBEAT, +} + +func (v MessageType) String() string { + if s, ok := EnumNamesMessageType[v]; ok { + return s + } + return "MessageType(" + strconv.FormatInt(int64(v), 10) + ")" +} diff --git a/store/storage.go b/store/storage.go new file mode 100644 index 0000000..66cd95d --- /dev/null +++ b/store/storage.go @@ -0,0 +1,120 @@ +package store + +import ( + "database/sql" + + "git.espin.casa/albert/whippet/proto" + _ "github.com/mattn/go-sqlite3" +) + +type Storage struct { + db *sql.DB +} + +func NewStorage(path string) *Storage { + db, err := sql.Open("sqlite3", path) + if err != nil { + return nil + } + s := &Storage{db: db} + if err := s.initSchema(); err != nil { + return nil + } + return s +} + +func (s *Storage) initSchema() error { + schema := ` + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + topic TEXT, + msg_id TEXT, + payload BLOB, + timestamp INTEGER + );` + _, err := s.db.Exec(schema) + return err +} + +// Save guarda un mensaje en la base de datos +func (s *Storage) Save(topic string, msg *proto.DataMessage) error { + _, err := s.db.Exec(` + INSERT INTO messages (topic, msg_id, payload, timestamp) + VALUES (?, ?, ?, ?)`, + topic, string(msg.Id()), msg.PayloadBytes(), msg.Timestamp(), + ) + return err +} + +// LoadSince carga mensajes desde un timestamp específico +func (s *Storage) LoadSince(topic string, sinceTimestamp uint64, max int) ([]*proto.DataMessage, error) { + rows, err := s.db.Query(` + SELECT msg_id, payload, timestamp + FROM messages + WHERE topic = ? AND timestamp > ? + ORDER BY timestamp ASC + LIMIT ?`, + topic, sinceTimestamp, max, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []*proto.DataMessage + for rows.Next() { + var id string + var payload []byte + var timestamp uint64 + + err := rows.Scan(&id, &payload, ×tamp) + if err != nil { + return nil, err + } + + // Reensamblar el FlatBuffer + msg := proto.GetRootAsDataMessage(payload, 0) + // Cuidado: esto apunta a datos en `payload` directamente + // Alternativa: clonar si necesitas preservar después + result = append(result, msg) + } + return result, nil +} + +// LoadAll carga todos los mensajes de un tema específico +func (s *Storage) LoadAll(topic string) ([]*proto.DataMessage, error) { + rows, err := s.db.Query(` + SELECT msg_id, payload, timestamp + FROM messages + WHERE topic = ? + ORDER BY timestamp ASC + `, topic) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []*proto.DataMessage + for rows.Next() { + var id string + var payload []byte + var timestamp uint64 + + err := rows.Scan(&id, &payload, ×tamp) + if err != nil { + return nil, err + } + + // Reensamblar el FlatBuffer + msg := proto.GetRootAsDataMessage(payload, 0) + // Cuidado: esto apunta a datos en `payload` directamente + // Alternativa: clonar si necesitas preservar después + result = append(result, msg) + } + return result, nil +} + +// LoadAllTopics carga todos los temas disponibles +func (s *Storage) Close() error { + return s.db.Close() +}