Package migrate reads migrations from sources and runs them against databases. Sources are defined by the `source.Driver` and databases by the `database.Driver` interface. The driver interfaces are kept "dump", all migration logic is kept in this package.
package migrate

import (
	
	
	
	
	

	

	
	iurl 
	
)
DefaultPrefetchMigrations sets the number of migrations to pre-read from the source. This is helpful if the source is remote, but has little effect for a local source (i.e. file system). Please note that this setting has a major impact on the memory usage, since each pre-read migration is buffered in memory. See DefaultBufferSize.
DefaultLockTimeout sets the max time a database driver has to acquire a lock.
var DefaultLockTimeout = 15 * time.Second

var (
	ErrNoChange       = errors.New("no change")
	ErrNilVersion     = errors.New("no migration")
	ErrInvalidVersion = errors.New("version must be >= -1")
	ErrLocked         = errors.New("database locked")
	ErrLockTimeout    = errors.New("timeout: can't acquire database lock")
)
ErrShortLimit is an error returned when not enough migrations can be returned by a source for a given limit.
type ErrShortLimit struct {
	Short uint
}
Error implements the error interface.
func ( ErrShortLimit) () string {
	return fmt.Sprintf("limit %v short", .Short)
}

type ErrDirty struct {
	Version int
}

func ( ErrDirty) () string {
	return fmt.Sprintf("Dirty database version %v. Fix and force version.", .Version)
}

