👷 Library for safely running groups of workers concurrently or consecutively that require input and output through channels

go workers

Maintainability Go Report Card GoCover Go Reference

Examples

Getting Started

Pull in the dependency

go get github.com/catmullet/go-workers

Add the import to your project

giving an alias helps since go-workers doesn't exactly follow conventions.
(If your using a JetBrains IDE it should automatically give it an alias)

import (
    worker "github.com/catmullet/go-workers"
)

Create a new worker worker

The NewWorker factory method returns a new worker.
(Method chaining can be performed on this method like calling .Work() immediately after.)

type MyWorker struct {}

func NewMyWorker() *MyWorker {
	return &MyWorker{}
}

func (my *MyWorker) Work(w *goworker.Worker, in interface{}) error {
	// work iteration here
}

worker := worker.NewWorker(ctx, NewMyWorker(), numberOfWorkers)

Send work to worker

Send accepts an interface. So send it anything you want.

worker.Send("Hello World")

Close and wait for the worker to finish and handle errors

Any error that bubbles up from your worker functions will return here.

if err := worker.Close(); err != nil {
    //Handle error
}

Working With Multiple Workers

Passing work form one worker to the next

By using the InFrom method you can tell workerTwo to accept output from workerOne

workerOne := worker.NewWorker(ctx, NewMyWorker(), 100).Work()
workerTwo := worker.NewWorker(ctx, NewMyWorkerTwo(), 100).InFrom(workerOne).Work()

Accepting output from multiple workers

It is possible to accept output from more than one worker but it is up to you to determine what is coming from which worker. (They will send on the same channel.)

workerOne := worker.NewWorker(ctx, NewMyWorker(), 100).Work()
workerTwo := worker.NewWorker(ctx, NewMyWorkerTwo(), 100).Work()
workerThree := worker.NewWorker(ctx, NewMyWorkerThree(), 100).InFrom(workerOne, workerTwo).Work()

Passing Fields To Workers

Adding Values

Fields can be passed via the workers object. Be sure as with any concurrency in Golang that your variables are concurrent safe. Most often the golang documentation will state the package or parts of it are concurrent safe. If it does not state so there is a good chance it isn't. Use the sync package to lock and unlock for writes on unsafe variables. (It is good practice NOT to defer in the work function.)

worker ONLY use the Send() method to get data into your worker. It is not shared memory unlike the worker objects values.

type MyWorker struct {
	message string
}

func NewMyWorker(message string) *MyWorker {
	return &MyWorker{message}
}

func (my *MyWorker) Work(w *goworker.Worker, in interface{}) error {
	fmt.Println(my.message)
}

worker := worker.NewWorker(ctx, NewMyWorker(), 100).Work()

Setting Timeouts or Deadlines

If your workers needs to stop at a deadline or you just need to have a timeout use the SetTimeout or SetDeadline methods. (These must be in place before setting the workers off to work.)

 // Setting a timeout of 2 seconds
 timeoutWorker.SetTimeout(2 * time.Second)

 // Setting a deadline of 4 hours from now
 deadlineWorker.SetDeadline(time.Now().Add(4 * time.Hour))

func workerFunction(w *worker.Worker, in interface{}) error {
	fmt.Println(in)
	time.Sleep(1 * time.Second)
}

Performance Hints

Buffered Writer

If you want to write out to a file or just stdout you can use SetWriterOut(writer io.Writer). The worker will have the following methods available

worker.Println()
worker.Printf()
worker.Print()

The workers use a buffered writer for output and can be up to 3 times faster than the fmt package. Just be mindful it won't write out to the console as quickly as an unbuffered writer. It will sync and eventually flush everything at the end, making it ideal for writing out to a file.

Using GOGC env variable

If your application is based solely around using workers, consider upping the percentage of when the scheduler will garbage collect. (ex. GOGC=200) 200% -> 300% is a good starting point. Make sure your machine has some good memory behind it. By upping the percentage your application will interupt the workers less, meaning they get more work done. However, be aware of the rest of your applications needs when modifying this variable.

Using GOMAXPROCS env variable

For workers that run quick bursts of lots of simple data consider lowering the GOMAXPROCS. Be carfeful though, this can affect your entire applicaitons performance. Profile your application and benchmark it. See where your application runs best.

Similar Resources

Batch processing library for Go supports generics & values returning

Aggregator Aggregator is a batch processing library for Go supports returning values. You can group up and process batch of tasks with keys in a singl

Dec 29, 2022

Simple application that waits for a given time and attempts and then exits

Wait Simple application that waits for a given time and attempts and then exits. WAIT_HOSTS is a list of hosts to wait for. e.g. WAIT_HOSTS=tcp://app:

Nov 24, 2021

🐜🐜🐜 ants is a high-performance and low-cost goroutine pool in Go, inspired by fasthttp./ ants 是一个高性能且低损耗的 goroutine 池。

🐜🐜🐜 ants is a high-performance and low-cost goroutine pool in Go, inspired by fasthttp./ ants 是一个高性能且低损耗的 goroutine 池。

A goroutine pool for Go English | 🇨🇳 中文 📖 Introduction Library ants implements a goroutine pool with fixed capacity, managing and recycling a massi

Jan 2, 2023

A sync.WaitGroup with error handling and concurrency control

go-waitgroup How to use An package that allows you to use the constructs of a sync.WaitGroup to create a pool of goroutines and control the concurrenc

Dec 31, 2022

🐝 A Highly Performant and easy to use goroutine pool for Go

🐝 A Highly Performant and easy to use goroutine pool for Go

gohive Package gohive implements a simple and easy to use goroutine pool for Go Features Pool can be created with a specific size as per the requireme

Sep 26, 2022

Go asynchronous simple function utilities, for managing execution of closures and callbacks

Go asynchronous simple function utilities, for managing execution of closures and callbacks

⚙️ gollback gollback - Go asynchronous simple function utilities, for managing execution of closures and callbacks 📖 ABOUT Contributors: Rafał Lorenz

Dec 29, 2022

Minimalistic and High-performance goroutine worker pool written in Go

pond Minimalistic and High-performance goroutine worker pool written in Go Motivation This library is meant to provide a simple way to limit concurren

Dec 22, 2022

:speedboat: a limited consumer goroutine or unlimited goroutine pool for easier goroutine handling and cancellation

Package pool Package pool implements a limited consumer goroutine or unlimited goroutine pool for easier goroutine handling and cancellation. Features

Jan 1, 2023

go routine control, abstraction of the Main and some useful Executors.如果你不会管理Goroutine的话,用它

go routine control, abstraction of the Main and some useful Executors.如果你不会管理Goroutine的话,用它

routine Routine Architecture Quick Start package main import ( "log" "context" "github.com/x-mod/routine" ) func main(){ if err := routine.Main

Dec 6, 2022
Comments
  • Error not bubbling up

    Error not bubbling up

    Describe the bug The error returned from the worker are not bubbling up

    To Reproduce

    package main
    
    import (
    	"context"
    	"errors"
    	"fmt"
    	"github.com/catmullet/go-workers"
    )
    
    func main() {
    	ctx := context.Background()
    	rnr := workers.NewRunner(ctx, NewWorker(), 1).Start()
    
    	rnr.Send("Print this out")
    
    	if err := rnr.Wait(); err != nil {
    		fmt.Println(err)
    	}
    
    }
    
    type MyWorker struct {
    }
    
    func NewWorker() workers.Worker {
    	return &MyWorker{}
    }
    
    func (wo *MyWorker) Work(in interface{}, out chan<- interface{}) error {
    	return errors.New("empty name")
    }
    

    Expected behavior From documentation the error code should be printed out but it's empty (nil)

  • Worker Time outs

    Worker Time outs

    Is your feature request related to a problem? Please describe. What if I want to only wait, say, 100 seconds per worker

    Describe the solution you'd like After n seconds terminate worker and send err message

  • Adding legacy integration methods.

    Adding legacy integration methods.

      // OutChannel Returns the workers output channel if worker is not joined with another worker.
      // If the worker already has a child worker attached this function will return an error (workers.ErrOutChannelUpdate).
      OutChannel() (chan interface{}, error)
    
       // InChannel Returns the workers intake channel for use with legacy systems, otherwise use workers Send() method.
       InChannel() chan interface{}
    
Unlimited job queue for go, using a pool of concurrent workers processing the job queue entries

kyoo: A Go library providing an unlimited job queue and concurrent worker pools About kyoo is the phonetic transcription of the word queue. It provide

Dec 21, 2022
Limits the number of goroutines that are allowed to run concurrently

Golang Concurrency Manager Golang Concurrency Manager package limits the number of goroutines that are allowed to run concurrently. Installation Run t

Dec 12, 2022
Run tasks concurrently with limits

Workerpool Package workerpool implements a concurrency limiting worker pool. Worker routines are spawned on demand as tasks are submitted. This packag

Nov 3, 2022
A simple and useful goroutine concurrent library.

Taskgroup A simple and useful goroutine concurrent library. Installation go get github.com/anthhub/taskgroup

May 19, 2021
Queue is a Golang library for spawning and managing a Goroutine pool

Queue is a Golang library for spawning and managing a Goroutine pool, Alloowing you to create multiple worker according to limit CPU number of machine.

Jan 9, 2023
Queue is a Golang library for spawning and managing a Goroutine pool

Queue is a Golang library for spawning and managing a Goroutine pool, Alloowing you to create multiple worker according to limit CPU number of machine.

Jan 2, 2023
Worker pool library with auto-scaling, backpressure, and easy composability of pools into pipelines

workerpool Worker pool library with auto-scaling, backpressure, and easy composability of pools into pipelines. Uses Go 1.18 generics. Notable differe

Oct 5, 2022
Alternative sync library for Go
Alternative sync library for Go

Alternative sync library for Go. Overview Future - A placeholder object for a value that may not yet exist. Promise - While futures are defined as a t

Dec 23, 2022
parallel: a Go Parallel Processing Library

parallel: a Go Parallel Processing Library Concurrency is hard. This library doesn't aim to make it easy, but it will hopefully make it a little less

May 9, 2022
Worker - A Golang library that provides worker pools

Worker A Golang library that provides worker pools. Usage See *_test.go files. T

Apr 15, 2022