Put LearnTG in the store where it belongs
This commit is contained in:
parent
8207c59815
commit
c48d1eaf8d
4 changed files with 27 additions and 21 deletions
|
@ -1,13 +1,11 @@
|
||||||
package calls
|
package calls
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/internal/audio"
|
"dynatron.me/x/stillbox/internal/audio"
|
||||||
"dynatron.me/x/stillbox/pkg/auth"
|
"dynatron.me/x/stillbox/pkg/auth"
|
||||||
"dynatron.me/x/stillbox/pkg/database"
|
|
||||||
"dynatron.me/x/stillbox/pkg/pb"
|
"dynatron.me/x/stillbox/pkg/pb"
|
||||||
"dynatron.me/x/stillbox/pkg/talkgroups"
|
"dynatron.me/x/stillbox/pkg/talkgroups"
|
||||||
|
|
||||||
|
@ -113,21 +111,6 @@ func (c *Call) ToPB() *pb.Call {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Call) LearnTG(ctx context.Context, db database.Store) (learnedId int, err error) {
|
|
||||||
err = db.AddTalkgroupWithLearnedFlag(ctx, int32(c.System), int32(c.Talkgroup))
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("addTalkgroupWithLearnedFlag: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return db.AddLearnedTalkgroup(ctx, database.AddLearnedTalkgroupParams{
|
|
||||||
SystemID: c.System,
|
|
||||||
TGID: c.Talkgroup,
|
|
||||||
Name: c.TalkgroupLabel,
|
|
||||||
AlphaTag: c.TGAlphaTag,
|
|
||||||
TGGroup: c.TalkgroupGroup,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Call) computeLength() (err error) {
|
func (c *Call) computeLength() (err error) {
|
||||||
var td time.Duration
|
var td time.Duration
|
||||||
|
|
||||||
|
|
|
@ -79,7 +79,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
|
||||||
rest: api,
|
rest: api,
|
||||||
}
|
}
|
||||||
|
|
||||||
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)
|
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db, tgCache), true)
|
||||||
srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex), false)
|
srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex), false)
|
||||||
|
|
||||||
if srv.alerter.Enabled() {
|
if srv.alerter.Enabled() {
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"dynatron.me/x/stillbox/internal/common"
|
"dynatron.me/x/stillbox/internal/common"
|
||||||
"dynatron.me/x/stillbox/pkg/calls"
|
"dynatron.me/x/stillbox/pkg/calls"
|
||||||
"dynatron.me/x/stillbox/pkg/database"
|
"dynatron.me/x/stillbox/pkg/database"
|
||||||
|
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/pgtype"
|
"github.com/jackc/pgx/v5/pgtype"
|
||||||
|
@ -15,10 +16,11 @@ import (
|
||||||
|
|
||||||
type DatabaseSink struct {
|
type DatabaseSink struct {
|
||||||
db database.Store
|
db database.Store
|
||||||
|
tgs tgstore.Store
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDatabaseSink(store database.Store) *DatabaseSink {
|
func NewDatabaseSink(store database.Store, tgs tgstore.Store) *DatabaseSink {
|
||||||
return &DatabaseSink{store}
|
return &DatabaseSink{store, tgs}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
|
func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
|
||||||
|
@ -43,7 +45,7 @@ func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
|
||||||
|
|
||||||
if err != nil && database.IsTGConstraintViolation(err) {
|
if err != nil && database.IsTGConstraintViolation(err) {
|
||||||
return s.db.InTx(ctx, func(tx database.Store) error {
|
return s.db.InTx(ctx, func(tx database.Store) error {
|
||||||
_, err := call.LearnTG(ctx, tx)
|
_, err := s.tgs.LearnTG(ctx, call)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("add call: learn tg: %w", err)
|
return fmt.Errorf("add call: learn tg: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package tgstore
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -11,6 +12,7 @@ import (
|
||||||
"dynatron.me/x/stillbox/pkg/auth"
|
"dynatron.me/x/stillbox/pkg/auth"
|
||||||
"dynatron.me/x/stillbox/pkg/config"
|
"dynatron.me/x/stillbox/pkg/config"
|
||||||
"dynatron.me/x/stillbox/pkg/database"
|
"dynatron.me/x/stillbox/pkg/database"
|
||||||
|
"dynatron.me/x/stillbox/pkg/calls"
|
||||||
tgsp "dynatron.me/x/stillbox/pkg/talkgroups"
|
tgsp "dynatron.me/x/stillbox/pkg/talkgroups"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
|
@ -37,6 +39,9 @@ type Store interface {
|
||||||
// TGs retrieves many talkgroups from the Store.
|
// TGs retrieves many talkgroups from the Store.
|
||||||
TGs(ctx context.Context, tgs tgsp.IDs) ([]*tgsp.Talkgroup, error)
|
TGs(ctx context.Context, tgs tgsp.IDs) ([]*tgsp.Talkgroup, error)
|
||||||
|
|
||||||
|
// LearnTG learns the talkgroup from a Call.
|
||||||
|
LearnTG(ctx context.Context, call *calls.Call) (learnedId int, err error)
|
||||||
|
|
||||||
// SystemTGs retrieves all Talkgroups associated with a System.
|
// SystemTGs retrieves all Talkgroups associated with a System.
|
||||||
SystemTGs(ctx context.Context, systemID int32) ([]*tgsp.Talkgroup, error)
|
SystemTGs(ctx context.Context, systemID int32) ([]*tgsp.Talkgroup, error)
|
||||||
|
|
||||||
|
@ -300,6 +305,22 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara
|
||||||
return record, nil
|
return record, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *cache) LearnTG(ctx context.Context, c *calls.Call) (learnedId int, err error) {
|
||||||
|
db := database.FromCtx(ctx)
|
||||||
|
err = db.AddTalkgroupWithLearnedFlag(ctx, int32(c.System), int32(c.Talkgroup))
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("addTalkgroupWithLearnedFlag: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return db.AddLearnedTalkgroup(ctx, database.AddLearnedTalkgroupParams{
|
||||||
|
SystemID: c.System,
|
||||||
|
TGID: c.Talkgroup,
|
||||||
|
Name: c.TalkgroupLabel,
|
||||||
|
AlphaTag: c.TGAlphaTag,
|
||||||
|
TGGroup: c.TalkgroupGroup,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*tgsp.Talkgroup, error) {
|
func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*tgsp.Talkgroup, error) {
|
||||||
db := database.FromCtx(ctx)
|
db := database.FromCtx(ctx)
|
||||||
sysName, hasSys := t.SystemName(ctx, system)
|
sysName, hasSys := t.SystemName(ctx, system)
|
||||||
|
|
Loading…
Reference in a new issue