// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package database

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/stdlib"
	"golang.org/x/pkgsite/internal/derrors"
	"golang.org/x/pkgsite/internal/log"
)
CopyUpsert upserts rows into table using the pgx driver's CopyFrom method. It returns an error if the underlying driver is not pgx. columns is the list of columns to upsert. src is the source of the rows to upsert. conflictColumns are the columns that might conflict (i.e. that have a UNIQUE constraint). If dropColumn is non-empty, that column will be dropped from the temporary table before copying. Use dropColumn for generated ID columns. CopyUpsert works by first creating a temporary table, populating it with CopyFrom, and then running an INSERT...SELECT...ON CONFLICT to upsert its rows into the original table.
func ( *DB) ( context.Context,  string,  []string,  pgx.CopyFromSource,  []string,  string) ( error) {
	defer derrors.Wrap(&, "CopyUpsert(%q)", )

	if !.InTransaction() {
		return errors.New("not in a transaction")
	}

	return .WithPGXConn(func( *pgx.Conn) error {
		 := fmt.Sprintf("__%s_copy", )
		 := fmt.Sprintf(`
			DROP TABLE IF EXISTS %s;
			CREATE TEMP TABLE %[1]s (LIKE %s) ON COMMIT DROP;
		`, , )
		if  != "" {
			 += fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s", , )
		}
		_,  = .Exec(, )
		if  != nil {
			return 
		}
		 := time.Now()
		,  := .CopyFrom(, []string{}, , )
		if  != nil {
			return fmt.Errorf("CopyFrom: %w", )
		}
		if !QueryLoggingDisabled {
			log.Debugf(, "CopyUpsert(%q): copied %d rows in %s", , , time.Since())
		}
		 := buildUpsertConflictAction(, )
		 := strings.Join(, ", ")
		 := fmt.Sprintf("INSERT INTO %s (%s) SELECT %s FROM %s %s", , , , , )
		defer logQuery(, , nil, .instanceID, .IsRetryable())(&)
		 = time.Now()
		,  := .Exec(, )
		if  != nil {
			return 
		}
		if !QueryLoggingDisabled {
			log.Debugf(, "CopyUpsert(%q): upserted %d rows in %s", , .RowsAffected(), time.Since())
		}
		return nil
	})
}

func ( *DB) ( func( *pgx.Conn) error) error {
	if !.InTransaction() {
		return errors.New("not in a transaction")
	}
	return .conn.Raw(func( interface{}) error {
		if ,  := .(*wrapConn);  {
			 = .underlying
		}
		,  := .(*stdlib.Conn)
		if ! {
			return fmt.Errorf("DB driver is not pgx or wrapper; conn type is %T", )
		}
		return (.Conn())
	})
}
// A RowItem is a row of values or an error.
type RowItem struct {
	// Values holds the column values for one row.
	Values []interface{}
	// Err, if non-nil, is reported in place of a row.
	Err error
}
CopyFromChan returns a CopyFromSource that gets its rows from a channel.
func ( <-chan RowItem) pgx.CopyFromSource {
	return &chanCopySource{c: }
}

// chanCopySource is the channel-backed source constructed by CopyFromChan.
type chanCopySource struct {
	// c is the channel that row items are received from.
	c    <-chan RowItem
	// next holds the most recently received item; its Values and Err are
	// what the source reports for the current row.
	next RowItem
}
Next implements CopyFromSource.Next.
func ( *chanCopySource) () bool {
	if .next.Err != nil {
		return false
	}
	var  bool
	.next,  = <-.c
	return 
}
Values implements CopyFromSource.Values.
func ( *chanCopySource) () ([]interface{}, error) {
	return .next.Values, .next.Err
}
// Err implements CopyFromSource.Err.
func ( *chanCopySource) () error {
	return .next.Err