Source File
pgconn.go
Belonging Package
github.com/jackc/pgconn
package pgconn
import (
)
const (
connStatusUninitialized = iota
connStatusConnecting
connStatusClosed
connStatusIdle
connStatusBusy
)
const wbufLen = 1024
type NoticeHandler func(*PgConn, *Notice)
type NotificationHandler func(*PgConn, *Notification)
type Frontend interface {
Receive() (pgproto3.BackendMessage, error)
}
type PgConn struct {
conn net.Conn // the underlying TCP or unix domain socket connection
pid uint32 // backend pid
secretKey uint32 // key to use to send a cancel query message to the server
parameterStatuses map[string]string // parameters that have been reported by the server
txStatus byte
frontend Frontend
config *Config
status byte // One of connStatus* constants
bufferingReceive bool
bufferingReceiveMux sync.Mutex
bufferingReceiveMsg pgproto3.BackendMessage
bufferingReceiveErr error
peekedMsg pgproto3.BackendMessage
wbuf []byte // write buffer
resultReader ResultReader
multiResultReader MultiResultReader
contextWatcher *ctxwatch.ContextWatcher
cleanupDone chan struct{}
}
func ( context.Context, string) (*PgConn, error) {
, := ParseConfig()
if != nil {
return nil,
}
return ConnectConfig(, )
}
if !.createdByParseConfig {
panic("config must be created by ParseConfig")
}
if .ConnectTimeout != 0 {
var context.CancelFunc
, = context.WithTimeout(, .ConnectTimeout)
defer ()
:= []*FallbackConfig{
{
Host: .Host,
Port: .Port,
TLSConfig: .TLSConfig,
},
}
= append(, .Fallbacks...)
, = expandWithIPs(, .LookupFunc, )
if != nil {
return nil, &connectError{config: , msg: "hostname resolving error", err: }
}
if len() == 0 {
return nil, &connectError{config: , msg: "hostname resolving error", err: errors.New("ip addr wasn't found")}
}
for , := range {
, = connect(, , )
if == nil {
break
} else if , := .(*PgError); {
= &connectError{config: , msg: "server error", err: }
:= "28P01" // worng password
:= "28000" // db does not exist
if .Code == || .Code == {
break
}
}
}
if != nil {
return nil, // no need to wrap in connectError because it will already be wrapped in all cases except PgError
}
if .AfterConnect != nil {
:= .AfterConnect(, )
if != nil {
.conn.Close()
return nil, &connectError{config: , msg: "AfterConnect error", err: }
}
}
return , nil
}
func ( context.Context, LookupFunc, []*FallbackConfig) ([]*FallbackConfig, error) {
var []*FallbackConfig
if strings.HasPrefix(.Host, "/") {
= append(, &FallbackConfig{
Host: .Host,
Port: .Port,
TLSConfig: .TLSConfig,
})
continue
}
, := (, .Host)
if != nil {
return nil,
}
for , := range {
= append(, &FallbackConfig{
Host: ,
Port: .Port,
TLSConfig: .TLSConfig,
})
}
}
return , nil
}
func ( context.Context, *Config, *FallbackConfig) (*PgConn, error) {
:= new(PgConn)
.config =
.wbuf = make([]byte, 0, wbufLen)
.cleanupDone = make(chan struct{})
var error
, := NetworkAddress(.Host, .Port)
.conn, = .DialFunc(, , )
if != nil {
return nil, &connectError{config: , msg: "dial error", err: }
}
.parameterStatuses = make(map[string]string)
if .TLSConfig != nil {
if := .startTLS(.TLSConfig); != nil {
.conn.Close()
return nil, &connectError{config: , msg: "tls error", err: }
}
}
.status = connStatusConnecting
.contextWatcher = ctxwatch.NewContextWatcher(
func() { .conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
func() { .conn.SetDeadline(time.Time{}) },
)
.contextWatcher.Watch()
defer .contextWatcher.Unwatch()
.frontend = .BuildFrontend(.conn, .conn)
:= pgproto3.StartupMessage{
ProtocolVersion: pgproto3.ProtocolVersionNumber,
Parameters: make(map[string]string),
}
for , := range .RuntimeParams {
.Parameters[] =
}
.Parameters["user"] = .User
if .Database != "" {
.Parameters["database"] = .Database
}
if , := .conn.Write(.Encode(.wbuf)); != nil {
.conn.Close()
return nil, &connectError{config: , msg: "failed to write startup message", err: }
}
for {
, := .receiveMessage()
if != nil {
.conn.Close()
if , := .(*PgError); {
return nil,
}
return nil, &connectError{config: , msg: "failed to receive message", err: }
}
switch msg := .(type) {
case *pgproto3.BackendKeyData:
.pid = .ProcessID
.secretKey = .SecretKey
case *pgproto3.AuthenticationOk:
case *pgproto3.AuthenticationCleartextPassword:
= .txPasswordMessage(.config.Password)
if != nil {
.conn.Close()
return nil, &connectError{config: , msg: "failed to write password message", err: }
}
case *pgproto3.AuthenticationMD5Password:
:= "md5" + hexMD5(hexMD5(.config.Password+.config.User)+string(.Salt[:]))
= .txPasswordMessage()
if != nil {
.conn.Close()
return nil, &connectError{config: , msg: "failed to write password message", err: }
}
case *pgproto3.AuthenticationSASL:
= .scramAuth(.AuthMechanisms)
if != nil {
.conn.Close()
return nil, &connectError{config: , msg: "failed SASL auth", err: }
}
case *pgproto3.ReadyForQuery:
.status = connStatusIdle
.contextWatcher.Unwatch()
:= .ValidateConnect(, )
if != nil {
.conn.Close()
return nil, &connectError{config: , msg: "ValidateConnect failed", err: }
}
}
return , nil
case *pgproto3.ErrorResponse:
.conn.Close()
return nil, ErrorResponseToPgError()
default:
.conn.Close()
return nil, &connectError{config: , msg: "received unexpected message", err: }
}
}
}
func ( *PgConn) ( *tls.Config) ( error) {
= binary.Write(.conn, binary.BigEndian, []int32{8, 80877103})
if != nil {
return
}
:= make([]byte, 1)
if _, = io.ReadFull(.conn, ); != nil {
return
}
if [0] != 'S' {
return errors.New("server refused TLS connection")
}
.conn = tls.Client(.conn, )
return nil
}
func ( *PgConn) ( string) ( error) {
:= &pgproto3.PasswordMessage{Password: }
_, = .conn.Write(.Encode(.wbuf))
return
}
func ( string) string {
:= md5.New()
io.WriteString(, )
return hex.EncodeToString(.Sum(nil))
}
func ( *PgConn) () chan struct{} {
if .bufferingReceive {
panic("BUG: signalMessage when already in progress")
}
.bufferingReceive = true
.bufferingReceiveMux.Lock()
:= make(chan struct{})
go func() {
.bufferingReceiveMsg, .bufferingReceiveErr = .frontend.Receive()
.bufferingReceiveMux.Unlock()
close()
}()
return
}
func ( *PgConn) ( context.Context, []byte) error {
if := .lock(); != nil {
return
}
defer .unlock()
if != context.Background() {
select {
case <-.Done():
return &contextAlreadyDoneError{err: .Err()}
default:
}
.contextWatcher.Watch()
defer .contextWatcher.Unwatch()
}
, := .conn.Write()
if != nil {
.asyncClose()
return &writeError{err: , safeToRetry: == 0}
}
return nil
}
func ( *PgConn) ( context.Context) (pgproto3.BackendMessage, error) {
if := .lock(); != nil {
return nil,
}
defer .unlock()
if != context.Background() {
select {
case <-.Done():
return nil, &contextAlreadyDoneError{err: .Err()}
default:
}
.contextWatcher.Watch()
defer .contextWatcher.Unwatch()
}
, := .receiveMessage()
if != nil {
= &pgconnError{msg: "receive message failed", err: , safeToRetry: true}
}
return ,
}
func ( *PgConn) () (pgproto3.BackendMessage, error) {
if .peekedMsg != nil {
return .peekedMsg, nil
}
var pgproto3.BackendMessage
var error
if .bufferingReceive {
.bufferingReceiveMux.Lock()
= .bufferingReceiveMsg
= .bufferingReceiveErr
.bufferingReceiveMux.Unlock()
.bufferingReceive = false
func ( *PgConn) () (pgproto3.BackendMessage, error) {
, := .peekMessage()
if , := .(net.Error); !( && .Timeout()) {
.asyncClose()
}
return nil,
}
.peekedMsg = nil
switch msg := .(type) {
case *pgproto3.ReadyForQuery:
.txStatus = .TxStatus
case *pgproto3.ParameterStatus:
.parameterStatuses[.Name] = .Value
case *pgproto3.ErrorResponse:
if .Severity == "FATAL" {
.status = connStatusClosed
.conn.Close() // Ignore error as the connection is already broken and there is already an error to return.
close(.cleanupDone)
return nil, ErrorResponseToPgError()
}
case *pgproto3.NoticeResponse:
if .config.OnNotice != nil {
.config.OnNotice(, noticeResponseToNotice())
}
case *pgproto3.NotificationResponse:
if .config.OnNotification != nil {
.config.OnNotification(, &Notification{PID: .PID, Channel: .Channel, Payload: .Payload})
}
}
return , nil
}
func ( *PgConn) ( context.Context) error {
if .status == connStatusClosed {
return nil
}
.status = connStatusClosed
defer close(.cleanupDone)
defer .conn.Close()
.contextWatcher.Unwatch()
.contextWatcher.Watch()
defer .contextWatcher.Unwatch()
}
func ( *PgConn) () {
if .status == connStatusClosed {
return
}
.status = connStatusClosed
go func() {
defer close(.cleanupDone)
defer .conn.Close()
:= time.Now().Add(time.Second * 15)
, := context.WithDeadline(context.Background(), )
defer ()
.CancelRequest()
.conn.SetDeadline()
.conn.Write([]byte{'X', 0, 0, 0, 4})
.conn.Read(make([]byte, 1))
}()
}
func ( *PgConn) () chan (struct{}) {
return .cleanupDone
}
func ( *PgConn) () bool {
return .status < connStatusIdle
}
func ( *PgConn) () bool {
return .status == connStatusBusy
}
func ( *PgConn) () error {
switch .status {
case connStatusBusy:
return &connLockError{status: "conn busy"} // This only should be possible in case of an application bug.
case connStatusClosed:
return &connLockError{status: "conn closed"}
case connStatusUninitialized:
return &connLockError{status: "conn uninitialized"}
}
.status = connStatusBusy
return nil
}
func ( *PgConn) () {
switch .status {
case connStatusBusy:
.status = connStatusIdle
case connStatusClosed:
default:
panic("BUG: cannot unlock unlocked connection") // This should only be possible if there is a bug in this package.
}
}
func ( *PgConn) ( string) string {
return .parameterStatuses[]
}
type CommandTag []byte
func ( CommandTag) () bool {
return len() >= 6 &&
[0] == 'I' &&
[1] == 'N' &&
[2] == 'S' &&
[3] == 'E' &&
[4] == 'R' &&
[5] == 'T'
}
func ( CommandTag) () bool {
return len() >= 6 &&
[0] == 'U' &&
[1] == 'P' &&
[2] == 'D' &&
[3] == 'A' &&
[4] == 'T' &&
[5] == 'E'
}
func ( CommandTag) () bool {
return len() >= 6 &&
[0] == 'D' &&
[1] == 'E' &&
[2] == 'L' &&
[3] == 'E' &&
[4] == 'T' &&
[5] == 'E'
}
func ( CommandTag) () bool {
return len() >= 6 &&
[0] == 'S' &&
[1] == 'E' &&
[2] == 'L' &&
[3] == 'E' &&
[4] == 'C' &&
[5] == 'T'
}
type StatementDescription struct {
Name string
SQL string
ParamOIDs []uint32
Fields []pgproto3.FieldDescription
}
func ( *PgConn) ( context.Context, , string, []uint32) (*StatementDescription, error) {
if := .lock(); != nil {
return nil,
}
defer .unlock()
if != context.Background() {
select {
case <-.Done():
return nil, &contextAlreadyDoneError{err: .Err()}
default:
}
.contextWatcher.Watch()
defer .contextWatcher.Unwatch()
}
:= .wbuf
= (&pgproto3.Parse{Name: , Query: , ParameterOIDs: }).Encode()
= (&pgproto3.Describe{ObjectType: 'S', Name: }).Encode()
= (&pgproto3.Sync{}).Encode()
, := .conn.Write()
if != nil {
.asyncClose()
return nil, &writeError{err: , safeToRetry: == 0}
}
:= &StatementDescription{Name: , SQL: }
var error
:
for {
, := .receiveMessage()
if != nil {
.asyncClose()
return nil,
}
switch msg := .(type) {
case *pgproto3.ParameterDescription:
.ParamOIDs = make([]uint32, len(.ParameterOIDs))
copy(.ParamOIDs, .ParameterOIDs)
case *pgproto3.RowDescription:
.Fields = make([]pgproto3.FieldDescription, len(.Fields))
copy(.Fields, .Fields)
case *pgproto3.ErrorResponse:
= ErrorResponseToPgError()
case *pgproto3.ReadyForQuery:
break
}
}
if != nil {
return nil,
}
return , nil
}
func ( *pgproto3.ErrorResponse) *PgError {
return &PgError{
Severity: .Severity,
Code: string(.Code),
Message: string(.Message),
Detail: string(.Detail),
Hint: .Hint,
Position: .Position,
InternalPosition: .InternalPosition,
InternalQuery: string(.InternalQuery),
Where: string(.Where),
SchemaName: string(.SchemaName),
TableName: string(.TableName),
ColumnName: string(.ColumnName),
DataTypeName: string(.DataTypeName),
ConstraintName: .ConstraintName,
File: string(.File),
Line: .Line,
Routine: string(.Routine),
}
}
func ( *pgproto3.NoticeResponse) *Notice {
:= ErrorResponseToPgError((*pgproto3.ErrorResponse)())
return (*Notice)()
}
:= .conn.RemoteAddr()
, := .config.DialFunc(, .Network(), .String())
if != nil {
return
}
defer .Close()
if != context.Background() {
:= ctxwatch.NewContextWatcher(
func() { .SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
func() { .SetDeadline(time.Time{}) },
)
.Watch()
defer .Unwatch()
}
:= make([]byte, 16)
binary.BigEndian.PutUint32([0:4], 16)
binary.BigEndian.PutUint32([4:8], 80877102)
binary.BigEndian.PutUint32([8:12], uint32(.pid))
binary.BigEndian.PutUint32([12:16], uint32(.secretKey))
_, = .Write()
if != nil {
return
}
_, = .Read()
if != io.EOF {
return
}
return nil
}
func ( *PgConn) ( context.Context) error {
if := .lock(); != nil {
return
}
defer .unlock()
if != context.Background() {
select {
case <-.Done():
return .Err()
default:
}
.contextWatcher.Watch()
defer .contextWatcher.Unwatch()
}
for {
, := .receiveMessage()
if != nil {
return
}
switch .(type) {
case *pgproto3.NotificationResponse:
return nil
}
}
}
func ( *PgConn) ( context.Context, string) *MultiResultReader {
if := .lock(); != nil {
return &MultiResultReader{
closed: true,
err: ,
}
}
.multiResultReader = MultiResultReader{
pgConn: ,
ctx: ,
}
:= &.multiResultReader
if != context.Background() {
select {
case <-.Done():
.closed = true
.err = &contextAlreadyDoneError{err: .Err()}
.unlock()
return
default:
}
.contextWatcher.Watch()
}
:= .wbuf
= (&pgproto3.Query{String: }).Encode()
, := .conn.Write()
if != nil {
.asyncClose()
.contextWatcher.Unwatch()
.closed = true
.err = &writeError{err: , safeToRetry: == 0}
.unlock()
return
}
return
}
func ( *PgConn) ( context.Context) *MultiResultReader {
if := .lock(); != nil {
return &MultiResultReader{
closed: true,
err: ,
}
}
.multiResultReader = MultiResultReader{
pgConn: ,
ctx: ,
}
:= &.multiResultReader
if != context.Background() {
select {
case <-.Done():
.closed = true
.err = &contextAlreadyDoneError{err: .Err()}
.unlock()
return
default:
}
.contextWatcher.Watch()
}
return
}
func ( *PgConn) ( context.Context, string, [][]byte, []uint32, []int16, []int16) *ResultReader {
:= .execExtendedPrefix(, )
if .closed {
return
}
:= .wbuf
= (&pgproto3.Parse{Query: , ParameterOIDs: }).Encode()
= (&pgproto3.Bind{ParameterFormatCodes: , Parameters: , ResultFormatCodes: }).Encode()
.execExtendedSuffix(, )
return
}
func ( *PgConn) ( context.Context, string, [][]byte, []int16, []int16) *ResultReader {
:= .execExtendedPrefix(, )
if .closed {
return
}
:= .wbuf
= (&pgproto3.Bind{PreparedStatement: , ParameterFormatCodes: , Parameters: , ResultFormatCodes: }).Encode()
.execExtendedSuffix(, )
return
}
func ( *PgConn) ( context.Context, [][]byte) *ResultReader {
.resultReader = ResultReader{
pgConn: ,
ctx: ,
}
:= &.resultReader
if := .lock(); != nil {
.concludeCommand(nil, )
.closed = true
return
}
if len() > math.MaxUint16 {
.concludeCommand(nil, fmt.Errorf("extended protocol limited to %v parameters", math.MaxUint16))
.closed = true
.unlock()
return
}
if != context.Background() {
select {
case <-.Done():
.concludeCommand(nil, &contextAlreadyDoneError{err: .Err()})
.closed = true
.unlock()
return
default:
}
.contextWatcher.Watch()
}
return
}
func ( *PgConn) ( []byte, *ResultReader) {
= (&pgproto3.Describe{ObjectType: 'P'}).Encode()
= (&pgproto3.Execute{}).Encode()
= (&pgproto3.Sync{}).Encode()
, := .conn.Write()
if != nil {
.asyncClose()
.concludeCommand(nil, &writeError{err: , safeToRetry: == 0})
.contextWatcher.Unwatch()
.closed = true
.unlock()
return
}
.readUntilRowDescription()
}
func ( *PgConn) ( context.Context, io.Writer, string) (CommandTag, error) {
if := .lock(); != nil {
return nil,
}
if != context.Background() {
select {
case <-.Done():
.unlock()
return nil, &contextAlreadyDoneError{err: .Err()}
default:
}
.contextWatcher.Watch()
defer .contextWatcher.Unwatch()
}
:= .wbuf
= (&pgproto3.Query{String: }).Encode()
, := .conn.Write()
if != nil {
.asyncClose()
.unlock()
return nil, &writeError{err: , safeToRetry: == 0}
}
var CommandTag
var error
for {
, := .receiveMessage()
if != nil {
.asyncClose()
return nil,
}
switch msg := .(type) {
case *pgproto3.CopyDone:
case *pgproto3.CopyData:
, := .Write(.Data)
if != nil {
.asyncClose()
return nil,
}
case *pgproto3.ReadyForQuery:
.unlock()
return ,
case *pgproto3.CommandComplete:
= CommandTag(.CommandTag)
case *pgproto3.ErrorResponse:
= ErrorResponseToPgError()
}
}
}
func ( *PgConn) ( context.Context, io.Reader, string) (CommandTag, error) {
if := .lock(); != nil {
return nil,
}
defer .unlock()
if != context.Background() {
select {
case <-.Done():
return nil, &contextAlreadyDoneError{err: .Err()}
default:
}
.contextWatcher.Watch()
defer .contextWatcher.Unwatch()
}
:= .wbuf
= (&pgproto3.Query{String: }).Encode()
, := .conn.Write()
if != nil {
.asyncClose()
return nil, &writeError{err: , safeToRetry: == 0}
}
var CommandTag
var error
:= true
for {
, := .receiveMessage()
if != nil {
.asyncClose()
return nil,
}
switch msg := .(type) {
case *pgproto3.CopyInResponse:
= false
case *pgproto3.ErrorResponse:
= ErrorResponseToPgError()
case *pgproto3.ReadyForQuery:
return ,
}
}
.conn.Close()
<-
return
}
}
if != nil {
<-
return
}
select {
case <-:
return
default:
}
}
}()
var error
for == nil && == nil {
select {
case = <-:
case <-:
, := .receiveMessage()
if != nil {
.asyncClose()
return nil,
}
switch msg := .(type) {
case *pgproto3.ErrorResponse:
= ErrorResponseToPgError()
default:
= .signalMessage()
}
}
}
close()
= [:0]
if == io.EOF || != nil {
:= &pgproto3.CopyDone{}
= .Encode()
} else {
:= &pgproto3.CopyFail{Message: .Error()}
= .Encode()
}
_, = .conn.Write()
if != nil {
.asyncClose()
return nil,
}
for {
, := .receiveMessage()
if != nil {
.asyncClose()
return nil,
}
switch msg := .(type) {
case *pgproto3.ReadyForQuery:
return ,
case *pgproto3.CommandComplete:
= CommandTag(.CommandTag)
case *pgproto3.ErrorResponse:
= ErrorResponseToPgError()
}
}
}
func ( *MultiResultReader) () ([]*Result, error) {
var []*Result
for .NextResult() {
= append(, .ResultReader().Read())
}
:= .Close()
return ,
}
func ( *MultiResultReader) () (pgproto3.BackendMessage, error) {
, := .pgConn.receiveMessage()
if != nil {
.pgConn.contextWatcher.Unwatch()
.err =
.closed = true
.pgConn.asyncClose()
return nil, .err
}
switch msg := .(type) {
case *pgproto3.ReadyForQuery:
.pgConn.contextWatcher.Unwatch()
.closed = true
.pgConn.unlock()
case *pgproto3.ErrorResponse:
.err = ErrorResponseToPgError()
}
return , nil
}
func ( *MultiResultReader) () bool {
for !.closed && .err == nil {
, := .receiveMessage()
if != nil {
return false
}
switch msg := .(type) {
case *pgproto3.RowDescription:
.pgConn.resultReader = ResultReader{
pgConn: .pgConn,
multiResultReader: ,
ctx: .ctx,
fieldDescriptions: .Fields,
}
.rr = &.pgConn.resultReader
return true
case *pgproto3.CommandComplete:
.pgConn.resultReader = ResultReader{
commandTag: CommandTag(.CommandTag),
commandConcluded: true,
closed: true,
}
.rr = &.pgConn.resultReader
return true
case *pgproto3.EmptyQueryResponse:
return false
}
}
return false
}
func ( *MultiResultReader) () *ResultReader {
return .rr
}
func ( *MultiResultReader) () error {
for !.closed {
, := .receiveMessage()
if != nil {
return .err
}
}
return .err
}
type Result struct {
FieldDescriptions []pgproto3.FieldDescription
Rows [][][]byte
CommandTag CommandTag
Err error
}
func ( *ResultReader) () *Result {
:= &Result{}
for .NextRow() {
if .FieldDescriptions == nil {
.FieldDescriptions = make([]pgproto3.FieldDescription, len(.FieldDescriptions()))
copy(.FieldDescriptions, .FieldDescriptions())
}
:= make([][]byte, len(.Values()))
copy(, .Values())
.Rows = append(.Rows, )
}
.CommandTag, .Err = .Close()
return
}
func ( *ResultReader) () bool {
for !.commandConcluded {
, := .receiveMessage()
if != nil {
return false
}
switch msg := .(type) {
case *pgproto3.DataRow:
.rowValues = .Values
return true
}
}
return false
}
func ( *ResultReader) () []pgproto3.FieldDescription {
return .fieldDescriptions
}
func ( *ResultReader) () [][]byte {
return .rowValues
}
func ( *ResultReader) () (CommandTag, error) {
if .closed {
return .commandTag, .err
}
.closed = true
for !.commandConcluded {
, := .receiveMessage()
if != nil {
return nil, .err
}
}
if .multiResultReader == nil {
for {
, := .receiveMessage()
if != nil {
return nil, .err
}
case *pgproto3.ErrorResponse:
.err = ErrorResponseToPgError()
case *pgproto3.ReadyForQuery:
.pgConn.contextWatcher.Unwatch()
.pgConn.unlock()
return .commandTag, .err
}
}
}
return .commandTag, .err
}
func ( *ResultReader) () {
, := .pgConn.peekMessage()
if , := .(*pgproto3.DataRow); {
return
}
, _ = .receiveMessage()
if , := .(*pgproto3.RowDescription); {
return
}
}
}
func ( *ResultReader) () ( pgproto3.BackendMessage, error) {
if .multiResultReader == nil {
, = .pgConn.receiveMessage()
} else {
, = .multiResultReader.receiveMessage()
}
if != nil {
.concludeCommand(nil, )
.pgConn.contextWatcher.Unwatch()
.closed = true
if .multiResultReader == nil {
.pgConn.asyncClose()
}
return nil, .err
}
switch msg := .(type) {
case *pgproto3.RowDescription:
.fieldDescriptions = .Fields
case *pgproto3.CommandComplete:
.concludeCommand(CommandTag(.CommandTag), nil)
case *pgproto3.EmptyQueryResponse:
.concludeCommand(nil, nil)
case *pgproto3.ErrorResponse:
.concludeCommand(nil, ErrorResponseToPgError())
}
return , nil
}
if != nil && .err == nil {
.err =
}
if .commandConcluded {
return
}
.commandTag =
.rowValues = nil
.commandConcluded = true
}
func ( *Batch) ( string, [][]byte, []int16, []int16) {
.buf = (&pgproto3.Bind{PreparedStatement: , ParameterFormatCodes: , Parameters: , ResultFormatCodes: }).Encode(.buf)
.buf = (&pgproto3.Describe{ObjectType: 'P'}).Encode(.buf)
.buf = (&pgproto3.Execute{}).Encode(.buf)
}
func ( *PgConn) ( context.Context, *Batch) *MultiResultReader {
if := .lock(); != nil {
return &MultiResultReader{
closed: true,
err: ,
}
}
.multiResultReader = MultiResultReader{
pgConn: ,
ctx: ,
}
:= &.multiResultReader
if != context.Background() {
select {
case <-.Done():
.closed = true
.err = &contextAlreadyDoneError{err: .Err()}
.unlock()
return
default:
}
.contextWatcher.Watch()
}
.buf = (&pgproto3.Sync{}).Encode(.buf)
func ( *PgConn) ( string) (string, error) {
if .ParameterStatus("standard_conforming_strings") != "on" {
return "", errors.New("EscapeString must be run with standard_conforming_strings=on")
}
if .ParameterStatus("client_encoding") != "UTF8" {
return "", errors.New("EscapeString must be run with client_encoding=UTF8")
}
return strings.Replace(, "'", "''", -1), nil
}
type HijackedConn struct {
Conn net.Conn // the underlying TCP or unix domain socket connection
PID uint32 // backend pid
SecretKey uint32 // key to use to send a cancel query message to the server
ParameterStatuses map[string]string // parameters that have been reported by the server
TxStatus byte
Frontend Frontend
Config *Config
}
func ( *PgConn) () (*HijackedConn, error) {
if := .lock(); != nil {
return nil,
}
.status = connStatusClosed
return &HijackedConn{
Conn: .conn,
PID: .pid,
SecretKey: .secretKey,
ParameterStatuses: .parameterStatuses,
TxStatus: .txStatus,
Frontend: .frontend,
Config: .config,
}, nil
}
func ( *HijackedConn) (*PgConn, error) {
:= &PgConn{
conn: .Conn,
pid: .PID,
secretKey: .SecretKey,
parameterStatuses: .ParameterStatuses,
txStatus: .TxStatus,
frontend: .Frontend,
config: .Config,
status: connStatusIdle,
wbuf: make([]byte, 0, wbufLen),
cleanupDone: make(chan struct{}),
}
.contextWatcher = ctxwatch.NewContextWatcher(
func() { .conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
func() { .conn.SetDeadline(time.Time{}) },
)
return , nil
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |