Source file: grpclb.go
Belonging package: google.golang.org/grpc/balancer/grpclb
package grpclb
import (
grpclbstate
durationpb
lbpb
)
// Package-level constants for the grpclb balancer.
const (
// lbTokenKey is the string key "lb-token".
// NOTE(review): presumably the metadata key that carries the balancer-issued
// token; its use is not visible in this part of the file — confirm.
lbTokenKey = "lb-token"
// defaultFallbackTimeout is the fallback timeout that newLBBuilder passes to
// newLBBuilderWithFallbackTimeout.
defaultFallbackTimeout = 10 * time.Second
// grpclbName is the name string returned by the builder.
grpclbName = "grpclb"
)
// errServerTerminatedConnection is a sentinel error whose text reports that the
// server terminated the connection while the server list was being received.
var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
// logger is the grpclog component logger used for this package's log output.
var logger = grpclog.Component("grpclb")
func ( *durationpb.Duration) time.Duration {
if == nil {
return 0
}
return time.Duration(.Seconds)*time.Second + time.Duration(.Nanos)*time.Nanosecond
}
// loadBalancerClient is a thin client for the LoadBalancer service that issues
// its calls on the wrapped ClientConn.
type loadBalancerClient struct {
// cc is the connection on which the BalanceLoad stream is opened.
cc *grpc.ClientConn
}
func ( *loadBalancerClient) ( context.Context, ...grpc.CallOption) (*balanceLoadClientStream, error) {
:= &grpc.StreamDesc{
StreamName: "BalanceLoad",
ServerStreams: true,
ClientStreams: true,
}
, := .cc.NewStream(, , "/grpc.lb.v1.LoadBalancer/BalanceLoad", ...)
if != nil {
return nil,
}
:= &balanceLoadClientStream{}
return , nil
}
// balanceLoadClientStream wraps a grpc.ClientStream; its methods forward to the
// embedded stream's SendMsg and RecvMsg.
type balanceLoadClientStream struct {
grpc.ClientStream
}
func ( *balanceLoadClientStream) ( *lbpb.LoadBalanceRequest) error {
return .ClientStream.SendMsg()
}
func ( *balanceLoadClientStream) () (*lbpb.LoadBalanceResponse, error) {
:= new(lbpb.LoadBalanceResponse)
if := .ClientStream.RecvMsg(); != nil {
return nil,
}
return , nil
}
func () {
balancer.Register(newLBBuilder())
dns.EnableSRVLookups = true
}
func () balancer.Builder {
return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
}
func ( time.Duration) balancer.Builder {
return &lbBuilder{
fallbackTimeout: ,
}
}
// lbBuilder builds grpclb balancers.
type lbBuilder struct {
// fallbackTimeout is copied into the fallbackTimeout field of each lbBalancer
// the builder constructs.
fallbackTimeout time.Duration
}
func ( *lbBuilder) () string {
return grpclbName
}
// NOTE(review): the enclosing function header is missing from this copy of the
// file, and the identifiers on the left of ":=" / "=" and inside several
// argument lists are blank. The statements below look like the body of the
// builder method that constructs an lbBalancer — restore from upstream before
// building.
//
// Creates a manual resolver with the fixed scheme "grpclb-internal".
:= &lbManualResolver{scheme: "grpclb-internal", ccb: }
// Assembles the balancer: a cache-wrapping ClientConn, the target endpoint and
// build options, the builder's fallback timeout, empty SubConn/state maps, an
// errPicker returning balancer.ErrNoSubConnAvailable, fresh RPC stats, and the
// default exponential backoff.
:= &lbBalancer{
cc: newLBCacheClientConn(),
target: .Target.Endpoint,
opt: ,
fallbackTimeout: .fallbackTimeout,
doneCh: make(chan struct{}),
manualResolver: ,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
clientStats: newRPCStats(),
backoff: backoff.DefaultExponential, // TODO: make backoff configurable.
}
var error
// When a credentials bundle is configured, derive one bundle in balancer mode
// and one in backend-from-balancer mode. A NewWithMode failure is only logged
// as a warning; construction continues.
if .CredsBundle != nil {
.grpclbClientConnCreds, = .CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
if != nil {
logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", )
}
.grpclbBackendCreds, = .CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
if != nil {
logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", )
}
}
return
}
// lbBalancer holds the state of one grpclb balancer instance.
//
// NOTE(review): this declaration is truncated in this copy of the file. Code
// elsewhere in the file reads fields that are not listed here (for example mu,
// state, picker, subConns, scStates, inFallback) and the closing brace is
// missing. Restore the full field list from upstream.
type lbBalancer struct {
// cc is the cache-wrapping ClientConn; Close calls its close method and state
// updates are published through its UpdateState.
cc *lbCacheClientConn
// target is initialised from the build options' Target.Endpoint.
target string
// opt holds the build options the balancer was created with.
opt balancer.BuildOptions
// usePickFirst selects pick-first mode: picker regeneration then takes a single
// SubConn, and the flag is passed on to refreshSubConns.
usePickFirst bool
// grpclbBackendCreds is the bundle derived with
// internal.CredsBundleModeBackendFromBalancer.
grpclbBackendCreds credentials.Bundle
// fallbackTimeout is copied from the builder.
fallbackTimeout time.Duration
// doneCh is closed when the balancer is closed.
doneCh chan struct{}
// Rebuilds the balancer's picker from the aggregate connectivity state and the
// SubConns it tracks. The call site in this file invokes it as
// regeneratePicker; the method name, receiver and parameter names are blank in
// this copy.
//
//   - state TransientFailure: errPicker returning balancer.ErrTransientFailure.
//   - state Connecting: errPicker returning balancer.ErrNoSubConnAvailable.
//   - otherwise: a SubConn list is collected and handed to newRRPicker (when in
//     fallback), to a new lbPicker, or to the existing lbPicker's updateReadySCs.
func ( *lbBalancer) ( bool) {
if .state == connectivity.TransientFailure {
.picker = &errPicker{err: balancer.ErrTransientFailure}
return
}
if .state == connectivity.Connecting {
.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
return
}
var []balancer.SubConn
if .usePickFirst {
// Pick-first mode: take a single SubConn from the map without checking its
// recorded state (map iteration order is unspecified).
for , := range .subConns {
= append(, )
break
}
} else {
// Otherwise walk the backend addresses and keep only SubConns whose recorded
// state is Ready.
for , := range .backendAddrsWithoutMetadata {
if , := .subConns[]; {
if , := .scStates[]; && == connectivity.Ready {
= append(, )
}
}
}
}
// NOTE(review): the guard that opens this branch is missing from this copy; the
// closing brace below has no matching "if". Presumably it tests for an empty
// SubConn list — confirm against upstream.
.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
return
}
// In fallback the SubConns are served by the picker built with newRRPicker.
if .inFallback {
.picker = newRRPicker()
return
}
// NOTE(review): the condition is blank here; presumably the bool parameter,
// which forces a brand-new lbPicker — confirm.
if {
.picker = newLBPicker(.fullServerList, , .clientStats)
return
}
// Reuse the current lbPicker when there is one, only refreshing its SubConn
// list; otherwise build a new one.
, := .picker.(*lbPicker)
if ! {
.picker = newLBPicker(.fullServerList, , .clientStats)
return
}
.updateReadySCs()
}
func ( *lbBalancer) () connectivity.State {
var uint64
for , := range .subConns {
if , := .scStates[]; {
switch {
case connectivity.Ready:
return connectivity.Ready
case connectivity.Connecting:
++
}
}
}
if > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}
// Handles a connectivity state change reported for one SubConn: records the new
// state under mu, then updates the aggregate state/picker and may refresh the
// SubConns. State changes for SubConns that have no entry in scStates are
// logged (at verbosity 2) and ignored.
//
// NOTE(review): the method name, receiver and parameter names are blank in this
// copy of the file.
func ( *lbBalancer) ( balancer.SubConn, balancer.SubConnState) {
:= .ConnectivityState
if logger.V(2) {
logger.Infof("lbBalancer: handle SubConn state change: %p, %v", , )
}
.mu.Lock()
defer .mu.Unlock()
, := .scStates[]
if ! {
if logger.V(2) {
logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", , )
}
return
}
.scStates[] =
switch {
case connectivity.Idle:
.Connect()
// NOTE(review): as written, everything below sits inside the Idle case. Lines
// appear to have been lost between Connect() and the picker update (further
// cases and the end of the switch) — verify against upstream.
//
// The picker is force-regenerated only when the SubConn's Ready-ness flipped.
.updateStateAndPicker(( == connectivity.Ready) != ( == connectivity.Ready), false)
// When the aggregate state is not Ready, refresh SubConns from the resolved
// backend addresses with the second argument set to true.
if .state != connectivity.Ready {
.refreshSubConns(.resolvedBackendAddrs, true, .usePickFirst)
}
}
}
// Regenerates the picker when the first condition operand is true or the
// aggregate state differs from its previous value, then publishes the current
// state and picker to the ClientConn via UpdateState. Callers in this file
// invoke it as updateStateAndPicker; the identifiers are blank in this copy.
//
// NOTE(review): no statement that recomputes .state is present between saving
// the old value and comparing against it, so a line appears to be missing
// here — verify against upstream.
func ( *lbBalancer) ( bool, bool) {
:= .state
if || (.state != ) {
.regeneratePicker()
}
.cc.UpdateState(balancer.State{ConnectivityState: .state, Picker: .picker})
}
// NOTE(review): orphaned tail of a function whose header and opening statements
// are missing from this copy of the file. What remains calls refreshSubConns
// with the resolved backend addresses and true as the second argument, then
// releases mu.
.refreshSubConns(.resolvedBackendAddrs, true, .usePickFirst)
.mu.Unlock()
}
func ( *lbBalancer) ( *grpclbServiceConfig) {
.mu.Lock()
defer .mu.Unlock()
:= childIsPickFirst()
if .usePickFirst == {
return
}
if logger.V(2) {
logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", )
}
.refreshSubConns(.backendAddrs, .inFallback, )
}
}
// Handles a ClientConn state update (named UpdateClientConnState in its log
// message): applies the grpclb service config, splits the resolver addresses
// into balancer addresses and backend addresses, updates the remote-balancer
// connection, and records the resolved backend addresses under mu.
//
// Returns balancer.ErrBadResolverState on one path and nil otherwise.
//
// NOTE(review): identifiers are blank and several lines are missing in this
// copy of the file (unmatched closing braces mark the gaps); the notes below
// flag each gap. Restore from upstream before building.
func ( *lbBalancer) ( balancer.ClientConnState) error {
if logger.V(2) {
logger.Infof("lbBalancer: UpdateClientConnState: %+v", )
}
// The balancer config is type-asserted to *grpclbServiceConfig and handed to
// handleServiceConfig.
, := .BalancerConfig.(*grpclbServiceConfig)
.handleServiceConfig()
:= .ResolverState.Addresses
var , []resolver.Address
// Addresses typed resolver.GRPCLB are re-typed to resolver.Backend and
// collected in one list; all other addresses go to the second list.
for , := range {
if .Type == resolver.GRPCLB {
.Type = resolver.Backend
= append(, )
} else {
= append(, )
}
}
// NOTE(review): the statement opening this branch is missing; the assignment
// below takes a BalancerAddresses value from a blank operand.
= .BalancerAddresses
}
// NOTE(review): the condition guarding this error return is missing.
return balancer.ErrBadResolverState
}
// With no addresses in the tested list, any existing remote-balancer
// connection is closed and cleared.
if len() == 0 {
if .ccRemoteLB != nil {
.ccRemoteLB.close()
.ccRemoteLB = nil
}
// NOTE(review): lines appear to be missing before the resolver update below
// (the brace that follows it closes a branch whose opening is not shown).
.manualResolver.UpdateState(resolver.State{Addresses: })
}
.mu.Lock()
.resolvedBackendAddrs =
// NOTE(review): the condition guarding this refresh is missing.
.refreshSubConns(.resolvedBackendAddrs, true, .usePickFirst)
}
.mu.Unlock()
return nil
}
func ( *lbBalancer) () {
select {
case <-.doneCh:
return
default:
}
close(.doneCh)
if .ccRemoteLB != nil {
.ccRemoteLB.close()
}
.cc.close()
The pages are generated with Golds v0.3.2-preview (GOOS=darwin GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the QR code on the left) to get the latest news of Golds.