Copyright 2019, OpenCensus Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

package stackdriver
// The code in this file is responsible for converting OpenCensus Proto metrics
// directly to Stackdriver Metrics.

import (
	
	
	

	
	
	
	

	distributionpb 
	labelpb 
	googlemetricpb 
	monitoredrespb 
	monitoringpb 

	
	
	
)

const (
	exemplarAttachmentTypeString  = "type.googleapis.com/google.protobuf.StringValue"
	exemplarAttachmentTypeSpanCtx = "type.googleapis.com/google.monitoring.v3.SpanContext"
	// TODO(songy23): add support for this.
	// exemplarAttachmentTypeDroppedLabels = "type.googleapis.com/google.monitoring.v3.DroppedLabels"
)
// ExportMetrics exports OpenCensus Metrics to Stackdriver Monitoring.
// NOTE(review): the receiver, method and parameter names are missing from
// this declaration in this copy of the file, so the description below is
// limited to what the remaining tokens show.
//
// The method takes a context.Context and a []*metricdata.Metric and returns
// an error. It returns nil immediately when the len() check is zero. Otherwise
// it ranges over a collection and returns nil; no path visible here produces
// a non-nil error.
func ( *statsExporter) ( context.Context,  []*metricdata.Metric) error {
	if len() == 0 {
		return nil
	}

	// NOTE(review): the only line left in this loop body reads as a comment
	// that has lost its "//" marker; whatever statement the loop executed is
	// not present in this copy.
	for ,  := range  {
TODO: [rghetia] handle errors.
	}

	return nil
}

// NOTE(review): the receiver, method and parameter names are missing from
// this declaration in this copy of the file.
//
// The method takes a []*metricdata.Metric and returns nothing. It calls
// uploadMetrics and, when the result is non-nil, invokes o.handleError
// (presumably with that result, the argument is not visible here), so a
// failure is reported through the handler rather than returned to the caller.
func ( *statsExporter) ( []*metricdata.Metric) {
	 := .uploadMetrics()
	if  != nil {
		.o.handleError()
	}
}

// NOTE(review): identifier names (receiver, method, parameters, locals) are
// missing from this copy of the block; presumably this is uploadMetrics,
// judging by the span name below. The description is limited to what the
// remaining tokens show.
//
// The method takes a []*metricdata.Metric and returns an error. Visible steps:
//  1. Derive a context from o.Context and o.Timeout via newContextWithTimeout
//     and defer the second returned value (presumably the cancel function).
//  2. Start a trace span that is never sampled and end it on return.
//  3. Call createMetricDescriptorFromMetric; on failure mark the span with
//     trace.StatusCodeUnknown, record the error and continue.
//  4. Convert with metricToMpbTs, recording errors the same way and
//     appending any non-nil result to a []*monitoringpb.TimeSeries.
//  5. Walk that slice in windows of at most maxTimeSeriesPerUpload, build
//     requests with combineTimeSeriesToCreateTimeSeriesRequest and send each
//     with createTimeSeries, recording errors.
//  6. Return nil when no errors were recorded, the single error when there is
//     exactly one, otherwise one error whose message is the individual
//     messages joined with "; " inside square brackets.
func ( *statsExporter) ( []*metricdata.Metric) error {
	,  := newContextWithTimeout(.o.Context, .o.Timeout)
	defer ()

	var  []error

	,  := trace.StartSpan(
		,
		"contrib.go.opencensus.io/exporter/stackdriver.uploadMetrics",
		trace.WithSampler(trace.NeverSample()),
	)
	defer .End()

	// NOTE(review): the "continue" and the extra closing brace below imply an
	// enclosing loop whose header is missing from this copy; the next line
	// reads as a comment that has lost its "//" marker.
Now create the metric descriptor remotely.
		if  := .createMetricDescriptorFromMetric(, );  != nil {
			.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: .Error()})
			 = append(, )
			continue
		}
	}

	var  []*monitoringpb.TimeSeries
	for ,  := range  {
		,  := .metricToMpbTs(, )
		if  != nil {
			.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: .Error()})
			 = append(, )
			continue
		}
		if  != nil {
			 = append(, ...)
		}
	}
	// NOTE(review): the next line reads as a comment that has lost its "//"
	// marker. The loop after it advances a window whose upper bound is
	// clamped to len() so the final batch may be shorter.
Now batch timeseries up and then export.
	for ,  := 0, 0;  < len();  =  {
		 =  + maxTimeSeriesPerUpload
		if  > len() {
			 = len()
		}
		 := [:]
		 := .combineTimeSeriesToCreateTimeSeriesRequest()
		for ,  := range  {
			if  := createTimeSeries(, .c, );  != nil {
				.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: .Error()})
				 = append(, )
			}
		}
	}

	// Collapse the recorded errors into a single return value.
	 := len()
	if  == 0 {
		return nil
	} else if  == 1 {
		return [0]
	}
	 := make([]string, 0, )
	for ,  := range  {
		 = append(, .Error())
	}
	return fmt.Errorf("[%s]", strings.Join(, "; "))
}
// metricToMpbTs converts a metric into a list of Stackdriver Monitoring v3 API
// TimeSeries, but it doesn't invoke any remote API.
// NOTE(review): identifier names are missing from this copy of the block, and
// several guard lines (the openers matching the stray closing braces and the
// "} else {" below) are absent. The description is limited to what remains.
//
// The method takes a context.Context and a *metricdata.Metric and returns
// ([]*monitoringpb.TimeSeries, error). Visible behaviour:
//   - a nil check returns (nil, errNilMetricOrMetricDescriptor);
//   - a resource is built with metricRscToMpbRsc from the Resource field;
//   - Descriptor.Name, metricTypeFromProto, Descriptor.LabelKeys and
//     metricDescriptorTypeToMetricKind supply the metric metadata;
//   - for each entry of the TimeSeries field, points come from
//     metricTsToMpbPoint and labels from metricLabelsToTsLabels (seeded with
//     defaultLabels), and a monitoringpb.TimeSeries is appended;
//   - one branch returns (nil, nil), apparently to ignore the metric;
//   - the accumulated slice is returned with a nil error.
func ( *statsExporter) ( context.Context,  *metricdata.Metric) ([]*monitoringpb.TimeSeries, error) {
	if  == nil {
		return nil, errNilMetricOrMetricDescriptor
	}

	 := .metricRscToMpbRsc(.Resource)

	 := .Descriptor.Name
	 := .metricTypeFromProto()
	 := .Descriptor.LabelKeys
	,  := metricDescriptorTypeToMetricKind()

	// NOTE(review): the condition guarding this early return is missing; the
	// next line reads as a comment that has lost its "//" marker.
ignore these Timeserieses. TODO [rghetia] log errors.
		return nil, nil
	}

	 := make([]*monitoringpb.TimeSeries, 0, len(.TimeSeries))
	for ,  := range .TimeSeries {
		,  := .metricTsToMpbPoint(, )
		// NOTE(review): the condition guarding this "continue" is missing
		// (presumably an error check on the call above — confirm).
TODO(@rghetia): record error metrics
			continue
		}
		// NOTE(review): the lines without "//" below read as comments that
		// lost their markers; the opener of the if/else that selects the
		// resource is also missing.
Each TimeSeries has labelValues which MUST be correlated with that from the MetricDescriptor
		,  := metricLabelsToTsLabels(.defaultLabels, , .LabelValues)
TODO: (@rghetia) perhaps log this error from labels extraction, if non-nil.
TODO(rghetia): optimize this. It is inefficient to convert this for all metrics.
			 = convertMonitoredResourceToPB()
			// An empty resource type falls back to "global" with no labels.
			if .Type == "" {
				.Type = "global"
				.Labels = nil
			}
		} else {
			 = 
		}
		 = append(, &monitoringpb.TimeSeries{
			Metric: &googlemetricpb.Metric{
				Type:   ,
				Labels: ,
			},
			Resource: ,
			Points:   ,
		})
	}

	return , nil
}

	// Perform this sanity check now.
	if len() != len() {
		return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(), len())
	}

	if len()+len() == 0 {
		return nil, nil
	}

	// Fill in the defaults first, regardless of whether labelKeys and
	// labelValues are mismatched.
	for ,  := range  {
		[sanitize()] = .val
	}

	for ,  := range  {
		 := []
		if .Present {
			[sanitize(.Key)] = .Value
		}
	}

	return , nil
}
// createMetricDescriptorFromMetric creates a metric descriptor from the
// OpenCensus metric and then creates it remotely using Stackdriver's API.
	// Skip creating the metric descriptor if configured to do so.
	if .o.SkipCMD {
		return nil
	}

	.metricMu.Lock()
	defer .metricMu.Unlock()

	 := .Descriptor.Name
	if ,  := .metricDescriptors[];  {
		return nil
	}

	if builtinMetric(.metricTypeFromProto()) {
		.metricDescriptors[] = true
		return nil
	}
	// Otherwise, we encountered a cache miss and should create the metric
	// descriptor remotely.
	,  := .metricToMpbMetricDescriptor()
	if  != nil {
		return 
	}

	if  = .createMetricDescriptor(, );  != nil {
		return 
	}
	// Now record the metric as having been created.
	.metricDescriptors[] = true
	return nil
}

// NOTE(review): identifier names are missing from this copy of the block;
// presumably this is metricToMpbMetricDescriptor. The description is limited
// to what the remaining tokens show.
//
// The method takes a *metricdata.Metric and returns
// (*googlemetricpb.MetricDescriptor, error). A nil check returns
// (nil, errNilMetricOrMetricDescriptor). Otherwise it builds a descriptor and
// returns it with a nil error. The visible field sources are:
//   - Name: "projects/<o.ProjectID>/metricDescriptors/<value>";
//   - Description and Unit: the metric's Descriptor;
//   - Labels: metricLableKeysToLabels over defaultLabels and
//     Descriptor.LabelKeys.
//
// metricTypeFromProto, displayName and metricDescriptorTypeToMetricKind are
// called first; presumably their results fill Type, DisplayName, MetricKind
// and ValueType, but the names tying them together are not visible here.
func ( *statsExporter) ( *metricdata.Metric) (*googlemetricpb.MetricDescriptor, error) {
	if  == nil {
		return nil, errNilMetricOrMetricDescriptor
	}

	 := .metricTypeFromProto(.Descriptor.Name)
	 := .displayName(.Descriptor.Name)
	,  := metricDescriptorTypeToMetricKind()

	 := &googlemetricpb.MetricDescriptor{
		Name:        fmt.Sprintf("projects/%s/metricDescriptors/%s", .o.ProjectID, ),
		DisplayName: ,
		Description: .Descriptor.Description,
		Unit:        string(.Descriptor.Unit),
		Type:        ,
		MetricKind:  ,
		ValueType:   ,
		Labels:      metricLableKeysToLabels(.defaultLabels, .Descriptor.LabelKeys),
	}

	return , nil
}

func ( map[string]labelValue,  []metricdata.LabelKey) []*labelpb.LabelDescriptor {
	 := make([]*labelpb.LabelDescriptor, 0, len()+len())
	// Fill in the defaults first.
	for ,  := range  {
		 = append(, &labelpb.LabelDescriptor{
			Key:         sanitize(),
			Description: .desc,
			ValueType:   labelpb.LabelDescriptor_STRING,
		})
	}
		// TODO: [rghetia] after upgrading to proto version3, return UNRECOGNIZED instead of UNSPECIFIED
		// TODO: [rghetia] after upgrading to proto version3, return UNRECOGNIZED instead of UNSPECIFIED
		return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED
	}
}

// NOTE(review): identifier names are missing from this copy of the block;
// presumably this is metricRscToMpbRsc. The description is limited to what
// the remaining tokens show.
//
// The method takes a *resource.Resource and returns a
// *monitoredrespb.MonitoredResource.
//   - On the nil branch it reads o.Resource and, if that is nil too, returns
//     a fresh resource of type "global".
//   - Otherwise it uses the Type field, substituting "global" for an empty
//     string, and allocates a Labels map sized to the Labels field when that
//     field is non-nil.
func ( *statsExporter) ( *resource.Resource) *monitoredrespb.MonitoredResource {
	if  == nil {
		 := .o.Resource
		if  == nil {
			 = &monitoredrespb.MonitoredResource{
				Type: "global",
			}
		}
		return 
	}
	 := .Type
	if  == "" {
		 = "global"
	}
	 := &monitoredrespb.MonitoredResource{
		Type: ,
	}
	if .Labels != nil {
		.Labels = make(map[string]string, len(.Labels))
		// NOTE(review): the loop header that copies entries into the new map
		// is missing from this copy; the next line reads as a comment that
		// has lost its "//" marker.
TODO: [rghetia] add mapping between OC Labels and SD Labels.
			.Labels[] = 
		}
	}
	return 
}

// NOTE(review): identifier names (including the two named results) are
// missing from this copy of the block; presumably this is metricTsToMpbPoint.
//
// The method takes a *metricdata.TimeSeries and a
// googlemetricpb.MetricDescriptor_MetricKind and returns
// ([]*monitoringpb.Point, error). For every entry of the Points field it:
//   - computes a start timestamp with timestampProto(StartTime);
//   - clears it to nil when a comparison against
//     googlemetricpb.MetricDescriptor_GAUGE succeeds (presumably on the
//     metric-kind parameter);
//   - converts with metricPointToMpbPoint, passing o.ProjectID;
//   - returns (nil, err) on the first conversion failure.
//
// On success the accumulated points are returned with a nil error.
func ( *statsExporter) ( *metricdata.TimeSeries,  googlemetricpb.MetricDescriptor_MetricKind) ( []*monitoringpb.Point,  error) {
	for ,  := range .Points {
		// NOTE(review): the next line reads as a comment that has lost its
		// "//" marker.
If we have a last value aggregation point i.e. MetricDescriptor_GAUGE StartTime should be nil.
		 := timestampProto(.StartTime)
		if  == googlemetricpb.MetricDescriptor_GAUGE {
			 = nil
		}

		,  := metricPointToMpbPoint(, &, .o.ProjectID)
		if  != nil {
			return nil, 
		}
		 = append(, )
	}
	return , nil
}

// NOTE(review): identifier names are missing from this copy of the block;
// presumably this is metricPointToMpbPoint.
//
// The function takes a *timestamp.Timestamp, a *metricdata.Point and a
// string, and returns (*monitoringpb.Point, error).
//   - A nil check returns (nil, nil); which argument is tested is not visible.
//   - Otherwise the value is converted with metricPointToMpbValue, and any
//     error from that call is returned as (nil, err).
//   - On success the result is wrapped in a monitoringpb.Point whose interval
//     has a StartTime (presumably the timestamp argument) and an EndTime of
//     timestampProto(Time).
func ( *timestamp.Timestamp,  *metricdata.Point,  string) (*monitoringpb.Point, error) {
	if  == nil {
		return nil, nil
	}

	,  := metricPointToMpbValue(, )
	if  != nil {
		return nil, 
	}

	 := &monitoringpb.Point{
		Value: ,
		Interval: &monitoringpb.TimeInterval{
			StartTime: ,
			EndTime:   timestampProto(.Time),
		},
	}
	return , nil
}

// NOTE(review): identifier names are missing from this copy of the block, and
// one composite-literal line inside the bucket options is absent; presumably
// this is metricPointToMpbValue.
//
// The function takes a *metricdata.Point and a string and returns
// (*monitoringpb.TypedValue, error). A nil check returns (nil, nil).
// Otherwise it switches on the dynamic type of the Value field:
//   - int64 produces a TypedValue_Int64Value;
//   - float64 produces a TypedValue_DoubleValue;
//   - *metricdata.Distribution produces a TypedValue_DistributionValue;
//   - any other type sets an error ("protoToMetricPoint: unknown Data type").
//
// For a distribution, the visible steps are:
//   - Count and SumOfSquaredDeviation are copied, and Mean is filled from a
//     float64 that is Sum/Count when Count > 0 and zero otherwise;
//   - explicit bucket bounds are passed through shouldInsertZeroBound and
//     addZeroBoundOnCondition;
//   - bucket counts and exemplars come from
//     metricBucketToBucketCountsAndExemplars, and the counts then go through
//     addZeroBucketCountOnCondition.
func ( *metricdata.Point,  string) (*monitoringpb.TypedValue, error) {
	if  == nil {
		return nil, nil
	}

	var  error
	var  *monitoringpb.TypedValue
	switch v := .Value.(type) {
	default:
		 = fmt.Errorf("protoToMetricPoint: unknown Data type: %T", .Value)

	case int64:
		 = &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_Int64Value{
				Int64Value: ,
			},
		}

	case float64:
		 = &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_DoubleValue{
				DoubleValue: ,
			},
		}

	case *metricdata.Distribution:
		 := 
		var  *monitoringpb.TypedValue_DistributionValue
		var  float64
		// Guard against division by zero for an empty distribution.
		if .Count > 0 {
			 = float64(.Sum) / float64(.Count)
		}
		 = &monitoringpb.TypedValue_DistributionValue{
			DistributionValue: &distributionpb.Distribution{
				Count:                 .Count,
				Mean:                  ,
				SumOfSquaredDeviation: .SumOfSquaredDeviation,
			},
		}

		 := false
		if  := .BucketOptions;  != nil {
			 = shouldInsertZeroBound(.Bounds...)
			.DistributionValue.BucketOptions = &distributionpb.Distribution_BucketOptions{
				Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
					// NOTE(review): the literal that should open here is
					// missing from this copy; the next line reads as a
					// comment that has lost its "//" marker.
The first bucket bound should be 0.0 because the Metrics first bucket is [0, first_bound) but Stackdriver monitoring bucket bounds begin with -infinity (first bucket is (-infinity, 0))
						Bounds: addZeroBoundOnCondition(, .Bounds...),
					},
				},
			}
		}
		,  := metricBucketToBucketCountsAndExemplars(.Buckets, )
		.DistributionValue.BucketCounts = addZeroBucketCountOnCondition(, ...)
		.DistributionValue.Exemplars = 

		 = &monitoringpb.TypedValue{Value: }
	}

	return , 
}

// NOTE(review): identifier names are missing from this copy of the block;
// presumably this is metricBucketToBucketCountsAndExemplars.
//
// The function takes a []metricdata.Bucket and a string and returns
// ([]int64, []*distributionpb.Distribution_Exemplar).
//   - The int64 slice is allocated with one slot per bucket and filled from
//     each bucket's Count.
//   - The exemplar slice starts nil and gains one entry, converted with
//     metricExemplarToPbExemplar, for every bucket whose Exemplar is non-nil.
//     It can therefore be shorter than the counts slice.
func ( []metricdata.Bucket,  string) ([]int64, []*distributionpb.Distribution_Exemplar) {
	 := make([]int64, len())
	var  []*distributionpb.Distribution_Exemplar
	for ,  := range  {
		[] = .Count
		if .Exemplar != nil {
			 = append(, metricExemplarToPbExemplar(.Exemplar, ))
		}
	}
	return , 
}

// NOTE(review): identifier names are missing from this copy of the block;
// presumably this is metricExemplarToPbExemplar.
//
// The function takes a *metricdata.Exemplar and a string and returns a
// *distributionpb.Distribution_Exemplar. It copies Value, converts Timestamp
// with timestampProto and converts Attachments with
// attachmentsToPbAttachments. No nil check on the exemplar is visible here,
// so callers are presumably expected to pass a non-nil pointer.
func ( *metricdata.Exemplar,  string) *distributionpb.Distribution_Exemplar {
	return &distributionpb.Distribution_Exemplar{
		Value:       .Value,
		Timestamp:   timestampProto(.Timestamp),
		Attachments: attachmentsToPbAttachments(.Attachments, ),
	}
}

// NOTE(review): identifier names are missing from this copy of the block;
// presumably this is attachmentsToPbAttachments.
//
// The function takes a metricdata.Attachments and a string and returns a
// []*any.Any. For each attachment it type-asserts trace.SpanContext and
// appends the result of toPbSpanCtxAttachment. The toPbStringAttachment
// append below is presumably the fallback for every other value. The result
// slice starts nil, so it stays nil when nothing is appended.
func ( metricdata.Attachments,  string) []*any.Any {
	var  []*any.Any
	for ,  := range  {
		if ,  := .(trace.SpanContext);  {
			 = append(, toPbSpanCtxAttachment(, ))
			// NOTE(review): the "} else {" that should separate the two
			// appends is missing from this copy; the next line reads as a
			// comment that has lost its "//" marker.
Treat everything else as plain string for now. TODO(songy23): add support for dropped label attachments.
			 = append(, toPbStringAttachment())
		}
	}
	return 
}

// NOTE(review): identifier names are missing from this copy of the block;
// presumably this is toPbStringAttachment.
//
// The function takes an interface{} and returns a *any.Any. It formats with
// fmt.Sprintf("%v") and returns an Any whose TypeUrl is
// exemplarAttachmentTypeString and whose Value is a []byte conversion
// (presumably of the formatted string).
// NOTE(review): Value looks like the raw string bytes rather than a
// serialized google.protobuf.StringValue message, which is what the TypeUrl
// advertises. Confirm that consumers accept this.
func ( interface{}) *any.Any {
	 := fmt.Sprintf("%v", )
	return &any.Any{
		TypeUrl: exemplarAttachmentTypeString,
		Value:   []byte(),
	}
}

func ( trace.SpanContext,  string) *any.Any {
	 := monitoringpb.SpanContext{
		SpanName: fmt.Sprintf("projects/%s/traces/%s/spans/%s", , .TraceID.String(), .SpanID.String()),
	}
	,  := proto.Marshal(&)
	return &any.Any{
		TypeUrl: exemplarAttachmentTypeSpanCtx,
		Value:   ,
	}