errgroup with goroutine worker limits


neilotoole/errgroup

neilotoole/errgroup is a drop-in alternative to Go's wonderful sync/errgroup but limited to N goroutines. This is useful for interaction with rate-limited APIs, databases, and the like.

Overview

In effect, neilotoole/errgroup is sync/errgroup but with a worker pool of N goroutines. The exported API is identical but for an additional function WithContextN, which allows the caller to specify the maximum number of goroutines (numG) and the capacity of the queue channel (qSize) used to hold work before it is picked up by a worker goroutine. The zero Group and the Group returned by WithContext have numG and qSize equal to runtime.NumCPU.

Usage

The exported API of this package mirrors the sync/errgroup package. The only change needed is the import path of the package, from:

import (
  "golang.org/x/sync/errgroup"
)

to

import (
  "github.com/neilotoole/errgroup"
)

Then use in the normal manner. See the godoc for more.

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
    // do something
    return nil
})

err := g.Wait()

Many users will have no need to tweak the numG and qSize params. However, benchmarking may suggest particular values for your workload. For that you'll need WithContextN:

numG, qSize := 8, 4
g, ctx := errgroup.WithContextN(ctx, numG, qSize)
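
For example, building on that, here's a sketch of queueing work against a rate-limited API (urls and fetch are placeholder names, not part of this package's API):

for _, u := range urls {
    u := u // capture the loop variable for the closure
    g.Go(func() error {
        // fetch is a placeholder for a call to a rate-limited API.
        return fetch(ctx, u)
    })
}

if err := g.Wait(); err != nil {
    return err
}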

Performance

The motivation for creating neilotoole/errgroup was to provide rate limiting while maintaining the lovely sync/errgroup semantics. It was assumed this would mean sacrificing some performance relative to sync/errgroup. However, benchmarking suggests that this implementation can be more effective than sync/errgroup when tuned for a specific workload.

Below is a selection of benchmark results. How to read this: a workload is X tasks of complexity Y. Each workload is executed by:

  • sync/errgroup, listed as sync_errgroup
  • a non-parallel implementation (sequential)
  • various {numG, qSize} configurations of neilotoole/errgroup, listed as errgroupn_{numG}_{qSize}

BenchmarkGroup_Short/complexity_5/tasks_50/errgroupn_default_16_16-16         	   25574	     46867 ns/op	     688 B/op	      12 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/errgroupn_4_4-16                   	   24908	     48926 ns/op	     592 B/op	      12 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/errgroupn_16_4-16                  	   24895	     48313 ns/op	     592 B/op	      12 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/errgroupn_32_4-16                  	   24853	     48284 ns/op	     592 B/op	      12 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/sync_errgroup-16                   	   18784	     65826 ns/op	    1858 B/op	      55 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/sequential-16                      	   10000	    111483 ns/op	       0 B/op	       0 allocs/op

BenchmarkGroup_Short/complexity_20/tasks_50/errgroupn_default_16_16-16        	    3745	    325993 ns/op	    1168 B/op	      27 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/errgroupn_4_4-16                  	    5186	    227034 ns/op	    1072 B/op	      27 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/errgroupn_16_4-16                 	    3970	    312816 ns/op	    1076 B/op	      27 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/errgroupn_32_4-16                 	    3715	    320757 ns/op	    1073 B/op	      27 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/sync_errgroup-16                  	    2739	    432093 ns/op	    1862 B/op	      55 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/sequential-16                     	    2306	    520947 ns/op	       0 B/op	       0 allocs/op

BenchmarkGroup_Short/complexity_40/tasks_250/errgroupn_default_16_16-16       	     354	   3602666 ns/op	    1822 B/op	      47 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/errgroupn_4_4-16                 	     420	   2468605 ns/op	    1712 B/op	      47 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/errgroupn_16_4-16                	     334	   3581349 ns/op	    1716 B/op	      47 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/errgroupn_32_4-16                	     310	   3890316 ns/op	    1712 B/op	      47 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/sync_errgroup-16                 	     253	   4740462 ns/op	    8303 B/op	     255 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/sequential-16                    	     200	   5924693 ns/op	       0 B/op	       0 allocs/op

The overall impression is that neilotoole/errgroup can provide higher throughput than sync/errgroup for these (CPU-intensive) workloads, sometimes significantly so. As always, these benchmark results should not be taken as gospel: your results may vary.

Design Note

Why require an explicit qSize limit?

If the number of calls to Group.Go results in qCh becoming full, the Go method will block until worker goroutines drain qCh. This behavior is in contrast to sync/errgroup's Go method, which doesn't block. While neilotoole/errgroup aims to be as close to a behaviorally identical "drop-in" alternative to sync/errgroup as possible, this blocking behavior is a conscious deviation.

Noting that the capacity of qCh is controlled by qSize, it's probable that an alternative implementation could be built that uses a (growable) slice acting, when qCh is full, as a buffer for functions passed to Go. Consideration of this potential design led to this issue regarding unlimited-capacity channels, perhaps better characterized in this particular case as "growable-capacity channels". If such a feature existed in the language, it's possible this implementation would have taken advantage of it, at least in a first-pass release (benchmarking notwithstanding). However, benchmarking seems to suggest that a relatively small qSize has performance benefits for some workloads, so the explicit qSize requirement may be the better design choice regardless.
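
To illustrate the blocking behavior, here is a small sketch (not from the package docs; slowTask is a placeholder func() error, and exact timing depends on the scheduler):

g, _ := errgroup.WithContextN(context.Background(), 1, 1) // 1 worker goroutine, queue capacity 1
g.Go(slowTask) // handed to the single worker
g.Go(slowTask) // waits in qCh
g.Go(slowTask) // can block here until the worker drains qCh
err := g.Wait()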

Comments
  • There is no recover in startG

    There is no recover in startG, so a panic inside one of the goroutines will crash the process.

    Maybe we could solve it with something like this:

    func (g *Group) startG() {
    	g.wg.Add(1)
    	go func() {
    		defer func() {
    			if r := recover(); r != nil {
    				buf := make([]byte, 64<<10)
    				buf = buf[:runtime.Stack(buf, false)]
    				g.err = fmt.Errorf("errgroup: panic recovered: %s\n %s", r, buf)
    			}
    			g.wg.Done()
    		}()
    
    		var f func() error

    		// ... (remainder of startG truncated in the original comment)
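
    A caller-side workaround (not part of this package) is to recover inside the function passed to Go; a minimal sketch, where recoverable and doWork are illustrative names and "fmt" is imported:

    // recoverable wraps f so that a panic in the task surfaces as an ordinary
    // error instead of crashing the process (illustrative helper, not part of this package).
    func recoverable(f func() error) func() error {
    	return func() (err error) {
    		defer func() {
    			if r := recover(); r != nil {
    				err = fmt.Errorf("task panicked: %v", r)
    			}
    		}()
    		return f()
    	}
    }

    // usage: g.Go(recoverable(doWork))
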
  • Suggestion: mention the new `SetLimit` function found in the golang.org/x/sync `errgroup` package in README.md

    It looks like the sync/errgroup package now has a SetLimit function. You can see the documentation at https://pkg.go.dev/golang.org/x/sync/errgroup#Group.SetLimit . I believe that this is a fairly recent happening.

    I feel like a mention of this new function in the original package in the README.md is probably merited.

    There is some difference between the APIs of the two packages; this one has a WithContextN(ctx, numG, qSize) function whereas that one has just WithContext(ctx) and SetLimit(n) functions. At a glance it seems like the n in the original is pretty similar to the numG parameter here, and there doesn't seem to be a qSize parameter in the original package. I definitely have not compared the two packages in detail, or really even attempted to understand the implementation of either one. I am just an end-user.

    I used this package some time ago at a previous job (maybe two of them, actually, I can't quite remember) and it was helpful for my purposes, so thank you for that. Ultimately, I'm wondering now if using this package in new code is necessary or recommended, given the new SetLimit function in the original package, and would appreciate hearing any thoughts from more knowledgeable sources.
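
    For reference, limiting concurrency with the upstream package looks roughly like this (a sketch; tasks and process are placeholder names). SetLimit(n) caps the number of active goroutines, and further Go calls block once the limit is reached:

    import "golang.org/x/sync/errgroup"

    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(8) // at most 8 goroutines run at once; additional Go calls block
    for _, t := range tasks {
    	t := t // capture the loop variable for the closure
    	g.Go(func() error { return process(ctx, t) })
    }
    err := g.Wait()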

  • fix: Fix deadlock raised in #11

    This fixes #11.

    Root cause:

    • When an error occurs, we exit a worker by returning from the startG function.

    • If the number of workers gCount reaches 0 while functions remain queued in the buffer, a deadlock results: no new worker will be created, so nothing ever drains the queue.

    Fix: (Note: there are different ways of achieving this)

    • Before adding to the buffer, check whether an error has already occurred; if so, return without adding to the buffer (a rough sketch of this check follows below).
    • Unit test added
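
    A purely illustrative sketch of that check; the field names here (mu, err, qCh) are hypothetical and not the package's actual internals:

    func (g *Group) Go(f func() error) {
    	g.mu.Lock()
    	failed := g.err != nil // did an earlier task already fail?
    	g.mu.Unlock()
    	if failed {
    		// Don't enqueue: no worker may be left to drain the queue,
    		// so sending on qCh could block forever.
    		return
    	}
    	g.qCh <- f // enqueue as before (may block if the queue is full)
    	// ... start a worker if needed, as in the existing implementation
    }
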
  • Deadlock with context

    package main

    import (
    	"context"
    	"fmt"

    	"github.com/neilotoole/errgroup"
    )

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()

    	eg, ctx := errgroup.WithContextN(ctx, 1, 1)
    	eg.Go(func() error { return nil })
    	eg.Go(func() error { return nil })
    	eg.Go(func() error { return nil })
    	eg.Go(func() error { return fmt.Errorf("sample error") })
    	eg.Go(func() error { return nil })
    	eg.Go(func() error { return nil })
    	eg.Go(func() error { return nil })

    	if err := eg.Wait(); err != nil {
    		fmt.Printf("Error group error: %s", err.Error())
    	}
    }

    If you run this multiple times there is a chance of deadlocking. What am I doing wrong here?

  • Always run functions given to Go()?

    Hi again! Sorry to be opening so many issues :-P.

    I came across another interesting behaviour difference compared to golang.org/x/sync/errgroup: It's possible for functions given to Go() to never be run.

    This can happen, I believe, if a worker goroutine encounters an error (such as a context cancellation) and then returns (in the goroutine in startG()). Then in Wait(), since our only condition to stop is waiting on the waitgroup, we end up setting qCh to nil and returning. This could leave messages in qCh that never get seen.

    golang.org/x/sync/errgroup on the other hand will always run a goroutine given to Go().

    It would be pretty racy to cause and I haven't reproduced it, but I encountered an issue with a service I have where it assumes a function must be run, even if it's going to error.

    I'm not sure what the best thing to do is, however. If there's been an error, should we bother trying to run any queued functions? I'm not sure. But perhaps we should, since not doing so is a difference from what golang.org/x/sync/errgroup does.

  • Checking context in Go()

    Hi again!

    Another use of this package brought to mind a possible improvement: Checking whether the context is cancelled in Go().

    This isn't something golang.org/x/sync does, but given this package's Go() can block, I wonder if it might be useful.

    In my use case I am calling Go() thousands of times with a limited number of workers. I wondered what would happen if one of the goroutines returned with an error. Currently, my callbacks do some non-context-aware work before using the context, and only then would they see it is cancelled and return. So a bunch of work would still be done, and Go() would likely be blocked, causing the error not to be seen as quickly as it otherwise might be. Whereas if Go() checked the context and returned without invoking the callback, this would be less of a concern.

    Basically my use looks something like:

    for _, work := range thingsToDo {
        eg.Go(func() error {
            somethingNotContextAware()
            return somethingContextAware(ctx)
        })
    }
    
    if err := eg.Wait(); err != nil {
        return err
    }
    

    Essentially I'm trying to make it as quick as possible to find out that there was an error.

    I could check the context in my callback or make my code more context aware, but I suppose I'm thinking it might be a nice addition to this package. Although I wonder whether it'd need an additional Go() function that is context aware, unless we held on to the context when creating the errgroup.

    I could probably come up with a PR if you think it'd be suitable.

    Thanks for your time!
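
    A caller-side version of that idea (a sketch only, reusing the placeholder names from the example above, and assuming here that the placeholder functions take the work item as an argument) is to check the group's derived context before queueing each task:

    for _, work := range thingsToDo {
    	if ctx.Err() != nil {
    		break // stop queueing once the context is cancelled
    	}
    	work := work // capture the loop variable for the closure
    	eg.Go(func() error {
    		somethingNotContextAware(work)
    		return somethingContextAware(ctx, work)
    	})
    }

    if err := eg.Wait(); err != nil {
    	return err
    }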
