Skip to main content

Helpers

Overview

This page documents the package-level helpers that make common usage patterns easy while preserving the core Workers semantics (pool options, StopOnError, preserve-order, buffers, and lifecycle ownership).

HelperSignature (short)Purpose
RunAllRunAll(ctx, tasks, opts...) ([]R, error)Execute a batch of Task[R]; return all results and a single aggregated error
MapMap(ctx, in []T, fn(T)->(R,error), opts...) ([]R, error)Convenience mapping over a slice; uses RunAll under the hood
ForEachForEach(ctx, in []T, fn(T)->error, opts...) errorApply a side-effecting fn to each input; return aggregated error only
RunStreamRunStream(ctx, in <-chan Task[R], opts...) (<-chan R, <-chan error, error)Consume tasks from a channel and stream results/errors
MapStreamMapStream(ctx, in <-chan T, fn(T)->(R,error), opts...) (<-chan R, <-chan error, error)Convenience streaming map over a channel of T
ForEachStreamForEachStream(ctx, in <-chan T, fn(T)->error, opts...) (<-chan error, error)Apply a side-effecting fn to each streamed input; stream per-item errors

Notes

  • Ordering: By default, results are delivered in completion order. When WithPreserveOrder() is provided, results are delivered in the original input order (with buffering and potential head-of-line blocking).
  • StopOnError: When WithStopOnError() is provided, the engine cancels on the first error. Batch helpers will stop enqueuing new tasks. Stream helpers stop reading from the input channel and then wait for already-started tasks to finish before closing result/error channels.
  • Backpressure: Streams rely on configured buffer sizes and consumers draining the returned channels; otherwise the pipeline can stall.

RunAll

Signature:

RunAll[R any](ctx context.Context, tasks []workers.Task[R], opts ...workers.Option) ([]R, error)

Overview

  • Owns lifecycle: Start → enqueue wrapped tasks → wait for started completions → Close → collect outputs.
  • Returns all results and a single aggregated error (errors.Join), nil if no errors.

Key behaviors

  • Ordering: completion order by default; WithPreserveOrder() enforces input order.
  • StopOnError: cancels on first error; tasks not yet started may never run; only the first error is forwarded outward (subsequent internal errors are drained to avoid blocking).

Example

ctx := context.Background()
tasks := []workers.Task[int]{
workers.TaskValue[int](func(context.Context) int { return 1 }),
workers.TaskFunc[int](func(context.Context) (int, error) { return 2, nil }),
}
results, err := workers.RunAll[int](ctx, tasks, workers.WithDynamicPool())
if err != nil {
// aggregated errors
}
// results contains [1,2] in completion order

Map

Signature:

Map[T any, R any](ctx context.Context, in []T, fn func(context.Context, T) (R, error), opts ...workers.Option) ([]R, error)

Overview

  • Convenience wrapper over RunAll for slices. Turns each T into a Task[R] using fn and executes the batch.

Example

ctx := context.Background()
xs := []int{1,2,3}
results, err := workers.Map[int,int](ctx, xs, func(ctx context.Context, v int) (int, error) {
return v*v, nil
}, workers.WithDynamicPool())
// results: [1,4,9]

ForEach

Signature:

ForEach[T any](ctx context.Context, in []T, fn func(context.Context, T) error, opts ...workers.Option) error

Overview

  • Batch apply a side-effecting function; returns aggregated error only.

Example

ctx := context.Background()
xs := []string{"a","b","c"}
err := workers.ForEach[string](ctx, xs, func(ctx context.Context, s string) error {
// side effect
return nil
}, workers.WithFixedPool(4))

RunStream

Signature:

RunStream[R any](ctx context.Context, in <-chan workers.Task[R], opts ...workers.Option) (<-chan R, <-chan error, error)

Overview

  • Consumes Task[R] from in and streams results/errors via the returned channels.
  • Owns lifecycle: a forwarder reads from in, enqueues tasks, waits for started completions, then calls Close() to close outputs.

Key behaviors

  • Ordering: completion order by default; WithPreserveOrder() emits input-order results.
  • StopOnError: on the first error, cancels the internal context; forwarder stops reading in, waits for started tasks, and then closes outputs. Exactly one outward error is forwarded.

Example

ctx := context.Background()
in := make(chan workers.Task[int], 8)
out, errs, err := workers.RunStream[int](ctx, in, workers.WithDynamicPool())
if err != nil { /* setup failed */ }

go func() {
for i := 0; i < 10; i++ {
v := i
in <- workers.TaskValue[int](func(context.Context) int { return v })
}
close(in)
}()

for r := range out {
_ = r // consume results
}
for e := range errs {
_ = e // consume errors
}

MapStream

Signature:

MapStream[T any, R any](ctx context.Context, in <-chan T, fn func(context.Context, T) (R, error), opts ...workers.Option) (<-chan R, <-chan error, error)

Overview

  • Convenience over RunStream: wraps each T into a Task[R] using fn, streams results and errors.

Examples

ctx := context.Background()
in := make(chan int, 16)
out, errs, err := workers.MapStream[int,int](ctx, in, func(ctx context.Context, v int) (int, error) {
return v*2, nil
}, workers.WithDynamicPool())
if err != nil { /* setup failed */ }

go func() {
for i := 0; i < 5; i++ { in <- i }
close(in)
}()

for r := range out { _ = r }
for e := range errs { _ = e }

With StopOnError

out, errs, _ := workers.MapStream[int,int](ctx, in, func(ctx context.Context, v int) (int, error) {
if v == 3 { return 0, fmt.Errorf("boom") }
return v, nil
}, workers.WithFixedPool(2), workers.WithStopOnError())

ForEachStream

Signature:

ForEachStream[T any](ctx context.Context, in <-chan T, fn func(context.Context, T) error, opts ...workers.Option) (<-chan error, error)

Overview

  • Streamed side-effects: wrap each T into an error-only task; stream per-item failures via errs.
  • The errs channel is closed when the stream is fully processed or canceled.

Example

ctx := context.Background()
in := make(chan string, 8)
errs, err := workers.ForEachStream[string](ctx, in, func(ctx context.Context, s string) error {
// do something with s
return nil
}, workers.WithDynamicPool())
if err != nil { /* setup failed */ }

go func() {
for _, s := range []string{"a","b","c"} { in <- s }
close(in)
}()

for e := range errs { _ = e }

Options and common patterns

  • Pool selection: WithDynamicPool() (default) vs WithFixedPool(n). Fixed pools limit concurrency; dynamic pools scale with load.
  • Buffers: WithTasksBuffer, WithResultsBuffer, WithErrorsBuffer, and (for StopOnError) WithStopOnErrorBuffer.
  • StopOnError: WithStopOnError() cancels the internal context on the first error. Batch helpers stop enqueuing; stream helpers stop reading input. Only the first error is forwarded outward; subsequent errors are drained internally to avoid blocking.
  • Preserve order: WithPreserveOrder() introduces buffering and head-of-line blocking to emit results in input order.

Caveats

  • Backpressure: If result/error consumers don’t drain the returned channels (stream helpers), sends can block and stall the pipeline.
  • Non-cooperative work: If your function ignores context, the helpers will still stop waiting (tasks return early with cancellation) but the underlying goroutine may continue doing work in the background. Prefer context-aware code.