first commit

This commit is contained in:
aespinro 2025-05-15 15:48:24 +02:00
commit 08ae1382eb
13 changed files with 876 additions and 0 deletions

50
broker/server.go Normal file
View File

@ -0,0 +1,50 @@
package broker
import (
"sync"
"git.espin.casa/albert/whippet/proto"
"git.espin.casa/albert/whippet/store"
)
type ConsumerFunc func(topic string, msg *proto.DataMessage)
type Broker struct {
sync.RWMutex
subscribers map[string][]ConsumerFunc
storage *store.Storage
}
func NewBroker(storage *store.Storage) *Broker {
return &Broker{
subscribers: make(map[string][]ConsumerFunc),
storage: storage,
}
}
func (b *Broker) Subscribe(topic string, fn ConsumerFunc, fetchPast bool) {
b.Lock()
b.subscribers[topic] = append(b.subscribers[topic], fn)
b.Unlock()
if fetchPast {
msgs, err := b.storage.LoadAll(topic)
if err != nil {
b.Unlock()
return
}
for _, msg := range msgs {
fn(topic, msg)
}
}
}
func (b *Broker) Publish(topic string, msg *proto.DataMessage) {
b.storage.Save(topic, msg)
b.RLock()
defer b.RUnlock()
for _, fn := range b.subscribers[topic] {
go fn(topic, msg)
}
}

9
go.mod Normal file
View File

@ -0,0 +1,9 @@
module git.espin.casa/albert/whippet
go 1.23.1
require (
github.com/google/flatbuffers v25.2.10+incompatible
github.com/google/uuid v1.6.0
github.com/mattn/go-sqlite3 v1.14.28
)

6
go.sum Normal file
View File

@ -0,0 +1,6 @@
github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q=
github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A=
github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=

45
message.fbs Normal file
View File

@ -0,0 +1,45 @@
// file: message.fbs
namespace proto;
enum MessageType : byte {
NONE = 0,
DATA = 1,
ACK = 2,
HEARTBEAT = 3
}
table DataMessage {
id: string;
topic: string;
timestamp: ulong;
payload: [ubyte];
metadata: [KeyValue];
}
table AckMessage {
acked_id: string;
timestamp: ulong;
}
table HeartBeatMessage {
client_id: string;
timestamp: ulong;
}
table KeyValue {
key: string;
value: string;
}
union Message {
DataMessage,
AckMessage,
HeartBeatMessage,
}
table Envelope {
type: MessageType;
message: Message;
}
root_type Envelope;

112
messaging/messaging.go Normal file
View File

@ -0,0 +1,112 @@
package messaging
import (
"time"
"git.espin.casa/albert/whippet/proto"
flatbuffers "github.com/google/flatbuffers/go"
"github.com/google/uuid"
)
type MetaData map[string]string
var builder *flatbuffers.Builder
func Init() {
builder = flatbuffers.NewBuilder(1024)
}
func BuildDataMessage(topic string, payload []byte, metadata MetaData) []byte {
//
builder.Reset()
// auto generate message id
id := uuid.New().String()
idOffset := builder.CreateString(id)
topicOffset := builder.CreateString(topic)
payloadOffset := builder.CreateByteVector(payload)
var kvOffsets []flatbuffers.UOffsetT
for k, v := range metadata {
key := builder.CreateString(k)
val := builder.CreateString(v)
proto.KeyValueStart(builder)
proto.KeyValueAddKey(builder, key)
proto.KeyValueAddValue(builder, val)
kvOffsets = append(kvOffsets, proto.KeyValueEnd(builder))
}
proto.DataMessageStartMetadataVector(builder, len(kvOffsets))
for i := len(kvOffsets) - 1; i >= 0; i-- {
builder.PrependUOffsetT(kvOffsets[i])
}
metadataVec := builder.EndVector(len(kvOffsets))
proto.DataMessageStart(builder)
proto.DataMessageAddId(builder, idOffset)
proto.DataMessageAddTopic(builder, topicOffset)
proto.DataMessageAddTimestamp(builder, uint64(time.Now().UnixNano()))
proto.DataMessageAddPayload(builder, payloadOffset)
proto.DataMessageAddMetadata(builder, metadataVec)
dataMsg := proto.DataMessageEnd(builder)
proto.EnvelopeStart(builder)
proto.EnvelopeAddType(builder, proto.MessageTypeDATA)
proto.EnvelopeAddMessage(builder, dataMsg)
envelope := proto.EnvelopeEnd(builder)
builder.Finish(envelope)
return builder.FinishedBytes()
}
func BuildAckMessage(ackedID string) []byte {
builder.Reset()
ackedIDOffset := builder.CreateString(ackedID)
proto.AckMessageStart(builder)
proto.AckMessageAddAckedId(builder, ackedIDOffset)
proto.AckMessageAddTimestamp(builder, uint64(time.Now().UnixNano()))
ackMsg := proto.AckMessageEnd(builder)
proto.EnvelopeStart(builder)
proto.EnvelopeAddType(builder, proto.MessageTypeACK)
proto.EnvelopeAddMessage(builder, ackMsg)
envelope := proto.EnvelopeEnd(builder)
builder.Finish(envelope)
return builder.FinishedBytes()
}
func BuildHeartBeatMessage(clientID string) []byte {
builder.Reset()
clientIDOffset := builder.CreateString(clientID)
proto.HeartBeatMessageStart(builder)
proto.HeartBeatMessageAddClientId(builder, clientIDOffset)
proto.HeartBeatMessageAddTimestamp(builder, uint64(time.Now().UnixNano()))
heartMsg := proto.HeartBeatMessageEnd(builder)
proto.EnvelopeStart(builder)
proto.EnvelopeAddType(builder, proto.MessageTypeHEARTBEAT)
proto.EnvelopeAddMessage(builder, heartMsg)
envelope := proto.EnvelopeEnd(builder)
builder.Finish(envelope)
return builder.FinishedBytes()
}

75
proto/AckMessage.go Normal file
View File

@ -0,0 +1,75 @@
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package proto
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type AckMessage struct {
_tab flatbuffers.Table
}
func GetRootAsAckMessage(buf []byte, offset flatbuffers.UOffsetT) *AckMessage {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &AckMessage{}
x.Init(buf, n+offset)
return x
}
func FinishAckMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.Finish(offset)
}
func GetSizePrefixedRootAsAckMessage(buf []byte, offset flatbuffers.UOffsetT) *AckMessage {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &AckMessage{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func FinishSizePrefixedAckMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.FinishSizePrefixed(offset)
}
func (rcv *AckMessage) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *AckMessage) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *AckMessage) AckedId() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *AckMessage) Timestamp() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *AckMessage) MutateTimestamp(n uint64) bool {
return rcv._tab.MutateUint64Slot(6, n)
}
func AckMessageStart(builder *flatbuffers.Builder) {
builder.StartObject(2)
}
func AckMessageAddAckedId(builder *flatbuffers.Builder, ackedId flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(ackedId), 0)
}
func AckMessageAddTimestamp(builder *flatbuffers.Builder, timestamp uint64) {
builder.PrependUint64Slot(1, timestamp, 0)
}
func AckMessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

152
proto/DataMessage.go Normal file
View File

@ -0,0 +1,152 @@
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package proto
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type DataMessage struct {
_tab flatbuffers.Table
}
func GetRootAsDataMessage(buf []byte, offset flatbuffers.UOffsetT) *DataMessage {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &DataMessage{}
x.Init(buf, n+offset)
return x
}
func FinishDataMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.Finish(offset)
}
func GetSizePrefixedRootAsDataMessage(buf []byte, offset flatbuffers.UOffsetT) *DataMessage {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &DataMessage{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func FinishSizePrefixedDataMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.FinishSizePrefixed(offset)
}
func (rcv *DataMessage) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *DataMessage) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *DataMessage) Id() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *DataMessage) Topic() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *DataMessage) Timestamp() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *DataMessage) MutateTimestamp(n uint64) bool {
return rcv._tab.MutateUint64Slot(8, n)
}
func (rcv *DataMessage) Payload(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *DataMessage) PayloadLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *DataMessage) PayloadBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *DataMessage) MutatePayload(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *DataMessage) Metadata(obj *KeyValue, j int) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
x := rcv._tab.Vector(o)
x += flatbuffers.UOffsetT(j) * 4
x = rcv._tab.Indirect(x)
obj.Init(rcv._tab.Bytes, x)
return true
}
return false
}
func (rcv *DataMessage) MetadataLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func DataMessageStart(builder *flatbuffers.Builder) {
builder.StartObject(5)
}
func DataMessageAddId(builder *flatbuffers.Builder, id flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(id), 0)
}
func DataMessageAddTopic(builder *flatbuffers.Builder, topic flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(topic), 0)
}
func DataMessageAddTimestamp(builder *flatbuffers.Builder, timestamp uint64) {
builder.PrependUint64Slot(2, timestamp, 0)
}
func DataMessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0)
}
func DataMessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func DataMessageAddMetadata(builder *flatbuffers.Builder, metadata flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(metadata), 0)
}
func DataMessageStartMetadataVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(4, numElems, 4)
}
func DataMessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

91
proto/Envelope.go Normal file
View File

@ -0,0 +1,91 @@
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package proto
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type Envelope struct {
_tab flatbuffers.Table
}
func GetRootAsEnvelope(buf []byte, offset flatbuffers.UOffsetT) *Envelope {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &Envelope{}
x.Init(buf, n+offset)
return x
}
func FinishEnvelopeBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.Finish(offset)
}
func GetSizePrefixedRootAsEnvelope(buf []byte, offset flatbuffers.UOffsetT) *Envelope {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &Envelope{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func FinishSizePrefixedEnvelopeBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.FinishSizePrefixed(offset)
}
func (rcv *Envelope) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *Envelope) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *Envelope) Type() MessageType {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return MessageType(rcv._tab.GetInt8(o + rcv._tab.Pos))
}
return 0
}
func (rcv *Envelope) MutateType(n MessageType) bool {
return rcv._tab.MutateInt8Slot(4, int8(n))
}
func (rcv *Envelope) MessageType() Message {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return Message(rcv._tab.GetByte(o + rcv._tab.Pos))
}
return 0
}
func (rcv *Envelope) MutateMessageType(n Message) bool {
return rcv._tab.MutateByteSlot(6, byte(n))
}
func (rcv *Envelope) Message(obj *flatbuffers.Table) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
rcv._tab.Union(obj, o)
return true
}
return false
}
func EnvelopeStart(builder *flatbuffers.Builder) {
builder.StartObject(3)
}
func EnvelopeAddType(builder *flatbuffers.Builder, type_ MessageType) {
builder.PrependInt8Slot(0, int8(type_), 0)
}
func EnvelopeAddMessageType(builder *flatbuffers.Builder, messageType Message) {
builder.PrependByteSlot(1, byte(messageType), 0)
}
func EnvelopeAddMessage(builder *flatbuffers.Builder, message flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(message), 0)
}
func EnvelopeEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

75
proto/HeartBeatMessage.go Normal file
View File

@ -0,0 +1,75 @@
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package proto
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type HeartBeatMessage struct {
_tab flatbuffers.Table
}
func GetRootAsHeartBeatMessage(buf []byte, offset flatbuffers.UOffsetT) *HeartBeatMessage {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &HeartBeatMessage{}
x.Init(buf, n+offset)
return x
}
func FinishHeartBeatMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.Finish(offset)
}
func GetSizePrefixedRootAsHeartBeatMessage(buf []byte, offset flatbuffers.UOffsetT) *HeartBeatMessage {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &HeartBeatMessage{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func FinishSizePrefixedHeartBeatMessageBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.FinishSizePrefixed(offset)
}
func (rcv *HeartBeatMessage) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *HeartBeatMessage) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *HeartBeatMessage) ClientId() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *HeartBeatMessage) Timestamp() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *HeartBeatMessage) MutateTimestamp(n uint64) bool {
return rcv._tab.MutateUint64Slot(6, n)
}
func HeartBeatMessageStart(builder *flatbuffers.Builder) {
builder.StartObject(2)
}
func HeartBeatMessageAddClientId(builder *flatbuffers.Builder, clientId flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(clientId), 0)
}
func HeartBeatMessageAddTimestamp(builder *flatbuffers.Builder, timestamp uint64) {
builder.PrependUint64Slot(1, timestamp, 0)
}
func HeartBeatMessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

71
proto/KeyValue.go Normal file
View File

@ -0,0 +1,71 @@
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package proto
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type KeyValue struct {
_tab flatbuffers.Table
}
func GetRootAsKeyValue(buf []byte, offset flatbuffers.UOffsetT) *KeyValue {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &KeyValue{}
x.Init(buf, n+offset)
return x
}
func FinishKeyValueBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.Finish(offset)
}
func GetSizePrefixedRootAsKeyValue(buf []byte, offset flatbuffers.UOffsetT) *KeyValue {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &KeyValue{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func FinishSizePrefixedKeyValueBuffer(builder *flatbuffers.Builder, offset flatbuffers.UOffsetT) {
builder.FinishSizePrefixed(offset)
}
func (rcv *KeyValue) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *KeyValue) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *KeyValue) Key() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *KeyValue) Value() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func KeyValueStart(builder *flatbuffers.Builder) {
builder.StartObject(2)
}
func KeyValueAddKey(builder *flatbuffers.Builder, key flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(key), 0)
}
func KeyValueAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0)
}
func KeyValueEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

35
proto/Message.go Normal file
View File

