Source File
controlbuf.go
Belonging Package
google.golang.org/grpc/internal/transport
package transport
import (
)
var updateHeaderTblSize = func( *hpack.Encoder, uint32) {
.SetMaxDynamicTableSizeLimit()
}
type itemNode struct {
it interface{}
next *itemNode
}
type itemList struct {
head *itemNode
tail *itemNode
}
func ( *itemList) ( interface{}) {
:= &itemNode{it: }
if .tail == nil {
.head, .tail = ,
return
}
.tail.next =
.tail =
}
func ( *itemList) () interface{} {
return .head.it
}
func ( *itemList) () interface{} {
if .head == nil {
return nil
}
:= .head.it
.head = .head.next
if .head == nil {
.tail = nil
}
return
}
func ( *itemList) () *itemNode {
:= .head
.head, .tail = nil, nil
return
}
func ( *itemList) () bool {
return .head == nil
}
const maxQueuedTransportResponseFrames = 50
type cbItem interface {
isTransportResponseFrame() bool
}
type registerStream struct {
streamID uint32
wq *writeQuota
}
func (*registerStream) () bool { return false }
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
endStream bool // Valid on server side.
initStream func(uint32) error // Used only on the client side.
onWrite func()
wq *writeQuota // write quota for the stream created.
cleanup *cleanupStream // Valid on the server side.
onOrphaned func(error) // Valid on client-side
}
func ( *headerFrame) () bool {
return .cleanup != nil && .cleanup.rst // Results in a RST_STREAM
}
type cleanupStream struct {
streamID uint32
rst bool
rstCode http2.ErrCode
onWrite func()
}
func ( *cleanupStream) () bool { return .rst } // Results in a RST_STREAM
type dataFrame struct {
streamID uint32
endStream bool
h []byte
onEachWrite func()
}
func (*dataFrame) () bool { return false }
type incomingWindowUpdate struct {
streamID uint32
increment uint32
}
func (*incomingWindowUpdate) () bool { return false }
type outgoingWindowUpdate struct {
streamID uint32
increment uint32
}
func (*outgoingWindowUpdate) () bool {
return false // window updates are throttled by thresholds
}
type incomingSettings struct {
ss []http2.Setting
}
func (*incomingSettings) () bool { return true } // Results in a settings ACK
type outgoingSettings struct {
ss []http2.Setting
}
func (*outgoingSettings) () bool { return false }
type incomingGoAway struct {
}
func (*incomingGoAway) () bool { return false }
type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn bool
}
func (*goAway) () bool { return false }
type ping struct {
ack bool
data [8]byte
}
func (*ping) () bool { return true }
type outFlowControlSizeRequest struct {
resp chan uint32
}
func (*outFlowControlSizeRequest) () bool { return false }
type outStreamState int
const (
active outStreamState = iota
empty
waitingOnStreamQuota
)
type outStream struct {
id uint32
state outStreamState
itl *itemList
bytesOutStanding int
wq *writeQuota
next *outStream
prev *outStream
}
func ( *outStream) () {
if .prev != nil {
.prev.next = .next
}
if .next != nil {
.next.prev = .prev
}
.next, .prev = nil, nil
}
func ( *outStreamList) () *outStream {
:= .head.next
if == .tail {
return nil
}
.deleteSelf()
return
}
transportResponseFrames int
trfChan atomic.Value // *chan struct{}
}
func ( <-chan struct{}) *controlBuffer {
return &controlBuffer{
ch: make(chan struct{}, 1),
list: &itemList{},
done: ,
}
}
func ( *controlBuffer) () {
, := .trfChan.Load().(*chan struct{})
if != nil {
select {
case <-*:
case <-.done:
}
}
}
func ( *controlBuffer) ( cbItem) error {
, := .executeAndPut(nil, )
return
}
func ( *controlBuffer) ( func( interface{}) bool, cbItem) (bool, error) {
var bool
.mu.Lock()
if .err != nil {
.mu.Unlock()
return false, .err
}
if != nil {
if !() { // f wasn't successful
.mu.Unlock()
return false, nil
}
}
if .consumerWaiting {
= true
.consumerWaiting = false
}
.list.enqueue()
if .isTransportResponseFrame() {
.transportResponseFrames++
func ( *controlBuffer) ( func( interface{}) bool, interface{}) (bool, error) {
.mu.Lock()
if .err != nil {
.mu.Unlock()
return false, .err
}
if !() { // f wasn't successful
.mu.Unlock()
return false, nil
}
.mu.Unlock()
return true, nil
}
func ( *controlBuffer) ( bool) (interface{}, error) {
for {
.mu.Lock()
if .err != nil {
.mu.Unlock()
return nil, .err
}
if !.list.isEmpty() {
:= .list.dequeue().(cbItem)
if .isTransportResponseFrame() {
:= .trfChan.Load().(*chan struct{})
close(*)
.trfChan.Store((*chan struct{})(nil))
}
.transportResponseFrames--
}
.mu.Unlock()
return , nil
}
if ! {
.mu.Unlock()
return nil, nil
}
.consumerWaiting = true
.mu.Unlock()
select {
case <-.ch:
case <-.done:
.finish()
return nil, ErrConnClosing
}
}
}
func ( *controlBuffer) () {
.mu.Lock()
if .err != nil {
.mu.Unlock()
return
}
for := .list.dequeueAll(); != nil; = .next {
, := .it.(*headerFrame)
if ! {
continue
}
if .onOrphaned != nil { // It will be nil on the server-side.
.onOrphaned(ErrConnClosing)
}
}
.mu.Unlock()
}
type side int
const (
clientSide side = iota
serverSide
)
type loopyWriter struct {
side side
cbuf *controlBuffer
sendQuota uint32
activeStreams *outStreamList
framer *framer
hBuf *bytes.Buffer // The buffer for HPACK encoding.
hEnc *hpack.Encoder // HPACK encoder.
bdpEst *bdpEstimator
draining bool
ssGoAwayHandler func(*goAway) (bool, error)
}
func ( side, *framer, *controlBuffer, *bdpEstimator) *loopyWriter {
var bytes.Buffer
:= &loopyWriter{
side: ,
cbuf: ,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: ,
hBuf: &,
hEnc: hpack.NewEncoder(&),
bdpEst: ,
}
return
}
const minBatchSize = 1000
func ( *loopyWriter) () ( error) {
defer func() {
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter.run returning. %v", )
}
= nil
}
}()
for {
, := .cbuf.get(true)
if != nil {
return
}
if = .handle(); != nil {
return
}
if _, = .processData(); != nil {
return
}
:= true
:
for {
, := .cbuf.get(false)
if != nil {
return
}
if != nil {
if = .handle(); != nil {
return
}
if _, = .processData(); != nil {
return
}
continue
}
, := .processData()
if != nil {
return
}
if ! {
continue
}
if {
= false
if .framer.writer.offset < minBatchSize {
runtime.Gosched()
continue
}
}
.framer.writer.Flush()
break
}
}
}
func ( *loopyWriter) ( *outgoingWindowUpdate) error {
return .framer.fr.WriteWindowUpdate(.streamID, .increment)
}
if , := .estdStreams[.streamID]; {
.bytesOutStanding -= int(.increment)
if := int(.oiws) - .bytesOutStanding; > 0 && .state == waitingOnStreamQuota {
.state = active
.activeStreams.enqueue()
return nil
}
}
return nil
}
func ( *loopyWriter) ( *outgoingSettings) error {
return .framer.fr.WriteSettings(.ss...)
}
func ( *loopyWriter) ( *incomingSettings) error {
if := .applySettings(.ss); != nil {
return
}
return .framer.fr.WriteSettingsAck()
}
func ( *loopyWriter) ( *registerStream) error {
:= &outStream{
id: .streamID,
state: empty,
itl: &itemList{},
wq: .wq,
}
.estdStreams[.streamID] =
return nil
}
func ( *loopyWriter) ( *headerFrame) error {
if .side == serverSide {
, := .estdStreams[.streamID]
if ! {
if logger.V(logLevel) {
logger.Warningf("transport: loopy doesn't recognize the stream: %d", .streamID)
}
return nil
:= &outStream{
id: .streamID,
state: empty,
itl: &itemList{},
wq: .wq,
}
.itl.enqueue()
return .originateStream()
}
func ( *loopyWriter) ( *outStream) error {
:= .itl.dequeue().(*headerFrame)
if := .initStream(.id); != nil {
if == ErrConnClosing {
return
return nil
}
if := .writeHeader(.id, .endStream, .hf, .onWrite); != nil {
return
}
.estdStreams[.id] =
return nil
}
func ( *loopyWriter) ( uint32, bool, []hpack.HeaderField, func()) error {
if != nil {
()
}
.hBuf.Reset()
for , := range {
if := .hEnc.WriteField(); != nil {
if logger.V(logLevel) {
logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", )
}
}
}
var (
error
, bool
)
= true
for ! {
:= .hBuf.Len()
if > http2MaxFrameLen {
= http2MaxFrameLen
} else {
= true
}
if {
= false
= .framer.fr.WriteHeaders(http2.HeadersFrameParam{
StreamID: ,
BlockFragment: .hBuf.Next(),
EndStream: ,
EndHeaders: ,
})
} else {
= .framer.fr.WriteContinuation(
,
,
.hBuf.Next(),
)
}
if != nil {
return
}
}
return nil
}
func ( *loopyWriter) ( *dataFrame) error {
, := .estdStreams[.streamID]
if ! {
return nil
.itl.enqueue()
if .state == empty {
.state = active
.activeStreams.enqueue()
}
return nil
}
func ( *loopyWriter) ( *ping) error {
if !.ack {
.bdpEst.timesnap(.data)
}
return .framer.fr.WritePing(.ack, .data)
}
func ( *loopyWriter) ( *outFlowControlSizeRequest) error {
.resp <- .sendQuota
return nil
}
func ( *loopyWriter) ( *cleanupStream) error {
.onWrite()
delete(.estdStreams, .streamID)
.deleteSelf()
}
if .rst { // If RST_STREAM needs to be sent.
if := .framer.fr.WriteRSTStream(.streamID, .rstCode); != nil {
return
}
}
if .side == clientSide && .draining && len(.estdStreams) == 0 {
return ErrConnClosing
}
return nil
}
func ( *loopyWriter) (*incomingGoAway) error {
if .side == clientSide {
.draining = true
if len(.estdStreams) == 0 {
return ErrConnClosing
}
}
return nil
}
if .ssGoAwayHandler != nil {
, := .ssGoAwayHandler()
if != nil {
return
}
.draining =
}
return nil
}
func ( *loopyWriter) ( interface{}) error {
switch i := .(type) {
case *incomingWindowUpdate:
return .incomingWindowUpdateHandler()
case *outgoingWindowUpdate:
return .outgoingWindowUpdateHandler()
case *incomingSettings:
return .incomingSettingsHandler()
case *outgoingSettings:
return .outgoingSettingsHandler()
case *headerFrame:
return .headerHandler()
case *registerStream:
return .registerStreamHandler()
case *cleanupStream:
return .cleanupStreamHandler()
case *incomingGoAway:
return .incomingGoAwayHandler()
case *dataFrame:
return .preprocessData()
case *ping:
return .pingHandler()
case *goAway:
return .goAwayHandler()
case *outFlowControlSizeRequest:
return .outFlowControlSizeRequestHandler()
default:
return fmt.Errorf("transport: unknown control message type %T", )
}
}
func ( *loopyWriter) ( []http2.Setting) error {
for , := range {
switch .ID {
case http2.SettingInitialWindowSize:
:= .oiws
.oiws = .Val
for , := range .estdStreams {
if .state == waitingOnStreamQuota {
.state = active
.activeStreams.enqueue()
}
}
}
case http2.SettingHeaderTableSize:
updateHeaderTblSize(.hEnc, .Val)
}
}
return nil
}
if := .framer.fr.WriteData(.streamID, .endStream, nil); != nil {
return false,
}
.itl.dequeue() // remove the empty data item from stream
if .itl.isEmpty() {
.state = empty
} else if , := .itl.peek().(*headerFrame); { // the next item is trailers.
if := .writeHeader(.streamID, .endStream, .hf, .onWrite); != nil {
return false,
}
if := .cleanupStreamHandler(.cleanup); != nil {
return false, nil
}
} else {
.activeStreams.enqueue()
}
return false, nil
}
var (
[]byte
:= http2MaxFrameLen
if := int(.oiws) - .bytesOutStanding; <= 0 { // stream-level flow control.
.state = waitingOnStreamQuota
return false, nil
} else if > {
=
}
if > int(.sendQuota) { // connection-level flow control.
= int(.sendQuota)
if .endStream && len(.h)+len(.d) <= {
= true
}
if .onEachWrite != nil {
.onEachWrite()
}
if := .framer.fr.WriteData(.streamID, , [:]); != nil {
return false,
}
.bytesOutStanding +=
.sendQuota -= uint32()
.h = .h[:]
.d = .d[:]
if len(.h) == 0 && len(.d) == 0 { // All the data from that message was written out.
.itl.dequeue()
}
if .itl.isEmpty() {
.state = empty
} else if , := .itl.peek().(*headerFrame); { // The next item is trailers.
if := .writeHeader(.streamID, .endStream, .hf, .onWrite); != nil {
return false,
}
if := .cleanupStreamHandler(.cleanup); != nil {
return false,
}
} else if int(.oiws)-.bytesOutStanding <= 0 { // Ran out of stream quota.
.state = waitingOnStreamQuota
} else { // Otherwise add it back to the list of active streams.
.activeStreams.enqueue()
}
return false, nil
}
func (, int) int {
if < {
return
}
return
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |