Source File
transport.go
Belonging Package
golang.org/x/net/http2
package http2
import (
mathrand
)
transportDefaultConnFlow = 1 << 30
transportDefaultStreamFlow = 4 << 20
transportDefaultStreamMinRefresh = 4 << 10
defaultUserAgent = "Go-http-client/2.0"
)
t1 *http.Transport
connPoolOnce sync.Once
connPoolOrDef ClientConnPool // non-nil version of ConnPool
}
// maxHeaderListSize returns the header-list size limit for this Transport:
// 10 << 20 when MaxHeaderListSize is left at zero, 0 when it is set to
// 0xffffffff, and the configured value otherwise. newClientConn only
// advertises SettingMaxHeaderListSize when the result is non-zero.
func ( *Transport) () uint32 {
if .MaxHeaderListSize == 0 {
return 10 << 20
}
if .MaxHeaderListSize == 0xffffffff {
return 0
}
return .MaxHeaderListSize
}
// Reports whether DisableCompression is set either on this Transport or on
// its t1 transport (consulted only when t1 is non-nil).
func ( *Transport) () bool {
return .DisableCompression || (.t1 != nil && .t1.DisableCompression)
}
// Returns the configured PingTimeout, falling back to 15 seconds when the
// field is zero.
func ( *Transport) () time.Duration {
if .PingTimeout == 0 {
return 15 * time.Second
}
return .PingTimeout
}
// Calls ConfigureTransports and discards its first result, handing back only
// the error.
// NOTE(review): presumably the error-only convenience form for callers that
// do not need the returned *Transport; confirm against upstream.
func ( *http.Transport) error {
, := ConfigureTransports()
return
}
// ConfigureTransports is the exported entry point; it simply forwards to the
// unexported configureTransports and returns its results unchanged.
func ( *http.Transport) (*Transport, error) {
return configureTransports()
}
func ( *http.Transport) (*Transport, error) {
:= new(clientConnPool)
:= &Transport{
ConnPool: noDialClientConnPool{},
t1: ,
}
.t =
if := registerHTTPSProtocol(, noDialH2RoundTripper{}); != nil {
return nil,
}
if .TLSClientConfig == nil {
.TLSClientConfig = new(tls.Config)
}
if !strSliceContains(.TLSClientConfig.NextProtos, "h2") {
.TLSClientConfig.NextProtos = append([]string{"h2"}, .TLSClientConfig.NextProtos...)
}
if !strSliceContains(.TLSClientConfig.NextProtos, "http/1.1") {
.TLSClientConfig.NextProtos = append(.TLSClientConfig.NextProtos, "http/1.1")
}
:= func( string, *tls.Conn) http.RoundTripper {
:= authorityAddr("https", )
if , := .addConnIfNeeded(, , ); != nil {
go .Close()
return erringRoundTripper{}
go .Close()
}
return
}
if := .TLSNextProto; len() == 0 {
.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{
"h2": ,
}
} else {
["h2"] =
}
return , nil
}
// connPool returns the connection pool in use. initConnPool is run through
// connPoolOnce, so connPoolOrDef is populated exactly once before it is read.
func ( *Transport) () ClientConnPool {
.connPoolOnce.Do(.initConnPool)
return .connPoolOrDef
}
// initConnPool fills in connPoolOrDef: the user-supplied ConnPool when it is
// non-nil, otherwise a newly allocated clientConnPool.
func ( *Transport) () {
if .ConnPool != nil {
.connPoolOrDef = .ConnPool
} else {
.connPoolOrDef = &clientConnPool{t: }
}
}
readerDone chan struct{} // closed on error
readerErr error // set before readerDone is closed
idleTimeout time.Duration // or 0 for never
idleTimer *time.Timer
mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow flow // our conn-level flow control quota (cs.flow is per stream)
inflow flow // peer's conn-level flow control
closing bool
closed bool
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*clientStream // client-initiated
nextStreamID uint32
pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
bw *bufio.Writer
br *bufio.Reader
fr *Framer
lastActive time.Time
maxFrameSize uint32
maxConcurrentStreams uint32
peerMaxHeaderListSize uint64
initialWindowSize uint32
hbuf bytes.Buffer // HPACK encoder writes into this
henc *hpack.Encoder
freeBuf [][]byte
wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
werr error // first write error that has occurred
}
type clientStream struct {
cc *ClientConn
req *http.Request
trace *httptrace.ClientTrace // or nil
ID uint32
resc chan resAndError
bufPipe pipe // buffered pipe with the flow-controlled response payload
startedWrite bool // started request body write; guarded by cc.mu
requestedGzip bool
on100 func() // optional code to run if get a 100 continue response
flow flow // guarded by cc.mu
inflow flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu
peerReset chan struct{} // closed on peer reset
resetErr error // populated before peerReset is closed
done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu
firstByte bool // got the first response byte
pastHeaders bool // got first MetaHeadersFrame (actual headers)
pastTrailers bool // got optional second MetaHeadersFrame (trailers)
num1xx uint8 // number of 1xx responses seen
trailer http.Header // accumulated trailers
resTrailer *http.Header // client's Response.Trailer
}
// get1xxTraceFunc returns the callback to run for 1xx informational
// responses: the got1xxFuncForTests hook when it is non-nil, otherwise
// whatever traceGot1xxResponseFunc derives from the stream's trace.
func ( *clientStream) () func(int, textproto.MIMEHeader) error {
if := got1xxFuncForTests; != nil {
return
}
return traceGot1xxResponseFunc(.trace)
}
// awaitRequestCancel delegates to the package-level awaitRequestCancel,
// passing the stream's done channel. When that call reports a non-nil error
// the stream is cancelled and the response body pipe is closed with an error
// (presumably the same one; the argument is not visible here).
func ( *clientStream) ( *http.Request) {
if := awaitRequestCancel(, .done); != nil {
.cancelStream()
.bufPipe.CloseWithError()
}
}
// cancelStream marks the stream as reset. didReset is read and set while
// holding mu; when the guarded condition holds (presumably: the stream had
// not been reset before) it calls writeStreamReset with ErrCodeCancel and
// then forgetStreamID for this stream's ID.
func ( *clientStream) () {
:= .cc
.mu.Lock()
:= .didReset
.didReset = true
.mu.Unlock()
if ! {
.writeStreamReset(.ID, ErrCodeCancel, nil)
.forgetStreamID(.ID)
}
}
// checkResetOrDone is a non-blocking check of the stream's terminal state:
// it returns resetErr once peerReset is closed, errStreamClosed once done is
// closed, and nil while neither channel is ready.
func ( *clientStream) () error {
select {
case <-.peerReset:
return .resetErr
case <-.done:
return errStreamClosed
default:
return nil
}
}
// getStartedWrite returns startedWrite, reading it while holding the mutex
// (the field is documented as guarded by cc.mu).
func ( *clientStream) () bool {
:= .cc
.mu.Lock()
defer .mu.Unlock()
return .startedWrite
}
// abortRequestBodyWrite stores a value in stopReqBody under the mutex and
// broadcasts on the condition variable so goroutines blocked in cond.Wait
// re-check their state. It panics with "nil error" when its guard sees nil
// (presumably a nil error argument; the operand is not visible here).
func ( *clientStream) ( error) {
if == nil {
panic("nil error")
}
:= .cc
.mu.Lock()
.stopReqBody =
.cond.Broadcast()
.mu.Unlock()
}
// stickyErrWriter wraps an io.Writer and records the result of every write
// in *err. Once *err is non-nil, all later writes fail immediately with that
// same error and never reach w.
type stickyErrWriter struct {
	w   io.Writer
	err *error
}

// Write forwards p to the underlying writer unless an earlier write already
// failed, in which case it returns the stored error without writing. The
// error result of each forwarded write is stored back into *err.
func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
	if *sew.err != nil {
		return 0, *sew.err
	}
	n, err = sew.w.Write(p)
	*sew.err = err
	return
}
type noCachedConnError struct{}
func (noCachedConnError) () {}
func (noCachedConnError) () string { return "http2: no cached connection was available" }
func ( error) bool {
, := .(interface{ () })
return
}
var ErrNoCachedConn error = noCachedConnError{}
OnlyCachedConn bool
}
// Performs a round trip by delegating to RoundTripOpt with a zero-value
// RoundTripOpt, i.e. with no options set.
func ( *Transport) ( *http.Request) (*http.Response, error) {
return .RoundTripOpt(, RoundTripOpt{})
}
func ( *Transport) ( *http.Request, RoundTripOpt) (*http.Response, error) {
if !(.URL.Scheme == "https" || (.URL.Scheme == "http" && .AllowHTTP)) {
return nil, errors.New("http2: unsupported scheme")
}
:= authorityAddr(.URL.Scheme, .URL.Host)
for := 0; ; ++ {
, := .connPool().GetClientConn(, )
if != nil {
.vlogf("http2: Transport failed to get client conn for %s: %v", , )
return nil,
}
:= !atomic.CompareAndSwapUint32(&.reused, 0, 1)
traceGotConn(, , )
, , := .roundTrip()
if != nil && <= 6 {
// Asks the connection pool to close its idle connections. This is a no-op
// when the pool returned by connPool does not implement
// clientConnPoolIdleCloser.
func ( *Transport) () {
if , := .connPool().(clientConnPoolIdleCloser); {
.closeIdleConnections()
}
}
var (
errClientConnClosed = errors.New("http2: client conn is closed")
errClientConnUnusable = errors.New("http2: client conn not usable")
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)
if ! {
return , nil
}
return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", )
}
// Reports whether the error is errClientConnUnusable, errClientConnGotGoAway,
// or a StreamError whose Code is ErrCodeRefusedStream; every other error
// yields false.
// NOTE(review): presumably the "safe to retry on another connection" check;
// confirm at the call site.
func ( error) bool {
if == errClientConnUnusable || == errClientConnGotGoAway {
return true
}
if , := .(StreamError); {
return .Code == ErrCodeRefusedStream
}
return false
}
// Dials a new client connection: the address is split with
// net.SplitHostPort, a "tcp" connection is opened through the function
// returned by dialTLS using a config from newTLSConfig, and the result is
// wrapped by newClientConn. Any error from the split or the dial is returned
// as-is with a nil connection.
func ( *Transport) ( string, bool) (*ClientConn, error) {
, , := net.SplitHostPort()
if != nil {
return nil,
}
, := .dialTLS()("tcp", , .newTLSConfig())
if != nil {
return nil,
}
return .newClientConn(, )
}
// newTLSConfig builds the tls.Config used for dialing. It starts from a
// clone of TLSClientConfig when that is set (otherwise a zero config),
// prepends NextProtoTLS to NextProtos if it is missing, and fills in
// ServerName when it is empty (presumably from the string argument).
func ( *Transport) ( string) *tls.Config {
:= new(tls.Config)
if .TLSClientConfig != nil {
* = *.TLSClientConfig.Clone()
}
if !strSliceContains(.NextProtos, NextProtoTLS) {
.NextProtos = append([]string{NextProtoTLS}, .NextProtos...)
}
if .ServerName == "" {
.ServerName =
}
return
}
// dialTLS returns the dial function to use: the user-provided DialTLS hook
// when set, otherwise the built-in dialTLSDefault.
func ( *Transport) () func(string, string, *tls.Config) (net.Conn, error) {
if .DialTLS != nil {
return .DialTLS
}
return .dialTLSDefault
}
// dialTLSDefault is the fallback dialer returned by dialTLS. It dials with
// tls.Dial, completes the handshake, verifies the hostname against
// ServerName unless InsecureSkipVerify is set, and then requires that ALPN
// negotiated NextProtoTLS and that the negotiation was mutual. Any failed
// step returns a nil connection and the error.
func ( *Transport) (, string, *tls.Config) (net.Conn, error) {
, := tls.Dial(, , )
if != nil {
return nil,
}
if := .Handshake(); != nil {
return nil,
}
if !.InsecureSkipVerify {
if := .VerifyHostname(.ServerName); != nil {
return nil,
}
}
:= .ConnectionState()
if := .NegotiatedProtocol; != NextProtoTLS {
return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", , NextProtoTLS)
}
if !.NegotiatedProtocolIsMutual {
return nil, errors.New("http2: could not negotiate protocol mutually")
}
return , nil
}
// disableKeepAlives reports whether the t1 transport is present and has
// DisableKeepAlives set.
func ( *Transport) () bool {
return .t1 != nil && .t1.DisableKeepAlives
}
// expectContinueTimeout returns t1's ExpectContinueTimeout, or 0 when there
// is no t1 transport.
func ( *Transport) () time.Duration {
if .t1 == nil {
return 0
}
return .t1.ExpectContinueTimeout
}
// Creates a ClientConn through newClientConn, passing disableKeepAlives() as
// the boolean argument (which newClientConn stores in singleUse).
func ( *Transport) ( net.Conn) (*ClientConn, error) {
return .newClientConn(, .disableKeepAlives())
}
func ( *Transport) ( net.Conn, bool) (*ClientConn, error) {
:= &ClientConn{
t: ,
tconn: ,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10, // spec default
initialWindowSize: 65535, // spec default
maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*clientStream),
singleUse: ,
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
}
if := .idleConnTimeout(); != 0 {
.idleTimeout =
.idleTimer = time.AfterFunc(, .onIdleTimeout)
}
if VerboseLogs {
.vlogf("http2: Transport creating client conn %p to %v", , .RemoteAddr())
}
.cond = sync.NewCond(&.mu)
.flow.add(int32(initialWindowSize))
.bw = bufio.NewWriter(stickyErrWriter{, &.werr})
.br = bufio.NewReader()
.fr = NewFramer(.bw, .br)
.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
.fr.MaxHeaderListSize = .maxHeaderListSize()
.henc = hpack.NewEncoder(&.hbuf)
if .AllowHTTP {
.nextStreamID = 3
}
if , := .(connectionStater); {
:= .ConnectionState()
.tlsState = &
}
:= []Setting{
{ID: SettingEnablePush, Val: 0},
{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
}
if := .maxHeaderListSize(); != 0 {
= append(, Setting{ID: SettingMaxHeaderListSize, Val: })
}
.bw.Write(clientPreface)
.fr.WriteSettings(...)
.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
.inflow.add(transportDefaultConnFlow + initialWindowSize)
.bw.Flush()
if .werr != nil {
.Close()
return nil, .werr
}
go .readLoop()
return , nil
}
// healthCheck pings the peer using a context bounded by a timeout. If the
// ping returns an error, the connection is closed via closeForLostPing and
// marked dead in the transport's connection pool.
func ( *ClientConn) () {
, := context.WithTimeout(context.Background(), )
defer ()
:= .Ping()
if != nil {
.closeForLostPing()
.t.connPool().MarkDead()
return
}
}
// setGoAway records a received GOAWAY frame while holding mu. The debug data
// is captured only while goAwayDebug is still empty. When a previous GOAWAY
// carried an error code other than ErrCodeNo, that code is copied onto the
// newly stored frame. Finally, streams selected by the LastStreamID
// comparison (presumably those with a higher ID) are sent
// errClientConnGotGoAway on resc; the send is non-blocking.
func ( *ClientConn) ( *GoAwayFrame) {
.mu.Lock()
defer .mu.Unlock()
:= .goAway
.goAway =
if .goAwayDebug == "" {
.goAwayDebug = string(.DebugData())
}
if != nil && .ErrCode != ErrCodeNo {
.goAway.ErrCode = .ErrCode
}
:= .LastStreamID
for , := range .streams {
if > {
select {
case .resc <- resAndError{err: errClientConnGotGoAway}:
default:
}
}
}
}
// Locking wrapper: acquires mu and returns canTakeNewRequestLocked().
func ( *ClientConn) () bool {
.mu.Lock()
defer .mu.Unlock()
return .canTakeNewRequestLocked()
}
type clientConnIdleState struct {
canTakeNewRequest bool
freshConn bool // whether it's unused by any previous request
}
// Locking wrapper: acquires mu and returns idleStateLocked().
func ( *ClientConn) () clientConnIdleState {
.mu.Lock()
defer .mu.Unlock()
return .idleStateLocked()
}
func ( *ClientConn) () ( clientConnIdleState) {
if .singleUse && .nextStreamID > 1 {
return
}
var bool
= true
} else {
= int64(len(.streams)+1) < int64(.maxConcurrentStreams)
}
.canTakeNewRequest = .goAway == nil && !.closed && !.closing && &&
int64(.nextStreamID)+2*int64(.pendingRequests) < math.MaxInt32 &&
!.tooIdleLocked()
.freshConn = .nextStreamID == 1 && .canTakeNewRequest
return
}
// canTakeNewRequestLocked returns the canTakeNewRequest field of
// idleStateLocked(). It does no locking itself; the CanTakeNewRequest-style
// wrapper in this file calls it with mu held.
func ( *ClientConn) () bool {
:= .idleStateLocked()
return .canTakeNewRequest
}
return .idleTimeout != 0 && !.lastIdle.IsZero() && time.Since(.lastIdle.Round(0)) > .idleTimeout
}
// onIdleTimeout is the callback handed to time.AfterFunc for the idle timer
// in newClientConn; it simply calls closeIfIdle.
func ( *ClientConn) () {
.closeIfIdle()
}
// closeIfIdle closes the connection when it has no active streams. Under mu
// it returns early if streams is non-empty; otherwise it sets closed,
// releases the lock, optionally logs, and closes tconn.
func ( *ClientConn) () {
.mu.Lock()
if len(.streams) > 0 {
.mu.Unlock()
return
}
.closed = true
.mu.Unlock()
if VerboseLogs {
.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", , .singleUse, -2)
}
// The network connection is closed only after mu has been released.
.tconn.Close()
}
var shutdownEnterWaitStateHook = func() {}
func ( *ClientConn) ( context.Context) error {
if := .sendGoAway(); != nil {
return
return nil
:= .nextStreamID
if := .fr.WriteGoAway(, ErrCodeNo, nil); != nil {
return
}
if := .bw.Flush(); != nil {
return
// Close force-closes the client connection: it builds the "force closed via
// ClientConn.Close" error and returns the result of closeForError.
func ( *ClientConn) () error {
:= errors.New("http2: client connection force closed via ClientConn.Close")
return .closeForError()
}
// closeForLostPing closes the connection with a "client connection lost"
// error by delegating to closeForError. healthCheck calls it when a ping
// fails.
func ( *ClientConn) () error {
:= errors.New("http2: client connection lost")
return .closeForError()
}
const maxAllocFrameSize = 512 << 10
// frameScratchBuffer returns a scratch byte slice. The size is maxFrameSize
// capped at maxAllocFrameSize. Under mu it scans freeBuf for an entry at
// least that large; a match has its freeBuf slot set to nil and is returned
// re-sliced, otherwise a fresh slice is allocated after the lock is released.
func ( *ClientConn) () []byte {
.mu.Lock()
:= .maxFrameSize
if > maxAllocFrameSize {
= maxAllocFrameSize
}
for , := range .freeBuf {
if len() >= int() {
.freeBuf[] = nil
.mu.Unlock()
return [:]
}
}
.mu.Unlock()
return make([]byte, )
}
func ( *ClientConn) ( []byte) {
.mu.Lock()
defer .mu.Unlock()
const = 4 // arbitrary; 4 concurrent requests per conn? investigate.
if len(.freeBuf) < {
.freeBuf = append(.freeBuf, )
return
}
for , := range .freeBuf {
if == nil {
.freeBuf[] =
return
}
}
var errRequestCanceled = errors.New("net/http: request canceled")
// commaSeparatedTrailers returns the request's declared trailer names as a
// single comma-joined string. Keys are canonicalized and sorted so the
// result is deterministic. It returns an error if any key canonicalizes to
// Transfer-Encoding, Trailer or Content-Length, and "" with a nil error when
// the request declares no trailers.
func commaSeparatedTrailers(req *http.Request) (string, error) {
	keys := make([]string, 0, len(req.Trailer))
	for k := range req.Trailer {
		k = http.CanonicalHeaderKey(k)
		switch k {
		case "Transfer-Encoding", "Trailer", "Content-Length":
			return "", fmt.Errorf("invalid Trailer key %q", k)
		}
		keys = append(keys, k)
	}
	if len(keys) > 0 {
		// Map iteration order is random; sort for stable output.
		sort.Strings(keys)
		return strings.Join(keys, ","), nil
	}
	return "", nil
}
func ( *ClientConn) () time.Duration {
if .t.t1 != nil {
return .t.t1.ResponseHeaderTimeout
return 0
}
// checkConnHeaders rejects request headers this transport will not send over
// HTTP/2. It returns an error when:
//   - Upgrade is present and non-empty;
//   - Transfer-Encoding has more than one value, or a single value other
//     than "" or "chunked";
//   - Connection has more than one value, or a single value other than "",
//     "close" or "keep-alive" (compared case-insensitively).
//
// It returns nil otherwise.
func checkConnHeaders(req *http.Request) error {
	if v := req.Header.Get("Upgrade"); v != "" {
		return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
	}
	if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
		return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
	}
	if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {
		return fmt.Errorf("http2: invalid Connection request header: %q", vv)
	}
	return nil
}
// actualContentLength returns the effective body length of req: 0 when the
// body is nil or http.NoBody, req.ContentLength when that is non-zero, and
// -1 (unknown) when a body is present but ContentLength is zero.
func actualContentLength(req *http.Request) int64 {
	if req.Body == nil || req.Body == http.NoBody {
		return 0
	}
	if req.ContentLength != 0 {
		return req.ContentLength
	}
	return -1
}
func ( *ClientConn) ( *http.Request) (*http.Response, error) {
, , := .roundTrip()
return ,
}
func ( *ClientConn) ( *http.Request) ( *http.Response, bool, error) {
if := checkConnHeaders(); != nil {
return nil, false,
}
if .idleTimer != nil {
.idleTimer.Stop()
}
, := commaSeparatedTrailers()
if != nil {
return nil, false,
}
:= != ""
.mu.Lock()
if := .awaitOpenSlotForRequest(); != nil {
.mu.Unlock()
return nil, false,
}
:= .Body
:= actualContentLength()
:= != 0
= true
}
, := .encodeHeaders(, , , )
if != nil {
.mu.Unlock()
return nil, false,
}
:= .newStream()
.req =
.trace = httptrace.ContextClientTrace(.Context())
.requestedGzip =
:= .t.getBodyWriterState(, )
.on100 = .on100
defer func() {
.wmu.Lock()
:= .werr
.wmu.Unlock()
if != nil {
.Close()
}
}()
.wmu.Lock()
:= ! && !
:= .writeHeaders(.ID, , int(.maxFrameSize), )
.wmu.Unlock()
traceWroteHeaders(.trace)
.mu.Unlock()
if != nil {
if {
.Body.Close() // per RoundTripper contract
.cancel()
}
traceWroteRequest(.trace, )
return nil, false,
}
var <-chan time.Time
if {
.scheduleBodyWrite()
} else {
traceWroteRequest(.trace, nil)
if := .responseHeaderTimeout(); != 0 {
:= time.NewTimer()
defer .Stop()
= .C
}
}
:= .resc
:= false
:= .Context()
:= func( resAndError) (*http.Response, bool, error) {
:= .res
.cancel()
.abortRequestBodyWrite(errStopReqBodyWrite)
if && ! {
<-.resc
}
}
if .err != nil {
.forgetStreamID(.ID)
return nil, .getStartedWrite(), .err
}
.Request =
.TLS = .tlsState
return , false, nil
}
for {
select {
case := <-:
return ()
case <-:
if ! || {
.writeStreamReset(.ID, ErrCodeCancel, nil)
} else {
.cancel()
.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
<-.resc
}
.forgetStreamID(.ID)
return nil, .getStartedWrite(), errTimeout
case <-.Done():
if ! || {
.writeStreamReset(.ID, ErrCodeCancel, nil)
} else {
.cancel()
.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
<-.resc
}
.forgetStreamID(.ID)
return nil, .getStartedWrite(), .Err()
case <-.Cancel:
if ! || {
.writeStreamReset(.ID, ErrCodeCancel, nil)
} else {
.cancel()
.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
<-.resc
}
.forgetStreamID(.ID)
return nil, .getStartedWrite(), errRequestCanceled
return nil, .getStartedWrite(), .resetErr
case := <-.resc:
select {
case := <-:
return ()
default:
}
if != nil {
.forgetStreamID(.ID)
return nil, .getStartedWrite(),
}
if := .responseHeaderTimeout(); != 0 {
:= time.NewTimer()
defer .Stop()
= .C
}
}
}
}
func ( *ClientConn) ( *http.Request) error {
var chan struct{}
var error // guarded by cc.mu
for {
.lastActive = time.Now()
if .closed || !.canTakeNewRequestLocked() {
if != nil {
close()
}
return errClientConnUnusable
}
.lastIdle = time.Time{}
if int64(len(.streams))+1 <= int64(.maxConcurrentStreams) {
if != nil {
close()
}
return nil
if == nil {
= make(chan struct{})
go func() {
if := awaitRequestCancel(, ); != nil {
.mu.Lock()
=
.cond.Broadcast()
.mu.Unlock()
}
}()
}
.pendingRequests++
.cond.Wait()
.pendingRequests--
if != nil {
return
}
}
}
func ( *ClientConn) ( uint32, bool, int, []byte) error {
:= true // first frame written (HEADERS is first, then CONTINUATION)
for len() > 0 && .werr == nil {
:=
if len() > {
= [:]
}
= [len():]
:= len() == 0
if {
.fr.WriteHeaders(HeadersFrameParam{
StreamID: ,
BlockFragment: ,
EndStream: ,
EndHeaders: ,
})
= false
} else {
.fr.WriteContinuation(, , )
}
errStopReqBodyWrite = errors.New("http2: aborting request body write")
errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
)
func ( *clientStream) ( io.Reader, io.Closer) ( error) {
:= .cc
:= false // whether we sent the final DATA frame w/ END_STREAM
:= .frameScratchBuffer()
defer .putFrameScratchBuffer()
defer func() {
var int
, = .Read([:])
-= int64()
}
if < 0 {
= errReqBodyTooLong
.writeStreamReset(.ID, ErrCodeCancel, )
return
}
}
if == io.EOF {
= true
= nil
} else if != nil {
.writeStreamReset(.ID, ErrCodeCancel, )
return
}
:= [:]
for len() > 0 && == nil {
var int32
, = .awaitFlowControl(len())
switch {
case == errStopReqBodyWrite:
return
case == errStopReqBodyWriteAndCancel:
.writeStreamReset(.ID, ErrCodeCancel, nil)
return
case != nil:
return
}
.wmu.Lock()
:= [:]
= [:]
= && len() == 0 && !
= .fr.WriteData(.ID, , )
return nil
}
var []byte
if {
.mu.Lock()
, = .encodeTrailers()
.mu.Unlock()
if != nil {
.writeStreamReset(.ID, ErrCodeInternal, )
.forgetStreamID(.ID)
return
}
}
.mu.Lock()
:= int(.maxFrameSize)
.mu.Unlock()
.wmu.Lock()
defer .wmu.Unlock()
// awaitFlowControl blocks, holding mu, until flow-control quota is available
// and then takes some of it. Each time it wakes it first bails out with an
// error if the connection is closed (errClientConnClosed), if stopReqBody is
// set, or if checkResetOrDone reports the stream has ended. When quota is
// available the amount taken is additionally limited by the int argument and
// by maxFrameSize. Otherwise it waits on cond and re-checks.
func ( *clientStream) ( int) ( int32, error) {
:= .cc
.mu.Lock()
defer .mu.Unlock()
for {
if .closed {
return 0, errClientConnClosed
}
if .stopReqBody != nil {
return 0, .stopReqBody
}
if := .checkResetOrDone(); != nil {
return 0,
}
if := .flow.available(); > 0 {
:=
if int() > {
= int32() // can't truncate int; take is int32
}
if > int32(.maxFrameSize) {
= int32(.maxFrameSize)
}
.flow.take()
return , nil
}
.cond.Wait()
}
}
func ( *ClientConn) ( *http.Request, bool, string, int64) ([]byte, error) {
.hbuf.Reset()
:= .Host
if == "" {
= .URL.Host
}
, := httpguts.PunycodeHostPort()
if != nil {
return nil,
}
var string
if .Method != "CONNECT" {
= .URL.RequestURI()
if !validPseudoPath() {
:=
= strings.TrimPrefix(, .URL.Scheme+"://"+)
if !validPseudoPath() {
if .URL.Opaque != "" {
return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", , .URL.Opaque)
} else {
return nil, fmt.Errorf("invalid request :path %q", )
}
}
}
}
continue
for +1 <= len() && [] == ' ' {
++
}
= [:]
}
if len() > 0 {
("cookie", )
}
}
continue
}
for , := range {
(, )
}
}
if shouldSendReqContentLength(.Method, ) {
("content-length", strconv.FormatInt(, 10))
}
if {
("accept-encoding", "gzip")
}
if ! {
("user-agent", defaultUserAgent)
}
}
:= uint64(0)
(func(, string) {
:= hpack.HeaderField{Name: , Value: }
+= uint64(.Size())
})
if > .peerMaxHeaderListSize {
return nil, errRequestHeaderListSize
}
:= httptrace.ContextClientTrace(.Context())
:= traceHasWroteHeaderField()
(func(, string) {
= strings.ToLower()
.writeHeader(, )
if {
traceWroteHeaderField(, , )
}
})
return .hbuf.Bytes(), nil
}
func ( *ClientConn) ( *http.Request) ([]byte, error) {
.hbuf.Reset()
:= uint64(0)
for , := range .Trailer {
for , := range {
:= hpack.HeaderField{Name: , Value: }
+= uint64(.Size())
}
}
if > .peerMaxHeaderListSize {
return nil, errRequestHeaderListSize
}
:= strings.ToLower()
for , := range {
.writeHeader(, )
}
}
return .hbuf.Bytes(), nil
}
// writeHeader encodes a single header field through the connection's HPACK
// encoder (henc), logging the name/value pair first when VerboseLogs is on.
func ( *ClientConn) (, string) {
if VerboseLogs {
log.Printf("http2: Transport encoding header %q = %q", , )
}
.henc.WriteField(hpack.HeaderField{Name: , Value: })
}
type resAndError struct {
_ incomparable
res *http.Response
err error
}
// newStream allocates a clientStream using the connection's nextStreamID,
// with a 1-buffered resc channel and fresh peerReset/done channels. It seeds
// one flow window from initialWindowSize and the other from
// transportDefaultStreamFlow, links both to connection-level flows, advances
// nextStreamID by 2 and registers the stream in the streams map.
// It takes no lock itself; roundTrip calls it with mu held.
func ( *ClientConn) () *clientStream {
:= &clientStream{
cc: ,
ID: .nextStreamID,
resc: make(chan resAndError, 1),
peerReset: make(chan struct{}),
done: make(chan struct{}),
}
.flow.add(int32(.initialWindowSize))
.flow.setConnFlow(&.flow)
.inflow.add(transportDefaultStreamFlow)
.inflow.setConnFlow(&.inflow)
.nextStreamID += 2
.streams[.ID] =
return
}
// forgetStreamID drops a stream from the connection by calling streamByID
// with its second (bool) argument set to true and ignoring the result.
func ( *ClientConn) ( uint32) {
.streamByID(, true)
}
func ( *ClientConn) ( uint32, bool) *clientStream {
.mu.Lock()
defer .mu.Unlock()
:= .streams[]
if && != nil && !.closed {
.lastActive = time.Now()
delete(.streams, )
if len(.streams) == 0 && .idleTimer != nil {
.idleTimer.Reset(.idleTimeout)
.lastIdle = time.Now()
}
type clientConnReadLoop struct {
_ incomparable
cc *ClientConn
closeWhenIdle bool
}
// readLoop is started as a goroutine by newClientConn. It runs a
// clientConnReadLoop, stores the result of run() in readerErr, and, when
// that error is a ConnectionError, writes a GOAWAY frame carrying the
// corresponding ErrCode while holding wmu. cleanup is deferred and therefore
// runs last.
func ( *ClientConn) () {
:= &clientConnReadLoop{cc: }
defer .cleanup()
.readerErr = .run()
if , := .readerErr.(ConnectionError); {
.wmu.Lock()
.fr.WriteGoAway(0, ErrCode(), nil)
.wmu.Unlock()
}
}
type GoAwayError struct {
LastStreamID uint32
ErrCode ErrCode
DebugData string
}
func ( GoAwayError) () string {
return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
.LastStreamID, .ErrCode, .DebugData)
}
// isEOFOrNetReadError reports whether err is io.EOF or a *net.OpError whose
// Op is "read". cleanup uses it to decide whether a read failure that
// follows a GOAWAY should be reported as a GoAwayError.
func isEOFOrNetReadError(err error) bool {
	if err == io.EOF {
		return true
	}
	ne, ok := err.(*net.OpError)
	return ok && ne.Op == "read"
}
// cleanup tears the connection down; readLoop defers it so it runs once the
// read loop has exited.
//
// It stops the idle timer, then picks the error to report to streams:
// a GoAwayError (built from the stored GOAWAY frame and debug string) when a
// GOAWAY was received and the reader error is EOF or a network read error,
// or io.ErrUnexpectedEOF in place of a bare io.EOF. Under mu, every
// remaining stream has its body pipe closed with an error, gets a
// non-blocking error send on resc, and has its done channel closed; the
// connection is then marked closed and cond is broadcast.
//
// The deferred calls run on return in this order: close(readerDone), mark the
// connection dead in the pool, close tconn.
func ( *clientConnReadLoop) () {
:= .cc
defer .tconn.Close()
defer .t.connPool().MarkDead()
defer close(.readerDone)
if .idleTimer != nil {
.idleTimer.Stop()
}
:= .readerErr
.mu.Lock()
if .goAway != nil && isEOFOrNetReadError() {
= GoAwayError{
LastStreamID: .goAway.LastStreamID,
ErrCode: .goAway.ErrCode,
DebugData: .goAwayDebug,
}
} else if == io.EOF {
= io.ErrUnexpectedEOF
}
for , := range .streams {
.bufPipe.CloseWithError() // no-op if already closed
select {
case .resc <- resAndError{err: }:
default:
}
close(.done)
}
.closed = true
.cond.Broadcast()
.mu.Unlock()
}
func ( *clientConnReadLoop) () error {
:= .cc
.closeWhenIdle = .t.disableKeepAlives() || .singleUse
:= false // ever saw a HEADERS reply
:= false
:= .t.ReadIdleTimeout
var *time.Timer
if != 0 {
= time.AfterFunc(, .healthCheck)
defer .Stop()
}
for {
, := .fr.ReadFrame()
if != nil {
.Reset()
}
if != nil {
.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", , , )
}
if , := .(StreamError); {
if := .streamByID(.StreamID, false); != nil {
.cc.writeStreamReset(.ID, .Code, )
.cc.forgetStreamID(.ID)
if .Cause == nil {
.Cause = .fr.errDetail
}
.endStreamError(, )
}
continue
} else if != nil {
return
}
if VerboseLogs {
.vlogf("http2: Transport received %s", summarizeFrame())
}
if ! {
if , := .(*SettingsFrame); ! {
.logf("protocol error: received %T before a SETTINGS frame", )
return ConnectionError(ErrCodeProtocol)
}
= true
}
:= false // whether frame might transition us to idle
switch f := .(type) {
case *MetaHeadersFrame:
= .processHeaders()
= true
= true
case *DataFrame:
= .processData()
= true
case *GoAwayFrame:
= .processGoAway()
= true
case *RSTStreamFrame:
= .processResetStream()
= true
case *SettingsFrame:
= .processSettings()
case *PushPromiseFrame:
= .processPushPromise()
case *WindowUpdateFrame:
= .processWindowUpdate()
case *PingFrame:
= .processPing()
default:
.logf("Transport: unhandled response frame type %T", )
}
if != nil {
if VerboseLogs {
.vlogf("http2: Transport conn %p received error from processing frame %v: %v", , summarizeFrame(), )
}
return
}
if .closeWhenIdle && && {
.closeIfIdle()
}
}
}
func ( *clientConnReadLoop) ( *MetaHeadersFrame) error {
:= .cc
:= .streamByID(.StreamID, false)
return nil
}
if .req.Body != nil {
defer .forgetStreamID(.StreamID)
} else {
.forgetStreamID(.StreamID)
}
}
if !.firstByte {
traceFirstResponseByte(.trace)
}
.firstByte = true
}
if !.pastHeaders {
.pastHeaders = true
} else {
return .processTrailers(, )
}
, := .handleResponse(, )
if != nil {
if , := .(ConnectionError); {
return
.cc.writeStreamReset(.StreamID, ErrCodeProtocol, )
.forgetStreamID(.ID)
.resc <- resAndError{err: }
return nil // return nil from process* funcs to keep conn alive
}
return nil
}
.resTrailer = &.Trailer
.resc <- resAndError{res: }
return nil
}
func ( *clientConnReadLoop) ( *clientStream, *MetaHeadersFrame) (*http.Response, error) {
if .Truncated {
return nil, errResponseHeaderListSize
}
:= .PseudoValue("status")
if == "" {
return nil, errors.New("malformed response from server: missing status pseudo header")
}
, := strconv.Atoi()
if != nil {
return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
}
:= .RegularFields()
:= make([]string, len())
:= make(http.Header, len())
:= &http.Response{
Proto: "HTTP/2.0",
ProtoMajor: 2,
Header: ,
StatusCode: ,
Status: + " " + http.StatusText(),
}
for , := range {
:= http.CanonicalHeaderKey(.Name)
if == "Trailer" {
:= .Trailer
if == nil {
= make(http.Header)
.Trailer =
}
foreachHeaderElement(.Value, func( string) {
[http.CanonicalHeaderKey()] = nil
})
} else {
:= []
, = [:1:1], [1:]
[0] = .Value
[] =
} else {
[] = append(, .Value)
}
}
}
if >= 100 && <= 199 {
.num1xx++
const = 5 // arbitrary bound on number of informational responses, same as net/http
if .num1xx > {
return nil, errors.New("http2: too many 1xx informational responses")
}
if := .get1xxTraceFunc(); != nil {
if := (, textproto.MIMEHeader()); != nil {
return nil,
}
}
if == 100 {
traceGot100Continue(.trace)
if .on100 != nil {
.on100() // forces any write delay timer to fire
}
}
.pastHeaders = false // do it all again
return nil, nil
}
:= .StreamEnded()
:= .req.Method == "HEAD"
if ! || {
.ContentLength = -1
if := .Header["Content-Length"]; len() == 1 {
if , := strconv.ParseUint([0], 10, 63); == nil {
.ContentLength = int64()
}
}
}
if || {
.Body = noBody
return , nil
}
.bufPipe = pipe{b: &dataBuffer{expected: .ContentLength}}
.bytesRemain = .ContentLength
.Body = transportResponseBody{}
go .awaitRequestCancel(.req)
if .requestedGzip && .Header.Get("Content-Encoding") == "gzip" {
.Header.Del("Content-Encoding")
.Header.Del("Content-Length")
.ContentLength = -1
.Body = &gzipReader{body: .Body}
.Uncompressed = true
}
return , nil
}
func ( *clientConnReadLoop) ( *clientStream, *MetaHeadersFrame) error {
return ConnectionError(ErrCodeProtocol)
}
.pastTrailers = true
return ConnectionError(ErrCodeProtocol)
}
return ConnectionError(ErrCodeProtocol)
}
:= make(http.Header)
for , := range .RegularFields() {
:= http.CanonicalHeaderKey(.Name)
[] = append([], .Value)
}
.trailer =
.endStream()
return nil
}
type transportResponseBody struct {
cs *clientStream
}
func ( transportResponseBody) ( []byte) ( int, error) {
:= .cs
:= .cc
if .readErr != nil {
return 0, .readErr
}
, = .cs.bufPipe.Read()
if .bytesRemain != -1 {
if int64() > .bytesRemain {
= int(.bytesRemain)
if == nil {
= errors.New("net/http: server replied with more than declared Content-Length; truncated")
.writeStreamReset(.ID, ErrCodeProtocol, )
}
.readErr =
return int(.bytesRemain),
}
.bytesRemain -= int64()
if == io.EOF && .bytesRemain > 0 {
= io.ErrUnexpectedEOF
.readErr =
return ,
}
}
if := .inflow.available(); < transportDefaultConnFlow/2 {
= transportDefaultConnFlow -
.inflow.add()
}
:= int(.inflow.available()) + .bufPipe.Len()
if < transportDefaultStreamFlow-transportDefaultStreamMinRefresh {
= int32(transportDefaultStreamFlow - )
.inflow.add()
}
}
if != 0 || != 0 {
.wmu.Lock()
defer .wmu.Unlock()
if != 0 {
.fr.WriteWindowUpdate(0, mustUint31())
}
if != 0 {
.fr.WriteWindowUpdate(.ID, mustUint31())
}
.bw.Flush()
}
return
}
var errClosedResponseBody = errors.New("http2: response body closed")
func ( transportResponseBody) () error {
:= .cs
:= .cc
:= .bufPipe.Err() == io.EOF
:= .bufPipe.Len()
if > 0 || ! {
.mu.Lock()
.wmu.Lock()
if ! {
.fr.WriteRSTStream(.ID, ErrCodeCancel)
.didReset = true
if > 0 {
.inflow.add(int32())
.fr.WriteWindowUpdate(0, uint32())
}
.bw.Flush()
.wmu.Unlock()
.mu.Unlock()
}
.bufPipe.BreakWithError(errClosedResponseBody)
.forgetStreamID(.ID)
return nil
}
func ( *clientConnReadLoop) ( *DataFrame) error {
:= .cc
:= .streamByID(.StreamID, .StreamEnded())
:= .Data()
if == nil {
.mu.Lock()
:= .nextStreamID
.mu.Unlock()
.logf("http2: Transport received unsolicited DATA frame; closing connection")
return ConnectionError(ErrCodeProtocol)
if .Length > 0 {
.mu.Lock()
.inflow.add(int32(.Length))
.mu.Unlock()
.wmu.Lock()
.fr.WriteWindowUpdate(0, uint32(.Length))
.bw.Flush()
.wmu.Unlock()
}
return nil
}
if !.firstByte {
.logf("protocol error: received DATA before a HEADERS frame")
.endStreamError(, StreamError{
StreamID: .StreamID,
Code: ErrCodeProtocol,
})
return nil
}
if .Length > 0 {
if .req.Method == "HEAD" && len() > 0 {
.logf("protocol error: received DATA on a HEAD request")
.endStreamError(, StreamError{
StreamID: .StreamID,
Code: ErrCodeProtocol,
})
return nil
:= .didReset
if {
+= len()
}
if > 0 {
.inflow.add(int32())
.wmu.Lock()
.fr.WriteWindowUpdate(0, uint32())
if ! {
.inflow.add(int32())
.fr.WriteWindowUpdate(.ID, uint32())
}
.bw.Flush()
.wmu.Unlock()
}
.mu.Unlock()
if len() > 0 && ! {
if , := .bufPipe.Write(); != nil {
.endStreamError(, )
return
}
}
}
if .StreamEnded() {
.endStream()
}
return nil
}
.endStreamError(, nil)
}
// endStreamError finishes a stream. When the guard sees nil (presumably a nil
// error argument, i.e. a clean end) io.EOF is substituted and copyTrailers
// is selected as a callback. If the request asked for the connection to be
// closed, closeWhenIdle is set. The body pipe is then closed via
// closeWithErrorAndCode, and an error result is offered on resc without
// blocking.
func ( *clientConnReadLoop) ( *clientStream, error) {
var func()
if == nil {
= io.EOF
= .copyTrailers
}
if isConnectionCloseRequest(.req) {
.closeWhenIdle = true
}
.bufPipe.closeWithErrorAndCode(, )
select {
case .resc <- resAndError{err: }:
default:
}
}
// copyTrailers copies the stream's accumulated trailer entries into the
// header map that resTrailer points to, allocating that map on first use.
func ( *clientStream) () {
for , := range .trailer {
:= .resTrailer
if * == nil {
* = make(http.Header)
}
(*)[] =
}
}
func ( *clientConnReadLoop) ( *GoAwayFrame) error {
:= .cc
.t.connPool().MarkDead()
.vlogf("transport got GOAWAY with error code = %v", .ErrCode)
}
.setGoAway()
return nil
}
func ( *clientConnReadLoop) ( *SettingsFrame) error {
:= .cc
.mu.Lock()
defer .mu.Unlock()
if .IsAck() {
if .wantSettingsAck {
.wantSettingsAck = false
return nil
}
return ConnectionError(ErrCodeProtocol)
}
:= .ForeachSetting(func( Setting) error {
switch .ID {
case SettingMaxFrameSize:
.maxFrameSize = .Val
case SettingMaxConcurrentStreams:
.maxConcurrentStreams = .Val
case SettingMaxHeaderListSize:
.peerMaxHeaderListSize = uint64(.Val)
if .Val > math.MaxInt32 {
return ConnectionError(ErrCodeFlowControl)
}
.vlogf("Unhandled Setting: %v", )
}
return nil
})
if != nil {
return
}
.wmu.Lock()
defer .wmu.Unlock()
.fr.WriteSettingsAck()
.bw.Flush()
return .werr
}
// processWindowUpdate applies a WINDOW_UPDATE frame. A frame for a non-zero
// stream ID with no matching stream is ignored. Otherwise, under mu, the
// increment is added to a flow window: a connection-level one by default, or
// a stream-level one when the guarded condition holds (presumably when a
// stream was found). If add reports failure the result is
// ConnectionError(ErrCodeFlowControl); on success cond is broadcast so
// writers waiting for quota can proceed.
func ( *clientConnReadLoop) ( *WindowUpdateFrame) error {
:= .cc
:= .streamByID(.StreamID, false)
if .StreamID != 0 && == nil {
return nil
}
.mu.Lock()
defer .mu.Unlock()
:= &.flow
if != nil {
= &.flow
}
if !.add(int32(.Increment)) {
return ConnectionError(ErrCodeFlowControl)
}
.cond.Broadcast()
return nil
}
func ( *clientConnReadLoop) ( *RSTStreamFrame) error {
:= .cc.streamByID(.StreamID, true)
func ( *ClientConn) ( context.Context) error {
return ConnectionError(ErrCodeProtocol)
}
.wmu.Lock()
.fr.WriteRSTStream(, )
.bw.Flush()
.wmu.Unlock()
}
var (
errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
)
// Connection-level logging helper: forwards the format string and arguments
// to the owning Transport's logf.
func ( *ClientConn) ( string, ...interface{}) {
.t.logf(, ...)
}
// Connection-level verbose logging helper: forwards the format string and
// arguments to the owning Transport's vlogf.
func ( *ClientConn) ( string, ...interface{}) {
.t.vlogf(, ...)
}
// Transport-level verbose logging: calls logf only when the package-level
// VerboseLogs flag is set, and is silent otherwise.
func ( *Transport) ( string, ...interface{}) {
if VerboseLogs {
.logf(, ...)
}
}
// Transport-level logging: writes unconditionally through the standard
// library's log.Printf.
func ( *Transport) ( string, ...interface{}) {
log.Printf(, ...)
}
var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
// strSliceContains reports whether s is an element of ss. A nil or empty
// slice never contains anything.
func strSliceContains(ss []string, s string) bool {
	for _, v := range ss {
		if v == s {
			return true
		}
	}
	return false
}
type erringRoundTripper struct{ err error }
func ( erringRoundTripper) () error { return .err }
func ( erringRoundTripper) (*http.Request) (*http.Response, error) { return nil, .err }
type gzipReader struct {
_ incomparable
body io.ReadCloser // underlying Response.Body
zr *gzip.Reader // lazily-initialized gzip reader
zerr error // sticky error
}
// Reads decompressed data. The gzip.Reader over body is created lazily on
// the first call; if creating it fails, the error is stored in zerr and
// returned by every subsequent call without touching body again.
func ( *gzipReader) ( []byte) ( int, error) {
if .zerr != nil {
return 0, .zerr
}
if .zr == nil {
.zr, = gzip.NewReader(.body)
if != nil {
.zerr =
return 0,
}
}
return .zr.Read()
}
// Closes the underlying response body and returns its error. The gzip
// reader itself is not closed here.
func ( *gzipReader) () error {
return .body.Close()
}
// errorReader is an io.Reader whose every Read fails with a fixed error.
type errorReader struct{ err error }

