// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package queue provides queue implementations that can be used for
// asynchronous scheduling of fetch actions.
package queue

import (
	"context"
	"errors"
	"fmt"
	"hash/fnv"
	"io"
	"math"
	"strings"
	"time"

	cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
	"github.com/golang/protobuf/ptypes"
	"golang.org/x/pkgsite/internal"
	"golang.org/x/pkgsite/internal/config"
	"golang.org/x/pkgsite/internal/derrors"
	"golang.org/x/pkgsite/internal/experiment"
	"golang.org/x/pkgsite/internal/log"
	"golang.org/x/pkgsite/internal/middleware"
	taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
// A Queue provides an interface for asynchronous scheduling of fetch actions.
type Queue interface {
	// ScheduleFetch schedules a fetch of modulePath at version. The meaning of
	// suffix and disableProxyFetch, and of the boolean result, is defined by
	// each implementation.
	ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (bool, error)
}
New creates a new Queue with name queueName based on the configuration in cfg. When running locally, Queue uses numWorkers concurrent workers.
func ( context.Context,  *config.Config,  string,  int,  middleware.ExperimentGetter,  inMemoryProcessFunc) (Queue, error) {
	if !.OnGCP() {
		,  := ()
		if  != nil {
			return nil, 
		}
		var  []string
		for ,  := range  {
			if .Rollout > 0 {
				 = append(, .Name)
			}
		}
		return NewInMemory(, , , ), nil
	}

	,  := cloudtasks.NewClient()
	if  != nil {
		return nil, 
	}
	,  := newGCP(, , )
	if  != nil {
		return nil, 
	}
	log.Infof(, "enqueuing at %s with queueURL=%q", .queueName, .queueURL)
	return , nil
}
GCP provides a Queue implementation backed by the Google Cloud Tasks API.
type GCP struct {
	client    *cloudtasks.Client
	queueName string // full GCP name of the queue
token holds information that lets the task queue construct an authorized request to the worker. Since the worker sits behind the IAP, the queue needs an identity token that includes the identity of a service account that has access, and the client ID for the IAP. We use the service account of the current process.
NewGCP returns a new Queue that can be used to enqueue tasks using the cloud tasks API. The given queueID should be the name of the queue in the cloud tasks console.
func ( *config.Config,  *cloudtasks.Client,  string) ( *GCP,  error) {
	defer derrors.Wrap(&, "newGCP(cfg, client, %q)", )
	if  == "" {
		return nil, errors.New("empty queueID")
	}
	if .ProjectID == "" {
		return nil, errors.New("empty ProjectID")
	}
	if .LocationID == "" {
		return nil, errors.New("empty LocationID")
	}
	if .QueueURL == "" {
		return nil, errors.New("empty QueueURL")
	}
	if .ServiceAccount == "" {
		return nil, errors.New("empty ServiceAccount")
	}
	if .QueueAudience == "" {
		return nil, errors.New("empty QueueAudience")
	}
	return &GCP{
		client:    ,
		queueName: fmt.Sprintf("projects/%s/locations/%s/queues/%s", .ProjectID, .LocationID, ),
		queueURL:  .QueueURL,
		token: &taskspb.HttpRequest_OidcToken{
			OidcToken: &taskspb.OidcToken{
				ServiceAccountEmail: .ServiceAccount,
				Audience:            .QueueAudience,
			},
		},
	}, nil
}
ScheduleFetch enqueues a task on GCP to fetch the given modulePath and version. It returns an error if there was an error hashing the task name, or an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil).
func ( *GCP) ( context.Context, , ,  string,  bool) ( bool,  error) {
	defer derrors.WrapStack(&, "queue.ScheduleFetch(%q, %q, %q)", , , )
Cloud Tasks enforces an RPC timeout of at most 30s. I couldn't find this in the documentation, but using a larger value, or no timeout, results in an InvalidArgument error with the text "The deadline cannot be more than 30s in the future."
	,  := context.WithTimeout(, 30*time.Second)
	defer ()

	if  == internal.UnknownModulePath {
		return false, errors.New("given unknown module path")
	}
	 := .newTaskRequest(, , , )
	 = true
	if ,  := .client.CreateTask(, );  != nil {
		if status.Code() == codes.AlreadyExists {
			log.Debugf(, "ignoring duplicate task ID %s: %s@%s", .Task.Name, , )
			 = false
		} else {
			return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", )
		}
	}
	return , nil
}
// maxCloudTasksTimeout is the maximum timeout for HTTP tasks. See
// https://cloud.google.com/tasks/docs/creating-http-target-tasks.
const maxCloudTasksTimeout = 30 * time.Minute

// Query parameter added to the fetch URL of a task when proxy fetching is
// disabled for that task.
const (
	// DisableProxyFetchParam is the name of the query parameter.
	DisableProxyFetchParam = "proxyfetch"
	// DisableProxyFetchValue is the value given to DisableProxyFetchParam.
	DisableProxyFetchValue = "off"
)

func ( *GCP) (, ,  string,  bool) *taskspb.CreateTaskRequest {
	 := newTaskID(, )
	 := fmt.Sprintf("/fetch/%s/@v/%s", , )
	if  {
		 += fmt.Sprintf("?%s=%s", DisableProxyFetchParam, DisableProxyFetchValue)
	}
	 := &taskspb.Task{
		Name:             fmt.Sprintf("%s/tasks/%s", .queueName, ),
		DispatchDeadline: ptypes.DurationProto(maxCloudTasksTimeout),
	}
	.MessageType = &taskspb.Task_HttpRequest{
		HttpRequest: &taskspb.HttpRequest{
			HttpMethod:          taskspb.HttpMethod_POST,
			Url:                 .queueURL + ,
			AuthorizationHeader: .token,
		},
	}
	 := &taskspb.CreateTaskRequest{
		Parent: .queueName,
		Task:   ,
If suffix is non-empty, append it to the task name. This lets us force reprocessing of tasks that would normally be de-duplicated.
	if  != "" {
		.Task.Name += "-" + 
	}
	return 
}
// newTaskID creates a task ID for the given module path and version.
// Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-),
// or underscores (_).
//
// The result is a four-digit hexadecimal hash prefix, a hyphen, and an
// escaped form of "modulePath@version". ASCII letters, digits and '-' are
// kept as is; '_' becomes "__", '/' becomes "_-", '@' becomes "_v", '.'
// becomes "_o", and any other rune becomes "_" followed by its code point in
// hexadecimal (at least four digits).
func newTaskID(modulePath, version string) string {
	mv := modulePath + "@" + version
	// Compute a hash to use as a prefix, so the task IDs are distributed
	// uniformly. See
	// https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#task
	// under "Task De-duplication".
	hasher := fnv.New32()
	io.WriteString(hasher, mv) // writing to a hash.Hash never returns an error
	// Reduce the hash so that it fits the four hex digits of the prefix.
	hash := hasher.Sum32() % math.MaxUint16
	// Escape the name so it contains only valid characters. Do our best to
	// make it readable.
	var b strings.Builder
	for _, r := range mv {
		switch {
		case r >= 'A' && r <= 'Z' || r >= 'a' && r <= 'z' || r >= '0' && r <= '9' || r == '-':
			b.WriteRune(r)
		case r == '_':
			b.WriteString("__")
		case r == '/':
			b.WriteString("_-")
		case r == '@':
			b.WriteString("_v")
		case r == '.':
			b.WriteString("_o")
		default:
			fmt.Fprintf(&b, "_%04x", r)
		}
	}
	return fmt.Sprintf("%04x-%s", hash, &b)
}

// moduleVersion is the item carried on the in-memory queue: a module path
// paired with a version.
type moduleVersion struct {
	modulePath string
	version    string
}
InMemory is a Queue implementation that schedules in-process fetch operations. Unlike the GCP task queue, it will not automatically retry tasks on failure. This should only be used for local development.
type InMemory struct {
	queue       chan moduleVersion
	sem         chan struct{}
	experiments []string
}

// inMemoryProcessFunc is the callback the in-memory queue runs for each
// scheduled fetch. It is called with the module path and version of the
// request.
type inMemoryProcessFunc func(ctx context.Context, modulePath, version string) (int, error)
NewInMemory creates a new InMemory that asynchronously fetches from proxyClient and stores in db. It uses workerCount parallelism to execute these fetches.
func ( context.Context,  int,  []string,  inMemoryProcessFunc) *InMemory {
	 := &InMemory{
		queue:       make(chan moduleVersion, 1000),
		sem:         make(chan struct{}, ),
		experiments: ,
	}
	go func() {
		for  := range .queue {
			select {
			case <-.Done():
				return
			case .sem <- struct{}{}:
			}
If a worker is available, make a request to the fetch service inside a goroutine and wait for it to finish.
			go func( moduleVersion) {
				defer func() { <-.sem }()

				log.Infof(, "Fetch requested: %q %q (workerCount = %d)", .modulePath, .version, cap(.sem))

				,  := context.WithTimeout(, 5*time.Minute)
				 = experiment.NewContext(, ...)
				defer ()

				if ,  := (, .modulePath, .version);  != nil {
					log.Error(, )
				}
			}()
		}
	}()
	return 
}
ScheduleFetch pushes a fetch task into the local queue to be processed asynchronously.
func ( *InMemory) ( context.Context, , ,  string,  bool) (bool, error) {
	.queue <- moduleVersion{, }
	return true, nil
}
WaitForTesting waits for all queued requests to finish. It should only be used by test code.
func ( InMemory) ( context.Context) {
	for  := 0;  < cap(.sem); ++ {
		select {
		case <-.Done():
			return
		case .sem <- struct{}{}:
		}
	}
	close(.queue)