* * Copyright 2014 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http:www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *

package grpc

import (
	
	
	
	
	
	
	
	
	
	
	
	
	

	

	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32
)

var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
MethodDesc represents an RPC service's method specification.
ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
The pointer to the service interface. Used to check whether the user provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}
serviceInfo wraps information about a service. It is very similar to ServiceDesc and is constructed from it for internal purposes.
Contains the implementation for the methods in this service.
Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu       sync.Mutex // guards following
	lis      map[net.Listener]bool
	conns    map[transport.ServerTransport]bool
	serve    bool
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog

	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	serverWorkerChannels []chan *serverWorkerData
}

type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
	headerTableSize       *uint32
	numServerWorkers      uint32
}

var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}
A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}
EmptyServerOption does not alter the server configuration. It can be embedded in another structure to build custom server options. This API is EXPERIMENTAL.
funcServerOption wraps a function that modifies serverOptions into an implementation of the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func ( *funcServerOption) ( *serverOptions) {
	.f()
}

func ( func(*serverOptions)) *funcServerOption {
	return &funcServerOption{
		f: ,
	}
}
WriteBufferSize determines how much data can be batched before doing a write on the wire. The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. The default value for this buffer is 32KB. Zero will disable the write buffer such that each write will be on underlying connection. Note: A Send call may not directly translate to a write.
ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most for one read syscall. The default value for this buffer is 32KB. Zero will disable read buffer for a connection so data framer can access the underlying conn directly.
InitialWindowSize returns a ServerOption that sets window size for stream. The lower bound for window size is 64K and any value smaller than that will be ignored.
InitialConnWindowSize returns a ServerOption that sets window size for a connection. The lower bound for window size is 64K and any value smaller than that will be ignored.
KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func ( keepalive.ServerParameters) ServerOption {
	if .Time > 0 && .Time < time.Second {
		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
		.Time = time.Second
	}

	return newFuncServerOption(func( *serverOptions) {
		.keepaliveParams = 
	})
}
KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. This will override any lookups by content-subtype for Codecs registered with RegisterCodec. Deprecated: register codecs using encoding.RegisterCodec. The server will automatically use registered codecs based on the incoming requests' headers. See also https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec. Will be supported throughout 1.x.
func ( Codec) ServerOption {
	return newFuncServerOption(func( *serverOptions) {
		.codec = 
	})
}
RPCCompressor returns a ServerOption that sets a compressor for outbound messages. For backward compatibility, all outbound messages will be sent using this compressor, regardless of incoming message compression. By default, server messages will be sent using the same compressor with which request messages were sent. Deprecated: use encoding.RegisterCompressor instead. Will be supported throughout 1.x.
func ( Compressor) ServerOption {
	return newFuncServerOption(func( *serverOptions) {
		.cp = 
	})
}
RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages. It has higher priority than decompressors registered via encoding.RegisterCompressor. Deprecated: use encoding.RegisterCompressor instead. Will be supported throughout 1.x.
MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
func ( int) ServerOption {
	return MaxRecvMsgSize()
}
MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. If this is not set, gRPC uses the default 4MB.
MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send. If this is not set, gRPC uses the default `math.MaxInt32`.
MaxConcurrentStreams returns a ServerOption that will apply a limit on the number of concurrent streams to each ServerTransport.
Creds returns a ServerOption that sets credentials for server connections.
UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the server. Only one unary interceptor can be installed. The construction of multiple interceptors (e.g., chaining) can be implemented at the caller.
func ( UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func( *serverOptions) {
		if .unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		.unaryInt = 
	})
}
ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor for unary RPCs. The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper around the real call. All unary interceptors added by this method will be chained.
StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the server. Only one stream interceptor can be installed.
func ( StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func( *serverOptions) {
		if .streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		.streamInt = 
	})
}
ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor for streaming RPCs. The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper around the real call. All stream interceptors added by this method will be chained.
InTapHandle returns a ServerOption that sets the tap handle for all the server transport to be created. Only one can be installed.
func ( tap.ServerInHandle) ServerOption {
	return newFuncServerOption(func( *serverOptions) {
		if .inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		.inTapHandle = 
	})
}
StatsHandler returns a ServerOption that sets the stats handler for the server.
UnknownServiceHandler returns a ServerOption that allows for adding a custom unknown service handler. The provided method is a bidi-streaming RPC service handler that will be invoked instead of returning the "unimplemented" gRPC error whenever a request is received for an unregistered service or method. The handling function and stream interceptor (if set) have full access to the ServerStream, including its Context.
func ( StreamHandler) ServerOption {
	return newFuncServerOption(func( *serverOptions) {
		.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
We need to assume that the users of the streamHandler will want to use both.
ConnectionTimeout returns a ServerOption that sets the timeout for connection establishment (up to and including HTTP/2 handshaking) for all new connections. If this is not set, the default is 120 seconds. A zero or negative value will result in an immediate timeout. This API is EXPERIMENTAL.
MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size of header list that the server is prepared to accept.
HeaderTableSize returns a ServerOption that sets the size of dynamic header table for stream. This API is EXPERIMENTAL.
NumStreamWorkers returns a ServerOption that sets the number of worker goroutines that should be used to process incoming streams. Setting this to zero (default) will disable workers and spawn a new goroutine for each stream. This API is EXPERIMENTAL.
TODO: If/when this API gets stabilized (i.e. stream workers become the only way streams are processed), change the behavior of the zero value to a sane default. Preliminary experiments suggest that a value equal to the number of CPUs available is most performant; requires thorough testing.
	return newFuncServerOption(func( *serverOptions) {
		.numServerWorkers = 
	})
}
serverWorkerResetThreshold defines how often the stack must be reset. Every N requests, by spawning a new goroutine in its place, a worker can reset its stack so that large stacks don't live in memory forever. 2^16 should allow each goroutine stack to live for at least a few seconds in a typical workload (assuming a QPS of a few thousand requests/sec).
serverWorkers blocks on a *transport.Stream channel forever and waits for data to be fed by serveStreams. This allows different requests to be processed by the same goroutine, removing the need for expensive stack re-allocations (see the runtime.morestack problem [1]). [1] https://github.com/golang/go/issues/18138
To make sure all server workers don't reset at the same time, choose a random number of iterations before resetting.
	 := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
	for  := 0;  < ; ++ {
		,  := <-
		if ! {
			return
		}
		.handleStream(.st, .stream, .traceInfo(.st, .stream))
		.wg.Done()
	}
	go .()
}
initServerWorkers creates worker goroutines and channels to process incoming connections to reduce the time spent overall on runtime.morestack.
NewServer creates a gRPC server which has no service registered and has not started to accept requests yet.
func ( ...ServerOption) *Server {
	 := defaultServerOptions
	for ,  := range  {
		.apply(&)
	}
	 := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     ,
		conns:    make(map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors()
	chainStreamServerInterceptors()
	.cv = sync.NewCond(&.mu)
	if EnableTracing {
		, , ,  := runtime.Caller(1)
		.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", , ))
	}

	if .opts.numServerWorkers > 0 {
		.initServerWorkers()
	}

	if channelz.IsOn() {
		.channelzID = channelz.RegisterServer(&channelzServer{}, "")
	}
	return 
}
printf records an event in s's event log, unless s has been stopped. REQUIRES s.mu is held.
func ( *Server) ( string,  ...interface{}) {
	if .events != nil {
		.events.Printf(, ...)
	}
}
errorf records an error in s's event log, unless s has been stopped. REQUIRES s.mu is held.
func ( *Server) ( string,  ...interface{}) {
	if .events != nil {
		.events.Errorf(, ...)
	}
}
ServiceRegistrar wraps a single method that supports service registration. It enables users to pass concrete types other than grpc.Server to the service registration methods exported by the IDL generated code.
RegisterService registers a service and its implementation to the concrete type implementing this interface. It may not be called once the server has started serving. desc describes the service and its methods and handlers. impl is the service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}
RegisterService registers a service and its implementation to the gRPC server. It is called from the IDL generated code. This must be called before invoking Serve. If ss is non-nil (for legacy code), its type is checked to ensure it implements sd.HandlerType.
func ( *Server) ( *ServiceDesc,  interface{}) {
	if  != nil {
		 := reflect.TypeOf(.HandlerType).Elem()
		 := reflect.TypeOf()
		if !.Implements() {
			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", , )
		}
	}
	.register(, )
}

func ( *Server) ( *ServiceDesc,  interface{}) {
	.mu.Lock()
	defer .mu.Unlock()
	.printf("RegisterService(%q)", .ServiceName)
	if .serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", .ServiceName)
	}
	if ,  := .services[.ServiceName];  {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", .ServiceName)
	}
	 := &serviceInfo{
		serviceImpl: ,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       .Metadata,
	}
	for  := range .Methods {
		 := &.Methods[]
		.methods[.MethodName] = 
	}
	for  := range .Streams {
		 := &.Streams[]
		.streams[.StreamName] = 
	}
	.services[.ServiceName] = 
}
MethodInfo contains the information of an RPC including its method name and type.
Name is the method name only, without the service name or package name.
IsClientStream indicates whether the RPC is a client streaming RPC.
IsServerStream indicates whether the RPC is a server streaming RPC.
ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
GetServiceInfo returns a map from service names to ServiceInfo. Service names include the package names, in the form of <package>.<service>.
func ( *Server) () map[string]ServiceInfo {
	 := make(map[string]ServiceInfo)
	for ,  := range .services {
		 := make([]MethodInfo, 0, len(.methods)+len(.streams))
		for  := range .methods {
			 = append(, MethodInfo{
				Name:           ,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for ,  := range .streams {
			 = append(, MethodInfo{
				Name:           ,
				IsClientStream: .ClientStreams,
				IsServerStream: .ServerStreams,
			})
		}

		[] = ServiceInfo{
			Methods:  ,
			Metadata: .mdata,
		}
	}
	return 
}
ErrServerStopped indicates that the operation is now illegal because of the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

func ( *Server) ( net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if .opts.creds == nil {
		return , nil, nil
	}
	return .opts.creds.ServerHandshake()
}

type listenSocket struct {
	net.Listener
	channelzID int64
}

func ( *listenSocket) () *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(.Listener),
		LocalAddr:     .Listener.Addr(),
	}
}

func ( *listenSocket) () error {
	 := .Listener.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(.channelzID)
	}
	return 
}
Serve accepts incoming connections on the listener lis, creating a new ServerTransport and service goroutine for each. The service goroutines read gRPC requests and then call the registered handlers to reply to them. Serve returns when lis.Accept fails with fatal errors. lis will be closed when this method returns. Serve will return a non-nil error unless Stop or GracefulStop is called.
func ( *Server) ( net.Listener) error {
	.mu.Lock()
	.printf("serving")
	.serve = true
Serve called after Stop or GracefulStop.
		.mu.Unlock()
		.Close()
		return ErrServerStopped
	}

	.serveWG.Add(1)
	defer func() {
		.serveWG.Done()
Stop or GracefulStop called; block until done and return nil.
			<-.done.Done()
		}
	}()

	 := &listenSocket{Listener: }
	.lis[] = true

	if channelz.IsOn() {
		.channelzID = channelz.RegisterListenSocket(, .channelzID, .Addr().String())
	}
	.mu.Unlock()

	defer func() {
		.mu.Lock()
		if .lis != nil && .lis[] {
			.Close()
			delete(.lis, )
		}
		.mu.Unlock()
	}()

	var  time.Duration // how long to sleep on accept failure

	for {
		,  := .Accept()
		if  != nil {
			if ,  := .(interface {
				() bool
			});  && .() {
				if  == 0 {
					 = 5 * time.Millisecond
				} else {
					 *= 2
				}
				if  := 1 * time.Second;  >  {
					 = 
				}
				.mu.Lock()
				.printf("Accept error: %v; retrying in %v", , )
				.mu.Unlock()
				 := time.NewTimer()
				select {
				case <-.C:
				case <-.quit.Done():
					.Stop()
					return nil
				}
				continue
			}
			.mu.Lock()
			.printf("done serving; Accept = %v", )
			.mu.Unlock()

			if .quit.HasFired() {
				return nil
			}
			return 
		}
Start a new goroutine to deal with rawConn so we don't stall this Accept loop goroutine. Make sure we account for the goroutine so GracefulStop doesn't nil out s.conns before this conn can be added.
		.serveWG.Add(1)
		go func() {
			.handleRawConn()
			.serveWG.Done()
		}()
	}
}
handleRawConn forks a goroutine to handle a just-accepted connection that has not had any I/O performed on it yet.
func ( *Server) ( net.Conn) {
	if .quit.HasFired() {
		.Close()
		return
	}
	.SetDeadline(time.Now().Add(.opts.connectionTimeout))
	, ,  := .useTransportAuthenticator()
ErrConnDispatched means that the connection was dispatched away from gRPC; those connections should be left open.
		if  != credentials.ErrConnDispatched {
			.mu.Lock()
			.errorf("ServerHandshake(%q) failed: %v", .RemoteAddr(), )
			.mu.Unlock()
			channelz.Warningf(logger, .channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", .RemoteAddr(), )
			.Close()
		}
		.SetDeadline(time.Time{})
		return
	}
Finish handshaking (HTTP2)
	 := .newHTTP2Transport(, )
	if  == nil {
		return
	}

	.SetDeadline(time.Time{})
	if !.addConn() {
		return
	}
	go func() {
		.serveStreams()
		.removeConn()
	}()
}
newHTTP2Transport sets up a http/2 transport (using the gRPC http2 server transport in transport/http2_server.go).
If all stream workers are busy, fallback to the default code path.
				go func() {
					.handleStream(, , .traceInfo(, ))
					.Done()
				}()
			}
		} else {
			go func() {
				defer .Done()
				.handleStream(, , .traceInfo(, ))
			}()
		}
	}, func( context.Context,  string) context.Context {
		if !EnableTracing {
			return 
		}
		 := trace.New("grpc.Recv."+methodFamily(), )
		return trace.NewContext(, )
	})
	.Wait()
}

