How It Works
Workers Pools
Workers library supports two types of workers pools: dynamic and fixed-limit.
The choice between them depends on the nature of the tasks being executed.
- Dynamic pool is suitable for most cases.
- Fixed-limit pool is preferred for cases requiring limiting the number of concurrent tasks.
Dynamic Pool
The dynamic pool is implemented using a wrapper over Go's sync.Pool
.
It is initialized by setting workers.Config.MaxWorkers
to 0
(the default value):
w := workers.New[string](context.Background(), &workers.Config{MaxWorkers: 0})
or simply:
w := workers.New[string](context.Background(), nil)
With a dynamic pool, workers are created and reused automatically as needed, optimizing resource usage without hard limits.
Fixed-Limit Pool
The fixed-limit pool is initialized by setting workers.Config.MaxWorkers
to a positive integer:
w := workers.New[string](context.Background(), &workers.Config{MaxWorkers: 10})
You can also dynamically limit the number of workers to the number of logical CPUs available:
w := workers.New[string](context.Background(), &workers.Config{MaxWorkers: runtime.NumCPU()})
In a fixed pool:
- New workers are added sequentially when tasks are received, up to the configured maximum.
- Once the pool reaches its limit, new tasks wait for an available worker.
- Each worker can execute only one task at a time, ensuring controlled concurrency.
Benchmark
The tests show that fixed-limit pool workers are slightly faster with executing memory-heavier tasks, while dynamic pool workers are faster with simple tasks. A 'bigger' task below means performing some operations on slices of 10_000_000 to 100_000_000 elements (step 10_000_000). A 'smaller' task means performing some operations on slices of 1_000 to 5_000 elements (step 2).
BenchmarkWorkers/fixed_less_big_start_immediately-8 1 13183144125 ns/op
BenchmarkWorkers/dynamic_less_big_start_immediately-8 1 16433296791 ns/op
BenchmarkWorkers/fixed_less_big_accumulate-8 1 16738591167 ns/op
BenchmarkWorkers/dynamic_less_big_accumulate-8 1 20579221959 ns/op
BenchmarkWorkers/fixed_more_small_start_immediately-8 12 85020462 ns/op
BenchmarkWorkers/dynamic_more_small_start_immediately-8 14 74789342 ns/op
BenchmarkWorkers/fixed_more_small_accumulate-8 14 76420140 ns/op
BenchmarkWorkers/dynamic_more_small_accumulate-8 12 87226969 ns/op
Summary
Type | Description |
---|---|
Dynamic | No hard worker limit; uses sync.Pool ; adapts automatically. |
Fixed | Limits concurrency to a maximum number; better for resource-bound tasks. |
Tasks
Tasks are added to the queue using the Add()
method. Each task is passed as a function with one of the following signatures:
func(context.Context) R
func(context.Context) (R, error)
func(context.Context) error
where R
is the result type, matching the type parameter used when constructing the controller with workers.New()
.
See Examples for practical usage examples.
Task Dispatch
- Tasks are assigned to workers after calling the
Start()
method. Start()
can be called manually or triggered automatically ifStartImmediately
is set totrue
in theworkers.Config
struct.
Task Execution
- Once started, tasks are dispatched and assigned to available workers from the pool (see Workers Pools for more details).
- A worker executes its assigned task immediately.
- Task results and errors are returned through dedicated channels (see Results for details).
Error Handling
- If
StopOnError
is enabled in the configuration, workers will stop executing new tasks upon encountering the first error. - If a panic occurs during task execution, it is safely recovered and sent as an error through the error channel.
Results
Task execution results and errors are returned through dedicated channels.
Accessing Channels
- Use
GetResults()
to access the results channel. - Use
GetErrors()
to access the errors channel.
Both channels are buffered and should be closed manually after processing to avoid memory leaks or deadlocks.
See Tasks for how tasks are dispatched and executed.
Order of Results
The order of results may vary due to the concurrent nature of task execution. The original order of tasks is not guaranteed to be preserved.
Example
select {
case result := <-w.GetResults():
fmt.Println(result)
case err := <-w.GetErrors():
fmt.Println("error executing task:", err)
}
close(w.GetResults())
close(w.GetErrors())
Channel Types
- Results channel:
chan R
, whereR
is the result type specified during controller creation (workers.New()
). - Errors channel:
chan error
.