// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The worker command runs a service with the primary job of fetching modules
// from a proxy and writing them to the database.
package main

import (
	
	
	
	
	
	
	

	
	
	
	_  // for pgx driver
	
	
	
	
	
	
	
	
	
	
	
	
)

var (
	timeout   = config.GetEnvInt("GO_DISCOVERY_WORKER_TIMEOUT_MINUTES", 10)
	queueName = config.GetEnv("GO_DISCOVERY_WORKER_TASK_QUEUE", "")
flag used in call to safehtml/template.TrustedSourceFromFlag
	_                  = flag.String("static", "content/static", "path to folder containing static files served")
	bypassLicenseCheck = flag.Bool("bypass_license_check", false, "insert all data into the DB, even for non-redistributable paths")
)

// NOTE(review): the function name and every local identifier are missing from
// this block as extracted (the header reads "func ()"), so it does not parse.
// It parses flags and ends in http.ListenAndServe, so it is presumably the
// program entry point (main) — restore the identifiers from version control
// before making code changes here.
//
// Visible flow: parse flags, load and dump configuration, optionally start the
// profiler, read the proxy-removed list, open the database, populate excluded
// prefixes, build the index/proxy/source clients and the fetch queue, build the
// worker server and its router, initialise dcensus views, optionally start a
// debug server, assemble the middleware chain, and serve HTTP.
func () {
	flag.Parse()

	 := context.Background()

	// Load configuration; a failure is fatal. The result is dumped to stdout.
	,  := config.Init()
	if  != nil {
		log.Fatal(, )
	}
	.Dump(os.Stdout)

	log.SetLevel(.LogLevel)

	// The profiler is started only when the UseProfiler field is set.
	if .UseProfiler {
		if  := profiler.Start(profiler.Config{});  != nil {
			log.Fatalf(, "profiler.Start: %v", )
		}
	}

	readProxyRemoved()

	// Open the database, passing along the bypass_license_check flag value.
	,  := cmdconfig.OpenDB(, , *bypassLicenseCheck)
	if  != nil {
		log.Fatalf(, "%v", )
	}
	defer .Close()

	populateExcluded(, )

	// Clients for the index (IndexURL), the proxy (ProxyURL) and source lookups.
	,  := index.New(.IndexURL)
	if  != nil {
		log.Fatal(, )
	}
	,  := proxy.New(.ProxyURL)
	if  != nil {
		log.Fatal(, )
	}
	 := source.NewClient(config.SourceTimeout)
	 := cmdconfig.ExperimentGetter(, )
	// The queue is given a callback that builds a worker.Fetcher, calls
	// FetchAndUpdateState, and returns two of its three results as (int, error).
	,  := queue.New(, , queueName, *workers, ,
		func( context.Context, ,  string) (int, error) {
			 := &worker.Fetcher{
				ProxyClient:  ,
				SourceClient: ,
				DB:           ,
			}
			, ,  := .FetchAndUpdateState(, , , .AppVersionLabel())
			return , 
		})
	if  != nil {
		log.Fatalf(, "queue.New: %v", )
	}

	// Assemble the remaining dependencies and construct the worker server.
	 := cmdconfig.ReportingClient(, )
	 := getHARedis(, )
	 := getCacheRedis(, )
	 := cmdconfig.Experimenter(, , , )
	,  := worker.NewServer(, worker.ServerConfig{
		DB:               ,
		IndexClient:      ,
		ProxyClient:      ,
		SourceClient:     ,
		RedisHAClient:    ,
		RedisCacheClient: ,
		Queue:            ,
		ReportingClient:  ,
		// The static path comes from the "static" flag registered at package level.
		StaticPath:       template.TrustedSourceFromFlag(flag.Lookup("static").Value),
		GetExperiments:   .Experiments,
	})
	if  != nil {
		log.Fatal(, )
	}
	 := dcensus.NewRouter(nil)
	.Install(.Handle)

	// Views handed to dcensus.Init: the dcensus server views plus the worker
	// and fetch views listed here.
	// NOTE(review): append on the package-level dcensus.ServerViews slice may
	// write into its backing array if it has spare capacity — confirm.
	 := append(dcensus.ServerViews,
		worker.EnqueueResponseCount,
		worker.ProcessingLag,
		fetch.FetchLatencyDistribution,
		fetch.FetchResponseCount,
		fetch.SheddedFetchCount,
		fetch.FetchPackageCount)
	if  := dcensus.Init(, ...);  != nil {
		log.Fatal(, )
	// NOTE(review): the next line is comment text that has lost its "//"
	// marker, and the closing brace of the if statement above appears to be
	// missing from the extracted text.
We are not currently forwarding any ports on AppEngine, so serving debug information is broken.
	if !.OnAppEngine() {
		,  := dcensus.NewServer()
		if  != nil {
			log.Fatal(, )
		}
		// The debug server runs in its own goroutine; the error returned by
		// ListenAndServe is discarded.
		go http.ListenAndServe(.DebugAddr("localhost:8001"), )
	}

	// Start from middleware.Identity and switch to middleware.ValidateIAPHeader
	// when GO_DISCOVERY_IAP_AUDIENCE is non-empty.
	 := middleware.Identity()
	if  := os.Getenv("GO_DISCOVERY_IAP_AUDIENCE");  != "" {
		 = middleware.ValidateIAPHeader()
	}

	// Chain: request logging, a timeout of `timeout` minutes, the middleware
	// selected above, then experiments.
	 := middleware.Chain(
		middleware.RequestLog(cmdconfig.Logger(, , "worker-log")),
		middleware.Timeout(time.Duration(timeout)*time.Minute),
		,
		middleware.Experiment(),
	)
	http.Handle("/", ())

	// Serve on the default mux; ListenAndServe only returns on error, which is
	// logged as fatal.
	 := .HostAddr("localhost:8000")
	log.Infof(, "Listening on addr %s", )
	log.Fatal(, http.ListenAndServe(, nil))
}

We update completions with one big pipeline, so we need long write timeouts. ReadTimeout is increased only to be consistent with WriteTimeout.
	return getRedis(, .RedisHAHost, .RedisHAPort, 5*time.Minute, 5*time.Minute)
}