var _ http.Handler = (*Server)(nil)
ServeHTTP implements the Go standard library's http.Handler interface by responding to the gRPC request r, by looking up the requested gRPC method in the gRPC server s. The provided HTTP request must have arrived on an HTTP/2 connection. When using the Go standard library's server, practically this means that the Request must also have arrived over TLS. To share one port (such as 443 for https) between gRPC and an existing http.Handler, use a root http.Handler such as: if r.ProtoMajor == 2 && strings.HasPrefix( r.Header.Get("Content-Type"), "application/grpc") { grpcServer.ServeHTTP(w, r) } else { yourMux.ServeHTTP(w, r) } Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally separate from grpc-go's HTTP/2 server. Performance and features may vary between the two paths. ServeHTTP does not support some gRPC features available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL and subject to change.
func ( *Server) ( http.ResponseWriter,  *http.Request) {
	,  := transport.NewServerHandlerTransport(, , .opts.statsHandler)
	if  != nil {
		http.Error(, .Error(), http.StatusInternalServerError)
		return
	}
	if !.addConn() {
		return
	}
	defer .removeConn()
	.serveStreams()
}
traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. If tracing is not enabled, it returns nil.
func ( *Server) ( transport.ServerTransport,  *transport.Stream) ( *traceInfo) {
	if !EnableTracing {
		return nil
	}
	,  := trace.FromContext(.Context())
	if ! {
		return nil
	}

	 = &traceInfo{
		tr: ,
		firstLine: firstLine{
			client:     false,
			remoteAddr: .RemoteAddr(),
		},
	}
	if ,  := .Context().Deadline();  {
		.firstLine.deadline = time.Until()
	}
	return 
}

