Run tasks concurrently with limits

Workerpool

Package workerpool implements a concurrency-limiting worker pool. Worker goroutines are spawned on demand as tasks are submitted.

This package is mostly useful when tasks are CPU-bound and spawning too many goroutines would hurt performance. It has a straightforward API and no external dependencies. See the section below for a usage example.

Example

package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(n int64) bool {
	if n < 2 {
		return false
	}
	for p := int64(2); p*p <= n; p++ {
		if n%p == 0 {
			return false
		}
	}
	return true
}

func main() {
	wp := workerpool.New(runtime.NumCPU())
	for i, n := 0, int64(1_000_000_000_000_000_000); n < 1_000_000_000_000_000_100; i, n = i+1, n+1 {
		n := n // https://golang.org/doc/faq#closures_and_goroutines
		id := fmt.Sprintf("task #%d", i)
		// Use Submit to submit tasks for processing. Submit blocks when no
		// worker is available to pick up the task.
		err := wp.Submit(id, func(_ context.Context) error {
			fmt.Println("isprime", n)
			if IsPrime(n) {
				fmt.Println(n, "is prime!")
			}
			return nil
		})
		// Submit fails when the pool is closed (ErrClosed) or being drained
		// (ErrDrained). Check for the error when appropriate.
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Drain prevents submitting new tasks and blocks until all submitted tasks
	// complete.
	tasks, err := wp.Drain()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Iterating over the results is useful if non-nil errors can be expected.
	for _, task := range tasks {
		// Err returns the error that the task returned after execution.
		if err := task.Err(); err != nil {
			fmt.Println("task", task, "failed:", err)
		}
	}

	// Close should be called once the worker pool is no longer necessary.
	if err := wp.Close(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
Owner: Cilium (eBPF-based Networking, Security, and Observability)
Comments
  • tests cleanup

    Having many tests in one big function is not ideal. This PR splits some tests into their own test functions, along with some other cleanup. It is probably best viewed commit by commit.

  • ci: enable more golangci-lint checkers, update config for Go 1.18

    Commit 3f5cb13ba7dd12398c07192996e4fcaaa8bc3d68 updated CI for Go 1.18 as well as the version of golangci-lint. However, it did not update the golangci-lint configuration to target Go 1.18 instead of Go 1.17. This commit does so and also enables more checkers.

    That commit also updated the golangci-lint action configuration for v3 without updating the action version itself, which prevents the action from running. This issue is addressed in the first commit of this PR.

  • Wait for worker to be submitted in TestWorkerPoolLen

    Occasionally, TestWorkerPoolLen fails with:

        === RUN   TestWorkerPoolLen
            workerpool_test.go:90: got 0; want 1
        --- FAIL: TestWorkerPoolLen (0.00s)

    See, e.g., https://github.com/cilium/workerpool/runs/5717463554?check_suite_focus=true

    This seems to happen more often since switching to Go 1.18.

    Fix this by waiting for the worker to be submitted before checking (*WorkerPool).Len.

  • Avoid crash when running task with (*task).run == nil

    The following program currently crashes due to a nil pointer dereference:

    wp := workerpool.New(runtime.NumCPU())
    defer wp.Close()
    if err := wp.Submit("foobar", nil); err != nil {
    	panic(err)
    }
    if _, err := wp.Drain(); err != nil {
    	panic(err)
    }

    Fix this by checking (*task).run before running it. This way we keep the semantics of returning a result for every submitted task. The alternative would be to not enqueue the task in (*WorkerPool).Submit("foobar", nil) and return either nil or an error. Both would lead to an imbalance between the number of submitted tasks and returned results.

  • tasks: create taskResult to be returned by Drain

    Before this patch, the workerpool kept a reference to the task struct even after its task func had completed, in order to provide the resulting error (if any) when Drain() is called. Since the task struct holds a reference to the user-provided task func, the task func cannot be garbage collected until Drain() is called. This could be problematic because (1) the number of tasks is unbounded and (2) the workerpool has no control over the memory used by the task func.

    This patch introduces a taskResult struct that satisfies the Task interface and doesn't keep a reference to the task func.

  • Move tests into a separate package

    Moving the test code out of the workerpool package allows writing tests from the perspective of a real user of the package. This way, tests cannot fiddle with internals and stay focused on the exposed API.

    Currently, there's still one case where we want to check internal state: the (*WorkerPool).tasks capacity check. For that, export a (*WorkerPool).TasksCap method for testing only.

  • Run staticcheck as part of CI

    Use the go install behavior introduced in Go 1.16 (go install pkg@version) to get staticcheck without having to add it as a dependency.

    Also, run gofmt only with Go 1.16. In the past, gofmt behavior has occasionally changed between Go versions. To avoid spurious failures, only run gofmt from the latest Go version, i.e. currently Go 1.16.

  • .github: use matrix strategy and test with Go 1.15 as well

    Test on Go 1.15 as well, since it is still officially supported (as per https://golang.org/doc/devel/release.html#policy), go.mod declares go1.15, and we use no Go 1.16-specific features.

  • add context to the task function signature

    There are use cases where the caller does not care about the results of task processing and thus never calls the Drain method. This is typically the case for tasks that are never expected to return unless a context is cancelled. To accommodate this use case, add a context as a parameter to the task function.

    As tasks are now allowed to run forever unless the context is cancelled, ensure that calling the Close method cancels the tasks' context to properly tear down all workers. It is the responsibility of the library user to ensure that submitted tasks respect the context.

  • workerpool: refactor results to avoid unnecessary copy

    Background: to avoid leaking the internal implementation of the task struct, WorkerPool.Drain() returns a slice of the Task interface.

    Before this patch, the WorkerPool struct maintained a slice of concrete task struct pointers to be used as the return value of the Drain() method. Since Drain() returns a slice of Task interface values, a conversion was required.

    This patch makes the WorkerPool results a slice of Task interface values instead, so that it can be returned as-is by Drain().

errgroup with goroutine worker limits

neilotoole/errgroup neilotoole/errgroup is a drop-in alternative to Go's wonderful sync/errgroup but limited to N goroutines. This is useful for inter

Dec 15, 2022
👷 Library for safely running groups of workers concurrently or consecutively that require input and output through channels

Examples Quickstart Multiple Go Workers Passing Fields Getting Started Pull in the dependency go get github.com/catmullet/go-workers Add the import to

Dec 1, 2022
Set up tasks to be executed in parallel.

A simple Go library to set up tasks to be executed in parallel. package main import ( "context" "log" "github.com/bep/workers" ) func main() {

Sep 18, 2022
Run functions in parallel :comet:

Parallel fn Run functions in parallel. Limit the number of goroutines running at the same time. Installation go get -u github.com/rafaeljesus/parallel

Sep 26, 2022
Limits the number of goroutines that are allowed to run concurrently

Golang Concurrency Manager Golang Concurrency Manager package limits the number of goroutines that are allowed to run concurrently. Installation Run t

Dec 12, 2022
Package tasks is an easy to use in-process scheduler for recurring tasks in Go

Tasks Package tasks is an easy to use in-process scheduler for recurring tasks in Go. Tasks is focused on high frequency tasks that run quick, and oft

Dec 18, 2022
Delay-tasks - A delayed tasks implementation for golang

delay-tasks An implementation of delayed tasks. Usage $ git clone https://github

Jan 14, 2022
Golang io.Reader and io.Writer but with limits

LimitIO io.Reader and io.Writer with limit.

Dec 14, 2022
Ransomware: a type of malware that prevents or limits users from accessing their system

Ransomware Note 1: This project is purely academic, use at your own risk. I do not encourage in any way the use of this software illegally or to attac

Nov 17, 2021
GitHub Rate Limits Prometheus exporter. Works with both App and PAT credentials

Github Rate Limit Prometheus Exporter A prometheus exporter which scrapes GitHub API for the rate limits used by PAT/GitHub App. Helm Chart with value

Sep 19, 2022
Pacemaker - Rate limit library. Currently implemented rate limits are

PaceMaker Rate limit library. Currently implemented rate limits are Fixed window

Nov 5, 2022
Ready is a program to run tasks before a commit using a pre-commit git hook.

Ready Ready is a program to run tasks before a commit using a pre-commit git hook. For example, you can automatically run formatting, linting, and tes

Aug 23, 2022
run/stop goroutines/tasks securely, recursively

grunner - run/stop goroutines/tasks securely, recursively. s1 := grunner.New() s1.Defer(func() { fmt.Println("s1 stopped 2") }) s1.Defer(func() {

Apr 22, 2022
A package to allow one to concurrently go through a filesystem with ease

skywalker Skywalker is a package to allow one to concurrently go through a filesystem with ease. Features Concurrency BlackList filtering WhiteList fi

Nov 14, 2022
This is a go implementation of Segmented Sieve and non Segmented sieve to produce prime numbers concurrently.

Prime This is a Go library to produce prime numbers using all available cpu cores. Installation $ go get github.com/kavehmz/prime Usage package main

Dec 30, 2022
Testing framework for Go. Allows writing self-documenting tests/specifications, and executes them concurrently and safely isolated. [UNMAINTAINED]

GoSpec GoSpec is a BDD-style testing framework for the Go programming language. It allows writing self-documenting tests/specs, and executes them in p

Nov 28, 2022
SizedWaitGroup has the same role and close to the same API as the Golang sync.WaitGroup but it adds a limit on the amount of goroutines started concurrently.

SizedWaitGroup SizedWaitGroup has the same role and API as sync.WaitGroup but it adds a limit of the amount of goroutines started concurrently. SizedW

Jan 8, 2023
Multiplexer over TCP. Useful if target server only allows you to create limited tcp connections concurrently.

tcp-multiplexer Use it in front of target server and let your client programs connect it, if target server only allows you to create limited tcp conne

May 27, 2021