blasphem/pkg/bus/bus.go

81 lines
1.2 KiB
Go
Raw Permalink Normal View History

2022-09-25 11:42:36 -04:00
package bus
2022-10-25 00:16:29 -04:00
import (
"sync"
)
2022-09-25 11:42:36 -04:00
2022-10-25 00:16:29 -04:00
// Event is what subscribers receive: the topic the message was published
// under (EvType) and the payload supplied by the publisher (Data).
type Event struct {
	EvType string
	Data   interface{}
}

// listeners is the list of subscriber channels registered for one topic.
type listeners []chan<- Event

// bus is the concrete Bus implementation. The embedded RWMutex is used by
// the methods to serialize access to subs, which maps a topic to the
// channels subscribed to it.
type bus struct {
	sync.RWMutex
	subs map[string]listeners
}

// Bus is a topic-based publish/subscribe hub.
type Bus interface {
	// Sub registers ch to receive events published on topic.
	Sub(topic string, ch chan<- Event)
	// Unsub removes ch from the subscribers of topic.
	Unsub(topic string, ch chan<- Event)
	// Pub delivers data, wrapped in an Event, to every subscriber of topic.
	Pub(topic string, data interface{})
	// ShutdownBus closes the subscriber channels held by the bus.
	ShutdownBus()
}
2022-09-25 11:42:36 -04:00
2022-12-19 19:24:01 -05:00
func New() Bus {
bus := &bus{
2022-10-25 00:16:29 -04:00
subs: make(map[string]listeners),
}
2022-09-25 11:42:36 -04:00
return bus
2022-10-25 00:16:29 -04:00
}
2022-12-19 19:24:01 -05:00
func (b *bus) Sub(topic string, ch chan<- Event) {
2022-10-25 00:16:29 -04:00
b.Lock()
defer b.Unlock()
if prev, ok := b.subs[topic]; ok {
b.subs[topic] = append(prev, ch)
} else {
b.subs[topic] = append(listeners{}, ch)
}
}
2022-12-19 19:24:01 -05:00
func (b *bus) Unsub(topic string, ch chan<- Event) {
2022-10-25 00:16:29 -04:00
b.Lock()
defer b.Unlock()
for i, v := range b.subs[topic] {
if v == ch {
// we don't care about order, replace and reslice
b.subs[topic][i] = b.subs[topic][len(b.subs[topic])-1]
b.subs[topic] = b.subs[topic][:len(b.subs[topic])-1]
}
}
}
2022-12-19 19:24:01 -05:00
func (b *bus) Pub(topic string, data interface{}) {
2022-10-25 00:16:29 -04:00
b.RLock()
defer b.RUnlock()
tc, ok := b.subs[topic]
if !ok {
return
}
for _, ch := range tc {
ch <- Event{EvType: topic, Data: data}
}
}
2022-12-19 19:24:01 -05:00
func (b *bus) ShutdownBus() {
2022-10-25 00:16:29 -04:00
for _, v := range b.subs {
for _, c := range v {
close(c)
}
}
2022-09-25 11:42:36 -04:00
}