func ( *Server) ( transport.ServerTransport) bool {
	.mu.Lock()
	defer .mu.Unlock()
	if .conns == nil {
		.Close()
		return false
	}
Transport added after we drained our existing conns: drain it immediately.
		.Drain()
	}
	.conns[] = true
	return true
}

func ( *Server) ( transport.ServerTransport) {
	.mu.Lock()
	defer .mu.Unlock()
	if .conns != nil {
		delete(.conns, )
		.cv.Broadcast()
	}
}

func ( *Server) () *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&.czData.lastCallStartedTime)),
	}
}

func ( *Server) () {
	atomic.AddInt64(&.czData.callsStarted, 1)
	atomic.StoreInt64(&.czData.lastCallStartedTime, time.Now().UnixNano())
}

func ( *Server) () {
	atomic.AddInt64(&.czData.callsSucceeded, 1)
}

func ( *Server) () {
	atomic.AddInt64(&.czData.callsFailed, 1)
}

func ( *Server) ( transport.ServerTransport,  *transport.Stream,  interface{},  Compressor,  *transport.Options,  encoding.Compressor) error {
	,  := encode(.getCodec(.ContentSubtype()), )
	if  != nil {
		channelz.Error(logger, .channelzID, "grpc: server failed to encode response: ", )
		return 
	}
	,  := compress(, , )
	if  != nil {
		channelz.Error(logger, .channelzID, "grpc: server failed to compress response: ", )
		return 
	}
TODO(dfawley): should we be checking len(data) instead?
	if len() > .opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(), .opts.maxSendMessageSize)
	}
	 = .Write(, , , )
	if  == nil && .opts.statsHandler != nil {
		.opts.statsHandler.HandleRPC(.Context(), outPayload(false, , , , time.Now()))
	}
	return 
}
chainUnaryServerInterceptors chains all unary server interceptors into one.
Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will be executed before any other chained interceptors.
	 := .opts.chainUnaryInts
	if .opts.unaryInt != nil {
		 = append([]UnaryServerInterceptor{.opts.unaryInt}, .opts.chainUnaryInts...)
	}

	var  UnaryServerInterceptor
	if len() == 0 {
		 = nil
	} else if len() == 1 {
		 = [0]
	} else {
		 = func( context.Context,  interface{},  *UnaryServerInfo,  UnaryHandler) (interface{}, error) {
			return [0](, , , getChainUnaryHandler(, 0, , ))
		}
	}

	.opts.unaryInt = 
}
getChainUnaryHandler recursively generate the chained UnaryHandler
func ( []UnaryServerInterceptor,  int,  *UnaryServerInfo,  UnaryHandler) UnaryHandler {
	if  == len()-1 {
		return 
	}

	return func( context.Context,  interface{}) (interface{}, error) {
		return [+1](, , , (, +1, , ))
	}
}

