Involved Source Files
Package bundler supports bundling (batching) of items. Bundling amortizes an
action with fixed costs over multiple items. For example, if an API provides
an RPC that accepts a list of items as input, but clients would prefer
adding items one at a time, then a Bundler can accept individual items from
the client and bundle many of them into a single RPC.
This package is experimental and subject to change without notice.
Package-Level Type Names (total 3, in which 1 are exported)
A Bundler collects items added to it into a bundle until the bundle
exceeds a given size, then calls a user-provided function to handle the
bundle.
The exported fields are only safe to modify prior to the first call to Add
or AddWait.
BufferedByteLimit
The maximum number of bytes that the Bundler will keep in memory before
returning ErrOverflow. The default is DefaultBufferedByteLimit.
BundleByteLimit
The maximum size of a bundle, in bytes. Zero means unlimited.
BundleByteThreshold
Once the number of bytes in the current bundle reaches this threshold, handle
the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
but does not cap the total size of a bundle.
BundleCountThreshold
Once a bundle has this many items, handle the bundle. Since only one
item at a time is added to a bundle, no bundle will exceed this
threshold, so it also serves as a limit. The default is
DefaultBundleCountThreshold.
DelayThreshold
Starting from the time that the first message is added to a bundle, once
this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
HandlerLimit
The maximum number of handler invocations that can be running at once.
The default is 1.
The current bundle we're adding items to. Not yet in the queue.
Appended to the queue once the flushTimer fires or the bundle
thresholds/limits are reached. If curBundle is nil and tail is
not, we first try to add items to tail. Once tail is full or handled,
we create a new curBundle for the incoming item.
// counts outstanding bundles since last flush
// implements DelayThreshold
// called to handle a bundle
// # of bundles currently being handled (i.e. handler is invoked on them)
The next bundle in the queue to be handled. Nil if the queue is
empty.
// nil (zero value) for slice of items
On the first call to Add or AddWait, mode is set to add or addWait respectively.
If neither has been called yet, mode is none.
// guards access to fields below
// signal used to wait for prior flush
// enforces BufferedByteLimit
// guards semaphore initialization
The last bundle in the queue to be handled. Nil if the queue is
empty. If curBundle is nil and tail isn't, we attempt to add new
items to the tail until it becomes full or has been passed to the
handler.
Add adds item to the current bundle. It marks the bundle for handling and
starts a new one if any of the thresholds or limits are exceeded.
The type of item must be assignable to the itemExample parameter of the NewBundler
function, otherwise there will be a panic.
If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
the item can never be handled. Add returns ErrOversizedItem in this case.
If adding the item would exceed the maximum memory allowed
(Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
memory, Add returns ErrOverflow.
Add never blocks.
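The two error cases described for Add can be sketched as a pair of size checks. This is an illustrative model only: `checkAdd` and the two error variables are hypothetical stand-ins for the package's behavior, the order of the checks is an assumption, and the case where an AddWait call is already blocked is not modeled.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the package's ErrOverflow and ErrOversizedItem.
var (
	errOverflow      = errors.New("buffered byte limit exceeded")
	errOversizedItem = errors.New("item larger than bundle byte limit")
)

// checkAdd models the documented error conditions of Add.
// bundleByteLimit == 0 means unlimited, as the field description states.
// Assumption: the oversized-item check runs before the overflow check.
func checkAdd(itemSize, bufferedBytes, bundleByteLimit, bufferedByteLimit int) error {
	if bundleByteLimit > 0 && itemSize > bundleByteLimit {
		return errOversizedItem // the item could never fit in any bundle
	}
	if bufferedBytes+itemSize > bufferedByteLimit {
		return errOverflow // adding the item would exceed the memory budget
	}
	return nil
}

func main() {
	fmt.Println(checkAdd(50, 0, 40, 100)) // item larger than bundle byte limit
	fmt.Println(checkAdd(50, 80, 0, 100)) // buffered byte limit exceeded
	fmt.Println(checkAdd(50, 10, 0, 100)) // <nil>
}
```

Because both checks are plain comparisons, a function shaped like this returns immediately, which matches the statement that Add never blocks.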
AddWait adds item to the current bundle. It marks the bundle for handling and
starts a new one if any of the thresholds or limits are exceeded.
If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
the item can never be handled. AddWait returns ErrOversizedItem in this case.
If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
AddWait blocks until space is available or ctx is done.
Calls to Add and AddWait should not be mixed on the same Bundler.
Flush invokes the handler for all remaining items in the Bundler and waits
for it to return.
add adds item to the tail of the bundle queue or curBundle depending on space
and nil-ness (see inline comments). It marks curBundle for handling (by
appending it to the queue) if any of the thresholds or limits are exceeded.
curBundle is lazily initialized. It requires that b.mu is locked.
canFit returns true if bu can fit an additional item of size bytes based
on the limits of Bundler b.
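Based on the field descriptions, a fit check of this kind plausibly combines the bundle byte limit with the count threshold, which the documentation says also serves as a limit. The sketch below is an assumption-laden illustration, not the unexported method itself: the `limits` struct and the standalone `canFit` signature are hypothetical, and treating a zero count threshold as "no limit" is assumed.

```go
package main

import "fmt"

// limits is a hypothetical stand-in for the two Bundler fields that
// bound a single bundle.
type limits struct {
	BundleByteLimit      int // zero means unlimited, as documented
	BundleCountThreshold int // assumption: zero means no count limit
}

// canFit reports whether a bundle that already holds curCount items
// totalling curSize bytes can accept one more item of itemSize bytes.
func canFit(l limits, curCount, curSize, itemSize int) bool {
	if l.BundleByteLimit > 0 && curSize+itemSize > l.BundleByteLimit {
		return false // the item would push the bundle past its byte limit
	}
	if l.BundleCountThreshold > 0 && curCount >= l.BundleCountThreshold {
		return false // the bundle already holds the maximum number of items
	}
	return true
}

func main() {
	l := limits{BundleByteLimit: 100, BundleCountThreshold: 3}
	fmt.Println(canFit(l, 1, 40, 50)) // true
	fmt.Println(canFit(l, 1, 60, 50)) // false
	fmt.Println(canFit(l, 3, 10, 10)) // false
}
```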
enqueueCurBundle moves curBundle to the end of the queue. The bundle may be
handled immediately if we are below HandlerLimit. It requires that b.mu is
locked.
handle calls the user-specified handler on the given bundle. handle is
intended to be run as a goroutine. After the handler returns, we update the
byte total. handle continues processing additional bundles that are ready.
If no more bundles are ready, the handler count is decremented and the
goroutine ends.
(*T) initSemaphores()
next returns the next bundle that is ready for handling and removes it from
the internal queue. It requires that b.mu is locked.
(*T) postHandle(bu *bundle) *bundle
setMode sets the Bundler's mode. If a mode was already set and the
passed mode differs from it, setMode returns an error.
tryHandleBundles is the timer callback that handles or queues any current
bundle after DelayThreshold time, even if the bundle isn't completely full.
*T : net/http.Flusher
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler
A bundle is a group of items that were added individually and will be passed
to a handler as a slice.
// the counter that tracks flush completion
// slice of T
// bundles are handled in order as a linked list queue
// size in bytes of all items
add appends item to this bundle and increments the total size. It requires
that b.mu is locked.
func (*Bundler).next() *bundle
func (*Bundler).postHandle(bu *bundle) *bundle
func (*Bundler).canFit(bu *bundle, size int) bool
func (*Bundler).handle(bu *bundle)
func (*Bundler).postHandle(bu *bundle) *bundle
Package-Level Functions (only one, which is exported)
NewBundler creates a new Bundler.
itemExample is a value of the type that will be bundled. For example, if you
want to create bundles of *Entry, you could pass &Entry{} for itemExample.
handler is a function that will be called on each bundle. If itemExample is
of type T, the argument to handler is of type []T. With the default
HandlerLimit of 1, handler is called sequentially for each bundle, and never
in parallel.
Configure the Bundler by setting its thresholds and limits before calling
any of its methods.
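The relationship between itemExample of type T and a handler argument of type []T can be demonstrated with the standard reflect package. The sketch below is only an illustration of that idea: `collect` and `Entry` are hypothetical names, and whether the package builds its slices exactly this way is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
)

// Entry is a hypothetical item type used only for this example.
type Entry struct{ Msg string }

// collect builds a []T from items, where T is the dynamic type of
// itemExample, and returns it as an interface{} value.
func collect(itemExample interface{}, items ...interface{}) interface{} {
	sliceType := reflect.SliceOf(reflect.TypeOf(itemExample))
	s := reflect.Zero(sliceType) // nil slice of T
	for _, it := range items {
		// reflect.Append panics if it is not assignable to T.
		s = reflect.Append(s, reflect.ValueOf(it))
	}
	return s.Interface()
}

func main() {
	bundle := collect(&Entry{}, &Entry{Msg: "a"}, &Entry{Msg: "b"})

	// A handler would recover the concrete slice with a type assertion.
	entries := bundle.([]*Entry)
	fmt.Println(len(entries), entries[0].Msg, entries[1].Msg) // 2 a b
}
```

Passing a value of a different type, for example a string where *Entry is expected, makes reflect.Append panic, which is consistent with the panic documented for Add when the item type is not assignable.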
Package-Level Variables (total 3, in which 2 are exported)
ErrOverflow indicates that the Bundler's stored bytes exceed its BufferedByteLimit.
ErrOversizedItem indicates that an item's size exceeds the maximum bundle size.
errMixedMethods indicates that mutually exclusive methods have been
called on the same Bundler.
Package-Level Constants (total 7, in which 4 are exported)
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64)