Source File
copy_from.go
Belonging Package
github.com/jackc/pgx/v4
package pgx
import (
)
func ( [][]interface{}) CopyFromSource {
return ©FromRows{rows: , idx: -1}
}
type copyFromRows struct {
rows [][]interface{}
idx int
}
func ( *copyFromRows) () bool {
.idx++
return .idx < len(.rows)
}
func ( *copyFromRows) () ([]interface{}, error) {
return .rows[.idx], nil
}
func ( *copyFromRows) () error {
return nil
}
func ( int, func(int) ([]interface{}, error)) CopyFromSource {
return ©FromSlice{next: , idx: -1, len: }
}
type copyFromSlice struct {
next func(int) ([]interface{}, error)
idx int
len int
err error
}
func ( *copyFromSlice) () bool {
.idx++
return .idx < .len
}
func ( *copyFromSlice) () ([]interface{}, error) {
, := .next(.idx)
if != nil {
.err =
}
return ,
}
func ( *copyFromSlice) () error {
return .err
}
Next() bool
Values() ([]interface{}, error)
Err() error
}
type copyFrom struct {
conn *Conn
tableName Identifier
columnNames []string
rowSrc CopyFromSource
readerErrChan chan error
}
func ( *copyFrom) ( context.Context) (int64, error) {
:= .tableName.Sanitize()
:= &bytes.Buffer{}
for , := range .columnNames {
if != 0 {
.WriteString(", ")
}
.WriteString(quoteIdentifier())
}
:= .String()
, := .conn.Prepare(, "", fmt.Sprintf("select %s from %s", , ))
if != nil {
return 0,
}
, := io.Pipe()
:= make(chan struct{})
go func() {
defer close()
:= .conn.wbuf
= append(, "PGCOPY\n\377\r\n\000"...)
= pgio.AppendInt32(, 0)
= pgio.AppendInt32(, 0)
:= true
for {
var error
, , = .buildCopyBuf(, )
if != nil {
.CloseWithError()
return
}
if .rowSrc.Err() != nil {
.CloseWithError(.rowSrc.Err())
return
}
if len() > 0 {
_, = .Write()
if != nil {
.Close()
return
}
}
= [:0]
}
.Close()
}()
:= time.Now()
, := .conn.pgConn.CopyFrom(, , fmt.Sprintf("copy %s ( %s ) from stdin binary;", , ))
.Close()
<-
:= .RowsAffected()
if == nil {
if .conn.shouldLog(LogLevelInfo) {
:= time.Now()
.conn.log(, LogLevelInfo, "CopyFrom", map[string]interface{}{"tableName": .tableName, "columnNames": .columnNames, "time": .Sub(), "rowCount": })
}
} else if .conn.shouldLog(LogLevelError) {
.conn.log(, LogLevelError, "CopyFrom", map[string]interface{}{"err": , "tableName": .tableName, "columnNames": .columnNames})
}
return ,
}
func ( *copyFrom) ( []byte, *pgconn.StatementDescription) (bool, []byte, error) {
for .rowSrc.Next() {
, := .rowSrc.Values()
if != nil {
return false, nil,
}
if len() != len(.columnNames) {
return false, nil, fmt.Errorf("expected %d values, got %d values", len(.columnNames), len())
}
= pgio.AppendInt16(, int16(len(.columnNames)))
for , := range {
, = encodePreparedStatementArgument(.conn.connInfo, , .Fields[].DataTypeOID, )
if != nil {
return false, nil,
}
}
if len() > 65536 {
return true, , nil
}
}
return false, , nil
}
func ( *Conn) ( context.Context, Identifier, []string, CopyFromSource) (int64, error) {
:= ©From{
conn: ,
tableName: ,
columnNames: ,
rowSrc: ,
readerErrChan: make(chan error),
}
return .run()
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |