Source File
conn.go
Belonging Package
github.com/jackc/pgx/v4
package pgx
import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgconn"
	"github.com/jackc/pgconn/stmtcache"
	"github.com/jackc/pgproto3/v2"
	"github.com/jackc/pgtype"
	"github.com/jackc/pgx/v4/internal/sanitize"
)
PreferSimpleProtocol bool
createdByParseConfig bool // Used to enforce created by ParseConfig rule.
}
func ( *ConnConfig) () *ConnConfig {
:= new(ConnConfig)
* = *
.Config = *.Config.Copy()
return
}
func ( *ConnConfig) () string { return .connString }
// BuildStatementCacheFunc is a function that creates the stmtcache.Cache to be
// used with the given connection.
type BuildStatementCacheFunc func(conn *pgconn.PgConn) stmtcache.Cache
// Conn is a connection handle wrapping a *pgconn.PgConn together with the
// per-connection state used by the methods in this file.
type Conn struct {
// pgConn is the underlying low-level connection.
pgConn *pgconn.PgConn
config *ConnConfig // config used when establishing this connection
// preparedStatements holds explicitly prepared statements keyed by name.
preparedStatements map[string]*pgconn.StatementDescription
// stmtcache is the automatic statement cache; it may be nil.
stmtcache stmtcache.Cache
// logger and logLevel control logging; a nil logger disables it.
logger Logger
logLevel LogLevel
// notifications buffers received notifications until they are consumed.
notifications []*pgconn.Notification
// NOTE(review): the users of doneChan and closedChan are not visible in
// this section.
doneChan chan struct{}
closedChan chan error
// connInfo is passed to parameter encoding and to result rows.
connInfo *pgtype.ConnInfo
wbuf []byte
// preallocatedRows is a batch of connRows values handed out one at a time.
preallocatedRows []connRows
// eqb accumulates parameter values and format codes for extended-protocol
// queries; it is reset before each use.
eqb extendedQueryBuilder
}
// Identifier is a SQL identifier made of one or more parts, for example
// {"schema", "table"}.
type Identifier []string

