A tiny wrapper over amqp exchanges and queues 🚌 ✨

Rabbus 🚌

  • A tiny wrapper over amqp exchanges and queues.
  • In memory retries with exponential backoff for sending messages.
  • Protect producer calls with circuit breaker.
  • Automatic reconnect to RabbitMQ broker when connection is lost.
  • Go channel API.

Installation

go get -u github.com/rafaeljesus/rabbus

Usage

The rabbus package exposes an interface for emitting and listening to RabbitMQ messages.

Emit

import (
	"context"
	"time"

	"github.com/rafaeljesus/rabbus"
)

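func main() {
	// rabbusDsn is a placeholder; point it at your own broker.
	const rabbusDsn = "amqp://localhost:5672"
	timeout := time.After(time.Second * 3)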
	cbStateChangeFunc := func(name, from, to string) {
		// do something when state is changed
	}
	r, err := rabbus.New(
		rabbusDsn,
		rabbus.Durable(true),
		rabbus.Attempts(5),
		rabbus.Sleep(time.Second*2),
		rabbus.Threshold(3),
		rabbus.OnStateChange(cbStateChangeFunc),
	)
	if err != nil {
		// handle error
	}

	defer func(r *rabbus.Rabbus) {
		if err := r.Close(); err != nil {
			// handle error
		}
	}(r)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go r.Run(ctx)

	msg := rabbus.Message{
		Exchange: "test_ex",
		Kind:     "topic",
		Key:      "test_key",
		Payload:  []byte(`foo`),
	}

	r.EmitAsync() <- msg

	for {
		select {
		case <-r.EmitOk():
			// message was sent
		case <-r.EmitErr():
			// failed to send message
		case <-timeout:
			// handle timeout error, then stop waiting
			return
		}
	}
}

Listen

import (
	"context"
	"time"

	"github.com/rafaeljesus/rabbus"
)

func main() {
	// rabbusDsn is a placeholder; point it at your own broker.
	const rabbusDsn = "amqp://localhost:5672"
	cbStateChangeFunc := func(name, from, to string) {
		// do something when state is changed
	}
	r, err := rabbus.New(
		rabbusDsn,
		rabbus.Durable(true),
		rabbus.Attempts(5),
		rabbus.Sleep(time.Second*2),
		rabbus.Threshold(3),
		rabbus.OnStateChange(cbStateChangeFunc),
	)
	if err != nil {
		// handle error
	}

	defer func(r *rabbus.Rabbus) {
		if err := r.Close(); err != nil {
			// handle error
		}
	}(r)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go r.Run(ctx)

	messages, err := r.Listen(rabbus.ListenConfig{
		Exchange:    "events_ex",
		Kind:        "topic",
		Key:         "events_key",
		Queue:       "events_q",
		DeclareArgs: rabbus.NewDeclareArgs().WithMessageTTL(15 * time.Minute).With("foo", "bar"),
		BindArgs:    rabbus.NewBindArgs().With("baz", "qux"),
	})
	if err != nil {
		// handle errors during adding listener
	}
	defer close(messages)

	go func(messages chan rabbus.ConsumerMessage) {
		for m := range messages {
			m.Ack(false)
		}
	}(messages)
}

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create new Pull Request

GitHub @rafaeljesus  ·  Medium @_jesus_rafael  ·  Twitter @_jesus_rafael

Owner
Rafael Jesus
Comments
  • There is some issue on publishing when there is a rabbitmq disconnection

    In case of a disconnection, we encounter a "deadlock" that prevents rabbus from reconnecting.

    Calling NotifyClose in the amqp package tries to acquire a lock: https://github.com/streadway/amqp/blob/master/connection.go#L267-L278 This is what happens in the first goroutine (#41) of my log.

    When an error occurs, an internal amqp goroutine tries to notify every receiver by sending the error to the previously registered channels: https://github.com/streadway/amqp/blob/master/connection.go#L387-L389 This is done after acquiring the connection's lock. This is the second goroutine (#14).

    So the second goroutine has locked the mutex and is blocked waiting for someone to read the channel, while the first goroutine, the one that should read the channel, is blocked on the mutex while trying to register a new channel to notify.

    Here is a trace that exhibits this behavior:

    goroutine 41 [semacquire, 29 minutes]:
    sync.runtime_SemacquireMutex(0xc420212158, 0xd78600)
    	/usr/local/go/src/runtime/sema.go:71 +0x3d
    sync.(*Mutex).Lock(0xc420212154)
    	/usr/local/go/src/sync/mutex.go:134 +0x108
    github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp.(*Connection).NotifyClose(0xc420212140, 0xc420379bc0, 0x0)
    	/home/kinou/go/src/github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp/connection.go:268 +0x43
    github.com/rafaeljesus/rabbus/internal/amqp.(*Amqp).NotifyClose(0xc42017ae60, 0xc420379bc0, 0xc420379bc0)
    	/home/kinou/go/src/github.com/rafaeljesus/rabbus/internal/amqp/amqp.go:68 +0x38
    github.com/rafaeljesus/rabbus.(*Rabbus).Run(0xc42030c410, 0xe28da0, 0xc42018c640, 0x1, 0x0)
    	/home/kinou/go/src/github.com/rafaeljesus/rabbus/rabbus.go:201 +0xb0
    main.main.func3(0xc42030c410, 0xe28da0, 0xc42018c640, 0xc42017c620)
    	/home/kinou/go/src/github.com/CanalTP/gormungandr/cmd/schedules/main.go:169 +0x43
    created by main.main
    	/home/kinou/go/src/github.com/CanalTP/gormungandr/cmd/schedules/main.go:168 +0xcc2
    
    goroutine 14 [chan send, 30 minutes]:
    github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp.(*Connection).shutdown.func1()
    	/home/kinou/go/src/github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp/connection.go:388 +0xc9
    sync.(*Once).Do(0xc420212140, 0xc4203cadc8)
    	/usr/local/go/src/sync/once.go:44 +0xbe
    github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp.(*Connection).shutdown(0xc420212140, 0xc4202dfdc0)
    	/home/kinou/go/src/github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp/connection.go:382 +0x5b
    github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp.(*Connection).dispatch0(0xc420212140, 0xe230a0, 0xc4202dfd40)
    	/home/kinou/go/src/github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp/connection.go:444 +0x364
    github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp.(*Connection).demux(0xc420212140, 0xe230a0, 0xc4202dfd40)
    	/home/kinou/go/src/github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp/connection.go:427 +0x5d
    github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp.(*Connection).reader(0xc420212140, 0x7f05890710d0, 0xc42000e798)
    	/home/kinou/go/src/github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp/connection.go:521 +0xe6
    created by github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp.Open
    	/home/kinou/go/src/github.com/rafaeljesus/rabbus/vendor/github.com/streadway/amqp/connection.go:228 +0x287
    
    

    This is caused by this code in Rabbus::Run:

        select {
            ...
            case err, ok := <-r.NotifyClose(make(chan *amqp.Error)):
                if ok {
                    r.handleAmqpClose(err)
                }
            ...
        }
    
    

    NotifyClose shouldn't be called on every iteration of the loop, but only once per connection, as it registers a receiver that remains valid until the connection is closed. I think that for each message published/emitted we add a new receiver with NotifyClose, and only the last one is listened to, so even without the mutex we would still be stuck.

    With this PR we call NotifyClose only once per connection; this solved the problem in my manual testing.
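The difference between registering a close receiver on every loop pass and registering it once can be sketched without a broker. The snippet below is an illustration under stated assumptions, not the code from the PR: `fakeConn` is a hypothetical stand-in for an AMQP connection whose `NotifyClose` appends each channel to a slice, as the comment above describes.

```go
package main

import "fmt"

// fakeConn is a hypothetical stand-in for an AMQP connection. Every
// NotifyClose call appends another receiver, as described above.
type fakeConn struct {
	closes []chan error
}

func (c *fakeConn) NotifyClose(ch chan error) chan error {
	c.closes = append(c.closes, ch)
	return ch
}

// runPerIteration registers a new receiver each time the select statement
// is entered, because the channel expression is re-evaluated on every pass.
func runPerIteration(c *fakeConn, work <-chan int) {
	for {
		select {
		case _, ok := <-work:
			if !ok {
				return
			}
		case <-c.NotifyClose(make(chan error)):
			return
		}
	}
}

// runOnce registers a single receiver before entering the loop.
func runOnce(c *fakeConn, work <-chan int) {
	closed := c.NotifyClose(make(chan error, 1))
	for {
		select {
		case _, ok := <-work:
			if !ok {
				return
			}
		case <-closed:
			return
		}
	}
}

// feed returns a closed channel pre-filled with n work items.
func feed(n int) <-chan int {
	work := make(chan int, n)
	for i := 0; i < n; i++ {
		work <- i
	}
	close(work)
	return work
}

func main() {
	a, b := &fakeConn{}, &fakeConn{}
	runPerIteration(a, feed(3))
	runOnce(b, feed(3))
	// The first count grows with the number of loop passes; the second stays at one.
	fmt.Println(len(a.closes), len(b.closes))
}
```

With the per-iteration variant, every stale receiver is a channel that nobody will ever read from again, which is what makes the notifying side block.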

  • Get rid of hidden goroutines

    This PR exposes a Run method with support for contexts. With this new design rabbus doesn't spawn hidden goroutines anymore, giving full control to the user.

    Before:

    r, _ := rabbus.New(dsn)
    r.EmitAsync() <- rabbus.Message{}
    
    for {
      select {
        case <-r.EmitOk():
        case err := <-r.EmitErr():
        case <-timeout:
      }
    }
    

    Now:

    r, _ := rabbus.New(dsn)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    go r.Run(ctx)
    
    r.EmitAsync() <- rabbus.Message{}
    
    for {
      select {
        case <-r.EmitOk():
        case err := <-r.EmitErr():
        case <-timeout:
      }
    }
    
  • Enable configuration to passive connection to exchange

    Ref #7

    I'm just validating whether Listen should use a passive connection or a default one.

    Also, I couldn't figure out how to test this property on the amqp.Channel struct, since it has only private fields.

  • increase reliability when dealing with closed channels and closure notifications

    If the emit channel is closed, the code will never reopen it. So, we should return an error in this case, as the rabbus.Rabbus has entered into a fail state, and cannot continue.

    NotifyClose stacks channels into a slice: https://github.com/streadway/amqp/blob/master/channel.go#L444 This means that calling it multiple times results in a long slice of notification channels, all of which have nobody listening on them because the iteration of the select they came from is now gone. When a close event occurred, it would send an error on all of the channels, so the last NotifyClose should still get its error message, but since the channels were generally unbuffered, it's just as likely that the close notification process would block indefinitely on a channel send, waiting on a receiver that can no longer exist.

    Finally, when NotifyClose triggered, and re-establishing a consumer failed, we were closing all of the channels. (This enters the fail state noted above where the emit channel is now closed.) But by setting the r.reconn to a length of 10, some of the reconnect values kept being delivered while waiting for the end of the channel, allowing other reconnections to build back up, but this would hide the problem that we were actually closing the whole rabbus.Rabbus, which is not what the code clearly intended to do.
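The blocking behaviour described above comes down to how sends on unbuffered and buffered channels differ when no receiver is waiting. A minimal sketch, using a hypothetical `trySend` helper that is not part of rabbus or the amqp client:

```go
package main

import "fmt"

// trySend is a hypothetical helper that reports whether a send on ch can
// complete immediately, i.e. without blocking the sender.
func trySend(ch chan error, err error) bool {
	select {
	case ch <- err:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan error)  // needs a receiver that is ready right now
	buffered := make(chan error, 1) // can hold one pending notification
	e := fmt.Errorf("connection closed")

	fmt.Println(trySend(unbuffered, e), trySend(buffered, e))
}
```

A notifier that performs a plain blocking send would hang on the unbuffered channel here, whereas a channel with capacity 1 lets a single close notification be delivered even if the receiver is not yet waiting.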

  • Make content encoding configurable when publishing message

    This PR adds support for a customizable ContentEncoding on the Message struct. This aims to improve compatibility with consumers written in other languages when using a binary format.

  • Create const for Kind field on ListenConfig and Message structs

    When I need to create a new ListenConfig or Message, I have to pass a string to the Kind field. Can we have some constants for this? For example, I got an error when I passed "Direct" instead of "direct" for this field.

    Thanks and congratulations for the great package!
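A sketch of what such constants could look like follows. The names `KindDirect`, `KindFanout`, `KindTopic`, `KindHeaders`, and the `validKind` helper are hypothetical and chosen for illustration; they are not claimed to exist in rabbus. The four values are the standard AMQP 0-9-1 exchange types.

```go
package main

import "fmt"

// Hypothetical constants for the Kind field; the names are illustrative.
const (
	KindDirect  = "direct"
	KindFanout  = "fanout"
	KindTopic   = "topic"
	KindHeaders = "headers"
)

// validKind reports whether kind is one of the known exchange types.
// The comparison is case-sensitive, so "Direct" is rejected.
func validKind(kind string) bool {
	switch kind {
	case KindDirect, KindFanout, KindTopic, KindHeaders:
		return true
	}
	return false
}

func main() {
	fmt.Println(validKind(KindDirect), validKind("Direct"))
}
```

Using a constant instead of a string literal turns the "Direct" versus "direct" mistake into something the compiler or an editor can catch.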

  • Support for passive exchange connection

    amqp has the option to declare a passive exchange.

    Sometimes we don't want our application to have control over the exchange (such as creating or deleting it), so a passive connection is the right way to achieve this.

  • Avoid declare the exchange in every message sent

    Avoid declaring the exchange for every message sent. Also added a benchmark test for EmitAsync; result:

    benchmark                old ns/op     new ns/op     delta
    BenchmarkEmitAsync-4     326063        50410         -84.54%
    

    resolve #3

  • Avoid declare exchange in every message

    Since #2 the library is declaring the exchange for every message received from the emit channel. This will kill the performance, right? WDYT, can we use some "cache" of exchanges already declared and avoid these unnecessary requests?

    PS. I can make a PR with this change.
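The "cache" idea can be sketched as a small set guarded by a mutex. This is an assumption-laden illustration, not the change that was merged: `declareCache`, `ensure`, and the `declare` callback are hypothetical names, and the callback stands in for whatever call actually declares the exchange on the broker.

```go
package main

import (
	"fmt"
	"sync"
)

// declareCache is a hypothetical helper that remembers which exchanges
// were already declared, so the declare callback runs once per name.
type declareCache struct {
	mu       sync.Mutex
	declared map[string]struct{}
}

func newDeclareCache() *declareCache {
	return &declareCache{declared: make(map[string]struct{})}
}

// ensure calls declare only the first time name is seen. Failures are not
// cached, so a later call for the same name tries again.
func (c *declareCache) ensure(name string, declare func(string) error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.declared[name]; ok {
		return nil
	}
	if err := declare(name); err != nil {
		return err
	}
	c.declared[name] = struct{}{}
	return nil
}

func main() {
	calls := 0
	declare := func(string) error { calls++; return nil } // stands in for the broker call
	c := newDeclareCache()
	for i := 0; i < 1000; i++ {
		_ = c.ensure("test_ex", declare)
	}
	fmt.Println(calls)
}
```

One design point to keep in mind with this approach: after a reconnect the cache would likely need to be cleared, since the broker-side state may no longer match what was remembered.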

  • Fix close connection ordering issue

    Closing r.emit before notifyClose would cause an error: Run exits on the wrong case (https://github.com/rafaeljesus/rabbus/blob/master/rabbus.go#L201),

    and you'll receive that error any time you execute rabbus.Close.

  • add test for notify disconnect

    I had to remove the Parallel as it was interfering with the rabbus_publish_subscribe test.

    I use the RabbitMQ management API to close the connection. Without #34 this test will deadlock.

  • Use default exchange

    Hi there :wave:

    Is it possible to use RabbitMQ default exchange with Rabbus? The validation done here: https://github.com/rafaeljesus/rabbus/blob/master/rabbus.go#L151 doesn't allow an empty string, but this is actually how to select the default exchange: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default

    Is there any workaround?

    Thanks!

  • Default Durable(true) doesn’t work

    So I started patch/durable-default-true and just made sure the durable flag is set from the start, as the documentation on Durable notes.

    However, this breaks the tests:

    $ go test
    --- FAIL: TestRabbus (3.00s)
        --- FAIL: TestRabbus/emit_async_message (3.00s)
            rabbus_test.go:239: unexpected durable: true
            rabbus_test.go:285: got timeout error during emit async
    FAIL
    exit status 1
    FAIL	github.com/rafaeljesus/rabbus	3.003s
    
  • EmitAsync closes the Consumer channel

    I have created a consumer with rabbus.Listen. In the goroutine that handles the messages for this consumer I use EmitAsync to reply to a message. After calling EmitAsync, the consumer is no longer active. According to the RabbitMQ management page, the channel has closed. This means that subsequent messages are no longer handled. However, the message handler goroutine keeps running.

An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`

Go RabbitMQ Client Library This is a Go AMQP 0.9.1 client maintained by the RabbitMQ core team. It was originally developed by Sean Treadway. Differen

Jan 1, 2023
A wrapper of streadway/amqp that provides reconnection logic and sane defaults

go-rabbitmq Wrapper of streadway/amqp that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ Supporte

Dec 28, 2022
Golang AMQP wrapper for RabbitMQ with better API

go-rabbitmq Golang AMQP wrapper for RabbitMQ with better API Table of Contents Background Features Usage Installation Connect to RabbitMQ Declare Queu

Dec 21, 2022
Go client to reliable queues based on Redis Cluster Streams

Ami Go client to reliable queues based on Redis Cluster Streams. Consume/produce performance Performance is dependent from: Redis Cluster nodes count;

Dec 12, 2022
Testing message queues with RabbitMQ

Rabbit-MessageQueue Just a repository of RabbitMQ simple usage for queueing messages. You can use this as a sender or a receiver. More information is

Mar 10, 2022
:incoming_envelope: A fast Message/Event Hub using publish/subscribe pattern with support for topics like* rabbitMQ exchanges for Go applications

Hub 📨 A fast enough Event Hub for go applications using publish/subscribe with support patterns on topics like rabbitMQ exchanges. Table of Contents

Dec 17, 2022
Go library to build event driven applications easily using AMQP as a backend.

event Go library to build event driven applications easily using AMQP as a backend. Publisher package main import ( "github.com/creekorful/event" "

Dec 5, 2021
golang amqp rabbitmq produce consume

Step 1: Container Run Container docker run -itp 9001:9001 --name go_temp -v /usr/local/project/temp/go_amqp/:/home/ -d golang:1.16.6 Enter Container

Nov 26, 2021
Tool for collect statistics from AMQP (RabbitMQ) broker. Good for cloud native service calculation.

amqp-statisticator Tool for collect statistics around your AMQP broker. For example RabbitMQ expose a lot information trought the management API, but

Dec 13, 2021
Gorabbit - Simple library for AMQP Rabbit MQ publish subscribe

gorabbit Rabbit MQ Publish & Subscribe Simple library for AMQP Rabbit MQ publish

Oct 4, 2022
A tiny wrapper around NSQ topic and channel :rocket:

Event Bus NSQ A tiny wrapper around go-nsq topic and channel. Protect nsq calls with gobreaker. Installation go get -u github.com/rafaeljesus/nsq-even

Sep 27, 2022
Batch messages over a time interval

timebatch timebatch is a package for batching messages over a time interval. This can be useful for receiving messages that occur "quickly" and sendin

Nov 3, 2021
goczmq is a golang wrapper for CZMQ.

goczmq Introduction A golang interface to the CZMQ v4.2 API. Install Dependencies libsodium libzmq czmq For CZMQ master go get github.com/zeromq/goczm

Jan 5, 2023
Golang API wrapper for MangaDex v5's MVP API.

mangodex Golang API wrapper for MangaDex v5's MVP API. Full documentation is found here. This API is still in Open Beta, so testing may not be complet

Oct 23, 2022
High abstraction wrapper for Golang Rabbit MQ Client

GRMQ Go Rabbit MQ What are the typical use-cases of RabbitMQ broker ? We create

Nov 3, 2022
⚡️ A lightweight service that will build and store your go projects binaries, Integrated with Github, Gitlab, Bitbucket and Bitbucket Server.

Rabbit A lightweight service that will build and store your go projects binaries. Rabbit is a lightweight service that will build and store your go pr

Nov 19, 2022
Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way.

Cadence Visit cadenceworkflow.io to learn about Cadence. This repo contains the source code of the Cadence server. To implement workflows, activities

Jan 9, 2023
The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which can provide developers with stable, reliable, and efficient push services.

Go-Push-API MiPush、JiPush、UMeng MiPush The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which c

Oct 20, 2022
May 11, 2023