Source File
reverseproxy.go
Belonging Package
net/http/httputil
package httputil
import (
)
ModifyResponse func(*http.Response) error
ErrorHandler func(http.ResponseWriter, *http.Request, error)
}
// BufferPool is an interface for obtaining and returning byte slices.
// Within this file the proxy's optional BufferPool field is used while
// copying a response body: a slice is taken with Get, passed to the copy
// loop as its scratch buffer, and handed back with Put when the copy returns.
type BufferPool interface {
// Get returns a byte slice for the caller to use.
Get() []byte
// Put gives a byte slice back to the pool.
Put([]byte)
}
// singleJoiningSlash concatenates a and b so that exactly one "/" sits at
// the seam between them.
//
// If a ends with "/" and b starts with "/", the duplicate slash is dropped.
// If neither side supplies a slash, one is inserted. Otherwise the two
// strings are joined unchanged.
func singleJoiningSlash(a, b string) string {
	aslash := strings.HasSuffix(a, "/")
	bslash := strings.HasPrefix(b, "/")
	switch {
	case aslash && bslash:
		// Both sides bring a slash: keep a's, skip b's.
		return a + b[1:]
	case !aslash && !bslash:
		// Neither side brings a slash: add the separator.
		return a + "/" + b
	}
	// Exactly one side already provides the slash.
	return a + b
}
func (, *url.URL) (, string) {
if .RawPath == "" && .RawPath == "" {
return singleJoiningSlash(.Path, .Path), ""
// hopHeaders lists the header names this file treats as hop-by-hop.
// The proxy's request handler ranges over this slice and deletes each name
// from a header set before relaying (see the loops over hopHeaders in the
// handler). Names are written in canonical header form, which is why "TE"
// appears as "Te".
var hopHeaders = []string{
"Connection",
"Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
"Keep-Alive",
"Proxy-Authenticate",
"Proxy-Authorization",
"Te", // canonicalized version of "TE"
"Trailer", // not Trailers per URL above; https://www.rfc-editor.org/errata_search.php?eid=4522
"Transfer-Encoding",
"Upgrade",
}
func ( *ReverseProxy) ( http.ResponseWriter, *http.Request, error) {
.logf("http: proxy error: %v", )
.WriteHeader(http.StatusBadGateway)
}
func ( *ReverseProxy) () func(http.ResponseWriter, *http.Request, error) {
if .ErrorHandler != nil {
return .ErrorHandler
}
return .defaultErrorHandler
}
func ( *ReverseProxy) ( http.ResponseWriter, *http.Response, *http.Request) bool {
if .ModifyResponse == nil {
return true
}
if := .ModifyResponse(); != nil {
.Body.Close()
.getErrorHandler()(, , )
return false
}
return true
}
// NOTE(review): this block arrived with its identifiers stripped and with
// statements missing (for example, the first loop over hopHeaders below is
// never closed before the status-code check, and a closing brace after the
// Flush call has no visible opener), so it is not valid Go as it stands.
// The signature, a *ReverseProxy method taking (http.ResponseWriter,
// *http.Request), matches http.Handler's ServeHTTP; presumably that is this
// method. TODO: confirm against the upstream file and restore the lost text.
// The comments below describe only what the surviving lines show.
func ( *ReverseProxy) ( http.ResponseWriter, *http.Request) {
// Use the configured Transport, falling back to http.DefaultTransport.
:= .Transport
if == nil {
= http.DefaultTransport
}
:= .Context()
// When the asserted value implements http.CloseNotifier, derive a cancelable
// context and start a goroutine that cancels it on a close notification.
// The goroutine also exits once the context is done, and the deferred
// cancel guarantees that happens when this function returns.
if , := .(http.CloseNotifier); {
var context.CancelFunc
, = context.WithCancel()
defer ()
:= .CloseNotify()
go func() {
select {
case <-:
()
case <-.Done():
}
}()
}
// Work on a clone (presumably of the incoming request) from here on.
:= .Clone()
if .ContentLength == 0 {
.Body = nil // Issue 16036: nil Body for http.Transport retries
}
if .Header == nil {
.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}
// Invoke the Director callback, then clear the Close flag.
.Director()
.Close = false
// Record the upgrade type before connection-related headers are removed.
:= upgradeType(.Header)
removeConnectionHeaders(.Header)
// NOTE(review): the remainder of this loop and the statements that followed
// it are missing from this copy.
for , := range hopHeaders {
:= .Header.Get()
if == "" {
continue
}
// A 101 Switching Protocols status is passed to handleUpgradeResponse, but
// only if modifyResponse reports true.
if .StatusCode == http.StatusSwitchingProtocols {
if !.modifyResponse(, , ) {
return
}
.handleUpgradeResponse(, , )
return
}
// Remove connection-related headers, then delete every hopHeaders entry.
removeConnectionHeaders(.Header)
for , := range hopHeaders {
.Header.Del()
}
if !.modifyResponse(, , ) {
return
}
copyHeader(.Header(), .Header)
// Remember how many trailers are known at this point; if there are any,
// collect their keys and add them as a single comma-separated "Trailer"
// header value.
:= len(.Trailer)
if > 0 {
:= make([]string, 0, len(.Trailer))
for := range .Trailer {
= append(, )
}
.Header().Add("Trailer", strings.Join(, ", "))
}
.WriteHeader(.StatusCode)
// Copy the body. On a copy error, either log and return (when
// shouldPanicOnCopyError reports false) or panic with http.ErrAbortHandler.
= .copyResponse(, .Body, .flushInterval())
if != nil {
if !shouldPanicOnCopyError() {
.logf("suppressing panic for copyResponse error in test; copy error: %v", )
return
}
panic(http.ErrAbortHandler)
}
.Body.Close() // close now, instead of defer, to populate res.Trailer
// NOTE(review): the line that opened the enclosing block closed below appears
// to be missing; only the inner Flush-if-http.Flusher check survives.
if , := .(http.Flusher); {
.Flush()
}
}
// If the trailer count matches the compared value (presumably the count
// recorded before the body copy; confirm), copy the trailers over directly.
if len(.Trailer) == {
copyHeader(.Header(), .Trailer)
return
}
// Otherwise add every trailer value under its key prefixed with
// http.TrailerPrefix.
for , := range .Trailer {
= http.TrailerPrefix +
for , := range {
.Header().Add(, )
}
}
}
var inOurTests bool // whether we're in our own tests
return true
}
return true
return false
}
if == "text/event-stream" {
return -1 // negative means immediately
}
if .ContentLength == -1 {
return -1
}
return .FlushInterval
}
func ( *ReverseProxy) ( io.Writer, io.Reader, time.Duration) error {
if != 0 {
if , := .(writeFlusher); {
:= &maxLatencyWriter{
dst: ,
latency: ,
}
defer .stop()
.flushPending = true
.t = time.AfterFunc(, .delayedFlush)
=
}
}
var []byte
if .BufferPool != nil {
= .BufferPool.Get()
defer .BufferPool.Put()
}
, := .copyBuffer(, , )
return
}
func ( *ReverseProxy) ( io.Writer, io.Reader, []byte) (int64, error) {
if len() == 0 {
= make([]byte, 32*1024)
}
var int64
for {
, := .Read()
if != nil && != io.EOF && != context.Canceled {
.logf("httputil: ReverseProxy read error during body copy: %v", )
}
if > 0 {
, := .Write([:])
if > 0 {
+= int64()
}
if != nil {
return ,
}
if != {
return , io.ErrShortWrite
}
}
if != nil {
if == io.EOF {
= nil
}
return ,
}
}
}
func ( *ReverseProxy) ( string, ...interface{}) {
if .ErrorLog != nil {
.ErrorLog.Printf(, ...)
} else {
log.Printf(, ...)
}
}
// writeFlusher is a writer whose buffered output can be flushed on demand.
type writeFlusher interface {
	io.Writer
	http.Flusher
}

// maxLatencyWriter wraps a writeFlusher and bounds how long written data may
// sit unflushed: with a negative latency every write is flushed at once,
// otherwise a timer flushes the destination latency after a write.
type maxLatencyWriter struct {
	dst     writeFlusher
	latency time.Duration // non-zero; negative means to flush immediately

	mu           sync.Mutex // protects t, flushPending, and dst.Flush
	t            *time.Timer
	flushPending bool
}

// Write forwards p to the destination and returns the destination's result.
// With a negative latency it then flushes immediately. Otherwise, unless a
// flush is already pending, it starts (or resets) the timer to call
// delayedFlush after latency and marks a flush as pending.
func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	n, err = m.dst.Write(p)
	if m.latency < 0 {
		m.dst.Flush()
		return n, err
	}
	if m.flushPending {
		// The already-armed timer will cover this write as well.
		return n, err
	}
	if m.t == nil {
		m.t = time.AfterFunc(m.latency, m.delayedFlush)
	} else {
		m.t.Reset(m.latency)
	}
	m.flushPending = true
	return n, err
}

// delayedFlush is the timer callback: it flushes the destination and clears
// the pending flag, unless no flush is pending any more.
func (m *maxLatencyWriter) delayedFlush() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
		return
	}
	m.dst.Flush()
	m.flushPending = false
}

