sources/sinks

This commit is contained in:
Daniel 2024-08-01 01:01:08 -04:00
parent b4e01c80c5
commit f101380d87
10 changed files with 269 additions and 53 deletions

View file

@ -11,12 +11,12 @@ import (
)
type apiKeyAuth interface {
// CheckAPIKey validates the provided key and returns the API key record.
// CheckAPIKey validates the provided key and returns the API owner's UserID.
// An error is returned if validation fails for any reason.
CheckAPIKey(ctx context.Context, key string) (*database.ApiKey, error)
CheckAPIKey(ctx context.Context, key string) (*UserID, error)
}
func (a *authenticator) CheckAPIKey(ctx context.Context, key string) (*database.ApiKey, error) {
func (a *authenticator) CheckAPIKey(ctx context.Context, key string) (*UserID, error) {
keyUuid, err := uuid.Parse(key)
if err != nil {
log.Error().Str("apikey", key).Msg("cannot parse key")
@ -40,5 +40,7 @@ func (a *authenticator) CheckAPIKey(ctx context.Context, key string) (*database.
return nil, ErrUnauthorized
}
return &apik, nil
owner := UserID(apik.Owner)
return &owner, nil
}

View file

@ -7,6 +7,18 @@ import (
"github.com/go-chi/jwtauth/v5"
)
type UserID int
func (u *UserID) Int32Ptr() *int32 {
if u == nil {
return nil
}
i := int32(*u)
return &i
}
// Authenticator performs API key and user JWT authentication.
type Authenticator interface {
jwtAuth

26
pkg/gordio/calls/call.go Normal file
View file

@ -0,0 +1,26 @@
package calls
import (
"dynatron.me/x/stillbox/pkg/gordio/auth"
"time"
)
type Call struct {
Audio []byte
AudioName string
AudioType string
DateTime time.Time
Frequencies []int
Frequency int
Patches []int
Source int
Sources []int
System int
Submitter *auth.UserID
SystemLabel string
Talkgroup int
TalkgroupGroup *string
TalkgroupLabel *string
TalkgroupTag *string
}

View file

@ -0,0 +1,16 @@
package server
import (
"context"
"dynatron.me/x/stillbox/pkg/gordio/calls"
"dynatron.me/x/stillbox/pkg/gordio/database"
)
func (s *Server) Ingest(ctx context.Context, call *calls.Call) {
// decouple this context from the one we call sinks with
db := database.FromCtx(ctx)
nctx := database.CtxWithDB(context.Background(), db)
s.sinks.EmitCall(nctx, call)
}

View file

@ -26,7 +26,7 @@ func (s *Server) setupRoutes() {
r.Use(render.SetContentType(render.ContentTypeJSON))
// public routes
s.auth.InstallRoutes(r)
s.httpIngestor.InstallRoutes(r)
s.sources.InstallPublicRoutes(r)
})
r.Group(func(r chi.Router) {

View file

@ -6,7 +6,8 @@ import (
"dynatron.me/x/stillbox/pkg/gordio/auth"
"dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/pkg/gordio/ingestors"
"dynatron.me/x/stillbox/pkg/gordio/sinks"
"dynatron.me/x/stillbox/pkg/gordio/sources"
"github.com/go-chi/chi/middleware"
"github.com/go-chi/chi/v5"
)
@ -16,7 +17,8 @@ type Server struct {
conf *config.Config
db *database.DB
r *chi.Mux
httpIngestor *ingestors.HTTPIngestor
sources sources.Sources
sinks sinks.Sinks
}
func New(cfg *config.Config) (*Server, error) {
@ -32,8 +34,11 @@ func New(cfg *config.Config) (*Server, error) {
conf: cfg,
db: db,
r: r,
httpIngestor: ingestors.NewHTTPIngestor(authenticator),
}
srv.sinks.Register("database", sinks.NewDatabaseSink(db))
srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv))
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(middleware.Logger)

View file

@ -0,0 +1,59 @@
package sinks
import (
"context"
"fmt"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/pkg/gordio/calls"
"dynatron.me/x/stillbox/pkg/gordio/database"
"github.com/rs/zerolog/log"
)
type DatabaseSink struct {
db *database.DB
}
func NewDatabaseSink(db *database.DB) *DatabaseSink {
return &DatabaseSink{
db: db,
}
}
func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
db := database.FromCtx(ctx)
dbCall, err := db.AddCall(ctx, s.toAddCallParams(call))
if err != nil {
return fmt.Errorf("add call: %w", err)
}
log.Debug().Str("id", dbCall.String()).Int("system", call.System).Int("tgid", call.Talkgroup).Msg("stored")
return nil
}
func (s *DatabaseSink) SinkType() string {
return "database"
}
func (s *DatabaseSink) toAddCallParams(call *calls.Call) database.AddCallParams {
return database.AddCallParams{
Submitter: call.Submitter.Int32Ptr(),
System: call.System,
Talkgroup: call.Talkgroup,
CallDate: call.DateTime,
AudioName: common.PtrOrNull(call.AudioName),
AudioBlob: call.Audio,
AudioType: common.PtrOrNull(call.AudioType),
Frequency: call.Frequency,
Frequencies: call.Frequencies,
Patches: call.Patches,
TgLabel: call.TalkgroupLabel,
TgTag: call.TalkgroupTag,
TgGroup: call.TalkgroupGroup,
Source: call.Source,
}
}

40
pkg/gordio/sinks/sinks.go Normal file
View file

@ -0,0 +1,40 @@
package sinks
import (
"context"
"dynatron.me/x/stillbox/pkg/gordio/calls"
"github.com/rs/zerolog/log"
)
type Sink interface {
Call(context.Context, *calls.Call) error
SinkType() string
}
type sinkInstance struct {
Sink
Name string
}
type Sinks []sinkInstance
func (s *Sinks) Register(name string, toAdd Sink) {
*s = append(*s, sinkInstance{
Name: name,
Sink: toAdd,
})
}
func (s *Sinks) EmitCall(ctx context.Context, call *calls.Call) {
for _, sink := range *s {
go sink.emitCallLogErr(ctx, call)
}
}
func (sink *sinkInstance) emitCallLogErr(ctx context.Context, call *calls.Call) {
err := sink.Call(ctx, call)
if err != nil {
log.Error().Str("sink", sink.Name).Err(err).Msg("call emit to sink failed")
}
}

View file

@ -1,4 +1,4 @@
package ingestors
package sources
import (
"fmt"
@ -11,25 +11,31 @@ import (
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/pkg/gordio/auth"
"dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/pkg/gordio/calls"
"github.com/go-chi/chi/v5"
"github.com/rs/zerolog/log"
)
// HTTPIngestor is an ingestor that accepts calls over HTTP.
type HTTPIngestor struct {
// RdioHTTP is an source that accepts calls using the rdio-scanner HTTP interface.
type RdioHTTP struct {
auth auth.Authenticator
ing Ingestor
}
func (r *RdioHTTP) SourceType() string {
return "rdio-http"
}
// NewHTTPIngestor creates a new HTTPIngestor. It requires an Authenticator.
func NewHTTPIngestor(auth auth.Authenticator) *HTTPIngestor {
return &HTTPIngestor{
func NewRdioHTTP(auth auth.Authenticator, ing Ingestor) *RdioHTTP {
return &RdioHTTP{
auth: auth,
ing: ing,
}
}
// InstallRoutes installs the HTTP ingestor's routes to the provided chi Router.
func (h *HTTPIngestor) InstallRoutes(r chi.Router) {
// InstallPublicRoutes installs the HTTP source's routes to the provided chi Router.
func (h *RdioHTTP) InstallPublicRoutes(r chi.Router) {
r.Post("/api/call-upload", h.routeCallUpload)
}
@ -52,26 +58,41 @@ type callUploadRequest struct {
TalkgroupTag string `form:"talkgroupTag"`
}
func (car *callUploadRequest) toAddCallParams(submitter int) database.AddCallParams {
return database.AddCallParams{
Submitter: common.PtrTo(int32(submitter)),
func (car *callUploadRequest) mimeType() string {
// this is super naïve
fn := car.AudioName
switch {
case car.AudioType != "":
return car.AudioType
case strings.HasSuffix(fn, ".mp3"):
return "audio/mpeg"
case strings.HasSuffix(fn, ".wav"):
return "audio/wav"
}
return ""
}
func (car *callUploadRequest) toCall(submitter auth.UserID) *calls.Call {
return &calls.Call{
Submitter: &submitter,
System: car.System,
Talkgroup: car.Talkgroup,
CallDate: car.DateTime,
AudioName: common.PtrOrNull(car.AudioName),
AudioBlob: car.Audio,
AudioType: common.PtrOrNull(car.AudioType),
DateTime: car.DateTime,
AudioName: car.AudioName,
Audio: car.Audio,
AudioType: car.mimeType(),
Frequency: car.Frequency,
Frequencies: car.Frequencies,
Patches: car.Patches,
TgLabel: common.PtrOrNull(car.TalkgroupLabel),
TgTag: common.PtrOrNull(car.TalkgroupTag),
TgGroup: common.PtrOrNull(car.TalkgroupGroup),
TalkgroupLabel: common.PtrOrNull(car.TalkgroupLabel),
TalkgroupTag: common.PtrOrNull(car.TalkgroupTag),
TalkgroupGroup: common.PtrOrNull(car.TalkgroupGroup),
Source: car.Source,
}
}
func (h *HTTPIngestor) routeCallUpload(w http.ResponseWriter, r *http.Request) {
func (h *RdioHTTP) routeCallUpload(w http.ResponseWriter, r *http.Request) {
err := r.ParseMultipartForm(1024 * 1024 * 2) // 2MB
if err != nil {
http.Error(w, "cannot parse form "+err.Error(), http.StatusBadRequest)
@ -80,35 +101,28 @@ func (h *HTTPIngestor) routeCallUpload(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
apik, err := h.auth.CheckAPIKey(ctx, r.Form.Get("key"))
submitter, err := h.auth.CheckAPIKey(ctx, r.Form.Get("key"))
if err != nil {
auth.ErrorResponse(w, err)
return
}
db := database.FromCtx(ctx)
if strings.Trim(r.Form.Get("test"), "\r\n") == "1" {
// fudge the official response
http.Error(w, "incomplete call data: no talkgroup", http.StatusExpectationFailed)
return
}
call := new(callUploadRequest)
err = call.fill(r)
cur := new(callUploadRequest)
err = cur.fill(r)
if err != nil {
http.Error(w, "cannot bind upload "+err.Error(), http.StatusExpectationFailed)
return
}
dbCall, err := db.AddCall(ctx, call.toAddCallParams(apik.Owner))
if err != nil {
http.Error(w, "internal error", http.StatusInternalServerError)
log.Error().Err(err).Msg("add call")
return
}
h.ing.Ingest(ctx, cur.toCall(*submitter))
log.Info().Str("id", dbCall.String()).Int("system", call.System).Int("tgid", call.Talkgroup).Msg("ingested")
log.Info().Int("system", cur.System).Int("tgid", cur.Talkgroup).Msg("ingested")
w.Write([]byte("Call imported successfully."))
}

View file

@ -0,0 +1,42 @@
package sources
import (
"context"
"dynatron.me/x/stillbox/pkg/gordio/calls"
"github.com/go-chi/chi/v5"
)
type Source interface {
SourceType() string
}
type sourceInstance struct {
Source
Name string
}
type Sources []sourceInstance
func (s *Sources) Register(name string, src Source) {
*s = append(*s, sourceInstance{
Name: name,
Source: src,
})
}
func (s *Sources) InstallPublicRoutes(r chi.Router) {
for _, si := range *s {
if rs, ok := si.Source.(PublicRouteSource); ok {
rs.InstallPublicRoutes(r)
}
}
}
type Ingestor interface {
Ingest(context.Context, *calls.Call)
}
type PublicRouteSource interface {
InstallPublicRoutes(chi.Router)
}