// Read never fills p; it always returns 0 and the stored error.
func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }
type bodyWriterState struct {
cs *clientStream
timer *time.Timer // if non-nil, we're doing a delayed write
fnonce *sync.Once // to call fn with
fn func() // the code to run in the goroutine, writing the body
resc chan error // result of fn's execution
delay time.Duration // how long we should delay a delayed write for
}
func ( *Transport) ( *clientStream, io.Reader) ( bodyWriterState) {
.cs =
if == nil {
return
}
:= make(chan error, 1)
.resc =
.fn = func() {
.cc.mu.Lock()
.startedWrite = true
.cc.mu.Unlock()
<- .writeRequestBody(, .req.Body)
}
.delay = .expectContinueTimeout()
if .delay == 0 ||
!httpguts.HeaderValuesContainsToken(
.req.Header["Expect"],
"100-continue") {
return
}
.fnonce = new(sync.Once)
func ( bodyWriterState) () {
// registerHTTPSProtocol registers a round tripper for the "https" scheme on
// the given http.Transport via RegisterProtocol. A panic raised during
// registration is recovered by the deferred function and turned into the
// returned error, formatted with %v; on success it returns nil.
func ( *http.Transport, noDialH2RoundTripper) ( error) {
defer func() {
if := recover(); != nil {
= fmt.Errorf("%v", )
}
}()
.RegisterProtocol("https", )
return nil
}
// noDialH2RoundTripper embeds *Transport and adapts its RoundTrip for use as
// an alternate-protocol round tripper.
type noDialH2RoundTripper struct{ *Transport }
// Delegates to the embedded Transport's RoundTrip. When the resulting error
// satisfies isNoCachedConnError it is replaced by http.ErrSkipAltProtocol
// (with a nil response); any other outcome is returned unchanged.
func ( noDialH2RoundTripper) ( *http.Request) (*http.Response, error) {
, := .Transport.RoundTrip()
if isNoCachedConnError() {
return nil, http.ErrSkipAltProtocol
}
return ,
}
// idleConnTimeout returns t1's IdleConnTimeout, or 0 when there is no t1
// transport. newClientConn arms the idle timer only for a non-zero result.
func ( *Transport) () time.Duration {
if .t1 != nil {
return .t1.IdleConnTimeout
}
return 0
}
func ( *http.Request, string) {
:= httptrace.ContextClientTrace(.Context())
if == nil || .GetConn == nil {
return
}
.GetConn()
}
func ( *http.Request, *ClientConn, bool) {
:= httptrace.ContextClientTrace(.Context())
if == nil || .GotConn == nil {
return
}
:= httptrace.GotConnInfo{Conn: .tconn}
.Reused =
.mu.Lock()
.WasIdle = len(.streams) == 0 &&
if .WasIdle && !.lastActive.IsZero() {
.IdleTime = time.Now().Sub(.lastActive)
}
.mu.Unlock()
.GotConn()
}
// traceWroteHeaders invokes the trace's WroteHeaders hook. It is a no-op
// when trace is nil or the hook is not set.
func traceWroteHeaders(trace *httptrace.ClientTrace) {
	if trace != nil && trace.WroteHeaders != nil {
		trace.WroteHeaders()
	}
}
// traceGot100Continue invokes the trace's Got100Continue hook. It is a no-op
// when trace is nil or the hook is not set.
func traceGot100Continue(trace *httptrace.ClientTrace) {
	if trace != nil && trace.Got100Continue != nil {
		trace.Got100Continue()
	}
}
// traceWait100Continue invokes the trace's Wait100Continue hook. It is a
// no-op when trace is nil or the hook is not set.
func traceWait100Continue(trace *httptrace.ClientTrace) {
	if trace != nil && trace.Wait100Continue != nil {
		trace.Wait100Continue()
	}
}
// traceWroteRequest invokes the trace's WroteRequest hook, passing err in
// the WroteRequestInfo. It is a no-op when trace is nil or the hook is not
// set.
func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
	if trace != nil && trace.WroteRequest != nil {
		trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
	}
}
func ( *httptrace.ClientTrace) {
if != nil && .GotFirstResponseByte != nil {
.GotFirstResponseByte()
}
The pages are generated with Golds v0.3.2-preview (GOOS=darwin GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.