Source File
server.go
Belonging Package
golang.org/x/pkgsite/internal/worker
package worker
import (
)
type Server struct {
cfg *config.Config
indexClient *index.Client
proxyClient *proxy.Client
sourceClient *source.Client
redisHAClient *redis.Client
cache *cache.Cache
db *postgres.DB
queue queue.Queue
reportingClient *errorreporting.Client
templates map[string]*template.Template
staticPath template.TrustedSource
getExperiments func() []*internal.Experiment
}
type ServerConfig struct {
DB *postgres.DB
IndexClient *index.Client
ProxyClient *proxy.Client
SourceClient *source.Client
RedisHAClient *redis.Client
RedisCacheClient *redis.Client
Queue queue.Queue
ReportingClient *errorreporting.Client
StaticPath template.TrustedSource
GetExperiments func() []*internal.Experiment
}
const (
indexTemplate = "index.tmpl"
versionsTemplate = "versions.tmpl"
)
func ( *config.Config, ServerConfig) ( *Server, error) {
defer derrors.Wrap(&, "NewServer(db, %+v)", )
, := parseTemplate(.StaticPath, template.TrustedSourceFromConstant(indexTemplate))
if != nil {
return nil,
}
, := parseTemplate(.StaticPath, template.TrustedSourceFromConstant(versionsTemplate))
if != nil {
return nil,
}
dochtml.LoadTemplates(template.TrustedSourceJoin(.StaticPath, template.TrustedSourceFromConstant("html/doc")))
:= map[string]*template.Template{
indexTemplate: ,
versionsTemplate: ,
}
var *cache.Cache
if .RedisCacheClient != nil {
= cache.New(.RedisCacheClient)
}
return &Server{
cfg: ,
db: .DB,
indexClient: .IndexClient,
proxyClient: .ProxyClient,
sourceClient: .SourceClient,
redisHAClient: .RedisHAClient,
cache: ,
queue: .Queue,
reportingClient: .ReportingClient,
templates: ,
staticPath: .StaticPath,
getExperiments: .GetExperiments,
}, nil
}
:= middleware.Identity()
if .reportingClient != nil {
= middleware.ErrorReporting(.reportingClient.Report)
}
("/poll", (.errorHandler(.handlePollIndex)))
("/update-imported-by-count", (.errorHandler(.handleUpdateImportedByCount)))
("/fetch/", http.StripPrefix("/fetch", (http.HandlerFunc(.handleFetch))))
("/fetch-std-master/", (.errorHandler(.handleFetchStdMaster)))
("/enqueue", (.errorHandler(.handleEnqueue)))
("/requeue", (.errorHandler(.handleEnqueue)))
("/reprocess", (.errorHandler(.handleReprocess)))
("/populate-stdlib", (.errorHandler(.handlePopulateStdLib)))
("/repopulate-search-documents", (.errorHandler(.handleRepopulateSearchDocuments)))
("/clear-cache", (.errorHandler(.clearCache)))
("/delete/", http.StripPrefix("/delete", (.errorHandler(.handleDelete))))
("/clean", (.errorHandler(.handleClean)))
("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir(.staticPath.String()))))
("/versions", http.HandlerFunc(.handleHTMLPage(.doVersionsPage)))
("/healthz", http.HandlerFunc(.handleHealthCheck))
("/favicon.ico", http.HandlerFunc(func( http.ResponseWriter, *http.Request) {
http.ServeFile(, , "content/static/img/worker-favicon.ico")
}))
("/", http.HandlerFunc(.handleHTMLPage(.doIndexPage)))
}
func ( *Server) ( http.ResponseWriter, *http.Request) error {
:= parseLimitParam(, 100)
:= .FormValue("before")
if == "" {
return &serverError{
http.StatusBadRequest,
errors.New("must provide 'before' query param as an RFC3339 datetime"),
}
}
, := time.Parse(time.RFC3339, )
if != nil {
return &serverError{http.StatusBadRequest, }
}
:= .Context()
log.Infof(, "Repopulating search documents for %d packages", )
, := .db.GetPackagesForSearchDocumentUpsert(, , )
if != nil {
return
}
for , := range {
if := postgres.UpsertSearchDocument(, .db.Underlying(), ); != nil {
return
}
}
return nil
}
func ( *Server) ( http.ResponseWriter, *http.Request) {
if .URL.Path == "/" {
.Header().Set("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(, "<h1>Hello, Go Discovery Fetch Service!</h1>")
fmt.Fprintf(, `<p><a href="/fetch/rsc.io/quote/@v/v1.0.0">Fetch an example module</a></p>`)
return
}
, := .doFetch(, )
if == http.StatusInternalServerError || == http.StatusServiceUnavailable {
log.Infof(.Context(), "doFetch of %s returned %d; returning that code to retry task", .URL.Path, )
http.Error(, http.StatusText(), )
return
}
if /100 != 2 {
log.Infof(.Context(), "doFetch of %s returned code %d; returning OK to avoid retry", .URL.Path, )
}
.Header().Set("Content-Type", "text/plain; charset=utf-8")
if /100 == 2 {
log.Info(.Context(), )
fmt.Fprintln(, )
}
fmt.Fprintln(, http.StatusText())
}
func ( *Server) ( http.ResponseWriter, *http.Request) (string, int) {
:= .Context()
, , := parseModulePathAndVersion(.URL.Path)
if != nil {
return .Error(), http.StatusBadRequest
}
:= &Fetcher{
ProxyClient: .proxyClient.WithZipCache(),
SourceClient: .sourceClient,
DB: .db,
Cache: .cache,
}
if .FormValue(queue.DisableProxyFetchParam) == queue.DisableProxyFetchValue {
.ProxyClient = .ProxyClient.WithFetchDisabled()
}
, , := .FetchAndUpdateState(, , , .cfg.AppVersionLabel())
if == http.StatusInternalServerError {
.reportError(, , , )
return .Error(),
}
return fmt.Sprintf("fetched and updated %s@%s", , ),
}
func ( *Server) ( context.Context, error, http.ResponseWriter, *http.Request) {
if .reportingClient == nil {
return
var []byte
if := (*derrors.StackError)(nil); errors.As(, &) {
= .Stack
}
.reportingClient.Report(errorreporting.Entry{
Error: ,
Req: ,
Stack: ,
})
.Header().Set(config.BypassErrorReportingHeader, "true")
}
func ( string) (string, string, error) {
:= strings.TrimPrefix(, "/")
if strings.HasSuffix(, "/@latest") {
:= strings.TrimSuffix(, "/@latest")
if == "" {
return "", "", fmt.Errorf("invalid module path: %q", )
}
return , internal.LatestVersion, nil
}
:= strings.Split(, "/@v/")
if len() != 2 {
return "", "", fmt.Errorf("invalid path: %q", )
}
if [0] == "" || [1] == "" {
return "", "", fmt.Errorf("invalid path: %q", )
}
return [0], [1], nil
}
func ( *Server) ( http.ResponseWriter, *http.Request) ( error) {
defer derrors.Wrap(&, "handlePollIndex(%q)", .URL.Path)
:= .Context()
:= parseLimitParam(, 10)
, := .db.LatestIndexTimestamp()
if != nil {
return
}
, := .indexClient.GetVersions(, , )
if != nil {
return
}
if := .db.InsertIndexVersions(, ); != nil {
return
}
log.Infof(, "Inserted %d modules from the index", len())
.computeProcessingLag()
return nil
}
func ( *Server) ( context.Context) {
, := .db.StalenessTimestamp()
if errors.Is(, derrors.NotFound) {
recordProcessingLag(, 0)
} else if != nil {
log.Warningf(, "StalenessTimestamp: %v", )
return
recordProcessingLag(, time.Since())
}
}
func ( *Server) ( http.ResponseWriter, *http.Request) ( error) {
defer derrors.Wrap(&, "handleEnqueue(%q)", .URL.Path)
:= .Context()
:= parseLimitParam(, 10)
:= .FormValue("suffix") // append to task name to avoid deduplication
:= trace.FromContext(.Context())
.Annotate([]trace.Attribute{trace.Int64Attribute("limit", int64())}, "processed limit")
, := .db.GetNextModulesToFetch(, )
if != nil {
return
}
.Annotate([]trace.Attribute{trace.Int64Attribute("modules to fetch", int64(len()))}, "processed limit")
.Header().Set("Content-Type", "text/plain")
log.Infof(, "Scheduling modules to be fetched: queuing %d modules", len())
const = 10
var (
sync.Mutex
, int
)
:= make(chan struct{}, )
for , := range {
:=
<- struct{}{}
go func() {
defer func() { <- }()
, := .queue.ScheduleFetch(, .ModulePath, .Version, ,
shouldDisableProxyFetch())
.Lock()
if != nil {
log.Errorf(, "enqueuing: %v", )
++
} else if {
++
recordEnqueue(.Context(), .Status)
}
.Unlock()
}()
func ( *Server) ( func( http.ResponseWriter, *http.Request) error) http.HandlerFunc {
return func( http.ResponseWriter, *http.Request) {
if := (, ); != nil {
log.Errorf(.Context(), "handleHTMLPage", )
http.Error(, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
}
}
}
func ( *Server) ( http.ResponseWriter, *http.Request) error {
, , , := stdlib.Zip("master")
if != nil {
return
}
, := .db.GetVersionMap(.Context(), stdlib.ModulePath, "master")
if != nil {
return
}
if .ResolvedVersion != {
if , := .queue.ScheduleFetch(.Context(), stdlib.ModulePath, "master", "", false); != nil {
return fmt.Errorf("error scheduling fetch for %s: %w", "master", )
}
}
return .db.DeletePseudoversionsExcept(.Context(), stdlib.ModulePath, .ResolvedVersion)
}
func ( *Server) ( http.ResponseWriter, *http.Request) error {
, := .doPopulateStdLib(.Context(), .FormValue("suffix"))
.Header().Set("Content-Type", "text/plain; charset=utf-8")
if != nil {
return fmt.Errorf("handlePopulateStdLib: %v", )
}
log.Infof(.Context(), "handlePopulateStdLib: %s", )
_, _ = io.WriteString(, )
return nil
}
func ( *Server) ( context.Context, string) (string, error) {
, := stdlib.Versions()
if != nil {
return "",
}
for , := range {
if , := .queue.ScheduleFetch(, stdlib.ModulePath, , , false); != nil {
return "", fmt.Errorf("error scheduling fetch for %s: %w", , )
}
}
return fmt.Sprintf("Scheduling modules to be fetched: %s.\n", strings.Join(, ", ")), nil
}
func ( *Server) ( http.ResponseWriter, *http.Request) error {
:= .FormValue("app_version")
if == "" {
return &serverError{http.StatusBadRequest, errors.New("app_version was not specified")}
}
if := config.ValidateAppVersion(); != nil {
return &serverError{http.StatusBadRequest, fmt.Errorf("config.ValidateAppVersion(%q): %v", , )}
}
:= .FormValue("status")
if != "" {
, := strconv.Atoi()
if != nil {
return &serverError{http.StatusBadRequest, fmt.Errorf("status is invalid: %q", )}
}
if := .db.UpdateModuleVersionStatesWithStatus(.Context(), , ); != nil {
return
}
fmt.Fprintf(, "Scheduled modules to be reprocessed for appVersion > %q and status = %d.", , )
return nil
}
if := .db.UpdateModuleVersionStatesForReprocessing(.Context(), ); != nil {
return
}
fmt.Fprintf(, "Scheduled modules to be reprocessed for appVersion > %q.", )
return nil
}
func ( *Server) ( http.ResponseWriter, *http.Request) error {
if .cache == nil {
return errors.New("redis cache client is not configured")
}
if := .cache.Clear(.Context()); != nil {
return
}
fmt.Fprint(, "Cache cleared.")
return nil
}
func ( *Server) ( http.ResponseWriter, *http.Request) error {
, , := parseModulePathAndVersion(.URL.Path)
if != nil {
return &serverError{http.StatusBadRequest, }
}
if := .db.DeleteModule(.Context(), , ); != nil {
return &serverError{http.StatusInternalServerError, }
}
fmt.Fprintf(, "Deleted %s@%s", , )
return nil
}
const cleanDays = 7
func ( *Server) ( http.ResponseWriter, *http.Request) ( error) {
defer derrors.Wrap(&, "handleClean")
:= .Context()
:= .FormValue("limit")
:= .FormValue("module")
switch {
case == "" && == "":
return errors.New("need 'limit' or 'module' query param")
case != "" && != "":
return errors.New("need exactly one of 'limit' or 'module' query param")
case != "":
, := .db.GetModuleVersionsToClean(, cleanDays, parseLimitParam(, 1000))
if != nil {
return
}
log.Infof(, "cleaning %d modules", len())
if := .db.CleanModuleVersions(, , "Bulk deleted via /clean endpoint"); != nil {
return
}
fmt.Fprintf(, "Cleaned %d module versions.\n", len())
return nil
default: // module != ""
log.Infof(, "cleaning module %q", )
if := .db.CleanModule(, , "Manually deleted via /clean endpoint"); != nil {
return
}
fmt.Fprintf(, "Cleaned module %q\n", )
return nil
}
}
func ( *Server) ( http.ResponseWriter, *http.Request) {
if := .db.Underlying().Ping(); != nil {
http.Error(, fmt.Sprintf("DB ping failed: %v", ), http.StatusInternalServerError)
return
}
fmt.Fprintln(, "OK")
}
func (, template.TrustedSource) (*template.Template, error) {
if .String() == "" {
return nil, nil
}
:= template.TrustedSourceJoin(, template.TrustedSourceFromConstant("html/worker"), )
return template.New(.String()).Funcs(template.FuncMap{
"truncate": truncate,
"timefmt": formatTime,
"bytesToMi": bytesToMi,
"pct": percentage,
"timeSince": func( time.Time) time.Duration {
return time.Since().Round(time.Second)
},
"timeSub": func(, time.Time) time.Duration {
return .Sub().Round(time.Second)
},
}).ParseFilesFromTrustedSources()
}
func ( int, *string) *string {
if == nil {
return nil
}
if len(*) <= {
return
}
:= (*)[:] + "..."
return &
}
var locNewYork *time.Location
func () {
var error
locNewYork, = time.LoadLocation("America/New_York")
if != nil {
log.Fatalf(context.Background(), "time.LoadLocation: %v", )
}
}
func ( *time.Time) string {
if == nil {
return "Never"
}
return .In(locNewYork).Format("2006-01-02 15:04:05")
}
func (, interface{}) int {
:= toUint64()
if == 0 {
return 0
}
return int(toUint64() * 100 / )
}
func ( interface{}) uint64 {
:= reflect.ValueOf()
switch .Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
return uint64(.Int())
default: // assume uint
return .Uint()
}
}
func ( *http.Request, int) int {
const = "limit"
:= .FormValue()
if == "" {
return
}
, := strconv.Atoi()
if != nil {
log.Errorf(.Context(), "parsing query parameter %q: %v", , )
return
}
return
}
type serverError struct {
status int // HTTP status code
err error // wrapped error
}
func ( *serverError) () string {
return fmt.Sprintf("%d (%s): %v", .status, http.StatusText(.status), .err)
}
func ( *Server) ( func( http.ResponseWriter, *http.Request) error) http.HandlerFunc {
return func( http.ResponseWriter, *http.Request) {
if := (, ); != nil {
.serveError(, , )
}
}
}
func ( *Server) ( http.ResponseWriter, *http.Request, error) {
:= .Context()
, := .(*serverError)
if ! {
= &serverError{status: http.StatusInternalServerError, err: }
}
if .status == http.StatusInternalServerError {
log.Error(, .err)
.reportError(, , , )
} else {
log.Infof(, "returning %d (%s) for error %v", .status, http.StatusText(.status), )
}
http.Error(, .err.Error(), .status)
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |