Declarative streaming ETL for mundane tasks, written in Go

Benthos

Benthos is a high performance and resilient stream processor, able to connect various sources and sinks in a range of brokering patterns and perform hydration, enrichments, transformations and filters on payloads.

It comes with a powerful mapping language, is easy to deploy and monitor, and ready to drop into your pipeline either as a static binary, docker image, or serverless function, making it cloud native as heck.

Benthos is fully declarative, with stream pipelines defined in a single config file, allowing you to specify connectors and a list of processing stages:

input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - bloblang: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20

Delivery Guarantees

Yep, we got 'em. Benthos implements transaction-based resiliency with back pressure. When connecting to at-least-once sources and sinks it guarantees at-least-once delivery without needing to persist messages during transit.

Supported Sources & Sinks

AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (pub/sub), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS Streaming, NSQ, AMQP 0.91 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), SQL (MySQL, PostgreSQL, Clickhouse), Stdin/Stdout, TCP & UDP, sockets and ZMQ4.

Connectors are being added constantly; if something you want is missing then open an issue.

Documentation

If you want to dive fully into Benthos then don't waste your time in this dump, check out the documentation site.

For guidance on how to configure more advanced stream processing concepts such as stream joins, enrichment workflows, etc, check out the cookbooks section.

For guidance on building your own custom plugins check out this example repo.

Install

Grab a binary for your OS from here. Or use this script:

curl -Lsf https://sh.benthos.dev | bash

Or pull the docker image:

docker pull jeffail/benthos

Benthos can also be installed via Homebrew:

brew install benthos

For more information check out the getting started guide.

Run

benthos -c ./config.yaml

Or, with docker:

# Send HTTP /POST data to Kafka:
docker run --rm \
	-e "INPUT_TYPE=http_server" \
	-e "OUTPUT_TYPE=kafka" \
	-e "OUTPUT_KAFKA_ADDRESSES=kafka-server:9092" \
	-e "OUTPUT_KAFKA_TOPIC=benthos_topic" \
	-p 4195:4195 \
	jeffail/benthos

# Using your own config file:
docker run --rm -v /path/to/your/config.yaml:/benthos.yaml jeffail/benthos

Monitoring

Health Checks

Benthos serves two HTTP endpoints for health checks:

  • /ping can be used as a liveness probe as it always returns a 200.
  • /ready can be used as a readiness probe as it serves a 200 only when both the input and output are connected, otherwise a 503 is returned.
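
These endpoints map naturally onto Kubernetes probes; a minimal sketch, assuming the default HTTP API port of 4195:

livenessProbe:
  httpGet:
    path: /ping
    port: 4195
readinessProbe:
  httpGet:
    path: /ready
    port: 4195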

Metrics

Benthos exposes lots of metrics, either to StatsD or Prometheus, or, for debugging purposes, via an HTTP endpoint that returns a JSON formatted object. The target can be specified via config.
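
For example, a minimal sketch of a Prometheus target (the exact config spec may differ between Benthos versions):

metrics:
  prometheus:
    prefix: benthos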

Tracing

Benthos also emits OpenTracing events to a tracer of your choice (currently only Jaeger is supported), which can be used to visualise the processors within a pipeline.
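
A minimal sketch pointing the tracer at a local Jaeger agent (field names here are assumptions that may differ between versions):

tracer:
  jaeger:
    agent_address: localhost:6831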

Configuration

Benthos provides lots of tools for making configuration discovery, debugging and organisation easy. You can read about them here.

Environment Variables

It is possible to select fields inside a configuration file to be set via environment variables. The docker image, for example, is built with a config file where all common fields can be set this way.
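
For example, interpolations of the form ${VAR} (with an optional default after a colon) can be used in config fields; a minimal sketch:

input:
  kafka:
    addresses:
      - ${KAFKA_ADDRESS:localhost:9092}
    topics:
      - ${KAFKA_TOPIC}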

Build

Build with Go (1.15 or later):

git clone git@github.com:Jeffail/benthos
cd benthos
make

Lint

Benthos uses golangci-lint for linting, which you can install with:

curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.35.2

And then run it with make lint.

Plugins

It's pretty easy to write your own custom plugins for Benthos, take a look at this repo for examples and build instructions.

Docker Builds

There's a multi-stage Dockerfile for creating a Benthos docker image which results in a minimal image from scratch. You can build it with:

make docker

Then use the image:

docker run --rm \
	-v /path/to/your/benthos.yaml:/config.yaml \
	-v /tmp/data:/data \
	-p 4195:4195 \
	benthos -c /config.yaml

There are a few examples here that show you some ways of setting up Benthos containers using docker-compose.
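
As a rough sketch of the shape such a setup takes (the image tag and mounted paths here are assumptions):

version: '3'
services:
  benthos:
    image: jeffail/benthos
    ports:
      - "4195:4195"
    volumes:
      - ./config.yaml:/benthos.yaml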

ZMQ4 Support

Benthos supports ZMQ4 for both data input and output. To add this you need to install libzmq4 and use the compile-time flag when building Benthos:

make TAGS=ZMQ4

Or to build a docker image using CGO, which includes ZMQ:

make docker-cgo

Contributing

Contributions are welcome, please read the guidelines, come and chat (links are on the community page), and watch your back.

Owner

Ashley Jeffs

If you want to get in touch please find me in person, I'm not good with computers.
Comments
  • Roadmap planning - USE CASES WANTED

    TLDR: Please share your use cases here or send them privately to [email protected]; we will use these to help steer the roadmap towards Benthos version 4.

    Hey everyone,

    Benthos has been at major version 3 for over a year now, and I consider it to be a pretty cool achievement that, given all the dope features added, we've managed to keep both the Benthos config spec and APIs fully backwards compatible.

    However, eventually it would be nice to cut a new major release and prune all of the dead weight that has accumulated during this time. In order to do that I want to be sure that we've fully thought through any potential breaking changes that could be bundled along with it, and that requires a roadmap.

    Therefore I'm hoping to hear from you on how you use Benthos, what works well for you and what doesn't, and any features you'd hope to see eventually that would make your lives easier.

    Some features that might come into play are:

    • Improved plugin APIs (need more use cases to help shape this)
    • HTTP poll APIs for the dynamic components as well as streams mode, allowing Benthos to pull configs
    • Configuration file reloading
    • Expanded logging, metrics and tracing options

    Please let us know how you use (or intend to use) Benthos, either by posting here or getting in touch privately at [email protected]. You can also find us on the discord channel at https://discord.gg/6VaWjzP, more chat options can be found on the community page.

    Once planning starts I'm going to be looking into organising public roadmap planning sessions.

  • Tell me what annoys you

    If you're using Benthos and find something about it frustrating, but not worthy of raising an issue, please give me a brief description here. It could be anything: features, documentation, performance, me.

    Also, if the thing that annoys you is already posted then give it a thumbs up.

  • RFC: Benthos Lambda changes

    I've been tinkering with the Lambda binary and had a few things I wanted to run by folks and get thoughts on. I'm happy to split this into multiple issues for multiple threads if we need it. To demo things, I've put together https://github.com/kconwayatlassian/benthos-ext that implements each of the ideas below:

    • Ability to produce to one or more outputs in addition to returning the processed message. This enables us to use any combination of outputs and brokers but still return the message as the Lambda response.

    • Offering the Lambda function as a code export instead of only a binary, for custom builds outside of Benthos and for code reuse in things that adapt Lambda to other runtimes.

    • Adding some test coverage of the Lambda functionality.

    I'm looking for feedback on 1) whether any of these ideas have value beyond my team that would justify having them in Benthos, and 2) whether the demo code would be acceptable as a Benthos contribution, or how I'd need to refactor it in order to meet the project's standards/style.

  • Output `gcp_pubsub` attributes

    Is it possible to promote properties of a JSON input to attributes in gcp_pubsub?

    https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage

    I would like to promote things like event timestamps to attributes.time.

    It looks like the gcp_pubsub output puts the data into Message Body on the GCP side.

    We are using the attributes to sort the data in Cloud Dataflow:

    read = PubsubIO
        .readMessagesWithAttributes()
        .withTimestampAttribute("time")
        .fromTopic(defaultOptions.getTopic());
    

    EDIT: In the Pub/Sub Go lib the attributes would go in the Attributes field of the message struct here: https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.2.0/pubsub/message.go#L36

    You could likely pull fields off the JSON object using jmespath and pass them to the gcp_pubsub output as additional input.

    Likely this would require code updates, but I wanted to confirm that it isn't possible via some other means, and whether parsing out attributes is feasible.

    Thanks!
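
    For reference, later Benthos releases copy message metadata into Pub/Sub attributes, so one hedged sketch (assuming a version with that behaviour; the event_timestamp field is illustrative) would be:

    pipeline:
      processors:
        - bloblang: |
            root = this
            meta time = this.event_timestamp

    output:
      gcp_pubsub:
        project: foo
        topic: baz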

  • AMQP 1.0 (beta): amqp:link:detach-forced

    Using Service Bus from Azure (which uses AMQP 1.0)

    Messages are received correctly, but after a while I get this message:

    Failed to read message: link detached, reason: *Error{Condition: amqp:link:detach-forced, Description: The link 'G19:151274741:MB9xjRjuCfMcXqGvOKbKyGeG9fJypgtW-sjO4ErGrubNtxbFpGcCxx' is force detached. Code: consumer(link11331556). Details: InnerMessageReceiver was closed. TrackingId:e33664c70000930b00ace7e45f28340d_G19_B11, SystemTracker:queue-name-dev:Queue:queue-name, Timestamp:2020-08-03T16:01:10, Info: map[]}
    

    I will analyze it more tomorrow and update the ticket if I find anything helpful.

    My config:

    input:
      broker:
        inputs:
        - amqp_1:
            url: "myqueueurl"
            source_address: "my-queue-name"
        batching:
          count: 180
          period: "50s"
    # processors etc.....
    
  • Can't output from kinesis-balanced or kinesis

    I have the following config and benthos 3.11.0:

    input:
      kinesis_balanced:
        stream: what-a-great-stream
        start_from_oldest: false
        dynamodb_table: table-for-benthos
        dynamodb_billing_mode: PAY_PER_REQUEST
        region: us-east-1
        credentials:
          profile: what-a-great-profile
    output:
      file:
        path: outfile.txt
        delimiter: ""

    Using the same AWS profile, on us-east-1, I'm able to put records on this stream, get records from it, and when I scan dynamodb table-for-benthos I see leases are being updated by benthos as long as it's running. The file outfile.txt is getting created, and if I swap the input to stdin it works.

    I'm not getting any errors, and I'm not getting any output from kinesis.

    I'm so close to living the dream, I can taste it.

  • FEEDBACK WANTED: Considering using Go 1.18 for the next release

    Hey everyone, it's been a while since we last updated the minimum Go version for Benthos and I'm considering updating to 1.18. The obvious immediate benefit would be to use generics in our own packages, but it would also mean being able to use libraries that themselves use generics without needing annoying build toggles.

    Like before, I'm pinning this issue; if setting the minimum version to 1.18 would be an issue for you or your team then let me know. None of this is blocking us from implementing anything critical, so I'd rather delay than cause problems downstream.

  • Add MSSQL driver

    Fixes #797.

    I tested the output like so (might be worth adding some integration tests for this too, since only Postgres is covered under lib/test/integration):

    localhost> docker run --rm -it -e ACCEPT_EULA=Y -e SA_PASSWORD=ins4n3lyStrongP4ssword -p1433:1433 microsoft/mssql-server-linux
    
    container> /opt/mssql-tools/bin/sqlcmd -l 30 -S localhost -h-1 -V1 -U sa -P ins4n3lyStrongP4ssword -Q "create table footable(data varchar(50) not null);"
    
    input:
      stdin:
        codec: lines
    output:
      sql:
        driver: mssql
        data_source_name: "sqlserver://sa:ins4n3lyStrongP4ssword@localhost:1433?database=master"
        query: "insert into footable(data) values(?);"
        args:
          - ${! json("data") }
    
    localhost> ./target/bin/benthos -c sql.yaml
    {"data":"foo"}
    
    container> /opt/mssql-tools/bin/sqlcmd -l 30 -S localhost -h-1 -V1 -U sa -P ins4n3lyStrongP4ssword -Q "select * from footable;"
    foo
    
    (1 rows affected)
    
  • adding syslog processor

    Hey, @Jeffail , I have a little something for benthos.

    I used github.com/influxdata/go-syslog to parse messages and decided to implement just RFC 5424 support. It shouldn't be a real problem to bring other RFCs under the processor's wing later, but this is quite good for a starter. I am diabolically sad that RFC 3164 (one of the popular ones) can't be parsed because the library hasn't implemented it yet, but then again I can't hold a grudge for longer than a few minutes, so..

    I don't think there's a need for config options that let users omit parts of the final JSON structure (not all of us need those fields, and some might be better off in the metadata), but every use case is unique and it's not fair to throw everyone in the same barrel, especially if it's limited space inside. Overall, it should be easy to unset some of those JSON fields afterwards.

    I also used the bestEffort option, which means the processor won't fail during parsing if there's a problem in the original message, as long as it roughly conforms to the RFC.

  • Kafka Input only gets the topic messages, does not get keys

    Currently the Kafka input will only get the message from a given topic; is there any plan to also enable getting the keys?

    My use case is very simply that I need to back up the _schemas topic, but then in order to restore I need to be able to produce those messages with the same key they were originally produced with.

    Would you accept a PR for this?
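
    For reference, later Benthos releases expose the record key as kafka_key metadata, and the kafka output accepts a key field, so a backup/restore sketch (assuming those fields are available in your version) might look like:

    input:
      kafka:
        addresses: [ localhost:9092 ]
        topics: [ _schemas ]
        consumer_group: benthos_backup

    output:
      kafka:
        addresses: [ localhost:9092 ]
        topic: _schemas
        key: ${! meta("kafka_key") }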

  • How to execute sql statement in a loop?

    I want to loop over an array and insert the data into a database, like this:

    input:
      generate:
        mapping: |
          root = {"sqldata":["delete from b.tax where month_of_salary='202208'","delete from a.tax where month_of_salary='202208'"]}
        interval: 0s
        count: 1
    pipeline:
      processors:
        - while:
            at_least_once: false
            max_loops: 0
            check: this.sqldata.length() > 0
            processors:
              - sql_raw:
                  driver: clickhouse
                  dsn: clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]
                  query: this.sqldata[i]
              
    output:
      drop: {}
    

    The while processor doesn't seem appropriate and I can't get the index of the array. How can I loop through the array and execute each SQL statement?
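
    One hedged approach, assuming the unarchive processor and sql_raw's unsafe_dynamic_query option are available in your version: split the array into one message per statement, then run each as a dynamic query.

    pipeline:
      processors:
        # Reduce the payload to just the array of statements.
        - bloblang: root = this.sqldata
        # Split the JSON array into one message per element.
        - unarchive:
            format: json_array
        # Execute each message's content as a raw statement.
        - sql_raw:
            driver: clickhouse
            dsn: clickhouse://localhost:9000/dbname
            query: ${! content() }
            unsafe_dynamic_query: true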

  • Set the default value of conn_max_idle for sql_* components to 2

    Currently, this is implicitly set to 0, which deviates from the default value (2) that is specified in the standard library docs here.

    I guess some people who are currently using conn_max_idle: 0 implicitly might bump into performance issues, not sure...
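
    In the meantime, a hedged sketch of pinning it explicitly on an sql_* component (the surrounding fields are illustrative):

    output:
      sql_insert:
        driver: postgres
        dsn: postgres://user:pass@localhost:5432/db
        table: events
        columns: [ data ]
        args_mapping: root = [ this.data ]
        conn_max_idle: 2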

  • Add tracing to franz-go

    Hello,

    We have developed a plugin[1] for franz-go that adds tracing to Kafka records and exports them to OpenTelemetry. This plugin allows tracing context propagation from upstream input into Kafka records and from Kafka records to downstream output.

    We are interested in knowing if the Benthos team would like to incorporate this capability into the project. We can contribute to this feature and would be happy to work with the team. Please let us know your thoughts on this.

    [1] https://github.com/twmb/franz-go/pull/294

  • Allow ExtJSON in mongo queries

    We now purposefully unmarshal the Mongo query via the bson library. This adds implicit support for ObjectIds ($oid), Timestamps ($ts), Regexes ($regex), etc. in queries.

    This may offer a workaround for #1516 as well (from what I've understood).

    First golang contribution, feel free to nitpick 😅

  • Batching/Kinesis output doesn't guarantee ordering

    I was looking at using benthos to replace a custom-rolled solution, however the use-case requires strict ordering (within a hash key), so I was looking at how benthos handles that on the Kinesis output.

    Due to the indiscriminate usage of PutRecords with (possibly) equal PartitionKeys, it's possible for some in-between records to fail, be retried, and be appended at the end of a batch rather than in their rightful position. Not sure if I explained this well, but the first half of this article describes the issue (and solution) in more detail.

    From what I've seen, it'd require doing the groupBy around here and possibly doing multiple batches in parallel.

  • Oracle SQL driver doesn't return timestamps correctly

    This issue is meant to track https://github.com/sijms/go-ora/issues/287 which has all the details. In essence, one can't currently SELECT columns of type TIMESTAMP directly, since they won't be deserialised correctly in sqlRowToMap and they'll end up containing an empty object ({}) instead of the actual timestamp value. I guess a workaround is to call the TO_CHAR() function on the TIMESTAMP column in the SELECT statement until this is addressed somehow.
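
    A hedged sketch of that workaround with sql_raw (the driver name, DSN, and column names here are illustrative):

    pipeline:
      processors:
        - sql_raw:
            driver: oracle
            dsn: oracle://user:pass@localhost:1521/service
            query: |
              SELECT id, TO_CHAR(created_at, 'YYYY-MM-DD HH24:MI:SS') AS created_at
              FROM events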

  • GCP Pubsub Output - Ability to set options to the NewClient for pubsub publish

    Can we provide the ability to add ClientOptions when building the NewClient in the output_pubsub flow? I'm mainly looking at setting the endpoint to a region-specific Pub/Sub endpoint. If exposing some of those supported options isn't recommended or would be a breaking change, can we provide config options for specific ones, like the endpoint?

    File that needs to be changed: https://github.com/benthosdev/benthos/blob/48c7bb7a0718bdcef453cc156f60f2ffafb48730/internal/impl/gcp/output_pubsub.go#L101

    Google Pubsub library doc related to Client Option: https://pkg.go.dev/cloud.google.com/go/pubsub#NewClient https://pkg.go.dev/google.golang.org/api/option#ClientOption
