callsnormalize #40

Merged
amigan merged 7 commits from callsnormalize into trunk 2024-11-19 10:17:17 -05:00
9 changed files with 52 additions and 46 deletions
Showing only changes of commit 5e6c97adec - Show all commits

View file

@ -40,7 +40,8 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
db := database.FromCtx(ctx) db := database.FromCtx(ctx)
tgs, err := db.GetTalkgroupsWithLearnedBySysTGID(ctx, as.scoredTGsTuple()) tgt := as.scoredTGsTuple()
tgs, err := db.GetTalkgroupsWithLearnedBySysTGID(ctx, tgt)
if err != nil { if err != nil {
log.Error().Err(err).Msg("stats TG get failed") log.Error().Err(err).Msg("stats TG get failed")
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)

View file

@ -124,6 +124,7 @@ func (c *Call) LearnTG(ctx context.Context, db database.Store) (learnedId int, e
TGID: c.Talkgroup, TGID: c.Talkgroup,
Name: c.TalkgroupLabel, Name: c.TalkgroupLabel,
AlphaTag: c.TGAlphaTag, AlphaTag: c.TGAlphaTag,
TGGroup: c.TalkgroupGroup,
}) })
} }

View file

@ -15,7 +15,7 @@ type talkgroupQuerier interface {
type TGTuples [2][]uint32 type TGTuples [2][]uint32
const TGConstraintName = "" const TGConstraintName = "calls_system_talkgroup_fkey"
func IsTGConstraintViolation(e error) bool { func IsTGConstraintViolation(e error) bool {
var err *pgconn.PgError var err *pgconn.PgError
@ -51,7 +51,7 @@ SELECT
tgl.id, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, tgl.id, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.tg_group, NULL::INTEGER, NULL::JSONB, tgl.alpha_tag, tgl.tg_group, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.tg_group IS NULL THEN NULL ELSE ARRAY[tgl.tg_group] END, CASE WHEN tgl.tg_group IS NULL THEN NULL ELSE ARRAY[tgl.tg_group] END,
TRUE, NULL::JSONB, 1.0, TRUE learned, sys.id, sys.name TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned
FROM talkgroups_learned tgl FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id JOIN systems sys ON tgl.system_id = sys.id
JOIN UNNEST($1::INT4[], $2::INT4[]) AS tgt(sys, tg) ON (tgl.system_id = tgt.sys AND tgl.tgid = tgt.tg);` JOIN UNNEST($1::INT4[], $2::INT4[]) AS tgt(sys, tg) ON (tgl.system_id = tgt.sys AND tgl.tgid = tgt.tg);`

View file

@ -27,21 +27,12 @@ func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
return nil return nil
} }
return s.db.InTx(ctx, func(tx database.Store) error { params := s.toAddCallParams(call)
params := s.toAddCallParams(call)
err := s.db.InTx(ctx, func(tx database.Store) error {
err := tx.AddCall(ctx, params) err := tx.AddCall(ctx, params)
if err != nil { if err != nil {
if database.IsTGConstraintViolation(err) {
_, err := call.LearnTG(ctx, tx)
if err != nil {
return fmt.Errorf("add call: learn tg: %w", err)
}
err = tx.AddCall(ctx, params)
if err != nil {
return fmt.Errorf("add call: retry: %w", err)
}
}
return fmt.Errorf("add call: %w", err) return fmt.Errorf("add call: %w", err)
} }
@ -49,6 +40,24 @@ func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
return nil return nil
}, pgx.TxOptions{}) }, pgx.TxOptions{})
if err != nil && database.IsTGConstraintViolation(err) {
return s.db.InTx(ctx, func(tx database.Store) error {
_, err := call.LearnTG(ctx, tx)
if err != nil {
return fmt.Errorf("add call: learn tg: %w", err)
}
err = tx.AddCall(ctx, params)
if err != nil {
return fmt.Errorf("add call: retry: %w", err)
}
return nil
}, pgx.TxOptions{})
}
return err
} }
func (s *DatabaseSink) SinkType() string { func (s *DatabaseSink) SinkType() string {

View file

@ -252,7 +252,7 @@ func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) {
case pgx.ErrNoRows: case pgx.ErrNoRows:
return nil, ErrNotFound return nil, ErrNotFound
default: default:
log.Error().Err(err).Msg("TG() cache add db get") log.Error().Err(err).Uint32("sys", tg.System).Uint32("tg", tg.Talkgroup).Msg("TG() cache add db get")
return nil, errors.Join(ErrNotFound, err) return nil, errors.Join(ErrNotFound, err)
} }

View file

@ -44,7 +44,7 @@ CREATE INDEX talkgroups_system_tgid_idx ON talkgroups (system_id, tgid);
CREATE INDEX IF NOT EXISTS talkgroup_id_tags ON talkgroups USING GIN (tags); CREATE INDEX IF NOT EXISTS talkgroup_id_tags ON talkgroups USING GIN (tags);
CREATE TABLE IF NOT EXISTS talkgroups_learned( CREATE TABLE IF NOT EXISTS talkgroups_learned(
id UUID PRIMARY KEY, id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
system_id INTEGER REFERENCES systems(id) NOT NULL, system_id INTEGER REFERENCES systems(id) NOT NULL,
tgid INTEGER NOT NULL, tgid INTEGER NOT NULL,
name TEXT NOT NULL, name TEXT NOT NULL,
@ -55,7 +55,7 @@ CREATE TABLE IF NOT EXISTS talkgroups_learned(
); );
CREATE TABLE IF NOT EXISTS alerts( CREATE TABLE IF NOT EXISTS alerts(
id UUID PRIMARY KEY, id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
time TIMESTAMPTZ NOT NULL, time TIMESTAMPTZ NOT NULL,
tgid INTEGER NOT NULL, tgid INTEGER NOT NULL,
system_id INTEGER REFERENCES systems(id) NOT NULL, system_id INTEGER REFERENCES systems(id) NOT NULL,

View file

@ -1,11 +1,11 @@
ALTER TABLE calls DROP CONSTRAINT IF EXISTS calls_talkgroup_id_fkey; ALTER TABLE calls DROP CONSTRAINT IF EXISTS calls_system_talkgroup_fkey;
ALTER TABLE talkgroups ALTER COLUMN id DROP IDENTITY IF EXISTS;
ALTER TABLE talkgroups_learned ALTER COLUMN id SET DATA TYPE UUID USING (gen_random_uuid());
DROP SEQUENCE IF EXISTS talkgroups_id_seq;
ALTER TABLE talkgroups DROP COLUMN IF EXISTS learned; ALTER TABLE talkgroups DROP COLUMN IF EXISTS learned;
ALTER TABLE talkgroups ALTER COLUMN id DROP IDENTITY IF EXISTS;
ALTER TABLE talkgroups ALTER COLUMN id SET DATA TYPE UUID USING (gen_random_uuid());
DROP SEQUENCE IF EXISTS talkgroups_id_seq;
CREATE OR REPLACE FUNCTION learn_talkgroup() CREATE OR REPLACE FUNCTION learn_talkgroup()
RETURNS TRIGGER AS $$ RETURNS TRIGGER AS $$
BEGIN BEGIN
@ -23,12 +23,3 @@ END
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER learn_tg AFTER INSERT ON calls FOR EACH ROW EXECUTE FUNCTION learn_talkgroup(); CREATE OR REPLACE TRIGGER learn_tg AFTER INSERT ON calls FOR EACH ROW EXECUTE FUNCTION learn_talkgroup();
ALTER TABLE talkgroups_learned ALTER COLUMN id DROP IDENTITY IF EXISTS;
ALTER TABLE talkgroups_learned ALTER COLUMN id SET DATA TYPE UUID USING (gen_random_uuid());
DROP SEQUENCE IF EXISTS talkgroups_learned_id_seq;
ALTER TABLE alerts ALTER COLUMN id DROP IDENTITY IF EXISTS;
ALTER TABLE alerts ALTER COLUMN id SET DATA TYPE UUID USING (gen_random_uuid());
DROP SEQUENCE IF EXISTS alerts_id_seq;

View file

@ -1,21 +1,24 @@
CREATE SEQUENCE IF NOT EXISTS alerts_id_seq START WITH 1;
ALTER TABLE alerts ALTER COLUMN id SET DATA TYPE INTEGER USING (nextval('alerts_id_seq'));
ALTER TABLE alerts ALTER COLUMN id ADD GENERATED ALWAYS AS IDENTITY;
DROP SEQUENCE IF EXISTS alerts_id_seq;
CREATE SEQUENCE IF NOT EXISTS talkgroups_learned_id_seq START WITH 1;
ALTER TABLE talkgroups_learned ALTER COLUMN id SET DATA TYPE INTEGER USING (nextval('talkgroups_learned_id_seq'));
ALTER TABLE talkgroups_learned ALTER COLUMN id ADD GENERATED ALWAYS AS IDENTITY;
DROP SEQUENCE IF EXISTS talkgroup_learned_id_seq;
DROP TRIGGER IF EXISTS learn_tg ON calls; DROP TRIGGER IF EXISTS learn_tg ON calls;
DROP FUNCTION IF EXISTS learn_talkgroup(); DROP FUNCTION IF EXISTS learn_talkgroup();
ALTER TABLE talkgroups ADD COLUMN IF NOT EXISTS learned BOOLEAN NOT NULL DEFAULT FALSE;
CREATE SEQUENCE IF NOT EXISTS talkgroups_id_seq START WITH 1; CREATE SEQUENCE IF NOT EXISTS talkgroups_id_seq START WITH 1;
ALTER TABLE talkgroups ALTER COLUMN id SET DATA TYPE INTEGER USING (nextval('talkgroups_id_seq')); ALTER TABLE talkgroups ALTER COLUMN id SET DATA TYPE INTEGER USING (nextval('talkgroups_id_seq'));
ALTER TABLE talkgroups ALTER COLUMN id ADD GENERATED ALWAYS AS IDENTITY; ALTER TABLE talkgroups ALTER COLUMN id ADD GENERATED ALWAYS AS IDENTITY;
DROP SEQUENCE IF EXISTS talkgroups_id_seq; DROP SEQUENCE IF EXISTS talkgroups_id_seq;
ALTER TABLE calls ADD CONSTRAINT calls_talkgroup_id_fkey FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid); ALTER TABLE talkgroups ADD COLUMN IF NOT EXISTS learned BOOLEAN NOT NULL DEFAULT FALSE;
-- calls fkey constraint requires us to migrate all calls' talkgroup tuples to exist
INSERT INTO talkgroups (system_id, tgid, learned)
SELECT DISTINCT system_id, tgid, TRUE FROM talkgroups_learned ON CONFLICT DO NOTHING;
INSERT INTO talkgroups (system_id, tgid, learned)
SELECT DISTINCT system, talkgroup, TRUE FROM calls ON CONFLICT DO NOTHING;
INSERT INTO talkgroups_learned (system_id, tgid, name, tg_group, alpha_tag)
SELECT DISTINCT c.system, c.talkgroup, c.tg_label, c.tg_group, c.tg_alpha_tag
FROM calls c
JOIN talkgroups t ON (t.system_id = c.system AND t.tgid = c.talkgroup AND t.learned IS TRUE)
ON CONFLICT DO NOTHING;
ALTER TABLE calls ADD CONSTRAINT calls_system_talkgroup_fkey FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid);

View file

@ -85,7 +85,8 @@ CREATE TABLE IF NOT EXISTS calls(
tg_alpha_tag TEXT, tg_alpha_tag TEXT,
tg_group TEXT, tg_group TEXT,
source INTEGER NOT NULL, source INTEGER NOT NULL,
transcript TEXT transcript TEXT,
FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid)
); );
CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript)); CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript));