/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
)

var updateHeaderTblSize = func( *hpack.Encoder,  uint32) {
	.SetMaxDynamicTableSizeLimit()
}

// itemNode is a single node of an itemList.
type itemNode struct {
	it   interface{}
	next *itemNode
}

// itemList is a singly linked FIFO queue of arbitrary items. The zero value
// is an empty list that is ready to use.
type itemList struct {
	head *itemNode
	tail *itemNode
}

// enqueue appends i to the end of the list.
func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		// Empty list: the new node is both head and tail.
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// peek returns the first item in the list without removing it from the
// list. It returns nil if the list is empty.
func (il *itemList) peek() interface{} {
	if il.head == nil {
		return nil
	}
	return il.head.it
}

// dequeue removes and returns the first item in the list, or nil if the list
// is empty.
func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		// The last node was removed; drop the stale tail pointer too.
		il.tail = nil
	}
	return i
}

// dequeueAll empties the list and returns its former head node. The removed
// nodes stay linked through next, so the caller can walk them.
func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

// isEmpty reports whether the list holds no items.
func (il *itemList) isEmpty() bool {
	return il.head == nil
}
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.

// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport. These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50

// cbItem is implemented by the control items defined in this file.
type cbItem interface {
	// isTransportResponseFrame reports whether the item counts as a
	// "transport response" frame, i.e. whether it is included in the
	// controlBuffer's transportResponseFrames count.
	isTransportResponseFrame() bool
}
registerStream is used to register an incoming stream with loopy writer.
headerFrame is also used to register stream on the client-side.
type headerFrame struct {
	streamID   uint32
	hf         []hpack.HeaderField
	endStream  bool               // Valid on server side.
	initStream func(uint32) error // Used only on the client side.
	onWrite    func()
	wq         *writeQuota    // write quota for the stream created.
	cleanup    *cleanupStream // Valid on the server side.
	onOrphaned func(error)    // Valid on client-side
}

func ( *headerFrame) () bool {
	return .cleanup != nil && .cleanup.rst // Results in a RST_STREAM
}

type cleanupStream struct {
	streamID uint32
	rst      bool
	rstCode  http2.ErrCode
	onWrite  func()
}

func ( *cleanupStream) () bool { return .rst } // Results in a RST_STREAM

// dataFrame is a data item queued on a stream. h and d are the two buffers
// that loopy writes out: h first, then d.
type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte
	d         []byte
	// onEachWrite is called every time
	// a part of d is written out.
	onEachWrite func()
}

// isTransportResponseFrame always returns false for data frames.
func (*dataFrame) isTransportResponseFrame() bool { return false }

// incomingWindowUpdate carries a flow-control window increment that loopy adds
// to its send quota: the connection-level quota when streamID is 0, otherwise
// the quota of that stream.
type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// isTransportResponseFrame always returns false for incoming window updates.
func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }

// outgoingWindowUpdate asks loopy to write a WINDOW_UPDATE frame with the
// given stream ID and increment.
type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// isTransportResponseFrame always returns false for outgoing window updates.
func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
	return false // window updates are throttled by thresholds
}

type incomingSettings struct {
	ss []http2.Setting
}

func (*incomingSettings) () bool { return true } // Results in a settings ACK

type outgoingSettings struct {
	ss []http2.Setting
}

func (*outgoingSettings) () bool { return false }

// incomingGoAway is the control item loopy handles by starting to drain when
// it runs on the client side. It carries no data.
type incomingGoAway struct {
}

// isTransportResponseFrame always returns false for incoming GOAWAYs.
func (*incomingGoAway) isTransportResponseFrame() bool { return false }

type goAway struct {
	code      http2.ErrCode
	debugData []byte
	headsUp   bool
	closeConn bool
}

func (*goAway) () bool { return false }

// ping is the control item for a PING: ack is the ack flag and data the
// 8-byte payload.
type ping struct {
	ack  bool
	data [8]byte
}

// isTransportResponseFrame always returns true for pings.
func (*ping) isTransportResponseFrame() bool { return true }

// outFlowControlSizeRequest is a control item carrying a channel of uint32 on
// which a reply can be delivered.
type outFlowControlSizeRequest struct {
	resp chan uint32
}

// isTransportResponseFrame always returns false for these requests.
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }

// outStreamState is the scheduling state of an outStream inside loopy.
type outStreamState int

const (
	// active is set on a stream when it is put on the activeStreams list.
	active outStreamState = iota
	// empty is set when the stream's item list has no items.
	empty
	// waitingOnStreamQuota is set when the stream has no stream-level
	// flow-control quota left.
	waitingOnStreamQuota
)

type outStream struct {
	id               uint32
	state            outStreamState
	itl              *itemList
	bytesOutStanding int
	wq               *writeQuota

	next *outStream
	prev *outStream
}

func ( *outStream) () {
	if .prev != nil {
		.prev.next = .next
	}
	if .next != nil {
		.next.prev = .prev
	}
	.next, .prev = nil, nil
}

Following are sentinel objects that mark the beginning and end of the list. They do not contain any item lists. All valid objects are inserted in between them. This is needed so that an outStream object can deleteSelf() in O(1) time without knowing which list it belongs to.
	head *outStream
	tail *outStream
}

func () *outStreamList {
	,  := new(outStream), new(outStream)
	.next = 
	.prev = 
	return &outStreamList{
		head: ,
		tail: ,
	}
}

func ( *outStreamList) ( *outStream) {
	 := .tail.prev
	.next = 
	.prev = 
	.next = .tail
	.tail.prev = 
}
remove from the beginning of the list.
func ( *outStreamList) () *outStream {
	 := .head.next
	if  == .tail {
		return nil
	}
	.deleteSelf()
	return 
}
controlBuffer is a way to pass information to loopy. Information is passed as specific struct types called control frames. A control frame not only represents data, messages or headers to be sent out but can also be used to instruct loopy to update its internal state. It shouldn't be confused with an HTTP2 frame, although some of the control frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
	ch              chan struct{}
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool
	list            *itemList
	err             error
transportResponseFrames counts the number of queued items that represent the response of an action initiated by the peer. trfChan is created when transportResponseFrames >= maxQueuedTransportResponseFrames and is closed and nilled when transportResponseFrames drops below the threshold. Both fields are protected by mu.
	transportResponseFrames int
	trfChan                 atomic.Value // *chan struct{}
}

func ( <-chan struct{}) *controlBuffer {
	return &controlBuffer{
		ch:   make(chan struct{}, 1),
		list: &itemList{},
		done: ,
	}
}
throttle blocks if there are too many incomingSettings/cleanupStreams in the controlbuf.
func ( *controlBuffer) () {
	,  := .trfChan.Load().(*chan struct{})
	if  != nil {
		select {
		case <-*:
		case <-.done:
		}
	}
}

func ( *controlBuffer) ( cbItem) error {
	,  := .executeAndPut(nil, )
	return 
}

func ( *controlBuffer) ( func( interface{}) bool,  cbItem) (bool, error) {
	var  bool
	.mu.Lock()
	if .err != nil {
		.mu.Unlock()
		return false, .err
	}
	if  != nil {
		if !() { // f wasn't successful
			.mu.Unlock()
			return false, nil
		}
	}
	if .consumerWaiting {
		 = true
		.consumerWaiting = false
	}
	.list.enqueue()
	if .isTransportResponseFrame() {
		.transportResponseFrames++
We are adding the frame that puts us over the threshold; create a throttling channel.
			 := make(chan struct{})
			.trfChan.Store(&)
		}
	}
	.mu.Unlock()
	if  {
		select {
		case .ch <- struct{}{}:
		default:
		}
	}
	return true, nil
}
Note argument f should never be nil.
func ( *controlBuffer) ( func( interface{}) bool,  interface{}) (bool, error) {
	.mu.Lock()
	if .err != nil {
		.mu.Unlock()
		return false, .err
	}
	if !() { // f wasn't successful
		.mu.Unlock()
		return false, nil
	}
	.mu.Unlock()
	return true, nil
}

func ( *controlBuffer) ( bool) (interface{}, error) {
	for {
		.mu.Lock()
		if .err != nil {
			.mu.Unlock()
			return nil, .err
		}
		if !.list.isEmpty() {
			 := .list.dequeue().(cbItem)
			if .isTransportResponseFrame() {
We are removing the frame that put us over the threshold; close and clear the throttling channel.
					 := .trfChan.Load().(*chan struct{})
					close(*)
					.trfChan.Store((*chan struct{})(nil))
				}
				.transportResponseFrames--
			}
			.mu.Unlock()
			return , nil
		}
		if ! {
			.mu.Unlock()
			return nil, nil
		}
		.consumerWaiting = true
		.mu.Unlock()
		select {
		case <-.ch:
		case <-.done:
			.finish()
			return nil, ErrConnClosing
		}
	}
}

