Source File: http2_server.go
Belonging Package: google.golang.org/grpc/internal/transport

package transport

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

var (
	// ErrHeaderListSizeLimitViolation indicates that the header list size is
	// larger than the limit set by peer.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
)
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx         context.Context
	done        chan struct{}
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent. 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle (zero while
	// there are active streams).
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	bufferPool *bufferPool

	connectionID uint64
}
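// A minimal sketch of how a caller typically drives this transport; the
// handler body and config values below are illustrative only and not part of
// this file:
//
//	st, err := newHTTP2Server(conn, &ServerConfig{MaxStreams: 100})
//	if err != nil {
//		conn.Close()
//		return
//	}
//	defer st.Close()
//	st.HandleStreams(func(s *Stream) {
//		// look up and invoke the handler registered for s.method
//	}, func(ctx context.Context, method string) context.Context { return ctx })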
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError
// is returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	done := make(chan struct{})
	t := &http2Server{
		ctx:               context.Background(),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)

	t.framer.writer.Flush()

	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			if logger.V(logLevel) {
				logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
			}
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
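// The constructor above completes the HTTP/2 connection preface exchange: it
// writes the server SETTINGS (plus an optional connection-level
// WINDOW_UPDATE), reads and verifies the 24-byte client preface
// ("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"), and requires the client's first frame
// to be SETTINGS before the loopy writer and keepalive goroutines are started.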
// operateHeaders takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
	streamID := frame.Header().StreamID
	state := &decodeState{
		serverSide: true,
	}
	if err := state.decodeHeader(frame); err != nil {
		if se, ok := status.FromError(err); ok {
			t.controlBuf.put(&cleanupStream{
				streamID: streamID,
				rst:      true,
				rstCode:  statusCodeConvTab[se.Code()],
				onWrite:  func() {},
			})
		}
		return false
	}

	buf := newRecvBuffer()
	s := &Stream{
		id:             streamID,
		st:             t,
		buf:            buf,
		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
		recvCompress:   state.data.encoding,
		method:         state.data.method,
		contentSubtype: state.data.contentSubtype,
	}
	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	if state.data.timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}
	pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	// Attach Auth info if there is any.
	if t.authInfo != nil {
		pr.AuthInfo = t.authInfo
	}
	s.ctx = peer.NewContext(s.ctx, pr)
	// Attach the received metadata to the context.
	if len(state.data.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
	}
	if state.data.statsTags != nil {
		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
	}
	if state.data.statsTrace != nil {
		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
	}
	if t.inTapHandle != nil {
		var err error
		info := &tap.Info{
			FullMethodName: state.data.method,
		}
		s.ctx, err = t.inTapHandle(s.ctx, info)
		if err != nil {
			if logger.V(logLevel) {
				logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			}
			t.controlBuf.put(&cleanupStream{
				streamID: s.id,
				rst:      true,
				rstCode:  http2.ErrCodeRefusedStream,
				onWrite:  func() {},
			})
			s.cancel()
			return false
		}
	}
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return false
	}
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return false
	}
	if streamID%2 != 1 || streamID <= t.maxStreamID {
		t.mu.Unlock()
		// illegal gRPC stream id.
		if logger.V(logLevel) {
			logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
		}
		s.cancel()
		return true
	}
	t.maxStreamID = streamID
	t.activeStreams[streamID] = s
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	if t.stats != nil {
		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
			Header:      metadata.MD(state.data.mdata).Copy(),
		}
		t.stats.HandleRPC(s.ctx, inHeader)
	}
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return false
}
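// For reference, the header block decoded above for a unary gRPC request
// normally carries fields like the following (values are illustrative):
//
//	:method       POST
//	:path         /helloworld.Greeter/SayHello
//	content-type  application/grpc
//	grpc-timeout  1S      // optional; sets state.data.timeoutSet
//	grpc-encoding gzip    // optional; becomes s.recvCompress
//
// plus any application metadata, which ends up in state.data.mdata.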
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine. traceCtx attaches trace to ctx and
// returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
	defer close(t.readerDone)
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		if err != nil {
			if se, ok := err.(http2.StreamError); ok {
				if logger.V(logLevel) {
					logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
				}
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s, true, se.Code, false)
				} else {
					t.controlBuf.put(&cleanupStream{
						streamID: se.StreamID,
						rst:      true,
						rstCode:  se.Code,
						onWrite:  func() {},
					})
				}
				continue
			}
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				t.Close()
				return
			}
			if logger.V(logLevel) {
				logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
			}
			t.Close()
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if t.operateHeaders(frame, handle, traceCtx) {
				t.Close()
				break
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
			}
		}
	}
}

func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.activeStreams == nil {
		// The transport is closing.
		return nil, false
	}
	s, ok := t.activeStreams[f.Header().StreamID]
	if !ok {
		// The stream is already done.
		return nil, false
	}
	return s, true
}
// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting more data than fits in the
// current window.
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}
// updateWindow adjusts the inbound quota for the stream; window updates are
// sent to the controller once the cumulative quota exceeds the threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
			increment: w,
		})
	}
}
// updateFlowControl updates the incoming flow control windows of the
// transport and the streams based on the current bdp estimation.
func (t *http2Server) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: t.fc.newLimit(n),
	})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}
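// updateFlowControl is invoked through the callback registered on t.bdpEst in
// the constructor: when the BDP estimator decides the measured
// bandwidth-delay product exceeds the current window, every active stream
// gets the new limit, the connection window is topped up with a
// WINDOW_UPDATE, and the new initial window size is advertised to the client
// in a SETTINGS frame.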
func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read, so that a
	// slow or inactive stream cannot starve other active streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy) by sending a
		// window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if s.getState() == streamReadDone {
		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// A copy is required because there is no guarantee f.Data() is
		// consumed before the arrival of the next frame.
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	if f.Header().Flags.Has(http2.FlagDataEndStream) {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
	// If the stream is still in the transport's active streams map, do a
	// regular close.
	if s, ok := t.getStream(f); ok {
		t.closeStream(s, false, 0, false)
		return
	}
	// Otherwise put a cleanupStream item into controlbuf so loopy removes the
	// stream from its established streams map.
	t.controlBuf.put(&cleanupStream{
		streamID: f.Header().StreamID,
		rst:      false,
		rstCode:  0,
		onWrite:  func() {},
	})
}
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
	if f.IsAck() {
		return
	}
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, &incomingSettings{
		ss: ss,
	})
}
const (
maxPingStrikes = 2
defaultPingTimeout = 2 * time.Hour
)
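// Keepalive ping policy: a received ping only counts against the client when
// resetPingStrikes is 0, i.e. no data or header frames have been sent since
// the previous ping. With no active streams (and unless the policy permits
// pings without streams) a ping is expected at most once per
// defaultPingTimeout; with active streams it must respect kep.MinTime. Each
// violation adds a strike, and more than maxPingStrikes strikes triggers a
// GOAWAY with ENHANCE_YOUR_CALM and the debug data "too_many_pings".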
func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		if f.Data == goAwayPing.data && t.drainChan != nil {
			close(t.drainChan)
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A reset ping strikes flag means that we don't need to check for policy
	// violation for this ping and the pingStrikes counter should be set to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active, so this new ping should have come
		// after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		if logger.V(logLevel) {
			logger.Errorf("transport: Got too many pings from the client, closing the connection.")
		}
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
	}
}
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}
func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
	for k, vv := range md {
		if isReservedHeader(k) {
			// Clients don't tolerate reading restricted headers after some
			// non-restricted ones were sent.
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields
}
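// Metadata keys are sent as-is; reserved keys (for example grpc-status,
// grpc-message, content-type, te) are skipped by isReservedHeader, and
// encodeMetadataHeader base64-encodes the values of "-bin" suffixed keys so
// that arbitrary bytes survive the header encoding.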
func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
	if t.maxSendHeaderListSize == nil {
		return true
	}
	hdrFrame := it.(*headerFrame)
	var sz int64
	for _, f := range hdrFrame.hf {
		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
			if logger.V(logLevel) {
				logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
			}
			return false
		}
	}
	return true
}
// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
	if s.updateHeaderSent() || s.getState() == streamDone {
		return ErrIllegalHeaderWrite
	}
	s.hdrMu.Lock()
	if md.Len() > 0 {
		if s.header.Len() > 0 {
			s.header = metadata.Join(s.header, md)
		} else {
			s.header = md
		}
	}
	if err := t.writeHeaderLocked(s); err != nil {
		s.hdrMu.Unlock()
		return err
	}
	s.hdrMu.Unlock()
	return nil
}
func (t *http2Server) setResetPingStrikes() {
	atomic.StoreUint32(&t.resetPingStrikes, 1)
}
func (t *http2Server) writeHeaderLocked(s *Stream) error {
	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
	if s.sendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
	}
	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: false,
		onWrite:   t.setResetPingStrikes,
	})
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	return nil
}
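// A typical header frame produced by writeHeaderLocked for a gRPC response
// therefore looks like the following (values are illustrative):
//
//	:status        200
//	content-type   application/grpc
//	grpc-encoding  gzip   // only when s.sendCompress is set
//	<user metadata previously stored in s.header>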
// WriteStatus sends the stream status to the client and terminates the
// stream. No further I/O can be performed on this stream afterwards.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	if s.getState() == streamDone {
		return nil
	}
	s.hdrMu.Lock()
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() {                      // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				s.hdrMu.Unlock()
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}
	s.hdrMu.Unlock()
	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already
	// half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	return nil
}
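// When no headers were written and s.header is empty, WriteStatus emits a
// "trailers-only" response: a single HEADERS frame with END_STREAM carrying
// :status, content-type, grpc-status, grpc-message and, if the status has
// details, grpc-status-details-bin. Otherwise the same grpc-* fields are sent
// as HTTP/2 trailers after the data frames.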
// Write converts the data into an HTTP2 data frame and sends it out. A
// non-nil error is returned if it fails (e.g., framing error, transport
// error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if !s.isHeaderSent() { // Headers haven't been written yet.
		if err := t.WriteHeader(s, nil); err != nil {
			if _, ok := err.(ConnectionError); ok {
				return err
			}
			return status.Errorf(codes.Internal, "transport: %v", err)
		}
	} else {
		// Writing headers checks for this condition.
		if s.getState() == streamDone {
			s.cancel()
			select {
			case <-t.done:
				return ErrConnClosing
			default:
			}
			return ContextErr(s.ctx.Err())
		}
	}
	df := &dataFrame{
		streamID:    s.id,
		h:           hdr,
		d:           data,
		onEachWrite: t.setResetPingStrikes,
	}
	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
		select {
		case <-t.done:
			return ErrConnClosing
		default:
		}
		return ContextErr(s.ctx.Err())
	}
	return t.controlBuf.put(df)
}
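// hdr and data correspond to the gRPC length-prefixed message framing as used
// by the gRPC server code: hdr is the 5-byte message header (1 compressed
// flag byte followed by a 4-byte big-endian message length) and data holds
// the (possibly compressed) message bytes; loopy may coalesce or split them
// across HTTP/2 DATA frames.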
// keepalive makes sure the connection is alive by sending pings with a
// frequency of kp.Time and closes a non-responsive connection after an
// additional kp.Timeout; the MaxConnectionIdle and MaxConnectionAge timer
// cases of this goroutine are elided below.
func (t *http2Server) keepalive() {
	p := &ping{}
	// outstandingPing is true iff a ping has been sent and no data has been
	// received since then.
	outstandingPing := false
	kpTimeoutLeft := time.Duration(0)
	prevNano := time.Now().UnixNano()
	kpTimer := time.NewTimer(t.kp.Time)
	defer kpTimer.Stop()
	for {
		select {
		// ... (MaxConnectionIdle and MaxConnectionAge cases elided) ...
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// Read activity since we were last here; wait again.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && kpTimeoutLeft <= 0 {
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to idleness.")
				}
				t.Close()
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
// Close starts shutting down the http2Server transport.
func (t *http2Server) Close() error {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return errors.New("transport: Close() was already called")
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	err := t.conn.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(t.channelzID)
	}
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
	if t.stats != nil {
		connEnd := &stats.ConnEnd{}
		t.stats.HandleConn(t.ctx, connEnd)
	}
	return err
}

// deleteStream deletes the stream s from the transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate goroutines
	// (e.g., bi-directional streaming), cancel needs to be called to
	// interrupt the potential blocking on other goroutines.
	s.cancel()

	t.mu.Lock()
	if _, ok := t.activeStreams[s.id]; ok {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()

	if channelz.IsOn() {
		if eosReceived {
			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
		} else {
			atomic.AddInt64(&t.czData.streamsFailed, 1)
		}
	}
}
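// finishStream (below) is the normal-termination path: the cleanupStream item
// is attached to the trailing headerFrame so the stream is deleted only after
// loopy has written the trailers. closeStream is the abrupt path used on
// errors and resets, where the cleanup item is queued immediately.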
// finishStream closes the stream and puts the trailing headerFrame into
// controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		// If the stream was already done, return.
		return
	}

	hdr.cleanup = &cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite: func() {
			t.deleteStream(s, eosReceived)
		},
	}
	t.controlBuf.put(hdr)
}
// closeStream clears the footprint of a stream when the stream is not needed
// any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
	s.swapState(streamDone)
	t.deleteStream(s, eosReceived)

	t.controlBuf.put(&cleanupStream{
		streamID: s.id,
		rst:      rst,
		rstCode:  rstCode,
		onWrite:  func() {},
	})
}
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}

func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}

func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.drainChan != nil {
		return
	}
	t.drainChan = make(chan struct{})
	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
}

var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// outgoingGoAwayHandler handles an outgoing GoAway and returns true if loopy
// needs to put itself in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	sid := t.maxStreamID
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		if len(t.activeStreams) == 0 {
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Flush what is buffered before loopy closes the connection.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection is closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with a stream ID of MaxUint32,
	// follow it with a ping, and wait for the ack (or a timer to expire)
	// before the second GoAway with the max stream ID is sent.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainChan:
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
	s := channelz.SocketInternalMetric{
		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
		LocalFlowControlWindow:           int64(t.fc.getSize()),
		SocketOptions:                    channelz.GetSocketOption(t.conn),
		LocalAddr:                        t.localAddr,
	}
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
		s.Security = au.GetSecurityValue()
	}
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}
func (t *http2Server) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}

func (t *http2Server) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
func (t *http2Server) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.done:
		return -1
	case <-timer.C:
		return -2
	}
}
func getJitter(v time.Duration) time.Duration {
	if v == infinity {
		return 0
	}
	// Generate a jitter between +/- 10% of the value.
	r := int64(v / 10)
	return time.Duration(grpcrand.Int63n(2*r) - r)
}