Source File
grpclb_remote_balancer.go
Belonging Package
google.golang.org/grpc/balancer/grpclb
package grpclb
import (
timestamppb
lbpb
)
if cmp.Equal(.fullServerList, .Servers, cmp.Comparer(proto.Equal)) {
if logger.V(2) {
logger.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
}
return
}
.fullServerList = .Servers
var []resolver.Address
for , := range .Servers {
if .Drop {
continue
}
:= metadata.Pairs(lbTokenKey, .LoadBalanceToken)
:= net.IP(.IpAddress)
:= .String()
.refreshSubConns(, false, .usePickFirst)
}
// refreshSubConns (name taken from its call sites in this file) receives a
// list of backend addresses and two bool flags, reconciles the balancer's
// SubConns against that list, and finishes by refreshing the aggregated state
// and picker.
//
// NOTE(review): local identifiers and several lines are missing from this
// listing, so the comments below describe only the statements that remain
// visible; confirm details against the upstream file.
func ( *lbBalancer) ( []resolver.Address, bool, bool) {
// Options applied to newly created SubConns. The grpclbBackendCreds bundle is
// attached only when the negated condition holds (its operand is not visible).
:= balancer.NewSubConnOptions{}
if ! {
.CredsBundle = .grpclbBackendCreds
}
// Record the new backend list and reset the metadata-free copy, which is
// rebuilt by the append further down.
.backendAddrs =
.backendAddrsWithoutMetadata = nil
// Note whether the fallback flag is changing before overwriting it.
:= .inFallback !=
.inFallback =
// The cached full server list is cleared here; the closing brace that follows
// suggests this sat inside a conditional whose header is missing -- confirm.
.fullServerList = nil
}
// Note whether the pick-first setting is changing, and keep the previous
// value before overwriting it.
:= .usePickFirst !=
:= .usePickFirst
.usePickFirst =
// Walk every tracked SubConn, remove it and drop it from subConns. The two
// RemoveSubConn calls go through different receivers (cc.cc versus cc).
for , := range .subConns {
.cc.cc.RemoveSubConn()
} else {
.cc.RemoveSubConn()
}
delete(.subConns, )
}
}
// Pick-first mode: take one entry from subConns (the loop breaks on the first
// iteration); when one exists, update its addresses, reconnect and return.
if .usePickFirst {
var balancer.SubConn
for _, = range .subConns {
break
}
if != nil {
.UpdateAddresses()
.Connect()
return
// Per-address pass: clear Metadata on a local value, record a key in a set
// (struct{}{} values), append to backendAddrsWithoutMetadata, mark the
// connectivity state as Idle, and call Connect.
for , := range {
:=
.Metadata = nil
[] = struct{}{}
.backendAddrsWithoutMetadata = append(.backendAddrsWithoutMetadata, )
.scStates[] = connectivity.Idle
}
.Connect()
}
}
// Entries that fail the lookup below are removed via RemoveSubConn.
if , := []; ! {
.cc.RemoveSubConn()
}
}
// Always finish by recomputing the aggregated state and the picker.
.updateStateAndPicker(true, true)
}
// remoteBalancerCCWrapper bundles the ClientConn used to talk to the remote
// balancer with the lbBalancer that owns it, plus the bookkeeping needed to
// stop and wait for the goroutines started on its behalf.
type remoteBalancerCCWrapper struct {
// cc is the ClientConn returned by grpc.DialContext when the wrapper is built.
cc *grpc.ClientConn
// lb points back to the owning balancer.
lb *lbBalancer
// backoff is copied from the balancer's backoff field at construction time.
backoff backoff.Strategy
// done is closed on shutdown; the watch loop selects on it to exit.
done chan struct{}
// wg tracks the goroutines started for this wrapper; shutdown waits on it.
wg sync.WaitGroup
}
// This lbBalancer method dials the ClientConn used to reach the remote
// balancer, wraps it in a remoteBalancerCCWrapper that is stored in
// ccRemoteLB, and starts watchRemoteBalancer in its own goroutine.
//
// NOTE(review): the method name and all local identifiers are missing from
// this listing; confirm against the upstream file.
func ( *lbBalancer) () {
var []grpc.DialOption
// Credential selection, in priority order: DialCreds from the build options,
// then the grpclbClientConnCreds bundle, otherwise an insecure connection.
if := .opt.DialCreds; != nil {
= append(, grpc.WithTransportCredentials())
} else if := .grpclbClientConnCreds; != nil {
= append(, grpc.WithCredentialsBundle())
} else {
= append(, grpc.WithInsecure())
}
// Reuse the custom dialer from the build options when one is set.
if .opt.Dialer != nil {
= append(, grpc.WithContextDialer(.opt.Dialer))
// The connection to the balancer defaults to the pick_first policy.
= append(, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`))
// Addresses for this connection are supplied through the manual resolver.
= append(, grpc.WithResolvers(.manualResolver))
// Attach the channelz parent ID only when channelz is turned on.
if channelz.IsOn() {
= append(, grpc.WithChannelzParentID(.opt.ChannelzParentID))
}
// Keepalive: 20s ping interval, 10s timeout, pings allowed even when no
// stream is active.
= append(, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 20 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}))
// Dial with a background context; the target is built from the manual
// resolver's scheme.
, := grpc.DialContext(context.Background(), .manualResolver.Scheme()+":///grpclb.subClientConn", ...)
// A dial error is reported through logger.Fatalf.
if != nil {
logger.Fatalf("failed to dial: %v", )
}
:= &remoteBalancerCCWrapper{
cc: ,
lb: ,
backoff: .backoff,
done: make(chan struct{}),
}
.ccRemoteLB =
// Register the watcher goroutine before starting it; watchRemoteBalancer
// defers the matching wg.Done.
.wg.Add(1)
go .watchRemoteBalancer()
}
// Shutdown for the wrapper: close done so the watch loop can exit, close the
// ClientConn, then block until every goroutine tracked by wg has returned.
func ( *remoteBalancerCCWrapper) () {
close(.done)
.cc.Close()
.wg.Wait()
}
// readServerList (name taken from its call site in this file) loops on Recv
// for the given stream. It only returns when Recv fails, so the returned
// error is always non-nil.
func ( *remoteBalancerCCWrapper) ( *balanceLoadClientStream) error {
for {
, := .Recv()
if != nil {
// io.EOF is mapped to the sentinel errServerTerminatedConnection; any other
// error is returned with added context.
if == io.EOF {
return errServerTerminatedConnection
}
return fmt.Errorf("grpclb: failed to recv server list: %v", )
}
// A reply that carries a server list is handed to processServerList.
if := .GetServerList(); != nil {
.lb.processServerList()
}
// Under lb.mu, rebuild SubConns from resolvedBackendAddrs with the second
// argument set to true.
// NOTE(review): the brace count suggests this ran inside a conditional whose
// header is missing from this listing -- confirm against upstream.
.lb.mu.Lock()
.lb.refreshSubConns(.lb.resolvedBackendAddrs, true, .lb.usePickFirst)
.lb.mu.Unlock()
}
}
}
// sendLoadReport (name taken from its call site in this file) sends client
// stats on the given stream once per tick of a ticker created from the given
// duration. It returns when the stream's context is done or a Send fails.
func ( *remoteBalancerCCWrapper) ( *balanceLoadClientStream, time.Duration) {
:= time.NewTicker()
// Stop the ticker when the function returns.
defer .Stop()
:= false
for {
// Wait for the next tick, or stop once the stream's context is done.
select {
case <-.C:
case <-.Context().Done():
return
}
// Take the current client stats and check them with isZeroStats.
:= .lb.clientStats.toClientStats()
:= isZeroStats()
// NOTE(review): the condition guarding this continue is missing from the
// listing; presumably it skips ticks with nothing to report -- confirm.
continue
}
=
// Stamp the report with the current wall-clock time.
:= time.Now()
// NOTE(review): the leading "×" looks like a decoded HTML entity that
// swallowed "&times" from "&timestamppb" -- confirm against upstream.
.Timestamp = ×tamppb.Timestamp{
Seconds: .Unix(),
Nanos: int32(.Nanosecond()),
}
// A failed Send ends the reporting loop.
if := .Send(&lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
ClientStats: ,
},
}); != nil {
return
}
}
}
// callRemoteBalancer (name taken from its call site in this file) opens a
// BalanceLoad stream, performs the initial request/response exchange, starts
// a load-report goroutine, and then reads from the stream via readServerList.
//
// The bool result is true on every early error return below, and false once
// the read loop has been entered; the error is that of the failing step.
func ( *remoteBalancerCCWrapper) () ( bool, error) {
:= &loadBalancerClient{cc: .cc}
// The cancel function of this context is deferred, so it runs when this
// function returns.
, := context.WithCancel(context.Background())
defer ()
// WaitForReady(true) is passed as a call option for the BalanceLoad RPC.
, := .BalanceLoad(, grpc.WaitForReady(true))
if != nil {
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", )
}
// Mark the remote balancer as connected, under lb.mu.
.lb.mu.Lock()
.lb.remoteBalancerConnected = true
.lb.mu.Unlock()
// The first message on the stream is an initial request carrying lb.target
// as the name.
:= &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: &lbpb.InitialLoadBalanceRequest{
Name: .lb.target,
},
},
}
if := .Send(); != nil {
return true, fmt.Errorf("grpclb: failed to send init request: %v", )
}
// The first reply must contain an initial response; a reply without one is
// reported as an error.
, := .Recv()
if != nil {
return true, fmt.Errorf("grpclb: failed to recv init response: %v", )
}
:= .GetInitialResponse()
if == nil {
return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
}
// Load reporting runs in a goroutine tracked by wg, and only when the
// converted ClientStatsReportInterval is positive.
.wg.Add(1)
go func() {
defer .wg.Done()
if := convertDuration(.ClientStatsReportInterval); > 0 {
.sendLoadReport(, )
}
// Hand over to the read loop and return its error.
return false, .readServerList()
}
func ( *remoteBalancerCCWrapper) () {
defer .wg.Done()
var int
for {
, := .callRemoteBalancer()
select {
case <-.done:
return
default:
if != nil {
if == errServerTerminatedConnection {
logger.Info()
} else {
logger.Warning()
}
}
.lb.cc.cc.ResolveNow(resolver.ResolveNowOptions{})
.lb.mu.Lock()
.lb.remoteBalancerConnected = false
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |