Source File
rpc_util.go
Belonging Package
google.golang.org/grpc
package grpc
import (
)
Type() string
}
type gzipCompressor struct {
pool sync.Pool
}
func () Compressor {
, := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
return
}
func ( int) (Compressor, error) {
if < gzip.DefaultCompression || > gzip.BestCompression {
return nil, fmt.Errorf("grpc: invalid compression level: %d", )
}
return &gzipCompressor{
pool: sync.Pool{
New: func() interface{} {
, := gzip.NewWriterLevel(ioutil.Discard, )
if != nil {
panic()
}
return
},
},
}, nil
}
func ( *gzipCompressor) ( io.Writer, []byte) error {
:= .pool.Get().(*gzip.Writer)
defer .pool.Put()
.Reset()
if , := .Write(); != nil {
return
}
return .Close()
}
func ( *gzipCompressor) () string {
return "gzip"
}
Type() string
}
type gzipDecompressor struct {
pool sync.Pool
}
func () Decompressor {
return &gzipDecompressor{}
}
func ( *gzipDecompressor) ( io.Reader) ([]byte, error) {
var *gzip.Reader
switch maybeZ := .pool.Get().(type) {
case nil:
, := gzip.NewReader()
if != nil {
return nil,
}
=
case *gzip.Reader:
=
if := .Reset(); != nil {
.pool.Put()
return nil,
}
}
defer func() {
.Close()
.pool.Put()
}()
return ioutil.ReadAll()
}
func ( *gzipDecompressor) () string {
return "gzip"
}
type callInfo struct {
compressorType string
failFast bool
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
contentSubtype string
codec baseCodec
maxRetryRPCBufferSize int
}
func () *callInfo {
return &callInfo{
failFast: true,
maxRetryRPCBufferSize: 256 * 1024, // 256KB
}
}
type EmptyCallOption struct{}
func (EmptyCallOption) (*callInfo) error { return nil }
func (EmptyCallOption) (*callInfo, *csAttempt) {}
func ( *metadata.MD) CallOption {
return HeaderCallOption{HeaderAddr: }
}
type HeaderCallOption struct {
HeaderAddr *metadata.MD
}
func ( HeaderCallOption) ( *callInfo) error { return nil }
func ( HeaderCallOption) ( *callInfo, *csAttempt) {
*.HeaderAddr, _ = .s.Header()
}
func ( *metadata.MD) CallOption {
return TrailerCallOption{TrailerAddr: }
}
type TrailerCallOption struct {
TrailerAddr *metadata.MD
}
func ( TrailerCallOption) ( *callInfo) error { return nil }
func ( TrailerCallOption) ( *callInfo, *csAttempt) {
*.TrailerAddr = .s.Trailer()
}
func ( *peer.Peer) CallOption {
return PeerCallOption{PeerAddr: }
}
type PeerCallOption struct {
PeerAddr *peer.Peer
}
func ( PeerCallOption) ( *callInfo) error { return nil }
func ( PeerCallOption) ( *callInfo, *csAttempt) {
if , := peer.FromContext(.s.Context()); {
*.PeerAddr = *
}
}
func ( bool) CallOption {
return FailFastCallOption{FailFast: !}
}
func ( bool) CallOption {
return FailFastCallOption{FailFast: }
}
type FailFastCallOption struct {
FailFast bool
}
func ( FailFastCallOption) ( *callInfo) error {
.failFast = .FailFast
return nil
}
func ( FailFastCallOption) ( *callInfo, *csAttempt) {}
func ( int) CallOption {
return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: }
}
type MaxRecvMsgSizeCallOption struct {
MaxRecvMsgSize int
}
func ( MaxRecvMsgSizeCallOption) ( *callInfo) error {
.maxReceiveMessageSize = &.MaxRecvMsgSize
return nil
}
func ( MaxRecvMsgSizeCallOption) ( *callInfo, *csAttempt) {}
func ( int) CallOption {
return MaxSendMsgSizeCallOption{MaxSendMsgSize: }
}
type MaxSendMsgSizeCallOption struct {
MaxSendMsgSize int
}
func ( MaxSendMsgSizeCallOption) ( *callInfo) error {
.maxSendMessageSize = &.MaxSendMsgSize
return nil
}
func ( MaxSendMsgSizeCallOption) ( *callInfo, *csAttempt) {}
func ( credentials.PerRPCCredentials) CallOption {
return PerRPCCredsCallOption{Creds: }
}
type PerRPCCredsCallOption struct {
Creds credentials.PerRPCCredentials
}
func ( PerRPCCredsCallOption) ( *callInfo) error {
.creds = .Creds
return nil
}
func ( PerRPCCredsCallOption) ( *callInfo, *csAttempt) {}
func ( string) CallOption {
return CompressorCallOption{CompressorType: }
}
type CompressorCallOption struct {
CompressorType string
}
func ( CompressorCallOption) ( *callInfo) error {
.compressorType = .CompressorType
return nil
}
func ( CompressorCallOption) ( *callInfo, *csAttempt) {}
func ( string) CallOption {
return ContentSubtypeCallOption{ContentSubtype: strings.ToLower()}
}
type ContentSubtypeCallOption struct {
ContentSubtype string
}
func ( ContentSubtypeCallOption) ( *callInfo) error {
.contentSubtype = .ContentSubtype
return nil
}
func ( ContentSubtypeCallOption) ( *callInfo, *csAttempt) {}
func ( encoding.Codec) CallOption {
return ForceCodecCallOption{Codec: }
}
type ForceCodecCallOption struct {
Codec encoding.Codec
}
func ( ForceCodecCallOption) ( *callInfo) error {
.codec = .Codec
return nil
}
func ( ForceCodecCallOption) ( *callInfo, *csAttempt) {}
func ( Codec) CallOption {
return CustomCodecCallOption{Codec: }
}
type CustomCodecCallOption struct {
Codec Codec
}
func ( CustomCodecCallOption) ( *callInfo) error {
.codec = .Codec
return nil
}
func ( CustomCodecCallOption) ( *callInfo, *csAttempt) {}
func ( int) CallOption {
return MaxRetryRPCBufferSizeCallOption{}
}
type MaxRetryRPCBufferSizeCallOption struct {
MaxRetryRPCBufferSize int
}
func ( MaxRetryRPCBufferSizeCallOption) ( *callInfo) error {
.maxRetryRPCBufferSize = .MaxRetryRPCBufferSize
return nil
}
func ( MaxRetryRPCBufferSizeCallOption) ( *callInfo, *csAttempt) {}
type payloadFormat uint8
const (
compressionNone payloadFormat = 0 // no compression
compressionMade payloadFormat = 1 // compressed
)
func ( *parser) ( int) ( payloadFormat, []byte, error) {
if , := .r.Read(.header[:]); != nil {
return 0, nil,
}
= payloadFormat(.header[0])
:= binary.BigEndian.Uint32(.header[1:])
if == 0 {
return , nil, nil
}
if int64() > int64(maxInt) {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", , maxInt)
}
if int() > {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", , )
func ( baseCodec, interface{}) ([]byte, error) {
if == nil { // NOTE: typed nils will not be caught by this check
return nil, nil
}
, := .Marshal()
if != nil {
return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", .Error())
}
if uint(len()) > math.MaxUint32 {
return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len())
}
return , nil
}
func ( []byte, Compressor, encoding.Compressor) ([]byte, error) {
if == nil && == nil {
return nil, nil
}
:= func( error) error {
return status.Errorf(codes.Internal, "grpc: error while compressing: %v", .Error())
}
:= &bytes.Buffer{}
if != nil {
, := .Compress()
if != nil {
return nil, ()
}
if , := .Write(); != nil {
return nil, ()
}
if := .Close(); != nil {
return nil, ()
}
} else {
if := .Do(, ); != nil {
return nil, ()
}
}
return .Bytes(), nil
}
const (
payloadLen = 1
sizeLen = 4
headerLen = payloadLen + sizeLen
)
binary.BigEndian.PutUint32([payloadLen:], uint32(len()))
return ,
}
func ( bool, interface{}, , []byte, time.Time) *stats.OutPayload {
return &stats.OutPayload{
Client: ,
Payload: ,
Data: ,
Length: len(),
WireLength: len() + headerLen,
SentTime: ,
}
}
func ( payloadFormat, string, bool) *status.Status {
switch {
case compressionNone:
case compressionMade:
if == "" || == encoding.Identity {
return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
}
if ! {
return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
}
default:
return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", )
}
return nil
}
type payloadInfo struct {
wireLength int // The compressed length got from wire.
uncompressedBytes []byte
}
func ( *parser, *transport.Stream, Decompressor, int, *payloadInfo, encoding.Compressor) ([]byte, error) {
, , := .recvMsg()
if != nil {
return nil,
}
if != nil {
.wireLength = len()
}
if := checkRecvPayload(, .RecvCompress(), != nil || != nil); != nil {
return nil, .Err()
}
var int
func ( *parser, baseCodec, *transport.Stream, Decompressor, interface{}, int, *payloadInfo, encoding.Compressor) error {
, := recvAndDecompress(, , , , , )
if != nil {
return
}
if := .Unmarshal(, ); != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", )
}
if != nil {
.uncompressedBytes =
}
return nil
}
type rpcInfo struct {
failfast bool
preloaderInfo *compressorInfo
}
type compressorInfo struct {
codec baseCodec
cp Compressor
comp encoding.Compressor
}
type rpcInfoContextKey struct{}
func ( context.Context, bool, baseCodec, Compressor, encoding.Compressor) context.Context {
return context.WithValue(, rpcInfoContextKey{}, &rpcInfo{
failfast: ,
preloaderInfo: &compressorInfo{
codec: ,
cp: ,
comp: ,
},
})
}
func ( context.Context) ( *rpcInfo, bool) {
, = .Value(rpcInfoContextKey{}).(*rpcInfo)
return
}
func ( error) error {
if == nil || == io.EOF {
return
}
if == io.ErrUnexpectedEOF {
return status.Error(codes.Internal, .Error())
}
if , := status.FromError(); {
return
}
switch e := .(type) {
case transport.ConnectionError:
return status.Error(codes.Unavailable, .Desc)
default:
switch {
case context.DeadlineExceeded:
return status.Error(codes.DeadlineExceeded, .Error())
case context.Canceled:
return status.Error(codes.Canceled, .Error())
}
}
return status.Error(codes.Unknown, .Error())
}
return nil
}
type channelzData struct {
callsStarted int64
callsFailed int64
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |