From af81ae32ef5226f970bc8fac306274a5953f94b4 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Mon, 18 Nov 2024 18:31:17 -0500 Subject: [PATCH] Relay --- .gitignore | 3 ++- config.sample.yaml | 8 ++++++ pkg/server/server.go | 9 +++++++ pkg/sinks/relay.go | 57 ++++++++++++++++++++++++++++++++--------- pkg/sinks/relay_test.go | 15 ++++++++--- pkg/sinks/sinks.go | 41 +++++++++++++++++++++-------- 6 files changed, 105 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index 8e9a60e..610f57c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ config.yaml -config.test.yaml +config.*.yaml +!config.sample.yaml /*.sql client/calls/ !client/calls/.gitkeep diff --git a/config.sample.yaml b/config.sample.yaml index 5a71064..6384f65 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -41,3 +41,11 @@ notify: # {{ end -}} config: webhookURL: "http://somewhere" +# configure upstream relays here +relay: + # `url` is the root of the instance +# - url: 'http://some.host:3051/' +# apiKey: aaaabbbb-cccc-dddd-eeee-ffff11112222 + # `required` specifies whether we should report failure (i.e. HTTP 500 for rdio-http) to the source + # if the relay call submission fails +# required: false diff --git a/pkg/server/server.go b/pkg/server/server.go index a5ff3bd..4357d12 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -31,6 +31,7 @@ type Server struct { r *chi.Mux sources sources.Sources sinks sinks.Sinks + relayer *sinks.RelayManager nex *nexus.Nexus logger *Logger alerter alerting.Alerter @@ -73,6 +74,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)), notifier: notifier, tgs: tgCache, + sinks: sinks.NewSinkManager(), rest: api, } @@ -85,6 +87,13 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv)) + relayer, err := sinks.NewRelayManager(srv.sinks, cfg.Relay) + if err != nil { + return nil, err + } + + srv.relayer = relayer + r.Use(middleware.RequestID) r.Use(middleware.RealIP) r.Use(RequestLogger()) diff --git a/pkg/sinks/relay.go b/pkg/sinks/relay.go index 2bbbc9a..1f3bb97 100644 --- a/pkg/sinks/relay.go +++ b/pkg/sinks/relay.go @@ -9,31 +9,56 @@ import ( "net/url" "dynatron.me/x/stillbox/internal/forms" + "dynatron.me/x/stillbox/internal/version" "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/config" ) -type RelaySink struct { +type RelayManager struct { + xp *http.Transport + client *http.Client + + relays []*Relay +} + +type Relay struct { config.Relay + mgr *RelayManager + Name string url *url.URL } -func MakeRelaySinks(s *Sinks, cfgs []config.Relay) error { +func NewRelayManager(s Sinks, cfgs []config.Relay) (*RelayManager, error) { + xp := http.DefaultTransport.(*http.Transport).Clone() + xp.MaxIdleConnsPerHost = 10 + + client := &http.Client{ + Transport: xp, + } + + rm := &RelayManager{ + xp: xp, + client: client, + relays: make([]*Relay, 0, len(cfgs)), + } + for i, cfg := range cfgs { - rs, err := NewRelaySink(cfg) + rs, err := rm.newRelay(cfg) if err != nil { - return err + return nil, err } + rm.relays = append(rm.relays, rs) + sinkName := fmt.Sprintf("relay%d:%s", i, rs.url.Host) s.Register(sinkName, rs, cfg.Required) } - return nil + return rm, nil } -func NewRelaySink(cfg config.Relay) (*RelaySink, error) { +func (rs *RelayManager) newRelay(cfg config.Relay) (*Relay, error) { u, err := url.Parse(cfg.URL) if err != nil { return nil, err @@ -45,13 +70,14 @@ func NewRelaySink(cfg config.Relay) (*RelaySink, error) { u = u.JoinPath("/api/call-upload") - return &RelaySink{ + return &Relay{ Relay: cfg, url: u, + mgr: rs, }, nil } -func (s *RelaySink) Call(ctx context.Context, call *calls.Call) error { +func (s *Relay) Call(ctx context.Context, call *calls.Call) error { var buf bytes.Buffer body := multipart.NewWriter(&buf) @@ -59,6 +85,12 @@ func (s *RelaySink) Call(ctx context.Context, call *calls.Call) error { if err != nil { return fmt.Errorf("relay form parse: %w", err) } + + err = body.WriteField("key", s.APIKey) + if err != nil { + return fmt.Errorf("relay set API key: %w", err) + } + body.Close() r, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), &buf) @@ -67,19 +99,20 @@ func (s *RelaySink) Call(ctx context.Context, call *calls.Call) error { } r.Header.Set("Content-Type", body.FormDataContentType()) + r.Header.Set("User-Agent", version.HttpString("call-relay")) - resp, err := http.DefaultClient.Do(r) + resp, err := s.mgr.client.Do(r) if err != nil { - return fmt.Errorf("relay: %w", err) + return fmt.Errorf("relay %s: %w", s.Name, err) } if resp.StatusCode != http.StatusOK { - return fmt.Errorf("relay: received HTTP %d", resp.StatusCode) + return fmt.Errorf("relay %s: received HTTP %d", s.Name, resp.StatusCode) } return nil } -func (s *RelaySink) SinkType() string { +func (s *Relay) SinkType() string { return "relay" } diff --git a/pkg/sinks/relay_test.go b/pkg/sinks/relay_test.go index 88c4695..90cfda0 100644 --- a/pkg/sinks/relay_test.go +++ b/pkg/sinks/relay_test.go @@ -1,4 +1,4 @@ -package sinks_test +package sinks import ( "context" @@ -16,7 +16,6 @@ import ( "dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/config" - "dynatron.me/x/stillbox/pkg/sinks" "dynatron.me/x/stillbox/pkg/sources" "github.com/google/uuid" @@ -88,13 +87,21 @@ func TestRelay(t *testing.T) { URL: svr.URL, APIKey: tc.apiKey, } + ns := &nullSinks{} - rs, err := sinks.NewRelaySink(cfg) + rm, err := NewRelayManager(ns, []config.Relay{cfg}) require.NoError(t, err) - err = rs.Call(context.Background(), &tc.call) + err = rm.relays[0].Call(context.Background(), &tc.call) assert.True(t, called) assert.NoError(t, err) assert.NoError(t, serr) }) } } + +type nullSinks struct{} + +func (*nullSinks) Register(name string, toAdd Sink, required bool) {} +func (*nullSinks) Unregister(name string) {} +func (*nullSinks) Shutdown() {} +func (*nullSinks) EmitCall(ctx context.Context, call *calls.Call) error { return nil } diff --git a/pkg/sinks/sinks.go b/pkg/sinks/sinks.go index 3c8afd9..8259f30 100644 --- a/pkg/sinks/sinks.go +++ b/pkg/sinks/sinks.go @@ -24,36 +24,55 @@ type sinkInstance struct { Required bool } -type Sinks struct { - sync.RWMutex - sinks []sinkInstance +type Sinks interface { + Register(name string, toAdd Sink, required bool) + Unregister(name string) + Shutdown() + EmitCall(ctx context.Context, call *calls.Call) error } -func (s *Sinks) Register(name string, toAdd Sink, required bool) { +type sinks struct { + sync.RWMutex + sinks map[string]sinkInstance +} + +func NewSinkManager() Sinks { + return &sinks{ + sinks: make(map[string]sinkInstance), + } +} + +func (s *sinks) Register(name string, toAdd Sink, required bool) { s.Lock() defer s.Unlock() - s.sinks = append(s.sinks, sinkInstance{ + s.sinks[name] = sinkInstance{ Name: name, Sink: toAdd, Required: required, - }) + } } -func (s *Sinks) Shutdown() { +func (s *sinks) Unregister(name string) { s.Lock() defer s.Unlock() - s.sinks = nil + delete(s.sinks, name) } -func (s *Sinks) EmitCall(ctx context.Context, call *calls.Call) error { +func (s *sinks) Shutdown() { + s.Lock() + defer s.Unlock() + + clear(s.sinks) +} + +func (s *sinks) EmitCall(ctx context.Context, call *calls.Call) error { s.Lock() defer s.Unlock() g, ctx := errgroup.WithContext(ctx) - for i := range s.sinks { - sink := s.sinks[i] + for _, sink := range s.sinks { g.Go(sink.callEmitter(ctx, call)) }