2024-08-01 01:01:08 -04:00
|
|
|
package sinks
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"dynatron.me/x/stillbox/internal/common"
|
2024-08-05 18:11:31 -04:00
|
|
|
"dynatron.me/x/stillbox/pkg/calls"
|
2024-11-03 07:19:03 -05:00
|
|
|
"dynatron.me/x/stillbox/pkg/database"
|
2024-08-01 01:01:08 -04:00
|
|
|
|
2024-11-17 21:46:10 -05:00
|
|
|
"github.com/jackc/pgx/v5"
|
2024-10-23 08:55:19 -04:00
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
2024-08-01 01:01:08 -04:00
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
)
|
|
|
|
|
|
|
|
type DatabaseSink struct {
|
2024-11-17 21:46:10 -05:00
|
|
|
db database.Store
|
2024-08-01 01:01:08 -04:00
|
|
|
}
|
|
|
|
|
2024-11-17 21:46:10 -05:00
|
|
|
func NewDatabaseSink(store database.Store) *DatabaseSink {
|
|
|
|
return &DatabaseSink{store}
|
2024-08-01 01:01:08 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
|
2024-08-18 08:44:44 -04:00
|
|
|
if !call.ShouldStore() {
|
|
|
|
log.Debug().Str("call", call.String()).Msg("received dontStore call")
|
|
|
|
return nil
|
|
|
|
}
|
2024-10-23 08:55:19 -04:00
|
|
|
|
2024-11-17 21:46:10 -05:00
|
|
|
params := s.toAddCallParams(call)
|
|
|
|
|
|
|
|
err := s.db.InTx(ctx, func(tx database.Store) error {
|
|
|
|
err := tx.AddCall(ctx, params)
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return fmt.Errorf("add call: %w", err)
|
|
|
|
}
|
2024-08-01 01:01:08 -04:00
|
|
|
|
2024-11-17 21:46:10 -05:00
|
|
|
log.Debug().Str("id", call.ID.String()).Int("system", call.System).Int("tgid", call.Talkgroup).Msg("stored")
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}, pgx.TxOptions{})
|
|
|
|
|
|
|
|
if err != nil && database.IsTGConstraintViolation(err) {
|
|
|
|
return s.db.InTx(ctx, func(tx database.Store) error {
|
|
|
|
_, err := call.LearnTG(ctx, tx)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("add call: learn tg: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
err = tx.AddCall(ctx, params)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("add call: retry: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}, pgx.TxOptions{})
|
|
|
|
}
|
2024-08-01 01:01:08 -04:00
|
|
|
|
2024-11-17 21:46:10 -05:00
|
|
|
return err
|
2024-08-01 01:01:08 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *DatabaseSink) SinkType() string {
|
|
|
|
return "database"
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *DatabaseSink) toAddCallParams(call *calls.Call) database.AddCallParams {
|
|
|
|
return database.AddCallParams{
|
2024-11-06 20:47:10 -05:00
|
|
|
ID: call.ID,
|
2024-08-01 01:01:08 -04:00
|
|
|
Submitter: call.Submitter.Int32Ptr(),
|
|
|
|
System: call.System,
|
|
|
|
Talkgroup: call.Talkgroup,
|
2024-10-23 08:55:19 -04:00
|
|
|
CallDate: pgtype.Timestamptz{Time: call.DateTime, Valid: true},
|
2024-08-01 01:01:08 -04:00
|
|
|
AudioName: common.PtrOrNull(call.AudioName),
|
|
|
|
AudioBlob: call.Audio,
|
|
|
|
AudioType: common.PtrOrNull(call.AudioType),
|
2024-11-06 20:55:48 -05:00
|
|
|
Duration: call.Duration.MsInt32Ptr(),
|
2024-08-01 01:01:08 -04:00
|
|
|
Frequency: call.Frequency,
|
|
|
|
Frequencies: call.Frequencies,
|
|
|
|
Patches: call.Patches,
|
2024-11-17 21:46:10 -05:00
|
|
|
TGLabel: call.TalkgroupLabel,
|
|
|
|
TGAlphaTag: call.TGAlphaTag,
|
|
|
|
TGGroup: call.TalkgroupGroup,
|
2024-08-01 01:01:08 -04:00
|
|
|
Source: call.Source,
|
|
|
|
}
|
|
|
|
}
|