diff --git a/pkg/gordio/auth/apikey.go b/pkg/gordio/auth/apikey.go index cf42066..019c920 100644 --- a/pkg/gordio/auth/apikey.go +++ b/pkg/gordio/auth/apikey.go @@ -11,12 +11,12 @@ import ( ) type apiKeyAuth interface { - // CheckAPIKey validates the provided key and returns the API key record. + // CheckAPIKey validates the provided key and returns the API owner's UserID. // An error is returned if validation fails for any reason. - CheckAPIKey(ctx context.Context, key string) (*database.ApiKey, error) + CheckAPIKey(ctx context.Context, key string) (*UserID, error) } -func (a *authenticator) CheckAPIKey(ctx context.Context, key string) (*database.ApiKey, error) { +func (a *authenticator) CheckAPIKey(ctx context.Context, key string) (*UserID, error) { keyUuid, err := uuid.Parse(key) if err != nil { log.Error().Str("apikey", key).Msg("cannot parse key") @@ -40,5 +40,7 @@ func (a *authenticator) CheckAPIKey(ctx context.Context, key string) (*database. return nil, ErrUnauthorized } - return &apik, nil + owner := UserID(apik.Owner) + + return &owner, nil } diff --git a/pkg/gordio/auth/auth.go b/pkg/gordio/auth/auth.go index c26db91..858d07f 100644 --- a/pkg/gordio/auth/auth.go +++ b/pkg/gordio/auth/auth.go @@ -7,6 +7,18 @@ import ( "github.com/go-chi/jwtauth/v5" ) +type UserID int + +func (u *UserID) Int32Ptr() *int32 { + if u == nil { + return nil + } + + i := int32(*u) + + return &i +} + // Authenticator performs API key and user JWT authentication. type Authenticator interface { jwtAuth diff --git a/pkg/gordio/calls/call.go b/pkg/gordio/calls/call.go new file mode 100644 index 0000000..4eb3ae2 --- /dev/null +++ b/pkg/gordio/calls/call.go @@ -0,0 +1,26 @@ +package calls + +import ( + "dynatron.me/x/stillbox/pkg/gordio/auth" + + "time" +) + +type Call struct { + Audio []byte + AudioName string + AudioType string + DateTime time.Time + Frequencies []int + Frequency int + Patches []int + Source int + Sources []int + System int + Submitter *auth.UserID + SystemLabel string + Talkgroup int + TalkgroupGroup *string + TalkgroupLabel *string + TalkgroupTag *string +} diff --git a/pkg/gordio/server/ingest.go b/pkg/gordio/server/ingest.go new file mode 100644 index 0000000..2c5a868 --- /dev/null +++ b/pkg/gordio/server/ingest.go @@ -0,0 +1,16 @@ +package server + +import ( + "context" + + "dynatron.me/x/stillbox/pkg/gordio/calls" + "dynatron.me/x/stillbox/pkg/gordio/database" +) + +func (s *Server) Ingest(ctx context.Context, call *calls.Call) { + // decouple this context from the one we call sinks with + db := database.FromCtx(ctx) + nctx := database.CtxWithDB(context.Background(), db) + + s.sinks.EmitCall(nctx, call) +} diff --git a/pkg/gordio/server/routes.go b/pkg/gordio/server/routes.go index db452bd..eef102f 100644 --- a/pkg/gordio/server/routes.go +++ b/pkg/gordio/server/routes.go @@ -26,7 +26,7 @@ func (s *Server) setupRoutes() { r.Use(render.SetContentType(render.ContentTypeJSON)) // public routes s.auth.InstallRoutes(r) - s.httpIngestor.InstallRoutes(r) + s.sources.InstallPublicRoutes(r) }) r.Group(func(r chi.Router) { diff --git a/pkg/gordio/server/server.go b/pkg/gordio/server/server.go index 75008bd..ae846dc 100644 --- a/pkg/gordio/server/server.go +++ b/pkg/gordio/server/server.go @@ -6,17 +6,19 @@ import ( "dynatron.me/x/stillbox/pkg/gordio/auth" "dynatron.me/x/stillbox/pkg/gordio/config" "dynatron.me/x/stillbox/pkg/gordio/database" - "dynatron.me/x/stillbox/pkg/gordio/ingestors" + "dynatron.me/x/stillbox/pkg/gordio/sinks" + "dynatron.me/x/stillbox/pkg/gordio/sources" "github.com/go-chi/chi/middleware" "github.com/go-chi/chi/v5" ) type Server struct { - auth auth.Authenticator - conf *config.Config - db *database.DB - r *chi.Mux - httpIngestor *ingestors.HTTPIngestor + auth auth.Authenticator + conf *config.Config + db *database.DB + r *chi.Mux + sources sources.Sources + sinks sinks.Sinks } func New(cfg *config.Config) (*Server, error) { @@ -28,12 +30,15 @@ func New(cfg *config.Config) (*Server, error) { r := chi.NewRouter() authenticator := auth.NewAuthenticator(cfg.JWTSecret, cfg.Domain) srv := &Server{ - auth: authenticator, - conf: cfg, - db: db, - r: r, - httpIngestor: ingestors.NewHTTPIngestor(authenticator), + auth: authenticator, + conf: cfg, + db: db, + r: r, } + + srv.sinks.Register("database", sinks.NewDatabaseSink(db)) + srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv)) + r.Use(middleware.RequestID) r.Use(middleware.RealIP) r.Use(middleware.Logger) diff --git a/pkg/gordio/sinks/database.go b/pkg/gordio/sinks/database.go new file mode 100644 index 0000000..a617024 --- /dev/null +++ b/pkg/gordio/sinks/database.go @@ -0,0 +1,59 @@ +package sinks + +import ( + "context" + "fmt" + + "dynatron.me/x/stillbox/internal/common" + "dynatron.me/x/stillbox/pkg/gordio/calls" + "dynatron.me/x/stillbox/pkg/gordio/database" + + "github.com/rs/zerolog/log" +) + +type DatabaseSink struct { + db *database.DB +} + +func NewDatabaseSink(db *database.DB) *DatabaseSink { + return &DatabaseSink{ + db: db, + } +} + +func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error { + db := database.FromCtx(ctx) + + dbCall, err := db.AddCall(ctx, s.toAddCallParams(call)) + if err != nil { + return fmt.Errorf("add call: %w", err) + } + + log.Debug().Str("id", dbCall.String()).Int("system", call.System).Int("tgid", call.Talkgroup).Msg("stored") + + return nil +} + +func (s *DatabaseSink) SinkType() string { + return "database" +} + +func (s *DatabaseSink) toAddCallParams(call *calls.Call) database.AddCallParams { + return database.AddCallParams{ + Submitter: call.Submitter.Int32Ptr(), + System: call.System, + Talkgroup: call.Talkgroup, + CallDate: call.DateTime, + AudioName: common.PtrOrNull(call.AudioName), + AudioBlob: call.Audio, + AudioType: common.PtrOrNull(call.AudioType), + Frequency: call.Frequency, + Frequencies: call.Frequencies, + Patches: call.Patches, + TgLabel: call.TalkgroupLabel, + TgTag: call.TalkgroupTag, + TgGroup: call.TalkgroupGroup, + Source: call.Source, + } + +} diff --git a/pkg/gordio/sinks/sinks.go b/pkg/gordio/sinks/sinks.go new file mode 100644 index 0000000..fb36536 --- /dev/null +++ b/pkg/gordio/sinks/sinks.go @@ -0,0 +1,40 @@ +package sinks + +import ( + "context" + "dynatron.me/x/stillbox/pkg/gordio/calls" + + "github.com/rs/zerolog/log" +) + +type Sink interface { + Call(context.Context, *calls.Call) error + SinkType() string +} + +type sinkInstance struct { + Sink + Name string +} + +type Sinks []sinkInstance + +func (s *Sinks) Register(name string, toAdd Sink) { + *s = append(*s, sinkInstance{ + Name: name, + Sink: toAdd, + }) +} + +func (s *Sinks) EmitCall(ctx context.Context, call *calls.Call) { + for _, sink := range *s { + go sink.emitCallLogErr(ctx, call) + } +} + +func (sink *sinkInstance) emitCallLogErr(ctx context.Context, call *calls.Call) { + err := sink.Call(ctx, call) + if err != nil { + log.Error().Str("sink", sink.Name).Err(err).Msg("call emit to sink failed") + } +} diff --git a/pkg/gordio/ingestors/http.go b/pkg/gordio/sources/http.go similarity index 65% rename from pkg/gordio/ingestors/http.go rename to pkg/gordio/sources/http.go index d52a7bc..26717dd 100644 --- a/pkg/gordio/ingestors/http.go +++ b/pkg/gordio/sources/http.go @@ -1,4 +1,4 @@ -package ingestors +package sources import ( "fmt" @@ -11,25 +11,31 @@ import ( "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/pkg/gordio/auth" - "dynatron.me/x/stillbox/pkg/gordio/database" + "dynatron.me/x/stillbox/pkg/gordio/calls" "github.com/go-chi/chi/v5" "github.com/rs/zerolog/log" ) -// HTTPIngestor is an ingestor that accepts calls over HTTP. -type HTTPIngestor struct { +// RdioHTTP is an source that accepts calls using the rdio-scanner HTTP interface. +type RdioHTTP struct { auth auth.Authenticator + ing Ingestor +} + +func (r *RdioHTTP) SourceType() string { + return "rdio-http" } // NewHTTPIngestor creates a new HTTPIngestor. It requires an Authenticator. -func NewHTTPIngestor(auth auth.Authenticator) *HTTPIngestor { - return &HTTPIngestor{ +func NewRdioHTTP(auth auth.Authenticator, ing Ingestor) *RdioHTTP { + return &RdioHTTP{ auth: auth, + ing: ing, } } -// InstallRoutes installs the HTTP ingestor's routes to the provided chi Router. -func (h *HTTPIngestor) InstallRoutes(r chi.Router) { +// InstallPublicRoutes installs the HTTP source's routes to the provided chi Router. +func (h *RdioHTTP) InstallPublicRoutes(r chi.Router) { r.Post("/api/call-upload", h.routeCallUpload) } @@ -52,26 +58,41 @@ type callUploadRequest struct { TalkgroupTag string `form:"talkgroupTag"` } -func (car *callUploadRequest) toAddCallParams(submitter int) database.AddCallParams { - return database.AddCallParams{ - Submitter: common.PtrTo(int32(submitter)), - System: car.System, - Talkgroup: car.Talkgroup, - CallDate: car.DateTime, - AudioName: common.PtrOrNull(car.AudioName), - AudioBlob: car.Audio, - AudioType: common.PtrOrNull(car.AudioType), - Frequency: car.Frequency, - Frequencies: car.Frequencies, - Patches: car.Patches, - TgLabel: common.PtrOrNull(car.TalkgroupLabel), - TgTag: common.PtrOrNull(car.TalkgroupTag), - TgGroup: common.PtrOrNull(car.TalkgroupGroup), - Source: car.Source, +func (car *callUploadRequest) mimeType() string { + // this is super naïve + fn := car.AudioName + switch { + case car.AudioType != "": + return car.AudioType + case strings.HasSuffix(fn, ".mp3"): + return "audio/mpeg" + case strings.HasSuffix(fn, ".wav"): + return "audio/wav" + } + + return "" +} + +func (car *callUploadRequest) toCall(submitter auth.UserID) *calls.Call { + return &calls.Call{ + Submitter: &submitter, + System: car.System, + Talkgroup: car.Talkgroup, + DateTime: car.DateTime, + AudioName: car.AudioName, + Audio: car.Audio, + AudioType: car.mimeType(), + Frequency: car.Frequency, + Frequencies: car.Frequencies, + Patches: car.Patches, + TalkgroupLabel: common.PtrOrNull(car.TalkgroupLabel), + TalkgroupTag: common.PtrOrNull(car.TalkgroupTag), + TalkgroupGroup: common.PtrOrNull(car.TalkgroupGroup), + Source: car.Source, } } -func (h *HTTPIngestor) routeCallUpload(w http.ResponseWriter, r *http.Request) { +func (h *RdioHTTP) routeCallUpload(w http.ResponseWriter, r *http.Request) { err := r.ParseMultipartForm(1024 * 1024 * 2) // 2MB if err != nil { http.Error(w, "cannot parse form "+err.Error(), http.StatusBadRequest) @@ -80,35 +101,28 @@ func (h *HTTPIngestor) routeCallUpload(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - apik, err := h.auth.CheckAPIKey(ctx, r.Form.Get("key")) + submitter, err := h.auth.CheckAPIKey(ctx, r.Form.Get("key")) if err != nil { auth.ErrorResponse(w, err) return } - db := database.FromCtx(ctx) - if strings.Trim(r.Form.Get("test"), "\r\n") == "1" { // fudge the official response http.Error(w, "incomplete call data: no talkgroup", http.StatusExpectationFailed) return } - call := new(callUploadRequest) - err = call.fill(r) + cur := new(callUploadRequest) + err = cur.fill(r) if err != nil { http.Error(w, "cannot bind upload "+err.Error(), http.StatusExpectationFailed) return } - dbCall, err := db.AddCall(ctx, call.toAddCallParams(apik.Owner)) - if err != nil { - http.Error(w, "internal error", http.StatusInternalServerError) - log.Error().Err(err).Msg("add call") - return - } + h.ing.Ingest(ctx, cur.toCall(*submitter)) - log.Info().Str("id", dbCall.String()).Int("system", call.System).Int("tgid", call.Talkgroup).Msg("ingested") + log.Info().Int("system", cur.System).Int("tgid", cur.Talkgroup).Msg("ingested") w.Write([]byte("Call imported successfully.")) } diff --git a/pkg/gordio/sources/source.go b/pkg/gordio/sources/source.go new file mode 100644 index 0000000..08362b0 --- /dev/null +++ b/pkg/gordio/sources/source.go @@ -0,0 +1,42 @@ +package sources + +import ( + "context" + "dynatron.me/x/stillbox/pkg/gordio/calls" + + "github.com/go-chi/chi/v5" +) + +type Source interface { + SourceType() string +} + +type sourceInstance struct { + Source + Name string +} + +type Sources []sourceInstance + +func (s *Sources) Register(name string, src Source) { + *s = append(*s, sourceInstance{ + Name: name, + Source: src, + }) +} + +func (s *Sources) InstallPublicRoutes(r chi.Router) { + for _, si := range *s { + if rs, ok := si.Source.(PublicRouteSource); ok { + rs.InstallPublicRoutes(r) + } + } +} + +type Ingestor interface { + Ingest(context.Context, *calls.Call) +} + +type PublicRouteSource interface { + InstallPublicRoutes(chi.Router) +}