Source File
timeseries.go
Belonging Package
golang.org/x/net/internal/timeseries
package timeseries // import "golang.org/x/net/internal/timeseries"
import (
)
// Per-level bucket counts for the two series flavours declared in this file.
const (
// timeSeriesNumBuckets is the bucket count passed to init when a TimeSeries
// is constructed.
timeSeriesNumBuckets = 64
// minuteHourSeriesNumBuckets is the bucket count passed to init when a
// MinuteHourSeries is constructed.
minuteHourSeriesNumBuckets = 60
)
// timeSeriesResolutions lists the bucket durations handed to init when a
// TimeSeries is constructed; init creates one level per entry. The entries
// are in strictly increasing order, from one second up to sixteen weeks.
var timeSeriesResolutions = []time.Duration{
1 * time.Second,
10 * time.Second,
1 * time.Minute,
10 * time.Minute,
1 * time.Hour,
6 * time.Hour,
24 * time.Hour, // 1 day
7 * 24 * time.Hour, // 1 week
4 * 7 * 24 * time.Hour, // 4 weeks
16 * 7 * 24 * time.Hour, // 16 weeks
}
// minuteHourSeriesResolutions lists the bucket durations handed to init when
// a MinuteHourSeries is constructed: one-second buckets at level 0 and
// one-minute buckets at level 1.
var minuteHourSeriesResolutions = []time.Duration{
1 * time.Second,
1 * time.Minute,
}
// Observable is the contract for values stored in the series' buckets. All
// four methods operate in place on the receiver; the series code in this file
// uses them to scale, accumulate, reset and overwrite bucket contents.
type Observable interface {
Multiply(ratio float64) // Multiplies the data in self by a given ratio
Add(other Observable) // Adds the data from a different observation to self
Clear() // Clears the observation so it can be reused.
CopyFrom(other Observable) // Copies the contents of a given observation to self
}
// Constructor that builds a value from Float(0) and returns a pointer as an
// Observable.
// NOTE(review): the function name, the local variable name and the operand of
// `&` are missing from this listing, and the Float type is declared outside
// the visible text — presumably this returns the address of the local;
// confirm against the original file.
func () Observable {
:= Float(0)
return &
}
// Clock supplies the current time to a time series. The *WithClock
// constructors accept one, so the time source can be chosen by the caller.
type Clock interface {
// Time returns the time the series should treat as "now".
Time() time.Time
}
// defaultClock is the clock type used when a caller does not provide one.
type defaultClock int
// defaultClockInstance is the defaultClock value that the constructors
// without an explicit Clock parameter pass on to their *WithClock variants.
var defaultClockInstance defaultClock
// Method on defaultClock that returns time.Now().
// NOTE(review): the method name is missing from this listing; the Clock
// interface requires Time() time.Time, so presumably this is Time — confirm.
func (defaultClock) () time.Time { return time.Now() }
// tsLevel holds the buckets of one resolution level of a time series. The
// buckets slice is indexed circularly: positions are derived from oldest
// modulo the bucket count, and entries may be nil until first used.
type tsLevel struct {
oldest int // index to oldest bucketed Observable
newest int // index to newest bucketed Observable
end time.Time // end timestamp for this level
size time.Duration // duration of the bucketed Observable
buckets []Observable // collections of observations
provider func() Observable // used for creating new Observable
}
// Resets a level to its empty state: oldest becomes 0, newest becomes the
// last index of buckets, end becomes the zero time, and every non-nil bucket
// is cleared and then replaced with nil.
// NOTE(review): the receiver, method and loop-variable names are missing from
// this listing; timeSeries calls `.levels[i].Clear()`, so presumably this is
// tsLevel.Clear — confirm.
func ( *tsLevel) () {
.oldest = 0
.newest = len(.buckets) - 1
.end = time.Time{}
for := range .buckets {
if .buckets[] != nil {
// Clear before dropping the reference so the observation is left reset.
.buckets[].Clear()
.buckets[] = nil
}
}
}
// InitLevel sets the level's bucket duration (size) and Observable factory
// (provider) and allocates a fresh buckets slice whose entries start as nil.
// NOTE(review): parameter names and the right-hand sides of the assignments
// are missing from this listing. The parameter types are (time.Duration, int,
// func() Observable) and the caller passes (resolution, numBuckets,
// provider), so presumably the int is the bucket count — confirm.
func ( *tsLevel) ( time.Duration, int, func() Observable) {
.size =
.provider =
.buckets = make([]Observable, )
}
// timeSeries is the shared implementation embedded by TimeSeries and
// MinuteHourSeries. It keeps one tsLevel per resolution, a running total, and
// a single pending observation that is merged into the levels lazily (when
// dirty is set) by mergePendingUpdates.
type timeSeries struct {
provider func() Observable // make more Observable
numBuckets int // number of buckets in each level
levels []*tsLevel // levels of bucketed Observable
lastAdd time.Time // time of last Observable tracked
total Observable // convenient aggregation of all Observable
clock Clock // Clock for getting current time
pending Observable // observations not yet bucketed
pendingTime time.Time // what time are we keeping in pending
dirty bool // if there are pending observations
}
// init configures the series: it stores the provider, bucket count and clock,
// allocates one levels slot per resolution, builds a tsLevel for each
// resolution via InitLevel, and finally calls Clear.
//
// Parameters, in order (names are missing from this listing; the order is
// taken from the types and from the constructors' calls): resolutions,
// Observable provider, bucket count, clock.
func ( *timeSeries) ( []time.Duration, func() Observable, int, Clock) {
.provider =
.numBuckets =
.clock =
.levels = make([]*tsLevel, len())
for := range {
// Each resolution must be strictly larger than the one before it.
if > 0 && [-1] >= [] {
log.Print("timeseries: resolutions must be monotonically increasing")
// NOTE(review): breaking here leaves this and all later levels entries
// nil, and the Clear call below still iterates over every entry —
// confirm that nil levels are tolerated.
break
}
:= new(tsLevel)
.InitLevel([], .numBuckets, .provider)
.levels[] =
}
.Clear()
}
// Clear returns the series to its empty state: lastAdd and pendingTime become
// the zero time, total and pending are reset through resetObservation (which
// allocates them if nil), dirty is cleared, and every level is cleared.
// NOTE(review): the method name is missing from this listing; init ends with
// a `.Clear()` call, presumably to this method — confirm.
func ( *timeSeries) () {
.lastAdd = time.Time{}
.total = .resetObservation(.total)
.pending = .resetObservation(.pending)
.pendingTime = time.Time{}
.dirty = false
for := range .levels {
.levels[].Clear()
}
}
// Records an observation by delegating to AddWithTime with the time reported
// by the series' clock.
// NOTE(review): the method and parameter names are missing from this listing.
func ( *timeSeries) ( Observable) {
.AddWithTime(, .clock.Time())
}
// AddWithTime records an observation at an explicit time. It moves lastAdd
// forward when the comparison against it succeeds, then either takes the
// pending path (advance, merge the previous pending value, restart pending at
// levels[0].end) or falls back to mergeValue.
// NOTE(review): identifiers are missing from this listing, so the operands of
// the After comparisons are presumed to be the time parameter — confirm.
func ( *timeSeries) ( Observable, time.Time) {
// NOTE(review): this value (levels[0].size) is not used anywhere in the
// visible body — see the note inside the first branch below.
:= .levels[0].size
if .After(.lastAdd) {
.lastAdd =
}
if .After(.pendingTime) {
.advance()
.mergePendingUpdates()
.pendingTime = .levels[0].end
.pending.CopyFrom()
.dirty = true
// NOTE(review): as listed, CopyFrom is immediately followed by Add and
// dirty is set twice in the same branch. Together with the unused value
// read at the top, this looks like an intermediate `else if` line was lost
// from the listing — confirm against the original file before relying on
// this control flow.
.pending.Add()
.dirty = true
} else {
.mergeValue(, )
}
}
// mergeValue adds an observation into the matching bucket of every level and
// into total.
//
// For each level it computes a bucket offset as (numBuckets-1) minus the
// number of whole bucket durations between a time and the level's end. When
// the offset lies in [0, numBuckets) it is mapped onto the circular buckets
// slice relative to oldest, the bucket is created through the level's
// provider if it is nil, and the observation is added to it. Levels for which
// the offset is out of range are skipped; total is updated regardless.
// NOTE(review): local and parameter names are missing from this listing; the
// time subtracted from end is presumably the time parameter — confirm.
func ( *timeSeries) ( Observable, time.Time) {
for , := range .levels {
:= (.numBuckets - 1) - int(.end.Sub()/.size)
if 0 <= && < .numBuckets {
:= (.oldest + ) % .numBuckets
if .buckets[] == nil {
.buckets[] = .provider()
}
.buckets[].Add()
}
}
.total.Add()
}
// mergePendingUpdates flushes the pending observation, if any: when dirty is
// set it merges pending at pendingTime via mergeValue, resets pending through
// resetObservation and clears dirty. It does nothing when dirty is false.
// NOTE(review): the method name is missing from this listing; the name is
// taken from the `.mergePendingUpdates()` calls with this shape — confirm.
func ( *timeSeries) () {
if .dirty {
.mergeValue(.pending, .pendingTime)
.pending = .resetObservation(.pending)
.dirty = false
}
}
if !.Before(.end.Add(.size * time.Duration(.numBuckets))) {
for , := range .buckets {
.resetObservation()
}
.end = time.Unix(0, (.UnixNano()/.size.Nanoseconds())*.size.Nanoseconds())
}
for .After(.end) {
.end = .end.Add(.size)
.newest = .oldest
.oldest = (.oldest + 1) % .numBuckets
.resetObservation(.buckets[.newest])
}
= .end
}
}
// Latest returns a newly created Observable (from provider) holding the sum
// of buckets of one level, walking backwards from that level's newest index.
//
// Before reading, it advances the series if the first level's end is before
// the clock's current time, and merges any pending observation. Nil buckets
// are skipped. The index wraps from 0 to numBuckets-1.
// NOTE(review): parameter and local names are missing from this listing.
// MinuteHourSeries calls this as Latest(0, 60) and Latest(1, 60), so
// presumably the parameters are (level, num) and the loop runs num times —
// confirm. No range check on either argument is visible here.
func ( *timeSeries) (, int) Observable {
:= .clock.Time()
if .levels[0].end.Before() {
.advance()
}
.mergePendingUpdates()
:= .provider()
:= .levels[]
:= .newest
for := 0; < ; ++ {
if .buckets[] != nil {
.Add(.buckets[])
}
// Wrap around: setting the index to numBuckets and then decrementing
// lands on the last slot.
if == 0 {
= .numBuckets
}
--
}
return
}
// Returns copies of individual buckets of one level as a slice, walking
// backwards from that level's newest index. Each slice element is a fresh
// Observable from provider; it receives CopyFrom of the bucket when the
// bucket is non-nil and is otherwise left as created.
//
// Returns nil, after logging, when either argument fails its range check.
// Before reading, the series is advanced if the first level's end is before
// the clock's current time, and any pending observation is merged.
// NOTE(review): the method, parameter and local names are missing from this
// listing; going by the log messages the two ints are presumably (level, num)
// — confirm.
func ( *timeSeries) (, int) []Observable {
// NOTE(review): this check uses `>` rather than `>=` against len(levels).
// If the checked value is the one used to index levels below, a value equal
// to len(levels) passes the check and would index out of range — confirm.
if < 0 || > len(.levels) {
log.Print("timeseries: bad level argument: ", )
return nil
}
if < 0 || >= .numBuckets {
log.Print("timeseries: bad num argument: ", )
return nil
}
:= make([]Observable, )
:= .clock.Time()
if .levels[0].end.Before() {
.advance()
}
.mergePendingUpdates()
:= .levels[]
:= .newest
for := 0; < ; ++ {
:= .provider()
[] =
if .buckets[] != nil {
.CopyFrom(.buckets[])
}
// Wrap around: setting the index to numBuckets and then decrementing
// lands on the last slot.
if == 0 {
= .numBuckets
}
-= 1
}
return
}
// Scales the series by a float64: calls Multiply on the first numBuckets
// buckets of every level, then on total and on pending.
// NOTE(review): the method and parameter names are missing from this listing.
// NOTE(review): Multiply is called on each bucket without a nil check, while
// other code in this file treats bucket entries as possibly nil (the level
// reset sets them to nil and readers test for nil) — confirm that buckets
// cannot be nil when this runs.
func ( *timeSeries) ( float64) {
for , := range .levels {
for := 0; < .numBuckets; ++ {
.buckets[].Multiply()
}
}
.total.Multiply()
.pending.Multiply()
}
// Range returns the single Observable produced by ComputeRange when asked for
// exactly one result over the two given times.
// NOTE(review): the parameter names are missing from this listing; presumably
// (start, finish) — confirm.
// NOTE(review): the visible part of ComputeRange returns nil when its first
// time is after its second; indexing [0] on that result would panic —
// confirm how callers are expected to guard against that.
func ( *timeSeries) (, time.Time) Observable {
return .ComputeRange(, , 1)[0]
}
// Returns Range over an interval that ends at the clock's current time and
// starts the given duration earlier.
// NOTE(review): the method and parameter names are missing from this listing.
func ( *timeSeries) ( time.Duration) Observable {
:= .clock.Time()
return .Range(.Add(-), )
}
// Merges any pending observation and returns the series' total. The returned
// value is the stored total field itself, not a copy.
// NOTE(review): the method name is missing from this listing.
func ( *timeSeries) () Observable {
.mergePendingUpdates()
return .total
}
func ( *timeSeries) (, time.Time, int) []Observable {
if .After() {
log.Printf("timeseries: start > finish, %v>%v", , )
return nil
}
if < 0 {
log.Printf("timeseries: num < 0, %v", )
return nil
}
:= make([]Observable, )
for , := range .levels {
if !.Before(.end.Add(-.size * time.Duration(.numBuckets))) {
.extract(, , , , )
return
}
}
// Returns ComputeRange over an interval that ends at the clock's current time
// and starts the given duration earlier, forwarding a third argument to
// ComputeRange. Returns nil without calling ComputeRange when the checked
// argument is negative.
// NOTE(review): the method and parameter names are missing from this listing,
// so which of the two parameters is tested by `< 0` is not visible — confirm.
func ( *timeSeries) ( time.Duration, int) []Observable {
if < 0 {
return nil
}
:= .clock.Time()
return .ComputeRange(.Add(-), , )
}
func ( *timeSeries) ( *tsLevel, , time.Time, int, []Observable) {
.mergePendingUpdates()
:= .size
:= .Sub() / time.Duration()
:=
:= .end.Add(- * time.Duration(.numBuckets))
:= 0
for := 0; < ; ++ {
[] = .resetObservation([])
:= .Add()
for < .numBuckets && .Before() {
:= .Add()
if .After(.lastAdd) {
= .lastAdd
}
if !.Before() {
:= .buckets[(+.oldest)%.numBuckets]
// resetObservation returns an empty Observable for reuse: when the tested
// value is nil a new one is obtained from provider, otherwise Clear is called
// on an existing one.
// NOTE(review): the method and parameter names are missing from this listing;
// the name is taken from the `.resetObservation(...)` calls with this shape,
// and the tested, cleared and returned value is presumably the parameter —
// confirm.
func ( *timeSeries) ( Observable) Observable {
if == nil {
= .provider()
} else {
.Clear()
}
return
}
// TimeSeries is the exported series type that embeds timeSeries; its
// constructors initialise it with timeSeriesResolutions and
// timeSeriesNumBuckets.
type TimeSeries struct {
timeSeries
}
// Creates a TimeSeries by calling NewTimeSeriesWithClock with
// defaultClockInstance as the clock.
// NOTE(review): the function and parameter names are missing from this
// listing; the func() Observable argument is presumably the Observable
// provider — confirm.
func ( func() Observable) *TimeSeries {
return NewTimeSeriesWithClock(, defaultClockInstance)
}
// Allocates a TimeSeries and initialises its embedded timeSeries with
// timeSeriesResolutions, the given Observable provider,
// timeSeriesNumBuckets and the given Clock, then returns it.
// NOTE(review): the function, parameter and local names are missing from this
// listing; another constructor calls NewTimeSeriesWithClock with this
// argument shape, so presumably this is it — confirm.
func ( func() Observable, Clock) *TimeSeries {
:= new(TimeSeries)
.timeSeries.init(timeSeriesResolutions, , timeSeriesNumBuckets, )
return
}
// MinuteHourSeries is the exported series type that embeds timeSeries; its
// constructors initialise it with minuteHourSeriesResolutions and
// minuteHourSeriesNumBuckets.
type MinuteHourSeries struct {
timeSeries
}
// Creates a MinuteHourSeries by calling NewMinuteHourSeriesWithClock with
// defaultClockInstance as the clock.
// NOTE(review): the function and parameter names are missing from this
// listing; the func() Observable argument is presumably the Observable
// provider — confirm.
func ( func() Observable) *MinuteHourSeries {
return NewMinuteHourSeriesWithClock(, defaultClockInstance)
}
// Allocates a MinuteHourSeries and initialises its embedded timeSeries with
// minuteHourSeriesResolutions, the given Observable provider,
// minuteHourSeriesNumBuckets and the given Clock, then returns it.
// NOTE(review): the function, parameter and local names are missing from this
// listing; another constructor calls NewMinuteHourSeriesWithClock with this
// argument shape, so presumably this is it — confirm.
func ( func() Observable, Clock) *MinuteHourSeries {
:= new(MinuteHourSeries)
.timeSeries.init(minuteHourSeriesResolutions, ,
minuteHourSeriesNumBuckets, )
return
}
// Returns the result of Latest(0, 60) on the embedded timeSeries. Level 0 of
// a MinuteHourSeries is built from the first entry of
// minuteHourSeriesResolutions (one second).
// NOTE(review): the method name is missing from this listing; with
// one-second buckets this presumably aggregates the most recent minute —
// confirm.
func ( *MinuteHourSeries) () Observable {
return .timeSeries.Latest(0, 60)
}
// Returns the result of Latest(1, 60) on the embedded timeSeries. Level 1 of
// a MinuteHourSeries is built from the second entry of
// minuteHourSeriesResolutions (one minute).
// NOTE(review): the method name is missing from this listing; with
// one-minute buckets this presumably aggregates the most recent hour —
// confirm.
func ( *MinuteHourSeries) () Observable {
return .timeSeries.Latest(1, 60)
}
// Selects and returns one of two time.Time arguments based on a Before
// comparison.
// NOTE(review): the function name, parameter names and returned operands are
// missing from this listing; presumably this returns the earlier of the two
// times — confirm.
func (, time.Time) time.Time {
if .Before() {
return
}
return
}
func (, time.Time) time.Time {
if .After() {
return
}
return
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.