Source File
transport.go
Belonging Package
google.golang.org/grpc/internal/transport
package transport
import (
)
// logLevel is an untyped constant with the value 2.
// NOTE(review): no use of it is visible in this excerpt; presumably a log
// verbosity level — confirm against the callers.
const logLevel = 2
// bufferPool is a thin wrapper around a sync.Pool. The constructor in this
// file sets the pool's New hook to produce *bytes.Buffer values, and the
// accessor methods type-assert pooled values back to *bytes.Buffer.
type bufferPool struct {
// pool holds the reusable buffers.
pool sync.Pool
}
// Constructor for bufferPool: returns a pointer to a new bufferPool whose
// embedded sync.Pool allocates a fresh bytes.Buffer through its New hook.
// NOTE(review): the function identifier is absent in this copy of the source;
// restore it from the original file before compiling.
func () *bufferPool {
return &bufferPool{
pool: sync.Pool{
// New supplies a buffer when the pool has none to hand out.
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
}
// Takes a value out of the pool and returns it as a *bytes.Buffer. The
// unchecked type assertion panics if the pool yields any other type. The
// buffer is not reset here.
// NOTE(review): method and receiver identifiers are missing in this copy;
// `.pool` is presumably a field of the receiver — confirm.
func ( *bufferPool) () *bytes.Buffer {
return .pool.Get().(*bytes.Buffer)
}
// Hands a buffer back to the pool via sync.Pool.Put.
// NOTE(review): the method, receiver, and parameter identifiers are missing
// in this copy, so the argument to Put is blank; presumably it is the
// *bytes.Buffer parameter — confirm against the original file.
func ( *bufferPool) ( *bytes.Buffer) {
.pool.Put()
}
type recvMsg struct {
return
}
.err = .err
if len(.backlog) == 0 {
select {
case .c <- :
.mu.Unlock()
return
default:
}
}
.backlog = append(.backlog, )
.mu.Unlock()
}
// While holding mu, tries to move the oldest backlog entry onto channel c
// without blocking. If the send succeeds, the vacated slot is overwritten
// with a zero recvMsg (presumably so the backing array no longer references
// the message) and the backlog is advanced by one element. If the channel
// cannot accept a value right now, or the backlog is empty, nothing changes.
// NOTE(review): method and receiver identifiers are missing in this copy;
// `.mu`, `.backlog` and `.c` are presumably receiver fields — confirm.
func ( *recvBuffer) () {
.mu.Lock()
if len(.backlog) > 0 {
select {
case .c <- .backlog[0]:
.backlog[0] = recvMsg{}
.backlog = .backlog[1:]
default:
// Channel not ready: keep the backlog as is.
}
}
.mu.Unlock()
}
// Exposes channel c as a receive-only channel of recvMsg.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( *recvBuffer) () <-chan recvMsg {
return .c
}
// recvBufferReader reads message data delivered through a recvBuffer. Data
// that did not fit into the caller's slice on one read is kept in last and
// served first on the next read.
type recvBufferReader struct {
closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
// ctx supplies the error (via ContextErr(ctx.Err())) reported when ctxDone
// fires during a read.
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
// recv is the source of incoming recvMsg values (read through recv.get()).
recv *recvBuffer
last *bytes.Buffer // Stores the remaining data in the previous calls.
// err is assigned the error result of the read path and returned to the caller.
err error
// freeBuffer is called with a buffer once that buffer has been fully drained.
freeBuffer func(*bytes.Buffer)
}
, := .last.Read()
if .last.Len() == 0 {
.freeBuffer(.last)
.last = nil
}
return , nil
}
if .closeStream != nil {
, .err = .readClient()
} else {
, .err = .read()
}
return , .err
}
// Blocks until either ctxDone fires or a message arrives from recv.get().
// If ctxDone fires, it returns 0 and ContextErr(ctx.Err()). Otherwise the
// received message is passed to readAdditional and that result is returned.
// NOTE(review): method, receiver, parameter and local identifiers are missing
// in this copy of the source.
func ( *recvBufferReader) ( []byte) ( int, error) {
select {
case <-.ctxDone:
return 0, ContextErr(.ctx.Err())
case := <-.recv.get():
return .readAdditional(, )
}
}
select {
.closeStream(ContextErr(.ctx.Err()))
:= <-.recv.get()
return .readAdditional(, )
case := <-.recv.get():
return .readAdditional(, )
}
}
// Processes one received message. It first calls recv.load(), which may move
// a backlog entry onto the channel. If the checked err field is non-nil
// (presumably the message's err — confirm), it returns 0 and that error.
// Otherwise it reads from the message buffer (presumably into the []byte
// parameter). A buffer that is empty afterwards is released through
// freeBuffer and last is cleared; a buffer with data left is stored in last
// for the next read. The success path always returns a nil error.
// NOTE(review): method, receiver, parameter and local identifiers are missing
// in this copy of the source.
func ( *recvBufferReader) ( recvMsg, []byte) ( int, error) {
.recv.load()
if .err != nil {
return 0, .err
}
, := .buffer.Read()
if .buffer.Len() == 0 {
.freeBuffer(.buffer)
.last = nil
} else {
.last = .buffer
}
return , nil
}
// streamState enumerates the phases of a stream's life. It is backed by
// uint32, and the Stream state accessors in this file cast its address to
// *uint32 to operate on it with sync/atomic.
type streamState uint32
// Stream states, numbered from 0 by iota in declaration order.
const (
streamActive streamState = iota
streamWriteDone // EndStream sent
streamReadDone // EndStream received
streamDone // the entire stream is finished.
)
type Stream struct {
id uint32
st ServerTransport // nil for client side Stream
ct *http2Client // nil for server side Stream
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
method string // the associated RPC method of the stream
recvCompress string
sendCompress string
buf *recvBuffer
trReader io.Reader
fc *inFlow
wq *writeQuota
requestRead func(int)
headerChan chan struct{} // closed to indicate the end of header metadata.
status *status.Status
bytesReceived uint32 // indicates whether any bytes have been received on this stream
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
// Reports whether headerSent, read atomically, equals 1.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( *Stream) () bool {
return atomic.LoadUint32(&.headerSent) == 1
}
// Atomically sets headerSent to 1 and reports whether it was already 1 before
// the swap.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( *Stream) () bool {
return atomic.SwapUint32(&.headerSent, 1) == 1
}
// Atomically replaces state with a new streamState and returns the value it
// held before the swap.
// NOTE(review): identifiers are missing in this copy; the value stored is
// presumably the streamState parameter — confirm.
func ( *Stream) ( streamState) streamState {
return streamState(atomic.SwapUint32((*uint32)(&.state), uint32()))
}
// Performs an atomic compare-and-swap on state and reports whether the swap
// took place.
// NOTE(review): identifiers are missing in this copy; presumably the first
// parameter is the expected old state and the second the new state, matching
// atomic.CompareAndSwapUint32's (addr, old, new) order — confirm.
func ( *Stream) (, streamState) bool {
return atomic.CompareAndSwapUint32((*uint32)(&.state), uint32(), uint32())
}
// Atomically loads state and returns it as a streamState.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( *Stream) () streamState {
return streamState(atomic.LoadUint32((*uint32)(&.state)))
}
func ( *Stream) () {
return
}
select {
<-.headerChan
case <-.headerChan:
}
}
// Calls waitOnHeader first, then returns the recvCompress field.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( *Stream) () string {
.waitOnHeader()
return .recvCompress
}
// Assigns a new value to the sendCompress field.
// NOTE(review): identifiers are missing in this copy, so the right-hand side
// of the assignment is blank; presumably it is the string parameter — confirm.
func ( *Stream) ( string) {
.sendCompress =
}
// Calls waitOnHeader first, then returns the noHeaders field.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( *Stream) () bool {
.waitOnHeader()
return .noHeaders
}
// Returns the contentSubtype field without waiting on anything.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( *Stream) () string {
return .contentSubtype
}
if := .trReader.(*transportReader).er; != nil {
return 0,
}
.requestRead(len())
return io.ReadFull(.trReader, )
}
// transportReader wraps an underlying reader and records its first failure.
// NOTE(review): the Read method below uses a `reader` field that is not
// declared in this copy of the struct; that line was presumably lost in
// extraction — confirm against the original file.
type transportReader struct {
// windowHandler is invoked after each successful read.
windowHandler func(int)
// er stores the error returned by a failed read; another method in this
// file reads it via a *transportReader type assertion before reading further.
er error
}
// Delegates to the underlying reader's Read and returns its results through
// the named return values. If Read reports an error, the error is saved in er
// and the method returns immediately without calling windowHandler. On
// success, windowHandler is invoked before returning.
// NOTE(review): identifiers are missing in this copy; windowHandler's argument
// is presumably the number of bytes read — confirm.
func ( *transportReader) ( []byte) ( int, error) {
, = .reader.Read()
if != nil {
.er =
return
}
.windowHandler()
return
}
// Reports whether bytesReceived, read atomically, equals 1.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( *Stream) () bool {
return atomic.LoadUint32(&.bytesReceived) == 1
}
// Reports whether unprocessed, read atomically, equals 1.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( *Stream) () bool {
return atomic.LoadUint32(&.unprocessed) == 1
}
// transportState is an int-backed enumeration with three values.
// NOTE(review): the meanings are inferred from the names only (usable,
// shutting down, winding down); no code using them is visible in this excerpt.
type transportState int
// Transport states, numbered from 0 by iota in declaration order.
const (
reachable transportState = iota
closing
draining
)
// ServerConfig groups the settings handed to the server transport
// constructor in this file, which accepts a *ServerConfig.
// NOTE(review): per-field semantics are not established by this excerpt;
// MaxHeaderListSize and HeaderTableSize are pointers, presumably so that nil
// can mean "not set" — confirm against the consumers.
type ServerConfig struct {
MaxStreams uint32
AuthInfo credentials.AuthInfo
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
ChannelzParentID int64
MaxHeaderListSize *uint32
HeaderTableSize *uint32
}
// Builds a ServerTransport by delegating to newHTTP2Server and returns its
// result unchanged. The function declares three parameters (a string, a
// net.Conn and a *ServerConfig) but forwards only two arguments.
// NOTE(review): function and parameter identifiers are missing in this copy;
// presumably the net.Conn and *ServerConfig are the ones forwarded — confirm.
func ( string, net.Conn, *ServerConfig) (ServerTransport, error) {
return newHTTP2Server(, )
}
// Builds a ClientTransport by delegating to newHTTP2Client and returns its
// result unchanged. It takes two context.Context values, a resolver.Address,
// a ConnectOptions, and three callbacks (func(), func(GoAwayReason), func()),
// and forwards seven arguments.
// NOTE(review): function and parameter identifiers are missing in this copy,
// so the role of each context and callback cannot be read from here.
func (, context.Context, resolver.Address, ConnectOptions, func(), func(GoAwayReason), func()) (ClientTransport, error) {
return newHTTP2Client(, , , , , , )
}
ContentSubtype string
PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
}
Close() error
GracefulClose()
Error() <-chan struct{}
GoAway() <-chan struct{}
GetGoAwayReason() GoAwayReason
IncrMsgSent()
IncrMsgRecv()
}
Close() error
Drain()
IncrMsgSent()
IncrMsgRecv()
}
// Builds a ConnectionError value. Desc is produced with fmt.Sprintf, and the
// temp and err fields are filled in as well.
// NOTE(review): function and parameter identifiers are missing in this copy;
// presumably the bool goes to temp, the error to err, and the string plus
// variadic arguments to Sprintf as format and operands — confirm.
func ( bool, error, string, ...interface{}) ConnectionError {
return ConnectionError{
Desc: fmt.Sprintf(, ...),
temp: ,
err: ,
}
}
// Returns the temp field. Declared on a value receiver.
// NOTE(review): method and receiver identifiers are missing in this copy.
func ( ConnectionError) () bool {
return .temp
}
statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
// GoAwayReason is a uint8-backed type. In this file it is the return type of
// a GetGoAwayReason interface method and the argument type of one callback
// passed to the client transport constructor.
// NOTE(review): its constant values are not visible in this excerpt.
type GoAwayReason uint8
type channelzData struct {
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |