/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpclb

import (
	"context"
	"fmt"
	"io"
	"net"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	timestamppb "github.com/golang/protobuf/ptypes/timestamp"
	"github.com/google/go-cmp/cmp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/resolver"
)
// processServerList updates the balancer's internal state, creates/removes
// SubConns, and regenerates the picker using the received serverList.
func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
	if logger.V(2) {
		logger.Infof("lbBalancer: processing server list: %+v", l)
	}
	lb.mu.Lock()
	defer lb.mu.Unlock()

	// Set serverListReceived to true so fallback will not take effect if it
	// has not hit timeout.
	lb.serverListReceived = true

	// If the new server list == old server list, do nothing.
	if cmp.Equal(lb.fullServerList, l.Servers, cmp.Comparer(proto.Equal)) {
		if logger.V(2) {
			logger.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
		}
		return
	}
	lb.fullServerList = l.Servers

	var backendAddrs []resolver.Address
	for i, s := range l.Servers {
		if s.Drop {
			continue
		}

		md := metadata.Pairs(lbTokenKey, s.LoadBalanceToken)
		ip := net.IP(s.IpAddress)
		ipStr := ip.String()
		if ip.To4() == nil {
			// Add square brackets to ipv6 addresses, otherwise net.Dial()
			// and net.SplitHostPort() will return too many colons error.
			ipStr = fmt.Sprintf("[%s]", ipStr)
		}
		addr := resolver.Address{
			Addr:     fmt.Sprintf("%s:%d", ipStr, s.Port),
			Metadata: &md,
		}
		if logger.V(2) {
			logger.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
				i, ipStr, s.Port, s.LoadBalanceToken)
		}
		backendAddrs = append(backendAddrs, addr)
	}

	// Call refreshSubConns to create/remove SubConns. If we are in fallback,
	// this is also exiting fallback.
	lb.refreshSubConns(backendAddrs, false, lb.usePickFirst)
}
// refreshSubConns creates/removes SubConns with backendAddrs, and refreshes
// balancer state and picker.
//
// Caller must hold lb.mu.
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback bool, pickFirst bool) {
	opts := balancer.NewSubConnOptions{}
	if !fallback {
		opts.CredsBundle = lb.grpclbBackendCreds
	}

	lb.backendAddrs = backendAddrs
	lb.backendAddrsWithoutMetadata = nil

	fallbackModeChanged := lb.inFallback != fallback
	lb.inFallback = fallback
	if fallbackModeChanged && lb.inFallback {
		// Clear previous received list when entering fallback, so if the
		// server comes back and sends the same list again, the new addresses
		// will be used.
		lb.fullServerList = nil
	}

	balancingPolicyChanged := lb.usePickFirst != pickFirst
	oldUsePickFirst := lb.usePickFirst
	lb.usePickFirst = pickFirst

	if fallbackModeChanged || balancingPolicyChanged {
		// Remove all SubConns when switching balancing policy or switching
		// fallback mode.
		//
		// For fallback mode switching with pickfirst, we want to recreate the
		// SubConn because the creds could be different.
		for a, sc := range lb.subConns {
			if oldUsePickFirst {
				// If old SubConns were created for pickfirst, bypass the
				// cache and remove directly.
				lb.cc.cc.RemoveSubConn(sc)
			} else {
				lb.cc.RemoveSubConn(sc)
			}
			delete(lb.subConns, a)
		}
	}

	if lb.usePickFirst {
		var sc balancer.SubConn
		for _, sc = range lb.subConns {
			break
		}
		if sc != nil {
			sc.UpdateAddresses(backendAddrs)
			sc.Connect()
			return
		}
		// This bypasses the cc wrapper with SubConn cache.
		sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts)
		if err != nil {
			logger.Warningf("grpclb: failed to create new SubConn: %v", err)
			return
		}
		sc.Connect()
		lb.subConns[backendAddrs[0]] = sc
		lb.scStates[sc] = connectivity.Idle
		return
	}

	// addrsSet is the set converted from backendAddrsWithoutMetadata, it's
	// used for quick lookup of an address.
	addrsSet := make(map[resolver.Address]struct{})
	// Create new SubConns.
	for _, addr := range backendAddrs {
		addrWithoutMD := addr
		addrWithoutMD.Metadata = nil
		addrsSet[addrWithoutMD] = struct{}{}
		lb.backendAddrsWithoutMetadata = append(lb.backendAddrsWithoutMetadata, addrWithoutMD)

		if _, ok := lb.subConns[addrWithoutMD]; !ok {
			// Use addrWithMD to create the SubConn.
			sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
			if err != nil {
				logger.Warningf("grpclb: failed to create new SubConn: %v", err)
				continue
			}
			lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
			if _, ok := lb.scStates[sc]; !ok {
				// Only set state of new sc to IDLE. The state could already
				// be READY for cached SubConns.
				lb.scStates[sc] = connectivity.Idle
			}
			sc.Connect()
		}
	}

	for a, sc := range lb.subConns {
		// a was removed by resolver.
		if _, ok := addrsSet[a]; !ok {
			lb.cc.RemoveSubConn(sc)
			delete(lb.subConns, a)
			// Keep the state of this sc in lb.scStates until sc's state
			// becomes Shutdown. The entry will be deleted in
			// UpdateSubConnState.
		}
	}

	// Regenerate and update picker after refreshing subconns because with
	// cache, even if SubConn was newed/removed, there might be no state
	// changes (the subconn will be kept in cache, not actually
	// newed/removed).
	lb.updateStateAndPicker(true, true)
}
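
// remoteBalancerCCWrapper wraps the ClientConn to the remote balancer and the
// goroutines that keep it up, read server lists, and send load reports.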
type remoteBalancerCCWrapper struct {
	cc      *grpc.ClientConn
	lb      *lbBalancer
	backoff backoff.Strategy
	done    chan struct{}

	// waitgroup to wait for all goroutines to exit.
	wg sync.WaitGroup
}

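// newRemoteBalancerCCWrapper dials the remote balancer through the manual
// resolver using the configured credentials, and starts the goroutine that
// watches the remote balancer.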
func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
	var dopts []grpc.DialOption
	if creds := lb.opt.DialCreds; creds != nil {
		dopts = append(dopts, grpc.WithTransportCredentials(creds))
	} else if bundle := lb.grpclbClientConnCreds; bundle != nil {
		dopts = append(dopts, grpc.WithCredentialsBundle(bundle))
	} else {
		dopts = append(dopts, grpc.WithInsecure())
	}
	if lb.opt.Dialer != nil {
		dopts = append(dopts, grpc.WithContextDialer(lb.opt.Dialer))
	}
	// Explicitly set pickfirst as the balancer.
	dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`))
	dopts = append(dopts, grpc.WithResolvers(lb.manualResolver))
	if channelz.IsOn() {
		dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
	}

	// Enable Keepalive for grpclb client.
	dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
		Time:                20 * time.Second,
		Timeout:             20 * time.Second,
		PermitWithoutStream: true,
	}))

	// The dial target is not important.
	//
	// The grpclb server addresses will set field ServerName, and creds will
	// receive ServerName as authority.
	cc, err := grpc.DialContext(context.Background(), lb.manualResolver.Scheme()+":///grpclb.subClientConn", dopts...)
	if err != nil {
		logger.Fatalf("failed to dial: %v", err)
	}
	ccw := &remoteBalancerCCWrapper{
		cc:      cc,
		lb:      lb,
		backoff: lb.backoff,
		done:    make(chan struct{}),
	}
	lb.ccRemoteLB = ccw
	ccw.wg.Add(1)
	go ccw.watchRemoteBalancer()
}
// close closes the ClientConn to the remote balancer, and waits for all
// goroutines to finish.
func (ccw *remoteBalancerCCWrapper) close() {
	close(ccw.done)
	ccw.cc.Close()
	ccw.wg.Wait()
}

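// readServerList receives server lists from the stream until it fails. Each
// received server list is applied via processServerList; a fallback response
// from the balancer triggers an eager switch to the fallback backends.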
func (ccw *remoteBalancerCCWrapper) readServerList(s *balanceLoadClientStream) error {
	for {
		reply, err := s.Recv()
		if err != nil {
			if err == io.EOF {
				return errServerTerminatedConnection
			}
			return fmt.Errorf("grpclb: failed to recv server list: %v", err)
		}
		if serverList := reply.GetServerList(); serverList != nil {
			ccw.lb.processServerList(serverList)
		}
		if reply.GetFallbackResponse() != nil {
			// Eagerly enter fallback
			ccw.lb.mu.Lock()
			ccw.lb.refreshSubConns(ccw.lb.resolvedBackendAddrs, true, ccw.lb.usePickFirst)
			ccw.lb.mu.Unlock()
		}
	}
}

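// sendLoadReport sends a client stats message on the stream every interval
// until the stream's context is done. Consecutive all-zero reports are
// suppressed.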
func (ccw *remoteBalancerCCWrapper) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	lastZero := false
	for {
		select {
		case <-ticker.C:
		case <-s.Context().Done():
			return
		}
		stats := ccw.lb.clientStats.toClientStats()
		zero := isZeroStats(stats)
		if zero && lastZero {
			// Quash redundant empty load reports.
			continue
		}
		lastZero = zero
		t := time.Now()
		stats.Timestamp = &timestamppb.Timestamp{
			Seconds: t.Unix(),
			Nanos:   int32(t.Nanosecond()),
		}
		if err := s.Send(&lbpb.LoadBalanceRequest{
			LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
				ClientStats: stats,
			},
		}); err != nil {
			return
		}
	}
}

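// callRemoteBalancer opens a BalanceLoad stream to the remote balancer,
// performs the initial request/response handshake, starts load reporting if
// an interval was given, and then blocks reading server lists. The returned
// bool reports whether the caller should back off before retrying.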
func (ccw *remoteBalancerCCWrapper) callRemoteBalancer() (backoff bool, _ error) {
	lbClient := &loadBalancerClient{cc: ccw.cc}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true))
	if err != nil {
		return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
	}
	ccw.lb.mu.Lock()
	ccw.lb.remoteBalancerConnected = true
	ccw.lb.mu.Unlock()

	// grpclb handshake on the stream.
	initReq := &lbpb.LoadBalanceRequest{
		LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
			InitialRequest: &lbpb.InitialLoadBalanceRequest{
				Name: ccw.lb.target,
			},
		},
	}
	if err := stream.Send(initReq); err != nil {
		return true, fmt.Errorf("grpclb: failed to send init request: %v", err)
	}
	reply, err := stream.Recv()
	if err != nil {
		return true, fmt.Errorf("grpclb: failed to recv init response: %v", err)
	}
	initResp := reply.GetInitialResponse()
	if initResp == nil {
		return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
	}

	ccw.wg.Add(1)
	go func() {
		defer ccw.wg.Done()
		if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
			ccw.sendLoadReport(stream, d)
		}
	}()
	// No backoff if init req/resp handshake was successful.
	return false, ccw.readServerList(stream)
}

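// watchRemoteBalancer calls the remote balancer in a loop. When the stream
// fails it triggers a re-resolve, enters fallback if necessary, and backs off
// before retrying when the previous attempt asked for it.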
func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() {
	defer ccw.wg.Done()
	var retryCount int
	for {
		doBackoff, err := ccw.callRemoteBalancer()
		select {
		case <-ccw.done:
			return
		default:
			if err != nil {
				if err == errServerTerminatedConnection {
					logger.Info(err)
				} else {
					logger.Warning(err)
				}
			}
		}
		// Trigger a re-resolve when the stream errors.
		ccw.lb.cc.cc.ResolveNow(resolver.ResolveNowOptions{})

		ccw.lb.mu.Lock()
		ccw.lb.remoteBalancerConnected = false
		ccw.lb.fullServerList = nil
		// Enter fallback when connection to remote balancer is lost, and the
		// aggregated state is not Ready.
		if !ccw.lb.inFallback && ccw.lb.state != connectivity.Ready {
			// Entering fallback.
			ccw.lb.refreshSubConns(ccw.lb.resolvedBackendAddrs, true, ccw.lb.usePickFirst)
		}
		ccw.lb.mu.Unlock()

		if !doBackoff {
			retryCount = 0
			continue
		}

		timer := time.NewTimer(ccw.backoff.Backoff(retryCount)) // Copy backoff
		select {
		case <-timer.C:
		case <-ccw.done:
			timer.Stop()
			return
		}
		retryCount++
	}
}