Source File
metrics_batcher.go
Belonging Package
contrib.go.opencensus.io/exporter/stackdriver
package stackdriver
import (
monitoring
monitoringpb
)
// minNumWorkers is used as a lower bound in the metricsBatcher constructor:
// a value below it is replaced by it.
const minNumWorkers = 1

// minReqsChanSize is used as a lower bound in the metricsBatcher constructor:
// a value below it is replaced by it.
const minReqsChanSize = 5
// metricsBatcher collects TimeSeries into CreateTimeSeriesRequests, hands the
// requests to workers over reqsChan, and gathers the workers' results from
// respsChan.
type metricsBatcher struct {
	// projectName is the "projects/<id>" name placed in every request.
	projectName string
	// allTss holds the TimeSeries accumulated for the next request.
	allTss []*monitoringpb.TimeSeries
	// allErrs collects the non-nil errors recorded on this batcher.
	allErrs []error

	// droppedTimeSeries is the running count of dropped TimeSeries recorded
	// on this batcher. It is set in the constructor's struct literal and
	// updated when results are recorded, so it must be declared here.
	droppedTimeSeries int

	// workers are the workers created by the constructor; their number
	// decides how many responses are read from respsChan on shutdown.
	workers []*worker
	// reqsChan carries requests to the workers.
	reqsChan chan *monitoringpb.CreateTimeSeriesRequest
	// respsChan carries each worker's response back to the batcher.
	respsChan chan *response
	// wg is waited on before the workers' responses are collected.
	wg *sync.WaitGroup
}
// NOTE(review): the function name, parameter names and local identifiers are
// missing from this listing; the comments below refer to values by type and
// role only, and every "presumably" needs confirming against the real file.
//
// Constructor returning a *metricsBatcher. Its parameters are, in order, a
// context.Context, a string, an int, a *monitoring.MetricClient and a
// time.Duration. It creates the request and response channels, builds the
// workers with newWorker and runs each worker's start method in its own
// goroutine before returning the batcher.
func ( context.Context, string, int, *monitoring.MetricClient, time.Duration) *metricsBatcher {
// Raise a value to at least minNumWorkers (presumably the worker count).
if < minNumWorkers {
= minNumWorkers
}
:= make([]*worker, 0, )
:=
// Raise a value to at least minReqsChanSize (presumably the capacity of the
// request channel).
if < minReqsChanSize {
= minReqsChanSize
}
// Both channels are created with an explicit capacity argument.
:= make(chan *monitoringpb.CreateTimeSeriesRequest, )
:= make(chan *response, )
var sync.WaitGroup
.Add()
// Build each worker, remember it, and run it in its own goroutine.
for := 0; < ; ++ {
:= newWorker(, , , , &, )
= append(, )
go .start()
}
return &metricsBatcher{
// The string argument is formatted into a "projects/<value>" name.
projectName: fmt.Sprintf("projects/%s", ),
allTss: make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload),
droppedTimeSeries: 0,
workers: ,
wg: &,
reqsChan: ,
respsChan: ,
}
}
// NOTE(review): the method name and all local identifiers are missing from
// this listing. Judging by the recordDroppedTimeseries call made on the
// batcher with an int and a spread error slice elsewhere in this file, this is
// presumably that method; confirm.
//
// Adds to the batcher's droppedTimeSeries counter (presumably the int
// argument) and appends every non-nil error from the variadic argument to
// allErrs.
func ( *metricsBatcher) ( int, ...error) {
.droppedTimeSeries +=
for , := range {
// nil entries are skipped so allErrs only ever holds real errors.
if != nil {
.allErrs = append(.allErrs, )
}
}
}
// NOTE(review): the method name and local identifiers are missing from this
// listing.
//
// Appends one TimeSeries to allTss. Once allTss holds exactly
// maxTimeSeriesPerUpload entries, it is sent off via sendReqToChan and
// replaced by a new, empty slice.
func ( *metricsBatcher) ( *monitoringpb.TimeSeries) {
.allTss = append(.allTss, )
if len(.allTss) == maxTimeSeriesPerUpload {
.sendReqToChan()
// A new slice is allocated rather than truncating the old one: the request
// built in sendReqToChan is given allTss itself as its TimeSeries field.
.allTss = make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload)
}
}
// NOTE(review): the function header for this body is missing from the
// listing, as are its local identifiers. The body returns an error value.
//
// Flush whatever is still buffered in allTss.
if len(.allTss) > 0 {
.sendReqToChan()
}
// Closing reqsChan ends the range loop in each worker's start method.
close(.reqsChan)
// NOTE(review): each worker sends its response on respsChan before calling
// wg.Done, and nothing receives from respsChan until this Wait returns. That
// only works if respsChan can buffer one response per worker; confirm its
// capacity in the constructor.
.wg.Wait()
// Read one response per worker and fold it into the batcher's totals.
for := 0; < len(.workers); ++ {
:= <-.respsChan
.recordDroppedTimeseries(.droppedTimeSeries, .errs...)
}
close(.respsChan)
// No errors: return nil. Exactly one: return it unchanged. Several: join
// their messages with "; " inside square brackets in one new error.
:= len(.allErrs)
if == 0 {
return nil
}
if == 1 {
return .allErrs[0]
}
:= make([]string, 0, )
for , := range .allErrs {
= append(, .Error())
}
return fmt.Errorf("[%s]", strings.Join(, "; "))
}
// NOTE(review): the method name and local identifiers are missing from this
// listing. Judging by the parameterless sendReqToChan calls made on the
// batcher elsewhere in this file, this is presumably that method; confirm.
//
// Builds a CreateTimeSeriesRequest from projectName and the current allTss
// and sends it on reqsChan. The send blocks while reqsChan cannot accept
// another request.
func ( *metricsBatcher) () {
:= &monitoringpb.CreateTimeSeriesRequest{
Name: .projectName,
TimeSeries: .allTss,
}
.reqsChan <-
}
// timeSeriesErrPattern matches the text ": timeSeries[...]" where the
// brackets hold a comma-separated list whose items are either a single
// decimal index ("7") or a dash-separated pair ("0-2"). Capture group 1 is
// the list between the brackets.
const timeSeriesErrPattern = `: timeSeries\[([0-9]+(?:-[0-9]+)?(?:,[0-9]+(?:-[0-9]+)?)*)\]`

// timeSeriesErrRegex is timeSeriesErrPattern compiled once at package scope.
var timeSeriesErrRegex = regexp.MustCompile(timeSeriesErrPattern)
// NOTE(review): the function header for this body is missing from the
// listing, as are its local identifiers. Judging by the sendReq call in the
// worker code, whose two results feed recordDroppedTimeseries(int, error),
// this is presumably sendReq; confirm. The body returns an int count and a
// second value.
//
// A nil input (presumably the request) means nothing to send: zero dropped.
if == nil {
return 0, nil
}
:= createTimeSeries(, , )
// A nil result from createTimeSeries (presumably its error) means nothing
// was dropped.
if == nil {
return 0, nil
}
// Collect every ": timeSeries[...]" index list found in the error text.
:= timeSeriesErrRegex.FindAllStringSubmatch(.Error(), -1)
// If the message lacks the expected prefix or contains no index list, the
// count returned is len(.TimeSeries), presumably every TimeSeries in the
// request.
if !strings.HasPrefix(.Error(), "One or more TimeSeries could not be written:") || len() == 0 {
return len(.TimeSeries),
}
// Otherwise total the entries named in the index lists. Index 0 of each
// match is the whole match, so the inner loop starts at capture group 1.
:= 0
for , := range {
for := 1; < len(); ++ {
for , := range strings.Split([], ",") {
// Each item is a single index or a "lo-hi" pair; a single index is handled
// as a pair whose two ends are equal.
:= strings.Split(, "-")
, := strconv.Atoi([0])
:=
if len() > 1 {
// The Atoi error is discarded here; the regex only admits decimal digits.
, _ = strconv.Atoi([1])
}
// Presumably hi - lo + 1, i.e. the inclusive size of the range; confirm.
+= - + 1
}
}
}
return ,
}
// worker consumes CreateTimeSeriesRequests from reqsChan, sends each one, and
// reports its accumulated result on respsChan when reqsChan is drained.
type worker struct {
// ctx is the parent context passed to newContextWithTimeout for each request.
ctx context.Context
// timeout is passed to newContextWithTimeout together with ctx.
timeout time.Duration
// mc is the client handed to sendReq.
mc *monitoring.MetricClient
// resp accumulates the dropped-TimeSeries count and errors of all requests
// this worker handled.
resp *response
// respsChan receives resp once reqsChan has been closed and drained.
respsChan chan *response
// reqsChan is the channel the worker ranges over for requests.
reqsChan chan *monitoringpb.CreateTimeSeriesRequest
// wg has Done called on it after the response is sent.
wg *sync.WaitGroup
}
func (
context.Context,
*monitoring.MetricClient,
chan *monitoringpb.CreateTimeSeriesRequest,
chan *response,
*sync.WaitGroup,
time.Duration) *worker {
return &worker{
ctx: ,
mc: ,
resp: &response{},
reqsChan: ,
respsChan: ,
wg: ,
}
}
// NOTE(review): the method name and local identifiers are missing from this
// listing. Judging by the "go .start()" call on each new worker in the
// batcher constructor, this is presumably start; confirm.
//
// Handles every request received from reqsChan via sendReqWithTimeout until
// the channel is closed, then sends the accumulated response on respsChan and
// calls wg.Done.
func ( *worker) () {
for := range .reqsChan {
.sendReqWithTimeout()
}
// The response is sent before wg.Done, so this send must not block forever:
// respsChan needs either free capacity or a receiver at this point.
.respsChan <- .resp
.wg.Done()
}
// NOTE(review): the method name and local identifiers are missing from this
// listing. Judging by the call in the worker's request loop, this is
// presumably sendReqWithTimeout; confirm.
//
// Sends one request: derives a context from the worker's ctx and timeout via
// newContextWithTimeout, calls sendReq with the worker's client, and records
// sendReq's two results on the worker's response.
func ( *worker) ( *monitoringpb.CreateTimeSeriesRequest) {
, := newContextWithTimeout(.ctx, .timeout)
// Presumably the cancel function returned alongside the derived context.
defer ()
.recordDroppedTimeseries(sendReq(, .mc, ))
}
// NOTE(review): the method name and local identifiers are missing from this
// listing. Judging by the recordDroppedTimeseries call in the worker's send
// path, this is presumably that method; confirm.
//
// Adds to the response's droppedTimeSeries counter (presumably the int
// argument) and, when the error argument is non-nil, appends it to the
// response's errs.
func ( *worker) ( int, error) {
.resp.droppedTimeSeries +=
if != nil {
.resp.errs = append(.resp.errs, )
}
}
// response is the result a worker accumulates over all the requests it
// handled and reports back on the response channel.
type response struct {
	// droppedTimeSeries is the total number of dropped TimeSeries recorded by
	// the worker.
	droppedTimeSeries int
	// errs holds the non-nil errors recorded by the worker.
	errs []error
}
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.