func ( *controlBuffer) () {
	.mu.Lock()
	if .err != nil {
		.mu.Unlock()
		return
	}
There may be headers for streams in the control buffer. These streams need to be cleaned out since the transport is still not aware of these yet.
	for  := .list.dequeueAll();  != nil;  = .next {
		,  := .it.(*headerFrame)
		if ! {
			continue
		}
		if .onOrphaned != nil { // It will be nil on the server-side.
			.onOrphaned(ErrConnClosing)
		}
	}
	.mu.Unlock()
}

// side identifies which end of the connection a loopy writer runs on.
type side int

const (
	// clientSide marks a loopy writer running on the client.
	clientSide side = iota
	// serverSide marks a loopy writer running on the server.
	serverSide
)
Loopy receives frames from the control buffer. Each frame is handled individually; most of the work done by loopy goes into handling data frames. Loopy maintains a queue of active streams, and each stream maintains a queue of data frames; as loopy receives data frames it gets added to the queue of the relevant stream. Loopy goes over this list of active streams by processing one node every iteration, thereby closely resemebling to a round-robin scheduling over all streams. While processing a stream, loopy writes out data bytes from this stream capped by the min of http2MaxFrameLen, connection-level flow control and stream-level flow control.
estdStreams is map of all established streams that are not cleaned-up yet. On client-side, this is all streams whose headers were sent out. On server-side, this is all streams whose headers were received.
activeStreams is a linked-list of all streams that have data to send and some stream-level flow control quota. Each of these streams internally have a list of data items(and perhaps trailers on the server-side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool
Side-specific handlers
run should be run in a separate goroutine. It reads control frames from controlBuf and processes them by: 1. Updating loopy's internal state, or/and 2. Writing out HTTP2 frames on the wire. Loopy keeps all active streams with data to send in a linked-list. All streams in the activeStreams linked-list must have both: 1. Data to send, and 2. Stream level flow control quota available. In each iteration of run loop, other than processing the incoming control frame, loopy calls processData, which processes one node from the activeStreams linked-list. This results in writing of HTTP2 frames into an underlying write buffer. When there's no more control frames to read from controlBuf, loopy flushes the write buffer. As an optimization, to increase the batch size for each flush, loopy yields the processor, once if the batch size is too low to give stream goroutines a chance to fill it up.
func ( *loopyWriter) () ( error) {
	defer func() {
Don't log ErrConnClosing as error since it happens 1. When the connection is closed by some other known issue. 2. User closed the connection. 3. A graceful close of connection.
			if logger.V(logLevel) {
				logger.Infof("transport: loopyWriter.run returning. %v", )
			}
			 = nil
		}
	}()
	for {
		,  := .cbuf.get(true)
		if  != nil {
			return 
		}
		if  = .handle();  != nil {
			return 
		}
		if _,  = .processData();  != nil {
			return 
		}
		 := true
	:
		for {
			,  := .cbuf.get(false)
			if  != nil {
				return 
			}
			if  != nil {
				if  = .handle();  != nil {
					return 
				}
				if _,  = .processData();  != nil {
					return 
				}
				continue 
			}
			,  := .processData()
			if  != nil {
				return 
			}
			if ! {
				continue 
			}
			if  {
				 = false
				if .framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue 
				}
			}
			.framer.writer.Flush()
			break 

		}
	}
}

func ( *loopyWriter) ( *outgoingWindowUpdate) error {
	return .framer.fr.WriteWindowUpdate(.streamID, .increment)
}

Otherwise update the quota.
	if .streamID == 0 {
		.sendQuota += .increment
		return nil
Find the stream and update it.
	if ,  := .estdStreams[.streamID];  {
		.bytesOutStanding -= int(.increment)
		if  := int(.oiws) - .bytesOutStanding;  > 0 && .state == waitingOnStreamQuota {
			.state = active
			.activeStreams.enqueue()
			return nil
		}
	}
	return nil
}

func ( *loopyWriter) ( *outgoingSettings) error {
	return .framer.fr.WriteSettings(.ss...)
}

