// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package worker provides functionality for running a worker service.
// Its primary operation is to fetch modules from a proxy and write them to the
// database.
package worker

import (
	
	
	
	
	
	
	
	
	
	

	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
)
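// NewServer creates a new Server with the given dependencies.
// NOTE(review): the NewServer definition itself is missing from this copy of
// the file; restore it from version control.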
// Install registers server routes using the given handler registration func.
// NOTE(review): this copy is missing lines — the func header, the rmw
// assignment described below, the "/" route registration, and the closing
// brace. The identifiers on the remaining lines have also been stripped.
// Restore from version control.
	// rmw wires in error reporting to the handler. It is configured here, in
	// Install, because not every handler should have error reporting.

	// Each AppEngine instance is created in response to a start request, which
	// is an empty HTTP GET request to /_ah/start when scaling is set to manual
	// or basic, and /_ah/warmup when scaling is automatic and min_instances is
	// set. AppEngine sends this request to bring an instance into existence.
	// See details for /_ah/start at
	// https://cloud.google.com/appengine/docs/standard/go/how-instances-are-managed#startup
	// and for /_ah/warmup at
	// https://cloud.google.com/appengine/docs/standard/go/configuring-warmup-requests.
	("/_ah/", http.HandlerFunc(func( http.ResponseWriter,  *http.Request) {
		log.Infof(.Context(), "Request made to %q", .URL.Path)
	}))
	// scheduled: poll polls the Module Index for new modules that have been
	// published and inserts that metadata into module_version_states.
	// This endpoint is intended to be invoked periodically by a scheduler.
	// See the note about duplicate tasks for "/enqueue" below.
	("/poll", (.errorHandler(.handlePollIndex)))
	// scheduled: update-imported-by-count updates the imported_by_count for
	// packages in search_documents where imported_by_count_updated_at is null
	// or imported_by_count_updated_at < version_updated_at.
	// This endpoint is intended to be invoked periodically by a scheduler.
	("/update-imported-by-count", (.errorHandler(.handleUpdateImportedByCount)))
	// task-queue: fetch fetches a module version from the Module Mirror,
	// processes the contents, and inserts it into the database. If a fetch
	// request fails for any reason other than http.StatusInternalServerError
	// or http.StatusServiceUnavailable, it returns http.StatusOK so that the
	// task queue does not retry fetching module versions that have a terminal
	// error.
	// This endpoint is intended to be invoked by a task queue with semantics
	// like Google Cloud Task Queues.
	("/fetch/", http.StripPrefix("/fetch", (http.HandlerFunc(.handleFetch))))
	// scheduled: fetch-std-master checks if the std@master version in the
	// database is up to date with the version at HEAD. If not, a fetch request
	// is queued to refresh the std@master version.
	("/fetch-std-master/", (.errorHandler(.handleFetchStdMaster)))
	// scheduled: enqueue queries the module_version_states table for the next
	// batch of module versions to process, and enqueues them for processing.
	// Normally this will not cause duplicate processing, because Cloud Tasks
	// are de-duplicated. That does not apply after a task has been finished or
	// deleted for Server.taskIDChangeInterval (see
	// https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#createtaskrequest,
	// under "Task De-duplication"). If you cannot wait, you can force
	// duplicate tasks by providing any string as the "suffix" query parameter.
	("/enqueue", (.errorHandler(.handleEnqueue)))
	// TODO: remove after /queue is in production and the scheduler jobs have
	// been changed.
	// scheduled: requeue is an alias for /enqueue: it queries the
	// module_version_states table for the next batch of module versions to
	// process, and enqueues them for processing. Normally this will not cause
	// duplicate processing, because Cloud Tasks are de-duplicated. That does
	// not apply after a task has been finished or deleted for about an hour
	// (see
	// https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#createtaskrequest,
	// under "Task De-duplication"). If you cannot wait, you can force
	// duplicate tasks by providing any string as the "suffix" query parameter.
	("/requeue", (.errorHandler(.handleEnqueue)))
	// manual: reprocess sets a reprocess status for all records in the
	// module_version_states table that were processed by an app_version that
	// occurred after the provided app_version param, so that they will be
	// scheduled for reprocessing the next time a request to /enqueue is made.
	// If a status param is provided, only module versions with that status
	// will be reprocessed.
	("/reprocess", (.errorHandler(.handleReprocess)))
	// manual: populate-stdlib inserts all modules of the Go standard library
	// into the tasks queue to be processed and inserted into the database.
	// handlePopulateStdLib should be updated whenever a new version of Go is
	// released.
	// See the comments on duplicate tasks for "/requeue", above.
	("/populate-stdlib", (.errorHandler(.handlePopulateStdLib)))
	// manual: repopulate-search-documents repopulates every row in the
	// search_documents table that was last updated before the time in the
	// "before" query parameter.
	("/repopulate-search-documents", (.errorHandler(.handleRepopulateSearchDocuments)))
	// manual: clear-cache clears the redis cache.
	("/clear-cache", (.errorHandler(.clearCache)))
	// manual: delete deletes the specified module version.
	("/delete/", http.StripPrefix("/delete", (.errorHandler(.handleDelete))))
	// scheduled ("limit" query param): clean some eligible module versions
	// selected from the DB.
	// manual ("module" query param): clean all versions of a given module.
	("/clean", (.errorHandler(.handleClean)))

	("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir(.staticPath.String()))))
	// returns an HTML page displaying information about recent versions that
	// were processed.
	("/versions", http.HandlerFunc(.handleHTMLPage(.doVersionsPage)))
	// Health check.
	("/healthz", http.HandlerFunc(.handleHealthCheck))

	("/favicon.ico", http.HandlerFunc(func( http.ResponseWriter,  *http.Request) {
		http.ServeFile(, , "content/static/img/worker-favicon.ico")
	}))
	// returns an HTML page displaying the homepage.
	// NOTE(review): the route registration this comment describes is missing
	// from this copy, as is the closing brace of Install.
handleUpdateImportedByCount updates imported_by_count for all packages.
func ( *Server) ( http.ResponseWriter,  *http.Request) error {
	,  := .db.UpdateSearchDocumentsImportedByCount(.Context())
	if  != nil {
		return 
	}
	fmt.Fprintf(, "updated %d packages", )
	return nil
}
handleRepopulateSearchDocuments repopulates every row in the search_documents table that was last updated before the given time.
func ( *Server) ( http.ResponseWriter,  *http.Request) error {
	 := parseLimitParam(, 100)
	 := .FormValue("before")
	if  == "" {
		return &serverError{
			http.StatusBadRequest,
			errors.New("must provide 'before' query param as an RFC3339 datetime"),
		}
	}
	,  := time.Parse(time.RFC3339, )
	if  != nil {
		return &serverError{http.StatusBadRequest, }
	}

	 := .Context()
	log.Infof(, "Repopulating search documents for %d packages", )
	,  := .db.GetPackagesForSearchDocumentUpsert(, , )
	if  != nil {
		return 
	}

	for ,  := range  {
		if  := postgres.UpsertSearchDocument(, .db.Underlying(), );  != nil {
			return 
		}
	}
	return nil
}
handleFetch executes a fetch request and returns a http.StatusOK if the status is not http.StatusInternalServerError, so that the task queue does not retry fetching module versions that have a terminal error.
func ( *Server) ( http.ResponseWriter,  *http.Request) {
	if .URL.Path == "/" {
		.Header().Set("Content-Type", "text/html; charset=utf-8")
		fmt.Fprintf(, "<h1>Hello, Go Discovery Fetch Service!</h1>")
		fmt.Fprintf(, `<p><a href="/fetch/rsc.io/quote/@v/v1.0.0">Fetch an example module</a></p>`)
		return
	}
	,  := .doFetch(, )
	if  == http.StatusInternalServerError ||  == http.StatusServiceUnavailable {
		log.Infof(.Context(), "doFetch of %s returned %d; returning that code to retry task", .URL.Path, )
		http.Error(, http.StatusText(), )
		return
	}
	if /100 != 2 {
		log.Infof(.Context(), "doFetch of %s returned code %d; returning OK to avoid retry", .URL.Path, )
	}
	.Header().Set("Content-Type", "text/plain; charset=utf-8")
	if /100 == 2 {
		log.Info(.Context(), )
		fmt.Fprintln(, )
	}
	fmt.Fprintln(, http.StatusText())
}
doFetch executes a fetch request and returns the msg and status.
func ( *Server) ( http.ResponseWriter,  *http.Request) (string, int) {
	 := .Context()
	, ,  := parseModulePathAndVersion(.URL.Path)
	if  != nil {
		return .Error(), http.StatusBadRequest
	}

	 := &Fetcher{
		ProxyClient:  .proxyClient.WithZipCache(),
		SourceClient: .sourceClient,
		DB:           .db,
		Cache:        .cache,
	}
	if .FormValue(queue.DisableProxyFetchParam) == queue.DisableProxyFetchValue {
		.ProxyClient = .ProxyClient.WithFetchDisabled()
	}
	, ,  := .FetchAndUpdateState(, , , .cfg.AppVersionLabel())
	if  == http.StatusInternalServerError {
		.reportError(, , , )
		return .Error(), 
	}
	return fmt.Sprintf("fetched and updated %s@%s", , ), 
}
// reportError sends the error to the GCP Error Reporting service.
// It does nothing if the Server's reportingClient is nil. If the error wraps a
// *derrors.StackError, that stack trace is attached to the report along with
// the request.
// TODO(jba): factor out from here and frontend/server.go.
// NOTE(review): this copy is missing lines — the closing brace of the
// reportingClient nil check, the statement(s) that follow the "Bypass the
// error-reporting middleware." comment, and the function's closing brace.
// Identifiers have also been stripped. Restore from version control.
func ( *Server) ( context.Context,  error,  http.ResponseWriter,  *http.Request) {
	if .reportingClient == nil {
		return
	// Extract the stack trace from the error if there is one.
	var  []byte
	if  := (*derrors.StackError)(nil); errors.As(, &) {
		 = .Stack
	}
	.reportingClient.Report(errorreporting.Entry{
		Error: ,
		Req:   ,
		Stack: ,
	})
	// Bypass the error-reporting middleware.
parseModulePathAndVersion returns the module and version specified by p. p is assumed to have either of the following two structures: - <module>/@v/<version> - <module>/@latest (this is symmetric with the proxy url scheme)
func ( string) (string, string, error) {
	 := strings.TrimPrefix(, "/")
	if strings.HasSuffix(, "/@latest") {
		 := strings.TrimSuffix(, "/@latest")
		if  == "" {
			return "", "", fmt.Errorf("invalid module path: %q", )
		}
		return , internal.LatestVersion, nil
	}
	 := strings.Split(, "/@v/")
	if len() != 2 {
		return "", "", fmt.Errorf("invalid path: %q", )
	}
	if [0] == "" || [1] == "" {
		return "", "", fmt.Errorf("invalid path: %q", )
	}
	return [0], [1], nil
}

func ( *Server) ( http.ResponseWriter,  *http.Request) ( error) {
	defer derrors.Wrap(&, "handlePollIndex(%q)", .URL.Path)
	 := .Context()
	 := parseLimitParam(, 10)
	,  := .db.LatestIndexTimestamp()
	if  != nil {
		return 
	}
	,  := .indexClient.GetVersions(, , )
	if  != nil {
		return 
	}
	if  := .db.InsertIndexVersions(, );  != nil {
		return 
	}
	log.Infof(, "Inserted %d modules from the index", len())
	.computeProcessingLag()
	return nil
}

// computeProcessingLag records a processing lag based on the database's
// staleness timestamp. If there is no staleness timestamp
// (derrors.NotFound), a lag of 0 is recorded. Any other error is logged as a
// warning and nothing is recorded.
// NOTE(review): the name is stripped in this copy; the poll handler calls it
// as computeProcessingLag. Lines are missing after the warning's return —
// presumably a final branch that records the lag computed from the
// timestamp, plus closing braces. Restore from version control.
func ( *Server) ( context.Context) {
	,  := .db.StalenessTimestamp()
	if errors.Is(, derrors.NotFound) {
		recordProcessingLag(, 0)
	} else if  != nil {
		log.Warningf(, "StalenessTimestamp: %v", )
		return
	// If the times on this machine and the machine that wrote the index
	// timestamp into the DB are out of sync, then the difference we compute
	// here will be off. But that is unlikely since both machines are running
	// on GCP.
handleEnqueue queries the module_version_states table for the next batch of module versions to process, and enqueues them for processing. Note that this may cause duplicate processing.
func ( *Server) ( http.ResponseWriter,  *http.Request) ( error) {
	defer derrors.Wrap(&, "handleEnqueue(%q)", .URL.Path)
	 := .Context()
	 := parseLimitParam(, 10)
	 := .FormValue("suffix") // append to task name to avoid deduplication
	 := trace.FromContext(.Context())
	.Annotate([]trace.Attribute{trace.Int64Attribute("limit", int64())}, "processed limit")
	,  := .db.GetNextModulesToFetch(, )
	if  != nil {
		return 
	}

	.Annotate([]trace.Attribute{trace.Int64Attribute("modules to fetch", int64(len()))}, "processed limit")
	.Header().Set("Content-Type", "text/plain")
	log.Infof(, "Scheduling modules to be fetched: queuing %d modules", len())
Enqueue concurrently, because sequentially takes a while.
	const  = 10
	var (
		                 sync.Mutex
		,  int
	)
	 := make(chan struct{}, )
	for ,  := range  {
		 := 
		 <- struct{}{}
		go func() {
			defer func() { <- }()
			,  := .queue.ScheduleFetch(, .ModulePath, .Version, ,
				shouldDisableProxyFetch())
			.Lock()
			if  != nil {
				log.Errorf(, "enqueuing: %v", )
				++
			} else if  {
				++
				recordEnqueue(.Context(), .Status)
			}
			.Unlock()
		}()
Wait for goroutines to finish.
	for  := 0;  < ; ++ {
		 <- struct{}{}
	}
	log.Infof(, "Successfully scheduled modules to be fetched: %d modules enqueued, %d errors", , )
	return nil
}

// NOTE(review): the function header is missing from this copy — presumably
// shouldDisableProxyFetch, which handleEnqueue calls with each module version
// state. The parameter identifier has also been stripped. Restore from
// version control.
	// Don't ask the proxy to fetch if this module is being reprocessed.
	// We use codes 52x and 54x for reprocessing.
	return .Status/10 == 52 || .Status/10 == 54
}
handleHTMLPage returns an HTML page using a template from s.templates.
func ( *Server) ( func( http.ResponseWriter,  *http.Request) error) http.HandlerFunc {
	return func( http.ResponseWriter,  *http.Request) {
		if  := (, );  != nil {
			log.Errorf(.Context(), "handleHTMLPage", )
			http.Error(, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		}
	}
}

func ( *Server) ( http.ResponseWriter,  *http.Request) error {
	, , ,  := stdlib.Zip("master")
	if  != nil {
		return 
	}
	,  := .db.GetVersionMap(.Context(), stdlib.ModulePath, "master")
	if  != nil {
		return 
	}
	if .ResolvedVersion !=  {
		if ,  := .queue.ScheduleFetch(.Context(), stdlib.ModulePath, "master", "", false);  != nil {
			return fmt.Errorf("error scheduling fetch for %s: %w", "master", )
		}
	}
	return .db.DeletePseudoversionsExcept(.Context(), stdlib.ModulePath, .ResolvedVersion)
}

func ( *Server) ( http.ResponseWriter,  *http.Request) error {
	,  := .doPopulateStdLib(.Context(), .FormValue("suffix"))
	.Header().Set("Content-Type", "text/plain; charset=utf-8")
	if  != nil {
		return fmt.Errorf("handlePopulateStdLib: %v", )
	}
	log.Infof(.Context(), "handlePopulateStdLib: %s", )
	_, _ = io.WriteString(, )
	return nil
}

func ( *Server) ( context.Context,  string) (string, error) {
	,  := stdlib.Versions()
	if  != nil {
		return "", 
	}
	for ,  := range  {
		if ,  := .queue.ScheduleFetch(, stdlib.ModulePath, , , false);  != nil {
			return "", fmt.Errorf("error scheduling fetch for %s: %w", , )
		}
	}
	return fmt.Sprintf("Scheduling modules to be fetched: %s.\n", strings.Join(, ", ")), nil
}

func ( *Server) ( http.ResponseWriter,  *http.Request) error {
	 := .FormValue("app_version")
	if  == "" {
		return &serverError{http.StatusBadRequest, errors.New("app_version was not specified")}
	}
	if  := config.ValidateAppVersion();  != nil {
		return &serverError{http.StatusBadRequest, fmt.Errorf("config.ValidateAppVersion(%q): %v", , )}
	}
Reprocess only the latest version of a module version with a previous status of 200 or 290.
	 := .FormValue("latest_only") == "true"
	if  {
		if  := .db.UpdateModuleVersionStatesForReprocessingLatestOnly(.Context(), );  != nil {
			return 
		}
		fmt.Fprintf(, "Scheduled latest version of modules to be reprocessed for appVersion > %q.", )
		return nil
	}
Reprocess only module versions with the given status code.
	 := .FormValue("status")
	if  != "" {
		,  := strconv.Atoi()
		if  != nil {
			return &serverError{http.StatusBadRequest, fmt.Errorf("status is invalid: %q", )}
		}
		if  := .db.UpdateModuleVersionStatesWithStatus(.Context(), , );  != nil {
			return 
		}
		fmt.Fprintf(, "Scheduled modules to be reprocessed for appVersion > %q and status = %d.", , )
		return nil
	}
Reprocess only versions with version type release and status of 200 or 290.
	 := .FormValue("release_only") == "true"
	if  {
		if  := .db.UpdateModuleVersionStatesForReprocessingReleaseVersionsOnly(.Context(), );  != nil {
			return 
		}
		fmt.Fprintf(, "Scheduled release and non-incompatible version of modules to be reprocessed for appVersion > %q.", )
		return nil
	}
Reprocess all module versions in module_version_states.
	if  := .db.UpdateModuleVersionStatesForReprocessing(.Context(), );  != nil {
		return 
	}
	fmt.Fprintf(, "Scheduled modules to be reprocessed for appVersion > %q.", )
	return nil
}

func ( *Server) ( http.ResponseWriter,  *http.Request) error {
	if .cache == nil {
		return errors.New("redis cache client is not configured")
	}
	if  := .cache.Clear(.Context());  != nil {
		return 
	}
	fmt.Fprint(, "Cache cleared.")
	return nil
}
handleDelete deletes the specified module version.
func ( *Server) ( http.ResponseWriter,  *http.Request) error {
	, ,  := parseModulePathAndVersion(.URL.Path)
	if  != nil {
		return &serverError{http.StatusBadRequest, }
	}
	if  := .db.DeleteModule(.Context(), , );  != nil {
		return &serverError{http.StatusInternalServerError, }
	}
	fmt.Fprintf(, "Deleted %s@%s", , )
	return nil
}
const (
	// cleanDays is the age threshold for cleaning: a module version is
	// considered for cleaning only if it is older than this many days.
	cleanDays = 7
)
handleClean handles a request to clean module versions. If the request has a 'limit' query parameter, then up to that many module versions are selected from the DB among those eligible for cleaning, and they are cleaned. If the request has a 'module' query parameter, all versions of that module path are cleaned. It is an error if neither or both query parameters are provided.
func ( *Server) ( http.ResponseWriter,  *http.Request) ( error) {
	defer derrors.Wrap(&, "handleClean")
	 := .Context()

	 := .FormValue("limit")
	 := .FormValue("module")
	switch {
	case  == "" &&  == "":
		return errors.New("need 'limit' or 'module' query param")

	case  != "" &&  != "":
		return errors.New("need exactly one of 'limit' or 'module' query param")

	case  != "":
		,  := .db.GetModuleVersionsToClean(, cleanDays, parseLimitParam(, 1000))
		if  != nil {
			return 
		}
		log.Infof(, "cleaning %d modules", len())
		if  := .db.CleanModuleVersions(, , "Bulk deleted via /clean endpoint");  != nil {
			return 
		}
		fmt.Fprintf(, "Cleaned %d module versions.\n", len())
		return nil

	default: // module != ""
		log.Infof(, "cleaning module %q", )
		if  := .db.CleanModule(, , "Manually deleted via /clean endpoint");  != nil {
			return 
		}
		fmt.Fprintf(, "Cleaned module %q\n", )
		return nil
	}
}

func ( *Server) ( http.ResponseWriter,  *http.Request) {
	if  := .db.Underlying().Ping();  != nil {
		http.Error(, fmt.Sprintf("DB ping failed: %v", ), http.StatusInternalServerError)
		return
	}
	fmt.Fprintln(, "OK")
}
Parse the template for the status page.
func (,  template.TrustedSource) (*template.Template, error) {
	if .String() == "" {
		return nil, nil
	}
	 := template.TrustedSourceJoin(, template.TrustedSourceFromConstant("html/worker"), )
	return template.New(.String()).Funcs(template.FuncMap{
		"truncate":  truncate,
		"timefmt":   formatTime,
		"bytesToMi": bytesToMi,
		"pct":       percentage,
		"timeSince": func( time.Time) time.Duration {
			return time.Since().Round(time.Second)
		},
		"timeSub": func(,  time.Time) time.Duration {
			return .Sub().Round(time.Second)
		},
	}).ParseFilesFromTrustedSources()
}

// truncate returns text shortened to at most length characters (runes),
// followed by "..." if anything was removed. It returns nil for a nil text.
// It returns text itself (the same pointer) when it already fits.
// Truncation counts runes rather than bytes, so a multi-byte UTF-8 character
// is never split. A negative length is treated as 0 instead of panicking.
func truncate(length int, text *string) *string {
	if text == nil {
		return nil
	}
	if length < 0 {
		length = 0
	}
	// Fast path: a string of at most length bytes has at most length runes.
	if len(*text) <= length {
		return text
	}
	runes := []rune(*text)
	if len(runes) <= length {
		return text
	}
	s := string(runes[:length]) + "..."
	return &s
}

var locNewYork *time.Location

func () {
	var  error
	locNewYork,  = time.LoadLocation("America/New_York")
	if  != nil {
		log.Fatalf(context.Background(), "time.LoadLocation: %v", )
	}
}

func ( *time.Time) string {
	if  == nil {
		return "Never"
	}
	return .In(locNewYork).Format("2006-01-02 15:04:05")
}
// bytesToMi converts an integral value of bytes into mebibytes, rounding down.
func bytesToMi(b uint64) uint64 {
	return b / (1024 * 1024)
}
percentage computes the truncated percentage of x/y. It returns 0 if y is 0. x and y can be any int or uint type.
func (,  interface{}) int {
	 := toUint64()
	if  == 0 {
		return 0
	}
	return int(toUint64() * 100 / )
}

// toUint64 converts a value of any signed or unsigned integer kind to uint64.
// Signed values are converted with a plain uint64 conversion, so negative
// numbers wrap around. A value of any other kind makes reflect.Value.Uint
// panic.
func toUint64(n interface{}) uint64 {
	v := reflect.ValueOf(n)
	switch v.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return uint64(v.Int())
	default: // assume uint
		return v.Uint()
	}
}
parseLimitParam parses the query parameter "limit" as an integer. If the parameter is missing or there is a parse error, it is logged and the default value is returned.
func ( *http.Request,  int) int {
	const  = "limit"
	 := .FormValue()
	if  == "" {
		return 
	}
	,  := strconv.Atoi()
	if  != nil {
		log.Errorf(.Context(), "parsing query parameter %q: %v", , )
		return 
	}
	return 
}

// serverError pairs an error with the HTTP status code to respond with.
// Handlers wrapped by errorHandler return one to choose a status other than
// the default 500.
type serverError struct {
	// status is the HTTP status code to send.
	status int
	// err is the wrapped error.
	err error
}

func ( *serverError) () string {
	return fmt.Sprintf("%d (%s): %v", .status, http.StatusText(.status), .err)
}
errorHandler converts a function that returns an error into an http.HandlerFunc.
func ( *Server) ( func( http.ResponseWriter,  *http.Request) error) http.HandlerFunc {
	return func( http.ResponseWriter,  *http.Request) {
		if  := (, );  != nil {
			.serveError(, , )
		}
	}
}

func ( *Server) ( http.ResponseWriter,  *http.Request,  error) {
	 := .Context()
	,  := .(*serverError)
	if ! {
		 = &serverError{status: http.StatusInternalServerError, err: }
	}
	if .status == http.StatusInternalServerError {
		log.Error(, .err)
		.reportError(, , , )
	} else {
		log.Infof(, "returning %d (%s) for error %v", .status, http.StatusText(.status), )
	}
	http.Error(, .err.Error(), .status)