franz-go contains a high-performance, pure Go library for interacting with Kafka from 0.8.0 through 2.7.0+. Producing, consuming, transacting, administrating, etc.

franz-go - Apache Kafka client written in Go


Franz-go is an all-encompassing Apache Kafka client fully written in Go. This library aims to provide every Kafka feature from Apache Kafka v0.8.0 onward. It has support for transactions, regex topic consuming, the latest partitioning strategies, data loss detection, closest replica fetching, and more. If a client KIP exists, this library aims to support it.

This library attempts to provide an intuitive API while interacting with Kafka the way Kafka expects (timeouts, etc.).

Features

  • Feature complete client (up to Kafka v2.7.0+)
  • Supported compression types: snappy, gzip, lz4 and zstd
  • SSL/TLS Support
  • Exactly once semantics / idempotent producing
  • Transactions support
  • All SASL mechanisms are supported (OAuthBearer, GSSAPI/Kerberos, SCRAM-SHA-256/512 and plain)
  • Supported Kafka versions >=0.8
  • Provides low level functionality (such as sending API requests) as well as high level functionality (e.g. consuming in groups)
  • Utilizes modern & idiomatic Go (support for contexts, variadic configuration options, ...)
  • Highly performant, see Performance (benchmarks will be added)
  • Written in pure Go (no wrapper lib for a C library or other bindings)
  • Ability to add detailed log messages or metrics using hooks

Getting started

Basic usage for producing and consuming Kafka messages looks like this:

seeds := []string{"localhost:9092"}
client, err := kgo.NewClient(kgo.SeedBrokers(seeds...))
if err != nil {
    panic(err)
}
defer client.Close()

ctx := context.Background()

// 1.) Producing a message
// All record production goes through Produce, and the callback can be used
// to allow for synchronous or asynchronous production.
var wg sync.WaitGroup
wg.Add(1)
record := &kgo.Record{Topic: "foo", Value: []byte("bar")}
err = client.Produce(ctx, record, func(_ *kgo.Record, err error) {
        defer wg.Done()
        if err != nil {
                fmt.Printf("record had a produce error: %v\n", err)
        }
})
if err != nil {
        panic("we are unable to produce if the context is canceled, we have hit max buffered," +
                "or if we are transactional and not in a transaction")
}
wg.Wait()

// 2.) Consuming messages from a topic
// Consuming can either be direct (no consumer group), or through a group. Below, we use a group.
// client.AssignGroup("my-group-identifier", kgo.GroupTopics("foo"))
for {
        fetches := client.PollFetches(ctx)
        iter := fetches.RecordIter()
        for !iter.Done() {
            record := iter.Next()
            fmt.Println(string(record.Value))
        }
}
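
Fetches can also carry per-partition errors that the record iterator above skips. A minimal sketch of surfacing them in the poll loop (assuming a client release where Fetches.EachError is available, as used in the consumer examples further down this page):

for {
        fetches := client.PollFetches(ctx)
        fetches.EachError(func(topic string, partition int32, err error) {
                // Per-partition fetch errors are not returned by the record
                // iterator; log them (or stop) explicitly.
                fmt.Printf("fetch error on %s[%d]: %v\n", topic, partition, err)
        })
        iter := fetches.RecordIter()
        for !iter.Done() {
                fmt.Println(string(iter.Next().Value))
        }
}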

Version Pinning

By default, the client issues an ApiVersions request on connect to brokers and defaults to using the maximum supported version for requests that each broker supports.

Kafka 0.10.0 introduced the ApiVersions request; if you are working with brokers older than that, you must use the kversion package together with the client's MaxVersions option to pin request versions manually.

As well, it is recommended to set MaxVersions to the version of your broker cluster. Until KIP-584 is implemented, if you do not pin a max version, this client may negotiate newer features with one broker but not another while you are in the middle of a rolling broker upgrade.
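
For example, a minimal sketch of pinning to a 2.4.0 cluster (the kversion constructor name is per current releases; pick the one matching your brokers):

// import "github.com/twmb/franz-go/pkg/kversion"
client, err := kgo.NewClient(
        kgo.SeedBrokers("localhost:9092"),
        kgo.MaxVersions(kversion.V2_4_0()),
)
if err != nil {
        panic(err)
}
defer client.Close()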

Metrics

Using hooks you can attach to any events happening within franz-go. This allows you to use your favorite metric library and collect the metrics you are interested in.
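
As a rough sketch (hook interface names here follow the 1.0+ API and may differ in older releases), a hook that counts successful broker connections could look like:

// Assumes imports: net, time, sync/atomic, and the kgo package.
type connectCounter struct {
        connects int64
}

func (c *connectCounter) OnBrokerConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
        if err == nil {
                atomic.AddInt64(&c.connects, 1)
        }
}

// Register the hook when creating the client:
// client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"), kgo.WithHooks(&connectCounter{}))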

Supported KIPs

Theoretically, this library supports every (non-Java-specific) client-facing KIP. Most are tested; some need testing. Any KIP that simply adds or modifies a protocol is supported by code generation.

  • KIP-12 (sasl & ssl; 0.9.0)
  • KIP-13 (throttling; supported but not obeyed)
  • KIP-31 (relative offsets in message set; 0.10.0)
  • KIP-32 (timestamps in message set v1; 0.10.0)
  • KIP-35 (adds ApiVersion; 0.10.0)
  • KIP-36 (rack aware replica assignment; 0.10.0)
  • KIP-40 (ListGroups and DescribeGroup v0; 0.9.0)
  • KIP-43 (sasl enhancements & handshake; 0.10.0)
  • KIP-54 (sticky group assignment)
  • KIP-62 (join group rebalance timeout, background thread heartbeats; v0.10.1)
  • KIP-74 (fetch response size limit; 0.10.1)
  • KIP-78 (cluster id in metadata; 0.10.1)
  • KIP-79 (list offset req/resp timestamp field; 0.10.1)
  • KIP-84 (sasl scram; 0.10.2)
  • KIP-98 (EOS; 0.11.0)
  • KIP-101 (offset for leader epoch introduced; broker-side usage only at this point; 0.11.0)
  • KIP-107 (delete records; 0.11.0)
  • KIP-108 (validate create topic; 0.10.2)
  • KIP-110 (zstd; 2.1.0)
  • KIP-112 (JBOD disk failure, protocol changes; 1.0.0)
  • KIP-113 (JBOD log dir movement, protocol additions; 1.0.0)
  • KIP-124 (request rate quotas; 0.11.0)
  • KIP-133 (describe & alter configs; 0.11.0)
  • KIP-152 (more sasl, introduce sasl authenticate; 1.0.0)
  • KIP-183 (elect preferred leaders; 2.2.0)
  • KIP-185 (idempotent is default; 1.0.0)
  • KIP-195 (create partitions request; 1.0.0)
  • KIP-207 (new error in list offset request; 2.2.0)
  • KIP-219 (throttling happens after response; 2.0.0)
  • KIP-226 (describe configs v1; 1.1.0)
  • KIP-227 (incremental fetch requests; 1.1.0)
  • KIP-229 (delete groups request; 1.1.0)
  • KIP-255 (oauth via sasl/oauthbearer; 2.0.0)
  • KIP-279 (leader / follower failover; changed offsets for leader epoch; 2.0.0)
  • KIP-320 (fetcher log truncation detection; 2.1.0)
  • KIP-322 (new error when delete topics is disabled; 2.1.0)
  • KIP-339 (incremental alter configs; 2.3.0)
  • KIP-341 (sticky group bug fix)
  • KIP-342 (oauth extensions; 2.1.0)
  • KIP-345 (static group membership, see KAFKA-8224)
  • KIP-360 (safe epoch bumping for UNKNOWN_PRODUCER_ID; 2.5.0)
  • KIP-368 (periodically reauth sasl; 2.2.0)
  • KIP-369 (always round robin produce partitioner; 2.4.0)
  • KIP-380 (inter-broker command changes; 2.2.0)
  • KIP-392 (fetch request from closest replica w/ rack; 2.2.0)
  • KIP-394 (require member.id for initial join; 2.2.0)
  • KIP-412 (dynamic log levels with incremental alter configs; 2.4.0)
  • KIP-429 (incremental rebalance, see KAFKA-8179; 2.4.0)
  • KIP-430 (include authorized ops in describe groups; 2.3.0)
  • KIP-447 (transaction changes to better support group changes; 2.5.0)
  • KIP-455 (admin replica reassignment; 2.4.0)
  • KIP-460 (admin leader election; 2.4.0)
  • KIP-464 (defaults for create topic; 2.4.0)
  • KIP-467 (produce response error change for per-record errors; 2.4.0)
  • KIP-480 (sticky partition producing; 2.4.0)
  • KIP-482 (tagged fields; KAFKA-8885; 2.4.0)
  • KIP-496 (offset delete admin command; 2.4.0)
  • KIP-497 (new API to alter ISR; 2.7.0)
  • KIP-498 (add max bound on reads; unimplemented in Kafka)
  • KIP-511 (add client name / version in apiversions req; 2.4.0)
  • KIP-518 (list groups by state; 2.6.0)
  • KIP-525 (create topics v5 returns configs; 2.4.0)
  • KIP-526 (reduce metadata lookups; done minus part 2, which we won't do)
  • KIP-546 (client quota APIs; 2.5.0)
  • KIP-554 (broker side SCRAM API; 2.7.0)
  • KIP-559 (protocol info in sync / join; 2.5.0)
  • KIP-569 (doc/type in describe configs; 2.6.0)
  • KIP-570 (leader epoch in stop replica; 2.6.0)
  • KIP-580 (exponential backoff; 2.6.0)
  • KIP-588 (producer recovery from txn timeout; 2.7.0)
  • KIP-590 (support for forwarding admin requests; 2.7.0)
  • KIP-595 (new APIs for raft protocol; 2.7.0)
  • KIP-599 (throttle create/delete topic/partition; 2.7.0)
  • KIP-700 (describe cluster; 2.8.0)
Comments
  • ILLEGAL_SASL_STATE: Request is not valid given the current SASL state

    We are using franz-go to connect to an AWS MSK cluster using an IAM role. We are seeing our OnPartitionsLost and/or OnPartitionsRevoked methods called each time franz-go attempts to re-authenticate with the MSK cluster using SASL/IAM. This is resulting in the service getting stuck in a re-authentication loop for 2-3 seconds each time before it re-creates the connection to Kafka.

    Logs:

    2022-11-10T00:23:54.933-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 1.068s, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.035-07:00 [INFO] metadata update triggered; why: opportunistic load during source backoff
    2022-11-10T00:23:55.127-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 873ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.241-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 760ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.299-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 702ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.413-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 588ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:55.563-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:23:55.618-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 383ms, latency_lower_bound: 2.5s
    2022-11-10T00:23:56.122-07:00 [ERROR] unable to initialize sasl; broker: 3, err: [***REDACTED***]: Session too short: SASL_AUTHENTICATION_FAILED: SASL Authentication failed.
    2022-11-10T00:23:56.131-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:23:57.609-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: Cannot change principals during re-authentication from ***REDACTED***: ***REDACTED***: SASL_AUTHENTICATION_FAILED: SASL Authentication failed.
    2022-11-10T00:35:07.692-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 2.308s, latency_lower_bound: 2.5s
    2022-11-10T00:35:07.794-07:00 [INFO] metadata update triggered; why: opportunistic load during source backoff
    2022-11-10T00:35:07.855-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 2.146s, latency_lower_bound: 2.5s
    2022-11-10T00:35:07.904-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 2.096s, latency_lower_bound: 2.5s
    2022-11-10T00:35:07.969-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 2.032s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.090-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 1.91s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.195-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 1.805s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.205-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 1.796s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.330-07:00 [INFO] metadata update triggered; why: opportunistic load during source backoff
    2022-11-10T00:35:08.359-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 1.642s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.365-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 1.636s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.478-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 1.522s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.561-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 1.44s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.693-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 1.308s, latency_lower_bound: 2.5s
    2022-11-10T00:35:08.795-07:00 [INFO] heartbeat errored; group: ***REDACTED***, err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:35:08.795-07:00 [INFO] assigning partitions; why: clearing assignment at end of group management session, how: unassigning everything, input:
    2022-11-10T00:35:08.796-07:00 [ERROR] join and sync loop errored; group: ***REDACTED***, err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state., consecutive_errors: 5, backoff: 2.5s
    2022-11-10T00:35:09.023-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:35:09.087-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 914ms, latency_lower_bound: 2.5s
    2022-11-10T00:35:09.206-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 3, session_lifetime: 795ms, latency_lower_bound: 2.5s
    2022-11-10T00:35:10.252-07:00 [INFO] metadata update triggered; why: re-updating metadata due to err: ILLEGAL_SASL_STATE: Request is not valid given the current SASL state.
    2022-11-10T00:35:11.296-07:00 [INFO] joining group; group: ***REDACTED***
    2022-11-10T00:35:11.354-07:00 [INFO] joined, balancing group; group: ***REDACTED***, member_id: ***REDACTED***, instance_id: <nil>, generation: 58, balance_protocol: cooperative-sticky, leader: true
    2022-11-10T00:35:11.354-07:00 [INFO] balancing group as leader
    2022-11-10T00:35:11.354-07:00 [INFO] balance group member; id: ***REDACTED***, interests: interested topics: [***REDACTED*** ***REDACTED*** ***REDACTED***], previously owned:
    2022-11-10T00:35:11.354-07:00 [INFO] balanced; plan: ***REDACTED***{***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3]}
    2022-11-10T00:35:11.354-07:00 [INFO] syncing; group: ***REDACTED***, protocol_type: consumer, protocol: cooperative-sticky
    2022-11-10T00:35:11.361-07:00 [INFO] synced; group: ***REDACTED***, assigned: ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3]
    2022-11-10T00:35:11.361-07:00 [INFO] new group session begun; group: ***REDACTED***, added: ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3], ***REDACTED***[0 1 2 3], lost:
    2022-11-10T00:35:11.361-07:00 [INFO] beginning heartbeat loop; group: ***REDACTED***
    2022-11-10T00:35:11.406-07:00 [INFO] assigning partitions; why: newly fetched offsets for group ***REDACTED***, how: assigning everything new, keeping current assignment, input: ***REDACTED***[1{-2.-1 0} 2{-2.-1 0} 0{-2.-1 0} 3{-2.-1 0}], ***REDACTED***[2{24031.14 0} 1{23435.14 0} 0{27120.16 0} 3{26493.16 0}], ***REDACTED***[1{7953.16 0} 0{9211.13 0} 3{8975.14 0} 2{8149.12 0}]
    2022-11-10T00:37:02.642-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 1, session_lifetime: 2.359s, latency_lower_bound: 2.5s
    2022-11-10T00:37:02.743-07:00 [INFO] metadata update triggered; why: opportunistic load during source backoff
    2022-11-10T00:37:02.775-07:00 [INFO] sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop; broker: 2, session_lifetime: 2.226s, latency_lower_bound: 2.5s
    
    
  • [Question] Recovering from INVALID_PRODUCER_EPOCH

    Hey @twmb, over the course of testing a transactional producer I've written I've encountered this error from EndTransaction after a rebalance:

    INVALID_PRODUCER_EPOCH: Producer attempted an operation with an old epoch.
    

    For a bit of context, this is from a consumer (currently Sarama, will be franz-go soon) that does the following:

    1. client.BeginTransaction
    2. Consumes a record from a topic
    3. Writes a bunch of records to a different topic (client.Send)
    4. If all goes well, client.EndTransaction(ctx, kgo.TryCommit)
    5. If something goes wrong in step 3 or 4, there's a client.AbortBufferedRecords followed by a client.EndTransaction(ctx, kgo.TryAbort)

    We're running multiple instances of these consumers, each reading from one or more partitions on the input topic and writing (randomly) to partitions on the output topic. These tasks can be relatively long running and rebalances may occur during any of those steps due to deploys, scaling, kafka operations, etc.

    What happens after a rebalance is they start throwing those kerr.InvalidProducerEpoch errors. A little digging suggests this is due to franz-go's handling of KIP-588 and that seems to make sense, but what isn't clear to me is how to recover from this situation. Right now, the producers just spin forever on that error - able to begin a transaction but not write records or end the transaction (though aborting still works).

    I remembered this doc from config.TransactionalID, but I'm not sure it applies to me since I'm not using a franz-go consumer.

    // Note that, unless using Kafka 2.5.0, a consumer group rebalance may be // problematic. Production should finish and be committed before the client // rejoins the group. It may be safer to use an eager group balancer and just // abort the transaction. Alternatively, any time a partition is revoked, you // could abort the transaction and reset offsets being consumed.

    So with that in mind and with the bold assumption that I'm doing this correctly in the first place, what is the proper way to recover from this situation with the franz-go API?
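
    Not an authoritative answer, but for reference, a minimal sketch of the consume-transform-produce loop described above using franz-go's GroupTransactSession, which coordinates the transaction with group rebalances. The topic, group, and transform function are placeholders, and RequireStableFetchOffsets assumes brokers >= 2.5.0:

    sess, err := kgo.NewGroupTransactSession(
    	kgo.SeedBrokers("localhost:9092"),
    	kgo.TransactionalID("my-transactional-id"),
    	kgo.ConsumerGroup("my-group"),
    	kgo.ConsumeTopics("input-topic"),
    	kgo.FetchIsolationLevel(kgo.ReadCommitted()),
    	kgo.RequireStableFetchOffsets(), // requires brokers 2.5.0+
    )
    if err != nil {
    	panic(err)
    }
    defer sess.Close()
    
    for {
    	fetches := sess.PollFetches(ctx)
    	if fetches.IsClientClosed() {
    		return
    	}
    	if err := sess.Begin(); err != nil {
    		panic(err)
    	}
    	e := kgo.AbortingFirstErrPromise(sess.Client())
    	fetches.EachRecord(func(r *kgo.Record) {
    		// transform is a placeholder for the real processing.
    		sess.Produce(ctx, &kgo.Record{Topic: "output-topic", Value: transform(r.Value)}, e.Promise())
    	})
    	committed, err := sess.End(ctx, kgo.TransactionEndTry(e.Err() == nil))
    	if err != nil {
    		panic(err)
    	}
    	if !committed {
    		// The transaction aborted (for example, a rebalance interrupted it);
    		// the session rewinds to the last committed offsets and the records
    		// are polled and processed again.
    	}
    }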

  • panic: runtime error: invalid memory address or nil pointer dereference

    Client version: v0.6.9. Kafka version: 2.6. Connection auth: mTLS. Auto commit disabled.

    Our Kafka dev cluster can be under high load, so no idea if it's related.

    panic: runtime error: invalid memory address or nil pointer dereference
    [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x7d381f]
    
    goroutine 10119 [running]:
    github.com/twmb/franz-go/pkg/kgo.(*topicPartitions).load(...)
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/topics_and_partitions.go:73
    github.com/twmb/franz-go/pkg/kgo.(*consumer).assignPartitions(0xc0007326a0, 0xc0030022d0, 0xc00084d500)
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/consumer.go:357 +0x47f
    github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).fetchOffsets(0xc00084d4a0, 0xfc34a8, 0xc002bcce40, 0xc0024914a0, 0x0, 0x0)
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/consumer_group.go:1201 +0x4d7
    github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat.func3(0xc002bd7080, 0xc002554060, 0xc00084d4a0, 0xc0024914a0, 0xfc34a8, 0xc002bcce40)
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/consumer_group.go:750 +0x145
    created by github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/consumer_group.go:746 +0x387
    
  • Possible record loss on unclean kill of consumer when using automatic offset commit

    Because the offset commit and consumption are done independently it is possible to have record loss on the consumer side if the consumer process has a kill -9 applied to it mid loop. You can reproduce this by producing 1000 records, then run up a consumer and sleep at the 500 record mark. Then issue a kill -9 on the consumer process. When the consumer starts back up the full 1000 records sometimes don't all get consumed. This is presumably because the offsets for these records have already been committed. When you run the Go confluent lib this does not happen because the offset commit occurs inside the Poll call itself, so there is no message loss.
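
    A hedged mitigation sketch (not necessarily the recommended fix): trade possible loss for possible duplicates by disabling autocommit and committing only after records have been processed. process is a placeholder for the real handling:

    cl, err := kgo.NewClient(
    	kgo.SeedBrokers("localhost:9092"),
    	kgo.ConsumerGroup("my-group"),
    	kgo.ConsumeTopics("foo"),
    	kgo.DisableAutoCommit(),
    )
    if err != nil {
    	panic(err)
    }
    defer cl.Close()
    
    for {
    	fetches := cl.PollFetches(ctx)
    	var processed []*kgo.Record
    	fetches.EachRecord(func(r *kgo.Record) {
    		process(r) // do the actual work before committing
    		processed = append(processed, r)
    	})
    	if err := cl.CommitRecords(ctx, processed...); err != nil {
    		// A failed commit means these records may be seen again after a restart.
    	}
    }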

  • Kafka read error with SASL auth: TOPIC_AUTHORIZATION_FAILED: Not authorized to access topics: [Topic authorization failed.] + GROUP_AUTHORIZATION_FAILED: Not authorized to access group: Group authorization failed

    I have a service that continuously writes (and then reads) from the same topic every 5 seconds.

    I have two different clients using franz-go, one with TLS auth and one with SASL via msk_iam auth.

    My writer seems to be fine, but I get read errors every few hours. The errors are always the same.

    TOPIC_AUTHORIZATION_FAILED: Not authorized to access topics: [Topic authorization failed.]
    

    I will get 200k of these errors over the course of 4 minutes and then everything resumes fine again.

    My reader code was based heavily on this example: https://github.com/twmb/franz-go/blob/master/examples/goroutine_per_partition_consuming/autocommit_marks/main.go

    This is a snippet of the code:

    for {
    	fetches := r.service.PollRecords(ctx, 1000)
    	err := fetches.Err0()
    	if err != nil {
    		if errors.Is(err, kafka.ErrClientClosed) {
    			contextLogger.Error().Err(err).Msg("Client closed")
    			return err
    		}
    		if errors.Is(err, context.Canceled) {
    			contextLogger.Error().Err(err).Msg("Context cancelled closed")
    			return err
    		}
    	}
    	fetches.EachError(func(topic string, partition int32, err error) {
    		contextLogger.Error().
    			Err(err).
    			Str("Topic", topic).
    			Int32("Partition", partition).
    			Msg("Cannot fetch message")
    	})
    	fetches.EachPartition(func(fetchedTopicPartition kafka.FetchTopicPartition) {
    		tp := topicPartition{
    			topic:     fetchedTopicPartition.Topic,
    			partition: fetchedTopicPartition.Partition,
    		}
    
    		r.consumers[tp].records <- fetchedTopicPartition
    	})
    
    	r.service.AllowRebalance()
    }
    

    And this is how the client is instantiated:

    	franzGoReader := &FranzGoReader{
    		consumers: make(map[topicPartition]*franzGoPartitionReader),
    		log:       contextLogger,
    	}
    
    	kafkaAddr := strings.Split(brokersEndpoint, ",")
    	clientOpts := []kafka.Opt{
    		kafka.SeedBrokers(kafkaAddr...),
    		kafka.ConsumeTopics(kafkaTopic),
    		kafka.ConsumerGroup(consumerGroup),
    		// default is CooperativeStickyBalancer
    		kafka.Balancers(kafka.RangeBalancer()),
    		kafka.ConsumeResetOffset(kafka.NewOffset().AtStart()),
    		kafka.OnPartitionsAssigned(franzGoReader.assigned),
    		kafka.OnPartitionsRevoked(franzGoReader.revoked),
    		kafka.OnPartitionsLost(franzGoReader.lost),
    		kafka.AutoCommitMarks(),
    		kafka.AutoCommitInterval(3 * time.Second),
    		kafka.BlockRebalanceOnPoll(),
    		kafka.WithLogger(&franzGoLogger{
    			log: contextLogger,
    		}),
    	}
    	if useAuth {
    		clientOpts = append(clientOpts, kafka.DialTLSConfig(&tls.Config{}))
    
    		sess, err := session.NewSession()
    		if err != nil {
    			contextLogger.Error().Err(err).Msg("Could not get aws session")
    		}
    		clientOpts = append(clientOpts, kafka.SASL(aws.ManagedStreamingIAM(func(ctx context.Context) (aws.Auth, error) {
    			val, err := sess.Config.Credentials.GetWithContext(ctx)
    			if err != nil {
    				return aws.Auth{}, err
    			}
    			return aws.Auth{
    				AccessKey:    val.AccessKeyID,
    				SecretKey:    val.SecretAccessKey,
    				SessionToken: val.SessionToken,
    				UserAgent:    "kafka-health-monitor",
    			}, nil
    		})))
    	}
    
    	cl, err := kafka.NewClient(clientOpts...)
    	if err != nil {
    		contextLogger.Error().Err(err).Msg("Could not create FranzGo client")
    		return nil, err
    	}
    
    

    partitions assigned callback:

    func (r *FranzGoReader) assigned(_ context.Context, cl *kafka.Client, assignedTopicPartitions map[string][]int32) {
    	contextLogger := r.log.With().Logger()
    	contextLogger.Debug().Msg("Partitions assigned")
    
    	for topic, partitions := range assignedTopicPartitions {
    		for _, partition := range partitions {
    			contextLoggerPartionReader := contextLogger.With().
    				Str("Topic", topic).
    				Int32("Partition", partition).
    				Logger()
    			partitionReader := &franzGoPartitionReader{
    				service:   cl,
    				log:       contextLoggerPartionReader,
    				topic:     topic,
    				partition: partition,
    
    				quit:    make(chan struct{}),
    				done:    make(chan struct{}),
    				records: make(chan kafka.FetchTopicPartition, 5),
    			}
    			r.consumers[topicPartition{topic, partition}] = partitionReader
    			go partitionReader.consume()
    		}
    	}
    }
    

    partitions lost callback:

    func (r *FranzGoReader) lost(_ context.Context, _ *kafka.Client, topicPartionsLost map[string][]int32) {
    	contextLogger := r.log.With().Logger()
    	contextLogger.Debug().Msg("Partitions lost")
    
    	r.killPartitionReaders(topicPartionsLost)
    }
    
    func (r *FranzGoReader) killPartitionReaders(topicPartions map[string][]int32) {
    	contextLogger := r.log.With().Logger()
    	contextLogger.Debug().Msg("Kill partitionReaders")
    
    	wg := &sync.WaitGroup{}
    	defer wg.Wait()
    
    	for topic, partitions := range topicPartions {
    		for _, partition := range partitions {
    			wg.Add(1)
    
    			tp := topicPartition{topic, partition}
    			partitionReader := r.consumers[tp]
    			delete(r.consumers, tp)
    			go func() {
    				partitionReader.close()
    				wg.Done()
    			}()
    		}
    	}
    }
    
    func (pr *franzGoPartitionReader) close() {
    	contextLogger := pr.log.With().Logger()
    	contextLogger.Debug().Msg("Closing partitionReader")
    
    	close(pr.quit)
    	contextLogger.Debug().Msg("Waiting for work to finish")
    	<-pr.done
    }
    
  • Getting "invalid short sasl lifetime millis" periodically with AWS MSK using SASL authentication

    I'm getting "invalid short sasl lifetime millis" periodically (after a few hours) with AWS MSK using SASL authentication.

    Looking at the code, it errors out when the lifetimeMillis is less than 5 sec but would re-authenticate if the time is greater than 5 sec.

    The periodical re-authentication works most of the time because AWS auth token expires every 15min and I only see this issue after a few hours. When it happens, lifetimeMillis become less than 5s. Could this be a clock sync issue? Any idea how I can prevent this error from happening?

  • Fix integration test action

    Hey @twmb, following up on #218, I think this should do the trick using the bitnami/kafka container in Zookeeperless mode. I had to adjust the tests a bit so the KGO_SEEDS env var is read by each test which needs to connect to Kafka.

    I also tried to use Redpanda, but there are some issues with it. Could you please let me know how you got the tests to pass against it? It may just be that I'm not configuring it correctly (maybe it doesn't like the fact that I'm running it as a single node?), but my Kafka knowledge is too limited to troubleshoot it. Here's what I tried:

    > docker run --rm -p9092:9092 docker.vectorized.io/vectorized/redpanda:latest start --default-log-level=debug --smp 1 --reserve-memory 0M --overprovisioned --set redpanda.enable_transactions=true --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr 127.0.0.1:9092
    > # Wait for container to start
    > KGO_TEST_RF=1 KGO_TEST_RECORDS=50000 go test -v -count 1 ./...
    

    I pulled the current latest tag of the Redpanda container (sha256:84c8cd98ed7a50e4f378af55600c92c4fb097afab4ec164c3ded9b0e884754c4).

    I guess this log entry might be relevant: [ERROR] fatal InitProducerID error, failing all buffered records; broker: 1, err: INVALID_PRODUCER_EPOCH: Producer attempted an operation with an old epoch.

    Here are the full logs:

    redpanda.log test.log

  • Manual commit multiple partitions consumer

    Slightly modified version of https://github.com/twmb/franz-go/tree/master/examples/goroutine_per_partition_consuming

    Using manual commits.

    During testing of this, I noticed that when adding new consumers I see errors from err := cl.CommitRecords(context.Background(), recs...). The errors were mainly:

    "error": "ILLEGAL_GENERATION: Specified group generation id is not valid."
    

    But they also included:

    REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.
    

    There did not seem to be a clear pattern in the errors

  • Producer failover

    I have a Kafka cluster with 3 brokers, but I ran into a problem when one broker went down (e.g., for an upgrade or due to a failure). The producer keeps trying to ProduceSync() to the downed broker indefinitely; only sometimes does it switch to another working broker.

    I've investigated how ProduceSync() works and found that it uses sink.produce() and, as a consequence, sink.doSequenced(). Is that correct? sink.doSequenced() uses only one broker, fetched by Client.brokerOrErr(), and doesn't change it after errors.

    Is this the correct behaviour?

    How can I limit the producing loop by time? I want to get an error if ProduceSync cannot produce within 10s, for example (see the sketch after the logs below).

    <134>1 14:36:46.810145 beginning transaction transactional_id=http-kafka-28692 
    <135>1 14:36:46.810351 opening connection to broker addr=192.168.178.83:9093 broker=3 
    <135>1 14:36:46.817657 connection opened to broker broker=3 addr=192.168.178.83:9093 
    <135>1 14:36:46.817770 connection initialized successfully addr=192.168.178.83:9093 broker=3 
    <135>1 14:36:46.817821 wrote AddPartitionsToTxn v3 time_to_write=29.227µs err=<nil> broker=3 bytes_written=83 write_wait=7.403375ms 
    <135>1 14:36:46.823620 read AddPartitionsToTxn v3 err=<nil> broker=3 bytes_read=50 read_wait=78.86µs time_to_read=5.755618ms 
    <135>1 14:36:46.823723 retrying request tries=1 backoff=230.298076ms request_error=<nil> response_error=COORDINATOR_NOT_AVAILABLE: The coordinator is not available. 
    <135>1 14:36:47.057487 wrote FindCoordinator v4 broker=3 bytes_written=42 write_wait=185.586µs time_to_write=42.689µs err=<nil> 
    <135>1 14:36:47.058197 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=145.794µs time_to_read=611.298µs err=<nil> 
    <135>1 14:36:47.058400 wrote AddPartitionsToTxn v3 write_wait=51.985µs time_to_write=36.932µs err=<nil> broker=3 bytes_written=83 
    <135>1 14:36:47.059458 read AddPartitionsToTxn v3 broker=3 bytes_read=50 read_wait=143.05µs time_to_read=910.882µs err=<nil> 
    <135>1 14:36:47.059522 retrying request tries=2 backoff=457.460797ms request_error=<nil> response_error=COORDINATOR_NOT_AVAILABLE: The coordinator is not available. 
    <135>1 14:36:47.520361 opening connection to broker addr=192.168.178.79:9093 broker=seed 0 
    <132>1 14:36:57.523937 unable to open connection to broker addr=192.168.178.79:9093 broker=seed 0 err=dial tcp 192.168.178.79:9093: i/o timeout 
    <135>1 14:36:57.524082 retrying request tries=1 backoff=284.533848ms request_error=unable to dial: dial tcp 192.168.178.79:9093: i/o timeout response_error=<nil> 
    <135>1 14:36:57.824323 opening connection to broker addr=192.168.178.79:9093 broker=1 
    <132>1 14:37:07.826475 unable to open connection to broker addr=192.168.178.79:9093 broker=1 err=dial tcp 192.168.178.79:9093: i/o timeout 
    <135>1 14:37:07.826858 retrying request request_error=unable to dial: dial tcp 192.168.178.79:9093: i/o timeout response_error=<nil> tries=2 backoff=586.29262ms 
    <135>1 14:37:08.416674 wrote FindCoordinator v4 time_to_write=44.76µs err=<nil> broker=3 bytes_written=42 write_wait=85.71µs 
    <135>1 14:37:08.417393 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=127.606µs time_to_read=635.265µs err=<nil> 
    <135>1 14:37:08.417536 wrote AddPartitionsToTxn v3 bytes_written=83 write_wait=36.114µs time_to_write=29.627µs err=<nil> broker=3 
    <135>1 14:37:08.418749 read AddPartitionsToTxn v3 broker=3 bytes_read=50 read_wait=86.14µs time_to_read=1.111242ms err=<nil> 
    <135>1 14:37:08.418806 retrying request request_error=<nil> response_error=COORDINATOR_NOT_AVAILABLE: The coordinator is not available. tries=3 backoff=1.108865437s 
    <135>1 14:37:09.535911 opening connection to broker addr=192.168.178.82:9093 broker=seed 1 
    <132>1 14:37:19.536643 unable to open connection to broker broker=seed 1 err=dial tcp 192.168.178.82:9093: i/o timeout addr=192.168.178.82:9093 
    <135>1 14:37:19.536786 retrying request tries=1 backoff=231.021421ms request_error=unable to dial: dial tcp 192.168.178.82:9093: i/o timeout response_error=<nil> 
    <135>1 14:37:19.771543 opening connection to broker addr=192.168.178.79:9093 broker=1 
    <132>1 14:37:29.775081 unable to open connection to broker addr=192.168.178.79:9093 broker=1 err=dial tcp 192.168.178.79:9093: i/o timeout 
    <135>1 14:37:29.775508 retrying request tries=2 backoff=434.104674ms request_error=unable to dial: dial tcp 192.168.178.79:9093: i/o timeout response_error=<nil> 
    <135>1 14:37:30.210920 wrote FindCoordinator v4 write_wait=71.458µs time_to_write=47.876µs err=<nil> broker=3 bytes_written=42 
    <135>1 14:37:30.211422 read FindCoordinator v4 bytes_read=62 read_wait=151.959µs time_to_read=377.955µs err=<nil> broker=3 
    <135>1 14:37:30.211613 wrote AddPartitionsToTxn v3 broker=3 bytes_written=83 write_wait=51.084µs time_to_write=30.96µs err=<nil> 
    <135>1 14:37:30.212745 read AddPartitionsToTxn v3 broker=3 bytes_read=50 read_wait=120.874µs time_to_read=1.021783ms err=<nil> 
    <134>1 14:37:30.212836 metadata update triggered why=opportunistic load during sink backoff 
    <135>1 14:37:30.212905 opening connection to broker addr=192.168.178.83:9093 broker=seed 2 
    <135>1 14:37:30.220655 connection opened to broker addr=192.168.178.83:9093 broker=seed 2 
    <135>1 14:37:30.220725 connection initialized successfully addr=192.168.178.83:9093 broker=seed 2 
    <135>1 14:37:30.220782 wrote Metadata v11 broker=seed 2 bytes_written=65 write_wait=7.822396ms time_to_write=21.885µs err=<nil> 
    <135>1 14:37:30.224335 read Metadata v11 broker=seed 2 bytes_read=286 read_wait=48.847µs time_to_read=3.497792ms err=<nil> 
    <135>1 14:37:30.224395 metadata refresh topic partition data changed partition=0 new_leader=3 new_leader_epoch=29 old_leader=3 old_leader_epoch=28 topic=test-topic 
    <135>1 14:37:30.224448 metadata refresh topic partition data changed topic=test-topic partition=1 new_leader=3 new_leader_epoch=27 old_leader=3 old_leader_epoch=26 
    <135>1 14:37:30.224497 metadata refresh topic partition data changed topic=test-topic partition=2 new_leader=3 new_leader_epoch=35 old_leader=1 old_leader_epoch=34 
    <135>1 14:37:30.224541 metadata refresh topic partition data changed old_leader=1 old_leader_epoch=28 topic=test-topic partition=3 new_leader=3 new_leader_epoch=29 
    <135>1 14:37:30.224643 wrote FindCoordinator v4 err=<nil> broker=3 bytes_written=42 write_wait=23.683µs time_to_write=20.286µs 
    <135>1 14:37:30.225128 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=91.079µs time_to_read=395.637µs err=<nil> 
    <135>1 14:37:30.225263 wrote AddPartitionsToTxn v3 bytes_written=83 write_wait=29.977µs time_to_write=27.46µs err=<nil> broker=3 
    <135>1 14:37:30.226259 read AddPartitionsToTxn v3 broker=3 bytes_read=50 read_wait=94.474µs time_to_read=894.645µs err=<nil> 
    <135>1 14:37:30.226318 retrying request tries=1 backoff=243.469586ms request_error=<nil> response_error=COORDINATOR_NOT_AVAILABLE: The coordinator is not available. 
    <135>1 14:37:30.481937 opening connection to broker addr=192.168.178.79:9093 broker=seed 0 
    <132>1 14:37:40.485280 unable to open connection to broker addr=192.168.178.79:9093 broker=seed 0 err=dial tcp 192.168.178.79:9093: i/o timeout 
    <135>1 14:37:40.485443 retrying request tries=1 backoff=216.634833ms request_error=unable to dial: dial tcp 192.168.178.79:9093: i/o timeout response_error=<nil> 
    <135>1 14:37:40.708462 wrote FindCoordinator v4 broker=3 bytes_written=42 write_wait=102.446µs time_to_write=36.821µs err=<nil> 
    <135>1 14:37:40.709121 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=172.093µs time_to_read=511.864µs err=<nil> 
    ...
    <135>1 14:43:38.429327 wrote AddPartitionsToTxn v3 broker=3 bytes_written=83 write_wait=46.453µs time_to_write=34.388µs err=<nil> 
    <135>1 14:43:38.430795 read AddPartitionsToTxn v3 time_to_read=1.371719ms err=<nil> broker=3 bytes_read=50 read_wait=91.482µs 
    <134>1 14:43:38.430855 metadata update triggered why=opportunistic load during sink backoff 
    <135>1 14:43:38.431034 wrote Metadata v11 broker=3 bytes_written=65 write_wait=47.974µs time_to_write=27.359µs err=<nil> 
    <135>1 14:43:38.431955 read Metadata v11 time_to_read=842.77µs err=<nil> broker=3 bytes_read=294 read_wait=81.705µs 
    <135>1 14:43:38.432040 metadata refresh has identical topic partition data topic=test-topic partition=0 leader=3 leader_epoch=29 
    <135>1 14:43:38.432119 metadata refresh has identical topic partition data leader=3 leader_epoch=27 topic=test-topic partition=1 
    <135>1 14:43:38.432199 metadata refresh has identical topic partition data topic=test-topic partition=2 leader=3 leader_epoch=35 
    <135>1 14:43:38.432285 metadata refresh has identical topic partition data partition=3 leader=3 leader_epoch=29 topic=test-topic 
    <135>1 14:43:40.937708 wrote FindCoordinator v4 err=<nil> broker=3 bytes_written=42 write_wait=116.952µs time_to_write=64.928µs 
    <135>1 14:43:40.938280 read FindCoordinator v4 broker=3 bytes_read=62 read_wait=143.222µs time_to_read=463.45µs err=<nil> 
    <135>1 14:43:40.938462 wrote AddPartitionsToTxn v3 write_wait=39.842µs time_to_write=34.013µs err=<nil> broker=3 bytes_written=83 
    <135>1 14:43:40.962693 read AddPartitionsToTxn v3 bytes_read=50 read_wait=115.381µs time_to_read=24.075232ms err=<nil> broker=3 
    <135>1 14:43:40.962784 opening connection to broker addr=192.168.178.83:9093 broker=3 
    <135>1 14:43:40.970550 connection opened to broker addr=192.168.178.83:9093 broker=3 
    <135>1 14:43:40.970672 connection initialized successfully addr=192.168.178.83:9093 broker=3 
    <135>1 14:43:40.971699 wrote Produce v9 broker=3 bytes_written=702 write_wait=8.879035ms time_to_write=40.176µs err=<nil> 
    <135>1 14:43:41.043202 read Produce v9 err=<nil> broker=3 bytes_read=76 read_wait=188.858µs time_to_read=71.2313ms 
    <135>1 14:43:41.043333 produced broker=3 to=test-topic[3{115=>116}] 
    <134>1 14:43:41.043384 flushing 
    <135>1 14:43:41.043432 flushed 
    <135>1 14:43:41.043475 transaction ending, no group loaded; this must be a producer-only transaction, not consume-modify-produce EOS 
    <134>1 14:43:41.043522 ending transaction epoch=0 commit=true transactional_id=http-kafka-28692 producer_id=281000 
    <135>1 14:43:41.043570 wrote EndTxn v3 bytes_written=51 write_wait=46.112µs time_to_write=35.409µs err=<nil> broker=3 
    <135>1 14:43:41.070314 read EndTxn v3 broker=3 bytes_read=16 read_wait=70.508µs time_to_read=26.590678ms err=<nil> 
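
    To address the timeout question only, a sketch (not the library's prescribed pattern): ProduceSync honors the context passed to it, so a per-call timeout bounds the wait; the exact failure semantics for records that are still buffered when the context is canceled depend on the client version:

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    res := cl.ProduceSync(ctx, &kgo.Record{Topic: "test-topic", Value: []byte("payload")})
    if err := res.FirstErr(); err != nil {
    	// The produce did not succeed within 10s (or failed outright).
    	fmt.Printf("produce failed: %v\n", err)
    }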
    
  • Finer grained transaction control needed - Feature request

    Hi there. Love the project and am considering using it. However, the GroupTransactSession semantics seem to be a bit limiting for concurrent processing of partitions. I have a consumer application that interacts with external systems, and it seems there is no good way to exclude a partition from a group transaction.

    In the EOS example given:

    		if err := sess.Begin(); err != nil {
    			// Similar to above, we only encounter errors here if
    			// we are not transactional or are already in a
    			// transaction. We should not hit this error.
    			die("unable to start transaction: %v", err)
    		}
    
    		e := kgo.AbortingFirstErrPromise(sess.Client())
    		fetches.EachRecord(func(r *kgo.Record) {
    			sess.Produce(ctx, kgo.StringRecord("eos "+string(r.Value)), e.Promise())
    		})
    		committed, err := sess.End(ctx, e.Err() == nil)
    		
    

    I could throw each record into a goroutine for its partition and then wait for all of them to finish before consuming the next set of records, but if one partition is running slow, the entire batch returned in fetches will block. This seems like both a throughput and latency issue for my use case.

    The shorthand of what I would like to be able to do is something like this:

    fetches := client.PollFetches(ctx)
    
    fetches.EachPartition(func (p) {
       go func () {
           producerClient := transactionalProducerPool.Borrow()
           defer transactionalProducerPool.Release(producerClient)
           producerClient.BeginTransaction()
           process(p.Records)
           producerClient.Flush()
           producerClient.CommitOffsetsForTransaction(getHighestOffsets(p.Records))
           producerClient.EndTransaction()
       }()
    })
    

    The CommitOffsetsForTransaction method existed at one point as it is referenced in the documentation for EndTransaction. It seems that this method was made package private. Is there any way to export this method again (or some version of it)?

  • error="unknown attributes on uncompressed message 8"

    We are piloting this project out and are running into this error when trying to consume from one of our brokers:

    unknown attributes on uncompressed message 8
    

    The error is being returned from fetches.Errors()

    This particular broker cluster has the following settings:

    version=2.2.1
    inter.broker.protocol.version: "2.2"
    log.message.format.version: "0.10.1"
    

    We do not see the same issue on another broker cluster with the following settings:

    version: 2.7.1
    inter.broker.protocol.version: 2.7.1
    log.message.format.version: "2.7"
    

    I've tried passing different versions to the client via kgo.MaxVersions to no avail

    Is there some other option that I should be setting here or is there any way to get more debug information about what is happening here?

  • kmsg: commit new ConsumerMemberMetadata for KIP-792

    For #272, we need to tag the kmsg protocol first.

    This was originally merged, but for fear of Kafka changing the protocol again before release, this is waiting until the next Kafka release.

  • Add Kotel plugin

    This PR adds OpenTelemetry support to Franz-go as a plugin, allowing for the export of traces and metrics using OpenTelemetry. We have made every effort to adhere to the Open Telemetry specification and follow conventions for span names, attributes, and other elements, as outlined in the Messaging Semantic Conventions.

    Our main goal is to track the movement of requests from their source to their destination. To do this, we create three spans for traces, one of which is optional. The OnProduceRecordBuffered span represents the send span, while the OnProduceRecordUnbuffered span completes the send span and records any errors. The OnFetchRecordBuffered span represents the receive span, and may also create a log-append span. This log-append span is a debatable feature, but was requested by our team.

    Both producer and consumer clients can use tracing contexts for context propagation. For example, a producer client may instrument an HTTP request and track it as it moves through Kafka, while a consumer client can extract tracing contexts from records and propagate them downstream to Kafka or other systems. Consumers can also add process spans after consuming records.

    For metrics, we have followed the same logic used in the existing metric plugin.

    We hope that these changes will improve the functionality of Franz-go and provide valuable insights through the use of OpenTelemetry. We have made every effort to ensure that these changes adhere to the Open Telemetry specification and follow best practices. If you have any questions or concerns, please don't hesitate to let us know. We look forward to your approval of this PR.

  • Support for protobuf message index?

    Hello! First of all, I'm very happy with how this library is setup, and I would love to continue using it. I'm running into an issue with Confluent Kafka where I'm unable to consume protobuf messages serialized using an SR schema with message index.

    Looking at the Serde implementation, it looks like it's accounting for schema ID (magic byte 0 and big endian uint32), but I don't see a way to make it account for the message index bytes. Is that supported and I'm not seeing it?

    Naturally, I'll need to account for these indexes on the producer side too.
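
    For background, the Confluent protobuf wire format prepends a magic byte (0), a 4-byte big-endian schema ID, and then a zigzag-varint array of message indexes (a count followed by that many indexes, with a single 0 byte as shorthand for the first message). A rough, hypothetical sketch of peeling that header off a record value; splitConfluentProtobufHeader is not part of franz-go and assumes imports of encoding/binary and fmt:

    func splitConfluentProtobufHeader(value []byte) (schemaID int32, indexes []int64, msg []byte, err error) {
    	if len(value) < 5 || value[0] != 0 {
    		return 0, nil, nil, fmt.Errorf("missing confluent magic byte")
    	}
    	schemaID = int32(binary.BigEndian.Uint32(value[1:5]))
    	b := value[5:]
    	count, n := binary.Varint(b) // zigzag-encoded count of message indexes
    	if n <= 0 {
    		return 0, nil, nil, fmt.Errorf("bad message index count")
    	}
    	b = b[n:]
    	if count == 0 {
    		// A lone 0 is shorthand for index [0], the first message in the schema.
    		return schemaID, []int64{0}, b, nil
    	}
    	for i := int64(0); i < count; i++ {
    		idx, m := binary.Varint(b)
    		if m <= 0 {
    			return 0, nil, nil, fmt.Errorf("bad message index")
    		}
    		indexes = append(indexes, idx)
    		b = b[m:]
    	}
    	return schemaID, indexes, b, nil
    }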

franz-go - A complete Apache Kafka client written in Go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 2.8.0+. Producing, consuming, transacting, administrating, etc.

Dec 29, 2022
provider-kafka is a Crossplane Provider that is used to manage Kafka resources.

provider-kafka provider-kafka is a Crossplane Provider that is used to manage Kafka resources. Usage Create a provider secret containing a json like t

Oct 29, 2022
go broker interface,you can use kafka,redis,pulsar etc.

broker go broker interface,you can use kafka,redis,pulsar etc. pulsar in docker run pulsar in docker docker run -dit \ --name pulsar-sever \ -p 6650:

Sep 8, 2022
It's client library written in Golang for interacting with Linkedin Cruise Control using its HTTP API.

go-cruise-control It's client library (written in Golang) for interacting with Linkedin Cruise Control using its HTTP API. Supported Cruise Control ve

Jan 10, 2022
A go library for interacting with Google Verified SMS

verifiedsms This is a go library for interacting with the Google Verified SMS service. You'll need to already be signed up as a Verified SMS Partner t

Aug 18, 2022
Sarama is a Go library for Apache Kafka 0.8, and up.

sarama Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later). Getting started API documentation and examples are availa

Jan 1, 2023
Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Dec 28, 2022
kafka watcher for casbin library

Casbin Kafka Watcher Casbin watcher for kafka This watcher library will enable users to dynamically change casbin policies through kafka messages Infl

May 8, 2021
Producer x Consumer example using Kafka library and Go.

Go - Kafka - Example Apache Kafka Commands First of all, run the docker-compose docker-compose up, than run docker exec -it kafka_kafka_1 bash Topics

Dec 8, 2021
nanoQ — high-performance brokerless Pub/Sub for streaming real-time data

nanoQ — high-performance brokerless Pub/Sub for streaming real-time data nanoQ is a very minimalistic (opinionated/limited) Pub/Sub transport library.

Nov 9, 2022
High-Performance server for NATS, the cloud native messaging system.

NATS is a simple, secure and performant communications system for digital systems, services and devices. NATS is part of the Cloud Native Computing Fo

Jan 2, 2023
Simple, high-performance event streaming broker

Styx Styx is a simple and high-performance event streaming broker. It aims to provide teams of all sizes with a simple to operate, disk-persisted publ

Nov 24, 2022
Confluent's Apache Kafka Golang client

Confluent's Golang Client for Apache KafkaTM confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform. Features: Hi

Dec 30, 2022
Implementation of the NELI leader election protocol for Go and Kafka

goNELI Implementation of the NELI leader election protocol for Go and Kafka. goNELI encapsulates the 'fast' variation of the protocol, running in excl

Dec 8, 2022
ChanBroker, a Broker for goroutine, is similar to kafka

Introduction chanbroker, a Broker for goroutine, is similar to kafka In chanbroker has three types of goroutine: Producer Consumer(Subscriber) Broker

Aug 12, 2021
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.

Kowl - Apache Kafka Web UI Kowl (previously known as Kafka Owl) is a web application that helps you to explore messages in your Apache Kafka cluster a

Jan 3, 2023
Modern CLI for Apache Kafka, written in Go.

Kaf Kafka CLI inspired by kubectl & docker Install Install from source: go get -u github.com/birdayz/kaf/cmd/kaf Install binary: curl https://raw.git

Dec 31, 2022
Easy to use distributed event bus similar to Kafka

chukcha Easy to use distributed event bus similar to Kafka. The event bus is designed to be used as a persistent intermediate storage buffer for any k

Dec 30, 2022
Kafka implemented in Golang with built-in coordination (No ZooKeeper, single binary install, Cloud Native)

Jocko Distributed commit log service in Go that is wire compatible with Kafka. Created by @travisjeffery, continued by nash. Goals: Protocol compatibl

Aug 9, 2021