A universal mechanism to manage goroutine lifecycles

run

run.Group is a universal mechanism to manage goroutine lifecycles.

Create a zero-value run.Group, and then add actors to it. Actors are defined as a pair of functions: an execute function, which should run synchronously; and an interrupt function, which, when invoked, should cause the execute function to return. Finally, invoke Run, which concurrently runs all of the actors, waits until the first actor exits, invokes the interrupt functions, and finally returns control to the caller only once all actors have returned. This general-purpose API allows callers to model pretty much any runnable task, and achieve well-defined lifecycle semantics for the group.

run.Group was written to manage component lifecycles in func main for OK Log. But it's useful in any circumstance where you need to orchestrate multiple goroutines as a single unit. A video of a talk describing run.Group is also available.

Examples

context.Context

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
	return myProcess(ctx, ...)
}, func(error) {
	cancel()
})

net.Listener

ln, _ := net.Listen("tcp", ":8080")
g.Add(func() error {
	return http.Serve(ln, nil)
}, func(error) {
	ln.Close()
})

io.ReadCloser

var conn io.ReadCloser = ...
g.Add(func() error {
	s := bufio.NewScanner(conn)
	for s.Scan() {
		println(s.Text())
	}
	return s.Err()
}, func(error) {
	conn.Close()
})

Comparisons

Package run is somewhat similar to package errgroup, except it doesn't require actor goroutines to understand context semantics.

It's somewhat similar to package tomb.v1 or tomb.v2, except it has a much smaller API surface, delegating e.g. staged shutdown of goroutines to the caller.

Owner
OK Log
The logs are gonna be okay
Comments
  • Handle second SIGINT

    Applications often listen for additional interrupt signals after the first one and terminate the application (with exit code 2) if it fails to stop gracefully after the first signal.

    Do you think it would make sense in run as well?

  • Feature Request: Defer a function until interrupted

    I propose adding a Group method that works like Go's defer, except that the function is called when the group is interrupted. Something like:

    // Defer a func until interrupted
    func (g *Group) Defer(f func()) {
    	c := make(chan struct{}, 1)
    	execute := func() error {
    		<-c
    		f()
    		return nil
    	}
    	interrupt := func(error) {
    		c <- struct{}{}
    	}
    	g.Add(execute, interrupt)
    }
    

    Thanks for a useful package.

  • Consider handling panics of goroutines

    run.Group spawns one or more goroutines which execute the given functions. However, goroutines do not and cannot inherit panic handlers from their parent goroutines, so a panic() in any of the child goroutines will kill the whole program. Please consider adding a new parameter to Group.Run() which would add a recover() and return the recovered value as an error to the caller. Until then, we are forced to add a wrapper function that does this, as here: https://github.com/improbable-eng/thanos/pull/932.

  • This package is only remotely similar to errgroup

    Note that I'm sure this is due to my not understanding the package, but I hope my feedback can help improve the docs/examples.

    I have used errgroup, where the semantics are:

    The first call to return a non-nil error cancels the group; its error will be returned by Wait.

    The tests/examples in this repo with multiple coordinated goroutines seem to focus on the error case and not so much on the happy case.

    I adjusted the TestMany test to not return any errors, and I find it surprising that it fails:

    package run_test

    import (
    	"strings"
    	"testing"
    	"time"

    	"github.com/oklog/run"
    )

    func TestManyOK(t *testing.T) {
    	var g run.Group
    	g.Add(func() error { return nil }, func(error) {})
    	cancel := make(chan struct{})
    	counter := 0
    	g.Add(func() error {
    		for i := 0; i < 100; i++ {
    			counter++
    			// Do some work
    			strings.Repeat("a", 3000)
    			select {
    			case <-cancel:
    				return nil
    			default:
    			}
    		}
    		return nil
    	}, func(err error) {
    		close(cancel)
    	})
    
    	res := make(chan error)
    	go func() { res <- g.Run() }()
    	select {
    	case err := <-res:
    		if err != nil {
    			t.Error("got error:", err)
    		}
    		if counter != 100 {
    			t.Error("got", counter)
    		}
    	case <-time.After(100 * time.Millisecond):
    		t.Error("timeout")
    	}
    }
    
    
  • Proposal: Call each `g.actors` `.interrupt(err)` method in reverse

    Firstly, I've just converted a small project over to use run.Group() and I love it - so thank you.

    For the project, there will be a number of long-running top-level actors, hence this package is pretty much perfect. The way we construct these actors is "in the order dictated by their dependency relationships" (as quoted from here https://blog.gopheracademy.com/advent-2017/run-group/). This works because the g.actors .execute() functions are called in order.

    But (just like defer in some ways) it might make sense to call the g.actors .interrupt(err) functions in reverse order so the later actors that depend on the former actors are closed down and tidied up first, with the actors at the base of the relationships being closed last.

    Your thoughts appreciated and apologies for opening a mostly inconclusive issue on a small, well used, and mature project.

  • Problematic licensing

    Allow me to apologize in advance for opening an issue about licensing lest it be considered bikeshedding.

    Please consider changing the license from Apache 2.0 (AL2) to a freer and simpler license such as the 2 or 3 clause BSD licenses or the MIT license. Unfortunately, AL2 is incompatible with GPLv2, cutting off AL2-licensed and GPLv2-licensed projects from each other, splitting the open source ecosystem. This needless conflict is contrary to the purpose of a permissive license: unencumbered sharing and cooperation.

    Additionally, AL2 is long and verbose, and its book-keeping clause is yet more bureaucracy for developers to waste their time and screen space on. Seeing as you have not included the massive AL2 license header in the source files, as suggested in AL2's appendix, I take it you prefer your source files to contain source code, not bureaucratic boilerplate like license headers and obligatory "prominent notices" stating that the file has been changed by so-and-so.

    Truly permissive licenses, such as the BSD and MIT licenses, do not have these problems, allowing developers to focus on software rather than legal headaches.

  • Any reason why interrupt functions are executed synchronously?

    Hi @peterbourgon ,

    It's been a long time since my last question, but I just ran into an issue. Context:

    • I use your library to deal with multiple modules
    • when a SIGTERM is received, the group is supposed to interrupt all actors
    • this always worked in the past, but today I ran into trouble
    • one of my interrupt functions calls httpServer.Shutdown(ctx), which waits for all connections to close (the same could happen with grpcServer.GracefulStop())

    Today, one of my graceful methods for closing connections got stuck, and since all interrupt functions are called in a synchronous for loop (https://github.com/oklog/run/blob/master/group.go#L45-L48), the remaining actors kept running until the supervisor (Kubernetes) decided the application had exceeded its shutdown time limit and simply sent SIGKILL to the program.

    As a result, some modules were never closed (nothing too bad, it seems, but it's annoying).

    I would like to understand why the interrupt functions are not called in separate goroutines. With asynchronous interruption the blocking module still would not have closed, but at least the rest of the actors could have.

    Since I cannot reproduce the issue (the Kubernetes time limit was 30s, I don't know of any request lasting that long, and I have no websockets, so I cannot explain exactly what happened), I will probably try using a context.WithCancel(ctx) for the HTTP servers when calling .Shutdown(ctx), and for gRPC, launch a goroutine alongside .GracefulStop() so that if the server is still not closed after X seconds it just calls .Close() on it (abruptly). Depending on your answer, I could also fork your library to do asynchronous interruptions; your thoughts would help 😃 👍

    Thank you,

  • Best way to not call interrupt functions when "err = nil"

    Hi @peterbourgon ,

    Thanks so much for this library; it's really easy to use, and I've been using it for a while now 👍

    I have a specific question: as soon as any execute function returns, run.Group calls every interrupt function, even if there is no error.

    In my case I have a service manager that lets you add your own module as long as it "implements" some functions like Start/Stop/Ready... That's great for supervising all modules and terminating them properly when needed.

    My only concern is when someone implements a Start() that is non-blocking (but that still needs to be treated as a module): on my side I add it to the run.Group, but since it returns nil right after the group starts, it interrupts all the other modules.

    The solution I see right now is to make modules explicitly implement a function IsSynchronous() bool and, if it returns true, execute Start() directly without adding it to the run.Group.

    Alternatively, I could tell module developers that Start() must block, since it runs asynchronously inside the group, so they would have to block on an empty channel... but that doesn't feel like the proper way.

    If you have any suggestions, I'm interested 😄 (maybe I need to rethink how I manage "generic" modules)

    Thank you!

  • Implement GracefulRestart actor

    I regularly use run and tableflip together in systemd-managed applications for graceful restarts.

    Would you be interested in adding support for it?

    The actor does not actually require importing tableflip as a dependency. Here is the current code: https://github.com/sagikazarmark/appkit/blob/master/run/upgrade.go

    I'd be happy to send a PR if you like it.

  • Deregister signal channel

    If SignalHandler is being used as a mechanism to exit the program entirely, these resources will be cleaned up naturally. But if not, the signal should be explicitly deregistered.

    Make sure the channel is eligible to be garbage collected and that no additional signals are delivered to the channel after this actor exits.

    See also: https://go-review.googlesource.com/c/go/+/219640/2/src/context/context.go#543

  • Allow adding after run

    This would allow supporting cases where one task spawns sub-tasks. For example, you might fetch some URL and only then learn that you need N other resources, all of which you want to be part of a single run.Group.
