Source File
database.go
Belonging Package
golang.org/x/pkgsite/internal/database
package database
import (
)
// Constructor: wraps the given *sql.DB handle and an instance-ID string in a
// freshly allocated DB value and returns a pointer to it.
// NOTE(review): identifiers were lost in extraction; the call
// `New(.db, .instanceID)` further down the file suggests this function is New.
func ( *sql.DB, string) *DB {
return &DB{db: , instanceID: }
}
// Connectivity check: forwards to Ping on the wrapped db handle and returns
// its error unchanged.
func ( *DB) () error {
return .db.Ping()
}
// Reports whether this DB value has a transaction attached, i.e. whether its
// tx field is non-nil.
func ( *DB) () bool {
return .tx != nil
}
// Reports whether this DB value is inside a transaction (tx is non-nil) whose
// configured isolation level satisfies isRetryable.
func ( *DB) () bool {
return .tx != nil && isRetryable(.opts.Isolation)
}
// passwordRegexp matches the text "password=" followed by one or more
// non-whitespace characters.
var passwordRegexp = regexp.MustCompile(`password=\S+`)
// Returns a copy of the input string in which every passwordRegexp match has
// been replaced by the literal text "password=REDACTED".
func ( string) string {
return passwordRegexp.ReplaceAllLiteralString(, "password=REDACTED")
}
// Executes a statement through execResult and returns the number of rows it
// affected. On any failure the returned count is 0.
//
// The call is reported through logQuery; the closure logQuery returns is
// deferred and handed a pointer (presumably to the named error result — the
// identifier was lost in extraction).
func ( *DB) ( context.Context, string, ...interface{}) ( int64, error) {
defer logQuery(, , , .instanceID, .IsRetryable())(&)
, := .execResult(, , ...)
if != nil {
return 0,
}
, := .RowsAffected()
if != nil {
// The row-count failure is formatted with %v, so the returned error does
// not wrap the underlying one.
return 0, fmt.Errorf("RowsAffected: %v", )
}
return , nil
}
// Runs a query that yields a *sql.Rows result set. The call is reported
// through logQuery, whose returned closure is deferred.
func ( *DB) ( context.Context, string, ...interface{}) ( *sql.Rows, error) {
defer logQuery(, , , .instanceID, .IsRetryable())(&)
// When a transaction is attached the query goes through it; otherwise it is
// issued on the plain db handle.
if .tx != nil {
return .tx.QueryContext(, , ...)
}
return .db.QueryContext(, , ...)
}
// Runs a query and returns the resulting *sql.Row. (The log text below,
// "QueryRow context error", suggests this method is QueryRow.)
//
// This method has no error result, so the closure returned by logQuery is
// invoked with nil.
func ( *DB) ( context.Context, string, ...interface{}) *sql.Row {
defer logQuery(, , , .instanceID, .IsRetryable())(nil)
// Record the start time so the deferred diagnostic can report elapsed time.
:= time.Now()
defer func() {
// Emit an error log only if Err() is non-nil when the call returns
// (presumably the context's Err — the receiver name was lost). The message
// carries the args, the time elapsed since start, the start time and the
// value returned by Deadline().
if .Err() != nil {
, := .Deadline()
:= fmt.Sprintf("args=%v; elapsed=%q, start=%q, deadline=%q", , time.Since(), , )
log.Errorf(, "QueryRow context error: %v "+, .Err())
}
}()
// Use the attached transaction when there is one, else the plain db handle.
if .tx != nil {
return .tx.QueryRowContext(, , ...)
}
return .db.QueryRowContext(, , ...)
}
// Prepares a statement and returns the *sql.Stmt, using the attached
// transaction when there is one and the plain db handle otherwise. The text
// passed to logQuery is the statement prefixed with "preparing ".
//
// NOTE(review): unlike the other query methods in this file, which write
// `defer logQuery(...)(...)`, the function value returned by logQuery is not
// invoked here. As written, logQuery itself is the deferred call: it runs when
// this method returns and its result is discarded. Confirm whether a trailing
// call such as `(nil)` was intended.
func ( *DB) ( context.Context, string) (*sql.Stmt, error) {
defer logQuery(, "preparing "+, nil, .instanceID, .IsRetryable())
if .tx != nil {
return .tx.PrepareContext(, )
}
return .db.PrepareContext(, )
}
:= &sql.TxOptions{Isolation: }
if isRetryable() {
return .transactWithRetry(, , )
}
return .transact(, , )
}
// Reports whether the given isolation level is sql.LevelRepeatableRead or
// sql.LevelSerializable. (Call sites pass an Isolation value to isRetryable,
// which suggests that is this function's name.)
func ( sql.IsolationLevel) bool {
return == sql.LevelRepeatableRead || == sql.LevelSerializable
}
const = 30
for := 0; <= ; ++ {
= .transact(, , )
if isSerializationFailure() {
.mu.Lock()
if > .maxRetries {
.maxRetries =
}
.mu.Unlock()
log.Debugf(, "serialization failure; retrying")
continue
}
if != nil {
log.Debugf(, "transactWithRetry: error type %T: %[1]v", )
if strings.Contains(.Error(), serializationFailureCode) {
return fmt.Errorf("error text has %q but not recognized as serialization failure: type %T, err %v",
serializationFailureCode, , )
}
}
if > 0 {
log.Debugf(, "retried serializable transaction %d time(s)", )
}
return
}
return fmt.Errorf("reached max number of tries due to serialization failure (%d)", )
}
var *pq.Error
if errors.As(, &) && .Code == serializationFailureCode {
return true
}
var *pgconn.PgError
if errors.As(, &) && .Code == serializationFailureCode {
return true
}
return false
}
// Runs the supplied func(*DB) error inside a single SQL transaction opened
// with the given *sql.TxOptions.
//
// It fails immediately if the receiver is already in a transaction. Otherwise
// it obtains a connection from the db handle, begins a transaction on it, and
// hands the callback a new DB value (built with New from the same db and
// instanceID) that has the transaction, the connection and a copy of the
// options attached.
//
// Outcome handling lives in the deferred function: a panic rolls back and is
// re-raised; a non-nil error result rolls back; otherwise the transaction is
// committed and a commit failure becomes the returned error.
func ( *DB) ( context.Context, *sql.TxOptions, func(*DB) error) ( error) {
// Nested transactions are rejected.
if .InTransaction() {
return errors.New("a DB Transact function was called on a DB already in a transaction")
}
, := .db.Conn()
if != nil {
return
}
// Deferred first, so it runs last: after the commit/rollback below.
defer .Close()
, := .BeginTx(, )
if != nil {
return fmt.Errorf("conn.BeginTx(): %w", )
}
defer func() {
if := recover(); != nil {
// Roll back, then propagate the panic. Rollback's result is discarded.
.Rollback()
panic()
} else if != nil {
// Rollback's result is discarded here as well.
.Rollback()
} else {
if := .Commit(); != nil {
= fmt.Errorf("tx.Commit(): %w", )
}
}
}()
:= New(.db, .instanceID)
.tx =
.conn =
// The options are stored by value (dereferenced copy).
.opts = *
// Deferred last, so the closure returned by logTransaction runs before the
// commit/rollback function above.
defer .logTransaction()(&)
if := (); != nil {
return fmt.Errorf("txFunc(tx): %w", )
}
return nil
}
// Returns the current maxRetries value, reading it while holding the mu lock.
func ( *DB) () int {
.mu.Lock()
defer .mu.Unlock()
return .maxRetries
}
// OnConflictDoNothing is the SQL clause text "ON CONFLICT DO NOTHING".
// NOTE(review): presumably intended as a conflict-action argument for the bulk
// insert functions in this file — confirm against callers.
const OnConflictDoNothing = "ON CONFLICT DO NOTHING"
// BulkInsertReturning (name taken from the error-wrap text below) validates
// its arguments and then delegates to bulkInsert with all seven of them.
//
// Any returned error is annotated through derrors.Wrap with the call's
// arguments; the values slice is reported only by its length.
func ( *DB) ( context.Context, string, []string, []interface{}, string, []string, func(*sql.Rows) error) ( error) {
defer derrors.Wrap(&, "DB.BulkInsertReturning(ctx, %q, %v, [%d values], %q, %v, scanFunc)",
, , len(), , )
// Two arguments must be non-nil; per the error text these are the returning
// columns and the scan function.
if == nil || == nil {
return errors.New("need returningColumns and scan function")
}
return .bulkInsert(, , , , , , )
}
// Builds a conflict action with buildUpsertConflictAction from two of its
// arguments and forwards the call to BulkInsert.
// NOTE(review): this method's own name was lost in extraction; it looks like
// the upsert counterpart of BulkInsert — confirm.
func ( *DB) ( context.Context, string, []string, []interface{}, []string) error {
:= buildUpsertConflictAction(, )
return .BulkInsert(, , , , )
}
// Builds a conflict action with buildUpsertConflictAction from two of its
// arguments and forwards the call, including the func(*sql.Rows) error
// callback, to BulkInsertReturning.
// NOTE(review): this method's own name was lost in extraction; it looks like
// the upsert counterpart of BulkInsertReturning — confirm.
func ( *DB) ( context.Context, string, []string, []interface{}, , []string, func(*sql.Rows) error) error {
:= buildUpsertConflictAction(, )
return .BulkInsertReturning(, , , , , , )
}
// Shared implementation behind the bulk insert entry points: inserts a flat
// []interface{} of values in batches, preparing the statement text with
// buildInsertQuery.
//
// NOTE(review): identifiers were lost in extraction, and at least one
// statement is missing between the modulus check and the "too many columns"
// return (the `if` that opens that branch is gone, so the braces in this block
// do not balance). The comments below describe only what is still visible.
func ( *DB) ( context.Context, string, , []string, []interface{}, string, func(*sql.Rows) error) ( error) {
// The two lengths compared here must divide evenly; per the error text these
// are len(values) and len(columns).
if := len() % len(); != 0 {
return fmt.Errorf("modulus of len(values) and len(columns) must be 0: got %d", )
}
return fmt.Errorf("too many columns to insert: %d", len())
}
// Helper that prepares an insert statement; it takes an int, which is
// presumably the number of values the statement must accept.
:= func( int) (*sql.Stmt, error) {
return .Prepare(, buildInsertQuery(, , , , ))
}
var *sql.Stmt
// Walk the input in fixed-size steps.
for := 0; < len(); += {
:= +
if <= len() && == nil {
// First branch: taken while the computed end is within bounds and a
// nil check passes (presumably "no statement prepared yet").
, = ()
if != nil {
return
}
defer .Close()
} else if > len() {
// Final, shorter batch: clamp the end to the input length and prepare
// a statement for the remaining count.
= len()
, = ( - )
if != nil {
return
}
defer .Close()
}
:= [:]
var error
// A nil check selects between ExecContext (no rows read back) and
// QueryContext followed by processRows. Presumably the value tested is
// the scan callback — confirm.
if == nil {
_, = .ExecContext(, ...)
} else {
var *sql.Rows
, = .QueryContext(, ...)
if != nil {
return
}
= processRows(, )
}
if != nil {
return fmt.Errorf("running bulk insert query, values[%d:%d]): %w", , , )
}
}
return nil
}
if == {
break
}
.WriteString(", ")
}
if != "" {
.WriteString(" " + )
}
if len() > 0 {
fmt.Fprintf(&, " RETURNING %s", strings.Join(, ", "))
}
return .String()
}
// Builds an "ON CONFLICT (...) DO UPDATE SET ..." clause from two string
// slices. (Callers in this file invoke buildUpsertConflictAction with two
// arguments, which suggests that is this function's name.)
//
// One slice is ranged over to produce a "name=excluded.name" assignment per
// element; the two comma-joined lists fill the parenthesised conflict target
// and the SET list of the returned clause.
func (, []string) string {
var []string
for , := range {
// %[1]s reuses the first argument, so each entry reads "x=excluded.x".
= append(, fmt.Sprintf("%s=excluded.%[1]s", ))
}
return fmt.Sprintf("ON CONFLICT (%s) DO UPDATE SET %s",
strings.Join(, ", "),
strings.Join(, ", "))
}
// maxBulkUpdateArrayLen is the step size used by BulkUpdate: each Exec call
// receives slices of at most this many elements. It is a variable rather than
// a constant (presumably so tests can lower it — confirm).
var maxBulkUpdateArrayLen = 10000
// BulkUpdate (name taken from the error-wrap text below) runs the query
// produced by buildBulkUpdateQuery once per chunk of the input, passing each
// value slice to Exec wrapped in pq.Array.
//
// Validation, per the error texts: at least two columns are required, there
// must be exactly one values slice per column, and all values slices must have
// the same length. Any returned error is annotated through derrors.Wrap.
func ( *DB) ( context.Context, string, , []string, [][]interface{}) ( error) {
defer derrors.Wrap(&, "DB.BulkUpdate(ctx, tx, %q, %v, [%d values])",
, , len())
if len() < 2 {
return errors.New("need at least two columns")
}
if len() != len() {
return errors.New("len(values) != len(columns)")
}
// The length of the first slice is the reference that all others must match.
:= len([0])
for , := range [1:] {
if len() != {
return errors.New("all values slices must be the same length")
}
}
:= buildBulkUpdateQuery(, , )
// Process the rows in chunks of maxBulkUpdateArrayLen, clamping the last one.
for := 0; < ; += maxBulkUpdateArrayLen {
:= + maxBulkUpdateArrayLen
if > {
=
}
var []interface{}
// One pq.Array argument per ranged slice, each cut to the current chunk.
for , := range {
= append(, pq.Array([:]))
}
if , := .Exec(, , ...); != nil {
return fmt.Errorf("db.Exec(%q, values[%d:%d]): %w", , , , )
}
}
return nil
}
func ( string, , []string) string {
// emptyStringScanner holds a pointer to the string that its scan method
// writes into.
type emptyStringScanner struct {
ptr *string
}
// Scans the incoming value into a sql.NullString and then stores that value's
// String field through ptr. A scan failure is returned unchanged and leaves
// the target untouched.
// NOTE(review): since sql.NullString leaves String empty for a NULL input,
// this presumably maps NULL columns to "" — confirm against database/sql docs.
func ( emptyStringScanner) ( interface{}) error {
var sql.NullString
if := .Scan(); != nil {
return
}
*.ptr = .String
return nil
}
func ( *string) sql.Scanner {
return emptyStringScanner{}
The pages are generated with Golds v0.3.2-preview (GOOS=darwin GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.