func ( *loopyWriter) ( *incomingSettings) error {
	if  := .applySettings(.ss);  != nil {
		return 
	}
	return .framer.fr.WriteSettingsAck()
}

func ( *loopyWriter) ( *registerStream) error {
	 := &outStream{
		id:    .streamID,
		state: empty,
		itl:   &itemList{},
		wq:    .wq,
	}
	.estdStreams[.streamID] = 
	return nil
}

func ( *loopyWriter) ( *headerFrame) error {
	if .side == serverSide {
		,  := .estdStreams[.streamID]
		if ! {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopy doesn't recognize the stream: %d", .streamID)
			}
			return nil
Case 1.A: Server is responding back with headers.
		if !.endStream {
			return .writeHeader(.streamID, .endStream, .hf, .onWrite)
else: Case 1.B: Server wants to close stream.

add it str's list of items.
			.itl.enqueue()
			return nil
		}
		if  := .writeHeader(.streamID, .endStream, .hf, .onWrite);  != nil {
			return 
		}
		return .cleanupStreamHandler(.cleanup)
Case 2: Client wants to originate stream.
	 := &outStream{
		id:    .streamID,
		state: empty,
		itl:   &itemList{},
		wq:    .wq,
	}
	.itl.enqueue()
	return .originateStream()
}

func ( *loopyWriter) ( *outStream) error {
	 := .itl.dequeue().(*headerFrame)
	if  := .initStream(.id);  != nil {
		if  == ErrConnClosing {
			return 
Other errors(errStreamDrain) need not close transport.
		return nil
	}
	if  := .writeHeader(.id, .endStream, .hf, .onWrite);  != nil {
		return 
	}
	.estdStreams[.id] = 
	return nil
}

func ( *loopyWriter) ( uint32,  bool,  []hpack.HeaderField,  func()) error {
	if  != nil {
		()
	}
	.hBuf.Reset()
	for ,  := range  {
		if  := .hEnc.WriteField();  != nil {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", )
			}
		}
	}
	var (
		               error
		,  bool
	)
	 = true
	for ! {
		 := .hBuf.Len()
		if  > http2MaxFrameLen {
			 = http2MaxFrameLen
		} else {
			 = true
		}
		if  {
			 = false
			 = .framer.fr.WriteHeaders(http2.HeadersFrameParam{
				StreamID:      ,
				BlockFragment: .hBuf.Next(),
				EndStream:     ,
				EndHeaders:    ,
			})
		} else {
			 = .framer.fr.WriteContinuation(
				,
				,
				.hBuf.Next(),
			)
		}
		if  != nil {
			return 
		}
	}
	return nil
}

func ( *loopyWriter) ( *dataFrame) error {
	,  := .estdStreams[.streamID]
	if ! {
		return nil
If we got data for a stream it means that stream was originated and the headers were sent out.
On the server side it could be a trailers-only response or a RST_STREAM before stream initialization thus the stream might not be established yet.
		delete(.estdStreams, .streamID)
		.deleteSelf()
	}
	if .rst { // If RST_STREAM needs to be sent.
		if  := .framer.fr.WriteRSTStream(.streamID, .rstCode);  != nil {
			return 
		}
	}
	if .side == clientSide && .draining && len(.estdStreams) == 0 {
		return ErrConnClosing
	}
	return nil
}

func ( *loopyWriter) (*incomingGoAway) error {
	if .side == clientSide {
		.draining = true
		if len(.estdStreams) == 0 {
			return ErrConnClosing
		}
	}
	return nil
}

Handling of outgoing GoAway is very specific to side.
	if .ssGoAwayHandler != nil {
		,  := .ssGoAwayHandler()
		if  != nil {
			return 
		}
		.draining = 
	}
	return nil
}

func ( *loopyWriter) ( interface{}) error {
	switch i := .(type) {
	case *incomingWindowUpdate:
		return .incomingWindowUpdateHandler()
	case *outgoingWindowUpdate:
		return .outgoingWindowUpdateHandler()
	case *incomingSettings:
		return .incomingSettingsHandler()
	case *outgoingSettings:
		return .outgoingSettingsHandler()
	case *headerFrame:
		return .headerHandler()
	case *registerStream:
		return .registerStreamHandler()
	case *cleanupStream:
		return .cleanupStreamHandler()
	case *incomingGoAway:
		return .incomingGoAwayHandler()
	case *dataFrame:
		return .preprocessData()
	case *ping:
		return .pingHandler()
	case *goAway:
		return .goAwayHandler()
	case *outFlowControlSizeRequest:
		return .outFlowControlSizeRequestHandler()
	default:
		return fmt.Errorf("transport: unknown control message type %T", )
	}
}

