stillbox/pkg/nexus/nexus.go

108 lines
1.6 KiB
Go
Raw Permalink Normal View History

2024-08-04 00:55:28 -04:00
package nexus
import (
2024-08-06 11:19:30 -04:00
"context"
2024-08-04 00:55:28 -04:00
"sync"
2024-08-04 08:41:35 -04:00
2024-08-05 18:11:31 -04:00
"dynatron.me/x/stillbox/pkg/calls"
2024-08-04 08:41:35 -04:00
"dynatron.me/x/stillbox/pkg/pb"
2024-10-21 22:19:31 -04:00
"github.com/rs/zerolog/log"
2024-08-04 00:55:28 -04:00
)
type Nexus struct {
sync.RWMutex
clients map[*client]struct{}
*wsManager
2024-08-04 08:41:35 -04:00
callCh chan *calls.Call
2024-08-04 00:55:28 -04:00
}
// Registry is the client-management surface: it builds a Client for a
// Connection and adds or removes clients from whatever set the
// implementation tracks.
type Registry interface {
	// NewClient returns a Client for conn.
	NewClient(conn Connection) Client

	// Register adds c to the registry.
	Register(c Client)

	// Unregister removes c from the registry.
	Unregister(c Client)
}
func New() *Nexus {
n := &Nexus{
clients: make(map[*client]struct{}),
2024-08-04 10:56:46 -04:00
callCh: make(chan *calls.Call),
2024-08-04 00:55:28 -04:00
}
n.wsManager = newWsManager(n)
return n
}
2024-08-06 11:19:30 -04:00
func (n *Nexus) Go(ctx context.Context) {
2024-08-04 08:41:35 -04:00
for {
select {
case call, ok := <-n.callCh:
if !ok {
return
}
2024-08-06 11:19:30 -04:00
n.broadcastCallToClients(ctx, call)
case <-ctx.Done():
2024-10-21 13:49:20 -04:00
n.Shutdown()
2024-08-04 10:56:46 -04:00
return
2024-08-04 08:41:35 -04:00
}
}
}
2024-08-04 10:56:46 -04:00
// BroadcastCall hands call to the Go loop for delivery to clients.
//
// The send blocks until the Go loop receives the call (New creates callCh
// unbuffered).
//
// NOTE(review): Shutdown closes callCh, and sending on a closed channel
// panics, so this must not be called once Shutdown has run (which also
// happens when the context passed to Go is cancelled).
func (n *Nexus) BroadcastCall(call *calls.Call) {
	n.callCh <- call
}
2024-08-06 11:19:30 -04:00
func (n *Nexus) broadcastCallToClients(ctx context.Context, call *calls.Call) {
2024-08-04 08:41:35 -04:00
message := &pb.Message{
ToClientMessage: &pb.Message_Call{Call: call.ToPB()},
}
2024-08-04 14:55:15 -04:00
n.Lock()
defer n.Unlock()
2024-08-04 08:41:35 -04:00
2024-08-06 11:19:30 -04:00
for cl := range n.clients {
if !cl.filter.Test(ctx, call) {
continue
}
2024-10-21 22:19:31 -04:00
switch err := cl.Send(message); err {
case ErrSentToClosed:
2024-08-04 14:55:15 -04:00
// we already hold the lock, and the channel is closed anyway
delete(n.clients, cl)
2024-10-21 22:19:31 -04:00
case nil:
default:
log.Error().Err(err).Msg("broadcast send failed")
2024-08-04 14:55:15 -04:00
}
2024-08-04 08:41:35 -04:00
}
}
2024-08-04 00:55:28 -04:00
// Register adds c to the set of clients that receive broadcasts.
//
// c must be a *client; any other dynamic type (including a nil interface)
// makes the type assertion panic. Registering the same client twice is a
// no-op because the registry is a set.
func (n *Nexus) Register(c Client) {
	n.Lock()
	defer n.Unlock()

	cl := c.(*client)
	n.clients[cl] = struct{}{}
}
func (n *Nexus) Unregister(c Client) {
n.Lock()
defer n.Unlock()
2024-08-04 08:41:35 -04:00
cl := c.(*client)
delete(n.clients, cl)
2024-08-04 00:55:28 -04:00
}
2024-10-21 13:49:20 -04:00
func (n *Nexus) Shutdown() {
2024-10-21 22:19:31 -04:00
n.Lock()
defer n.Unlock()
close(n.callCh)
2024-10-21 13:49:20 -04:00
for c := range n.clients {
c.Shutdown()
}
}