func ( *Server) ( transport.ServerTransport,  *transport.Stream,  *serviceInfo,  *MethodDesc,  *traceInfo) ( error) {
	 := .opts.statsHandler
	if  != nil ||  != nil || channelz.IsOn() {
		if channelz.IsOn() {
			.incrCallsStarted()
		}
		var  *stats.Begin
		if  != nil {
			 := time.Now()
			 = &stats.Begin{
				BeginTime: ,
			}
			.HandleRPC(.Context(), )
		}
		if  != nil {
			.tr.LazyLog(&.firstLine, false)
The deferred error handling for tracing, stats handler and channelz are combined into one function to reduce stack usage -- a defer takes ~56-64 bytes on the stack, so overflowing the stack will require a stack re-allocation, which is expensive. To maintain behavior similar to separate deferred statements, statements should be executed in the reverse order. That is, tracing first, stats handler second, and channelz last. Note that panics *within* defers will lead to different behavior, but that's an acceptable compromise; that would be undefined behavior territory anyway.
		defer func() {
			if  != nil {
				if  != nil &&  != io.EOF {
					.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
					.tr.SetError()
				}
				.tr.Finish()
			}

			if  != nil {
				 := &stats.End{
					BeginTime: .BeginTime,
					EndTime:   time.Now(),
				}
				if  != nil &&  != io.EOF {
					.Error = toRPCErr()
				}
				.HandleRPC(.Context(), )
			}

			if channelz.IsOn() {
				if  != nil &&  != io.EOF {
					.incrCallsFailed()
				} else {
					.incrCallsSucceeded()
				}
			}
		}()
	}

	 := binarylog.GetMethodLogger(.Method())
	if  != nil {
		 := .Context()
		,  := metadata.FromIncomingContext()
		 := &binarylog.ClientHeader{
			Header:     ,
			MethodName: .Method(),
			PeerAddr:   nil,
		}
		if ,  := .Deadline();  {
			.Timeout = time.Until()
			if .Timeout < 0 {
				.Timeout = 0
			}
		}
		if  := [":authority"]; len() > 0 {
			.Authority = [0]
		}
		if ,  := peer.FromContext();  {
			.PeerAddr = .Addr
		}
		.Log()
	}
comp and cp are used for compression. decomp and dc are used for decompression. If comp and decomp are both set, they are the same; however they are kept separate to ensure that at most one of the compressor/decompressor variable pairs are set for use later.
	var ,  encoding.Compressor
	var  Compressor
	var  Decompressor
If dc is set and matches the stream's compression, use it. Otherwise, try to find a matching registered compressor for decomp.
	if  := .RecvCompress(); .opts.dc != nil && .opts.dc.Type() ==  {
		 = .opts.dc
	} else if  != "" &&  != encoding.Identity {
		 = encoding.GetCompressor()
		if  == nil {
			 := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
			.WriteStatus(, )
			return .Err()
		}
	}
If cp is set, use it. Otherwise, attempt to compress the response using the incoming message compression method. NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if .opts.cp != nil {
		 = .opts.cp
		.SetSendCompress(.Type())
Legacy compressor not specified; attempt to respond with same encoding.
		 = encoding.GetCompressor()
		if  != nil {
			.SetSendCompress()
		}
	}

	var  *payloadInfo
	if  != nil ||  != nil {
		 = &payloadInfo{}
	}
	,  := recvAndDecompress(&parser{r: }, , , .opts.maxReceiveMessageSize, , )
	if  != nil {
		if  := .WriteStatus(, status.Convert());  != nil {
			channelz.Warningf(logger, .channelzID, "grpc: Server.processUnaryRPC failed to write status %v", )
		}
		return 
	}
	if channelz.IsOn() {
		.IncrMsgRecv()
	}
	 := func( interface{}) error {
		if  := .getCodec(.ContentSubtype()).Unmarshal(, );  != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", )
		}
		if  != nil {
			.HandleRPC(.Context(), &stats.InPayload{
				RecvTime:   time.Now(),
				Payload:    ,
				WireLength: .wireLength,
				Data:       ,
				Length:     len(),
			})
		}
		if  != nil {
			.Log(&binarylog.ClientMessage{
				Message: ,
			})
		}
		if  != nil {
			.tr.LazyLog(&payload{sent: false, msg: }, true)
		}
		return nil
	}
	 := NewContextWithServerTransportStream(.Context(), )
	,  := .Handler(.serviceImpl, , , .opts.unaryInt)
	if  != nil {
		,  := status.FromError()
Convert appErr if it is not a grpc status error.
			 = status.Error(codes.Unknown, .Error())
			, _ = status.FromError()
		}
		if  != nil {
			.tr.LazyLog(stringer(.Message()), true)
			.tr.SetError()
		}
		if  := .WriteStatus(, );  != nil {
			channelz.Warningf(logger, .channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", )
		}
		if  != nil {
Only log serverHeader if there was header. Otherwise it can be trailer only.
				.Log(&binarylog.ServerHeader{
					Header: ,
				})
			}
			.Log(&binarylog.ServerTrailer{
				Trailer: .Trailer(),
				Err:     ,
			})
		}
		return 
	}
	if  != nil {
		.tr.LazyLog(stringer("OK"), false)
	}
	 := &transport.Options{Last: true}

	if  := .sendResponse(, , , , , );  != nil {
The entire stream is done (for unary RPC only).
			return 
		}
		if ,  := status.FromError();  {
			if  := .WriteStatus(, );  != nil {
				channelz.Warningf(logger, .channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", )
			}
		} else {
			switch st := .(type) {
Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", , ))
			}
		}
		if  != nil {
			,  := .Header()
			.Log(&binarylog.ServerHeader{
				Header: ,
			})
			.Log(&binarylog.ServerTrailer{
				Trailer: .Trailer(),
				Err:     ,
			})
		}
		return 
	}
	if  != nil {
		,  := .Header()
		.Log(&binarylog.ServerHeader{
			Header: ,
		})
		.Log(&binarylog.ServerMessage{
			Message: ,
		})
	}
	if channelz.IsOn() {
		.IncrMsgSent()
	}
	if  != nil {
		.tr.LazyLog(&payload{sent: true, msg: }, true)
TODO: Should we be logging if writing status failed here, like above? Should the logging be in WriteStatus? Should we ignore the WriteStatus error or allow the stats handler to see it?
	 = .WriteStatus(, statusOK)
	if  != nil {
		.Log(&binarylog.ServerTrailer{
			Trailer: .Trailer(),
			Err:     ,
		})
	}
	return 
}
chainStreamServerInterceptors chains all stream server interceptors into one.
Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will be executed before any other chained interceptors.
	 := .opts.chainStreamInts
	if .opts.streamInt != nil {
		 = append([]StreamServerInterceptor{.opts.streamInt}, .opts.chainStreamInts...)
	}

	var  StreamServerInterceptor
	if len() == 0 {
		 = nil
	} else if len() == 1 {
		 = [0]
	} else {
		 = func( interface{},  ServerStream,  *StreamServerInfo,  StreamHandler) error {
			return [0](, , , getChainStreamHandler(, 0, , ))
		}
	}

	.opts.streamInt = 
}
getChainStreamHandler recursively generate the chained StreamHandler
func ( []StreamServerInterceptor,  int,  *StreamServerInfo,  StreamHandler) StreamHandler {
	if  == len()-1 {
		return 
	}

	return func( interface{},  ServerStream) error {
		return [+1](, , , (, +1, , ))
	}
}

