Storage part

This commit is contained in:
Daniel 2024-11-20 19:15:26 -05:00
parent bba458fe93
commit 5d9a08780f
8 changed files with 215 additions and 5 deletions

View file

@ -40,6 +40,23 @@ type jwtAuth interface {
type claims map[string]interface{} type claims map[string]interface{}
func UIDFrom(ctx context.Context) *int32 {
tok, _, err := jwtauth.FromContext(ctx)
if err != nil {
return nil
}
uidStr := tok.Subject()
uidInt, err := strconv.Atoi(uidStr)
if err != nil {
return nil
}
uid := int32(uidInt)
return &uid
}
func (a *Auth) Authenticated(r *http.Request) (claims, bool) { func (a *Auth) Authenticated(r *http.Request) (claims, bool) {
// TODO: check IP against ACL, or conf.Public, and against map of routes // TODO: check IP against ACL, or conf.Public, and against map of routes
tok, cl, err := jwtauth.FromContext(r.Context()) tok, cl, err := jwtauth.FromContext(r.Context())

View file

@ -18,6 +18,83 @@ var (
ErrBatchAlreadyClosed = errors.New("batch already closed") ErrBatchAlreadyClosed = errors.New("batch already closed")
) )
const storeTGVersion = `-- name: StoreTGVersion :batchexec
INSERT INTO talkgroup_versions(time, created_by,
system_id,
tgid,
name,
alpha_tag,
tg_group,
frequency,
metadata,
tags,
alert,
alert_config,
weight,
learned
) SELECT NOW(), $1,
tg.system_id,
tg.tgid,
tg.name,
tg.alpha_tag,
tg.tg_group,
tg.frequency,
tg.metadata,
tg.tags,
tg.alert,
tg.alert_config,
tg.weight,
tg.learned
FROM talkgroups tg WHERE tg.system_id = $2 AND tg.tgid = $3
`
type StoreTGVersionBatchResults struct {
br pgx.BatchResults
tot int
closed bool
}
type StoreTGVersionParams struct {
Submitter *int32 `json:"submitter"`
SystemID int32 `json:"system_id"`
TGID int32 `json:"tgid"`
}
func (q *Queries) StoreTGVersion(ctx context.Context, arg []StoreTGVersionParams) *StoreTGVersionBatchResults {
batch := &pgx.Batch{}
for _, a := range arg {
vals := []interface{}{
a.Submitter,
a.SystemID,
a.TGID,
}
batch.Queue(storeTGVersion, vals...)
}
br := q.db.SendBatch(ctx, batch)
return &StoreTGVersionBatchResults{br, len(arg), false}
}
func (b *StoreTGVersionBatchResults) Exec(f func(int, error)) {
defer b.br.Close()
for t := 0; t < b.tot; t++ {
if b.closed {
if f != nil {
f(t, ErrBatchAlreadyClosed)
}
continue
}
_, err := b.br.Exec()
if f != nil {
f(t, err)
}
}
}
func (b *StoreTGVersionBatchResults) Close() error {
b.closed = true
return b.br.Close()
}
const upsertTalkgroup = `-- name: UpsertTalkgroup :batchone const upsertTalkgroup = `-- name: UpsertTalkgroup :batchone
INSERT INTO talkgroups AS tg ( INSERT INTO talkgroups AS tg (
system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned

View file

@ -1666,6 +1666,55 @@ func (_c *Store_SetTalkgroupTags_Call) RunAndReturn(run func(context.Context, []
return _c return _c
} }
// StoreTGVersion provides a mock function with given fields: ctx, arg
func (_m *Store) StoreTGVersion(ctx context.Context, arg []database.StoreTGVersionParams) *database.StoreTGVersionBatchResults {
ret := _m.Called(ctx, arg)
if len(ret) == 0 {
panic("no return value specified for StoreTGVersion")
}
var r0 *database.StoreTGVersionBatchResults
if rf, ok := ret.Get(0).(func(context.Context, []database.StoreTGVersionParams) *database.StoreTGVersionBatchResults); ok {
r0 = rf(ctx, arg)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*database.StoreTGVersionBatchResults)
}
}
return r0
}
// Store_StoreTGVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreTGVersion'
type Store_StoreTGVersion_Call struct {
*mock.Call
}
// StoreTGVersion is a helper method to define mock.On call
// - ctx context.Context
// - arg []database.StoreTGVersionParams
func (_e *Store_Expecter) StoreTGVersion(ctx interface{}, arg interface{}) *Store_StoreTGVersion_Call {
return &Store_StoreTGVersion_Call{Call: _e.mock.On("StoreTGVersion", ctx, arg)}
}
func (_c *Store_StoreTGVersion_Call) Run(run func(ctx context.Context, arg []database.StoreTGVersionParams)) *Store_StoreTGVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]database.StoreTGVersionParams))
})
return _c
}
func (_c *Store_StoreTGVersion_Call) Return(_a0 *database.StoreTGVersionBatchResults) *Store_StoreTGVersion_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Store_StoreTGVersion_Call) RunAndReturn(run func(context.Context, []database.StoreTGVersionParams) *database.StoreTGVersionBatchResults) *Store_StoreTGVersion_Call {
_c.Call.Return(run)
return _c
}
// UpdatePassword provides a mock function with given fields: ctx, username, password // UpdatePassword provides a mock function with given fields: ctx, username, password
func (_m *Store) UpdatePassword(ctx context.Context, username string, password string) error { func (_m *Store) UpdatePassword(ctx context.Context, username string, password string) error {
ret := _m.Called(ctx, username, password) ret := _m.Called(ctx, username, password)

View file

@ -98,6 +98,24 @@ type Talkgroup struct {
Learned bool `json:"learned"` Learned bool `json:"learned"`
} }
type TalkgroupVersion struct {
ID int `json:"id"`
Time pgtype.Timestamptz `json:"time"`
CreatedBy *int32 `json:"created_by"`
SystemID *int32 `json:"system_id"`
TGID *int32 `json:"tgid"`
Name *string `json:"name"`
AlphaTag *string `json:"alpha_tag"`
TGGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
Alert *bool `json:"alert"`
AlertConfig []byte `json:"alert_config"`
Weight *float32 `json:"weight"`
Learned *bool `json:"learned"`
}
type TalkgroupsLearned struct { type TalkgroupsLearned struct {
ID int `json:"id"` ID int `json:"id"`
SystemID int `json:"system_id"` SystemID int `json:"system_id"`

View file

@ -37,6 +37,7 @@ type Querier interface {
GetUsers(ctx context.Context) ([]User, error) GetUsers(ctx context.Context) ([]User, error)
SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error
SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error
StoreTGVersion(ctx context.Context, arg []StoreTGVersionParams) *StoreTGVersionBatchResults
UpdatePassword(ctx context.Context, username string, password string) error UpdatePassword(ctx context.Context, username string, password string) error
UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error)
UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults

View file

@ -131,7 +131,7 @@ func (h *RdioHTTP) routeCallUpload(w http.ResponseWriter, r *http.Request) {
return return
} }
log.Info().Int("system", cur.System).Int("tgid", cur.Talkgroup).Msg("ingested") log.Info().Int("system", cur.System).Int("tgid", cur.Talkgroup).Str("duration", call.Duration.Duration().String()).Msg("ingested")
written, err := w.Write([]byte("Call imported successfully.")) written, err := w.Write([]byte("Call imported successfully."))
if err != nil { if err != nil {

View file

@ -8,6 +8,7 @@ import (
"time" "time"
"dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/database"
@ -312,6 +313,7 @@ func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.Upse
tgs := make([]*Talkgroup, 0, len(input)) tgs := make([]*Talkgroup, 0, len(input))
err := db.InTx(ctx, func(db database.Store) error { err := db.InTx(ctx, func(db database.Store) error {
versionParams := make([]database.StoreTGVersionParams, 0, len(input))
for i := range input { for i := range input {
// normalize tags // normalize tags
for j, tag := range input[i].Tags { for j, tag := range input[i].Tags {
@ -320,18 +322,25 @@ func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.Upse
input[i].SystemID = int32(system) input[i].SystemID = int32(system)
input[i].Learned = common.PtrTo(false) input[i].Learned = common.PtrTo(false)
} }
var oerr error var oerr error
batch := db.UpsertTalkgroup(ctx, input) tgUpsertBatch := db.UpsertTalkgroup(ctx, input)
defer batch.Close() defer tgUpsertBatch.Close()
batch.QueryRow(func(_ int, r database.Talkgroup, err error) { tgUpsertBatch.QueryRow(func(_ int, r database.Talkgroup, err error) {
if err != nil { if err != nil {
oerr = err oerr = err
return return
} }
versionParams = append(versionParams, database.StoreTGVersionParams{
SystemID: int32(system),
TGID: r.TGID,
Submitter: auth.UIDFrom(ctx),
})
tgs = append(tgs, &Talkgroup{ tgs = append(tgs, &Talkgroup{
Talkgroup: r, Talkgroup: r,
System: sys, System: sys,
@ -343,7 +352,17 @@ func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.Upse
return oerr return oerr
} }
return nil versionBatch := db.StoreTGVersion(ctx, versionParams)
defer versionBatch.Close()
versionBatch.Exec(func(_ int, err error) {
if err != nil {
oerr = err
return
}
})
return oerr
}, pgx.TxOptions{}) }, pgx.TxOptions{})
if err != nil { if err != nil {

View file

@ -122,6 +122,35 @@ SET
learned = COALESCE(sqlc.narg('learned'), tg.learned) learned = COALESCE(sqlc.narg('learned'), tg.learned)
RETURNING *; RETURNING *;
-- name: StoreTGVersion :batchexec
INSERT INTO talkgroup_versions(time, created_by,
system_id,
tgid,
name,
alpha_tag,
tg_group,
frequency,
metadata,
tags,
alert,
alert_config,
weight,
learned
) SELECT NOW(), @submitter,
tg.system_id,
tg.tgid,
tg.name,
tg.alpha_tag,
tg.tg_group,
tg.frequency,
tg.metadata,
tg.tags,
tg.alert,
tg.alert_config,
tg.weight,
tg.learned
FROM talkgroups tg WHERE tg.system_id = @system_id AND tg.tgid = @tgid;
-- name: AddTalkgroupWithLearnedFlag :exec -- name: AddTalkgroupWithLearnedFlag :exec
INSERT INTO talkgroups ( INSERT INTO talkgroups (
system_id, system_id,