Helpers
Overview
This page documents the package-level helpers that make common usage patterns easy while preserving the core Workers semantics (pool options, StopOnError, preserve-order, buffers, and lifecycle ownership).
| Helper | Signature (short) | Purpose | 
|---|---|---|
| RunAll | RunAll(ctx, tasks, opts...) ([]R, error) | Execute a batch of Task[R]; return all results and a single aggregated error | 
| Map | Map(ctx, in []T, fn(T)->(R,error), opts...) ([]R, error) | Convenience mapping over a slice; uses RunAll under the hood | 
| ForEach | ForEach(ctx, in []T, fn(T)->error, opts...) error | Apply a side-effecting fn to each input; return aggregated error only | 
| RunStream | RunStream(ctx, in <-chan Task[R], opts...) (<-chan R, <-chan error, error) | Consume tasks from a channel and stream results/errors | 
| MapStream | MapStream(ctx, in <-chan T, fn(T)->(R,error), opts...) (<-chan R, <-chan error, error) | Convenience streaming map over a channel of T | 
| ForEachStream | ForEachStream(ctx, in <-chan T, fn(T)->error, opts...) (<-chan error, error) | Apply a side-effecting fn to each streamed input; stream per-item errors | 
Notes
- Ordering: By default, results are delivered in completion order. When WithPreserveOrder() is provided, results are delivered in the original input order (with buffering and potential head-of-line blocking).
- StopOnError: When WithStopOnError() is provided, the engine cancels on the first error. Batch helpers stop enqueuing new tasks. Stream helpers stop reading from the input channel, then wait for already-started tasks to finish before closing the result and error channels.
- Backpressure: Streams rely on configured buffer sizes and consumers draining the returned channels; otherwise the pipeline can stall.
RunAll
Signature:
RunAll[R any](ctx context.Context, tasks []workers.Task[R], opts ...workers.Option) ([]R, error)
Overview
- Owns lifecycle: Start → enqueue wrapped tasks → wait for started tasks to complete → Close → collect outputs.
- Returns all results and a single aggregated error (errors.Join), or nil if no errors occurred.
Key behaviors
- Ordering: completion order by default; WithPreserveOrder() enforces input order.
- StopOnError: cancels on first error; tasks not yet started may never run; only the first error is forwarded outward (subsequent internal errors are drained to avoid blocking).
Example
ctx := context.Background()
tasks := []workers.Task[int]{
    workers.TaskValue[int](func(context.Context) int { return 1 }),
    workers.TaskFunc[int](func(context.Context) (int, error) { return 2, nil }),
}
results, err := workers.RunAll[int](ctx, tasks, workers.WithDynamicPool())
if err != nil {
    // aggregated errors
}
// results holds 1 and 2 in completion order, so either [1,2] or [2,1]
Map
Signature:
Map[T any, R any](ctx context.Context, in []T, fn func(context.Context, T) (R, error), opts ...workers.Option) ([]R, error)
Overview
- Convenience wrapper over RunAll for slices. Turns each T into a Task[R] using fn and executes the batch.
Example
ctx := context.Background()
xs := []int{1,2,3}
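results, err := workers.Map[int,int](ctx, xs, func(ctx context.Context, v int) (int, error) {
    return v*v, nil
}, workers.WithDynamicPool())
if err != nil {
    // aggregated errors
}
// results holds 1, 4, and 9 in completion order; pass workers.WithPreserveOrder() to get [1,4,9]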
ForEach
Signature:
ForEach[T any](ctx context.Context, in []T, fn func(context.Context, T) error, opts ...workers.Option) error
Overview
- Batch apply a side-effecting function; returns aggregated error only.
Example
ctx := context.Background()
xs := []string{"a","b","c"}
err := workers.ForEach[string](ctx, xs, func(ctx context.Context, s string) error {
    // side effect
    return nil
}, workers.WithFixedPool(4))
if err != nil {
    // aggregated errors
}
RunStream
Signature:
RunStream[R any](ctx context.Context, in <-chan workers.Task[R], opts ...workers.Option) (<-chan R, <-chan error, error)
Overview
- Consumes Task[R] from in and streams results/errors via the returned channels.
- Owns lifecycle: a forwarder reads from in, enqueues tasks, waits for started tasks to complete, then calls Close() to close outputs.
Key behaviors
- Ordering: completion order by default; WithPreserveOrder() emits input-order results.
- StopOnError: on the first error, cancels the internal context; forwarder stops reading in, waits for started tasks, and then closes outputs. Exactly one outward error is forwarded.
Example
ctx := context.Background()
in := make(chan workers.Task[int], 8)
out, errs, err := workers.RunStream[int](ctx, in, workers.WithDynamicPool())
if err != nil { /* setup failed */ }
go func() {
    for i := 0; i < 10; i++ {
        v := i
        in <- workers.TaskValue[int](func(context.Context) int { return v })
    }
    close(in)
}()
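// Drain errs concurrently so a full error buffer cannot stall the pipeline.
done := make(chan struct{})
go func() {
    defer close(done)
    for e := range errs {
        _ = e // consume errors
    }
}()
for r := range out {
    _ = r // consume results
}
<-done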
MapStream
Signature:
MapStream[T any, R any](ctx context.Context, in <-chan T, fn func(context.Context, T) (R, error), opts ...workers.Option) (<-chan R, <-chan error, error)
Overview
- Convenience over RunStream: wraps each T into a Task[R] using fn, then streams results and errors.
Examples
ctx := context.Background()
in := make(chan int, 16)
out, errs, err := workers.MapStream[int,int](ctx, in, func(ctx context.Context, v int) (int, error) {
    return v*2, nil
}, workers.WithDynamicPool())
if err != nil { /* setup failed */ }
go func() {
    for i := 0; i < 5; i++ { in <- i }
    close(in)
}()
for r := range out { _ = r }
for e := range errs { _ = e }
With StopOnError
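// Reuses ctx and in from the previous example.
out, errs, err := workers.MapStream[int,int](ctx, in, func(ctx context.Context, v int) (int, error) {
    if v == 3 { return 0, fmt.Errorf("boom") } // first error cancels the run
    return v, nil
}, workers.WithFixedPool(2), workers.WithStopOnError())
if err != nil { /* setup failed */ }
// Drain out and errs as above; only the first error is forwarded on errs.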
ForEachStream
Signature:
ForEachStream[T any](ctx context.Context, in <-chan T, fn func(context.Context, T) error, opts ...workers.Option) (<-chan error, error)
Overview
- Streamed side-effects: wraps each T into an error-only task; streams per-item failures via errs.
- The errs channel is closed when the stream is fully processed or canceled.
Example
ctx := context.Background()
in := make(chan string, 8)
errs, err := workers.ForEachStream[string](ctx, in, func(ctx context.Context, s string) error {
    // do something with s
    return nil
}, workers.WithDynamicPool())
if err != nil { /* setup failed */ }
go func() {
    for _, s := range []string{"a","b","c"} { in <- s }
    close(in)
}()
for e := range errs { _ = e }
Options and common patterns
- Pool selection: WithDynamicPool() (default) vs WithFixedPool(n). Fixed pools limit concurrency; dynamic pools scale with load.
- Buffers: WithTasksBuffer, WithResultsBuffer, WithErrorsBuffer, and (for StopOnError) WithStopOnErrorBuffer.
- StopOnError: WithStopOnError() cancels the internal context on the first error. Batch helpers stop enqueuing; stream helpers stop reading input. Only the first error is forwarded outward; subsequent errors are drained internally to avoid blocking.
- Preserve order: WithPreserveOrder() introduces buffering and head-of-line blocking to emit results in input order.
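The buffering and head-of-line blocking behind order preservation can be illustrated with a small reorder buffer: results that finish early are parked until every earlier index has been emitted. The sketch below is standard-library only and is an assumption about the general technique, not the package's code; indexed and reorder are hypothetical names.

```go
package main

import "fmt"

// indexed pairs a result with the position of its input.
type indexed struct {
	idx int
	val string
}

// reorder receives results in completion order and returns them in input
// order. A result that arrives early waits in pending until all earlier
// indexes have been emitted, which is the head-of-line blocking.
func reorder(in <-chan indexed) []string {
	pending := make(map[int]string)
	next := 0
	var out []string
	for r := range in {
		pending[r.idx] = r.val
		for {
			v, ok := pending[next]
			if !ok {
				break // still waiting for index next
			}
			out = append(out, v)
			delete(pending, next)
			next++
		}
	}
	return out
}

func main() {
	in := make(chan indexed, 3)
	in <- indexed{2, "c"} // finished first, must wait for 0 and 1
	in <- indexed{0, "a"}
	in <- indexed{1, "b"}
	close(in)
	fmt.Println(reorder(in)) // [a b c]
}
```

The pending map grows with the number of results that complete ahead of a slow earlier task, which is why preserve-order mode trades memory and latency for ordering.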
Caveats
- Backpressure: If result/error consumers don’t drain the returned channels (stream helpers), sends can block and stall the pipeline.
- Non-cooperative work: If your function ignores context, the helpers will still stop waiting (tasks return early with cancellation) but the underlying goroutine may continue doing work in the background. Prefer context-aware code.