A wrapper of streadway/amqp that provides reconnection logic and sane defaults

go-rabbitmq

Wrapper of streadway/amqp that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful

Supported by Qvault

Deploy

Motivation

Streadway's AMQP library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol, as such, no reconnection logic and few ease-of-use abstractions are provided.

The goal with go-rabbitmq is to still provide most all of the nitty-gritty functionality of AMQP, but to make it easier to work with via a higher-level API. Particularly:

  • Automatic reconnection
  • Multithreaded consumers via a handler function
  • Reasonable defaults
  • Flow control handling

⚙️ Installation

Outside of a Go module:

go get github.com/wagslane/go-rabbitmq

🚀 Quick Start Consumer

Default options

consumer, err := rabbitmq.NewConsumer("amqp://user:pass@localhost")
if err != nil {
    log.Fatal(err)
}
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) bool {
        log.Printf("consumed: %v", string(d.Body))
        // true to ACK, false to NACK
        return true
    },
    "my_queue",
    []string{"routing_key1", "routing_key2"}
)
if err != nil {
    log.Fatal(err)
}

With options

consumer, err := rabbitmq.NewConsumer(
    "amqp://user:pass@localhost",
    // can pass nothing for no logging
    func(opts *rabbitmq.ConsumerOptions) {
        opts.Logging = true
    },
)
if err != nil {
    log.Fatal(err)
}
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) bool {
        log.Printf("consumed: %v", string(d.Body))
        // true to ACK, false to NACK
        return true
    },
    "my_queue",
    []string{"routing_key1", "routing_key2"},
    // can pass nothing here for defaults
    func(opts *rabbitmq.ConsumeOptions) {
        opts.QueueDurable = true
        opts.Concurrency = 10
        opts.QOSPrefetch = 100
    },
)
if err != nil {
    log.Fatal(err)
}

🚀 Quick Start Publisher

Default options

publisher, returns, err := rabbitmq.NewPublisher("amqp://user:pass@localhost")
if err != nil {
    log.Fatal(err)
}
err = publisher.Publish([]byte("hello, world"), []string{"routing_key"})
if err != nil {
    log.Fatal(err)
}

With options

publisher, returns, err := rabbitmq.NewPublisher(
    "amqp://user:pass@localhost",
    // can pass nothing for no logging
    func(opts *rabbitmq.PublisherOptions) {
        opts.Logging = true
    },
)
if err != nil {
    log.Fatal(err)
}
err = publisher.Publish(
    []byte("hello, world"),
    []string{"routing_key"},
    // leave blank for defaults
    func(opts *rabbitmq.PublishOptions) {
        opts.DeliveryMode = rabbitmq.Persistent
        opts.Mandatory = true
        opts.ContentType = "application/json"
    },
)
if err != nil {
    log.Fatal(err)
}

go func() {
    for r := range returns {
        log.Printf("message returned from server: %s", string(r.Body))
    }
}()

💬 Contact

Twitter Follow

Submit an issue (above in the issues tab)

Transient Dependencies

My goal is to keep dependencies limited to 1, github.com/streadway/amqp.

👏 Contributing

I love help! Contribute by forking the repo and opening pull requests. Please ensure that your code passes the existing tests and linting, and write tests to test your changes if applicable.

All pull requests should be submitted to the main branch.

Owner
Lane Wagner
I like Go and Rust, and I tolerate JavaScript and Python. @wagslane on twitter.
Lane Wagner
Comments
  • Consumer won't reconnect

    Consumer won't reconnect

    Hi,

    Somehow when network got disconnected for a minutes or so and then network up again, I got a log

    gorabbit: rabbit consumer goroutine closed
    

    After that, consumer stops receiving new message. Why wont it reconnect?

    I'm using version 0.6.2, and my code is (more or less) like this

    rabbitmqConsumer, err := rabbitmq.NewConsumer(config.constructURL(), amqp.Config{}, rabbitmq.WithConsumerOptionsLogging)
    ...
    rabbitmqConsumer.StartConsuming(
      func(message rabbitmq.Delivery) bool {
        return true
      },
      "",
      []string{""},
      func(options *rabbitmq.ConsumeOptions) {
        options.QueueExclusive = true
        options.ConsumerExclusive = true
        options.QueueDurable = true
        options.BindingExchange = &rabbitmq.BindingExchangeOptions{
          Name:    "my_rabbitmq_topic",
          Kind:    "fanout",
          Durable: true,
        }
      },
    )
    
  • Each consumer/ producer has it's own connection?

    Each consumer/ producer has it's own connection?

    Hello, from my understanding of the code, each consumer/ publisher will have its own connection, which contradicts the recommendation of 1 connection per application. In theory, every consumer/ producer should use the same connection but have their own channel? Is this something that's intentional by design, or is there room for improvement here?

  • Why is Exchange configuration driven by Consumers instead of Publishers

    Why is Exchange configuration driven by Consumers instead of Publishers

    This library seems to have Consumers drive the configuration of the Exchanges rather than the Publishers. For example, Consumers determine if an exchange should auto delete, be durable, or the kind (e.g. topic).

    Sample:

    	err = consumer.StartConsuming(
    			func(d rmq.Delivery) rmq.Action {
    				log.Printf("consumed: %v", string(d.Body))
    				// true to ACK, false to NACK
    				return rmq.Ack
    			},
    			QUEUE,
    			[]string{TOPIC},
    			rmq.WithConsumeOptionsBindingExchangeAutoDelete,
    			rmq.WithConsumeOptionsBindingExchangeDurable,
    			rmq.WithConsumeOptionsBindingExchangeName(""),
    			rmq.WithConsumeOptionsBindingExchangeKind("topic"),
    			rmq.WithConsumeOptionsQueueArgs(queueArgs),
    

    Isn't this a bit backwards since Publishers are the ones who write to Exchanges and should therefore configure its settings, and then Consumers simply configure the queues that they use? Was this design selected out of personal preference or was there a technical reason for it?

  • Add log level to Printf method of the Logger interface

    Add log level to Printf method of the Logger interface

    Currently it is not possible to define which log-level message has: info or warning or error. Need to extend Printf method with logLevel variable. I can suggest PR.

    It's a breaking change, but don't really see how such feature can be added without breaking change.

  • runs consumer reconnection by any rabbit error

    runs consumer reconnection by any rabbit error

    I think that may be good idea to run consumer reconnection by any error because there is situations when internet connection to rabbit service breaks for a while and after that consumer not working anymore. It might be worth adding some field for set up this behavior to the consumer settings fields, but I'm not sure.

  • Possible data race

    Possible data race

    After updating to the version I get:

      ==================
      WARNING: DATA RACE
      Write at 0x00c000474220 by goroutine 33:
        github.com/wagslane/go-rabbitmq.(*Publisher).startNotifyFlowHandler()
            go/pkg/mod/github.com/wagslane/[email protected]/publish.go:234 +0x70
    
      Previous read at 0x00c000474220 by goroutine 24:
        github.com/wagslane/go-rabbitmq.NewPublisher()
            go/pkg/mod/github.com/wagslane/[email protected]/publish.go:167 +0x3b3
        git.vonroll-infratec.com/go/meas/pkg/queue.NewPublisher()
            ws/go/meas/pkg/queue/rab_pub.go:48 +0x604
        git.vonroll-infratec.com/go/meas/pkg/queue.(*RabbitSuite).SetupTest()
            ws/go/meas/pkg/queue/rabbit_test.go:43 +0x119
        github.com/stretchr/testify/suite.Run.func1()
            go/pkg/mod/github.com/rzajac/[email protected]/suite/suite.go:148 +0x8b4
        testing.tRunner()
            /usr/local/go/src/testing/testing.go:1193 +0x202
    
      Goroutine 33 (running) created at:
        github.com/wagslane/go-rabbitmq.(*Publisher).startNotifyHandlers()
            go/pkg/mod/github.com/wagslane/[email protected]/publish.go:229 +0x164
        github.com/wagslane/go-rabbitmq.NewPublisher.func1()
            go/pkg/mod/github.com/wagslane/[email protected]/publish.go:160 +0x44
    
      Goroutine 24 (running) created at:
        testing.(*T).Run()
            /usr/local/go/src/testing/testing.go:1238 +0x5d7
        github.com/stretchr/testify/suite.runTests()
            go/pkg/mod/github.com/rzajac/[email protected]/suite/suite.go:203 +0xf7
        github.com/stretchr/testify/suite.Run()
            go/pkg/mod/github.com/rzajac/[email protected]/suite/suite.go:176 +0x944
        git.vonroll-infratec.com/go/meas/pkg/queue.TestRabbit()
            ws/go/meas/pkg/queue/rabbit_test.go:19 +0xa4
        testing.tRunner()
            /usr/local/go/src/testing/testing.go:1193 +0x202
      ==================
    

    in my tests.

  • Make channelManager public

    Make channelManager public

    Hi !

    Thanks for this useful lib! I'm wondering if the channelManager struct could be exposed (renamed to ChannelManager) to be imported. It could also be useful to expose the channelManager struct values of Consumer / Publisher.

    My use case is that I'd like to do manual creation of exchange and queues. I can create a new new AMQP channel but I'd lose the auto reconnect handling of the channelManager.

    Thanks!

  • Allow consumers to disable requeuing of messages

    Allow consumers to disable requeuing of messages

    Currently, the delivery is either acknowledge or not based on the bool return value of a consumer. But it is also automatically requeued. In some cases I do not want the delivery to be requeued if its body contains for example invalid JSON and it needs to be send to for example a dead letter queue for further introspection.

    I can imagine that a quick solution would be to change the return value of the consumer to something like func(d Delivery) (bool, bool) where the second bool indicates if it should be requeued or not. I can easily create a PR for that behaviour but wanted to check it first.

  • Declare process for Publisher | Part 2

    Declare process for Publisher | Part 2

    Split #80 into two parts. This part contains:

    • declaration of queues, exchanges and bindings is also possible if the Publisher is used (#43 )
    • updated examples
  • Implementation for graceful shutdown for consumers

    Implementation for graceful shutdown for consumers

    There seems to be missing case while handling SIGINT, SIGTERM signals in the implementation, if the consumers are running in goroutine. One possible implementation can be stop listening to new messages (closing the channel), and provide deadline context to running goroutines.

    Would like to know if there can more elegant way for handling these case, also open for contributing.

  • How to publish with confirms

    How to publish with confirms

    confirms := make(chan amqp.Confirmation)
    ch.NotifyPublish(confirms)
    go func() {
        for confirm := range confirms {
    	    if confirm.Ack {
    		    // code when messages is confirmed
    		    log.Printf("Confirmed")
    	    } else {
    		    // code when messages is nack-ed
    		    log.Printf("Nacked")
    	    }
        }
    }()
    
    
  • interface for connection struct

    interface for connection struct

    Currently, it is very difficult to write unit tests for the codebase that uses this library. There is no exported interface for the connection struct and it is difficult to mock this.

    Please provide guidelines for writing unit tests.

  • Not reconnecting in all cases

    Not reconnecting in all cases

    We have observed that the module isn't reconnecting in all cases, but still haven't found the specific problem.

    Logs from one issue (sorry for the mixture of formats, only the connection manager had a custom logger applied):

    {"level":"info","ts":"2022-12-16T04:05:23.661704901Z","caller":"worker/worker.go:109","msg":"Handled message"} <-- last message
    2022/12/16 04:59:48 gorabbit ERROR: attempting to reconnect to amqp server after cancel with error: ctag-/bin/worker-1
    2022/12/16 04:59:48 gorabbit INFO: waiting 5s seconds to attempt to reconnect to amqp server
    2022/12/16 04:59:48 gorabbit INFO: rabbit consumer goroutine closed
    2022/12/16 04:59:53 gorabbit WARN: successfully reconnected to amqp server after cancel
    2022/12/16 04:59:53 gorabbit INFO: successful recovery from: ctag-/bin/worker-1
    2022/12/16 04:59:53 gorabbit ERROR: error restarting consumer goroutines after cancel or close: declare queue failed: Exception (404) Reason: "NOT_FOUND - queue 'new-builds' in vhost 'platform-development' process is stopped by supervisor"
    2022/12/16 04:59:53 gorabbit ERROR: attempting to reconnect to amqp server after close with error: Exception (404) Reason: "NOT_FOUND - queue 'new-builds' in vhost 'platform-development' process is stopped by supervisor"
    2022/12/16 04:59:53 gorabbit INFO: waiting 5s seconds to attempt to reconnect to amqp server
    2022/12/16 04:59:58 gorabbit WARN: error closing channel while reconnecting: Exception (504) Reason: "channel/connection is not open"
    2022/12/16 04:59:58 gorabbit WARN: successfully reconnected to amqp server
    2022/12/16 04:59:58 gorabbit INFO: successful recovery from: Exception (404) Reason: "NOT_FOUND - queue 'new-builds' in vhost 'platform-development' process is stopped by supervisor"
    {"level":"error","ts":"2022-12-16T05:00:41.13624477Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"attempting to reconnect to amqp server after connection close with error: Exception (501) Reason: \"write tcp 10.0.1.19:49792->10.32.75.145:5672: write: connection reset by peer\"","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:85"}
    {"level":"info","ts":"2022-12-16T05:00:41.136325279Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"error","ts":"2022-12-16T05:01:16.137948493Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:86"}
    {"level":"info","ts":"2022-12-16T05:01:16.138040023Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"error","ts":"2022-12-16T05:01:51.141472319Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:86"}
    {"level":"info","ts":"2022-12-16T05:01:51.141562769Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"error","ts":"2022-12-16T05:02:26.143127796Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:86"}
    {"level":"info","ts":"2022-12-16T05:02:26.143234192Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"error","ts":"2022-12-16T05:03:01.145103317Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:86"}
    {"level":"info","ts":"2022-12-16T05:03:01.145347574Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"error","ts":"2022-12-16T05:03:36.147081783Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:86"}
    {"level":"info","ts":"2022-12-16T05:03:36.147184473Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"error","ts":"2022-12-16T05:04:11.148442465Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:86"}
    {"level":"info","ts":"2022-12-16T05:04:11.148524676Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"error","ts":"2022-12-16T05:04:46.150196103Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:86"}
    {"level":"info","ts":"2022-12-16T05:04:46.150313791Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"error","ts":"2022-12-16T05:05:21.15146101Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:86"}
    {"level":"info","ts":"2022-12-16T05:05:21.151567677Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"error","ts":"2022-12-16T05:05:56.152881546Z","logger":"rabbitmq","caller":"[email protected]/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/[email protected]/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/[email protected]/internal/connectionmanager/connection_manager.go:86"}
    {"level":"info","ts":"2022-12-16T05:05:56.15299162Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
    {"level":"warn","ts":"2022-12-16T05:06:14.267313916Z","logger":"rabbitmq","caller":"[email protected]/logger.go:29","msg":"error closing connection while reconnecting: Exception (504) Reason: \"channel/connection is not open\""}
    {"level":"warn","ts":"2022-12-16T05:06:14.267384733Z","logger":"rabbitmq","caller":"[email protected]/logger.go:29","msg":"successfully reconnected to amqp server"}
    {"level":"info","ts":"2022-12-16T05:06:14.267430768Z","logger":"rabbitmq","caller":"[email protected]/logger.go:33","msg":"successful connection recovery from: Exception (501) Reason: \"write tcp 10.0.1.19:49792->10.32.75.145:5672: write: connection reset by peer\""}
    
    <-- no further messages handled
    

    Code:

    conn, err := rabbitmq.NewConn(
      connStr,
      rabbitmq.WithConnectionOptionsLogger(hrabbitmq.NewLogger(logger)),
    )
    if err != nil {
      sugar.Fatalw("failed to create a rabbit connection", "err", err)
    }
    defer conn.Close()
    
    consumer, err := rabbitmq.NewConsumer(conn, func(d rabbitmq.Delivery) rabbitmq.Action {
      // omitted handler logic
      return rabbitmq.Ack
    },
      "new-builds",
      rabbitmq.WithConsumerOptionsRoutingKey("organization.*.addBuild"),
      rabbitmq.WithConsumerOptionsRoutingKey("organization.*.addVersion"),
      rabbitmq.WithConsumerOptionsQueueDurable,
      rabbitmq.WithConsumerOptionsQOSPrefetch(1),
      rabbitmq.WithConsumerOptionsExchangeName(conf.RabbitmqExchange),
      rabbitmq.WithConsumerOptionsExchangeKind("topic"),
      rabbitmq.WithConsumerOptionsExchangeDurable,
    )
    if err != nil {
      sugar.Fatalw("failed to start consumer", "err", err)
    }
    defer consumer.Close()
    

    Used version github.com/wagslane/go-rabbitmq v0.11.0

    If you have any pointers what we could look into, that would be appreciated.

  • Remove deprecated 'Publish' method

    Remove deprecated 'Publish' method

    This PR adds PublishWithContext for publishing to Rabbitmq.

    As a requirement, github.com/rabbitmq/amqp091-go needs to be updated. Removed vendor directory, in favor of go mod download

    Fixes: #90

RabbitMQ Reconnection client

rmqconn RabbitMQ Reconnection for Golang Wrapper over amqp.Connection and amqp.Dial. Allowing to do a reconnection when the connection is broken befor

Sep 27, 2022
A tiny wrapper over amqp exchanges and queues 🚌 ✨

Rabbus ?? ✨ A tiny wrapper over amqp exchanges and queues. In memory retries with exponential backoff for sending messages. Protect producer calls wit

Dec 18, 2022
Golang AMQP wrapper for RabbitMQ with better API

go-rabbitmq Golang AMQP wrapper for RabbitMQ with better API Table of Contents Background Features Usage Installation Connect to RabbitMQ Declare Queu

Dec 21, 2022
Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way.

Cadence Visit cadenceworkflow.io to learn about Cadence. This repo contains the source code of the Cadence server. To implement workflows, activities

Jan 9, 2023
Declare AMQP entities like queues, producers, and consumers in a declarative way. Can be used to work with RabbitMQ.

About This package provides an ability to encapsulate creation and configuration of RabbitMQ([AMQP])(https://www.amqp.org) entities like queues, excha

Dec 28, 2022
Go library to build event driven applications easily using AMQP as a backend.

event Go library to build event driven applications easily using AMQP as a backend. Publisher package main import ( "github.com/creekorful/event" "

Dec 5, 2021
golang amqp rabbitmq produce consume

Step 1: Container Run Container docker run -itp 9001:9001 --name go_temp -v /usr/local/project/temp/go_amqp/:/home/ -d golang:1.16.6 Enter Container

Nov 26, 2021
Tool for collect statistics from AMQP (RabbitMQ) broker. Good for cloud native service calculation.

amqp-statisticator Tool for collect statistics around your AMQP broker. For example RabbitMQ expose a lot information trought the management API, but

Dec 13, 2021
Gorabbit - Simple library for AMQP Rabbit MQ publish subscribe

gorabbit Rabbit MQ Publish & Subscribe Simple library for AMQP Rabbit MQ publish

Oct 4, 2022
A tiny wrapper around NSQ topic and channel :rocket:

Event Bus NSQ A tiny wrapper around go-nsq topic and channel. Protect nsq calls with gobreaker. Installation go get -u github.com/rafaeljesus/nsq-even

Sep 27, 2022
goczmq is a golang wrapper for CZMQ.

goczmq Introduction A golang interface to the CZMQ v4.2 API. Install Dependencies libsodium libzmq czmq For CZMQ master go get github.com/zeromq/goczm

Jan 5, 2023
Golang API wrapper for MangaDex v5's MVP API.

mangodex Golang API wrapper for MangaDex v5's MVP API. Full documentation is found here. This API is still in Open Beta, so testing may not be complet

Oct 23, 2022
High abstraction wrapper for Golang Rabbit MQ Client

GRMQ Go Rabbit MQ What are the typical use-cases of RabbitMQ broker ? We create

Nov 3, 2022
redisqueue provides a producer and consumer of a queue that uses Redis streams

redisqueue redisqueue provides a producer and consumer of a queue that uses Redis streams. Features A Producer struct to make enqueuing messages easy.

Dec 29, 2022
Uniqush is a free and open source software system which provides a unified push service for server side notification to apps on mobile devices.

Homepage Download Blog/News @uniqush Introduction Uniqush (\ˈyü-nə-ku̇sh\ "uni" pronounced as in "unified", and "qush" pronounced as in "cushion") is

Jan 9, 2023
Package notify provides an implementation of the Gnome DBus Notifications Specification.

go-notify Package notify provides an implementation of the Gnome DBus Notifications Specification. Examples Display a simple notification. ntf := noti

Dec 27, 2022
⚡️ A lightweight service that will build and store your go projects binaries, Integrated with Github, Gitlab, Bitbucket and Bitbucket Server.
⚡️ A lightweight service that will build and store your go projects binaries, Integrated with Github, Gitlab, Bitbucket and  Bitbucket Server.

Rabbit A lightweight service that will build and store your go projects binaries. Rabbit is a lightweight service that will build and store your go pr

Nov 19, 2022
The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which can provide developers with stable, reliable, and efficient push services.

Go-Push-API MiPush、JiPush、UMeng MiPush The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which c

Oct 20, 2022
May 11, 2023