Transactional outbox harvester for Postgres → Kafka, written in Go


goharvest is a Go implementation of the Transactional Outbox pattern for Postgres and Kafka.

(Diagram: the Transactional Outbox pattern)

While goharvest is a complex beast, the end result is dead simple: to publish Kafka messages reliably and atomically, simply write a record to a dedicated outbox table in a transaction, alongside any other database changes. (Outbox schema provided below.) goharvest scrapes the outbox table in the background and publishes records to a Kafka topic of the application's choosing, using the key, value and headers specified in the outbox record. goharvest currently works with Postgres. It maintains causal order of messages and does not require CDC to be enabled on the database, making for a zero-hassle setup. It handles thousands of records/second on commodity hardware.

Getting started

1. Create an outbox table for your application

CREATE TABLE IF NOT EXISTS outbox (
  id                  BIGSERIAL PRIMARY KEY,
  create_time         TIMESTAMP WITH TIME ZONE NOT NULL,
  kafka_topic         VARCHAR(249) NOT NULL,
  kafka_key           VARCHAR(100) NOT NULL,  -- pick your own maximum key size
  kafka_value         VARCHAR(10000),         -- pick your own maximum value size
  kafka_header_keys   TEXT[] NOT NULL,
  kafka_header_values TEXT[] NOT NULL,
  leader_id           UUID
)

2. Run goharvest

Standalone mode

This runs goharvest within a separate process called reaper, which will work alongside any application that writes to a standard outbox. (Not just applications written in Go.)

Install reaper

go get -u github.com/obsidiandynamics/goharvest/cmd/reaper

Create reaper.yaml configuration

harvest:
  baseKafkaConfig: 
    bootstrap.servers: localhost:9092
  producerKafkaConfig:
    compression.type: lz4
    delivery.timeout.ms: 10000
  leaderTopic: my-app-name
  leaderGroupID: my-app-name
  dataSource: host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable
  outboxTable: outbox
  limits:
    minPollInterval: 1s
    heartbeatTimeout: 5s
    maxInFlightRecords: 1000
    minMetricsInterval: 5s
    sendConcurrency: 4
    sendBuffer: 10
logging:
  level: Debug

Start reaper

reaper -f reaper.yaml

Embedded mode

goharvest can be run in the same process as your application.

Add the dependency

go get -u github.com/obsidiandynamics/goharvest

Create and start a Harvest instance

import "github.com/obsidiandynamics/goharvest"
// Configure the harvester. It will use its own database and Kafka connections under the hood.
config := Config{
  BaseKafkaConfig: KafkaConfigMap{
    "bootstrap.servers": "localhost:9092",
  },
  DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
}

// Create a new harvester.
harvest, err := goharvest.New(config)
if err != nil {
  panic(err)
}

// Start harvesting in the background.
err = harvest.Start()
if err != nil {
  panic(err)
}

// Wait indefinitely for the harvester to end.
log.Fatal(harvest.Await())
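
In a long-lived service you may prefer to shut the harvester down gracefully rather than wait indefinitely. The sketch below traps SIGINT/SIGTERM; it assumes the harvester exposes a Stop() method alongside Start() and Await() (check the GoDoc reference for the exact shutdown API), and requires the "os", "os/signal" and "syscall" imports.

// Trap SIGINT/SIGTERM and stop the harvester gracefully.
// Stop() is an assumption here; consult the GoDoc reference for the exact shutdown API.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

go func() {
  <-sigs
  harvest.Stop()
}()

// Block until the harvester terminates.
if err := harvest.Await(); err != nil {
  log.Fatal(err)
}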

Using a custom logger

goharvest uses log.Printf for output by default. Logger configuration is courtesy of the Scribe façade, from libstdgo. The example below uses a Logrus binding for Scribe.

import (
  "github.com/obsidiandynamics/goharvest"
  "github.com/obsidiandynamics/libstdgo/scribe"
  scribelogrus "github.com/obsidiandynamics/libstdgo/scribe/logrus"
  "github.com/sirupsen/logrus"
)
log := logrus.StandardLogger()
log.SetLevel(logrus.DebugLevel)

// Configure the custom logger using a binding.
config := goharvest.Config{
  BaseKafkaConfig: goharvest.KafkaConfigMap{
    "bootstrap.servers": "localhost:9092",
  },
  Scribe:     scribe.New(scribelogrus.Bind()),
  DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
}

Listening for leader status updates

Just like goharvest uses NELI to piggy-back on Kafka's leader election, you can piggy-back on goharvest to get leader status updates:

log := logrus.StandardLogger()
log.SetLevel(logrus.TraceLevel)
config := goharvest.Config{
  BaseKafkaConfig: goharvest.KafkaConfigMap{
    "bootstrap.servers": "localhost:9092",
  },
  DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
  Scribe:     scribe.New(scribelogrus.Bind()),
}

// Create a new harvester and register an event handler.
harvest, err := goharvest.New(config)
if err != nil {
  panic(err)
}

// Register a handler callback, invoked when an event occurs within goharvest.
// The callback is completely optional; it lets the application piggy-back on leader
// status updates, in case it needs to schedule some additional work (other than
// harvesting outbox records) that should only be run on one process at any given time.
harvest.SetEventHandler(func(e goharvest.Event) {
  switch event := e.(type) {
  case goharvest.LeaderAcquired:
    // The application may initialise any state necessary to perform work as a leader.
    log.Infof("Got event: leader acquired: %v", event.LeaderID())
  case goharvest.LeaderRefreshed:
    // Indicates that a new leader ID was generated, as a result of having to remark
    // a record (typically due to an earlier delivery error). This is purely
    // informational; there is nothing an application should do about this, other
    // than taking note of the new leader ID if it has come to rely on it.
    log.Infof("Got event: leader refreshed: %v", event.LeaderID())
  case goharvest.LeaderRevoked:
    // The application may block the callback until it wraps up any in-flight
    // activity. Only upon returning from the callback, will a new leader be elected.
    log.Infof("Got event: leader revoked")
  case goharvest.LeaderFenced:
    // The application must immediately terminate any ongoing activity, on the assumption
    // that another leader may be imminently elected. Unlike the handling of LeaderRevoked,
    // blocking in the callback will not prevent a new leader from being elected.
    log.Infof("Got event: leader fenced")
  case goharvest.MeterRead:
    // Periodic statistics regarding the harvester's throughput.
    log.Infof("Got event: meter read: %v", event.Stats())
  }
})

// Start harvesting in the background.
err = harvest.Start()
if err != nil {
  panic(err)
}

Which mode should I use?

Running goharvest in standalone mode using reaper is the recommended approach for most use cases, as it fully insulates the harvester from the rest of the application. Ideally, you should deploy reaper as a sidecar daemon, to run alongside your application. All the reaper needs is access to the outbox table and the Kafka cluster.

Embedded goharvest is useful if you require additional insight into its operation, which you can obtain by registering an EventHandler callback, as shown in the example above. The callback is invoked whenever the underlying leader status changes, which may be useful if you need to schedule additional workloads that should only run on one process at any given time.

3. Write outbox records

Directly, using SQL

You can write outbox records from any application by issuing the following INSERT statement:

INSERT INTO ${outbox_table} (
  create_time, 
  kafka_topic, 
  kafka_key, 
  kafka_value, 
  kafka_header_keys, 
  kafka_header_values
)
VALUES (NOW(), $1, $2, $3, $4, $5)

Replace ${outbox_table} and bind the query variables as appropriate:

  • kafka_topic specifies an arbitrary topic name, which may differ among records.
  • kafka_key is a mandatory string key. Each record must be published with a specified key, which will affect its placement among the topic's partitions.
  • kafka_value is an optional string value. If unspecified, the record will be published with a nil value, allowing it to be used as a compaction tombstone.
  • kafka_header_keys and kafka_header_values are arrays that specify the keys and values of record headers. When used, each element in kafka_header_keys corresponds to the element in kafka_header_values at the same index. If you are not using headers, set both arrays to empty arrays.

Note: Writing outbox records should be performed in the same transaction as other related database updates. Otherwise, messaging will not be atomic — the updates may be stably persisted while the message might be lost, and vice versa.
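
For illustration, the sketch below issues the INSERT from Go using database/sql, inside the same transaction as a business-table update. It assumes the lib/pq driver (pq.Array is used to bind the TEXT[] header columns); the accounts table, its columns and the message payload are made up for the example.

import (
  "database/sql"

  "github.com/lib/pq" // assumed driver; provides pq.Array for binding the TEXT[] columns
)

// debitAndNotify is a hypothetical example: it updates a business table and writes an
// outbox record in the same transaction, so the two either commit or roll back together.
func debitAndNotify(db *sql.DB) error {
  tx, err := db.Begin()
  if err != nil {
    return err
  }
  defer tx.Rollback()

  // Hypothetical business update, in transaction scope.
  if _, err := tx.Exec("UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, "acc-1"); err != nil {
    return err
  }

  // Write the outbox record; goharvest will publish it after the transaction commits.
  if _, err := tx.Exec(`INSERT INTO outbox (create_time, kafka_topic, kafka_key, kafka_value, kafka_header_keys, kafka_header_values)
                        VALUES (NOW(), $1, $2, $3, $4, $5)`,
    "my-app.topic", "acc-1", `{"amount": -100}`,
    pq.Array([]string{"applicationId"}), pq.Array([]string{"my-app"})); err != nil {
    return err
  }

  return tx.Commit()
}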

Using stasher

The goharvest library comes with a stasher helper package for writing records to an outbox.

One-off messages

When one database update corresponds to one message, the easiest approach is to call Stasher.Stash():

import "github.com/obsidiandynamics/goharvest"
db, err := sql.Open("postgres", "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable")
if err != nil {
  panic(err)
}
defer db.Close()

st := New("outbox")

// Begin a transaction.
tx, _ := db.Begin()
defer tx.Rollback()

// Update other database entities in transaction scope.

// Stash an outbox record for subsequent harvesting.
err = st.Stash(tx, goharvest.OutboxRecord{
  KafkaTopic: "my-app.topic",
  KafkaKey:   "hello",
  KafkaValue: goharvest.String("world"),
  KafkaHeaders: goharvest.KafkaHeaders{
    {Key: "applicationId", Value: "my-app"},
  },
})
if err != nil {
  panic(err)
}

// Commit the transaction.
tx.Commit()

Multiple messages

Sending multiple messages within a single transaction may be done more efficiently using prepared statements:

// Begin a transaction.
tx, _ := db.Begin()
defer tx.Rollback()

// Update other database entities in transaction scope.
// ...

// Formulates a prepared statement that may be reused within the scope of the transaction.
prestash, _ := st.Prepare(tx)

// Publish a bunch of messages using the same prepared statement.
for i := 0; i < 10; i++ {
  // Stash an outbox record for subsequent harvesting.
  err = prestash.Stash(goharvest.OutboxRecord{
    KafkaTopic: "my-app.topic",
    KafkaKey:   "hello",
    KafkaValue: goharvest.String("world"),
    KafkaHeaders: goharvest.KafkaHeaders{
      {Key: "applicationId", Value: "my-app"},
    },
  })
  if err != nil {
    panic(err)
  }
}

// Commit the transaction.
tx.Commit()

Configuration

There are a handful of parameters for configuring goharvest, assigned via the Config struct:

Parameters (defaults in parentheses):

  • BaseKafkaConfig (default: a map containing bootstrap.servers=localhost:9092)
    Configuration shared by the underlying Kafka producer and consumer clients, including those used for leader election.
  • ProducerKafkaConfig (default: empty map)
    Additional configuration on top of BaseKafkaConfig that is specific to the producer clients created by goharvest for publishing harvested messages. This configuration does not apply to the underlying NELI leader election protocol.
  • LeaderGroupID (default: the filename of the application binary)
    Used by the underlying leader election protocol as a unique identifier shared by all instances in a group of competing processes. The LeaderGroupID is used as the Kafka group.id property under the hood, when subscribing to the leader election topic.
  • LeaderTopic (default: the value of LeaderGroupID, suffixed with the string .neli)
    Used by NELI as the name of the Kafka topic for orchestrating leader election. Competing processes subscribe to the same topic under an identical consumer group ID, using Kafka's exclusive partition assignment as a mechanism for arbitrating leader status.
  • DataSource (default: the local Postgres data source host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable)
    The database driver-specific data source string.
  • OutboxTable (default: outbox)
    The name of the outbox table, optionally including the schema name.
  • Scribe (default: a Scribe configured with bindings for log.Printf(), effectively the result of running scribe.New(scribe.StandardBinding()))
    The logging façade used by the library, preconfigured with your logger of choice. See the Scribe GoDocs.
  • Name (default: a string in the form {hostname}_{pid}_{time}, where {hostname} is the result of invoking os.Hostname(), {pid} is the process ID, and {time} is the UNIX epoch time in seconds)
    The symbolic name of this instance. This field is informational only, accompanying all log messages.
  • Limits.MinPollInterval (default: 100 ms)
    The lower bound on the poll interval, preventing the over-polling of Kafka on successive Pulse() invocations. Assuming Pulse() is called repeatedly by the application, NELI may poll Kafka at a longer interval than MinPollInterval. (Regular polling is necessary to prove the client's liveness and maintain the internal partition assignment, but polling excessively is counterproductive.)
  • Limits.HeartbeatTimeout (default: 5 s)
    The period for which a leader will maintain its status without having received a heartbeat message on the leader topic. After the timeout elapses, the leader will assume a network partition and will voluntarily yield its status, signalling a LeaderFenced event to the application.
  • Limits.QueueTimeout (default: 30 s)
    The maximum period of time a record may be queued after having been marked, before timing out and triggering a remark.
  • Limits.MarkBackoff (default: 10 ms)
    The backoff delay introduced by the mark thread when a query returns no results, indicating the absence of backlogged records. A mark backoff prevents aggressive querying of the database in the absence of a steady flow of outbox records.
  • Limits.IOErrorBackoff (default: 500 ms)
    The backoff delay introduced when any of the mark, purge or reset queries encounter a database error.
  • Limits.MaxInFlightRecords (default: 1000)
    An upper bound on the number of marked records that may be in flight at any given time, i.e. records that have been enqueued with a producer client but for which acknowledgements have yet to be received.
  • Limits.SendConcurrency (default: 8)
    The number of concurrent shards used for queuing causally unrelated records. Each shard is equipped with a dedicated producer client, allowing its records to be sent independently of other shards.
  • Limits.SendBuffer (default: 10)
    The maximum number of marked records that may be buffered for subsequent sending, for any given shard. When the buffer is full, the marker will halt, waiting for records to be sent and for their acknowledgements to flow through.
  • Limits.MarkQueryRecords (default: 100)
    An upper bound on the number of records that may be marked in any given query. Limiting this number avoids long-running database queries.
  • Limits.MinMetricsInterval (default: 5 s)
    The minimum interval at which throughput metrics are emitted. Metrics are emitted conservatively and may be observed less frequently; in fact, throughput metrics are only emitted upon a successful message acknowledgement, which will not occur during periods of inactivity.
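
As a rough sketch, the snippet below shows how several of these parameters map onto the Config struct. It reuses the Scribe/Logrus binding from the custom-logger example above, leaves Limits at their defaults, and assumes the non-map fields are plain strings; the broker addresses, topic and table names are placeholders, so consult the GoDoc reference for the exact field types.

config := goharvest.Config{
  BaseKafkaConfig: goharvest.KafkaConfigMap{
    "bootstrap.servers": "broker-1:9092,broker-2:9092",
  },
  // Applied only to the producer clients that publish harvested messages.
  ProducerKafkaConfig: goharvest.KafkaConfigMap{
    "compression.type":    "lz4",
    "delivery.timeout.ms": "10000",
  },
  LeaderGroupID: "my-app-name",
  LeaderTopic:   "my-app-name.neli",
  DataSource:    "host=db port=5432 user=postgres password= dbname=postgres sslmode=disable",
  OutboxTable:   "my_schema.outbox",
  Name:          "my-app-instance-1",
  Scribe:        scribe.New(scribelogrus.Bind()),
}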

Docs

  • Design
  • Comparison of messaging patterns
  • Comparison of harvesting methods
  • FAQ
