stillbox/internal/timeseries/level.go

133 lines
3.2 KiB
Go

package timeseries
import (
"time"
"github.com/rs/zerolog/log"
)
type level struct {
clock Clock
granularity time.Duration
length int
end time.Time
oldest int
newest int
buckets []int
}
func newLevel(clock Clock, granularity time.Duration, length int) level {
level := level{clock: clock, granularity: granularity, length: length}
level.init()
return level
}
func (l *level) init() {
buckets := make([]int, l.length)
l.buckets = buckets
l.clear(time.Time{})
}
func (l *level) clear(time time.Time) {
l.oldest = 1
l.newest = 0
l.end = time.Truncate(l.granularity)
for i := range l.buckets {
l.buckets[i] = 0
}
}
func (l *level) duration() time.Duration {
return l.granularity*time.Duration(l.length) - l.granularity
}
func (l *level) earliest() time.Time {
return l.end.Add(-l.duration())
}
func (l *level) latest() time.Time {
return l.end
}
func (l *level) increaseAtTime(amount int, time time.Time) {
difference := l.end.Sub(time.Truncate(l.granularity))
if difference < 0 {
// this cannot be negative because we advance before
// can at least be 0
log.Error().Time("time", time).Msg("level.increaseTime was called with a time in the future")
}
// l.length-1 because the newest element is always l.length-1 away from oldest
steps := (l.length - 1) - int(difference/l.granularity)
index := (l.oldest + steps) % l.length
l.buckets[index] += amount
}
func (l *level) advance(target time.Time) {
if !l.end.Before(target) {
return
}
for target.After(l.end) {
l.end = l.end.Add(l.granularity)
l.buckets[l.oldest] = 0
l.newest = l.oldest
l.oldest = (l.oldest + 1) % len(l.buckets)
}
}
// TODO: find a better way to handle latest parameter
// The parameter is used to avoid the overlap computation if end overlaps with the current time.
// Probably will find away when implementing redis version.
func (l *level) sumInterval(start, end time.Time, latest time.Time) float64 {
if start.Before(l.earliest()) {
start = l.earliest()
}
if end.After(l.latest()) {
end = l.latest()
}
idx := 0
// this is how many time steps start is away from earliest
startSteps := start.Sub(l.earliest()) / l.granularity
idx += int(startSteps)
currentTime := l.earliest()
currentTime = currentTime.Add(startSteps * l.granularity)
sum := 0.0
for idx < l.length && currentTime.Before(end) {
nextTime := currentTime.Add(l.granularity)
if nextTime.After(latest) {
nextTime = latest
}
if nextTime.Before(start) {
// the case nextTime.Before(start) happens when start is after latest
// therefore we don't have data and can return
break
}
count := float64(l.buckets[(l.oldest+idx)%l.length])
if currentTime.Before(start) || nextTime.After(end) {
// current bucket overlaps time range
overlapStart := max(currentTime, start)
overlapEnd := min(nextTime, end)
overlap := overlapEnd.Sub(overlapStart).Seconds() / l.granularity.Seconds()
count *= overlap
}
sum += count
idx++
currentTime = currentTime.Add(l.granularity)
}
return sum
}
func min(t1, t2 time.Time) time.Time {
if t1.Before(t2) {
return t1
}
return t2
}
func max(t1, t2 time.Time) time.Time {
if t1.After(t2) {
return t1
}
return t2
}