/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is
	// larger than the limit set by peer.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
)

// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.
var serverConnectionCounter uint64
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx         context.Context
	done        chan struct{}
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second
	// GoAway. During this time we don't want to write another first GoAway(with
	// ID 2^31-1) frame. Thus a call to drain(...) will be a no-op if drainChan
	// is already initialized, since draining is already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs goes down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	bufferPool *bufferPool

	connectionID uint64
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError
// is returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	var isettings []http2.Setting
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialConnWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	done := make(chan struct{})
	t := &http2Server{
		ctx:               context.Background(),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)

	t.framer.writer.Flush()

	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			if logger.V(logLevel) {
				logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
			}
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
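
// Hedged usage sketch (not part of the original file): within gRPC this
// constructor is reached through the package's NewServerTransport wrapper,
// and the returned transport is then driven by HandleStreams. The handler
// below is a hypothetical placeholder, and the exact NewServerTransport
// signature varies across grpc-go versions.
//
//	st, err := NewServerTransport("http2", conn, serverConfig)
//	if err != nil {
//		conn.Close()
//		return
//	}
//	go st.HandleStreams(
//		func(s *Stream) { go handleRPC(s) }, // hypothetical per-stream handler
//		func(ctx context.Context, method string) context.Context { return ctx },
//	)
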
// operateHeaders takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
	streamID := frame.Header().StreamID
	state := &decodeState{
		serverSide: true,
	}
	if err := state.decodeHeader(frame); err != nil {
		if se, ok := status.FromError(err); ok {
			t.controlBuf.put(&cleanupStream{
				streamID: streamID,
				rst:      true,
				rstCode:  statusCodeConvTab[se.Code()],
				onWrite:  func() {},
			})
		}
		return false
	}

	buf := newRecvBuffer()
	s := &Stream{
		id:             streamID,
		st:             t,
		buf:            buf,
		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
		recvCompress:   state.data.encoding,
		method:         state.data.method,
		contentSubtype: state.data.contentSubtype,
	}
	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	if state.data.timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}
	pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	// Attach Auth info if there is any.
	if t.authInfo != nil {
		pr.AuthInfo = t.authInfo
	}
	s.ctx = peer.NewContext(s.ctx, pr)
	// Attach the received metadata to the context.
	if len(state.data.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
	}
	if state.data.statsTags != nil {
		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
	}
	if state.data.statsTrace != nil {
		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
	}
	if t.inTapHandle != nil {
		var err error
		info := &tap.Info{
			FullMethodName: state.data.method,
		}
		s.ctx, err = t.inTapHandle(s.ctx, info)
		if err != nil {
			if logger.V(logLevel) {
				logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			}
			t.controlBuf.put(&cleanupStream{
				streamID: s.id,
				rst:      true,
				rstCode:  http2.ErrCodeRefusedStream,
				onWrite:  func() {},
			})
			s.cancel()
			return false
		}
	}
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return false
	}
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return false
	}
	if streamID%2 != 1 || streamID <= t.maxStreamID {
		t.mu.Unlock()
		// illegal gRPC stream id.
		if logger.V(logLevel) {
			logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
		}
		s.cancel()
		return true
	}
	t.maxStreamID = streamID
	t.activeStreams[streamID] = s
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	if t.stats != nil {
		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
			Header:      metadata.MD(state.data.mdata).Copy(),
		}
		t.stats.HandleRPC(s.ctx, inHeader)
	}
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return false
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
	defer close(t.readerDone)
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		if err != nil {
			if se, ok := err.(http2.StreamError); ok {
				if logger.V(logLevel) {
					logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
				}
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s, true, se.Code, false)
				} else {
					t.controlBuf.put(&cleanupStream{
						streamID: se.StreamID,
						rst:      true,
						rstCode:  se.Code,
						onWrite:  func() {},
					})
				}
				continue
			}
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				t.Close()
				return
			}
			if logger.V(logLevel) {
				logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
			}
			t.Close()
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if t.operateHeaders(frame, handle, traceCtx) {
				t.Close()
				break
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
			}
		}
	}
}

func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.activeStreams == nil {
		// The transport is closing.
		return nil, false
	}
	s, ok := t.activeStreams[f.Header().StreamID]
	if !ok {
		// The stream is already done.
		return nil, false
	}
	return s, true
}
// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting more data than fits in the
// current window.
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}
// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates are delivered to the controller for sending when the
// cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
			increment: w,
		})
	}
}
// updateFlowControl updates the incoming flow control windows for the
// transport and the stream based on the current bdp estimation.
func (t *http2Server) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: t.fc.newLimit(n),
	})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}
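
// Worked example (illustrative, not from the original source): when the BDP
// estimator concludes the bandwidth-delay product has grown to 128 KiB, it
// ends up calling t.updateFlowControl(128*1024). That raises every active
// stream's inbound limit, tops up the connection window with a WINDOW_UPDATE
// on stream 0, and advertises SETTINGS_INITIAL_WINDOW_SIZE so the client
// applies the same limit to streams it opens later.
//
//	t.updateFlowControl(128 * 1024) // hypothetical value from the bdpEstimator
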

func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if s.getState() == streamReadDone {
		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	if f.Header().Flags.Has(http2.FlagDataEndStream) {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
	}
}

func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
	// If the stream is not deleted from the transport's active streams map,
	// then do a regular close stream.
	if s, ok := t.getStream(f); ok {
		t.closeStream(s, false, 0, false)
		return
	}
	// If the stream is already deleted from the active streams map, then put a
	// cleanupStream item into controlbuf to delete the stream from loopy
	// writer's established streams map.
	t.controlBuf.put(&cleanupStream{
		streamID: f.Header().StreamID,
		rst:      false,
		rstCode:  0,
		onWrite:  func() {},
	})
}
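
// Illustrative note (not in the original): the two flow-control ledgers in
// handleData are deliberately decoupled. The connection window is replenished
// as soon as a DATA frame is accounted for, even if the application never
// reads the payload; only the per-stream window waits on application reads:
//
//	t.fc.onData(size) // connection window: refilled on receipt (handleData)
//	s.fc.onData(size) // stream window: refilled only after the application
//	                  // reads, via windowHandler -> t.updateWindow(s, n)
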

func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
	if f.IsAck() {
		return
	}
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, &incomingSettings{
		ss: ss,
	})
}

const (
	maxPingStrikes     = 2
	defaultPingTimeout = 2 * time.Hour
)

func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		if f.Data == goAwayPing.data && t.drainChan != nil {
			close(t.drainChan)
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A reset ping strikes means that we don't need to check for policy
	// violation for this ping and the pingStrikes counter should be set
	// to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active thus, this new ping should
		// have come after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		if logger.V(logLevel) {
			logger.Errorf("transport: Got too many pings from the client, closing the connection.")
		}
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
	}
}
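
// Hedged configuration sketch (public gRPC API, not part of this file): the
// strike policy above enforces keepalive.EnforcementPolicy. A server that
// tolerates client pings once per minute, even without active streams, could
// be configured like this; the durations are illustrative.
//
//	srv := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
//		MinTime:             time.Minute, // pings arriving faster than this count as strikes
//		PermitWithoutStream: true,        // allow pings even when no RPC is active
//	}))
//
// A client exceeding maxPingStrikes (2) receives a GOAWAY with
// ENHANCE_YOUR_CALM and debug data "too_many_pings", and the connection is
// closed.
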

func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}

func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
	for k, vv := range md {
		if isReservedHeader(k) {
			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields
}

func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
	if t.maxSendHeaderListSize == nil {
		return true
	}
	hdrFrame := it.(*headerFrame)
	var sz int64
	for _, f := range hdrFrame.hf {
		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
			if logger.V(logLevel) {
				logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
			}
			return false
		}
	}
	return true
}
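
// Size accounting example (per RFC 7541 §4.1, which hpack.HeaderField.Size
// implements): each field costs len(Name) + len(Value) + 32 octets toward the
// client-advertised SETTINGS_MAX_HEADER_LIST_SIZE checked above.
//
//	f := hpack.HeaderField{Name: "grpc-status", Value: "0"}
//	_ = f.Size() // 11 + 1 + 32 = 44 bytes
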
// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
	if s.updateHeaderSent() || s.getState() == streamDone {
		return ErrIllegalHeaderWrite
	}
	s.hdrMu.Lock()
	if md.Len() > 0 {
		if s.header.Len() > 0 {
			s.header = metadata.Join(s.header, md)
		} else {
			s.header = md
		}
	}
	if err := t.writeHeaderLocked(s); err != nil {
		s.hdrMu.Unlock()
		return err
	}
	s.hdrMu.Unlock()
	return nil
}

func (t *http2Server) setResetPingStrikes() {
	atomic.StoreUint32(&t.resetPingStrikes, 1)
}

func (t *http2Server) writeHeaderLocked(s *Stream) error {
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
	if s.sendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
	}
	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: false,
		onWrite:   t.setResetPingStrikes,
	})
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	if t.stats != nil {
		// Note: Headers are compressed with hpack after this call returns.
		// No WireLength field is set here.
		outHeader := &stats.OutHeader{
			Header:      s.header.Copy(),
			Compression: s.sendCompress,
		}
		t.stats.HandleRPC(s.Context(), outHeader)
	}
	return nil
}
// WriteStatus sends the stream status to the client and terminates the stream.
// No further I/O operations can be performed on this stream afterwards.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	if s.getState() == streamDone {
		return nil
	}
	s.hdrMu.Lock()
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() {                      // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				s.hdrMu.Unlock()
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}
	s.hdrMu.Unlock()
	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	if t.stats != nil {
		// Note: The trailer fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}
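
// Wire-shape sketch (per the gRPC-over-HTTP/2 spec, not taken from this file):
// a Trailers-Only response for a NOT_FOUND status is a single HEADERS frame
// with END_STREAM set, carrying roughly:
//
//	:status: 200
//	content-type: application/grpc
//	grpc-status: 5
//	grpc-message: requested%20resource%20not%20found
//
// If headers were already sent, only grpc-status, grpc-message, and any
// trailer metadata go out in the trailing HEADERS frame, as the code above
// shows.
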
// Write converts the data into an HTTP2 data frame and sends it out. A non-nil
// error is returned if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if !s.isHeaderSent() { // Headers haven't been written yet.
		if err := t.WriteHeader(s, nil); err != nil {
			if _, ok := err.(ConnectionError); ok {
				return err
			}
			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
			return status.Errorf(codes.Internal, "transport: %v", err)
		}
	} else {
		// Writing headers checks for this condition.
		if s.getState() == streamDone {
			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
			s.cancel()
			select {
			case <-t.done:
				return ErrConnClosing
			default:
			}
			return ContextErr(s.ctx.Err())
		}
	}
	df := &dataFrame{
		streamID:    s.id,
		h:           hdr,
		d:           data,
		onEachWrite: t.setResetPingStrikes,
	}
	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
		select {
		case <-t.done:
			return ErrConnClosing
		default:
		}
		return ContextErr(s.ctx.Err())
	}
	return t.controlBuf.put(df)
}
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				return
			}
			idleTimer.Reset(val)
		case <-ageTimer.C:
			t.drain(http2.ErrCodeNo, []byte{})
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to maximum connection age.")
				}
				t.Close()
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && kpTimeoutLeft <= 0 {
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to idleness.")
				}
				t.Close()
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
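
// Hedged configuration sketch (public gRPC API, not part of this file): the
// parameters driving this loop come from keepalive.ServerParameters, usually
// installed as a server option. The durations are illustrative only.
//
//	srv := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
//		MaxConnectionIdle:     5 * time.Minute,  // drives idleTimer -> drain
//		MaxConnectionAge:      2 * time.Hour,    // drives ageTimer -> drain (jittered)
//		MaxConnectionAgeGrace: 30 * time.Second, // hard Close after the grace period
//		Time:                  2 * time.Hour,    // ping after this much read inactivity
//		Timeout:               20 * time.Second, // wait this long for the ping ack
//	}))
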
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams.
// This could cause some resource issue. Revisit this later.
func (t *http2Server) Close() error {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return errors.New("transport: Close() was already called")
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	err := t.conn.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(t.channelzID)
	}
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
	if t.stats != nil {
		connEnd := &stats.ConnEnd{}
		t.stats.HandleConn(t.ctx, connEnd)
	}
	return err
}
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()

	t.mu.Lock()
	if _, ok := t.activeStreams[s.id]; ok {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()

	if channelz.IsOn() {
		if eosReceived {
			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
		} else {
			atomic.AddInt64(&t.czData.streamsFailed, 1)
		}
	}
}
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		// If the stream was already done, return.
		return
	}

	hdr.cleanup = &cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite: func() {
			t.deleteStream(s, eosReceived)
		},
	}
	t.controlBuf.put(hdr)
}
// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
	s.swapState(streamDone)
	t.deleteStream(s, eosReceived)

	t.controlBuf.put(&cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite:  func() {},
	})
}

func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}

func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}

func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.drainChan != nil {
		return
	}
	t.drainChan = make(chan struct{})
	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
}

var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	sid := t.maxStreamID
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		if len(t.activeStreams) == 0 {
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Abruptly close the connection following the GoAway (via
			// loopywriter). But flush out what's inside the buffer first.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client. After getting the ack
	// or timer expiration send out another GoAway this time with an ID of the
	// max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainChan:
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}

func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
	s := channelz.SocketInternalMetric{
		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
		LocalFlowControlWindow:           int64(t.fc.getSize()),
		SocketOptions:                    channelz.GetSocketOption(t.conn),
		LocalAddr:                        t.localAddr,
		RemoteAddr:                       t.remoteAddr,
		// RemoteName :
	}
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
		s.Security = au.GetSecurityValue()
	}
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}

func (t *http2Server) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}

func (t *http2Server) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}

func (t *http2Server) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.done:
		return -1
	case <-timer.C:
		return -2
	}
}

func getJitter(v time.Duration) time.Duration {
	if v == infinity {
		return 0
	}
	// Generate a jitter between +/- 10% of the value.
	r := int64(v / 10)
	j := grpcrand.Int63n(2*r) - r
	return time.Duration(j)
}
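
// Worked example (illustrative): with MaxConnectionAge = 30*time.Minute, r is
// 3 minutes and grpcrand.Int63n(2*r)-r is uniform over [-3m, +3m), so
// newHTTP2Server stretches or shrinks each connection's maximum age by up to
// 10% to avoid a thundering herd of synchronized GOAWAYs.
//
//	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) // effective age: 27m..33m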