Add talkgroup activity alerting #17

Merged
amigan merged 11 commits from alerting into trunk 2024-10-31 00:20:48 -04:00
11 changed files with 1425 additions and 0 deletions
Showing only changes of commit c3a9ed7a97 - Show all commits

View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 Neri Marschik
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -0,0 +1,112 @@
# go-time-series
[![License](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](./LICENSE)
[![GoDoc](https://godoc.org/github.com/codesuki/go-time-series?status.svg)](https://godoc.org/github.com/codesuki/go-time-series)
[![Build Status](http://img.shields.io/travis/codesuki/go-time-series.svg?style=flat)](https://travis-ci.org/codesuki/go-time-series)
[![codecov](https://codecov.io/gh/codesuki/go-time-series/branch/master/graph/badge.svg)](https://codecov.io/gh/codesuki/go-time-series)
Time series implementation in Go.
It is used in [go-trending](https://www.github.com/codesuki/go-trending) as a backend for a trending algorithm.
The time series supports storing counts at different granularities, e.g. seconds, minutes, hours, ....<br />
In case of go-trending the time series is configured to have recent data available at small granularity, i.e. the recent 60 seconds, and historical data available at large granularity, i.e. the last few hours, days of data.
A redis backend is planned.
* Simple interface
* Store time series data at different granularities
* Use your own clock implementation, e.g. for testing or similar
## Examples
### Creating a time series with default settings
The default settings use `time.Now()` as clock and `time.Second * 60`, `time.Minute * 60` and `time.Hour * 24` as granularities.
```go
import "github.com/codesuki/go-time-series"
...
ts, err := timeseries.NewTimeSeries()
if err != nil {
// handle error
}
```
### Creating a customized time series
You can specify the clock and/or granularities to use. A clock must implement the `timeseries.Clock` interface.
```go
import "github.com/codesuki/go-time-series"
...
type clock struct {}
func (c *clock) Now() time.Time {
	return time.Time{} // always returns the zero time
}
var myClock clock
...
ts, err := timeseries.NewTimeSeries(
timeseries.WithGranularities(
[]timeseries.Granularity{
{Granularity: time.Second, Count: 60},
{Granularity: time.Minute, Count: 60},
{Granularity: time.Hour, Count: 24},
{Granularity: time.Hour * 24, Count: 7},
}),
timeseries.WithClock(&myClock),
)
if err != nil {
// handle error
}
```
### Filling the time series
To fill the time series with counts, e.g. events, you can use two different functions.
```go
import "github.com/codesuki/go-time-series"
...
ts, err := timeseries.NewTimeSeries()
if err != nil {
// handle error
}
ts.Increase(2) // adds 2 to the counter at the current time
ts.IncreaseAtTime(3, time.Now().Add(-2 * time.Minute)) // adds 3 to the counter 2 minutes ago
```
### Querying the time series
The `Range()` function takes 2 arguments, i.e. the start and end of a time span.
`Recent()` is a small helper function that just uses `clock.Now()` as `end` in `Range`.
Please refer to the [documentation](https://godoc.org/github.com/codesuki/go-time-series) for how `Range()` works exactly. There are some details depending on what range you query and what range is available.
```go
import "github.com/codesuki/go-time-series"
...
ts, err := timeseries.NewTimeSeries()
if err != nil {
// handle error
}
ts.Increase(2) // adds 2 to the counter at the current time
// 1s passes
ts.Increase(3)
// 1s passes
ts.Recent(5 * time.Second) // returns 5
ts.Range(time.Now().Add(-5 * time.Second), time.Now()) // returns 5
```
## Documentation
GoDoc is located [here](https://godoc.org/github.com/codesuki/go-time-series)
## License
go-time-series is [MIT licensed](./LICENSE).

View file

@ -0,0 +1,132 @@
package timeseries
import (
"log"
"time"
)
// level is a fixed-length ring buffer of counters at a single
// granularity (e.g. 60 one-second buckets). A TimeSeries stacks
// several levels of increasing granularity on top of each other.
type level struct {
	clock       Clock         // time source; injectable for tests
	granularity time.Duration // width of one bucket
	length      int           // number of buckets in the ring
	end         time.Time     // end of the newest bucket, truncated to granularity
	oldest      int           // ring index of the oldest bucket
	newest      int           // ring index of the newest bucket
	buckets     []int         // counters, addressed modulo length
}
// newLevel builds a level with the given clock, bucket width and bucket
// count, and initializes its ring buffer.
func newLevel(clock Clock, granularity time.Duration, length int) level {
	l := level{
		clock:       clock,
		granularity: granularity,
		length:      length,
	}
	l.init()
	return l
}
// init allocates the bucket ring and resets the level to the zero time.
func (l *level) init() {
	l.buckets = make([]int, l.length)
	l.clear(time.Time{})
}
// clear zeroes every bucket and re-anchors the ring so that its end is
// t truncated to the level's granularity.
func (l *level) clear(t time.Time) {
	l.oldest = 1
	l.newest = 0
	l.end = t.Truncate(l.granularity)
	for i := range l.buckets {
		l.buckets[i] = 0
	}
}
// duration returns the span the level covers, from the start of the
// oldest bucket to the start of the newest one.
func (l *level) duration() time.Duration {
	return time.Duration(l.length-1) * l.granularity
}

// earliest returns the start time of the oldest retained bucket.
func (l *level) earliest() time.Time {
	return l.end.Add(-l.duration())
}

// latest returns the end of the level, i.e. the start of the newest bucket.
func (l *level) latest() time.Time {
	return l.end
}
// increaseAtTime adds amount to the bucket covering t. Callers must
// advance the level first so that t is never newer than l.end; if it is,
// a warning is logged (the write still lands in a ring slot because the
// index is taken modulo length).
func (l *level) increaseAtTime(amount int, t time.Time) {
	difference := l.end.Sub(t.Truncate(l.granularity))
	if difference < 0 {
		// this cannot be negative because we advance before
		// can at least be 0
		// FIX: the message previously named a non-existent method
		// "level.increaseTime"
		log.Println("level.increaseAtTime was called with a time in the future")
	}
	// l.length-1 because the newest element is always l.length-1 away from oldest
	steps := (l.length - 1) - int(difference/l.granularity)
	index := (l.oldest + steps) % l.length
	l.buckets[index] += amount
}
// advance rolls the ring forward one granularity at a time until the
// level's end reaches target; each step drops (zeroes) the oldest bucket.
func (l *level) advance(target time.Time) {
	// nothing to do if the level already covers target
	if !l.end.Before(target) {
		return
	}
	for target.After(l.end) {
		l.end = l.end.Add(l.granularity)
		l.buckets[l.oldest] = 0
		l.newest = l.oldest
		l.oldest = (l.oldest + 1) % len(l.buckets)
	}
}
// TODO: find a better way to handle latest parameter
// The parameter is used to avoid the overlap computation if end overlaps with the current time.
// Probably will find a way when implementing redis version.
//
// sumInterval sums the buckets overlapping [start, end), clamped to the
// window this level retains. Buckets only partially covered by the
// range contribute proportionally to the covered fraction.
func (l *level) sumInterval(start, end time.Time, latest time.Time) float64 {
	// clamp the requested range to the data the level actually holds
	if start.Before(l.earliest()) {
		start = l.earliest()
	}
	if end.After(l.latest()) {
		end = l.latest()
	}
	idx := 0
	// this is how many time steps start is away from earliest
	startSteps := start.Sub(l.earliest()) / l.granularity
	idx += int(startSteps)
	currentTime := l.earliest()
	currentTime = currentTime.Add(startSteps * l.granularity)
	sum := 0.0
	for idx < l.length && currentTime.Before(end) {
		nextTime := currentTime.Add(l.granularity)
		if nextTime.After(latest) {
			// never count time past the newest data point
			nextTime = latest
		}
		if nextTime.Before(start) {
			// the case nextTime.Before(start) happens when start is after latest
			// therefore we don't have data and can return
			break
		}
		count := float64(l.buckets[(l.oldest+idx)%l.length])
		if currentTime.Before(start) || nextTime.After(end) {
			// current bucket only partially overlaps the time range;
			// scale its count by the overlapping fraction
			overlapStart := max(currentTime, start)
			overlapEnd := min(nextTime, end)
			overlap := overlapEnd.Sub(overlapStart).Seconds() / l.granularity.Seconds()
			count *= overlap
		}
		sum += count
		idx++
		currentTime = currentTime.Add(l.granularity)
	}
	return sum
}
func min(t1, t2 time.Time) time.Time {
if t1.Before(t2) {
return t1
}
return t2
}
func max(t1, t2 time.Time) time.Time {
if t1.After(t2) {
return t1
}
return t2
}

View file

@ -0,0 +1,272 @@
package timeseries
import (
"errors"
"time"
)
// Explanation
// Have several granularity buckets
// 1s, 1m, 5m, ...
// The buckets will be in circular arrays
//
// For example we could have
// 60 1s buckets to make up 1 minute
// 60 1m buckets to make up 1 hour
// ...
// This would enable us to get the last 1 minute data at 1s granularity (every second)
//
// Date ranges are [start, end[
//
// Put:
// Every time an event comes we add it to all corresponding buckets
//
// Example:
// Event time = 12:00:00
// 1s bucket = 12:00:00
// 1m bucket = 12:00:00
// 5m bucket = 12:00:00
//
// Event time = 12:00:01
// 1s bucket = 12:00:01
// 1m bucket = 12:00:00
// 5m bucket = 12:00:00
//
// Event time = 12:01:01
// 1s bucket = 12:01:01
// 1m bucket = 12:01:00
// 5m bucket = 12:00:00
//
// Fetch:
// Given a time span we try to find the buckets with the finest granularity
// to satisfy the time span and return the sum of their contents
//
// Example:
// Now = 12:05:30
// Time span = 12:05:00 - 12:05:02
// Return sum of 1s buckets 0,1
//
// Now = 12:10:00
// Time span = 12:05:00 - 12:07:00
// Return sum of 1m buckets 5,6
//
// Now = 12:10:00
// Time span = 12:00:00 - 12:10:00 (last 10 minutes)
// Return sum of 5m buckets 0,1
//
// Now = 12:10:01
// Time span = 12:05:01 - 12:10:01 (last 5 minutes)
// Return sum of 5m buckets (59/(5*60))*1, (1/(5*60))*2
//
// Now = 12:10:01
// Time span = 12:04:01 - 12:10:01 (last 6 minutes)
// Return sum of 1m buckets (59/60)*4, 5, 6, 7, 8, 9, (1/60)*10
var (
	// ErrBadRange indicates that the given range is invalid. Start should always be <= End.
	ErrBadRange = errors.New("timeseries: range is invalid")
	// ErrBadGranularities indicates that the provided granularities are not strictly increasing.
	ErrBadGranularities = errors.New("timeseries: granularities must be strictly increasing and non empty")
	// ErrRangeNotCovered indicates that the provided range lies outside the time series.
	// FIX: message previously misspelled "covered" as "convered".
	ErrRangeNotCovered = errors.New("timeseries: range is not covered")
)
// defaultGranularities are used in case no granularities are provided to
// the constructor: 60 one-second, 60 one-minute and 24 one-hour buckets.
var defaultGranularities = []Granularity{
	{time.Second, 60},
	{time.Minute, 60},
	{time.Hour, 24},
}
// Clock specifies the needed time related functions used by the time series.
// To use a custom clock implement the interface and pass it to the time series constructor.
// The default clock uses time.Now()
type Clock interface {
Now() time.Time
}
// defaultClock is used in case no clock is provided to the constructor.
type defaultClock struct{}
func (c *defaultClock) Now() time.Time {
return time.Now()
}
// Granularity describes the granularity for one level of the time series.
// Count cannot be 0.
type Granularity struct {
	Granularity time.Duration // width of one bucket at this level
	Count       int           // number of buckets kept at this level
}

// options collects the configurable dependencies of a TimeSeries.
type options struct {
	clock         Clock
	granularities []Granularity
}
// Option configures the time series.
type Option func(*options)

// WithClock returns an Option that sets the clock used by the time series.
func WithClock(c Clock) Option {
	return func(o *options) {
		o.clock = c
	}
}

// WithGranularities returns an Option that sets the granularities used by the time series.
func WithGranularities(g []Granularity) Option {
	return func(o *options) {
		o.granularities = g
	}
}
// TimeSeries stores event counts at several granularities (finest level
// first, as enforced by checkGranularities) and answers range-sum queries.
type TimeSeries struct {
	clock       Clock
	levels      []level
	pending     int       // count buffered for the still-open newest bucket
	pendingTime time.Time // end of the bucket pending belongs to
	latest      time.Time // newest event time seen so far
}
// NewTimeSeries creates a new time series configured by the provided
// options. Missing options fall back to the default clock (time.Now)
// and the default granularities.
func NewTimeSeries(os ...Option) (*TimeSeries, error) {
	var opts options
	for _, o := range os {
		o(&opts)
	}
	clock := opts.clock
	if clock == nil {
		clock = &defaultClock{}
	}
	granularities := opts.granularities
	if granularities == nil {
		granularities = defaultGranularities
	}
	return newTimeSeries(clock, granularities)
}
// newTimeSeries validates the granularities and assembles the series.
func newTimeSeries(clock Clock, granularities []Granularity) (*TimeSeries, error) {
	if err := checkGranularities(granularities); err != nil {
		return nil, err
	}
	ts := &TimeSeries{
		clock:  clock,
		levels: createLevels(clock, granularities),
	}
	return ts, nil
}
// checkGranularities returns ErrBadGranularities unless the list is
// non-empty, every count is non-zero, and durations strictly increase.
func checkGranularities(granularities []Granularity) error {
	if len(granularities) == 0 {
		return ErrBadGranularities
	}
	previous := time.Duration(0)
	for _, g := range granularities {
		if g.Count == 0 || g.Granularity <= previous {
			return ErrBadGranularities
		}
		previous = g.Granularity
	}
	return nil
}
// createLevels builds one ring-buffer level per configured granularity.
func createLevels(clock Clock, granularities []Granularity) []level {
	levels := make([]level, 0, len(granularities))
	for _, g := range granularities {
		levels = append(levels, newLevel(clock, g.Granularity, g.Count))
	}
	return levels
}
// Increase adds amount at the current time, as reported by the
// configured clock.
func (t *TimeSeries) Increase(amount int) {
	t.IncreaseAtTime(amount, t.clock.Now())
}
// IncreaseAtTime adds amount at a specific time.
//
// Writes falling into the still-open newest bucket are buffered in
// pending and merged into the levels the next time the series advances;
// older writes go straight into the level buckets.
func (t *TimeSeries) IncreaseAtTime(amount int, tm time.Time) {
	if tm.After(t.latest) {
		t.latest = tm
	}
	switch {
	case tm.After(t.pendingTime):
		// newer than anything seen so far: roll the levels forward and
		// start a new pending bucket
		t.advance(tm)
		t.pending = amount
	case tm.After(t.pendingTime.Add(-t.levels[0].granularity)):
		// falls into the current, still-pending bucket. FIX: this was
		// `t.pending++`, which silently dropped amount-1 events per call;
		// accumulate the full amount instead.
		t.pending += amount
	default:
		t.increaseAtTime(amount, tm)
	}
}
// increaseAtTime adds amount to every level that still retains the
// bucket covering time; levels whose window no longer reaches back that
// far are skipped.
func (t *TimeSeries) increaseAtTime(amount int, time time.Time) {
	for i := range t.levels {
		if time.Before(t.levels[i].latest().Add(-1 * t.levels[i].duration())) {
			continue
		}
		t.levels[i].increaseAtTime(amount, time)
	}
}
// advance rolls every level forward to target and then merges the
// buffered pending count into the levels.
func (t *TimeSeries) advance(target time.Time) {
	// we need this here because advance is called from other locations
	// than IncreaseAtTime that don't check by themselves
	if !target.After(t.pendingTime) {
		return
	}
	t.advanceLevels(target)
	t.handlePending()
}

// advanceLevels moves each level's window forward to target. A level
// whose whole window lies before target is cleared in one step instead
// of being advanced bucket by bucket.
func (t *TimeSeries) advanceLevels(target time.Time) {
	for i := range t.levels {
		if !target.Before(t.levels[i].latest().Add(t.levels[i].duration())) {
			t.levels[i].clear(target)
			continue
		}
		t.levels[i].advance(target)
	}
}

// handlePending writes the buffered pending amount into the levels and
// re-anchors the pending bucket at the finest level's newest position.
func (t *TimeSeries) handlePending() {
	t.increaseAtTime(t.pending, t.pendingTime)
	t.pending = 0
	t.pendingTime = t.levels[0].latest()
}
// Recent returns the sum over [now-duration, now). It is a convenience
// wrapper around Range using clock.Now() as the end of the range.
func (t *TimeSeries) Recent(duration time.Duration) (float64, error) {
	now := t.clock.Now()
	return t.Range(now.Add(-duration), now)
}
// Range returns the sum over the given range [start, end).
// ErrBadRange is returned if start is after end.
// ErrRangeNotCovered is returned if the range lies outside the time series.
func (t *TimeSeries) Range(start, end time.Time) (float64, error) {
	if start.After(end) {
		return 0, ErrBadRange
	}
	// merge pending data and bring all levels up to the present
	t.advance(t.clock.Now())
	if ok, err := t.intersects(start, end); !ok {
		return 0, err
	}
	// pick the finest level whose retained window still contains start
	for i := range t.levels {
		// use !start.Before so earliest() is included
		// if we use earliest().Before() we won't get start
		if !start.Before(t.levels[i].earliest()) {
			return t.levels[i].sumInterval(start, end, t.latest), nil
		}
	}
	// start precedes every level's window; fall back to the coarsest level
	return t.levels[len(t.levels)-1].sumInterval(start, end, t.latest), nil
}
// intersects reports whether the requested range overlaps the data the
// series retains; disjoint ranges yield ErrRangeNotCovered.
func (t *TimeSeries) intersects(start, end time.Time) (bool, error) {
	coarsest := t.levels[len(t.levels)-1]
	switch {
	case end.Before(coarsest.latest().Add(-coarsest.duration())):
		// the whole range predates the oldest retained bucket
		return false, ErrRangeNotCovered
	case start.After(t.levels[0].latest()):
		// the whole range lies beyond the newest bucket
		return false, ErrRangeNotCovered
	default:
		return true, nil
	}
}

View file

@ -0,0 +1,336 @@
package timeseries
import (
"testing"
"time"
"github.com/benbjohnson/clock"
)
// TODO: do table based testing
//
// setup builds a small two-level series (60 x 1s, 60 x 1m) driven by a
// mock clock so tests can control time explicitly.
func setup() (*TimeSeries, *clock.Mock) {
	clock := clock.NewMock()
	ts, _ := NewTimeSeries(
		WithClock(clock),
		WithGranularities(
			[]Granularity{
				{time.Second, 60},
				{time.Minute, 60},
			},
		),
	)
	return ts, clock
}

// TestClock checks that the default clock follows the real time.
func TestClock(t *testing.T) {
	clock := &defaultClock{}
	// there is a small chance this won't pass
	if clock.Now().Truncate(time.Second) != time.Now().Truncate(time.Second) {
		t.Errorf("default clock does not track time.Now")
	}
}
// TestNewTimeSeries checks construction with default options succeeds.
func TestNewTimeSeries(t *testing.T) {
	ts, err := NewTimeSeries()
	if ts == nil {
		t.Errorf("constructor returned nil")
	}
	if err != nil {
		t.Errorf("should not return error")
	}
}

// TestNewTimeSeriesWithGranularities verifies valid granularities are
// accepted and decreasing, zero-count or empty ones are rejected.
func TestNewTimeSeriesWithGranularities(t *testing.T) {
	granularities := []Granularity{
		{time.Second, 60},
		{time.Minute, 60},
		{time.Hour, 24},
	}
	ts, err := NewTimeSeries(WithGranularities(granularities))
	if ts == nil || err != nil {
		t.Error("could not create time series")
	}
	badGranularities := []Granularity{
		{time.Minute, 60},
		{time.Second, 60},
		{time.Hour, 24},
	}
	_, err = NewTimeSeries(WithGranularities(badGranularities))
	if err != ErrBadGranularities {
		t.Error("should not accept decreasing granularities")
	}
	badGranularities = []Granularity{
		{time.Minute, 60},
		{time.Second, 0},
		{time.Hour, 24},
	}
	_, err = NewTimeSeries(WithGranularities(badGranularities))
	if err != ErrBadGranularities {
		t.Error("should not accept granularities with zero count")
	}
	_, err = NewTimeSeries(WithGranularities([]Granularity{}))
	if err != ErrBadGranularities {
		t.Error("should not accept empty granularities")
	}
}

// TestNewTimeSeriesWithClock verifies counts are attributed to the
// mock-clock time at which they were recorded.
func TestNewTimeSeriesWithClock(t *testing.T) {
	clock := clock.NewMock()
	ts, _ := NewTimeSeries(WithClock(clock))
	ts.Increase(2)
	clock.Add(time.Second * 1)
	ts.Increase(1)
	res, _ := ts.Range(time.Unix(0, 0), time.Unix(1, 0))
	if res != 2 {
		t.Errorf("expected %d got %f", 2, res)
	}
}
// TestRecentSeconds exercises second-granularity queries, including the
// earliest retained second and a window that outruns the data.
func TestRecentSeconds(t *testing.T) {
	ts, clock := setup()
	clock.Add(time.Minute * 5)
	ts.Increase(1)
	clock.Add(time.Second * 1)
	ts.Increase(2)
	clock.Add(time.Second * 1)
	ts.Increase(3)
	res, _ := ts.Recent(time.Second)
	if res != 2 {
		t.Errorf("expected %d got %f", 2, res)
	}
	res, _ = ts.Recent(2 * time.Second)
	if res != 3 {
		t.Errorf("expected %d got %f", 3, res)
	}
	// test earliest second
	clock.Add(57 * time.Second) // time: 09:05:59
	res, _ = ts.Recent(59 * time.Second)
	if res != 6 {
		t.Errorf("expected %d got %f", 6, res)
	}
	// test future time
	clock.Add(1 * time.Second)
	clock.Add(57 * time.Second) // time: 09:06:00
	res, _ = ts.Recent(59 * time.Second)
	if res != 0 {
		t.Errorf("expected %d got %f", 0, res)
	}
}

// TestRecentMinutes exercises minute-granularity queries with partial
// (interpolated) buckets at both ends of the range.
func TestRecentMinutes(t *testing.T) {
	ts, clock := setup()
	clock.Add(time.Minute * 1) // 09:01:00
	ts.Increase(60)
	clock.Add(time.Minute * 1) // 09:02:00
	ts.Increase(1)
	clock.Add(time.Minute * 1) // 09:03:00
	ts.Increase(60)
	clock.Add(time.Second * 1) // 09:03:01
	ts.Increase(3)
	// test interpolation at beginning
	// 59/60 * 60 + 1 + 60 = 120
	res, _ := ts.Recent(2 * time.Minute)
	if res != 120 {
		t.Errorf("expected %d got %f", 120, res)
	}
	// test interpolation at end
	// 60/2 = 30
	res, _ = ts.Range(
		clock.Now().Add(-2*time.Minute+-1*time.Second), // 09:01:00
		clock.Now().Add(-1*time.Minute+-31*time.Second), // 09:01:30
	)
	if res != 30 {
		t.Errorf("expected %d got %f", 30, res)
	}
	// get from earliest data point
	clock.Add(time.Second*59 + time.Minute*56)
	ts.Increase(60)
	clock.Add(time.Minute * 1)
	ts.Increase(70)
	clock.Add(time.Minute * 59)
	res, _ = ts.Recent(time.Minute * 60)
	if res != 70 {
		t.Errorf("expected %d got %f", 70, res)
	}
}
// TestRecentWholeRange queries exactly the whole retained window and
// expects every recorded count (60 + 1 + 60 = 121).
func TestRecentWholeRange(t *testing.T) {
	ts, clock := setup()
	clock.Add(time.Minute * 1) // 09:01:00
	ts.Increase(60)
	clock.Add(time.Minute * 1) // 09:02:00
	ts.Increase(1)
	clock.Add(time.Minute * 1) // 09:03:00
	ts.Increase(60)
	clock.Add(time.Second * 1) // 09:03:01
	ts.Increase(3)
	// 60 + 1 + 60 = 121
	res, _ := ts.Recent(60 * time.Minute)
	if res != 121 {
		// FIX: the failure message printed 62 although the assertion
		// checks against 121; report the value actually expected
		t.Errorf("expected %d got %f", 121, res)
	}
}
// TestRecentWholeRangeBig queries a window larger than the retained
// data and still expects all counts (60 + 1 + 60 = 121).
func TestRecentWholeRangeBig(t *testing.T) {
	ts, clock := setup()
	clock.Add(time.Minute * 1) // 09:01:00
	ts.Increase(60)
	clock.Add(time.Minute * 1) // 09:02:00
	ts.Increase(1)
	clock.Add(time.Minute * 1) // 09:03:00
	ts.Increase(60)
	clock.Add(time.Second * 1) // 09:03:01
	ts.Increase(3)
	// 60 + 1 + 60 = 121
	res, _ := ts.Recent(120 * time.Minute)
	if res != 121 {
		t.Errorf("expected %d got %f", 121, res)
	}
}

// TestRangeEndInFuture asserts that a range extending past the present
// yields no counts for this data layout.
func TestRangeEndInFuture(t *testing.T) {
	ts, clock := setup()
	clock.Add(time.Minute * 1) // 09:01:00
	ts.Increase(1)
	res, _ := ts.Range(clock.Now().Add(-1*time.Minute), clock.Now().Add(5*time.Minute))
	if res != 0 {
		t.Errorf("expected %d got %f", 0, res)
	}
}

// TestRangeBadRange covers the error paths: an inverted range and
// ranges entirely outside the stored data.
func TestRangeBadRange(t *testing.T) {
	ts, clock := setup()
	clock.Add(time.Minute * 1) // 09:01:00
	ts.Increase(60)
	clock.Add(time.Minute * 1) // 09:02:00
	ts.Increase(1)
	clock.Add(time.Minute * 1) // 09:03:00
	ts.Increase(60)
	clock.Add(time.Second * 1) // 09:03:01
	ts.Increase(3)
	// start is after end
	_, err := ts.Range(clock.Now().Add(time.Minute), clock.Now())
	if err != ErrBadRange {
		t.Errorf("should return ErrBadRange")
	}
	// range is after end
	_, err = ts.Range(clock.Now().Add(time.Minute), clock.Now().Add(5*time.Minute))
	if err != ErrRangeNotCovered {
		t.Errorf("should return ErrRangeNotCovered")
	}
	// range is before start
	_, err = ts.Range(clock.Now().Add(-5*time.Hour), clock.Now().Add(-4*time.Hour))
	if err != ErrRangeNotCovered {
		t.Errorf("should return ErrRangeNotCovered")
	}
}
// TestIncrease records counts at mixed granularities and checks two
// interpolated range queries.
func TestIncrease(t *testing.T) {
	ts, clock := setup()
	// time 12:00
	ts.Increase(2)
	clock.Add(time.Minute * 1) // time: 12:01:00
	ts.Increase(4)
	clock.Add(time.Minute * 1) // time: 12:02:00
	ts.Increase(6)
	clock.Add(time.Second * 10) // time: 12:02:10
	ts.Increase(2)
	clock.Add(time.Second * 10) // time: 12:02:20
	ts.Increase(2)
	clock.Add(time.Second * 10) // time: 12:02:30
	ts.Increase(2)
	clock.Add(time.Second * 10) // time: 12:02:40
	ts.Increase(2)
	clock.Add(time.Second * 10) // time: 12:02:50
	ts.Increase(2)
	clock.Add(time.Second * 10) // time: 12:03:00
	ts.Increase(2)
	// get range from 12:00:30 - 12:02:30
	// 0.5 * 2 + 4 + 0.5 * 16 = 13
	res, _ := ts.Range(clock.Now().Add(-time.Second*150), clock.Now().Add(-time.Second*30))
	if res != 13 {
		t.Errorf("expected %d got %f", 13, res)
	}
	// get range from 12:01:00 - 12:02:00
	// = 4
	res, _ = ts.Range(clock.Now().Add(-time.Second*120), clock.Now().Add(-time.Second*60))
	if res != 4 {
		t.Errorf("expected %d got %f", 4, res)
	}
}

// TestIncreasePending checks that same-bucket counts are buffered and
// only become visible once the clock advances past the bucket.
func TestIncreasePending(t *testing.T) {
	ts, clock := setup()
	ts.Increase(1) // this should advance and reset pending
	ts.Increase(1) // this should increase pending
	clock.Add(time.Second)
	ts.Increase(1)
	res, _ := ts.Recent(59 * time.Second)
	if res != 2 {
		t.Errorf("expected %d got %f", 2, res)
	}
	clock.Add(time.Second) // the latest data gets merged in because time advanced
	res, _ = ts.Recent(59 * time.Second)
	if res != 3 {
		t.Errorf("expected %d got %f", 3, res)
	}
}

// TestIncreaseAtTime records a count one minute in the past and checks
// both an interpolated and an exact range over it.
func TestIncreaseAtTime(t *testing.T) {
	ts, clock := setup()
	ts.Increase(60)        // time: 09:00:00
	clock.Add(time.Second) // time: 09:00:01
	ts.IncreaseAtTime(60, clock.Now().Add(-1*time.Minute)) // time: 08:59:01
	ts.Increase(1) // time: 09:00:01
	// from: 08:59:01 - 09:00:01
	// (59/60 * 60) + 60 = 119
	res, _ := ts.Recent(time.Minute)
	if res != 119 {
		t.Errorf("expected %d got %f", 119, res)
	}
	// from: 08:59:00 - 09:00:00
	// 60
	res, _ = ts.Range(
		clock.Now().Add(-1*time.Minute+-1*time.Second),
		clock.Now().Add(-1*time.Second),
	)
	if res != 60 {
		t.Errorf("expected %d got %f", 60, res)
	}
}

21
internal/trending/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 Neri Marschik
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -0,0 +1,74 @@
# go-trending
[![License](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](./LICENSE)
[![GoDoc](https://godoc.org/github.com/codesuki/go-trending?status.svg)](https://godoc.org/github.com/codesuki/go-trending)
[![Build Status](http://img.shields.io/travis/codesuki/go-trending.svg?style=flat)](https://travis-ci.org/codesuki/go-trending)
[![codecov](https://codecov.io/gh/codesuki/go-trending/branch/master/graph/badge.svg)](https://codecov.io/gh/codesuki/go-trending)
Trending algorithm based on the article [Trending at Instagram](http://instagram-engineering.tumblr.com/post/122961624217/trending-at-instagram). To detect trends, an item's current behavior is compared to its usual behavior. The more they differ, the higher (or lower) the score. Items start trending when their current usage is higher than their average usage. To keep items from quickly becoming non-trending again, the scores are smoothed.
* Configurable and simple to use
* Use your own clock implementation, e.g. for testing or similar
* Use any time series implementation as backend that implements the TimeSeries interface
### Details
Uses a [time series](https://www.github.com/codesuki/go-time-series) for each item to keep track of its past behavior and to get recent behavior at small granularity. Computes the [Kullback-Leibler divergence](https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence) between recent behavior and expected, i.e. past, behavior. Then blends the current item score with its past [decayed](https://en.wikipedia.org/wiki/Exponential_decay) maximum score to get the final score.
## Examples
### Creating a default scorer
```go
import "github.com/codesuki/go-trending"
...
scorer := trending.NewScorer()
```
### Creating a customized scorer
**Parameters**
* **Time series:** is used for creating the backing `TimeSeries` objects
* **Half-life:** controls how long an item is trending after the activity went back to normal.
* **Recent duration:** controls how much data is used to compute the current state. If there is not much activity try looking at larger duration.
* **Storage duration:** controls how much historical data is used. Trends older than the storage duration won't have any effect on the computation. The time series in use should have at least as much storage duration as specified here.
```go
import "github.com/codesuki/go-trending"
...
func NewTimeSeries(id string) TimeSeries {
// create time series that satisfies the TimeSeries interface
return timeSeries
}
...
scorer := trending.NewScorer(
WithTimeSeries(NewTimeSeries),
WithHalflife(time.Hour),
WithRecentDuration(time.Minute),
WithStorageDuration(7 * 24 * time.Hour),
)
```
### Using the scorer
```go
import "github.com/codesuki/go-trending"
...
scorer := trending.NewScorer()
scorer.AddEvent("id", time)
// add more events. maybe using an event stream.
...
trendingItems := scorer.Score()
```
## Documentation
GoDoc is located [here](https://godoc.org/github.com/codesuki/go-trending)
## License
go-trending is [MIT licensed](./LICENSE).

101
internal/trending/item.go Normal file
View file

@ -0,0 +1,101 @@
package trending
import (
"math"
"time"
)
// item tracks one scored entity: its raw event counts, a sliding window
// of recent probability values, and the decayed high score used for
// blending.
type item struct {
	eventSeries TimeSeries    // raw event counts over time
	maxSeries   SlidingWindow // recent probabilities; its Max() serves as the expectation
	max         float64       // highest KL score seen, exponentially decayed
	maxTime     time.Time     // when max was last updated (drives the decay)
	options     *options
	// TODO: move outside of item because it's the same for all items
	defaultExpectation float64 // fallback expectation when no history exists
	defaultHourlyCount float64 // fallback historical count for first-seen items
}
// newItem creates the per-item time series and sliding window for id and
// precomputes the fallback values used when no history exists yet.
func newItem(id string, options *options) *item {
	// baseCount events per hour scaled over the whole storage window
	defaultHourlyCount := float64(options.baseCount) * float64(options.storageDuration/time.Hour)
	// baseCount spread across the recent-duration slices of one hour
	defaultExpectation := float64(options.baseCount) / float64(time.Hour/options.recentDuration)
	return &item{
		eventSeries:        options.creator(id),
		maxSeries:          options.slidingWindowCreator(id),
		options:            options,
		defaultExpectation: defaultExpectation,
		defaultHourlyCount: defaultHourlyCount,
	}
}
// score computes the current trending score for the item: the KL
// contribution of (recent count / total count) against the recent
// maximum probability, blended 50/50 with the decayed historical
// maximum to smooth the result.
// NOTE(review): the returned score's ID field is left empty here;
// presumably it is filled in by the caller.
func (i *item) score() score {
	recentCount, count := i.computeCounts()
	if recentCount < i.options.countThreshold {
		// too little recent activity to score meaningfully
		return score{}
	}
	if recentCount == count {
		// we see this for the first time so there is no historical data
		// use a sensible default like average/median over all items
		count = recentCount + i.defaultHourlyCount
	}
	probability := recentCount / count
	// order of those two lines is important.
	// if we insert before reading we might just get the same value.
	expectation := i.computeRecentMax()
	i.maxSeries.Insert(probability)
	if expectation == 0.0 {
		expectation = i.defaultExpectation
	}
	klScore := computeKullbackLeibler(probability, expectation)
	if klScore > i.max {
		i.updateMax(klScore)
	}
	i.decayMax()
	mixedScore := 0.5 * (klScore + i.max)
	return score{
		Score:       mixedScore,
		Probability: probability,
		Expectation: expectation,
		Maximum:     i.max,
		KLScore:     klScore,
	}
}
// computeCounts returns (recent, total): the event count over the recent
// duration and over the whole storage duration, both ending now.
// NOTE(review): uses time.Now() directly rather than an injected clock,
// unlike the backing time series — confirm this is intended.
func (i *item) computeCounts() (float64, float64) {
	now := time.Now()
	totalCount, _ := i.eventSeries.Range(now.Add(-i.options.storageDuration), now)
	count, _ := i.eventSeries.Range(now.Add(-i.options.recentDuration), now)
	return count, totalCount
}

// computeRecentMax returns the largest probability currently stored in
// the sliding window.
func (i *item) computeRecentMax() float64 {
	return i.maxSeries.Max()
}

// decayMax applies exponential decay to the stored maximum score.
func (i *item) decayMax() {
	i.updateMax(i.max * i.computeExponentialDecayMultiplier())
}

// updateMax stores score as the new maximum and timestamps it.
func (i *item) updateMax(score float64) {
	i.max = score
	i.maxTime = time.Now()
}

// computeExponentialDecayMultiplier returns 0.5^(age/halfLife), the
// exponential-decay factor for the time since max was last updated.
func (i *item) computeExponentialDecayMultiplier() float64 {
	return math.Pow(0.5, float64(time.Now().Unix()-i.maxTime.Unix())/i.options.halfLife.Seconds())
}
func computeKullbackLeibler(probability float64, expectation float64) float64 {
if probability == 0.0 {
return 0.0
}
return probability * math.Log(probability/expectation)
}

View file

@ -0,0 +1,40 @@
package trending
// score holds the trending metrics computed for a single item.
type score struct {
	ID          string
	Score       float64 // blended final score: 0.5 * (KLScore + Maximum)
	Probability float64
	Expectation float64
	Maximum     float64
	KLScore     float64
}

// Scores is a list of item scores sortable by descending Score.
type Scores []score

// Len implements sort.Interface.
func (s Scores) Len() int {
	return len(s)
}

// Swap implements sort.Interface.
func (s Scores) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// Less implements sort.Interface; it orders by descending Score.
func (s Scores) Less(i, j int) bool {
	return s[i].Score > s[j].Score
}

// take returns the first count scores, or all of them when fewer are
// available.
func (s Scores) take(count int) Scores {
	if count >= len(s) {
		return s
	}
	// FIX: the original returned s[0 : count-1], an off-by-one that
	// dropped the last requested element and yielded count-1 scores.
	return s[:count]
}

// threshold returns the leading scores whose Score is >= t, assuming s
// is sorted in descending order.
func (s Scores) threshold(t float64) Scores {
	for i := range s {
		if s[i].Score < t {
			return s[0:i]
		}
	}
	return s
}

View file

@ -0,0 +1,123 @@
package slidingwindow
import "time"
// Clock specifies the needed time related functions used by the time series.
// To use a custom clock implement the interface and pass it to the time series constructor.
// The default clock uses time.Now()
type Clock interface {
Now() time.Time
}
// defaultClock is used in case no clock is provided to the constructor.
type defaultClock struct{}
func (c *defaultClock) Now() time.Time {
return time.Now()
}
// slidingWindow keeps per-bucket maxima over a rolling time span made of
// duration/step fixed-width buckets arranged in a ring.
type slidingWindow struct {
	buffer []float64 // per-bucket maxima, addressed modulo length
	length int       // number of buckets (duration / step)
	end    time.Time // end of the window the ring currently represents
	// start is the creation time.
	// NOTE(review): start is assigned in the constructor but never read
	// afterwards — confirm whether it is still needed.
	start    time.Time
	oldest   int           // ring index of the oldest bucket
	newest   int           // ring index of the newest bucket
	step     time.Duration // width of one bucket
	duration time.Duration // total span covered by the window
	clock    Clock         // time source; injectable for tests
}

// Defaults: one-day buckets covering a seven-day window.
var defaultStep = time.Hour * 24
var defaultDuration = time.Hour * 24 * 7

// options collects the configurable dependencies of a slidingWindow.
type options struct {
	clock    Clock
	step     time.Duration
	duration time.Duration
}
// option configures a slidingWindow.
type option func(*options)

// WithStep sets the width of one bucket.
func WithStep(step time.Duration) option {
	return func(o *options) {
		o.step = step
	}
}

// WithDuration sets the total time span covered by the window.
func WithDuration(duration time.Duration) option {
	return func(o *options) {
		o.duration = duration
	}
}

// WithClock sets the time source (useful for tests).
func WithClock(clock Clock) option {
	return func(o *options) {
		o.clock = clock
	}
}
// NewSlidingWindow builds a sliding window from the given options,
// falling back to the real clock, a one-day step and a seven-day
// duration when unset.
func NewSlidingWindow(os ...option) *slidingWindow {
	var opts options
	for _, o := range os {
		o(&opts)
	}
	if opts.clock == nil {
		opts.clock = &defaultClock{}
	}
	if opts.step == 0 {
		opts.step = defaultStep
	}
	if opts.duration == 0 {
		opts.duration = defaultDuration
	}
	return newSlidingWindow(opts.step, opts.duration, opts.clock)
}
// newSlidingWindow allocates a window of duration/step buckets.
func newSlidingWindow(step time.Duration, duration time.Duration, clock Clock) *slidingWindow {
	length := int(duration / step)
	now := clock.Now()
	return &slidingWindow{
		buffer: make([]float64, length),
		length: length,
		// NOTE(review): end starts a full duration in the past, so the
		// first advance() walks the ring forward to now — confirm intended.
		end:      now.Truncate(step).Add(-duration),
		start:    now,
		step:     step,
		duration: duration,
		oldest:   1,
		clock:    clock,
	}
}
// Insert records score in the newest bucket, keeping only the largest
// value seen for that bucket.
func (sw *slidingWindow) Insert(score float64) {
	sw.advance()
	if score > sw.buffer[sw.newest] {
		sw.buffer[sw.newest] = score
	}
}

// Max returns the largest value currently stored anywhere in the window
// (0.0 when the window is empty or holds only non-positive values).
func (sw *slidingWindow) Max() float64 {
	sw.advance()
	max := 0.0
	for i := range sw.buffer {
		if sw.buffer[i] > max {
			max = sw.buffer[i]
		}
	}
	return max
}
// advance expires buckets that have fallen out of the window, stepping
// end forward one step at a time until it reaches the window end implied
// by the current clock reading.
func (sw *slidingWindow) advance() {
	newEnd := sw.clock.Now().Truncate(sw.step).Add(-sw.duration)
	for newEnd.After(sw.end) {
		sw.end = sw.end.Add(sw.step)
		sw.buffer[sw.oldest] = 0.0
		sw.newest = sw.oldest
		sw.oldest = (sw.oldest + 1) % sw.length
	}
}

View file

@ -0,0 +1,193 @@
package trending
import (
"sort"
"time"
timeseries "dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending/slidingwindow"
)
// Algorithm:
// 1. Divide one week into 5-minute bins.
//    The algorithm uses expected probability to compute its ranking.
//    By choosing a one-week span to compute the expectation, the algorithm forgets old trends.
// 2. For every play event, increase the counter in the current bin.
// 3. Compute the KL divergence with the following steps:
//    - Compute the probability of the last full bin (this should be the current 5-minute sliding window)
//    - Compute the expected probability over the past bins, including the current bin
//    - Compute the KL divergence (kld = p * ln(p/e))
// 4. Keep the highest KL divergence score together with its timestamp.
// 5. Compute the exponential decay multiplier and multiply it with the highest KL divergence.
// 6. Blend the current KL divergence score with the decayed high score.
var defaultHalfLife = 2 * time.Hour
var defaultRecentDuration = 5 * time.Minute
var defaultStorageDuration = 7 * 24 * time.Hour
var defaultMaxResults = 100
var defaultBaseCount = 3
var defaultScoreThreshold = 0.01
var defaultCountThreshold = 3.0
// options collects every tunable parameter of a Scorer; NewScorer fills
// in package defaults for any field left at its zero value.
type options struct {
	creator              TimeSeriesCreator    // builds the per-item event series
	slidingWindowCreator SlidingWindowCreator // builds the per-item high-score window
	halfLife             time.Duration        // exponential-decay half-life for past high scores (see algorithm notes)
	recentDuration       time.Duration        // span treated as "recent" activity — presumably the last-full-bin width; confirm in item scoring
	storageDuration      time.Duration        // how far back history is kept
	maxResults           int                  // cap on the number of scores Score returns
	baseCount            int                  // baseline event count; semantics live in item scoring — TODO confirm
	scoreThreshold       float64              // minimum score retained by Score
	countThreshold       float64              // event-count threshold; semantics live in item scoring — TODO confirm
}
// Option is the functional-option type accepted by NewScorer; each Option
// mutates the options struct before defaults are applied.
type Option func(*options)
// WithTimeSeries overrides the factory used to build each item's event series.
func WithTimeSeries(creator TimeSeriesCreator) Option {
	return func(opts *options) { opts.creator = creator }
}
// WithSlidingWindow overrides the factory used to build each item's
// high-score sliding window.
func WithSlidingWindow(creator SlidingWindowCreator) Option {
	return func(opts *options) { opts.slidingWindowCreator = creator }
}
// WithHalfLife sets the decay half-life applied to past high scores.
func WithHalfLife(halfLife time.Duration) Option {
	return func(opts *options) { opts.halfLife = halfLife }
}
// WithRecentDuration sets the span of activity considered "recent".
func WithRecentDuration(recentDuration time.Duration) Option {
	return func(opts *options) { opts.recentDuration = recentDuration }
}
// WithStorageDuration sets how far back event history is retained.
func WithStorageDuration(storageDuration time.Duration) Option {
	return func(opts *options) { opts.storageDuration = storageDuration }
}
// WithMaxResults caps the number of scores returned by Score.
func WithMaxResults(maxResults int) Option {
	return func(opts *options) { opts.maxResults = maxResults }
}
// WithScoreThreshold sets the minimum score a result must reach to be
// included in Score's output.
func WithScoreThreshold(threshold float64) Option {
	return func(opts *options) { opts.scoreThreshold = threshold }
}
// WithCountThreshold sets the event-count threshold used during scoring.
func WithCountThreshold(threshold float64) Option {
	return func(opts *options) { opts.countThreshold = threshold }
}
// Scorer tracks play events per item ID and ranks items by how anomalous
// their recent activity is (see the algorithm notes at the top of the file).
type Scorer struct {
	options options          // effective configuration (defaults applied by NewScorer)
	items   map[string]*item // per-ID state, created lazily by AddEvent
}
// SlidingWindow is the subset of a sliding-window implementation the
// scorer needs: record a score and query the maximum recorded so far.
type SlidingWindow interface {
	Insert(score float64)
	Max() float64
}

// SlidingWindowCreator builds the SlidingWindow used for the item with
// the given ID.
type SlidingWindowCreator func(string) SlidingWindow
// TimeSeries is the subset of a time-series implementation the scorer
// needs: count events at a point in time and query a value over a range.
type TimeSeries interface {
	IncreaseAtTime(amount int, time time.Time)
	Range(start, end time.Time) (float64, error)
}

// TimeSeriesCreator builds the TimeSeries used for the item with the
// given ID.
type TimeSeriesCreator func(string) TimeSeries
// NewMemoryTimeSeries is the default TimeSeriesCreator: an in-memory
// series with second/minute/hour/day granularities. The id parameter is
// unused but required by the TimeSeriesCreator signature.
func NewMemoryTimeSeries(id string) TimeSeries {
	ts, err := timeseries.NewTimeSeries(timeseries.WithGranularities(
		[]timeseries.Granularity{
			{Granularity: time.Second, Count: 60},
			{Granularity: time.Minute, Count: 10},
			{Granularity: time.Hour, Count: 24},
			{Granularity: time.Hour * 24, Count: 7},
		},
	))
	if err != nil {
		// The original discarded this error with _, which would have
		// returned a nil TimeSeries and surfaced later as a confusing
		// nil dereference. The granularities above are hard-coded, so an
		// error here is a programmer bug: fail loudly at construction.
		panic(err)
	}
	return ts
}
// NewScorer builds a Scorer from the supplied functional options, filling
// any option left at its zero value with the package defaults above.
func NewScorer(options ...Option) Scorer {
	scorer := Scorer{items: make(map[string]*item)}
	for _, o := range options {
		o(&scorer.options)
	}
	if scorer.options.creator == nil {
		scorer.options.creator = NewMemoryTimeSeries
	}
	if scorer.options.halfLife == 0 {
		scorer.options.halfLife = defaultHalfLife
	}
	if scorer.options.recentDuration == 0 {
		scorer.options.recentDuration = defaultRecentDuration
	}
	if scorer.options.storageDuration == 0 {
		scorer.options.storageDuration = defaultStorageDuration
	}
	if scorer.options.maxResults == 0 {
		scorer.options.maxResults = defaultMaxResults
	}
	if scorer.options.scoreThreshold == 0.0 {
		scorer.options.scoreThreshold = defaultScoreThreshold
	}
	if scorer.options.countThreshold == 0.0 {
		scorer.options.countThreshold = defaultCountThreshold
	}
	// baseCount is an int; compare against the int zero value (the
	// original compared against the float literal 0.0).
	if scorer.options.baseCount == 0 {
		scorer.options.baseCount = defaultBaseCount
	}
	if scorer.options.slidingWindowCreator == nil {
		// Default window: one-day bins spanning the storage duration.
		scorer.options.slidingWindowCreator = func(id string) SlidingWindow {
			return slidingwindow.NewSlidingWindow(
				slidingwindow.WithStep(time.Hour*24),
				slidingwindow.WithDuration(scorer.options.storageDuration),
			)
		}
	}
	return scorer
}
// AddEvent records one event for id at the given instant, creating the
// per-item state the first time an id is seen.
//
// The parameters were renamed from the original: "time" shadowed the time
// package and the local "item" shadowed the item type inside the body.
func (s *Scorer) AddEvent(id string, at time.Time) {
	itm := s.items[id]
	if itm == nil {
		itm = newItem(id, &s.options)
		s.items[id] = itm
	}
	s.addToItem(itm, at)
}
// addToItem increments itm's event series by one at the given instant.
// (Parameters renamed: the originals shadowed the item type and the time
// package.)
func (s *Scorer) addToItem(itm *item, at time.Time) {
	itm.eventSeries.IncreaseAtTime(1, at)
}
// Score computes the current score of every tracked item and returns the
// results sorted, filtered by the configured score threshold, and capped
// at maxResults.
func (s *Scorer) Score() Scores {
	var result Scores
	for id, it := range s.items {
		sc := it.score()
		sc.ID = id
		result = append(result, sc)
	}
	sort.Sort(result)
	if threshold := s.options.scoreThreshold; threshold > 0 {
		result = result.threshold(threshold)
	}
	return result.take(s.options.maxResults)
}