Goworkers - Zero dependency Golang worker pool

A zero-dependency Go goroutine pool library. It is useful when you want to control the number of goroutines performing concurrent work. It has the following features:

  • Allows consumers to configure maxWorkers and job queue size.
  • A pool can also be created with warm workers. This eliminates any small delay in getting a ready worker to process jobs.
  • Provides both blocking and non-blocking retrieval of the results of one or more jobs.
  • All jobs are timed and tracked.
  • Support for context.Context for every job that is submitted. All deadlines and timeouts should be enforced via context.Context.
  • APIs are provided to create different processor functions - the convention is borrowed from functional programming (e.g. fn, mapper, consumer). Additional processor functions will be introduced in the future.

Usage:

To use this library in your Go project, fetch the dependency via:

go get -u github.com/unmarshall/goworkers

Create a new Pool

func main() {
    // Create a new Pool. Every pool has an id associated with it. There are optional
    // configurations you can apply to the pool. The configuration below will
    // create a pool ("pool-1") with a maximum of 10 workers, a job queue big enough to hold
    // up to 100 jobs, and 2 live workers started up front, waiting for jobs.
    // If the optional `WithWarmWorkers` configuration is not set, the pool starts with no
    // live workers; workers are created/scaled up to max-workers as jobs are submitted.
    p, err := gwp.NewPool("pool-1", 10, gwp.WithMaxJobs(100), gwp.WithWarmWorkers(2))
    if err != nil {
        panic(err) // you can handle it differently. This is just an example
    }
    defer func() {
        _ = p.Close() // once you are done with the pool you should close it.
    }()
}

Creating and submitting a simple Job

A new job is created by passing a processor function which takes no input and returns only an error. Any input the processor needs can be captured via its closure.

func main() {
    // create and initialize the pool (see above)
    // ------------------------------------------------------------------------------
    // Creating a simple processor function and submitting it.
    jobFuture, err := p.NewJob(context.Background(), "job1", processor).Process()
    // err can happen only if there is an issue submitting this Job. If the job is submitted
    // to the pool then any error thereafter can be accessed via JobFuture
    if err != nil {
        // handle error
    }
    // Await will block till the result is available or the context has been cancelled.
    // There is a non-blocking variant `jobFuture.Stream()` which will provide you a channel
    // on which you can poll for the `JobResult` once it is available
    jobResult := jobFuture.Await()
    // Each JobResult may contain a result (optional; in this case the processor
    // function returns nothing other than an error, so there is no result value).
    // Additionally, the JobResult carries the job status, error (if any) and metrics.
    fmt.Printf("result: %v, status: %s, error: %+v, metric: %+v", jobResult.Result, jobResult.Status, jobResult.Err, jobResult.Metric)
}

func processor() error {
	// your processing code goes here
	return nil
}
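The Await/Stream duality described above (block for the result, or poll a channel) is a standard future pattern. As a minimal sketch of that pattern in plain Go, using hypothetical names (`result`, `future`, `submit`) rather than this library's actual types:

```go
package main

import "fmt"

// result mirrors the idea of a job result: a value plus an error.
type result struct {
	Value int
	Err   error
}

// future is a minimal sketch of the Await/Stream duality: a single
// buffered channel backs both a blocking wait and a pollable stream.
type future struct {
	ch chan result
}

// submit runs fn in a goroutine and returns a future for its result.
func submit(fn func() (int, error)) *future {
	f := &future{ch: make(chan result, 1)}
	go func() {
		v, err := fn()
		f.ch <- result{Value: v, Err: err}
		close(f.ch)
	}()
	return f
}

// Await blocks until the result is available.
func (f *future) Await() result { return <-f.ch }

// Stream exposes the channel so the caller can poll or select on it.
func (f *future) Stream() <-chan result { return f.ch }

func main() {
	f := submit(func() (int, error) { return 42, nil })
	r := f.Await()
	fmt.Println(r.Value, r.Err) // 42 <nil>
}
```

Because the channel is buffered, the worker never blocks handing over its result; the consumer chooses between a blocking read (Await) and select/range-based polling (Stream).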

Create a mapper job

A mapper, in functional programming, is a function which takes an input and returns an output (of the same or a different type).

func main() {
    // create and initialize the pool (see above)
    // ------------------------------------------------------------------------------
    // create payload that you wish to pass
    var payload interface{}

    // Creating a mapper job with the payload and submitting it.
    jobFuture, err := p.NewMapperJob(context.Background(), "job1", mapProcessor).ProcessPayload(payload)
    // err can happen only if there is an issue submitting this Job. If the job is submitted
    // to the pool then any error thereafter can be accessed via JobFuture
    if err != nil {
        // handle error
    }
    // Await will block till the result is available or the context has been cancelled.
    // There is a non-blocking variant `jobFuture.Stream()` which will provide you a channel
    // on which you can poll for the `JobResult` once it is available
    jobResult := jobFuture.Await()
    // Each JobResult may contain a result (here, whatever mapProcessor returns for the payload).
    // Additionally, the JobResult carries the job status, error (if any) and metrics.
    fmt.Printf("result: %v, status: %s, error: %+v, metric: %+v", jobResult.Result, jobResult.Status, jobResult.Err, jobResult.Metric)
}

func mapProcessor(payload interface{}) (interface{}, error) {
	// your processing code should go here. 
	return nil, nil
}

Process batch payloads via Mapper Job

If you have a batch of payloads that you wish to process concurrently then you can submit all the payloads together.

NOTE: Ensure that the pool's configured job queue size has sufficient capacity for the batch, otherwise the request will be rejected.

func main() {
    // create and initialize the pool (see above)
    // ------------------------------------------------------------------------------
    // create payloads that you wish to pass
    var payloads []interface{}

    // Creating a mapper job with the batch of payloads and submitting it.
    jobFuture, err := p.NewMapperJob(context.Background(), "job1", mapProcessor).ProcessPayloadBatch(payloads)
    // err can happen only if there is an issue submitting this Job. If the job is submitted
    // to the pool then any error thereafter can be accessed via JobFuture
    if err != nil {
        // handle error
    }
    // jobFuture.Stream will return a channel which will contain one or more JobResults (one per payload).
    resultsChannel := jobFuture.Stream()
    // you can range over the resultsChannel to get the results
    results := make([]JobResult, 0, len(payloads))
    for r := range resultsChannel {
    	results = append(results, r)
    }
}

func mapProcessor(payload interface{}) (interface{}, error) {
	// your processing code goes here
	return nil, nil
}