Tunny

A goroutine pool for Go

Tunny is a Golang library for spawning and managing a goroutine pool, allowing you to limit work coming from any number of goroutines with a synchronous API.

A fixed goroutine pool is helpful when you have work coming from an arbitrary number of asynchronous sources, but a limited capacity for parallel processing. For example, when processing jobs from HTTP requests that are CPU heavy you can create a pool with a size that matches your CPU count.

Install

go get github.com/Jeffail/tunny

Or, using dep:

dep ensure -add github.com/Jeffail/tunny

Use

For most cases your heavy work can be expressed in a simple func(), in which case you can use NewFunc. Here's how that looks with the earlier example of CPU-heavy work from HTTP requests:

package main

import (
	"io/ioutil"
	"net/http"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: Something CPU heavy with payload

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
			return
		}
		defer r.Body.Close()

		// Funnel this work into our pool. This call is synchronous and will
		// block until the job is completed.
		result := pool.Process(input)

		w.Write(result.([]byte))
	})

	http.ListenAndServe(":8080", nil)
}

Tunny also supports timeouts. You can replace the Process call above with the following:

result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

You can also use the context from the request (or any other context) to handle timeouts and deadlines. Simply replace the Process call with the following:

result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

Changing Pool Size

The size of a Tunny pool can be changed at any time with SetSize(int):

pool.SetSize(10) // 10 goroutines
pool.SetSize(100) // 100 goroutines

This is safe to perform from any goroutine even if others are still processing.

Goroutines With State

Sometimes each goroutine within a Tunny pool will require its own managed state. In this case you should implement tunny.Worker, which includes calls for terminating, interrupting (in case a job times out and is no longer needed) and blocking the next job allocation until a condition is met.

When creating a pool using Worker types you will need to provide a constructor function for spawning your custom implementation:

pool := tunny.New(poolSize, func() tunny.Worker {
	// TODO: Any per-goroutine state allocation here.
	return newCustomWorker()
})

This allows Tunny to create and destroy Worker types cleanly when the pool size is changed.
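To make the shape of this concrete, here is a minimal sketch of a stateful worker. The Worker interface below mirrors the one documented for tunny v0.x (Process, BlockUntilReady, Interrupt, Terminate); it is declared locally so the sketch is self-contained, and the buffer-reuse logic is purely illustrative:

```go
package main

import "fmt"

// Worker mirrors tunny's Worker interface as documented for v0.x. It
// is declared locally here so the sketch compiles on its own; in real
// code you would implement tunny.Worker instead.
type Worker interface {
	Process(interface{}) interface{} // perform a job synchronously
	BlockUntilReady()                // delay the next job until ready
	Interrupt()                      // current job timed out or was cancelled
	Terminate()                      // worker is being removed from the pool
}

// customWorker carries per-goroutine state, e.g. a reusable buffer.
type customWorker struct {
	buf []byte
}

func newCustomWorker() Worker {
	return &customWorker{buf: make([]byte, 0, 4096)}
}

func (w *customWorker) Process(payload interface{}) interface{} {
	s, ok := payload.(string)
	if !ok {
		return nil
	}
	w.buf = append(w.buf[:0], s...) // reuse the worker-local buffer
	return len(w.buf)
}

func (w *customWorker) BlockUntilReady() {}
func (w *customWorker) Interrupt()       {}
func (w *customWorker) Terminate()       { w.buf = nil }

func main() {
	w := newCustomWorker()
	fmt.Println(w.Process("hello")) // 5
}
```

A constructor such as newCustomWorker is exactly what you would hand to tunny.New, so each pool goroutine gets its own buffer.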

Ordering

Backlogged jobs are not guaranteed to be processed in order. Due to the current implementation of channels and select blocks a stack of backlogged jobs will be processed as a FIFO queue. However, this behaviour is not part of the spec and should not be relied upon.

Owner

Ashley Jeffs

If you want to get in touch, please find me in person; I'm not good with computers.
Comments
  • Recommended Settings for IO bound jobs?

    Hi there all! I was wondering if anybody has any recommended settings (as in GOMAXPROCS and number of workers) for IO-bound jobs that I could start with? If you're wondering what IO-bound jobs I'm doing, I'm accessing a remote MySQL database. Would appreciate any suggestions. Thanks!

  • doesn't work with 32 bit Windows

    It all works on 64-bit, but if I set GOARCH=386 there is a panic. My test code:

    package main
    
    import (
    	"github.com/Jeffail/tunny"
    )
    
    var pool *tunny.Pool
    
    func main() {
    
    	pool = tunny.NewFunc(10, func(data interface{}) interface{} {
    		str := data.(string)
    		println("str:", str)
    
    		return nil
    	})
    	defer pool.Close()
    
    	pool.Process("11111111")
    
    	for {
    	}
    
    }
    

    And the panic output:

    $ go run .
    panic: runtime error: invalid memory address or nil pointer dereference
    [signal 0xc0000005 code=0x0 addr=0x0 pc=0x401bdc]

    goroutine 1 [running]:
    runtime/internal/atomic.Xadd64(0x1305801c, 0x1, 0x0, 0x13058014, 0x44c7d4)
    	D:/Go/src/runtime/internal/atomic/asm_386.s:102 +0xc
    github.com/Jeffail/tunny.(*Pool).Process(0x13058000, 0x459ae0, 0x473168, 0x13046090, 0x13044030)
    	d:/gopath/src/github.com/Jeffail/tunny/tunny.go:152 +0x3e
    main.main()
    	e:/go/3/3.go:19 +0x7f
    exit status 2

  • Change example to set GOMAXPROCS to numCPUs+1

    You should really have an OS thread spare to handle non-busy work, like handling the HTTP requests.

    Otherwise you are relying on Go's 'preemption' to give the HTTP handler a chance, which may or may not happen in a timely manner depending on the nature of your "heavy work".

  • PSA: API Changes - If your builds suddenly broke read this

    The tunny API has been changed recently. If you have found that builds are suddenly broken and this repo has shown up in the error logs then it is likely due to this. You have two options:

    Update to the latest API

    The new API is simpler, comes with performance improvements and adds more capabilities, so it might be worth simply updating. For the majority of cases that will mean switching from:

    pool, _ := tunny.CreatePool(n, func(object interface{}) interface{} {
        // Stuff
    }).Open()
    defer pool.Close()
    
    foo, _ := pool.SendWork(bar)
    

    To:

    pool := tunny.NewFunc(n, func(object interface{}) interface{} {
        // Stuff
    })
    defer pool.Close()
    
    foo := pool.Process(bar)
    

    Use vendoring

    The old API is tagged at version 0.0.1, therefore you can use vendoring to continue building with the old API by targeting that version.

    If you are using dep then add this to your Gopkg.toml file:

    [[constraint]]
    name = "github.com/Jeffail/tunny"
    version = "0.0.1"
    
  • Panic on pool.SetSize(5) via http request

    
    var pool tunny.Pool
    
    ...
    
    	numCPUs := runtime.NumCPU()
    
    	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
    		time.Sleep(time.Second)
    		log.Println(fmt.Sprintf("superprocess %v", payload))
    
    		return nil
    	})
    	pool.SetSize(1)
    	defer pool.Close()
    
    ...
    
    	http.HandleFunc("/tune", endpointTune)
    
    ...
    
    
    	go func() {
    
    		for {
    			for Deque.Len() != 0 {
    				go pool.Process(Deque.PopFront())
    			}
    			time.Sleep(time.Second)
    		}
    	}()
    
    package main
    
    import (
    	"net/http"
    )
    
    
    type TuneRequest struct {
    	PoolSize int
    }
    
    
    func endpointTune(res http.ResponseWriter, req *http.Request) {
    	r := TuneRequest{}
    
    	err := decoder.Decode(&r, req.URL.Query())
    
    	if err != nil {
    		return
    	}
    
    	pool.SetSize(r.PoolSize)
    }
    
    2020/04/07 11:45:23 http: panic serving [::1]:57105: runtime error: invalid memory address or nil pointer dereference
    goroutine 27 [running]:
    net/http.(*conn).serve.func1(0xc0000a2b40)
    	/Go/src/net/http/server.go:1772 +0x150
    panic(0x840e40, 0xb3e540)
    	/Go/src/runtime/panic.go:973 +0x429
    github.com/Jeffail/tunny.(*Pool).SetSize(0xb4b460, 0x5)
    	/Go/bin/src/github.com/Jeffail/tunny/tunny.go:236 +0x108
    main.endpointTune(0x90ef20, 0xc00010c000, 0xc000132100)
    	/src/endpointTune.go:24 +0xf4
    net/http.HandlerFunc.ServeHTTP(0x8bc580, 0x90ef20, 0xc00010c000, 0xc000132100)
    	/Go/src/net/http/server.go:2012 +0x4b
    net/http.(*ServeMux).ServeHTTP(0xb4b4e0, 0x90ef20, 0xc00010c000, 0xc000132100)
    	/Go/src/net/http/server.go:2387 +0x1ad
    net/http.serverHandler.ServeHTTP(0xc000180000, 0x90ef20, 0xc00010c000, 0xc000132100)
    	/Go/src/net/http/server.go:2807 +0x216
    net/http.(*conn).serve(0xc0000a2b40, 0x90f520, 0xc0000e1780)
    	/Go/src/net/http/server.go:1895 +0x171d
    created by net/http.(*Server).Serve
    	/Go/src/net/http/server.go:2933 +0x938
    
  • tunny performance

    I want to use tunny as my process pool, and I tested it with this code:

    package main
    
    import (
        "github.com/Jeffail/tunny"
        "github.com/pkg/profile"
    )
    
    type Worker struct {
    }
    
    func (this *Worker) TunnyReady() bool {
        return true
    }
    
    func (this *Worker) TunnyJob(interface{}) interface{} {
        return nil 
    }
    
    func main() {
        defer profile.Start(profile.BlockProfile).Stop()
    
        workers := make([]tunny.TunnyWorker, 100, 100)
        for i, _ := range workers {
            workers[i] = &Worker{}
        }   
    
        pool, _ := tunny.CreateCustomPool(workers).Open()
        defer pool.Close()
    
        //wg := new(sync.WaitGroup)
        //wg.add(10)
    
        //for i := 0; i < 10; i++ {
        //  go func() {
        //      for i:=0 i
        //      defer wg.Done()
        //  }()
        //} 
        //wg.Wait()
    
        for i := 0; i < 10000000; i++ {
            pool.SendWork(i)
        }   
    }
    

    I find that all workers are blocked here. I thought readyChan could hold at most one element, so why is the send blocked here?

    ROUTINE ======================== github.com/Jeffail/tunny.(*workerWrapper).Loop in /home/eric/Code/Go/GOPATH/src/github.com/Jeffail/tunny/worker.go
             0    7.67hrs (flat, cum) 99.92% of Total
             .          .     46:           break
             .          .     47:       }
             .          .     48:       time.Sleep(tout * time.Millisecond)
             .          .     49:   }
             .          .     50:
             .   283.72ms     51:   wrapper.readyChan <- 1
             .          .     52:
             .   161.92ms     53:   for data := range wrapper.jobChan {
             .     23.28s     54:       wrapper.outputChan <- wrapper.worker.TunnyJob(data)
             .          .     55:       for !wrapper.worker.TunnyReady() {
             .          .     56:           if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
             .          .     57:               break
             .          .     58:           }
             .          .     59:           time.Sleep(tout * time.Millisecond)
             .          .     60:       }
             .    7.67hrs     61:       wrapper.readyChan <- 1
             .          .     62:   }
             .          .     63:
             .          .     64:   close(wrapper.readyChan)
             .          .     65:   close(wrapper.outputChan)
    
  • Getting the worker id in the job function

    Hello! It would be really helpful to get the worker ID in the executed job, because I want to parallelize work using a pool of stateful singletons: each one should never process more than one task at a time, but I can create X of them.

    I was thinking of something like this:

    tunny.NewFunc(3, func(payload interface{}, workerId int) interface{} {
        // workerId can be 0, 1, or 2
    })
    

    Could it be an interesting feature? I can make a PR if you think it has a chance of being included in the project.

    ====

    Edit: another way would be to let us provide tunny with our own workers implementing the tunny worker interface (that way we can expose the required element in the function). I think it may be a cleaner way to implement it (@naggingant)

    What do you suggest ?

  • I don't understand 1 moment in new API

    example old

    for {
      _, err_time := pool.SendWorkTimed(time.Millisecond, func() {
        slog.Log(fmt.Sprintf("%#v", "wait connection"), nil)   
        ln.Accept()
      })
    
      if err_time != nil {
        slog.Log(fmt.Sprintf("%#v", "timeout"), nil)
        time.Sleep(time.Millisecond)
      }
    }
    

    example new

    for {
      _, err_time := pool.ProcessTimed(func() {
        slog.Log(fmt.Sprintf("%#v", "wait connection"), nil)   
        ln.Accept()
      }, time.Millisecond)
    
      if err_time == tunny.ErrJobTimedOut {
        slog.Log(fmt.Sprintf("%#v", "timeout"), nil)
        time.Sleep(time.Millisecond)
      }
    }
    

    In the old API, SendWorkTimed would wait to accept a connection. In the new API, ProcessTimed always returns a timeout error.

    Which behaviour is correct? I used to think the timeout in SendWorkTimed was about waiting for a free goroutine from the pool, but the new API suggests ProcessTimed's timeout covers the function's execution time?

    Sorry for my bad English. Thanks!

  • Panic behaviour notes

    May I know how tunny behaves if a panic happens inside the NewFunc closure? Does tunny re-spawn the worker, or what is the ideal way to handle a panic in a NewFunc worker?
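Tunny's documentation doesn't describe any built-in panic recovery, so a common defensive pattern is to recover inside the closure you hand to NewFunc and return the panic as an error result. The safe helper below is illustrative, not part of tunny:

```go
package main

import "fmt"

// safe wraps a work function so that a panic inside it becomes an
// error result instead of crashing the worker goroutine. This helper
// is illustrative; tunny itself does not provide it.
func safe(fn func(interface{}) interface{}) func(interface{}) interface{} {
	return func(payload interface{}) (result interface{}) {
		defer func() {
			if r := recover(); r != nil {
				result = fmt.Errorf("job panicked: %v", r)
			}
		}()
		return fn(payload)
	}
}

func main() {
	work := safe(func(payload interface{}) interface{} {
		if payload == nil {
			panic("nil payload")
		}
		return payload
	})

	fmt.Println(work("ok"))
	fmt.Println(work(nil))
}
```

The wrapped function can be passed straight to tunny.NewFunc(n, safe(fn)), and callers can type-assert the result of Process for an error.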

  • Process a pool asynchronously

    After the big API changes, how can we process a pool asynchronously?

    Also, what about callback functions?

    TIP for the author: when you change the lib API you have to respect your users and

    1. don't make breaking changes if not necessary;
    2. keep the same features as before, unless you got something terribly wrong before;
    3. if you do 1 and 2 that's ok, but the first thing you have to do is create a migration guide!

    Please answer the two questions above!

  • Use tunny for IO bound concurrency?

    Hi, I should mention that I'm new to Golang.

    I'm trying to write a solution that behaves as follows: I have a web service built with the go-json-rest framework; when handling an endpoint, let's say test, I make outbound HTTP requests (from 1 to N) and collect the result of each; to run those requests concurrently, I'm simply using one goroutine per HTTP request (a similar basic approach is described here: http://blog.narenarya.in/concurrent-http-in-go.html).

    This approach is good enough for basic usage, but since goroutines are a resource, we should care about managing them somehow. So, what do you think: is it OK to use Tunny for this kind of resource management, or can we just use a semaphore like this:

    semaphor := make(chan int, concurrency)
    for url := range urls {
      semaphor <- 1
      go func( url string ) {
        // do http get here
        <-semaphor // release the slot when done
      }(url)
    }
    

    ?

    Additionally, I'd like to notify client if there are no resources to handle request with some HTTP status.

    Thanks in advance

  • feat: give high priority to closeChan

    Give higher priority to closeChan. Because select chooses randomly when multiple channels are ready, in extreme cases where many closeChan and reqChan events arrive at the same time, giving higher priority to closeChan prevents it from being ignored indefinitely.

  • Update go.mod

    To fix a go mod download error that looks like this:

    github.com/jeffail/tunny: github.com/jeffail/[email protected]: parsing go.mod:
            module declares its path as: github.com/Jeffail/tunny
                    but was required as: github.com/jeffail/tunny
    
  • Go1.18 feature type parameter

    Hi, Go 1.18 supports type parameters ✨, which can be used to reduce the usage of interface{} / any and of type casting. I wonder if this feature can be supported soon after the 1.18 release in Feb 2022. Thanks :)

  • When I was using this code, some panics happened!

    package goTest

    import (
    	"fmt"
    	"strconv"
    	"testing"
    	"time"

    	"github.com/Jeffail/tunny"
    )

    func Test_pools(t *testing.T) {
    	pool := tunny.NewFunc(11, func(payload interface{}) interface{} {
    		fmt.Println(payload.(int))
    		test1(payload.(int))
    		time.Sleep(time.Second)
    		return nil
    	})
    	defer pool.Close()

    	for i := 0; i < 10; i++ {
    		go pool.Process(i)
    	}
    	pool.Close()
    }

    func test1(i int) {
    	fmt.Println("a" + strconv.Itoa(i))
    }

    The panic output:

    === RUN   Test_pools
    6
    a6
    7
    a7
    8
    a8
    9
    a9
    1
    a1
    panic: the pool is not running

    goroutine 34 [running]:
    github.com/Jeffail/tunny.(*Pool).Process(0xc0000a4640, 0x1114780, 0xc0000a61c0, 0xa9978700000000f, 0x61da5e0f)
    	/Users/gudaixin/go/pkg/mod/github.com/!jeffail/[email protected]/tunny.go:158 +0x13d
    created by ProjectGoModule/src/goTest.Test_pools
    	/Users/gudaixin/Public/PrivatePersonGoProject/ProjectGoModule/src/goTest/pool_test.go:21 +0xb1

    Process finished with exit code 1
