From ec33e568d5ea79a949719fb6ca3f27f549cae602 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Mon, 18 Nov 2024 14:27:12 -0500 Subject: [PATCH] Add relay --- pkg/calls/call.go | 38 +++++++-------- pkg/config/config.go | 18 +++----- pkg/sinks/relay.go | 85 ++++++++++++++++++++++++++++++++++ pkg/sinks/relay_test.go | 100 ++++++++++++++++++++++++++++++++++++++++ pkg/sources/http.go | 10 ++-- 5 files changed, 216 insertions(+), 35 deletions(-) create mode 100644 pkg/sinks/relay.go create mode 100644 pkg/sinks/relay_test.go diff --git a/pkg/calls/call.go b/pkg/calls/call.go index 07b633d..6359940 100644 --- a/pkg/calls/call.go +++ b/pkg/calls/call.go @@ -33,26 +33,26 @@ func (d CallDuration) Seconds() int32 { } type Call struct { - ID uuid.UUID - Audio []byte - AudioName string - AudioType string - Duration CallDuration - DateTime time.Time - Frequencies []int - Frequency int - Patches []int - Source int - Sources []int - System int - Submitter *auth.UserID - SystemLabel string - Talkgroup int - TalkgroupGroup *string - TalkgroupLabel *string - TGAlphaTag *string + ID uuid.UUID `form:"-"` + Audio []byte `form:"audio" filenameField:"AudioName"` + AudioName string `form:"audioName"` + AudioType string `form:"audioType"` + Duration CallDuration `form:"-"` + DateTime time.Time `form:"dateTime"` + Frequencies []int `form:"frequencies"` + Frequency int `form:"frequency"` + Patches []int `form:"patches"` + Source int `form:"source"` + Sources []int `form:"sources"` + System int `form:"system"` + Submitter *auth.UserID `form:"-"` + SystemLabel string `form:"systemLabel"` + Talkgroup int `form:"talkgroup"` + TalkgroupGroup *string `form:"talkgroupGroup"` + TalkgroupLabel *string `form:"talkgroupLabel"` + TGAlphaTag *string `form:"talkgroupTag"` // not 1:1 - shouldStore bool + shouldStore bool `form:"-"` } func (c *Call) String() string { diff --git a/pkg/config/config.go b/pkg/config/config.go index fb2c589..c9f3074 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -22,6 +22,7 @@ type Config struct { Public bool `yaml:"public"` RateLimit RateLimit `yaml:"rateLimit"` Notify Notify `yaml:"notify"` + Relay []Relay `yaml:"relay"` configPath string } @@ -63,6 +64,12 @@ type Alerting struct { Renotify *jsontypes.Duration `yaml:"renotify,omitempty" form:"renotify,omitempty"` } +type Relay struct { + URL string `yaml:"url"` + APIKey string `yaml:"apiKey"` + Required bool `yaml:"required"` +} + type Notify []NotifyService type NotifyService struct { @@ -72,17 +79,6 @@ type NotifyService struct { Config map[string]interface{} `yaml:"config" json:"config"` } -func (n *NotifyService) GetS(k, defaultVal string) string { - if v, has := n.Config[k]; has { - if v, isString := v.(string); isString { - return v - } - log.Error().Str("configKey", k).Str("provider", n.Provider).Str("default", defaultVal).Msg("notify config value is not a string! using default") - } - - return defaultVal -} - func (rl *RateLimit) Verify() bool { if rl.Enable { if rl.Requests > 0 && rl.Over > 0 { diff --git a/pkg/sinks/relay.go b/pkg/sinks/relay.go new file mode 100644 index 0000000..2bbbc9a --- /dev/null +++ b/pkg/sinks/relay.go @@ -0,0 +1,85 @@ +package sinks + +import ( + "bytes" + "context" + "fmt" + "mime/multipart" + "net/http" + "net/url" + + "dynatron.me/x/stillbox/internal/forms" + "dynatron.me/x/stillbox/pkg/calls" + "dynatron.me/x/stillbox/pkg/config" +) + +type RelaySink struct { + config.Relay + + url *url.URL +} + +func MakeRelaySinks(s *Sinks, cfgs []config.Relay) error { + for i, cfg := range cfgs { + rs, err := NewRelaySink(cfg) + if err != nil { + return err + } + + sinkName := fmt.Sprintf("relay%d:%s", i, rs.url.Host) + s.Register(sinkName, rs, cfg.Required) + } + + return nil +} + +func NewRelaySink(cfg config.Relay) (*RelaySink, error) { + u, err := url.Parse(cfg.URL) + if err != nil { + return nil, err + } + + if u.Path != "" && u.Path != "/" { + return nil, fmt.Errorf("relay path in %s must be instance root", cfg.URL) + } + + u = u.JoinPath("/api/call-upload") + + return &RelaySink{ + Relay: cfg, + url: u, + }, nil +} + +func (s *RelaySink) Call(ctx context.Context, call *calls.Call) error { + var buf bytes.Buffer + body := multipart.NewWriter(&buf) + + err := forms.Marshal(call, body) + if err != nil { + return fmt.Errorf("relay form parse: %w", err) + } + body.Close() + + r, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), &buf) + if err != nil { + return fmt.Errorf("relay newrequest: %w", err) + } + + r.Header.Set("Content-Type", body.FormDataContentType()) + + resp, err := http.DefaultClient.Do(r) + if err != nil { + return fmt.Errorf("relay: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("relay: received HTTP %d", resp.StatusCode) + } + + return nil +} + +func (s *RelaySink) SinkType() string { + return "relay" +} diff --git a/pkg/sinks/relay_test.go b/pkg/sinks/relay_test.go new file mode 100644 index 0000000..88c4695 --- /dev/null +++ b/pkg/sinks/relay_test.go @@ -0,0 +1,100 @@ +package sinks_test + +import ( + "context" + "math/rand" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "dynatron.me/x/stillbox/internal/common" + "dynatron.me/x/stillbox/internal/forms" + "dynatron.me/x/stillbox/pkg/auth" + "dynatron.me/x/stillbox/pkg/calls" + "dynatron.me/x/stillbox/pkg/config" + "dynatron.me/x/stillbox/pkg/sinks" + "dynatron.me/x/stillbox/pkg/sources" + + "github.com/google/uuid" +) + +type hand func(w http.ResponseWriter, r *http.Request) + +func (h hand) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h(w, r) +} + +func TestRelay(t *testing.T) { + uuid.SetRand(rand.New(rand.NewSource(1))) + + tests := []struct { + name string + submitter auth.UserID + apiKey string + call calls.Call + }{ + { + name: "base", + submitter: auth.UserID(1), + call: calls.Call{ + ID: uuid.UUID([16]byte{0x52, 0xfd, 0xfc, 0x07, 0x21, 0x82, 0x45, 0x4f, 0x96, 0x3f, 0x5f, 0x0f, 0x9a, 0x62, 0x1d, 0x72}), + Submitter: common.PtrTo(auth.UserID(1)), + System: 197, + Talkgroup: 10101, + DateTime: time.Date(2024, 11, 10, 23, 33, 02, 0, time.Local), + AudioName: "rightnow.mp3", + Audio: []byte{0xFF, 0xF3, 0x14, 0xC4, 0x00, 0x00, 0x00, 0x03, 0x48, 0x01, 0x40, 0x00, 0x00, 0x4C, 0x41, 0x4D, 0x45, 0x33, 0x2E, 0x39, 0x36, 0x2E, 0x31, 0x55}, + AudioType: "audio/mpeg", + Duration: calls.CallDuration(24000000), + TalkgroupLabel: common.PtrTo("Some TG"), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var serr error + var called bool + h := hand(func(w http.ResponseWriter, r *http.Request) { + called = true + assert.Equal(t, "/api/call-upload", r.URL.Path) + serr = r.ParseMultipartForm(1024 * 1024 * 2) + if serr != nil { + t.Log("parsemultipart", serr) + return + } + + cur := new(sources.CallUploadRequest) + serr = forms.Unmarshal(r, cur, forms.WithAcceptBlank()) + cur.DontStore = true + if serr != nil { + t.Log("unmarshal", serr) + return + } + + assert.Equal(t, tc.apiKey, cur.Key) + + toC, tcerr := cur.ToCall(tc.submitter) + require.NoError(t, tcerr) + assert.Equal(t, &tc.call, toC) + }) + svr := httptest.NewServer(h) + + cfg := config.Relay{ + URL: svr.URL, + APIKey: tc.apiKey, + } + + rs, err := sinks.NewRelaySink(cfg) + require.NoError(t, err) + err = rs.Call(context.Background(), &tc.call) + assert.True(t, called) + assert.NoError(t, err) + assert.NoError(t, serr) + }) + } +} diff --git a/pkg/sources/http.go b/pkg/sources/http.go index 32d1a28..7c30347 100644 --- a/pkg/sources/http.go +++ b/pkg/sources/http.go @@ -36,7 +36,7 @@ func (h *RdioHTTP) InstallPublicRoutes(r chi.Router) { r.Post("/api/call-upload", h.routeCallUpload) } -type callUploadRequest struct { +type CallUploadRequest struct { Audio []byte `form:"audio" filenameField:"AudioName"` AudioName string AudioType string `form:"audioType"` @@ -56,7 +56,7 @@ type callUploadRequest struct { DontStore bool `form:"dontStore"` } -func (car *callUploadRequest) mimeType() string { +func (car *CallUploadRequest) mimeType() string { // this is super naïve fn := car.AudioName switch { @@ -71,7 +71,7 @@ func (car *callUploadRequest) mimeType() string { return "" } -func (car *callUploadRequest) toCall(submitter auth.UserID) (*calls.Call, error) { +func (car *CallUploadRequest) ToCall(submitter auth.UserID) (*calls.Call, error) { return calls.Make(&calls.Call{ Submitter: &submitter, System: car.System, @@ -111,14 +111,14 @@ func (h *RdioHTTP) routeCallUpload(w http.ResponseWriter, r *http.Request) { return } - cur := new(callUploadRequest) + cur := new(CallUploadRequest) err = forms.Unmarshal(r, cur, forms.WithAcceptBlank()) if err != nil { http.Error(w, "cannot bind upload "+err.Error(), http.StatusExpectationFailed) return } - call, err := cur.toCall(*submitter) + call, err := cur.ToCall(*submitter) if err != nil { log.Error().Err(err).Msg("toCall failed") http.Error(w, err.Error(), http.StatusBadRequest)