A user friendly RabbitMQ library written in Golang.

TurboCookedRabbit

A user friendly RabbitMQ library written in Golang to help use streadway/amqp.
Based on my work found at CookedRabbit.

Go Report Card


Sourcegraph
HitCount

Work Recently Finished (v2.0.0 -> v2.1.0)

  • Several non-breaking hotfixes up to v2.0.9.
  • Several small quality of life improvements added by request.
  • Breaking Change (v2.1.0) - ReceivedMessage now contains the Amqp.Delivery (no more cloning/copying that functionality)
  • Breaking Change (v2.1.0) - LetterID is now a UUID by default.
  • Breaking Change (v2.1.0) - LetterID is now mapping to MessageID in RabbitMQ on Publish and is mapped to a string MessageID on ReceivedMessage.

The major goals (now that everything is working well) are to aid quality of life and visibility.

Be sure to visit tests for examples on how to do a variety of actions with the library. They are always kept up to date, even if the README.md falls short at times.

Developer's Notes

  • Golang 1.15.5 (2020/11/15)
  • RabbitMQ Server v3.8.5 (simple localhost)
  • Erlang v23.0
  • Streadway/Amqp v1.0.0
Click here to see how to migrate (or get V2) and breaking details!

TO GET (including V2)

go get -u "github.com/houseofcat/turbocookedrabbit/v2"

If I hotfix legacy go get -u "github.com/houseofcat/turbocookedrabbit/v1"

To continue using legacy up to v1.2.3 go get -u "github.com/houseofcat/turbocookedrabbit"

TO UPGRADE TO V2

Everything is unified to TCR.
Convert your imports to a single "github.com/houseofcat/turbocookedrabbit/v2/pkg/tcr".
And where you have pools. or models. or publisher. or utils. replace it with just this tcr.

Why am I being so complicated? See below...

Started Semantic Versioning

  • Separate go.mods.
  • Separate pkgs.

All legacy work will be done with a new module /v1/pkg/***.
All v2 work will be done with a new module /v2/pkg/tcr.

Apologies for all the confusion but if I don't do this, apparently I couldn't use the v2.0.0 tagging system and allow people to @upgrade. This is a stupid design choice by the Go Modules and was irrevocably harder to get VSCode to understand what the hell I was trying to do and therefore, not shit it's pants.

Learn more about the fabulous world of PhD's at Google designing things!
https://blog.golang.org/v2-go-modules

Breaking Change Notice (v1.x.x -> v2.0.0)

Decided to opt-in for a near total rewrite. Simpler. Cleaner. Easier to use. The AutoPublish is now using PublishWithConfirmation and so what I now try to emulate is "At Least Once" style of delivery.

Reason For Rewrite?
I am better at Golang, in general, and I now have a year of experience using my own software in a production environment. This rewrite has an even stronger auto-recovery mechanism and even harder Publish (PublishWithConfirmation) system. In some ways, performance has significantly increased. In other ways, it sacrified performance for resilience.

Basic Performance

Click for seeing Publisher performance!

Test Settings

i7 8700K @ 4.7GHz  
Samsung EVO 970  
RabbitMQ Server 3.8.5 / Erlang 23.0 installed on the same host.    
Single Queue Publish (Without Confirmations)  
Messages Transient Type, 1500-2500 bytes, wrapped themselves, uncompressed.  
100,000 Count Publish Test  
NON-DEBUG
Benchmark Starts: 2020-07-01 13:01:37.6260111 -0400 EDT m=+0.044880301
Benchmark End: 2020-07-01 13:01:40.6130211 -0400 EDT m=+3.031890301
Messages: 33478.294348 msg/s

Click for seeing AutoPublisher performance!

Test Settings

i7 8700K @ 4.7GHz  
Samsung EVO 970  
RabbitMQ Server 3.8.5 / Erlang 23.0 installed on the same host.   
Single Queue Publish (With Confirmations) / Single Consumer  
Messages Durable Type, 1500-2500 bytes, wrapped themselves, uncompressed.  
Two Hour Stress Test, Consumer/Publisher running in tandem.  
DEBUG  
Failed to Queue For Publishing: 0
Publisher Errors: 0
Messages Published: 25882186
Messages Failed to Publish: 132
Consumer Errors: 0
Messages Acked: 25883360
Messages Failed to Ack: 0
Messages Received: 25883360
Messages Unexpected: 0
Messages Duplicated: 0
PASS

AutoPublisher (PublishWithConfirmation) averaged around 3,594.75 msg/s.
Consumer averaged around 3,594.91 msg/s. Probably limited to the AutoPublisher.

The Seasoning/Config

Click for details on creating a config from file!

The config.json is just a quality of life feature. You don't have to use it. I just like how easy it is to change configurations on the fly.

config, err := tcr.ConvertJSONFileToConfig("testseasoning.json")

The full structure RabbitSeasoning is available under pkg/tcr/configs.go


The Publisher

Click for creating publisher examples!

Assuming you have a ConnectionPool already setup. Creating a publisher can be achieved like so:

publisher := tcr.NewPublisherFromConfig(Seasoning, ConnectionPool)


Click for simple publish example!

Once you have a publisher, you can perform a relatively simple publish.

letter := tcr.CreateMockLetter(1, "", "TcrTestQueue", nil)
publisher.Publish(letter)

This CreateMockLetter method creates a simple HelloWorld message letter with no ExchangeName and a QueueName/RoutingKey of "TcrTestQueue". The helper function creates bytes for "h e l l o w o r l d" as a message body.

The concept of a Letter may seem clunky but the real advantage is async publishing and replay-ability. And you still have streadway/amqp to rely on should prefer simple publshing straight to an amqp.Channel.


Click for simple publish with confirmation example!

We use PublishWithConfirmation when we want a more resilient publish mechanism. It will wait for a publish confirmation until timeout.

letter := tcr.CreateMockRandomLetter("TcrTestQueue")
publisher.PublishWithConfirmation(letter, time.Millisecond*500)

WaitLoop:
for {
	select {
	case receipt := <-publisher.PublishReceipts():
		if !receipt.Success {
			// log?
			// requeue?
			// break WaitLoop?
		}
	default:
		time.Sleep(time.Millisecond * 1)
	}
}

The default behavior for a RabbitService subscribed to a publisher's PublishReceipts() is to automatically retry Success == false receipts with QueueLetter().


Click for simple publish with confirmation and context example!

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Minute)
letter := tcr.CreateMockRandomLetter("TcrTestQueue")

publisher.PublishWithConfirmationContext(ctx, letter)

WaitLoop:
for {
	select {
	case receipt := <-publisher.PublishReceipts():
		if !receipt.Success {
			// log?
			// requeue?
			// break WaitLoop?
		}
	default:
		time.Sleep(time.Millisecond * 1)
	}
}

cancel()


Click for AutoPublish example!

Once you have a publisher, you can perform StartAutoPublishing!

// This tells the Publisher to start reading an **internal queue**, and process Publishing concurrently but with individual RabbitMQ channels.
publisher.StartAutoPublishing()

GetPublishReceipts:
for {
    select {
case receipt := <-publisher.PublishReceipts():
		if !receipt.Success {
			// log?
			// requeue?
			// break WaitLoop?
		}
    default:
        time.Sleep(1 * time.Millisecond)
    }
}

Then just invoke QueueLetter to queue up a letter on send. It returns false if it failed to queue up the letter to send. Usually that happens when Shutdown has been called.

ok := publisher.QueueLetter(letter) // How simple is that!

...or more complex such as...

for _, letter := range letters {
    if publisher.QueueLetter(letter) {
		//report sucess
	}
}

So you can see why we use these message containers called letter. The letter has the body and envelope inside of it. It has everything you need to publish it. Think of it a small, highly configurable, message body and the intended address. This allows for async replay on failure.

Notice that you don't have anything to do with channels and connections (even on outage)!


Click for a more convoluted AutoPublish example!

Let's say the above example was too simple for you... ...let's up the over engineering a notch on what you can do with AutoPublish.

publisher.StartAutoPublish() // this will retry based on the Letter.RetryCount passed in.

timer := time.NewTimer(1 * time.Minute) // Stop Listening to notifications after 1 minute.

messageCount = 1000
connectionErrors := 0
successCount := 0
failureCount := 0

ReceivePublishConfirmations:
    for {
        select {
        case <-timer.C:
            break ReceivePublishConfirmations  
        case err := <-connectionPool.Errors():
            if err != nil {
                connectionErrors++ // Count ConnectionPool failures.
            }
            break
    	case publish := <-publisher.PublishReceipts():
            if publish.Success {
                successCount++
            } else {
                failureCount++
            }

            // I am only expecting to publish 1000 messages
            if successCount+failureCount == messageCount { 
                break ReceivePublishConfirmations
            }

            break
        default:
            time.Sleep(1 * time.Millisecond)
            break
        }
    }

We have finished our work, we succeeded or failed to publish 1000 messages. So now we want to shutdown everything!

publisher.Shutdown(false)


The Consumer

Click for simple Consumer usage example!

Consumer provides a simple Get and GetBatch much like the Publisher has a simple Publish.

delivery, err := consumer.Get("TcrTestQueue")

Exit Conditions:

  • On Error: Error Return, Nil Message Return
  • On Not Ok: Nil Error Return, Nil Message Return
  • On OK: Nil Error Return, Message Returned

We also provide a simple Batch version of this call.

delivery, err := consumer.GetBatch("TcrTestQueue", 10)

Exit Conditions:

  • On Error: Error Return, Nil Messages Return
  • On Not Ok: Nil Error Return, Available Messages Return (0 upto (nth - 1) message)
  • When BatchSize is Reached: Nil Error Return, All Messages Return (n messages)


Click for an actual Consumer consuming example!

Let's start with the ConsumerConfig, and again, the config is just a quality of life feature. You don't have to use it.

Here is a JSON map/dictionary wrapped in a ConsumerConfigs.

"ConsumerConfigs": {
	"TurboCookedRabbitConsumer-Ackable": {
		"Enabled": true,
		"QueueName": "TcrTestQueue",
		"ConsumerName": "TurboCookedRabbitConsumer-Ackable",
		"AutoAck": false,
		"Exclusive": false,
		"NoWait": false,
		"QosCountOverride": 100,
		"SleepOnErrorInterval": 0,
		"SleepOnIdleInterval": 0
	}
},

And finding this object after it was loaded from a JSON file.

consumerConfig, ok := config.ConsumerConfigs["TurboCookedRabbitConsumer-AutoAck"]

Creating the Consumer from Config after creating a ConnectionPool.

consumer := consumer.NewConsumerFromConfig(consumerConfig, connectionPool)

Then start Consumer?

consumer.StartConsuming()

Thats it! Wait where our my messages?! MY QUEUE IS DRAINING!

Oh, right! That's over here, keeping with the out of process design...

ConsumeMessages:
    for {
        select {
        case message := <-consumer.Messages():

            requeueError := false
            var err error
            /* Do something with the message! */
            if message.IsAckable { // Message might be Ackable - be sure to check!
                if err != nil {
                    message.Nack(requeueError)
                }

                message.Acknowledge()
            }

        default:
            time.Sleep(100 * time.Millisecond) // No messages == optional nap time.
        }
    }

Alternatively you could provide an action for the consumer (this will bypass your internal message buffer).

consumer.StartConsumingWithAction(
		func(msg *tcr.ReceivedMessage) {
			if err := msg.Acknowledge(); err != nil {
				fmt.Printf("Error acking message: %v\r\n", msg.Body)
			}
		})


Wait! What the hell is coming out of <-ReceivedMessages()

Great question. I toyed with the idea of returning Letters like Publisher uses (and I may still at some point) but for now you receive a tcr.ReceivedMessage.

But... why? Because the payload/data/message body is in there but, more importantly, it contains the means of quickly acking the message! It didn't feel right being merged with a tcr.Letter. I may revert and use the base amqp.Delivery which does all this and more... I just didn't want users to have to also pull in streadway/amqp to simplify their imports. If you were already using it wouldn't be an issue. This design is still being code reviewed in my head.

One of the complexities of RabbitMQ is that you need to Acknowledge off the same Channel that it was received on. That makes out of process designs like mine prone to two things: hackery and/or memory leaks (passing the channels around everywhere WITH messages).

There are two things I hate about RabbitMQ

  • Channels close on error.
  • Messages have to be acknowledge on the same channel.

What I have attempted to do is to make your life blissful by not forcing you to deal with it. The rules are still there, but hopefully, I give you the tools to not stress out about it and to simplify out of process acknowledgements.

That being said, there is only so much I can hide in my library, which is why I have exposed .Errors(), so that you can code and log accordingly.

err := consumer.StartConsuming()
// Handle failure to start.

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Minute) // Timeouts

ConsumeMessages:
for {
    select {
    case <-ctx.Done():
        fmt.Print("\r\nContextTimeout\r\n")
        break ConsumeMessages
    case message := <-consumer.ReceivedMessages(): // View Messages
        fmt.Printf("Message Received: %s\r\n", string(message.Body))
    case err := <-consumer.Errors(): // View Consumer errors
        /* Handle */
    case err := <-ConnectionPool.Errors(): // View ConnectionPool errors
        /* Handle */
    default:
        time.Sleep(100 * time.Millisecond)
        break
    }
}

Here you may trigger StopConsuming with this

immediately := false
flushMessages := false
err = consumer.StopConsuming(immediately, flushMessages)

But be mindful there are Channel Buffers internally that may be full and goroutines waiting to add even more.

I have provided some tools that can be used to help with this. You will see them sprinkled periodically through my tests.

consumer.FlushStop() // could have been called more than once.
consumer.FlushErrors() // errors can quickly build up if you stop listening to them
consumer.FlushMessages() // lets say the ackable messages you have can't be acked and you just need to flush them all out of memory

Becareful with FlushMessages(). If you are autoAck = true and receiving ackAble messages, this is not safe. You will wipe them from your memory and they are not still in the original queue! If you were using manual ack, i.e. autoAck = false then you are free to do so safely. Your next consumer will pick up where you left off.

Here I demonstrate a very busy ConsumerLoop. Just replace all the counter variables with logging and then an action performed with the message and this could be a production microservice loop.

ConsumeLoop:
	for {
		select {
		case <-timeOut:
			break ConsumeLoop
		case receipt := <-publisher.PublishReceipts():
			if receipt.Success {
				fmt.Printf("%s: Published Success - LetterID: %d\r\n", time.Now(), receipt.LetterID)
				messagesPublished++
			} else {
				fmt.Printf("%s: Published Failed Error - LetterID: %d\r\n", time.Now(), receipt.LetterID)
				messagesFailedToPublish++
			}
		case err := <-ConnectionPool.Errors():
			fmt.Printf("%s: ConnectionPool Error - %s\r\n", time.Now(), err)
			connectionPoolErrors++
		case err := <-consumer.Errors():
			fmt.Printf("%s: Consumer Error - %s\r\n", time.Now(), err)
			consumerErrors++
		case message := <-consumer.ReceivedMessages():
			messagesReceived++
			fmt.Printf("%s: ConsumedMessage\r\n", time.Now())
			go func(msg *tcr.ReceivedMessage) {
				err := msg.Acknowledge()
				if err != nil {
					fmt.Printf("%s: AckMessage Error - %s\r\n", time.Now(), err)
					messagesFailedToAck++
				} else {
					fmt.Printf("%s: AckMessaged\r\n", time.Now())
					messagesAcked++
				}
			}(message)
		default:
			time.Sleep(100 * time.Millisecond)
		}
	}


The Pools

ConnectionPools, how do they even work?!

ChannelPools have been removed for simplification. Unfortunately for the ConnectionPool, there is still a bit of complexity here. If you have one Connection, I generally recommend around 5 Channels to be built on top of each connection. Your mileage may vary so be sure to test!

Ex.) ConnectionCount: 5 => ChannelCount: 25

I allow most features to be configurable via PoolConfig.

"PoolConfig": {
	"URI": "amqp://guest:guest@localhost:5672/",
	"ConnectionName": "TurboCookedRabbit",
	"SleepOnErrorInterval": 100,
	"MaxCacheChannelCount": 50,
	"MaxConnectionCount": 10,
	"Heartbeat": 30,
	"ConnectionTimeout": 10,
	"TLSConfig": {
		"EnableTLS": false,
		"PEMCertLocation": "test/catest.pem",
		"LocalCertLocation": "client/cert.ca",
		"CertServerName": "hostname-in-cert"
	}
}

There is a chance for a pause/delay/lag when there are no Connections/Channels available. High performance on your system may require fine tuning and benchmarking. The thing is though, you can't just add Connections and Channels evenly. Connections, server side, are not an infinite resource (channel construction/destruction isn't really either!). You can't keep just adding connections though so I alleviate that by keeping them cached/pooled for you.

The following code demonstrates one super important part with ConnectionPools: flag erred Channels. RabbitMQ server closes Channels on error, meaning this little guy is dead. You normally won't know it's dead until the next time you use it - and that can mean messages lost. By flagging the channel as having had an error, when returning it, we process the dead channel and attempt replace it.

// Publish sends a single message to the address on the letter using a cached ChannelHost.
// Subscribe to PublishReceipts to see success and errors.
// For proper resilience (at least once delivery guarantee over shaky network) use PublishWithConfirmation
func (pub *Publisher) Publish(letter *Letter, skipReceipt bool) {

	chanHost := pub.ConnectionPool.GetChannelFromPool()

	err := chanHost.Channel.Publish(
		letter.Envelope.Exchange,
		letter.Envelope.RoutingKey,
		letter.Envelope.Mandatory,
		letter.Envelope.Immediate,
		amqp.Publishing{
			ContentType:  letter.Envelope.ContentType,
			Body:         letter.Body,
			Headers:      amqp.Table(letter.Envelope.Headers),
			DeliveryMode: letter.Envelope.DeliveryMode,
		},
	)

	if !skipReceipt {
		pub.publishReceipt(letter, err)
	}

	pub.ConnectionPool.ReturnChannel(chanHost, err != nil)
}


Click here to see how to build a Connection and Channel Pool!

Um... this is the easy relatively easy to do with configs.

cp, err := tcr.NewConnectionPool(Seasoning.PoolConfig)
if err != nil {
	// blow up
}


Click here to see how to get and use a Connection!

This isn't really necessary to use amqp.Connection directly in my library, but you are free to do so.

connHost, err := ConnectionPool.GetConnection()
channel, err := connHost.Connection.Channel()
if err != nil {
	ConnectionPool.ReturnConnection(connHost, true)
}
ConnectionPool.ReturnConnection(connHost, false)

This ChannelHost is like a wrapper around the AmqpChannel that adds a few features like Errors and ReturnMessages. You also don't have to use my Publisher, Consumer, and Topologer, or RabbitService. You can use the ConnectionPool by itself if you just like the idea of backing your already existing code behind a ConnectionPool that has recovery and TCP socket load balancing!

The Publisher/Consumer/Topologer all use code similar to this and should help provide a simple understanding of the ConnectionPool.

chanHost := ConnectionPool.GetChannelFromPool()
err := chanHost.Channel.Publish(
		exchangeName,
		routingKey,
		mandatory,
		immediate,
		amqp.Publishing{
			ContentType: contentType,
			Body:        body,
		},
    )
ConnectionPool.ReturnChannel(chanHost, err != nil)


Click here to see how to get and use a Channel!

chanHost := ConnectionPool.GetChannelFromPool()

ConnectionPool.ReturnChannel(chanHost, false)

This ChannelHost is like a wrapper around the AmqpChannel that adds a few features like Errors and ReturnMessages. You also don't have to use my Publisher, Consumer, and Topologer, or RabbitService. You can use the ConnectionPool yourself if you just like the idea of backing your already existing code behind a ConnectionPool that has recovery and TCP socket load balancing!

The Publisher/Consumer/Topologer all use code similar to this!

chanHost := ConnectionPool.GetChannelFromPool()
err := chanHost.Channel.Publish(
		exchangeName,
		routingKey,
		mandatory,
		immediate,
		amqp.Publishing{
			ContentType: contentType,
			Body:        body,
		},
    )
ConnectionPool.ReturnChannel(chanHost, err != nil)


Click here to see how to get a transient Channel!

This should look like pretty standard RabbitMQ code once you get a normal amqp.Channel out.

ackable := true
channel := ConnectionPool.GetTransientChannel(ackable)
defer channel.Close() // remember to close when you are done!


What happens during an outage?

You will get errors performing actions. You indicate to the library your action failed, err != nil, and we will try restoring connectivity for you.


Click to see how one may properly prepare for an outage!

Observe a retry publish strategy with the following code example:

cp, err := tcr.NewConnectionPool(Seasoning.PoolConfig)
if err != nil {
	// blow up
}

iterations := 0
retryCount := 10

for iterations < retryCount {

	chanHost := ConnectionPool.GetChannelFromPool() // we are always getting channels on each publish

	letter := tcr.CreateMockRandomLetter("TcrTestQueue")

	err := chanHost.Channel.Publish(
		letter.Envelope.Exchange,
		letter.Envelope.RoutingKey,
		letter.Envelope.Mandatory,
		letter.Envelope.Immediate,
		amqp.Publishing{
			ContentType: letter.Envelope.ContentType,
			Body:        letter.Body,
		},
	)

	if err == nil {
		ConnectionPool.ReturnChannel(chanHost, false) // we are always returning the channels
		break 
	}

	ConnectionPool.ReturnChannel(chanHost, true) // we are always returning the channels
	time.Sleep(10 * time.Second)
	iterations++

	if iterations == 10 {
		break
	}
}

cp.Shutdown()

You can create publishing tests in loops while manually shutting down the RabbitMQ connections! This is great for chaos engineering testing!

Severing all connections:
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.5\sbin>rabbitmqctl.bat close_all_connections "suck it, trebek"

SleepOnErrorInterval aids in slowing the system down when errors are occurring. Stops you from rapidly taking down your RabbitMQ nodes when they are experiencing issues.

"PoolConfig": {
	"URI": "amqp://guest:guest@localhost:5672/",
	"ConnectionName": "TurboCookedRabbit",
	"SleepOnErrorInterval": 100,
	"MaxCacheChannelCount": 50,
	"MaxConnectionCount": 10,
	"Heartbeat": 30,
	"ConnectionTimeout": 10,
	"TLSConfig": {
		"EnableTLS": false,
		"PEMCertLocation": "test/catest.pem",
		"LocalCertLocation": "client/cert.ca",
		"CertServerName": "hostname-in-cert"
	}
},


The Topologer

How do I create/delete/bind queues and exchanges?

Coming from plain streadway/amqp there isn't too much to it. Call the right method with the right parameters.

I have however integrated those relatively painless methods now with a ConnectionPool and added a TopologyConfig for a JSON style of batch topology creation/binding. The real advantages here is that I allow things in bulk and allow you to build topology from a topology.json file.

Creating an Exchange with a tcr.Exchange

err := top.CreateExchangeFromConfig(exchange) // tcr.Exchange
if err != nil {
    return err
}

Or if you prefer it more manual:

exchangeName := "FancyName"
exchangeType := "fanout"
passiveDeclare, durable, autoDelete, internal, noWait := false, true, false, false, false

err := top.CreateExchange(exchangeName, exchangeType, passiveDeclare, durable, autoDelete, internal, noWait, nil)
if err != nil {
    return err
}

Creating an Queue with a tcr.Queue

err := top.CreateQueueFromConfig(queue) // tcr.Queue
if err != nil {
    return err
}

Or, again, if you prefer it more manual:

queueName := "FancyQueueName"
passiveDeclare, durable, autoDelete, exclusive, noWait := false, true, false, false, false

err := top.CreateQueue(queueName, passiveDeclare, durable, autoDelete, exclusive, noWait, nil)
if err != nil {
    return err
}


How do I do this in bulk?

Here I demonstrate the Topology as JSON (full sample is checked in as testtopology.json)

{
	"Exchanges": [
		{
			"Name": "MyTestExchangeRoot",
			"Type": "direct",
			"PassiveDeclare": false,
			"Durable": true,
			"AutoDelete": false,
			"InternalOnly": false,
			"NoWait": false
		},
		{
			"Name": "MyTestExchange.Child01",
			"Type": "direct",
			"PassiveDeclare": false,
			"Durable": true,
			"AutoDelete": false,
			"InternalOnly": false,
			"NoWait": false
		},
		{
			"Name": "MyTestExchange.Child02",
			"Type": "direct",
			"PassiveDeclare": false,
			"Durable": true,
			"AutoDelete": false,
			"InternalOnly": false,
			"NoWait": false
		}
	],
	"Queues": [
		{
			"Name": "QueueAttachedToRoot",
			"PassiveDeclare": false,
			"Durable": true,
			"AutoDelete": false,
			"Exclusive": false,
			"NoWait": false
		},
		{
			"Name": "QueueAttachedToExch01",
			"PassiveDeclare": false,
			"Durable": true,
			"AutoDelete": false,
			"Exclusive": false,
			"NoWait": false
		},
		{
			"Name": "QueueAttachedToExch02",
			"PassiveDeclare": false,
			"Durable": true,
			"AutoDelete": false,
			"Exclusive": false,
			"NoWait": false
		}
	],
	"QueueBindings": [
		{
			"QueueName": "QueueAttachedToRoot",
			"ExchangeName": "MyTestExchangeRoot",
			"RoutingKey": "RoutingKeyRoot",
			"NoWait": false
		},
		{
			"QueueName": "QueueAttachedToExch01",
			"ExchangeName": "MyTestExchange.Child01",
			"RoutingKey": "RoutingKey1",
			"NoWait": false
		},
		{
			"QueueName": "QueueAttachedToExch02",
			"ExchangeName": "MyTestExchange.Child02",
			"RoutingKey": "RoutingKey2",
			"NoWait": false
		}
	],
	"ExchangeBindings": [
		{
			"ExchangeName": "MyTestExchange.Child01",
			"ParentExchangeName": "MyTestExchangeRoot",
			"RoutingKey": "ExchangeKey1",
			"NoWait": false
		},
		{
			"ExchangeName": "MyTestExchange.Child02",
			"ParentExchangeName": "MyTestExchange.Child01",
			"RoutingKey": "ExchangeKey2",
			"NoWait": false
		}
	]
}

I have provided a helper method for turning it into a TopologyConfig.

topologyConfig, err := tcr.ConvertJSONFileToTopologyConfig("testtopology.json")

Creating a simple and shareable ConnectionPool.

cp, err := tcr.NewConnectionPool(Seasoning.PoolConfig)

Using the ConnectionPool to create our Topologer.

topologer := tcr.NewTopologer(cp)

Assuming you have a blank slate RabbitMQ server, this shouldn't error out as long as you can connect to it.

ignoreErrors := false
err = topologer.BuildToplogy(topologyConfig, ignoreErrors)

Fin.

That's it really. In the future I will have more features. Just know that I think you can export your current Server configuration from the Server itself.


The RabbitService

Click here to see how RabbitService simplifies things even more!

Here I demonstrate the steps of loading the JSON configuration and creating a new RabbitService!

service, err := tcr.NewRabbitService(Seasoning, "", "", nil, nil)

or with encryption/salt added

service, err := tcr.NewRabbitService(Seasoning, "PasswordyPassword", "SaltySalt", nil, nil)

optionally providing actions for processing errors and publish receipts

service, err := tcr.NewRabbitService(
    Seasoning,
    "PasswordyPassword",
    "SaltySalt",
	processPublishReceipts, //func(*PublishReceipt)
	processError) //func(error)

RabbitService provides default behaviors for these functions when they are nil. On Error for example, we write to console. On PublishReceipts that are unsuccesful, we requeue the message for Publish on your behalf using the AutoPublisher.

The service has direct access to a Publisher and Topologer

rs.Topologer.CreateExchangeFromConfig(exchange)
skipReceipt := true
rs.Publisher.Publish(letter, skipReceipt)

The consumer section is more complicated but I read the map of consumers that were in config and built them out for you to use when ready:

var consumer *consumer.Consumer
consumer, err := rs.GetConsumer("MyConsumer")
consumer.StartConsuming()

And don't forget to subscribe to ReceivedMessages() when using StartConsuming() to actually get them out of the internal buffer!


But wait, there's more!

The service allows JSON Marshalling, Argon2 hashing, Aes-128/192/256 bit encryption, and GZIP/ZSTD compression.
Note: ZSTD is from 3rd party library and it's working but in Beta - if worried use the standard vanilla GZIP.

Setting Up Hashing (required for Encryption):

password := "SuperStreetFighter2Turbo"
salt := "MBisonDidNothingWrong"

Service.SetHashForEncryption(password, salt)

The password/passphrase is your responsibility on keeping it safe. I recommend a Key Vault of some flavor.

We set the HashKey internally to the Service so you can do seamless encryption during Service.Publish and what you have in the corresponding Configs added to RabbitSeasoning. Here are some decent settings for Argon2 hashing.

"EncryptionConfig" : {
	"Enabled": true,
	"Type": "aes",
	"TimeConsideration": 1,
	"MemoryMultiplier": 64,
	"Threads": 2
},
"CompressionConfig": {
	"Enabled": true,
	"Type": "gzip"
},

And all of this is built-in into the Service level Publisher.

Here are some examples...

JSON Marshalled Data Example:

Service.Config.EncryptionConfig.Enabled = false
Service.Config.CompressionConfig.Enabled = false

wrapData := false
data := interface{}
err := Service.Publish(data, "MyExchange", "MyQueue", wrapData)
if err != nil {
	
}

Isn't that easy?

Let's add compression!

  1. Marshal interface{} into bytes.
  2. Compress bytes.
  3. Publish.
Service.Config.EncryptionConfig.Enabled = false
Service.Config.CompressionConfig.Enabled = true
Service.Config.CompressionConfig.Type = "gzip"

wrapData := false
data := interface{}
err := Service.Publish(data, "MyExchange", "MyQueue", wrapData)
if err != nil {
	
}

To reverse it into a struct!

  • Consume Message (get your bytes)
  • Decompress Bytes (with matching type)
  • Unmarshal bytes to your struct!
  • Profit!

What about Encryption?

Well if you are following my config example, we will encrypt using a SymmetricKey / AES-256 bit, with nonce and a salty 32-bit HashKey from Argon2.

Service.Config.EncryptionConfig.Enabled = true
Service.Config.CompressionConfig.Enabled = false

wrapData := false
data := interface{}
err := Service.Publish(data, "MyExchange", "MyQueue", wrapData)
if err != nil {
	
}

Boom, finished! That's it. You have encrypted your entire payload in the queue. Nobody can read it without your passphrase and salt.

So to reverse it into a struct, you need to:

  • Consume Message (get your bytes)
  • Decrypt Bytes (with matching type)
  • Unmarshal bytes to your struct!
  • Profit!

What about Comcryption (a word I just made up)?

Good lord, fine!

The steps this takes is this:

  1. Marshal interface{} into bytes.
  2. Compress bytes.
  3. Encrypt bytes.
  4. Publish.
Service.Config.EncryptionConfig.Enabled = true
Service.Config.CompressionConfig.Enabled = true

wrapData := false
data := interface{}
err := Service.Publish(data, "MyExchange", "MyQueue", wrapData)
if err != nil {
   
}

So to reverse comcryption, you need to:

  • Consume Message (get your bytes)
  • Decrypt Bytes (with matching type)
  • Decompress bytes (with matching type)
  • Unmarshal bytes to your struct!

Depending on your payloads, if it's tons of random bytes/strings, compression won't do much for you - probably even increase size. AES encryption only adds little byte size overhead for the nonce I believe.

Here is a possible good use case for comcryption. It is a beefy 5KB+ JSON string of dynamic, but not random, sensitive data. Quite possibly PII/PCI user data dump. Think list of Credit Cards, Transactions, or HIPAA data. Basically anything you would see in GDPR bingo!

So healthy sized JSONs generally compress well ~ 85-97% at times. If it's sensitive, it needs to be encrypted. Smaller (compressed) bytes encrypt faster. Comcryption!

So what's the downside? It's slow, might need tweaking still... but it's slow. At least compared to plain publishing.

SECURITY WARNING

This doesn't really apply to my use cases, however, some forms of deflate/gzip, combined with some protocols, created a vulnerability by compressing and then encrypting.

I would be terrible if I didn't make you aware of CRIME and BREACH attacks. https://crypto.stackexchange.com/questions/29972/is-there-an-existing-cryptography-algorithm-method-that-both-encrypts-and-comp/29974#29974

So you can choose wisely :)


Wait... what was that wrap boolean?

I knew I forgot something!

Consider the following example, here we are performing Comcryption.

Service.Config.EncryptionConfig.Enabled = true
Service.Config.CompressionConfig.Enabled = true

wrapData := false
data := interface{}
err := Service.Publish(data, "MyExchange", "MyQueue", wrapData)
if err != nil {
	
}

The problem here is that the message could leave you blinded by the dark! I tried to enhance this process, by wrapping your bits.
If you wrap your message, it is always of type tcr.WrappedBody coming back out.

The following change (with the above code)...

wrapData = true

...produces this message wrapper.

{
	"LetterID": 0,
	"Body": {
		"Encrypted": true,
		"EncryptionType": "aes",
		"Compressed": true,
		"CompressionType": "gzip",
		"UTCDateTime": "2019-09-22T19:13:55Z",
		"Data": "+uLJxH1YC1u5KzJUGTImKcaTccSY3gXsMaCoHneJDF+9/9JDaX/Fort92w8VWyTiKqgQj+2gqIaAXyHwFObtjL3RAxTn5uF/QIguvuZ+/2X8qn/+QDByuCY3qkRKu3HHzmwd+GPfgNacyaQgS2/hD2uoFrwR67W332CHWA=="
	}
}

You definitely can't tell this is M. Bison's Social Security Number, but can see the metadata.

The idea around this metadata is that it could help identify when a passphrase was used to create this, then you can determine which key was live based on UTCDateTime. This means that you have to work out key rotations from your end of things.

The inner Data deserializes to []byte, which means based on a consumed tcr.WrappedBody, you know immediately if it is a compressed, encrypted, or just a JSON []byte.


I think I bitshifted to the 4th dimension... how the hell do I get my object/struct back?

I am going to assume we are Comcrypting, so adjust this example to your needs

First we get our data out of a Consumer, once we have a tcr.Body.Data []byte, we can begin reversing it.

var json = jsoniter.ConfigFastest // optional - can use built-in json if you prefer

message := <-consumer.Messages() // get comprypted message

wrappedBody := &tcr.WrappedBody{}
err = json.Unmarshal(message.Body, wrappedBody) // unmarshal as ModdedLetter
if err != nil {
	// I probably have a bug.
}

buffer := bytes.NewBuffer(wrappedBody.Body.Data)

// Helper function to get the original data (pre-wrapped) out based on current service settings.
// You would have to remember to write a decompress/decrypt step without this for comcrypted messages.
err = tcr.ReadPayload(buffer, Service.Config.CompressionConfig, Service.Config.EncryptionConfig)
if err != nil {
	// I probably have a bug.
}

myStruct := &MyStruct{}
err = json.Unmarshal(buffer.Bytes(), myStruct) // unmarshal as actual type!
if err != nil {
	// You probably have a bug!
}


