franz-go - A complete Apache Kafka client written in Go

Franz-go is an all-encompassing Apache Kafka client fully written in Go. This library aims to provide every Kafka feature from Apache Kafka v0.8.0 onward. It has support for transactions, regex topic consuming, the latest partitioning strategies, data loss detection, closest replica fetching, and more. If a client KIP exists, this library aims to support it.

This library attempts to provide an intuitive API while interacting with Kafka the way Kafka expects (timeouts, etc.).

Features

  • Feature complete client (Kafka >= 0.8.0 through v2.8.0+)
  • Full Exactly-Once-Semantics (EOS)
  • Idempotent & transactional producers
  • Simple (legacy) consumer
  • Group consumers with eager (roundrobin, range, sticky) and cooperative (cooperative-sticky) balancers
  • All compression types supported: gzip, snappy, lz4, zstd
  • SSL/TLS provided through custom dialer options
  • All SASL mechanisms supported (GSSAPI/Kerberos, PLAIN, SCRAM, and OAUTHBEARER)
  • Low-level admin functionality supported through a simple Request function (see the sketch after this list)
  • Utilizes modern & idiomatic Go (support for contexts, variadic configuration options, ...)
  • Highly performant by avoiding channels and goroutines where not necessary
  • Written in pure Go (no wrapper lib for a C library or other bindings)
  • Ability to add detailed log messages or metrics using hooks
  • Plug-in metrics support for prometheus, zap, etc.
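
Low-level requests are built with the kmsg package and issued through the client's Request method. A minimal sketch, assuming a locally reachable broker (the seed address is a placeholder):

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Build a raw Metadata request; leaving Topics nil asks for all topics.
	req := kmsg.NewPtrMetadataRequest()
	resp, err := cl.Request(context.Background(), req)
	if err != nil {
		panic(err)
	}
	meta := resp.(*kmsg.MetadataResponse)
	fmt.Printf("brokers: %d, topics: %d\n", len(meta.Brokers), len(meta.Topics))
}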

Works with any Kafka compatible brokers:

  • Redpanda: the fastest and most efficient Kafka compatible event streaming platform
  • Kafka: the original Java project
  • Microsoft Event Hubs
  • Amazon MSK

Getting started

Here's a basic overview of producing and consuming:

seeds := []string{"localhost:9092"}
// One client can both produce and consume!
// Consuming can either be direct (no consumer group), or through a group. Below, we use a group.
cl, err := kgo.NewClient(
	kgo.SeedBrokers(seeds...),
	kgo.ConsumerGroup("my-group-identifier"),
	kgo.ConsumeTopics("foo"),
)
if err != nil {
	panic(err)
}
defer cl.Close()

ctx := context.Background()

// 1.) Producing a message
// All record production goes through Produce, and the callback can be used
// to allow for synchronous or asynchronous production.
var wg sync.WaitGroup
wg.Add(1)
record := &kgo.Record{Topic: "foo", Value: []byte("bar")}
cl.Produce(ctx, record, func(_ *kgo.Record, err error) {
	defer wg.Done()
	if err != nil {
		fmt.Printf("record had a produce error: %v\n", err)
	}

})
wg.Wait()

// Alternatively, ProduceSync exists to synchronously produce a batch of records.
if err := cl.ProduceSync(ctx, record).FirstErr(); err != nil {
	fmt.Printf("record had a produce error while synchronously producing: %v\n", err)
}

// 2.) Consuming messages from a topic
for {
	fetches := cl.PollFetches(ctx)
	if errs := fetches.Errors(); len(errs) > 0 {
		// All errors are retried internally when fetching, but non-retriable errors are
		// returned from polls so that users can notice and take action.
		panic(fmt.Sprint(errs))
	}

	// We can iterate through a record iterator...
	iter := fetches.RecordIter()
	for !iter.Done() {
		record := iter.Next()
		fmt.Println(string(record.Value), "from an iterator!")
	}

	// or a callback function.
	fetches.EachPartition(func(p kgo.FetchTopicPartition) {
		for _, record := range p.Partition.Records {
			fmt.Println(string(record.Value), "from range inside a callback!")
		}

		// We can even use a second callback!
		p.EachRecord(func(record *kgo.Record) {
			fmt.Println(string(record.Value), "from a second callback!")
		})
	})
}

This only shows producing and consuming in the most basic sense, and does not show the full list of options to customize how the client runs, nor does it show transactional producing / consuming. Check out the examples directory for more!
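
As a quick sketch of a producer-only transaction (the transactional ID, seed address, and topic here are placeholders; the examples directory has complete, tested code):

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-transactional-id"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	if err := cl.BeginTransaction(); err != nil {
		panic(err)
	}

	// ProduceSync waits for the records in this transaction to be flushed.
	record := &kgo.Record{Topic: "foo", Value: []byte("in a transaction")}
	if err := cl.ProduceSync(ctx, record).FirstErr(); err != nil {
		// On a produce error, drop anything still buffered and abort.
		cl.AbortBufferedRecords(ctx)
		cl.EndTransaction(ctx, kgo.TryAbort)
		panic(err)
	}

	// Everything flushed successfully: commit.
	if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil {
		panic(err)
	}
}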

API reference documentation can be found on GoDev. Supplementary information can be found in the docs directory:

docs
├── admin requests — an overview of how to issue admin requests
├── package layout — describes the packages in franz-go
├── producing and consuming — descriptions of producing & consuming & the guarantees
└── transactions — a description of transactions and the safety even in a pre-KIP-447 world

Version Pinning

By default, the client issues an ApiVersions request when it connects to a broker and uses the maximum version of each request that both the client and that broker support.

Kafka 0.10.0 introduced the ApiVersions request; if you are working with brokers older than that, you must pin versions yourself with the kversion package, using the client's MaxVersions option.

It is also recommended to set MaxVersions to the version of your broker cluster. Until KIP-584 is implemented, if you do not pin a max version, the client may end up using newer protocol features with some brokers but not with others while you are in the middle of a rolling broker upgrade.
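
A minimal sketch of pinning, assuming a 2.4.0 cluster (the seed address is a placeholder):

package main

import (
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kversion"
)

func main() {
	// Cap all request versions at what a 2.4.0 cluster supports so the client
	// does not negotiate newer versions with some brokers mid-roll.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.MaxVersions(kversion.V2_4_0()),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}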

Metrics & logging

Note that there exist plug-in packages that let you easily add prometheus metrics, go-metrics, zap logging, etc. to your client! See the plugin directory for more information! These plugins are provided as dedicated modules under github.com/twmb/franz-go/plugin.

The franz-go client takes a neutral approach to metrics by providing hooks that you can use to plug in your own metrics.

All connections, disconnections, reads, writes, and throttles can be hooked into, as well as per-batch produce & consume metrics. If there is an aspect of the library that you wish you could have insight into, please open an issue and we can discuss adding another hook.

Hooks allow you to log in the event of specific errors, or to trace latencies, count bytes, etc., all with your favorite monitoring systems.

In addition to hooks, logging can be plugged in with a general Logger interface. A basic logger is provided if you just want to write to a given file in a simple format. All logs have a message and then key/value pairs of supplementary information. It is recommended to always use a logger and to use LogLevelInfo.
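
As a rough sketch of wiring in both a basic logger and a custom hook (the HookBrokerConnect interface name and signature below assume kgo v1.x; the seed address is a placeholder):

package main

import (
	"fmt"
	"net"
	"os"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// connectLogger logs broker dials by implementing the HookBrokerConnect hook.
type connectLogger struct{}

func (connectLogger) OnBrokerConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
	fmt.Fprintf(os.Stderr, "dialed broker %d (%s) in %v, err: %v\n", meta.NodeID, meta.Host, dialDur, err)
}

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		// A simple logger writing message + key/value pairs to stderr at info level.
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, nil)),
		// Hooks plug metrics or tracing in; any type implementing a hook interface works.
		kgo.WithHooks(connectLogger{}),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}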

See the examples for an expansive integration with prometheus, or for how to use the plug-in prometheus package!
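
A rough sketch of the plug-in prometheus package, assuming kprom's NewMetrics and Handler (the namespace, seed address, and listen port are placeholders):

package main

import (
	"net/http"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/plugin/kprom"
)

func main() {
	// kprom implements the client hooks and exposes them as prometheus metrics.
	metrics := kprom.NewMetrics("my_app")
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.WithHooks(metrics),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Serve the metrics over HTTP.
	http.Handle("/metrics", metrics.Handler())
	panic(http.ListenAndServe(":9090", nil))
}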

Benchmarks

This client is quite fast; it is the fastest and most CPU- and memory-efficient Kafka client in Go.

For 100 byte messages,

  • This client is 4x faster at producing than confluent-kafka-go, and up to 10x-20x faster (at the expense of more memory usage) at consuming.

  • This client is 2.5x faster at producing than sarama, and 1.5x faster at consuming.

  • This client is 2.4x faster at producing than segment's kafka-go, and so much faster at consuming that I'm not sure I wrote the consuming comparison correctly here.

To check benchmarks yourself, see the bench example. This example lets you produce or consume to a cluster and see the byte / record rate. The compare subdirectory shows comparison code.

Supported KIPs

Theoretically, this library supports every (non-Java-specific) client-facing KIP. Any KIP that simply adds or modifies a protocol is supported by code generation.

KIP Kafka release Status
KIP-1 — Disallow acks > 1 0.8.3 Supported & Enforced
KIP-4 — Request protocol changes 0.9.0 through 0.10.1 Supported
KIP-8 — Flush method on Producer 0.8.3 Supported
KIP-12 — SASL & SSL 0.9.0 Supported
KIP-13 — Throttling (on broker) 0.9.0 Supported
KIP-15 — Close with a timeout 0.9.0 Supported (via context)
KIP-19 — Request timeouts 0.9.0 Supported
KIP-22 — Custom partitioners 0.9.0 Supported
KIP-31 — Relative offsets in message sets 0.10.0 Supported
KIP-32 — Timestamps in message set v1 0.10.0 Supported
KIP-35 — ApiVersion 0.10.0 Supported
KIP-40 — ListGroups and DescribeGroups 0.9.0 Supported
KIP-41 — max.poll.records 0.10.0 Supported (via PollRecords)
KIP-42 — Producer & consumer interceptors 0.10.0 Partial support (hooks)
KIP-43 — SASL PLAIN & handshake 0.10.0 Supported
KIP-48 — Delegation tokens 1.1.0 Supported
KIP-54 — Sticky partitioning 0.11.0 Supported
KIP-57 — Fix lz4 0.10.0 Supported
KIP-62 — background heartbeats & improvements 0.10.1 Supported
KIP-70 — On{Assigned,Revoked} 0.10.1 Supported
KIP-74 — Fetch response size limits 0.10.1 Supported
KIP-78 — ClusterID in Metadata 0.10.1 Supported
KIP-79 — List offsets for times 0.10.1 Supported
KIP-81 — Bound fetch memory usage WIP Supported (through a combo of options)
KIP-82 — Record headers 0.11.0 Supported
KIP-84 — SASL SCRAM 0.10.2 Supported
KIP-86 — SASL Callbacks 0.10.2 Supported (through callback fns)
KIP-88 — OffsetFetch for admins 0.10.2 Supported
KIP-91 — Intuitive producer timeouts 2.1.0 Supported (as a matter of opinion)
KIP-97 — Backwards compat for old brokers 0.10.2 Supported
KIP-98 — EOS 0.11.0 Supported
KIP-101 — OffsetForLeaderEpoch v0 0.11.0 Supported
KIP-102 — Consumer close timeouts 0.10.2 Supported (via context)
KIP-107 — DeleteRecords 0.11.0 Supported
KIP-108 — CreateTopic validate only field 0.10.2 Supported
KIP-110 — zstd 2.1.0 Supported
KIP-112 — Broker request protocol changes 1.0.0 Supported
KIP-113 — LogDir requests 1.0.0 Supported
KIP-117 — Admin client 0.11.0 Supported (via kmsg)
KIP-124 — Request rate quotas 0.11.0 Supported
KIP-126 — Ensure proper batch size after compression 0.11.0 Supported (avoided entirely)
KIP-133 — Describe & Alter configs 0.11.0 Supported
KIP-140 — ACLs 0.11.0 Supported
KIP-144 — Broker reconnect backoff 0.11.0 Supported
KIP-152 — More SASL; SASLAuthenticate 1.0.0 Supported
KIP-183 — Elect preferred leaders 2.2.0 Supported
KIP-185 — Idempotency is default 1.0.0 Supported
KIP-192 — Cleaner idempotence semantics 1.0.0 Supported
KIP-195 — CreatePartitions 1.0.0 Supported
KIP-204 — DeleteRecords via admin API 1.1.0 Supported
KIP-207 — New error in ListOffsets 2.2.0 Supported
KIP-219 — Client-side throttling 2.0.0 Supported
KIP-222 — Group operations via admin API 2.0.0 Supported
KIP-226 — Describe configs v1 1.1.0 Supported
KIP-227 — Incremental fetch 1.1.0 Supported
KIP-229 — DeleteGroups 1.1.0 Supported
KIP-249 — Delegation tokens in admin API 2.0.0 Supported
KIP-255 — SASL OAUTHBEARER 2.0.0 Supported
KIP-266 — Fix indefinite consumer timeouts 2.0.0 Supported (via context)
KIP-279 — OffsetForLeaderEpoch bump 2.0.0 Supported
KIP-289 — Default group.id to null 2.2.0 Supported
KIP-294 — TLS verification 2.0.0 Supported (via dialer)
KIP-302 — Use multiple addrs for resolved hostnames 2.1.0 Supported (via dialer)
KIP-320 — Fetcher: detect log truncation 2.1.0 Supported
KIP-322 — DeleteTopics disabled error code 2.1.0 Supported
KIP-339 — IncrementalAlterConfigs 2.3.0 Supported
KIP-341 — Sticky group bugfix ? Supported
KIP-342 — OAUTHBEARER extensions 2.1.0 Supported
KIP-345 — Static group membership 2.4.0 Supported
KIP-357 — List ACLs per principal via admin API 2.1.0 Supported
KIP-360 — Safe epoch bumping for UNKNOWN_PRODUCER_ID 2.5.0 Supported
KIP-361 — Allow disable auto topic creation 2.3.0 Supported
KIP-368 — Periodically reauthenticate SASL 2.2.0 Supported
KIP-369 — An always round robin produce partitioner 2.4.0 Supported
KIP-380 — Inter-broker protocol changes 2.2.0 Supported
KIP-389 — Group max size error 2.2.0 Supported
KIP-392 — Closest replica fetching w/ rack 2.2.0 Supported
KIP-394 — Require member.id for initial join request 2.2.0 Supported
KIP-396 — Commit offsets manually 2.4.0 Supported
KIP-412 — Dynamic log levels w/ IncrementalAlterConfigs 2.4.0 Supported
KIP-429 — Incremental rebalance (see KAFKA-8179) 2.4.0 Supported
KIP-430 — Authorized ops in DescribeGroups 2.3.0 Supported
KIP-447 — Producer scalability for EOS 2.5.0 Supported
KIP-455 — Replica reassignment API 2.4.0 Supported
KIP-460 — Leader election API 2.4.0 Supported
KIP-464 — CreateTopic defaults 2.4.0 Supported
KIP-467 — Per-record error codes when producing 2.4.0 Supported (and ignored)
KIP-480 — Sticky partition producing 2.4.0 Supported
KIP-482 — Tagged fields (KAFKA-8885) 2.4.0 Supported
KIP-496 — OffsetDelete admin command 2.4.0 Supported
KIP-497 — New AlterISR API 2.7.0 Supported
KIP-498 — Max bound on reads ? Supported
KIP-511 — Client name/version in ApiVersions request 2.4.0 Supported
KIP-514 — Bounded Flush 2.4.0 Supported (via context)
KIP-516 — Topic IDs ??? Supported as it is implemented
KIP-518 — List groups by state 2.6.0 Supported
KIP-519 — Configurable SSL "engine" 2.6.0 Supported (via dialer)
KIP-525 — CreateTopics v5 returns configs 2.4.0 Supported
KIP-526 — Reduce metadata lookups 2.5.0 Supported
KIP-533 — Default API timeout (total time, not per request) 2.5.0 Supported (via RetryTimeout)
KIP-546 — Client Quota APIs 2.5.0 Supported
KIP-554 — Broker side SCRAM APIs 2.7.0 Supported
KIP-559 — Protocol info in sync/join 2.5.0 Supported
KIP-568 — Explicit rebalance triggering on the consumer 2.6.0 Supported
KIP-569 — Docs & type in DescribeConfigs 2.6.0 Supported
KIP-570 — Leader epoch in StopReplica 2.6.0 Supported
KIP-580 — Exponential backoff 2.6.0 Supported
KIP-584 — Versioning scheme for features ? Supported (nothing to do yet)
KIP-588 — Producer recovery from txn timeout 2.7.0 Supported
KIP-590 — Envelope (broker only) 2.7.0 Supported
KIP-595 — New APIs for raft protocol 2.7.0 Supported
KIP-599 — Throttling on create/delete topic/partition 2.7.0 Supported
KIP-602 — Use all resolved addrs by default 2.6.0 Supported (via dialer)
KIP-651 — Support PEM 2.7.0 Supported (via dialer)
KIP-654 — Aborted txns with unflushed data is not fatal 2.7.0 Supported (default behavior)
KIP-664 — Describe producers / etc. 2.8.0 (mostly) Supported
KIP-679 — Strongest producer guarantee by default 3.0.0 Supported (by default always)
KIP-699 — Batch FindCoordinators 3.0.0 Supported
KIP-700 — DescribeCluster 2.8.0 Supported
KIP-709 — Batch OffsetFetch 3.0.0 Supported
KIP-730 — AllocateProducerIDs 3.0.0 Supported
KIP-734 — Support MaxTimestamp in ListOffsets 3.0.0 Supported (simple version bump)
KIP-735 — Bump default session timeout ? Supported

Missing from above but included in librdkafka are:

  • KIP-85, which does not seem relevant for franz-go
  • KIP-92 for consumer lag metrics, which is better suited for an external system via the admin api
  • KIP-223 for more metrics
  • KIP-235, which is confusing but may be implemented via a custom dialer and custom kerberos?
  • KIP-359 to verify leader epoch when producing; this is easy to support but actually is not implemented in Kafka yet
  • KIP-421 for dynamic values in configs; librdkafka mentions it does not support it, and neither does franz-go for the same reason (we do not use a config file)
  • KIP-436 is about yet another metric
  • KIP-517, more metrics

Comments

  • ILLEGAL_SASL_STATE: Request is not valid given the current SASL state

    We are using franz-go to connect to an AWS MSK cluster using an IAM role. We are seeing our OnPartitionsLost and/or OnPartitionsRevoked methods called each time franz-go attempts to re-authenticate with the MSK cluster using SASL/IAM. This is resulting in the service getting stuck in a re-authentication loop for 2-3 seconds each time before it re-creates the connection to Kafka.

    Logs:

    2022-11-10T00:23:54.933-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 1.068s, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.035-07:00 [INFO] metadata update triggered; why: opportunistic load during source backoff
    2022-11-10T00:23:55.127-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 873ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.241-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 760ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.299-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 702ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.413-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 588ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.563-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:23:55.618-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 383ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:56.122-07:00 [ERROR] unable to initialize sasl; broker: 3, err: [***REDACTED***]: Session too short: SASL_AUTHENTICATION_FAILED: SASL Authentication failed.
    2022-11-10T00:23:56.131-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:23:57.609-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: Cannot change principals during re-authentication from ***REDACTED***: ***REDACTED***: SASL_AUTHENTICATION_FAILED: SASL Authentication failed.
    2022-11-10T00:35:07.692-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 2.308s, latency_lower_bound: 2.5s
    2022-11-10T00:35:07.794-07:00 [INFO] metadata update triggered; why: opportunistic load during source backoff
    2022-11-10T00:35:07.855-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 2.146s, latency_lower_bound: 2.5s
    2022-11-10T00:35:07.904-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 2.096s, latency_lower_bound: 2.5s
    2022-11-10T00:35:07.969-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 2.032s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.090-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 1.91s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.195-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 1.805s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.205-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 1.796s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.330-07:00 [INFO] metadata update triggered; why: opportunistic load during source backoff
    2022-11-10T00:35:08.359-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 1.642s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.365-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 1.636s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.478-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 1.522s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.561-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 1.44s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.693-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 1.308s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.795-07:00 [INFO] heartbeat errored; group: ***REDACTED***, err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:35:08.795-07:00 [INFO] assigning partitions; why: clearing assignment at end of group management session, how: unassigning everything, input:
    2022-11-10T00:35:08.796-07:00 [ERROR] join and sync loop errored; group: ***REDACTED***, err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state., consecutive_errors: 5, backoff: 2.5s
    2022-11-10T00:35:09.023-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:35:09.087-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 914ms, latency_lower_bound: 2.5s
    2022-11-10T00:35:09.206-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 795ms, latency_lower_bound: 2.5s
    2022-11-10T00:35:10.252-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:35:11.296-07:00 [INFO] joining group; group: ***REDACTED***
    2022-11-10T00:35:11.354-07:00 [INFO] joined, balancing group; group: ***REDACTED***, member_id: ***REDACTED***, instance_id: <nil>, generation: 58, balance_protocol: cooperative-sticky, leader: true
    2022-11-10T00:35:11.354-07:00 [INFO] balancing group as leader
    2022-11-10T00:35:11.354-07:00 [INFO] balance group member; id: ***REDACTED***, interests: interested topics: [***REDACTED*** ***REDACTED*** ***REDACTED***], previously owned:
    2022-11-10T00:35:11.354-07:00 [INFO] balanced; plan: ***REDACTED***{***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3]}
    2022-11-10T00:35:11.354-07:00 [INFO] syncing; group: ***REDACTED***, protocol_type: consumer, protocol: cooperative-sticky
    2022-11-10T00:35:11.361-07:00 [INFO] synced; group: ***REDACTED***, assigned: ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3]
    2022-11-10T00:35:11.361-07:00 [INFO] new group session begun; group: ***REDACTED***, added: ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3], lost:
    2022-11-10T00:35:11.361-07:00 [INFO] beginning heartbeat loop; group: ***REDACTED***
    2022-11-10T00:35:11.406-07:00 [INFO] assigning partitions; why: newly fetched offsets for group ***REDACTED***, how: assigning everything new, keeping current assignment, input: ***REDACTED***[1{-2.-1 0} 2{-2.-1 0} 0{-2.-1 0} 3{-2.-1 0}], ***REDACTED***[2{24031.14 0} 1{23435.14 0} 0{27120.16 0} 3{26493.16 0}], ***REDACTED***[1{7953.16 0} 0{9211.13 0} 3{8975.14 0} 2{8149.12 0}]
    2022-11-10T00:37:02.642-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 2.359s, latency_lower_bound: 2.5s
    2022-11-10T00:37:02.743-07:00 [INFO] metadata update triggered; why: opportunistic load during source backoff
    2022-11-10T00:37:02.775-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 2.226s, latency_lower_bound: 2.5s
    
    
  • [Question] Recovering from INVALID_PRODUCER_EPOCH

    Hey @twmb, over the course of testing a transactional producer I've written I've encountered this error from EndTransaction after a rebalance:

    INVALID_PRODUCER_EPOCH: Producer attempted an operation with an old epoch.
    

    For a bit of context, this is from a consumer (currently Sarama, will be franz-go soon) that does the following:

    1. client.BeginTransaction
    2. Consumes a record from a topic
    3. Writes a bunch of records to a different topic (client.Send)
    4. If all goes well, client.EndTransaction(ctx, kgo.TryCommit)
    5. If something goes wrong in step 3 or 4, there's a client.AbortBufferedRecords followed by a client.EndTransaction(ctx, kgo.TryAbort)

    We're running multiple instances of these consumers, each reading from one or more partitions on the input topic and writing (randomly) to partitions on the output topic. These tasks can be relatively long running and rebalances may occur during any of those steps due to deploys, scaling, kafka operations, etc.

    What happens after a rebalance is they start throwing those kerr.InvalidProducerEpoch errors. A little digging suggests this is due to franz-go's handling of KIP-588 and that seems to make sense, but what isn't clear to me is how to recover from this situation. Right now, the producers just spin forever on that error - able to begin a transaction but not write records or end the transaction (though aborting still works).

    I remembered this doc from config.TransactionalID, but I'm not sure it applies to me since I'm not using a franz-go consumer.

    // Note that, unless using Kafka 2.5.0, a consumer group rebalance may be
    // problematic. Production should finish and be committed before the client
    // rejoins the group. It may be safer to use an eager group balancer and just
    // abort the transaction. Alternatively, any time a partition is revoked, you
    // could abort the transaction and reset offsets being consumed.

    So with that in mind and with the bold assumption that I'm doing this correctly in the first place, what is the proper way to recover from this situation with the franz-go API?

  • panic: runtime error: invalid memory address or nil pointer dereference

    Client version: v0.6.9
    Kafka version: 2.6
    Connection auth: mTLS
    Auto commit: disabled

    Our Kafka dev cluster can be under high load, so no idea if it's related.

    panic: runtime error: invalid memory address or nil pointer dereference
    [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x7d381f]
    
    goroutine 10119 [running]:
    github.com/twmb/franz-go/pkg/kgo.(*topicPartitions).load(...)
            /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/topics_and_partitions.go:73
    github.com/twmb/franz-go/pkg/kgo.(*consumer).assignPartitions(0xc0007326a0, 0xc0030022d0, 0xc00084d500)
            /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer.go:357 +0x47f
    github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).fetchOffsets(0xc00084d4a0, 0xfc34a8, 0xc002bcce40, 0xc0024914a0, 0x0, 0x0)
            /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:1201 +0x4d7
    github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat.func3(0xc002bd7080, 0xc002554060, 0xc00084d4a0, 0xc0024914a0, 0xfc34a8, 0xc002bcce40)
            /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:750 +0x145
    created by github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat
            /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:746 +0x387
    
  • Possible record loss on unclean kill of consumer when using automatic offset commit

    Because the offset commit and consumption are done independently it is possible to have record loss on the consumer side if the consumer process has a kill -9 applied to it mid loop. You can reproduce this by producing 1000 records, then run up a consumer and sleep at the 500 record mark. Then issue a kill -9 on the consumer process. When the consumer starts back up the full 1000 records sometimes don't all get consumed. This is presumably because the offsets for these records have already been committed. When you run the Go confluent lib this does not happen because the offset commit occurs inside the Poll call itself, so there is no message loss.
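
    One way to avoid this is to disable auto-commit and commit only after records have been handled; a minimal sketch assuming kgo's DisableAutoCommit and CommitRecords (broker, topic, and group names are placeholders):

    package main

    import (
    	"context"

    	"github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
    	cl, err := kgo.NewClient(
    		kgo.SeedBrokers("localhost:9092"),
    		kgo.ConsumerGroup("my-group"),
    		kgo.ConsumeTopics("foo"),
    		kgo.DisableAutoCommit(), // nothing is committed until we say so
    	)
    	if err != nil {
    		panic(err)
    	}
    	defer cl.Close()

    	ctx := context.Background()
    	for {
    		fetches := cl.PollFetches(ctx)
    		var processed []*kgo.Record
    		fetches.EachRecord(func(r *kgo.Record) {
    			// ... handle the record, then track it as processed ...
    			processed = append(processed, r)
    		})
    		// Commit only what was handled; an unclean kill before this point
    		// re-delivers records on restart instead of losing them.
    		if err := cl.CommitRecords(ctx, processed...); err != nil {
    			// log and continue; the next successful commit covers these offsets
    		}
    	}
    }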

  • Kafka read error with SASL auth: TOPIC_AUTHORIZATION_FAILED: Not authorized to access topics: [Topic authorization failed.] + GROUP_AUTHORIZATION_FAILED: Not authorized to access group: Group authorization failed

    I have a service that continuously writes (and then reads) from the same topic every 5 seconds.

    I have two different clients using franz-go, one with TLS auth and one with SASL via msk_iam auth.

    My writer seems to be fine but errors every few hours. The errors are always the same.

    TOPIC_AUTHORIZATION_FAILED: Not authorized to access topics: [Topic authorization failed.]
    

    I will get 200k of these errors over the course of 4 minutes and then everything resumes fine again.

    My reader code was based heavily on this example: https://github.com/twmb/franz-go/blob/master/examples/goroutine_per_partition_consuming/autocommit_marks/main.go

    This is a snippet of the code:

    for {
    	fetches := r.service.PollRecords(ctx, 1000)
    	err := fetches.Err0()
    	if err != nil {
    		if errors.Is(err, kafka.ErrClientClosed) {
    			contextLogger.Error().Err(err).Msg("Client closed")
    			return err
    		}
    		if errors.Is(err, context.Canceled) {
    			contextLogger.Error().Err(err).Msg("Context cancelled closed")
    			return err
    		}
    	}
    	fetches.EachError(func(topic string, partition int32, err error) {
    		contextLogger.Error().
    			Err(err).
    			Str("Topic", topic).
    			Int32("Partition", partition).
    			Msg("Cannot fetch message")
    	})
    	fetches.EachPartition(func(fetchedTopicPartition kafka.FetchTopicPartition) {
    		tp := topicPartition{
    			topic:     fetchedTopicPartition.Topic,
    			partition: fetchedTopicPartition.Partition,
    		}
    
    		r.consumers[tp].records <- fetchedTopicPartition
    	})
    
    	r.service.AllowRebalance()
    }
    

    And this is how the client is instantiated:

    	franzGoReader := &FranzGoReader{
    		consumers: make(map[topicPartition]*franzGoPartitionReader),
    		log:       contextLogger,
    	}
    
    	kafkaAddr := strings.Split(brokersEndpoint, ",")
    	clientOpts := []kafka.Opt{
    		kafka.SeedBrokers(kafkaAddr...),
    		kafka.ConsumeTopics(kafkaTopic),
    		kafka.ConsumerGroup(consumerGroup),
    		// default is CooperativeStickyBalancer
    		kafka.Balancers(kafka.RangeBalancer()),
    		kafka.ConsumeResetOffset(kafka.NewOffset().AtStart()),
    		kafka.OnPartitionsAssigned(franzGoReader.assigned),
    		kafka.OnPartitionsRevoked(franzGoReader.revoked),
    		kafka.OnPartitionsLost(franzGoReader.lost),
    		kafka.AutoCommitMarks(),
    		kafka.AutoCommitInterval(3 * time.Second),
    		kafka.BlockRebalanceOnPoll(),
    		kafka.WithLogger(&franzGoLogger{
    			log: contextLogger,
    		}),
    	}
    	if useAuth {
    		clientOpts = append(clientOpts, kafka.DialTLSConfig(&tls.Config{}))
    
    		sess, err := session.NewSession()
    		if err != nil {
    			contextLogger.Error().Err(err).Msg("Could not get aws session")
    		}
    		clientOpts = append(clientOpts, kafka.SASL(aws.ManagedStreamingIAM(func(ctx context.Context) (aws.Auth, error) {
    			val, err := sess.Config.Credentials.GetWithContext(ctx)
    			if err != nil {
    				return aws.Auth{}, err
    			}
    			return aws.Auth{
    				AccessKey:    val.AccessKeyID,
    				SecretKey:    val.SecretAccessKey,
    				SessionToken: val.SessionToken,
    				UserAgent:    "kafka-health-monitor",
    			}, nil
    		})))
    	}
    
    	cl, err := kafka.NewClient(clientOpts...)
    	if err != nil {
    		contextLogger.Error().Err(err).Msg("Could not create FranzGo client")
    		return nil, err
    	}
    
    

    partitions assigned callback:

    func (r *FranzGoReader) assigned(_ context.Context, cl *kafka.Client, assignedTopicPartitions map[string][]int32) {
    	contextLogger := r.log.With().Logger()
    	contextLogger.Debug().Msg("Partitions assigned")
    
    	for topic, partitions := range assignedTopicPartitions {
    		for _, partition := range partitions {
    			contextLoggerPartionReader := contextLogger.With().
    				Str("Topic", topic).
    				Int32("Partition", partition).
    				Logger()
    			partitionReader := &franzGoPartitionReader{
    				service:   cl,
    				log:       contextLoggerPartionReader,
    				topic:     topic,
    				partition: partition,
    
    				quit:    make(chan struct{}),
    				done:    make(chan struct{}),
    				records: make(chan kafka.FetchTopicPartition, 5),
    			}
    			r.consumers[topicPartition{topic, partition}] = partitionReader
    			go partitionReader.consume()
    		}
    	}
    }
    

    partitions lost callback:

    func (r *FranzGoReader) lost(_ context.Context, _ *kafka.Client, topicPartionsLost map[string][]int32) {
    	contextLogger := r.log.With().Logger()
    	contextLogger.Debug().Msg("Partitions lost")
    
    	r.killPartitionReaders(topicPartionsLost)
    }
    
    func (r *FranzGoReader) killPartitionReaders(topicPartions map[string][]int32) {
    	contextLogger := r.log.With().Logger()
    	contextLogger.Debug().Msg("Kill partitionReaders")
    
    	wg := &sync.WaitGroup{}
    	defer wg.Wait()
    
    	for topic, partitions := range topicPartions {
    		for _, partition := range partitions {
    			wg.Add(1)
    
    			tp := topicPartition{topic, partition}
    			partitionReader := r.consumers[tp]
    			delete(r.consumers, tp)
    			go func() {
    				partitionReader.close()
    				wg.Done()
    			}()
    		}
    	}
    }
    
    func (pr *franzGoPartitionReader) close() {
    	contextLogger := pr.log.With().Logger()
    	contextLogger.Debug().Msg("Closing partitionReader")
    
    	close(pr.quit)
    	contextLogger.Debug().Msg("Waiting for work to finish")
    	<-pr.done
    }
    
  • Getting "invalid short sasl lifetime millis" periodically with AWS MSK using SASL authentication

    I'm getting "invalid short sasl lifetime millis" periodically (after a few hours) with AWS MSK using SASL authentication.

    Looking at the code, it errors out when the lifetimeMillis is less than 5 sec but would re-authenticate if the time is greater than 5 sec.

    The periodic re-authentication works most of the time because the AWS auth token expires every 15 min, and I only see this issue after a few hours. When it happens, lifetimeMillis becomes less than 5s. Could this be a clock sync issue? Any idea how I can prevent this error from happening?

  • Fix integration test action

    Hey @twmb, following up on #218, I think this should do the trick using the bitnami/kafka container in Zookeeperless mode. I had to adjust the tests a bit so the KGO_SEEDS env var is read by each test which needs to connect to Kafka.

    I also tried to use Redpanda, but there are some issues with it. Could you please let me know how you got the tests to pass against it? It may just be that I'm not configuring it correctly (maybe it doesn't like the fact that I'm running it as a single node?), but my Kafka knowledge is too limited to troubleshoot it. Here's what I tried:

    > docker run --rm -p9092:9092 docker.vectorized.io/vectorized/redpanda:latest start --default-log-level=debug --smp 1 --reserve-memory 0M --overprovisioned --set redpanda.enable_transactions=true --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr 127.0.0.1:9092
    > # Wait for container to start
    > KGO_TEST_RF=1 KGO_TEST_RECORDS=50000 go test -v -count 1 ./...
    

    I pulled the current latest tag of the Redpanda container (sha256:84c8cd98ed7a50e4f378af55600c92c4fb097afab4ec164c3ded9b0e884754c4).

    I guess this log entry might be relevant: [ERROR] fatal InitProducerID error, failing all buffered records; broker: 1, err: INVALID_PRODUCER_EPOCH: Producer attempted an operation with an old epoch.

    Here are the full logs:

    redpanda.log test.log

  • Manual commit multiple partitions consumer

    Slightly modified version of https://github.com/twmb/franz-go/tree/master/examples/goroutine_per_partition_consuming

    Using manual commits.

    During testing of this I noticed that when adding new consumers I see errors from err := cl.CommitRecords(context.Background(), recs...). The errors were mainly:

    "error": "ILLEGAL_GENERATION: Specified group generation id is not valid."
    

    But also included

    REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.
    

    There did not seem to be a clear pattern in the errors.

  • Producer failover

    I have a Kafka cluster with 3 brokers, but I've faced a problem when one broker went down (e.g., during an upgrade or a failure). The producer keeps trying to ProduceSync() to the downed broker indefinitely. Sometimes the producer switches to another working broker.

    I've investigated how ProduceSync() works and found out that it uses Sinc.produce() and, as a consequence, Sinc.doSequenced(). Is that correct? Sinc.doSequenced() uses only one broker, fetched by Client.brokerOrErr(), and doesn't change it after errors.

    Is this the correct behaviour?

    How can I limit the producing loop by time? I want to get an error if ProduceSync cannot produce after 10s, for example.

    <134>1 14:36:46.810145 beginning transaction transactional_id=http-kafka-28692 
    <135>1 14:36:46.810351 opening connection to broker addr=192.168.178.83:9093 broker=3 
    <135>1 14:36:46.817657 connection opened to broker broker=3 addr=192.168.178.83:9093 
    <135>1 14:36:46.817770 connection initialized successfully addr=192.168.178.83:9093 broker=3 
    <135>1 14:36:46.817821 wrote AddPartitionsToTxn v3 time_to_write=29.227µs err=<nil> broker=3 bytes_written=83 write_wait=7.403375ms 
    <135>1 14:36:46.823620 read AddPartitionsToTxn v3 err=<nil> broker=3 bytes_read=50 read_wait=78.86µs time_to_read=5.755618ms 
    <135>1 14:36:46.823723 retrying request tries=1 backoff=230.298076ms request_error=<nil> response_error=COORDINATOR_NOT_AVAILABLE: The coordinator is not available. 
    <135>1 14:36:47.057487 wrote FindCoordinator v4 broker=3 bytes_written=42 write_wait=185.586µs time_to_write=42.689µs err=<nil> 
    <135>1 14:36:47.058197 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=145.794µs time_to_read=611.298µs err=<nil> 
    <135>1 14:36:47.058400 wrote AddPartitionsToTxn v3 write_wait=51.985µs time_to_write=36.932µs err=<nil> broker=3 bytes_written=83 
    <135>1 14:36:47.059458 read AddPartitionsToTxn v3 broker=3 bytes_read=50 read_wait=143.05µs time_to_read=910.882µs err=<nil> 
    <135>1 14:36:47.059522 retrying request tries=2 backoff=457.460797ms request_error=<nil> response_error=COORDINATOR_NOT_AVAILABLE: The coordinator is not available. 
    <135>1 14:36:47.520361 opening connection to broker addr=192.168.178.79:9093 broker=seed 0 
    <132>1 14:36:57.523937 unable to open connection to broker addr=192.168.178.79:9093 broker=seed 0 err=dial tcp 192.168.178.79:9093: i/o timeout 
    <135>1 14:36:57.524082 retrying request tries=1 backoff=284.533848ms request_error=unable to dial: dial tcp 192.168.178.79:9093: i/o timeout response_error=<nil> 
    <135>1 14:36:57.824323 opening connection to broker addr=192.168.178.79:9093 broker=1 
    <132>1 14:37:07.826475 unable to open connection to broker addr=192.168.178.79:9093 broker=1 err=dial tcp 192.168.178.79:9093: i/o timeout 
    <135>1 14:37:07.826858 retrying request request_error=unable to dial: dial tcp 192.168.178.79:9093: i/o timeout response_error=<nil> tries=2 backoff=586.29262ms 
    <135>1 14:37:08.416674 wrote FindCoordinator v4 time_to_write=44.76µs err=<nil> broker=3 bytes_written=42 write_wait=85.71µs 
    <135>1 14:37:08.417393 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=127.606µs time_to_read=635.265µs err=<nil> 
    <135>1 14:37:08.417536 wrote AddPartitionsToTxn v3 bytes_written=83 write_wait=36.114µs time_to_write=29.627µs err=<nil> broker=3 
    <135>1 14:37:08.418749 read AddPartitionsToTxn v3 broker=3 bytes_read=50 read_wait=86.14µs time_to_read=1.111242ms err=<nil> 
    <135>1 14:37:08.418806 retrying request request_error=<nil> response_error=COORDINATOR_NOT_AVAILABLE: The coordinator is not available. tries=3 backoff=1.108865437s 
    <135>1 14:37:09.535911 opening connection to broker addr=192.168.178.82:9093 broker=seed 1 
    <132>1 14:37:19.536643 unable to open connection to broker broker=seed 1 err=dial tcp 192.168.178.82:9093: i/o timeout addr=192.168.178.82:9093 
    <135>1 14:37:19.536786 retrying request tries=1 backoff=231.021421ms request_error=unable to dial: dial tcp 192.168.178.82:9093: i/o timeout response_error=<nil> 
    <135>1 14:37:19.771543 opening connection to broker addr=192.168.178.79:9093 broker=1 
    <132>1 14:37:29.775081 unable to open connection to broker addr=192.168.178.79:9093 broker=1 err=dial tcp 192.168.178.79:9093: i/o timeout 
    <135>1 14:37:29.775508 retrying request tries=2 backoff=434.104674ms request_error=unable to dial: dial tcp 192.168.178.79:9093: i/o timeout response_error=<nil> 
    <135>1 14:37:30.210920 wrote FindCoordinator v4 write_wait=71.458µs time_to_write=47.876µs err=<nil> broker=3 bytes_written=42 
    <135>1 14:37:30.211422 read FindCoordinator v4 bytes_read=62 read_wait=151.959µs time_to_read=377.955µs err=<nil> broker=3 
    <135>1 14:37:30.211613 wrote AddPartitionsToTxn v3 broker=3 bytes_written=83 write_wait=51.084µs time_to_write=30.96µs err=<nil> 
    <135>1 14:37:30.212745 read AddPartitionsToTxn v3 broker=3 bytes_read=50 read_wait=120.874µs time_to_read=1.021783ms err=<nil> 
    <134>1 14:37:30.212836 metadata update triggered why=opportunistic load during sink backoff 
    <135>1 14:37:30.212905 opening connection to broker addr=192.168.178.83:9093 broker=seed 2 
    <135>1 14:37:30.220655 connection opened to broker addr=192.168.178.83:9093 broker=seed 2 
    <135>1 14:37:30.220725 connection initialized successfully addr=192.168.178.83:9093 broker=seed 2 
    <135>1 14:37:30.220782 wrote Metadata v11 broker=seed 2 bytes_written=65 write_wait=7.822396ms time_to_write=21.885µs err=<nil> 
    <135>1 14:37:30.224335 read Metadata v11 broker=seed 2 bytes_read=286 read_wait=48.847µs time_to_read=3.497792ms err=<nil> 
    <135>1 14:37:30.224395 metadata refresh topic partition data changed partition=0 new_leader=3 new_leader_epoch=29 old_leader=3 old_leader_epoch=28 topic=test-topic 
    <135>1 14:37:30.224448 metadata refresh topic partition data changed topic=test-topic partition=1 new_leader=3 new_leader_epoch=27 old_leader=3 old_leader_epoch=26 
    <135>1 14:37:30.224497 metadata refresh topic partition data changed topic=test-topic partition=2 new_leader=3 new_leader_epoch=35 old_leader=1 old_leader_epoch=34 
    <135>1 14:37:30.224541 metadata refresh topic partition data changed old_leader=1 old_leader_epoch=28 topic=test-topic partition=3 new_leader=3 new_leader_epoch=29 
    <135>1 14:37:30.224643 wrote FindCoordinator v4 err=<nil> broker=3 bytes_written=42 write_wait=23.683µs time_to_write=20.286µs 
    <135>1 14:37:30.225128 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=91.079µs time_to_read=395.637µs err=<nil> 
    <135>1 14:37:30.225263 wrote AddPartitionsToTxn v3 bytes_written=83 write_wait=29.977µs time_to_write=27.46µs err=<nil> broker=3 
    <135>1 14:37:30.226259 read AddPartitionsToTxn v3 broker=3 bytes_read=50 read_wait=94.474µs time_to_read=894.645µs err=<nil> 
    <135>1 14:37:30.226318 retrying request tries=1 backoff=243.469586ms request_error=<nil> response_error=COORDINATOR_NOT_AVAILABLE: The coordinator is not available. 
    <135>1 14:37:30.481937 opening connection to broker addr=192.168.178.79:9093 broker=seed 0 
    <132>1 14:37:40.485280 unable to open connection to broker addr=192.168.178.79:9093 broker=seed 0 err=dial tcp 192.168.178.79:9093: i/o timeout 
    <135>1 14:37:40.485443 retrying request tries=1 backoff=216.634833ms request_error=unable to dial: dial tcp 192.168.178.79:9093: i/o timeout response_error=<nil> 
    <135>1 14:37:40.708462 wrote FindCoordinator v4 broker=3 bytes_written=42 write_wait=102.446µs time_to_write=36.821µs err=<nil> 
    <135>1 14:37:40.709121 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=172.093µs time_to_read=511.864µs err=<nil> 
    ...
    <135>1 14:43:38.429327 wrote AddPartitionsToTxn v3 broker=3 bytes_written=83 write_wait=46.453µs time_to_write=34.388µs err=<nil> 
    <135>1 14:43:38.430795 read AddPartitionsToTxn v3 time_to_read=1.371719ms err=<nil> broker=3 bytes_read=50 read_wait=91.482µs 
    <134>1 14:43:38.430855 metadata update triggered why=opportunistic load during sink backoff 
    <135>1 14:43:38.431034 wrote Metadata v11 broker=3 bytes_written=65 write_wait=47.974µs time_to_write=27.359µs err=<nil> 
    <135>1 14:43:38.431955 read Metadata v11 time_to_read=842.77µs err=<nil> broker=3 bytes_read=294 read_wait=81.705µs 
    <135>1 14:43:38.432040 metadata refresh has identical topic partition data topic=test-topic partition=0 leader=3 leader_epoch=29 
    <135>1 14:43:38.432119 metadata refresh has identical topic partition data leader=3 leader_epoch=27 topic=test-topic partition=1 
    <135>1 14:43:38.432199 metadata refresh has identical topic partition data topic=test-topic partition=2 leader=3 leader_epoch=35 
    <135>1 14:43:38.432285 metadata refresh has identical topic partition data partition=3 leader=3 leader_epoch=29 topic=test-topic 
    <135>1 14:43:40.937708 wrote FindCoordinator v4 err=<nil> broker=3 bytes_written=42 write_wait=116.952µs time_to_write=64.928µs 
    <135>1 14:43:40.938280 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=143.222µs time_to_read=463.45µs err=<nil> 
    <135>1 14:43:40.938462 wrote AddPartitionsToTxn v3 write_wait=39.842µs time_to_write=34.013µs err=<nil> broker=3 bytes_written=83 
    <135>1 14:43:40.962693 read AddPartitionsToTxn v3 bytes_read=50 read_wait=115.381µs time_to_read=24.075232ms err=<nil> broker=3 
    <135>1 14:43:40.962784 opening connection to broker addr=192.168.178.83:9093 broker=3 
    <135>1 14:43:40.970550 connection opened to broker addr=192.168.178.83:9093 broker=3 
    <135>1 14:43:40.970672 connection initialized successfully addr=192.168.178.83:9093 broker=3 
    <135>1 14:43:40.971699 wrote Produce v9 broker=3 bytes_written=702 write_wait=8.879035ms time_to_write=40.176µs err=<nil> 
    <135>1 14:43:41.043202 read Produce v9 err=<nil> broker=3 bytes_read=76 read_wait=188.858µs time_to_read=71.2313ms 
    <135>1 14:43:41.043333 produced broker=3 to=test-topic[3{115=>116}] 
    <134>1 14:43:41.043384 flushing 
    <135>1 14:43:41.043432 flushed 
    <135>1 14:43:41.043475 transaction ending, no group loaded; this must be a producer-only transaction, not consume-modify-produce EOS 
    <134>1 14:43:41.043522 ending transaction epoch=0 commit=true transactional_id=http-kafka-28692 producer_id=281000 
    <135>1 14:43:41.043570 wrote EndTxn v3 bytes_written=51 write_wait=46.112µs time_to_write=35.409µs err=<nil> broker=3 
    <135>1 14:43:41.070314 read EndTxn v3 broker=3 bytes_read=16 read_wait=70.508µs time_to_read=26.590678ms err=<nil> 
    
  • Finer grained transaction control needed - Feature request

    Hi there. Love the project and am considering using it. However, the GroupTransactSession semantics seem to be a bit limiting for concurrent processing of partitions. I have a consumer application that interacts with external systems, and it seems there is no good way to exclude a partition from a group transaction.

    In the eos example given:

    		if err := sess.Begin(); err != nil {
    			// Similar to above, we only encounter errors here if
    			// we are not transactional or are already in a
    			// transaction. We should not hit this error.
    			die("unable to start transaction: %v", err)
    		}
    
    		e := kgo.AbortingFirstErrPromise(sess.Client())
    		fetches.EachRecord(func(r *kgo.Record) {
    			sess.Produce(ctx, kgo.StringRecord("eos "+string(r.Value)), e.Promise())
    		})
    		committed, err := sess.End(ctx, e.Err() == nil)
    		
    

    I could throw each record into a goroutine for its partition and then wait for all of them to finish before consuming the next set of records, but if one partition is running slow, the entire batch returned in fetches will block. This seems like both a throughput and latency issue for my use case.

    The shorthand of what I would like to be able to do is something like this:

    fetches := client.PollFetches(ctx)
    
    fetches.EachPartition(func (p) {
       go func () {
           producerClient := transactionalProducerPool.Borrow()
           defer transactionalProducerPool.Release(producerClient)
           producerClient.BeginTransaction()
           process(p.Records)
           producerClient.Flush()
           producerClient.CommitOffsetsForTransaction(getHighestOffsets(p.Records))
           producerClient.EndTransaction()
       }()
    })
    

    The CommitOffsetsForTransaction method existed at one point as it is referenced in the documentation for EndTransaction. It seems that this method was made package private. Is there any way to export this method again (or some version of it)?

  • error="unknown attributes on uncompressed message 8"

    We are piloting this project and are running into this error when trying to consume from one of our brokers:

    unknown attributes on uncompressed message 8
    

    The error is being returned from fetches.Errors()

    This particular broker cluster has the following settings:

    version=2.2.1
    inter.broker.protocol.version: "2.2"
    log.message.format.version: "0.10.1"
    

    We do not see the same issue on another broker cluster with the following settings:

    version: 2.7.1
    inter.broker.protocol.version: 2.7.1
    log.message.format.version: "2.7"
    

    I've tried passing different versions to the client via kgo.MaxVersions to no avail

    Is there some other option that I should be setting here or is there any way to get more debug information about what is happening here?

  • Simpler function for creating a single topic in admin client

    Problem

    Creating a single topic is possibly one of the most popular functions used in the franz-go admin client. Currently there's only a function for creating multiple topics with the same configuration, which may be handy. However, if you only want to create one topic and check whether that was successful, the required error checking reminds me of using the kmsg package. I don't like the fact that I have to check for an error in multiple places to figure out whether my operation was successful or not:

    		createTopicResponses, err := k.admCl.CreateTopics(
    			ctx,
    			1,
    			1,
    			map[string]*string{
    				"cleanup.policy": kadm.StringPtr("compact"),
    				"segment.bytes":  kadm.StringPtr("8388608"), // 8 MiB
    			},
    			k.cfg.kafkaTopicName,
    		)
    		if err != nil {
    			return fmt.Errorf("failed to create topic: %w", err)
    		}
    
    		topicResponse, exists := createTopicResponses[k.cfg.kafkaTopicName]
    		if !exists {
    			return fmt.Errorf("no topic response for the requested topic creation request")
    		}
    		if topicResponse.Err != nil {
    			return fmt.Errorf("failed to create topic: %w", err)
    		}
    

    Proposal:

    I think there should be an API that offers the creation of a single topic:

    // CreateTopic issues a create topics request with the given partitions,
    // replication factor, and (optional) configs for the given topic name. Under the hood,
    // this uses the default 15s request timeout and lets Kafka choose where to
    // place partitions.
    //
    // This package includes a StringPtr function to aid in building config values.
    //
    // If the topic could not be created this function will return an error. An error may be returned
    // due to authorization failure, a failed network request, a missing controller or other issues.
    func (cl *Client) CreateTopic(
    	ctx context.Context,
    	partitions int32,
    	replicationFactor int16,
    	configs map[string]*string,
            topic string) (CreateTopicResponse, error) {}
    

    Note: While this looks very similar to the CreateTopics API - just for a single topic - the error handling here is more opinionated as errors reported on the Kafka API (for each individual topic) will also be returned as plain errors. This does not make sense for the CreateTopics method as partial failure/success may be an option. To make this difference clearer one could also consider naming this CreateTopicSimple()

  • Is it ok to keep the producer connection open?

    I have a workflow where I post the success event at the end to Kafka. Currently it takes about 2 seconds to create a connection to Kafka > push a message > close the connection.

    I need to push millions of messages into Kafka. I wonder if it would be ok for me to pass the connection into the Context at the very top of the workflow into my worker, and then I never close it and just keep it open and use it over and over again.

    Would I be leaking resources?

  • UNKNOWN_TOPIC_ID: This server does not host this topic ID

    I'm unable to consume from a topic created in Confluent Platform 7.2.1 due to UNKNOWN_TOPIC_ID errors. I'm using the latest version of franz-go (v1.10.4).

    Topic ID is a fairly recent change to the fetch protocol (KIP-516) and the latest version of franz-go includes a fix for handling unknown topic IDs, but it doesn't help in this case:

    See consumer.log for debug logging.

  • Add Kotel plugin

    This PR adds OpenTelemetry support to Franz-go as a plugin, allowing for the export of traces and metrics using OpenTelemetry. We have made every effort to adhere to the Open Telemetry specification and follow conventions for span names, attributes, and other elements, as outlined in the Messaging Semantic Conventions.

    Our main goal is to track the movement of requests from their source to their destination. To do this, we create three spans for traces, one of which is optional. The OnProduceRecordBuffered span represents the send span, while the OnProduceRecordUnbuffered span completes the send span and records any errors. The OnFetchRecordBuffered span represents the receive span, and may also create a log-append span. This log-append span is a debatable feature, but was requested by our team.

    Both producer and consumer clients can use tracing contexts for context propagation. For example, a producer client may instrument an HTTP request and track it as it moves through Kafka, while a consumer client can extract tracing contexts from records and propagate them downstream to Kafka or other systems. Consumers can also add process spans after consuming records.

    For metrics, we have followed the same logic used in the existing metric plugin.

    We hope that these changes will improve the functionality of Franz-go and provide valuable insights through the use of OpenTelemetry. We have made every effort to ensure that these changes adhere to the Open Telemetry specification and follow best practices. If you have any questions or concerns, please don't hesitate to let us know. We look forward to your approval of this PR.

  • Error run with armv7 ?(v1.10.4)

    Error in this line 👇?

        if atomic.AddInt64(&p.bufferedRecords, 1) > cl.cfg.maxBufferedRecords { xxxx }

    panic: unaligned 64-bit atomic operation
    
    goroutine 37 [running]:
    runtime/internal/atomic.panicUnaligned()
    	/usr/local/go/src/runtime/internal/atomic/unaligned.go:8 +0x24
    runtime/internal/atomic.Xadd64(0x18c6254, 0x1)
    	/usr/local/go/src/runtime/internal/atomic/atomic_arm.s:258 +0x14
    github.com/twmb/franz-go/pkg/kgo.(*Client).produce(0x18c6000, {0x63be48, 0x14a6000}, 0x17465b0, 0x5a80e4, 0x1)
    	/root/go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/producer.go:381 +0xd4
    github.com/twmb/franz-go/pkg/kgo.(*Client).Produce(...)
    	/root/go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/producer.go:355
    pvp-agent/net.(*KafkaEmit).Start.func1(0x14faa40, 0x14f8720)
    	/opt/pvp-agent/src/net/kafka_emit.go:74 +0x3b4
    created by pvp-agent/net.(*KafkaEmit).Start
    	/opt/pvp-agent/src/net/kafka_emit.go:28 +0xd8
    exit status 2
    
    