133 lines
3.2 KiB
Go
133 lines
3.2 KiB
Go
package timeseries
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type level struct {
|
|
clock Clock
|
|
granularity time.Duration
|
|
length int
|
|
end time.Time
|
|
oldest int
|
|
newest int
|
|
buckets []int
|
|
}
|
|
|
|
func newLevel(clock Clock, granularity time.Duration, length int) level {
|
|
level := level{clock: clock, granularity: granularity, length: length}
|
|
level.init()
|
|
return level
|
|
}
|
|
|
|
func (l *level) init() {
|
|
buckets := make([]int, l.length)
|
|
l.buckets = buckets
|
|
l.clear(time.Time{})
|
|
}
|
|
|
|
func (l *level) clear(time time.Time) {
|
|
l.oldest = 1
|
|
l.newest = 0
|
|
l.end = time.Truncate(l.granularity)
|
|
for i := range l.buckets {
|
|
l.buckets[i] = 0
|
|
}
|
|
}
|
|
|
|
func (l *level) duration() time.Duration {
|
|
return l.granularity*time.Duration(l.length) - l.granularity
|
|
}
|
|
|
|
func (l *level) earliest() time.Time {
|
|
return l.end.Add(-l.duration())
|
|
}
|
|
|
|
func (l *level) latest() time.Time {
|
|
return l.end
|
|
}
|
|
|
|
func (l *level) increaseAtTime(amount int, time time.Time) {
|
|
difference := l.end.Sub(time.Truncate(l.granularity))
|
|
if difference < 0 {
|
|
// this cannot be negative because we advance before
|
|
// can at least be 0
|
|
log.Error().Time("time", time).Msg("level.increaseTime was called with a time in the future")
|
|
}
|
|
// l.length-1 because the newest element is always l.length-1 away from oldest
|
|
steps := (l.length - 1) - int(difference/l.granularity)
|
|
index := (l.oldest + steps) % l.length
|
|
l.buckets[index] += amount
|
|
}
|
|
|
|
func (l *level) advance(target time.Time) {
|
|
if !l.end.Before(target) {
|
|
return
|
|
}
|
|
for target.After(l.end) {
|
|
l.end = l.end.Add(l.granularity)
|
|
l.buckets[l.oldest] = 0
|
|
l.newest = l.oldest
|
|
l.oldest = (l.oldest + 1) % len(l.buckets)
|
|
}
|
|
}
|
|
|
|
// TODO: find a better way to handle latest parameter
|
|
// The parameter is used to avoid the overlap computation if end overlaps with the current time.
|
|
// Probably will find away when implementing redis version.
|
|
func (l *level) sumInterval(start, end time.Time, latest time.Time) float64 {
|
|
if start.Before(l.earliest()) {
|
|
start = l.earliest()
|
|
}
|
|
if end.After(l.latest()) {
|
|
end = l.latest()
|
|
}
|
|
idx := 0
|
|
// this is how many time steps start is away from earliest
|
|
startSteps := start.Sub(l.earliest()) / l.granularity
|
|
idx += int(startSteps)
|
|
|
|
currentTime := l.earliest()
|
|
currentTime = currentTime.Add(startSteps * l.granularity)
|
|
|
|
sum := 0.0
|
|
for idx < l.length && currentTime.Before(end) {
|
|
nextTime := currentTime.Add(l.granularity)
|
|
if nextTime.After(latest) {
|
|
nextTime = latest
|
|
}
|
|
if nextTime.Before(start) {
|
|
// the case nextTime.Before(start) happens when start is after latest
|
|
// therefore we don't have data and can return
|
|
break
|
|
}
|
|
count := float64(l.buckets[(l.oldest+idx)%l.length])
|
|
if currentTime.Before(start) || nextTime.After(end) {
|
|
// current bucket overlaps time range
|
|
overlapStart := max(currentTime, start)
|
|
overlapEnd := min(nextTime, end)
|
|
overlap := overlapEnd.Sub(overlapStart).Seconds() / l.granularity.Seconds()
|
|
count *= overlap
|
|
}
|
|
sum += count
|
|
idx++
|
|
currentTime = currentTime.Add(l.granularity)
|
|
}
|
|
return sum
|
|
}
|
|
|
|
// min returns the earlier of the two times. When neither is before the other,
// t2 is returned.
//
// Note: this package-level name takes precedence over the built-in min inside
// this package.
func min(t1, t2 time.Time) time.Time {
	if !t1.Before(t2) {
		return t2
	}
	return t1
}
|
|
|
|
// max returns the later of the two times. When neither is after the other,
// t2 is returned.
//
// Note: this package-level name takes precedence over the built-in max inside
// this package.
func max(t1, t2 time.Time) time.Time {
	if !t1.After(t2) {
		return t2
	}
	return t1
}
|