diff --git a/internal/timeseries/LICENSE b/internal/timeseries/LICENSE new file mode 100644 index 0000000..9934510 --- /dev/null +++ b/internal/timeseries/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016 Neri Marschik + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/internal/timeseries/README.md b/internal/timeseries/README.md new file mode 100644 index 0000000..bf9a7be --- /dev/null +++ b/internal/timeseries/README.md @@ -0,0 +1,112 @@ +# go-time-series + + +[![License](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](./LICENSE) +[![GoDoc](https://godoc.org/github.com/codesuki/go-time-series?status.svg)](https://godoc.org/github.com/codesuki/go-time-series) +[![Build Status](http://img.shields.io/travis/codesuki/go-time-series.svg?style=flat)](https://travis-ci.org/codesuki/go-time-series) +[![codecov](https://codecov.io/gh/codesuki/go-time-series/branch/master/graph/badge.svg)](https://codecov.io/gh/codesuki/go-time-series) + +Time series implementation in Go. + +It is used in [go-trending](https://www.github.com/codesuki/go-trending) as a backend for a trending algorithm. +The time series supports storing counts at different granularities, e.g. seconds, minutes, hours, ....
+In case of go-trending the time series is configured to have recent data available at small granularity, i.e. the recent 60 seconds, and historical data available at large granularity, i.e. the last few hours, days of data. + +A redis backend is planned. + +* Simple interface +* Store time series data at different granularities +* Use your own clock implementation, e.g. for testing or similar + +## Examples + +### Creating a time series with default settings +The default settings use `time.Now()` as clock and `time.Second * 60`, `time.Minute * 60` and `time.Hour * 24` as granularities. + +```go +import "github.com/codesuki/go-time-series" + +... + +ts, err := timeseries.NewTimeSeries() +if err != nil { + // handle error +} +``` + +### Creating a customized time series +You can specify the clock and/or granularities to use. A clock must implement the `timeseries.Clock` interface. + +```go +import "github.com/codesuki/go-time-series" + +... +type clock struct {} +func (c *clock) Now() { + return time.Time{} // always returns the zero time +} +var myClock clock +... + +ts, err := timeseries.NewTimeSeries( + timeseries.WithGranularities( + []timeseries.Granularity{ + {Granularity: time.Second, Count: 60}, + {Granularity: time.Minute, Count: 60}, + {Granularity: time.Hour, Count: 24}, + {Granularity: time.Hour * 24, Count: 7}, + }), + timeseries.WithClock(&myClock), +) +if err != nil { + // handle error +} +``` + +### Filling the time series +To fill the time series with counts, e.g. events, you can use two different functions. + +```go +import "github.com/codesuki/go-time-series" + +... + +ts, err := timeseries.NewTimeSeries() +if err != nil { + // handle error +} + +ts.Increase(2) // adds 2 to the counter at the current time +ts.IncreaseAtTime(3, time.Now().Add(-2 * time.Minute)) // adds 3 to the counter 2 minutes ago +``` + +### Querying the time series +The `Range()` function takes 2 arguments, i.e. the start and end of a time span. 
+`Recent()` is a small helper function that just uses `clock.Now()` as `end` in `Range`. +Please refer to the [documentation](https://godoc.org/github.com/codesuki/go-time-series) for how `Range()` works exactly. There are some details depending on what range you query and what range is available. + +```go +import "github.com/codesuki/go-time-series" + +... + +ts, err := timeseries.NewTimeSeries() +if err != nil { + // handle error +} + +ts.Increase(2) // adds 2 to the counter at the current time +// 1s passes +ts.Increase(3) +// 1s passes + +ts.Recent(5 * time.Second) // returns 5 + +ts.Range(time.Now().Add(-5 * time.Second), time.Now()) // returns 5 +``` + +## Documentation +GoDoc is located [here](https://godoc.org/github.com/codesuki/go-time-series) + +## License +go-time-series is [MIT licensed](./LICENSE). diff --git a/internal/timeseries/level.go b/internal/timeseries/level.go new file mode 100644 index 0000000..8dd41b0 --- /dev/null +++ b/internal/timeseries/level.go @@ -0,0 +1,132 @@ +package timeseries + +import ( + "log" + "time" +) + +type level struct { + clock Clock + granularity time.Duration + length int + end time.Time + oldest int + newest int + buckets []int +} + +func newLevel(clock Clock, granularity time.Duration, length int) level { + level := level{clock: clock, granularity: granularity, length: length} + level.init() + return level +} + +func (l *level) init() { + buckets := make([]int, l.length) + l.buckets = buckets + l.clear(time.Time{}) +} + +func (l *level) clear(time time.Time) { + l.oldest = 1 + l.newest = 0 + l.end = time.Truncate(l.granularity) + for i := range l.buckets { + l.buckets[i] = 0 + } +} + +func (l *level) duration() time.Duration { + return l.granularity*time.Duration(l.length) - l.granularity +} + +func (l *level) earliest() time.Time { + return l.end.Add(-l.duration()) +} + +func (l *level) latest() time.Time { + return l.end +} + +func (l *level) increaseAtTime(amount int, time time.Time) { + difference := 
l.end.Sub(time.Truncate(l.granularity)) + if difference < 0 { + // this cannot be negative because we advance before + // can at least be 0 + log.Println("level.increaseTime was called with a time in the future") + } + // l.length-1 because the newest element is always l.length-1 away from oldest + steps := (l.length - 1) - int(difference/l.granularity) + index := (l.oldest + steps) % l.length + l.buckets[index] += amount +} + +func (l *level) advance(target time.Time) { + if !l.end.Before(target) { + return + } + for target.After(l.end) { + l.end = l.end.Add(l.granularity) + l.buckets[l.oldest] = 0 + l.newest = l.oldest + l.oldest = (l.oldest + 1) % len(l.buckets) + } +} + +// TODO: find a better way to handle latest parameter +// The parameter is used to avoid the overlap computation if end overlaps with the current time. +// Probably will find away when implementing redis version. +func (l *level) sumInterval(start, end time.Time, latest time.Time) float64 { + if start.Before(l.earliest()) { + start = l.earliest() + } + if end.After(l.latest()) { + end = l.latest() + } + idx := 0 + // this is how many time steps start is away from earliest + startSteps := start.Sub(l.earliest()) / l.granularity + idx += int(startSteps) + + currentTime := l.earliest() + currentTime = currentTime.Add(startSteps * l.granularity) + + sum := 0.0 + for idx < l.length && currentTime.Before(end) { + nextTime := currentTime.Add(l.granularity) + if nextTime.After(latest) { + nextTime = latest + } + if nextTime.Before(start) { + // the case nextTime.Before(start) happens when start is after latest + // therefore we don't have data and can return + break + } + count := float64(l.buckets[(l.oldest+idx)%l.length]) + if currentTime.Before(start) || nextTime.After(end) { + // current bucket overlaps time range + overlapStart := max(currentTime, start) + overlapEnd := min(nextTime, end) + overlap := overlapEnd.Sub(overlapStart).Seconds() / l.granularity.Seconds() + count *= overlap + } + sum += 
count + idx++ + currentTime = currentTime.Add(l.granularity) + } + return sum +} + +func min(t1, t2 time.Time) time.Time { + if t1.Before(t2) { + return t1 + } + return t2 +} + +func max(t1, t2 time.Time) time.Time { + if t1.After(t2) { + return t1 + } + return t2 +} diff --git a/internal/timeseries/timeseries.go b/internal/timeseries/timeseries.go new file mode 100644 index 0000000..889cec5 --- /dev/null +++ b/internal/timeseries/timeseries.go @@ -0,0 +1,272 @@ +package timeseries + +import ( + "errors" + "time" +) + +// Explanation +// Have several granularity buckets +// 1s, 1m, 5m, ... +// The buckets will be in circular arrays +// +// For example we could have +// 60 1s buckets to make up 1 minute +// 60 1m buckets to make up 1 hour +// ... +// This would enable us to get the last 1 minute data at 1s granularity (every second) +// +// Date ranges are [start, end[ +// +// Put: +// Every time an event comes we add it to all corresponding buckets +// +// Example: +// Event time = 12:00:00 +// 1s bucket = 12:00:00 +// 1m bucket = 12:00:00 +// 5m bucket = 12:00:00 +// +// Event time = 12:00:01 +// 1s bucket = 12:00:01 +// 1m bucket = 12:00:00 +// 5m bucket = 12:00:00 +// +// Event time = 12:01:01 +// 1s bucket = 12:01:01 +// 1m bucket = 12:01:00 +// 5m bucket = 12:00:00 +// +// Fetch: +// Given a time span we try to find the buckets with the finest granularity +// to satisfy the time span and return the sum of their contents +// +// Example: +// Now = 12:05:30 +// Time span = 12:05:00 - 12:05:02 +// Return sum of 1s buckets 0,1 +// +// Now = 12:10:00 +// Time span = 12:05:00 - 12:07:00 +// Return sum of 1m buckets 5,6 +// +// Now = 12:10:00 +// Time span = 12:00:00 - 12:10:00 (last 10 minutes) +// Return sum of 5m buckets 0,1 +// +// Now = 12:10:01 +// Time span = 12:05:01 - 12:10:01 (last 5 minutes) +// Return sum of 5m buckets (59/(5*60))*1, (1/(5*60))*2 +// +// Now = 12:10:01 +// Time span = 12:04:01 - 12:10:01 (last 6 minutes) +// Return sum of 1m buckets 
(59/60)*4, 5, 6, 7, 8, 9, (1/60)*10 + +var ( + // ErrBadRange indicates that the given range is invalid. Start should always be <= End + ErrBadRange = errors.New("timeseries: range is invalid") + // ErrBadGranularities indicates that the provided granularities are not strictly increasing + ErrBadGranularities = errors.New("timeseries: granularities must be strictly increasing and non empty") + // ErrRangeNotCovered indicates that the provided range lies outside the time series + ErrRangeNotCovered = errors.New("timeseries: range is not convered") +) + +// defaultGranularities are used in case no granularities are provided to the constructor. +var defaultGranularities = []Granularity{ + {time.Second, 60}, + {time.Minute, 60}, + {time.Hour, 24}, +} + +// Clock specifies the needed time related functions used by the time series. +// To use a custom clock implement the interface and pass it to the time series constructor. +// The default clock uses time.Now() +type Clock interface { + Now() time.Time +} + +// defaultClock is used in case no clock is provided to the constructor. +type defaultClock struct{} + +func (c *defaultClock) Now() time.Time { + return time.Now() +} + +// Granularity describes the granularity for one level of the time series. +// Count cannot be 0. +type Granularity struct { + Granularity time.Duration + Count int +} + +type options struct { + clock Clock + granularities []Granularity +} + +// Option configures the time series. +type Option func(*options) + +// WithClock returns a Option that sets the clock used by the time series. +func WithClock(c Clock) Option { + return func(o *options) { + o.clock = c + } +} + +// WithGranularities returns a Option that sets the granularites used by the time series. 
+func WithGranularities(g []Granularity) Option { + return func(o *options) { + o.granularities = g + } +} + +type TimeSeries struct { + clock Clock + levels []level + pending int + pendingTime time.Time + latest time.Time +} + +// NewTimeSeries creates a new time series with the provided options. +// If no options are provided default values are used. +func NewTimeSeries(os ...Option) (*TimeSeries, error) { + opts := options{} + for _, o := range os { + o(&opts) + } + if opts.clock == nil { + opts.clock = &defaultClock{} + } + if opts.granularities == nil { + opts.granularities = defaultGranularities + } + return newTimeSeries(opts.clock, opts.granularities) +} + +func newTimeSeries(clock Clock, granularities []Granularity) (*TimeSeries, error) { + err := checkGranularities(granularities) + if err != nil { + return nil, err + } + return &TimeSeries{clock: clock, levels: createLevels(clock, granularities)}, nil +} + +func checkGranularities(granularities []Granularity) error { + if len(granularities) == 0 { + return ErrBadGranularities + } + last := time.Duration(0) + for i := 0; i < len(granularities); i++ { + if granularities[i].Count == 0 { + return ErrBadGranularities + } + if granularities[i].Granularity <= last { + return ErrBadGranularities + } + last = granularities[i].Granularity + } + return nil +} + +func createLevels(clock Clock, granularities []Granularity) []level { + levels := make([]level, len(granularities)) + for i := range granularities { + levels[i] = newLevel(clock, granularities[i].Granularity, granularities[i].Count) + } + return levels +} + +// Increase adds amount at current time. +func (t *TimeSeries) Increase(amount int) { + t.IncreaseAtTime(amount, t.clock.Now()) +} + +// IncreaseAtTime adds amount at a specific time. 
+func (t *TimeSeries) IncreaseAtTime(amount int, time time.Time) { + if time.After(t.latest) { + t.latest = time + } + if time.After(t.pendingTime) { + t.advance(time) + t.pending = amount + } else if time.After(t.pendingTime.Add(-t.levels[0].granularity)) { + t.pending++ + } else { + t.increaseAtTime(amount, time) + } +} + +func (t *TimeSeries) increaseAtTime(amount int, time time.Time) { + for i := range t.levels { + if time.Before(t.levels[i].latest().Add(-1 * t.levels[i].duration())) { + continue + } + t.levels[i].increaseAtTime(amount, time) + } +} + +func (t *TimeSeries) advance(target time.Time) { + // we need this here because advance is called from other locations + // than IncreaseAtTime that don't check by themselves + if !target.After(t.pendingTime) { + return + } + t.advanceLevels(target) + t.handlePending() +} + +func (t *TimeSeries) advanceLevels(target time.Time) { + for i := range t.levels { + if !target.Before(t.levels[i].latest().Add(t.levels[i].duration())) { + t.levels[i].clear(target) + continue + } + t.levels[i].advance(target) + } +} + +func (t *TimeSeries) handlePending() { + t.increaseAtTime(t.pending, t.pendingTime) + t.pending = 0 + t.pendingTime = t.levels[0].latest() +} + +// Recent returns the sum over [now-duration, now). +func (t *TimeSeries) Recent(duration time.Duration) (float64, error) { + now := t.clock.Now() + return t.Range(now.Add(-duration), now) +} + +// Range returns the sum over the given range [start, end). +// ErrBadRange is returned if start is after end. +// ErrRangeNotCovered is returned if the range lies outside the time series. 
+func (t *TimeSeries) Range(start, end time.Time) (float64, error) { + if start.After(end) { + return 0, ErrBadRange + } + t.advance(t.clock.Now()) + if ok, err := t.intersects(start, end); !ok { + return 0, err + } + for i := range t.levels { + // use !start.Before so earliest() is included + // if we use earliest().Before() we won't get start + if !start.Before(t.levels[i].earliest()) { + return t.levels[i].sumInterval(start, end, t.latest), nil + } + } + return t.levels[len(t.levels)-1].sumInterval(start, end, t.latest), nil +} + +func (t *TimeSeries) intersects(start, end time.Time) (bool, error) { + biggestLevel := t.levels[len(t.levels)-1] + if end.Before(biggestLevel.latest().Add(-biggestLevel.duration())) { + return false, ErrRangeNotCovered + } + if start.After(t.levels[0].latest()) { + return false, ErrRangeNotCovered + } + return true, nil +} diff --git a/internal/timeseries/timeseries_test.go b/internal/timeseries/timeseries_test.go new file mode 100644 index 0000000..2d0f745 --- /dev/null +++ b/internal/timeseries/timeseries_test.go @@ -0,0 +1,336 @@ +package timeseries + +import ( + "testing" + "time" + + "github.com/benbjohnson/clock" +) + +// TODO: do table based testing + +func setup() (*TimeSeries, *clock.Mock) { + clock := clock.NewMock() + ts, _ := NewTimeSeries( + WithClock(clock), + WithGranularities( + []Granularity{ + {time.Second, 60}, + {time.Minute, 60}, + }, + ), + ) + return ts, clock +} + +func TestClock(t *testing.T) { + clock := &defaultClock{} + + // there is a small chance this won't pass + if clock.Now().Truncate(time.Second) != time.Now().Truncate(time.Second) { + t.Errorf("default clock does not track time.Now") + } +} + +func TestNewTimeSeries(t *testing.T) { + ts, err := NewTimeSeries() + if ts == nil { + t.Errorf("constructor returned nil") + } + if err != nil { + t.Errorf("should not return error") + } +} + +func TestNewTimeSeriesWithGranularities(t *testing.T) { + granularities := []Granularity{ + {time.Second, 60}, + 
{time.Minute, 60}, + {time.Hour, 24}, + } + ts, err := NewTimeSeries(WithGranularities(granularities)) + if ts == nil || err != nil { + t.Error("could not create time series") + } + + badGranularities := []Granularity{ + {time.Minute, 60}, + {time.Second, 60}, + {time.Hour, 24}, + } + _, err = NewTimeSeries(WithGranularities(badGranularities)) + if err != ErrBadGranularities { + t.Error("should not accept decreasing granularities") + } + + badGranularities = []Granularity{ + {time.Minute, 60}, + {time.Second, 0}, + {time.Hour, 24}, + } + _, err = NewTimeSeries(WithGranularities(badGranularities)) + if err != ErrBadGranularities { + t.Error("should not accept granularities with zero count") + } + + _, err = NewTimeSeries(WithGranularities([]Granularity{})) + if err != ErrBadGranularities { + t.Error("should not accept empty granularities") + } +} + +func TestNewTimeSeriesWithClock(t *testing.T) { + clock := clock.NewMock() + ts, _ := NewTimeSeries(WithClock(clock)) + + ts.Increase(2) + clock.Add(time.Second * 1) + ts.Increase(1) + + res, _ := ts.Range(time.Unix(0, 0), time.Unix(1, 0)) + if res != 2 { + t.Errorf("expected %d got %f", 2, res) + } +} + +func TestRecentSeconds(t *testing.T) { + ts, clock := setup() + + clock.Add(time.Minute * 5) + ts.Increase(1) + clock.Add(time.Second * 1) + ts.Increase(2) + clock.Add(time.Second * 1) + ts.Increase(3) + + res, _ := ts.Recent(time.Second) + if res != 2 { + t.Errorf("expected %d got %f", 2, res) + } + + res, _ = ts.Recent(2 * time.Second) + if res != 3 { + t.Errorf("expected %d got %f", 3, res) + } + + // test earliest second + clock.Add(57 * time.Second) // time: 09:05:59 + res, _ = ts.Recent(59 * time.Second) + if res != 6 { + t.Errorf("expected %d got %f", 6, res) + } + + // test future time + clock.Add(1 * time.Second) + clock.Add(57 * time.Second) // time: 09:06:00 + res, _ = ts.Recent(59 * time.Second) + if res != 0 { + t.Errorf("expected %d got %f", 0, res) + } +} + +func TestRecentMinutes(t *testing.T) { + ts, 
clock := setup() + + clock.Add(time.Minute * 1) // 09:01:00 + ts.Increase(60) + clock.Add(time.Minute * 1) // 09:02:00 + ts.Increase(1) + clock.Add(time.Minute * 1) // 09:03:00 + ts.Increase(60) + clock.Add(time.Second * 1) // 09:03:01 + ts.Increase(3) + + // test interpolation at beginning + // 59/60 * 60 + 1 + 60 = 120 + res, _ := ts.Recent(2 * time.Minute) + if res != 120 { + t.Errorf("expected %d got %f", 120, res) + } + + // test interpolation at end + // 60/2 = 30 + res, _ = ts.Range( + clock.Now().Add(-2*time.Minute+-1*time.Second), // 09:01:00 + clock.Now().Add(-1*time.Minute+-31*time.Second), // 09:01:30 + ) + if res != 30 { + t.Errorf("expected %d got %f", 30, res) + } + + // get from earliest data point + clock.Add(time.Second*59 + time.Minute*56) + ts.Increase(60) + clock.Add(time.Minute * 1) + ts.Increase(70) + clock.Add(time.Minute * 59) + res, _ = ts.Recent(time.Minute * 60) + if res != 70 { + t.Errorf("expected %d got %f", 70, res) + } +} + +func TestRecentWholeRange(t *testing.T) { + ts, clock := setup() + + clock.Add(time.Minute * 1) // 09:01:00 + ts.Increase(60) + clock.Add(time.Minute * 1) // 09:02:00 + ts.Increase(1) + clock.Add(time.Minute * 1) // 09:03:00 + ts.Increase(60) + clock.Add(time.Second * 1) // 09:03:01 + ts.Increase(3) + + // 60 + 1 + 60 = 121 + res, _ := ts.Recent(60 * time.Minute) + if res != 121 { + t.Errorf("expected %d got %f", 62, res) + } +} + +func TestRecentWholeRangeBig(t *testing.T) { + ts, clock := setup() + + clock.Add(time.Minute * 1) // 09:01:00 + ts.Increase(60) + clock.Add(time.Minute * 1) // 09:02:00 + ts.Increase(1) + clock.Add(time.Minute * 1) // 09:03:00 + ts.Increase(60) + clock.Add(time.Second * 1) // 09:03:01 + ts.Increase(3) + + // 60 + 1 + 60 = 121 + res, _ := ts.Recent(120 * time.Minute) + if res != 121 { + t.Errorf("expected %d got %f", 121, res) + } +} + +func TestRangeEndInFuture(t *testing.T) { + ts, clock := setup() + + clock.Add(time.Minute * 1) // 09:01:00 + ts.Increase(1) + + res, _ := 
ts.Range(clock.Now().Add(-1*time.Minute), clock.Now().Add(5*time.Minute)) + if res != 0 { + t.Errorf("expected %d got %f", 0, res) + } +} + +func TestRangeBadRange(t *testing.T) { + ts, clock := setup() + + clock.Add(time.Minute * 1) // 09:01:00 + ts.Increase(60) + clock.Add(time.Minute * 1) // 09:02:00 + ts.Increase(1) + clock.Add(time.Minute * 1) // 09:03:00 + ts.Increase(60) + clock.Add(time.Second * 1) // 09:03:01 + ts.Increase(3) + + // start is after end + _, err := ts.Range(clock.Now().Add(time.Minute), clock.Now()) + if err != ErrBadRange { + t.Errorf("should return ErrBadRange") + } + + // range is after end + _, err = ts.Range(clock.Now().Add(time.Minute), clock.Now().Add(5*time.Minute)) + if err != ErrRangeNotCovered { + t.Errorf("should return ErrRangeNotCovered") + } + + // range is before start + _, err = ts.Range(clock.Now().Add(-5*time.Hour), clock.Now().Add(-4*time.Hour)) + if err != ErrRangeNotCovered { + t.Errorf("should return ErrRangeNotCovered") + } +} + +func TestIncrease(t *testing.T) { + ts, clock := setup() + + // time 12:00 + ts.Increase(2) + clock.Add(time.Minute * 1) // time: 12:01:00 + ts.Increase(4) + clock.Add(time.Minute * 1) // time: 12:02:00 + ts.Increase(6) + clock.Add(time.Second * 10) // time: 12:02:10 + ts.Increase(2) + clock.Add(time.Second * 10) // time: 12:02:20 + ts.Increase(2) + clock.Add(time.Second * 10) // time: 12:02:30 + ts.Increase(2) + clock.Add(time.Second * 10) // time: 12:02:40 + ts.Increase(2) + clock.Add(time.Second * 10) // time: 12:02:50 + ts.Increase(2) + clock.Add(time.Second * 10) // time: 12:03:00 + ts.Increase(2) + // get range from 12:00:30 - 12:02:30 + // 0.5 * 2 + 4 + 0.5 * 16 = 13 + res, _ := ts.Range(clock.Now().Add(-time.Second*150), clock.Now().Add(-time.Second*30)) + if res != 13 { + t.Errorf("expected %d got %f", 13, res) + } + + // get range from 12:01:00 - 12:02:00 + // = 4 + res, _ = ts.Range(clock.Now().Add(-time.Second*120), clock.Now().Add(-time.Second*60)) + if res != 4 { + 
t.Errorf("expected %d got %f", 4, res) + } + +} + +func TestIncreasePending(t *testing.T) { + ts, clock := setup() + + ts.Increase(1) // this should advance and reset pending + ts.Increase(1) // this should increase pending + clock.Add(time.Second) + ts.Increase(1) + + res, _ := ts.Recent(59 * time.Second) + if res != 2 { + t.Errorf("expected %d got %f", 2, res) + } + + clock.Add(time.Second) // the latest data gets merged in because time advanced + + res, _ = ts.Recent(59 * time.Second) + if res != 3 { + t.Errorf("expected %d got %f", 3, res) + } +} + +func TestIncreaseAtTime(t *testing.T) { + ts, clock := setup() + + ts.Increase(60) // time: 09:00:00 + clock.Add(time.Second) // time: 09:00:01 + ts.IncreaseAtTime(60, clock.Now().Add(-1*time.Minute)) // time: 08:59:01 + ts.Increase(1) // time: 09:00:01 + + // from: 08:59:01 - 09:00:01 + // (59/60 * 60) + 60 = 119 + res, _ := ts.Recent(time.Minute) + if res != 119 { + t.Errorf("expected %d got %f", 119, res) + } + + // from: 08:59:00 - 09:00:00 + // 60 + res, _ = ts.Range( + clock.Now().Add(-1*time.Minute+-1*time.Second), + clock.Now().Add(-1*time.Second), + ) + if res != 60 { + t.Errorf("expected %d got %f", 60, res) + } +} diff --git a/internal/trending/LICENSE b/internal/trending/LICENSE new file mode 100644 index 0000000..9934510 --- /dev/null +++ b/internal/trending/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016 Neri Marschik + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/internal/trending/README.md b/internal/trending/README.md new file mode 100644 index 0000000..812db63 --- /dev/null +++ b/internal/trending/README.md @@ -0,0 +1,74 @@ +# go-trending +[![License](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](./LICENSE) +[![GoDoc](https://godoc.org/github.com/codesuki/go-trending?status.svg)](https://godoc.org/github.com/codesuki/go-trending) +[![Build Status](http://img.shields.io/travis/codesuki/go-trending.svg?style=flat)](https://travis-ci.org/codesuki/go-trending) +[![codecov](https://codecov.io/gh/codesuki/go-trending/branch/master/graph/badge.svg)](https://codecov.io/gh/codesuki/go-trending) + +Trending algorithm based on the article [Trending at Instagram](http://instagram-engineering.tumblr.com/post/122961624217/trending-at-instagram). To detect trends an item's current behavior is compared to its usual behavior. The more it differs the higher / lower the score. Items will start trending if the current usage is higher than its average usage. To avoid items quickly becoming non-trending again the scores are smoothed. + +* Configurable and simple to use +* Use your own clock implementation, e.g. for testing or similar +* Use any time series implementation as backend that implements the TimeSeries interface + +### Details +Uses a [time series](https://www.github.com/codesuki/go-time-series) for each item to keep track of its past behavior and get recent behavior with small granularity. 
Computes the [Kullback-Leibler divergence](https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence) between recent behavior and expected, i.e. past, behavior. Then blends the current item score with its past [decayed](https://en.wikipedia.org/wiki/Exponential_decay) maximum score to get the final score. + +## Examples + +### Creating a default scorer +```go +import "github.com/codesuki/go-trending" + +... + +scorer := trending.NewScorer() +``` + +### Creating a customized scorer +**Parameters** +* **Time series:** is used for creating the backing `TimeSeries` objects +* **Half-life:** controls how long an item is trending after the activity went back to normal. +* **Recent duration:** controls how much data is used to compute the current state. If there is not much activity try looking at a larger duration. +* **Storage duration:** controls how much historical data is used. Trends older than the storage duration won't have any effect on the computation. The time series in use should have at least as much storage duration as specified here. + +```go +import "github.com/codesuki/go-trending" + +... +func NewTimeSeries(id string) TimeSeries { + // create time series that satisfies the TimeSeries interface + return timeSeries +} + +... + +scorer := trending.NewScorer( + WithTimeSeries(NewTimeSeries), + WithHalflife(time.Hour), + WithRecentDuration(time.Minute), + WithStorageDuration(7 * 24 * time.Hour), +) +``` + + +### Using the scorer +```go +import "github.com/codesuki/go-trending" + +... + +scorer := trending.NewScorer() + +scorer.AddEvent("id", time) +// add more events. maybe using an event stream. + +... + +trendingItems := scorer.Score() +``` + +## Documentation +GoDoc is located [here](https://godoc.org/github.com/codesuki/go-trending) + +## License +go-trending is [MIT licensed](./LICENSE). 
diff --git a/internal/trending/item.go b/internal/trending/item.go new file mode 100644 index 0000000..534ef41 --- /dev/null +++ b/internal/trending/item.go @@ -0,0 +1,101 @@ +package trending + +import ( + "math" + "time" +) + +type item struct { + eventSeries TimeSeries + maxSeries SlidingWindow + + max float64 + maxTime time.Time + options *options + + // TODO: move outside of item because it's the same for all items + defaultExpectation float64 + defaultHourlyCount float64 +} + +func newItem(id string, options *options) *item { + defaultHourlyCount := float64(options.baseCount) * float64(options.storageDuration/time.Hour) + defaultExpectation := float64(options.baseCount) / float64(time.Hour/options.recentDuration) + return &item{ + eventSeries: options.creator(id), + maxSeries: options.slidingWindowCreator(id), + options: options, + + defaultExpectation: defaultExpectation, + defaultHourlyCount: defaultHourlyCount, + } +} + +func (i *item) score() score { + recentCount, count := i.computeCounts() + if recentCount < i.options.countThreshold { + return score{} + } + if recentCount == count { + // we see this for the first time so there is no historical data + // use a sensible default like average/median over all items + count = recentCount + i.defaultHourlyCount + } + probability := recentCount / count + + // order of those two lines is important. + // if we insert before reading we might just get the same value. 
+ expectation := i.computeRecentMax() + i.maxSeries.Insert(probability) + + if expectation == 0.0 { + expectation = i.defaultExpectation + } + + klScore := computeKullbackLeibler(probability, expectation) + if klScore > i.max { + i.updateMax(klScore) + } + i.decayMax() + + mixedScore := 0.5 * (klScore + i.max) + + return score{ + Score: mixedScore, + Probability: probability, + Expectation: expectation, + Maximum: i.max, + KLScore: klScore, + } +} + +func (i *item) computeCounts() (float64, float64) { + now := time.Now() + totalCount, _ := i.eventSeries.Range(now.Add(-i.options.storageDuration), now) + count, _ := i.eventSeries.Range(now.Add(-i.options.recentDuration), now) + return count, totalCount +} + +func (i *item) computeRecentMax() float64 { + return i.maxSeries.Max() +} + +func (i *item) decayMax() { + i.updateMax(i.max * i.computeExponentialDecayMultiplier()) +} + +func (i *item) updateMax(score float64) { + i.max = score + i.maxTime = time.Now() +} + +func (i *item) computeExponentialDecayMultiplier() float64 { + return math.Pow(0.5, float64(time.Now().Unix()-i.maxTime.Unix())/i.options.halfLife.Seconds()) +} + +func computeKullbackLeibler(probability float64, expectation float64) float64 { + if probability == 0.0 { + return 0.0 + } + return probability * math.Log(probability/expectation) +} diff --git a/internal/trending/score.go b/internal/trending/score.go new file mode 100644 index 0000000..22e5f3a --- /dev/null +++ b/internal/trending/score.go @@ -0,0 +1,40 @@ +package trending + +type score struct { + ID string + Score float64 + Probability float64 + Expectation float64 + Maximum float64 + KLScore float64 +} + +type Scores []score + +func (s Scores) Len() int { + return len(s) +} + +func (s Scores) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s Scores) Less(i, j int) bool { + return s[i].Score > s[j].Score +} + +func (s Scores) take(count int) Scores { + if count >= len(s) { + return s + } + return s[0 : count-1] +} + +func (s Scores) 
threshold(t float64) Scores { + for i := range s { + if s[i].Score < t { + return s[0:i] + } + } + return s +} diff --git a/internal/trending/slidingwindow/slidingwindow.go b/internal/trending/slidingwindow/slidingwindow.go new file mode 100644 index 0000000..9b825e7 --- /dev/null +++ b/internal/trending/slidingwindow/slidingwindow.go @@ -0,0 +1,123 @@ +package slidingwindow + +import "time" + +// Clock specifies the needed time related functions used by the time series. +// To use a custom clock implement the interface and pass it to the time series constructor. +// The default clock uses time.Now() +type Clock interface { + Now() time.Time +} + +// defaultClock is used in case no clock is provided to the constructor. +type defaultClock struct{} + +func (c *defaultClock) Now() time.Time { + return time.Now() +} + +type slidingWindow struct { + buffer []float64 + length int + + end time.Time + start time.Time + + oldest int + newest int + + step time.Duration + duration time.Duration + + clock Clock +} + +var defaultStep = time.Hour * 24 +var defaultDuration = time.Hour * 24 * 7 + +type options struct { + clock Clock + + step time.Duration + duration time.Duration +} + +type option func(*options) + +func WithStep(step time.Duration) option { + return func(o *options) { + o.step = step + } +} + +func WithDuration(duration time.Duration) option { + return func(o *options) { + o.duration = duration + } +} + +func WithClock(clock Clock) option { + return func(o *options) { + o.clock = clock + } +} + +func NewSlidingWindow(os ...option) *slidingWindow { + opts := options{} + for _, o := range os { + o(&opts) + } + if opts.clock == nil { + opts.clock = &defaultClock{} + } + if opts.step.Nanoseconds() == 0 { + opts.step = defaultStep + } + if opts.duration.Nanoseconds() == 0 { + opts.duration = defaultDuration + } + return newSlidingWindow(opts.step, opts.duration, opts.clock) +} + +func newSlidingWindow(step time.Duration, duration time.Duration, clock Clock) 
*slidingWindow { + length := int(duration / step) + now := clock.Now() + return &slidingWindow{ + buffer: make([]float64, length), + length: length, + end: now.Truncate(step).Add(-duration), + start: now, + step: step, + duration: duration, + oldest: 1, + clock: clock, + } +} + +func (sw *slidingWindow) Insert(score float64) { + sw.advance() + if score > sw.buffer[sw.newest] { + sw.buffer[sw.newest] = score + } +} + +func (sw *slidingWindow) Max() float64 { + sw.advance() + max := 0.0 + for i := range sw.buffer { + if sw.buffer[i] > max { + max = sw.buffer[i] + } + } + return max +} + +func (sw *slidingWindow) advance() { + newEnd := sw.clock.Now().Truncate(sw.step).Add(-sw.duration) + for newEnd.After(sw.end) { + sw.end = sw.end.Add(sw.step) + sw.buffer[sw.oldest] = 0.0 + sw.newest = sw.oldest + sw.oldest = (sw.oldest + 1) % sw.length + } +} diff --git a/internal/trending/trending.go b/internal/trending/trending.go new file mode 100644 index 0000000..6117872 --- /dev/null +++ b/internal/trending/trending.go @@ -0,0 +1,193 @@ +package trending + +import ( + "sort" + "time" + + timeseries "dynatron.me/x/stillbox/internal/timeseries" + "dynatron.me/x/stillbox/internal/trending/slidingwindow" +) + +// Algorithm: +// 1. Divide one week into 5 minutes bins +// The algorithm uses expected probability to compute its ranking. +// By choosing a one week span to compute the expectation the algorithm will forget old trends. +// 2. For every play event increase the counter in the current bin +// 3. Compute the KL Divergence with the following steps +// - Compute the probability of the last full bin (this should be the current 5 minutes sliding window) +// - Compute the expected probability over the past bins including the current bin +// - Compute KL Divergence (kld = p * ln(p/e)) +// 4. Keep the highest KL Divergence score together with its timestamp +// 5. Compute exponential decay multiplier and multiply with highest KL Divergence +// 6. 
Blend current KL Divergence score with decayed high score + +var defaultHalfLife = 2 * time.Hour +var defaultRecentDuration = 5 * time.Minute +var defaultStorageDuration = 7 * 24 * time.Hour +var defaultMaxResults = 100 +var defaultBaseCount = 3 +var defaultScoreThreshold = 0.01 +var defaultCountThreshold = 3.0 + +type options struct { + creator TimeSeriesCreator + slidingWindowCreator SlidingWindowCreator + + halfLife time.Duration + + recentDuration time.Duration + storageDuration time.Duration + + maxResults int + baseCount int + scoreThreshold float64 + countThreshold float64 +} + +type Option func(*options) + +func WithTimeSeries(creator TimeSeriesCreator) Option { + return func(o *options) { + o.creator = creator + } +} + +func WithSlidingWindow(creator SlidingWindowCreator) Option { + return func(o *options) { + o.slidingWindowCreator = creator + } +} + +func WithHalfLife(halfLife time.Duration) Option { + return func(o *options) { + o.halfLife = halfLife + } +} + +func WithRecentDuration(recentDuration time.Duration) Option { + return func(o *options) { + o.recentDuration = recentDuration + } +} + +func WithStorageDuration(storageDuration time.Duration) Option { + return func(o *options) { + o.storageDuration = storageDuration + } +} + +func WithMaxResults(maxResults int) Option { + return func(o *options) { + o.maxResults = maxResults + } +} + +func WithScoreThreshold(threshold float64) Option { + return func(o *options) { + o.scoreThreshold = threshold + } +} + +func WithCountThreshold(threshold float64) Option { + return func(o *options) { + o.countThreshold = threshold + } +} + +type Scorer struct { + options options + items map[string]*item +} + +type SlidingWindow interface { + Insert(score float64) + Max() float64 +} + +type SlidingWindowCreator func(string) SlidingWindow + +type TimeSeries interface { + IncreaseAtTime(amount int, time time.Time) + Range(start, end time.Time) (float64, error) +} + +type TimeSeriesCreator func(string) TimeSeries + 
+func NewMemoryTimeSeries(id string) TimeSeries { + ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( + []timeseries.Granularity{ + {Granularity: time.Second, Count: 60}, + {Granularity: time.Minute, Count: 10}, + {Granularity: time.Hour, Count: 24}, + {Granularity: time.Hour * 24, Count: 7}, + }, + )) + return ts +} + +func NewScorer(options ...Option) Scorer { + scorer := Scorer{items: make(map[string]*item)} + for _, o := range options { + o(&scorer.options) + } + if scorer.options.creator == nil { + scorer.options.creator = NewMemoryTimeSeries + } + if scorer.options.halfLife == 0 { + scorer.options.halfLife = defaultHalfLife + } + if scorer.options.recentDuration == 0 { + scorer.options.recentDuration = defaultRecentDuration + } + if scorer.options.storageDuration == 0 { + scorer.options.storageDuration = defaultStorageDuration + } + if scorer.options.maxResults == 0 { + scorer.options.maxResults = defaultMaxResults + } + if scorer.options.scoreThreshold == 0.0 { + scorer.options.scoreThreshold = defaultScoreThreshold + } + if scorer.options.countThreshold == 0.0 { + scorer.options.countThreshold = defaultCountThreshold + } + if scorer.options.baseCount == 0 { + scorer.options.baseCount = defaultBaseCount + } + if scorer.options.slidingWindowCreator == nil { + scorer.options.slidingWindowCreator = func(id string) SlidingWindow { + return slidingwindow.NewSlidingWindow( + slidingwindow.WithStep(time.Hour*24), + slidingwindow.WithDuration(scorer.options.storageDuration), + ) + } + } + return scorer +} + +func (s *Scorer) AddEvent(id string, t time.Time) { + item := s.items[id] + if item == nil { + item = newItem(id, &s.options) + s.items[id] = item + } + s.addToItem(item, t) +} + +func (s *Scorer) addToItem(item *item, t time.Time) { + item.eventSeries.IncreaseAtTime(1, t) +} + +func (s *Scorer) Score() Scores { + var scores Scores + for id, item := range s.items { + score := item.score() + score.ID = id + scores = append(scores,
score) + } + sort.Sort(scores) + if s.options.scoreThreshold > 0 { + scores = scores.threshold(s.options.scoreThreshold) + } + return scores.take(s.options.maxResults) +}