Declarative streaming ETL for mundane tasks, written in Go

Benthos


Benthos is a high performance and resilient stream processor, able to connect various sources and sinks in a range of brokering patterns and perform hydration, enrichments, transformations and filters on payloads.

It comes with a powerful mapping language, is easy to deploy and monitor, and ready to drop into your pipeline either as a static binary, docker image, or serverless function, making it cloud native as heck.

Benthos is fully declarative, with stream pipelines defined in a single config file, allowing you to specify connectors and a list of processing stages:

input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - bloblang: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20

Delivery Guarantees

Yep, we got 'em. Benthos implements transaction-based resiliency with back pressure. When connecting to at-least-once sources and sinks it guarantees at-least-once delivery without needing to persist messages during transit.

Supported Sources & Sinks

AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (pub/sub), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS Streaming, NSQ, AMQP 0.9.1 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), SQL (MySQL, PostgreSQL, Clickhouse), Stdin/Stdout, TCP & UDP, sockets and ZMQ4.

Connectors are added constantly; if something you want is missing then open an issue.

Documentation

If you want to dive fully into Benthos then don't waste your time in this dump, check out the documentation site.

For guidance on how to configure more advanced stream processing concepts such as stream joins, enrichment workflows, etc, check out the cookbooks section.

For guidance on building your own custom plugins check out this example repo.

Install

Grab a binary for your OS from here. Or use this script:

curl -Lsf https://sh.benthos.dev | bash

Or pull the docker image:

docker pull jeffail/benthos

Benthos can also be installed via Homebrew:

brew install benthos

For more information check out the getting started guide.

Run

benthos -c ./config.yaml

Or, with docker:

# Send HTTP /POST data to Kafka:
docker run --rm \
	-e "INPUT_TYPE=http_server" \
	-e "OUTPUT_TYPE=kafka" \
	-e "OUTPUT_KAFKA_ADDRESSES=kafka-server:9092" \
	-e "OUTPUT_KAFKA_TOPIC=benthos_topic" \
	-p 4195:4195 \
	jeffail/benthos

# Using your own config file:
docker run --rm -v /path/to/your/config.yaml:/benthos.yaml jeffail/benthos

Monitoring

Health Checks

Benthos serves two HTTP endpoints for health checks:

  • /ping can be used as a liveness probe as it always returns a 200.
  • /ready can be used as a readiness probe as it serves a 200 only when both the input and output are connected, otherwise a 503 is returned.
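
Both endpoints respond to plain HTTP GETs, so they drop straight into container orchestration probes. As a rough sketch, a Kubernetes container spec could point at them like this (assuming the default HTTP port of 4195):

livenessProbe:
  httpGet:
    path: /ping
    port: 4195
readinessProbe:
  httpGet:
    path: /ready
    port: 4195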

Metrics

Benthos exposes lots of metrics, either to StatsD, Prometheus, or, for debugging purposes, an HTTP endpoint that returns a JSON-formatted object. The target can be specified via config.
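
As a minimal sketch, pointing the metrics target at Prometheus looks something like the following (exact fields differ slightly between versions, so check the docs for yours):

metrics:
  prometheus: {}

# Or push to a StatsD agent instead (the address is a placeholder):
# metrics:
#   statsd:
#     address: localhost:8125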

Tracing

Benthos also emits OpenTracing events to a tracer of your choice (currently only Jaeger is supported), which can be used to visualise the processors within a pipeline.
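
A minimal tracer section might look like this (the agent address is a placeholder and the field names are worth double-checking against the docs for your version):

tracer:
  jaeger:
    agent_address: localhost:6831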

Configuration

Benthos provides lots of tools for making configuration discovery, debugging and organisation easy. You can read about them here.
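
A couple of those tools ship as subcommands of the binary itself, for example (worth verifying the exact names with benthos --help for your version):

# Generate a config skeleton for a chosen input/processor/output combination:
benthos create gcp_pubsub/bloblang/redis_streams > config.yaml

# Check an existing config file for errors:
benthos lint ./config.yaml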

Environment Variables

It is possible to select fields inside a configuration file to be set via environment variables. The docker image, for example, is built with a config file where all common fields can be set this way.
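
Interpolation uses a ${VAR} syntax with an optional default value, so the example config above could be parameterised like this (the variable names here are just placeholders):

input:
  gcp_pubsub:
    project: "${GCP_PROJECT:foo}"
    subscription: "${GCP_SUBSCRIPTION:bar}"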

Build

Build with Go (1.15 or later):

git clone git@github.com:Jeffail/benthos
cd benthos
make

Lint

Benthos uses golangci-lint for linting, which you can install with:

curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.35.2

And then run it with make lint.

Plugins

It's pretty easy to write your own custom plugins for Benthos, take a look at this repo for examples and build instructions.

Docker Builds

There's a multi-stage Dockerfile for creating a Benthos docker image which results in a minimal image from scratch. You can build it with:

make docker

Then use the image:

docker run --rm \
	-v /path/to/your/benthos.yaml:/config.yaml \
	-v /tmp/data:/data \
	-p 4195:4195 \
	benthos -c /config.yaml

There are a few examples here that show you some ways of setting up Benthos containers using docker-compose.

ZMQ4 Support

Benthos supports ZMQ4 for both data input and output. To add this you need to install libzmq4 and use the compile time flag when building Benthos:

make TAGS=ZMQ4

Or to build a docker image using CGO, which includes ZMQ:

make docker-cgo

Contributing

Contributions are welcome, please read the guidelines, come and chat (links are on the community page), and watch your back.

Owner
Ashley Jeffs
If you want to get in touch please find me in person, I'm not good with computers.
Comments
  • Roadmap planning - USE CASES WANTED


    TLDR: Please share your use cases here or send them privately to [email protected], we will use these to help steer the roadmap towards Benthos version 4.

    Hey everyone,

    Benthos has been at major version 3 for over a year now, and I consider it to be a pretty cool achievement that given all the dope features added we've managed to keep both the Benthos config spec and APIs fully backwards compatible.

    However, eventually it would be nice to cut a new major release and prune all of the dead weight that has accumulated during this time. In order to do that I want to be sure that we've fully thought through any potential breaking changes that could be bundled along with it, and that requires a roadmap.

    Therefore I'm hoping to hear from you on how you use Benthos, what works well for you and what doesn't, and any features you'd hope to see eventually that would make your lives easier.

    Some features that might come into play are:

    • Improved plugin APIs (need more use cases to help shape this)
    • HTTP poll APIs for the dynamic components as well as streams mode, allowing Benthos to pull configs
    • Configuration file reloading
    • Expanded logging, metrics and tracing options

    Please let us know how you use (or intend to use) Benthos, either by posting here or getting in touch privately at [email protected]. You can also find us on the discord channel at https://discord.gg/6VaWjzP, more chat options can be found on the community page.

    Once planning starts I'm going to be looking into organising public roadmap planning sessions.

  • Tell me what annoys you


    If you're using Benthos and find something about it frustrating, but not worthy of raising an issue, please give me a brief description here. It could be anything: features, documentation, performance, me.

    Also, if the thing that annoys you is already posted then give it a thumbs up.

  • RFC: Benthos Lambda changes


    I've been tinkering with the Lambda binary and had a few things I wanted to run by folks and get thoughts on. I'm happy to split this into multiple issues for multiple threads if we need it. To demo things, I've put together https://github.com/kconwayatlassian/benthos-ext that implements each of the ideas below:

    • Ability to both produce to one or more outputs and return the processed message. This enables us to use any combination of outputs and brokers but still return the message as the Lambda response.

    • Offering the Lambda function as a code export instead of only a binary, allowing custom builds outside of Benthos and code reuse in things that adapt Lambda to other runtimes.

    • Adding some test coverage of the Lambda functionality.

    I'm looking for feedback on 1) whether any of the ideas have value beyond my team and would justify having them in Benthos, and 2) whether the demo code would be acceptable as a Benthos contribution or how I'd need to refactor it to meet the project's standards/style.

  • Output `gcp_pubsub` attributes


    Is it possible to promote properties of a JSON input to attributes in gcp_pubsub?

    https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage

    I would like to promote things like event timestamps to attributes.time.

    It looks like the gcp_pubsub output puts the data into Message Body on the GCP side.

    [Screenshot: GCP console showing the payload landing in the Pub/Sub message body]

    We are using the attributes to sort the data in Cloud Dataflow:

    read = PubsubIO
        .readMessagesWithAttributes()
        .withTimestampAttribute("time")
        .fromTopic(defaultOptions.getTopic());
    

    EDIT: In the pubsub GO lib the attributes would go in the attributes field of the message struct here: https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.2.0/pubsub/message.go#L36

    You could likely pull off fields from the JSON object using jmespath and store them in an additional input to the gcp_pubsub output.

    Likely this would require code updates. But I wanted to confirm it was not possible via some other means and whether the parsing of attributes is possible.

    Thanks!

  • AMQP 1.0 (beta): amqp:link:detach-forced


    Using Service Bus from Azure (which uses AMQP 1.0)

    Messages are received correctly, but after a while I get this message:

    Failed to read message: link detached, reason: *Error{Condition: amqp:link:detach-forced, Description: The link 'G19:151274741:MB9xjRjuCfMcXqGvOKbKyGeG9fJypgtW-sjO4ErGrubNtxbFpGcCxx' is force detached. Code: consumer(link11331556). Details: InnerMessageReceiver was closed. TrackingId:e33664c70000930b00ace7e45f28340d_G19_B11, SystemTracker:queue-name-dev:Queue:queue-name, Timestamp:2020-08-03T16:01:10, Info: map[]}
    

    I will analyze it more tomorrow and update the ticket if I find anything helpful

    My config:

    input:
      broker:
        inputs:
        - amqp_1:
            url: "myqueueurl"
            source_address: "my-queue-name"
        batching:
          count: 180
          period: "50s"
    # processors etc.....
    
  • Can't output from kinesis-balanced or kinesis


    I have the following config and benthos 3.11.0:

    input:
      kinesis_balanced:
        stream: what-a-great-stream
        start_from_oldest: false
        dynamodb_table: table-for-benthos
        dynamodb_billing_mode: PAY_PER_REQUEST
        region: us-east-1
        credentials:
          profile: what-a-great-profile
    output:
      file:
        path: outfile.txt
        delimiter: ""

    Using the same AWS profile, on us-east-1, I'm able to put records on this stream, get records from it, and when I scan dynamodb table-for-benthos I see leases are being updated by benthos as long as it's running. The file outfile.txt is getting created, and if I swap the input to stdin it works.

    I'm not getting any errors, and I'm not getting any output from kinesis.

    I'm so close to living the dream, I can taste it.

  • FEEDBACK WANTED: Considering using Go 1.18 for the next release


    Hey everyone, it's been a while since we last updated the minimum Go version for Benthos and I'm considering updating to 1.18. The obvious immediate benefit would be to use generics in our own packages, but it would also mean being able to use libraries that themselves use generics without needing annoying build toggles.

    Like before, I'm pinning this issue; if setting the minimum version to 1.18 would be a problem for you or your team then let me know. None of this is blocking us from implementing anything critical, so I'd rather delay than cause problems downstream.

  • Add MSSQL driver


    Fixes #797.

    I tested the output like so (might be worth adding some integration tests for this too, since only Postgres is covered under lib/test/integration):

    localhost> docker run --rm -it -e ACCEPT_EULA=Y -e SA_PASSWORD=ins4n3lyStrongP4ssword -p1433:1433 microsoft/mssql-server-linux
    
    container> /opt/mssql-tools/bin/sqlcmd -l 30 -S localhost -h-1 -V1 -U sa -P ins4n3lyStrongP4ssword -Q "create table footable(data varchar(50) not null);"
    
    input:
      stdin:
        codec: lines
    output:
      sql:
        driver: mssql
        data_source_name: "sqlserver://sa:ins4n3lyStrongP4ssword@localhost:1433?database=master"
        query: "insert into footable(data) values(?);"
        args:
          - ${! json("data") }
    
    localhost> ./target/bin/benthos -c sql.yaml
    {"data":"foo"}
    
    container> /opt/mssql-tools/bin/sqlcmd -l 30 -S localhost -h-1 -V1 -U sa -P ins4n3lyStrongP4ssword -Q "select * from footable;"
    foo
    
    (1 rows affected)
    
  • adding syslog processor


    Hey @Jeffail, I have a little something for Benthos.

    I used github.com/influxdata/go-syslog to parse the messages and decided to implement just RFC 5424 support. It shouldn't be a real problem to bring another RFC under the processor's wing, but this is a good starting point. I am diabolically sad that RFC 3164 (one of the popular ones) can't be parsed because the library hasn't implemented it yet, but then again I can't hold a grudge for longer than a few minutes, so..

    I don't think there's a need for config options that let users skip parts of the final JSON structure (not all of us need those fields, and some might be better off in metadata), but every use case is unique and it's not fair to throw everyone in the same barrel, especially if there's limited space inside. So, overall, it should be easy to unset some of those JSON fields afterwards.

    I also used the bestEffort option, which means the processor won't fail during parsing if there's a problem in the original message, as long as it conforms to the RFC to some degree.

  • Kafka Input only gets the topic messages, does not get keys


    Currently the input of type Kafka will only get the message from a given topic; is there any plan to enable also getting the keys?

    My use case is very simply that I need to back up the _schemas topic, but then in order to restore I need to be able to produce those messages with the same key they were originally produced with.

    Would you accept a PR for this?
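
    For what it's worth, newer versions expose the record key as metadata on each message, so a backup could carry the key through to the output roughly like this (assuming the kafka input sets a kafka_key metadata field and the kafka output's key field accepts interpolation, both worth checking against your version; addresses and topic names are placeholders):

    input:
      kafka:
        addresses: [ "localhost:9092" ]
        topics: [ "_schemas" ]

    output:
      kafka:
        addresses: [ "localhost:9092" ]
        topic: _schemas_backup
        key: ${! meta("kafka_key") }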

  • How to execute sql statement in a loop?


    I want to loop over an array and insert data into a database, like this:

    input:
      generate:
        mapping: |
          root = {"sqldata":["delete from b.tax where month_of_salary='202208'","delete from a.tax where month_of_salary='202208'"]}
        interval: 0s
        count: 1
    pipeline:
      processors:
        - while:
            at_least_once: false
            max_loops: 0
            check: this.sqldata.length() > 0
            processors:
              - sql_raw:
                  driver: clickhouse
                  dsn: clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]
                  query: this.sqldata[i]
              
    output:
      drop: {}
    

    The while processor doesn't seem appropriate here and I can't get the index of the array. How can I loop through the array and execute each SQL statement?
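
    One way to sidestep the loop entirely is to split the array into individual messages and run the statement once per element, sketched roughly below (assuming the unarchive processor's json_array format and the sql_raw processor's unsafe_dynamic_query option, both worth checking against your version; the DSN is a placeholder):

    pipeline:
      processors:
        - mapping: root = this.sqldata
        - unarchive:
            format: json_array
        - sql_raw:
            driver: clickhouse
            dsn: clickhouse://localhost:9000/dbname
            unsafe_dynamic_query: true
            query: ${! content().string() }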

  • csv_parse: single line - cannot access elements of array


    Hi, I'm having trouble parsing a single CSV line with csv_parse; it parses correctly but I can't figure out how to access the elements. I'm using curl and the http_server input to post the file to Benthos. Curl command:

    curl -X POST --data-binary @./test2.csv http://127.0.0.1:4195/benthos/post
    

    test file test2.csv

    "line1";"1"
    "line2";"2"
    

    config.yaml

    input:
      label: "http_post"
      http_server:
        address: ""
        path: /post
        allowed_verbs:
          - POST
    buffer:
      none: {}
    pipeline:
      threads: 1
      processors:
        # - resource: "latin1_to_utf8"
        - resource: "split_by_linefeed"
        - resource: "parse_csv_lines"
        - resource: "map_csv_line"
    output:
      label: "to_stdout"
      stdout:
        codec: lines
    

    resources.yaml

    processor_resources:
      - label: "latin1_to_utf8"
        subprocess: 
          name: "/usr/bin/stdbuf" #requires recode (iconv alternative)
          args: ["-oL","/usr/bin/recode", "LATIN1..UTF-8"]
      - label: "parse_csv_lines"
        mutation: |
          let tmp = content().string().parse_csv(parse_header_row:false,delimiter:";",lazy_quotes:false)
          root.err = error() | deleted()
          root.arrayofarray = $tmp
          root.aofa_type = $tmp.type()
          root.array = $tmp.index(0)
          root.a_type = $tmp.index(0).type() #unknown
      - label: "map_csv_line"
        mapping: |
          root.field1 = this.array.index(0) #fails
      - label: "split_by_linefeed"
        unarchive:
          format: lines    
    

    benthos -r ./resources/resources.yaml -c config.yaml

    INFO Running main config from specified file       @service=benthos path=config.yaml
    INFO Subprocess started                            @service=benthos label=latin1_to_utf8 path=root.processor_resources
    INFO Launching a benthos instance, use CTRL+C to close  @service=benthos
    INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
    ERRO failed assignment (line 1): expected array value, got unknown from field `this.array`  @service=benthos label=map_csv_line path=root.processor_resources
    ERRO failed assignment (line 1): expected array value, got unknown from field `this.array`  @service=benthos label=map_csv_line path=root.processor_resources
    {"a_type":"unknown","aofa_type":"array","array":["line1","1"],"arrayofarray":[["line1","1"]]}
    {"a_type":"unknown","aofa_type":"array","array":["line2","2"],"arrayofarray":[["line2","2"]]}
    

    The output of parse_csv is an array containing (what looks like) another array, therefore I was expecting a_type to be an array as well ... but the type is unknown. How can I coerce/parse the string(?) as an array? What I want to do is positionally map/access the elements of the array.

    thanks!

  • How to get a substring in bloblang


    Hi! How can I extract a substring using bloblang?

    Example: given the input "FooBar", the expected output is "Bar".

    It is a trivial function that I could not find in the list of string manipulation methods. If it does not exist, is there any alternative?
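
    One possible approach, assuming the slice method works on strings as well as arrays (worth confirming against the Bloblang string methods docs):

    root = "FooBar".slice(3)     # "Bar"
    root = "FooBar".slice(0, 3)  # "Foo"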

  • OPCUA plugin to receive data from OPCUA server


    Description:

    I am working on a project that requires data from an OPCUA server to be forwarded into Benthos. An OPCUA plugin for Benthos would be very helpful in achieving this.

    As OPCUA is still relevant in the industrial internet of things (IIoT), a plugin for Benthos would be useful for many users. OPCUA is a communication protocol designed to facilitate the interoperability of devices and systems from different vendors and to enable secure and reliable data exchange in real-time. It is widely adopted by industrial automation vendors and is suitable for use in complex and large-scale IIoT environments.

    Steps to reproduce:

    N/A

    Expected behavior:

    A plugin that allows Benthos to receive data from an OPCUA server.

    Actual behavior: There is currently no OPCUA plugin available for Benthos.

    Helpful library: https://github.com/gopcua/opcua

  • Add tracing to franz-go


    Hello,

    We have developed a plugin[1] for franz-go that adds tracing to Kafka records and exports them to OpenTelemetry. This plugin allows tracing context propagation from upstream input into Kafka records and from Kafka records to downstream output.

    We are interested in knowing if the Benthos team would like to incorporate this capability into the project. We can contribute to this feature and would be happy to work with the team. Please let us know your thoughts on this.

    [1] https://github.com/twmb/franz-go/pull/294

  • Allow ExtJSON in mongo queries


    We now purposefully unmarshal the Mongo query via the bson library. This adds implicit support for ObjectIds ($oid), Timestamps ($ts), Regexes ($regex), etc. in queries.

    This may offer a workaround for #1516 as well (from what I've understood).

    First golang contribution, feel free to nitpick 😅
