// Package broker provides an in-process publish/subscribe broker that
// stores published messages through a store.Storage.
package broker

import (
	"sync"

	"git.espin.casa/albert/whippet/proto"
	"git.espin.casa/albert/whippet/store"
)

// ConsumerFunc is the callback invoked with the topic name and the message
// for every message delivered to a subscriber.
type ConsumerFunc func(topic string, msg *proto.DataMessage)

// Broker fans published messages out to the callbacks registered per topic.
// The embedded RWMutex guards the subscribers map.
type Broker struct {
	sync.RWMutex
	subscribers map[string][]ConsumerFunc
	storage     *store.Storage
}

// NewBroker returns a Broker with an empty subscriber table that uses
// storage to save and load messages.
func NewBroker(storage *store.Storage) *Broker {
	return &Broker{
		subscribers: make(map[string][]ConsumerFunc),
		storage:     storage,
	}
}

// Subscribe registers fn to receive messages published on topic.
//
// When fetchPast is true, the messages returned by storage.LoadAll for the
// topic are delivered to fn synchronously, in the order returned, before
// Subscribe returns. If loading fails, the replay is skipped; fn remains
// registered for future messages.
func (b *Broker) Subscribe(topic string, fn ConsumerFunc, fetchPast bool) {
	// Hold the write lock only for the map mutation so that the replay
	// below does not block publishers or other subscribers.
	b.Lock()
	b.subscribers[topic] = append(b.subscribers[topic], fn)
	b.Unlock()

	if !fetchPast {
		return
	}

	msgs, err := b.storage.LoadAll(topic)
	if err != nil {
		// The lock has already been released above. Unlocking again here
		// would be a fatal "unlock of unlocked RWMutex" runtime error, so
		// just give up on the replay.
		return
	}
	for _, msg := range msgs {
		fn(topic, msg)
	}
}

// Publish saves msg under topic via the storage and then delivers it to
// every callback currently subscribed to topic, each in its own goroutine.
// Publish does not wait for the callbacks to finish.
func (b *Broker) Publish(topic string, msg *proto.DataMessage) {
	// NOTE(review): any result returned by Save is discarded here — confirm
	// whether it reports an error that should be handled.
	b.storage.Save(topic, msg)

	b.RLock()
	defer b.RUnlock()
	for _, fn := range b.subscribers[topic] {
		go fn(topic, msg)
	}
}