A fast Message/Event Hub using the publish/subscribe pattern, with support for topics like RabbitMQ exchanges, for Go applications

Hub

📨 A fast enough Event Hub for Go applications using publish/subscribe, with support for topic patterns like RabbitMQ exchanges.

Install

To install this library you can use go get, but I encourage you to always vendor your dependencies or use one of the version tags of this project.

go get -u github.com/leandro-lugaresi/hub
dep ensure --add github.com/leandro-lugaresi/hub

Usage

Subscribers

Hub provides subscribers as buffered (cap > 0) or unbuffered (cap = 0) channels, and there are two types of subscribers:

  • Subscriber: the default subscriber. It is blocking, so if the channel is full and you try to send another message, the send operation blocks until the subscriber consumes a message.
  • NonBlockingSubscriber: this subscriber never blocks on the publish side, but if the capacity of the channel is reached the published message is lost and an alert is triggered. Use it only when losing data is acceptable, e.g. for metrics or logs.
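
The difference between the two behaviours can be sketched with plain Go channels. This is only an illustration and not the library's implementation: blockingSend and nonBlockingSend are hypothetical helper names, and the sketch leaves out the alert that hub raises when a message is dropped.

```go
package main

import "fmt"

// blockingSend waits until the channel has room for msg.
// Hypothetical helper, not part of hub.
func blockingSend(ch chan<- string, msg string) {
	ch <- msg
}

// nonBlockingSend tries to deliver msg and reports false when the
// channel is full, in which case the message is dropped.
// Hypothetical helper, not part of hub.
func nonBlockingSend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	blockingSend(ch, "first")           // fits in the buffer, returns at once
	ok := nonBlockingSend(ch, "second") // buffer is full, message is dropped
	fmt.Println(ok, len(ch))            // prints "false 1"
}
```

A blocking send applies back-pressure to the publisher, while the select with a default branch trades delivery guarantees for a publisher that never stalls.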

Topics

This library uses the same concept as topic exchanges in RabbitMQ: the message name is used to find all the subscribers that match the topic, like a route. The topic must be a list of words delimited by dots (.). There is one important special case for binding keys: * (star) can substitute for exactly one word.
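
The matching rule described above can be sketched as a small standalone function. This is a simplified illustration of the rule as stated here, not how hub performs the lookup (the library uses a trie internally); matchTopic is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern. Both are lists of
// words delimited by dots; a "*" in the pattern stands for exactly one word.
// Hypothetical helper, not part of hub.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	if len(p) != len(t) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != t[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchTopic("account.login.*", "account.login.failed")) // true
	fmt.Println(matchTopic("account.login.*", "account.foo.failed"))   // false
	fmt.Println(matchTopic("account.*.failed", "account.foo.failed"))  // true
}
```

Comparing word by word makes the "exactly one word" rule explicit: a pattern and a topic with a different number of words never match in this sketch.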

Examples & Demos

package main

import (
	"fmt"
	"sync"

	"github.com/leandro-lugaresi/hub"
)

func main() {
	h := hub.New()
	var wg sync.WaitGroup
	// the cap param is used to create a buffered channel with cap = 10
	// If you want an unbuffered channel, use cap 0
	sub := h.Subscribe(10, "account.login.*", "account.changepassword.*")
	wg.Add(1)
	go func(s hub.Subscription) {
		for msg := range s.Receiver {
			fmt.Printf("receive msg with topic %s and id %d\n", msg.Name, msg.Fields["id"])
		}
		wg.Done()
	}(sub)

	h.Publish(hub.Message{
		Name:   "account.login.failed",
		Fields: hub.Fields{"id": 123},
	})
	h.Publish(hub.Message{
		Name:   "account.changepassword.failed",
		Fields: hub.Fields{"id": 456},
	})
	h.Publish(hub.Message{
		Name:   "account.login.success",
		Fields: hub.Fields{"id": 123},
	})
	// message not routed to this subscriber
	h.Publish(hub.Message{
		Name:   "account.foo.failed",
		Fields: hub.Fields{"id": 789},
	})

	// close all the subscribers
	h.Close()
	// wait until all the buffered messages have been processed
	wg.Wait()

	// Output:
	// receive msg with topic account.login.failed and id 123
	// receive msg with topic account.changepassword.failed and id 456
	// receive msg with topic account.login.success and id 123
}

See more here!

Benchmarks

To run the benchmarks you can execute:

make bench

Currently, I only have the benchmarks of the CSTrie used internally. I will provide more benchmarks.

Throughput

The project has one test for throughput. To run it, execute:

make throughput

On an Intel(R) Core(TM) i5-4460 CPU @ 3.20GHz x4 we got these results:

go test -v -timeout 60s github.com/leandro-lugaresi/hub -run ^TestThroughput -args -throughput
=== RUN   TestThroughput
1317530.091292 msg/sec
--- PASS: TestThroughput (3.04s)
PASS
ok      github.com/leandro-lugaresi/hub 3.192s

CSTrie

This project internally uses an awesome Concurrent Subscription Trie written by @tylertreat. If you want to learn more about it, see this blog post; the code is here.


This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. We appreciate your contribution. Please refer to our contributing guidelines for further information.

Owner
Leandro Lugaresi
Gopher | Storage engines and event source enthusiast
Comments
  • GitHub actions

    This PR updates the CI and test tools for this library.

    • [X] Remove TravisCI configurations
    • [X] Added initial GitHub actions workflow
    • [X] Removed old coverage merge tool
    • [X] Update go to support go modules
    • [X] Update docs
    • [X] use golangci-lint instead of gometalinter
  • add multiple topic subscribers

    In some cases, subscribers need to subscribe to multiple topics and the * wildcard isn't enough.

    Now all subscribers can subscribe to multiple topics, which reduces the need to select from multiple channels.

  • Remove message helper functions

    It's hard to do the right thing when a cast fails: sometimes we need to return the zero value, other times panic or log. To avoid problems, this cast must be done on the subscription side.

  • Simplify api

    I removed the exported Subscriber interface and changed Subscription to hold a channel instead. The nonBlockingSubscriber was also changed: instead of a ring buffer, it now uses a channel as well. This simplified the API and removed the only dependency of this library, at some cost in performance. That should not be a problem, because this library should not be on the hot path of a program.

  • update workflows to work on forks

    This PR fixes two problems on pull requests from forks:

    • [X] Reviewdog failing because the token is not available on forks
    • [X] Tests and coverage not running
  • Fix panic on closed channel

    Fix panic in subscribers

    When hub publishes messages, it first finds all matching subscribers and then calls sub.Set(m) on them:

    // Publish will send an event to all the subscribers matching the event name.
    func (h *Hub) Publish(m Message) {
    	for k, v := range h.fields {
    		m.Fields[k] = v
    	}
    
    	for _, sub := range h.matcher.Lookup(m.Topic()) {
    		sub.Set(m)
    	}
    }
    

    However, if Unsubscribe is called after the filtering but before the Set, the subscriber's underlying channel is closed and Set tries to write into a closed channel.

    This PR adds channels and goroutines responsible for closing the channel.
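
    One way to avoid writing into a closed channel is sketched below. It is an assumption about the general technique (a dedicated goroutine owns the channel and is the only one that closes it), not the code from this PR; the subscriber type and its in, out and quit fields are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical type, not the one used by hub.
type subscriber struct {
	in   chan string   // written by Set, never closed
	out  chan string   // read by the consumer, closed only by run
	quit chan struct{} // closed once by Close
	once sync.Once
}

func newSubscriber(capacity int) *subscriber {
	s := &subscriber{
		in:   make(chan string),
		out:  make(chan string, capacity),
		quit: make(chan struct{}),
	}
	go s.run()
	return s
}

// run forwards messages from in to out and is the only place that closes out.
func (s *subscriber) run() {
	defer close(s.out)
	for {
		select {
		case m := <-s.in:
			select {
			case s.out <- m:
			case <-s.quit:
				return
			}
		case <-s.quit:
			return
		}
	}
}

// Set reports false instead of panicking when the subscriber was closed.
func (s *subscriber) Set(m string) bool {
	select {
	case s.in <- m:
		return true
	case <-s.quit:
		return false
	}
}

// Close signals shutdown; it is safe to call more than once.
func (s *subscriber) Close() { s.once.Do(func() { close(s.quit) }) }

func main() {
	s := newSubscriber(1)
	fmt.Println(s.Set("hello")) // true
	s.Close()
	for range s.out { // drain until run closes out
	}
	fmt.Println(s.Set("late")) // false, and no panic
}
```

    Because Set only ever sends on a channel that is never closed, a publisher that looked up the subscriber just before Close cannot trigger the panic.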

  • Wildcard for subscribing to all events

    Wildcard subscriptions to a topic prefix, like account.*, work fine, but how can I subscribe to all events? I tried hub.Subscribe(1, "*"), but apparently that doesn't work.
