// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package postgres

import (
	"context"
	"database/sql"
	"fmt"
	"net/http"
	"strconv"

	"golang.org/x/pkgsite/internal"
	"golang.org/x/pkgsite/internal/config"
	"golang.org/x/pkgsite/internal/derrors"
	"golang.org/x/pkgsite/internal/log"
)
UpdateModuleVersionStatesForReprocessing marks modules to be reprocessed that were processed prior to the provided appVersion.
func ( *DB) ( context.Context,  string) ( error) {
	defer derrors.WrapStack(&, "UpdateModuleVersionStatesForReprocessing(ctx, %q)", )

	for ,  := range []int{
		http.StatusOK,
		derrors.ToStatus(derrors.HasIncompletePackages),
		derrors.ToStatus(derrors.DBModuleInsertInvalid),
	} {
		if  := .UpdateModuleVersionStatesWithStatus(, , );  != nil {
			return 
		}
	}
	return nil
}
UpdateModuleVersionStatesForReprocessingReleaseVersionsOnly marks modules to be reprocessed that were processed prior to the provided appVersion.
func ( *DB) ( context.Context,  string) ( error) {
	 := `
		UPDATE module_version_states mvs
		SET
			status = (
				CASE WHEN status=200 THEN 520
					 WHEN status=290 THEN 521
					 END
				),
			next_processed_after = CURRENT_TIMESTAMP,
			last_processed_at = NULL
		WHERE
			app_version < $1
			AND (status = 200 OR status = 290)
			AND right(sort_version, 1) = '~' -- release versions only
			AND NOT incompatible;`
	,  := .db.Exec(, , )
	if  != nil {
		return 
	}
	log.Infof(, "Updated release and non-incompatible versions of module_version_states with status=200 and status=290 and app_version < %q; %d affected", , )
	return nil
}
UpdateModuleVersionStatesForReprocessingLatestOnly marks modules to be reprocessed that were processed prior to the provided appVersion.
func ( *DB) ( context.Context,  string) ( error) {
	 := `
		UPDATE module_version_states mvs
		SET
			status = (
				CASE WHEN status=200 THEN 520
					 WHEN status=290 THEN 521
					 END
				),
			next_processed_after = CURRENT_TIMESTAMP,
			last_processed_at = NULL
		FROM (
			SELECT DISTINCT ON (module_path) module_path, version
			FROM module_version_states
			ORDER BY
				module_path,
				incompatible,
				right(sort_version, 1) = '~' DESC, -- prefer release versions
				sort_version DESC
		) latest
		WHERE
			app_version < $1
			AND (status = 200 OR status = 290)
			AND latest.module_path = mvs.module_path
			AND latest.version = mvs.version;`
	,  := .db.Exec(, , )
	if  != nil {
		return 
	}
	log.Infof(, "Updated latest version of module_version_states with status=200 and status=290 and app_version < %q; %d affected", , )
	return nil
}

