Source File
stats.go
Belonging Package
contrib.go.opencensus.io/exporter/stackdriver
package stackdriver
import (
monitoring
distributionpb
labelpb
googlemetricpb
metricpb
monitoredrespb
monitoringpb
)
// Package-level constants used by the stats exporter.
const (
// maxTimeSeriesPerUpload is the batch size handed to makeReq when view data is uploaded.
maxTimeSeriesPerUpload = 200
// opencensusTaskKey is the key of the single default label used when no
// default monitoring labels are configured in Options.
opencensusTaskKey = "opencensus_task"
// opencensusTaskDescription is the description stored with that default label.
opencensusTaskDescription = "Opencensus task identifier"
// defaultDisplayNamePrefix is joined in front of a metric display name.
defaultDisplayNamePrefix = "OpenCensus"
// version is the exporter version string; its users are not visible in this section.
version = "0.13.3"
)
// statsExporter uploads view data and metrics through a monitoring.MetricClient.
// It batches incoming data with two bundlers and remembers which metric
// descriptors it has already created.
type statsExporter struct {
// o holds the exporter configuration (ProjectID, Context, Timeout, thresholds, hooks).
o Options
// viewDataBundler batches *view.Data; each batch is passed to handleUpload.
viewDataBundler *bundler.Bundler
// metricsBundler batches *metricdata.Metric; each batch is passed to handleMetricsUpload.
metricsBundler *bundler.Bundler
// protoMu presumably guards protoMetricDescriptors; no use is visible in this section.
protoMu sync.Mutex
protoMetricDescriptors map[string]bool // Metric descriptors that were already created remotely
// metricMu is held while metricDescriptors is read or written.
metricMu sync.Mutex
metricDescriptors map[string]bool // Metric descriptors that were already created remotely
// c is the client used for CreateTimeSeries and CreateMetricDescriptor calls.
c *monitoring.MetricClient
// defaultLabels are added to every metric's labels and label descriptors;
// keys are stored in sanitized form.
defaultLabels map[string]labelValue
// ir is the interval reader, created lazily and at most once via initReaderOnce.
ir *metricexport.IntervalReader
initReaderOnce sync.Once
}
var (
// errBlankProjectID is returned by the exporter constructor when the
// configured ProjectID is empty or consists only of whitespace.
errBlankProjectID = errors.New("expecting a non-blank ProjectID")
)
// Constructor for statsExporter. The function name and all local identifiers
// are missing from this copy of the file.
//
// It rejects a blank ProjectID with errBlankProjectID, creates a
// monitoring.MetricClient (returning the creation error unchanged on failure),
// fills in the default labels, and wires up the two bundlers.
func ( Options) (*statsExporter, error) {
if strings.TrimSpace(.ProjectID) == "" {
return nil, errBlankProjectID
}
// Client options: the configured MonitoringClientOptions plus a user-agent option.
:= append(.MonitoringClientOptions, option.WithUserAgent(.UserAgent))
// Fall back to a background context when none is configured.
:= .Context
if == nil {
= context.Background()
}
, := monitoring.NewMetricClient(, ...)
if != nil {
return nil,
}
:= &statsExporter{
c: ,
o: ,
protoMetricDescriptors: make(map[string]bool),
metricDescriptors: make(map[string]bool),
}
// Default labels come from DefaultMonitoringLabels when set; otherwise a
// single opencensus_task label is used.
var map[string]labelValue
if .DefaultMonitoringLabels != nil {
= .DefaultMonitoringLabels.m
} else {
= map[string]labelValue{
opencensusTaskKey: {val: getTaskValue(), desc: opencensusTaskDescription},
}
}
// Keys are sanitized before being stored.
// NOTE(review): no initialisation of defaultLabels is visible before this
// loop, and writing to a nil map panics — confirm the make(...) call exists
// in the real file.
for , := range {
.defaultLabels[sanitize()] =
}
// Each bundler flush hands the accumulated batch to the matching upload handler.
.viewDataBundler = bundler.NewBundler((*view.Data)(nil), func( interface{}) {
:= .([]*view.Data)
.handleUpload(...)
})
.metricsBundler = bundler.NewBundler((*metricdata.Metric)(nil), func( interface{}) {
:= .([]*metricdata.Metric)
.handleMetricsUpload()
})
// Thresholds override the bundler defaults only when they are positive.
if := .o.BundleDelayThreshold; > 0 {
.viewDataBundler.DelayThreshold =
.metricsBundler.DelayThreshold =
}
if := .o.BundleCountThreshold; > 0 {
.viewDataBundler.BundleCountThreshold =
.metricsBundler.BundleCountThreshold =
}
return , nil
}
// Starts the interval reader (method name missing from this copy). The reader
// is created at most once through initReaderOnce; the reporting interval is
// re-read from the options on every call, and the result of Start is returned.
// NOTE(review): the error from NewIntervalReader is discarded, so ir could be
// nil when it is used below — confirm whether that constructor can fail here.
func ( *statsExporter) () error {
.initReaderOnce.Do(func() {
.ir, _ = metricexport.NewIntervalReader(metricexport.NewReader(), )
})
.ir.ReportingInterval = .o.ReportingInterval
return .ir.Start()
}
// Stops the interval reader if one has been created; otherwise does nothing.
// (Method name missing from this copy.)
func ( *statsExporter) () {
if .ir != nil {
.ir.Stop()
}
}
// Returns a tag slice together with the monitored resource to attach to a time
// series. The resource is the one configured in the options when set,
// otherwise a resource of Type "global".
// NOTE(review): the returned identifiers are missing from this copy; presumably
// the tags argument is returned unchanged, and the view argument is not
// visibly used — confirm.
func ( *statsExporter) ( *view.View, []tag.Tag) ([]tag.Tag, *monitoredrespb.MonitoredResource) {
:= .o.Resource
if == nil {
= &monitoredrespb.MonitoredResource{
Type: "global",
}
}
return ,
}
// Queues one *view.Data for upload through viewDataBundler (method name
// missing from this copy). Data without rows is ignored. Errors from the
// bundler are not returned; they are reported through the options' handleError.
func ( *statsExporter) ( *view.Data) {
if len(.Rows) == 0 {
return
}
// Every item is added with a size of 1.
:= .viewDataBundler.Add(, 1)
switch {
case nil:
return
case bundler.ErrOverflow:
// A full buffer is reported with a fixed message rather than the bundler's own error.
.o.handleError(errors.New("failed to upload: buffer full"))
default:
.o.handleError()
}
}
// Uploads a batch of view data via uploadStats and forwards any error to the
// options' handleError. It is invoked from the viewDataBundler callback.
func ( *statsExporter) ( ...*view.Data) {
if := .uploadStats(); != nil {
.o.handleError()
}
}
// Flushes both bundlers, view data first and then metrics.
// (Method name missing from this copy.)
func ( *statsExporter) () {
.viewDataBundler.Flush()
.metricsBundler.Flush()
}
// Uploads a batch of view data and returns the first error encountered.
//
// The work runs under a context derived from the options' Context and Timeout,
// inside a trace span that is never sampled. Metric descriptors are created
// for every view first; only then are the time series sent.
func ( *statsExporter) ( []*view.Data) error {
, := newContextWithTimeout(.o.Context, .o.Timeout)
defer ()
, := trace.StartSpan(
,
"contrib.go.opencensus.io/exporter/stackdriver.uploadStats",
trace.WithSampler(trace.NeverSample()),
)
defer .End()
// Stop at the first descriptor failure and record it on the span (status code 2).
for , := range {
if := .createMetricDescriptorFromView(, .View); != nil {
.SetStatus(trace.Status{Code: 2, Message: .Error()})
return
}
}
// Send the time series in requests built by makeReq, at most
// maxTimeSeriesPerUpload series per batch.
for , := range .makeReq(, maxTimeSeriesPerUpload) {
if := createTimeSeries(, .c, ); != nil {
return
}
}
return nil
}
// Converts view data into CreateTimeSeriesRequests.
//
// One TimeSeries is built per row of every view.Data. The series are then
// grouped into batches of the given size (the int argument) and each batch is
// turned into one or more requests by combineTimeSeriesToCreateTimeSeriesRequest.
func ( *statsExporter) ( []*view.Data, int) []*monitoringpb.CreateTimeSeriesRequest {
var []*monitoringpb.CreateTimeSeriesRequest
var []*monitoringpb.TimeSeries
for , := range {
for , := range .Rows {
// The row's tags are copied before being handed to getMonitoredResource.
, := .getMonitoredResource(.View, append([]tag.Tag(nil), .Tags...))
:= &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Type: .metricType(.View),
Labels: newLabels(.defaultLabels, ),
},
Resource: ,
Points: []*monitoringpb.Point{newPoint(.View, , .Start, .End)},
}
= append(, )
}
}
var []*monitoringpb.TimeSeries
for , := range {
= append(, )
// A full batch is converted and the batch slice is truncated to length 0 for reuse.
if len() == {
:= .combineTimeSeriesToCreateTimeSeriesRequest()
= append(, ...)
= [:0]
}
}
// Convert whatever is left over after the last full batch.
if len() > 0 {
:= .combineTimeSeriesToCreateTimeSeriesRequest()
= append(, ...)
}
return
}
// Builds the MetricDescriptor for a view. It returns an error only for an
// unsupported aggregation type.
//
// The metric kind defaults to CUMULATIVE and becomes GAUGE for last-value
// aggregations. For sum and last-value aggregations the value type follows
// the measure type: INT64 for Int64Measure, DOUBLE for Float64Measure.
// NOTE(review): several lines appear to be missing from this copy — no initial
// assignment of the unit and no value type for the count case are visible.
// Confirm against the real file.
func ( *statsExporter) ( context.Context, *view.View) (*metricpb.MetricDescriptor, error) {
:= .Measure
:= .Aggregation
:= .Name
:= .metricType()
var metricpb.MetricDescriptor_ValueType
:= metricpb.MetricDescriptor_CUMULATIVE
switch .Type {
case view.AggTypeCount:
// Counts are dimensionless regardless of the measure.
= stats.UnitDimensionless
case view.AggTypeSum:
switch .(type) {
case *stats.Int64Measure:
= metricpb.MetricDescriptor_INT64
case *stats.Float64Measure:
= metricpb.MetricDescriptor_DOUBLE
}
case view.AggTypeDistribution:
= metricpb.MetricDescriptor_DISTRIBUTION
case view.AggTypeLastValue:
= metricpb.MetricDescriptor_GAUGE
switch .(type) {
case *stats.Int64Measure:
= metricpb.MetricDescriptor_INT64
case *stats.Float64Measure:
= metricpb.MetricDescriptor_DOUBLE
}
default:
return nil, fmt.Errorf("unsupported aggregation type: %s", .Type.String())
}
// The display name comes from the GetMetricDisplayName hook when configured,
// otherwise from displayName.
var string
if .o.GetMetricDisplayName == nil {
= .displayName()
} else {
= .o.GetMetricDisplayName()
}
:= &metricpb.MetricDescriptor{
Name: fmt.Sprintf("projects/%s/metricDescriptors/%s", .o.ProjectID, ),
DisplayName: ,
Description: .Description,
Unit: ,
Type: ,
MetricKind: ,
ValueType: ,
Labels: newLabelDescriptors(.defaultLabels, .TagKeys),
}
return , nil
}
// NOTE(review): the func header for this body is missing from this copy.
// uploadStats calls createMetricDescriptorFromView with a context and a view
// and checks an error result, which matches this body — confirm.
//
// Ensures a metric descriptor exists for a view, at most once per view name.
// Nothing is done when SkipCMD is set.
if .o.SkipCMD {
return nil
}
// metricDescriptors is read and written only while metricMu is held.
.metricMu.Lock()
defer .metricMu.Unlock()
:= .Name
if , := .metricDescriptors[]; {
return nil
}
// Built-in metric types are recorded as done without any remote call.
if builtinMetric(.metricType()) {
.metricDescriptors[] = true
return nil
}
, := .viewToMetricDescriptor(, )
if != nil {
return
}
if = .createMetricDescriptor(, ); != nil {
return
}
// Recorded only after the remote creation succeeded, so a failure is retried next time.
.metricDescriptors[] = true
return nil
}
// Produces the display name for a metric from the given string; on the
// fall-through path the result is that string joined under defaultDisplayNamePrefix.
// NOTE(review): as shown, the body has an unconditional return followed by an
// unmatched closing brace. A guarding `if` line appears to be missing from
// this copy — confirm the condition against the real file.
func ( *statsExporter) ( string) string {
return
}
return path.Join(defaultDisplayNamePrefix, )
}
// Splits a list of time series into CreateTimeSeriesRequests such that no
// request holds two series with the same metric signature. Returns nil for
// empty input.
//
// Series whose signature is seen for the first time go into one request for
// "projects/<ProjectID>"; the remaining duplicates are processed by a
// recursive call and the resulting requests are appended.
// NOTE(review): the callee name at the recursive call is missing from this
// copy; presumably the method calls itself. Presumably the split exists
// because the API rejects duplicate series in one request — confirm.
func ( *statsExporter) ( []*monitoringpb.TimeSeries) ( []*monitoringpb.CreateTimeSeriesRequest) {
if len() == 0 {
return nil
}
:= make([]*monitoringpb.TimeSeries, 0, len())
:= make([]*monitoringpb.TimeSeries, 0, len())
// Set of metric signatures already placed in the current request.
:= make(map[string]struct{})
for , := range {
:= metricSignature(.Metric)
if , := []; ! {
= append(, )
[] = struct{}{}
} else {
= append(, )
}
}
= append(, &monitoringpb.CreateTimeSeriesRequest{
Name: fmt.Sprintf("projects/%s", .o.ProjectID),
TimeSeries: ,
})
:= .()
= append(, ...)
return
}
// Builds a signature string of the form "<metric type>:<comma-joined, sorted
// label strings>" for a metric. Sorting makes the result independent of map
// iteration order.
// NOTE(review): the loop variables are missing from this copy, so it is not
// visible whether label keys or label values are collected — confirm.
func ( *googlemetricpb.Metric) string {
:= .GetLabels()
:= make([]string, 0, len())
for , := range {
= append(, )
}
sort.Strings()
return fmt.Sprintf("%s:%s", .GetType(), strings.Join(, ","))
}
// Builds the Point for one row of a view: a gauge point for last-value
// aggregations, a cumulative point for every other aggregation type.
// NOTE(review): argument identifiers are missing from this copy; presumably the
// gauge point receives the end time — confirm.
func ( *view.View, *view.Row, , time.Time) *monitoringpb.Point {
switch .Aggregation.Type {
case view.AggTypeLastValue:
return newGaugePoint(, , )
default:
return newCumulativePoint(, , , )
}
}
// Builds a Point whose interval has both a start and an end timestamp, with
// the value taken from newTypedValue.
// NOTE(review): "×tamp.Timestamp" looks like a mis-decoded "&timestamp.Timestamp"
// (the HTML entity "&times" rendered as "×") — confirm against the real file.
// Presumably the first time.Time argument feeds StartTime and the second
// EndTime; the identifiers are missing from this copy.
func ( *view.View, *view.Row, , time.Time) *monitoringpb.Point {
return &monitoringpb.Point{
Interval: &monitoringpb.TimeInterval{
StartTime: ×tamp.Timestamp{
Seconds: .Unix(),
Nanos: int32(.Nanosecond()),
},
EndTime: ×tamp.Timestamp{
Seconds: .Unix(),
Nanos: int32(.Nanosecond()),
},
},
Value: newTypedValue(, ),
}
}
// Builds a Point whose interval carries only an end timestamp (no StartTime is
// set), with the value taken from newTypedValue.
// NOTE(review): "×tamp.Timestamp" looks like a mis-decoded "&timestamp.Timestamp"
// — confirm against the real file.
func ( *view.View, *view.Row, time.Time) *monitoringpb.Point {
:= ×tamp.Timestamp{
Seconds: .Unix(),
Nanos: int32(.Nanosecond()),
}
return &monitoringpb.Point{
Interval: &monitoringpb.TimeInterval{
EndTime: ,
},
Value: newTypedValue(, ),
}
}
// Converts the aggregated data of a row into a TypedValue.
//
// Count data becomes an int64 value. Sum and last-value data become int64 or
// double depending on the view's measure type. Distribution data becomes a
// distribution with explicit bucket bounds. Any other combination yields nil,
// including a sum or last-value whose measure is neither Int64 nor Float64.
func ( *view.View, *view.Row) *monitoringpb.TypedValue {
switch v := .Data.(type) {
case *view.CountData:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: .Value,
}}
case *view.SumData:
switch .Measure.(type) {
case *stats.Int64Measure:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: int64(.Value),
}}
case *stats.Float64Measure:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: .Value,
}}
}
case *view.DistributionData:
// The same flag drives both the bounds and the bucket counts so the two
// stay aligned when a zero entry is prepended.
:= shouldInsertZeroBound(.Aggregation.Buckets...)
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DistributionValue{
DistributionValue: &distributionpb.Distribution{
Count: .Count,
Mean: .Mean,
BucketOptions: &distributionpb.Distribution_BucketOptions{
Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
Bounds: addZeroBoundOnCondition(, .Aggregation.Buckets...),
},
},
},
BucketCounts: addZeroBucketCountOnCondition(, .CountPerBucket...),
},
}}
case *view.LastValueData:
switch .Measure.(type) {
case *stats.Int64Measure:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: int64(.Value),
}}
case *stats.Float64Measure:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: .Value,
}}
}
}
return nil
}
// shouldInsertZeroBound reports whether a zero bound must be prepended to the
// given bucket bounds. It is true only when at least one bound exists and the
// first bound is strictly positive; an empty list or a first bound <= 0 yields
// false.
//
// The function and parameter names were missing from this copy of the file;
// the name is restored from the call in the distribution branch of the
// typed-value conversion.
func shouldInsertZeroBound(bounds ...float64) bool {
	return len(bounds) > 0 && bounds[0] > 0.0
}
// addZeroBucketCountOnCondition returns the bucket counts with a leading 0
// count prepended when insert is true, and the counts unchanged otherwise.
// When prepending, a new slice is allocated, so the caller's slice is not
// modified.
//
// The function and parameter names were missing from this copy of the file;
// the name is restored from the call that fills the distribution's
// BucketCounts, the only helper call taking a bool plus int64 values.
func addZeroBucketCountOnCondition(insert bool, counts ...int64) []int64 {
	if insert {
		return append([]int64{0}, counts...)
	}
	return counts
}
// addZeroBoundOnCondition returns the bucket bounds with a leading 0.0 bound
// prepended when insert is true, and the bounds unchanged otherwise. When
// prepending, a new slice is allocated, so the caller's slice is not modified.
//
// The function and parameter names were missing from this copy of the file;
// the name is restored from the call that fills the explicit bucket Bounds,
// the only helper call taking a bool plus float64 values.
func addZeroBoundOnCondition(insert bool, bounds ...float64) []float64 {
	if insert {
		return append([]float64{0.0}, bounds...)
	}
	return bounds
}
// Returns the metric type string for a view: the result of the GetMetricType
// hook when one is configured, otherwise
// "custom.googleapis.com/opencensus/<view name>".
func ( *statsExporter) ( *view.View) string {
if := .o.GetMetricType; != nil {
return ()
}
return path.Join("custom.googleapis.com", "opencensus", .Name)
}
// Builds the label map for a metric from the default labels and a row's tags.
// All keys are sanitized. Tags are written after the defaults, so a tag whose
// sanitized key matches a default label overwrites it.
func ( map[string]labelValue, []tag.Tag) map[string]string {
:= make(map[string]string)
for , := range {
[sanitize()] = .val
}
for , := range {
[sanitize(.Key.Name())] = .Value
}
return
}
// Builds the label descriptors for a metric descriptor: one per default label
// (carrying its description), followed by one per tag key. All keys are
// sanitized and every label is declared as a STRING.
// Default labels come from a map, so their relative order is not fixed.
func ( map[string]labelValue, []tag.Key) []*labelpb.LabelDescriptor {
:= make([]*labelpb.LabelDescriptor, 0, len()+len())
for , := range {
= append(, &labelpb.LabelDescriptor{
Key: sanitize(),
Description: .desc,
ValueType: labelpb.LabelDescriptor_STRING,
})
}
for , := range {
= append(, &labelpb.LabelDescriptor{
Key: sanitize(.Name()),
ValueType: labelpb.LabelDescriptor_STRING, // We only use string tags
})
}
return
}
// Creates a metric descriptor remotely for "projects/<ProjectID>" under a
// context bounded by the options' Timeout. The call goes through the
// package-level createMetricDescriptor function variable.
// NOTE(review): the returned identifier is missing from this copy; presumably
// only the error is returned and the created descriptor is discarded — confirm.
func ( *statsExporter) ( context.Context, *metric.MetricDescriptor) error {
, := newContextWithTimeout(, .o.Timeout)
defer ()
:= &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", .o.ProjectID),
MetricDescriptor: ,
}
, := createMetricDescriptor(, .c, )
return
}
// createMetricDescriptor forwards to the client's CreateMetricDescriptor call.
// It is a package-level function variable — presumably so tests can replace
// it; TODO confirm.
var createMetricDescriptor = func( context.Context, *monitoring.MetricClient, *monitoringpb.CreateMetricDescriptorRequest) (*metric.MetricDescriptor, error) {
return .CreateMetricDescriptor(, )
}
// createTimeSeries forwards to the client's CreateTimeSeries call. It is a
// package-level function variable — presumably so tests can replace it; TODO
// confirm.
var createTimeSeries = func( context.Context, *monitoring.MetricClient, *monitoringpb.CreateTimeSeriesRequest) error {
return .CreateTimeSeries(, )
}
// knownExternalMetricPrefixes lists metric type prefixes for custom and
// external metrics. Its users are not visible in this section; presumably it
// is used to tell user-defined metrics from built-in ones — TODO confirm.
var knownExternalMetricPrefixes = []string{
"custom.googleapis.com/",
"external.googleapis.com/",
}
![]() |
The pages are generated with Golds v0.3.2-preview (GOOS=darwin GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the QR code on the left) to get the latest news of Golds.