Sarama is a Go library for Apache Kafka 0.8 and up.

sarama

Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).

Getting started

  • API documentation and examples are available via pkg.go.dev.
  • Mocks for testing are available in the mocks subpackage.
  • The examples directory contains more elaborate example applications.
  • The tools directory contains command line tools that can be useful for testing, diagnostics, and instrumentation.

You might also want to look at the Frequently Asked Questions.

Compatibility and API stability

Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest stable releases of Kafka and Go, and we provide a two-month grace period for older releases. This means we currently officially support Go 1.14 through 1.15, and Kafka 2.5 through 2.7, although older releases are still likely to work.

Sarama follows semantic versioning and provides API stability via the gopkg.in service. You can import a version with a guaranteed stable API via http://gopkg.in/Shopify/sarama.v1. A changelog is available here.

Contributing

Comments
  • Implement a higher-level consumer group

    Related to my other PR #1083, this is another attempt, this time with a higher-level consumer API. I have been going back and forth on this one, and I can't find a better way to make it both flexible enough to support all the use cases and comprehensible. I would really appreciate some feedback.

  • OffsetManager Implementation

    There are still some TODOs sprinkled throughout, but not all of them necessarily need to be fixed before we ship this (it's already pretty close to the minimum viable implementation). The only critical missing piece is implementing AsyncClose for the PartitionOffsetManager.

    Regardless, it's at the state where it can be opened up for comments. This should completely supersede #379.

  • Lockless multiproducer

    CC @sirupsen @wvanbergen @burke @fw42 @graemej

    Simple non-batching synchronous tests are working. Needs batching tests, but as Burke mentioned yesterday they are very messy with the current framework.

  • New producer design [beta]

    @wvanbergen @burke @Sirupsen @graemej and whoever else might be interested.

    This is an idea for a different API for the producer that is somewhat more channel/goroutine based. I was thinking about alternative architectures after reviewing the PR which adds the channels returning failed/successful messages. I started playing with things (after also reviewing the original multiproducer PRs) and came up with this.

    It ended up being basically a complete rewrite, so reviewing the diff won't be too helpful, just read the new producer.go straight. It's very much a work-in-progress, I'm more interested in architectural thoughts right now than code nits. It seems to pass the simple tests, but that's primarily accidental, shrug.

    Misc. notes on things that might cause questions:

    • User communication (input of new messages, receipt of errors) is all done with channels now, so the common idiom I expect would be a select on those two channels to avoid blocking.
    • It doesn't have a "synchronous" mode, instead the separate SimpleProducer uses the AckSuccesses flag to effectively imitate that.
    • In the normal success path, a message ends up passing through five goroutines before being bundled into a message and passed to the Broker object to put on the wire. Not sure if this will impact performance noticeably, but it means that if any intermediate step blocks (e.g. a metadata request for a new leader for a topic) then all other messages not directly impacted will continue processing, which is nice (assuming you configure a large enough channel buffer).
    • I delayed actually converting the Key/Value elements to bytes until the very last moment. This seems cleaner, and also potentially simplifies the addition of streaming messages which somebody asked for recently.
    • Hopefully the retry path is easier to reason about than the old one, as it is exactly the same as the normal path now (the message gets resubmitted on exactly the same channel as the user submits messages on, it just has an extra flag set). It should still preserve ordering correctly though (see the leaderDispatcher logic for this).
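The select idiom mentioned in the first note can be sketched with plain channels. This is only an illustration under stated assumptions: `fakeProducer`, its `input` and `errs` channels, and the rule that the message "bad" fails are all hypothetical stand-ins, not the API proposed in this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeProducer is a hypothetical stand-in for a channel-based producer:
// messages go in on input, failures come back on errs.
type fakeProducer struct {
	input chan string
	errs  chan error
	done  chan struct{}
}

// newFakeProducer starts a goroutine that "fails" every message equal to "bad".
func newFakeProducer() *fakeProducer {
	p := &fakeProducer{
		input: make(chan string),
		errs:  make(chan error),
		done:  make(chan struct{}),
	}
	go func() {
		defer close(p.done)
		for msg := range p.input {
			if msg == "bad" {
				p.errs <- errors.New("failed to produce: " + msg)
			}
		}
		close(p.errs)
	}()
	return p
}

// sendAll submits every message while draining errors in the same select,
// so neither direction can block the other. It returns the number of errors.
func sendAll(p *fakeProducer, msgs []string) int {
	failed := 0
	for _, m := range msgs {
		sent := false
		for !sent {
			select {
			case p.input <- m:
				sent = true
			case err := <-p.errs:
				fmt.Println("error:", err)
				failed++
			}
		}
	}
	close(p.input)
	for err := range p.errs { // collect errors still in flight
		fmt.Println("error:", err)
		failed++
	}
	<-p.done
	return failed
}

func main() {
	p := newFakeProducer()
	fmt.Println("failed:", sendAll(p, []string{"a", "bad", "b", "bad"}))
}
```

Without the error case in the select, the sender would deadlock as soon as the producer blocked on reporting a failure while the sender blocked on submitting the next message.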
  • panic: sync: negative WaitGroup counter

    Seeing this occasionally in production:

    Revision: 23d523386ce0c886e56c9faf1b9c78b07e5b8c90

    panic: sync: negative WaitGroup counter
    
    goroutine 444 [running]:
    sync.(*WaitGroup).Add(0xc208e6c510, 0xffffffffffffffff)
            /usr/src/go/src/sync/waitgroup.go:67 +0x96
    sync.(*WaitGroup).Done(0xc208e6c510)
            /usr/src/go/src/sync/waitgroup.go:85 +0x31
    github.com/Shopify/sarama.(*asyncProducer).returnSuccesses(0xc208e6c4d0, 0xc29ffe9600, 0x3f, 0x40)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:753 +0xd3
    github.com/Shopify/sarama.(*asyncProducer).flusher(0xc208e6c4d0, 0xc20814c540, 0xc2080b9800)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:560 +0xafe
    github.com/Shopify/sarama.func·006()
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:430 +0x43
    github.com/Shopify/sarama.withRecover(0xc20802b220)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/utils.go:42 +0x3a
    created by github.com/Shopify/sarama.(*asyncProducer).messageAggregator
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:430 +0x205
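
For readers unfamiliar with this panic: it is raised by the standard library whenever Done is called more times than Add on the same WaitGroup. The trace above does not show which code path in the producer does that; the snippet below only reproduces the condition in isolation, and `extraDone` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// extraDone calls Done once more than Add and reports the recovered panic value.
func extraDone() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done()
	wg.Done() // the counter drops below zero here and the runtime panics
	return "no panic"
}

func main() {
	fmt.Println(extraDone()) // prints "sync: negative WaitGroup counter"
}
```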
    
  • Not seeing all messages when syncProducer called from goroutines

    Versions

    Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

    • Sarama Version: 1.15.0
    • Kafka Version: 0.11.0
    • Go Version: 1.9.2 darwin/amd64

    Configuration

    What configuration values are you using for Sarama and Kafka?

    Sarama configuration

    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 10
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.V0_11_0_0
    

    Kafka configuration

    broker.id=0
    delete.topic.enable=true
    listeners=PLAINTEXT://localhost:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/usr/local/Cellar/kafka/kafka-log-1
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    auto.create.topics.enable=false 
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    
    Logs

    When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.

    sarama: client.go:115: Initializing new client
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: broker.go:146: Connected to broker at localhost:9092 (unregistered)
    sarama: client.go:429: client/brokers registered new broker #0 at localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (3 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (2 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (1 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:161: Successfully initialized new client
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: async_producer.go:601: producer/broker/0 starting up
    sarama: async_producer.go:612: producer/broker/0 state change to [open] on people/0
    sarama: broker.go:144: Connected to broker at localhost:9092 (registered as #0)
    
    Problem Description

    When calling SendMessage on a single instance of syncProducer from multiple goroutines, some messages seem to fail to be produced to Kafka. I've looked at what ends up on the stream using Apache's kafka-console-consumer, and it shows only a fraction of the messages: anywhere from half of them down to none. I wrote my own consumer using sarama and saw the same issue; however, I get the error message below back from sarama. I want to use syncProducer because I need to guarantee that messages will be published to the stream in the order that they're received by my application. Maybe I've just implemented it wrong, but right now I'm out of ideas and I'm hoping someone on here can help me out.

    sarama: consumer.go:755: consumer/broker/0 abandoned subscription to people/0 because kafka: response did not contain all the expected topic/partition blocks
    Error: kafka: error while consuming people/0: kafka: response did not contain all the expected topic/partition blocks
    sarama: consumer.go:345: consumer/people/0 finding new broker
    sarama: client.go:644: client/metadata fetching metadata for [people] from broker localhost:9092
    sarama: consumer.go:711: consumer/broker/0 added subscription to people/0
    

    Here's how I created my topic: bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic people

    I'm running a single broker on my local machine. I've written a sample program that can reproduce the issue. It's also worth noting that none of the calls to SendMessage() are returning errors when I run the code.

    main.go

    package main
    
    import (
    	"bytes"
    	"fmt"
    	"log"
    	"strconv"
    	"sync"
    	"syncProducer/streamer"
    
    	"github.com/Shopify/sarama"
    	"github.com/linkedin/goavro"
    	uuid "github.com/satori/go.uuid"
    )
    
    const personSchema = `{
    	"type":"record",
    	"name":"Person",
    	"namespace":"com.example.people",
    	"fields":[
    		{
    			"name":"Name",
    			"type":"string"
    		},
    		{
    			"name":"Address",
    			"type":"string"
    		},{
    			"name":"City",
    			"type":"string"
    		},
    		{
    			"name":"State",
    			"type":"string"
    		},
    		{
    			"name":"ZIP",
    			"type":"long"
    		}
    	]
    }`
    
    var (
    	personCodec *goavro.Codec
    	buf         bytes.Buffer
    )
    
    type (
    	person struct {
    		Name    string
    		Address string
    		City    string
    		State   string
    		ZIP     int64
    	}
    )
    
    func main() {
    	var err error
    	personCodec, err = goavro.NewCodec(personSchema)
    	if err != nil {
    		panic(err)
    	}
    
    	producer, err := newSyncProducer()
    	if err != nil {
    		panic(err)
    	}
    	streamer := streamer.New(producer)
    
    	// Create 10 avro message bodies
    	var people [][]byte
    	for i := 1; i < 11; i++ {
    		aPerson := person{
    			Name:    "Bob #" + strconv.Itoa(i),
    			Address: strconv.Itoa(i) + " Main St.",
    			City:    "SomeTown",
    			State:   "CA",
    			ZIP:     90210,
    		}
    		data, err := convertToAvro(aPerson)
    		if err != nil {
    			panic("Could not convert aPerson " + strconv.Itoa(i) + " to avro.")
    		}
    		people = append(people, data)
    	}
    
    	errc := make(chan error, 10)
    
    	var wg sync.WaitGroup
    	// Send messages
    	for _, person := range people {
    		wg.Add(1)
    		go func(person []byte, c chan error, wg *sync.WaitGroup) {
    			uuid := uuid.NewV4().String()
    			err := streamer.SendActivity("people", uuid, "CreatePerson", person, nil)
    			c <- err
    			wg.Done()
    		}(person, errc, &wg)
    	}
    
    	wg.Wait()
    	close(errc)
    	fmt.Println("Completed!")
    	for i := range errc {
    		fmt.Println(i)
    		if i != nil {
    			fmt.Printf("Exit: %v\n", i)
    		}
    	}
    
    	fmt.Print(&buf)
    }
    
    func convertToAvro(aPerson person) ([]byte, error) {
    	data, err := personCodec.BinaryFromNative(nil, map[string]interface{}{
    		"Name":    aPerson.Name,
    		"Address": aPerson.Address,
    		"City":    aPerson.City,
    		"State":   aPerson.State,
    		"ZIP":     aPerson.ZIP,
    	})
    	if err != nil {
    		return nil, err
    	}
    
    	return data, nil
    }
    
    func newSyncProducer() (sarama.SyncProducer, error) {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
    	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
    	config.Producer.Return.Successes = true          // Required when using syncproducer
    	config.Producer.Return.Errors = true
    	config.Version = sarama.V0_11_0_0
    
    	sarama.Logger = log.New(&buf, "sarama: ", log.Lshortfile)
    
    	return sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    }
    

    streamer.go

    package streamer
    
    import (
    	"github.com/Shopify/sarama"
    	"github.com/pkg/errors"
    )
    
    const (
    	MessageTypeHeaderKey = "message-type"
    	MessageIDHeaderKey = "message-id"
    )
    
    type (
    	// Metadata contains metadata for a given activity.
    	Metadata map[string][]string
    
    	// Streamer handles streaming activities to a topic.
    	Streamer struct {
    		producer sarama.SyncProducer
    	}
    )
    
    var (
    	// ErrNoSubjects denotes that no subjects were provided.
    	ErrNoSubjects = errors.New("At least one subject is required")
    )
    
    // New creates a new streamer.
    func New(producer sarama.SyncProducer) *Streamer {
    	return &Streamer{
    		producer: producer,
    	}
    }
    
    // SendActivity encapsulates the provided metadata and data in a message and send it to a topic.
    func (s *Streamer) SendActivity(topic string, messageID string, messageHeaderValue string, data []byte, metadata Metadata) error {
    	_, _, err := s.producer.SendMessage(&sarama.ProducerMessage{
    		Topic: topic,
    		Key:   sarama.StringEncoder(messageID),
    		Value: sarama.ByteEncoder(data),
    		Headers: []sarama.RecordHeader{
    			sarama.RecordHeader{
    				Key:   []byte(MessageIDHeaderKey),
    				Value: []byte(messageID),
    			},
    			sarama.RecordHeader{
    				Key:   []byte(MessageTypeHeaderKey),
    				Value: []byte(messageHeaderValue),
    			},
    		},
    	})
    	if err != nil {
    		return errors.Wrapf(err, "Error sending message to topic %s for ID %s", topic, messageID)
    	}
    
    	return nil
    }
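
On the ordering requirement from the problem description: when sends are started from separate goroutines, nothing fixes which call reaches the producer first, whatever the client library does afterwards. One way to keep arrival order, sketched here with a hypothetical `sendFunc` in place of a real producer, is to funnel payloads through a channel that a single goroutine drains. This does not address the missing messages reported above; it only illustrates the ordering point.

```go
package main

import "fmt"

// sendFunc is a hypothetical stand-in for a blocking send such as
// SyncProducer.SendMessage; it is not part of Sarama.
type sendFunc func(payload string) error

// orderedSender drains queue in a single goroutine, so sends happen in
// exactly the order in which payloads were queued.
func orderedSender(queue <-chan string, send sendFunc) <-chan error {
	results := make(chan error)
	go func() {
		defer close(results)
		for payload := range queue {
			results <- send(payload)
		}
	}()
	return results
}

// sendInOrder queues payloads, waits for each result, and returns the order
// in which the send function observed them.
func sendInOrder(payloads []string) []string {
	var seen []string
	record := func(p string) error {
		seen = append(seen, p) // only ever called from the sender goroutine
		return nil
	}
	queue := make(chan string)
	results := orderedSender(queue, record)
	for _, p := range payloads {
		queue <- p
		if err := <-results; err != nil {
			fmt.Println("send failed:", err)
		}
	}
	close(queue)
	<-results // wait for the sender goroutine to exit
	return seen
}

func main() {
	fmt.Println(sendInOrder([]string{"Bob #1", "Bob #2", "Bob #3"}))
}
```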
    
  • MultiConsumer

    @wvanbergen @snormore @mkobetic (and anyone else who cares)

    The current consumer works just fine, but is limited to fetching just a single partition of a single topic at once. You can run multiple consumers, but this has several issues:

    • They can't use the same client, or else the broker connection has contention problems (brokers handle requests on a single connection serially, so if it has to wait for MaxWaitTime before responding, this introduces severe latency when multiplexing busy and non-busy topics). Using separate clients works, but generates a lot of extra connections and metadata traffic.
    • You don't get messages from multiple partitions batched into a single response, when that makes sense for efficiency.

    The ideal solution is to have a consumer capable of fetching from multiple topic/partitions at once. This has been at the back of my mind for a while now, so I already have a design ready; if the design makes sense to y'all, then somebody just needs to write it :)

    While the user currently specifies a single topic/partition to the constructor, they should now be able to specify a set of topic/partitions. Since some of the configuration that is currently a singleton actually needs to be per-partition (OffsetMethod and OffsetValue at least), this would probably be of type map[string]map[int32]*ConsumerPartitionConfig. I considered permitting the dynamic adding/removing of partitions to the set, but I don't see a strong use case and it complicated a bunch of things.
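
As a rough illustration of the proposed constructor argument, the nested map could look like the sketch below. The type name `ConsumerPartitionConfig` and the field names `OffsetMethod` and `OffsetValue` come from the paragraph above; the field types, the `Subscriptions` alias, and its helper methods are assumptions made for the example.

```go
package main

import "fmt"

// ConsumerPartitionConfig holds the settings that would need to be
// per-partition. The field types are assumptions.
type ConsumerPartitionConfig struct {
	OffsetMethod int   // hypothetical: how to pick the starting offset
	OffsetValue  int64 // hypothetical: explicit offset, if the method needs one
}

// Subscriptions maps topic -> partition -> config (hypothetical alias).
type Subscriptions map[string]map[int32]*ConsumerPartitionConfig

// Add registers a partition, creating the inner map on first use.
func (s Subscriptions) Add(topic string, partition int32, c *ConsumerPartitionConfig) {
	if s[topic] == nil {
		s[topic] = make(map[int32]*ConsumerPartitionConfig)
	}
	s[topic][partition] = c
}

// Count returns the total number of topic/partitions in the set.
func (s Subscriptions) Count() int {
	n := 0
	for _, parts := range s {
		n += len(parts)
	}
	return n
}

func main() {
	subs := Subscriptions{}
	subs.Add("people", 0, &ConsumerPartitionConfig{OffsetValue: 42})
	subs.Add("people", 1, &ConsumerPartitionConfig{})
	subs.Add("requests", 0, &ConsumerPartitionConfig{})
	fmt.Println(subs.Count()) // prints 3
}
```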

    Events are returned to the user exactly the same way they are now, over a single channel. I considered a separate channel per topic/partition but it complicated the base case, and the events already contain information on topic/partition so it's not hard for the user to dispatch appropriately if they really want to.

    The constructor starts up one "controller" goroutine, which starts up and manages one goroutine per broker-that-has-a-partition-we-care-about and is responsible for (initially) dispatching each topic/partition to the appropriate broker's goroutine. The broker goroutine looks a lot like the current fetchMessages method with a few tweaks:

    • Some minor work needed to handle multiple blocks in the requests and responses.
    • When a topic/partition is reassigned to a new broker, that topic/partition gets returned to the controller via a channel; the goroutine tracks how many it is "responsible" for and exits if that reaches 0.
    • Similarly when a broker goes down hard, all topic/partitions are returned to the controller for re-dispatching and the goroutine exits.

    I expect the success case to be fairly straightforward - as always, the complexity will come when reasoning through the failure cases and ensuring that topic/partitions are redispatched correctly, messages are not accidentally skipped in that case, etc. etc.

    When the consumer is closed, it signals the controller which cleans up its children before exiting.
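
The overall shape (one goroutine per relevant broker, all feeding a single events channel, with shutdown signalled from the outside) can be sketched as follows. This is a simplified illustration, not the proposed implementation: the `leaders` map stands in for cluster metadata, each broker goroutine emits one event per partition instead of looping on fetch requests, and re-dispatch on leader change is left out entirely.

```go
package main

import (
	"fmt"
	"sync"
)

// partition identifies a topic/partition; event is what the user receives.
type partition struct {
	topic string
	id    int32
}

type event struct {
	partition
	brokerID int32
}

// consume starts one goroutine per broker that leads at least one requested
// partition and fans all events into a single channel.
// leaders is a hypothetical stand-in for cluster metadata.
func consume(leaders map[partition]int32, stop <-chan struct{}) <-chan event {
	events := make(chan event)

	// Group partitions by leading broker (the controller's dispatch step).
	byBroker := make(map[int32][]partition)
	for p, b := range leaders {
		byBroker[b] = append(byBroker[b], p)
	}

	var wg sync.WaitGroup
	for b, parts := range byBroker {
		wg.Add(1)
		go func(brokerID int32, parts []partition) {
			defer wg.Done()
			for _, p := range parts {
				select {
				case events <- event{p, brokerID}:
				case <-stop: // consumer closed: exit without sending
					return
				}
			}
		}(b, parts)
	}

	// The controller closes the channel once all children have exited.
	go func() {
		wg.Wait()
		close(events)
	}()
	return events
}

// countEvents drains the events channel and returns how many events arrived.
func countEvents(leaders map[partition]int32) int {
	stop := make(chan struct{})
	defer close(stop)
	n := 0
	for range consume(leaders, stop) {
		n++
	}
	return n
}

func main() {
	leaders := map[partition]int32{
		{"people", 0}:   1,
		{"people", 1}:   2,
		{"requests", 0}: 1,
	}
	fmt.Println(countEvents(leaders)) // prints 3
}
```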

    Thoughts?

  • Better Broker Connection Management

    Following up to #7, #9, #10, #13 our connection management still has problems.

    First some things to know about Kafka that complicate the issue:

    • Broker metadata requests do not seem required to return all the brokers they know about (it is not documented which ones they do return, but I suspect it is only the subset of brokers that are leading a partition whose data is also in the response).
    • The user only specifies (host, port) pairs while Kafka metadata returns (host, port, brokerId) triples, with no guarantee that the host matches the user-specified hostname, and with no apparent way to query a broker's own brokerId.

    Now some of the issues with our current implementation:

    • If the user provides three addresses, only the last of which is valid, we wait for the first two connections to completely fail when fetching metadata before we try the third. Since the TCP connection timeout can be very long (many minutes) this can take a long time. Trying them all in parallel and using the first valid one would be better.
    • Once we've had an address fail, we discard it forever. Over the lifetime of a long-running cluster, all of the nodes may be down at one point or another, meaning that eventually we may 'run out' of nodes and abort.
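
The "try them all in parallel and use the first valid one" idea from the first bullet could look roughly like this. `dialFunc` and `firstReachable` are hypothetical names, and the dial function here is a stub rather than a real TCP connection attempt.

```go
package main

import (
	"errors"
	"fmt"
)

// dialFunc is a hypothetical connection attempt; a real one would open a TCP
// connection to the broker.
type dialFunc func(addr string) error

// firstReachable tries every address concurrently and returns the first one
// whose dial succeeds, or an error if all of them fail.
func firstReachable(addrs []string, dial dialFunc) (string, error) {
	type result struct {
		addr string
		err  error
	}
	// Buffered so that slow attempts can finish without a receiver.
	results := make(chan result, len(addrs))
	for _, a := range addrs {
		go func(addr string) {
			results <- result{addr, dial(addr)}
		}(a)
	}
	for range addrs {
		if r := <-results; r.err == nil {
			return r.addr, nil
		}
	}
	return "", errors.New("no seed broker reachable")
}

// demo uses a stub dialer for which only the third address is valid.
func demo() (string, error) {
	dial := func(addr string) error {
		if addr == "c:9092" {
			return nil
		}
		return errors.New("connection refused")
	}
	return firstReachable([]string{"a:9092", "b:9092", "c:9092"}, dial)
}

func main() {
	fmt.Println(demo())
}
```

A real version would also need to close the connections that succeed after a winner has been picked.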

    Thoughts on potential future design:

    • We should connect to brokers lazily, i.e. only when we actually want to talk to them, not just when we know they exist.
    • Since there is only ever one leader for a partition, if the connection fails it doesn't make sense to just try another broker. Therefore the caller should decide to proceed, which means that the client's leader function shouldn't guarantee that the broker it returns is connected, only connecting.
    • However, when fetching metadata, any broker will do, so the any function should guarantee that the broker it returns is actually connected. Walking through bad brokers to find a good one could be slow, but connecting to all the brokers just to pick one and drop the others is gross. We could try to connect to one, and if it hasn't succeeded in 5 seconds, start another, etc. That should cover the common case, since if it hasn't responded after 5 seconds it probably won't, and if it does then we'll drop it right away, oh well. More importantly, we won't spawn hundreds of connections if the first broker does respond normally.
    • User-provided addresses should be managed separately (which they already are) since they don't have brokerIds. They're only useful for metadata requests, so they should be disconnected entirely 99% of the time. If one of them fails, we should probably just mark it as recently-failed, and retry it after some time has passed. Only if all user-supplied nodes have recently failed (and we have no metadata from the cluster itself) should we abort somehow.
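
The staggered approach from the third bullet (start one attempt, and only start another if the first has not succeeded within some delay) can be sketched as below. All names are hypothetical, the dialers are stubs, and the delays are shortened so the example finishes quickly.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// dialFunc is a hypothetical blocking connection attempt.
type dialFunc func(addr string) error

type dialResult struct {
	addr string
	err  error
}

// staggeredDial starts with the first address and launches the next attempt
// when the outstanding ones are still pending after stagger, or as soon as
// all of them have failed. It returns the first address that connects.
func staggeredDial(addrs []string, stagger time.Duration, dial dialFunc) (string, error) {
	results := make(chan dialResult, len(addrs)) // buffered: late attempts never block
	next, pending := 0, 0
	start := func() {
		addr := addrs[next]
		next++
		pending++
		go func() { results <- dialResult{addr, dial(addr)} }()
	}

	if len(addrs) > 0 {
		start()
	}
	for pending > 0 {
		var timer <-chan time.Time // stays nil (never fires) once every address is in flight
		if next < len(addrs) {
			timer = time.After(stagger)
		}
		select {
		case r := <-results:
			pending--
			if r.err == nil {
				return r.addr, nil
			}
			if pending == 0 && next < len(addrs) {
				start() // everything so far failed fast; move on immediately
			}
		case <-timer:
			start() // previous attempts are slow; try another in parallel
		}
	}
	return "", errors.New("no broker reachable")
}

// demoFailFast: the first address is refused immediately.
func demoFailFast() (string, error) {
	dial := func(addr string) error {
		if addr == "a:9092" {
			return errors.New("connection refused")
		}
		return nil
	}
	return staggeredDial([]string{"a:9092", "b:9092"}, 5*time.Second, dial)
}

// demoSlowFirst: the first address hangs for longer than the stagger delay.
func demoSlowFirst() (string, error) {
	dial := func(addr string) error {
		if addr == "a:9092" {
			time.Sleep(500 * time.Millisecond)
		}
		return nil
	}
	return staggeredDial([]string{"a:9092", "b:9092"}, 20*time.Millisecond, dial)
}

func main() {
	fmt.Println(demoFailFast())
	fmt.Println(demoSlowFirst())
}
```

In both demos the second address wins: in the first because the earlier attempt fails outright, in the second because it is still pending when the stagger delay expires.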

    Open Questions:

    • If a user-supplied broker is used to satisfy a call to any, when should it be disconnected? The extraBroker field in client currently is a hack to work around this, and it still leaves one connection floating around for no reason. Presumably disconnectBroker could be used and made smarter? However, if a metadata-supplied broker is used, calling disconnectBroker will pull it out from underneath other code that could be using it as a leader. Do we need reference-counting?
    • There is currently no way to tell that a Broker is 'Connecting'; you can only tell that the lock is currently held. This means that if two concurrent leader calls would return the same broker and both call Open, the second one will block when we don't want it to. Do we need a second lock in the brokers? That gets really messy...
  • Async producer can overflow itself at high rate

    Versions

    Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

    • Sarama Version: b1da1753dedcf77d053613b7eae907b98a2ddad5
    • Kafka Version: Irrelevant
    • Go Version: b1da1753dedcf77d053613b7eae907b98a2ddad5

    Configuration

    What configuration values are you using for Sarama and Kafka?

    Sarama async producer:

    	conf := sarama.NewConfig()
    	conf.Metadata.Retry.Max = 1
    	conf.Metadata.Retry.Backoff = 250 * time.Millisecond
    	conf.Producer.RequiredAcks = sarama.RequiredAcks(sarama.WaitForLocal)
    	conf.Producer.Timeout = 1 * time.Second
    	conf.Producer.MaxMessageBytes = 16 << 20 // 16MB
    	conf.Producer.Flush.Bytes = 16 << 20 // 16MB
    	conf.Producer.Flush.Frequency = time.Minute
    	conf.Producer.Compression = sarama.CompressionNone // otherwise Kafka goes nuts
    	conf.Producer.Return.Errors = true
    	conf.Producer.Partitioner = NewIdentityPartitioner
    
    Logs

    Sarama logs:

    2017-01-09T23:30:21.504 myhost 2017/01/09 23:30:19 Kafka producer err: kafka: Failed to produce message to topic requests: kafka server: Message was too large, server rejected it to avoid allocation error.
    
    Problem Description

    Problem is in this function:

    • https://github.com/Shopify/sarama/blob/0fb560e5f7fbcaee2f75e3c34174320709f69944/async_producer.go#L583-L648

    We produce messages at such a high rate (100K/s+) that select always picks up processing of new incoming messages over rolling over and flushing existing ones.

    I applied the following patch:

    diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
    index e7ae8c2..13a888b 100644
    --- a/vendor/github.com/Shopify/sarama/async_producer.go
    +++ b/vendor/github.com/Shopify/sarama/async_producer.go
    @@ -2,6 +2,7 @@ package sarama
     
     import (
     	"fmt"
    +	"log"
     	"sync"
     	"time"
     
    @@ -249,6 +250,7 @@ func (p *asyncProducer) dispatcher() {
     		}
     
     		if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
    +			log.Printf("Got message size bigger than allowed max message bytes %d > %d", msg.byteSize(), p.conf.Producer.MaxMessageBytes)
     			p.returnError(msg, ErrMessageSizeTooLarge)
     			continue
     		}
    @@ -577,9 +579,12 @@ func (bp *brokerProducer) run() {
     	var output chan<- *produceSet
     	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
     
    +	wasReadyTimes := 0
    +
     	for {
     		select {
     		case msg := <-bp.input:
    +			log.Println("INPUT MESSAGE")
     			if msg == nil {
     				bp.shutdown()
     				return
    @@ -625,14 +630,23 @@ func (bp *brokerProducer) run() {
     				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
     			}
     		case <-bp.timer:
    +			log.Println("TIMER FIRED")
     			bp.timerFired = true
     		case output <- bp.buffer:
    +			wasReadyTimes = 0
    +			log.Println("ROLL OVER")
     			bp.rollOver()
     		case response := <-bp.responses:
    +			log.Println("HANDLING RESPONSE")
     			bp.handleResponse(response)
     		}
     
     		if bp.timerFired || bp.buffer.readyToFlush() {
    +			log.Println("READY TO FLUSH YAY")
    +			wasReadyTimes++
    +			if wasReadyTimes > 10 {
    +				log.Fatal("We were ready for a long time, but it did not happen. Exiting.")
    +			}
     			output = bp.output
     		} else {
     			output = nil
    diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
    index 9fe5f79..91a127f 100644
    --- a/vendor/github.com/Shopify/sarama/produce_set.go
    +++ b/vendor/github.com/Shopify/sarama/produce_set.go
    @@ -1,6 +1,9 @@
     package sarama
     
    -import "time"
    +import (
    +	"log"
    +	"time"
    +)
     
     type partitionSet struct {
     	msgs        []*ProducerMessage
    @@ -147,6 +150,7 @@ func (ps *produceSet) readyToFlush() bool {
     		return true
     	// If we've passed the byte trigger-point
     	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
    +		log.Printf("ready to flush because buffer bytes are big enough: %d >= %d", ps.bufferBytes, ps.parent.conf.Producer.Flush.Bytes)
     		return true
     	default:
     		return false
    
    

    The output looks like this: https://gist.github.com/bobrik/27071d61d5ec98ed15ffd1cb5331f3f4.

    Without log.Fatal I've seen buffer sizes go as high as 40MB, which is bigger than message.max.bytes=33554432 in our cluster.

    The solution is probably to do rollOver outside of select.
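
One possible reading of that suggestion, sketched with plain channels rather than the real brokerProducer: once the buffer is ready to flush, stop accepting input until the buffer has been handed off, so it cannot grow past the trigger point. The sketch uses the fact that a nil channel is never ready in a select. The `run` and `demo` functions and the count-based flush trigger are assumptions for illustration, not Sarama's code or its eventual fix.

```go
package main

import "fmt"

// run buffers incoming messages and hands the buffer to output once it holds
// flushAt messages. While a flush is pending, the input case is disabled, so
// the select cannot pick up more messages and the buffer cannot grow further.
// It returns the largest buffer size observed.
func run(input <-chan int, output chan<- []int, flushAt int) int {
	var buffer []int
	maxSeen := 0
	for {
		in := input
		var out chan<- []int
		if len(buffer) >= flushAt {
			in = nil     // a nil channel is never ready: no new input
			out = output // enable the flush case
		}
		select {
		case msg, ok := <-in:
			if !ok {
				if len(buffer) > 0 {
					output <- buffer // final flush on shutdown
				}
				close(output)
				return maxSeen
			}
			buffer = append(buffer, msg)
			if len(buffer) > maxSeen {
				maxSeen = len(buffer)
			}
		case out <- buffer:
			buffer = nil
		}
	}
}

// demo pushes n messages through run and reports the largest buffer size
// seen and the number of batches flushed.
func demo(n, flushAt int) (maxSeen, batches int) {
	input := make(chan int)
	output := make(chan []int)
	result := make(chan int, 1)
	go func() { result <- run(input, output, flushAt) }()
	go func() {
		for i := 0; i < n; i++ {
			input <- i
		}
		close(input)
	}()
	for range output {
		batches++
	}
	return <-result, batches
}

func main() {
	fmt.Println(demo(10, 4)) // prints "4 3"
}
```

The trade-off is back-pressure: while a flush is pending, senders on the input channel block instead of the buffer growing without bound.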

  • Memory leak sometimes when restarting kafka partitions

    I know this isn't a lot to go on, but hope you can give me pointers to debug this.

    Restarting kafka brokers occasionally produces ever-increasing memory usage until the go process dies OOM. This isn't consistent, but it happens especially when we have to restart multiple kafka nodes at once. When I run pprof/heap I see the memory used by kafka messages but cannot tell where inside the kafka code the memory is being held onto :( It only happens occasionally and only on our production tier. I also don't see increased goroutine numbers. We're running with 256 brokers and about 12 kafka hosts, using the kafka revision just after you fixed the WaitGroup bug. The log has messages like

    Failed to connect to XYZ: getsockopt: connection refused
    client/brokers deregistering XYZ
    client/brokers deregistered broker #-1 at kafkaXYZ:9092
    ...

    Even when kafka is stable, and the messages go away, memory usage still increases forever until we OOM. A restart and things work great.

    Here is the config we use

        conf.Net.DialTimeout = time.Second * 30
    
        conf.ChannelBufferSize = 256
    
        conf.Producer.RequiredAcks = sarama.WaitForLocal
        conf.Producer.Return.Errors = true
        conf.Producer.Return.Successes = true
        conf.Producer.Partitioner = partitioner
        conf.Producer.Flush.Frequency = time.Millisecond * 250
        conf.Producer.Flush.Messages = 3000
        conf.Producer.Flush.Bytes = 83886080
    

    The biggest issue for us is that even when kafka recovers, memory usage still increases forever in a straight line upward.

    When kafka failures happen, is it possible that multiple buffers could be created internally that end up buffering points?

    Note we also don't see increases in messages on the error channel.

  • [WIP] Offset manager tests

    This PR adds tests for the offset manager in PR #461. The majority of the code now has coverage.

    Notes

    • I was unable to reach the bom.updateSubscriptions loop in the brokerOffsetManager.abort() method; pointers for that would be appreciated.
    • The tests use a pretty simple setup; adding a second partition offset manager (in an attempt to reach the code in the previous point) led to a broker unreference/refresh and a need for more mock responses that I didn't get to.
    • After spending a while with it, the code is still somewhat difficult to reason about, due to a lot of channels and resultant side effects. Also, 3 different objects whose responsibilities are not totally clear.

    @eapache @wvanbergen @horkhe

  • sarama/v2 should not leak its internals to the caller

    The Broker type used internally by Sarama is also directly used in the MetadataResponse type to decode the Broker id, host and (if applicable) rack into. This internal type then leaks further into the admin.go functionality such as admin.DescribeCluster()

    For Sarama/v2 we should ensure that the protocol types are distinct from any internal structs, and ideally the admin.go interfaces should also have their own dedicated types for what is returned to the end user.
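
A minimal sketch of that separation, with entirely hypothetical names (`wireBroker`, `BrokerInfo`, `toInfo`, `describeCluster`): the type that a metadata response is decoded into stays unexported, and callers only ever receive a plain value type copied from it.

```go
package main

import "fmt"

// wireBroker is a hypothetical stand-in for the type a metadata response is
// decoded into; it also carries connection state callers should never see.
type wireBroker struct {
	id   int32
	addr string
	rack *string
	conn interface{} // internal connection handle
}

// BrokerInfo is a hypothetical public, read-only view returned by admin calls.
type BrokerInfo struct {
	ID   int32
	Addr string
	Rack string // empty if the broker reported no rack
}

// toInfo copies only the public fields out of the internal type.
func toInfo(b *wireBroker) BrokerInfo {
	info := BrokerInfo{ID: b.id, Addr: b.addr}
	if b.rack != nil {
		info.Rack = *b.rack
	}
	return info
}

// describeCluster converts decoded brokers before handing them to the caller.
func describeCluster(decoded []*wireBroker) []BrokerInfo {
	out := make([]BrokerInfo, 0, len(decoded))
	for _, b := range decoded {
		out = append(out, toInfo(b))
	}
	return out
}

func main() {
	rack := "rack-a"
	brokers := describeCluster([]*wireBroker{
		{id: 0, addr: "localhost:9092", rack: &rack},
		{id: 1, addr: "localhost:9093"},
	})
	fmt.Printf("%+v\n", brokers)
}
```

Because the returned values are copies, changes to the internal broker struct no longer ripple into the public API.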

  • Azure Event Hubs: Broker not connected error when using alias for disaster recovery

    Versions

    | Sarama | Kafka            | Go   |
    |--------|------------------|------|
    | 1.32.0 | Azure Event Hubs | 1.18 |

    Configuration

    What configuration values are you using for Sarama and Kafka?

    prdCfg.Version = sarama.V1_0_2_0 // I have also tried with V1_0_0_0
    prdCfg.Producer.Return.Successes = true
    cnsCfg.Version = sarama.V1_0_2_0
    if config.Cfg.SaramaLog == "y" {
    	sarama.Logger, _ = zap.NewStdLogAt(logging.Get().Parent.With(zap.String("name", "sarama")), zapcore.DebugLevel)
    }
    cnsCfg.Consumer.Return.Errors = true
    if strings.Contains(config.Cfg.OutputBinderBroker, "localhost") {
    	return
    }
    prdCfg.Net.SASL.Enable = true
    prdCfg.Net.DialTimeout = 10 * time.Second
    prdCfg.Net.SASL.User = config.Cfg.OutputBinderUser
    prdCfg.Net.SASL.Password = config.Cfg.OutputBinderPassword
    prdCfg.Net.SASL.Mechanism = "PLAIN"
    prdCfg.Net.TLS.Enable = true
    tlsConfig := &tls.Config{
    	InsecureSkipVerify: true,
    	ClientAuth:         0,
    }
    prdCfg.Net.TLS.Config = tlsConfig
    
    cnsCfg.Net.SASL.Enable = true
    cnsCfg.Net.DialTimeout = 10 * time.Second
    cnsCfg.Net.SASL.User = config.Cfg.InputBinderUser
    cnsCfg.Net.SASL.Password = config.Cfg.InputBinderPassword
    cnsCfg.Net.SASL.Mechanism = "PLAIN"
    cnsCfg.Net.TLS.Enable = true
    cnsCfg.Net.TLS.Config = tlsConfig
    
    Logs

    {"log.level":"debug","@timestamp":"2022-04-28T14:41:30.585Z","log.origin":{"file.name":"[email protected]/consumer.go","file.line":920},"message":"consumer/broker/0 added subscription to reset-password/1","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.590Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":240},"message":"Connected to broker at st-evnt-bked-cana-we-01.servicebus.windows.net:9093 (registered as #0)","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.590Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":1242},"message":"SASL authentication successful with broker st-evnt-bked-cana-we-01.servicebus.windows.net:9093:4 - [0 0 0 0]","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.588Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":1169},"message":"Successful SASL handshake. Available mechanisms: [PLAIN OAUTHBEARER]","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"[email protected]/client.go","file.line":959},"message":"client/brokers replaced registered broker #0 with st-evnt-bked-cana-we-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":163},"message":"ClientID is the default of 'sarama', you should consider setting it to something application-specific.","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.542Z","log.origin":{"file.name":"[email protected]/consumer.go","file.line":920},"message":"consumer/broker/0 added subscription to reset-password/0","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.542Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":299},"message":"Closed connection to broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"[email protected]/client.go","file.line":883},"message":"client/metadata fetching metadata for [reset-password] from broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"[email protected]/client.go","file.line":883},"message":"client/metadata fetching metadata for [reset-password] from broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"[email protected]/utils.go","file.line":43},"message":"consumer/broker/0 disconnecting due to error processing FetchRequest: kafka: broker not connected","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"error","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/1: kafka: broker not connected"},"ecs.version":"1.6.0"}
    
    {"log.level":"error","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/0: kafka: broker not connected"},"ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":299},"message":"Closed connection to broker st-evnt-bked-cana-we-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.403Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":1242},"message":"SASL authentication successful with broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093:4 - [0 0 0 0]","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.404Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":240},"message":"Connected to broker at st-evnt-bked-cana-geo-01.servicebus.windows.net:9093 (registered as #0)","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.402Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":1169},"message":"Successful SASL handshake. Available mechanisms: [PLAIN OAUTHBEARER]","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.309Z","log.origin":{"file.name":"[email protected]/client.go","file.line":1072},"message":"client/coordinator coordinator for consumergroup reset-password-dequeuer is #0 (st-evnt-bked-cana-geo-01.servicebus.windows.net:9093)","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.309Z","log.origin":{"file.name":"[email protected]/client.go","file.line":596},"message":"client/brokers replaced registered broker #0 with st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.309Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":163},"message":"ClientID is the default of 'sarama', you should consider setting it to something application-specific.","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:27.308Z","log.origin":{"file.name":"[email protected]/client.go","file.line":1050},"message":"client/coordinator requesting coordinator for consumergroup reset-password-dequeuer from st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"error","@timestamp":"2022-04-28T14:41:26.308Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/0: kafka: broker not connected"},"ecs.version":"1.6.0"}
    
    {"log.level":"error","@timestamp":"2022-04-28T14:41:26.308Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/1: kafka: broker not connected"},"ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.623Z","log.origin":{"file.name":"kafka/producer.go","file.line":34},"message":"message sent","partition":0,"offset":14,"ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.585Z","log.origin":{"file.name":"[email protected]/utils.go","file.line":43},"message":"producer/broker/0 starting up","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.586Z","log.origin":{"file.name":"[email protected]/utils.go","file.line":43},"message":"producer/broker/0 state change to [open] on reset-password/0","name":"sarama","ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.585Z","log.origin":{"file.name":"kafka/handler.go","file.line":40},"message":"Message processing","topic":"reset-password","partition":0,"offset":15,"ecs.version":"1.6.0"}
    
    {"log.level":"debug","@timestamp":"2022-04-28T14:41:25.585Z","log.origin":{"file.name":"kafka/handler.go","file.line":41},"message":"Message content","content":"{\r\n  \"Event\": {\r\n    \"id\": \"0123456789\",\r\n    \"type\": \"AbilitazioniServiziTelematici\",\r\n    \"notification_time\": 1634131014999\r\n  },\r\n  \"Action\": {\r\n    \"type\": \"Update\",\r\n    \"subtype\": \"Reset_password\"\r\n  },\r\n  \"Element\": {\r\n    \"client_code\": \"0056999\"\r\n  },\r\n  \"Data\": {\r\n    \"fiscal_code\": \"TSTMJR57C1xxxxxx\"\r\n  }\r\n}","ecs.version":"1.6.0"}
    
    
    

    Problem Description

    We have a service written in Go which connects to Azure Event Hubs through the Sarama library: it reads a message from one queue and publishes it to another. We receive an error from the Sarama consumer every time the service reads a message from the queue:

    "kafka: error while consuming reset-password/1: kafka: broker not connected"

    The error occurs when the service connects to Event Hubs through the disaster recovery alias (broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093). The alias st-evnt-bked-cana-geo-01 has the primary namespace st-evnt-bked-cana-we-01 and the secondary namespace st-evnt-bked-cana-ne-01. If we connect directly to the broker st-evnt-bked-cana-we-01.servicebus.windows.net:9093, the error doesn't occur. Despite this error, the message is received and published to the output queue, but we are concerned about the performance impact, because the logs show that Sarama reconnects to another broker (st-evnt-bked-cana-geo-01 or st-evnt-bked-cana-we-01). We have already opened a ticket with Azure, but according to Microsoft everything is OK.

  • AsyncProducer produce wrong message to Kafka

    Versions

    Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

    | Sarama  | Kafka               | Go     |
    |---------|---------------------|--------|
    | v1.32.0 | bitnami/kafka:3.1.0 | 1.16.4 |

    Configuration

    What configuration values are you using for Sarama and Kafka?

    func GetKafkaAsyncProducer() sarama.AsyncProducer {
    	if kafkaProducer != nil {
    		return *kafkaProducer
    	}
    
    	kafkaEtcdConfBytes, err := GetConfig(constant.EtcdKeyKafkaConfig)
    	if err != nil {
    		panic(err)
    	}
    
    	err = json.Unmarshal(kafkaEtcdConfBytes, &KafkaConf)
    	if err != nil {
    		panic(err)
    	}
    
    	kafkaConfig := sarama.NewConfig()
    	sarama.Logger = log2.New(os.Stdout, "[sarama] ", log2.LstdFlags)
    	kafkaConfig.Net.SASL.Enable = false
    	version, err := sarama.ParseKafkaVersion("3.1.0")
    	if err != nil {
    		panic(err)
    	}
    	kafkaConfig.Version = version
    	kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
    	kafkaConfig.Producer.Partitioner = sarama.NewRandomPartitioner
    	kafkaConfig.Producer.Return.Errors = true
    
    	p, err := sarama.NewAsyncProducer(KafkaConf.Endpoint, kafkaConfig)
    	if err != nil {
    		panic(err)
    	}
    	kafkaProducer = &p
    	return *kafkaProducer
    }
    
    Logs

    When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.

    Problem Description

    When using AsyncProducer, messages are delivered incorrectly. The first message works as expected, but when the second message is received, its content is mixed up. In my case I put the message {"L":"INFO","T":"2022-04-24T20:45:22+08:00","M":"User-Agent: PostmanRuntime/7.26.10","RID":"43447343-dbcf-4056-a99b-cde0d9ebb5b7"} into AsyncProducer.Input(), and the consumer gets "RID":"43447343-dbcf-4056-a99b-cde0d9ebb5b7""M":"User-Agent: PostmanRuntime/7.26.10","RID":"43447343-dbcf-4056-a99b-cde0d9ebb5b7"} This problem does not show up when using SyncProducer or kafka_console_producer.sh.
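
The report does not show how the payloads are built, so the following is only a guess at one possible cause, not a confirmed diagnosis: if the caller reuses a byte buffer after handing it to an asynchronous producer, the bytes can change before they are actually sent. The sketch below reproduces that effect with the standard library only. The `queue` type is a hypothetical stand-in for any API that stores a slice now and reads it later; it is not Sarama's API.

```go
package main

import "fmt"

// queue is a hypothetical stand-in for an asynchronous producer: enqueue
// stores the slice immediately, flush reads it only later.
type queue struct{ pending [][]byte }

func (q *queue) enqueue(p []byte) { q.pending = append(q.pending, p) }

// flush returns the payloads as they look at send time.
func (q *queue) flush() []string {
	out := make([]string, 0, len(q.pending))
	for _, p := range q.pending {
		out = append(out, string(p))
	}
	return out
}

// sendAll encodes every message into one reused buffer. When copyFirst is
// true, each payload is copied before being enqueued.
func sendAll(msgs []string, copyFirst bool) []string {
	var q queue
	buf := make([]byte, 0, 64) // reused for every message
	for _, m := range msgs {
		buf = append(buf[:0], m...) // overwrites the same backing array
		p := buf
		if copyFirst {
			p = append([]byte(nil), buf...) // private copy for the queue
		}
		q.enqueue(p)
	}
	return q.flush()
}

func main() {
	msgs := []string{"first-message", "second"}
	fmt.Printf("%q\n", sendAll(msgs, false)) // ["secondmessage" "second"]
	fmt.Printf("%q\n", sendAll(msgs, true))  // ["first-message" "second"]
}
```

Without the copy, the first queued payload is a mix of the second message and the tail of the first. A synchronous send would not show this, because the bytes are consumed before the buffer is reused.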

  • Consumer stops consuming a single partition in a topic

    Versions

    Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

    | Sarama  | Kafka | Go     |
    |---------|-------|--------|
    | v1.30.1 | 2.8   | 1.16.5 |

    Configuration

    Assignment strategy is round robin

    What configuration values are you using for Sarama and Kafka?

    Logs

    When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.

    Problem Description

    My app consumes many topics and has been running for a month. Right now, it cannot consume one partition of one topic, although the partitions of the other topics are still being consumed. I wonder what can cause this stuck partition. For now, I resolve it by restarting the application.

Pubsub controller using Kafka and based on Sarama. Easy control flow for action streaming, event driven.

Psub: a helper for building Kafka-based streaming and event-driven systems. Install: go get github.com/teng231/psub. It has 3 env variables for config

May 10, 2022
Higher level abstraction for Sarama.

kafka-do v0.1.5 kafka-do What Higher level abstraction for Sarama. Why We want to be able to write our kafka applications without making the same thin

Sep 30, 2021
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.

Kowl - Apache Kafka Web UI Kowl (previously known as Kafka Owl) is a web application that helps you to explore messages in your Apache Kafka cluster a

May 17, 2022
Study project that uses Apache Kafka as syncing mechanism between two databases, with producers and consumers written in Go.

Kafka DB Sync Study project that uses Apache Kafka as syncing mechanisms between a monolith DB and a microservice. The main purpose of this project is

Dec 5, 2021
A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application.

Apache Kafka in 6 minutes A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application. In thi

Oct 27, 2021
Confluent's Apache Kafka Golang client

Confluent's Golang Client for Apache Kafka™ confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform. Features: Hi

May 16, 2022
Modern CLI for Apache Kafka, written in Go.

Kaf Kafka CLI inspired by kubectl & docker Install Install from source: go get -u github.com/birdayz/kaf/cmd/kaf Install binary: curl https://raw.git

May 8, 2022
CLI Tool to Stress Apache Kafka Clusters

Kafka Stress - Stress Test Tool for Kafka Clusters, Producers and Consumers Tuning Installation Docker docker pull fidelissauro/kafka-stress:latest d

Mar 19, 2022
franz-go - A complete Apache Kafka client written in Go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 2.8.0+. Producing, consuming, transacting, administrating, etc.

May 13, 2022
Testing Apache Kafka using Go.

Apache Kafka Go Testing Apache Kafka using Go. Instructions Provision the single node Kafka cluster using Docker: docker-compose -p apache-kafka-go up

Dec 17, 2021
provider-kafka is a Crossplane Provider that is used to manage Kafka resources.

provider-kafka provider-kafka is a Crossplane Provider that is used to manage Kafka resources. Usage Create a provider secret containing a json like t

Apr 27, 2022
A CLI tool for interacting with Kafka through the Confluent Kafka Rest Proxy

kafkactl Table of contents kafkactl Table of contents Overview Build Development Overview kafkactl is a CLI tool to interact with Kafka through the Co

Nov 1, 2021
Apache Pulsar Go Client Library

Apache Pulsar Go Client Library A Go client library for the Apache Pulsar project. Goal This project is developing a pure-Go client library for Pulsa

May 15, 2022
Producer x Consumer example using Kafka library and Go.

Go - Kafka - Example Apache Kafka Commands First of all, run the docker-compose docker-compose up, then run docker exec -it kafka_kafka_1 bash Topics

Dec 8, 2021
franz-go contains a high performance, pure Go library for interacting with Kafka from 0.8.0 through 2.7.0+. Producing, consuming, transacting, administrating, etc.

franz-go - Apache Kafka client written in Go Franz-go is an all-encompassing Apache Kafka client fully written Go. This library aims to provide every

May 14, 2022
kafka watcher for casbin library

Casbin Kafka Watcher Casbin watcher for kafka This watcher library will enable users to dynamically change casbin policies through kafka messages Infl

May 8, 2021
Implementation of the NELI leader election protocol for Go and Kafka

goNELI Implementation of the NELI leader election protocol for Go and Kafka. goNELI encapsulates the 'fast' variation of the protocol, running in excl

Apr 28, 2022
Kafka producer and consumer tool in protobuf format.

protokaf Kafka producer and consumer tool in protobuf format. Features Consume and produce messages using Protobuf protocol Trace messages with Jaeger

Apr 6, 2022
ChizBroker is a fast and simple GRPC based implementation of kafka.

Chiz Broker: a broker for fun ChizBroker is a fast and simple GRPC based implementation of kafka. Features: Ready to be deployed on kubernetes Prometh

May 3, 2022