Go client for AMQP 0.9.1

Build Status GoDoc

Go RabbitMQ Client Library

This is an AMQP 0.9.1 client with RabbitMQ extensions in Go.

Project Maturity

This project has been used in production systems for many years. It is reasonably mature and feature complete, and as of November 2016 has a team of maintainers.

Future API changes are unlikely but possible. They will be discussed on Github issues along with any bugs or enhancements.

Supported Go Versions

This library supports two most recent Go release series, currently 1.10 and 1.11.

Supported RabbitMQ Versions

This project supports RabbitMQ versions starting with 2.0 but primarily tested against reasonably recent 3.x releases. Some features and behaviours may be server version-specific.

Goals

Provide a functional interface that closely represents the AMQP 0.9.1 model targeted to RabbitMQ as a server. This includes the minimum necessary to interact the semantics of the protocol.

Non-goals

Things not intended to be supported.

  • Auto reconnect and re-synchronization of client and server topologies.
    • Reconnection would require understanding the error paths when the topology cannot be declared on reconnect. This would require a new set of types and code paths that are best suited at the call-site of this package. AMQP has a dynamic topology that needs all peers to agree. If this doesn't happen, the behavior is undefined. Instead of producing a possible interface with undefined behavior, this package is designed to be simple for the caller to implement the necessary connection-time topology declaration so that reconnection is trivial and encapsulated in the caller's application code.
  • AMQP Protocol negotiation for forward or backward compatibility.
    • 0.9.1 is stable and widely deployed. Versions 0.10 and 1.0 are divergent specifications that change the semantics and wire format of the protocol. We will accept patches for other protocol support but have no plans for implementation ourselves.
  • Anything other than PLAIN and EXTERNAL authentication mechanisms.
    • Keeping the mechanisms interface modular makes it possible to extend outside of this package. If other mechanisms prove to be popular, then we would accept patches to include them in this package.

Usage

See the 'examples' subdirectory for simple producers and consumers executables. If you have a use-case in mind which isn't well-represented by the examples, please file an issue.

Documentation

Use Godoc documentation for reference and usage.

RabbitMQ tutorials in Go are also available.

Contributing

Pull requests are very much welcomed. Create your pull request on a non-master branch, make sure a test or example is included that covers your change and your commits represent coherent changes that include a reason for the change.

To run the integration tests, make sure you have RabbitMQ running on any host, export the environment variable AMQP_URL=amqp://host/ and run go test -tags integration. TravisCI will also run the integration tests.

Thanks to the community of contributors.

External packages

License

BSD 2 clause - see LICENSE for more details.

Comments
  • Memory leak when closing channels

    Memory leak when closing channels

    This might be related to #149, although this issue occurs with autoAck disabled and Nack'ing messages, while 149 mentions autoAck being enabled.

    Steps to reproduce:

    1. Create a queue in RabbitMQ with ~50 messages in it, make each message a kb or so large to demonstrate it easier
    2. Open a new connection
    3. Open a new channel
    4. Set Qos on channel to 50 prefetchCount, 0 prefetchSize, global false
    5. Create a consumer with auto ack false
    6. Consume a single message
    7. Nack the message with multiple false, requeue true
    8. Optionally Cancel the consumer (if this is done or not doesn't seem to change anything)
    9. Close the channel
    10. Loop from step 3

    Each time this loops, it leaks a non-trivial amount of memory (up to a gig in few minutes):

    (pprof) top
    412.77MB of 420.27MB total (98.21%)
    Dropped 109 nodes (cum <= 2.10MB)
          flat  flat%   sum%        cum   cum%
      412.77MB 98.21% 98.21%   413.77MB 98.45%  github.com/streadway/amqp.(*Channel).recvContent
             0     0% 98.21%   413.77MB 98.45%  github.com/streadway/amqp.(*Connection).demux
             0     0% 98.21%   413.77MB 98.45%  github.com/streadway/amqp.(*Connection).dispatchN
             0     0% 98.21%   418.27MB 99.52%  github.com/streadway/amqp.(*Connection).reader
             0     0% 98.21%     4.50MB  1.07%  github.com/streadway/amqp.(*reader).ReadFrame
             0     0% 98.21%     3.50MB  0.83%  github.com/streadway/amqp.(*reader).parseHeaderFrame
             0     0% 98.21%     3.50MB  0.83%  github.com/streadway/amqp.readTable
             0     0% 98.21%   419.77MB 99.88%  runtime.goexit
    
  • Channel.Publish blocks indefinitely

    Channel.Publish blocks indefinitely

    Channel.Publish sometimes blocks indefinitely. The following code is a test case to reproduce this. It reproduces most of time I run it. Adding a sleep between inside the for loop does not help.

    package main
    
    import (
    	"log"
    
    	"github.com/streadway/amqp"
    )
    
    func main() {
    
    	const queueName = "q1"
    
    	if conn, stdErr := amqp.Dial("amqp://guest:guest@localhost:5672/"); stdErr != nil {
    		log.Fatalln(stdErr)
    	} else if channel, stdErr := conn.Channel(); stdErr != nil {
    		log.Fatalln(stdErr)
    	} else if stdErr := channel.Confirm(false); stdErr != nil {
    		log.Fatalln(stdErr)
    	} else if _, stdErr := channel.QueueDeclare(
    		queueName, //queue
    		true,      //durable
    		false,     //autodelete
    		false,     //exclusive
    		false,     //nowait
    		nil,       //arguments
    	); stdErr != nil {
    		log.Fatalln(stdErr)
    	} else {
    		confirmationChan := channel.NotifyPublish(make(chan amqp.Confirmation))
    
    		for i := 0; i < 10000; i++ {
    			log.Println("publish ", i)
    
    			if stdErr := channel.Publish(
    				"",        // exchange
    				queueName, // routing key
    				false,     // mandatory
    				false,     // immediate
    				amqp.Publishing{
    					Body: []byte{},
    				}); stdErr != nil {
    				log.Fatalln(stdErr)
    			} else {
    				log.Println("pre confirmation check")
    				confirmation := <-confirmationChan
    				log.Println("post confirmation check")
    				if !confirmation.Ack {
    					log.Fatalln("confirmation not ack")
    				}
    			}
    			// time.Sleep(time.Millisecond * 1000)
    		}
    	}
    }
    
  • go modules support

    go modules support

    Would you be interested in supporting Go Modules. This would entail:

    1. Generating a go.mod and go.sum in the root of this repo.
    2. Adopting semver and releasing versions (not explicitly required, but highly recommended)
    3. Updating test running for modules support.

    I'm more than happy to open a PR with these changes, but would like to discuss them here before doing so!

  • Channel is not thread safe

    Channel is not thread safe

    Channel.shutdown(*error) function sets me.send function using lock. However Ack, Nack, Reject functions are not using the same lock.

    Is the Channel supposed to be thread safe? Simple fix would be just add locking to those methods missing it, but I am not sure if there is something else behind this.

  • Project activity

    Project activity

    Hey @streadway, looks like you are not actively maintaining this project. We are using this project at the moment, and some of the pull request fixes some issues we are having in our setup.

    I think this is the most used rabbitmq library for go at the moment (correct me if I'm wrong here), are you looking for maintainers?, happy to help

    Thanks!

  • Support connection.blocked (coming in RabbitMQ 3.2)

    Support connection.blocked (coming in RabbitMQ 3.2)

    This is not an issue but rather a heads up.

    There is a new feature in RabbitMQ nightlies that requires client support. Java, .NET, Erlang and Ruby (Bunny, amqp gem) already support it their development branches, with librabbitmq-c and 2 Python clients having some progress on it.

    It would be nice if the Go client had support for it before 3.2 comes out (which won't happen for a few more months). I'd be happy to help with it but unfortunately not in the near future.

  • Guard against racy channel allocation and connection close

    Guard against racy channel allocation and connection close

    Proposes a solution to #251.

    As the only write to c.channels is in c.allocateChannel() a simple c.isClosed() guard is sufficient to protect against concurrent writes - multiple readers is OK and does not require a lock.

    Because c.allocateChannel() holds the connection mutex for the duration of the call and c.shutdown() acquires it after c.closed has been set, I don't believe there is any racy interaction using c.closed.

    The test case is from the issue reported by @jmalloc - thanks it helped massively!

    The changes to channel.go is a separate issue (and separate commit - I can remove it if you want) that eliminates a race between channel.shutdown() and channel.call(). It looks like the original code intended to avoid this race, but didn't get complete coverage - it became noticeable using -race after fixing the allocation race, so I included it in this PR.

    Running AMQP_URL="amqp://guest:guest@localhost:5672/" go test ./... -tags=integration -race no longer produces any race warnings for me, though I still get:

    --- FAIL: TestExchangeDeclarePrecondition (34.66s)
    	integration_test.go:1759: dial integration server: Exception (501) Reason: "read tcp 127.0.0.1:58857->127.0.0.1:5672: i/o timeout"
    

    I've always had this test case fail during integration tests and assume it's unrelated.

  • Connection issue: Exception (501) Reason:

    Connection issue: Exception (501) Reason: "EOF" exit status 1

    I am running RabbitMQ 3.6.9, Erlang 19.3 in a docker container and trying to open the connection using conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/").

    I am able to connect to RabbitMQ directly using http://localhost:5672. Am I missing something?

    Thanks Arul

  • Channel.Publish's blocks upon disconnection during high volume publishes

    Channel.Publish's blocks upon disconnection during high volume publishes

    Firstly, let me say I'm not entirely sure of the behaviour I am observing (yet). I will continue to study it and update here.

    It seems as though if you try to Publish to a Channel before shutdown has a chance to run, it will block forever.

    The easiest way to observe this is do a high volume of Publishes to a channel and shut RabbitMQ down part way through.

    I have amended simple_producer.go to send infinite publishes, which you may use to reproduce.

    To reproduce:

    1. Spin up RabbitMQ
    2. go run simple_infinite_producer.go -reliable=false
    3. Shut down RabbitMQ after a second or two.

    You may observe that the application will block when the last message is something like: 2013/10/31 17:36:36 pre-publish test-exchange 1383201396132042002 and nothing proceeds it.

    From the little debugging I have done so far, it seems as though this is happening because send is being called before the shutdown logic has a chance to replace the send function for another.

  • Better reconnect support

    Better reconnect support

    https://github.com/streadway/amqp/blob/master/reconnect_test.go

    Looking at how you are doing reconnect it causes a slight issue if you are using go routines. In that case you have to start managing all your goroutines with something like http://stackoverflow.com/a/6807784/2049333 which can be a real mess. I just wanted to open up a discussion to see if this could be improved or if that is not possible.

    I was thinking something like not closing the channel when the server stops and allow for a call like TryReconnect attached to amqp.Connection that would try and reopen the connection with the current connection information.

    I know overall you don't want to support reconnect but doing so is currently a bit of a mess in my opinion.

  • Publish method does not return

    Publish method does not return "confirms.Published" counter(deliveryTag)

    I have a listener registered through NotifyPublish and it is asynchronously getting confirmation and if ack=false then I republish the message or act upon it, but I get delivery tag from this listener channel and I do not know which payload this delivery tag is of.

    While looks like we can return the https://github.com/streadway/amqp/blob/v1.0.0/confirms.go#L36 which looks like the delivery tag for the given channel, but we are ignoring it in Publish method https://github.com/streadway/amqp/blob/v1.0.0/channel.go#L1360.

  • tls problem

    tls problem

    Hello.

    https://go.dev/play/p/cuOKquzMu7K 2022/10/11 19:50:50 Failed to connect to RabbitMQ: x509: certificate signed by unknown authority exit status 1

    Please help. tell me how to connect a certificate in this code so that you can contact the rabbit server with an example?

  • Coverity Scan - Identical code for different branches

    Coverity Scan - Identical code for different branches

    File: https://github.com/streadway/amqp/blob/master/read.go Details:

    1. The condition is redundant In github.​com/streadway/amqp.​readFloat32(io.​Reader): The same code is executed regardless of the condition (CWE-398)
    2. The condition is redundant In github.​com/streadway/amqp.​readFloat64(io.​Reader): The same code is executed regardless of the condition (CWE-398) Screenshots:
    image 2. image
  • Documentation contradiction in regards to flow

    Documentation contradiction in regards to flow

    // Listeners for active=true flow control.  When true is sent to a listener,
    // publishing should pause until false is sent to listeners.
    flows []chan bool
    
    NotifyFlow registers a listener for basic.flow methods sent by the server.
    When `false` is sent on one of the listener channels, all publishers should
    pause until a `true` is sent.
    

    One claims that true means that clients should pause. The other says clients should pause when false is sent. Which is it?

  • RabbitMQ has open connections(Too many File descriptors piling up on broker) for publish

    RabbitMQ has open connections(Too many File descriptors piling up on broker) for publish

    Hi, We're using amqp library to publish messages to a Queue, We've encountered 2 issues here

    1. Same connection cannot be reused if we try to publish via goroutines. An error 504 connection/channel closed is returned.
    2. As a hotfix, we're creating new connections for each publish, but here somehow the connection destructor is not working and RabbitMQ node still has the connection channel opened. As a result too many file descriptors are being piled and node goes down.

    Would appreciate any help/pointers

  • consume stops receiving messages after reconnecting

    consume stops receiving messages after reconnecting

    Hello, I've been working on a project that should have a reconnection feature, in case a consumer were to disconnect due to being idle for a while, with a similar approach to the one used here.

    However, I've noticed that after reconnecting after a forced close from the UI, the channel stops receiving messages all together. I work with several workers at once, so this one is just rendered useless. However, the connection is reestablished.

    Now, I was wondering if this is an issue on my part, or if there is something that I am overlooking? I am more than happy to share my code if needed, but I want to understand why this could be happening in the first place.

Cross-platform client for PostgreSQL databases

pgweb Web-based PostgreSQL database browser written in Go. Overview Pgweb is a web-based database browser for PostgreSQL, written in Go and works on O

Dec 30, 2022
Go client for Redis

Redigo Redigo is a Go client for the Redis database. Features A Print-like API with support for all Redis commands. Pipelining, including pipelined tr

Dec 29, 2022
Type-safe Redis client for Golang
Type-safe Redis client for Golang

Redis client for Golang Join Discord to ask questions. Documentation Reference Examples RealWorld example app Ecosystem Redis Mock. Distributed Locks.

Dec 29, 2022
Interactive client for PostgreSQL and MySQL
Interactive client for PostgreSQL and MySQL

dblab Interactive client for PostgreSQL and MySQL. Overview dblab is a fast and lightweight interactive terminal based UI application for PostgreSQL a

Jan 8, 2023
Cross-platform client for PostgreSQL databases

pgweb Web-based PostgreSQL database browser written in Go. Overview Pgweb is a web-based database browser for PostgreSQL, written in Go and works on O

Dec 30, 2022
[mirror] the database client and tools for the Go vulnerability database

The Go Vulnerability Database golang.org/x/vulndb This repository is a prototype of the Go Vulnerability Database. Read the Draft Design. Neither the

Dec 29, 2022
REST based Redis client built on top of Upstash REST API

An HTTP/REST based Redis client built on top of Upstash REST API.

Jul 31, 2022
Migration tool for ksqlDB, which uses the ksqldb-go client.
Migration tool for ksqlDB, which uses the ksqldb-go client.

ksqldb-migrate Migration tool for ksqlDB, which uses the ksqldb-go client.

Nov 15, 2022
A client for TiKV

client-tikv ./tikv-client --pd 127.0.0.1:2379,127.0.0.2:2379,127.0.0.3:2379 usage You can query the value directly according to the key. tikv> select

Apr 16, 2022
Client to import measurements to timestream databases.

Timestream DB Client Client to import measurements to timestream databases. Supported Databases/Services AWS Timestream AWS Timestream Run NewTimestre

Jan 11, 2022
Go-clickhouse - ClickHouse client for Go

ClickHouse client for Go 1.18+ This client uses native protocol to communicate w

Jan 9, 2023
Go client for AMQP 0.9.1

Go RabbitMQ Client Library This is an AMQP 0.9.1 client with RabbitMQ extensions in Go. Project Maturity This project has been used in production syst

Jan 6, 2023
Declare AMQP entities like queues, producers, and consumers in a declarative way. Can be used to work with RabbitMQ.

About This package provides an ability to encapsulate creation and configuration of RabbitMQ([AMQP])(https://www.amqp.org) entities like queues, excha

Dec 28, 2022
A tiny wrapper over amqp exchanges and queues 🚌 ✨

Rabbus ?? ✨ A tiny wrapper over amqp exchanges and queues. In memory retries with exponential backoff for sending messages. Protect producer calls wit

Dec 18, 2022
A wrapper of streadway/amqp that provides reconnection logic and sane defaults

go-rabbitmq Wrapper of streadway/amqp that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ Supporte

Dec 28, 2022
Golang AMQP wrapper for RabbitMQ with better API

go-rabbitmq Golang AMQP wrapper for RabbitMQ with better API Table of Contents Background Features Usage Installation Connect to RabbitMQ Declare Queu

Dec 21, 2022
command-line tool to publish, subscribe, and process messages for AMQP 0.9.1 compliant message brokers

Bunny A BSD licenced, go-powered CLI tool for publishing and subscribing to RabbitMQ

Sep 11, 2021
Go library to build event driven applications easily using AMQP as a backend.

event Go library to build event driven applications easily using AMQP as a backend. Publisher package main import ( "github.com/creekorful/event" "

Dec 5, 2021
golang amqp rabbitmq produce consume

Step 1: Container Run Container docker run -itp 9001:9001 --name go_temp -v /usr/local/project/temp/go_amqp/:/home/ -d golang:1.16.6 Enter Container

Nov 26, 2021
Tool for collect statistics from AMQP (RabbitMQ) broker. Good for cloud native service calculation.

amqp-statisticator Tool for collect statistics around your AMQP broker. For example RabbitMQ expose a lot information trought the management API, but

Dec 13, 2021