Source File
server.go
Belonging Package
golang.org/x/net/http2
package http2
import (
)
const (
prefaceTimeout = 10 * time.Second
firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
handlerChunkWriteSize = 4 << 10
defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
maxQueuedControlFrames = 10000
)
var (
errClientDisconnected = errors.New("client disconnected")
errClosedBody = errors.New("body closed by handler")
errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
errStreamClosed = errors.New("http2: stream closed")
)
var responseWriterStatePool = sync.Pool{
New: func() interface{} {
:= &responseWriterState{}
.bw = bufio.NewWriterSize(chunkWriter{}, handlerChunkWriteSize)
return
},
}
var (
testHookOnConn func()
testHookGetServerConn func(*serverConn)
testHookOnPanicMu *sync.Mutex // nil except in tests
testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool)
)
NewWriteScheduler func() WriteScheduler
state *serverInternalState
}
func ( *Server) () int32 {
if .MaxUploadBufferPerConnection > initialWindowSize {
return .MaxUploadBufferPerConnection
}
return 1 << 20
}
func ( *Server) () int32 {
if .MaxUploadBufferPerStream > 0 {
return .MaxUploadBufferPerStream
}
return 1 << 20
}
func ( *Server) () uint32 {
if := .MaxReadFrameSize; >= minMaxFrameSize && <= maxFrameSize {
return
}
return defaultMaxReadFrameSize
}
func ( *Server) () uint32 {
if := .MaxConcurrentStreams; > 0 {
return
}
return defaultMaxStreams
}
return maxQueuedControlFrames
}
type serverInternalState struct {
mu sync.Mutex
activeConns map[*serverConn]struct{}
}
func ( *serverInternalState) ( *serverConn) {
if == nil {
return // if the Server was used without calling ConfigureServer
}
.mu.Lock()
.activeConns[] = struct{}{}
.mu.Unlock()
}
func ( *serverInternalState) ( *serverConn) {
if == nil {
return // if the Server was used without calling ConfigureServer
}
.mu.Lock()
delete(.activeConns, )
.mu.Unlock()
}
func ( *serverInternalState) () {
if == nil {
return // if the Server was used without calling ConfigureServer
}
.mu.Lock()
for := range .activeConns {
.startGracefulShutdown()
}
.mu.Unlock()
}
func ( *http.Server, *Server) error {
if == nil {
panic("nil *http.Server")
}
if == nil {
= new(Server)
}
.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
if , := , ; .IdleTimeout == 0 {
if .IdleTimeout != 0 {
.IdleTimeout = .IdleTimeout
} else {
.IdleTimeout = .ReadTimeout
}
}
.RegisterOnShutdown(.state.startGracefulShutdown)
if .TLSConfig == nil {
.TLSConfig = new(tls.Config)
:= false
:= false
for , := range .TLSConfig.CipherSuites {
switch {
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
= true
}
if isBadCipher() {
= true
} else if {
return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", , )
}
}
if ! {
return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher (need at least one of TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 or TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256).")
}
}
.TLSConfig.PreferServerCipherSuites = true
:= false
for , := range .TLSConfig.NextProtos {
if == NextProtoTLS {
= true
break
}
}
if ! {
.TLSConfig.NextProtos = append(.TLSConfig.NextProtos, NextProtoTLS)
}
if .TLSNextProto == nil {
.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
}
:= func( *http.Server, *tls.Conn, http.Handler) {
if testHookOnConn != nil {
testHookOnConn()
var context.Context
type interface {
() context.Context
}
if , := .(); {
= .()
}
.ServeConn(, &ServeConnOpts{
Context: ,
Handler: ,
BaseConfig: ,
})
}
.TLSNextProto[NextProtoTLS] =
return nil
}
Handler http.Handler
}
func ( *ServeConnOpts) () context.Context {
if != nil && .Context != nil {
return .Context
}
return context.Background()
}
func ( *ServeConnOpts) () *http.Server {
if != nil && .BaseConfig != nil {
return .BaseConfig
}
return new(http.Server)
}
func ( *ServeConnOpts) () http.Handler {
if != nil {
if .Handler != nil {
return .Handler
}
if .BaseConfig != nil && .BaseConfig.Handler != nil {
return .BaseConfig.Handler
}
}
return http.DefaultServeMux
}
func ( *Server) ( net.Conn, *ServeConnOpts) {
, := serverConnBaseContext(, )
defer ()
:= &serverConn{
srv: ,
hs: .baseConfig(),
conn: ,
baseCtx: ,
remoteAddrStr: .RemoteAddr().String(),
bw: newBufferedWriter(),
handler: .handler(),
streams: make(map[uint32]*stream),
readFrameCh: make(chan readFrameResult),
wantWriteFrameCh: make(chan FrameWriteRequest, 8),
serveMsgCh: make(chan interface{}, 8),
wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
doneServing: make(chan struct{}),
clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
advMaxStreams: .maxConcurrentStreams(),
initialStreamSendWindowSize: initialWindowSize,
maxFrameSize: initialMaxFrameSize,
headerTableSize: initialHeaderTableSize,
serveG: newGoroutineLock(),
pushEnabled: true,
}
.state.registerConn()
defer .state.unregisterConn()
if .hs.WriteTimeout != 0 {
.conn.SetWriteDeadline(time.Time{})
}
if .NewWriteScheduler != nil {
.writeSched = .NewWriteScheduler()
} else {
.writeSched = NewRandomWriteScheduler()
}
.flow.add(initialWindowSize)
.inflow.add(initialWindowSize)
.hpackEncoder = hpack.NewEncoder(&.headerWriteBuf)
:= NewFramer(.bw, )
.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
.MaxHeaderListSize = .maxHeaderListSize()
.SetMaxReadFrameSize(.maxReadFrameSize())
.framer =
if , := .(connectionStater); {
.tlsState = new(tls.ConnectionState)
if .tlsState.Version < tls.VersionTLS12 {
.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
return
}
}
.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", .tlsState.CipherSuite))
return
}
}
if := testHookGetServerConn; != nil {
()
}
.serve()
}
func ( net.Conn, *ServeConnOpts) ( context.Context, func()) {
, = context.WithCancel(.context())
= context.WithValue(, http.LocalAddrContextKey, .LocalAddr())
if := .baseConfig(); != nil {
= context.WithValue(, http.ServerContextKey, )
}
return
}
func ( *serverConn) ( ErrCode, string) {
srv *Server
hs *http.Server
conn net.Conn
bw *bufferedWriter // writing to conn
handler http.Handler
baseCtx context.Context
framer *Framer
doneServing chan struct{} // closed when serverConn.serve ends
readFrameCh chan readFrameResult // written by serverConn.readFrames
wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan bodyReadMsg // from handlers -> serve
serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
flow flow // conn-wide (not stream-specific) outbound flow control
inflow flow // conn-wide inbound flow control
tlsState *tls.ConnectionState // shared by all handlers, like net/http
remoteAddrStr string
writeSched WriteScheduler
serveG goroutineLock // used to verify funcs are on serve()
pushEnabled bool
sawFirstSettings bool // got the initial SETTINGS frame after the preface
needToSendSettingsAck bool
unackedSettings int // how many SETTINGS have we sent without ACKs?
queuedControlFrames int // control frames in the writeSched queue
clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
curClientStreams uint32 // number of open streams initiated by the client
curPushedStreams uint32 // number of open streams initiated by server push
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
streams map[uint32]*stream
initialStreamSendWindowSize int32
maxFrameSize int32
headerTableSize uint32
peerMaxHeaderListSize uint32 // zero means unknown (default)
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
writingFrame bool // started writing a frame (on serve goroutine or separate)
writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
needsFrameFlush bool // last frame write wasn't a flush
inGoAway bool // we've started to or sent GOAWAY
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode ErrCode
shutdownTimer *time.Timer // nil until used
idleTimer *time.Timer // nil if unused
shutdownOnce sync.Once
}
func ( *serverConn) () uint32 {
:= .hs.MaxHeaderBytes
if <= 0 {
= http.DefaultMaxHeaderBytes
const = 32 // per http2 spec
const = 10 // conservative
return uint32( + *)
}
func ( *serverConn) () uint32 {
.serveG.check()
return .curClientStreams + .curPushedStreams
}
bodyBytes int64 // body bytes seen so far
declBodyBytes int64 // or -1 if undeclared
flow flow // limits writing from Handler to client
inflow flow // what the client is allowed to POST/etc to us
state streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
writeDeadline *time.Timer // nil if unused
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
}
func ( *serverConn) () *Framer { return .framer }
func ( *serverConn) () error { return .conn.Close() }
func ( *serverConn) () error { return .bw.Flush() }
func ( *serverConn) () (*hpack.Encoder, *bytes.Buffer) {
return .hpackEncoder, &.headerWriteBuf
}
func ( *serverConn) ( uint32) (streamState, *stream) {
if %2 == 1 {
if <= .maxClientStreamID {
return stateClosed, nil
}
} else {
if <= .maxPushPromiseID {
return stateClosed, nil
}
}
return stateIdle, nil
}
if runtime.GOOS == "windows" {
if , := .(*net.OpError); && .Op == "read" {
if , := .Err.(*os.SyscallError); && .Syscall == "wsarecv" {
const = 10053
const = 10054
if := errno(.Err); == || == {
return true
}
}
}
}
return false
}
func ( *serverConn) ( error, string, ...interface{}) {
if == nil {
return
}
.vlogf(, ...)
} else {
.logf(, ...)
}
}
func ( *serverConn) ( string) string {
.serveG.check()
buildCommonHeaderMapsOnce()
, := commonCanonHeader[]
if {
return
}
, = .canonHeader[]
if {
return
}
if .canonHeader == nil {
.canonHeader = make(map[string]string)
}
= http.CanonicalHeaderKey()
.canonHeader[] =
return
}
type readFrameResult struct {
f Frame // valid until readMore is called
err error
readMore func()
}
func ( *serverConn) () {
:= make(gate)
:= .Done
for {
, := .framer.ReadFrame()
select {
case .readFrameCh <- readFrameResult{, , }:
case <-.doneServing:
return
}
select {
case <-:
case <-.doneServing:
return
}
if terminalReadFrameError() {
return
}
}
}
type frameWriteResult struct {
_ incomparable
wr FrameWriteRequest // what was written (or attempted)
err error // result of the writeFrame call
}
func ( *serverConn) ( FrameWriteRequest) {
:= .write.writeFrame()
.wroteFrameCh <- frameWriteResult{wr: , err: }
}
func ( *serverConn) () {
.serveG.check()
for , := range .streams {
.closeStream(, errClientDisconnected)
}
}
func ( *serverConn) () {
.serveG.check()
if := .shutdownTimer; != nil {
.Stop()
}
}
if testHookOnPanicMu != nil {
testHookOnPanicMu.Lock()
defer testHookOnPanicMu.Unlock()
}
if testHookOnPanic != nil {
if := recover(); != nil {
if testHookOnPanic(, ) {
panic()
}
}
}
}
func ( *serverConn) () {
.serveG.check()
defer .notePanic()
defer .conn.Close()
defer .closeAllStreamsOnConnClose()
defer .stopShutdownTimer()
defer close(.doneServing) // unblocks handlers trying to send
if VerboseLogs {
.vlogf("http2: server connection from %v on %p", .conn.RemoteAddr(), .hs)
}
.writeFrame(FrameWriteRequest{
write: writeSettings{
{SettingMaxFrameSize, .srv.maxReadFrameSize()},
{SettingMaxConcurrentStreams, .advMaxStreams},
{SettingMaxHeaderListSize, .maxHeaderListSize()},
{SettingInitialWindowSize, uint32(.srv.initialStreamRecvWindowSize())},
},
})
.unackedSettings++
if := .srv.initialConnRecvWindowSize() - initialWindowSize; > 0 {
.sendWindowUpdate(nil, int())
}
if := .readPreface(); != nil {
.condlogf(, "http2: server: error reading preface from client %v: %v", .conn.RemoteAddr(), )
return
.setConnState(http.StateActive)
.setConnState(http.StateIdle)
if .srv.IdleTimeout != 0 {
.idleTimer = time.AfterFunc(.srv.IdleTimeout, .onIdleTimer)
defer .idleTimer.Stop()
}
go .readFrames() // closed by defer sc.conn.Close above
:= time.AfterFunc(firstSettingsTimeout, .onSettingsTimer)
defer .Stop()
:= 0
for {
++
select {
case := <-.wantWriteFrameCh:
if , := .write.(StreamError); {
.resetStream()
break
}
.writeFrame()
case := <-.wroteFrameCh:
.wroteFrame()
case := <-.readFrameCh:
if !.processFrameFromReader() {
return
}
.readMore()
if != nil {
.Stop()
= nil
}
case := <-.bodyReadCh:
.noteBodyRead(.st, .n)
case := <-.serveMsgCh:
switch v := .(type) {
case func(int):
() // for testing
case *serverMessage:
switch {
case settingsTimerMsg:
.logf("timeout waiting for SETTINGS frames from %v", .conn.RemoteAddr())
return
case idleTimerMsg:
.vlogf("connection is idle")
.goAway(ErrCodeNo)
case shutdownTimerMsg:
.vlogf("GOAWAY close timer fired; closing conn from %v", .conn.RemoteAddr())
return
case gracefulShutdownMsg:
.startGracefulShutdownInternal()
default:
panic("unknown timer")
}
case *startPushRequest:
.startPush()
default:
panic(fmt.Sprintf("unexpected type %T", ))
}
}
if .queuedControlFrames > .srv.maxQueuedControlFrames() {
.vlogf("http2: too many control frames in send queue, closing connection")
return
}
:= .inGoAway && !.needToSendGoAway && !.writingFrame
:= .goAwayCode == ErrCodeNo && .curOpenStreams() == 0
if && .shutdownTimer == nil && (.goAwayCode != ErrCodeNo || ) {
.shutDownIn(goAwayTimeout)
}
}
}
func ( *serverConn) ( <-chan struct{}, chan struct{}) {
select {
case <-.doneServing:
case <-:
close()
}
}
type serverMessage int
var (
settingsTimerMsg = new(serverMessage)
idleTimerMsg = new(serverMessage)
shutdownTimerMsg = new(serverMessage)
gracefulShutdownMsg = new(serverMessage)
)
func ( *serverConn) () { .sendServeMsg(settingsTimerMsg) }
func ( *serverConn) () { .sendServeMsg(idleTimerMsg) }
func ( *serverConn) () { .sendServeMsg(shutdownTimerMsg) }
func ( *serverConn) ( interface{}) {
.serveG.checkNotOn() // NOT
select {
case .serveMsgCh <- :
case <-.doneServing:
}
}
var errPrefaceTimeout = errors.New("timeout waiting for client preface")
func ( *serverConn) () error {
:= make(chan error, 1)
:= make([]byte, len(ClientPreface))
if , := io.ReadFull(.conn, ); != nil {
<-
} else if !bytes.Equal(, clientPreface) {
<- fmt.Errorf("bogus greeting %q", )
} else {
<- nil
}
}()
:= time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
defer .Stop()
select {
case <-.C:
return errPrefaceTimeout
case := <-:
if == nil {
if VerboseLogs {
.vlogf("http2: server: client %v said hello", .conn.RemoteAddr())
}
}
return
}
}
var errChanPool = sync.Pool{
New: func() interface{} { return make(chan error, 1) },
}
var writeDataPool = sync.Pool{
New: func() interface{} { return new(writeData) },
}
func ( *serverConn) ( *stream, []byte, bool) error {
:= errChanPool.Get().(chan error)
:= writeDataPool.Get().(*writeData)
* = writeData{.id, , }
:= .writeFrameFromHandler(FrameWriteRequest{
write: ,
stream: ,
done: ,
})
if != nil {
return
}
var bool // the frame write is done (successfully or not)
select {
case = <-:
= true
case <-.doneServing:
return errClientDisconnected
select {
case = <-:
= true
default:
return errStreamClosed
}
}
errChanPool.Put()
if {
writeDataPool.Put()
}
return
}
func ( *serverConn) ( FrameWriteRequest) error {
.serveG.checkNotOn() // NOT
select {
case .wantWriteFrameCh <- :
return nil
return errClientDisconnected
}
}
func ( *serverConn) ( FrameWriteRequest) {
.serveG.check()
var bool
if .StreamID() != 0 {
, := .write.(StreamError)
if , := .state(.StreamID()); == stateClosed && ! {
= true
}
}
switch .write.(type) {
case *writeResHeaders:
.stream.wroteHeaders = true
case write100ContinueHeadersFrame:
if .queuedControlFrames < 0 {
.conn.Close()
}
}
.writeSched.Push()
}
.scheduleFrameWrite()
}
func ( *serverConn) ( FrameWriteRequest) {
.serveG.check()
if .writingFrame {
panic("internal error: can only be writing one frame at a time")
}
:= .stream
if != nil {
switch .state {
case stateHalfClosedLocal:
switch .write.(type) {
default:
panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", ))
}
case stateClosed:
panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", ))
}
}
if , := .write.(*writePushPromise); {
var error
.promisedID, = .allocatePromisedID()
if != nil {
.writingFrameAsync = false
.replyToWriter()
return
}
}
.writingFrame = true
.needsFrameFlush = true
if .write.staysWithinBuffer(.bw.Available()) {
.writingFrameAsync = false
:= .write.writeFrame()
.wroteFrame(frameWriteResult{wr: , err: })
} else {
.writingFrameAsync = true
go .writeFrameAsync()
}
}
var errHandlerPanicked = errors.New("http2: handler panicked")
func ( *serverConn) ( frameWriteResult) {
.serveG.check()
if !.writingFrame {
panic("internal error: expected to be already writing a frame")
}
.writingFrame = false
.writingFrameAsync = false
:= .wr
if writeEndsStream(.write) {
:= .stream
if == nil {
panic("internal error: expecting non-nil stream")
}
switch .state {
.resetStream(streamError(.id, ErrCodeNo))
case stateHalfClosedRemote:
.closeStream(, errHandlerComplete)
}
} else {
switch v := .write.(type) {
if , := .streams[.StreamID]; {
.closeStream(, )
}
case handlerPanicRST:
.closeStream(.stream, errHandlerPanicked)
}
}
.replyToWriter(.err)
.scheduleFrameWrite()
}
func ( *serverConn) () {
.serveG.check()
if .writingFrame || .inFrameScheduleLoop {
return
}
.inFrameScheduleLoop = true
for !.writingFrameAsync {
if .needToSendGoAway {
.needToSendGoAway = false
.startFrameWrite(FrameWriteRequest{
write: &writeGoAway{
maxStreamID: .maxClientStreamID,
code: .goAwayCode,
},
})
continue
}
if .needToSendSettingsAck {
.needToSendSettingsAck = false
.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
continue
}
if !.inGoAway || .goAwayCode == ErrCodeNo {
if , := .writeSched.Pop(); {
if .isControl() {
.queuedControlFrames--
}
.startFrameWrite()
continue
}
}
if .needsFrameFlush {
.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
.needsFrameFlush = false // after startFrameWrite, since it sets this true
continue
}
break
}
.inFrameScheduleLoop = false
}
func ( *serverConn) () {
.serveG.checkNotOn() // NOT
.shutdownOnce.Do(func() { .sendServeMsg(gracefulShutdownMsg) })
}
var goAwayTimeout = 1 * time.Second
func ( *serverConn) () {
.goAway(ErrCodeNo)
}
func ( *serverConn) ( ErrCode) {
.serveG.check()
if .inGoAway {
return
}
.inGoAway = true
.needToSendGoAway = true
.goAwayCode =
.scheduleFrameWrite()
}
func ( *serverConn) ( time.Duration) {
.serveG.check()
.shutdownTimer = time.AfterFunc(, .onShutdownTimer)
}
func ( *serverConn) ( StreamError) {
.serveG.check()
.writeFrame(FrameWriteRequest{write: })
if , := .streams[.StreamID]; {
.resetQueued = true
}
}
func ( *serverConn) ( readFrameResult) bool {
.serveG.check()
:= .err
if != nil {
if == ErrFrameTooLarge {
.goAway(ErrCodeFrameSize)
return true // goAway will close the loop
}
:= == io.EOF || == io.ErrUnexpectedEOF || isClosedConnError()
return false
}
} else {
:= .f
if VerboseLogs {
.vlogf("http2: server read frame %v", summarizeFrame())
}
= .processFrame()
if == nil {
return true
}
}
switch ev := .(type) {
case StreamError:
.resetStream()
return true
case goAwayFlowError:
.goAway(ErrCodeFlowControl)
return true
case ConnectionError:
.logf("http2: server connection error from %v: %v", .conn.RemoteAddr(), )
.goAway(ErrCode())
return true // goAway will handle shutdown
default:
if .err != nil {
.vlogf("http2: server closing client connection; error reading frame from client %s: %v", .conn.RemoteAddr(), )
} else {
.logf("http2: server closing client connection: %v", )
}
return false
}
}
func ( *serverConn) ( Frame) error {
.serveG.check()
if !.sawFirstSettings {
if , := .(*SettingsFrame); ! {
return ConnectionError(ErrCodeProtocol)
}
.sawFirstSettings = true
}
switch f := .(type) {
case *SettingsFrame:
return .processSettings()
case *MetaHeadersFrame:
return .processHeaders()
case *WindowUpdateFrame:
return .processWindowUpdate()
case *PingFrame:
return .processPing()
case *DataFrame:
return .processData()
case *RSTStreamFrame:
return .processResetStream()
case *PriorityFrame:
return .processPriority()
case *GoAwayFrame:
return .processGoAway()
return ConnectionError(ErrCodeProtocol)
default:
.vlogf("http2: server ignoring frame: %v", .Header())
return nil
}
}
func ( *serverConn) ( *PingFrame) error {
.serveG.check()
return nil
}
return ConnectionError(ErrCodeProtocol)
}
if .inGoAway && .goAwayCode != ErrCodeNo {
return nil
}
.writeFrame(FrameWriteRequest{write: writePingAck{}})
return nil
}
func ( *serverConn) ( *WindowUpdateFrame) error {
.serveG.check()
switch {
case .StreamID != 0: // stream-level flow control
, := .state(.StreamID)
return ConnectionError(ErrCodeProtocol)
}
return nil
}
if !.flow.add(int32(.Increment)) {
return streamError(.StreamID, ErrCodeFlowControl)
}
default: // connection-level flow control
if !.flow.add(int32(.Increment)) {
return goAwayFlowError{}
}
}
.scheduleFrameWrite()
return nil
}
func ( *serverConn) ( *RSTStreamFrame) error {
.serveG.check()
, := .state(.StreamID)
return ConnectionError(ErrCodeProtocol)
}
if != nil {
.cancelCtx()
.closeStream(, streamError(.StreamID, .ErrCode))
}
return nil
}
func ( *serverConn) ( *stream, error) {
.serveG.check()
if .state == stateIdle || .state == stateClosed {
panic(fmt.Sprintf("invariant; can't close stream in state %v", .state))
}
.state = stateClosed
if .writeDeadline != nil {
.writeDeadline.Stop()
}
if .isPushed() {
.curPushedStreams--
} else {
.curClientStreams--
}
delete(.streams, .id)
if len(.streams) == 0 {
.setConnState(http.StateIdle)
if .srv.IdleTimeout != 0 {
.idleTimer.Reset(.srv.IdleTimeout)
}
if h1ServerKeepAlivesDisabled(.hs) {
.startGracefulShutdownInternal()
}
}
.sendWindowUpdate(nil, .Len())
.CloseWithError()
}
.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
.writeSched.CloseStream(.id)
}
func ( *serverConn) ( *SettingsFrame) error {
.serveG.check()
if .IsAck() {
.unackedSettings--
return ConnectionError(ErrCodeProtocol)
}
return nil
}
return ConnectionError(ErrCodeProtocol)
}
if := .ForeachSetting(.processSetting); != nil {
return
.needToSendSettingsAck = true
.scheduleFrameWrite()
return nil
}
func ( *serverConn) ( Setting) error {
.serveG.check()
if := .Valid(); != nil {
return
}
if VerboseLogs {
.vlogf("http2: server processing setting %v", )
}
switch .ID {
case SettingHeaderTableSize:
.headerTableSize = .Val
.hpackEncoder.SetMaxDynamicTableSize(.Val)
case SettingEnablePush:
.pushEnabled = .Val != 0
case SettingMaxConcurrentStreams:
.clientMaxStreams = .Val
case SettingInitialWindowSize:
return .processSettingInitialWindowSize(.Val)
case SettingMaxFrameSize:
.maxFrameSize = int32(.Val) // the maximum valid s.Val is < 2^31
case SettingMaxHeaderListSize:
.peerMaxHeaderListSize = .Val
if VerboseLogs {
.vlogf("http2: server ignoring unknown setting %v", )
}
}
return nil
}
func ( *serverConn) ( uint32) error {
:= .initialStreamSendWindowSize
.initialStreamSendWindowSize = int32()
:= int32() - // may be negative
for , := range .streams {
return ConnectionError(ErrCodeFlowControl)
}
}
return nil
}
func ( *serverConn) ( *DataFrame) error {
.serveG.check()
:= .Header().StreamID
return ConnectionError(ErrCodeProtocol)
}
if .inflow.available() < int32(.Length) {
return streamError(, ErrCodeFlowControl)
return nil
}
return streamError(, ErrCodeStreamClosed)
}
if .body == nil {
panic("internal error: should have a body in this state")
}
if .declBodyBytes != -1 && .bodyBytes+int64(len()) > .declBodyBytes {
return streamError(, ErrCodeProtocol)
}
if .inflow.available() < int32(.Length) {
return streamError(, ErrCodeFlowControl)
}
.inflow.take(int32(.Length))
if len() > 0 {
, := .body.Write()
if != nil {
.sendWindowUpdate(nil, int(.Length)-)
return streamError(, ErrCodeStreamClosed)
}
if != len() {
panic("internal error: bad Writer")
}
.bodyBytes += int64(len())
}
if := int32(.Length) - int32(len()); > 0 {
.sendWindowUpdate32(nil, )
.sendWindowUpdate32(, )
}
}
if .StreamEnded() {
.endStream()
}
return nil
}
func ( *serverConn) ( *GoAwayFrame) error {
.serveG.check()
if .ErrCode != ErrCodeNo {
.logf("http2: received GOAWAY %+v, starting graceful shutdown", )
} else {
.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", )
}
.pushEnabled = false
return nil
}
func ( *stream) () {
:= .sc
.serveG.check()
if .declBodyBytes != -1 && .declBodyBytes != .bodyBytes {
.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
.declBodyBytes, .bodyBytes))
} else {
.body.closeWithErrorAndCode(io.EOF, .copyTrailersToHandlerRequest)
.body.CloseWithError(io.EOF)
}
.state = stateHalfClosedRemote
}
.reqTrailer[] =
}
}
}
func ( *stream) () {
.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(.id, ErrCodeInternal)})
}
func ( *serverConn) ( *MetaHeadersFrame) error {
.serveG.check()
:= .StreamID
return nil
if %2 != 1 {
return ConnectionError(ErrCodeProtocol)
return nil
if .state == stateHalfClosedRemote {
return streamError(, ErrCodeStreamClosed)
}
return .processTrailerHeaders()
}
if <= .maxClientStreamID {
return ConnectionError(ErrCodeProtocol)
}
.maxClientStreamID =
if .idleTimer != nil {
.idleTimer.Stop()
}
if .curClientStreams+1 > .advMaxStreams {
return streamError(, ErrCodeProtocol)
return streamError(, ErrCodeRefusedStream)
}
:= stateOpen
if .StreamEnded() {
= stateHalfClosedRemote
}
:= .newStream(, 0, )
if .HasPriority() {
if := checkPriority(.StreamID, .Priority); != nil {
return
}
.writeSched.AdjustStream(.id, .Priority)
}
, , := .newWriterAndRequest(, )
if != nil {
return
}
.reqTrailer = .Trailer
if .reqTrailer != nil {
.trailer = make(http.Header)
}
.body = .Body.(*requestBody).pipe // may be nil
.declBodyBytes = .ContentLength
:= .handler.ServeHTTP
= handleHeaderListTooLong
} else if := checkValidHTTP2RequestHeaders(.Header); != nil {
= new400Handler()
}
if .hs.ReadTimeout != 0 {
.conn.SetReadDeadline(time.Time{})
}
go .runHandler(, , )
return nil
}
func ( *stream) ( *MetaHeadersFrame) error {
:= .sc
.serveG.check()
if .gotTrailerHeader {
return ConnectionError(ErrCodeProtocol)
}
.gotTrailerHeader = true
if !.StreamEnded() {
return streamError(.id, ErrCodeProtocol)
}
if len(.PseudoFields()) > 0 {
return streamError(.id, ErrCodeProtocol)
}
if .trailer != nil {
for , := range .RegularFields() {
:= .canonicalHeader(.Name)
return streamError(.id, ErrCodeProtocol)
}
.trailer[] = append(.trailer[], .Value)
}
}
.endStream()
return nil
}
func ( uint32, PriorityParam) error {
return streamError(, ErrCodeProtocol)
}
return nil
}
func ( *serverConn) ( *PriorityFrame) error {
if .inGoAway {
return nil
}
if := checkPriority(.StreamID, .PriorityParam); != nil {
return
}
.writeSched.AdjustStream(.StreamID, .PriorityParam)
return nil
}
func ( *serverConn) (, uint32, streamState) *stream {
.serveG.check()
if == 0 {
panic("internal error: cannot create stream with id 0")
}
, := context.WithCancel(.baseCtx)
:= &stream{
sc: ,
id: ,
state: ,
ctx: ,
cancelCtx: ,
}
.cw.Init()
.flow.conn = &.flow // link to conn-level counter
.flow.add(.initialStreamSendWindowSize)
.inflow.conn = &.inflow // link to conn-level counter
.inflow.add(.srv.initialStreamRecvWindowSize())
if .hs.WriteTimeout != 0 {
.writeDeadline = time.AfterFunc(.hs.WriteTimeout, .onWriteTimeout)
}
.streams[] =
.writeSched.OpenStream(.id, OpenStreamOptions{PusherID: })
if .isPushed() {
.curPushedStreams++
} else {
.curClientStreams++
}
if .curOpenStreams() == 1 {
.setConnState(http.StateActive)
}
return
}
func ( *serverConn) ( *stream, *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
.serveG.check()
:= requestParam{
method: .PseudoValue("method"),
scheme: .PseudoValue("scheme"),
authority: .PseudoValue("authority"),
path: .PseudoValue("path"),
}
:= .method == "CONNECT"
if {
if .path != "" || .scheme != "" || .authority == "" {
return nil, nil, streamError(.StreamID, ErrCodeProtocol)
}
return nil, nil, streamError(.StreamID, ErrCodeProtocol)
}
:= !.StreamEnded()
return nil, nil, streamError(.StreamID, ErrCodeProtocol)
}
.header = make(http.Header)
for , := range .RegularFields() {
.header.Add(.canonicalHeader(.Name), .Value)
}
if .authority == "" {
.authority = .header.Get("Host")
}
, , := .newWriterAndRequestNoBody(, )
if != nil {
return nil, nil,
}
if {
if , := .header["Content-Length"]; {
if , := strconv.ParseUint([0], 10, 63); == nil {
.ContentLength = int64()
} else {
.ContentLength = 0
}
} else {
.ContentLength = -1
}
.Body.(*requestBody).pipe = &pipe{
b: &dataBuffer{expected: .ContentLength},
}
}
return , , nil
}
type requestParam struct {
method string
scheme, authority, path string
header http.Header
}
func ( *serverConn) ( *stream, requestParam) (*responseWriter, *http.Request, error) {
.serveG.check()
var *tls.ConnectionState // nil if not scheme https
if .scheme == "https" {
= .tlsState
}
:= .header.Get("Expect") == "100-continue"
if {
.header.Del("Expect")
var http.Header
for , := range .header["Trailer"] {
for , := range strings.Split(, ",") {
= http.CanonicalHeaderKey(textproto.TrimString())
switch {
default:
if == nil {
= make(http.Header)
}
[] = nil
}
}
}
delete(.header, "Trailer")
var *url.URL
var string
if .method == "CONNECT" {
= &url.URL{Host: .authority}
= .authority // mimic HTTP/1 server behavior
} else {
var error
, = url.ParseRequestURI(.path)
if != nil {
return nil, nil, streamError(.id, ErrCodeProtocol)
}
= .path
}
:= &requestBody{
conn: ,
stream: ,
needsContinue: ,
}
:= &http.Request{
Method: .method,
URL: ,
RemoteAddr: .remoteAddrStr,
Header: .header,
RequestURI: ,
Proto: "HTTP/2.0",
ProtoMajor: 2,
ProtoMinor: 0,
TLS: ,
Host: .authority,
Body: ,
Trailer: ,
}
= .WithContext(.ctx)
:= responseWriterStatePool.Get().(*responseWriterState)
:= .bw
* = responseWriterState{} // zero all the fields
.conn =
.bw =
.bw.Reset(chunkWriter{})
.stream =
.req =
.body =
:= &responseWriter{rws: }
return , , nil
}
func ( *serverConn) ( *responseWriter, *http.Request, func(http.ResponseWriter, *http.Request)) {
:= true
defer func() {
.rws.stream.cancelCtx()
if {
:= recover()
.writeFrameFromHandler(FrameWriteRequest{
write: handlerPanicRST{.rws.stream.id},
stream: .rws.stream,
if != nil && != http.ErrAbortHandler {
const = 64 << 10
:= make([]byte, )
= [:runtime.Stack(, false)]
.logf("http2: panic serving %v: %v\n%s", .conn.RemoteAddr(), , )
}
return
}
.handlerDone()
}()
(, )
= false
}
const = 431 // only in Go 1.6+
.WriteHeader()
io.WriteString(, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>")
}
func ( *serverConn) ( *stream, *writeResHeaders) error {
.serveG.checkNotOn() // NOT on
var chan error
= errChanPool.Get().(chan error)
}
if := .writeFrameFromHandler(FrameWriteRequest{
write: ,
stream: ,
done: ,
}); != nil {
return
}
if != nil {
select {
case := <-:
errChanPool.Put()
return
case <-.doneServing:
return errClientDisconnected
case <-.cw:
return errStreamClosed
}
}
return nil
}
func ( *serverConn) ( *stream) {
.writeFrameFromHandler(FrameWriteRequest{
write: write100ContinueHeadersFrame{.id},
stream: ,
})
}
type bodyReadMsg struct {
st *stream
n int
}
func ( *serverConn) ( *stream, int, error) {
.serveG.checkNotOn() // NOT on
if > 0 {
select {
case .bodyReadCh <- bodyReadMsg{, }:
case <-.doneServing:
}
}
}
func ( *serverConn) ( *stream, int) {
.serveG.check()
.sendWindowUpdate(nil, ) // conn-level
.sendWindowUpdate(, )
}
}
func ( *serverConn) ( *stream, int) {
const = 1<<31 - 1
for >= {
.sendWindowUpdate32(, )
-=
}
.sendWindowUpdate32(, int32())
}
func ( *serverConn) ( *stream, int32) {
.serveG.check()
if == 0 {
return
}
if < 0 {
panic("negative update")
}
var uint32
if != nil {
= .id
}
.writeFrame(FrameWriteRequest{
write: writeWindowUpdate{streamID: , n: uint32()},
stream: ,
})
var bool
if == nil {
= .inflow.add()
} else {
= .inflow.add()
}
if ! {
panic("internal error; sent too many window updates without decrements?")
}
}
type requestBody struct {
_ incomparable
stream *stream
conn *serverConn
closed bool // for use by Close only
sawEOF bool // for use by Read only
pipe *pipe // non-nil if we have a HTTP entity message body
needsContinue bool // need to send a 100-continue
}
func ( *requestBody) () error {
if .pipe != nil && !.closed {
.pipe.BreakWithError(errClosedBody)
}
.closed = true
return nil
}
func ( *requestBody) ( []byte) ( int, error) {
if .needsContinue {
.needsContinue = false
.conn.write100ContinueHeaders(.stream)
}
if .pipe == nil || .sawEOF {
return 0, io.EOF
}
, = .pipe.Read()
if == io.EOF {
.sawEOF = true
}
if .conn == nil && inTests {
return
}
.conn.noteBodyReadFromHandler(.stream, , )
return
}
type responseWriter struct {
rws *responseWriterState
}
var (
_ http.CloseNotifier = (*responseWriter)(nil)
_ http.Flusher = (*responseWriter)(nil)
_ stringWriter = (*responseWriter)(nil)
)
stream *stream
req *http.Request
body *requestBody // to close at end of request, if DATA frames didn't
conn *serverConn
handlerHeader http.Header // nil until called
snapHeader http.Header // snapshot of handlerHeader at WriteHeader time
trailers []string // set in writeChunk
status int // status code passed to WriteHeader
wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
sentHeader bool // have we sent the header frame?
handlerDone bool // handler has finished
dirty bool // a Write failed; don't reuse this responseWriterState
sentContentLen int64 // non-zero if handler set a Content-Length header
wroteBytes int64
closeNotifierMu sync.Mutex // guards closeNotifierCh
closeNotifierCh chan bool // nil until first used
}
type chunkWriter struct{ rws *responseWriterState }
func ( chunkWriter) ( []byte) ( int, error) { return .rws.writeChunk() }
func ( *responseWriterState) () bool { return len(.trailers) > 0 }
func ( *responseWriterState) () bool {
for , := range .trailers {
if , := .handlerHeader[]; {
return true
}
}
return false
}
func ( *responseWriterState) ( string) {
= http.CanonicalHeaderKey()
func ( *responseWriterState) ( []byte) ( int, error) {
if !.wroteHeader {
.writeHeader(200)
}
:= .req.Method == "HEAD"
if !.sentHeader {
.sentHeader = true
var , string
if = .snapHeader.Get("Content-Length"); != "" {
.snapHeader.Del("Content-Length")
if , := strconv.ParseUint(, 10, 63); == nil {
.sentContentLen = int64()
} else {
= ""
}
}
if == "" && .handlerDone && bodyAllowedForStatus(.status) && (len() > 0 || !) {
= strconv.Itoa(len())
}
:= .snapHeader.Get("Content-Encoding")
:= len() > 0
if ! && ! && bodyAllowedForStatus(.status) && len() > 0 {
= http.DetectContentType()
}
var string
= time.Now().UTC().Format(http.TimeFormat)
}
for , := range .snapHeader["Trailer"] {
foreachHeaderElement(, .declareTrailer)
}
if , := .snapHeader["Connection"]; {
:= .snapHeader.Get("Connection")
delete(.snapHeader, "Connection")
if == "close" {
.conn.startGracefulShutdown()
}
}
:= (.handlerDone && !.hasTrailers() && len() == 0) ||
= .conn.writeHeaders(.stream, &writeResHeaders{
streamID: .stream.id,
httpResCode: .status,
h: .snapHeader,
endStream: ,
contentType: ,
contentLength: ,
date: ,
})
if != nil {
.dirty = true
return 0,
}
if {
return 0, nil
}
}
if {
return len(), nil
}
if len() == 0 && !.handlerDone {
return 0, nil
}
if .handlerDone {
.promoteUndeclaredTrailers()
}
:= .hasNonemptyTrailers()
:= .handlerDone && !
const TrailerPrefix = "Trailer:"
func ( *responseWriterState) () {
for , := range .handlerHeader {
if !strings.HasPrefix(, TrailerPrefix) {
continue
}
:= strings.TrimPrefix(, TrailerPrefix)
.declareTrailer()
.handlerHeader[http.CanonicalHeaderKey()] =
}
if len(.trailers) > 1 {
:= sorterPool.Get().(*sorter)
.SortStrings(.trailers)
sorterPool.Put()
}
}
func ( *responseWriter) () {
:= .rws
if == nil {
panic("Header called after Handler finished")
}
if .bw.Buffered() > 0 {
return
}
.writeChunk(nil)
}
}
func ( *responseWriter) () <-chan bool {
:= .rws
if == nil {
panic("CloseNotify called after Handler finished")
}
.closeNotifierMu.Lock()
:= .closeNotifierCh
if == nil {
= make(chan bool, 1)
.closeNotifierCh =
:= .stream.cw
go func() {
.Wait() // wait for close
<- true
}()
}
.closeNotifierMu.Unlock()
return
}
func ( *responseWriter) () http.Header {
:= .rws
if == nil {
panic("Header called after Handler finished")
}
if .handlerHeader == nil {
.handlerHeader = make(http.Header)
}
return .handlerHeader
}
if < 100 || > 999 {
panic(fmt.Sprintf("invalid WriteHeader code %v", ))
}
}
func ( *responseWriter) ( int) {
:= .rws
if == nil {
panic("WriteHeader called after Handler finished")
}
.writeHeader()
}
func ( *responseWriterState) ( int) {
if !.wroteHeader {
checkWriteHeaderCode()
.wroteHeader = true
.status =
if len(.handlerHeader) > 0 {
.snapHeader = cloneHeader(.handlerHeader)
}
}
}
func ( http.Header) http.Header {
:= make(http.Header, len())
for , := range {
:= make([]string, len())
copy(, )
[] =
}
return
}
func ( *responseWriter) ( int, []byte, string) ( int, error) {
:= .rws
if == nil {
panic("Write called after Handler finished")
}
if !.wroteHeader {
.WriteHeader(200)
}
if !bodyAllowedForStatus(.status) {
return 0, http.ErrBodyNotAllowed
}
.wroteBytes += int64(len()) + int64(len()) // only one can be set
return 0, errors.New("http2: handler wrote more than declared Content-Length")
}
if != nil {
return .bw.Write()
} else {
return .bw.WriteString()
}
}
func ( *responseWriter) () {
:= .rws
:= .dirty
.handlerDone = true
.Flush()
.rws = nil
responseWriterStatePool.Put()
}
}
var (
ErrRecursivePush = errors.New("http2: recursive push not allowed")
ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
)
var _ http.Pusher = (*responseWriter)(nil)
func ( *responseWriter) ( string, *http.PushOptions) error {
:= .rws.stream
:= .sc
.serveG.checkNotOn()
if .isPushed() {
return ErrRecursivePush
}
if == nil {
= new(http.PushOptions)
}
, := url.Parse()
if != nil {
return
}
if .Scheme == "" {
if !strings.HasPrefix(, "/") {
return fmt.Errorf("target must be an absolute URL or an absolute path: %q", )
}
.Scheme =
.Host = .rws.req.Host
} else {
if .Scheme != {
return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", .Scheme, )
}
if .Host == "" {
return errors.New("URL must have a host")
}
}
for := range .Header {
if strings.HasPrefix(, ":") {
return fmt.Errorf("promised request headers cannot include pseudo header %q", )
if .Method != "GET" && .Method != "HEAD" {
return fmt.Errorf("method %q must be GET or HEAD", .Method)
}
:= &startPushRequest{
parent: ,
method: .Method,
url: ,
header: cloneHeader(.Header),
done: errChanPool.Get().(chan error),
}
select {
case <-.doneServing:
return errClientDisconnected
case <-.cw:
return errStreamClosed
case .serveMsgCh <- :
}
select {
case <-.doneServing:
return errClientDisconnected
case <-.cw:
return errStreamClosed
case := <-.done:
errChanPool.Put(.done)
return
}
}
type startPushRequest struct {
parent *stream
method string
url *url.URL
header http.Header
done chan error
}
func ( *serverConn) ( *startPushRequest) {
.serveG.check()
.done <- errStreamClosed
return
}
if !.pushEnabled {
.done <- http.ErrNotSupported
return
}
if !.pushEnabled {
return 0, http.ErrNotSupported
if .curPushedStreams+1 > .clientMaxStreams {
return 0, ErrPushLimitReached
}
if .maxPushPromiseID+2 >= 1<<31 {
.startGracefulShutdownInternal()
return 0, ErrPushLimitReached
}
.maxPushPromiseID += 2
:= .maxPushPromiseID
:= .newStream(, .parent.id, stateHalfClosedRemote)
, , := .newWriterAndRequestNoBody(, requestParam{
method: .method,
scheme: .url.Scheme,
authority: .url.Host,
path: .url.RequestURI(),
header: cloneHeader(.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
})
func ( string, func(string)) {
= textproto.TrimString()
if == "" {
return
}
if !strings.Contains(, ",") {
()
return
}
for , := range strings.Split(, ",") {
if = textproto.TrimString(); != "" {
()
}
}
}
var connHeaders = []string{
"Connection",
"Keep-Alive",
"Proxy-Connection",
"Transfer-Encoding",
"Upgrade",
}
func ( http.Header) error {
for , := range connHeaders {
if , := []; {
return fmt.Errorf("request header %q is not valid in HTTP/2", )
}
}
:= ["Te"]
if len() > 0 && (len() > 1 || ([0] != "trailers" && [0] != "")) {
return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
}
return nil
}
func ( error) http.HandlerFunc {
return func( http.ResponseWriter, *http.Request) {
http.Error(, .Error(), http.StatusBadRequest)
}
}
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |