stillbox/internal/timeseries/timeseries.go
Daniel Ponte cdc2034af6 Initial alerting
wip

kind of working

wip
2024-10-31 13:15:48 -04:00

274 lines
7.2 KiB
Go

package timeseries
import (
"errors"
"time"
)
// Explanation
// Have several granularity buckets
// 1s, 1m, 5m, ...
// The buckets will be in circular arrays
//
// For example we could have
// 60 1s buckets to make up 1 minute
// 60 1m buckets to make up 1 hour
// ...
// This would enable us to get the last 1 minute data at 1s granularity (every second)
//
// Date ranges are half-open: [start, end)
//
// Put:
// Every time an event comes we add it to all corresponding buckets
//
// Example:
// Event time = 12:00:00
// 1s bucket = 12:00:00
// 1m bucket = 12:00:00
// 5m bucket = 12:00:00
//
// Event time = 12:00:01
// 1s bucket = 12:00:01
// 1m bucket = 12:00:00
// 5m bucket = 12:00:00
//
// Event time = 12:01:01
// 1s bucket = 12:01:01
// 1m bucket = 12:01:00
// 5m bucket = 12:00:00
//
// Fetch:
// Given a time span we try to find the buckets with the finest granularity
// to satisfy the time span and return the sum of their contents
//
// Example:
// Now = 12:05:30
// Time span = 12:05:00 - 12:05:02
// Return sum of 1s buckets 0,1
//
// Now = 12:10:00
// Time span = 12:05:00 - 12:07:00
// Return sum of 1m buckets 5,6
//
// Now = 12:10:00
// Time span = 12:00:00 - 12:10:00 (last 10 minutes)
// Return sum of 5m buckets 0,1
//
// Now = 12:10:01
// Time span = 12:05:01 - 12:10:01 (last 5 minutes)
// Return sum of 5m buckets (299/(5*60))*1, (1/(5*60))*2
//
// Now = 12:10:01
// Time span = 12:04:01 - 12:10:01 (last 6 minutes)
// Return sum of 1m buckets (59/60)*4, 5, 6, 7, 8, 9, (1/60)*10
var (
	// ErrBadRange indicates that the given range is invalid.
	// Start must not be after End.
	ErrBadRange = errors.New("timeseries: range is invalid")
	// ErrBadGranularities indicates that the provided granularities are
	// empty, contain an invalid Count, or are not strictly increasing.
	ErrBadGranularities = errors.New("timeseries: granularities must be strictly increasing and non empty")
	// ErrRangeNotCovered indicates that the provided range lies outside the
	// time span covered by the time series.
	ErrRangeNotCovered = errors.New("timeseries: range is not covered")
)
// defaultGranularities is the level layout used when the constructor is not
// given any granularities: 60 one-second buckets, 60 one-minute buckets and
// 24 one-hour buckets.
var defaultGranularities = []Granularity{
	{Granularity: time.Second, Count: 60},
	{Granularity: time.Minute, Count: 60},
	{Granularity: time.Hour, Count: 24},
}
// Clock is the source of time used by the time series.
// To use a custom clock, implement this interface and pass it to the time
// series constructor via WithClock. The default clock uses time.Now().
type Clock interface {
	// Now returns the current time.
	Now() time.Time
}

// DefaultClock is the Clock used when no clock is provided to the constructor.
var DefaultClock Clock = new(defaultClock)

// defaultClock is the Clock implementation behind DefaultClock.
type defaultClock struct{}

// Now returns the current time as reported by time.Now.
func (*defaultClock) Now() time.Time {
	return time.Now()
}
// Granularity describes the granularity for one level of the time series.
// For example {time.Second, 60} is the first entry of defaultGranularities:
// 60 buckets of one second each.
// Count cannot be 0.
type Granularity struct {
// Granularity is the duration of this level's buckets. Within a list it must
// be strictly greater than the previous entry's (see checkGranularities).
Granularity time.Duration
// Count is the number of buckets kept at this level; it must not be 0.
Count int
}
// options collects the settings that Option functions may override before
// NewTimeSeries builds the time series.
type options struct {
// clock is the time source; NewTimeSeries falls back to DefaultClock when it is nil.
clock Clock
// granularities lists the levels to create; NewTimeSeries falls back to
// defaultGranularities when it is nil.
granularities []Granularity
}
// Option configures the time series.
// Options are passed to NewTimeSeries, which applies them in the order given.
type Option func(*options)
// WithClock returns an Option that sets the clock used by the time series.
func WithClock(c Clock) Option {
	setClock := func(cfg *options) {
		cfg.clock = c
	}
	return setClock
}
// WithGranularities returns an Option that sets the granularities used by the
// time series. The slice is stored as given, not copied.
func WithGranularities(g []Granularity) Option {
	return Option(func(cfg *options) {
		cfg.granularities = g
	})
}
// TimeSeries counts events over time in several levels of buckets, one level
// per configured granularity (see createLevels).
// NOTE(review): no locking is visible in this file; presumably callers must
// serialize access themselves — confirm.
type TimeSeries struct {
// clock supplies the current time for Increase, Recent and Range.
clock Clock
// levels holds one level per granularity in configuration order;
// checkGranularities requires strictly increasing granularities, so index 0
// is the finest level and the last index the coarsest.
levels []level
// pending is the amount buffered for pendingTime that has not yet been
// written into the levels; handlePending flushes it.
pending int
// pendingTime is the time pending is attributed to; handlePending resets it
// to levels[0].latest().
pendingTime time.Time
// latest is the most recent time passed to IncreaseAtTime; it is handed to
// sumInterval when answering Range.
latest time.Time
}
// NewTimeSeries builds a time series configured by the given options, which
// are applied in order.
// When no clock has been set (or it is nil) DefaultClock is used; when no
// granularities have been set (or they are nil) defaultGranularities is used.
// An error is returned if the resulting granularities fail validation.
func NewTimeSeries(os ...Option) (*TimeSeries, error) {
	var cfg options
	for i := range os {
		os[i](&cfg)
	}

	clock := cfg.clock
	if clock == nil {
		clock = DefaultClock
	}

	granularities := cfg.granularities
	if granularities == nil {
		granularities = defaultGranularities
	}

	return newTimeSeries(clock, granularities)
}
// newTimeSeries validates the granularities and, if they are acceptable,
// returns a TimeSeries with one level per granularity. On validation failure
// it returns nil and the validation error.
func newTimeSeries(clock Clock, granularities []Granularity) (*TimeSeries, error) {
	if err := checkGranularities(granularities); err != nil {
		return nil, err
	}
	ts := &TimeSeries{
		clock:  clock,
		levels: createLevels(clock, granularities),
	}
	return ts, nil
}
// checkGranularities validates a granularity list.
// It returns ErrBadGranularities when the list is empty, when any entry has a
// non-positive Count, or when the Granularity values are not strictly
// increasing (which also rules out a non-positive first Granularity).
// It returns nil when the list is valid.
func checkGranularities(granularities []Granularity) error {
	if len(granularities) == 0 {
		return ErrBadGranularities
	}
	// prev starts at zero so the first granularity must itself be positive.
	var prev time.Duration
	for _, g := range granularities {
		// A level needs at least one bucket; a negative count is as
		// meaningless as zero, so reject both here instead of letting it
		// reach level construction.
		if g.Count <= 0 {
			return ErrBadGranularities
		}
		if g.Granularity <= prev {
			return ErrBadGranularities
		}
		prev = g.Granularity
	}
	return nil
}
// createLevels builds one level per granularity, in the same order as the
// input, passing the clock, bucket duration and bucket count to newLevel.
func createLevels(clock Clock, granularities []Granularity) []level {
	out := make([]level, 0, len(granularities))
	for _, g := range granularities {
		out = append(out, newLevel(clock, g.Granularity, g.Count))
	}
	return out
}
// Increase adds amount at the current time, as reported by the series' clock.
func (t *TimeSeries) Increase(amount int) {
	now := t.clock.Now()
	t.IncreaseAtTime(amount, now)
}
// IncreaseAtTime adds amount at a specific time.
//
// The time is first recorded as the latest seen time if it is newer than any
// seen before. The amount is then routed one of three ways:
//   - time is after the pending time: the levels are advanced to time (which
//     flushes the previously pending amount) and amount becomes the new
//     pending amount;
//   - time lies within one finest-level granularity before the pending time:
//     amount is added to the pending amount;
//   - otherwise: amount is written directly into the levels.
func (t *TimeSeries) IncreaseAtTime(amount int, time time.Time) {
	if time.After(t.latest) {
		t.latest = time
	}
	switch {
	case time.After(t.pendingTime):
		t.advance(time)
		t.pending = amount
	case time.After(t.pendingTime.Add(-t.levels[0].granularity)):
		// Accumulate the full amount; incrementing by one would under- or
		// over-count any increase whose amount is not exactly 1.
		t.pending += amount
	default:
		t.increaseAtTime(amount, time)
	}
}
// increaseAtTime adds amount at the given time to every level whose window
// still reaches back that far. A level is skipped when the time is before
// that level's latest() minus its duration().
func (t *TimeSeries) increaseAtTime(amount int, at time.Time) {
	for i := range t.levels {
		lvl := &t.levels[i]
		windowStart := lvl.latest().Add(-lvl.duration())
		if !at.Before(windowStart) {
			lvl.increaseAtTime(amount, at)
		}
	}
}
// advance moves the levels forward to target and flushes the pending amount.
// It does nothing unless target is after the pending time; the check lives
// here because some callers (e.g. Range) do not perform it themselves.
func (t *TimeSeries) advance(target time.Time) {
	if target.After(t.pendingTime) {
		t.advanceLevels(target)
		t.handlePending()
	}
}
// advanceLevels brings every level up to target. A level whose latest() plus
// duration() is not after target is cleared via clear(target); every other
// level is moved forward via advance(target).
func (t *TimeSeries) advanceLevels(target time.Time) {
	for i := range t.levels {
		lvl := &t.levels[i]
		horizon := lvl.latest().Add(lvl.duration())
		if target.Before(horizon) {
			lvl.advance(target)
		} else {
			lvl.clear(target)
		}
	}
}
// handlePending writes the buffered pending amount into the levels at the
// pending time, then resets the buffer: the pending amount becomes 0 and the
// pending time becomes the finest level's latest().
func (t *TimeSeries) handlePending() {
	amount, at := t.pending, t.pendingTime
	t.increaseAtTime(amount, at)

	t.pending = 0
	t.pendingTime = t.levels[0].latest()
}
// Recent returns the sum over [now-duration, now), where now is read from the
// series' clock. Errors are those returned by Range.
func (t *TimeSeries) Recent(duration time.Duration) (float64, error) {
	end := t.clock.Now()
	start := end.Add(-duration)
	return t.Range(start, end)
}
// Range returns the sum over the given range [start, end).
// ErrBadRange is returned if start is after end.
// ErrRangeNotCovered is returned if the range lies outside the time series.
//
// The series is first advanced to the current clock time. The sum is then
// taken from the first level (finest first) whose earliest() is not after
// start; if no level reaches back that far, the last (coarsest) level is used.
func (t *TimeSeries) Range(start, end time.Time) (float64, error) {
	if end.Before(start) {
		return 0, ErrBadRange
	}
	t.advance(t.clock.Now())

	ok, err := t.intersects(start, end)
	if !ok {
		return 0, err
	}

	// Fall back to the coarsest level unless a finer one covers start.
	chosen := len(t.levels) - 1
	for i := range t.levels {
		// "not before" rather than "after" so that start == earliest()
		// still selects this level.
		if !start.Before(t.levels[i].earliest()) {
			chosen = i
			break
		}
	}
	return t.levels[chosen].sumInterval(start, end, t.latest), nil
}
// intersects reports whether [start, end) overlaps the span the series
// covers. It returns (false, ErrRangeNotCovered) when end is before the
// coarsest level's latest() minus its duration(), or when start is after the
// finest level's latest(); otherwise it returns (true, nil).
func (t *TimeSeries) intersects(start, end time.Time) (bool, error) {
	coarsest := t.levels[len(t.levels)-1]
	oldestCovered := coarsest.latest().Add(-coarsest.duration())

	switch {
	case end.Before(oldestCovered):
		return false, ErrRangeNotCovered
	case start.After(t.levels[0].latest()):
		return false, ErrRangeNotCovered
	default:
		return true, nil
	}
}