Source File
clientconn.go
Belonging Package
google.golang.org/grpc
grpclbName = "grpclb"
)
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
// Default size values.
// NOTE(review): presumably byte counts; the code that applies these defaults
// is not visible in this excerpt — confirm at the use sites.
const (
// defaultClientMaxReceiveMessageSize evaluates to 4 * 1024 * 1024 (4 Mi).
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
// defaultWriteBufSize evaluates to 32 * 1024 (32 Ki).
defaultWriteBufSize = 32 * 1024
// defaultReadBufSize evaluates to 32 * 1024 (32 Ki).
defaultReadBufSize = 32 * 1024
)
func ( string, ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), , ...)
}
func ( context.Context, string, ...DialOption) ( *ClientConn, error) {
:= &ClientConn{
target: ,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
.retryThrottler.Store((*retryThrottler)(nil))
.ctx, .cancel = context.WithCancel(context.Background())
for , := range {
.apply(&.dopts)
}
chainUnaryClientInterceptors()
chainStreamClientInterceptors()
defer func() {
if != nil {
.Close()
}
}()
if channelz.IsOn() {
if .dopts.channelzParentID != 0 {
.channelzID = channelz.RegisterChannel(&channelzChannel{}, .dopts.channelzParentID, )
channelz.AddTraceEvent(logger, .channelzID, 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", .channelzID),
Severity: channelz.CtINFO,
},
})
} else {
.channelzID = channelz.RegisterChannel(&channelzChannel{}, 0, )
channelz.Info(logger, .channelzID, "Channel Created")
}
.csMgr.channelzID = .channelzID
}
if !.dopts.insecure {
if .dopts.copts.TransportCredentials == nil && .dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity
}
if .dopts.copts.TransportCredentials != nil && .dopts.copts.CredsBundle != nil {
return nil, errTransportCredsAndBundle
}
} else {
if .dopts.copts.TransportCredentials != nil || .dopts.copts.CredsBundle != nil {
return nil, errCredentialsConflict
}
for , := range .dopts.copts.PerRPCCredentials {
if .RequireTransportSecurity() {
return nil, errTransportCredentialsMissing
}
}
}
if .dopts.defaultServiceConfigRawJSON != nil {
:= parseServiceConfig(*.dopts.defaultServiceConfigRawJSON)
if .Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, .Err)
}
.dopts.defaultServiceConfig, _ = .Config.(*ServiceConfig)
}
.mkp = .dopts.copts.KeepaliveParams
if .dopts.copts.Dialer == nil {
.dopts.copts.Dialer = func( context.Context, string) (net.Conn, error) {
, := parseDialTarget()
return (&net.Dialer{}).DialContext(, , )
}
if .dopts.withProxy {
.dopts.copts.Dialer = newProxyDialer(.dopts.copts.Dialer)
}
}
if .dopts.copts.UserAgent != "" {
.dopts.copts.UserAgent += " " + grpcUA
} else {
.dopts.copts.UserAgent = grpcUA
}
if .dopts.timeout > 0 {
var context.CancelFunc
, = context.WithTimeout(, .dopts.timeout)
defer ()
}
defer func() {
select {
case <-.Done():
switch {
case .Err() == :
= nil
case == nil || !.dopts.returnLastError:
, = nil, .Err()
default:
, = nil, fmt.Errorf("%v: %v", .Err(), )
}
default:
}
}()
:= false
.parsedTarget = grpcutil.ParseTarget(.target)
:= strings.HasPrefix(.target, "unix:")
channelz.Infof(logger, .channelzID, "parsed scheme: %q", .parsedTarget.Scheme)
:= .getResolver(.parsedTarget.Scheme)
channelz.Infof(logger, .channelzID, "scheme %q not registered, fallback to default scheme", .parsedTarget.Scheme)
.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: ,
}
= .getResolver(.parsedTarget.Scheme)
if == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", .parsedTarget.Scheme)
}
}
:= .dopts.copts.TransportCredentials
if != nil && .Info().ServerName != "" {
.authority = .Info().ServerName
} else if .dopts.insecure && .dopts.authority != "" {
.authority = .dopts.authority
} else if {
.authority = "localhost"
.authority = .parsedTarget.Endpoint
}
select {
case , := <-.dopts.scChan:
if {
.sc = &
}
case <-.Done():
return nil, .Err()
}
}
if .dopts.scChan != nil {
go .scWatcher()
}
var credentials.TransportCredentials
if := .dopts.copts.TransportCredentials; != nil {
= .Clone()
}
.balancerBuildOpts = balancer.BuildOptions{
DialCreds: ,
CredsBundle: .dopts.copts.CredsBundle,
Dialer: .dopts.copts.Dialer,
ChannelzParentID: .channelzID,
Target: .parsedTarget,
}
, := newCCResolverWrapper(, )
if != nil {
return nil, fmt.Errorf("failed to build resolver: %v", )
}
.mu.Lock()
.resolverWrapper =
.mu.Unlock()
if .dopts.block {
for {
:= .GetState()
if == connectivity.Ready {
break
} else if .dopts.copts.FailOnNonTempDialError && == connectivity.TransientFailure {
if = .connectionError(); != nil {
, := .(interface {
() bool
})
if && !.() {
return nil,
}
}
}
if = .connectionError(); != nil && .dopts.returnLastError {
return nil,
}
return nil, .Err()
}
}
}
return , nil
}
func ( *ClientConn) {
if .dopts.unaryInt != nil {
= append([]UnaryClientInterceptor{.dopts.unaryInt}, ...)
}
var UnaryClientInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= func( context.Context, string, , interface{}, *ClientConn, UnaryInvoker, ...CallOption) error {
return [0](, , , , , getChainUnaryInvoker(, 0, ), ...)
}
}
.dopts.unaryInt =
}
func ( []UnaryClientInterceptor, int, UnaryInvoker) UnaryInvoker {
if == len()-1 {
return
}
return func( context.Context, string, , interface{}, *ClientConn, ...CallOption) error {
return [+1](, , , , , (, +1, ), ...)
}
}
func ( *ClientConn) {
if .dopts.streamInt != nil {
= append([]StreamClientInterceptor{.dopts.streamInt}, ...)
}
var StreamClientInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= func( context.Context, *StreamDesc, *ClientConn, string, Streamer, ...CallOption) (ClientStream, error) {
return [0](, , , , getChainStreamer(, 0, ), ...)
}
}
.dopts.streamInt =
}
func ( []StreamClientInterceptor, int, Streamer) Streamer {
if == len()-1 {
return
}
return func( context.Context, *StreamDesc, *ClientConn, string, ...CallOption) (ClientStream, error) {
return [+1](, , , , (, +1, ), ...)
}
}
// connectivityStateManager records a connectivity.State together with a
// channel that is closed to signal that the recorded state has changed.
type connectivityStateManager struct {
// mu is locked by this type's methods before they read or write state and
// notifyChan.
mu sync.Mutex
// state is the most recently recorded state. The update method returns
// without changing it once it equals connectivity.Shutdown, and also when
// the new value equals the current one.
state connectivity.State
// notifyChan is created lazily (only while nil) by the accessor that returns
// it, and is closed and reset to nil by the update method after a change.
notifyChan chan struct{}
// channelzID is the ID passed to channelz.Infof when a state change is
// logged; the dial path copies the owning ClientConn's channelzID into it.
channelzID int64
}
func ( *connectivityStateManager) ( connectivity.State) {
.mu.Lock()
defer .mu.Unlock()
if .state == connectivity.Shutdown {
return
}
if .state == {
return
}
.state =
channelz.Infof(logger, .channelzID, "Channel Connectivity change to %v", )
close(.notifyChan)
.notifyChan = nil
}
}
func ( *connectivityStateManager) () connectivity.State {
.mu.Lock()
defer .mu.Unlock()
return .state
}
func ( *connectivityStateManager) () <-chan struct{} {
.mu.Lock()
defer .mu.Unlock()
if .notifyChan == nil {
.notifyChan = make(chan struct{})
}
return .notifyChan
}
NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
var _ ClientConnInterface = (*ClientConn)(nil)
// ClientConn is the connection object returned by the dial functions in this
// file. The declaration just above it asserts that *ClientConn satisfies
// ClientConnInterface.
type ClientConn struct {
// ctx and cancel come from context.WithCancel(context.Background()) during
// dialing; cancel is deferred by the close path, and ctx is used as the
// parent context for per-connection work.
ctx context.Context
cancel context.CancelFunc
// target is the string handed to grpcutil.ParseTarget and reported as Target
// in the channelz metric.
target string
// parsedTarget is the result of parsing target; the dial path can replace it
// with a Target that uses resolver.GetDefaultScheme().
parsedTarget resolver.Target
// authority is assigned during dialing from one of: the transport
// credentials' ServerName, dopts.authority, "localhost", or
// parsedTarget.Endpoint. It is later used as the transport ServerName when
// that is empty.
authority string
// dopts starts as defaultDialOptions() and then has each supplied DialOption
// applied to it.
dopts dialOptions
// csMgr tracks the channel's connectivity state; the close path moves it to
// connectivity.Shutdown.
csMgr *connectivityStateManager
// balancerBuildOpts is filled in during dialing (credentials, dialer,
// channelz parent ID, target) and passed to newCCBalancerWrapper.
balancerBuildOpts balancer.BuildOptions
// blockingpicker is created with newPickerWrapper(), used to pick a
// transport for an RPC, and closed by the close path.
blockingpicker *pickerWrapper
// mu is held around accesses to resolverWrapper, sc, mkp and balancerWrapper
// in the code visible in this file.
// NOTE(review): the full set of fields it guards is not stated here — confirm.
mu sync.RWMutex
// resolverWrapper is set from newCCResolverWrapper during dialing and set to
// nil by the close path.
resolverWrapper *ccResolverWrapper
// sc is the current service config; nil is handled by the method-config and
// health-check-config accessors.
sc *ServiceConfig
// mkp is initialised from dopts.copts.KeepaliveParams; its Time is raised
// when a GoAway with reason transport.GoAwayTooManyPings is handled, and it
// is copied into the connect options before each connection attempt.
mkp keepalive.ClientParameters
// curBalancerName is the Name() of the balancer builder currently in use.
curBalancerName string
// balancerWrapper is the value returned by newCCBalancerWrapper; the close
// path sets it to nil and closes the previous value.
balancerWrapper *ccBalancerWrapper
// retryThrottler always stores a *retryThrottler, which may be a typed nil.
retryThrottler atomic.Value
// firstResolveEvent is fired (deferred) when resolver state is handled, and
// is waited on together with the caller's context and ctx.
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number
// czData holds the call counters and last-call timestamp that are updated
// atomically and reported through the channelz metric.
czData *channelzData
lceMu sync.Mutex // protects lastConnectionError
// lastConnectionError is the most recent error recorded by the
// connection-error setter and returned by its getter.
lastConnectionError error
}
func ( *ClientConn) () connectivity.State {
return .csMgr.getState()
}
func ( *ClientConn) () {
for {
select {
case , := <-.dopts.scChan:
if ! {
return
}
if .firstResolveEvent.HasFired() {
return nil
}
select {
case <-.firstResolveEvent.Done():
return nil
case <-.Done():
return status.FromContextError(.Err()).Err()
case <-.ctx.Done():
return ErrClientConnClosing
}
}
var emptyServiceConfig *ServiceConfig
func () {
:= parseServiceConfig("{}")
if .Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", .Err))
}
emptyServiceConfig = .Config.(*ServiceConfig)
}
func ( *ClientConn) ( []resolver.Address) {
if .sc != nil {
.applyServiceConfigAndBalancer(.sc, )
return
}
if .dopts.defaultServiceConfig != nil {
.applyServiceConfigAndBalancer(.dopts.defaultServiceConfig, )
} else {
.applyServiceConfigAndBalancer(emptyServiceConfig, )
}
}
func ( *ClientConn) ( resolver.State, error) error {
defer .firstResolveEvent.Fire()
.maybeApplyDefaultServiceConfig(nil)
if .balancerWrapper != nil {
.balancerWrapper.resolverError()
}
.mu.Unlock()
return balancer.ErrBadResolverState
}
var error
if .dopts.disableServiceConfig || .ServiceConfig == nil {
} else {
if , := .ServiceConfig.Config.(*ServiceConfig); .ServiceConfig.Err == nil && {
.applyServiceConfigAndBalancer(, .Addresses)
} else {
= balancer.ErrBadResolverState
if .balancerWrapper == nil {
var error
if .ServiceConfig.Err != nil {
= status.Errorf(codes.Unavailable, "error parsing service config: %v", .ServiceConfig.Err)
} else {
= status.Errorf(codes.Unavailable, "illegal service config type: %T", .ServiceConfig.Config)
}
.blockingpicker.updatePicker(base.NewErrPicker())
.csMgr.updateState(connectivity.TransientFailure)
.mu.Unlock()
return
}
}
}
var serviceconfig.LoadBalancingConfig
if .dopts.balancerBuilder == nil && .sc != nil && .sc.lbConfig != nil {
= .sc.lbConfig.cfg
}
:= .curBalancerName
:= .balancerWrapper
.mu.Unlock()
for := 0; < len(.Addresses); {
if .Addresses[].Type == resolver.GRPCLB {
copy(.Addresses[:], .Addresses[+1:])
.Addresses = .Addresses[:len(.Addresses)-1]
continue
}
++
}
}
:= .updateClientConnState(&balancer.ClientConnState{ResolverState: , BalancerConfig: })
if == nil {
}
return
}
func ( *ClientConn) ( string) {
if strings.EqualFold(.curBalancerName, ) {
return
}
channelz.Infof(logger, .channelzID, "ClientConn switching balancer to %q", )
if .dopts.balancerBuilder != nil {
channelz.Info(logger, .channelzID, "ignoring balancer switching: Balancer DialOption used instead")
return
}
if .balancerWrapper != nil {
.balancerWrapper.close()
}
:= balancer.Get()
if == nil {
channelz.Warningf(logger, .channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
channelz.Infof(logger, .channelzID, "failed to get balancer builder for: %v, using pick_first instead", )
= newPickfirstBuilder()
} else {
channelz.Infof(logger, .channelzID, "Channel switches to new LB policy %q", )
}
.curBalancerName = .Name()
.balancerWrapper = newCCBalancerWrapper(, , .balancerBuildOpts)
}
func ( *ClientConn) ( balancer.SubConn, connectivity.State, error) {
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return
.balancerWrapper.handleSubConnStateChange(, , )
.mu.Unlock()
}
func ( *ClientConn) ( []resolver.Address, balancer.NewSubConnOptions) (*addrConn, error) {
:= &addrConn{
state: connectivity.Idle,
cc: ,
addrs: ,
scopts: ,
dopts: .dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
}
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return nil, ErrClientConnClosing
}
if channelz.IsOn() {
.channelzID = channelz.RegisterSubChannel(, .channelzID, "")
channelz.AddTraceEvent(logger, .channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel(id:%d) created", .channelzID),
Severity: channelz.CtINFO,
},
})
}
.conns[] = struct{}{}
.mu.Unlock()
return , nil
}
func ( *ClientConn) ( *addrConn, error) {
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return
}
delete(.conns, )
.mu.Unlock()
.tearDown()
}
func ( *ClientConn) () *channelz.ChannelInternalMetric {
return &channelz.ChannelInternalMetric{
State: .GetState(),
Target: .target,
CallsStarted: atomic.LoadInt64(&.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&.czData.lastCallStartedTime)),
}
}
func ( *ClientConn) () string {
return .target
}
func ( *ClientConn) () {
atomic.AddInt64(&.czData.callsStarted, 1)
atomic.StoreInt64(&.czData.lastCallStartedTime, time.Now().UnixNano())
}
func ( *ClientConn) () {
atomic.AddInt64(&.czData.callsSucceeded, 1)
}
func ( *ClientConn) () {
atomic.AddInt64(&.czData.callsFailed, 1)
}
go .resetTransport()
return nil
}
func ( *addrConn) ( []resolver.Address) bool {
.mu.Lock()
defer .mu.Unlock()
channelz.Infof(logger, .channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", .curAddr, )
if .state == connectivity.Shutdown ||
.state == connectivity.TransientFailure ||
.state == connectivity.Idle {
.addrs =
return true
}
if .state == connectivity.Connecting {
return false
}
.mu.RLock()
defer .mu.RUnlock()
if .sc == nil {
return MethodConfig{}
}
if , := .sc.Methods[]; {
return
}
:= strings.LastIndex(, "/")
if , := .sc.Methods[[:+1]]; {
return
}
return .sc.Methods[""]
}
func ( *ClientConn) () *healthCheckConfig {
.mu.RLock()
defer .mu.RUnlock()
if .sc == nil {
return nil
}
return .sc.healthCheckConfig
}
func ( *ClientConn) ( context.Context, bool, string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
, , := .blockingpicker.pick(, , balancer.PickInfo{
Ctx: ,
FullMethodName: ,
})
if != nil {
return nil, nil, toRPCErr()
}
return , , nil
}
func ( *ClientConn) ( *ServiceConfig, []resolver.Address) {
return
}
.sc =
if .sc.retryThrottling != nil {
:= &retryThrottler{
tokens: .sc.retryThrottling.MaxTokens,
max: .sc.retryThrottling.MaxTokens,
thresh: .sc.retryThrottling.MaxTokens / 2,
ratio: .sc.retryThrottling.TokenRatio,
}
.retryThrottler.Store()
} else {
.retryThrottler.Store((*retryThrottler)(nil))
}
.curBalancerName = .dopts.balancerBuilder.Name()
.balancerWrapper = newCCBalancerWrapper(, .dopts.balancerBuilder, .balancerBuildOpts)
}
}
func ( *ClientConn) ( resolver.ResolveNowOptions) {
.mu.RLock()
:= .resolverWrapper
.mu.RUnlock()
if == nil {
return
}
go .resolveNow()
}
func ( *ClientConn) () {
.mu.Lock()
:= .conns
.mu.Unlock()
for := range {
.resetConnectBackoff()
}
}
func ( *ClientConn) () error {
defer .cancel()
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return ErrClientConnClosing
}
:= .conns
.conns = nil
.csMgr.updateState(connectivity.Shutdown)
:= .resolverWrapper
.resolverWrapper = nil
:= .balancerWrapper
.balancerWrapper = nil
.mu.Unlock()
.blockingpicker.close()
if != nil {
.close()
}
if != nil {
.close()
}
for := range {
.tearDown(ErrClientConnClosing)
}
if channelz.IsOn() {
:= &channelz.TraceEventDesc{
Desc: "Channel Deleted",
Severity: channelz.CtINFO,
}
if .dopts.channelzParentID != 0 {
.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested channel(id:%d) deleted", .channelzID),
Severity: channelz.CtINFO,
}
}
channelz.RemoveEntry(.channelzID)
}
return nil
}
state connectivity.State
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
channelzID int64 // channelz unique identification number.
czData *channelzData
}
func ( *addrConn) ( connectivity.State, error) {
if .state == {
return
}
.state =
channelz.Infof(logger, .channelzID, "Subchannel Connectivity change to %v", )
.cc.handleSubConnStateChange(.acbw, , )
}
func ( *addrConn) ( transport.GoAwayReason) {
switch {
case transport.GoAwayTooManyPings:
:= 2 * .dopts.copts.KeepaliveParams.Time
.cc.mu.Lock()
if > .cc.mkp.Time {
.cc.mkp.Time =
}
.cc.mu.Unlock()
}
}
func ( *addrConn) () {
for := 0; ; ++ {
if > 0 {
.cc.resolveNow(resolver.ResolveNowOptions{})
}
.mu.Lock()
if .state == connectivity.Shutdown {
.mu.Unlock()
return
}
:= .addrs
:= minConnectTimeout
if .dopts.minConnectTimeout != nil {
= .dopts.minConnectTimeout()
}
=
:= time.Now().Add()
.updateConnectivityState(connectivity.Connecting, nil)
.transport = nil
.mu.Unlock()
, , , := .tryAllAddrs(, )
.mu.Lock()
if .state == connectivity.Shutdown {
.mu.Unlock()
return
}
.updateConnectivityState(connectivity.TransientFailure, )
:= .resetBackoff
.mu.Unlock()
:= time.NewTimer()
select {
case <-.C:
.mu.Lock()
.backoffIdx++
.mu.Unlock()
case <-:
.Stop()
case <-.ctx.Done():
.Stop()
return
}
continue
}
.mu.Lock()
if .state == connectivity.Shutdown {
.mu.Unlock()
.Close()
return
}
.curAddr =
.transport =
.backoffIdx = 0
, := context.WithCancel(.ctx)
.startHealthCheck()
.mu.Unlock()
<-.Done()
}
}
func ( *addrConn) ( []resolver.Address, time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
var error
for , := range {
.mu.Lock()
if .state == connectivity.Shutdown {
.mu.Unlock()
return nil, resolver.Address{}, nil, errConnClosing
}
.cc.mu.RLock()
.dopts.copts.KeepaliveParams = .cc.mkp
.cc.mu.RUnlock()
:= .dopts.copts
if .scopts.CredsBundle != nil {
.CredsBundle = .scopts.CredsBundle
}
.mu.Unlock()
channelz.Infof(logger, .channelzID, "Subchannel picks a new address %q to connect", .Addr)
, , := .createTransport(, , )
if == nil {
return , , , nil
}
if == nil {
=
}
.cc.updateConnectionError()
}
if .ServerName == "" {
.ServerName = .cc.authority
}
:= sync.Once{}
:= func( transport.GoAwayReason) {
.mu.Lock()
.adjustParams()
.Do(func() {
.updateConnectivityState(connectivity.Connecting, nil)
}
})
.mu.Unlock()
.Fire()
}
:= func() {
.mu.Lock()
.Do(func() {
.updateConnectivityState(connectivity.Connecting, nil)
}
})
.mu.Unlock()
close()
.Fire()
}
:= func() {
close()
}
, := context.WithDeadline(.ctx, )
defer ()
if channelz.IsOn() {
.ChannelzParentID = .channelzID
}
, := transport.NewClientTransport(, .cc.ctx, , , , , )
}
return , , nil
}
func ( *addrConn) ( context.Context) {
var bool
defer func() {
if ! {
.updateConnectivityState(connectivity.Ready, nil)
}
}()
if .cc.dopts.disableHealthCheck {
return
}
:= .cc.healthCheckConfig()
if == nil {
return
}
if !.scopts.HealthCheckEnabled {
return
}
:= .cc.dopts.healthCheckFunc
channelz.Error(logger, .channelzID, "Health check is requested but health check function is not set.")
return
}
= true
:= .transport
:= func( string) (interface{}, error) {
.mu.Lock()
if .transport != {
.mu.Unlock()
return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
}
.mu.Unlock()
return newNonRetryClientStream(, &StreamDesc{ServerStreams: true}, , , )
}
:= func( connectivity.State, error) {
.mu.Lock()
defer .mu.Unlock()
if .transport != {
return
}
.updateConnectivityState(, )
go func() {
:= .cc.dopts.healthCheckFunc(, , , .ServiceName)
if != nil {
if status.Code() == codes.Unimplemented {
channelz.Error(logger, .channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
channelz.Errorf(logger, .channelzID, "HealthCheckFunc exits with unexpected error %v", )
}
}
}()
}
func ( *addrConn) () {
.mu.Lock()
close(.resetBackoff)
.backoffIdx = 0
.resetBackoff = make(chan struct{})
.mu.Unlock()
}
.mu.Unlock()
.GracefulClose()
.mu.Lock()
}
if channelz.IsOn() {
channelz.AddTraceEvent(logger, .channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel Deleted",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchanel(id:%d) deleted", .channelzID),
Severity: channelz.CtINFO,
},
channelz.RemoveEntry(.channelzID)
}
.mu.Unlock()
}
func ( *addrConn) () connectivity.State {
.mu.Lock()
defer .mu.Unlock()
return .state
}
func ( *addrConn) () *channelz.ChannelInternalMetric {
.mu.Lock()
:= .curAddr.Addr
.mu.Unlock()
return &channelz.ChannelInternalMetric{
State: .getState(),
Target: ,
CallsStarted: atomic.LoadInt64(&.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&.czData.lastCallStartedTime)),
}
}
func ( *addrConn) () {
atomic.AddInt64(&.czData.callsStarted, 1)
atomic.StoreInt64(&.czData.lastCallStartedTime, time.Now().UnixNano())
}
func ( *addrConn) () {
atomic.AddInt64(&.czData.callsSucceeded, 1)
}
func ( *addrConn) () {
atomic.AddInt64(&.czData.callsFailed, 1)
}
// retryThrottler is a token counter built from a service config's
// retryThrottling settings. One method subtracts a token and reports whether
// the count is at or below thresh; the other adds ratio tokens. Both methods
// treat a nil receiver as a no-op.
type retryThrottler struct {
// max is the upper bound for tokens; initialised from MaxTokens.
max float64
// thresh is the level at or below which the decrementing method returns
// true; initialised to MaxTokens / 2.
thresh float64
// ratio is the amount added to tokens by the incrementing method;
// initialised from TokenRatio.
ratio float64
// mu is held by both methods while they update tokens.
mu sync.Mutex
// tokens is the current count, kept within [0, max]; it starts at MaxTokens.
tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}
func ( *retryThrottler) () bool {
if == nil {
return false
}
.mu.Lock()
defer .mu.Unlock()
.tokens--
if .tokens < 0 {
.tokens = 0
}
return .tokens <= .thresh
}
func ( *retryThrottler) () {
if == nil {
return
}
.mu.Lock()
defer .mu.Unlock()
.tokens += .ratio
if .tokens > .max {
.tokens = .max
}
}
// channelzChannel wraps a *ClientConn for registration with
// channelz.RegisterChannel; its only method visible here returns
// cc.channelzMetric().
type channelzChannel struct {
// cc is the ClientConn whose metrics are reported.
cc *ClientConn
}
func ( *channelzChannel) () *channelz.ChannelInternalMetric {
return .cc.channelzMetric()
}
// ErrClientConnTimeout is a sentinel error whose message says that dialing
// timed out.
// NOTE(review): no code in this excerpt returns or compares against this
// value — confirm whether it is still in use before relying on it.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
func ( *ClientConn) ( string) resolver.Builder {
for , := range .dopts.resolvers {
if == .Scheme() {
return
}
}
return resolver.Get()
}
func ( *ClientConn) ( error) {
.lceMu.Lock()
.lastConnectionError =
.lceMu.Unlock()
}
func ( *ClientConn) () error {
.lceMu.Lock()
defer .lceMu.Unlock()
return .lastConnectionError
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |