Source File: stream.go
Belonging Package: google.golang.org/grpc

package grpc

import (
	"context"
	"errors"
	"io"
	"math"
	"strconv"
	"sync"
	"time"

	"golang.org/x/net/trace"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/internal/balancerload"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)
// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
	StreamName string
	Handler    StreamHandler

	// At least one of these is true.
	ServerStreams bool
	ClientStreams bool
}

// Stream defines the common interface a client or server stream has to satisfy.
type Stream interface {
	Context() context.Context
	SendMsg(m interface{}) error
	RecvMsg(m interface{}) error
}
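A streaming method is normally wired to a StreamDesc by generated code. A
minimal sketch under assumed names (the service, method, and handler below
are hypothetical, not part of this file):

// echoStreamDesc describes a hypothetical bidirectional streaming method.
var echoStreamDesc = StreamDesc{
	StreamName:    "BidiEcho",
	Handler:       echoHandler, // func(srv interface{}, stream ServerStream) error
	ServerStreams: true,
	ClientStreams: true,
}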
// NewStream creates a new Stream for the client side. This is typically
// called by generated code.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	// Allow the interceptor to see all applicable call options: those
	// configured as defaults from dial options as well as per-call options.
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}
// NewClientStream is a wrapper for ClientConn.NewStream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}
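Generated stubs normally call NewStream for you; calling it directly looks
like the sketch below (the descriptor and method name are hypothetical, and
ctx and cc are assumed to exist):

// Open a raw server-streaming call on an existing *ClientConn.
desc := &StreamDesc{StreamName: "Watch", ServerStreams: true}
stream, err := cc.NewStream(ctx, desc, "/pkg.Service/Watch")
if err != nil {
	// Handle config/connection errors.
}
// Drive the RPC with stream.SendMsg, stream.CloseSend, and stream.RecvMsg.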
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	c := defaultCallInfo()
	// Provide an opportunity for the first RPC to see the first service
	// config provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}
	mc := cc.GetMethodConfig(method)
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}
	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption,
	// if set; in that case, also find the compressor from the encoding
	// package. Otherwise, use the compressor configured by the WithCompressor
	// DialOption, if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}

	if c.creds != nil {
		callHdr.Creds = c.creds
	}
	var trInfo *traceInfo
	if EnableTracing {
		trInfo = &traceInfo{
			tr: trace.New("grpc.Sent."+methodFamily(method), method),
			firstLine: firstLine{
				client: true,
			},
		}
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = time.Until(deadline)
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}
	ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
	sh := cc.dopts.copts.StatsHandler
	var beginTime time.Time
	if sh != nil {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:    true,
			BeginTime: beginTime,
			FailFast:  c.failFast,
		}
		sh.HandleRPC(ctx, begin)
	}
	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	cs.binlog = binarylog.GetMethodLogger(method)

	// Only this initial attempt has stats/tracing.
	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
		cs.finish(err)
		return nil, err
	}

	op := func(a *csAttempt) error { return a.newStream() }
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}

	if cs.binlog != nil {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		cs.binlog.Log(logEntry)
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to clean up when the user closes
		// the ClientConn or cancels the stream context.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}

// newAttemptLocked creates a new attempt with a transport. If it succeeds, it
// replaces clientStream's attempt with this new attempt.
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
	newAttempt := &csAttempt{
		cs:           cs,
		dc:           cs.cc.dopts.dc,
		statsHandler: sh,
		trInfo:       trInfo,
	}
	defer func() {
		if retErr != nil {
			// This attempt is not set in the clientStream, so its finish won't
			// be called. Call it here for stats and trace in case they are not nil.
			newAttempt.finish(retErr)
		}
	}()

	if err := cs.ctx.Err(); err != nil {
		return toRPCErr(err)
	}

	ctx := cs.ctx
	if cs.cc.parsedTarget.Scheme == "xds" {
		// Add extra metadata (metadata that will be added by transport) to the
		// context so the balancer can see them.
		ctx = grpcutil.WithExtraMetadata(cs.ctx, metadata.Pairs(
			"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
		))
	}
	t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		return err
	}
	if trInfo != nil {
		trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
	}
	newAttempt.t = t
	newAttempt.done = done
	cs.attempt = newAttempt
	return nil
}
func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
	if err != nil {
		// Return without converting to an RPC error so retry code can inspect.
		return err
	}
	cs.attempt.s = s
	cs.attempt.p = &parser{r: s}
	return nil
}

// clientStream implements a client side Stream.
type clientStream struct {
	callHdr        *transport.CallHdr
	opts           []CallOption
	callInfo       *callInfo
	cc             *ClientConn
	desc           *StreamDesc
	codec          baseCodec
	cp             Compressor
	comp           encoding.Compressor
	cancel         context.CancelFunc // cancels all attempts
	sentLast       bool               // sent an end stream
	beginTime      time.Time
	methodConfig   *MethodConfig
	ctx            context.Context         // the application's context, wrapped by stats/tracing
	retryThrottler *retryThrottler         // The throttler active when the RPC began.
	binlog         *binarylog.MethodLogger // Binary logger, can be nil.
	// serverHeaderBinlogged records whether the server header has been logged;
	// it is only read and written by Recv() and Header().
	serverHeaderBinlogged bool

	mu                      sync.Mutex
	firstAttempt            bool                       // if true, transparent retry is valid
	numRetries              int                        // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int                        // retries since pushback; to reset backoff
	finished                bool                       // whether finish has been called
	attempt                 *csAttempt                 // the active client stream attempt
	committed               bool                       // active attempt committed for retry?
	buffer                  []func(a *csAttempt) error // operations to replay on retry
	bufferSize              int                        // current size of buffer
}

// csAttempt implements a single transport stream attempt within a clientStream.
type csAttempt struct {
	cs        *clientStream
	t         transport.ClientTransport
	s         *transport.Stream
	p         *parser
	done      func(balancer.DoneInfo)
	finished  bool
	dc        Decompressor
	decomp    encoding.Compressor
	decompSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo may be nil (if EnableTracing is false); trInfo.tr is set when
	// created and cleared when the finish method is called.
	trInfo *traceInfo

	statsHandler stats.Handler
}
func (cs *clientStream) commitAttemptLocked() {
	cs.committed = true
	cs.buffer = nil
}

func (cs *clientStream) commitAttempt() {
	cs.mu.Lock()
	cs.commitAttemptLocked()
	cs.mu.Unlock()
}
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
func (cs *clientStream) shouldRetry(err error) error {
	unprocessed := false
	if cs.attempt.s == nil {
		pioErr, ok := err.(transport.PerformedIOError)
		if ok {
			// Unwrap error.
			err = toRPCErr(pioErr.Err)
		} else {
			unprocessed = true
		}
		if !ok && !cs.callInfo.failFast {
			// In the event of a non-IO operation error from NewStream, we
			// never attempted to write anything to the wire, so we can retry
			// indefinitely for non-fail-fast RPCs.
			return nil
		}
	}
	if cs.finished || cs.committed {
		// RPC is finished or committed; cannot retry.
		return err
	}
	// Wait for the trailers.
	if cs.attempt.s != nil {
		<-cs.attempt.s.Done()
		unprocessed = cs.attempt.s.Unprocessed()
	}
	if cs.firstAttempt && unprocessed {
		// First attempt, stream unprocessed: transparently retry.
		cs.firstAttempt = false
		return nil
	}
	cs.firstAttempt = false
	if cs.cc.dopts.disableRetry {
		return err
	}

	pushback := 0
	hasPushback := false
	if cs.attempt.s != nil {
		if !cs.attempt.s.TrailersOnly() {
			return err
		}
		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return err
		}
	}

	var code codes.Code
	if cs.attempt.s != nil {
		code = cs.attempt.s.Status().Code()
	} else {
		code = status.Convert(err).Code()
	}

	rp := cs.methodConfig.retryPolicy
	if rp == nil || !rp.retryableStatusCodes[code] {
		return err
	}

	// Note: the ordering here is important; we count this as a failure for
	// throttling only if the code in the retry policy is retryable.
	if cs.retryThrottler.throttle() {
		return err
	}
	if cs.numRetries+1 >= rp.maxAttempts {
		return err
	}

	var dur time.Duration
	if hasPushback {
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.initialBackoff) * fact
		if max := float64(rp.maxBackoff); cur > max {
			cur = max
		}
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return nil
	case <-cs.ctx.Done():
		t.Stop()
		return status.FromContextError(cs.ctx.Err()).Err()
	}
}
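The backoff above draws a random duration in [0, min(initialBackoff *
backoffMultiplier^n, maxBackoff)), where n resets to zero whenever the server
sends a pushback. A standalone sketch of that computation (the function and
parameter names are hypothetical, and math/rand stands in for the internal
grpcrand package):

// backoffFor mirrors the exponential-backoff-with-jitter math used above.
func backoffFor(retriesSincePushback int, initial, max time.Duration, multiplier float64) time.Duration {
	cur := float64(initial) * math.Pow(multiplier, float64(retriesSincePushback))
	if m := float64(max); cur > m {
		cur = m
	}
	return time.Duration(rand.Int63n(int64(cur))) // uniform jitter in [0, cur)
}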
// retryLocked returns nil if a retry was performed and succeeded; otherwise
// it returns the error that should abort the RPC.
func (cs *clientStream) retryLocked(lastErr error) error {
	for {
		cs.attempt.finish(lastErr)
		if err := cs.shouldRetry(lastErr); err != nil {
			cs.commitAttemptLocked()
			return err
		}
		cs.firstAttempt = false
		if err := cs.newAttemptLocked(nil, nil); err != nil {
			return err
		}
		if lastErr = cs.replayBufferLocked(); lastErr == nil {
			return nil
		}
	}
}
func (cs *clientStream) Context() context.Context {
	cs.commitAttempt()
	// The attempt is committed and cannot change, so no lock is needed.
	return cs.attempt.s.Context()
}

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			return op(cs.attempt)
		}
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}
func (cs *clientStream) Header() (metadata.MD, error) {
	var m metadata.MD
	err := cs.withRetry(func(a *csAttempt) error {
		var err error
		m, err = a.s.Header()
		return toRPCErr(err)
	}, cs.commitAttemptLocked)
	if err != nil {
		cs.finish(err)
		return nil, err
	}
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Only log if binary log is on and the header has not been logged.
		logEntry := &binarylog.ServerHeader{
			OnClientSide: true,
			Header:       m,
			PeerAddr:     nil,
		}
		if peer, ok := peer.FromContext(cs.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		cs.binlog.Log(logEntry)
		cs.serverHeaderBinlogged = true
	}
	return m, err
}
func (cs *clientStream) replayBufferLocked() error {
	a := cs.attempt
	for _, f := range cs.buffer {
		if err := f(a); err != nil {
			return err
		}
	}
	return nil
}

func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
	// Note: we still buffer if retry is disabled (for transparent retries).
	if cs.committed {
		return
	}
	cs.bufferSize += sz
	if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
		cs.commitAttemptLocked()
		return
	}
	cs.buffer = append(cs.buffer, op)
}
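Once the buffered operations exceed callInfo.maxRetryRPCBufferSize, the stream
commits to the current attempt and gives up retryability. Callers can tune
that bound per call; a minimal sketch, reusing the hypothetical desc and
method from the earlier example:

// Allow up to 1 MiB of sent data to be buffered for replay on retry.
stream, err := cc.NewStream(ctx, desc, "/pkg.Service/Watch",
	MaxRetryRPCBufferSize(1<<20),
)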
func (cs *clientStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this
			// SendMsg call, as these indicate problems created by this client.
			// (Transport errors are converted to an io.EOF error in
			// csAttempt.sendMsg; the real error will be returned from RecvMsg
			// eventually in that case, or be retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}

	// load hdr, payload, data
	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
	if err != nil {
		return err
	}

	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	msgBytes := data // Store the pointer before setting to nil. For binary logging.
	op := func(a *csAttempt) error {
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and data when replaying; they are only needed
		// for stats, which are disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ClientMessage{
			OnClientSide: true,
			Message:      msgBytes,
		})
	}
	return
}
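The size check above enforces callInfo.maxSendMessageSize; the client-side
receive limit defaults to 4 MiB. Both limits can be raised per call with the
public CallOptions; a sketch under the same hypothetical method:

// Raise the per-call message size limits for a large-payload stream.
stream, err := cc.NewStream(ctx, desc, "/pkg.Service/Upload",
	MaxCallSendMsgSize(16<<20),
	MaxCallRecvMsgSize(16<<20),
)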
func (cs *clientStream) RecvMsg(m interface{}) error {
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Call Header() to binary log header if it's not already logged.
		cs.Header()
	}
	var recvInfo *payloadInfo
	if cs.binlog != nil {
		recvInfo = &payloadInfo{}
	}
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes,
		})
	}
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)
	}
	return err
}

func (cs *clientStream) CloseSend() error {
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to report an error for the special
		// case where the stream can't be written to.
		return nil
	}
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
	if cs.binlog != nil {
		cs.binlog.Log(&binarylog.ClientHalfClose{
			OnClientSide: true,
		})
	}
	return nil
}
func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	cs.commitAttemptLocked()
	cs.mu.Unlock()
	// Only log cancellation in finish (it could be caused by the RPC ctx
	// being canceled or the ClientConn closing).
	if cs.binlog != nil && status.Code(err) == codes.Canceled {
		cs.binlog.Log(&binarylog.Cancel{
			OnClientSide: true,
		})
	}
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	if cs.attempt != nil {
		cs.attempt.finish(err)
	}
	cs.cancel()
}
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on
			// error because the generated code requires it. finish is not
			// called; RecvMsg() will call it with the stream's status
			// independently.
			return nil
		}
		return io.EOF
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
	}
	if channelz.IsOn() {
		a.t.IncrMsgSent()
	}
	return nil
}
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if a.statsHandler != nil && payInfo == nil {
		payInfo = &payloadInfo{}
	}

	if !a.decompSet {
		// Block until we receive headers containing the received message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
			Client:     true,
			RecvTime:   time.Now(),
			Data:       payInfo.uncompressedBytes,
			WireLength: payInfo.wireLength,
			Length:     len(payInfo.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-streaming RPCs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}
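The decompressor lookup above resolves the peer's advertised encoding against
the encoding registry. A sketch of opting in to gzip on the client (the
encoding/gzip package registers itself on import; the method name is the
hypothetical one used earlier):

// import _ "google.golang.org/grpc/encoding/gzip" // registers the "gzip" codec
stream, err := cc.NewStream(ctx, desc, "/pkg.Service/Watch",
	UseCompressor("gzip"), // sets callHdr.SendCompress for this call
)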
func (a *csAttempt) finish(err error) {
	a.mu.Lock()
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	var tr metadata.MD
	if a.s != nil {
		a.t.CloseStream(a.s, err)
		tr = a.s.Trailer()
	}

	if a.done != nil {
		br := false
		if a.s != nil {
			br = a.s.BytesReceived()
		}
		a.done(balancer.DoneInfo{
			Err:           err,
			Trailer:       tr,
			BytesSent:     a.s != nil,
			BytesReceived: br,
			ServerLoad:    balancerload.Parse(tr),
		})
	}
	if a.statsHandler != nil {
		end := &stats.End{
			Client:    true,
			BeginTime: a.cs.beginTime,
			EndTime:   time.Now(),
			Trailer:   tr,
			Error:     err,
		}
		a.statsHandler.HandleRPC(a.cs.ctx, end)
	}
	if a.trInfo != nil && a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
		}
		a.trInfo.tr.Finish()
		a.trInfo.tr = nil
	}
	a.mu.Unlock()
}
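The DoneInfo built here is delivered to the load balancer's pick callback. A
picker-side sketch of consuming it (the picker type and its next() helper are
hypothetical):

func (p *rrPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	sc := p.next() // choose a ready SubConn
	return balancer.PickResult{
		SubConn: sc,
		Done: func(di balancer.DoneInfo) {
			// di.Err, di.Trailer, di.BytesSent, and di.BytesReceived are the
			// values csAttempt.finish reported for this completed attempt.
		},
	}, nil
}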
// newNonRetryClientStream creates a ClientStream with the specified transport,
// on the given addrConn.
//
// It's expected that the given transport is either the same one in addrConn,
// or is already closed. To avoid a race, the transport is specified separately
// instead of using ac.transport.
//
// Main differences from ClientConn.NewStream: no retry, no service config,
// and no tracing or stats.
func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
	if t == nil {
		// TODO: return RPC error here?
		return nil, errors.New("transport provided is nil")
	}
	// defaultCallInfo contains unnecessary info (i.e. failFast); just
	// initialize an empty struct.
	c := &callInfo{}

	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           ac.cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption,
	// if set; otherwise use the compressor configured by the WithCompressor
	// DialOption, if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if ac.cc.dopts.cp != nil {
		callHdr.SendCompress = ac.cc.dopts.cp.Type()
		cp = ac.cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	// Use a special addrConnStream to avoid retry.
	as := &addrConnStream{
		callHdr:  callHdr,
		ac:       ac,
		ctx:      ctx,
		cancel:   cancel,
		opts:     opts,
		callInfo: c,
		desc:     desc,
		codec:    c.codec,
		cp:       cp,
		comp:     comp,
		t:        t,
	}

	s, err := as.t.NewStream(as.ctx, as.callHdr)
	if err != nil {
		return nil, toRPCErr(err)
	}
	as.s = s
	as.p = &parser{r: s}
	ac.incrCallsStarted()
	if desc != unaryStreamDesc {
		// Listen on stream contexts to clean up when the user closes the
		// ClientConn or cancels the stream context.
		go func() {
			select {
			case <-ac.ctx.Done():
				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
			case <-ctx.Done():
				as.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return as, nil
}
type addrConnStream struct {
s *transport.Stream
ac *addrConn
callHdr *transport.CallHdr
cancel context.CancelFunc
opts []CallOption
callInfo *callInfo
t transport.ClientTransport
ctx context.Context
sentLast bool
desc *StreamDesc
codec baseCodec
cp Compressor
comp encoding.Compressor
decompSet bool
dc Decompressor
decomp encoding.Compressor
p *parser
mu sync.Mutex
finished bool
}
func (as *addrConnStream) Header() (metadata.MD, error) {
	m, err := as.s.Header()
	if err != nil {
		as.finish(toRPCErr(err))
	}
	return m, err
}

func (as *addrConnStream) Trailer() metadata.MD {
	return as.s.Trailer()
}

func (as *addrConnStream) CloseSend() error {
	if as.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	as.sentLast = true
	as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
	// Always return nil; io.EOF is the only error that might make sense
	// instead, but there is no need to report an error for the special case
	// where the stream can't be written to.
	return nil
}

func (as *addrConnStream) Context() context.Context {
	return as.s.Context()
}
func (as *addrConnStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish for errors generated by this SendMsg call, as these
			// indicate problems created by this client.
			as.finish(err)
		}
	}()
	if as.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !as.desc.ClientStreams {
		as.sentLast = true
	}

	// load hdr, payload, data
	hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
	if err != nil {
		return err
	}

	if len(payld) > *as.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
	}

	if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
		if !as.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on
			// error because the generated code requires it.
			return nil
		}
		return io.EOF
	}

	if channelz.IsOn() {
		as.t.IncrMsgSent()
	}
	return nil
}
func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if err != nil || !as.desc.ServerStreams {
			// err != nil or non-server-streaming indicates end of stream.
			as.finish(err)
		}
	}()

	if !as.decompSet {
		// Block until we receive headers containing the received message encoding.
		if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if as.dc == nil || as.dc.Type() != ct {
				// No matching configured decompressor; find a registered one.
				as.dc = nil
				as.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			as.dc = nil
		}
		as.decompSet = true // Only initialize this state once per stream.
	}
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := as.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}

	if channelz.IsOn() {
		as.t.IncrMsgRecv()
	}
	if as.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}

	// Special handling for non-server-streaming RPCs: expect EOF or error.
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return as.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}
func (as *addrConnStream) finish(err error) {
	as.mu.Lock()
	if as.finished {
		as.mu.Unlock()
		return
	}
	as.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	if as.s != nil {
		as.t.CloseStream(as.s, err)
	}
	if err != nil {
		as.ac.incrCallsFailed()
	} else {
		as.ac.incrCallsSucceeded()
	}
	as.cancel()
	as.mu.Unlock()
}
// ServerStream defines the server-side behavior of a streaming RPC.
type ServerStream interface {
	SetHeader(metadata.MD) error
	SendHeader(metadata.MD) error
	SetTrailer(metadata.MD)
	Context() context.Context
	SendMsg(m interface{}) error
	RecvMsg(m interface{}) error
}

// serverStream implements a server side Stream.
type serverStream struct {
	ctx    context.Context
	t      transport.ServerTransport
	s      *transport.Stream
	p      *parser
	codec  baseCodec
	cp     Compressor
	dc     Decompressor
	comp   encoding.Compressor
	decomp encoding.Compressor

	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	statsHandler stats.Handler

	binlog *binarylog.MethodLogger
	// serverHeaderBinlogged records whether the server header has been
	// logged; it is set once SendHeader or SendMsg logs the header.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
func (ss *serverStream) Context() context.Context {
	return ss.ctx
}

func (ss *serverStream) SetHeader(md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	return ss.s.SetHeader(md)
}

func (ss *serverStream) SendHeader(md metadata.MD) error {
	err := ss.t.WriteHeader(ss.s, md)
	if ss.binlog != nil && !ss.serverHeaderBinlogged {
		h, _ := ss.s.Header()
		ss.binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		ss.serverHeaderBinlogged = true
	}
	return err
}

func (ss *serverStream) SetTrailer(md metadata.MD) {
	if md.Len() == 0 {
		return
	}
	ss.s.SetTrailer(md)
}
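A handler-side sketch of the three metadata methods above (the handler and
the metadata values are hypothetical):

func watchHandler(srv interface{}, stream ServerStream) error {
	// SetHeader buffers header metadata; it is flushed by the first SendMsg,
	// by an explicit SendHeader, or when the RPC ends.
	if err := stream.SetHeader(metadata.Pairs("x-request-id", "abc123")); err != nil {
		return err
	}
	// SetTrailer metadata is sent when the handler returns.
	stream.SetTrailer(metadata.Pairs("x-served-by", "node-1"))
	return nil
}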
func (ss *serverStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
				} else {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// A non-user-specified status was sent out. This should be an
			// error case (as a server-side cancel, maybe). It is not handled
			// specifically now: the user will return a final status from the
			// service handler, and we will log that error instead.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgSent()
		}
	}()

	// load hdr, payload, data
	hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
	if err != nil {
		return err
	}

	if len(payload) > ss.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
	}
	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
		return toRPCErr(err)
	}
	if ss.binlog != nil {
		if !ss.serverHeaderBinlogged {
			h, _ := ss.s.Header()
			ss.binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			ss.serverHeaderBinlogged = true
		}
		ss.binlog.Log(&binarylog.ServerMessage{
			Message: data,
		})
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
	}
	return nil
}
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
				} else if err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// A non-user-specified status was sent out; see SendMsg above.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgRecv()
		}
	}()
	var payInfo *payloadInfo
	if ss.statsHandler != nil || ss.binlog != nil {
		payInfo = &payloadInfo{}
	}
	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
		if err == io.EOF {
			if ss.binlog != nil {
				ss.binlog.Log(&binarylog.ClientHalfClose{})
			}
			return err
		}
		if err == io.ErrUnexpectedEOF {
			err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
		}
		return toRPCErr(err)
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
			RecvTime:   time.Now(),
			Data:       payInfo.uncompressedBytes,
			WireLength: payInfo.wireLength,
			Length:     len(payInfo.uncompressedBytes),
		})
	}
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ClientMessage{
			Message: payInfo.uncompressedBytes,
		})
	}
	return nil
}
// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
	return Method(stream.Context())
}
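A sketch of using MethodFromServerStream inside a server stream interceptor
(the interceptor and its use of the package logger are hypothetical):

func loggingStreamInterceptor(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
	if m, ok := MethodFromServerStream(ss); ok {
		logger.Infof("streaming call started: %s", m)
	}
	return handler(srv, ss)
}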
// prepareMsg returns the hdr, payload and data using the compressors passed
// in, or using the passed-in PreparedMsg if m is one.
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
	if preparedMsg, ok := m.(*PreparedMsg); ok {
		return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
	}
	// The input interface is not a prepared msg: marshal and compress the
	// data at this point.
	data, err = encode(codec, m)
	if err != nil {
		return nil, nil, nil, err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		return nil, nil, nil, err
	}
	hdr, payload = msgHeader(data, compData)
	return hdr, payload, data, nil
}
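The fast path above lets callers pre-encode a message once and reuse it. A
minimal usage sketch with the public PreparedMsg API (stream and req are
assumed to exist):

var pm PreparedMsg
if err := pm.Encode(stream, req); err != nil { // marshal and compress up front
	return err
}
if err := stream.SendMsg(&pm); err != nil { // prepareMsg takes the fast path
	return err
}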