Source File
queue.go
Belonging Package
golang.org/x/pkgsite/internal/queue
package queue
import (
cloudtasks
taskspb
)
// NOTE(review): every identifier in this function (its own name, the
// parameter names and all locals) is missing from this copy of the file, so
// the block does not parse as written. Restore the names from the upstream
// source before building; the description below is limited to what the
// surviving tokens show.
//
// The function returns a Queue. When the *config.Config reports that it is
// not on GCP (OnGCP() is false), it gathers the Name of every entry whose
// Rollout is greater than zero and returns the result of NewInMemory.
// Otherwise it creates a cloudtasks client, builds a *GCP with newGCP, logs
// the resulting queueName and queueURL, and returns that queue. An error from
// any of these steps is returned together with a nil Queue.
func ( context.Context, *config.Config, string, int, middleware.ExperimentGetter, inMemoryProcessFunc) (Queue, error) {
if !.OnGCP() {
// NOTE(review): presumably this calls the middleware.ExperimentGetter
// parameter and ranges over the experiments it returns — confirm.
, := ()
if != nil {
return nil,
}
var []string
for , := range {
if .Rollout > 0 {
= append(, .Name)
}
}
return NewInMemory(, , , ), nil
}
// On GCP: tasks go to Cloud Tasks through a client-backed *GCP queue.
, := cloudtasks.NewClient()
if != nil {
return nil,
}
, := newGCP(, , )
if != nil {
return nil,
}
log.Infof(, "enqueuing at %s with queueURL=%q", .queueName, .queueURL)
return , nil
}
func ( *config.Config, *cloudtasks.Client, string) ( *GCP, error) {
defer derrors.Wrap(&, "newGCP(cfg, client, %q)", )
if == "" {
return nil, errors.New("empty queueID")
}
if .ProjectID == "" {
return nil, errors.New("empty ProjectID")
}
if .LocationID == "" {
return nil, errors.New("empty LocationID")
}
if .QueueURL == "" {
return nil, errors.New("empty QueueURL")
}
if .ServiceAccount == "" {
return nil, errors.New("empty ServiceAccount")
}
if .QueueAudience == "" {
return nil, errors.New("empty QueueAudience")
}
return &GCP{
client: ,
queueName: fmt.Sprintf("projects/%s/locations/%s/queues/%s", .ProjectID, .LocationID, ),
queueURL: .QueueURL,
token: &taskspb.HttpRequest_OidcToken{
OidcToken: &taskspb.OidcToken{
ServiceAccountEmail: .ServiceAccount,
Audience: .QueueAudience,
},
},
}, nil
}
, := context.WithTimeout(, 30*time.Second)
defer ()
if == internal.UnknownModulePath {
return false, errors.New("given unknown module path")
}
:= .newTaskRequest(, , , )
= true
if , := .client.CreateTask(, ); != nil {
if status.Code() == codes.AlreadyExists {
log.Debugf(, "ignoring duplicate task ID %s: %s@%s", .Task.Name, , )
= false
} else {
return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", )
}
}
return , nil
}
// maxCloudTasksTimeout is the value used as the DispatchDeadline of the
// Cloud Tasks tasks created in this file.
// NOTE(review): the name suggests 30 minutes is the longest deadline Cloud
// Tasks accepts — confirm against the Cloud Tasks API reference.
const maxCloudTasksTimeout time.Duration = 30 * time.Minute

// Query parameter appended to a task's fetch URL, as
// "?<DisableProxyFetchParam>=<DisableProxyFetchValue>", when the request
// builder's boolean flag is set.
const (
	// DisableProxyFetchParam is the query parameter name.
	DisableProxyFetchParam = "proxyfetch"
	// DisableProxyFetchValue is the value sent with DisableProxyFetchParam.
	DisableProxyFetchValue = "off"
)
func ( *GCP) (, , string, bool) *taskspb.CreateTaskRequest {
:= newTaskID(, )
:= fmt.Sprintf("/fetch/%s/@v/%s", , )
if {
+= fmt.Sprintf("?%s=%s", DisableProxyFetchParam, DisableProxyFetchValue)
}
:= &taskspb.Task{
Name: fmt.Sprintf("%s/tasks/%s", .queueName, ),
DispatchDeadline: ptypes.DurationProto(maxCloudTasksTimeout),
}
.MessageType = &taskspb.Task_HttpRequest{
HttpRequest: &taskspb.HttpRequest{
HttpMethod: taskspb.HttpMethod_POST,
Url: .queueURL + ,
AuthorizationHeader: .token,
},
}
:= &taskspb.CreateTaskRequest{
Parent: .queueName,
Task: ,
:= fnv.New32()
io.WriteString(, )
var strings.Builder
for , := range {
switch {
case >= 'A' && <= 'Z' || >= 'a' && <= 'z' || >= '0' && <= '9' || == '-':
.WriteRune()
case == '_':
.WriteString("__")
case == '/':
.WriteString("_-")
case == '@':
.WriteString("_v")
case == '.':
.WriteString("_o")
default:
fmt.Fprintf(&, "_%04x", )
}
}
return fmt.Sprintf("%04x-%s", , &)
}
// moduleVersion pairs a module path with a version. It is the element type
// carried on the InMemory queue channel.
type moduleVersion struct {
	modulePath string
	version    string
}
// InMemory holds the state of a queue that lives inside the current process.
// It is what the queue constructor in this file returns (via NewInMemory)
// when the configuration reports that the process is not on GCP.
type InMemory struct {
// queue carries the module/version pairs for this queue.
queue chan moduleVersion
// sem is received from when a worker goroutine finishes, and its capacity
// is what the worker logs as "workerCount".
// NOTE(review): presumably a send on sem precedes each goroutine start, so
// cap(sem) bounds concurrency — the sending side is not visible here.
sem chan struct{}
// experiments is a list of names.
// NOTE(review): presumably the experiment names attached to each worker's
// context — confirm against the worker loop.
experiments []string
}
// inMemoryProcessFunc is the signature of the callback accepted by the queue
// constructor in this file. It takes a context and two strings and returns
// an int and an error.
// NOTE(review): the worker goroutine appears to call it with a module path
// and a version, in that order; the meaning of the int result is not visible
// in this file — confirm against the callers.
type inMemoryProcessFunc func(context.Context, string, string) (int, error)
go func( moduleVersion) {
defer func() { <-.sem }()
log.Infof(, "Fetch requested: %q %q (workerCount = %d)", .modulePath, .version, cap(.sem))
, := context.WithTimeout(, 5*time.Minute)
= experiment.NewContext(, ...)
defer ()
if , := (, .modulePath, .version); != nil {
log.Error(, )
}
}()
}
}()
return
}
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |