Add relay
This commit is contained in:
parent
a318242de2
commit
ec33e568d5
5 changed files with 216 additions and 35 deletions
|
@ -33,26 +33,26 @@ func (d CallDuration) Seconds() int32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Call struct {
|
type Call struct {
|
||||||
ID uuid.UUID
|
ID uuid.UUID `form:"-"`
|
||||||
Audio []byte
|
Audio []byte `form:"audio" filenameField:"AudioName"`
|
||||||
AudioName string
|
AudioName string `form:"audioName"`
|
||||||
AudioType string
|
AudioType string `form:"audioType"`
|
||||||
Duration CallDuration
|
Duration CallDuration `form:"-"`
|
||||||
DateTime time.Time
|
DateTime time.Time `form:"dateTime"`
|
||||||
Frequencies []int
|
Frequencies []int `form:"frequencies"`
|
||||||
Frequency int
|
Frequency int `form:"frequency"`
|
||||||
Patches []int
|
Patches []int `form:"patches"`
|
||||||
Source int
|
Source int `form:"source"`
|
||||||
Sources []int
|
Sources []int `form:"sources"`
|
||||||
System int
|
System int `form:"system"`
|
||||||
Submitter *auth.UserID
|
Submitter *auth.UserID `form:"-"`
|
||||||
SystemLabel string
|
SystemLabel string `form:"systemLabel"`
|
||||||
Talkgroup int
|
Talkgroup int `form:"talkgroup"`
|
||||||
TalkgroupGroup *string
|
TalkgroupGroup *string `form:"talkgroupGroup"`
|
||||||
TalkgroupLabel *string
|
TalkgroupLabel *string `form:"talkgroupLabel"`
|
||||||
TGAlphaTag *string
|
TGAlphaTag *string `form:"talkgroupTag"` // not 1:1
|
||||||
|
|
||||||
shouldStore bool
|
shouldStore bool `form:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Call) String() string {
|
func (c *Call) String() string {
|
||||||
|
|
|
@ -22,6 +22,7 @@ type Config struct {
|
||||||
Public bool `yaml:"public"`
|
Public bool `yaml:"public"`
|
||||||
RateLimit RateLimit `yaml:"rateLimit"`
|
RateLimit RateLimit `yaml:"rateLimit"`
|
||||||
Notify Notify `yaml:"notify"`
|
Notify Notify `yaml:"notify"`
|
||||||
|
Relay []Relay `yaml:"relay"`
|
||||||
|
|
||||||
configPath string
|
configPath string
|
||||||
}
|
}
|
||||||
|
@ -63,6 +64,12 @@ type Alerting struct {
|
||||||
Renotify *jsontypes.Duration `yaml:"renotify,omitempty" form:"renotify,omitempty"`
|
Renotify *jsontypes.Duration `yaml:"renotify,omitempty" form:"renotify,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Relay struct {
|
||||||
|
URL string `yaml:"url"`
|
||||||
|
APIKey string `yaml:"apiKey"`
|
||||||
|
Required bool `yaml:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
type Notify []NotifyService
|
type Notify []NotifyService
|
||||||
|
|
||||||
type NotifyService struct {
|
type NotifyService struct {
|
||||||
|
@ -72,17 +79,6 @@ type NotifyService struct {
|
||||||
Config map[string]interface{} `yaml:"config" json:"config"`
|
Config map[string]interface{} `yaml:"config" json:"config"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NotifyService) GetS(k, defaultVal string) string {
|
|
||||||
if v, has := n.Config[k]; has {
|
|
||||||
if v, isString := v.(string); isString {
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
log.Error().Str("configKey", k).Str("provider", n.Provider).Str("default", defaultVal).Msg("notify config value is not a string! using default")
|
|
||||||
}
|
|
||||||
|
|
||||||
return defaultVal
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rl *RateLimit) Verify() bool {
|
func (rl *RateLimit) Verify() bool {
|
||||||
if rl.Enable {
|
if rl.Enable {
|
||||||
if rl.Requests > 0 && rl.Over > 0 {
|
if rl.Requests > 0 && rl.Over > 0 {
|
||||||
|
|
85
pkg/sinks/relay.go
Normal file
85
pkg/sinks/relay.go
Normal file
|
@ -0,0 +1,85 @@
|
||||||
|
package sinks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"mime/multipart"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
|
"dynatron.me/x/stillbox/internal/forms"
|
||||||
|
"dynatron.me/x/stillbox/pkg/calls"
|
||||||
|
"dynatron.me/x/stillbox/pkg/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RelaySink struct {
|
||||||
|
config.Relay
|
||||||
|
|
||||||
|
url *url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func MakeRelaySinks(s *Sinks, cfgs []config.Relay) error {
|
||||||
|
for i, cfg := range cfgs {
|
||||||
|
rs, err := NewRelaySink(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
sinkName := fmt.Sprintf("relay%d:%s", i, rs.url.Host)
|
||||||
|
s.Register(sinkName, rs, cfg.Required)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRelaySink(cfg config.Relay) (*RelaySink, error) {
|
||||||
|
u, err := url.Parse(cfg.URL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if u.Path != "" && u.Path != "/" {
|
||||||
|
return nil, fmt.Errorf("relay path in %s must be instance root", cfg.URL)
|
||||||
|
}
|
||||||
|
|
||||||
|
u = u.JoinPath("/api/call-upload")
|
||||||
|
|
||||||
|
return &RelaySink{
|
||||||
|
Relay: cfg,
|
||||||
|
url: u,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *RelaySink) Call(ctx context.Context, call *calls.Call) error {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
body := multipart.NewWriter(&buf)
|
||||||
|
|
||||||
|
err := forms.Marshal(call, body)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("relay form parse: %w", err)
|
||||||
|
}
|
||||||
|
body.Close()
|
||||||
|
|
||||||
|
r, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), &buf)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("relay newrequest: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Header.Set("Content-Type", body.FormDataContentType())
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(r)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("relay: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("relay: received HTTP %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *RelaySink) SinkType() string {
|
||||||
|
return "relay"
|
||||||
|
}
|
100
pkg/sinks/relay_test.go
Normal file
100
pkg/sinks/relay_test.go
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
package sinks_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"dynatron.me/x/stillbox/internal/common"
|
||||||
|
"dynatron.me/x/stillbox/internal/forms"
|
||||||
|
"dynatron.me/x/stillbox/pkg/auth"
|
||||||
|
"dynatron.me/x/stillbox/pkg/calls"
|
||||||
|
"dynatron.me/x/stillbox/pkg/config"
|
||||||
|
"dynatron.me/x/stillbox/pkg/sinks"
|
||||||
|
"dynatron.me/x/stillbox/pkg/sources"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
type hand func(w http.ResponseWriter, r *http.Request)
|
||||||
|
|
||||||
|
func (h hand) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
h(w, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRelay(t *testing.T) {
|
||||||
|
uuid.SetRand(rand.New(rand.NewSource(1)))
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
submitter auth.UserID
|
||||||
|
apiKey string
|
||||||
|
call calls.Call
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "base",
|
||||||
|
submitter: auth.UserID(1),
|
||||||
|
call: calls.Call{
|
||||||
|
ID: uuid.UUID([16]byte{0x52, 0xfd, 0xfc, 0x07, 0x21, 0x82, 0x45, 0x4f, 0x96, 0x3f, 0x5f, 0x0f, 0x9a, 0x62, 0x1d, 0x72}),
|
||||||
|
Submitter: common.PtrTo(auth.UserID(1)),
|
||||||
|
System: 197,
|
||||||
|
Talkgroup: 10101,
|
||||||
|
DateTime: time.Date(2024, 11, 10, 23, 33, 02, 0, time.Local),
|
||||||
|
AudioName: "rightnow.mp3",
|
||||||
|
Audio: []byte{0xFF, 0xF3, 0x14, 0xC4, 0x00, 0x00, 0x00, 0x03, 0x48, 0x01, 0x40, 0x00, 0x00, 0x4C, 0x41, 0x4D, 0x45, 0x33, 0x2E, 0x39, 0x36, 0x2E, 0x31, 0x55},
|
||||||
|
AudioType: "audio/mpeg",
|
||||||
|
Duration: calls.CallDuration(24000000),
|
||||||
|
TalkgroupLabel: common.PtrTo("Some TG"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
var serr error
|
||||||
|
var called bool
|
||||||
|
h := hand(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
called = true
|
||||||
|
assert.Equal(t, "/api/call-upload", r.URL.Path)
|
||||||
|
serr = r.ParseMultipartForm(1024 * 1024 * 2)
|
||||||
|
if serr != nil {
|
||||||
|
t.Log("parsemultipart", serr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cur := new(sources.CallUploadRequest)
|
||||||
|
serr = forms.Unmarshal(r, cur, forms.WithAcceptBlank())
|
||||||
|
cur.DontStore = true
|
||||||
|
if serr != nil {
|
||||||
|
t.Log("unmarshal", serr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, tc.apiKey, cur.Key)
|
||||||
|
|
||||||
|
toC, tcerr := cur.ToCall(tc.submitter)
|
||||||
|
require.NoError(t, tcerr)
|
||||||
|
assert.Equal(t, &tc.call, toC)
|
||||||
|
})
|
||||||
|
svr := httptest.NewServer(h)
|
||||||
|
|
||||||
|
cfg := config.Relay{
|
||||||
|
URL: svr.URL,
|
||||||
|
APIKey: tc.apiKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
rs, err := sinks.NewRelaySink(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = rs.Call(context.Background(), &tc.call)
|
||||||
|
assert.True(t, called)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NoError(t, serr)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -36,7 +36,7 @@ func (h *RdioHTTP) InstallPublicRoutes(r chi.Router) {
|
||||||
r.Post("/api/call-upload", h.routeCallUpload)
|
r.Post("/api/call-upload", h.routeCallUpload)
|
||||||
}
|
}
|
||||||
|
|
||||||
type callUploadRequest struct {
|
type CallUploadRequest struct {
|
||||||
Audio []byte `form:"audio" filenameField:"AudioName"`
|
Audio []byte `form:"audio" filenameField:"AudioName"`
|
||||||
AudioName string
|
AudioName string
|
||||||
AudioType string `form:"audioType"`
|
AudioType string `form:"audioType"`
|
||||||
|
@ -56,7 +56,7 @@ type callUploadRequest struct {
|
||||||
DontStore bool `form:"dontStore"`
|
DontStore bool `form:"dontStore"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (car *callUploadRequest) mimeType() string {
|
func (car *CallUploadRequest) mimeType() string {
|
||||||
// this is super naïve
|
// this is super naïve
|
||||||
fn := car.AudioName
|
fn := car.AudioName
|
||||||
switch {
|
switch {
|
||||||
|
@ -71,7 +71,7 @@ func (car *callUploadRequest) mimeType() string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (car *callUploadRequest) toCall(submitter auth.UserID) (*calls.Call, error) {
|
func (car *CallUploadRequest) ToCall(submitter auth.UserID) (*calls.Call, error) {
|
||||||
return calls.Make(&calls.Call{
|
return calls.Make(&calls.Call{
|
||||||
Submitter: &submitter,
|
Submitter: &submitter,
|
||||||
System: car.System,
|
System: car.System,
|
||||||
|
@ -111,14 +111,14 @@ func (h *RdioHTTP) routeCallUpload(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cur := new(callUploadRequest)
|
cur := new(CallUploadRequest)
|
||||||
err = forms.Unmarshal(r, cur, forms.WithAcceptBlank())
|
err = forms.Unmarshal(r, cur, forms.WithAcceptBlank())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, "cannot bind upload "+err.Error(), http.StatusExpectationFailed)
|
http.Error(w, "cannot bind upload "+err.Error(), http.StatusExpectationFailed)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
call, err := cur.toCall(*submitter)
|
call, err := cur.ToCall(*submitter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("toCall failed")
|
log.Error().Err(err).Msg("toCall failed")
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
|
Loading…
Reference in a new issue