* * Copyright 2016 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http:www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *
This file is the implementation of a gRPC server using HTTP/2 which uses the standard Go http2 Server implementation (via the http.Handler interface), rather than speaking low-level HTTP/2 frames itself. It is the implementation of *grpc.Server.ServeHTTP.

package transport

import (
	
	
	
	
	
	
	
	
	
	

	
	
	
	
	
	
	
	
	
)
NewServerHandlerTransport returns a ServerTransport handling gRPC from inside an http.Handler. It requires that the http Server supports HTTP/2.
func ( http.ResponseWriter,  *http.Request,  stats.Handler) (ServerTransport, error) {
	if .ProtoMajor != 2 {
		return nil, errors.New("gRPC requires HTTP/2")
	}
	if .Method != "POST" {
		return nil, errors.New("invalid gRPC request method")
	}
TODO: do we assume contentType is lowercase? we did before
	,  := grpcutil.ContentSubtype()
	if ! {
		return nil, errors.New("invalid gRPC request content-type")
	}
	if ,  := .(http.Flusher); ! {
		return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
	}

	 := &serverHandlerTransport{
		rw:             ,
		req:            ,
		closedCh:       make(chan struct{}),
		writes:         make(chan func()),
		contentType:    ,
		contentSubtype: ,
		stats:          ,
	}

	if  := .Header.Get("grpc-timeout");  != "" {
		,  := decodeTimeout()
		if  != nil {
			return nil, status.Errorf(codes.Internal, "malformed time-out: %v", )
		}
		.timeoutSet = true
		.timeout = 
	}

	 := []string{"content-type", }
	if .Host != "" {
		 = append(, ":authority", .Host)
	}
	for ,  := range .Header {
		 = strings.ToLower()
		if isReservedHeader() && !isWhitelistedHeader() {
			continue
		}
		for ,  := range  {
			,  := decodeMetadataHeader(, )
			if  != nil {
				return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", )
			}
			 = append(, , )
		}
	}
	.headerMD = metadata.Pairs(...)

	return , nil
}
serverHandlerTransport is an implementation of ServerTransport which replies to exactly one gRPC request (exactly one HTTP request), using the net/http.Handler interface. This http.Handler is guaranteed at this point to be speaking over HTTP/2, so it's able to speak valid gRPC.
writes is a channel of code to run serialized in the ServeHTTP (HandleStreams) goroutine. The channel is closed when WriteStatus is called.
	writes chan func()
block concurrent WriteStatus calls e.g. grpc/(*serverStream).SendMsg/RecvMsg
we just mirror the request content-type
we store both contentType and contentSubtype so we don't keep recreating them TODO make sure this is consistent across handler_server and http2_server
strAddr is a net.Addr backed by either a TCP "ip:port" string, or the empty string if unknown.
type strAddr string

func ( strAddr) () string {
Per the documentation on net/http.Request.RemoteAddr, if this is set, it's set to the IP:port of the peer (hence, TCP): https://golang.org/pkg/net/http/#Request If we want to support Unix sockets later, we can add our own grpc-specific convention within the grpc codebase to set RemoteAddr to a different format, or probably better: we can attach it to the context and use that from serverHandlerTransport.RemoteAddr.
		return "tcp"
	}
	return ""
}

func ( strAddr) () string { return string() }
do runs fn in the ServeHTTP goroutine.
func ( *serverHandlerTransport) ( func()) error {
	select {
	case <-.closedCh:
		return ErrConnClosing
	case .writes <- :
		return nil
	}
}

func ( *serverHandlerTransport) ( *Stream,  *status.Status) error {
	.writeStatusMu.Lock()
	defer .writeStatusMu.Unlock()

	 := .updateHeaderSent()
	 := .do(func() {
		if ! {
			.writePendingHeaders()
		}
And flush, in case no header or body has been sent yet. This forces a separation of headers and trailers if this is the first call (for example, in end2end tests's TestNoService).
		.rw.(http.Flusher).Flush()

		 := .rw.Header()
		.Set("Grpc-Status", fmt.Sprintf("%d", .Code()))
		if  := .Message();  != "" {
			.Set("Grpc-Message", encodeGrpcMessage())
		}

		if  := .Proto();  != nil && len(.Details) > 0 {
			,  := proto.Marshal()
TODO: return error instead, when callers are able to handle it.
				panic()
			}

			.Set("Grpc-Status-Details-Bin", encodeBinHeader())
		}

		if  := .Trailer(); len() > 0 {
Clients don't tolerate reading restricted headers after some non restricted ones were sent.
				if isReservedHeader() {
					continue
				}
http2 ResponseWriter mechanism to send undeclared Trailers after the headers have possibly been written.
					.Add(http2.TrailerPrefix+, encodeMetadataHeader(, ))
				}
			}
		}
	})

	if  == nil { // transport has not been closed
Note: The trailer fields are compressed with hpack after this call returns. No WireLength field is set here.
			.stats.HandleRPC(.Context(), &stats.OutTrailer{
				Trailer: .trailer.Copy(),
			})
		}
	}
	.Close()
	return 
}
writePendingHeaders sets common and custom headers on the first write call (Write, WriteHeader, or WriteStatus)
writeCommonHeaders sets common headers on the first write call (Write, WriteHeader, or WriteStatus).
func ( *serverHandlerTransport) ( *Stream) {
	 := .rw.Header()
	["Date"] = nil // suppress Date to make tests happy; TODO: restore
	.Set("Content-Type", .contentType)
Predeclare trailers we'll set later in WriteStatus (after the body). This is a SHOULD in the HTTP RFC, and the way you add (known) Trailers per the net/http.ResponseWriter contract. See https://golang.org/pkg/net/http/#ResponseWriter and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
	.Add("Trailer", "Grpc-Status")
	.Add("Trailer", "Grpc-Message")
	.Add("Trailer", "Grpc-Status-Details-Bin")

	if .sendCompress != "" {
		.Set("Grpc-Encoding", .sendCompress)
	}
}
writeCustomHeaders sets custom headers set on the stream via SetHeader on the first write call (Write, WriteHeader, or WriteStatus).
func ( *serverHandlerTransport) ( *Stream) {
	 := .rw.Header()

	.hdrMu.Lock()
	for ,  := range .header {
		if isReservedHeader() {
			continue
		}
		for ,  := range  {
			.Add(, encodeMetadataHeader(, ))
		}
	}

	.hdrMu.Unlock()
}

func ( *serverHandlerTransport) ( *Stream,  []byte,  []byte,  *Options) error {
	 := .updateHeaderSent()
	return .do(func() {
		if ! {
			.writePendingHeaders()
		}
		.rw.Write()
		.rw.Write()
		.rw.(http.Flusher).Flush()
	})
}

func ( *serverHandlerTransport) ( *Stream,  metadata.MD) error {
	if  := .SetHeader();  != nil {
		return 
	}

	 := .updateHeaderSent()
	 := .do(func() {
		if ! {
			.writePendingHeaders()
		}

		.rw.WriteHeader(200)
		.rw.(http.Flusher).Flush()
	})

	if  == nil {
Note: The header fields are compressed with hpack after this call returns. No WireLength field is set here.
			.stats.HandleRPC(.Context(), &stats.OutHeader{
				Header:      .Copy(),
				Compression: .sendCompress,
			})
		}
	}
	return 
}

With this transport type there will be exactly 1 stream: this HTTP request.

	 := .req.Context()
	var  context.CancelFunc
	if .timeoutSet {
		,  = context.WithTimeout(, .timeout)
	} else {
		,  = context.WithCancel()
	}
requestOver is closed when the status has been written via WriteStatus.
	 := make(chan struct{})
	go func() {
		select {
		case <-:
		case <-.closedCh:
		case <-.req.Context().Done():
		}
		()
		.Close()
	}()

	 := .req

	 := &Stream{
		id:             0, // irrelevant
		requestRead:    func(int) {},
		cancel:         ,
		buf:            newRecvBuffer(),
		st:             ,
		method:         .URL.Path,
		recvCompress:   .Header.Get("grpc-encoding"),
		contentSubtype: .contentSubtype,
	}
	 := &peer.Peer{
		Addr: .RemoteAddr(),
	}
	if .TLS != nil {
		.AuthInfo = credentials.TLSInfo{State: *.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
	}
	 = metadata.NewIncomingContext(, .headerMD)
	.ctx = peer.NewContext(, )
	if .stats != nil {
		.ctx = .stats.TagRPC(.ctx, &stats.RPCTagInfo{FullMethodName: .method})
		 := &stats.InHeader{
			FullMethod:  .method,
			RemoteAddr:  .RemoteAddr(),
			Compression: .recvCompress,
		}
		.stats.HandleRPC(.ctx, )
	}
	.trReader = &transportReader{
		reader:        &recvBufferReader{ctx: .ctx, ctxDone: .ctx.Done(), recv: .buf, freeBuffer: func(*bytes.Buffer) {}},
		windowHandler: func(int) {},
	}
readerDone is closed when the Body.Read-ing goroutine exits.
	 := make(chan struct{})
	go func() {
		defer close()
TODO: minimize garbage, optimize recvBuffer code/ownership
		const  = 8196
		for  := make([]byte, ); ; {
			,  := .Body.Read()
			if  > 0 {
				.buf.put(recvMsg{buffer: bytes.NewBuffer([::])})
				 = [:]
			}
			if  != nil {
				.buf.put(recvMsg{err: mapRecvMsgError()})
				return
			}
			if len() == 0 {
				 = make([]byte, )
			}
		}
	}()
startStream is provided by the *grpc.Server's serveStreams. It starts a goroutine serving s and exits immediately. The goroutine that is started is the one that then calls into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
	()

	.runStream()
	close()
Wait for reading goroutine to finish.
	.Body.Close()
	<-
}

func ( *serverHandlerTransport) () {
	for {
		select {
		case  := <-.writes:
			()
		case <-.closedCh:
			return
		}
	}
}

func ( *serverHandlerTransport) () {}

func ( *serverHandlerTransport) () {}

func ( *serverHandlerTransport) () {
	panic("Drain() is not implemented")
}
mapRecvMsgError returns the non-nil err into the appropriate error value as expected by callers of *grpc.parser.recvMsg. In particular, in can only be: * io.EOF * io.ErrUnexpectedEOF * of type transport.ConnectionError * an error from the status package
func ( error) error {
	if  == io.EOF ||  == io.ErrUnexpectedEOF {
		return 
	}
	if ,  := .(http2.StreamError);  {
		if ,  := http2ErrConvTab[.Code];  {
			return status.Error(, .Error())
		}
	}
	if strings.Contains(.Error(), "body closed by handler") {
		return status.Error(codes.Canceled, .Error())
	}
	return connectionErrorf(true, , .Error())