func ( *Server) ( transport.ServerTransport,  *transport.Stream,  *serviceInfo,  *StreamDesc,  *traceInfo) ( error) {
	if channelz.IsOn() {
		.incrCallsStarted()
	}
	 := .opts.statsHandler
	var  *stats.Begin
	if  != nil {
		 := time.Now()
		 = &stats.Begin{
			BeginTime: ,
		}
		.HandleRPC(.Context(), )
	}
	 := NewContextWithServerTransportStream(.Context(), )
	 := &serverStream{
		ctx:                   ,
		t:                     ,
		s:                     ,
		p:                     &parser{r: },
		codec:                 .getCodec(.ContentSubtype()),
		maxReceiveMessageSize: .opts.maxReceiveMessageSize,
		maxSendMessageSize:    .opts.maxSendMessageSize,
		trInfo:                ,
		statsHandler:          ,
	}

See comment in processUnaryRPC on defers.
		defer func() {
			if  != nil {
				.mu.Lock()
				if  != nil &&  != io.EOF {
					.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
					.trInfo.tr.SetError()
				}
				.trInfo.tr.Finish()
				.trInfo.tr = nil
				.mu.Unlock()
			}

			if  != nil {
				 := &stats.End{
					BeginTime: .BeginTime,
					EndTime:   time.Now(),
				}
				if  != nil &&  != io.EOF {
					.Error = toRPCErr()
				}
				.HandleRPC(.Context(), )
			}

			if channelz.IsOn() {
				if  != nil &&  != io.EOF {
					.incrCallsFailed()
				} else {
					.incrCallsSucceeded()
				}
			}
		}()
	}

	.binlog = binarylog.GetMethodLogger(.Method())
	if .binlog != nil {
		,  := metadata.FromIncomingContext()
		 := &binarylog.ClientHeader{
			Header:     ,
			MethodName: .Method(),
			PeerAddr:   nil,
		}
		if ,  := .Deadline();  {
			.Timeout = time.Until()
			if .Timeout < 0 {
				.Timeout = 0
			}
		}
		if  := [":authority"]; len() > 0 {
			.Authority = [0]
		}
		if ,  := peer.FromContext(.Context());  {
			.PeerAddr = .Addr
		}
		.binlog.Log()
	}
