Go - CQRS / Event Sourcing made easy - Go

Eventhus

CQRS/ES toolkit for Go.

CQRS stands for Command Query Responsibility Segregation. It's a pattern that I first heard described by Greg Young. At its heart is the notion that you can use a different model to update information than the model you use to read information.

The mainstream approach people use for interacting with an information system is to treat it as a CRUD datastore. By this I mean that we have mental model of some record structure where we can create new records, read records, update existing records, and delete records when we're done with them. In the simplest case, our interactions are all about storing and retrieving these records.

Event Sourcing ensures that every change to the state of an application is captured in an event object, and that these event objects are themselves stored in the sequence they were applied for the same lifetime as the application state itself.

Examples

bank account shows a full example with deposits and withdrawls.

Usage

There are 3 basic units of work: event, command and aggregate.

Command

A command describes an action that should be performed; it's always named in the imperative tense such as PerformDeposit or CreateAccount.

Let’s start with some code:

import "github.com/mishudark/eventhus"

// PerformDeposit to an account
type PerformDeposit struct {
	eventhus.BaseCommand
	Amount int
}

At the beginning, we create the PerformDeposit command. It contains an anonymous struct field of type eventhus.BaseCommand. This means PerformDeposit automatically acquires all the methods of eventhus.BaseCommand.

You can also define custom fields, in this case Amount contains a quantity to be deposited into an account.

Event

An event is the notification that something happened in the past. You can view an event as the representation of the reaction to a command after being executed. All events should be represented as verbs in the past tense such as CustomerRelocated, CargoShipped or InventoryLossageRecorded.

We create the DepositPerformed event; it's a pure go struct, and it's the past equivalent to the previous command PerformDeposit:

// DepositPerformed event
type DepositPerformed struct {
	Amount int
}

Aggregate

The aggregate is a logical boundary for things that can change in a business transaction of a given context. In the Eventhus context, it simplifies the process the commands and produce events.

Show me the code!

import "github.com/mishudark/eventhus"

//Account of bank
type Account struct {
	eventhus.BaseAggregate
	Owner   string
	Balance int
}

We create the Account aggregate. It contains an anonymous struct field of type eventhus.BaseAggregate. This means Account automatically acquires all the methods of eventhus.BaseAggregate.

Additionally Account has the fields Balance and Owner that represent the basic info of this context.

Now that we have our aggregate, we need to process the PerformDeposit command that we created earlier:

// HandleCommand create events and validate based on such command
func (a *Account) HandleCommand(command eventhus.Command) error {
	event := eventhus.Event{
		AggregateID:   a.ID,
		AggregateType: "Account",
	}

	switch c := command.(type) {
	case CreateAccount:
		event.AggregateID = c.AggregateID
		event.Data = &AccountCreated{c.Owner}

	case PerformDeposit:
		event.Data = &DepositPerformed{
			c.Amount,
		}
	}

	a.BaseAggregate.ApplyChangeHelper(a, event, true)
	return nil
}

First, we create an event with the basic info AggregateID as an identifier and AggregateType with the same name as our aggregate. Next, we use a switch to determine the type of the command and produce an event as a result.

Finally, the event should be applied to our aggregate; we use the helper BaseAggregate.ApplyChangeHelper with the params aggregate, event and the last argument set to true, meaning it should be stored and published via event store and event publisher.

Note: eventhus.BaseAggregate has some helper methods to make our life easier, we use HandleCommand to process a command and produce the respective event.

The last step in the aggregate journey is to apply the events to our aggregate:

// ApplyChange to account
func (a *Account) ApplyChange(event eventhus.Event) {
	switch e := event.Data.(type) {
	case *AccountCreated:
		a.Owner = e.Owner
		a.ID = event.AggregateID
	case *DepositPerformed:
		a.Balance += e.Amount
	}
}

Also, we use a switch-case format to determine the type of the event (note that events are pointers), and apply the respective changes.

Note: The aggregate is never saved in its current state. Instead, it is stored as a series of events that can recreate the aggregate in its last state.

Saving the events, publishing them, and recreating an aggregate from event store is made by Eventhus out of the box.

Config

Eventhus needs to be configured to manage events and commands, and to know where to store and publish events.

Event Store

Currently, it has support for MongoDB. Rethinkdb is in the scope to be added.

We create an event store with config.Mongo; it accepts host, port and table as arguments:

import "github.com/mishudark/eventhus/config"
...

config.Mongo("localhost", 27017, "bank") // event store

Event Publisher

RabbitMQ and Nats.io are supported.

We create an eventbus with config.Nats, it accepts url data config and useSSL as arguments:

import 	"github.com/mishudark/eventhus/config"
...

config.Nats("nats://ruser:T0pS3cr3t@localhost:4222", false) // event bus

Wire it all together

Now that we have all the pieces, we can register our events, commands and aggregates:

import (
	"github.com/mishudark/eventhus"
	"github.com/mishudark/eventhus/commandhandler/basic"
	"github.com/mishudark/eventhus/config"
	"github.com/mishudark/eventhus/examples/bank"
)

func getConfig() (eventhus.CommandBus, error) {
	// register events
	reg := eventhus.NewEventRegister()
	reg.Set(bank.AccountCreated{})
	reg.Set(bank.DepositPerformed{})
	reg.Set(bank.WithdrawalPerformed{})

    // wire all parts together
	return config.NewClient(
		config.Mongo("localhost", 27017, "bank"),                    // event store
		config.Nats("nats://ruser:T0pS3cr3t@localhost:4222", false), // event bus
		config.AsyncCommandBus(30),                                  // command bus
		config.WireCommands(
			&bank.Account{},          // aggregate
			basic.NewCommandHandler,  // command handler
			"bank",                   // event store bucket
			"account",                // event store subset
			bank.CreateAccount{},     // command
			bank.PerformDeposit{},    // command
			bank.PerformWithdrawal{}, // command
		),
	)
}

Now you are ready to process commands:

uuid, _ := utils.UUID()

// 1) Create an account
var account bank.CreateAccount
account.AggregateID = uuid
account.Owner = "mishudark"

commandBus.HandleCommand(account)

First, we generate a new UUID. This is because is a new account and we need a unique identifier. After we created the basic structure of our CreateAccount command, we only need to send it using the commandbus created in our config.

Event consumer

You should listen to your eventbus, the format of the event is always the same, only the data key changes in the function of your event struct.

{
  "id": "0000XSNJG0SB2WDBTATBYEC51P",
  "aggregate_id": "0000XSNJG0N0ZVS3YXM4D7ZZ9Z",
  "aggregate_type": "Account",
  "version": 1,
  "type": "AccountCreated",
  "data": {
    "owner": "mishudark"
  }
}

Prior Art

Owner
mishudark
FuchsiaOS engineer, Go, Flutter, Elixir, Rust, React
mishudark
Comments
  • using with gokit

    using with gokit

    https://github.com/go-kit/kit

    gokit only supports simple RPC.

    I dont knwo if eventhus is dead, but it looks like it can pretty easily work with gokit.

  • assignment mismatch for badger (but not using badger)

    assignment mismatch for badger (but not using badger)

    Currently trying to use this library gives me this error when running:

    # github.com/mishudark/eventhus/eventstore/badger
    ../../../go/src/github.com/mishudark/eventhus/eventstore/badger/badger.go:120:12: assignment mismatch: 2 variables but item.Value returns 1 values
    ../../../go/src/github.com/mishudark/eventhus/eventstore/badger/badger.go:120:25: not enough arguments in call to item.Value
    	have ()
    	want (func([]byte) error)
    ../../../go/src/github.com/mishudark/eventhus/eventstore/badger/badger.go:175:22: assignment mismatch: 2 variables but item.Value returns 1 values
    ../../../go/src/github.com/mishudark/eventhus/eventstore/badger/badger.go:175:35: not enough arguments in call to item.Value
    	have ()
    	want (func([]byte) error)
    

    What's interesting is that when I try to run the example bank application from a cloned github repo, it works fine. But if I separate the example from the repo folder or make my own implementation, I constantly get this error.

  • Questions about implementation details

    Questions about implementation details

    Hi, I'm using your repo as a foundation to implement event-sourcing. While experimenting with it I faced with following issues:

    1. Why it should not be possible to create an event without to specify a version in the command? Other event-store libs like https://github.com/envato/event_sourcery provide the exact version match optionally.
    2. For every aggregate, we need to set the ID https://github.com/mishudark/eventhus/blob/d39839fa19e8887803bc738de679ba614aade956/examples/bank/account.go#L24 the same for https://github.com/mishudark/eventhus/blob/d39839fa19e8887803bc738de679ba614aade956/examples/bank/account.go#L41. This can be easily abstracted.

    Btw: Are you using this lib in production? There is a lot of room for improvements (error handling in commandbus, retry-mechanism, graceful shutdown of connections, logging). From my understanding, this project is just a playground for you right?

    I could solve most of the issues and implemented a SQL integration with https://gorm.io/. If anybody is interested in it I will take the time to create a PR against v2.

  • Add mocked event store

    Add mocked event store

    This PR adds a mocked event store.

    Background

    For testing I'd like to use this library without an actual mongodb and rabbitmq.

    For that I need mock implementations (or in-memory) which I try to create right now.

    Changes

    • Add package eventstore/mock including basic tests
    • Update documentation (which includes texts for a mocked event bus which I'd like to add later)
  • Example doesn't work

    Example doesn't work

    Hi, I'm trying to experiment with your repo but I run into

    https://github.com/mishudark/eventhus/blob/d39839fa19e8887803bc738de679ba614aade956/commandhandler/basic/basic.go#L48-L50

    How should it work? How can I create the initial aggregation event?

  • Adding eventstore badger. Adding eventbus mosquitto.

    Adding eventstore badger. Adding eventbus mosquitto.

    I am using badger + mosquitto for my project. Feel free to add them to the implementation. One shortcoming that I see in this implementation (for badger and mosquitto) is that they ought to use connection pooling: eventhus need to tell the eventstore/eventbus that it is shutting down in order to close open connections.

  • MongoDB Eventstore has a 16MB limit of per aggregate root

    MongoDB Eventstore has a 16MB limit of per aggregate root

    MongoDB has a 16MB limit per document hard-coded, since eventhus uses a single document to store the events, there's a limitation there.

    I would recommend using per-document events, instead of per-document aggregates, and you can still make that atomic by simply removing the if condition here and remove entirely this else statement.

    Of course, a change to Client.Load() to do a findAll instead of findOne, and sorting by the version later. What you guys think?

  • Initialize view model. Retrieve events given aggregate id.

    Initialize view model. Retrieve events given aggregate id.

    As I understand a 'canonical' CQRS-ES pattern reading clients are encouraged to maintain there own view models, optimized for their use case. A reading client would subscribe to the event queue and build it's view model based on the events it's receive.

    The CQRS-ES pattern does not dictate the nature of this view model:

    • if a reading client is keeping the state of the entire domain, a single domain entity.
    • if it keeps all information or a specific subset
    • if it keeps the information inevitably, or just keep it for the duration of a session.

    Keeping a local copy of a single aggregate as well as keeping a syncronized (read only) query db is equally valid use cases from the reading client.

    I am going to look at a view model representing a single uniquely identifiable aggregate. It could equally well be a set of such aggregates, the entire DDD, it does not matter. The reading client might join the CQRS-ES entwork at any time, meanting that we don't initially know the state of the aggregate. Looking at the life-cycle of my view-model / aggregate there are the following situations.

    1. When a new aggregate is created. The client can safely assume that it is creating a new unique aggregate. It will issue a new command using a new aggregate id and version 0.
    2. When the aggregate exist in the view model. The next command created will use the existing aggregate id and version.
    3. When the command have failed, indicated an invalid (outdated) version: The client will need to catch up with the history of the aggregate.
    4. When the client have the id of the aggregate but no view-model: The client will need to catch up with the history of the aggregate.

    So far I assumed that the aggregate have a unique id that I have. This happens to be safe for the application that I am currenly building. It is also possible that the aggregate can additonally be identified by a natural key that also uniquely identifies the aggregate. The CQRS-ES pattern propose a reading client building an index (mapping natural key to aggregate id) to care for these situations. To care for building the index there is also the following situation.

    1. When the client does not have the aggregate key and can not safely assume that the aggregate does not exist. Look up the aggregate, bootstrap the index: retrieve all aggregates with all events.

    For now I am mostly interested in (3.) and (4.). How do the client get all the events of an existing aggregate? The client have the aggregate id, but does not have an up to date view model.

    I would want the account example to cover cases (3.) and (4.). How should the client retrieve all (missing) events given an aggregate id?

  • EventBus interface question

    EventBus interface question

    I see EventBus has publish but not a subscribe? Just curious how the events propagated to projects for querying? I apologize if subscribe is defined somewhere else as I did not read the source code completely.

  • Atomic publish & save

    Atomic publish & save

    I see your code and you do not publish event to MQ and saving to DB Atomically. This mean that is a service fails before sending an event, no one will be notified

Cloud-native and easy-to-use application management platform | 云原生且易用的应用管理平台
Cloud-native and easy-to-use application management platform | 云原生且易用的应用管理平台

Website • Documentation What is NEW! August 24, 2020 ,Rainbond 5.2 Stable version is officially released View Release Rainbond Introduction Cloud nati

Dec 29, 2022
Rpcx-framework - An RPC microservices framework based on rpcx, simple and easy to use, ultra fast and efficient, powerful, service discovery, service governance, service layering, version control, routing label registration.

RPCX Framework An RPC microservices framework based on rpcx. Features: simple and easy to use, ultra fast and efficient, powerful, service discovery,

Jan 5, 2022
Idiomatic Event Sourcing in Go
Idiomatic Event Sourcing in Go

Event Sourcing for Go Idiomatic library to help you build Event Sourced application in Go. Please note The library is currently under development and

Oct 27, 2022
A pluggable backend API that enforces the Event Sourcing Pattern for persisting & broadcasting application state changes
A pluggable backend API that enforces the Event Sourcing Pattern for persisting & broadcasting application state changes

A pluggable "Application State Gateway" that enforces the Event Sourcing Pattern for securely persisting & broadcasting application state ch

Nov 1, 2022
A pluggable backend API that enforces the Event Sourcing Pattern for persisting & broadcasting application state changes
A pluggable backend API that enforces the Event Sourcing Pattern for persisting & broadcasting application state changes

A pluggable "Application State Gateway" that enforces the Event Sourcing Pattern for securely persisting & broadcasting application state changes

Nov 1, 2022
This example showcases an event-sourced CQRS system based on github.com/romshark/eventlog

Eventlog Example This example is showcasing an eventually consistent, fault-tolerant, event sourced system following the CQRS (Command-Query-Responsib

Mar 13, 2022
Build event-driven and event streaming applications with ease

Commander ?? Commander is Go library for writing event-driven applications. Enabling event sourcing, RPC over messages, SAGA's, bidirectional streamin

Dec 19, 2022
KEDA is a Kubernetes-based Event Driven Autoscaling component. It provides event driven scale for any container running in Kubernetes
 KEDA is a Kubernetes-based Event Driven Autoscaling component. It provides event driven scale for any container running in Kubernetes

Kubernetes-based Event Driven Autoscaling KEDA allows for fine-grained autoscaling (including to/from zero) for event driven Kubernetes workloads. KED

Jan 7, 2023
gevent imply go-event which tries to make event handling easier.

gevent imply go-event which tries to make event handling easier. What does gevent want to do Async execute jobs safely without too many go routines. S

Nov 10, 2021
Go-serverless-eth-event-listener - Go serverless, ethereum contract event listener with a sample contract

go-serverless-eth-event-listener This repository is for showing how to listen sm

May 19, 2022
Govent is an event bus framework for DDD event source implement

Govent is an event bus framework for DDD event source implement. Govent can also solve the package circular dependency problem.

Jan 28, 2022
Event-planning-go - GRAPHQL Project for Event Planning

About The Project GRAPHQL Project for Event Planning Building the project with l

Mar 13, 2022
Go Server/API boilerplate using best practices DDD CQRS ES gRPC
Go Server/API boilerplate using best practices DDD CQRS ES gRPC

Go Server/API boilerplate using best practices DDD CQRS ES gRPC

Jan 6, 2023
Go gRPC Kafka CQRS microservices with tracing

Golang CQRS Kafka gRPC Postgresql MongoDB Redis microservices example ?? ??‍?? Full list what has been used: Kafka as messages broker gRPC Go implemen

Jan 1, 2023
Go-gin-ddd-cqrs - Clean api rest with Go, Gin and GORM
Go-gin-ddd-cqrs - Clean api rest with Go, Gin and GORM

GOLANG API REST Clean api rest with Go, Gin and GORM. Clean Architecture with DD

Oct 21, 2022
☔🎀 Translation made with simplicity, yet robust. ~ Backend portion of Arisu, made in Go.

☔ Tsubaki Website • Discord • Discussions Translation made with simplicity, yet robust. Made with ?? using TypeScript, React with Next.js. Tsubaki is

Jan 13, 2022
Easy to use distributed event bus similar to Kafka
Easy to use distributed event bus similar to Kafka

chukcha Easy to use distributed event bus similar to Kafka. The event bus is designed to be used as a persistent intermediate storage buffer for any k

Dec 30, 2022
High-performance, non-blocking, event-driven, easy-to-use networking framework written in Go, support tls/http1.x/websocket.

High-performance, non-blocking, event-driven, easy-to-use networking framework written in Go, support tls/http1.x/websocket.

Jan 8, 2023
pubsub controller using kafka and base on sarama. Easy controll flow for actions streamming, event driven.

Psub helper for create system using kafka to streaming and events driven base. Install go get github.com/teng231/psub have 3 env variables for config

Sep 26, 2022
Eye - An easy-use lib for event-driven pattern

?? Eye Eye 是一个简单易用的事件驱动模式库。 Read me in English ?? 功能特性 敬请期待。。。 历史版本的特性请查看 HISTOR

Jan 17, 2022