🔊Minimalist message bus implementation for internal communication

🔊 Bus


Bus is a minimalist event/message bus implementation for internal communication. It is heavily inspired by my event_bus package for the Elixir language.

API

The method names and arities/args are now stable. No changes should be expected in the 1.x.x versions of the package other than bug fixes.

Installation

Via go packages: go get github.com/mustafaturan/bus

Usage

Configure

The package requires a unique id generator to assign ids to events. You can write your own function to generate unique ids or use a package that provides unique id generation functionality.
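As a rough illustration of writing your own generator, here is a minimal sketch built on an atomic counter. It assumes the generator type has the shape `func() string` (the local `Next` type below stands in for `bus.Next`), and `newCounterGenerator` is a hypothetical helper, not part of the package. The ids it produces are unique only within a single process lifetime, so it is not a substitute for a time-ordered generator such as monoton.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// Next stands in for the generator type expected by the bus.
// Assumption: the real type is a func returning a unique string id.
type Next func() string

// newCounterGenerator is a hypothetical helper that returns a generator
// yielding "prefix-1", "prefix-2", ... It is safe for concurrent use.
func newCounterGenerator(prefix string) Next {
	var counter uint64
	return func() string {
		n := atomic.AddUint64(&counter, 1)
		return prefix + "-" + strconv.FormatUint(n, 10)
	}
}

func main() {
	next := newCounterGenerator("node1")
	fmt.Println(next()) // node1-1
	fmt.Println(next()) // node1-2
}
```

A generator like this could then be passed wherever the id generator is expected, provided the assumed signature holds.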

The bus package respects the software design choices of the packages/projects that use it. It supports both singleton and dependency injection approaches to initialize a bus instance.

Hint: Check the demo project for the singleton configuration.

Here is a sample initialization using the monoton id generator:

import (
    "github.com/mustafaturan/bus"
    "github.com/mustafaturan/monoton"
    "github.com/mustafaturan/monoton/sequencer"
)

func NewBus() *bus.Bus {
    // configure id generator (it doesn't have to be monoton)
    node        := uint64(1)
    initialTime := uint64(1577865600000) // set 2020-01-01 PST as initial time
    m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
    if err != nil {
        panic(err)
    }

    // init an id generator
    var idGenerator bus.Next = (*m).Next

    // create a new bus instance
    b, err := bus.NewBus(idGenerator)
    if err != nil {
        panic(err)
    }

    // maybe register topics in here
    b.RegisterTopics("order.received", "order.fulfilled")

    return b
}

Register Event Topics

To emit events to the topics, topic names need to be registered first:

// register topics
b.RegisterTopics("order.received", "order.fulfilled")

Register Event Handlers

To receive topic events, you need to register handlers. A handler requires two values: a Handle function and a Matcher, which is a regex pattern matched against topic names.

handler := bus.Handler{
    Handle: func(e *bus.Event) {
        // do something
        // NOTE: Highly recommended to process the event in an async way
    },
    Matcher: ".*", // matches all topics
}
b.RegisterHandler("a unique key for the handler", &handler)
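To get a feel for which topics a given Matcher pattern would select, the following sketch applies Go's standard `regexp` package to a list of topic names. It illustrates regex matching in general and is not the package's internal implementation; `matchingTopics` is a hypothetical helper. Note that an unanchored Go regex matches substrings, so anchoring with `^` and `$` is the safer way to express an exact topic.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchingTopics is a hypothetical helper that returns the topics
// matched by the given regex pattern, in their original order.
func matchingTopics(pattern string, topics []string) ([]string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err
	}
	var matched []string
	for _, t := range topics {
		if re.MatchString(t) {
			matched = append(matched, t)
		}
	}
	return matched, nil
}

func main() {
	topics := []string{"order.received", "order.fulfilled", "payment.failed"}
	patterns := []string{".*", `^order\.`, `^payment\.failed$`}
	for _, p := range patterns {
		m, err := matchingTopics(p, topics)
		if err != nil {
			fmt.Println("invalid pattern:", err)
			continue
		}
		fmt.Printf("%q -> %v\n", p, m)
	}
}
```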

Emit Events

// if txID val is blank, bus package generates one using the id generator
ctx := context.Background()
ctx = context.WithValue(ctx, bus.CtxKeyTxID, "some-transaction-id-if-exists")

// event topic name (must be registered before)
topic := "order.received"

// interface{} data for event
order := make(map[string]string)
order["orderID"]     = "123456"
order["orderAmount"] = "112.20"
order["currency"]    = "USD"

// emit the event
event, err := b.Emit(ctx, topic, order)

if err != nil {
    // report the err
    fmt.Println(err)
}

// if the caller needs the event, a reference to it is returned as the result
// of the `Emit` call.
fmt.Println(event)

Processing Events

When an event is emitted, the matching topic handlers receive it synchronously. Processing events asynchronously is highly recommended. The package leaves the choice of concurrency abstraction to the consuming packages/projects, depending on their use cases. Each handler receives the same event as a reference to a bus.Event struct:

// Event data structure
type Event struct {
	ID         string      // identifier
	TxID       string      // transaction identifier
	Topic      string      // topic name
	Data       interface{} // actual event data
	OccurredAt int64       // creation time in nanoseconds
}
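One possible way to process events asynchronously is to have the Handle function only enqueue the event and let a worker goroutine do the actual work. The sketch below is one option among many and makes several assumptions: the local `Event` type mirrors the struct above so the example is self-contained, and `asyncConsumer`, `newAsyncConsumer`, and `Close` are hypothetical names that are not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Event mirrors the fields of the struct shown above so that this
// sketch compiles without importing the bus package.
type Event struct {
	ID         string
	TxID       string
	Topic      string
	Data       interface{}
	OccurredAt int64
}

// asyncConsumer is a hypothetical consumer that buffers events in a
// channel and processes them on a single worker goroutine.
type asyncConsumer struct {
	queue chan *Event
	wg    sync.WaitGroup
	mu    sync.Mutex
	seen  []string // ids of processed events, in processing order
}

func newAsyncConsumer(size int) *asyncConsumer {
	c := &asyncConsumer{queue: make(chan *Event, size)}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for e := range c.queue {
			c.mu.Lock()
			c.seen = append(c.seen, e.ID) // replace with real processing
			c.mu.Unlock()
		}
	}()
	return c
}

// Handle has the shape of a handler function. It only enqueues the event,
// so the emitter is blocked only when the buffer is full.
func (c *asyncConsumer) Handle(e *Event) { c.queue <- e }

// Close stops accepting events and waits until queued events are processed.
func (c *asyncConsumer) Close() {
	close(c.queue)
	c.wg.Wait()
}

func main() {
	c := newAsyncConsumer(8)
	c.Handle(&Event{ID: "1", Topic: "order.received"})
	c.Handle(&Event{ID: "2", Topic: "order.fulfilled"})
	c.Close()
	fmt.Println(c.seen) // [1 2]
}
```

In this design, `Close` gives the producer an explicit point at which all queued events are known to be processed, and a single worker keeps events in the order they were enqueued. Handle must not be called after Close, since sending on a closed channel panics.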

Sample Project

A demo project with three consumers: a counter consumer that increments a counter for each event topic, a printer consumer that prints all events, and a calculator consumer that sums amounts.

Benchmarks

BenchmarkEmit-4   	 5983903	       200 ns/op	     104 B/op	       2 allocs/op

Contributing

All contributors should follow the Contributing Guidelines before creating pull requests.

Credits

Mustafa Turan

License

Apache License 2.0

Copyright (c) 2020 Mustafa Turan

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Comments
  • How to setup the point of synchronization between consumers and producer?

    I see this in example code:

    // give some time to process events for async consumers
    time.Sleep(time.Millisecond * 25)
    

    This approach cannot be used in production. Is there a more formal way to set up a join point between the producer and consumers?

    With a single consumer, a channel or something similar inside Event.Data could be used to synchronize, but with multiple consumers this approach is not feasible.

    Any suggestion?

  • Clarify Semantics

    From the readme it's unclear how events are consumed. Will events always make it to all consumers? or can a consumer stop the processing? What is the order of processing of events? Is the order guaranteed? Would be great to clarify this.
