* * Copyright 2014 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *

package grpc

import (
	
	
	
	
	
	
	

	
	
	
	
	
	
	
	
	
	
	
	
	
	
)
StreamHandler defines the handler called by gRPC server to complete the execution of a streaming RPC. If a StreamHandler returns an error, it should be produced by the status package, or else gRPC will use codes.Unknown as the status code and err.Error() as the status message of the RPC.
// StreamHandler is the function type the gRPC server calls to complete the
// execution of a streaming RPC. It receives the service value (srv) and the
// ServerStream for the call. A returned error should be produced by the status
// package; otherwise gRPC uses codes.Unknown as the status code and err.Error()
// as the status message of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
StreamDesc represents a streaming RPC service's method specification.
At least one of these is true.
Stream defines the common interface a client or server stream has to satisfy. Deprecated: See ClientStream and ServerStream documentation instead.
Deprecated: See ClientStream and ServerStream documentation instead.
Deprecated: See ClientStream and ServerStream documentation instead.
Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}
ClientStream defines the client-side behavior of a streaming RPC. All errors returned from ClientStream methods are compatible with the status package.
Header returns the header metadata received from the server if there is any. It blocks if the metadata is not ready to read.
Trailer returns the trailer metadata from the server, if there is any. It must only be called after stream.CloseAndRecv has returned, or stream.Recv has returned a non-nil error (including io.EOF).
CloseSend closes the send direction of the stream. It closes the stream when a non-nil error is met. It is also not safe to call CloseSend concurrently with SendMsg.
Context returns the context for this stream. It should not be called until after Header or RecvMsg has returned. Once called, subsequent client-side retries are disabled.
SendMsg is generally called by generated code. On error, SendMsg aborts the stream. If the error was generated by the client, the status is returned directly; otherwise, io.EOF is returned and the status of the stream may be discovered using RecvMsg. SendMsg blocks until: - There is sufficient flow control to schedule m with the transport, or - The stream is done, or - The stream breaks. SendMsg does not wait until the message is received by the server. An untimely stream closure may result in lost messages. To ensure delivery, users should ensure the RPC completed successfully using RecvMsg. It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call SendMsg on the same stream in different goroutines. It is also not safe to call CloseSend concurrently with SendMsg.
RecvMsg blocks until it receives a message into m or the stream is done. It returns io.EOF when the stream completes successfully. On any other error, the stream is aborted and the error contains the RPC status. It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
NewStream creates a new Stream for the client side. This is typically called by generated code. ctx is used for the lifetime of the stream. To ensure resources are not leaked due to the stream returned, one of the following actions must be performed: 1. Call Close on the ClientConn. 2. Cancel the context provided. 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated client-streaming RPC, for instance, might use the helper function CloseAndRecv (note that CloseSend does not Recv, therefore is not guaranteed to release all resources). 4. Receive a non-nil, non-io.EOF error from Header or SendMsg. If none of the above happen, a goroutine and a context will be leaked, and grpc will not call the optionally-configured stats handler with a stats.End message.
allow interceptor to see all applicable call options, which means those configured as defaults from dial option as well as per-call options
	 = combine(.dopts.callOptions, )

	if .dopts.streamInt != nil {
		return .dopts.streamInt(, , , , newClientStream, ...)
	}
	return newClientStream(, , , , ...)
}
NewClientStream is a wrapper for ClientConn.NewStream.
// NOTE(review): the function name and all parameter names are missing in this
// copy of the file; the text directly above this declaration refers to it as
// NewClientStream — confirm before relying on that.
//
// The function accepts a context.Context, a *StreamDesc, a *ClientConn, a
// string and variadic CallOptions, and returns the (ClientStream, error) pair
// produced by the NewStream call below without modifying it.
func ( context.Context,  *StreamDesc,  *ClientConn,  string,  ...CallOption) (ClientStream, error) {
	// Pure delegation: no validation or additional work is done here.
	return .NewStream(, , , ...)
}

func ( context.Context,  *StreamDesc,  *ClientConn,  string,  ...CallOption) ( ClientStream,  error) {
	if channelz.IsOn() {
		.incrCallsStarted()
		defer func() {
			if  != nil {
				.incrCallsFailed()
			}
		}()
	}
Provide an opportunity for the first RPC to see the first service config provided by the resolver.
	if  := .waitForResolvedAddrs();  != nil {
		return nil, 
	}
	 := .GetMethodConfig()
	if .WaitForReady != nil {
		.failFast = !*.WaitForReady
	}
Possible context leak: The cancel function for the child context we create will only be called when RecvMsg returns a non-nil error, if the ClientConn is closed, or if an error is generated by SendMsg. https://github.com/grpc/grpc-go/issues/1818.
	var  context.CancelFunc
	if .Timeout != nil && *.Timeout >= 0 {
		,  = context.WithTimeout(, *.Timeout)
	} else {
		,  = context.WithCancel()
	}
	defer func() {
		if  != nil {
			()
		}
	}()

	for ,  := range  {
		if  := .before();  != nil {
			return nil, toRPCErr()
		}
	}
	.maxSendMessageSize = getMaxSize(.MaxReqSize, .maxSendMessageSize, defaultClientMaxSendMessageSize)
	.maxReceiveMessageSize = getMaxSize(.MaxRespSize, .maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if  := setCallInfoCodec();  != nil {
		return nil, 
	}

	 := &transport.CallHdr{
		Host:           .authority,
		Method:         ,
		ContentSubtype: .contentSubtype,
	}
Set our outgoing compression according to the UseCompressor CallOption, if set. In that case, also find the compressor from the encoding package. Otherwise, use the compressor configured by the WithCompressor DialOption, if set.
	var  Compressor
	var  encoding.Compressor
	if  := .compressorType;  != "" {
		.SendCompress = 
		if  != encoding.Identity {
			 = encoding.GetCompressor()
			if  == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", )
			}
		}
	} else if .dopts.cp != nil {
		.SendCompress = .dopts.cp.Type()
		 = .dopts.cp
	}
	if .creds != nil {
		.Creds = .creds
	}
	var  *traceInfo
	if EnableTracing {
		 = &traceInfo{
			tr: trace.New("grpc.Sent."+methodFamily(), ),
			firstLine: firstLine{
				client: true,
			},
		}
		if ,  := .Deadline();  {
			.firstLine.deadline = time.Until()
		}
		.tr.LazyLog(&.firstLine, false)
		 = trace.NewContext(, .tr)
	}
	 = newContextWithRPCInfo(, .failFast, .codec, , )
	 := .dopts.copts.StatsHandler
	var  time.Time
	if  != nil {
		 = .TagRPC(, &stats.RPCTagInfo{FullMethodName: , FailFast: .failFast})
		 = time.Now()
		 := &stats.Begin{
			Client:    true,
			BeginTime: ,
			FailFast:  .failFast,
		}
		.HandleRPC(, )
	}

	 := &clientStream{
		callHdr:      ,
		ctx:          ,
		methodConfig: &,
		opts:         ,
		callInfo:     ,
		cc:           ,
		desc:         ,
		codec:        .codec,
		cp:           ,
		comp:         ,
		cancel:       ,
		beginTime:    ,
		firstAttempt: true,
	}
	if !.dopts.disableRetry {
		.retryThrottler = .retryThrottler.Load().(*retryThrottler)
	}
	.binlog = binarylog.GetMethodLogger()
Only this initial attempt has stats/tracing. TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
	if  := .newAttemptLocked(, );  != nil {
		.finish()
		return nil, 
	}

	 := func( *csAttempt) error { return .newStream() }
	if  := .withRetry(, func() { .bufferForRetryLocked(0, ) });  != nil {
		.finish()
		return nil, 
	}

	if .binlog != nil {
		,  := metadata.FromOutgoingContext()
		 := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       ,
			MethodName:   ,
			Authority:    .cc.authority,
		}
		if ,  := .Deadline();  {
			.Timeout = time.Until()
			if .Timeout < 0 {
				.Timeout = 0
			}
		}
		.binlog.Log()
	}

Listen on cc and stream contexts to cleanup when the user closes the ClientConn or cancels the stream context. In all other cases, an error should already be injected into the recv buffer by the transport, which the client will eventually receive, and then we will cancel the stream's context in clientStream.finish.
		go func() {
			select {
			case <-.ctx.Done():
				.finish(ErrClientConnClosing)
			case <-.Done():
				.finish(toRPCErr(.Err()))
			}
		}()
	}
	return , nil
}
newAttemptLocked creates a new attempt with a transport. If it succeeds, then it replaces clientStream's attempt with this new attempt.
func ( *clientStream) ( stats.Handler,  *traceInfo) ( error) {
	 := &csAttempt{
		cs:           ,
		dc:           .cc.dopts.dc,
		statsHandler: ,
		trInfo:       ,
	}
	defer func() {
This attempt is not set in the clientStream, so its finish won't be called. Call it here for stats and trace in case they are not nil.
			.finish()
		}
	}()

	if  := .ctx.Err();  != nil {
		return toRPCErr()
	}

	 := .ctx
Add extra metadata (metadata that will be added by transport) to context so the balancer can see them.
		 = grpcutil.WithExtraMetadata(.ctx, metadata.Pairs(
			"content-type", grpcutil.ContentType(.callHdr.ContentSubtype),
		))
	}
	, ,  := .cc.getTransport(, .callInfo.failFast, .callHdr.Method)
	if  != nil {
		return 
	}
	if  != nil {
		.firstLine.SetRemoteAddr(.RemoteAddr())
	}
	.t = 
	.done = 
	.attempt = 
	return nil
}

func ( *csAttempt) () error {
	 := .cs
	.callHdr.PreviousAttempts = .numRetries
	,  := .t.NewStream(.ctx, .callHdr)
	if  != nil {
Return without converting to an RPC error so retry code can inspect.
			return 
		}
		return toRPCErr()
	}
	.attempt.s = 
	.attempt.p = &parser{r: }
	return nil
}
clientStream implements a client side Stream.
// clientStream carries the client-side state of one streaming RPC: the call
// description captured at creation, the codec and compressors, the cancel
// function, and the bookkeeping used for retries.
//
// NOTE(review): methods elsewhere in this file appear to read fields named
// attempt, finished and binlog on this type, but no such fields are declared
// in this copy of the struct — confirm whether lines were lost.
type clientStream struct {
	// Call description: transport call header, per-call options, resolved call
	// info, owning connection and stream descriptor.
	callHdr  *transport.CallHdr
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	// Message codec plus the two compressor handles (the Compressor type from
	// this package and encoding.Compressor).
	codec baseCodec
	cp    Compressor
	comp  encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast  bool // sent an end stream
	beginTime time.Time

	// Method-level configuration stored for this RPC.
	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

serverHeaderBinlogged is a boolean for whether server header has been logged. Server header will be logged when the first time one of those happens: stream.Header(), stream.Recv(). It's only read and used by Recv() and Header(), so it doesn't need to be synchronized.
	serverHeaderBinlogged bool

	// NOTE(review): presumably mu guards the retry-related fields declared
	// after it — confirm against the methods that lock it.
	mu                      sync.Mutex
	firstAttempt            bool // if true, transparent retry is valid
	numRetries              int  // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int  // retries since pushback; to reset backoff
attempt is the active client stream attempt. The only place where it is written is the newAttemptLocked method and this method never writes nil. So, attempt can be nil only inside newClientStream function when clientStream is first created. One of the first things done after clientStream's creation, is to call newAttemptLocked which either assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked, then newClientStream calls finish on the clientStream and returns. So, finish method is the only place where we need to check if the attempt is nil.
TODO(hedging): hedging will have multiple attempts simultaneously.
	committed  bool                       // active attempt committed for retry?
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
}
csAttempt implements a single transport stream attempt within a clientStream.
trInfo may be nil (if EnableTracing is false). trInfo.tr is set when created (if EnableTracing is true), and cleared when the finish method is called.
shouldRetry returns nil if the RPC should be retried; otherwise it returns the error that should be returned by the operation.
func ( *clientStream) ( error) error {
	 := false
	if .attempt.s == nil {
		,  := .(transport.PerformedIOError)
Unwrap error.
			 = toRPCErr(.Err)
		} else {
			 = true
		}
In the event of a non-IO operation error from NewStream, we never attempted to write anything to the wire, so we can retry indefinitely for non-fail-fast RPCs.
			return nil
		}
	}
RPC is finished or committed; cannot retry.
		return 
Wait for the trailers.
	if .attempt.s != nil {
		<-.attempt.s.Done()
		 = .attempt.s.Unprocessed()
	}
First attempt, stream unprocessed: transparently retry.
		return nil
	}
	if .cc.dopts.disableRetry {
		return 
	}

	 := 0
	 := false
	if .attempt.s != nil {
		if !.attempt.s.TrailersOnly() {
			return 
		}
TODO(retry): Move down if the spec changes to not check server pushback before considering this a failure for throttling.
		 := .attempt.s.Trailer()["grpc-retry-pushback-ms"]
		if len() == 1 {
			var  error
			if ,  = strconv.Atoi([0]);  != nil ||  < 0 {
				channelz.Infof(logger, .cc.channelzID, "Server retry pushback specified to abort (%q).", [0])
				.retryThrottler.throttle() // This counts as a failure for throttling.
				return 
			}
			 = true
		} else if len() > 1 {
			channelz.Warningf(logger, .cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", )
			.retryThrottler.throttle() // This counts as a failure for throttling.
			return 
		}
	}

	var  codes.Code
	if .attempt.s != nil {
		 = .attempt.s.Status().Code()
	} else {
		 = status.Convert().Code()
	}

	 := .methodConfig.retryPolicy
	if  == nil || !.retryableStatusCodes[] {
		return 
	}
Note: the ordering here is important; we count this as a failure only if the code matched a retryable code.
	if .retryThrottler.throttle() {
		return 
	}
	if .numRetries+1 >= .maxAttempts {
		return 
	}

	var  time.Duration
	if  {
		 = time.Millisecond * time.Duration()
		.numRetriesSincePushback = 0
	} else {
		 := math.Pow(.backoffMultiplier, float64(.numRetriesSincePushback))
		 := float64(.initialBackoff) * 
		if  := float64(.maxBackoff);  >  {
			 = 
		}
		 = time.Duration(grpcrand.Int63n(int64()))
		.numRetriesSincePushback++
	}
TODO(dfawley): we could eagerly fail here if dur puts us past the deadline, but unsure if it is worth doing.
	 := time.NewTimer()
	select {
	case <-.C:
		.numRetries++
		return nil
	case <-.ctx.Done():
		.Stop()
		return status.FromContextError(.ctx.Err()).Err()
	}
}
Returns nil if a retry was performed and succeeded; error otherwise.
// NOTE(review): the receiver, method and parameter names are missing in this
// copy of the file; the description below refers to fields and callees only.
//
// Retry loop for a *clientStream. Each iteration finishes the current attempt
// and consults shouldRetry. A non-nil answer ends the loop: the attempt is
// committed and that error is returned. Otherwise a new attempt is created and
// the buffered operations are replayed on it; nil is returned as soon as a
// replay succeeds, and a failed replay starts the next iteration.
func ( *clientStream) ( error) error {
	for {
		// Close out the attempt being abandoned before deciding what to do next.
		.attempt.finish()
		if  := .shouldRetry();  != nil {
			// No retry: commit the attempt and report shouldRetry's error.
			.commitAttemptLocked()
			return 
		}
		// From here on this is no longer the first attempt.
		.firstAttempt = false
		// Both arguments are passed as nil for the replacement attempt; a
		// failure here is returned without committing.
		if  := .newAttemptLocked(nil, nil);  != nil {
			return 
		}
		// A replay error falls through to another iteration of the loop.
		if  = .replayBufferLocked();  == nil {
			return nil
		}
	}
}

// NOTE(review): the receiver and method names are missing in this copy of the
// file.
//
// Returns the context.Context reported by the stream (field s) of the
// clientStream's current attempt.
func ( *clientStream) () context.Context {
No need to lock before using attempt, since we know it is committed and cannot change.
	return .attempt.s.Context()
}

func ( *clientStream) ( func( *csAttempt) error,  func()) error {
	.mu.Lock()
	for {
		if .committed {
			.mu.Unlock()
			return (.attempt)
		}
		 := .attempt
		.mu.Unlock()
		 := ()
		.mu.Lock()
We started another attempt already.
			continue
		}
		if  == io.EOF {
			<-.s.Done()
		}
		if  == nil || ( == io.EOF && .s.Status().Code() == codes.OK) {
			()
			.mu.Unlock()
			return 
		}
		if  := .retryLocked();  != nil {
			.mu.Unlock()
			return 
		}
	}
}

func ( *clientStream) () (metadata.MD, error) {
	var  metadata.MD
	 := .withRetry(func( *csAttempt) error {
		var  error
		,  = .s.Header()
		return toRPCErr()
	}, .commitAttemptLocked)
	if  != nil {
		.finish()
		return nil, 
	}
Only log if binary log is on and header has not been logged.
		 := &binarylog.ServerHeader{
			OnClientSide: true,
			Header:       ,
			PeerAddr:     nil,
		}
		if ,  := peer.FromContext(.Context());  {
			.PeerAddr = .Addr
		}
		.binlog.Log()
		.serverHeaderBinlogged = true
	}
	return , 
}

On RPC failure, we never need to retry, because usage requires that RecvMsg() returned a non-nil error before calling this function is valid. We would have retried earlier if necessary. Commit the attempt anyway, just in case users are not following those directions -- it will prevent races and should not meaningfully impact performance.
	.commitAttempt()
	if .attempt.s == nil {
		return nil
	}
	return .attempt.s.Trailer()
}

// NOTE(review): the receiver, method and local names are missing in this copy
// of the file.
//
// Walks the clientStream's buffer in order and invokes each buffered entry.
// Stops at and returns the first non-nil error; returns nil when every entry
// succeeds or the buffer is empty.
func ( *clientStream) () error {
	// Read the attempt field once, before the loop.
	 := .attempt
	for ,  := range .buffer {
		if  := ();  != nil {
			return 
		}
	}
	return nil
}

Note: we still will buffer if retry is disabled (for transparent retries).
	if .committed {
		return
	}
	.bufferSize += 
	if .bufferSize > .callInfo.maxRetryRPCBufferSize {
		.commitAttemptLocked()
		return
	}
	.buffer = append(.buffer, )
}

func ( *clientStream) ( interface{}) ( error) {
	defer func() {
Call finish on the client stream for errors generated by this SendMsg call, as these indicate problems created by this client. (Transport errors are converted to an io.EOF error in csAttempt.sendMsg; the real error will be returned from RecvMsg eventually in that case, or be retried.)
			.finish()
		}
	}()
	if .sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !.desc.ClientStreams {
		.sentLast = true
	}
load hdr, payload, data
	, , ,  := prepareMsg(, .codec, .cp, .comp)
	if  != nil {
		return 
	}
TODO(dfawley): should we be checking len(data) instead?
	if len() > *.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(), *.callInfo.maxSendMessageSize)
	}
	 :=  // Store the pointer before setting to nil. For binary logging.
	 := func( *csAttempt) error {
nil out the message and uncomp when replaying; they are only needed for stats which is disabled for subsequent attempts.
		,  = nil, nil
		return 
	}
	 = .withRetry(, func() { .bufferForRetryLocked(len()+len(), ) })
	if .binlog != nil &&  == nil {
		.binlog.Log(&binarylog.ClientMessage{
			OnClientSide: true,
			Message:      ,
		})
	}
	return
}

func ( *clientStream) ( interface{}) error {
Call Header() to binary log header if it's not already logged.
		.Header()
	}
	var  *payloadInfo
	if .binlog != nil {
		 = &payloadInfo{}
	}
	 := .withRetry(func( *csAttempt) error {
		return .recvMsg(, )
	}, .commitAttemptLocked)
	if .binlog != nil &&  == nil {
		.binlog.Log(&binarylog.ServerMessage{
			OnClientSide: true,
			Message:      .uncompressedBytes,
		})
	}
err != nil or non-server-streaming indicates end of stream.
		.finish()

finish will not log Trailer. Log Trailer here.
			 := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      .Trailer(),
				Err:          ,
			}
			if .Err == io.EOF {
				.Err = nil
			}
			if ,  := peer.FromContext(.Context());  {
				.PeerAddr = .Addr
			}
			.binlog.Log()
		}
	}
	return 
}

func ( *clientStream) () error {
TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	.sentLast = true
	 := func( *csAttempt) error {
Always return nil; io.EOF is the only error that might make sense instead, but there is no need to signal the client to call RecvMsg as the only use left for the stream after CloseSend is to call RecvMsg. This also matches historical behavior.
		return nil
	}
	.withRetry(, func() { .bufferForRetryLocked(0, ) })
	if .binlog != nil {
		.binlog.Log(&binarylog.ClientHalfClose{
			OnClientSide: true,
		})
We never returned an error here for reasons.
	return nil
}

func ( *clientStream) ( error) {
Ending a stream with EOF indicates a success.
		 = nil
	}
	.mu.Lock()
	if .finished {
		.mu.Unlock()
		return
	}
	.finished = true
	.commitAttemptLocked()
	if .attempt != nil {
after functions all rely upon having a stream.
		if .attempt.s != nil {
			for ,  := range .opts {
				.after(.callInfo, .attempt)
			}
		}
	}
For binary logging. only log cancel in finish (could be caused by RPC ctx canceled or ClientConn closed). Trailer will be logged in RecvMsg. Only one of cancel or trailer needs to be logged. In the cases where users don't call RecvMsg, users must have already canceled the RPC.
	if .binlog != nil && status.Code() == codes.Canceled {
		.binlog.Log(&binarylog.Cancel{
			OnClientSide: true,
		})
	}
	if  == nil {
		.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if  != nil {
			.cc.incrCallsFailed()
		} else {
			.cc.incrCallsSucceeded()
		}
	}
	.cancel()
}

func ( *csAttempt) ( interface{}, , ,  []byte) error {
	 := .cs
	if .trInfo != nil {
		.mu.Lock()
		if .trInfo.tr != nil {
			.trInfo.tr.LazyLog(&payload{sent: true, msg: }, true)
		}
		.mu.Unlock()
	}
	if  := .t.Write(.s, , , &transport.Options{Last: !.desc.ClientStreams});  != nil {
For non-client-streaming RPCs, we return nil instead of EOF on error because the generated code requires it. finish is not called; RecvMsg() will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}
	if .statsHandler != nil {
		.statsHandler.HandleRPC(.ctx, outPayload(true, , , , time.Now()))
	}
	if channelz.IsOn() {
		.t.IncrMsgSent()
	}
	return nil
}

func ( *csAttempt) ( interface{},  *payloadInfo) ( error) {
	 := .cs
	if .statsHandler != nil &&  == nil {
		 = &payloadInfo{}
	}

Block until we receive headers containing received message encoding.
		if  := .s.RecvCompress();  != "" &&  != encoding.Identity {
No configured decompressor, or it does not match the incoming message encoding; attempt to find a registered compressor that does.
No compression is used; disable our decompressor.
			.dc = nil
Only initialize this state once per stream.
		.decompSet = true
	}
	 = recv(.p, .codec, .s, .dc, , *.callInfo.maxReceiveMessageSize, , .decomp)
	if  != nil {
		if  == io.EOF {
			if  := .s.Status().Err();  != nil {
				return 
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr()
	}
	if .trInfo != nil {
		.mu.Lock()
		if .trInfo.tr != nil {
			.trInfo.tr.LazyLog(&payload{sent: false, msg: }, true)
		}
		.mu.Unlock()
	}
	if .statsHandler != nil {
		.statsHandler.HandleRPC(.ctx, &stats.InPayload{
			Client:   true,
			RecvTime: time.Now(),
TODO truncate large payload.
			Data:       .uncompressedBytes,
			WireLength: .wireLength,
			Length:     len(.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		.t.IncrMsgRecv()
	}
Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
Special handling for non-server-stream rpcs. This recv expects EOF or errors, so we don't collect inPayload.
	 = recv(.p, .codec, .s, .dc, , *.callInfo.maxReceiveMessageSize, nil, .decomp)
	if  == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if  == io.EOF {
		return .s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr()
}

func ( *csAttempt) ( error) {
	.mu.Lock()
	if .finished {
		.mu.Unlock()
		return
	}
	.finished = true
Ending a stream with EOF indicates a success.
		 = nil
	}
	var  metadata.MD
	if .s != nil {
		.t.CloseStream(.s, )
		 = .s.Trailer()
	}

	if .done != nil {
		 := false
		if .s != nil {
			 = .s.BytesReceived()
		}
		.done(balancer.DoneInfo{
			Err:           ,
			Trailer:       ,
			BytesSent:     .s != nil,
			BytesReceived: ,
			ServerLoad:    balancerload.Parse(),
		})
	}
	if .statsHandler != nil {
		 := &stats.End{
			Client:    true,
			BeginTime: .cs.beginTime,
			EndTime:   time.Now(),
			Trailer:   ,
			Error:     ,
		}
		.statsHandler.HandleRPC(.cs.ctx, )
	}
	if .trInfo != nil && .trInfo.tr != nil {
		if  == nil {
			.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			.trInfo.tr.LazyPrintf("RPC: [%v]", )
			.trInfo.tr.SetError()
		}
		.trInfo.tr.Finish()
		.trInfo.tr = nil
	}
	.mu.Unlock()
}
newClientStream creates a ClientStream with the specified transport, on the given addrConn. It's expected that the given transport is either the same one in addrConn, or is already closed. To avoid race, transport is specified separately, instead of using ac.transport. Main difference between this and ClientConn.NewStream: - no retry - no service config (or wait for service config) - no tracing or stats
TODO: return RPC error here?
		return nil, errors.New("transport provided is nil")
defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
	 := &callInfo{}
Possible context leak: The cancel function for the child context we create will only be called when RecvMsg returns a non-nil error, if the ClientConn is closed, or if an error is generated by SendMsg. https://github.com/grpc/grpc-go/issues/1818.
	,  := context.WithCancel()
	defer func() {
		if  != nil {
			()
		}
	}()

	for ,  := range  {
		if  := .before();  != nil {
			return nil, toRPCErr()
		}
	}
	.maxReceiveMessageSize = getMaxSize(nil, .maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	.maxSendMessageSize = getMaxSize(nil, .maxSendMessageSize, defaultServerMaxSendMessageSize)
	if  := setCallInfoCodec();  != nil {
		return nil, 
	}

	 := &transport.CallHdr{
		Host:           .cc.authority,
		Method:         ,
		ContentSubtype: .contentSubtype,
	}
Set our outgoing compression according to the UseCompressor CallOption, if set. In that case, also find the compressor from the encoding package. Otherwise, use the compressor configured by the WithCompressor DialOption, if set.
	var  Compressor
	var  encoding.Compressor
	if  := .compressorType;  != "" {
		.SendCompress = 
		if  != encoding.Identity {
			 = encoding.GetCompressor()
			if  == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", )
			}
		}
	} else if .cc.dopts.cp != nil {
		.SendCompress = .cc.dopts.cp.Type()
		 = .cc.dopts.cp
	}
	if .creds != nil {
		.Creds = .creds
	}
Use a special addrConnStream to avoid retry.
	 := &addrConnStream{
		callHdr:  ,
		ac:       ,
		ctx:      ,
		cancel:   ,
		opts:     ,
		callInfo: ,
		desc:     ,
		codec:    .codec,
		cp:       ,
		comp:     ,
		t:        ,
	}

	,  := .t.NewStream(.ctx, .callHdr)
	if  != nil {
		 = toRPCErr()
		return nil, 
	}
	.s = 
	.p = &parser{r: }
	.incrCallsStarted()
Listen on cc and stream contexts to cleanup when the user closes the ClientConn or cancels the stream context. In all other cases, an error should already be injected into the recv buffer by the transport, which the client will eventually receive, and then we will cancel the stream's context in clientStream.finish.
TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	.sentLast = true

Always return nil; io.EOF is the only error that might make sense instead, but there is no need to signal the client to call RecvMsg as the only use left for the stream after CloseSend is to call RecvMsg. This also matches historical behavior.
	return nil
}

// NOTE(review): the receiver and method names are missing in this copy of the
// file.
//
// Returns the context.Context reported by the addrConnStream's stream field s.
func ( *addrConnStream) () context.Context {
	return .s.Context()
}

func ( *addrConnStream) ( interface{}) ( error) {
	defer func() {
Call finish on the client stream for errors generated by this SendMsg call, as these indicate problems created by this client. (Transport errors are converted to an io.EOF error in csAttempt.sendMsg; the real error will be returned from RecvMsg eventually in that case, or be retried.)
			.finish()
		}
	}()
	if .sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !.desc.ClientStreams {
		.sentLast = true
	}
load hdr, payload, data
	, , ,  := prepareMsg(, .codec, .cp, .comp)
	if  != nil {
		return 
	}
TODO(dfawley): should we be checking len(data) instead?
	if len() > *.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(), *.callInfo.maxSendMessageSize)
	}

	if  := .t.Write(.s, , , &transport.Options{Last: !.desc.ClientStreams});  != nil {
For non-client-streaming RPCs, we return nil instead of EOF on error because the generated code requires it. finish is not called; RecvMsg() will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}

	if channelz.IsOn() {
		.t.IncrMsgSent()
	}
	return nil
}

func ( *addrConnStream) ( interface{}) ( error) {
	defer func() {
err != nil or non-server-streaming indicates end of stream.
			.finish()
		}
	}()

Block until we receive headers containing received message encoding.
		if  := .s.RecvCompress();  != "" &&  != encoding.Identity {
No configured decompressor, or it does not match the incoming message encoding; attempt to find a registered compressor that does.
				.dc = nil
				.decomp = encoding.GetCompressor()
			}
No compression is used; disable our decompressor.
			.dc = nil
Only initialize this state once per stream.
		.decompSet = true
	}
	 = recv(.p, .codec, .s, .dc, , *.callInfo.maxReceiveMessageSize, nil, .decomp)
	if  != nil {
		if  == io.EOF {
			if  := .s.Status().Err();  != nil {
				return 
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr()
	}

	if channelz.IsOn() {
		.t.IncrMsgRecv()
	}
Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
Special handling for non-server-stream rpcs. This recv expects EOF or errors, so we don't collect inPayload.
	 = recv(.p, .codec, .s, .dc, , *.callInfo.maxReceiveMessageSize, nil, .decomp)
	if  == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if  == io.EOF {
		return .s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr()
}

func ( *addrConnStream) ( error) {
	.mu.Lock()
	if .finished {
		.mu.Unlock()
		return
	}
	.finished = true
Ending a stream with EOF indicates a success.
		 = nil
	}
	if .s != nil {
		.t.CloseStream(.s, )
	}

	if  != nil {
		.ac.incrCallsFailed()
	} else {
		.ac.incrCallsSucceeded()
	}
	.cancel()
	.mu.Unlock()
}
ServerStream defines the server-side behavior of a streaming RPC. All errors returned from ServerStream methods are compatible with the status package.
SetHeader sets the header metadata. It may be called multiple times. When called multiple times, all the provided metadata will be merged. All the metadata will be sent out when one of the following happens: - ServerStream.SendHeader() is called; - The first response is sent out; - An RPC status is sent out (error or success).
SendHeader sends the header metadata. The provided md and headers set by SetHeader() will be sent. It fails if called multiple times.
SetTrailer sets the trailer metadata which will be sent with the RPC status. When called more than once, all the provided metadata will be merged.
Context returns the context for this stream.
SendMsg sends a message. On error, SendMsg aborts the stream and the error is returned directly. SendMsg blocks until: - There is sufficient flow control to schedule m with the transport, or - The stream is done, or - The stream breaks. SendMsg does not wait until the message is received by the client. An untimely stream closure may result in lost messages. It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call SendMsg on the same stream in different goroutines.
RecvMsg blocks until it receives a message into m or the stream is done. It returns io.EOF when the client has performed a CloseSend. On any non-EOF error, the stream is aborted and the error contains the RPC status. It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
serverHeaderBinlogged indicates whether server header has been logged. It will happen when one of the following two happens: stream.SendHeader(), stream.Send(). It's only checked in send and sendHeader, doesn't need to be synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}

