Confluent's Golang Client for Apache Kafka™

confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.

Features:

  • High performance - confluent-kafka-go is a lightweight wrapper around librdkafka, a finely tuned C client.

  • Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-dotnet).

  • Supported - Commercial support is offered by Confluent.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

The Golang bindings provide a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9 and above.

See the API documentation for more information.

Examples

High-level balanced consumer

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}

Producer

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}

More elaborate examples are available in the examples directory, including how to configure the Go client for use with Confluent Cloud.
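For reference, here is a minimal sketch of a producer configuration for a SASL_SSL endpoint such as Confluent Cloud; the bootstrap server, API key, and secret below are placeholders you would replace with your own cluster's values:

package main

import (
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	// Placeholder credentials - substitute the values from your own cluster.
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "YOUR_CLUSTER.confluent.cloud:9092",
		"security.protocol": "SASL_SSL",
		"sasl.mechanisms":   "PLAIN",
		"sasl.username":     "YOUR_API_KEY",
		"sasl.password":     "YOUR_API_SECRET",
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Produce and flush as in the producer example above.
}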

Getting Started

Supports Go 1.11+ and librdkafka 1.4.0+.

Using Go Modules

Starting with Go 1.13, you can use Go Modules to install confluent-kafka-go.

Import the kafka package from GitHub in your code:

import "github.com/confluentinc/confluent-kafka-go/kafka"

Build your project:

go build ./...

If you are building for Alpine Linux (musl), -tags musl must be specified.

go build -tags musl ./...

A dependency to the latest stable version of confluent-kafka-go should be automatically added to your go.mod file.
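As a rough illustration (module path and version are placeholders), the resulting go.mod might contain something like:

module example.com/myapp

go 1.13

require github.com/confluentinc/confluent-kafka-go v1.4.0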

Install the client

If Go modules can't be used, we recommend that you version-pin the confluent-kafka-go import to v1 using gopkg.in:

Manual install:

go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

Golang import:

import "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"

librdkafka

Prebuilt librdkafka binaries are included with the Go client and librdkafka does not need to be installed separately on the build or target system. The following platforms are supported by the prebuilt librdkafka binaries:

  • Mac OSX x64
  • glibc-based Linux x64 (e.g., RedHat, Debian, CentOS, Ubuntu, etc) - without GSSAPI/Kerberos support
  • musl-based Linux x64 (Alpine) - without GSSAPI/Kerberos support

When building your application for Alpine Linux (musl libc) you must pass -tags musl to go get, go build, etc.

CGO_ENABLED must NOT be set to 0 since the Go client is based on the C library librdkafka.

If GSSAPI/Kerberos authentication support is required you will need to install librdkafka separately, see the Installing librdkafka chapter below, and then build your Go application with -tags dynamic.

Installing librdkafka

If the bundled librdkafka build is not supported on your platform, or you need a librdkafka with GSSAPI/Kerberos support, you must install librdkafka manually on the build and target system using one of the following alternatives:

  • For Debian and Ubuntu based distros, install librdkafka-dev from the standard repositories or using Confluent's Deb repository.
  • For Redhat based distros, install librdkafka-devel using Confluent's YUM repository.
  • For MacOS X, install librdkafka from Homebrew. You may also need to brew install pkg-config if you don't already have it: brew install librdkafka pkg-config.
  • For Alpine: apk add librdkafka-dev pkgconf
  • confluent-kafka-go is not supported on Windows.
  • For source builds, see instructions below.

Build from source:

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install

After installing librdkafka you will need to build your Go application with -tags dynamic.
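For example:

go build -tags dynamic ./...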

Note: If you use the master branch of the Go client, then you need to use the master branch of librdkafka.

confluent-kafka-go requires librdkafka v1.4.0 or later.

API Strands

There are two main API strands: function-based and channel-based.

Function Based Consumer

Messages, errors and events are polled through the consumer.Poll() function.

Pros:

  • More direct mapping to underlying librdkafka functionality.

Cons:

  • Makes it harder to read from multiple channels, but a go-routine easily solves that (see Cons in the channel-based consumer below about outdated events).
  • Slower than the channel consumer.

See examples/consumer_example
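For reference, a minimal sketch of a Poll()-based loop, reusing the placeholder broker, group, and topic from the consumer example above:

package main

import (
	"fmt"
	"os"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"myTopic"}, nil); err != nil {
		panic(err)
	}

	run := true
	for run {
		// Poll returns nil if no event arrived within the timeout (in milliseconds).
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Message on %s: %s\n", e.TopicPartition, string(e.Value))
		case kafka.Error:
			// Errors are generally informational and the client tries to recover,
			// but this sketch terminates when all brokers are down.
			fmt.Fprintf(os.Stderr, "Error: %v\n", e)
			if e.Code() == kafka.ErrAllBrokersDown {
				run = false
			}
		}
	}
}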

Channel Based Consumer (deprecated)

Deprecated: The channel based consumer is deprecated due to the channel issues mentioned below. Use the function based consumer.

Messages, errors and events are posted on the consumer.Events channel for the application to read.

Pros:

  • Possibly more Golang:ish
  • Makes reading from multiple channels easy
  • Fast

Cons:

  • Outdated events and messages may be consumed due to the buffering nature of channels. The extent is limited, but not remedied, by the Events channel buffer size (go.events.channel.size).

See examples/consumer_channel_example
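For completeness, a rough sketch of the deprecated channel-based pattern; it assumes the same placeholder configuration as above and requires go.events.channel.enable so that messages are delivered on the Events() channel:

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost",
		"group.id":                 "myGroup",
		"auto.offset.reset":        "earliest",
		"go.events.channel.enable": true, // deliver messages and events on c.Events()
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"myTopic"}, nil); err != nil {
		panic(err)
	}

	for ev := range c.Events() {
		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Message on %s: %s\n", e.TopicPartition, string(e.Value))
		case kafka.Error:
			fmt.Printf("Consumer error: %v\n", e)
		}
	}
}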

Channel Based Producer

Application writes messages to the producer.ProduceChannel. Delivery reports are emitted on the producer.Events or specified private channel.

Pros:

  • Go:ish
  • Proper channel backpressure if librdkafka internal queue is full.

Cons:

  • Double queueing: messages are first queued in the channel (size is configurable) and then inside librdkafka.

See examples/producer_channel_example
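A minimal sketch of the channel-based producer, assuming the same placeholder broker and topic as the producer example above:

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Delivery report handler, as in the function-based producer example.
	go func() {
		for e := range p.Events() {
			if m, ok := e.(*kafka.Message); ok {
				if m.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", m.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", m.TopicPartition)
				}
			}
		}
	}()

	// Write the message to the ProduceChannel instead of calling Produce().
	topic := "myTopic"
	p.ProduceChannel() <- &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("Hello from the channel-based producer"),
	}

	// Wait for outstanding deliveries before shutting down.
	p.Flush(15 * 1000)
}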

Function Based Producer

Application calls producer.Produce() to produce messages. Delivery reports are emitted on the producer.Events or specified private channel.

Pros:

  • Go:ish

Cons:

  • Produce() is a non-blocking call; if the internal librdkafka queue is full, the call will fail.
  • Somewhat slower than the channel producer.

See examples/producer_example

License

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-go. confluent-kafka-go has no affiliation with and is not endorsed by The Apache Software Foundation.

Developer Notes

See kafka/README

Contributions to the code, examples, documentation, etc. are very much appreciated.

Make your changes, run gofmt and the tests, push your branch, create a PR, and sign the CLA.

Owner
Confluent Inc.
Real-time streams powered by Apache Kafka®
Comments
  • How to use Golang client on Windows

    How to use Golang client on Windows

    Description

    How can I configure Windows in order for my Golang client to work? E.g., where should I place the librdkafka client?

    How to reproduce

    1. download librdkafka from https://www.nuget.org/packages/librdkafka.redist/ & unzip its contents
    2. go get -u github.com/confluentinc/confluent-kafka-go/kafka
    3. setup pkg-config for windows
    4. go run main.go results in
    Package rdkafka was not found in the pkg-config search path.
    Perhaps you should add the directory containing `rdkafka.pc'
    to the PKG_CONFIG_PATH environment variable
    No package 'rdkafka' found
    pkg-config: exit status 1
    
  • Slow Producer and huge memory leak

    Slow Producer and huge memory leak

    I am sending messages to Kafka using this code:

    	deliveryChan := make(chan kafka.Event)
    	topic:=viper.GetString("kafka.forwardtopic")
    	kf.Producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(bulk)}, deliveryChan)
    	e := <-deliveryChan
    	m := e.(*kafka.Message)
    
    	if m.TopicPartition.Error != nil {
    		logger.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
    	}
    	close(deliveryChan)
    

    However this is extremely slow. Sometimes it takes a second or even two. I guess it hangs on e := <-deliveryChan because it is waiting for the Kafka acknowledgement.

    So I tried the same without the channel, because I don't really need the Kafka acknowledgement:

    	//deliveryChan := make(chan kafka.Event)
    	topic:=viper.GetString("kafka.forwardtopic")
    	kf.Producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(bulk)}, nil)
    	//e := <-deliveryChan
    	//m := e.(*kafka.Message)
    
    	//if m.TopicPartition.Error != nil {
    	//	logger.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
    	//}
    	//close(deliveryChan)
    

    But this creates a huge memory leak and my app crashes after a few minutes:

  • Schema registry support

    Schema registry support

    I am not able to pass the URL for the Schema Registry in the config map. It would be great if somebody could point me to the right place. I looked at all the documentation. We are using Confluent Kafka and Schema Registry all over the place; in Java and Python we are able to pass it, but not in Go.

  • Crash on message retry enqueue

    Crash on message retry enqueue

    Description

    We have a large-scale deployment of Kafka producers using confluent-kafka-go and have come across many occurrences of crashes associated with the following assertion failure: rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0) in rdkafka_msg.h. It appears to occur more often while we have under-replicated partitions or while we perform partition reassignment. The following stack trace has been found to be shared among all the crash occurrences:

    Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
    Program terminated with signal SIGABRT, Aborted.
    #0  runtime.raise () at /usr/local/go/src/runtime/sys_linux_amd64.s:159
    +------------------------------------------------------------------------------+
    | backtrace                                                                    |
    +------------------------------------------------------------------------------+
    #0  runtime.raise () at /usr/local/go/src/runtime/sys_linux_amd64.s:159
    #1  0x0000000000458d82 in runtime.raisebadsignal (c=0x7f0fb77fbd98, sig=6) at /usr/local/go/src/runtime/signal_unix.go:491
    #2  0x00000000004591a3 in runtime.badsignal (c=0x7f0fb77fbd98, sig=6) at /usr/local/go/src/runtime/signal_unix.go:600
    #3  0x0000000000458988 in runtime.sigtrampgo (ctx=0x7f0fb77fbe40, info=0x7f0fb77fbf70, sig=6) at /usr/local/go/src/runtime/signal_unix.go:297
    #4  0x0000000000471863 in runtime.sigtramp () at /usr/local/go/src/runtime/sys_linux_amd64.s:352
    #5  
    #6  0x00007f0fdaecdc37 in __GI_raise (sig=sig@entry=6) at ../nptl/sysdeps/unix/sysv/linux/raise.c:56
    #7  0x00007f0fdaed1028 in __GI_abort () at abort.c:89
    #8  0x000000000066456a in rd_kafka_crash (file=file@entry=0x7c15c3 "rdkafka_msg.h", line=line@entry=263, function=function@entry=0x7c1980 <__FUNCTION__.19041> "rd_kafka_msgq_deq", rk=rk@entry=0x0, reason=reason@entry=0x7c1708 "assert: rkmq->rkmq_msg_cnt > 0") at rdkafka.c:3102
    #9  0x000000000041a520 in rd_kafka_msgq_deq (do_count=, rkm=, rkmq=) at rdkafka_msg.h:263
    #10 0x0000000000683e09 in rd_kafka_msgq_deq (do_count=, rkm=, rkmq=) at rdkafka_msg.h:264
    #11 rd_kafka_msgq_age_scan (rkmq=0x1ba88, rkmq@entry=0x7f0ebc044a88, timedout=0x1bad5, timedout@entry=0x7f0fb77fc5f0, now=6, now@entry=2173403318391) at rdkafka_msg.c:577
    #12 0x0000000000687875 in rd_kafka_topic_scan_all (rk=rk@entry=0x7f0fcc001400, now=2173403318391) at rdkafka_topic.c:1135
    #13 0x0000000000662e56 in rd_kafka_topic_scan_tmr_cb (rkts=, arg=) at rdkafka.c:1194
    #14 0x000000000068a448 in rd_kafka_timers_run (rkts=rkts@entry=0x7f0fcc001c18, timeout_us=timeout_us@entry=0) at rdkafka_timer.c:251
    #15 0x000000000066c8d7 in rd_kafka_thread_main (arg=arg@entry=0x7f0fcc001400) at rdkafka.c:1270
    #16 0x00000000006cdb97 in _thrd_wrapper_function (aArg=) at tinycthread.c:624
    #17 0x00007f0fdb470184 in start_thread (arg=0x7f0fb77fe700) at pthread_create.c:312
    #18 0x00007f0fdaf94ffd in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111
    

    How to reproduce

    We have tried and have not yet been successful reproducing this in a lab environment. We'd welcome any suggestions for changes to the librdkafka code that would print out information useful for diagnosing this issue. Also, we briefly reviewed the later librdkafka commits and didn't find anything that looks like it addresses this crash, but if we misidentified this, we would be happy to rebuild and retest with the latest librdkafka commit in master.

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): confluent-kafka-go commit: 1112e2c08a15bce669a99cc35284a690fd2086b8 librdkafka: 0.11.4-RC1B
    • [x] Apache Kafka broker version: 0.11.0.1
    • [x] Client configuration: ConfigMap{...}
    			"bootstrap.servers":      brokerList,
    			"queue.buffering.max.ms": "500",
    			"compression.codec":      "gzip",
    			"batch.num.messages":     "256",
    			"security.protocol":      "ssl",
    			"queue.buffering.max.messages": 8000,
    			"broker.version.fallback":      "0.10.2.1",
    			"statistics.interval.ms":       60000,
    
    • [x] Operating system: ubuntu 14.04
    • [ ] Provide client logs (with "debug": ".." as necessary) We didn't know which debug setting would be useful for this, so we didn't enable any debug flag. Please let us know if there is a debug flag we should enable.
    • [ ] Provide broker log excerpts
    • [x] Critical issue
  • Clang error when building with v1.4.0

    Clang error when building with v1.4.0

    Description

    I upgraded to the latest version of the Go client (v1.4.0) and ran brew upgrade librdkafka. But the client is looking for librdkafka in the package directory, and it is not present.

    How to reproduce

    In an existing project that uses confluent-kafka-go, run go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka and go build ./...

    You should receive something similar to:

    clang: error: no such file or directory: '$GOHOME/src/github.com/your-user/your-service/vendor/gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka
    /librdkafka_darwin.a'
    

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): v1.4.0 (both)
    • [ n/a ] Apache Kafka broker version:
    • [ n/a ] Client configuration:
    • [x] Operating system: Mac OS X 10.15.4
    • [ n/a ] Provide client logs
    • [ n/a ] Provide broker log excerpts
    • [ ] Critical issue
  • Consumer.Close() never returns with multiple consumers in the same process

    Consumer.Close() never returns with multiple consumers in the same process

    I'm running 2 consumers on the same topic and the same consumer group, using Poll() to consume messages. After having consumed messages with both of them, calling Close() on any of the consumers blocks indefinitely, reproducible 100% of the time. Debug logs show things being stuck at "waiting for rebalance_cb"

    This is the gist with the go code to reproduce and all relevant information: https://gist.github.com/dtheodor/26821f951502aab2e325f860b459cbfc

    This could be the same issue as https://github.com/confluentinc/confluent-kafka-go/issues/65 but you are talking about using the Events channel there, which I am not.

    Calling Unassign() right before calling Close() allows Close() to return successfully, but having to call Unassign looks bogus, since I never have to use Assign or Unassign anywhere else; this is managed by the lib for me.

  • 1.4.0: Building apps fails on latest alpine, when using bundled librdkafka

    1.4.0: Building apps fails on latest alpine, when using bundled librdkafka

    Description

    When using the 1.4.0 release, the bundled librdkafka fails to work as intended with the latest alpine.

    How to reproduce

    Build an application which uses github.com/confluentinc/confluent-kafka-go, in a container, using the alpine:3.11 image (which, at the time of writing, is actually alpine:3.11.5) as a base. Errors such as the following will be displayed:

     go: downloading github.com/confluentinc/confluent-kafka-go v1.4.0
     # github.com/confluentinc/confluent-kafka-go/kafka
     /usr/lib/gcc/x86_64-alpine-linux-musl/9.2.0/../../../../x86_64-alpine-linux-musl/bin/ld: /go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.4.0/kafka/librdkafka/librdkafka_glibc_linux.a(rdkafka_txnmgr.o): in function `rd_kafka_txn_set_fatal_error':
     (.text+0x141): undefined reference to `__strdup'
     /usr/lib/gcc/x86_64-alpine-linux-musl/9.2.0/../../../../x86_64-alpine-linux-musl/bin/ld: /go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.4.0/kafka/librdkafka/librdkafka_glibc_linux.a(rdkafka_txnmgr.o): in function `rd_kafka_txn_set_abortable_error':
     (.text+0x64f): undefined reference to `__strdup'
    ...
    

    Installing a fresh build of librdkafka 1.4.0 into the container and building the application with -tags dynamic resolves the issue.

    Could possibly be because the bundled librdkafka was built from a :edge release of alpine, instead of :latest?

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): 1.4.0, bundled.
    • [ ] Apache Kafka broker version: N/A
    • [ ] Client configuration: N/A
    • [x] Operating system: Alpine Linux
    • [ ] Provide client logs (with "debug": ".." as necessary): N/A
    • [ ] Provide broker log excerpts: N/A
    • [ ] Critical issue: No
  • WIP: Avro/Schema-Registry

    WIP: Avro/Schema-Registry

    This is a request for a review/comment of a WIP Schema Registry implementation.

    I have attempted to stick to the Java API as much as possible with some minor changes but I've run into a bit of a mental block on deserialization. I'm not sure how to best handle deserialization into specific types.

    I have toyed with a few implementations, but I think they too are less than ideal:

    1. take a new()interface{} function pointer upon instantiating the deserializer. This is similar to the way sync.Pool works

    2. Move Avro serialization/deserialization out of the Producer/Consumer opting instead to have a stateful marshaler/unmarshaler.

    3. The current implementation, which takes an instance on instantiation, makes a zero copy and then places the results in the Message.Object field, which was added in the serde_support development branch to facilitate handling objects for serialization/deserialization.

    In the end I suspect the best way to move forward is option 2. This puts serialization/deserialization outside of the producer/consumer however which may not be what we want.

    All relevant code is located in the encoding/avro folder. Specifically serializer.go

  • librdkafka v0.11.4 does not exist

    librdkafka v0.11.4 does not exist

    Description

    Running go get -u github.com/confluentinc/confluent-kafka-go/kafka fails to install the client and produces the error:

    ../github.com/confluentinc/confluent-kafka-go/kafka/00version.go:43:2: error: "confluent-kafka-go requires librdkafka v0.11.4 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
    #error "confluent-kafka-go requires librdkafka v0.11.4 or later. Install the latest version of librdkafka from Homebrew running `brew install librdkafka` or `brew upgrade librdkafka`"
     ^
    1 error generated.
    

    When brew install librdkafka is run, it fetches v0.11.3, which is the latest stable version released: https://github.com/edenhill/librdkafka/releases

    » brew info librdkafka                                                                                                                                                                             
    librdkafka: stable 0.11.3 (bottled), HEAD
    The Apache Kafka C/C++ library
    https://github.com/edenhill/librdkafka
    /usr/local/Cellar/librdkafka/0.11.3 (14 files, 1.9MB) *
      Poured from bottle on 2018-03-16 at 13:33:24
    From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/librdkafka.rb
    ==> Dependencies
    Build: pkg-config ✔
    Required: lzlib ✔, openssl ✔
    Recommended: lz4 ✔
    ==> Options
    --without-lz4
    	Build without lz4 support
    --HEAD
    	Install HEAD version
    
    • [ ] confluent-kafka-go version (0.11.0)
    • [ ] Operating system: Mac OSX
  • segmentation fault in rd_kafka_cgrp_handle_SyncGroup

    segmentation fault in rd_kafka_cgrp_handle_SyncGroup

    Description

    I'm trying to build a simple consumer that just logs messages, but am getting segfaults when running in Docker. It runs fine locally on my Mac.

    Here is the backtrace from the coredump.

    (gdb) bt
    #0  free (p=<optimized out>) at src/malloc/malloc.c:482
    #1  0x0000000000786126 in trim (self=self@entry=0x1635990, n=<optimized out>) at src/malloc/malloc.c:317
    #2  0x00000000007864d4 in malloc (n=<optimized out>, n@entry=85) at src/malloc/malloc.c:364
    #3  0x00000000007929fa in __strdup (s=0x7fb46b044120 "v1.requestor._requestorId.orgs._orgId.users._userId.tokens._tokenId.command._command") at src/string/strdup.c:8
    #4  0x0000000000746b17 in rd_strdup (s=0x7fb46b044120 "v1.requestor._requestorId.orgs._orgId.users._userId.tokens._tokenId.command._command") at rd.h:117
    #5  rd_kafka_topic_partition_list_add0 (rktparlist=0x16095c0, topic=0x7fb46b044120 "v1.requestor._requestorId.orgs._orgId.users._userId.tokens._tokenId.command._command", partition=5, _private=0x0)
        at rdkafka_partition.c:2573
    #6  0x000000000073ad84 in rd_kafka_cgrp_handle_SyncGroup (rkcg=rkcg@entry=0x157e740, rkb=rkb@entry=0x157f760, err=err@entry=RD_KAFKA_RESP_ERR_NO_ERROR, member_state=member_state@entry=0x7fb46b057e30)
        at rdkafka_cgrp.c:3243
    #7  0x0000000000727e4b in rd_kafka_handle_SyncGroup (rk=<optimized out>, rkb=0x157f760, err=<optimized out>, rkbuf=<optimized out>, request=0x15808c0, opaque=0x157e740) at rdkafka_request.c:1106
    #8  0x00000000007122fe in rd_kafka_buf_callback (rk=<optimized out>, rkb=<optimized out>, err=RD_KAFKA_RESP_ERR_NO_ERROR, response=0x16095e0, request=0x15808c0) at rdkafka_buf.c:448
    #9  0x00000000007132f7 in rd_kafka_buf_handle_op (rko=<optimized out>, err=<optimized out>) at rdkafka_buf.c:397
    #10 0x000000000071f175 in rd_kafka_op_handle_std (rk=<optimized out>, rkq=<optimized out>, rko=<optimized out>, cb_type=<optimized out>) at rdkafka_op.c:616
    #11 0x000000000071f1b5 in rd_kafka_op_handle (rk=0x157da40, rkq=0x7fb46b058110, rko=0x15e3ae0, cb_type=RD_KAFKA_Q_CB_CALLBACK, opaque=0x0, callback=0x0) at rdkafka_op.c:647
    #12 0x00000000007184dc in rd_kafka_q_serve (rkq=0x157e420, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK, callback=callback@entry=0x0, 
        opaque=opaque@entry=0x0) at rdkafka_queue.c:467
    #13 0x00000000006f0484 in rd_kafka_thread_main (arg=arg@entry=0x157da40) at rdkafka.c:1418
    #14 0x000000000074e637 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:633
    #15 0x0000000000793bdc in start (p=0x7fb46b059ae8) at src/thread/pthread_create.c:150
    #16 0x0000000000794b5e in __clone () at src/thread/x86_64/clone.s:21
    

    Here are the last few lines of the rdkafka debug output before the coredump

    %7|1535210152.560|ASSIGN|rdkafka#consumer-1| [thrd:main]:   v1.requestor._requestorId.orgs._orgId.customer-extensions._customerId [5]
    %7|1535210152.560|ASSIGNOR|rdkafka#consumer-1| [thrd:main]: Group "kafkalogger": "range" assignor run for 1 member(s)
    %7|1535210152.560|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafkalogger" changed join state wait-metadata -> wait-sync (v1, state up)
    %7|1535210152.561|SEND|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/1: Sent SyncGroupRequest (v0, 31712 bytes @ 0, CorrId 7)
    %7|1535210152.561|BROADCAST|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: Broadcasting state change
    %7|1535210152.570|RECV|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/1: Received SyncGroupResponse (v0, 31580 bytes, CorrId 7, rtt 9.24ms)
    %7|1535210152.570|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (31574 bytes of MemberState data)
    

    How to reproduce

    I'm running against a single-node Kafka broker running Kafka 1.1.0 in a Docker image on Alpine Linux 3.7. It has a lot of topics, though (~300).

    The code I'm running is essentially the example consumer in the README of this repo.

    main.go

    package main
    
    import (
    	"fmt"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    
    	c, err := kafka.NewConsumer(&kafka.ConfigMap{
    		"bootstrap.servers": "kafka:9092",
    		"group.id":          "myGroup",
    		"auto.offset.reset": "earliest",
    	})
    
    	if err != nil {
    		panic(err)
    	}
    
    	c.SubscribeTopics([]string{"^v.*"}, nil)
    
    	for {
    		msg, err := c.ReadMessage(-1)
    		if err == nil {
    			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    		} else {
    			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
    			break
    		}
    	}
    
    	c.Close()
    }
    

    Dockerfile

    FROM golang:1.10-alpine3.8 as builder
    
    # librdkafka
    RUN apk --update --no-cache add git openssh bash g++ make
    RUN git clone https://github.com/edenhill/librdkafka.git
    WORKDIR ./librdkafka
    RUN ./configure --prefix /usr && make && make install
    
    ARG gopath="/go"
    ENV GOPATH=${gopath}
    WORKDIR $GOPATH/src/github.com/gerad/kafkalogger/
    
    COPY . ./
    RUN go build -tags static_all -o kafkalogger .
    
    FROM alpine:3.8
    WORKDIR /kafkalogger/
    # Install CA certificates
    RUN apk --update --no-cache add ca-certificates gdb
    ENV USER gerad
    RUN addgroup -S $USER && adduser -S $USER $USER && chown $USER:$USER .
    USER gerad
    RUN ulimit -c unlimited
    COPY --from=builder /go/src/github.com/gerad/kafkalogger/kafkalogger .
    
    ENTRYPOINT ["./kafkalogger"]
    

    I see this behavior on master and the latest release v0.11.4

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): 722431 0.11.6-PRE2-10-gbfc821
    • [x] Apache Kafka broker version: 1.1.0
    • [x] Client configuration:
    ConfigMap{
      "bootstrap.servers": "kafka:9092",
      "group.id":          "myGroup",
      "auto.offset.reset": "earliest",
    }
    
    • [x] Operating system: Linux - Alpine 3.8
    • [x] Provide client logs (with "debug": ".." as necessary)
    • [x] Provide broker log excerpts
    [2018-08-25 15:43:00,013] INFO [GroupCoordinator 1]: Preparing to rebalance group myGroup with old generation 12 (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)
    [2018-08-25 15:43:03,016] INFO [GroupCoordinator 1]: Stabilized group myGroup generation 13 (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)
    [2018-08-25 15:43:03,053] INFO [GroupCoordinator 1]: Assignment received from leader for group myGroup for generation 13 (kafka.coordinator.group.GroupCoordinator)
    
    • [ ] Critical issue
  • Make dynamic build use system librdkafka headers

    Make dynamic build use system librdkafka headers

    cgo adds -I${SRCDIR} to the header search path by default; therefore, using #include <librdkafka/rdkafka.h> can actually still find the bundled librdkafka header in preference to the system one even when -tags=dynamic is specified.

    This change instead makes the static build set a preprocessor macro USE_VENDORED_LIBRDKAFKA, which is used in a new header rdkafka_select.h to either import the bundled one (with a special name, and quotes), or the system one (with the <> import and the expected name).

    Fixes #514

  • [DO NOT MERGE] librdkafka static bundle fixes required for next release

    [DO NOT MERGE] librdkafka static bundle fixes required for next release

    THIS MUST NOT BE MERGED BEFORE THE NEXT IMPORT

    So what you do is, when it is time for the next librdkafka import or RC:

    	$ git checkout master
    	$ git pull --rebase origin master
    	$ git fetch --tags
    	$ git checkout impfix   # this PR
    	$ cd kafka/librdkafka_vendor
    	$ ./import.sh --develop /tmp/librdkafka-static-bundle-v2.0.0-RC2.tgz   # or whatever version

    If everything is ok, push it:

    	$ git checkout import_v2.0.0-RC2   # or whatever version it is
    	$ push --dry-run origin import_v2.0.0-RC2

    Go to github, review, MERGE! Do not squash or rebase, MERGE!

  • Offset reset callback is required when doing manual commits

    Offset reset callback is required when doing manual commits

    Description

    When auto.offset.reset is set to latest and enable.auto.offset.store is set to false, the reset offset should be committed to the broker in order to report the correct lag. When there is an offset reset, we should be notified of this event so that we can commit that offset to the broker. If we do not commit this offset to the broker, then lag is reported even though there isn't any lag.

    How to reproduce

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): v1.9.2
    • [x] Apache Kafka broker version: v2.3.1
    • [x] Client configuration: ConfigMap{ "bootstrap.servers": "<sensitive>", "debug": "consumer,cgrp", "group.id": "consumer_001", "enable.auto.commit": false, "enable.auto.offset.store": false, "session.timeout.ms": 10000, "auto.offset.reset": "latest", "max.poll.interval.ms": 15000, }
    • [x] Operating system: OSX 11.5.2
    • [x] Provide client logs (with "debug": ".." as necessary)
    (screenshot: client debug logs)
    • [x] Broker lag metrics
    (screenshots: broker lag metrics)
    • [x] Critical issue

    As you can see, the offset was reset to the log end offset because auto.offset.reset was set to latest, but this offset is not committed, as we do not know which offset to store using the StoreOffsets method.

  • Question Regarding Kafka Protobuf Producer - Specifying Schemas and Versions

    Question Regarding Kafka Protobuf Producer - Specifying Schemas and Versions

    Description

    Hi,

    I am new to Kafka Connect and working with Protobuf serialization and also this entire process in general. My current task is to evaluate the data flow process between Kafka topics and inserts into TimescaleDB using JDBC sink connector with Kafka Schema Registry. I have almost everything up and running and am trying to test the E2E flow using a sample producer.

    However, I had a few general questions on Kafka Protobuf Producer, an example is provided in this repo here: https://github.com/confluentinc/confluent-kafka-go/blob/5ba3caae52b04aaf7b4e22d5e30737b42ca948cd/schemaregistry/serde/protobuf/protobuf.go

    I was hoping someone could explain to me a few things here:

    In the referenced example above, only the schema URL is given. But let's say that you had multiple proto schema subjects available (example: proto.testrecord and proto.anotherrecord) in the Schema Registry. In this new example, we have the two schemas showing as:

    proto.testrecord

    message TestRecord {
        string cluster_name = 1;
        string id = 2;
        string hostname = 3;
        string metric = 4;
        int64 value = 5;
        string value_text = 6;
        int64 timestamp = 7;
    } 
    

    proto.anotherrecord

    message AnotherRecord{
        string source_name= 1;
        string map_id= 2;
        string hostname = 3;
        string metric_group = 4;
        int64 value = 5;
        string value_text = 6;
        int64 timestamp = 7;
    } 
    

    Let's say you also have two topics, and one (producer 1) should use the first schema subject while the second (producer 2) should use the second schema subject to validate/conform the data for inserts. Now let's say you are creating a producer for the first topic (producer 1) and it should use the proto.testrecord subject from the Schema Registry for serialization. How would you configure/tell the producer to use the correct schema subject and also the exact version (if multiple existed)? Or are you not supposed to specify those due to the way the process works?

    I noticed the repo example provided doesn't specify any of that information and I am trying to understand exactly how it knows or defaults to a certain subject and version.

    According to this: https://github.com/confluentinc/confluent-kafka-go/blob/5ba3caae52b04aaf7b4e22d5e30737b42ca948cd/schemaregistry/serde/config.go#L36, it looks like you can specify the schema ID (via the UseSchemaID var), which tells it which schema subject to use, but I'm unsure about how to specify versioning.

    I probably missed something during my reading and am not understanding things correctly and could use some help/discussion.

    Any help is appreciated, thanks!

  • Add SeekPartitions wrapper for rd_kafka_seek_partitions function to support seeking multiple partitions at a time

    Add SeekPartitions wrapper for rd_kafka_seek_partitions function to support seeking multiple partitions at a time

    Description

    The Seek function only supports seeking one partition at a time. This is due to the underlying limitation of the rd_kafka_seek function also only seeking one partition at a time.

    However, the rd_kafka_seek function was marked as deprecated, with rd_kafka_seek_partitions being the new preferred alternative. This function takes a list of partitions.

    Please consider adding a new function named SeekPartitions or similar to the Go consumer, which wraps rd_kafka_seek_partitions. This will allow us to avoid having to write loops like this:

    	assigned, _ := consumer.Assignment()
    	for i := range assigned {
    		assigned[i].Offset = 0
    	}
    	for _, a := range assigned {
    		_ = consumer.Seek(a, 0)
    	}
    

    and instead write:

    	assigned, _ := consumer.Assignment()
    	for i := range assigned {
    		assigned[i].Offset = 0
    	}
    	_ = consumer.SeekPartitions(assigned, 0)
    

    Checklist

    Please provide the following information:

    • [X] confluent-kafka-go and librdkafka version (LibraryVersion()): master branch: a reference to rd_kafka_seek_partitions cannot be found anywhere indicating that there is no existing wrapper function
    • [ ] Apache Kafka broker version: N/A
    • [ ] Client configuration: ConfigMap{...} N/A
    • [ ] Operating system: N/A
    • [ ] Provide client logs (with "debug": ".." as necessary): N/A
    • [ ] Provide broker log excerpts: N/A
    • [ ] Critical issue
  • Bug/producer consumer adminclient validity handling

    Bug/producer consumer adminclient validity handling

    • Added a new isClosed field to the Producer struct, which acts as a flag indicating whether the Producer is open or closed.

    • Added a new verifyProducer() function, which checks whether the producer is valid by doing nil checks and closed checks to avoid segfaults in cgo or Go panics.
