* * Copyright 2014 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http:www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *
Package transport defines and implements message oriented communication channel to complete various transactions (e.g., an RPC). It is meant for grpc-internal usage and is not intended to be imported directly by users.
package transport

import (
	
	
	
	
	
	
	
	

	
	
	
	
	
	
	
	
)

const logLevel = 2

type bufferPool struct {
	pool sync.Pool
}

func () *bufferPool {
	return &bufferPool{
		pool: sync.Pool{
			New: func() interface{} {
				return new(bytes.Buffer)
			},
		},
	}
}

func ( *bufferPool) () *bytes.Buffer {
	return .pool.Get().(*bytes.Buffer)
}

func ( *bufferPool) ( *bytes.Buffer) {
	.pool.Put()
}
recvMsg represents the received msg from the transport. All transport protocol specific info has been removed.
type recvMsg struct {
nil: received some data io.EOF: stream is completed. data is nil. other non-nil error: transport failure. data is nil.
recvBuffer is an unbounded channel of recvMsg structs. Note: recvBuffer differs from buffer.Unbounded only in the fact that it holds a channel of recvMsg structs instead of objects implementing "item" interface. recvBuffer is written to much more often and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
type recvBuffer struct {
	c       chan recvMsg
	mu      sync.Mutex
	backlog []recvMsg
	err     error
}

func () *recvBuffer {
	 := &recvBuffer{
		c: make(chan recvMsg, 1),
	}
	return 
}

func ( *recvBuffer) ( recvMsg) {
	.mu.Lock()
	if .err != nil {
An error had occurred earlier, don't accept more data or errors.
		return
	}
	.err = .err
	if len(.backlog) == 0 {
		select {
		case .c <- :
			.mu.Unlock()
			return
		default:
		}
	}
	.backlog = append(.backlog, )
	.mu.Unlock()
}

func ( *recvBuffer) () {
	.mu.Lock()
	if len(.backlog) > 0 {
		select {
		case .c <- .backlog[0]:
			.backlog[0] = recvMsg{}
			.backlog = .backlog[1:]
		default:
		}
	}
	.mu.Unlock()
}
get returns the channel that receives a recvMsg in the buffer. Upon receipt of a recvMsg, the caller should call load to send another recvMsg onto the channel if there is any.
func ( *recvBuffer) () <-chan recvMsg {
	return .c
}
recvBufferReader implements io.Reader interface to read the data from recvBuffer.
type recvBufferReader struct {
	closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
	ctx         context.Context
	ctxDone     <-chan struct{} // cache of ctx.Done() (for performance).
	recv        *recvBuffer
	last        *bytes.Buffer // Stores the remaining data in the previous calls.
	err         error
	freeBuffer  func(*bytes.Buffer)
}
Read reads the next len(p) bytes from last. If last is drained, it tries to read additional data from recv. It blocks if there no additional data available in recv. If Read returns any non-nil error, it will continue to return that error.
func ( *recvBufferReader) ( []byte) ( int,  error) {
	if .err != nil {
		return 0, .err
	}
Read remaining data left in last call.
		,  := .last.Read()
		if .last.Len() == 0 {
			.freeBuffer(.last)
			.last = nil
		}
		return , nil
	}
	if .closeStream != nil {
		, .err = .readClient()
	} else {
		, .err = .read()
	}
	return , .err
}

func ( *recvBufferReader) ( []byte) ( int,  error) {
	select {
	case <-.ctxDone:
		return 0, ContextErr(.ctx.Err())
	case  := <-.recv.get():
		return .readAdditional(, )
	}
}

If the context is canceled, then closes the stream with nil metadata. closeStream writes its error parameter to r.recv as a recvMsg. r.readAdditional acts on that message and returns the necessary error.
	select {
Note that this adds the ctx error to the end of recv buffer, and reads from the head. This will delay the error until recv buffer is empty, thus will delay ctx cancellation in Recv(). It's done this way to fix a race between ctx cancel and trailer. The race was, stream.Recv() may return ctx error if ctxDone wins the race, but stream.Trailer() may return a non-nil md because the stream was not marked as done when trailer is received. This closeStream call will mark stream as done, thus fix the race. TODO: delaying ctx error seems like a unnecessary side effect. What we really want is to mark the stream as done, and return ctx error faster.
		.closeStream(ContextErr(.ctx.Err()))
		 := <-.recv.get()
		return .readAdditional(, )
	case  := <-.recv.get():
		return .readAdditional(, )
	}
}

func ( *recvBufferReader) ( recvMsg,  []byte) ( int,  error) {
	.recv.load()
	if .err != nil {
		return 0, .err
	}
	,  := .buffer.Read()
	if .buffer.Len() == 0 {
		.freeBuffer(.buffer)
		.last = nil
	} else {
		.last = .buffer
	}
	return , nil
}

type streamState uint32

const (
	streamActive    streamState = iota
	streamWriteDone             // EndStream sent
	streamReadDone              // EndStream received
	streamDone                  // the entire stream is finished.
)
Stream represents an RPC in the transport layer.
type Stream struct {
	id           uint32
	st           ServerTransport    // nil for client side Stream
	ct           *http2Client       // nil for server side Stream
	ctx          context.Context    // the associated context of the stream
	cancel       context.CancelFunc // always nil for client side Stream
	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
	method       string             // the associated RPC method of the stream
	recvCompress string
	sendCompress string
	buf          *recvBuffer
	trReader     io.Reader
	fc           *inFlow
	wq           *writeQuota
Callback to state application's intentions to read data. This is used to adjust flow control, if needed.
	requestRead func(int)

	headerChan       chan struct{} // closed to indicate the end of header metadata.
headerValid indicates whether a valid header was received. Only meaningful after headerChan is closed (always call waitOnHeader() before reading its value). Not valid on server side.
hdrMu protects header and trailer metadata on the server-side.
On client side, header keeps the received header metadata. On server side, header keeps the header set by SetHeader(). The complete header will merged into this after t.WriteHeader() is called.
	header  metadata.MD
	trailer metadata.MD // the key-value map of trailer metadata.

	noHeaders bool // set if the client never received headers (set only after the stream is done).
On the server-side, headerSent is atomically set to 1 when the headers are sent out.
On client-side it is the status error received from the server. On server-side it is unused.
	status *status.Status

	bytesReceived uint32 // indicates whether any bytes have been received on this stream
	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream
contentSubtype is the content-subtype for requests. this must be lowercase or the behavior is undefined.
isHeaderSent is only valid on the server-side.
func ( *Stream) () bool {
	return atomic.LoadUint32(&.headerSent) == 1
}
updateHeaderSent updates headerSent and returns true if it was alreay set. It is valid only on server-side.
func ( *Stream) () bool {
	return atomic.SwapUint32(&.headerSent, 1) == 1
}

func ( *Stream) ( streamState) streamState {
	return streamState(atomic.SwapUint32((*uint32)(&.state), uint32()))
}

func ( *Stream) (,  streamState) bool {
	return atomic.CompareAndSwapUint32((*uint32)(&.state), uint32(), uint32())
}

func ( *Stream) () streamState {
	return streamState(atomic.LoadUint32((*uint32)(&.state)))
}

func ( *Stream) () {
On the server headerChan is always nil since a stream originates only after having received headers.
		return
	}
	select {
Close the stream to prevent headers/trailers from changing after this function returns.
headerChan could possibly not be closed yet if closeStream raced with operateHeaders; wait until it is closed explicitly here.
		<-.headerChan
	case <-.headerChan:
	}
}
RecvCompress returns the compression algorithm applied to the inbound message. It is empty string if there is no compression applied.
func ( *Stream) () string {
	.waitOnHeader()
	return .recvCompress
}
SetSendCompress sets the compression algorithm to the stream.
func ( *Stream) ( string) {
	.sendCompress = 
}
Done returns a channel which is closed when it receives the final status from the server.
func ( *Stream) () <-chan struct{} {
	return .done
}
Header returns the header metadata of the stream. On client side, it acquires the key-value pairs of header metadata once it is available. It blocks until i) the metadata is ready or ii) there is no header metadata or iii) the stream is canceled/expired. On server side, it returns the out header after t.WriteHeader is called. It does not block and must not be called until after WriteHeader.
func ( *Stream) () (metadata.MD, error) {
On server side, return the header in stream. It will be the out header after t.WriteHeader is called.
		return .header.Copy(), nil
	}
	.waitOnHeader()
	if !.headerValid {
		return nil, .status.Err()
	}
	return .header.Copy(), nil
}
TrailersOnly blocks until a header or trailers-only frame is received and then returns true if the stream was trailers-only. If the stream ends before headers are received, returns true, nil. Client-side only.
func ( *Stream) () bool {
	.waitOnHeader()
	return .noHeaders
}
Trailer returns the cached trailer metedata. Note that if it is not called after the entire stream is done, it could return an empty MD. Client side only. It can be safely read only after stream has ended that is either read or write have returned io.EOF.
func ( *Stream) () metadata.MD {
	 := .trailer.Copy()
	return 
}
ContentSubtype returns the content-subtype for a request. For example, a content-subtype of "proto" will result in a content-type of "application/grpc+proto". This will always be lowercase. See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for more details.
func ( *Stream) () string {
	return .contentSubtype
}
Context returns the context of the stream.
func ( *Stream) () context.Context {
	return .ctx
}
Method returns the method for the stream.
func ( *Stream) () string {
	return .method
}
Status returns the status received from the server. Status can be read safely only after the stream has ended, that is, after Done() is closed.
func ( *Stream) () *status.Status {
	return .status
}
SetHeader sets the header metadata. This can be called multiple times. Server side only. This should not be called in parallel to other data writes.
func ( *Stream) ( metadata.MD) error {
	if .Len() == 0 {
		return nil
	}
	if .isHeaderSent() || .getState() == streamDone {
		return ErrIllegalHeaderWrite
	}
	.hdrMu.Lock()
	.header = metadata.Join(.header, )
	.hdrMu.Unlock()
	return nil
}
SendHeader sends the given header metadata. The given metadata is combined with any metadata set by previous calls to SetHeader and then written to the transport stream.
func ( *Stream) ( metadata.MD) error {
	return .st.WriteHeader(, )
}
SetTrailer sets the trailer metadata which will be sent with the RPC status by the server. This can be called multiple times. Server side only. This should not be called parallel to other data writes.
func ( *Stream) ( metadata.MD) error {
	if .Len() == 0 {
		return nil
	}
	if .getState() == streamDone {
		return ErrIllegalHeaderWrite
	}
	.hdrMu.Lock()
	.trailer = metadata.Join(.trailer, )
	.hdrMu.Unlock()
	return nil
}

func ( *Stream) ( recvMsg) {
	.buf.put()
}
Read reads all p bytes from the wire for this stream.
Don't request a read if there was an error earlier
	if  := .trReader.(*transportReader).er;  != nil {
		return 0, 
	}
	.requestRead(len())
	return io.ReadFull(.trReader, )
}
tranportReader reads all the data available for this Stream from the transport and passes them into the decoder, which converts them into a gRPC message stream. The error is io.EOF when the stream is done or another non-nil error if the stream broke.
type transportReader struct {
The handler to control the window update procedure for both this particular stream and the associated transport.
	windowHandler func(int)
	er            error
}

func ( *transportReader) ( []byte) ( int,  error) {
	,  = .reader.Read()
	if  != nil {
		.er = 
		return
	}
	.windowHandler()
	return
}
BytesReceived indicates whether any bytes have been received on this stream.
func ( *Stream) () bool {
	return atomic.LoadUint32(&.bytesReceived) == 1
}
Unprocessed indicates whether the server did not process this stream -- i.e. it sent a refused stream or GOAWAY including this stream ID.
func ( *Stream) () bool {
	return atomic.LoadUint32(&.unprocessed) == 1
}
GoString is implemented by Stream so context.String() won't race when printing %#v.
func ( *Stream) () string {
	return fmt.Sprintf("<stream: %p, %v>", , .method)
}
state of transport
NewServerTransport creates a ServerTransport with conn or non-nil error if it fails.
func ( string,  net.Conn,  *ServerConfig) (ServerTransport, error) {
	return newHTTP2Server(, )
}
ConnectOptions covers all relevant options for communicating with the server.
UserAgent is the application user agent.
Dialer specifies how to dial a network address.
FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
TransportCredentials stores the Authenticator required to setup a client connection. Only one of TransportCredentials and CredsBundle is non-nil.
CredsBundle is the credentials bundle to be used. Only one of TransportCredentials and CredsBundle is non-nil.
KeepaliveParams stores the keepalive parameters.
StatsHandler stores the handler for stats.
InitialWindowSize sets the initial window size for a stream.
InitialConnWindowSize sets the initial window size for a connection.
WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
NewClientTransport establishes the transport with the required ConnectOptions and returns it to the caller.
func (,  context.Context,  resolver.Address,  ConnectOptions,  func(),  func(GoAwayReason),  func()) (ClientTransport, error) {
	return newHTTP2Client(, , , , , , )
}
Options provides additional hints and information for message transmission.
Last indicates whether this write is the last piece for this stream.
CallHdr carries the information of a particular RPC.
Host specifies the peer's host.
Method specifies the operation to perform.
SendCompress specifies the compression algorithm applied on outbound message.
Creds specifies credentials.PerRPCCredentials for a call.
ContentSubtype specifies the content-subtype for a request. For example, a content-subtype of "proto" will result in a content-type of "application/grpc+proto". The value of ContentSubtype must be all lowercase, otherwise the behavior is undefined. See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for more details.
	ContentSubtype string

	PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
}
ClientTransport is the common interface for all gRPC client-side transport implementations.
Close tears down this transport. Once it returns, the transport should not be accessed any more. The caller must make sure this is called only once.
	Close() error
GracefulClose starts to tear down the transport: the transport will stop accepting new RPCs and NewStream will return error. Once all streams are finished, the transport will close. It does not block.
	GracefulClose()
Write sends the data for the given stream. A nil stream indicates the write is to be performed on the transport as a whole.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error
NewStream creates a Stream for an RPC.
	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
CloseStream clears the footprint of a stream when the stream is not needed any more. The err indicates the error incurred when CloseStream is called. Must be called when a stream is finished unless the associated transport is closing.
	CloseStream(stream *Stream, err error)
Error returns a channel that is closed when some I/O error happens. Typically the caller should have a goroutine to monitor this in order to take action (e.g., close the current transport and create a new one) in error case. It should not return nil once the transport is initiated.
	Error() <-chan struct{}
GoAway returns a channel that is closed when ClientTransport receives the draining signal from the server (e.g., GOAWAY frame in HTTP/2).
	GoAway() <-chan struct{}
GetGoAwayReason returns the reason why GoAway frame was received.
	GetGoAwayReason() GoAwayReason
RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
IncrMsgSent increments the number of message sent through this transport.
	IncrMsgSent()
IncrMsgRecv increments the number of message received through this transport.
	IncrMsgRecv()
}
ServerTransport is the common interface for all gRPC server-side transport implementations. Methods may be called concurrently from multiple goroutines, but Write methods for a given Stream will be called serially.
HandleStreams receives incoming streams using the given handler.
	HandleStreams(func(*Stream), func(context.Context, string) context.Context)
WriteHeader sends the header metadata for the given stream. WriteHeader may not be called on all streams.
	WriteHeader(s *Stream, md metadata.MD) error
Write sends the data for the given stream. Write may not be called on all streams.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error
WriteStatus sends the status of a stream to the client. WriteStatus is the final call made on a stream and always occurs.
	WriteStatus(s *Stream, st *status.Status) error
Close tears down the transport. Once it is called, the transport should not be accessed any more. All the pending streams and their handlers will be terminated asynchronously.
	Close() error
RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
Drain notifies the client this ServerTransport stops accepting new RPCs.
	Drain()
IncrMsgSent increments the number of message sent through this transport.
	IncrMsgSent()
IncrMsgRecv increments the number of message received through this transport.
	IncrMsgRecv()
}
connectionErrorf creates an ConnectionError with the specified error description.
func ( bool,  error,  string,  ...interface{}) ConnectionError {
	return ConnectionError{
		Desc: fmt.Sprintf(, ...),
		temp: ,
		err:  ,
	}
}
ConnectionError is an error that results in the termination of the entire connection and the retry of all the active streams.
type ConnectionError struct {
	Desc string
	temp bool
	err  error
}

func ( ConnectionError) () string {
	return fmt.Sprintf("connection error: desc = %q", .Desc)
}
Temporary indicates if this connection error is temporary or fatal.
func ( ConnectionError) () bool {
	return .temp
}
Origin returns the original error of this connection error.
Never return nil error here. If the original error is nil, return itself.
	if .err == nil {
		return 
	}
	return .err
}

ErrConnClosing indicates that the transport is closing.
errStreamDrain indicates that the stream is rejected because the connection is draining. This could be caused by goaway or balancer removing the address.
errStreamDone is returned from write at the client side to indiacte application layer of an error.
StatusGoAway indicates that the server sent a GOAWAY that included this stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
GoAwayReason contains the reason for the GoAway frame received.
GoAwayInvalid indicates that no GoAway frame is received.
GoAwayNoReason is the default value when GoAway frame is received.
GoAwayTooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm was received and that the debug data said "too_many_pings".
channelzData is used to store channelz related data for http2Client and http2Server. These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment. Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
type channelzData struct {
The number of streams that have started, including already finished ones.
Client side: The number of streams that have ended successfully by receiving EoS bit set frame from server. Server side: The number of streams that have ended successfully by sending frame with EoS bit set.
lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type instead of time.Time since it's more costly to atomically update time.Time variable than int64 variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
ContextErr converts the error from context package into a status error.
func ( error) error {
	switch  {
	case context.DeadlineExceeded:
		return status.Error(codes.DeadlineExceeded, .Error())
	case context.Canceled:
		return status.Error(codes.Canceled, .Error())
	}
	return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", )