A lightweight transactional message bus on top of RabbitMQ

grabbit

A lightweight transactional message bus on top of RabbitMQ supporting:

  1. Supported Messaging Styles
    • One Way (Fire and forget)
    • Publish/Subscribe
    • Async Command/Reply
    • Blocking Command/Reply (RPC)
  2. Transactional message processing
  3. Message Orchestration via the Saga pattern
  4. At-least-once reliable messaging via Transactional Outbox and Publisher Confirms
  5. Retry and backoffs
  6. Structured logging
  7. Reporting Metrics via Prometheus
  8. Distributed Tracing via OpenTracing
  9. Extensible serialization with default support for gob, protobuf and avro

Stable release

The v1.x branch contains the latest stable releases of grabbit. Track that branch to get point and minor release updates.

Supported transactional resources

  1. MySQL > 8.0 (InnoDB)

Basic Usage

  • For a complete sample application see the vacation booking sample app in the examples directory

The following outlines the basic usage of grabbit. For a complete view of how to use grabbit, including how to write sagas and handle dead-lettering, refer to the grabbit/tests package.

import (
    "context"
    "fmt"
    "time"

    "github.com/wework/grabbit/gbus"
    "github.com/wework/grabbit/gbus/builder"
)

Define a message

type SomeMessage struct{}

func (SomeMessage) SchemaName() string {
    return "some.unique.namespace.somemessage"
}

Creating a transactional GBus instance

gb := builder.
    New().
    Bus("connection string to RabbitMQ").
    Txnl("mysql", "connection string to mysql").
    WithConfirms().
    Build("name of your service")

Register a command handler

handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
    cmd, ok := message.Payload.(*SomeCommand)
    if ok {
        fmt.Printf("handler invoked with message %v", cmd)
        return nil
    }

    return fmt.Errorf("failed to handle message")
}

gb.HandleMessage(SomeCommand{}, handler)

Register an event handler

// The handler must declare an error return, since it returns nil or an error below.
eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
    evt, ok := message.Payload.(*SomeEvent)
    if ok {
        fmt.Printf("handler invoked with event %v", evt)
        return nil
    }

    return fmt.Errorf("failed to handle event")
}

gb.HandleEvent("name of exchange", "name of topic", SomeEvent{}, eventHandler)

Start the bus

gb.Start()
defer gb.Shutdown()

Send a command

gb.Send(context.Background(), "name of service you are sending the command to", gbus.NewBusMessage(SomeCommand{}))

Publish an event

gb.Publish(context.Background(), "name of exchange", "name of topic", gbus.NewBusMessage(SomeEvent{}))

RPC style call

request := gbus.NewBusMessage(SomeRPCRequest{})
reply := gbus.NewBusMessage(SomeRPCReply{})
timeOut := 2 * time.Second

reply, e := gb.RPC(context.Background(), "name of service you are sending the request to", request, reply, timeOut)

if e != nil {
    fmt.Printf("rpc call failed with error %v", e)
} else {
    fmt.Printf("rpc call returned with reply %v", reply)
}

Testing

  1. Ensure that you have the dependencies installed: go get -v -t -d ./...
  2. Start the docker-compose environment first: docker-compose up -V -d
  3. Then run the tests: go test ./...

Owner

WeWork: Working with and contributing to the open source community

Comments

  • Optimistic locking violation when saving saga instances should retry handler until success

    When a saga instance is invoked and persisted to the saga store, an optimistic locking error may occur due to concurrent invocations. Currently, when this happens, the general retry strategy retries the message until it succeeds or until the configured max retry count is exceeded and the message is rejected.

    In highly concurrent sagas this may cause unneeded operational effort due to failed messages. In case of concurrency violations we should retry the message until success.
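
The proposed behavior can be sketched roughly as follows. This is not grabbit's implementation: `ErrOptimisticLock`, `ErrHandlerFailed` and `invokeWithRetry` are hypothetical names chosen for illustration, and the sketch assumes the saga store reports a version conflict through a sentinel error. Locking conflicts are retried without limit, while any other failure still counts against the retry budget.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors, assumed for this sketch only.
var (
	ErrOptimisticLock = errors.New("optimistic locking violation")
	ErrHandlerFailed  = errors.New("handler failed")
)

// invokeWithRetry calls handle until it succeeds. Optimistic locking
// conflicts are retried indefinitely; any other error may be retried at
// most maxRetries times before the last error is returned.
func invokeWithRetry(maxRetries int, handle func() error) (attempts int, err error) {
	failures := 0
	for {
		attempts++
		err = handle()
		if err == nil {
			return attempts, nil
		}
		if errors.Is(err, ErrOptimisticLock) {
			continue // concurrency conflict: does not consume the retry budget
		}
		failures++
		if failures > maxRetries {
			return attempts, err // budget exhausted: the caller rejects the message
		}
	}
}

func main() {
	calls := 0
	// Simulated saga handler that loses the version race five times.
	contended := func() error {
		calls++
		if calls <= 5 {
			return ErrOptimisticLock
		}
		return nil
	}
	attempts, err := invokeWithRetry(3, contended)
	fmt.Println(attempts, err) // 6 <nil>

	attempts, err = invokeWithRetry(3, func() error { return ErrHandlerFailed })
	fmt.Println(attempts, err) // 4 handler failed
}
```

A production version would likely add a backoff between attempts so that contending handlers do not retry in lockstep.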

  • first commit towards supporting persistent timeouts

    Timeouts are no longer passed as bus messages, which avoids the need to deal with serialization. The Timeout interface changed so that handling a timeout now accepts a transaction and a bus instance instead of an invocation and a bus message (#32).

  • added metrics for transactional outbox

    The following metrics were added:

    • outbox_total_records: reports the total number of records currently in the outbox
    • outbox_pending_delivery: reports the number of records currently in the outbox that are pending delivery
    • outbox_pending_removal: reports the number of records currently in the outbox that were sent and are pending removal

    #36

  • Add generic handler metrics with the message type as a label

    Problem: the handler metrics that grabbit exposes are per handler, which makes creating generic dashboards difficult.

    Proposed solution: generic metrics with the message type as a label. For the result counters, the result would be a label as well.

  • Allow configuring max retries

    Provide a way to externally configure the maximum number of times a handler is retried before the message is declared poison and rejected.

    Closes #35
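
One way such external configuration could look is sketched below. The environment variable name `GRABBIT_MAX_RETRIES`, the default of 3, and the helpers `maxRetriesFromEnv` and `handleWithLimit` are all assumptions made for this illustration; the source does not say how grabbit exposes the setting.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// defaultMaxRetries is an assumed default, not a documented grabbit value.
const defaultMaxRetries = 3

// errTransient is a hypothetical handler failure used by the demo.
var errTransient = errors.New("transient failure")

// maxRetriesFromEnv reads the retry limit from the environment variable
// named key and falls back when it is missing, malformed, or negative.
func maxRetriesFromEnv(key string, fallback int) int {
	raw, ok := os.LookupEnv(key)
	if !ok {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 0 {
		return fallback
	}
	return n
}

// handleWithLimit runs handle once plus up to maxRetries retries.
// It reports poison=true when every attempt failed.
func handleWithLimit(maxRetries int, handle func() error) (poison bool) {
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if handle() == nil {
			return false
		}
	}
	return true
}

func main() {
	os.Setenv("GRABBIT_MAX_RETRIES", "5") // hypothetical variable name
	limit := maxRetriesFromEnv("GRABBIT_MAX_RETRIES", defaultMaxRetries)

	calls := 0
	poison := handleWithLimit(limit, func() error {
		calls++
		return errTransient
	})
	fmt.Println(limit, calls, poison) // 5 6 true
}
```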

  • migrations fail when building a Bus for test db

    Describe the bug: building a Bus fails in the migrations step, for the test db ONLY, with this error:

    migrator: error while running migrations: error executing golang migration: Error 1060: Duplicate column name 'started_by_request_of_svc'

    The error happens in sagaStoreAddSagaCreatorDetails and causes the test to run forever (it doesn't even panic). Note: when building a bus for the app itself there is no error. The errors occur in CircleCI too.

    Table when running test: grabbit_testinvoicematching_sagas
    Table when running app: grabbit_invoicematchingweb_sagas

    If I delete the test DB entirely between tests, the migration works the first time we build the bus. If I don't delete the test db between tests, I get that error from the second test onwards.

    To reproduce:

    1. Build a test db twice...
    2. Or: simply run 2 tests that use the bus builder in IM.

  • 👌 feat(message:deduplication) implementing the feature #33

    A way to manage message de-duplication in grabbit.

    Implementation details:

    • Each instance sets its own policy: None (the default), Reject (reject duplicates), or Ack (ack duplicates)
    • Each service can set how long each message is stored in the duplicates table
    • Once a message is received from grabbit we check whether the message-id is in our table
    • If it's in the table we invoke the policy
    • If it's not in the table we add the message, using our internal transaction, to be stored in the db
    • Once processing is complete and the internal tx is committed, the message-id is also committed to the DB
    • In the background an additional scheduler deletes messages that are older than what is set in the policy (currently hardcoded to run every minute, to limit the number of messages deleted per iteration to a minute's worth)

    This PR closes issue #33
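
The flow described in the implementation details can be sketched in memory as follows. The policy names None, Reject and Ack come from the description above; everything else (`Deduplicator`, `Check`, `Purge`, the `Action` values) is hypothetical, and a map stands in for the duplicates table. The sketch also skips the transactional part: in the PR the message-id is stored through the same internal transaction as the handler's work.

```go
package main

import (
	"fmt"
	"time"
)

// Policy mirrors the three policies named in the PR description.
type Policy int

const (
	None   Policy = iota // default: no de-duplication
	Reject               // reject duplicate messages
	Ack                  // ack duplicate messages without processing them
)

// Action is what the bus should do with an incoming message.
type Action string

const (
	Process         Action = "process"
	RejectDuplicate Action = "reject"
	AckDuplicate    Action = "ack"
)

// Deduplicator keeps message IDs in memory; the map stands in for the
// duplicates table described above.
type Deduplicator struct {
	policy Policy
	ttl    time.Duration
	seen   map[string]time.Time // message ID -> time first seen
}

func NewDeduplicator(policy Policy, ttl time.Duration) *Deduplicator {
	return &Deduplicator{policy: policy, ttl: ttl, seen: make(map[string]time.Time)}
}

// Check applies the policy to a known ID and records an unknown one.
func (d *Deduplicator) Check(id string, now time.Time) Action {
	if d.policy == None {
		return Process
	}
	if _, dup := d.seen[id]; dup {
		if d.policy == Reject {
			return RejectDuplicate
		}
		return AckDuplicate
	}
	d.seen[id] = now
	return Process
}

// Purge deletes IDs older than the configured duration and returns how
// many were removed; a background scheduler would call it periodically.
func (d *Deduplicator) Purge(now time.Time) int {
	removed := 0
	for id, at := range d.seen {
		if now.Sub(at) > d.ttl {
			delete(d.seen, id)
			removed++
		}
	}
	return removed
}

func main() {
	d := NewDeduplicator(Reject, time.Minute)
	t0 := time.Now()
	fmt.Println(d.Check("msg-1", t0))                    // process
	fmt.Println(d.Check("msg-1", t0.Add(time.Second)))   // reject
	fmt.Println(d.Purge(t0.Add(2 * time.Minute)))        // 1
	fmt.Println(d.Check("msg-1", t0.Add(2*time.Minute))) // process
}
```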

  • Allow sagas to override correlation logic.

    By default, grabbit correlates a message to the correct saga instance when a handler replies to an incoming message. There are many cases in which you would like to interact with a saga instance and send it messages not in the context of replying to a specific message originating from the saga.

    This is an improvement and should provide a better API than the one developed for #172

  • Fetching saga instances from the saga store should be optimized

    The current implementation of the MySQL saga store, when fetching saga instances by saga type, fetches all saved instances in one round trip. This may cause a performance issue when a large number of saga instances need to be fetched.

    To prevent a potential performance hit, the fetching logic should be optimized to use paging and potentially to parallelize the fetching with a set of goroutines.
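
A minimal sketch of paged, parallel fetching is shown below, assuming the total row count is known up front (for example from a COUNT query) and that each page can be loaded independently. `fetchPage` and `fetchAllPaged` are hypothetical names, and an in-memory slice stands in for the saga store; this is not the store's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchPage is a hypothetical stand-in for a LIMIT/OFFSET query
// against the saga store.
type fetchPage func(offset, limit int) []string

// fetchAllPaged loads total records in pages of pageSize, running at most
// workers page queries concurrently. Results keep page order.
func fetchAllPaged(total, pageSize, workers int, fetch fetchPage) []string {
	if total <= 0 || pageSize <= 0 || workers <= 0 {
		return nil
	}
	pages := (total + pageSize - 1) / pageSize
	results := make([][]string, pages)  // one slot per page, so no locking is needed
	sem := make(chan struct{}, workers) // bounds the number of in-flight queries
	var wg sync.WaitGroup
	for p := 0; p < pages; p++ {
		wg.Add(1)
		sem <- struct{}{}
		go func(p int) {
			defer wg.Done()
			defer func() { <-sem }()
			results[p] = fetch(p*pageSize, pageSize)
		}(p)
	}
	wg.Wait()

	var all []string
	for _, page := range results {
		all = append(all, page...)
	}
	return all
}

func main() {
	// In-memory stand-in for the saga table.
	store := make([]string, 10)
	for i := range store {
		store[i] = fmt.Sprintf("saga-%d", i)
	}
	fetch := func(offset, limit int) []string {
		end := offset + limit
		if end > len(store) {
			end = len(store)
		}
		return store[offset:end]
	}
	got := fetchAllPaged(len(store), 4, 2, fetch)
	fmt.Println(len(got), got[0], got[9]) // 10 saga-0 saga-9
}
```

Offset-based paging can skip or repeat rows if instances are inserted or deleted while pages are being read, so a real implementation might page by primary key instead.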

  • Invoking the RPC interface from a handler fails
