package bus import ( "sync" ) type ( Event struct { EvType string Data interface{} } listeners []chan<- Event bus struct { sync.RWMutex subs map[string]listeners } Bus interface { Sub(topic string, ch chan<- Event) Unsub(topic string, ch chan<- Event) Pub(topic string, data interface{}) ShutdownBus() } ) func New() Bus { bus := &bus{ subs: make(map[string]listeners), } return bus } func (b *bus) Sub(topic string, ch chan<- Event) { b.Lock() defer b.Unlock() if prev, ok := b.subs[topic]; ok { b.subs[topic] = append(prev, ch) } else { b.subs[topic] = append(listeners{}, ch) } } func (b *bus) Unsub(topic string, ch chan<- Event) { b.Lock() defer b.Unlock() for i, v := range b.subs[topic] { if v == ch { // we don't care about order, replace and reslice b.subs[topic][i] = b.subs[topic][len(b.subs[topic])-1] b.subs[topic] = b.subs[topic][:len(b.subs[topic])-1] } } } func (b *bus) Pub(topic string, data interface{}) { b.RLock() defer b.RUnlock() tc, ok := b.subs[topic] if !ok { return } for _, ch := range tc { ch <- Event{EvType: topic, Data: data} } } func (b *bus) ShutdownBus() { for _, v := range b.subs { for _, c := range v { close(c) } } }