@ -0,0 +1,35 @@
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package proto
import "strconv"
type Message byte
const (
MessageNONE Message = 0
MessageDataMessage Message = 1
MessageAckMessage Message = 2
MessageHeartBeatMessage Message = 3
)
var EnumNamesMessage = map[Message]string{
MessageNONE: "NONE",
MessageDataMessage: "DataMessage",
MessageAckMessage: "AckMessage",
MessageHeartBeatMessage: "HeartBeatMessage",
}
var EnumValuesMessage = map[string]Message{
"NONE": MessageNONE,
"DataMessage": MessageDataMessage,
"AckMessage": MessageAckMessage,
"HeartBeatMessage": MessageHeartBeatMessage,
}
func (v Message) String() string {
if s, ok := EnumNamesMessage[v]; ok {
return s
}
return "Message(" + strconv.FormatInt(int64(v), 10) + ")"
}

35
proto/MessageType.go Normal file
View File

@ -0,0 +1,35 @@
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package proto
import "strconv"
type MessageType int8
const (
MessageTypeNONE MessageType = 0
MessageTypeDATA MessageType = 1
MessageTypeACK MessageType = 2
MessageTypeHEARTBEAT MessageType = 3
)
var EnumNamesMessageType = map[MessageType]string{
MessageTypeNONE: "NONE",
MessageTypeDATA: "DATA",
MessageTypeACK: "ACK",
MessageTypeHEARTBEAT: "HEARTBEAT",
}
var EnumValuesMessageType = map[string]MessageType{
"NONE": MessageTypeNONE,
"DATA": MessageTypeDATA,
"ACK": MessageTypeACK,
"HEARTBEAT": MessageTypeHEARTBEAT,
}
func (v MessageType) String() string {
if s, ok := EnumNamesMessageType[v]; ok {
return s
}
return "MessageType(" + strconv.FormatInt(int64(v), 10) + ")"
}

120
store/storage.go Normal file
View File

@ -0,0 +1,120 @@
package store
import (
"database/sql"
"git.espin.casa/albert/whippet/proto"
_ "github.com/mattn/go-sqlite3"
)
type Storage struct {
db *sql.DB
}
func NewStorage(path string) *Storage {
db, err := sql.Open("sqlite3", path)
if err != nil {
return nil
}
s := &Storage{db: db}
if err := s.initSchema(); err != nil {
return nil
}
return s
}
func (s *Storage) initSchema() error {
schema := `
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT,
msg_id TEXT,
payload BLOB,
timestamp INTEGER
);`
_, err := s.db.Exec(schema)
return err
}
// Save guarda un mensaje en la base de datos
func (s *Storage) Save(topic string, msg *proto.DataMessage) error {
_, err := s.db.Exec(`
INSERT INTO messages (topic, msg_id, payload, timestamp)
VALUES (?, ?, ?, ?)`,
topic, string(msg.Id()), msg.PayloadBytes(), msg.Timestamp(),
)
return err
}
// LoadSince carga mensajes desde un timestamp específico
func (s *Storage) LoadSince(topic string, sinceTimestamp uint64, max int) ([]*proto.DataMessage, error) {
rows, err := s.db.Query(`
SELECT msg_id, payload, timestamp
FROM messages
WHERE topic = ? AND timestamp > ?
ORDER BY timestamp ASC
LIMIT ?`,
topic, sinceTimestamp, max,
)
if err != nil {
return nil, err
}
defer rows.Close()
var result []*proto.DataMessage
for rows.Next() {
var id string
var payload []byte
var timestamp uint64
err := rows.Scan(&id, &payload, &timestamp)
if err != nil {
return nil, err
}
// Reensamblar el FlatBuffer
msg := proto.GetRootAsDataMessage(payload, 0)
// Cuidado: esto apunta a datos en `payload` directamente
// Alternativa: clonar si necesitas preservar después
result = append(result, msg)
}
return result, nil
}
// LoadAll carga todos los mensajes de un tema específico
func (s *Storage) LoadAll(topic string) ([]*proto.DataMessage, error) {
rows, err := s.db.Query(`
SELECT msg_id, payload, timestamp
FROM messages
WHERE topic = ?
ORDER BY timestamp ASC
`, topic)
if err != nil {
return nil, err
}
defer rows.Close()
var result []*proto.DataMessage
for rows.Next() {
var id string
var payload []byte
var timestamp uint64
err := rows.Scan(&id, &payload, &timestamp)
if err != nil {
return nil, err
}
// Reensamblar el FlatBuffer
msg := proto.GetRootAsDataMessage(payload, 0)
// Cuidado: esto apunta a datos en `payload` directamente
// Alternativa: clonar si necesitas preservar después
result = append(result, msg)
}
return result, nil
}
// LoadAllTopics carga todos los temas disponibles
func (s *Storage) Close() error {
return s.db.Close()
}