type Migrate struct {
	sourceName   string
	sourceDrv    source.Driver
	databaseName string
	databaseDrv  database.Driver
Log accepts a Logger interface
GracefulStop accepts `true` and will stop executing migrations as soon as possible at a safe break point, so that the database is not corrupted.
PrefetchMigrations defaults to DefaultPrefetchMigrations, but can be set per Migrate instance.
LockTimeout defaults to DefaultLockTimeout, but can be set per Migrate instance.
New returns a new Migrate instance from a source URL and a database URL. The URL scheme is defined by each driver.
func (,  string) (*Migrate, error) {
	 := newCommon()

	,  := iurl.SchemeFromURL()
	if  != nil {
		return nil, 
	}
	.sourceName = 

	,  := iurl.SchemeFromURL()
	if  != nil {
		return nil, 
	}
	.databaseName = 

	,  := source.Open()
	if  != nil {
		return nil, 
	}
	.sourceDrv = 

	,  := database.Open()
	if  != nil {
		return nil, 
	}
	.databaseDrv = 

	return , nil
}
NewWithDatabaseInstance returns a new Migrate instance from a source URL and an existing database instance. The source URL scheme is defined by each driver. Use any string that can serve as an identifier during logging as databaseName. You are responsible for closing the underlying database client if necessary.
func ( string,  string,  database.Driver) (*Migrate, error) {
	 := newCommon()

	,  := iurl.SchemeFromURL()
	if  != nil {
		return nil, 
	}
	.sourceName = 

	.databaseName = 

	,  := source.Open()
	if  != nil {
		return nil, 
	}
	.sourceDrv = 

	.databaseDrv = 

	return , nil
}
NewWithSourceInstance returns a new Migrate instance from an existing source instance and a database URL. The database URL scheme is defined by each driver. Use any string that can serve as an identifier during logging as sourceName. You are responsible for closing the underlying source client if necessary.
func ( string,  source.Driver,  string) (*Migrate, error) {
	 := newCommon()

	,  := iurl.SchemeFromURL()
	if  != nil {
		return nil, 
	}
	.databaseName = 

	.sourceName = 

	,  := database.Open()
	if  != nil {
		return nil, 
	}
	.databaseDrv = 

	.sourceDrv = 

	return , nil
}
NewWithInstance returns a new Migrate instance from an existing source and database instance. Use any string that can serve as an identifier during logging as sourceName and databaseName. You are responsible for closing down the underlying source and database client if necessary.
func ( string,  source.Driver,  string,  database.Driver) (*Migrate, error) {
	 := newCommon()

	.sourceName = 
	.databaseName = 

	.sourceDrv = 
	.databaseDrv = 

	return , nil
}

func () *Migrate {
	return &Migrate{
		GracefulStop:       make(chan bool, 1),
		PrefetchMigrations: DefaultPrefetchMigrations,
		LockTimeout:        DefaultLockTimeout,
		isLockedMu:         &sync.Mutex{},
	}
}
Close closes the source and the database.
func ( *Migrate) () ( error,  error) {
	 := make(chan error)
	 := make(chan error)

	.logVerbosePrintf("Closing source and database\n")

	go func() {
		 <- .databaseDrv.Close()
	}()

	go func() {
		 <- .sourceDrv.Close()
	}()

	return <-, <-
}
Migrate looks at the currently active migration version, then migrates either up or down to the specified version.
func ( *Migrate) ( uint) error {
	if  := .lock();  != nil {
		return 
	}

	, ,  := .databaseDrv.Version()
	if  != nil {
		return .unlockErr()
	}

	if  {
		return .unlockErr(ErrDirty{})
	}

	 := make(chan interface{}, .PrefetchMigrations)
	go .read(, int(), )

	return .unlockErr(.runMigrations())
}
Steps looks at the currently active migration version. It will migrate up if n > 0, and down if n < 0.
func ( *Migrate) ( int) error {
	if  == 0 {
		return ErrNoChange
	}

	if  := .lock();  != nil {
		return 
	}

	, ,  := .databaseDrv.Version()
	if  != nil {
		return .unlockErr()
	}

	if  {
		return .unlockErr(ErrDirty{})
	}

	 := make(chan interface{}, .PrefetchMigrations)

	if  > 0 {
		go .readUp(, , )
	} else {
		go .readDown(, -, )
	}

	return .unlockErr(.runMigrations())
}
Up looks at the currently active migration version and will migrate all the way up (applying all up migrations).
func ( *Migrate) () error {
	if  := .lock();  != nil {
		return 
	}

	, ,  := .databaseDrv.Version()
	if  != nil {
		return .unlockErr()
	}

	if  {
		return .unlockErr(ErrDirty{})
	}

	 := make(chan interface{}, .PrefetchMigrations)

	go .readUp(, -1, )
	return .unlockErr(.runMigrations())
}
Down looks at the currently active migration version and will migrate all the way down (applying all down migrations).
func ( *Migrate) () error {
	if  := .lock();  != nil {
		return 
	}

	, ,  := .databaseDrv.Version()
	if  != nil {
		return .unlockErr()
	}

	if  {
		return .unlockErr(ErrDirty{})
	}

	 := make(chan interface{}, .PrefetchMigrations)
	go .readDown(, -1, )
	return .unlockErr(.runMigrations())
}
Drop deletes everything in the database.
func ( *Migrate) () error {
	if  := .lock();  != nil {
		return 
	}
	if  := .databaseDrv.Drop();  != nil {
		return .unlockErr()
	}
	return .unlock()
}
Run runs any migration provided by you against the database. It does not check any currently active version in database. Usually you don't need this function at all. Use Migrate, Steps, Up or Down instead.
func ( *Migrate) ( ...*Migration) error {
	if len() == 0 {
		return ErrNoChange
	}

	if  := .lock();  != nil {
		return 
	}

	, ,  := .databaseDrv.Version()
	if  != nil {
		return .unlockErr()
	}

	if  {
		return .unlockErr(ErrDirty{})
	}

	 := make(chan interface{}, .PrefetchMigrations)

	go func() {
		defer close()
		for ,  := range  {
			if .PrefetchMigrations > 0 && .Body != nil {
				.logVerbosePrintf("Start buffering %v\n", .LogString())
			} else {
				.logVerbosePrintf("Scheduled %v\n", .LogString())
			}

			 <- 
			go func( *Migration) {
				if  := .Buffer();  != nil {
					.logErr()
				}
			}()
		}
	}()

	return .unlockErr(.runMigrations())
}
Force sets a migration version. It does not check any currently active version in database. It resets the dirty state to false.
func ( *Migrate) ( int) error {
	if  < -1 {
		return ErrInvalidVersion
	}

	if  := .lock();  != nil {
		return 
	}

	if  := .databaseDrv.SetVersion(, false);  != nil {
		return .unlockErr()
	}

	return .unlock()
}
Version returns the currently active migration version. If no migration has been applied, yet, it will return ErrNilVersion.
func ( *Migrate) () ( uint,  bool,  error) {
	, ,  := .databaseDrv.Version()
	if  != nil {
		return 0, false, 
	}

	if  == database.NilVersion {
		return 0, false, ErrNilVersion
	}

	return suint(), , nil
}
read reads either up or down migrations from source `from` to `to`. Each migration is then written to the ret channel. If an error occurs during reading, that error is written to the ret channel, too. Once read is done reading it will close the ret channel.
func ( *Migrate) ( int,  int,  chan<- interface{}) {
	defer close()
check if from version exists
	if  >= 0 {
		if  := .versionExists(suint());  != nil {
			 <- 
			return
		}
	}
check if to version exists
	if  >= 0 {
		if  := .versionExists(suint());  != nil {
			 <- 
			return
		}
	}
no change?
	if  ==  {
		 <- ErrNoChange
		return
	}

it's going up apply first migration if from is nil version
		if  == -1 {
			,  := .sourceDrv.First()
			if  != nil {
				 <- 
				return
			}

			,  := .newMigration(, int())
			if  != nil {
				 <- 
				return
			}

			 <- 
			go func() {
				if  := .Buffer();  != nil {
					.logErr()
				}
			}()

			 = int()
		}
run until we reach target ...
		for  <  {
			if .stop() {
				return
			}

			,  := .sourceDrv.Next(suint())
			if  != nil {
				 <- 
				return
			}

			,  := .newMigration(, int())
			if  != nil {
				 <- 
				return
			}

			 <- 
			go func() {
				if  := .Buffer();  != nil {
					.logErr()
				}
			}()

			 = int()
		}

it's going down run until we reach target ...
		for  >  &&  >= 0 {
			if .stop() {
				return
			}

			,  := .sourceDrv.Prev(suint())
apply nil migration
				,  := .newMigration(suint(), -1)
				if  != nil {
					 <- 
					return
				}
				 <- 
				go func() {
					if  := .Buffer();  != nil {
						.logErr()
					}
				}()

				return

			} else if  != nil {
				 <- 
				return
			}

			,  := .newMigration(suint(), int())
			if  != nil {
				 <- 
				return
			}

			 <- 
			go func() {
				if  := .Buffer();  != nil {
					.logErr()
				}
			}()

			 = int()
		}
	}
}
readUp reads up migrations from `from` limitted by `limit`. limit can be -1, implying no limit and reading until there are no more migrations. Each migration is then written to the ret channel. If an error occurs during reading, that error is written to the ret channel, too. Once readUp is done reading it will close the ret channel.
func ( *Migrate) ( int,  int,  chan<- interface{}) {
	defer close()
check if from version exists
	if  >= 0 {
		if  := .versionExists(suint());  != nil {
			 <- 
			return
		}
	}

	if  == 0 {
		 <- ErrNoChange
		return
	}

	 := 0
	for  <  ||  == -1 {
		if .stop() {
			return
		}
apply first migration if from is nil version
		if  == -1 {
			,  := .sourceDrv.First()
			if  != nil {
				 <- 
				return
			}

			,  := .newMigration(, int())
			if  != nil {
				 <- 
				return
			}

			 <- 
			go func() {
				if  := .Buffer();  != nil {
					.logErr()
				}
			}()
			 = int()
			++
			continue
		}
apply next migration
		,  := .sourceDrv.Next(suint())
no limit, but no migrations applied?
			if  == -1 &&  == 0 {
				 <- ErrNoChange
				return
			}
no limit, reached end
			if  == -1 {
				return
			}
reached end, and didn't apply any migrations
			if  > 0 &&  == 0 {
				 <- os.ErrNotExist
				return
			}
applied less migrations than limit?
			if  <  {
				 <- ErrShortLimit{suint( - )}
				return
			}
		}
		if  != nil {
			 <- 
			return
		}

		,  := .newMigration(, int())
		if  != nil {
			 <- 
			return
		}

		 <- 
		go func() {
			if  := .Buffer();  != nil {
				.logErr()
			}
		}()
		 = int()
		++
	}
}
readDown reads down migrations from `from` limitted by `limit`. limit can be -1, implying no limit and reading until there are no more migrations. Each migration is then written to the ret channel. If an error occurs during reading, that error is written to the ret channel, too. Once readDown is done reading it will close the ret channel.
func ( *Migrate) ( int,  int,  chan<- interface{}) {
	defer close()
check if from version exists
	if  >= 0 {
		if  := .versionExists(suint());  != nil {
			 <- 
			return
		}
	}

	if  == 0 {
		 <- ErrNoChange
		return
	}
no change if already at nil version
	if  == -1 &&  == -1 {
		 <- ErrNoChange
		return
	}
can't go over limit if already at nil version
	if  == -1 &&  > 0 {
		 <- os.ErrNotExist
		return
	}

	 := 0
	for  <  ||  == -1 {
		if .stop() {
			return
		}

		,  := .sourceDrv.Prev(suint())
no limit or haven't reached limit, apply "first" migration
			if  == -1 || - > 0 {
				,  := .sourceDrv.First()
				if  != nil {
					 <- 
					return
				}

				,  := .newMigration(, -1)
				if  != nil {
					 <- 
					return
				}
				 <- 
				go func() {
					if  := .Buffer();  != nil {
						.logErr()
					}
				}()
				++
			}

			if  <  {
				 <- ErrShortLimit{suint( - )}
			}
			return
		}
		if  != nil {
			 <- 
			return
		}

		,  := .newMigration(suint(), int())
		if  != nil {
			 <- 
			return
		}

		 <- 
		go func() {
			if  := .Buffer();  != nil {
				.logErr()
			}
		}()
		 = int()
		++
	}
}
runMigrations reads *Migration and error from a channel. Any other type sent on this channel will result in a panic. Each migration is then proxied to the database driver and run against the database. Before running a newly received migration it will check if it's supposed to stop execution because it might have received a stop signal on the GracefulStop channel.
func ( *Migrate) ( <-chan interface{}) error {
	for  := range  {

		if .stop() {
			return nil
		}

		switch r := .(type) {
		case error:
			return 

		case *Migration:
			 := 
set version with dirty state
			if  := .databaseDrv.SetVersion(.TargetVersion, true);  != nil {
				return 
			}

			if .Body != nil {
				.logVerbosePrintf("Read and execute %v\n", .LogString())
				if  := .databaseDrv.Run(.BufferedBody);  != nil {
					return 
				}
			}
set clean state
			if  := .databaseDrv.SetVersion(.TargetVersion, false);  != nil {
				return 
			}

			 := time.Now()
			 := .FinishedReading.Sub(.StartedBuffering)
			 := .Sub(.FinishedReading)
log either verbose or normal
			if .Log != nil {
				if .Log.Verbose() {
					.logPrintf("Finished %v (read %v, ran %v)\n", .LogString(), , )
				} else {
					.logPrintf("%v (%v)\n", .LogString(), +)
				}
			}

		default:
			return fmt.Errorf("unknown type: %T with value: %+v", , )
		}
	}
	return nil
}
versionExists checks the source if either the up or down migration for the specified migration version exists.
try up migration first
	, ,  := .sourceDrv.ReadUp()
	if  == nil {
		defer func() {
			if  := .Close();  != nil {
				 = multierror.Append(, )
			}
		}()
	}
	if os.IsExist() {
		return nil
	} else if !os.IsNotExist() {
		return 
	}
then try down migration
	, ,  := .sourceDrv.ReadDown()
	if  == nil {
		defer func() {
			if  := .Close();  != nil {
				 = multierror.Append(, )
			}
		}()
	}
	if os.IsExist() {
		return nil
	} else if !os.IsNotExist() {
		return 
	}

	.logErr(fmt.Errorf("no migration found for version %d", ))
	return os.ErrNotExist
}
stop returns true if no more migrations should be run against the database because a stop signal was received on the GracefulStop channel. Calls are cheap and this function is not blocking.
func ( *Migrate) () bool {
	if .isGracefulStop {
		return true
	}

	select {
	case <-.GracefulStop:
		.isGracefulStop = true
		return true

	default:
		return false
	}
}
newMigration is a helper func that returns a *Migration for the specified version and targetVersion.
func ( *Migrate) ( uint,  int) (*Migration, error) {
	var  *Migration

	if  >= int() {
		, ,  := .sourceDrv.ReadUp()
create "empty" migration
			,  = NewMigration(nil, "", , )
			if  != nil {
				return nil, 
			}

		} else if  != nil {
			return nil, 

create migration from up source
			,  = NewMigration(, , , )
			if  != nil {
				return nil, 
			}
		}

	} else {
		, ,  := .sourceDrv.ReadDown()
create "empty" migration
			,  = NewMigration(nil, "", , )
			if  != nil {
				return nil, 
			}

		} else if  != nil {
			return nil, 

create migration from down source
			,  = NewMigration(, , , )
			if  != nil {
				return nil, 
			}
		}
	}

	if .PrefetchMigrations > 0 && .Body != nil {
		.logVerbosePrintf("Start buffering %v\n", .LogString())
	} else {
		.logVerbosePrintf("Scheduled %v\n", .LogString())
	}

	return , nil
}
lock is a thread safe helper function to lock the database. It should be called as late as possible when running migrations.
func ( *Migrate) () error {
	.isLockedMu.Lock()
	defer .isLockedMu.Unlock()

	if .isLocked {
		return ErrLocked
	}
create done channel, used in the timeout goroutine
	 := make(chan bool, 1)
	defer func() {
		 <- true
	}()
use errchan to signal error back to this context
	 := make(chan error, 2)
start timeout goroutine
	 := time.After(.LockTimeout)
	go func() {
		for {
			select {
			case <-:
				return
			case <-:
				 <- ErrLockTimeout
				return
			}
		}
	}()
now try to acquire the lock
	go func() {
		if  := .databaseDrv.Lock();  != nil {
			 <- 
		} else {
			 <- nil
		}
	}()
wait until we either receive ErrLockTimeout or error from Lock operation
	 := <-
	if  == nil {
		.isLocked = true
	}
	return 
}
unlock is a thread safe helper function to unlock the database. It should be called as early as possible when no more migrations are expected to be executed.
func ( *Migrate) () error {
	.isLockedMu.Lock()
	defer .isLockedMu.Unlock()

BUG: Can potentially create a deadlock. Add a timeout.
		return 
	}

	.isLocked = false
	return nil
}
unlockErr calls unlock and returns a combined error if a prevErr is not nil.
func ( *Migrate) ( error) error {
	if  := .unlock();  != nil {
		return multierror.Append(, )
	}
	return 
}
logPrintf writes to m.Log if not nil
func ( *Migrate) ( string,  ...interface{}) {
	if .Log != nil {
		.Log.Printf(, ...)
	}
}
logVerbosePrintf writes to m.Log if not nil. Use for verbose logging output.
func ( *Migrate) ( string,  ...interface{}) {
	if .Log != nil && .Log.Verbose() {
		.Log.Printf(, ...)
	}
}
logErr writes error to m.Log if not nil
func ( *Migrate) ( error) {
	if .Log != nil {
		.Log.Printf("error: %v", )
	}