// Sanitize returns the identifier as a string of double-quoted parts joined by
// ".". NUL bytes are removed from each part and embedded double quotes are
// doubled. An empty Identifier yields the empty string.
func (ident Identifier) Sanitize() string {
	parts := make([]string, len(ident))
	for i := range ident {
		// Strip NUL bytes before quoting.
		s := strings.ReplaceAll(ident[i], string([]byte{0}), "")
		parts[i] = `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
	}
	return strings.Join(parts, ".")
}
// ErrInvalidLogLevel is the sentinel error value for an invalid log level.
var ErrInvalidLogLevel = errors.New("invalid log level")
func ( string) (*ConnConfig, error) {
, := pgconn.ParseConfig()
if != nil {
return nil,
}
var BuildStatementCacheFunc
:= 512
:= stmtcache.ModePrepare
if , := .RuntimeParams["statement_cache_capacity"]; {
delete(.RuntimeParams, "statement_cache_capacity")
, := strconv.ParseInt(, 10, 32)
if != nil {
return nil, fmt.Errorf("cannot parse statement_cache_capacity: %w", )
}
= int()
}
if , := .RuntimeParams["statement_cache_mode"]; {
delete(.RuntimeParams, "statement_cache_mode")
switch {
case "prepare":
= stmtcache.ModePrepare
case "describe":
= stmtcache.ModeDescribe
default:
return nil, fmt.Errorf("invalid statement_cache_mod: %s", )
}
}
if > 0 {
= func( *pgconn.PgConn) stmtcache.Cache {
return stmtcache.New(, , )
}
}
:= false
if , := .RuntimeParams["prefer_simple_protocol"]; {
delete(.RuntimeParams, "prefer_simple_protocol")
if , := strconv.ParseBool(); == nil {
=
} else {
return nil, fmt.Errorf("invalid prefer_simple_protocol: %v", )
}
}
:= &ConnConfig{
Config: *,
createdByParseConfig: true,
LogLevel: LogLevelInfo,
BuildStatementCache: ,
PreferSimpleProtocol: ,
connString: ,
}
return , nil
}
if !.createdByParseConfig {
panic("config must be created by ParseConfig")
}
:=
if .Config.OnNotification == nil {
.Config.OnNotification = .bufferNotifications
} else {
if .shouldLog(LogLevelDebug) {
.log(, LogLevelDebug, "pgx notification handler disabled by application supplied OnNotification", map[string]interface{}{"host": .Config.Host})
}
}
if .shouldLog(LogLevelInfo) {
.log(, LogLevelInfo, "Dialing PostgreSQL server", map[string]interface{}{"host": .Config.Host})
}
.pgConn, = pgconn.ConnectConfig(, &.Config)
if != nil {
if .shouldLog(LogLevelError) {
.log(, LogLevelError, "connect failed", map[string]interface{}{"err": })
}
return nil,
}
.preparedStatements = make(map[string]*pgconn.StatementDescription)
.doneChan = make(chan struct{})
.closedChan = make(chan error)
.wbuf = make([]byte, 0, 1024)
if .config.BuildStatementCache != nil {
.stmtcache = .config.BuildStatementCache(.pgConn)
}
if , := .Config.RuntimeParams["replication"]; {
return , nil
}
return , nil
}
func ( *Conn) ( context.Context, , string) ( *pgconn.StatementDescription, error) {
if != "" {
var bool
if , = .preparedStatements[]; && .SQL == {
return , nil
}
}
if .shouldLog(LogLevelError) {
defer func() {
if != nil {
.log(, LogLevelError, "Prepare failed", map[string]interface{}{"err": , "name": , "sql": })
}
}()
}
, = .pgConn.Prepare(, , , nil)
if != nil {
return nil,
}
if != "" {
.preparedStatements[] =
}
return , nil
}
func ( *Conn) ( context.Context, string) error {
delete(.preparedStatements, )
, := .pgConn.Exec(, "deallocate "+quoteIdentifier()).ReadAll()
return
}
func ( *Conn) ( *pgconn.PgConn, *pgconn.Notification) {
.notifications = append(.notifications, )
}
func ( *Conn) ( context.Context) (*pgconn.Notification, error) {
var *pgconn.Notification
if len(.notifications) > 0 {
= .notifications[0]
.notifications = .notifications[1:]
return , nil
}
:= .pgConn.WaitForNotification()
if len(.notifications) > 0 {
= .notifications[0]
.notifications = .notifications[1:]
}
return ,
}
func ( *Conn) () bool {
return .pgConn.IsClosed()
}
func ( *Conn) ( error) {
if .IsClosed() {
return
}
, := context.WithCancel(context.Background())
() // force immediate hard cancel
.pgConn.Close()
}
func ( *Conn) ( LogLevel) bool {
return .logger != nil && .logLevel >=
}
func ( *Conn) ( context.Context, LogLevel, string, map[string]interface{}) {
if == nil {
= map[string]interface{}{}
}
if .pgConn != nil && .pgConn.PID() != 0 {
["pid"] = .pgConn.PID()
}
.logger.Log(, , , )
}
// quoteIdentifier wraps s in double quotes, doubling any double quote it
// contains.
func quoteIdentifier(s string) string {
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}
func ( *Conn) ( context.Context) error {
, := .Exec(, ";")
return
}
func ( Rows, error) (map[string]uint32, error) {
if != nil {
return nil,
}
defer .Close()
:= make(map[string]uint32, 256)
for .Next() {
var uint32
var pgtype.Text
if = .Scan(&, &); != nil {
return nil,
}
[.String] =
}
if = .Err(); != nil {
return nil,
}
return ,
}
func ( *Conn) () *ConnConfig { return .config.Copy() }
func ( *Conn) ( context.Context, string, ...interface{}) (pgconn.CommandTag, error) {
:= time.Now()
, := .exec(, , ...)
if != nil {
if .shouldLog(LogLevelError) {
.log(, LogLevelError, "Exec", map[string]interface{}{"sql": , "args": logQueryArgs(), "err": })
}
return ,
}
if .shouldLog(LogLevelInfo) {
:= time.Now()
.log(, LogLevelInfo, "Exec", map[string]interface{}{"sql": , "args": logQueryArgs(), "time": .Sub(), "commandTag": })
}
return ,
}
func ( *Conn) ( context.Context, string, ...interface{}) ( pgconn.CommandTag, error) {
:= .config.PreferSimpleProtocol
:
for len() > 0 {
switch arg := [0].(type) {
case QuerySimpleProtocol:
= bool()
= [1:]
default:
break
}
}
if , := .preparedStatements[]; {
return .execPrepared(, , )
}
if {
return .execSimpleProtocol(, , )
}
if len() == 0 {
return .execSimpleProtocol(, , )
}
if .stmtcache != nil {
, := .stmtcache.Get(, )
if != nil {
return nil,
}
if .stmtcache.Mode() == stmtcache.ModeDescribe {
return .execParams(, , )
}
return .execPrepared(, , )
}
, := .Prepare(, "", )
if != nil {
return nil,
}
return .execPrepared(, , )
}
func ( *Conn) ( context.Context, string, []interface{}) ( pgconn.CommandTag, error) {
if len() > 0 {
, = .sanitizeForSimpleQuery(, ...)
if != nil {
return nil,
}
}
:= .pgConn.Exec(, )
for .NextResult() {
, = .ResultReader().Close()
}
= .Close()
return ,
}
func ( *Conn) ( *pgconn.StatementDescription, []interface{}) error {
if len(.ParamOIDs) != len() {
return fmt.Errorf("expected %d arguments, got %d", len(.ParamOIDs), len())
}
.eqb.Reset()
, := convertDriverValuers()
if != nil {
return
}
for := range {
= .eqb.AppendParam(.connInfo, .ParamOIDs[], [])
if != nil {
return
}
}
for := range .Fields {
.eqb.AppendResultFormat(.ConnInfo().ResultFormatCodeForOID(.Fields[].DataTypeOID))
}
return nil
}
func ( *Conn) ( context.Context, *pgconn.StatementDescription, []interface{}) (pgconn.CommandTag, error) {
:= .execParamsAndPreparedPrefix(, )
if != nil {
return nil,
}
:= .pgConn.ExecParams(, .SQL, .eqb.paramValues, .ParamOIDs, .eqb.paramFormats, .eqb.resultFormats).Read()
return .CommandTag, .Err
}
func ( *Conn) ( context.Context, *pgconn.StatementDescription, []interface{}) (pgconn.CommandTag, error) {
:= .execParamsAndPreparedPrefix(, )
if != nil {
return nil,
}
:= .pgConn.ExecPrepared(, .Name, .eqb.paramValues, .eqb.paramFormats, .eqb.resultFormats).Read()
return .CommandTag, .Err
}
func ( *Conn) ( context.Context, string, []interface{}) *connRows {
if len(.preallocatedRows) == 0 {
.preallocatedRows = make([]connRows, 64)
}
:= &.preallocatedRows[len(.preallocatedRows)-1]
.preallocatedRows = .preallocatedRows[0 : len(.preallocatedRows)-1]
.ctx =
.logger =
.connInfo = .connInfo
.startTime = time.Now()
.sql =
.args =
.conn =
return
}
// QuerySimpleProtocol, when passed as a leading query argument, overrides the
// config's PreferSimpleProtocol setting for that call.
type QuerySimpleProtocol bool
// QueryResultFormats, when passed as a leading query argument, supplies the
// result format codes to request.
type QueryResultFormats []int16
// QueryResultFormatsByOID, when passed as a leading query argument, supplies
// the result format code to request for each data type OID.
type QueryResultFormatsByOID map[uint32]int16
func ( *Conn) ( context.Context, string, ...interface{}) (Rows, error) {
var QueryResultFormats
var QueryResultFormatsByOID
:= .config.PreferSimpleProtocol
:
for len() > 0 {
switch arg := [0].(type) {
case QueryResultFormats:
=
= [1:]
case QueryResultFormatsByOID:
=
= [1:]
case QuerySimpleProtocol:
= bool()
= [1:]
default:
break
}
}
:= .getRows(, , )
var error
, := .preparedStatements[]
if && ! {
, = .sanitizeForSimpleQuery(, ...)
if != nil {
.fatal()
return ,
}
:= .pgConn.Exec(, )
if .NextResult() {
.resultReader = .ResultReader()
.multiResultReader =
} else {
= .Close()
.fatal()
return ,
}
return , nil
}
.eqb.Reset()
if ! {
if .stmtcache != nil {
, = .stmtcache.Get(, )
if != nil {
.fatal()
return , .err
}
} else {
, = .pgConn.Prepare(, "", , nil)
if != nil {
.fatal()
return , .err
}
}
}
if len(.ParamOIDs) != len() {
.fatal(fmt.Errorf("expected %d arguments, got %d", len(.ParamOIDs), len()))
return , .err
}
.sql = .SQL
, = convertDriverValuers()
if != nil {
.fatal()
return , .err
}
for := range {
= .eqb.AppendParam(.connInfo, .ParamOIDs[], [])
if != nil {
.fatal()
return , .err
}
}
if != nil {
= make([]int16, len(.Fields))
for := range {
[] = [uint32(.Fields[].DataTypeOID)]
}
}
if == nil {
for := range .Fields {
.eqb.AppendResultFormat(.ConnInfo().ResultFormatCodeForOID(.Fields[].DataTypeOID))
}
= .eqb.resultFormats
}
if .stmtcache != nil && .stmtcache.Mode() == stmtcache.ModeDescribe {
.resultReader = .pgConn.ExecParams(, , .eqb.paramValues, .ParamOIDs, .eqb.paramFormats, )
} else {
.resultReader = .pgConn.ExecPrepared(, .Name, .eqb.paramValues, .eqb.paramFormats, )
}
return , .err
}
// QueryFuncRow is the row view passed to the callback of QueryFunc.
type QueryFuncRow interface {
// FieldDescriptions returns the field descriptions of the row's columns.
FieldDescriptions() []pgproto3.FieldDescription
// RawValues returns the row's values as byte slices, one per column.
RawValues() [][]byte
}
func ( *Conn) ( context.Context, string, []interface{}, []interface{}, func(QueryFuncRow) error) (pgconn.CommandTag, error) {
, := .Query(, , ...)
if != nil {
return nil,
}
defer .Close()
for .Next() {
= .Scan(...)
if != nil {
return nil,
}
= ()
if != nil {
return nil,
}
}
if := .Err(); != nil {
return nil,
}
return .CommandTag(), nil
}
func ( *Conn) ( context.Context, *Batch) BatchResults {
:= .config.PreferSimpleProtocol
var strings.Builder
if {
for , := range .items {
if > 0 {
.WriteByte(';')
}
, := .sanitizeForSimpleQuery(.query, .arguments...)
if != nil {
return &batchResults{ctx: , conn: , err: }
}
.WriteString()
}
:= .pgConn.Exec(, .String())
return &batchResults{
ctx: ,
conn: ,
mrr: ,
b: ,
ix: 0,
}
}
:= map[string]struct{}{}
for , := range .items {
if , := .preparedStatements[.query]; {
continue
}
[.query] = struct{}{}
}
var stmtcache.Cache
if len() > 0 {
if .stmtcache != nil && .stmtcache.Cap() >= len() {
= .stmtcache
} else {
= stmtcache.New(.pgConn, stmtcache.ModeDescribe, len())
}
for , := range {
, := .Get(, )
if != nil {
return &batchResults{ctx: , conn: , err: }
}
}
}
:= &pgconn.Batch{}
for , := range .items {
.eqb.Reset()
:= .preparedStatements[.query]
if == nil {
var error
, = .Get(, .query)
panic("BUG: unexpected error from stmtCache")
}
}
if len(.ParamOIDs) != len(.arguments) {
return &batchResults{ctx: , conn: , err: fmt.Errorf("mismatched param and argument count")}
}
, := convertDriverValuers(.arguments)
if != nil {
return &batchResults{ctx: , conn: , err: }
}
for := range {
= .eqb.AppendParam(.connInfo, .ParamOIDs[], [])
if != nil {
return &batchResults{ctx: , conn: , err: }
}
}
for := range .Fields {
.eqb.AppendResultFormat(.ConnInfo().ResultFormatCodeForOID(.Fields[].DataTypeOID))
}
if .Name == "" {
.ExecParams(.query, .eqb.paramValues, .ParamOIDs, .eqb.paramFormats, .eqb.resultFormats)
} else {
.ExecPrepared(.Name, .eqb.paramValues, .eqb.paramFormats, .eqb.resultFormats)
}
}
:= .pgConn.ExecBatch(, )
return &batchResults{
ctx: ,
conn: ,
mrr: ,
b: ,
ix: 0,
}
}
func ( *Conn) ( string, ...interface{}) (string, error) {
if .pgConn.ParameterStatus("standard_conforming_strings") != "on" {
return "", errors.New("simple protocol queries must be run with standard_conforming_strings=on")
}
if .pgConn.ParameterStatus("client_encoding") != "UTF8" {
return "", errors.New("simple protocol queries must be run with client_encoding=UTF8")
}
var error
:= make([]interface{}, len())
for , := range {
[], = convertSimpleArgument(.connInfo, )
if != nil {
return "",
}
}
return sanitize.SanitizeSQL(, ...)
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.