Copyright 2017, OpenCensus Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

package stackdriver

import (
	
	
	
	
	
	
	
	
	
	

	
	
	
	

	monitoring 
	
	
	
	
	
	distributionpb 
	labelpb 
	
	googlemetricpb 
	metricpb 
	monitoredrespb 
	monitoringpb 
)

const (
	maxTimeSeriesPerUpload    = 200
	opencensusTaskKey         = "opencensus_task"
	opencensusTaskDescription = "Opencensus task identifier"
	defaultDisplayNamePrefix  = "OpenCensus"
	version                   = "0.13.3"
)
statsExporter exports stats to the Stackdriver Monitoring.
type statsExporter struct {
	o Options

	viewDataBundler *bundler.Bundler
	metricsBundler  *bundler.Bundler

	protoMu                sync.Mutex
	protoMetricDescriptors map[string]bool // Metric descriptors that were already created remotely

	metricMu          sync.Mutex
	metricDescriptors map[string]bool // Metric descriptors that were already created remotely

	c             *monitoring.MetricClient
	defaultLabels map[string]labelValue
	ir            *metricexport.IntervalReader

	initReaderOnce sync.Once
}

var (
	errBlankProjectID = errors.New("expecting a non-blank ProjectID")
)
newStatsExporter returns an exporter that uploads stats data to Stackdriver Monitoring. Only one Stackdriver exporter should be created per ProjectID per process, any subsequent invocations of NewExporter with the same ProjectID will return an error.
func ( Options) (*statsExporter, error) {
	if strings.TrimSpace(.ProjectID) == "" {
		return nil, errBlankProjectID
	}

	 := append(.MonitoringClientOptions, option.WithUserAgent(.UserAgent))
	 := .Context
	if  == nil {
		 = context.Background()
	}
	,  := monitoring.NewMetricClient(, ...)
	if  != nil {
		return nil, 
	}
	 := &statsExporter{
		c:                      ,
		o:                      ,
		protoMetricDescriptors: make(map[string]bool),
		metricDescriptors:      make(map[string]bool),
	}

	var  map[string]labelValue
	if .DefaultMonitoringLabels != nil {
		 = .DefaultMonitoringLabels.m
	} else {
		 = map[string]labelValue{
			opencensusTaskKey: {val: getTaskValue(), desc: opencensusTaskDescription},
		}
	}

Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched.
	for ,  := range  {
		.defaultLabels[sanitize()] = 
	}

	.viewDataBundler = bundler.NewBundler((*view.Data)(nil), func( interface{}) {
		 := .([]*view.Data)
		.handleUpload(...)
	})
	.metricsBundler = bundler.NewBundler((*metricdata.Metric)(nil), func( interface{}) {
		 := .([]*metricdata.Metric)
		.handleMetricsUpload()
	})
	if  := .o.BundleDelayThreshold;  > 0 {
		.viewDataBundler.DelayThreshold = 
		.metricsBundler.DelayThreshold = 
	}
	if  := .o.BundleCountThreshold;  > 0 {
		.viewDataBundler.BundleCountThreshold = 
		.metricsBundler.BundleCountThreshold = 
	}
	return , nil
}

func ( *statsExporter) () error {
	.initReaderOnce.Do(func() {
		.ir, _ = metricexport.NewIntervalReader(metricexport.NewReader(), )
	})
	.ir.ReportingInterval = .o.ReportingInterval
	return .ir.Start()
}

func ( *statsExporter) () {
	if .ir != nil {
		.ir.Stop()
	}
}

func ( *statsExporter) ( *view.View,  []tag.Tag) ([]tag.Tag, *monitoredrespb.MonitoredResource) {
	 := .o.Resource
	if  == nil {
		 = &monitoredrespb.MonitoredResource{
			Type: "global",
		}
	}
	return , 
}
ExportView exports to the Stackdriver Monitoring if view data has one or more rows.
func ( *statsExporter) ( *view.Data) {
	if len(.Rows) == 0 {
		return
	}
	 := .viewDataBundler.Add(, 1)
	switch  {
	case nil:
		return
	case bundler.ErrOverflow:
		.o.handleError(errors.New("failed to upload: buffer full"))
	default:
		.o.handleError()
	}
}
getTaskValue returns a task label value in the format of "go-<pid>@<hostname>".
func () string {
	,  := os.Hostname()
	if  != nil {
		 = "localhost"
	}
	return "go-" + strconv.Itoa(os.Getpid()) + "@" + 
}
handleUpload handles uploading a slice of Data, as well as error handling.
func ( *statsExporter) ( ...*view.Data) {
	if  := .uploadStats();  != nil {
		.o.handleError()
	}
}
Flush waits for exported view data and metrics to be uploaded. This is useful if your program is ending and you do not want to lose data that hasn't yet been exported.
func ( *statsExporter) () {
	.viewDataBundler.Flush()
	.metricsBundler.Flush()
}

func ( *statsExporter) ( []*view.Data) error {
	,  := newContextWithTimeout(.o.Context, .o.Timeout)
	defer ()
	,  := trace.StartSpan(
		,
		"contrib.go.opencensus.io/exporter/stackdriver.uploadStats",
		trace.WithSampler(trace.NeverSample()),
	)
	defer .End()

	for ,  := range  {
		if  := .createMetricDescriptorFromView(, .View);  != nil {
			.SetStatus(trace.Status{Code: 2, Message: .Error()})
			return 
		}
	}
	for ,  := range .makeReq(, maxTimeSeriesPerUpload) {
		if  := createTimeSeries(, .c, );  != nil {
TODO(jbd): Don't fail fast here, batch errors?
			return 
		}
	}
	return nil
}

func ( *statsExporter) ( []*view.Data,  int) []*monitoringpb.CreateTimeSeriesRequest {
	var  []*monitoringpb.CreateTimeSeriesRequest

	var  []*monitoringpb.TimeSeries
	for ,  := range  {
		for ,  := range .Rows {
			,  := .getMonitoredResource(.View, append([]tag.Tag(nil), .Tags...))
			 := &monitoringpb.TimeSeries{
				Metric: &metricpb.Metric{
					Type:   .metricType(.View),
					Labels: newLabels(.defaultLabels, ),
				},
				Resource: ,
				Points:   []*monitoringpb.Point{newPoint(.View, , .Start, .End)},
			}
			 = append(, )
		}
	}

	var  []*monitoringpb.TimeSeries
	for ,  := range  {
		 = append(, )
		if len() ==  {
			 := .combineTimeSeriesToCreateTimeSeriesRequest()
			 = append(, ...)
			 = [:0]
		}
	}

	if len() > 0 {
		 := .combineTimeSeriesToCreateTimeSeriesRequest()
		 = append(, ...)
	}
	return 
}

func ( *statsExporter) ( context.Context,  *view.View) (*metricpb.MetricDescriptor, error) {
	 := .Measure
	 := .Aggregation
	 := .Name

	 := .metricType()
	var  metricpb.MetricDescriptor_ValueType
Default metric Kind
	 := metricpb.MetricDescriptor_CUMULATIVE

	switch .Type {
	case view.AggTypeCount:
If the aggregation type is count, which counts the number of recorded measurements, the unit must be "1", because this view does not apply to the recorded values.
		 = stats.UnitDimensionless
	case view.AggTypeSum:
		switch .(type) {
		case *stats.Int64Measure:
			 = metricpb.MetricDescriptor_INT64
		case *stats.Float64Measure:
			 = metricpb.MetricDescriptor_DOUBLE
		}
	case view.AggTypeDistribution:
		 = metricpb.MetricDescriptor_DISTRIBUTION
	case view.AggTypeLastValue:
		 = metricpb.MetricDescriptor_GAUGE
		switch .(type) {
		case *stats.Int64Measure:
			 = metricpb.MetricDescriptor_INT64
		case *stats.Float64Measure:
			 = metricpb.MetricDescriptor_DOUBLE
		}
	default:
		return nil, fmt.Errorf("unsupported aggregation type: %s", .Type.String())
	}

	var  string
	if .o.GetMetricDisplayName == nil {
		 = .displayName()
	} else {
		 = .o.GetMetricDisplayName()
	}

	 := &metricpb.MetricDescriptor{
		Name:        fmt.Sprintf("projects/%s/metricDescriptors/%s", .o.ProjectID, ),
		DisplayName: ,
		Description: .Description,
		Unit:        ,
		Type:        ,
		MetricKind:  ,
		ValueType:   ,
		Labels:      newLabelDescriptors(.defaultLabels, .TagKeys),
	}
	return , nil
}
createMetricDescriptorFromView creates a MetricDescriptor for the given view data in Stackdriver Monitoring. An error will be returned if there is already a metric descriptor created with the same name but it has a different aggregation or keys.
Skip create metric descriptor if configured
	if .o.SkipCMD {
		return nil
	}

	.metricMu.Lock()
	defer .metricMu.Unlock()

	 := .Name

	if ,  := .metricDescriptors[];  {
		return nil
	}

	if builtinMetric(.metricType()) {
		.metricDescriptors[] = true
		return nil
	}

	,  := .viewToMetricDescriptor(, )
	if  != nil {
		return 
	}

	if  = .createMetricDescriptor(, );  != nil {
		return 
	}
Now cache the metric descriptor
	.metricDescriptors[] = true
	return nil
}

func ( *statsExporter) ( string) string {
If the display name suffix is already prefixed with domain, skip adding extra prefix
		return 
	}
	return path.Join(defaultDisplayNamePrefix, )
}

func ( *statsExporter) ( []*monitoringpb.TimeSeries) ( []*monitoringpb.CreateTimeSeriesRequest) {
	if len() == 0 {
		return nil
	}
Since there are scenarios in which Metrics with the same Type can be bunched in the same TimeSeries, we have to ensure that we create a unique CreateTimeSeriesRequest with entirely unique Metrics per TimeSeries, lest we'll encounter: err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written: Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.: timeSeries[2] This scenario happens when we are using the OpenCensus Agent in which multiple metrics are streamed by various client applications. See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73
	 := make([]*monitoringpb.TimeSeries, 0, len())
	 := make([]*monitoringpb.TimeSeries, 0, len())
	 := make(map[string]struct{})

	for ,  := range  {
		 := metricSignature(.Metric)
		if ,  := []; ! {
			 = append(, )
			[] = struct{}{}
		} else {
			 = append(, )
		}
	}
UniqueTimeSeries can be bunched up together While for each nonUniqueTimeSeries, we have to make a unique CreateTimeSeriesRequest.
	 = append(, &monitoringpb.CreateTimeSeriesRequest{
		Name:       fmt.Sprintf("projects/%s", .o.ProjectID),
		TimeSeries: ,
	})
Now recursively also combine the non-unique TimeSeries that were singly added to nonUniqueTimeSeries. The reason is that we need optimal combinations for optimal combinations because: * "a/b/c" * "a/b/c" * "x/y/z" * "a/b/c" * "x/y/z" * "p/y/z" * "d/y/z" should produce: CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"] CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"] CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"]
	 := .()
	 = append(, ...)

	return 
}
metricSignature creates a unique signature consisting of a metric's type and its lexicographically sorted label values See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/120
func ( *googlemetricpb.Metric) string {
	 := .GetLabels()
	 := make([]string, 0, len())

	for ,  := range  {
		 = append(, )
	}
	sort.Strings()
	return fmt.Sprintf("%s:%s", .GetType(), strings.Join(, ","))
}

func ( *view.View,  *view.Row, ,  time.Time) *monitoringpb.Point {
	switch .Aggregation.Type {
	case view.AggTypeLastValue:
		return newGaugePoint(, , )
	default:
		return newCumulativePoint(, , , )
	}
}

func ( *view.View,  *view.Row, ,  time.Time) *monitoringpb.Point {
	return &monitoringpb.Point{
		Interval: &monitoringpb.TimeInterval{
			StartTime: &timestamp.Timestamp{
				Seconds: .Unix(),
				Nanos:   int32(.Nanosecond()),
			},
			EndTime: &timestamp.Timestamp{
				Seconds: .Unix(),
				Nanos:   int32(.Nanosecond()),
			},
		},
		Value: newTypedValue(, ),
	}
}

func ( *view.View,  *view.Row,  time.Time) *monitoringpb.Point {
	 := &timestamp.Timestamp{
		Seconds: .Unix(),
		Nanos:   int32(.Nanosecond()),
	}
	return &monitoringpb.Point{
		Interval: &monitoringpb.TimeInterval{
			EndTime: ,
		},
		Value: newTypedValue(, ),
	}
}

func ( *view.View,  *view.Row) *monitoringpb.TypedValue {
	switch v := .Data.(type) {
	case *view.CountData:
		return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
			Int64Value: .Value,
		}}
	case *view.SumData:
		switch .Measure.(type) {
		case *stats.Int64Measure:
			return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
				Int64Value: int64(.Value),
			}}
		case *stats.Float64Measure:
			return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
				DoubleValue: .Value,
			}}
		}
	case *view.DistributionData:
		 := shouldInsertZeroBound(.Aggregation.Buckets...)
		return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DistributionValue{
			DistributionValue: &distributionpb.Distribution{
				Count:                 .Count,
				Mean:                  .Mean,
TODO(songya): uncomment this once Stackdriver supports min/max. Range: &distributionpb.Distribution_Range{ Min: v.Min, Max: v.Max, },
				BucketOptions: &distributionpb.Distribution_BucketOptions{
					Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
						ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
							Bounds: addZeroBoundOnCondition(, .Aggregation.Buckets...),
						},
					},
				},
				BucketCounts: addZeroBucketCountOnCondition(, .CountPerBucket...),
			},
		}}
	case *view.LastValueData:
		switch .Measure.(type) {
		case *stats.Int64Measure:
			return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
				Int64Value: int64(.Value),
			}}
		case *stats.Float64Measure:
			return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
				DoubleValue: .Value,
			}}
		}
	}
	return nil
}

func ( ...float64) bool {
	if len() > 0 && [0] > 0.0 {
		return true
	}
	return false
}

func ( bool,  ...int64) []int64 {
	if  {
		return append([]int64{0}, ...)
	}
	return 
}

func ( bool,  ...float64) []float64 {
	if  {
		return append([]float64{0.0}, ...)
	}
	return 
}

func ( *statsExporter) ( *view.View) string {
	if  := .o.GetMetricType;  != nil {
		return ()
	}
	return path.Join("custom.googleapis.com", "opencensus", .Name)
}

func ( map[string]labelValue,  []tag.Tag) map[string]string {
	 := make(map[string]string)
	for ,  := range  {
		[sanitize()] = .val
	}
	for ,  := range  {
		[sanitize(.Key.Name())] = .Value
	}
	return 
}

func ( map[string]labelValue,  []tag.Key) []*labelpb.LabelDescriptor {
	 := make([]*labelpb.LabelDescriptor, 0, len()+len())
	for ,  := range  {
		 = append(, &labelpb.LabelDescriptor{
			Key:         sanitize(),
			Description: .desc,
			ValueType:   labelpb.LabelDescriptor_STRING,
		})
	}
	for ,  := range  {
		 = append(, &labelpb.LabelDescriptor{
			Key:       sanitize(.Name()),
			ValueType: labelpb.LabelDescriptor_STRING, // We only use string tags
		})
	}
	return 
}

func ( *statsExporter) ( context.Context,  *metric.MetricDescriptor) error {
	,  := newContextWithTimeout(, .o.Timeout)
	defer ()
	 := &monitoringpb.CreateMetricDescriptorRequest{
		Name:             fmt.Sprintf("projects/%s", .o.ProjectID),
		MetricDescriptor: ,
	}
	,  := createMetricDescriptor(, .c, )
	return 
}

var createMetricDescriptor = func( context.Context,  *monitoring.MetricClient,  *monitoringpb.CreateMetricDescriptorRequest) (*metric.MetricDescriptor, error) {
	return .CreateMetricDescriptor(, )
}

var createTimeSeries = func( context.Context,  *monitoring.MetricClient,  *monitoringpb.CreateTimeSeriesRequest) error {
	return .CreateTimeSeries(, )
}

var knownExternalMetricPrefixes = []string{
	"custom.googleapis.com/",
	"external.googleapis.com/",
}
builtinMetric returns true if a MetricType is a heuristically known built-in Stackdriver metric
func ( string) bool {
	for ,  := range knownExternalMetricPrefixes {
		if strings.HasPrefix(, ) {
			return false
		}
	}
	return true