func ( *loopyWriter) ( []http2.Setting) error {
	for ,  := range  {
		switch .ID {
		case http2.SettingInitialWindowSize:
			 := .oiws
			.oiws = .Val
If the new limit is greater make all depleted streams active.
				for ,  := range .estdStreams {
					if .state == waitingOnStreamQuota {
						.state = active
						.activeStreams.enqueue()
					}
				}
			}
		case http2.SettingHeaderTableSize:
			updateHeaderTblSize(.hEnc, .Val)
		}
	}
	return nil
}
processData removes the first stream from active streams, writes out at most 16KB of its data and then puts it at the end of activeStreams if there's still more data to be sent and stream has some stream-level flow control.
func ( *loopyWriter) () (bool, error) {
	if .sendQuota == 0 {
		return true, nil
	}
	 := .activeStreams.dequeue() // Remove the first stream.
	if  == nil {
		return true, nil
	}
A data item is represented by a dataFrame, since it later translates into multiple HTTP2 data frames. Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data. As an optimization to keep wire traffic low, data from d is copied to h to make as big as the maximum possilbe HTTP2 frame size.

Client sends out empty data frame with endStream = true
		if  := .framer.fr.WriteData(.streamID, .endStream, nil);  != nil {
			return false, 
		}
		.itl.dequeue() // remove the empty data item from stream
		if .itl.isEmpty() {
			.state = empty
		} else if ,  := .itl.peek().(*headerFrame);  { // the next item is trailers.
			if  := .writeHeader(.streamID, .endStream, .hf, .onWrite);  != nil {
				return false, 
			}
			if  := .cleanupStreamHandler(.cleanup);  != nil {
				return false, nil
			}
		} else {
			.activeStreams.enqueue()
		}
		return false, nil
	}
	var (
		 []byte
Figure out the maximum size we can send
	 := http2MaxFrameLen
	if  := int(.oiws) - .bytesOutStanding;  <= 0 { // stream-level flow control.
		.state = waitingOnStreamQuota
		return false, nil
	} else if  >  {
		 = 
	}
	if  > int(.sendQuota) { // connection-level flow control.
		 = int(.sendQuota)
Compute how much of the header and data we can send within quota and max frame length
	 := min(, len(.h))
	 := min(-, len(.d))
	if  != 0 {
		if  == 0 {
			 = .h
We can add some data to grpc message header to distribute bytes more equally across frames. Copy on the stack to avoid generating garbage
			var  [http2MaxFrameLen]byte
			copy([:], .h)
			copy([:], .d[:])
			 = [:+]
		}
	} else {
		 = .d
	}

	 :=  + 
Now that outgoing flow controls are checked we can replenish str's write quota
	.wq.replenish()
If this is the last data message on this stream and all of it can be written in this iteration.
	if .endStream && len(.h)+len(.d) <=  {
		 = true
	}
	if .onEachWrite != nil {
		.onEachWrite()
	}
	if  := .framer.fr.WriteData(.streamID, , [:]);  != nil {
		return false, 
	}
	.bytesOutStanding += 
	.sendQuota -= uint32()
	.h = .h[:]
	.d = .d[:]

	if len(.h) == 0 && len(.d) == 0 { // All the data from that message was written out.
		.itl.dequeue()
	}
	if .itl.isEmpty() {
		.state = empty
	} else if ,  := .itl.peek().(*headerFrame);  { // The next item is trailers.
		if  := .writeHeader(.streamID, .endStream, .hf, .onWrite);  != nil {
			return false, 
		}
		if  := .cleanupStreamHandler(.cleanup);  != nil {
			return false, 
		}
	} else if int(.oiws)-.bytesOutStanding <= 0 { // Ran out of stream quota.
		.state = waitingOnStreamQuota
	} else { // Otherwise add it back to the list of active streams.
		.activeStreams.enqueue()
	}
	return false, nil
}

func (,  int) int {
	if  <  {
		return 
	}
	return