If dc is set and matches the stream's compression, use it. Otherwise, try to find a matching registered compressor for decomp.
	if  := .RecvCompress(); .opts.dc != nil && .opts.dc.Type() ==  {
		.dc = .opts.dc
	} else if  != "" &&  != encoding.Identity {
		.decomp = encoding.GetCompressor()
		if .decomp == nil {
			 := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", )
			.WriteStatus(.s, )
			return .Err()
		}
	}
If cp is set, use it. Otherwise, attempt to compress the response using the incoming message compression method. NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if .opts.cp != nil {
		.cp = .opts.cp
		.SetSendCompress(.opts.cp.Type())
Legacy compressor not specified; attempt to respond with same encoding.
		.comp = encoding.GetCompressor()
		if .comp != nil {
			.SetSendCompress()
		}
	}

	if  != nil {
		.tr.LazyLog(&.firstLine, false)
	}
	var  error
	var  interface{}
	if  != nil {
		 = .serviceImpl
	}
	if .opts.streamInt == nil {
		 = .Handler(, )
	} else {
		 := &StreamServerInfo{
			FullMethod:     .Method(),
			IsClientStream: .ClientStreams,
			IsServerStream: .ServerStreams,
		}
		 = .opts.streamInt(, , , .Handler)
	}
	if  != nil {
		,  := status.FromError()
		if ! {
			 = status.New(codes.Unknown, .Error())
			 = .Err()
		}
		if  != nil {
			.mu.Lock()
			.trInfo.tr.LazyLog(stringer(.Message()), true)
			.trInfo.tr.SetError()
			.mu.Unlock()
		}
		.WriteStatus(.s, )
		if .binlog != nil {
			.binlog.Log(&binarylog.ServerTrailer{
				Trailer: .s.Trailer(),
				Err:     ,
			})
TODO: Should we log an error from WriteStatus here and below?
		return 
	}
	if  != nil {
		.mu.Lock()
		.trInfo.tr.LazyLog(stringer("OK"), false)
		.mu.Unlock()
	}
	 = .WriteStatus(.s, statusOK)
	if .binlog != nil {
		.binlog.Log(&binarylog.ServerTrailer{
			Trailer: .s.Trailer(),
			Err:     ,
		})
	}
	return 
}

