// Source file: trace.go
// Belonging package: contrib.go.opencensus.io/exporter/stackdriver
package stackdriver
import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	tracingclient "cloud.google.com/go/trace/apiv2"
	commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
	"github.com/golang/protobuf/proto"
	"go.opencensus.io/trace"
	"google.golang.org/api/support/bundler"
	tracepb "google.golang.org/genproto/googleapis/devtools/cloudtrace/v2"
)
// traceExporter is an implementation of trace.Exporter that converts spans to
// their protobuf form and writes them to Stackdriver Trace in batches.
type traceExporter struct {
	// o holds the exporter configuration (resource, user agent, timeout,
	// bundling thresholds, error handler).
	o Options
	// projectID identifies the project the spans are written to; it is used
	// to build the "projects/<id>" request name.
	projectID string
	// bundler batches converted spans and hands each batch to uploadFn.
	bundler *bundler.Bundler
	// uploadFn is invoked by the bundler handler with every batch. The
	// constructor points it at uploadSpans; keeping it as a field allows it
	// to be swapped out.
	uploadFn func(spans []*tracepb.Span)
	// overflowLogger rate-limits the "buffer full" log messages.
	overflowLogger
	// client performs the BatchWriteSpans calls.
	client *tracingclient.Client
}

// Compile-time check that *traceExporter satisfies trace.Exporter.
var _ trace.Exporter = (*traceExporter)(nil)
func ( Options) (*traceExporter, error) {
:= .Context
if == nil {
= context.Background()
}
, := tracingclient.NewClient(, .TraceClientOptions...)
if != nil {
return nil, fmt.Errorf("stackdriver: couldn't initialize trace client: %v", )
}
return newTraceExporterWithClient(, ), nil
}
// defaultBufferedByteLimit is the bundler's BufferedByteLimit (8 MiB) used
// when the TraceSpansBufferMaxBytes option is not positive.
const defaultBufferedByteLimit = 8 * 1024 * 1024
func ( Options, *tracingclient.Client) *traceExporter {
:= &traceExporter{
projectID: .ProjectID,
client: ,
o: ,
}
:= bundler.NewBundler((*tracepb.Span)(nil), func( interface{}) {
.uploadFn(.([]*tracepb.Span))
})
if .BundleDelayThreshold > 0 {
.DelayThreshold = .BundleDelayThreshold
} else {
.DelayThreshold = 2 * time.Second
}
if .BundleCountThreshold > 0 {
.BundleCountThreshold = .BundleCountThreshold
} else {
.BundleCountThreshold = 50
}
if .NumberOfWorkers > 0 {
.HandlerLimit = .NumberOfWorkers
.BundleByteThreshold = .BundleCountThreshold * 200
.BundleByteLimit = .BundleCountThreshold * 1000
if .TraceSpansBufferMaxBytes > 0 {
.BufferedByteLimit = .TraceSpansBufferMaxBytes
} else {
.BufferedByteLimit = defaultBufferedByteLimit
}
.bundler =
.uploadFn = .uploadSpans
return
}
func ( *traceExporter) ( *trace.SpanData) {
:= protoFromSpanData(, .projectID, .o.Resource, .o.UserAgent)
:= proto.Size()
:= .bundler.Add(, )
switch {
case nil:
return
case bundler.ErrOversizedItem:
case bundler.ErrOverflow:
.overflowLogger.log()
default:
.o.handleError()
}
}
func ( *traceExporter) () {
.bundler.Flush()
}
func ( *traceExporter) ( context.Context, *commonpb.Node, *resourcepb.Resource, []*trace.SpanData) (int, error) {
, := trace.StartSpan(
,
"contrib.go.opencensus.io/exporter/stackdriver.PushTraceSpans",
trace.WithSampler(trace.NeverSample()),
)
defer .End()
.AddAttributes(trace.Int64Attribute("num_spans", int64(len())))
:= make([]*tracepb.Span, 0, len())
:= .o.Resource
if != nil {
= .o.MapResource(resourcepbToResource())
}
for , := range {
= append(, protoFromSpanData(, .projectID, , .o.UserAgent))
}
:= tracepb.BatchWriteSpansRequest{
Name: "projects/" + .projectID,
Spans: ,
, := newContextWithTimeout(, .o.Timeout)
defer ()
:= .client.BatchWriteSpans(, &)
if != nil {
return len(),
}
return 0, nil
}
func ( *traceExporter) ( []*tracepb.Span) {
:= tracepb.BatchWriteSpansRequest{
Name: "projects/" + .projectID,
Spans: ,
, := newContextWithTimeout(.o.Context, .o.Timeout)
defer ()
, := trace.StartSpan(
,
"contrib.go.opencensus.io/exporter/stackdriver.uploadSpans",
trace.WithSampler(trace.NeverSample()),
)
defer .End()
.AddAttributes(trace.Int64Attribute("num_spans", int64(len())))
:= .client.BatchWriteSpans(, &)
if != nil {
.SetStatus(trace.Status{Code: 2, Message: .Error()})
.o.handleError()
}
}
// overflowLogger rate-limits "buffer full" log messages: after one message is
// written, further overflows are only counted and reported in a summary at
// most once every 5 seconds.
type overflowLogger struct {
	mu sync.Mutex
	// pause is true while a delay timer is pending; overflows are then
	// accumulated instead of being logged immediately.
	pause bool
	// accum counts the overflows seen since the last message was written.
	accum int
}

// delay suppresses immediate logging and schedules a check 5 seconds later.
// At that point any overflows accumulated in the meantime are reported in one
// message and another delay is started; if none occurred, logging is
// un-paused.
//
// The caller must hold o.mu.
func (o *overflowLogger) delay() {
	o.pause = true
	time.AfterFunc(5*time.Second, func() {
		o.mu.Lock()
		defer o.mu.Unlock()
		switch {
		case o.accum == 0:
			o.pause = false
		case o.accum == 1:
			log.Println("OpenCensus Stackdriver exporter: failed to upload span: buffer full")
			o.accum = 0
			o.delay()
		default:
			log.Printf("OpenCensus Stackdriver exporter: failed to upload %d spans: buffer full", o.accum)
			o.accum = 0
			o.delay()
		}
	})
}

// log records one buffer overflow. The first overflow outside a pause window
// is logged immediately and starts the pause; overflows during the pause are
// only counted.
func (o *overflowLogger) log() {
	o.mu.Lock()
	defer o.mu.Unlock()
	if !o.pause {
		log.Println("OpenCensus Stackdriver exporter: failed to upload span: buffer full")
		o.delay()
	} else {
		o.accum++
	}
}
// The pages are generated with Golds v0.3.2-preview (GOOS=darwin GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.