Source File
handler_server.go
Belonging Package
google.golang.org/grpc/internal/transport
package transport
import (
)
, := grpcutil.ContentSubtype()
if ! {
return nil, errors.New("invalid gRPC request content-type")
}
if , := .(http.Flusher); ! {
return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
}
:= &serverHandlerTransport{
rw: ,
req: ,
closedCh: make(chan struct{}),
writes: make(chan func()),
contentType: ,
contentSubtype: ,
stats: ,
}
if := .Header.Get("grpc-timeout"); != "" {
, := decodeTimeout()
if != nil {
return nil, status.Errorf(codes.Internal, "malformed time-out: %v", )
}
.timeoutSet = true
.timeout =
}
:= []string{"content-type", }
if .Host != "" {
= append(, ":authority", .Host)
}
for , := range .Header {
= strings.ToLower()
if isReservedHeader() && !isWhitelistedHeader() {
continue
}
for , := range {
, := decodeMetadataHeader(, )
if != nil {
return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", )
}
= append(, , )
}
}
.headerMD = metadata.Pairs(...)
return , nil
}
writes chan func()
contentSubtype string
stats stats.Handler
}
func ( *serverHandlerTransport) () error {
.closeOnce.Do(.closeCloseChanOnce)
return nil
}
func ( *serverHandlerTransport) () { close(.closedCh) }
func ( *serverHandlerTransport) () net.Addr { return strAddr(.req.RemoteAddr) }
func ( *serverHandlerTransport) ( func()) error {
select {
case <-.closedCh:
return ErrConnClosing
case .writes <- :
return nil
}
}
func ( *serverHandlerTransport) ( *Stream, *status.Status) error {
.writeStatusMu.Lock()
defer .writeStatusMu.Unlock()
:= .updateHeaderSent()
:= .do(func() {
if ! {
.writePendingHeaders()
}
panic()
}
.Set("Grpc-Status-Details-Bin", encodeBinHeader())
}
if := .Trailer(); len() > 0 {
if isReservedHeader() {
continue
}
.Add(http2.TrailerPrefix+, encodeMetadataHeader(, ))
}
}
}
})
if == nil { // transport has not been closed
func ( *serverHandlerTransport) ( *Stream) {
.writeCommonHeaders()
.writeCustomHeaders()
}
func ( *serverHandlerTransport) ( *Stream) {
:= .rw.Header()
["Date"] = nil // suppress Date to make tests happy; TODO: restore
.Set("Content-Type", .contentType)
.Add("Trailer", "Grpc-Status")
.Add("Trailer", "Grpc-Message")
.Add("Trailer", "Grpc-Status-Details-Bin")
if .sendCompress != "" {
.Set("Grpc-Encoding", .sendCompress)
}
}
func ( *serverHandlerTransport) ( *Stream) {
:= .rw.Header()
.hdrMu.Lock()
for , := range .header {
if isReservedHeader() {
continue
}
for , := range {
.Add(, encodeMetadataHeader(, ))
}
}
.hdrMu.Unlock()
}
func ( *serverHandlerTransport) ( *Stream, []byte, []byte, *Options) error {
:= .updateHeaderSent()
return .do(func() {
if ! {
.writePendingHeaders()
}
.rw.Write()
.rw.Write()
.rw.(http.Flusher).Flush()
})
}
func ( *serverHandlerTransport) ( *Stream, metadata.MD) error {
if := .SetHeader(); != nil {
return
}
:= .updateHeaderSent()
:= .do(func() {
if ! {
.writePendingHeaders()
}
.rw.WriteHeader(200)
.rw.(http.Flusher).Flush()
})
if == nil {
.stats.HandleRPC(.Context(), &stats.OutHeader{
Header: .Copy(),
Compression: .sendCompress,
})
}
}
return
}
:= .req.Context()
var context.CancelFunc
if .timeoutSet {
, = context.WithTimeout(, .timeout)
} else {
, = context.WithCancel()
}
:= make(chan struct{})
go func() {
select {
case <-:
case <-.closedCh:
case <-.req.Context().Done():
}
()
.Close()
}()
:= .req
:= &Stream{
id: 0, // irrelevant
requestRead: func(int) {},
cancel: ,
buf: newRecvBuffer(),
st: ,
method: .URL.Path,
recvCompress: .Header.Get("grpc-encoding"),
contentSubtype: .contentSubtype,
}
:= &peer.Peer{
Addr: .RemoteAddr(),
}
if .TLS != nil {
.AuthInfo = credentials.TLSInfo{State: *.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
}
= metadata.NewIncomingContext(, .headerMD)
.ctx = peer.NewContext(, )
if .stats != nil {
.ctx = .stats.TagRPC(.ctx, &stats.RPCTagInfo{FullMethodName: .method})
:= &stats.InHeader{
FullMethod: .method,
RemoteAddr: .RemoteAddr(),
Compression: .recvCompress,
}
.stats.HandleRPC(.ctx, )
}
.trReader = &transportReader{
reader: &recvBufferReader{ctx: .ctx, ctxDone: .ctx.Done(), recv: .buf, freeBuffer: func(*bytes.Buffer) {}},
windowHandler: func(int) {},
}
.Body.Close()
<-
}
func ( *serverHandlerTransport) () {
for {
select {
case := <-.writes:
()
case <-.closedCh:
return
}
}
}
func ( *serverHandlerTransport) () {}
func ( *serverHandlerTransport) () {}
func ( *serverHandlerTransport) () {
panic("Drain() is not implemented")
}
func ( error) error {
if == io.EOF || == io.ErrUnexpectedEOF {
return
}
if , := .(http2.StreamError); {
if , := http2ErrConvTab[.Code]; {
return status.Error(, .Error())
}
}
if strings.Contains(.Error(), "body closed by handler") {
return status.Error(codes.Canceled, .Error())
}
return connectionErrorf(true, , .Error())
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |