Source File
versionstate.go
Belonging Package
golang.org/x/pkgsite/internal/postgres
package postgres
import (
)
// InsertIndexVersions bulk-inserts one module_version_states row for each of
// the given index versions (name taken from the WrapStack label below).
//
// Newly inserted rows get status 0 and empty error and go_mod_path values.
// When a row with the same (module_path, version) already exists, its
// index_timestamp is overwritten with the new value, next_processed_after is
// set to CURRENT_TIMESTAMP and status is reset to 0.
//
// The insert is executed through db.Transact with sql.LevelDefault.
func ( *DB) ( context.Context, []*internal.IndexVersion) ( error) {
defer derrors.WrapStack(&, "InsertIndexVersions(ctx, %v)", )
// Flattened row values: eight entries per version, in the same order as the
// column list declared below.
var []interface{}
for , := range {
// The literals 0, "" and "" fill the status, error and go_mod_path columns.
= append(, .Path, .Version, version.ForSorting(.Version), .Timestamp, 0, "", "", version.IsIncompatible(.Version))
}
:= []string{"module_path", "version", "sort_version", "index_timestamp", "status", "error", "go_mod_path", "incompatible"}
// Conflict clause appended to the bulk INSERT statement.
:= `
ON CONFLICT
(module_path, version)
DO UPDATE SET
index_timestamp=excluded.index_timestamp,
next_processed_after=CURRENT_TIMESTAMP,
status=0`
return .db.Transact(, sql.LevelDefault, func( *database.DB) error {
return .BulkInsert(, "module_version_states", , , )
})
}
// ModuleVersionStateForUpsert carries the values that upsertModuleVersionState
// writes to a module_version_states row, plus the per-package states that are
// written to package_version_states alongside it.
type ModuleVersionStateForUpsert struct {
// ModulePath and Version identify the row; they are the conflict key of the
// upsert.
ModulePath string
Version string
// AppVersion is stored in the app_version column.
AppVersion string
// Timestamp is passed as the index_timestamp value of the INSERT.
Timestamp time.Time
// Status is stored in the status column and is also passed to
// updateModulesStatus.
Status int
// HasGoMod and GoModPath are stored in has_go_mod and go_mod_path.
HasGoMod bool
GoModPath string
// FetchErr, when non-nil, has its Error() text extracted by
// upsertModuleVersionState (presumably for the error column — confirm).
FetchErr error
// PackageVersionStates are handed to upsertPackageVersionStates when the
// slice is non-empty.
PackageVersionStates []*internal.PackageVersionState
}
// NOTE(review): the header and opening statements of the enclosing function
// are not present in this listing; the comments below describe only the
// visible remainder.
//
// Take the number of package version states and keep a pointer to that count.
:= len(.PackageVersionStates)
= &
}
// All writes below run inside one db.Transact call with sql.LevelDefault.
return .db.Transact(, sql.LevelDefault, func( *database.DB) error {
// First write the module_version_states row.
if := upsertModuleVersionState(, , , ); != nil {
return
// Then update the status of the matching row in the modules table.
if := updateModulesStatus(, , .ModulePath, .Version, .Status); != nil {
return
}
// Nothing more to write when there are no package version states.
if len(.PackageVersionStates) == 0 {
return nil
}
return upsertPackageVersionStates(, , .PackageVersionStates)
})
}
// upsertModuleVersionState inserts a row into module_version_states or, when a
// row for the same (module_path, version) already exists, updates it.
//
// On update, app_version, status, has_go_mod, go_mod_path, error and
// num_packages are overwritten with the new values, try_count is incremented
// and last_processed_at is set to CURRENT_TIMESTAMP. next_processed_after is
// pushed back: by one minute if the row had never been processed, otherwise by
// twice the previous interval (next_processed_after - last_processed_at),
// capped at one hour. sort_version, index_timestamp and incompatible are not
// part of the SET list, so they are written only on insert.
//
// It returns an error if the statement fails or if the number of affected
// rows is not exactly 1.
func ( context.Context, *database.DB, *int, *ModuleVersionStateForUpsert) ( error) {
defer derrors.WrapStack(&, "upsertModuleVersionState(%q, %q, ...)", .ModulePath, .Version)
, := trace.StartSpan(, "upsertModuleVersionState")
defer .End()
// Text of FetchErr, left empty when there is no error. Presumably bound to
// the error column ($9) below — the argument names are missing from this
// listing, so confirm.
var string
if .FetchErr != nil {
= .FetchErr.Error()
}
, := .Exec(, `
INSERT INTO module_version_states AS mvs (
module_path,
version,
sort_version,
app_version,
index_timestamp,
status,
has_go_mod,
go_mod_path,
error,
num_packages,
incompatible)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (module_path, version)
DO UPDATE
SET
app_version=excluded.app_version,
status=excluded.status,
has_go_mod=excluded.has_go_mod,
go_mod_path=excluded.go_mod_path,
error=excluded.error,
num_packages=excluded.num_packages,
try_count=mvs.try_count+1,
last_processed_at=CURRENT_TIMESTAMP,
-- back off exponentially until 1 hour, then at constant 1-hour intervals
next_processed_after=CASE
WHEN mvs.last_processed_at IS NULL THEN
CURRENT_TIMESTAMP + INTERVAL '1 minute'
WHEN 2*(mvs.next_processed_after - mvs.last_processed_at) < INTERVAL '1 hour' THEN
CURRENT_TIMESTAMP + 2*(mvs.next_processed_after - mvs.last_processed_at)
ELSE
CURRENT_TIMESTAMP + INTERVAL '1 hour'
END;`,
.ModulePath, .Version, version.ForSorting(.Version),
.AppVersion, .Timestamp, .Status, .HasGoMod, .GoModPath, , ,
version.IsIncompatible(.Version))
if != nil {
return
}
// An upsert keyed on (module_path, version) must touch exactly one row.
if != 1 {
return fmt.Errorf("module version state update affected %d rows, expected exactly 1", )
}
return nil
}
// updateModulesStatus sets the status column of the modules row that matches
// the given module path and version.
//
// Affecting zero rows is not treated as an error; affecting more than one row
// is.
func ( context.Context, *database.DB, , string, int) ( error) {
defer derrors.WrapStack(&, "updateModulesStatus(%q, %q, %d)", , , )
:= `UPDATE modules
SET
status = $1
WHERE
module_path = $2
AND version = $3;`
, := .Exec(, , , , )
if != nil {
return
}
// Zero or one affected rows are both accepted.
if > 1 {
return fmt.Errorf("module status update affected %d rows, expected at most 1", )
}
return nil
}
// UpdateModuleVersionStatus sets the status and error columns of the
// module_version_states row that matches the given module path and version.
//
// The number of affected rows is discarded, so a call that matches no row
// returns no error.
func ( *DB) ( context.Context, , string, int, string) ( error) {
defer derrors.WrapStack(&, "UpdateModuleVersionStatus(%q, %q, %d)", , , )
:= `
UPDATE module_version_states
SET status = $3, error = $4
WHERE module_path = $1 AND version = $2
`
_, = .db.Exec(, , , , , )
return
}
// upsertPackageVersionStates bulk-inserts the given package version states
// into package_version_states. Rows that collide on
// (module_path, package_path, version) are updated in place with the new
// values.
func ( context.Context, *database.DB, []*internal.PackageVersionState) ( error) {
defer derrors.WrapStack(&, "upsertPackageVersionStates")
, := trace.StartSpan(, "upsertPackageVersionStates")
defer .End()
// Order the states by package path before writing. sort.Slice sorts in
// place. NOTE(review): presumably this gives concurrent upserts a consistent
// row order — confirm the intent.
sort.Slice(, func(, int) bool {
return [].PackagePath < [].PackagePath
})
// Flattened row values: five entries per state, in the order of the column
// list passed to BulkInsert below.
var []interface{}
for , := range {
= append(, .PackagePath, .ModulePath, .Version, .Status, .Error)
}
return .BulkInsert(, "package_version_states",
[]string{
"package_path",
"module_path",
"version",
"status",
"error",
},
,
`ON CONFLICT (module_path, package_path, version)
DO UPDATE
SET
package_path=excluded.package_path,
module_path=excluded.module_path,
version=excluded.version,
status=excluded.status,
error=excluded.error`)
}
// LatestIndexTimestamp returns the largest index_timestamp stored in
// module_version_states. If the table has no rows it returns the zero
// time.Time and a nil error.
func ( *DB) ( context.Context) ( time.Time, error) {
defer derrors.WrapStack(&, "LatestIndexTimestamp(ctx)")
:= `SELECT index_timestamp
FROM module_version_states
ORDER BY index_timestamp DESC
LIMIT 1`
var time.Time
:= .db.QueryRow(, )
// NOTE(review): the cases compare the scan error by equality, so a wrapped
// sql.ErrNoRows would fall through to the default branch.
switch := .Scan(&); {
case sql.ErrNoRows:
return time.Time{}, nil
case nil:
return , nil
default:
return time.Time{},
}
}
// moduleVersionStateColumns is the column list substituted for %s in the
// SELECT statements on module_version_states in this file. The column order
// matches the order of the scan targets in scanModuleVersionState, so the two
// must be changed together.
const moduleVersionStateColumns = `
module_path,
version,
index_timestamp,
created_at,
status,
error,
try_count,
last_processed_at,
next_processed_after,
app_version,
has_go_mod,
go_mod_path,
num_packages`
// scanModuleVersionState reads one row, laid out as moduleVersionStateColumns,
// through the supplied scan function and returns it as an
// *internal.ModuleVersionState (name taken from the call sites in this file).
//
// Taking the scan function as a parameter lets the same code serve both the
// single-row and the multi-row callers, which each pass a Scan method value.
func ( func( ...interface{}) error) (*internal.ModuleVersionState, error) {
var (
internal.ModuleVersionState
pq.NullTime
sql.NullInt64
sql.NullBool
)
// NOTE(review): only two bare scan targets appear below (in the has_go_mod
// and num_packages positions), while three nullable locals are declared. The
// pq.NullTime local does not seem to be scanned into — confirm.
if := (&.ModulePath, &.Version, &.IndexTimestamp, &.CreatedAt, &.Status, &.Error,
&.TryCount, &.LastProcessedAt, &.NextProcessedAfter, &.AppVersion, &, &.GoModPath,
&); != nil {
return nil,
}
// NOTE(review): if the pq.NullTime local is never populated, this branch never
// runs and LastProcessedAt keeps whatever the direct scan above stored —
// confirm.
if .Valid {
:= .Time
.LastProcessedAt = &
}
// A NULL has_go_mod leaves HasGoMod at its zero value.
if .Valid {
.HasGoMod = .Bool
}
// A NULL num_packages leaves NumPackages nil.
if .Valid {
:= int(.Int64)
.NumPackages = &
}
return &, nil
}
// queryModuleVersionStates runs a SELECT on module_version_states and returns
// one *internal.ModuleVersionState per result row (name taken from the call
// sites in this file).
//
// The string argument is a format string; moduleVersionStateColumns is
// substituted into it with fmt.Sprintf before the query is executed.
func ( *DB) ( context.Context, string, ...interface{}) ([]*internal.ModuleVersionState, error) {
:= fmt.Sprintf(, moduleVersionStateColumns)
, := .db.Query(, , ...)
if != nil {
return nil,
}
defer .Close()
var []*internal.ModuleVersionState
for .Next() {
, := scanModuleVersionState(.Scan)
if != nil {
// NOTE(review): %v flattens the scan error to text, so callers cannot
// unwrap it.
return nil, fmt.Errorf("rows.Scan(): %v", )
}
= append(, )
}
// NOTE(review): no Err() call on the rows follows the loop, so an iteration
// error that ends Next() early would be returned as a short, successful
// result — confirm against the database/sql Rows documentation.
return , nil
}
// GetRecentFailedVersions returns module version states whose status is 500,
// ordered by last_processed_at descending. The int argument is bound to the
// query's LIMIT.
func ( *DB) ( context.Context, int) ( []*internal.ModuleVersionState, error) {
defer derrors.WrapStack(&, "GetRecentFailedVersions(ctx, %d)", )
:= `
SELECT %s
FROM
module_version_states
WHERE status=500
ORDER BY last_processed_at DESC
LIMIT $1`
return .queryModuleVersionStates(, , )
}
// GetRecentVersions returns module version states ordered by created_at
// descending, regardless of status. The int argument is bound to the query's
// LIMIT.
func ( *DB) ( context.Context, int) ( []*internal.ModuleVersionState, error) {
defer derrors.WrapStack(&, "GetRecentVersions(ctx, %d)", )
:= `
SELECT %s
FROM
module_version_states
ORDER BY created_at DESC
LIMIT $1`
return .queryModuleVersionStates(, , )
}
// GetModuleVersionState returns the module_version_states row for the given
// module path and version. It returns derrors.NotFound when no such row
// exists.
func ( *DB) ( context.Context, , string) ( *internal.ModuleVersionState, error) {
defer derrors.WrapStack(&, "GetModuleVersionState(ctx, %q, %q)", , )
:= fmt.Sprintf(`
SELECT %s
FROM
module_version_states
WHERE
module_path = $1
AND version = $2;`, moduleVersionStateColumns)
:= .db.QueryRow(, , , )
, := scanModuleVersionState(.Scan)
// The cases compare the scan error by equality.
switch {
case nil:
return , nil
case sql.ErrNoRows:
return nil, derrors.NotFound
default:
return nil, fmt.Errorf("row.Scan(): %v", )
}
}
// This method returns every package_version_states row recorded for the given
// module path and version. No matching rows yields a nil slice and a nil
// error.
//
// NOTE(review): the WrapStack label below reads "GetPackageVersionState", the
// same label used by the single-row lookup in this file; confirm it matches
// this method's actual name.
func ( *DB) ( context.Context, , string) ( []*internal.PackageVersionState, error) {
defer derrors.WrapStack(&, "GetPackageVersionState(ctx, %q, %q)", , )
:= `
SELECT
package_path,
module_path,
version,
status,
error
FROM
package_version_states
WHERE
module_path = $1
AND version = $2;`
var []*internal.PackageVersionState
// Per-row callback: scan the five selected columns into a fresh value and
// append a pointer to it to the result slice.
:= func( *sql.Rows) error {
var internal.PackageVersionState
if := .Scan(&.PackagePath, &.ModulePath, &.Version,
&.Status, &.Error); != nil {
return fmt.Errorf("rows.Scan(): %v", )
}
= append(, &)
return nil
}
if := .db.RunQuery(, , , , ); != nil {
return nil,
}
return , nil
}
// GetPackageVersionState returns the package_version_states row identified by
// package path, module path and version. It returns derrors.NotFound when no
// such row exists.
func ( *DB) ( context.Context, , , string) ( *internal.PackageVersionState, error) {
defer derrors.WrapStack(&, "GetPackageVersionState(ctx, %q, %q, %q)", , , )
:= `
SELECT
package_path,
module_path,
version,
status,
error
FROM
package_version_states
WHERE
package_path = $1
AND module_path = $2
AND version = $3;`
var internal.PackageVersionState
= .db.QueryRow(, , , , ).Scan(
&.PackagePath, &.ModulePath, &.Version,
&.Status, &.Error)
// The cases compare the scan error by equality.
switch {
case nil:
return &, nil
case sql.ErrNoRows:
return nil, derrors.NotFound
default:
return nil, fmt.Errorf("row.Scan(): %v", )
}
}
// VersionStats is the aggregate summary of module_version_states produced by
// GetVersionStats.
type VersionStats struct {
// LatestTimestamp is the largest index_timestamp seen across all status
// groups.
LatestTimestamp time.Time
VersionCounts map[int]int // from status to number of rows
}
// GetVersionStats groups module_version_states by status and returns, in a
// VersionStats, the number of rows per status together with the largest
// index_timestamp found in any group.
func ( *DB) ( context.Context) ( *VersionStats, error) {
defer derrors.WrapStack(&, "GetVersionStats(ctx)")
:= `
SELECT
status,
max(index_timestamp),
count(*)
FROM
module_version_states
GROUP BY status;`
:= &VersionStats{
VersionCounts: make(map[int]int),
}
= .db.RunQuery(, , func( *sql.Rows) error {
// One result row per status group: status, max timestamp, row count.
var (
sql.NullInt64
time.Time
int
)
if := .Scan(&, &, &); != nil {
return fmt.Errorf("row.Scan(): %v", )
}
// Keep the running maximum across groups.
if .After(.LatestTimestamp) {
.LatestTimestamp =
}
// NOTE(review): the NullInt64's Valid flag is not checked, so a NULL status
// group would presumably be counted under key 0 — confirm.
.VersionCounts[int(.Int64)] =
return nil
})
if != nil {
return nil,
}
return , nil
}
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |