
Examples

Overview

| Example | Purpose |
|---|---|
| Basic Example | Launch multiple tasks, gather results. |
| Passing Arguments to Tasks | Demonstrate how to pass arguments to task functions using closures. |
| Accumulate and Start After Delay | Use StartImmediately: false. Show effect of TasksBufferSize. |
| Timeout and Cancellation | Show context.WithTimeout canceling tasks. |
| Fixed-Limit vs Dynamic Pool | Show control over worker count. |
| Stop on Error | Demonstrate stopping after the first error. |
| Handle Panics Gracefully | Panic-safe tasks without crashing the controller. |

Basic Example

In this example, we concurrently calculate Fibonacci numbers for a range of values from 20 down to 11.
The fibonacci() function implements a basic recursive algorithm.

A workers controller is created with the StartImmediately option set to true, meaning tasks are executed as soon as they are added.

Results and errors are handled through dedicated channels, and the channels are manually closed after processing.


package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/ygrebnov/workers"
)

// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

func main() {
	// Create a new controller with a dynamic number of workers.
	// Type parameter is used to specify the type of task result.
	w := workers.New[string](context.Background(), &workers.Config{StartImmediately: true})

	wg := sync.WaitGroup{}

	// Receive and print results or handle errors in a separate goroutine.
	// Note: 'for range 10' and the per-iteration loop variable captured
	// by the task closure below require Go 1.22 or later.
	go func() {
		for range 10 {
			select {
			case result := <-w.GetResults():
				fmt.Println(result)
			case err := <-w.GetErrors():
				fmt.Println("error executing task:", err)
			}

			wg.Done()
		}
	}()

	// Add ten tasks, one per iteration.
	for i := 20; i >= 11; i-- {
		wg.Add(1)

		// Each task calculates the Fibonacci number for a given index.
		// A task is a function that takes a context and returns a string.
		err := w.AddTask(func(ctx context.Context) string {
			return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", i, fibonacci(i))
		})
		if err != nil {
			log.Fatalf("failed to add task: %v", err)
		}
	}

	wg.Wait() // Wait for all tasks to finish.

	// Close channels.
	close(w.GetResults())
	close(w.GetErrors())
}

Example Output

Calculated Fibonacci for: 20, result: 6765.
Calculated Fibonacci for: 18, result: 2584.
Calculated Fibonacci for: 19, result: 4181.
Calculated Fibonacci for: 17, result: 1597.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 15, result: 610.
Calculated Fibonacci for: 14, result: 377.
Calculated Fibonacci for: 12, result: 144.
Calculated Fibonacci for: 13, result: 233.
Calculated Fibonacci for: 11, result: 89.
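
The output above does not follow the order in which tasks were added (18 is printed before 19, for instance). If the order matters, one option is to carry the input alongside each result and sort after collection. The following is a minimal standard-library sketch of that idea using plain goroutines rather than the workers controller; the indexedResult type and collectSorted function are hypothetical names introduced here for illustration only.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fibonacci calculates Fibonacci numbers recursively, as in the example above.
func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

// indexedResult is a hypothetical type pairing a result with its input,
// so that the submission order can be restored after collection.
type indexedResult struct {
	N     int
	Value int
}

// collectSorted computes fibonacci(n) for every n in [from, to] concurrently
// using plain goroutines, then sorts the results by descending input.
func collectSorted(from, to int) []indexedResult {
	// The buffer holds every result, so no goroutine blocks on send.
	results := make(chan indexedResult, to-from+1)

	var wg sync.WaitGroup
	for n := to; n >= from; n-- {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			results <- indexedResult{N: n, Value: fibonacci(n)}
		}(n)
	}

	wg.Wait()
	close(results)

	var out []indexedResult
	for r := range results {
		out = append(out, r)
	}

	// Results arrive in arbitrary order; restore the submission order.
	sort.Slice(out, func(i, j int) bool { return out[i].N > out[j].N })
	return out
}

func main() {
	for _, r := range collectSorted(11, 20) {
		fmt.Printf("Calculated Fibonacci for: %d, result: %d.\n", r.N, r.Value)
	}
}
```

With the controller, the same idea would presumably mean using a struct such as indexedResult as the result type parameter instead of string; that usage is an assumption and is not shown on this page.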

Passing Arguments to Tasks

In this example, we demonstrate how to pass arguments to tasks using closures.

The workers controller is constructed with workers.New[string], which means that tasks may have one of three signatures:

  • func(context.Context) string,
  • func(context.Context) (string, error), or
  • func(context.Context) error.

None of these signatures accepts an arbitrary argument. To pass one, we define a helper function newCalculateFibonacciTask, which takes the argument to be processed and returns a new task function with the required signature. The argument is captured by the returned closure.


package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/ygrebnov/workers"
)

// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

// newCalculateFibonacciTask returns a new task function.
func newCalculateFibonacciTask(n int) func(ctx context.Context) string {
	return func(ctx context.Context) string {
		return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n))
	}
}

func main() {
	// Create a new controller with a dynamic number of workers.
	// The controller starts immediately.
	// Type parameter is used to specify the type of task result.
	w := workers.New[string](context.Background(), &workers.Config{StartImmediately: true})

	wg := sync.WaitGroup{}

	// Receive and print results or handle errors in a separate goroutine.
	go func() {
		for range 10 {
			select {
			case result := <-w.GetResults():
				fmt.Println(result)
			case err := <-w.GetErrors():
				fmt.Println("error executing task:", err)
			}

			wg.Done()
		}
	}()

	// Add tasks with passing arguments using a helper function.
	for i := 20; i >= 11; i-- {
		wg.Add(1)

		err := w.AddTask(newCalculateFibonacciTask(i))
		if err != nil {
			log.Fatalf("failed to add task: %v", err)
		}
	}

	wg.Wait() // Wait for all tasks to finish.

	// Close channels.
	close(w.GetResults())
	close(w.GetErrors())
}

Example Output

Calculated Fibonacci for: 20, result: 6765.
Calculated Fibonacci for: 18, result: 2584.
Calculated Fibonacci for: 19, result: 4181.
Calculated Fibonacci for: 17, result: 1597.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 15, result: 610.
Calculated Fibonacci for: 14, result: 377.
Calculated Fibonacci for: 12, result: 144.
Calculated Fibonacci for: 13, result: 233.
Calculated Fibonacci for: 11, result: 89.
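
The helper in this example is specific to Fibonacci calculations. When several kinds of tasks need arguments, the same closure technique can be written once as a generic adapter. The sketch below uses only the standard library; bindArg, describe, and runBound are hypothetical names introduced for illustration and are not part of the workers package.

```go
package main

import (
	"context"
	"fmt"
)

// bindArg is a hypothetical generic helper: it captures arg in a closure and
// returns a function with the func(context.Context) string signature.
func bindArg[T any](arg T, f func(context.Context, T) string) func(context.Context) string {
	return func(ctx context.Context) string {
		return f(ctx, arg)
	}
}

// describe is an illustrative two-argument function to be adapted.
func describe(_ context.Context, n int) string {
	return fmt.Sprintf("received argument: %d", n)
}

// runBound binds n to describe and runs the resulting task once.
func runBound(n int) string {
	task := bindArg(n, describe)
	return task(context.Background())
}

func main() {
	fmt.Println(runBound(20)) // prints "received argument: 20"
}
```

Because the argument is copied into the closure when bindArg is called, each returned function keeps its own value regardless of what happens to the caller's variable afterwards.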

Accumulate and Start After Delay

In this example, we demonstrate how to accumulate tasks before starting their execution.
A workers controller is created with StartImmediately: false (default), meaning tasks are collected first and executed later when Start() is called. Additionally, TasksBufferSize is set to 10 to allow for task accumulation before starting execution.

We simulate two different "processors" running in separate goroutines that add tasks with different function signatures.


package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/ygrebnov/workers"
)

// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

// newCalculateFibonacciTask returns a task function
// with 'func(ctx context.Context) string' signature.
func newCalculateFibonacciTask(n int) func(ctx context.Context) string {
	return func(ctx context.Context) string {
		return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n))
	}
}

// newCalculateFibonacciWithErrorTask returns a task function
// with 'func(ctx context.Context) (string, error)' signature.
func newCalculateFibonacciWithErrorTask(n int) func(ctx context.Context) (string, error) {
	return func(ctx context.Context) (string, error) {
		return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n)), nil
	}
}

func main() {
	// Create a workers controller which does not start immediately.
	// Set TasksBufferSize to 10 to allow for task accumulation.
	w := workers.New[string](context.TODO(), &workers.Config{TasksBufferSize: 10})

	wg := sync.WaitGroup{}

	// Receive and print results or handle errors in a separate goroutine.
	go func() {
		for range 10 {
			select {
			case result := <-w.GetResults():
				fmt.Println(result)
			case err := <-w.GetErrors():
				fmt.Println("error executing task:", err)
			}

			wg.Done()
		}
	}()

	// Create channels to signal completion of task addition by processors.
	p1Done := make(chan struct{}, 1)
	p2Done := make(chan struct{}, 1)

	// Processor 1: adds tasks returning string results.
	go func() {
		for i := 20; i >= 11; i -= 2 {
			wg.Add(1)

			time.Sleep(50 * time.Millisecond) // Simulate task preparation delay.

			err := w.AddTask(newCalculateFibonacciTask(i))
			if err != nil {
				log.Fatalf("failed to add task: %v", err)
			}
		}

		p1Done <- struct{}{} // Signal completion of processor 1.
	}()

	// Processor 2: adds tasks returning (string, error) results.
	go func() {
		for i := 19; i >= 11; i -= 2 {
			wg.Add(1)

			time.Sleep(50 * time.Millisecond) // Simulate task preparation delay.

			err := w.AddTask(newCalculateFibonacciWithErrorTask(i))
			if err != nil {
				log.Fatalf("failed to add task: %v", err)
			}
		}

		p2Done <- struct{}{} // Signal completion of processor 2.
	}()

	// Wait for both processors to finish adding tasks.
	<-p1Done
	<-p2Done

	// Start executing accumulated tasks.
	w.Start(context.Background())

	wg.Wait() // Wait for all tasks to finish.

	// Close channels after processing.
	close(w.GetResults())
	close(w.GetErrors())
}

Example Output

Calculated Fibonacci for: 12, result: 144.
Calculated Fibonacci for: 11, result: 89.
Calculated Fibonacci for: 17, result: 1597.
Calculated Fibonacci for: 18, result: 2584.
Calculated Fibonacci for: 20, result: 6765.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 13, result: 233.
Calculated Fibonacci for: 15, result: 610.
Calculated Fibonacci for: 19, result: 4181.
Calculated Fibonacci for: 14, result: 377.
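
To see why a buffer is needed when tasks are added before anything consumes them, it may help to look at how a buffered Go channel behaves. The sketch below assumes that TasksBufferSize acts like the capacity of a channel holding pending tasks; this page does not confirm how the controller is implemented, so treat it as an analogy. The tryEnqueue and accumulate functions are hypothetical names used only here.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send of task into queue
// and reports whether the task was accepted.
func tryEnqueue(queue chan func(), task func()) bool {
	select {
	case queue <- task:
		return true
	default:
		// The buffer is full (or absent) and nobody is receiving.
		return false
	}
}

// accumulate tries to enqueue n tasks into a queue with the given buffer
// size while no consumer is running, and returns how many were accepted.
func accumulate(bufferSize, n int) int {
	queue := make(chan func(), bufferSize)

	accepted := 0
	for i := 0; i < n; i++ {
		if tryEnqueue(queue, func() {}) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(accumulate(10, 10)) // prints 10: every task fits into the buffer
	fmt.Println(accumulate(0, 10))  // prints 0: an unbuffered send needs a receiver
}
```

A blocking send in the second case would wait forever without a receiver, which is consistent with the deadlock warning given in the Timeout and Cancellation example for a zero TasksBufferSize.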

Timeout and Cancellation

In this example, we demonstrate how to cancel the execution of tasks using a context with a timeout.
A workers controller is created with TasksBufferSize set to allow for task accumulation before starting execution.

A context created with context.WithTimeout stops task execution automatically after a predefined time limit. Tasks that have not finished by the deadline are reported through the errors channel, as the example output shows.


package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/ygrebnov/workers"
)

// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

// newCalculateFibonacciTask returns a new task function.
func newCalculateFibonacciTask(n int) func(ctx context.Context) string {
	return func(ctx context.Context) string {
		// Simulate a long-running task.
		time.Sleep(time.Millisecond * 600 * time.Duration(n))

		return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n))
	}
}

func main() {
	// Create a workers controller.
	// Set TasksBufferSize to 10 to allow for task accumulation.
	// Note that leaving TasksBufferSize at zero will lead to a deadlock,
	// as the tasks are added before the workers are started.
	w := workers.New[string](context.TODO(), &workers.Config{TasksBufferSize: 10})

	wg := sync.WaitGroup{}

	// Receive and print results or handle errors in a separate goroutine.
	go func() {
		for range 10 {
			select {
			case result := <-w.GetResults():
				fmt.Println(result)
			case err := <-w.GetErrors():
				fmt.Println("error executing task:", err)
			}

			wg.Done()
		}
	}()

	// Add tasks to the workers controller.
	for i := 1; i <= 10; i++ {
		wg.Add(1)

		err := w.AddTask(newCalculateFibonacciTask(i))
		if err != nil {
			log.Fatalf("failed to add task: %v", err)
		}
	}

	// Create a context with a timeout of 2 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel() // Ensure the context is canceled to release resources.

	// Start the workers.
	w.Start(ctx)

	// Wait for all tasks to finish.
	wg.Wait()

	// Close the results and errors channels.
	close(w.GetResults())
	close(w.GetErrors())
}

Example Output

Calculated Fibonacci for: 1, result: 1.
Calculated Fibonacci for: 2, result: 1.
Calculated Fibonacci for: 3, result: 2.
error executing task: task execution cancelled: context deadline exceeded
error executing task: task execution cancelled: context deadline exceeded
error executing task: task execution cancelled: context deadline exceeded
error executing task: task execution cancelled: context deadline exceeded
error executing task: task execution cancelled: context deadline exceeded
error executing task: task execution cancelled: context deadline exceeded
error executing task: task execution cancelled: context deadline exceeded
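
The task in this example simulates work with time.Sleep, which does not observe the context. A task that should stop its own work as soon as the deadline passes can wait on ctx.Done() instead. The following standard-library sketch shows that pattern in isolation; sleepCtx and runWithTimeout are hypothetical helpers, and the sketch assumes the context handed to a task is the one that gets canceled, which this page does not state explicitly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx waits for d or until ctx is done, whichever comes first.
// It returns ctx.Err() if the context ended the wait.
func sleepCtx(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runWithTimeout runs a simulated task lasting workMs milliseconds under
// a deadline of timeoutMs milliseconds and describes the outcome.
func runWithTimeout(workMs, timeoutMs int) string {
	ctx, cancel := context.WithTimeout(
		context.Background(),
		time.Duration(timeoutMs)*time.Millisecond,
	)
	defer cancel()

	if err := sleepCtx(ctx, time.Duration(workMs)*time.Millisecond); err != nil {
		return "cancelled: " + err.Error()
	}
	return "completed"
}

func main() {
	fmt.Println(runWithTimeout(10, 1000)) // prints "completed"
	fmt.Println(runWithTimeout(1000, 10)) // prints "cancelled: context deadline exceeded"
}
```

In the second call the function returns after roughly 10 milliseconds rather than sleeping for the full second, which is the practical benefit of checking the context inside the task.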

Fixed-Limit vs Dynamic Pool

In this example, we demonstrate the difference between a fixed-limit worker pool and a dynamic worker pool.
A fixed-limit pool caps the number of concurrent workers at MaxWorkers, while a dynamic pool grows and shrinks automatically as needed. See Workers Pools for more details about worker pool types.

We create two separate controllers and use them to execute the same set of tasks.

package main

import (
	"context"
	"fmt"
	"log"
	"runtime"
	"sync"

	"github.com/ygrebnov/workers"
)

// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

func main() {
	// Create a fixed-limit worker pool with the maximum number of workers
	// equal to the number of logical CPUs.
	w1 := workers.New[string](
		context.Background(),
		&workers.Config{
			StartImmediately: true,
			MaxWorkers:       uint(runtime.NumCPU()),
		},
	)

	// Create a dynamic worker pool (MaxWorkers = 0).
	w2 := workers.New[string](
		context.Background(),
		&workers.Config{
			StartImmediately: true,
		},
	)

	wg := sync.WaitGroup{}

	// Collect results from both worker pools.
	go func() {
		for range 20 {
			select {
			case result := <-w1.GetResults():
				fmt.Println(result)

			case result := <-w2.GetResults():
				fmt.Println(result)

			case err := <-w1.GetErrors():
				fmt.Println("error executing fixed-limit workers task:", err)

			case err := <-w2.GetErrors():
				fmt.Println("error executing dynamic workers task:", err)
			}

			wg.Done()
		}
	}()

	// Add tasks to both controllers.
	for i := 20; i >= 11; i-- {
		wg.Add(2)

		err := w1.AddTask(func(ctx context.Context) string {
			return fmt.Sprintf("[Fixed] Calculated Fibonacci for: %d, result: %d.", i, fibonacci(i))
		})
		if err != nil {
			log.Fatalf("failed to add task to fixed workers: %v", err)
		}

		err = w2.AddTask(func(ctx context.Context) string {
			return fmt.Sprintf("[Dynamic] Calculated Fibonacci for: %d, result: %d.", i, fibonacci(i))
		})
		if err != nil {
			log.Fatalf("failed to add task to dynamic workers: %v", err)
		}
	}

	wg.Wait() // Wait for all tasks to finish.

	// Close result and error channels after processing.
	close(w1.GetResults())
	close(w2.GetResults())
	close(w1.GetErrors())
	close(w2.GetErrors())
}

Example Output

[Fixed] Calculated Fibonacci for: 20, result: 6765.
[Fixed] Calculated Fibonacci for: 19, result: 4181.
[Fixed] Calculated Fibonacci for: 17, result: 1597.
[Fixed] Calculated Fibonacci for: 18, result: 2584.
[Dynamic] Calculated Fibonacci for: 19, result: 4181.
[Dynamic] Calculated Fibonacci for: 20, result: 6765.
[Dynamic] Calculated Fibonacci for: 18, result: 2584.
[Fixed] Calculated Fibonacci for: 16, result: 987.
[Dynamic] Calculated Fibonacci for: 15, result: 610.
[Dynamic] Calculated Fibonacci for: 17, result: 1597.
[Fixed] Calculated Fibonacci for: 14, result: 377.
[Fixed] Calculated Fibonacci for: 15, result: 610.
[Fixed] Calculated Fibonacci for: 13, result: 233.
[Dynamic] Calculated Fibonacci for: 13, result: 233.
[Dynamic] Calculated Fibonacci for: 16, result: 987.
[Dynamic] Calculated Fibonacci for: 14, result: 377.
[Dynamic] Calculated Fibonacci for: 12, result: 144.
[Fixed] Calculated Fibonacci for: 11, result: 89.
[Fixed] Calculated Fibonacci for: 12, result: 144.
[Dynamic] Calculated Fibonacci for: 11, result: 89.

See Benchmarks for more details on the performance comparison of different worker pool types.
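
To make the notion of a concurrency limit concrete, the sketch below measures how many tasks actually run at the same time with and without a cap. It uses a buffered channel as a counting semaphore around plain goroutines; this is a common standard-library technique and is not claimed to be how the workers package implements its pools. The peakConcurrency function is a hypothetical name, and treating a limit of 0 as "no limit" simply mirrors the MaxWorkers = 0 convention used in the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs n short tasks and returns the highest number of tasks
// observed running at the same time. A limit of 0 means "no limit".
func peakConcurrency(limit, n int) int {
	var sem chan struct{}
	if limit > 0 {
		sem = make(chan struct{}, limit) // Counting semaphore.
	}

	var running, peak atomic.Int64
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			if sem != nil {
				sem <- struct{}{}        // Acquire a slot; blocks when the limit is reached.
				defer func() { <-sem }() // Release the slot when the task ends.
			}

			// Record the number of running tasks and update the peak.
			cur := running.Add(1)
			for {
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}

			time.Sleep(5 * time.Millisecond) // Simulate work.
			running.Add(-1)
		}()
	}

	wg.Wait()
	return int(peak.Load())
}

func main() {
	fmt.Println("peak with limit 3:", peakConcurrency(3, 20))
	fmt.Println("peak without limit:", peakConcurrency(0, 20))
}
```

With a limit of 3 the reported peak never exceeds 3, while the unlimited run can reach any value up to the number of tasks, depending on scheduling.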


Stop on Error

In this example, we demonstrate how to stop task execution automatically when the first error is encountered. A workers controller is created with StopOnError set to true, which means that if any task returns an error, no further tasks will be started.

We simulate this by adding a set of Fibonacci calculation tasks, where one specific task (for input 19) deliberately returns an error.


package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ygrebnov/workers"
)

// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

// newCalculateFibonacciWithErrorTask returns a new task function.
func newCalculateFibonacciWithErrorTask(n int) func(ctx context.Context) (string, error) {
	return func(ctx context.Context) (string, error) {
		// Simulate some delay.
		time.Sleep(50 * time.Millisecond)

		// Function returns an error for n == 19.
		if n == 19 {
			return "", fmt.Errorf("error calculating Fibonacci for: %d", n)
		}

		return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n)), nil
	}
}

func main() {
	// Create a workers controller with 'StopOnError' set to true.
	w := workers.New[string](
		context.Background(),
		&workers.Config{
			StartImmediately: true,
			StopOnError:      true,
			// If TasksBufferSize is set to 0, AddTask() will return an error after a task execution error occurs.
			TasksBufferSize: 10,
		},
	)

	done := make(chan struct{}, 1) // Channel to signal results processing completion.

	// Receive and print results or handle errors in a separate goroutine.
	go func() {
		// Set a timer to stop processing after 500 milliseconds.
		timer := time.NewTimer(500 * time.Millisecond)
		defer timer.Stop() // Release the timer if all ten messages arrive first.

		for range 10 {
			select {
			case <-timer.C:
				done <- struct{}{}
				return
			case result := <-w.GetResults():
				fmt.Println(result)
			case err := <-w.GetErrors():
				fmt.Println("error executing task:", err)
			}
		}

		done <- struct{}{} // To prevent deadlock if timer is not triggered.
	}()

	// Add tasks with passing arguments using a helper function.
	for i := 20; i >= 11; i-- {
		err := w.AddTask(newCalculateFibonacciWithErrorTask(i))
		if err != nil {
			log.Fatalf("failed to add task: %v", err)
		}
	}

	<-done // Wait for results processing to finish.

	// Close result and error channels after processing.
	close(w.GetResults())
	close(w.GetErrors())
}

Example Output

Calculated Fibonacci for: 18, result: 2584.
Calculated Fibonacci for: 14, result: 377.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 17, result: 1597.
Calculated Fibonacci for: 15, result: 610.
error executing task: error calculating Fibonacci for: 19

Note: as you can see from the output, the order of results may vary due to the concurrent nature of task execution. The original order of tasks is not guaranteed to be preserved.
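
The stop-on-first-error behavior can be illustrated with the standard library alone: a dispatcher hands tasks to a small set of worker goroutines, and the first failing task cancels a shared context so that no further tasks are started. This is a sketch of the general idea under that assumption, not the implementation behind StopOnError; runUntilError, newTask, and fibTasks are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// newTask returns a task that fails for n == 19, as in the example above.
func newTask(n int) func() error {
	return func() error {
		if n == 19 {
			return fmt.Errorf("error calculating Fibonacci for: %d", n)
		}
		return nil
	}
}

// fibTasks builds tasks for n = 20 down to 11.
func fibTasks() []func() error {
	var tasks []func() error
	for n := 20; n >= 11; n-- {
		tasks = append(tasks, newTask(n))
	}
	return tasks
}

// runUntilError runs tasks on the given number of worker goroutines and stops
// starting new tasks after the first error. It returns how many tasks were
// executed and the first error encountered, if any.
func runUntilError(workers int, tasks []func() error) (executed int, firstErr error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queue := make(chan func() error)
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range queue {
				if ctx.Err() != nil {
					continue // An earlier task failed; skip without running.
				}
				err := task()

				mu.Lock()
				executed++
				if err != nil && firstErr == nil {
					firstErr = err
					cancel() // Signal that no further tasks should start.
				}
				mu.Unlock()
			}
		}()
	}

dispatch:
	for _, task := range tasks {
		select {
		case <-ctx.Done():
			break dispatch
		case queue <- task:
		}
	}
	close(queue)
	wg.Wait()

	return executed, firstErr
}

func main() {
	executed, err := runUntilError(1, fibTasks())
	fmt.Println("executed:", executed, "first error:", err)
}
```

With a single worker, only the tasks for 20 and 19 run before execution stops. With several workers, tasks that were already in flight when the error occurred still finish, which matches the example output above, where a few results appear alongside the error.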


Handle Panics Gracefully

In this example, we demonstrate how panics inside task functions are safely recovered and reported as standard errors. The workers controller is designed to automatically catch panics during task execution and send them to the errors channel without crashing the entire application.

We simulate this by creating a set of Fibonacci calculation tasks, where one specific task (for input 15) deliberately triggers a panic. This ensures that even unexpected runtime errors are gracefully handled and the system remains stable.


package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/ygrebnov/workers"
)

// fibonacci calculates Fibonacci numbers.
func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

// newCalculateFibonacciTask returns a new task function.
func newCalculateFibonacciTask(n int) func(ctx context.Context) string {
	return func(ctx context.Context) string {
		time.Sleep(50 * time.Millisecond) // Simulate some delay.

		// Function panics for n == 15.
		if n == 15 {
			panic(fmt.Sprintf("panic during calculating Fibonacci for: %d", n))
		}

		return fmt.Sprintf("Calculated Fibonacci for: %d, result: %d.", n, fibonacci(n))
	}
}

func main() {
	// Create a workers controller.
	w := workers.New[string](context.Background(), &workers.Config{StartImmediately: true})

	wg := sync.WaitGroup{}

	// Receive and print results or handle errors in a separate goroutine.
	go func() {
		for range 10 {
			select {
			case result := <-w.GetResults():
				fmt.Println(result)
			case err := <-w.GetErrors():
				fmt.Println("error executing task:", err)
			}

			wg.Done()
		}
	}()

	// Add tasks to the controller.
	for i := 20; i >= 11; i-- {
		wg.Add(1)

		err := w.AddTask(newCalculateFibonacciTask(i))
		if err != nil {
			log.Fatalf("failed to add task to workers: %v", err)
		}
	}

	wg.Wait() // Wait for all tasks to finish.

	// Close result and error channels after processing.
	close(w.GetResults())
	close(w.GetErrors())
}

Example Output

Calculated Fibonacci for: 18, result: 2584.
Calculated Fibonacci for: 17, result: 1597.
error executing task: task execution panicked: panic during calculating Fibonacci for: 15
Calculated Fibonacci for: 13, result: 233.
Calculated Fibonacci for: 14, result: 377.
Calculated Fibonacci for: 12, result: 144.
Calculated Fibonacci for: 11, result: 89.
Calculated Fibonacci for: 19, result: 4181.
Calculated Fibonacci for: 16, result: 987.
Calculated Fibonacci for: 20, result: 6765.
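
Converting a panic into an error is usually done in Go with a deferred function that calls recover. The sketch below shows that technique on its own, using only the standard library. The safeRun function is a hypothetical helper, and the error text merely imitates the wording seen in the example output; the controller's actual recovery code is not shown on this page.

```go
package main

import "fmt"

// safeRun executes task and converts a panic into an ordinary error,
// so that a failing task cannot crash the calling goroutine.
func safeRun(task func() string) (result string, err error) {
	defer func() {
		if r := recover(); r != nil {
			// Overwrite the named return values with the recovered panic.
			result = ""
			err = fmt.Errorf("task execution panicked: %v", r)
		}
	}()

	return task(), nil
}

func main() {
	res, err := safeRun(func() string { return "ok" })
	fmt.Println(res, err) // prints "ok <nil>"

	res, err = safeRun(func() string {
		panic("panic during calculating Fibonacci for: 15")
	})
	fmt.Println(err) // prints "task execution panicked: panic during calculating Fibonacci for: 15"
}
```

The named return values are what allow the deferred function to replace the result after the panic has unwound the call to task.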