func ( context.Context,  *config.Config) *redis.Client {
	return getRedis(, .RedisCacheHost, .RedisCachePort, 0, 6*time.Second)
}

// NOTE(review): the function name and all parameter/local identifiers are
// missing from this block as extracted, so it does not parse. The two callers
// in this file invoke getRedis with a context, a host field, a port field and
// two durations, which matches this signature — presumably this is getRedis.
//
// Visible behaviour: returns nil when one of the string arguments is empty
// (which one cannot be told from here); otherwise builds a redis.Client whose
// address is two values joined with ":" and whose three timeouts are set from
// values that are not visible.
func ( context.Context, ,  string, ,  time.Duration) *redis.Client {
	if  == "" {
		return nil
	}
	// A duration is derived from the context's deadline when it has one, via
	// time.Until; without a deadline it stays zero.
	// NOTE(review): presumably this feeds DialTimeout — confirm.
	var  time.Duration
	if ,  := .Deadline();  {
		 = time.Until()
	}
	// NOTE(review): which duration parameter maps to WriteTimeout and which to
	// ReadTimeout is not recoverable from this text — confirm against callers.
	return redis.NewClient(&redis.Options{
		Addr:          + ":" + ,
		DialTimeout:  ,
		WriteTimeout: ,
		ReadTimeout:  ,
	})
}
Read a file of module versions that we should ignore because the are in the index but not stored in the proxy. Format of the file: each line is module@version
func ( context.Context) {
	 := config.GetEnv("GO_DISCOVERY_PROXY_REMOVED", "")
	if  == "" {
		return
	}
	,  := readFileLines()
	if  != nil {
		log.Fatal(, )
	}
	for ,  := range  {
		worker.ProxyRemoved[] = true
	}
	log.Infof(, "read %d excluded module versions from %s", len(worker.ProxyRemoved), )
}
// populateExcluded adds each prefix listed in the file named by
// GO_DISCOVERY_EXCLUDED_FILENAME to the excluded_prefixes table if it isn't
// already present.
// NOTE(review): the function name and all parameter/local identifiers are
// missing from this block as extracted, so it does not parse. The caller in
// this file invokes populateExcluded with two arguments, and the log messages
// mention db.IsExcluded / db.InsertExcludedPrefix — presumably this is
// populateExcluded(ctx, db).
//
// Visible behaviour: reads the file named by GO_DISCOVERY_EXCLUDED_FILENAME
// (doing nothing when config.GetEnv yields ""), splits each line at the first
// space or tab, requires a non-empty value or exits fatally with "missing
// reason", and calls InsertExcludedPrefix when IsExcluded reports false.
func ( context.Context,  *postgres.DB) {
	 := config.GetEnv("GO_DISCOVERY_EXCLUDED_FILENAME", "")
	if  == "" {
		return
	}
	,  := readFileLines()
	if  != nil {
		log.Fatal(, )
	}
	// Fall back to "worker" when the USER environment variable is empty.
	 := os.Getenv("USER")
	if  == "" {
		 = "worker"
	}
	for ,  := range  {
		var ,  string
		// Split at the first space or tab; the remainder is whitespace-trimmed.
		// With no separator both values stay empty and the check below is fatal.
		 := strings.IndexAny(, " \t")
		if  >= 0 {
			 = [:]
			 = strings.TrimSpace([+1:])
		}
		if  == "" {
			log.Fatalf(, "missing reason in %s, line %q", , )
		}
		,  := .IsExcluded(, )
		if  != nil {
			log.Fatalf(, "db.IsExcluded(%q): %v", , )
		}
		// Insert only when IsExcluded reported false.
		// NOTE(review): the order of the three values passed to
		// InsertExcludedPrefix is not recoverable from this text — confirm.
		if ! {
			if  := .InsertExcludedPrefix(, , , );  != nil {
				log.Fatalf(, "db.InsertExcludedPrefix(%q, %q, %q): %v", , , , )
			}
		}
	}
}
// readFileLines reads filename and returns its lines, trimmed of whitespace.
// Blank lines and lines whose first non-blank character is '#' are omitted.
func ( string) ([]string, error) {
	var  []string
	,  := os.Open()
	if  != nil {
		return nil, 
	}
	defer .Close()
	 := bufio.NewScanner()
	for .Scan() {
		 := strings.TrimSpace(.Text())
		if  == "" || [0] == '#' {
			continue
		}
		 = append(, )
	}
	if  := .Err();  != nil {
		return nil, 
	}
	return , nil