/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
// Package grpclb defines a grpclb balancer.
//
// To install grpclb balancer, import this package as:
//    import _ "google.golang.org/grpc/balancer/grpclb"
package grpclb

import (
	"context"
	"errors"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/resolver/dns"
	"google.golang.org/grpc/resolver"

	durationpb "github.com/golang/protobuf/ptypes/duration"
	lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
)

const (
	lbTokenKey             = "lb-token"
	defaultFallbackTimeout = 10 * time.Second
	grpclbName             = "grpclb"
)

var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
var logger = grpclog.Component("grpclb")

func convertDuration(d *durationpb.Duration) time.Duration {
	if d == nil {
		return 0
	}
	return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
}
// Client API for LoadBalancer service.
// Mostly copied from generated pb.go file.
// To avoid circular dependency.
type loadBalancerClient struct {
	cc *grpc.ClientConn
}

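// BalanceLoad opens the bidirectional stream on
// "/grpc.lb.v1.LoadBalancer/BalanceLoad" over the wrapped ClientConn.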
func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
	desc := &grpc.StreamDesc{
		StreamName:    "BalanceLoad",
		ServerStreams: true,
		ClientStreams: true,
	}
	stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
	if err != nil {
		return nil, err
	}
	x := &balanceLoadClientStream{stream}
	return x, nil
}

type balanceLoadClientStream struct {
	grpc.ClientStream
}

func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
	m := new(lbpb.LoadBalanceResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

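// init registers the grpclb balancer builder and enables SRV lookups in the
// DNS resolver, which is how remote balancer addresses are discovered.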
func init() {
	balancer.Register(newLBBuilder())
	dns.EnableSRVLookups = true
}
// newLBBuilder creates a builder for grpclb.
func newLBBuilder() balancer.Builder {
	return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
}

// newLBBuilderWithFallbackTimeout creates a grpclb builder with the given
// fallbackTimeout. If no response is received from the remote balancer within
// fallbackTimeout, the backend addresses from the resolved address list will
// be used.
//
// Only call this function when a non-default fallback timeout is needed.
func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
	return &lbBuilder{
		fallbackTimeout: fallbackTimeout,
	}
}

type lbBuilder struct {
	fallbackTimeout time.Duration
}

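// Name returns the name under which this builder is registered, "grpclb".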
func (b *lbBuilder) Name() string {
	return grpclbName
}

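// Build constructs the grpclb balancer. The returned lbBalancer talks to the
// remote balancer through a dedicated ClientConn driven by a manual resolver,
// and falls back to the resolver-provided backends if the remote balancer does
// not respond within fallbackTimeout.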
func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
	// This generates a manual resolver builder with a fixed scheme. This
	// scheme will be used to dial to remote LB, so we can send filtered
	// address updates to remote LB ClientConn using this manual resolver.
	r := &lbManualResolver{scheme: "grpclb-internal", ccb: cc}

	lb := &lbBalancer{
		cc:              newLBCacheClientConn(cc),
		target:          opt.Target.Endpoint,
		opt:             opt,
		fallbackTimeout: b.fallbackTimeout,
		doneCh:          make(chan struct{}),

		manualResolver: r,
		subConns:       make(map[resolver.Address]balancer.SubConn),
		scStates:       make(map[balancer.SubConn]connectivity.State),
		picker:         &errPicker{err: balancer.ErrNoSubConnAvailable},
		clientStats:    newRPCStats(),
		backoff:        backoff.DefaultExponential, // TODO: make backoff configurable.
	}

	var err error
	if opt.CredsBundle != nil {
		lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
		if err != nil {
			logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
		}
		lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
		if err != nil {
			logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
		}
	}

	return lb
}

type lbBalancer struct {
	cc     *lbCacheClientConn
	target string
	opt    balancer.BuildOptions

	usePickFirst bool
	// grpclbClientConnCreds is the creds bundle to be used to connect to grpclb
	// servers. If it's nil, use the TransportCredentials from BuildOptions
	// instead.
	grpclbClientConnCreds credentials.Bundle
	// grpclbBackendCreds is the creds bundle to be used for addresses that are
	// returned by grpclb server. If it's nil, don't set anything when creating
	// SubConns.
	grpclbBackendCreds credentials.Bundle

	fallbackTimeout time.Duration
	doneCh          chan struct{}

	// manualResolver is used in the remote LB ClientConn inside grpclb. When
	// resolved address updates are received by grpclb, filtered updates will be
	// sent to remote LB ClientConn through this resolver.
	manualResolver *lbManualResolver
	// The ClientConn to talk to the remote balancer.
	ccRemoteLB *remoteBalancerCCWrapper
	// backoff for calling remote balancer.
	backoff backoff.Strategy

	// Support client side load reporting. Each picker gets a reference to this,
	// and will update its content.
	clientStats *rpcStats

	mu sync.Mutex // guards everything following.
	// The full server list including drops, used to check if the newly received
	// serverList contains anything new. Each generated picker will also have a
	// reference to this list to do the first layer pick.
	fullServerList []*lbpb.Server
	// Backend addresses. It's kept so the addresses are available when
	// switching between round_robin and pickfirst.
	backendAddrs []resolver.Address
	// All backend addresses, with metadata set to nil. This list contains all
	// backend addresses in the same order and with the same duplicates as in
	// serverlist. When generating picker, a SubConn slice with the same order
	// but with only READY SCs will be generated.
	backendAddrsWithoutMetadata []resolver.Address
	// Roundrobin functionalities.
	state    connectivity.State
	subConns map[resolver.Address]balancer.SubConn   // Used to new/remove SubConn.
	scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
	picker   balancer.Picker
	// Support fallback to resolved backend addresses if there's no response
	// from remote balancer within fallbackTimeout.
	remoteBalancerConnected bool
	serverListReceived      bool
	inFallback              bool
	// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
	// when resolved address updates are received, and read in the goroutine
	// handling fallback.
	resolvedBackendAddrs []resolver.Address
}
// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker
//  - always returns ErrTransientFailure if the balancer is in TransientFailure,
//  - does two layer roundrobin pick otherwise.
//
// Caller must hold lb.mu.
func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
	if lb.state == connectivity.TransientFailure {
		lb.picker = &errPicker{err: balancer.ErrTransientFailure}
		return
	}

	if lb.state == connectivity.Connecting {
		lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
		return
	}

	var readySCs []balancer.SubConn
	if lb.usePickFirst {
		for _, sc := range lb.subConns {
			readySCs = append(readySCs, sc)
			break
		}
	} else {
		for _, a := range lb.backendAddrsWithoutMetadata {
			if sc, ok := lb.subConns[a]; ok {
				if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
					readySCs = append(readySCs, sc)
				}
			}
		}
	}

	if len(readySCs) <= 0 {
		// If there's no ready SubConns, always re-pick. This is to avoid drops
		// unless at least one SubConn is ready. Otherwise we may drop more
		// often than we want because of drops + re-picks (which become
		// re-drops).
		//
		// This doesn't seem to be necessary after the connecting check above.
		// Kept for safety.
		lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
		return
	}
	if lb.inFallback {
		lb.picker = newRRPicker(readySCs)
		return
	}
	if resetDrop {
		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
		return
	}
	prevLBPicker, ok := lb.picker.(*lbPicker)
	if !ok {
		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
		return
	}
	prevLBPicker.updateReadySCs(readySCs)
}
// aggregateSubConnStates calculates the aggregated state of the SubConns in
// lb.subConns. These SubConns are the subconns in use (when switching between
// fallback and grpclb). lb.scStates contains states for all SubConns,
// including those in cache (SubConns are cached for 10 seconds after being
// removed).
//
// The aggregated state is:
//  - If at least one SubConn is Ready, the aggregated state is Ready;
//  - Else if at least one SubConn is Connecting, the aggregated state is Connecting;
//  - Else the aggregated state is TransientFailure.
func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
	var numConnecting uint64

	for _, sc := range lb.subConns {
		if state, ok := lb.scStates[sc]; ok {
			switch state {
			case connectivity.Ready:
				return connectivity.Ready
			case connectivity.Connecting:
				numConnecting++
			}
		}
	}
	if numConnecting > 0 {
		return connectivity.Connecting
	}
	return connectivity.TransientFailure
}

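// UpdateSubConnState records the new state of sc and, when the change affects
// readiness, regenerates the picker and reports the aggregated state. It also
// enters fallback when no SubConn is ready and the connection to the remote
// balancer is lost.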
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
	s := scs.ConnectivityState
	if logger.V(2) {
		logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
	}
	lb.mu.Lock()
	defer lb.mu.Unlock()

	oldS, ok := lb.scStates[sc]
	if !ok {
		if logger.V(2) {
			logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
		}
		return
	}
	lb.scStates[sc] = s
	switch s {
	case connectivity.Idle:
		sc.Connect()
	case connectivity.Shutdown:
		// When an address was removed by resolver, b called RemoveSubConn but
		// kept the sc's state in scStates. Remove state for this sc here.
		delete(lb.scStates, sc)
	}
	// Force regenerate picker if
	//  - this sc became ready from not-ready
	//  - this sc became not-ready from ready
	lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)

	// Enter fallback when the aggregated state is not Ready and the connection
	// to remote balancer is lost.
	if lb.state != connectivity.Ready {
		if !lb.inFallback && !lb.remoteBalancerConnected {
			// Enter fallback.
			lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
		}
	}
}
// updateStateAndPicker re-calculates the aggregated state, and regenerates the
// picker if the overall state changed.
//
// If forceRegeneratePicker is true, the picker will be regenerated.
func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
	oldAggrState := lb.state
	lb.state = lb.aggregateSubConnStates()
	// Regenerate picker when one of the following happens:
	//  - caller wants to regenerate
	//  - the aggregated state changed
	if forceRegeneratePicker || (lb.state != oldAggrState) {
		lb.regeneratePicker(resetDrop)
	}

	lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
}
// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to using
// the resolved backends (backends received from the resolver, not from the
// remote balancer) if no connection to a remote balancer was successful.
func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
	timer := time.NewTimer(fallbackTimeout)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-lb.doneCh:
		return
	}
	lb.mu.Lock()
	if lb.inFallback || lb.serverListReceived {
		lb.mu.Unlock()
		return
	}
	// Enter fallback.
	lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
	lb.mu.Unlock()
}

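// handleServiceConfig switches between the round_robin and pick_first child
// policies when the received service config changes the configured child, and
// refreshes the SubConns accordingly.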
func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	newUsePickFirst := childIsPickFirst(gc)
	if lb.usePickFirst == newUsePickFirst {
		return
	}
	if logger.V(2) {
		logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
	}
	lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}

func (lb *lbBalancer) ResolverError(error) {
	// Ignore resolver errors. GRPCLB is not selected unless the resolver
	// works at least once.
}

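// UpdateClientConnState splits the resolved addresses into remote balancer
// addresses and fallback backend addresses, (re)connects to the remote
// balancer as needed, and enters fallback immediately when no remote balancer
// address is available.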
func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
	if logger.V(2) {
		logger.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
	}
	gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
	lb.handleServiceConfig(gc)

	addrs := ccs.ResolverState.Addresses

	var remoteBalancerAddrs, backendAddrs []resolver.Address
	for _, a := range addrs {
		if a.Type == resolver.GRPCLB {
			a.Type = resolver.Backend
			remoteBalancerAddrs = append(remoteBalancerAddrs, a)
		} else {
			backendAddrs = append(backendAddrs, a)
		}
	}
	if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
		// Override any balancer addresses provided via
		// ccs.ResolverState.Addresses.
		remoteBalancerAddrs = sd.BalancerAddresses
	}

	if len(backendAddrs)+len(remoteBalancerAddrs) == 0 {
		// There should be at least one address, either grpclb server or
		// fallback. Empty address is not valid.
		return balancer.ErrBadResolverState
	}

	if len(remoteBalancerAddrs) == 0 {
		if lb.ccRemoteLB != nil {
			lb.ccRemoteLB.close()
			lb.ccRemoteLB = nil
		}
	} else if lb.ccRemoteLB == nil {
		// First time receiving resolved addresses, create a cc to remote
		// balancers.
		lb.newRemoteBalancerCCWrapper()
		// Start the fallback goroutine.
		go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
	}

	if lb.ccRemoteLB != nil {
		// cc to remote balancers uses lb.manualResolver. Send the updated
		// remote balancer addresses to it through manualResolver.
		lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
	}

	lb.mu.Lock()
	lb.resolvedBackendAddrs = backendAddrs
	if len(remoteBalancerAddrs) == 0 || lb.inFallback {
		// If there's no remote balancer address in the ClientConn update,
		// grpclb enters fallback mode immediately.
		//
		// If a new update is received while grpclb is in fallback, update the
		// list of backends being used to the new fallback backends.
		lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
	}
	lb.mu.Unlock()
	return nil
}

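// Close shuts down the balancer: it signals all goroutines to exit via doneCh,
// closes the connection to the remote balancer, and closes the SubConn-caching
// ClientConn wrapper.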
func (lb *lbBalancer) Close() {
	select {
	case <-lb.doneCh:
		return
	default:
	}
	close(lb.doneCh)
	if lb.ccRemoteLB != nil {
		lb.ccRemoteLB.close()
	}
	lb.cc.close()
}