// Returns the context held in the receiver's ctx field.
//
// NOTE(review): the method and receiver identifiers are missing from this
// copy of the file; judging by the context.Context result this is presumably
// serverStream's Context method — confirm against the upstream source.
func ( *serverStream) () context.Context {
	return .ctx
}

// Forwards header metadata to the underlying stream (the s field) through its
// SetHeader method and returns that call's error.
//
// A Len() of zero on the checked value short-circuits with a nil error, so the
// underlying stream is not touched in that case.
//
// NOTE(review): the method, receiver and parameter identifiers are missing
// from this copy; this is presumably serverStream.SetHeader with the
// metadata.MD argument being the value checked and forwarded — confirm.
func ( *serverStream) ( metadata.MD) error {
	if .Len() == 0 {
		return nil
	}
	return .s.SetHeader()
}

// Writes the header through the transport (t.WriteHeader on the stream s) and
// returns a value after optionally recording the header in the binary log.
//
// NOTE(review): identifiers are missing from this copy; this is presumably
// serverStream.SendHeader, and the returned value is presumably the error
// produced by WriteHeader — confirm against the upstream source.
func ( *serverStream) ( metadata.MD) error {
	 := .t.WriteHeader(.s, )
	// Binary logging of the server header happens at most once per stream:
	// the serverHeaderBinlogged flag is set after the first log entry. This
	// runs regardless of the outcome of WriteHeader above.
	if .binlog != nil && !.serverHeaderBinlogged {
		// The second result of s.Header() is discarded.
		,  := .s.Header()
		.binlog.Log(&binarylog.ServerHeader{
			Header: ,
		})
		.serverHeaderBinlogged = true
	}
	return 
}

// Forwards trailer metadata to the underlying stream (the s field) through its
// SetTrailer method. Nothing is returned; any result of s.SetTrailer is not
// propagated.
//
// A Len() of zero on the checked value makes this a no-op.
//
// NOTE(review): identifiers are missing from this copy; this is presumably
// serverStream.SetTrailer with the metadata.MD argument being the value
// checked and forwarded — confirm.
func ( *serverStream) ( metadata.MD) {
	if .Len() == 0 {
		return
	}
	.s.SetTrailer()
}

// Sends one message on the stream: prepares the wire form via prepareMsg,
// enforces maxSendMessageSize, writes through the transport with Last: false,
// then reports the message to the binary log and the stats handler when those
// are configured. Returns nil on success; transport write failures are
// converted with toRPCErr, and an oversized message yields a
// codes.ResourceExhausted status error.
//
// NOTE(review): most identifiers are missing from this copy of the file; this
// is presumably serverStream.SendMsg with a named error result that the
// deferred closure inspects — confirm against the upstream source.
func ( *serverStream) ( interface{}) ( error) {
	// Post-processing that runs on every exit path.
	defer func() {
		if .trInfo != nil {
			// mu guards trInfo.tr; it is held for the whole trace update.
			.mu.Lock()
			if .trInfo.tr != nil {
				if  == nil {
					// Success path: trace the payload as sent.
					.trInfo.tr.LazyLog(&payload{sent: true, msg: }, true)
				} else {
					// Failure path: trace the formatted value and mark the trace as errored.
					.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
					.trInfo.tr.SetError()
				}
			}
			.mu.Unlock()
		}
		if  != nil &&  != io.EOF {
			// NOTE(review): no visible statement consumes the results of
			// status.FromError here; a statement may have been lost from this
			// copy — confirm against the upstream source.
			,  := status.FromError(toRPCErr())
Non-user specified status was sent out. This should be an error case (as a server side Cancel maybe). This is not handled specifically now. User will return a final status from the service handler, we will log that error instead. This behavior is similar to an interceptor.
		}
		// Count the message on the transport only when channelz is enabled and
		// the checked value is nil.
		if channelz.IsOn() &&  == nil {
			.t.IncrMsgSent()
		}
	}()
load hdr, payload, data
	// prepareMsg yields four values from the message plus the stream's codec,
	// cp and comp fields; a non-nil check on one of them aborts the send.
	, , ,  := prepareMsg(, .codec, .cp, .comp)
	if  != nil {
		return 
	}
TODO(dfawley): should we be checking len(data) instead?
	// Reject messages whose measured length exceeds the configured send limit.
	if len() > .maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(), .maxSendMessageSize)
	}
	// Last: false — this write is not marked as the final one on the stream.
	if  := .t.Write(.s, , , &transport.Options{Last: false});  != nil {
		return toRPCErr()
	}
	if .binlog != nil {
		// Log the server header first if it has not been logged yet; the flag
		// ensures this happens at most once.
		if !.serverHeaderBinlogged {
			// The second result of s.Header() is discarded.
			,  := .s.Header()
			.binlog.Log(&binarylog.ServerHeader{
				Header: ,
			})
			.serverHeaderBinlogged = true
		}
		.binlog.Log(&binarylog.ServerMessage{
			Message: ,
		})
	}
	if .statsHandler != nil {
		// Report the outgoing payload, timestamped at the moment of reporting.
		.statsHandler.HandleRPC(.s.Context(), outPayload(false, , , , time.Now()))
	}
	return nil
}

// Receives one message from the stream via recv, then reports it to the stats
// handler and the binary log when those are configured.
//
// Return values visible in this block:
//   - nil once a message has been received and reported;
//   - io.EOF unchanged when recv reports io.EOF (a ClientHalfClose entry is
//     written to the binary log first, if one is configured);
//   - otherwise the recv failure converted with toRPCErr, with
//     io.ErrUnexpectedEOF first replaced by a codes.Internal status error.
//
// NOTE(review): most identifiers are missing from this copy of the file; this
// is presumably serverStream.RecvMsg with a named error result that the
// deferred closure inspects — confirm against the upstream source.
func ( *serverStream) ( interface{}) ( error) {
	// Post-processing that runs on every exit path.
	defer func() {
		if .trInfo != nil {
			// mu guards trInfo.tr; it is held for the whole trace update.
			.mu.Lock()
			if .trInfo.tr != nil {
				if  == nil {
					// Success path: trace the payload as received (sent: false).
					.trInfo.tr.LazyLog(&payload{sent: false, msg: }, true)
				} else if  != io.EOF {
					// io.EOF is not traced as an error; anything else is.
					.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
					.trInfo.tr.SetError()
				}
			}
			.mu.Unlock()
		}
		if  != nil &&  != io.EOF {
			// NOTE(review): no visible statement consumes the results of
			// status.FromError here; a statement may have been lost from this
			// copy — confirm against the upstream source.
			,  := status.FromError(toRPCErr())
Non-user specified status was sent out. This should be an error case (as a server side Cancel maybe). This is not handled specifically now. User will return a final status from the service handler, we will log that error instead. This behavior is similar to an interceptor.
		}
		// Count the message on the transport only when channelz is enabled and
		// the checked value is nil.
		if channelz.IsOn() &&  == nil {
			.t.IncrMsgRecv()
		}
	}()
	// A payloadInfo is allocated only when a consumer (stats handler or binary
	// log) is configured; otherwise the pointer stays nil.
	var  *payloadInfo
	if .statsHandler != nil || .binlog != nil {
		 = &payloadInfo{}
	}
	if  := recv(.p, .codec, .s, .dc, , .maxReceiveMessageSize, , .decomp);  != nil {
		if  == io.EOF {
			if .binlog != nil {
				.binlog.Log(&binarylog.ClientHalfClose{})
			}
			return 
		}
		if  == io.ErrUnexpectedEOF {
			// status.Error rather than status.Errorf: the message is not a
			// format string, so it must not be interpreted as one (go vet's
			// printf check flags a non-constant format with no arguments).
			 = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
		}
		return toRPCErr()
	}
	if .statsHandler != nil {
		// Report the incoming payload, timestamped at the moment of reporting.
		.statsHandler.HandleRPC(.s.Context(), &stats.InPayload{
			RecvTime: time.Now(),
TODO truncate large payload.
			Data:       .uncompressedBytes,
			WireLength: .wireLength,
			Length:     len(.uncompressedBytes),
		})
	}
	if .binlog != nil {
		.binlog.Log(&binarylog.ClientMessage{
			Message: .uncompressedBytes,
		})
	}
	return nil
}
MethodFromServerStream returns the method string for the input stream. The returned string is in the format of "/service/method".
// Passes the stream's Context() to Method and returns Method's
// (string, bool) result unchanged.
//
// NOTE(review): the function and parameter identifiers are missing from this
// copy of the file; the preceding description names this function
// MethodFromServerStream — confirm against the upstream source.
func ( ServerStream) (string, bool) {
	return Method(.Context())
}
prepareMsg returns the hdr, payload and data using the compressors passed, or using the passed PreparedMsg.
func ( interface{},  baseCodec,  Compressor,  encoding.Compressor) (, ,  []byte,  error) {
	if ,  := .(*PreparedMsg);  {
		return .hdr, .payload, .encodedData, nil
The input interface is not a prepared msg. Marshal and Compress the data at this point
	,  = encode(, )
	if  != nil {
		return nil, nil, nil, 
	}
	,  := compress(, , )
	if  != nil {
		return nil, nil, nil, 
	}
	,  = msgHeader(, )
	return , , , nil