// stop cancels any pending flush and stops the timer if one was created.
func (m *maxLatencyWriter) stop() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.flushPending = false
	if m.t != nil {
		m.t.Stop()
	}
}
func ( http.Header) string {
if !httpguts.HeaderValuesContainsToken(["Connection"], "Upgrade") {
return ""
}
return strings.ToLower(.Get("Upgrade"))
}
func ( *ReverseProxy) ( http.ResponseWriter, *http.Request, *http.Response) {
:= upgradeType(.Header)
:= upgradeType(.Header)
if != {
.getErrorHandler()(, , fmt.Errorf("backend tried to switch protocol %q when %q was requested", , ))
return
}
, := .(http.Hijacker)
if ! {
.getErrorHandler()(, , fmt.Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", ))
return
}
, := .Body.(io.ReadWriteCloser)
if ! {
.getErrorHandler()(, , fmt.Errorf("internal error: 101 switching protocols response with non-writable body"))
return
}
:= make(chan bool)
select {
case <-.Context().Done():
case <-:
}
.Close()
}()
defer close()
, , := .Hijack()
if != nil {
.getErrorHandler()(, , fmt.Errorf("Hijack failed on protocol switch: %v", ))
return
}
defer .Close()
copyHeader(.Header(), .Header)
.Header = .Header()
.Body = nil // so res.Write only writes the headers; we have res.Body in backConn above
if := .Write(); != nil {
.getErrorHandler()(, , fmt.Errorf("response write: %v", ))
return
}
if := .Flush(); != nil {
.getErrorHandler()(, , fmt.Errorf("response flush: %v", ))
return
}
:= make(chan error, 1)
:= switchProtocolCopier{user: , backend: }
go .copyToBackend()
go .copyFromBackend()
<-
return
}
// switchProtocolCopier holds the two ends of a switched-protocol connection
// and copies bytes between them.
type switchProtocolCopier struct {
	user, backend io.ReadWriter
}

// copyFromBackend copies everything readable from the backend to the user
// side and sends the copy's error (nil on a clean end of stream) on errc.
func (c switchProtocolCopier) copyFromBackend(errc chan<- error) {
	_, err := io.Copy(c.user, c.backend)
	errc <- err
}

// copyToBackend copies everything readable from the user side to the backend
// and sends the copy's error (nil on a clean end of stream) on errc.
func (c switchProtocolCopier) copyToBackend(errc chan<- error) {
	_, err := io.Copy(c.backend, c.user)
	errc <- err
}
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |