[Go] Lightweight eventbus with async compatibility for Go

EventBus

Package EventBus is a small, lightweight event bus with async compatibility for Go.

Installation

Make sure that Go is installed on your computer. Type the following command in your terminal:

go get github.com/asaskevich/EventBus

After that, the package is ready to use.

Import package in your project

Add the following line to your *.go file:

import "github.com/asaskevich/EventBus"

If you find the name EventBus too long, you can alias the import like this:

import (
	evbus "github.com/asaskevich/EventBus"
)

Example

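package main

import (
	"fmt"

	"github.com/asaskevich/EventBus"
)

// calculator prints the sum of its two arguments.
func calculator(a int, b int) {
	fmt.Printf("%d\n", a+b)
}

func main() {
	bus := EventBus.New()
	bus.Subscribe("main:calculator", calculator)
	bus.Publish("main:calculator", 20, 40) // prints 60
	bus.Unsubscribe("main:calculator", calculator)
}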

Implemented methods

  • New()
  • Subscribe()
  • SubscribeOnce()
  • HasCallback()
  • Unsubscribe()
  • Publish()
  • SubscribeAsync()
  • SubscribeOnceAsync()
  • WaitAsync()

New()

New returns new EventBus with empty handlers.

bus := EventBus.New()

Subscribe(topic string, fn interface{}) error

Subscribe to a topic. Returns error if fn is not a function.

func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)

SubscribeOnce(topic string, fn interface{}) error

Subscribe to a topic once. Handler will be removed after executing. Returns error if fn is not a function.

func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)

Unsubscribe(topic string, fn interface{}) error

Remove callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

bus.Unsubscribe("topic:handler", HelloWorld)

HasCallback(topic string) bool

Returns true if any callback is subscribed to the topic.

Publish(topic string, args ...interface{})

Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.

func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!")

SubscribeAsync(topic string, fn interface{}, transactional bool) error

Subscribe to a topic with an asynchronous callback. Returns error if fn is not a function.

func slowCalculator(a, b int) {
	time.Sleep(3 * time.Second)
	fmt.Printf("%d\n", a + b)
}

bus := EventBus.New()
bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)

bus.Publish("main:slow_calculator", 20, 60)

fmt.Println("start: do some stuff while waiting for a result")
fmt.Println("end: do some stuff while waiting for a result")

bus.WaitAsync() // wait for all async callbacks to complete

fmt.Println("do some stuff after waiting for result")

The transactional argument determines whether subsequent callbacks for a topic run serially (true) or concurrently (false).
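
The difference can be illustrated without the library. The following is a minimal standard-library sketch, not the EventBus implementation: runAsync is a hypothetical helper that starts one goroutine per published event and, when transactional is true, serializes the callbacks with a mutex. The final wg.Wait() is the same kind of barrier that WaitAsync provides.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runAsync is a hypothetical helper, not part of EventBus. It invokes fn
// once per event in its own goroutine and returns the highest number of
// invocations that were running at the same time.
func runAsync(events int, transactional bool, fn func()) int32 {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex // serializes callbacks when transactional is true
		running int32
		peak    int32
	)
	for i := 0; i < events; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if transactional {
				mu.Lock()
				defer mu.Unlock()
			}
			n := atomic.AddInt32(&running, 1)
			// Record the peak concurrency seen so far.
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			fn()
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait() // block until every callback has returned
	return atomic.LoadInt32(&peak)
}

func main() {
	slow := func() { time.Sleep(50 * time.Millisecond) }
	fmt.Println("transactional peak:", runAsync(3, true, slow))
	fmt.Println("concurrent peak:", runAsync(3, false, slow))
}
```

With transactional set to true the reported peak is always 1. With false it depends on scheduling and is usually higher.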

SubscribeOnceAsync(topic string, fn interface{}) error

SubscribeOnceAsync works like SubscribeOnce, except that the callback is executed asynchronously.

WaitAsync()

WaitAsync waits for all async callbacks to complete.

Cross Process Events

Cross-process events work with two RPC services:

  • a client service to listen to remotely published events from a server
  • a server service to listen to client subscriptions

server.go

func main() {
    server := NewServer(":2010", "/_server_bus_", New())
    server.Start()
    // ...
    server.EventBus().Publish("main:calculator", 4, 6)
    // ...
    server.Stop()
}

client.go

func main() {
    client := NewClient(":2015", "/_client_bus_", New())
    client.Start()
    client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
    // ...
    client.Stop()
}

Notes

Documentation is available here: godoc.org. Full information about code coverage is also available here: EventBus on gocover.io.

Support

If you have a contribution for the package, feel free to open a pull request or an issue.

Special thanks to contributors

Comments
  • Remove boilerplate in subscribe functions

    This moves all of the repeated code from each of the public subscribe functions into a single private function. I believe it makes the API simpler overall and easier to read.

    gofmt updated all of the spacing in the file, so you can look directly at commit e7eac98 for the actual changes.

    Thanks for this bit of code! It's exactly what I need and very easy to understand.

    (PS. This should also get the test coverage to 100%)

  • Two subscribers?

    As I see in the sources:

    func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
        ...
        bus.handlers[topic] = &eventHandler{
            v, false, false, false, sync.Mutex{},
        }
        ...
    }
    

    Does the bus support only one subscriber per topic? If I add a second subscriber, it will overwrite the first.
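
To make the concern concrete, here is a minimal sketch of the alternative: each topic maps to a slice, so a second subscription appends instead of overwriting. miniBus and its methods are hypothetical names used only for illustration; they are not the EventBus types.

```go
package main

import "fmt"

// miniBus is a hypothetical illustration, not the EventBus type.
// Each topic maps to a slice, so subscribing appends instead of overwriting.
type miniBus struct {
	handlers map[string][]func(string)
}

func newMiniBus() *miniBus {
	return &miniBus{handlers: make(map[string][]func(string))}
}

// Subscribe adds fn to the list of handlers for topic.
func (b *miniBus) Subscribe(topic string, fn func(string)) {
	b.handlers[topic] = append(b.handlers[topic], fn)
}

// Publish calls every handler registered for topic, in subscription order.
func (b *miniBus) Publish(topic, msg string) {
	for _, fn := range b.handlers[topic] {
		fn(msg)
	}
}

func main() {
	bus := newMiniBus()
	bus.Subscribe("greet", func(s string) { fmt.Println("first:", s) })
	bus.Subscribe("greet", func(s string) { fmt.Println("second:", s) })
	bus.Publish("greet", "hello")
}
```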

  • PublishAsync

    Hi,

    Considering the callback func slowCalculator, the way PublishAsync works using Publish means that if multiple events are published asynchronously to the same topic, each subsequent goroutine has to wait for the earlier goroutine callback to finish before executing.

    This may be ideal in some scenarios where there may be contention if the callbacks are run in two separate goroutines (transactional behaviour).

    However, this may not be ideal where there is no contention if a given callback runs more than once concurrently.

    The idea tested for this scenario is to

    • lock
    • do all the necessary map operations
    • unlock
    • execute the handler

    This will allow each subsequent goroutine callback to gain access to the lock almost instantly and allow the same callback to run multiple times concurrently.
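
The lock, map operations, unlock, execute sequence described above could look roughly like the following. This is a sketch under assumptions, not the library's code: snapshotBus is a hypothetical type, and handlers take no arguments to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// snapshotBus is a hypothetical type used only to illustrate the idea.
type snapshotBus struct {
	mu       sync.Mutex
	handlers map[string][]func()
}

// Subscribe registers fn for topic under the lock.
func (b *snapshotBus) Subscribe(topic string, fn func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.handlers == nil {
		b.handlers = make(map[string][]func())
	}
	b.handlers[topic] = append(b.handlers[topic], fn)
}

// Publish holds the lock only while reading the map. The handlers run
// after the lock is released, so they may call back into the bus.
func (b *snapshotBus) Publish(topic string) {
	b.mu.Lock()
	snapshot := make([]func(), len(b.handlers[topic]))
	copy(snapshot, b.handlers[topic])
	b.mu.Unlock()

	for _, fn := range snapshot {
		fn()
	}
}

func main() {
	bus := &snapshotBus{}
	bus.Subscribe("second", func() { fmt.Println("second handled") })
	// This handler publishes from inside a callback, which would deadlock
	// if Publish kept the mutex locked while calling handlers.
	bus.Subscribe("first", func() {
		fmt.Println("first handled")
		bus.Publish("second")
	})
	bus.Publish("first")
}
```

Because the mutex is released before any handler runs, a handler can publish or subscribe without blocking on a lock its own caller still holds.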

    As part of a slightly bigger change, I was thinking of a new struct to represent a handler that contains

    • the callback as reflect.Value
    • flagOnce
    • async bool, to determine whether the callback should run asynchronously
    • transactional bool, to determine whether to use the default behaviour or allow the new behaviour

    Additionally:

    • PublishAsync will be unexported and Publish will be the general purpose method for publishing events.
    • SubscribeAsync will be implemented to set the behaviour for async callbacks

    The downside to this approach is having 3 extra bools for each handler. The advantage is only one map will be needed.

    I'm hoping for some feedback, what you think works, what doesn't or any suggestions related to this change.

  • use copy() instead of append() in publish()

    Thanks for some very useful code.

    In Publish(), it does an append in order to prevent issues with removal.

    I found a test suite that shows that copy is faster. The similar code would be BenchmarkAppendAlloc (645ns/op) vs. BenchmarkCopy (521ns/op).

    https://gist.github.com/xogeny/b819af6a0cf8ba1caaef

    go test -bench=.
    goos: darwin
    goarch: amd64
    BenchmarkAppend-12               	 1000000	      1087 ns/op
    BenchmarkAppendAlloc-12          	 3000000	       645 ns/op
    BenchmarkAppendAllocInline-12    	 3000000	       508 ns/op
    BenchmarkCopy-12                 	 5000000	       521 ns/op
    PASS
    ok  	_/tmp/append_vs_copy	8.642s
    
  • Stuck when trying to send response back to message Bus

    Hi, I have modified the example and wanted to send the result of the calculator back on the bus, on a different topic. I would like to chain different functions so that they react to events down the chain. When I run the program with "go run", the process gets stuck and nothing happens.

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/asaskevich/EventBus"
    )

    var bus EventBus.Bus

    func calculator1(a int, b int) {
    	fmt.Printf("calc1: %d\n", a+b)
    	bus.Publish("print", "calc1 calculated : %d\n", a+b)
    }

    func printer(s string) {
    	fmt.Println(s)
    }

    func main() {
    	bus = EventBus.New()
    	bus.Subscribe("calc", calculator1)
    	bus.Subscribe("print", printer)
    	sum := 1
    	for sum < 10 {
    		fmt.Println(sum)
    		bus.Publish("calc", sum, sum)
    		time.Sleep(1000 * time.Millisecond)
    		sum += 1
    	}
    	bus.Unsubscribe("calc", calculator1)
    	bus.Unsubscribe("print", printer)
    }
    

    Output

    > go run poc3.go
    1
    calc1: 2
    ^Csignal: interrupt
    
    

    I am on linux 64 bit.

  • added publish async

    func slowCalculator(a, b int) {
    	time.Sleep(3 * time.Second)
    	fmt.Printf("%d\n", a+b)
    }

    bus.Subscribe("main:slow_calculator", slowCalculator)
    bus.PublishAsync("main:slow_calculator", 30, 70)
    fmt.Println("waiting for result")

    bus.WaitAsync()

    bus.Unsubscribe("main:slow_calculator")

  • Allow call Publish with nil values

    In my application, it's common to have events with nil arguments. Without this fix I always get a "reflect: Call using zero Value argument" error.

    This fix also should slightly increase performance by allocating all necessary memory to store arguments immediately.

  • Fixed bug with subscribe/unsubscribe flow for type methods

    I found an interesting case in the subscribe/unsubscribe workflow. This test case demonstrates the issue:

    type handler struct {
    	val int
    }
    
    func (h *handler) Handle() {
    	h.val++
    }
    
    func TestUnsubscribeMethod(t *testing.T) {
    	bus := New()
    	h := &handler{val: 0}
    
    	bus.Subscribe("topic", h.Handle)
    	bus.Publish("topic")
    	if bus.Unsubscribe("topic", h.Handle) != nil {
    		t.Fail()
    	}
    	if bus.Unsubscribe("topic", h.Handle) == nil {
    		t.Fail()
    	}
    	bus.Publish("topic")
    	bus.WaitAsync()
    
    	if h.val != 1 {
    		t.Fail()
    	}
    }
    

    It seems that simple comparison doesn't work for reflect.Value objects. This PR aims to fix the issue.
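
One possible way to compare callbacks, sketched here as an assumption rather than as the change made in the PR, is to compare the function type together with the code pointer returned by reflect.Value.Pointer. sameCallback and counter are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
)

type counter struct{ n int }

func (c *counter) Handle() { c.n++ }

// sameCallback is a hypothetical helper. It treats two callbacks as equal
// when they have the same type and the same underlying code pointer.
// Caveat: the reflect documentation notes that a code pointer does not
// necessarily identify a single function uniquely, so method values of
// the same method on different receivers may also compare equal here.
func sameCallback(a, b interface{}) bool {
	va, vb := reflect.ValueOf(a), reflect.ValueOf(b)
	if va.Kind() != reflect.Func || vb.Kind() != reflect.Func {
		return false
	}
	return va.Type() == vb.Type() && va.Pointer() == vb.Pointer()
}

func main() {
	c := &counter{}
	// Each evaluation of c.Handle creates a new method value, but both
	// share the same code pointer.
	fmt.Println(sameCallback(c.Handle, c.Handle))
	// Different function types never match.
	fmt.Println(sameCallback(c.Handle, func(int) {}))
}
```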

  • Handler locking should not affect eventbus lock

    A transactional async subscription receives two messages. The first message blocks forever in the handler. The second message blocks forever in handler.Lock() and never releases eventbus.lock, so the event bus deadlocks.

  • Added interfaces and fixed concurrency issues

    The PR contains:

    • additional interfaces wrapped over the EventBus struct
    • fixed concurrent access error on async handler removing
    • fixed race on async handler execution due to ineffective handler lock
    • minor formatting fixes using gofmt and linter
  • Introduce bus interface instead of struct

    Hello,

    Could we replace structs with interfaces to facilitate testing of bus-dependent services? This would also provide an elegant way to implement the network bus: it would simply be another implementation of the interface. I'm working on a corresponding PR. It would also be useful to tag releases, since more and more people use dependency management tools.
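
As a rough illustration of the testing benefit, the sketch below has a service depend on a small interface so that a test can pass in a recording fake. Publisher, recordingBus, and signup are hypothetical names chosen for this example and are not taken from the package.

```go
package main

import "fmt"

// Publisher is a hypothetical minimal interface; the name is illustrative
// and not taken from the package.
type Publisher interface {
	Publish(topic string, args ...interface{})
}

// recordingBus is a test double that only records published topics.
type recordingBus struct{ topics []string }

func (r *recordingBus) Publish(topic string, args ...interface{}) {
	r.topics = append(r.topics, topic)
}

// signup depends on the interface, so a test can pass a recordingBus
// instead of a real bus.
func signup(bus Publisher, user string) {
	bus.Publish("user:created", user)
}

func main() {
	rec := &recordingBus{}
	signup(rec, "alice")
	fmt.Println(rec.topics) // prints [user:created]
}
```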

    Cheers, Michal

  • Indexing error due to multiple removeHandler

    In line 141 func Publish

    // Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
    func (bus *EventBus) Publish(topic string, args ...interface{}) {
    	bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
    	defer bus.lock.Unlock()
    	if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
    		// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
    		// so make a copy and iterate the copied slice.
    		copyHandlers := make([]*eventHandler, len(handlers))
    		copy(copyHandlers, handlers)
    		for i, handler := range copyHandlers {
    			if handler.flagOnce {
    				bus.removeHandler(topic, i) // multiple operation causes indexing error
    			}
    			if !handler.async {
    				bus.doPublish(handler, topic, args...)
    			} else {
    				bus.wg.Add(1)
    				if handler.transactional {
    					bus.lock.Unlock()
    					handler.Lock()
    					bus.lock.Lock()
    				}
    				go bus.doPublishAsync(handler, topic, args...)
    			}
    		}
    	}
    }
    

    i and handler are ranged over copyHandlers, while the remove operation acts on bus.handlers, so after the first removal the index i no longer refers to the same handler. 🤨
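
A small sketch of one way around the index mismatch: remove by pointer identity instead of by the snapshot index. The handler type and both functions below are hypothetical stand-ins, not the library's code.

```go
package main

import "fmt"

type handler struct {
	name string
	once bool
}

// removeHandler deletes h from list by pointer identity, so it stays
// correct even when earlier removals have shifted the indexes.
func removeHandler(list []*handler, h *handler) []*handler {
	for i, cur := range list {
		if cur == h {
			return append(list[:i], list[i+1:]...)
		}
	}
	return list
}

// publish iterates over a copy and drops once-handlers from the live list.
func publish(live []*handler) []*handler {
	snapshot := make([]*handler, len(live))
	copy(snapshot, live)
	for _, h := range snapshot {
		if h.once {
			live = removeHandler(live, h)
		}
	}
	return live
}

func main() {
	live := []*handler{{"a", true}, {"b", true}, {"c", false}}
	for _, h := range publish(live) {
		fmt.Println(h.name) // prints "c"
	}
}
```

With index-based removal the same input would delete "a" and then "c", because after the first removal index 1 of the live slice holds "c" rather than "b".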

  • There is no release version available

    Hello, Thank you for this beautifully simple and functional library. Can you please release a version so that we can use it instead of master? This helps in shielding our projects from changes in master branch.

  • do not hold lock on whole publish

    Fix #35 Fix #52

    This PR does not hold the lock during the whole publish process. This prevents deadlocks when callbacks also try to publish something or manage subscriptions on the bus.

    Publish already worked with a copy of the active handlers, so it is now safe to use the bus from within handlers: locks are used only to manage internal state and no longer cause deadlocks in subsequent calls from handlers.

    There is a slight change in the handler object: the once boolean flag is replaced with a pointer to sync.Once. This achieves the following goals:

    • Use separate synchronization to ensure the handler is invoked only once.
    • Use a pointer so that sync.Once can be nil and skipped when not needed.
    • Use the uniqueness of pointers in the handlers slice to reliably find the handler's own record for removal.
  • `Lock` acquired when publishing an event from a subscriber.

    Current behaviour

    The main function publishes event1, and the handler of event1 publishes event2, but the event2 handler is never called and the program does not exit.

    bus.Subscribe("event2", func() {
    	fmt.Println("event2")
    })
    bus.Subscribe("event1", func(bus EventBus.Bus) {
    	fmt.Println("event1")
    	time.Sleep(time.Second * 5)
    	bus.Publish("event2")
    })
    bus.Publish("event1", bus)
    

    Further debugging

    • I saw that Subscribe locks the bus, so it blocks whenever Publish already holds the lock, and SubscribeAsync can't lock the bus either. Is that the default behaviour?
  • Add go.mod file for use with go modules

    For those of us using go modules, it would be nice to support it. Pretty straightforward here since this is only one package.

    BTW, the go version number doesn't mean that that version is required. Here's Ian's explanation about the version number.

    The repo will also need a version tagged so those using modules can point at it.

    Thanks Alex!
