Source File
server.go
Belonging Package
google.golang.org/grpc
package grpc
import (
)
// Default message-size limits. Both values seed the corresponding fields of
// defaultServerOptions.
const (
// defaultServerMaxReceiveMessageSize is the default limit for a received
// message: 1024 * 1024 * 4 (4 MiB).
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
// defaultServerMaxSendMessageSize is the default limit for a sent message.
// math.MaxInt32 is the largest value an int32 can hold, so by default the
// send path is effectively unrestricted.
defaultServerMaxSendMessageSize = math.MaxInt32
)
// statusOK is a status carrying codes.OK and an empty message. The RPC
// processing code in this file passes it to WriteStatus as the final status
// of a call that completed without error.
var statusOK = status.New(codes.OK, "")
// logger is the grpclog logger for the "core" component. This file uses it
// for warnings (logger.Warning) and fatal registration errors (logger.Fatalf).
var logger = grpclog.Component("core")
// methodHandler is the function signature stored in MethodDesc.Handler.
//
//   - srv is the registered service implementation.
//   - ctx is the context of the call.
//   - dec decodes the incoming request into the value passed to it and
//     returns an error when decoding fails.
//   - interceptor is the (possibly nil) unary interceptor configured on the
//     server.
//
// It returns the reply message and an error.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc describes one unary RPC method of a service: the name under
// which it is registered and the handler invoked for it.
type MethodDesc struct {
// MethodName is the key used when the method is stored in the
// per-service method map during registration.
MethodName string
// Handler is the function called to process a request for this method.
Handler methodHandler
}
// ServiceDesc describes a service that can be registered with a Server.
//
// NOTE(review): registration code in this file reads a ServiceName field
// from the descriptor, but no such field is visible in this declaration —
// confirm the struct is complete.
type ServiceDesc struct {
// HandlerType is a pointer to the service interface. Registration takes
// reflect.TypeOf(HandlerType).Elem() and verifies that the supplied
// implementation satisfies it.
HandlerType interface{}
// Methods lists the unary methods of the service.
Methods []MethodDesc
// Streams lists the streaming methods of the service.
Streams []StreamDesc
// Metadata is opaque data copied verbatim into the registered service
// info and later exposed through ServiceInfo.Metadata.
Metadata interface{}
}
serviceImpl interface{}
methods map[string]*MethodDesc
streams map[string]*StreamDesc
mdata interface{}
}
// serverWorkerData is the unit of work sent over a server worker channel.
// The worker handles the stream on the given transport and then calls
// wg.Done().
type serverWorkerData struct {
// st is the transport the stream arrived on.
st transport.ServerTransport
// wg is the wait group tracking the in-flight streams of that transport;
// the sender calls Add(1) before handing the work off.
wg *sync.WaitGroup
// stream is the stream to be handled.
stream *transport.Stream
}
// Server is a gRPC server that serves RPC requests. It tracks its listeners,
// live transports and registered services, all guarded by mu.
type Server struct {
// opts holds the options the server was constructed with.
opts serverOptions
mu sync.Mutex // guards following
// lis is the set of active listeners; it is set to nil when the server
// stops.
lis map[net.Listener]bool
// conns is the set of active transports. A nil map causes new
// connections to be closed instead of added.
conns map[transport.ServerTransport]bool
// serve is checked during registration: services may not be registered
// once it is true.
serve bool
// drain records that Drain has already been called on the existing
// transports.
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
services map[string]*serviceInfo // service name -> service info
// events is the trace event log; it is only created when EnableTracing
// is set.
events trace.EventLog
// quit is fired when the server begins stopping.
quit *grpcsync.Event
// done is fired when the stop procedure returns.
done *grpcsync.Event
// channelzRemoveOnce ensures the channelz entry is removed only once.
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
channelzID int64 // channelz unique identification number
// czData holds the call counters reported through channelz.
czData *channelzData
// serverWorkerChannels has one channel per stream worker goroutine; it
// is only allocated when opts.numServerWorkers > 0.
serverWorkerChannels []chan *serverWorkerData
}
// serverOptions collects every configurable setting of a Server. Values are
// filled in by ServerOption implementations, starting from
// defaultServerOptions.
type serverOptions struct {
// creds performs the server-side handshake on raw connections; when nil
// the raw connection is used as-is.
creds credentials.TransportCredentials
codec baseCodec
cp Compressor
dc Decompressor
// unaryInt is the single unary interceptor; after construction it holds
// the chained combination of unaryInt and chainUnaryInts.
unaryInt UnaryServerInterceptor
// streamInt is the streaming counterpart of unaryInt.
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
inTapHandle tap.ServerInHandle
statsHandler stats.Handler
maxConcurrentStreams uint32
// maxReceiveMessageSize and maxSendMessageSize bound the size of
// received and sent messages respectively.
maxReceiveMessageSize int
maxSendMessageSize int
// unknownStreamDesc, when non-nil, handles streams whose service or
// method is not registered.
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
// connectionTimeout is the deadline applied to a raw connection while
// the handshake and transport setup take place.
connectionTimeout time.Duration
// maxHeaderListSize and headerTableSize are pointers so that "unset"
// (nil) can be told apart from an explicit value; they are passed
// through to the transport configuration.
maxHeaderListSize *uint32
headerTableSize *uint32
// numServerWorkers is the number of stream worker goroutines; zero
// disables the worker pool.
numServerWorkers uint32
}
// defaultServerOptions is the baseline configuration. The server constructor
// copies it and then applies the caller's options on top. Fields not listed
// here start at their zero value.
var defaultServerOptions = serverOptions{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
// Connections get 120 seconds to finish connection setup.
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
}
// ServerOption sets a server option, such as credentials, codec or keepalive
// parameters. The apply method is unexported, so implementations must live
// in this package.
type ServerOption interface {
// apply mutates the given serverOptions in place.
apply(*serverOptions)
}
// EmptyServerOption is a field-less struct type. The method declared
// immediately after it has an empty body, so it presumably acts as a no-op
// ServerOption — TODO confirm against the upstream grpc-go documentation.
type EmptyServerOption struct{}
func (EmptyServerOption) (*serverOptions) {}
// funcServerOption wraps a function that modifies serverOptions, so that a
// plain closure can be handed out as an option value.
type funcServerOption struct {
// f is the wrapped mutation function.
f func(*serverOptions)
}
func ( *funcServerOption) ( *serverOptions) {
.f()
}
func ( func(*serverOptions)) *funcServerOption {
return &funcServerOption{
f: ,
}
}
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.writeBufferSize =
})
}
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.readBufferSize =
})
}
func ( int32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.initialWindowSize =
})
}
func ( int32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.initialConnWindowSize =
})
}
func ( keepalive.ServerParameters) ServerOption {
if .Time > 0 && .Time < time.Second {
logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
.Time = time.Second
}
return newFuncServerOption(func( *serverOptions) {
.keepaliveParams =
})
}
func ( keepalive.EnforcementPolicy) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.keepalivePolicy =
})
}
func ( Codec) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.codec =
})
}
func ( Compressor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.cp =
})
}
func ( Decompressor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.dc =
})
}
func ( int) ServerOption {
return MaxRecvMsgSize()
}
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxReceiveMessageSize =
})
}
func ( int) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxSendMessageSize =
})
}
func ( uint32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxConcurrentStreams =
})
}
func ( credentials.TransportCredentials) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.creds =
})
}
func ( UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if .unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
.unaryInt =
})
}
func ( ...UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.chainUnaryInts = append(.chainUnaryInts, ...)
})
}
func ( StreamServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if .streamInt != nil {
panic("The stream server interceptor was already set and may not be reset.")
}
.streamInt =
})
}
func ( ...StreamServerInterceptor) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.chainStreamInts = append(.chainStreamInts, ...)
})
}
func ( tap.ServerInHandle) ServerOption {
return newFuncServerOption(func( *serverOptions) {
if .inTapHandle != nil {
panic("The tap handle was already set and may not be reset.")
}
.inTapHandle =
})
}
func ( stats.Handler) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.statsHandler =
})
}
func ( StreamHandler) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.unknownStreamDesc = &StreamDesc{
StreamName: "unknown_service_handler",
ClientStreams: true,
ServerStreams: true,
}
})
}
func ( time.Duration) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.connectionTimeout =
})
}
func ( uint32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.maxHeaderListSize = &
})
}
func ( uint32) ServerOption {
return newFuncServerOption(func( *serverOptions) {
.headerTableSize = &
})
}
return newFuncServerOption(func( *serverOptions) {
.numServerWorkers =
})
}
// serverWorkerResetThreshold (1 << 16 = 65536) sizes the lifetime of a stream
// worker goroutine. The worker loop that follows handles
// serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
// requests and then starts a fresh goroutine in its place.
// NOTE(review): the random offset presumably keeps workers from all resetting
// at the same moment — confirm against upstream comments.
const serverWorkerResetThreshold = 1 << 16
:= serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
for := 0; < ; ++ {
, := <-
if ! {
return
}
.handleStream(.st, .stream, .traceInfo(.st, .stream))
.wg.Done()
}
go .()
}
func ( *Server) () {
.serverWorkerChannels = make([]chan *serverWorkerData, .opts.numServerWorkers)
for := uint32(0); < .opts.numServerWorkers; ++ {
.serverWorkerChannels[] = make(chan *serverWorkerData)
go .serverWorker(.serverWorkerChannels[])
}
}
func ( *Server) () {
for := uint32(0); < .opts.numServerWorkers; ++ {
close(.serverWorkerChannels[])
}
}
func ( ...ServerOption) *Server {
:= defaultServerOptions
for , := range {
.apply(&)
}
:= &Server{
lis: make(map[net.Listener]bool),
opts: ,
conns: make(map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
chainUnaryServerInterceptors()
chainStreamServerInterceptors()
.cv = sync.NewCond(&.mu)
if EnableTracing {
, , , := runtime.Caller(1)
.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", , ))
}
if .opts.numServerWorkers > 0 {
.initServerWorkers()
}
if channelz.IsOn() {
.channelzID = channelz.RegisterServer(&channelzServer{}, "")
}
return
}
RegisterService(desc *ServiceDesc, impl interface{})
}
func ( *Server) ( *ServiceDesc, interface{}) {
if != nil {
:= reflect.TypeOf(.HandlerType).Elem()
:= reflect.TypeOf()
if !.Implements() {
logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", , )
}
}
.register(, )
}
func ( *Server) ( *ServiceDesc, interface{}) {
.mu.Lock()
defer .mu.Unlock()
.printf("RegisterService(%q)", .ServiceName)
if .serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", .ServiceName)
}
if , := .services[.ServiceName]; {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", .ServiceName)
}
:= &serviceInfo{
serviceImpl: ,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: .Metadata,
}
for := range .Methods {
:= &.Methods[]
.methods[.MethodName] =
}
for := range .Streams {
:= &.Streams[]
.streams[.StreamName] =
}
.services[.ServiceName] =
}
// ServiceInfo contains information about a registered service, as returned
// in the map built from the server's registered services.
//
// NOTE(review): the code that builds these values also sets a Methods field,
// which is not visible in this declaration — confirm the struct is complete.
type ServiceInfo struct {
// Metadata is the metadata value supplied in the service's descriptor at
// registration time.
Metadata interface{}
}
func ( *Server) () map[string]ServiceInfo {
:= make(map[string]ServiceInfo)
for , := range .services {
:= make([]MethodInfo, 0, len(.methods)+len(.streams))
for := range .methods {
= append(, MethodInfo{
Name: ,
IsClientStream: false,
IsServerStream: false,
})
}
for , := range .streams {
= append(, MethodInfo{
Name: ,
IsClientStream: .ClientStreams,
IsServerStream: .ServerStreams,
})
}
[] = ServiceInfo{
Methods: ,
Metadata: .mdata,
}
}
return
}
// ErrServerStopped is a sentinel error with the message
// "grpc: the server has been stopped". Compare against it with errors.Is or
// ==. NOTE(review): the code that returns it is not visible in this view.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
func ( *Server) ( net.Conn) (net.Conn, credentials.AuthInfo, error) {
if .opts.creds == nil {
return , nil, nil
}
return .opts.creds.ServerHandshake()
}
// listenSocket wraps a net.Listener together with its channelz
// identification, so that closing the listener can also remove the
// corresponding channelz entry.
type listenSocket struct {
// Listener is embedded; its methods are promoted onto listenSocket.
net.Listener
// channelzID is the id obtained from channelz.RegisterListenSocket; it
// is only assigned when channelz is on.
channelzID int64
}
func ( *listenSocket) () *channelz.SocketInternalMetric {
return &channelz.SocketInternalMetric{
SocketOptions: channelz.GetSocketOption(.Listener),
LocalAddr: .Listener.Addr(),
}
}
func ( *listenSocket) () error {
:= .Listener.Close()
if channelz.IsOn() {
channelz.RemoveEntry(.channelzID)
}
return
}
<-.done.Done()
}
}()
:= &listenSocket{Listener: }
.lis[] = true
if channelz.IsOn() {
.channelzID = channelz.RegisterListenSocket(, .channelzID, .Addr().String())
}
.mu.Unlock()
defer func() {
.mu.Lock()
if .lis != nil && .lis[] {
.Close()
delete(.lis, )
}
.mu.Unlock()
}()
var time.Duration // how long to sleep on accept failure
for {
, := .Accept()
if != nil {
if , := .(interface {
() bool
}); && .() {
if == 0 {
= 5 * time.Millisecond
} else {
*= 2
}
if := 1 * time.Second; > {
=
}
.mu.Lock()
.printf("Accept error: %v; retrying in %v", , )
.mu.Unlock()
:= time.NewTimer()
select {
case <-.C:
case <-.quit.Done():
.Stop()
return nil
}
continue
}
.mu.Lock()
.printf("done serving; Accept = %v", )
.mu.Unlock()
if .quit.HasFired() {
return nil
}
return
}
.serveWG.Add(1)
go func() {
.handleRawConn()
.serveWG.Done()
}()
}
}
func ( *Server) ( net.Conn) {
if .quit.HasFired() {
.Close()
return
}
.SetDeadline(time.Now().Add(.opts.connectionTimeout))
, , := .useTransportAuthenticator()
if != credentials.ErrConnDispatched {
.mu.Lock()
.errorf("ServerHandshake(%q) failed: %v", .RemoteAddr(), )
.mu.Unlock()
channelz.Warningf(logger, .channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", .RemoteAddr(), )
.Close()
}
.SetDeadline(time.Time{})
return
}
:= .newHTTP2Transport(, )
if == nil {
return
}
.SetDeadline(time.Time{})
if !.addConn() {
return
}
go func() {
.serveStreams()
.removeConn()
}()
}
func ( *Server) ( net.Conn, credentials.AuthInfo) transport.ServerTransport {
:= &transport.ServerConfig{
MaxStreams: .opts.maxConcurrentStreams,
AuthInfo: ,
InTapHandle: .opts.inTapHandle,
StatsHandler: .opts.statsHandler,
KeepaliveParams: .opts.keepaliveParams,
KeepalivePolicy: .opts.keepalivePolicy,
InitialWindowSize: .opts.initialWindowSize,
InitialConnWindowSize: .opts.initialConnWindowSize,
WriteBufferSize: .opts.writeBufferSize,
ReadBufferSize: .opts.readBufferSize,
ChannelzParentID: .channelzID,
MaxHeaderListSize: .opts.maxHeaderListSize,
HeaderTableSize: .opts.headerTableSize,
}
, := transport.NewServerTransport("http2", , )
if != nil {
.mu.Lock()
.errorf("NewServerTransport(%q) failed: %v", .RemoteAddr(), )
.mu.Unlock()
.Close()
channelz.Warning(logger, .channelzID, "grpc: Server.Serve failed to create ServerTransport: ", )
return nil
}
return
}
func ( *Server) ( transport.ServerTransport) {
defer .Close()
var sync.WaitGroup
var uint32
.HandleStreams(func( *transport.Stream) {
.Add(1)
if .opts.numServerWorkers > 0 {
:= &serverWorkerData{st: , wg: &, stream: }
select {
case .serverWorkerChannels[atomic.AddUint32(&, 1)%.opts.numServerWorkers] <- :
go func() {
.handleStream(, , .traceInfo(, ))
.Done()
}()
}
} else {
go func() {
defer .Done()
.handleStream(, , .traceInfo(, ))
}()
}
}, func( context.Context, string) context.Context {
if !EnableTracing {
return
}
:= trace.New("grpc.Recv."+methodFamily(), )
return trace.NewContext(, )
})
.Wait()
}
// Compile-time assertion that *Server implements http.Handler. The blank
// identifier discards the value; only the type check matters.
var _ http.Handler = (*Server)(nil)
func ( *Server) ( http.ResponseWriter, *http.Request) {
, := transport.NewServerHandlerTransport(, , .opts.statsHandler)
if != nil {
http.Error(, .Error(), http.StatusInternalServerError)
return
}
if !.addConn() {
return
}
defer .removeConn()
.serveStreams()
}
func ( *Server) ( transport.ServerTransport, *transport.Stream) ( *traceInfo) {
if !EnableTracing {
return nil
}
, := trace.FromContext(.Context())
if ! {
return nil
}
= &traceInfo{
tr: ,
firstLine: firstLine{
client: false,
remoteAddr: .RemoteAddr(),
},
}
if , := .Context().Deadline(); {
.firstLine.deadline = time.Until()
}
return
}
func ( *Server) ( transport.ServerTransport) bool {
.mu.Lock()
defer .mu.Unlock()
if .conns == nil {
.Close()
return false
}
.Drain()
}
.conns[] = true
return true
}
func ( *Server) ( transport.ServerTransport) {
.mu.Lock()
defer .mu.Unlock()
if .conns != nil {
delete(.conns, )
.cv.Broadcast()
}
}
func ( *Server) () *channelz.ServerInternalMetric {
return &channelz.ServerInternalMetric{
CallsStarted: atomic.LoadInt64(&.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&.czData.lastCallStartedTime)),
}
}
func ( *Server) () {
atomic.AddInt64(&.czData.callsStarted, 1)
atomic.StoreInt64(&.czData.lastCallStartedTime, time.Now().UnixNano())
}
func ( *Server) () {
atomic.AddInt64(&.czData.callsSucceeded, 1)
}
func ( *Server) () {
atomic.AddInt64(&.czData.callsFailed, 1)
}
func ( *Server) ( transport.ServerTransport, *transport.Stream, interface{}, Compressor, *transport.Options, encoding.Compressor) error {
, := encode(.getCodec(.ContentSubtype()), )
if != nil {
channelz.Error(logger, .channelzID, "grpc: server failed to encode response: ", )
return
}
, := compress(, , )
if != nil {
channelz.Error(logger, .channelzID, "grpc: server failed to compress response: ", )
return
}
if len() > .opts.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(), .opts.maxSendMessageSize)
}
= .Write(, , , )
if == nil && .opts.statsHandler != nil {
.opts.statsHandler.HandleRPC(.Context(), outPayload(false, , , , time.Now()))
}
return
}
:= .opts.chainUnaryInts
if .opts.unaryInt != nil {
= append([]UnaryServerInterceptor{.opts.unaryInt}, .opts.chainUnaryInts...)
}
var UnaryServerInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= func( context.Context, interface{}, *UnaryServerInfo, UnaryHandler) (interface{}, error) {
return [0](, , , getChainUnaryHandler(, 0, , ))
}
}
.opts.unaryInt =
}
func ( []UnaryServerInterceptor, int, *UnaryServerInfo, UnaryHandler) UnaryHandler {
if == len()-1 {
return
}
return func( context.Context, interface{}) (interface{}, error) {
return [+1](, , , (, +1, , ))
}
}
func ( *Server) ( transport.ServerTransport, *transport.Stream, *serviceInfo, *MethodDesc, *traceInfo) ( error) {
:= .opts.statsHandler
if != nil || != nil || channelz.IsOn() {
if channelz.IsOn() {
.incrCallsStarted()
}
var *stats.Begin
if != nil {
:= time.Now()
= &stats.Begin{
BeginTime: ,
}
.HandleRPC(.Context(), )
}
if != nil {
.tr.LazyLog(&.firstLine, false)
defer func() {
if != nil {
if != nil && != io.EOF {
.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
.tr.SetError()
}
.tr.Finish()
}
if != nil {
:= &stats.End{
BeginTime: .BeginTime,
EndTime: time.Now(),
}
if != nil && != io.EOF {
.Error = toRPCErr()
}
.HandleRPC(.Context(), )
}
if channelz.IsOn() {
if != nil && != io.EOF {
.incrCallsFailed()
} else {
.incrCallsSucceeded()
}
}
}()
}
:= binarylog.GetMethodLogger(.Method())
if != nil {
:= .Context()
, := metadata.FromIncomingContext()
:= &binarylog.ClientHeader{
Header: ,
MethodName: .Method(),
PeerAddr: nil,
}
if , := .Deadline(); {
.Timeout = time.Until()
if .Timeout < 0 {
.Timeout = 0
}
}
if := [":authority"]; len() > 0 {
.Authority = [0]
}
if , := peer.FromContext(); {
.PeerAddr = .Addr
}
.Log()
}
var , encoding.Compressor
var Compressor
var Decompressor
if := .RecvCompress(); .opts.dc != nil && .opts.dc.Type() == {
= .opts.dc
} else if != "" && != encoding.Identity {
= encoding.GetCompressor()
if == nil {
:= status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
.WriteStatus(, )
return .Err()
}
}
= encoding.GetCompressor()
if != nil {
.SetSendCompress()
}
}
var *payloadInfo
if != nil || != nil {
= &payloadInfo{}
}
, := recvAndDecompress(&parser{r: }, , , .opts.maxReceiveMessageSize, , )
if != nil {
if := .WriteStatus(, status.Convert()); != nil {
channelz.Warningf(logger, .channelzID, "grpc: Server.processUnaryRPC failed to write status %v", )
}
return
}
if channelz.IsOn() {
.IncrMsgRecv()
}
:= func( interface{}) error {
if := .getCodec(.ContentSubtype()).Unmarshal(, ); != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", )
}
if != nil {
.HandleRPC(.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: ,
WireLength: .wireLength,
Data: ,
Length: len(),
})
}
if != nil {
.Log(&binarylog.ClientMessage{
Message: ,
})
}
if != nil {
.tr.LazyLog(&payload{sent: false, msg: }, true)
}
return nil
}
:= NewContextWithServerTransportStream(.Context(), )
, := .Handler(.serviceImpl, , , .opts.unaryInt)
if != nil {
, := status.FromError()
return
}
if , := status.FromError(); {
if := .WriteStatus(, ); != nil {
channelz.Warningf(logger, .channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", )
}
} else {
switch st := .(type) {
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", , ))
}
}
if != nil {
, := .Header()
.Log(&binarylog.ServerHeader{
Header: ,
})
.Log(&binarylog.ServerTrailer{
Trailer: .Trailer(),
Err: ,
})
}
return
}
if != nil {
, := .Header()
.Log(&binarylog.ServerHeader{
Header: ,
})
.Log(&binarylog.ServerMessage{
Message: ,
})
}
if channelz.IsOn() {
.IncrMsgSent()
}
if != nil {
.tr.LazyLog(&payload{sent: true, msg: }, true)
= .WriteStatus(, statusOK)
if != nil {
.Log(&binarylog.ServerTrailer{
Trailer: .Trailer(),
Err: ,
})
}
return
}
:= .opts.chainStreamInts
if .opts.streamInt != nil {
= append([]StreamServerInterceptor{.opts.streamInt}, .opts.chainStreamInts...)
}
var StreamServerInterceptor
if len() == 0 {
= nil
} else if len() == 1 {
= [0]
} else {
= func( interface{}, ServerStream, *StreamServerInfo, StreamHandler) error {
return [0](, , , getChainStreamHandler(, 0, , ))
}
}
.opts.streamInt =
}
func ( []StreamServerInterceptor, int, *StreamServerInfo, StreamHandler) StreamHandler {
if == len()-1 {
return
}
return func( interface{}, ServerStream) error {
return [+1](, , , (, +1, , ))
}
}
func ( *Server) ( transport.ServerTransport, *transport.Stream, *serviceInfo, *StreamDesc, *traceInfo) ( error) {
if channelz.IsOn() {
.incrCallsStarted()
}
:= .opts.statsHandler
var *stats.Begin
if != nil {
:= time.Now()
= &stats.Begin{
BeginTime: ,
}
.HandleRPC(.Context(), )
}
:= NewContextWithServerTransportStream(.Context(), )
:= &serverStream{
ctx: ,
t: ,
s: ,
p: &parser{r: },
codec: .getCodec(.ContentSubtype()),
maxReceiveMessageSize: .opts.maxReceiveMessageSize,
maxSendMessageSize: .opts.maxSendMessageSize,
trInfo: ,
statsHandler: ,
}
defer func() {
if != nil {
.mu.Lock()
if != nil && != io.EOF {
.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
.trInfo.tr.SetError()
}
.trInfo.tr.Finish()
.trInfo.tr = nil
.mu.Unlock()
}
if != nil {
:= &stats.End{
BeginTime: .BeginTime,
EndTime: time.Now(),
}
if != nil && != io.EOF {
.Error = toRPCErr()
}
.HandleRPC(.Context(), )
}
if channelz.IsOn() {
if != nil && != io.EOF {
.incrCallsFailed()
} else {
.incrCallsSucceeded()
}
}
}()
}
.binlog = binarylog.GetMethodLogger(.Method())
if .binlog != nil {
, := metadata.FromIncomingContext()
:= &binarylog.ClientHeader{
Header: ,
MethodName: .Method(),
PeerAddr: nil,
}
if , := .Deadline(); {
.Timeout = time.Until()
if .Timeout < 0 {
.Timeout = 0
}
}
if := [":authority"]; len() > 0 {
.Authority = [0]
}
if , := peer.FromContext(.Context()); {
.PeerAddr = .Addr
}
.binlog.Log()
}
if := .RecvCompress(); .opts.dc != nil && .opts.dc.Type() == {
.dc = .opts.dc
} else if != "" && != encoding.Identity {
.decomp = encoding.GetCompressor()
if .decomp == nil {
:= status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
.WriteStatus(.s, )
return .Err()
}
}
.comp = encoding.GetCompressor()
if .comp != nil {
.SetSendCompress()
}
}
if != nil {
.tr.LazyLog(&.firstLine, false)
}
var error
var interface{}
if != nil {
= .serviceImpl
}
if .opts.streamInt == nil {
= .Handler(, )
} else {
:= &StreamServerInfo{
FullMethod: .Method(),
IsClientStream: .ClientStreams,
IsServerStream: .ServerStreams,
}
= .opts.streamInt(, , , .Handler)
}
if != nil {
, := status.FromError()
if ! {
= status.New(codes.Unknown, .Error())
= .Err()
}
if != nil {
.mu.Lock()
.trInfo.tr.LazyLog(stringer(.Message()), true)
.trInfo.tr.SetError()
.mu.Unlock()
}
.WriteStatus(.s, )
if .binlog != nil {
.binlog.Log(&binarylog.ServerTrailer{
Trailer: .s.Trailer(),
Err: ,
})
return
}
if != nil {
.mu.Lock()
.trInfo.tr.LazyLog(stringer("OK"), false)
.mu.Unlock()
}
= .WriteStatus(.s, statusOK)
if .binlog != nil {
.binlog.Log(&binarylog.ServerTrailer{
Trailer: .s.Trailer(),
Err: ,
})
}
return
}
func ( *Server) ( transport.ServerTransport, *transport.Stream, *traceInfo) {
:= .Method()
if != "" && [0] == '/' {
= [1:]
}
:= strings.LastIndex(, "/")
if == -1 {
if != nil {
.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{}}, true)
.tr.SetError()
}
:= fmt.Sprintf("malformed method name: %q", .Method())
if := .WriteStatus(, status.New(codes.ResourceExhausted, )); != nil {
if != nil {
.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
.tr.SetError()
}
channelz.Warningf(logger, .channelzID, "grpc: Server.handleStream failed to write status: %v", )
}
if != nil {
.tr.Finish()
}
return
}
:= [:]
:= [+1:]
, := .services[]
if {
if , := .methods[]; {
.processUnaryRPC(, , , , )
return
}
if , := .streams[]; {
.processStreamingRPC(, , , , )
return
}
if := .opts.unknownStreamDesc; != nil {
.processStreamingRPC(, , nil, , )
return
}
var string
if ! {
= fmt.Sprintf("unknown service %v", )
} else {
= fmt.Sprintf("unknown method %v for service %v", , )
}
if != nil {
.tr.LazyPrintf("%s", )
.tr.SetError()
}
if := .WriteStatus(, status.New(codes.Unimplemented, )); != nil {
if != nil {
.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
.tr.SetError()
}
channelz.Warningf(logger, .channelzID, "grpc: Server.handleStream failed to write status: %v", )
}
if != nil {
.tr.Finish()
}
}
// streamKey is the context key type under which a ServerTransportStream is
// stored. Being an unexported empty struct, it cannot collide with keys
// defined by other packages.
type streamKey struct{}
func ( context.Context) ServerTransportStream {
, := .Value(streamKey{}).(ServerTransportStream)
return
}
func ( *Server) () {
.quit.Fire()
defer .done.Fire()
.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
channelz.RemoveEntry(.channelzID)
}
})
.mu.Lock()
if .conns == nil {
.mu.Unlock()
return
}
for := range .lis {
.Close()
}
.lis = nil
if !.drain {
for := range .conns {
.Drain()
}
.drain = true
}
func ( context.Context) (string, bool) {
:= ServerTransportStreamFromContext()
if == nil {
return "", false
}
return .Method(), true
}
// channelzServer is a thin wrapper around *Server that is registered with
// channelz; its metric method delegates to the wrapped server.
type channelzServer struct {
// s is the server whose metrics are reported.
s *Server
}
func ( *channelzServer) () *channelz.ServerInternalMetric {
return .s.channelzMetric()
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |