Examples
Overview
Example | Purpose |
---|---|
Basic Example | Launch multiple tasks, gather results. |
Passing Arguments to Tasks | Demonstrate how to pass arguments to task functions using closures. |
Accumulate and Start After Delay | Use StartImmediately: false . Show effect of TasksBufferSize . |
Timeout and Cancellation | Show context.WithTimeout canceling tasks. |
Fixed vs Dynamic Pool | Show control over worker count. |
Stop on Error | Demonstrate stopping after the first error. |
Handling Panics Gracefully | Panic-safe tasks without crashing the controller. |
Basic Example
📝 Description
In this example, we concurrently calculate Fibonacci numbers for a range of values from 20 down to 11.
The fibonacci()
function implements a basic recursive algorithm.
A workers controller is created with the StartImmediately
option set to true
, meaning tasks are executed as soon as they are added.
Results and errors are handled through dedicated channels, and the channels are manually closed after processing.
🚀 Code
package main
import (
"context"
"fmt"
"log"
"sync"
"github.com/ygrebnov/workers"
)
// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
func main() {
// Create a new controller with a dynamic number of workers.
// Type parameter is used to specify the type of task result.
w := workers.New[string](context.Background(), &workers.Config{StartImmediately: true})
wg := sync.WaitGroup{}
// Receive and print results or handle errors in a separate goroutine.
go func() {
for range 10 {
select {
case result := <-w.GetResults():
fmt.Println(result)
case err := <-w.GetErrors():
fmt.Println("error executing task:", err)
}
wg.Done()
}
}()
for i := 20; i >= 11; i-- {
wg.Add(1)
// Add ten tasks calculating the Fibonacci number for a given index.
// A task is a function that takes a context and returns a string.
err := w.AddTask(func(ctx context.Context) string {
return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", i, fibonacci(i))
})
if err != nil {
log.Fatalf("failed to add task: %v", err)
}
}
wg.Wait() // Wait for all tasks to finish.
// Close channels.
close(w.GetResults())
close(w.GetErrors())
}
📋 Example Output
Calculated Fibonacci for: 20, result: 6765.
Calculated Fibonacci for: 18, result: 2584.
Calculated Fibonacci for: 19, result: 4181.
Calculated Fibonacci for: 17, result: 1597.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 15, result: 610.
Calculated Fibonacci for: 14, result: 377.
Calculated Fibonacci for: 12, result: 144.
Calculated Fibonacci for: 13, result: 233.
Calculated Fibonacci for: 11, result: 89.
Passing Arguments to Tasks
📝 Description
In this example, we demonstrate how to pass arguments to tasks using closures.
Workers controller is constructed using workers.New[string]
method. It means that we can pass tasks with three possible signatures:
func(context.Context) string
,func(context.Context) (string, error)
, orfunc(context.Context) error
.
Neither of these signatures allows passing any arbitrary argument. To do that, we define a helper function newCalculateFibonacciTask
which takes an argument to be processed in the task and returns a new task function with the required signature.
🚀 Code
package main
import (
"context"
"fmt"
"log"
"sync"
"github.com/ygrebnov/workers"
)
// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
// newCalculateFibonacciTask returns a new task function.
func newCalculateFibonacciTask(n int) func(ctx context.Context) string {
return func(ctx context.Context) string {
return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n))
}
}
func main() {
// Create a new controller with a dynamic number of workers.
// The controller starts immediately.
// Type parameter is used to specify the type of task result.
w := workers.New[string](context.Background(), &workers.Config{StartImmediately: true})
wg := sync.WaitGroup{}
// Receive and print results or handle errors in a separate goroutine.
go func() {
for range 10 {
select {
case result := <-w.GetResults():
fmt.Println(result)
case err := <-w.GetErrors():
fmt.Println("error executing task:", err)
}
wg.Done()
}
}()
// Add tasks with passing arguments using a helper function.
for i := 20; i >= 11; i-- {
wg.Add(1)
err := w.AddTask(newCalculateFibonacciTask(i))
if err != nil {
log.Fatalf("failed to add task: %v", err)
}
}
wg.Wait() // Wait for all tasks to finish.
// Close channels.
close(w.GetResults())
close(w.GetErrors())
}
📋 Example Output
Calculated Fibonacci for: 20, result: 6765.
Calculated Fibonacci for: 18, result: 2584.
Calculated Fibonacci for: 19, result: 4181.
Calculated Fibonacci for: 17, result: 1597.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 15, result: 610.
Calculated Fibonacci for: 14, result: 377.
Calculated Fibonacci for: 12, result: 144.
Calculated Fibonacci for: 13, result: 233.
Calculated Fibonacci for: 11, result: 89.
Accumulate and Start After Delay
📝 Description
In this example, we demonstrate how to accumulate tasks before starting their execution.
A workers controller is created with StartImmediately: false
(default), meaning tasks are collected first and executed later when Start()
is called. Additionally, TasksBufferSize
is set to 10 to allow for task accumulation before starting execution.
We simulate two different "processors" running in separate goroutines that add tasks with different function signatures.
🚀 Code
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/ygrebnov/workers"
)
// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
// newCalculateFibonacciTask returns a task function
// with 'func(ctx context.Context) string' signature.
func newCalculateFibonacciTask(n int) func(ctx context.Context) string {
return func(ctx context.Context) string {
return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n))
}
}
// newCalculateFibonacciWithErrorTask returns a task function
// with 'func(ctx context.Context) (string, error)' signature.
func newCalculateFibonacciWithErrorTask(n int) func(ctx context.Context) (string, error) {
return func(ctx context.Context) (string, error) {
return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n)), nil
}
}
func main() {
// Create a workers controller which does not start immediately.
// Set TasksBufferSize to 10 to allow for task accumulation.
w := workers.New[string](context.TODO(), &workers.Config{TasksBufferSize: 10})
wg := sync.WaitGroup{}
// Receive and print results or handle errors in a separate goroutine.
go func() {
for range 10 {
select {
case result := <-w.GetResults():
fmt.Println(result)
case err := <-w.GetErrors():
fmt.Println("error executing task:", err)
}
wg.Done()
}
}()
// Create channels to signal completion of task addition by processors.
p1Done := make(chan struct{}, 1)
p2Done := make(chan struct{}, 1)
// Processor 1: adds tasks returning string results.
go func() {
for i := 20; i >= 11; i -= 2 {
wg.Add(1)
time.Sleep(50 * time.Millisecond) // Simulate task preparation delay.
err := w.AddTask(newCalculateFibonacciTask(i))
if err != nil {
log.Fatalf("failed to add task: %v", err)
}
}
p1Done <- struct{}{} // Signal completion of processor 1.
}()
// Processor 2: adds tasks returning (string, error) results.
go func() {
for i := 19; i >= 11; i -= 2 {
wg.Add(1)
time.Sleep(50 * time.Millisecond) // Simulate task preparation delay
err := w.AddTask(newCalculateFibonacciWithErrorTask(i))
if err != nil {
log.Fatalf("failed to add task: %v", err)
}
}
p2Done <- struct{}{} // Signal completion of processor 2.
}()
// Wait for both processors to finish adding tasks.
<-p1Done
<-p2Done
// Start executing accumulated tasks.
w.Start(context.Background())
wg.Wait() // Wait for all tasks to finish.
// Close channels after processing.
close(w.GetResults())
close(w.GetErrors())
}
📋 Example Output
Calculated Fibonacci for: 12, result: 144.
Calculated Fibonacci for: 11, result: 89.
Calculated Fibonacci for: 17, result: 1597.
Calculated Fibonacci for: 18, result: 2584.
Calculated Fibonacci for: 20, result: 6765.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 13, result: 233.
Calculated Fibonacci for: 15, result: 610.
Calculated Fibonacci for: 19, result: 4181.
Calculated Fibonacci for: 14, result: 377.
Timeout and Cancellation
📝 Description
In this example, we demonstrate how to cancel the execution of tasks using a context with a timeout.
A workers controller is created with a TasksBufferSize
set to allow for task accumulation before starting execution.
A context.WithTimeout
is used to automatically stop the task execution after a predefined time limit, ensuring that long-running tasks are safely canceled.
🚀 Code
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ygrebnov/workers"
)
// fibonacci calculates Fibonacci numbers recursively.
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
// newCalculateFibonacciTask returns a new task function.
func newCalculateFibonacciTask(n int) func(ctx context.Context) string {
return func(ctx context.Context) string {
// Simulate a long-running task
time.Sleep(time.Millisecond * 600 * time.Duration(n))
return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n))
}
}
func main() {
// Create a workers controller with a buffer for pending tasks.
// Set TasksBufferSize to 10 to allow for task accumulation.
// Note that in this particular example, leaving TasksBufferSize at zero will lead to a deadlock,
// as the tasks are added before the workers are started.
w := workers.New[string](context.TODO(), &workers.Config{TasksBufferSize: 10})
// Create a context with a timeout of 2 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // Ensure the context is canceled to release resources.
// Add tasks to the workers controller.
for i := 1; i <= 10; i++ {
err := w.AddTask(newCalculateFibonacciTask(i))
if err != nil {
log.Fatalf("failed to add task: %v", err)
}
}
// Start the workers.
w.Start(ctx)
// Process results or handle errors until the context is canceled.
for {
select {
case <-ctx.Done():
fmt.Println("Context canceled due to timeout.")
// Close the results and errors channels.
close(w.GetResults())
close(w.GetErrors())
return
case result := <-w.GetResults():
fmt.Println(result)
case err := <-w.GetErrors():
log.Fatalf("error executing task: %v", err)
}
}
}
📋 Example Output
Calculated Fibonacci for: 1, result: 1.
Calculated Fibonacci for: 2, result: 1.
Calculated Fibonacci for: 3, result: 2.
Context canceled due to timeout.
Fixed vs Dynamic Pool
📝 Description
In this example, we demonstrate the difference between a fixed worker pool and a dynamic worker pool.
A fixed pool restricts the number of concurrent workers based on MaxWorkers
, while a dynamic pool grows and shrinks automatically as needed.
We create two separate controllers and use them to execute the same set of tasks.
🚀 Code
package main
import (
"context"
"fmt"
"log"
"runtime"
"github.com/ygrebnov/workers"
)
// fibonacci calculates Fibonacci numbers recursively.
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
func main() {
// Create a fixed-size worker pool with number of workers equal to number of CPU cores.
w1 := workers.New[string](context.Background(), &workers.Config{
StartImmediately: true,
MaxWorkers: uint(runtime.NumCPU()),
})
// Create a dynamic worker pool (MaxWorkers = 0).
w2 := workers.New[string](context.Background(), &workers.Config{
StartImmediately: true,
})
// Add tasks to both controllers.
for i := 20; i >= 11; i-- {
err := w1.AddTask(func(ctx context.Context) string {
return fmt.Sprintf("[Fixed] Calculated Fibonacci for: %d, result: %d.", i, fibonacci(i))
})
if err != nil {
log.Fatalf("failed to add task to fixed workers: %v", err)
}
err = w2.AddTask(func(ctx context.Context) string {
return fmt.Sprintf("[Dynamic] Calculated Fibonacci for: %d, result: %d.", i, fibonacci(i))
})
if err != nil {
log.Fatalf("failed to add task to dynamic workers: %v", err)
}
}
// Collect results from both worker pools.
for range 20 {
select {
case result := <-w1.GetResults():
fmt.Println(result)
case result := <-w2.GetResults():
fmt.Println(result)
case err := <-w1.GetErrors():
log.Fatalf("error executing fixed workers task: %v", err)
case err := <-w2.GetErrors():
log.Fatalf("error executing dynamic workers task: %v", err)
}
}
// Close result and error channels after processing.
close(w1.GetResults())
close(w2.GetResults())
close(w1.GetErrors())
close(w2.GetErrors())
}
📋 Example Output
[Dynamic] Calculated Fibonacci for: 18, result: 2584.
[Dynamic] Calculated Fibonacci for: 19, result: 4181.
[Dynamic] Calculated Fibonacci for: 20, result: 6765.
[Dynamic] Calculated Fibonacci for: 17, result: 1597.
[Dynamic] Calculated Fibonacci for: 15, result: 610.
[Fixed] Calculated Fibonacci for: 20, result: 6765.
[Fixed] Calculated Fibonacci for: 19, result: 4181.
[Fixed] Calculated Fibonacci for: 18, result: 2584.
[Dynamic] Calculated Fibonacci for: 16, result: 987.
[Dynamic] Calculated Fibonacci for: 13, result: 233.
[Dynamic] Calculated Fibonacci for: 14, result: 377.
[Fixed] Calculated Fibonacci for: 17, result: 1597.
[Fixed] Calculated Fibonacci for: 16, result: 987.
[Fixed] Calculated Fibonacci for: 14, result: 377.
[Fixed] Calculated Fibonacci for: 15, result: 610.
[Dynamic] Calculated Fibonacci for: 12, result: 144.
[Dynamic] Calculated Fibonacci for: 11, result: 89.
[Fixed] Calculated Fibonacci for: 13, result: 233.
[Fixed] Calculated Fibonacci for: 11, result: 89.
[Fixed] Calculated Fibonacci for: 12, result: 144.
Note: As you can see from the example output, due to a very lightweight task function, workers with dynamic-size pool produce results faster.
Stop on Error
📝 Description
In this example, we demonstrate how to stop task execution automatically when the first error is encountered.
A workers controller is created with StopOnError
set to true, which means that if any task returns an error, no further tasks will be started.
We simulate this by adding a set of Fibonacci calculation tasks, where one specific task (for input 15
) deliberately returns an error.
This shows how to prioritize fast failure and recovery in error-sensitive workflows.
🚀 Code
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ygrebnov/workers"
)
// fibonacci calculates Fibonacci numbers recursively.
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
// newCalculateFibonacciWithErrorTask returns a new task function.
func newCalculateFibonacciWithErrorTask(n int) func(ctx context.Context) (string, error) {
return func(ctx context.Context) (string, error) {
time.Sleep(50 * time.Millisecond) // Simulate some delay
// Function returns an error for n == 15
if n == 15 {
return "", fmt.Errorf("error calculating Fibonacci for: %d", n)
}
return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n)), nil
}
}
func main() {
// Create a workers controller with 'StopOnError' set to true.
w := workers.New[string](context.Background(), &workers.Config{StartImmediately: true, StopOnError: true})
// Add tasks to the workers controller.
for i := 20; i >= 11; i-- {
err := w.AddTask(newCalculateFibonacciWithErrorTask(i))
if err != nil {
log.Fatalf("failed to add task to workers: %v", err)
}
}
// Receive and print results or handle errors.
for range 10 {
select {
case result := <-w.GetResults():
fmt.Println(result)
case err := <-w.GetErrors():
log.Fatalf("error executing task: %v", err)
}
}
// Close result and error channels after processing.
close(w.GetResults())
close(w.GetErrors())
}
📋 Example Output
Calculated Fibonacci for: 19, result: 4181.
Calculated Fibonacci for: 11, result: 89.
Calculated Fibonacci for: 17, result: 1597.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 14, result: 377.
2025/04/29 08:54:22 error executing task: error calculating Fibonacci for: 15
Handle Panics Gracefully
📝 Description
In this example, we demonstrate how panics inside task functions are safely recovered and reported as standard errors. The workers controller is designed to automatically catch panics during task execution and send them to the errors channel without crashing the entire application.
We simulate this by creating a set of Fibonacci calculation tasks, where one specific task (for input 15
) deliberately triggers a panic.
This ensures that even unexpected runtime errors are gracefully handled and the system remains stable.
🚀 Code
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ygrebnov/workers"
)
// fibonacci calculates Fibonacci numbers recursively.
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
// newCalculateFibonacciTask returns a new task function.
func newCalculateFibonacciTask(n int) func(ctx context.Context) string {
return func(ctx context.Context) string {
time.Sleep(50 * time.Millisecond) // Simulate some delay
// Function panics for n == 15
if n == 15 {
panic(fmt.Sprintf("panic during calculating Fibonacci for: %d", n))
}
return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n))
}
}
func main() {
// Create a workers controller.
w := workers.New[string](context.Background(), &workers.Config{StartImmediately: true})
// Add tasks to the workers controller.
for i := 20; i >= 11; i-- {
err := w.AddTask(newCalculateFibonacciTask(i))
if err != nil {
log.Fatalf("failed to add task to workers: %v", err)
}
}
// Receive and print results or handle errors.
for range 10 {
select {
case result := <-w.GetResults():
fmt.Println(result)
case err := <-w.GetErrors():
fmt.Printf("error executing task: %v\n", err)
}
}
// Close result and error channels after processing.
close(w.GetResults())
close(w.GetErrors())
}
📋 Example Output
Calculated Fibonacci for: 13, result: 233.
Calculated Fibonacci for: 18, result: 2584.
error executing task: task execution panicked: panic during calculating Fibonacci for: 15
Calculated Fibonacci for: 11, result: 89.
Calculated Fibonacci for: 14, result: 377.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 20, result: 6765.
Calculated Fibonacci for: 17, result: 1597.
Calculated Fibonacci for: 19, result: 4181.