func ( *Server) ( transport.ServerTransport,  *transport.Stream,  *traceInfo) {
	 := .Method()
	if  != "" && [0] == '/' {
		 = [1:]
	}
	 := strings.LastIndex(, "/")
	if  == -1 {
		if  != nil {
			.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{}}, true)
			.tr.SetError()
		}
		 := fmt.Sprintf("malformed method name: %q", .Method())
		if  := .WriteStatus(, status.New(codes.ResourceExhausted, ));  != nil {
			if  != nil {
				.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
				.tr.SetError()
			}
			channelz.Warningf(logger, .channelzID, "grpc: Server.handleStream failed to write status: %v", )
		}
		if  != nil {
			.tr.Finish()
		}
		return
	}
	 := [:]
	 := [+1:]

	,  := .services[]
	if  {
		if ,  := .methods[];  {
			.processUnaryRPC(, , , , )
			return
		}
		if ,  := .streams[];  {
			.processStreamingRPC(, , , , )
			return
		}
Unknown service, or known server unknown method.
	if  := .opts.unknownStreamDesc;  != nil {
		.processStreamingRPC(, , nil, , )
		return
	}
	var  string
	if ! {
		 = fmt.Sprintf("unknown service %v", )
	} else {
		 = fmt.Sprintf("unknown method %v for service %v", , )
	}
	if  != nil {
		.tr.LazyPrintf("%s", )
		.tr.SetError()
	}
	if  := .WriteStatus(, status.New(codes.Unimplemented, ));  != nil {
		if  != nil {
			.tr.LazyLog(&fmtStringer{"%v", []interface{}{}}, true)
			.tr.SetError()
		}
		channelz.Warningf(logger, .channelzID, "grpc: Server.handleStream failed to write status: %v", )
	}
	if  != nil {
		.tr.Finish()
	}
}
The key to save ServerTransportStream in the context.
type streamKey struct{}
NewContextWithServerTransportStream creates a new context from ctx and attaches stream to it. This API is EXPERIMENTAL.
ServerTransportStream is a minimal interface that a transport stream must implement. This can be used to mock an actual transport stream for tests of handler code that use, for example, grpc.SetHeader (which requires some stream to be in context). See also NewContextWithServerTransportStream. This API is EXPERIMENTAL.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}
ServerTransportStreamFromContext returns the ServerTransportStream saved in ctx. Returns nil if the given context has no stream associated with it (which implies it is not an RPC invocation context). This API is EXPERIMENTAL.
Stop stops the gRPC server. It immediately closes all open connections and listeners. It cancels all active RPCs on the server side and the corresponding pending RPCs on the client side will get notified by connection errors.
func ( *Server) () {
	.quit.Fire()

	defer func() {
		.serveWG.Wait()
		.done.Fire()
	}()

	.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(.channelzID)
		}
	})

	.mu.Lock()
	 := .lis
	.lis = nil
	 := .conns
interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	.cv.Broadcast()
	.mu.Unlock()

	for  := range  {
		.Close()
	}
	for  := range  {
		.Close()
	}
	if .opts.numServerWorkers > 0 {
		.stopServerWorkers()
	}

	.mu.Lock()
	if .events != nil {
		.events.Finish()
		.events = nil
	}
	.mu.Unlock()
}
GracefulStop stops the gRPC server gracefully. It stops the server from accepting new connections and RPCs and blocks until all the pending RPCs are finished.
func ( *Server) () {
	.quit.Fire()
	defer .done.Fire()

	.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(.channelzID)
		}
	})
	.mu.Lock()
	if .conns == nil {
		.mu.Unlock()
		return
	}

	for  := range .lis {
		.Close()
	}
	.lis = nil
	if !.drain {
		for  := range .conns {
			.Drain()
		}
		.drain = true
	}
Wait for serving threads to be ready to exit. Only then can we be sure no new conns will be created.
	.mu.Unlock()
	.serveWG.Wait()
	.mu.Lock()

	for len(.conns) != 0 {
		.cv.Wait()
	}
	.conns = nil
	if .events != nil {
		.events.Finish()
		.events = nil
	}
	.mu.Unlock()
}
contentSubtype must be lowercase cannot return nil
func ( *Server) ( string) baseCodec {
	if .opts.codec != nil {
		return .opts.codec
	}
	if  == "" {
		return encoding.GetCodec(proto.Name)
	}
	 := encoding.GetCodec()
	if  == nil {
		return encoding.GetCodec(proto.Name)
	}
	return 
}
SetHeader sets the header metadata. When called multiple times, all the provided metadata will be merged. All the metadata will be sent out when one of the following happens: - grpc.SendHeader() is called; - The first response is sent out; - An RPC status is sent out (error or success).
func ( context.Context,  metadata.MD) error {
	if .Len() == 0 {
		return nil
	}
	 := ServerTransportStreamFromContext()
	if  == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", )
	}
	return .SetHeader()
}
SendHeader sends header metadata. It may be called at most once. The provided md and headers set by SetHeader() will be sent.
func ( context.Context,  metadata.MD) error {
	 := ServerTransportStreamFromContext()
	if  == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", )
	}
	if  := .SendHeader();  != nil {
		return toRPCErr()
	}
	return nil
}
SetTrailer sets the trailer metadata that will be sent when an RPC returns. When called more than once, all the provided metadata will be merged.
func ( context.Context,  metadata.MD) error {
	if .Len() == 0 {
		return nil
	}
	 := ServerTransportStreamFromContext()
	if  == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", )
	}
	return .SetTrailer()
}
Method returns the method string for the server context. The returned string is in the format of "/service/method".
func ( context.Context) (string, bool) {
	 := ServerTransportStreamFromContext()
	if  == nil {
		return "", false
	}
	return .Method(), true
}

type channelzServer struct {
	s *Server
}

func ( *channelzServer) () *channelz.ServerInternalMetric {
	return .s.channelzMetric()