/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"context"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/syscall"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)
// clientConnectionCounter counts the number of connections a client has
// initiated (equal to the number of http2Clients created). Must be accessed
// atomically.
var clientConnectionCounter uint64

// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
	lastRead   int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx        context.Context
	cancel     context.CancelFunc
	ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
	userAgent  string
	md         interface{}
	conn       net.Conn // underlying communication channel
	loopy      *loopyWriter
	remoteAddr net.Addr
	localAddr  net.Addr
	authInfo   credentials.AuthInfo // auth info about the connection

	readerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}

	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// maxSendHeaderListSize is configured by the peer through
	// SETTINGS_MAX_HEADER_LIST_SIZE.
	maxSendHeaderListSize *uint32
	// onPrefaceReceipt is a callback that the client transport calls upon
	// receiving the server preface to signal that a successful HTTP2
	// connection was established.
	onPrefaceReceipt func()
	// prevGoAwayID records the Last-Stream-ID in the previous GOAWAY frame.
	prevGoAwayID uint32
	// goAwayReason records the http2.ErrCode and debug data received with the
	// GoAway frame.
	goAwayReason GoAwayReason
	// kpDormancyCond is used to signal when the keepalive goroutine should go
	// dormant. The condition for dormancy is based on the number of active
	// streams and the `PermitWithoutStream` keepalive client parameter. And
	// since the number of active streams is guarded by the transport's mutex,
	// we use the same for this condition variable as well.
	kpDormancyCond *sync.Cond
	// kpDormant tracks whether the keepalive goroutine is dormant or not.
	// This is checked before attempting to signal the above condition
	// variable.
	kpDormant bool
	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData

	onGoAway func(GoAwayReason)
	onClose  func()

	bufferPool *bufferPool

	connectionID uint64
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
	if fn != nil {
		return fn(ctx, addr)
	}
	return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
}

func isTemporary(err error) bool {
	switch err := err.(type) {
	case interface {
		Temporary() bool
	}:
		return err.Temporary()
	case interface {
		Timeout() bool
	}:
		// Timeouts may be resolved upon retry, and are thus treated as
		// temporary.
		return err.Timeout()
	}
	return true
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
	 := "http"
	,  := context.WithCancel()
	defer func() {
		if  != nil {
			()
		}
	}()

	,  := dial(, .Dialer, .Addr)
	if  != nil {
		if .FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(), , "transport: error while dialing: %v", )
		}
		return nil, connectionErrorf(true, , "transport: Error while dialing %v", )
	}
	// Any further errors will close the underlying connection
	defer func( net.Conn) {
		if  != nil {
			.Close()
		}
	}()
	// Validate keepalive parameters.
	if .Time == 0 {
		.Time = defaultClientKeepaliveTime
	}
	if .Timeout == 0 {
		.Timeout = defaultClientKeepaliveTimeout
	}
	 := false
	if .Time != infinity {
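		// Set TCP_USER_TIMEOUT so that writes left unacknowledged at the TCP
		// layer for kp.Timeout cause the connection to be torn down even when
		// application-level keepalive pings are not being exchanged.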
		if  = syscall.SetTCPUserTimeout(, .Timeout);  != nil {
			return nil, connectionErrorf(false, , "transport: failed to set TCP_USER_TIMEOUT: %v", )
		}
		 = true
	}
	var (
		 bool
		 credentials.AuthInfo
	)
	 := .TransportCredentials
	 := .PerRPCCredentials

	if  := .CredsBundle;  != nil {
		if  := .TransportCredentials();  != nil {
			 = 
		}
		if  := .PerRPCCredentials();  != nil {
			 = append(, )
		}
	}
		// gRPC, resolver, balancer etc. can specify arbitrary data in the
		// Attributes field of resolver.Address, which is shoved into connectCtx
		// and passed to the credential handshaker. This makes it possible for
		// address specific arbitrary data to reach the credential handshaker.
		 := internal.NewClientHandshakeInfoContext.(func(context.Context, credentials.ClientHandshakeInfo) context.Context)
		 = (, credentials.ClientHandshakeInfo{Attributes: .Attributes})
		, ,  = .ClientHandshake(, .ServerName, )
		if  != nil {
			return nil, connectionErrorf(isTemporary(), , "transport: authentication handshake failed: %v", )
		}
		 = true
		if .Info().SecurityProtocol == "tls" {
			 = "https"
		}
	}
	 := true
	 := int32(initialWindowSize)
	if .InitialConnWindowSize >= defaultWindowSize {
		 = .InitialConnWindowSize
		 = false
	}
	 := .WriteBufferSize
	 := .ReadBufferSize
	 := defaultClientMaxHeaderListSize
	if .MaxHeaderListSize != nil {
		 = *.MaxHeaderListSize
	}
	 := &http2Client{
		ctx:                   ,
		ctxDone:               .Done(), // Cache Done chan.
		cancel:                ,
		userAgent:             .UserAgent,
		md:                    .Metadata,
		conn:                  ,
		remoteAddr:            .RemoteAddr(),
		localAddr:             .LocalAddr(),
		authInfo:              ,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		framer:                newFramer(, , , ),
		fc:                    &trInFlow{limit: uint32()},
		scheme:                ,
		activeStreams:         make(map[uint32]*Stream),
		isSecure:              ,
		perRPCCreds:           ,
		kp:                    ,
		statsHandler:          .StatsHandler,
		initialWindowSize:     initialWindowSize,
		onPrefaceReceipt:      ,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData:                new(channelzData),
		onGoAway:              ,
		onClose:               ,
		keepaliveEnabled:      ,
		bufferPool:            newBufferPool(),
	}
	.controlBuf = newControlBuffer(.ctxDone)
	if .InitialWindowSize >= defaultWindowSize {
		.initialWindowSize = .InitialWindowSize
		 = false
	}
	if  {
		.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: .updateFlowControl,
		}
	}
	if .statsHandler != nil {
		.ctx = .statsHandler.TagConn(.ctx, &stats.ConnTagInfo{
			RemoteAddr: .remoteAddr,
			LocalAddr:  .localAddr,
		})
		 := &stats.ConnBegin{
			Client: true,
		}
		.statsHandler.HandleConn(.ctx, )
	}
	if channelz.IsOn() {
		.channelzID = channelz.RegisterNormalSocket(, .ChannelzParentID, fmt.Sprintf("%s -> %s", .localAddr, .remoteAddr))
	}
	if .keepaliveEnabled {
		.kpDormancyCond = sync.NewCond(&.mu)
		go .keepalive()
	}
	// Start the reader goroutine for incoming messages. Each transport has a
	// dedicated goroutine which reads HTTP2 frames from the network. It then
	// dispatches the frame to the corresponding stream entity.
	go .reader()
	// Send connection preface to server.
	,  := .conn.Write(clientPreface)
	if  != nil {
		.Close()
		return nil, connectionErrorf(true, , "transport: failed to write client preface: %v", )
	}
	if  != len(clientPreface) {
		.Close()
		return nil, connectionErrorf(true, , "transport: preface mismatch, wrote %d bytes; want %d", , len(clientPreface))
	}
	var  []http2.Setting

	if .initialWindowSize != defaultWindowSize {
		 = append(, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(.initialWindowSize),
		})
	}
	if .MaxHeaderListSize != nil {
		 = append(, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *.MaxHeaderListSize,
		})
	}
	 = .framer.fr.WriteSettings(...)
	if  != nil {
		.Close()
		return nil, connectionErrorf(true, , "transport: failed to write initial settings frame: %v", )
	}
	// Adjust the connection flow control window if needed.
	if  := uint32( - defaultWindowSize);  > 0 {
		if  := .framer.fr.WriteWindowUpdate(0, );  != nil {
			.Close()
			return nil, connectionErrorf(true, , "transport: failed to write window update: %v", )
		}
	}

	.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	if  := .framer.writer.Flush();  != nil {
		return nil, 
	}
	go func() {
		.loopy = newLoopyWriter(clientSide, .framer, .controlBuf, .bdpEst)
		 := .loopy.run()
		if  != nil {
			if logger.V(logLevel) {
				logger.Errorf("transport: loopyWriter.run returning. Err: %v", )
			}
		}
		// If it's a connection error, let the reader goroutine handle it
		// since there might be data in the buffers.
		if ,  := .(net.Error); ! {
			.conn.Close()
		}
		close(.writerDone)
	}()
	return , nil
}

func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
	 := &Stream{
		ct:             ,
		done:           make(chan struct{}),
		method:         .Method,
		sendCompress:   .SendCompress,
		buf:            newRecvBuffer(),
		headerChan:     make(chan struct{}),
		contentSubtype: .ContentSubtype,
	}
	.wq = newWriteQuota(defaultWriteQuota, .done)
	.requestRead = func( int) {
		.adjustWindow(, uint32())
	}
	// The client side stream context should have exactly the same life cycle
	// with the user provided context. That means, s.ctx should be read-only.
	// And s.ctx is done iff ctx is done. So we use the original context here
	// instead of creating a copy.
	.ctx = 
	.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:     .ctx,
			ctxDone: .ctx.Done(),
			recv:    .buf,
			closeStream: func( error) {
				.CloseStream(, )
			},
			freeBuffer: .bufferPool.put,
		},
		windowHandler: func( int) {
			.updateWindow(, uint32())
		},
	}
	return 
}

func (t *http2Client) getPeer() *peer.Peer {
	return &peer.Peer{
		Addr:     t.remoteAddr,
		AuthInfo: t.authInfo,
	}
}

func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
	 := .createAudience()
	 := credentials.RequestInfo{
		Method:   .Method,
		AuthInfo: .authInfo,
	}
	 := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(, )
	,  := .getTrAuthData(, )
	if  != nil {
		return nil, 
	}
	,  := .getCallAuthData(, , )
	if  != nil {
		return nil, 
	}
	// TODO(mmukhi): Benchmark if the performance gets better if count the
	// metadata and other header fields first and create a slice of that exact
	// size. Make the slice of certain predictable size to reduce allocations
	// made by append.
	 := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
	 += len() + len()
	 := make([]hpack.HeaderField, 0, )
	 = append(, hpack.HeaderField{Name: ":method", Value: "POST"})
	 = append(, hpack.HeaderField{Name: ":scheme", Value: .scheme})
	 = append(, hpack.HeaderField{Name: ":path", Value: .Method})
	 = append(, hpack.HeaderField{Name: ":authority", Value: .Host})
	 = append(, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(.ContentSubtype)})
	 = append(, hpack.HeaderField{Name: "user-agent", Value: .userAgent})
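	// The "te: trailers" header is required by the gRPC-over-HTTP2 spec.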
	 = append(, hpack.HeaderField{Name: "te", Value: "trailers"})
	if .PreviousAttempts > 0 {
		 = append(, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(.PreviousAttempts)})
	}

	if .SendCompress != "" {
		 = append(, hpack.HeaderField{Name: "grpc-encoding", Value: .SendCompress})
		 = append(, hpack.HeaderField{Name: "grpc-accept-encoding", Value: .SendCompress})
	}
	if dl, ok := ctx.Deadline(); ok {
		// Send out timeout regardless its value. The server can detect timeout context by itself.
		// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
		 := time.Until()
		 = append(, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration()})
	}
	for ,  := range  {
		 = append(, hpack.HeaderField{Name: , Value: encodeMetadataHeader(, )})
	}
	for ,  := range  {
		 = append(, hpack.HeaderField{Name: , Value: encodeMetadataHeader(, )})
	}
	if  := stats.OutgoingTags();  != nil {
		 = append(, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader()})
	}
	if  := stats.OutgoingTrace();  != nil {
		 = append(, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader()})
	}

	if , ,  := metadata.FromOutgoingContextRaw();  {
		var  string
			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
			if isReservedHeader() {
				continue
			}
			for ,  := range  {
				 = append(, hpack.HeaderField{Name: , Value: encodeMetadataHeader(, )})
			}
		}
		for ,  := range  {
			for ,  := range  {
				if %2 == 0 {
					 = 
					continue
				}
				// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
				if isReservedHeader() {
					continue
				}
				 = append(, hpack.HeaderField{Name: strings.ToLower(), Value: encodeMetadataHeader(, )})
			}
		}
	}
	if ,  := .md.(*metadata.MD);  {
		for ,  := range * {
			if isReservedHeader() {
				continue
			}
			for ,  := range  {
				 = append(, hpack.HeaderField{Name: , Value: encodeMetadataHeader(, )})
			}
		}
	}
	return , nil
}

func (t *http2Client) createAudience(callHdr *CallHdr) string {
	// Create an audience string only if needed.
	if len(.perRPCCreds) == 0 && .Creds == nil {
		return ""
	}
	// Construct URI required to get auth request metadata.
	// Omit port if it is the default one.
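	// For example, a call to "/grpc.health.v1.Health/Check" against host
	// "example.com:443" yields the audience
	// "https://example.com/grpc.health.v1.Health".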
	 := strings.TrimSuffix(.Host, ":443")
	 := strings.LastIndex(.Method, "/")
	if  == -1 {
		 = len(.Method)
	}
	return "https://" +  + .Method[:]
}

func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
	if len(.perRPCCreds) == 0 {
		return nil, nil
	}
	 := map[string]string{}
	for ,  := range .perRPCCreds {
		,  := .GetRequestMetadata(, )
		if  != nil {
			if ,  := status.FromError();  {
				return nil, 
			}

			return nil, status.Errorf(codes.Unauthenticated, "transport: %v", )
		}
			// Capital header names are illegal in HTTP/2.
			 = strings.ToLower()
			[] = 
		}
	}
	return , nil
}

func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
	// Check if credentials.PerRPCCredentials were provided via call options.
	// Note: if these credentials are provided both via dial options and call
	// options, then both sets of credentials will be applied.
	if  := .Creds;  != nil {
		if !.isSecure && .RequireTransportSecurity() {
			return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
		}
		,  := .GetRequestMetadata(, )
		if  != nil {
			return nil, status.Errorf(codes.Internal, "transport: %v", )
		}
		 = make(map[string]string, len())
			// Capital header names are illegal in HTTP/2
			 = strings.ToLower()
			[] = 
		}
	}
	return , nil
}
// PerformedIOError wraps an error to indicate IO may have been performed
// before the error occurred.
type PerformedIOError struct {
	Err error
}
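// Callers that need to distinguish this case can use a type assertion on the
// returned error, for example:
//
//	if _, ok := err.(PerformedIOError); ok {
//		// I/O was already performed; a transparent retry is not safe.
//	}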
// Error implements error.
func (p PerformedIOError) Error() string {
	return p.Err.Error()
}
// NewStream creates a stream and registers it into the transport as "active"
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
	 = peer.NewContext(, .getPeer())
	,  := .createHeaderFields(, )
	if err != nil {
		// We may have performed I/O in the per-RPC creds callback, so do not
		// allow transparent retry.
		return nil, PerformedIOError{}
	}
	 := .newStream(, )
	 := func( error) {
			// If it was already done, return.
			return
		// The stream was unprocessed by the server.
		// If headerChan isn't closed, then close it.
		if atomic.CompareAndSwapUint32(&.headerChanClosed, 0, 1) {
			close(.headerChan)
		}
	}
	 := &headerFrame{
		hf:        ,
		endStream: false,
		initStream: func( uint32) error {
			.mu.Lock()
			if  := .state;  != reachable {
				// Do a quick cleanup.
				 := error(errStreamDrain)
				if  == closing {
					 = ErrConnClosing
				}
				()
				return 
			}
			.activeStreams[] = 
			if channelz.IsOn() {
				atomic.AddInt64(&.czData.streamsStarted, 1)
				atomic.StoreInt64(&.czData.lastStreamCreatedTime, time.Now().UnixNano())
			}
			// If the keepalive goroutine has gone dormant, wake it up.
			if .kpDormant {
				.kpDormancyCond.Signal()
			}
			.mu.Unlock()
			return nil
		},
		onOrphaned: ,
		wq:         .wq,
	}
	 := true
	var  chan struct{}
	 := func( interface{}) bool {
		if .streamQuota <= 0 { // Can go negative if server decreases it.
			if  {
				.waitingStreams++
			}
			 = .streamsQuotaAvailable
			return false
		}
		if ! {
			.waitingStreams--
		}
		.streamQuota--
		 := .(*headerFrame)
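		// Client-initiated HTTP/2 stream IDs are odd; nextID starts at 1 and
		// is bumped by 2 so that each new stream takes the next odd ID.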
		.streamID = .nextID
		.nextID += 2
		.id = .streamID
		.fc = &inFlow{limit: uint32(.initialWindowSize)}
		if .streamQuota > 0 && .waitingStreams > 0 {
			select {
			case .streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	var  error
	 := func( interface{}) bool {
		if .maxSendHeaderListSize == nil {
			return true
		}
		 := .(*headerFrame)
		var  int64
		for ,  := range .hf {
			if  += int64(.Size());  > int64(*.maxSendHeaderListSize) {
				 = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *.maxSendHeaderListSize)
				return false
			}
		}
		return true
	}
	for {
		,  := .controlBuf.executeAndPut(func( interface{}) bool {
			if !() {
				return false
			}
			if !() {
				return false
			}
			return true
		}, )
		if  != nil {
			return nil, 
		}
		if  {
			break
		}
		if  != nil {
			return nil, 
		}
		 = false
		select {
		case <-:
		case <-.ctx.Done():
			return nil, ContextErr(.ctx.Err())
		case <-.goAway:
			return nil, errStreamDrain
		case <-.ctx.Done():
			return nil, ErrConnClosing
		}
	}
	if .statsHandler != nil {
		,  := metadata.FromOutgoingContext()
		if  {
			.Set("user-agent", .userAgent)
		} else {
			 = metadata.Pairs("user-agent", .userAgent)
		}
		// Note: The header fields are compressed with hpack after this call
		// returns. No WireLength field is set here.
		 := &stats.OutHeader{
			Client:      true,
			FullMethod:  .Method,
			RemoteAddr:  .remoteAddr,
			LocalAddr:   .localAddr,
			Compression: .SendCompress,
			Header:      ,
		}
		.statsHandler.HandleRPC(.ctx, )
	}
	return , nil
}
// CloseStream clears the footprint of a stream when the stream is not needed
// any more. This must not be executed in the reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
	var (
		     bool
		 http2.ErrCode
	)
	if  != nil {
		 = true
		 = http2.ErrCodeCancel
	}
	.closeStream(, , , , status.Convert(), nil, false)
}

func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, return. If multiple closeStream calls
		// happen simultaneously, wait for the first to finish.
		<-s.done
		return
	}
	// status and trailers can be updated here without any synchronization because the stream goroutine will
	// only read it after it sees an io.EOF error from read or write and we'll write those errors
	// only after updating this.
	.status = 
	if len() > 0 {
		.trailer = 
	}
	if err != nil {
		// This will unblock reads eventually.
		.write(recvMsg{err: })
	}
	// If headerChan isn't closed, then close it.
	if atomic.CompareAndSwapUint32(&.headerChanClosed, 0, 1) {
		.noHeaders = true
		close(.headerChan)
	}
	 := &cleanupStream{
		streamID: .id,
		onWrite: func() {
			.mu.Lock()
			if .activeStreams != nil {
				delete(.activeStreams, .id)
			}
			.mu.Unlock()
			if channelz.IsOn() {
				if  {
					atomic.AddInt64(&.czData.streamsSucceeded, 1)
				} else {
					atomic.AddInt64(&.czData.streamsFailed, 1)
				}
			}
		},
		rst:     ,
		rstCode: ,
	}
	 := func(interface{}) bool {
		.streamQuota++
		if .streamQuota > 0 && .waitingStreams > 0 {
			select {
			case .streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	// This will unblock write.
	close(.done)
}
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
	// Make sure we only Close once.
	if .state == closing {
		.mu.Unlock()
		return nil
	}
	// Call t.onClose before setting the state to closing to prevent the
	// client from attempting to create new streams ASAP.
	.onClose()
	.state = closing
	 := .activeStreams
	.activeStreams = nil
	// If the keepalive goroutine is blocked on this condition variable, we
	// should unblock it so that the goroutine eventually exits.

	// Notify all active streams.
	for ,  := range  {
		.closeStream(, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
	}
	if .statsHandler != nil {
		 := &stats.ConnEnd{
			Client: true,
		}
		.statsHandler.HandleConn(.ctx, )
	}
	return 
}
// GracefulClose sets the state to draining, which prevents new streams from
// being created and causes the transport to be closed when the last active
// stream is closed. If there are no active streams, the transport is closed
// immediately. This does nothing if the transport is already draining or
// closing.
func (t *http2Client) GracefulClose() {
	// Make sure we move to draining only from active.
	if .state == draining || .state == closing {
		.mu.Unlock()
		return
	}
	.state = draining
	 := len(.activeStreams)
	.mu.Unlock()
	if  == 0 {
		.Close()
		return
	}
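	// Put an incomingGoAway item on the control buffer so that the writer side
	// also moves to draining; the transport is then closed once the remaining
	// active streams complete.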
	.controlBuf.put(&incomingGoAway{})
}
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if opts.Last {
		// If it's the last message, update stream state.
		if !.compareAndSwapState(streamActive, streamWriteDone) {
			return errStreamDone
		}
	} else if .getState() != streamActive {
		return errStreamDone
	}
	 := &dataFrame{
		streamID:  .id,
		endStream: .Last,
		h:         ,
		d:         ,
	}
	if  != nil ||  != nil { // If it's not an empty data frame, check quota.
		if  := .wq.get(int32(len() + len()));  != nil {
			return 
		}
	}
	return .controlBuf.put()
}

func (t *http2Client) getStream(f http2.Frame) *Stream {
	t.mu.Lock()
	s := t.activeStreams[f.Header().StreamID]
	t.mu.Unlock()
	return s
}
// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting data larger in size than the
// window.
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
	if  := .fc.maybeAdjust();  > 0 {
		.controlBuf.put(&outgoingWindowUpdate{streamID: .id, increment: })
	}
}
// updateWindow adjusts the inbound quota for the stream. Window updates will
// be sent out when the cumulative quota exceeds the corresponding threshold.
func (t *http2Client) updateWindow(s *Stream, n uint32) {
	if  := .fc.onRead();  > 0 {
		.controlBuf.put(&outgoingWindowUpdate{streamID: .id, increment: })
	}
}
// updateFlowControl updates the incoming flow control windows for the
// transport and the stream based on the current bdp estimation.
func (t *http2Client) updateFlowControl(n uint32) {
	.mu.Lock()
	for ,  := range .activeStreams {
		.fc.newLimit()
	}
	.mu.Unlock()
	 := func(interface{}) bool {
		.initialWindowSize = int32()
		return true
	}
	.controlBuf.executeAndPut(, &outgoingWindowUpdate{streamID: 0, increment: .fc.newLimit()})
	.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: ,
			},
		},
	})
}

func (t *http2Client) handleData(f *http2.DataFrame) {
	 := .Header().Length
	var  bool
	if .bdpEst != nil {
		 = .bdpEst.add()
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on whether the
	// user application has read the data or not. Such a restriction is already
	// imposed on the stream's flow control, and therefore the sender will be
	// blocked anyways. Decoupling the connection flow control will prevent
	// other active (fast) streams from starving in presence of slow or
	// inactive streams.
	if  := .fc.onData();  > 0 {
		.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: ,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy) by sending a
		// window update prior to the BDP ping.

		if  := .fc.reset();  > 0 {
			.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: ,
			})
		}

		.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	 := .getStream()
	if  == nil {
		return
	}
	if  > 0 {
		if  := .fc.onData();  != nil {
			.closeStream(, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, .Error()), nil, false)
			return
		}
		if .Header().Flags.Has(http2.FlagDataPadded) {
			if  := .fc.onRead( - uint32(len(.Data())));  > 0 {
				.controlBuf.put(&outgoingWindowUpdate{.id, })
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of the next frame.
		// Can this copy be eliminated?
		if len(.Data()) > 0 {
			 := .bufferPool.get()
			.Reset()
			.Write(.Data())
			.write(recvMsg{buffer: })
		}
	}
	// The server has closed the stream without sending trailers. Record that
	// the read direction is closed, and set the status appropriately.
	if .FrameHeader.Flags.Has(http2.FlagDataEndStream) {
		.closeStream(, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
	}
}

func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
	 := .getStream()
	if  == nil {
		return
	}
	if f.ErrCode == http2.ErrCodeRefusedStream {
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&.unprocessed, 1)
	}
	,  := http2ErrConvTab[.ErrCode]
	if ! {
		if logger.V(logLevel) {
			logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", .ErrCode)
		}
		 = codes.Unknown
	}
	if  == codes.Canceled {
			// Our deadline was already exceeded, and that was likely the cause
			// of this cancelation. Alter the status code accordingly.
			 = codes.DeadlineExceeded
		}
	}
	.closeStream(, io.EOF, false, http2.ErrCodeNo, status.Newf(, "stream terminated by RST_STREAM with error code: %v", .ErrCode), nil, false)
}

func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
	if .IsAck() {
		return
	}
	var  *uint32
	var  []http2.Setting
	var  []func()
	.ForeachSetting(func( http2.Setting) error {
		switch .ID {
		case http2.SettingMaxConcurrentStreams:
			 = new(uint32)
			* = .Val
		case http2.SettingMaxHeaderListSize:
			 = append(, func() {
				.maxSendHeaderListSize = new(uint32)
				*.maxSendHeaderListSize = .Val
			})
		default:
			 = append(, )
		}
		return nil
	})
	if  &&  == nil {
		 = new(uint32)
		* = math.MaxUint32
	}
	 := &incomingSettings{
		ss: ,
	}
	if  != nil {
		 := func() {
			 := int64(*) - int64(.maxConcurrentStreams)
			.maxConcurrentStreams = *
			.streamQuota += 
			if  > 0 && .waitingStreams > 0 {
				close(.streamsQuotaAvailable) // wake all of them up.
				.streamsQuotaAvailable = make(chan struct{}, 1)
			}
		}
		 = append(, )
	}
	.controlBuf.executeAndPut(func(interface{}) bool {
		for ,  := range  {
			()
		}
		return true
	}, )
}

func (t *http2Client) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		// Maybe it's a BDP ping.
		if .bdpEst != nil {
			.bdpEst.calculate(.Data)
		}
		return
	}
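	// Not a BDP ping: the peer expects an acknowledgement, so echo the 8-byte
	// opaque payload back in a PING frame with the ACK flag set (RFC 7540 §6.7).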
	 := &ping{ack: true}
	copy(.data[:], .Data[:])
	.controlBuf.put()
}

func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
	.mu.Lock()
	if .state == closing {
		.mu.Unlock()
		return
	}
	if .ErrCode == http2.ErrCodeEnhanceYourCalm {
		if logger.V(logLevel) {
			logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
		}
	}
	 := .LastStreamID
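	// The GOAWAY's Last-Stream-ID must refer to a client-initiated (odd)
	// stream; a non-zero even value is a protocol error, so tear the
	// transport down.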
	if  > 0 && %2 != 1 {
		.mu.Unlock()
		.Close()
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
	// sent after an RTT delay with the ID of the last stream the server will
	// process.
	//
	// Therefore, when we get the first GoAway we don't necessarily close any
	// streams. While in case of the second GoAway we close all streams created
	// after the GoAwayId. This way streams that were in-flight while the
	// GoAway from the server was being sent don't get killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
		if  > .prevGoAwayID {
			.mu.Unlock()
			.Close()
			return
		}
	default:
		.setGoAwayReason()
		close(.goAway)
		// Notify the clientconn about the GOAWAY before we set the state to
		// draining, to allow the client to stop attempting to create streams
		// before disallowing new streams on this connection.
	}
	// All streams with IDs greater than the GoAwayId and smaller than the
	// previous GoAway ID should be killed.
	 := .prevGoAwayID
	if  == 0 { // This is the first GoAway Frame.
		 = math.MaxUint32 // Kill all streams after the GoAway ID.
	}
	for ,  := range .activeStreams {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&.unprocessed, 1)
			.closeStream(, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
		}
	}
	.prevGoAwayID = 
	 := len(.activeStreams)
	.mu.Unlock()
	if  == 0 {
		.Close()
	}
}
// setGoAwayReason sets the value of t.goAwayReason based on the GoAway frame
// received. It expects a lock on the transport's mutex to be held by the
// caller.
// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
	 := .getStream()
	if  == nil {
		return
	}
	 := .StreamEnded()
	atomic.StoreUint32(&.bytesReceived, 1)
	 := atomic.LoadUint32(&.headerChanClosed) == 0

		// As specified by gRPC over HTTP2, a HEADERS frame (and associated
		// CONTINUATION frames) can only appear at the start or end of a
		// stream. Therefore, a second HEADERS frame must have the EOS bit set.
		 := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
		.closeStream(, .Err(), true, http2.ErrCodeProtocol, , nil, false)
		return
	}

	// Initialize isGRPC value to be !initialHeader, since if a gRPC
	// Response-Headers has already been received, then it means that the peer
	// is speaking gRPC and we are in gRPC mode.
	.data.isGRPC = !
	if  := .decodeHeader();  != nil {
		.closeStream(, , true, http2.ErrCodeProtocol, status.Convert(), nil, )
		return
	}

	 := false
	defer func() {
		if .statsHandler != nil {
			if  {
				 := &stats.InHeader{
					Client:      true,
					WireLength:  int(.Header().Length),
					Header:      .header.Copy(),
					Compression: .recvCompress,
				}
				.statsHandler.HandleRPC(.ctx, )
			} else {
				 := &stats.InTrailer{
					Client:     true,
					WireLength: int(.Header().Length),
					Trailer:    .trailer.Copy(),
				}
				.statsHandler.HandleRPC(.ctx, )
			}
		}
	}()
	// If headerChan hasn't been closed yet
			// HEADERS frame block carries a Response-Headers.
			// These values can be set without any synchronization because
			// the stream goroutine will read them only after seeing a closed
			// headerChan, which we'll close after setting this.
			.recvCompress = .data.encoding
			if len(.data.mdata) > 0 {
				.header = .data.mdata
			}
			// HEADERS frame block carries a Trailers-Only.
			.noHeaders = true
		}
		close(.headerChan)
	}

	if ! {
		return
	}
	// If the client received END_STREAM from the server while the stream was
	// still active, send RST_STREAM.
	 := .getState() == streamActive
	.closeStream(, io.EOF, , http2.ErrCodeNo, .status(), .data.mdata, true)
}
// reader runs as a separate goroutine in charge of reading data from the
// network connection.
//
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
	// Check the validity of server preface.
	,  := .framer.fr.ReadFrame()
	if  != nil {
		.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
	if .keepaliveEnabled {
		atomic.StoreInt64(&.lastRead, time.Now().UnixNano())
	}
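	// Per RFC 7540, the server connection preface must begin with a (possibly
	// empty) SETTINGS frame; anything else is fatal for this transport.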
	,  := .(*http2.SettingsFrame)
	if ! {
		.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	.onPrefaceReceipt()
	.handleSettings(, true)
	// loop to keep reading incoming messages on this transport.
			// Abort an active stream if the http2.Framer returns a
			// http2.StreamError. This can happen only if the server's response
			// is malformed http2.
			if ,  := .(http2.StreamError);  {
				.mu.Lock()
				 := .activeStreams[.StreamID]
				.mu.Unlock()
					// use error detail to provide better err message
					 := http2ErrConvTab[.Code]
					 := .framer.fr.ErrorDetail().Error()
					.closeStream(, status.Error(, ), true, http2.ErrCodeProtocol, status.New(, ), nil, false)
				}
				continue
			} else {
				// Transport error.
				.Close()
				return
			}
		}
		switch frame := .(type) {
		case *http2.MetaHeadersFrame:
			.operateHeaders()
		case *http2.DataFrame:
			.handleData()
		case *http2.RSTStreamFrame:
			.handleRSTStream()
		case *http2.SettingsFrame:
			.handleSettings(, false)
		case *http2.PingFrame:
			.handlePing()
		case *http2.GoAwayFrame:
			.handleGoAway()
		case *http2.WindowUpdateFrame:
			.handleWindowUpdate()
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", )
			}
		}
	}
}

func minTime(a, b time.Duration) time.Duration {
	if a < b {
		return a
	}
	return b
}
// keepalive running in a separate goroutine makes sure the connection is alive
// by sending pings.
func (t *http2Client) keepalive() {
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	timeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	 := time.Now().UnixNano()
	 := time.NewTimer(.kp.Time)
	for {
		select {
		case <-.C:
			 := atomic.LoadInt64(&.lastRead)
				// There has been read activity since the last time we were here.
				// Next timer should fire at kp.Time seconds from lastRead time.
				.Reset(time.Duration() + .kp.Time - time.Duration(time.Now().UnixNano()))
				 = 
				continue
			}
			if  &&  <= 0 {
				.Close()
				return
			}
			.mu.Lock()
			if t.state == closing {
				// If the transport is closing, we should exit from the
				// keepalive goroutine here. If not, we could have a race
				// between the call to Signal() from Close() and the call to
				// Wait() here, whereby the keepalive goroutine ends up
				// blocking on the condition variable which will never be
				// signalled again.
				.mu.Unlock()
				return
			}
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// If a ping was sent out previously (because there were active
				// streams at that point) which wasn't acked and its timeout
				// hadn't fired, but we got here and are about to go dormant,
				// we should make sure that we unconditionally send a ping once
				// we awaken.
				 = false
				.kpDormant = true
				.kpDormancyCond.Wait()
			}
			.kpDormant = false
			.mu.Unlock()
			// We get here either because we were dormant and a new stream was
			// created which unblocked the Wait() call, or because the
			// keepalive timer expired. In both cases, we need to send a ping.
			if ! {
				if channelz.IsOn() {
					atomic.AddInt64(&.czData.kpCount, 1)
				}
				.controlBuf.put()
				 = .kp.Timeout
				 = true
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
		// RemoteName :
	}
	if ,  := .authInfo.(credentials.ChannelzSecurityInfo);  {
		.Security = .GetSecurityValue()
	}
	.RemoteFlowControlWindow = .getOutFlowWindow()
	return &
}

func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }

func (t *http2Client) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}

func (t *http2Client) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}

func (t *http2Client) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
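	// Ask loopy for the current outbound flow control window, but give up
	// after a second (or when the transport's context is done).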
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.ctxDone:
		return -1
	case <-timer.C:
		return -2
	}