Owner
Tristan (HouseCat) Hyams
I like math.
Tristan (HouseCat) Hyams
Comments
  • PublishWithConfirmation infinte loop

    PublishWithConfirmation infinte loop

    Hi,

    I use PublishWithConfirmation function but if rabbit return an error (confirmation.Ack = false) the function enter into an infinite loop because there is a "goto Publish" statement that reset the timeout and re-publish a message every times.

    It's correct? or It's a bug? Can you manage attempts?

    thanks

  • Publish Confirmation Timeout

    Publish Confirmation Timeout

    config:

    Seasoning:
        EncryptionConfig:
          Enabled: false
          Type: aes
          TimeConsideration: 1
          Threads: 2
        CompressionConfig:
          Enabled: false
          Type: gzip
        PoolConfig:
          URI: amqps://user:[email protected]/virtualhost
          Heartbeat: 1800
          ConnectionTimeout: 5
          SleepOnErrorInterval: 1
          MaxConnectionCount: 10
          MaxCacheChannelCount: 10
        ConsumerConfigs:
          order_event_sync_ib:
            ConsumerName: order.event.sync.ib
            Enabled: true
            QueueName: order.event.sync.ib
            AutoAck: true
            Exclusive: true
            NoWait: true
            SleepOnErrorInterval: 1
            SleepOnErrorInterval: 1
        PublisherConfig:
          AutoAck: true
          SleepOnErrorInterval: 1
          SleepOnErrorInterval: 1
          PublishTimeOutInterval: 5
          MaxRetryCount: 5
      Topology:
        Exchanges:
        - Name: request.fix.itiviti
          Type: "fanout"
          PassiveDeclare: false
          Durable: true
          AutoDelete: false
          InternalOnly: false
          NoWait: true
        Queues:
        - Name: egress.itiviti
          PassiveDeclare: false
          Durable: true
          AutoDelete: false
          Exclusive: false
          NoWait: true
        QueueBindings:
        - QueueName: egress.itiviti
          ExchangeName: request.fix.itiviti
          NoWait: true
          NoWait: true
    

    code:

    func NewRabbitMQProvider(cfg *RabbitMQConfig, applicationName string) (*RabbitMQProvider, error) {
    	cp, err := tcr.NewConnectionPool(cfg.Seasoning.PoolConfig)
    	if err != nil {
    		return nil, util.WrapError(err)
    	}
    
    	t := tcr.NewTopologer(cp)
    	if err != nil {
    		return nil, util.WrapError(err)
    	}
    	err = t.BuildTopology(&cfg.Topology, false)
    	if err != nil {
    		return nil, util.WrapError(err)
    	}
    
    	return &RabbitMQProvider{
    		seasoning: cfg.Seasoning,
    		connPool:  cp,
    	}, nil
    }
    
    func (p *RabbitMQProvider) Publish(letter *tcr.Letter, skipReceipt bool, publishReceiptHandler func(*tcr.PublishReceipt)) {
    	publisher := p.ProvidePublisher()
    	p.handlerChan <- publishReceiptIDHandler{
    		handler:  publishReceiptHandler,
    		letterID: letter.LetterID.String(),
    	}
    	publisher.PublishWithConfirmationTransient(letter, 10)
    }
    
    letter := &tcr.Letter{
    		LetterID: id,
    		Body:     []byte(nos.ToMessage().String()),
    		Envelope: &tcr.Envelope{
    			Exchange:     "request.fix.itiviti",
    			RoutingKey:   "egress.itiviti",
    			ContentType:  "text/plain",
    			Mandatory:    true, // put this message on at least one queue, if not, send it back
    			Immediate:    true, // send immediately or not at all
    			Priority:     0,    // not relevant
    			DeliveryMode: 1,    // do not store messages to disc to resend in case of server crash, storing in disc affects latency and would cause messages to be asyncronously sent
    		},
    	}
    
    	b.rabbitMQProvider.Publish(letter, false, func(r *tcr.PublishReceipt) {
    		if r.Success {
    			fmt.Println(fmt.Sprintf("letter %s published successfully", letter.LetterID.String()))
    		} else {
    			fmt.Println(fmt.Sprintf("letter %s errored on publish with: %s", letter.LetterID.String(), r.Error.Error()))
    		}
    	})
    

    seeing the following error message:

    letter 3183b158-2a7e-11ec-a460-acde48001122 errored on publish with: publish confirmation for LetterID: 3183b158-2a7e-11ec-a460-acde48001122 wasn't received in a timely manner (10ms) - recommend retry/requeue

  • How To Publish A String Message (i.e. how to go from a string into a tcr.Letter)

    How To Publish A String Message (i.e. how to go from a string into a tcr.Letter)

    I've been looking for documentation regarding letters. I can't find anything except the part that creates a mock letter. If i have a message in the form of a string, how do I turn that into a Letter that is publishable?

    Thanks in advance

  • CPU usage issue of the Rabbit Service

    CPU usage issue of the Rabbit Service

    The Rabbit Service start the auto publishing by default. The auto publishing also depends on the SleepOnIdle number to setup the loop. If the SleepOnIdle is very small, the CPU usage will be really high. Suggest to use channel to feed the auto publish rather than the for loop.

  • New functions to expose Errors from Connection Pool and LetterId from RabbitService

    New functions to expose Errors from Connection Pool and LetterId from RabbitService

    Hi, this is a great RabbitMQ library, we are looking at deploying it to production.

    Meanwhile, we have a scenario which requires to extend your library to support synchronized publish with confirmation. We plan to do the extension at our side first. However, we need 2 additional public functions from Connection Pool and Rabbit Service.

    • ConnectionPool.Errors() to expose the error raised internally to support logging and tracing
    • RabbitService.GetNewLetterID() to expose the letter id counter for external extension methods but also keep the letter id logic within the RabbitService to keep the id synced all the time.
  • How to Ensure Publish

    How to Ensure Publish

    Hello,

    Can you please explain me how to be sure that a message published to an exchange is effectively delivered, as far I can understand looking at your code, there is no implementation of Channel.NotifyReturn listener.

    I'm having some trouble for example in my case I'm trying to publish to a non existent exchange but also if I' ve subscribed to all notifications chan (service, channel) I'm not getting back any error from the library, but the server effectively and as per protocol raise the error.

    Maybe I'm missing something, please can You help me understanding better this very nice library

    Thanks in advance

  • How to use Topologer

    How to use Topologer

    Is the Topologer similar to a database migration where it will create the queues/exchanges if they do not exist? How is one meant to use the Topologer, as part of the application or as part of a migration? I wouldn't want to put topology changes into my application code, I'd rather run a migration of sorts that updates the RabbitMQ cluster when I am migrating my application(s) to a new version. Is there any way to use the Topologer as part of a migration other than compiling a little app to do just that? Thanks.

  • how can i push message with priotity field?

    how can i push message with priotity field?

    Hi,

    How can I set a priority of message (https://www.rabbitmq.com/priority.html)?

    Can you add "priotiry" field into Envelope struct of letter.go file?

    Thanks

  • Make property Started public so it can be used to check if the Rabbit…

    Make property Started public so it can be used to check if the Rabbit…

    Make property Started public so it can be used to check if the Rabbit consumer has started in the consuming application or library that is using turbocookedrabbit.

  • Error when i try to use library version >= 2.X.X

    Error when i try to use library version >= 2.X.X

    When I try to use a library version v2.0.6 but I get this error: require github.com/houseofcat/turbocookedrabbit: version "v2.0.6" invalid: module contains a go.mod file, so major version must be compatible: should be v0 or v1, not v2

    I use go 1.13, how can i use the last version of library?...

    thanks in advance

  • Can you return Header in Message struct?

    Can you return Header in Message struct?

    Hello,

    Great library!!! I need to read the header of message that i received in a queue(for dead letter exchange into the message header rabbitmq write some util information).

    Can you add a message Header in models.Message struct? example : type Message struct { IsAckable bool Headers amqp.Table Body []byte deliveryTag uint64 amqpChan *amqp.Channel }

    Thanks in advance.

  • support cluster

    support cluster

    I used https://github.com/sirius1024/go-amqp-reconnect to reconnect and connect to a cluster of rabbitmq.

    Would it be possible to support this feature?

    https://github.com/sirius1024/go-amqp-reconnect/blob/master/rabbitmq/rabbitmq.go#L118

🚀 Golang, Go Fiber, RabbitMQ, MongoDB, Docker, Kubernetes, GitHub Actions and Digital Ocean
🚀 Golang, Go Fiber, RabbitMQ, MongoDB, Docker, Kubernetes, GitHub Actions and Digital Ocean

Bookings Solução de cadastro de usuários e reservas. Tecnologias Utilizadas Golang MongoDB RabbitMQ Github Actions Docker Hub Docker Kubernetes Digita

Feb 18, 2022
Golang AMQP wrapper for RabbitMQ with better API

go-rabbitmq Golang AMQP wrapper for RabbitMQ with better API Table of Contents Background Features Usage Installation Connect to RabbitMQ Declare Queu

Dec 21, 2022
A high-level RabbitMQ driver for Golang.

grabbitmq A high-level RabbitMQ driver for Golang. Import in your project: go get github.com/shaswata56/grabbitmq Usage Demo: package main import (

Aug 2, 2022
golang amqp rabbitmq produce consume

Step 1: Container Run Container docker run -itp 9001:9001 --name go_temp -v /usr/local/project/temp/go_amqp/:/home/ -d golang:1.16.6 Enter Container

Nov 26, 2021
Declare AMQP entities like queues, producers, and consumers in a declarative way. Can be used to work with RabbitMQ.

About This package provides an ability to encapsulate creation and configuration of RabbitMQ([AMQP])(https://www.amqp.org) entities like queues, excha

Dec 28, 2022
:incoming_envelope: A fast Message/Event Hub using publish/subscribe pattern with support for topics like* rabbitMQ exchanges for Go applications

Hub ?? A fast enough Event Hub for go applications using publish/subscribe with support patterns on topics like rabbitMQ exchanges. Table of Contents

Dec 17, 2022
Abstraction layer for simple rabbitMQ connection, messaging and administration
Abstraction layer for simple rabbitMQ connection, messaging and administration

Jazz Abstraction layer for quick and simple rabbitMQ connection, messaging and administration. Inspired by Jazz Jackrabbit and his eternal hatred towa

Dec 12, 2022
RabbitMQ wire tap and swiss army knife
RabbitMQ wire tap and swiss army knife

rabtap - RabbitMQ wire tap Swiss army knife for RabbitMQ. Tap/Pub/Sub messages, create/delete/bind queues and exchanges, inspect broker. Contents Feat

Dec 28, 2022
RabbitMQ Reconnection client

rmqconn RabbitMQ Reconnection for Golang Wrapper over amqp.Connection and amqp.Dial. Allowing to do a reconnection when the connection is broken befor

Sep 27, 2022
An easy-to-use CLI client for RabbitMQ.

buneary, pronounced bun-ear-y, is an easy-to-use RabbitMQ command line client for managing exchanges, managing queues and publishing messages to exchanges.

Sep 3, 2022
An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`

Go RabbitMQ Client Library This is a Go AMQP 0.9.1 client maintained by the RabbitMQ core team. It was originally developed by Sean Treadway. Differen

Jan 1, 2023
High level manegment for rabbitmq.

High level manegment for rabbitmq. Features Simple configuration bootstrap. Gracefully shutting down. Consume messages in parallel specifying a number

Sep 24, 2022
Testing message queues with RabbitMQ

Rabbit-MessageQueue Just a repository of RabbitMQ simple usage for queueing messages. You can use this as a sender or a receiver. More information is

Mar 10, 2022
A RabbitMQ connection pool write in pure go

A RabbitMQ connection pool write in pure go

Oct 8, 2021
App for test hypothesis about API for rabbitmq

REST API для работы с RabbitMQ Приложение для работы с брокером сообщений RabbitMQ через REST API. Основная мысль - что одиночные сообщения отправлять

Nov 12, 2021
RPC realization with use RabbitMQ

RPC examples for RabbitMQ Description RPC example using RabbitMQ. In this example use the 6 tutorial RabbitMQ with some changes: exchange between clie

Nov 24, 2021
Go service for CRUD note, log tracking by RabbitMQ
Go service for CRUD note, log tracking by RabbitMQ

Service for CRUD note, log tracking by RabbitMQ Architecture Three components: Service note CRUD, use a DB RabbitMQ for saving messages pushed from se

Nov 29, 2021
Tool for collect statistics from AMQP (RabbitMQ) broker. Good for cloud native service calculation.

amqp-statisticator Tool for collect statistics around your AMQP broker. For example RabbitMQ expose a lot information trought the management API, but

Dec 13, 2021
Simple go app with RabbitMQ in docker-compose

Docker-compose stand with RabbitMQ and sender/reciever. About First app is a producer that sends messages (1 per second / while-true loop), the second

Jan 5, 2022