This commit is contained in:
Daniel Ponte 2024-11-18 18:31:17 -05:00
parent ec33e568d5
commit af81ae32ef
6 changed files with 105 additions and 28 deletions

3
.gitignore vendored
View file

@ -1,5 +1,6 @@
config.yaml
config.test.yaml
config.*.yaml
!config.sample.yaml
/*.sql
client/calls/
!client/calls/.gitkeep

View file

@ -41,3 +41,11 @@ notify:
# {{ end -}}
config:
webhookURL: "http://somewhere"
# configure upstream relays here
relay:
# `url` is the root of the instance
# - url: 'http://some.host:3051/'
# apiKey: aaaabbbb-cccc-dddd-eeee-ffff11112222
# `required` specifies whether we should report failure (i.e. HTTP 500 for rdio-http) to the source
# if the relay call submission fails
# required: false

View file

@ -31,6 +31,7 @@ type Server struct {
r *chi.Mux
sources sources.Sources
sinks sinks.Sinks
relayer *sinks.RelayManager
nex *nexus.Nexus
logger *Logger
alerter alerting.Alerter
@ -73,6 +74,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)),
notifier: notifier,
tgs: tgCache,
sinks: sinks.NewSinkManager(),
rest: api,
}
@ -85,6 +87,13 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv))
relayer, err := sinks.NewRelayManager(srv.sinks, cfg.Relay)
if err != nil {
return nil, err
}
srv.relayer = relayer
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(RequestLogger())

View file

@ -9,31 +9,56 @@ import (
"net/url"
"dynatron.me/x/stillbox/internal/forms"
"dynatron.me/x/stillbox/internal/version"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config"
)
type RelaySink struct {
type RelayManager struct {
xp *http.Transport
client *http.Client
relays []*Relay
}
type Relay struct {
config.Relay
mgr *RelayManager
Name string
url *url.URL
}
func MakeRelaySinks(s *Sinks, cfgs []config.Relay) error {
for i, cfg := range cfgs {
rs, err := NewRelaySink(cfg)
if err != nil {
return err
func NewRelayManager(s Sinks, cfgs []config.Relay) (*RelayManager, error) {
xp := http.DefaultTransport.(*http.Transport).Clone()
xp.MaxIdleConnsPerHost = 10
client := &http.Client{
Transport: xp,
}
rm := &RelayManager{
xp: xp,
client: client,
relays: make([]*Relay, 0, len(cfgs)),
}
for i, cfg := range cfgs {
rs, err := rm.newRelay(cfg)
if err != nil {
return nil, err
}
rm.relays = append(rm.relays, rs)
sinkName := fmt.Sprintf("relay%d:%s", i, rs.url.Host)
s.Register(sinkName, rs, cfg.Required)
}
return nil
return rm, nil
}
func NewRelaySink(cfg config.Relay) (*RelaySink, error) {
func (rs *RelayManager) newRelay(cfg config.Relay) (*Relay, error) {
u, err := url.Parse(cfg.URL)
if err != nil {
return nil, err
@ -45,13 +70,14 @@ func NewRelaySink(cfg config.Relay) (*RelaySink, error) {
u = u.JoinPath("/api/call-upload")
return &RelaySink{
return &Relay{
Relay: cfg,
url: u,
mgr: rs,
}, nil
}
func (s *RelaySink) Call(ctx context.Context, call *calls.Call) error {
func (s *Relay) Call(ctx context.Context, call *calls.Call) error {
var buf bytes.Buffer
body := multipart.NewWriter(&buf)
@ -59,6 +85,12 @@ func (s *RelaySink) Call(ctx context.Context, call *calls.Call) error {
if err != nil {
return fmt.Errorf("relay form parse: %w", err)
}
err = body.WriteField("key", s.APIKey)
if err != nil {
return fmt.Errorf("relay set API key: %w", err)
}
body.Close()
r, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), &buf)
@ -67,19 +99,20 @@ func (s *RelaySink) Call(ctx context.Context, call *calls.Call) error {
}
r.Header.Set("Content-Type", body.FormDataContentType())
r.Header.Set("User-Agent", version.HttpString("call-relay"))
resp, err := http.DefaultClient.Do(r)
resp, err := s.mgr.client.Do(r)
if err != nil {
return fmt.Errorf("relay: %w", err)
return fmt.Errorf("relay %s: %w", s.Name, err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("relay: received HTTP %d", resp.StatusCode)
return fmt.Errorf("relay %s: received HTTP %d", s.Name, resp.StatusCode)
}
return nil
}
func (s *RelaySink) SinkType() string {
func (s *Relay) SinkType() string {
return "relay"
}

View file

@ -1,4 +1,4 @@
package sinks_test
package sinks
import (
"context"
@ -16,7 +16,6 @@ import (
"dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/sinks"
"dynatron.me/x/stillbox/pkg/sources"
"github.com/google/uuid"
@ -88,13 +87,21 @@ func TestRelay(t *testing.T) {
URL: svr.URL,
APIKey: tc.apiKey,
}
ns := &nullSinks{}
rs, err := sinks.NewRelaySink(cfg)
rm, err := NewRelayManager(ns, []config.Relay{cfg})
require.NoError(t, err)
err = rs.Call(context.Background(), &tc.call)
err = rm.relays[0].Call(context.Background(), &tc.call)
assert.True(t, called)
assert.NoError(t, err)
assert.NoError(t, serr)
})
}
}
type nullSinks struct{}
func (*nullSinks) Register(name string, toAdd Sink, required bool) {}
func (*nullSinks) Unregister(name string) {}
func (*nullSinks) Shutdown() {}
func (*nullSinks) EmitCall(ctx context.Context, call *calls.Call) error { return nil }

View file

@ -24,36 +24,55 @@ type sinkInstance struct {
Required bool
}
type Sinks struct {
sync.RWMutex
sinks []sinkInstance
type Sinks interface {
Register(name string, toAdd Sink, required bool)
Unregister(name string)
Shutdown()
EmitCall(ctx context.Context, call *calls.Call) error
}
func (s *Sinks) Register(name string, toAdd Sink, required bool) {
type sinks struct {
sync.RWMutex
sinks map[string]sinkInstance
}
func NewSinkManager() Sinks {
return &sinks{
sinks: make(map[string]sinkInstance),
}
}
func (s *sinks) Register(name string, toAdd Sink, required bool) {
s.Lock()
defer s.Unlock()
s.sinks = append(s.sinks, sinkInstance{
s.sinks[name] = sinkInstance{
Name: name,
Sink: toAdd,
Required: required,
})
}
}
func (s *Sinks) Shutdown() {
func (s *sinks) Unregister(name string) {
s.Lock()
defer s.Unlock()
s.sinks = nil
delete(s.sinks, name)
}
func (s *Sinks) EmitCall(ctx context.Context, call *calls.Call) error {
func (s *sinks) Shutdown() {
s.Lock()
defer s.Unlock()
clear(s.sinks)
}
func (s *sinks) EmitCall(ctx context.Context, call *calls.Call) error {
s.Lock()
defer s.Unlock()
g, ctx := errgroup.WithContext(ctx)
for i := range s.sinks {
sink := s.sinks[i]
for _, sink := range s.sinks {
g.Go(sink.callEmitter(ctx, call))
}