func ( *DB) ( context.Context,  int,  string) ( error) {
	 := `UPDATE module_version_states
			SET
				status = $2,
				next_processed_after = CURRENT_TIMESTAMP,
				last_processed_at = NULL
			WHERE
				app_version < $1
				AND status = $3;`
	,  := .db.Exec(, , , derrors.ToReprocessStatus(), )
	if  != nil {
		return 
	}
	log.Infof(,
		"Updated module_version_states with status=%d and app_version < %q to status=%d; %d affected",
		, , derrors.ToReprocessStatus(), )
	return nil
}
largeModulePackageThresold represents the package threshold at which it becomes difficult to process packages. Modules with more than this number of packages are generally different versions or forks of kubernetes, aws-sdk-go, azure-sdk-go, and bilibili.
largeModulesLimit represents the number of large modules that we are willing to enqueue at a given time. var for testing.
var largeModulesLimit = config.GetEnvInt("GO_DISCOVERY_LARGE_MODULES_LIMIT", 100)
GetNextModulesToFetch returns the next batch of modules that need to be processed. We prioritize modules based on (1) whether it has status zero (never processed), (2) whether it is the latest version, (3) if it is an alternative module, and (4) the number of packages it has. We want to leave time-consuming modules until the end and process them at a slower rate to reduce database load and timeouts. We also want to leave alternative modules towards the end, since these will incur unnecessary deletes otherwise.
func ( *DB) ( context.Context,  int) ( []*internal.ModuleVersionState,  error) {
	defer derrors.WrapStack(&, "GetNextModulesToFetch(ctx, %d)", )
	 := nextModulesToProcessQuery

	var  []*internal.ModuleVersionState
	 := fmt.Sprintf(, moduleVersionStateColumns)

Scan the last two columns separately; they are in the query only for sorting.
		 := func( ...interface{}) error {
			var (
				 bool
				   int
			)
			return .Scan(append(, &, &)...)
		}
		,  := scanModuleVersionState()
		if  != nil {
			return 
		}
		 = append(, )
		return nil
	}
	if  := .db.RunQuery(, , , );  != nil {
		return nil, 
	}
	if len() == 0 {
		log.Infof(, "No modules to requeue")
	} else {
		 := func( *int) string {
			if  == nil {
				return "NULL"
			}
			return strconv.Itoa(*)
		}
		 := [0]
		 := [len()-1]
		 := fmt.Sprintf("%s <= num_packages <= %s", (.NumPackages), (.NumPackages))
		log.Infof(, "GetNextModulesToFetch: num_modules=%d; %s; start_module=%q; end_module=%q",
			len(), ,
			fmt.Sprintf("%s/@v/%s", .ModulePath, .Version),
			fmt.Sprintf("%s/@v/%s", .ModulePath, .Version))
	}
Don't return more than largeModulesLimit of modules that have more than largeModulePackageThreshold packages, or of modules with status zero.
	 := 0
	for ,  := range  {
		if .Status == 0 || (.NumPackages != nil && *.NumPackages >= largeModulePackageThreshold) {
			++
		}
		if  > largeModulesLimit {
			return [:], nil
		}
	}
	return , nil
}
// This query puts new modules (status 0) first and then prioritizes latest
// versions. Other than that, it tries to avoid grouping modules in any way
// except by latest and status code: processing is much smoother when they are
// enqueued in random order.
//
// To make the result deterministic for testing, we hash the module path and
// version rather than actually choosing a random number. md5 is built into
// postgres and is an adequate hash for this purpose.
const nextModulesToProcessQuery = `
	-- Make a table of the latest versions of each module.
	WITH latest_versions AS (
		SELECT DISTINCT ON (module_path) module_path, version
		FROM module_version_states
		ORDER BY
			module_path,
			incompatible,
			right(sort_version, 1) = '~' DESC, -- prefer release versions
			sort_version DESC
	)
	SELECT %s, latest, npkg
	FROM (
		SELECT
			%[1]s,
			((module_path, version) IN (SELECT * FROM latest_versions)) AS latest,
			COALESCE(num_packages, 0) AS npkg
		FROM module_version_states
	) s
	WHERE next_processed_after < CURRENT_TIMESTAMP
		AND (status = 0 OR status >= 500)
	ORDER BY
		CASE
			-- new modules
			WHEN status = 0 THEN 0
			WHEN latest THEN
				CASE
					-- with SheddingLoad or ReprocessStatusOK or ReprocessHasIncompletePackages
					WHEN status = 503 or status = 520 OR status = 521 THEN 1
					-- with ReprocessBadModule or ReprocessAlternative or ReprocessDBModuleInsertInvalid
					WHEN status = 540 OR status = 541 OR status = 542 THEN 2
					ELSE 5
				END
			-- non-latest
			WHEN status = 503 or status = 520 OR status = 521 THEN 3
			WHEN status = 540 OR status = 541 OR status = 542 THEN 4
			ELSE 5
		END,
		-- process new modules in the order they arrived at the index
		CASE WHEN status = 0 THEN index_timestamp ELSE CURRENT_TIMESTAMP END,
		md5(module_path||version) -- deterministic but effectively random
	LIMIT $1