Sarama is a Go library for Apache Kafka 0.8 and up.

sarama

Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).

Getting started

  • API documentation and examples are available via pkg.go.dev.
  • Mocks for testing are available in the mocks subpackage.
  • The examples directory contains more elaborate example applications.
  • The tools directory contains command line tools that can be useful for testing, diagnostics, and instrumentation.

You might also want to look at the Frequently Asked Questions.

Compatibility and API stability

Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest stable releases of Kafka and Go, and we provide a two month grace period for older releases. This means we currently officially support Go 1.14 through 1.15, and Kafka 2.5 through 2.7, although older releases are still likely to work.

Sarama follows semantic versioning and provides API stability via the gopkg.in service. You can import a version with a guaranteed stable API via http://gopkg.in/Shopify/sarama.v1. A changelog is available here.
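
For example, a project pinning to the v1 API imports it like this (a minimal sketch; the alias is optional since the package is already named sarama):

    package main

    import (
        sarama "gopkg.in/Shopify/sarama.v1"
    )

    func main() {
        config := sarama.NewConfig()
        _ = config // use the v1 API as usual from here
    }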

Contributing

Comments
  • Implement a higher-level consumer group

    Related to my other PR #1083, this is another attempt, this time with a higher-level consumer API. I have been going back and forth on this one; I can't find a better way to make it flexible enough to support all the use cases while staying comprehensible at the same time. I would really appreciate some feedback.
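
    For context, a hedged sketch of what a higher-level consumer group API can look like in Go, loosely modeled on the shape Sarama eventually shipped (the specific names, broker address, topic, and group ID here are assumptions, not this PR's exact proposal):

    package main

    import (
        "context"
        "log"

        "github.com/Shopify/sarama"
    )

    // handler implements sarama.ConsumerGroupHandler.
    type handler struct{}

    func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
    func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

    func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for msg := range claim.Messages() {
            log.Printf("topic=%s partition=%d offset=%d", msg.Topic, msg.Partition, msg.Offset)
            sess.MarkMessage(msg, "") // let the group's offset manager commit it
        }
        return nil
    }

    func main() {
        config := sarama.NewConfig()
        config.Version = sarama.V2_1_0_0 // consumer groups need a recent enough protocol version

        group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
        if err != nil {
            log.Fatal(err)
        }
        defer group.Close()

        // Consume blocks until a rebalance or error; real code calls it in a loop.
        if err := group.Consume(context.Background(), []string{"people"}, handler{}); err != nil {
            log.Fatal(err)
        }
    }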

  • OffsetManager Implementation

    There are still some TODOs sprinkled throughout, but not all of them necessarily need to be fixed before we ship this (it's already pretty close to the minimum viable implementation). The only critical missing piece is implementing AsyncClose for the PartitionOffsetManager.

    Regardless, it's at the state where it can be opened up for comments. This should completely supersede #379.

  • Lockless multiproducer

    CC @sirupsen @wvanbergen @burke @fw42 @graemej

    Simple non-batching synchronous tests are working. Needs batching tests, but as Burke mentioned yesterday they are very messy with the current framework.

  • New producer design [beta]

    @wvanbergen @burke @Sirupsen @graemej and whoever else might be interested.

    This is an idea for a different API for the producer that is somewhat more channel/goroutine based. I was thinking about alternative architectures after reviewing the PR which adds the channels returning failed/successful messages. I started playing with things (after also reviewing the original multiproducer PRs) and came up with this.

    It ended up being basically a complete rewrite, so reviewing the diff won't be too helpful; just read the new producer.go straight. It's very much a work-in-progress; I'm more interested in architectural thoughts right now than code nits. It seems to pass the simple tests, but that's primarily accidental, shrug.

    Misc. notes on things that might cause questions:

    • User communication (input of new messages, receipt of errors) is all done with channels now, so the common idiom I expect would be a select on those two channels to avoid blocking (see the sketch after this list).
    • It doesn't have a "synchronous" mode, instead the separate SimpleProducer uses the AckSuccesses flag to effectively imitate that.
    • In the normal success path, a message ends up passing through five goroutines before being bundled into a message and passed to the Broker object to put on the wire. Not sure if this will impact performance noticeably, but it means that if any intermediate step blocks (i.e. a metadata request for a new leader for a topic) then all other messages not directly impacted will continue processing, which is nice (assuming you configure a large enough channel buffer).
    • I delayed actually converting the Key/Value elements to bytes until the very last moment. This seems cleaner, and also potentially simplifies the addition of streaming messages which somebody asked for recently.
    • Hopefully the retry path is easier to reason about than the old one, as it is exactly the same as the normal path now (the message gets resubmitted on exactly the same channel as the user submits messages on, it just has an extra flag set). It should still preserve ordering correctly though (see the leaderDispatcher logic for this).
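
    A hedged sketch of that select idiom, using the channel-based producer API as it exists in today's Sarama (the Input/Errors/Successes names and the broker address are today's API and my assumptions, not necessarily this draft's exact names):

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        config := sarama.NewConfig()
        config.Producer.Return.Successes = true // roughly the AckSuccesses idea described above

        producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.AsyncClose()

        msg := &sarama.ProducerMessage{Topic: "people", Value: sarama.StringEncoder("hello")}

        // Select over input, successes and errors so the caller never blocks on a
        // single channel while the producer is trying to hand results back.
        for sent := false; !sent; {
            select {
            case producer.Input() <- msg:
                sent = true
            case err := <-producer.Errors():
                log.Println("produce error:", err)
            case success := <-producer.Successes():
                log.Println("acked earlier message at offset", success.Offset)
            }
        }
    }
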
  • panic: sync: negative WaitGroup counter

    Seeing this occasionally in production:

    Revision: 23d523386ce0c886e56c9faf1b9c78b07e5b8c90

    panic: sync: negative WaitGroup counter
    
    goroutine 444 [running]:
    sync.(*WaitGroup).Add(0xc208e6c510, 0xffffffffffffffff)
            /usr/src/go/src/sync/waitgroup.go:67 +0x96
    sync.(*WaitGroup).Done(0xc208e6c510)
            /usr/src/go/src/sync/waitgroup.go:85 +0x31
    github.com/Shopify/sarama.(*asyncProducer).returnSuccesses(0xc208e6c4d0, 0xc29ffe9600, 0x3f, 0x40)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:753 +0xd3
    github.com/Shopify/sarama.(*asyncProducer).flusher(0xc208e6c4d0, 0xc20814c540, 0xc2080b9800)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:560 +0xafe
    github.com/Shopify/sarama.func·006()
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:430 +0x43
    github.com/Shopify/sarama.withRecover(0xc20802b220)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/utils.go:42 +0x3a
    created by github.com/Shopify/sarama.(*asyncProducer).messageAggregator
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:430 +0x205
    
  • Not seeing all messages when syncProducer called from goroutines

    Versions

    Sarama Version: 1.15.0, Kafka Version: 0.11.0, Go Version: 1.9.2 darwin/amd64

    Configuration

    What configuration values are you using for Sarama and Kafka?

    Sarama configuration

    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 10
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.V0_11_0_0
    

    Kafka configuration

    broker.id=0
    delete.topic.enable=true
    listeners=PLAINTEXT://localhost:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/usr/local/Cellar/kafka/kafka-log-1
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    auto.create.topics.enable=false 
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    
    Logs

    When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.

    sarama: client.go:115: Initializing new client
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: broker.go:146: Connected to broker at localhost:9092 (unregistered)
    sarama: client.go:429: client/brokers registered new broker #0 at localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (3 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (2 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (1 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:161: Successfully initialized new client
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: async_producer.go:601: producer/broker/0 starting up
    sarama: async_producer.go:612: producer/broker/0 state change to [open] on people/0
    sarama: broker.go:144: Connected to broker at localhost:9092 (registered as #0)
    
    Problem Description

    When calling SendMessage on a single instance of syncProducer from multiple goroutines, some messages seem to fail to be produced to Kafka. I've looked at what ends up on the stream using Apache's kafka-console-consumer, and it shows only a fraction of the messages, anywhere from half of them down to none. I wrote my own consumer using Sarama and it shows the same issue; however, I get the error message below back from Sarama. I want to use syncProducer because I need to guarantee that messages are published to the stream in the order my application receives them. Maybe I've just implemented it wrong, but right now I'm out of ideas and I'm hoping someone here can help me out.

    sarama: consumer.go:755: consumer/broker/0 abandoned subscription to people/0 because kafka: response did not contain all the expected topic/partition blocks
    Error: kafka: error while consuming people/0: kafka: response did not contain all the expected topic/partition blocks
    sarama: consumer.go:345: consumer/people/0 finding new broker
    sarama: client.go:644: client/metadata fetching metadata for [people] from broker localhost:9092
    sarama: consumer.go:711: consumer/broker/0 added subscription to people/0
    

    Here's how I created my topic: bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic people

    I'm running a single broker on my local machine. I've written a sample program that can reproduce the issue. It's also worth noting that none of the calls to sendMessage() are returning errors when I run the code.

    main.go

    package main
    
    import (
    	"bytes"
    	"fmt"
    	"log"
    	"strconv"
    	"sync"
    	"syncProducer/streamer"
    
    	"github.com/Shopify/sarama"
    	"github.com/linkedin/goavro"
    	uuid "github.com/satori/go.uuid"
    )
    
    const personSchema = `{
    	"type":"record",
    	"name":"Person",
    	"namespace":"com.example.people",
    	"fields":[
    		{
    			"name":"Name",
    			"type":"string"
    		},
    		{
    			"name":"Address",
    			"type":"string"
    		},{
    			"name":"City",
    			"type":"string"
    		},
    		{
    			"name":"State",
    			"type":"string"
    		},
    		{
    			"name":"ZIP",
    			"type":"long"
    		}
    	]
    }`
    
    var (
    	personCodec *goavro.Codec
    	buf         bytes.Buffer
    )
    
    type (
    	person struct {
    		Name    string
    		Address string
    		City    string
    		State   string
    		ZIP     int64
    	}
    )
    
    func main() {
    	var err error
    	personCodec, err = goavro.NewCodec(personSchema)
    	if err != nil {
    		panic(err)
    	}
    
    	producer, err := newSyncProducer()
    	if err != nil {
    		panic(err)
    	}
    	streamer := streamer.New(producer)
    
    	// Create 10 avro message bodies
    	var people [][]byte
    	for i := 1; i < 11; i++ {
    		aPerson := person{
    			Name:    "Bob #" + strconv.Itoa(i),
    			Address: strconv.Itoa(i) + " Main St.",
    			City:    "SomeTown",
    			State:   "CA",
    			ZIP:     90210,
    		}
    		data, err := convertToAvro(aPerson)
    		if err != nil {
    			panic("Could not convert aPerson " + strconv.Itoa(i) + " to avro.")
    		}
    		people = append(people, data)
    	}
    
    	errc := make(chan error, 10)
    
    	var wg sync.WaitGroup
    	// Send messages
    	for _, person := range people {
    		wg.Add(1)
    		go func(person []byte, c chan error, wg *sync.WaitGroup) {
    			uuid := uuid.NewV4().String()
    			err := streamer.SendActivity("people", uuid, "CreatePerson", person, nil)
    			c <- err
    			wg.Done()
    		}(person, errc, &wg)
    	}
    
    	wg.Wait()
    	close(errc)
    	fmt.Println("Completed!")
    	for i := range errc {
    		fmt.Println(i)
    		if i != nil {
    			fmt.Printf("Exit: %v\n", i)
    		}
    	}
    
    	fmt.Print(&buf)
    }
    
    func convertToAvro(aPerson person) ([]byte, error) {
    	data, err := personCodec.BinaryFromNative(nil, map[string]interface{}{
    		"Name":    aPerson.Name,
    		"Address": aPerson.Address,
    		"City":    aPerson.City,
    		"State":   aPerson.State,
    		"ZIP":     aPerson.ZIP,
    	})
    	if err != nil {
    		return nil, err
    	}
    
    	return data, nil
    }
    
    func newSyncProducer() (sarama.SyncProducer, error) {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
    	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
    	config.Producer.Return.Successes = true          // Required when using syncproducer
    	config.Producer.Return.Errors = true
    	config.Version = sarama.V0_11_0_0
    
    	sarama.Logger = log.New(&buf, "sarama: ", log.Lshortfile)
    
    	return sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    }
    

    streamer.go

    package streamer
    
    import (
    	"github.com/Shopify/sarama"
    	"github.com/pkg/errors"
    )
    
    const (
    	MessageTypeHeaderKey = "message-type"
    	MessageIDHeaderKey = "message-id"
    )
    
    type (
    	// Metadata contains metadata for a given activity.
    	Metadata map[string][]string
    
    	// Streamer handles streaming activities to a topic.
    	Streamer struct {
    		producer sarama.SyncProducer
    	}
    )
    
    var (
    	// ErrNoSubjects denotes that no subjects were provided.
    	ErrNoSubjects = errors.New("At least one subject is required")
    )
    
    // New creates a new streamer.
    func New(producer sarama.SyncProducer) *Streamer {
    	return &Streamer{
    		producer: producer,
    	}
    }
    
    // SendActivity encapsulates the provided metadata and data in a message and sends it to a topic.
    func (s *Streamer) SendActivity(topic string, messageID string, messageHeaderValue string, data []byte, metadata Metadata) error {
    	_, _, err := s.producer.SendMessage(&sarama.ProducerMessage{
    		Topic: topic,
    		Key:   sarama.StringEncoder(messageID),
    		Value: sarama.ByteEncoder(data),
    		Headers: []sarama.RecordHeader{
    			sarama.RecordHeader{
    				Key:   []byte(MessageIDHeaderKey),
    				Value: []byte(messageID),
    			},
    			sarama.RecordHeader{
    				Key:   []byte(MessageTypeHeaderKey),
    				Value: []byte(messageHeaderValue),
    			},
    		},
    	})
    	if err != nil {
    		return errors.Wrapf(err, "Error sending message to topic %s for ID %s", topic, messageID)
    	}
    
    	return nil
    }
    
  • MultiConsumer

    @wvanbergen @snormore @mkobetic (and anyone else who cares)

    The current consumer works just fine, but is limited to fetching just a single partition of a single topic at once. You can run multiple consumers, but this has several issues:

    • They can't use the same client, or else the broker connection has contention problems (brokers handle requests on a single connection serially, so if it has to wait for MaxWaitTime before responding, this introduces severe latency when multiplexing busy and non-busy topics). Using separate clients works, but generates a lot of extra connections and metadata traffic.
    • You don't get messages from multiple partitions batched into a single response, when that makes sense for efficiency.

    The ideal solution is to have a consumer capable of fetching from multiple topic/partitions at once. This has been at the back of my mind for a while now, so I already have a design ready; if the design makes sense to y'all, then somebody just needs to write it :)

    While the user currently specifies a single topic/partition to the constructor, they should now be able to specify a set of topic/partitions. Since some of the configuration that is currently a singleton actually needs to be per-partition (OffsetMethod and OffsetValue at least), this would probably be of type map[string]map[int32]*ConsumerPartitionConfig. I considered permitting the dynamic adding/removing of partitions to the set, but I don't see a strong use case and it complicated a bunch of things.
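
    A hedged sketch of that configuration shape (ConsumerPartitionConfig and the OffsetMethod names are taken from this proposal and are illustrative, not the current API):

    package main

    import "fmt"

    // OffsetMethod mirrors the old consumer's offset selection enum (names are illustrative).
    type OffsetMethod int

    const (
        OffsetMethodOldest OffsetMethod = iota
        OffsetMethodNewest
        OffsetMethodManual
    )

    // ConsumerPartitionConfig is the hypothetical per-partition config from this proposal.
    type ConsumerPartitionConfig struct {
        OffsetMethod OffsetMethod
        OffsetValue  int64 // used when OffsetMethod is Manual
    }

    func main() {
        // topic -> partition -> config, as suggested above.
        partitions := map[string]map[int32]*ConsumerPartitionConfig{
            "people": {
                0: {OffsetMethod: OffsetMethodOldest},
                1: {OffsetMethod: OffsetMethodManual, OffsetValue: 12345},
            },
        }
        fmt.Println(len(partitions["people"]), "partitions configured")
    }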

    Events are returned to the user exactly the same way they are now, over a single channel. I considered a separate channel per topic/partition but it complicated the base case, and the events already contain information on topic/partition so it's not hard for the user to dispatch appropriately if they really want to.

    The constructor starts up one "controller" goroutine, which starts up and manages one goroutine per broker-that-has-a-partition-we-care-about and is responsible for (initially) dispatching each topic/partition to the appropriate broker's goroutine. The broker goroutine looks a lot like the current fetchMessages method with a few tweaks:

    • Some minor work needed to handle multiple blocks in the requests and responses.
    • When a topic/partition is reassigned to a new broker, that topic/partition gets returned to the controller via a channel; the goroutine tracks how many it is "responsible" for and exits if that reaches 0.
    • Similarly when a broker goes down hard, all topic/partitions are returned to the controller for re-dispatching and the goroutine exits.

    I expect the success case to be fairly straightforward - as always, the complexity will come when reasoning through the failure cases and ensuring that topic/partitions are redispatched correctly, messages are not accidentally skipped in that case, etc. etc.

    When the consumer is closed, it signals the controller which cleans up its children before exiting.

    Thoughts?

  • Better Broker Connection Management

    Following up to #7, #9, #10, #13 our connection management still has problems.

    First some things to know about Kafka that complicate the issue:

    • Broker metadata requests do not seem required to return all the brokers they know about (it is not documented which ones they do return, but I suspect it is only the subset of brokers that are leading a partition whose data is also in the response).
    • The user only specifies (host, port) pairs while Kafka metadata returns (host, port, brokerId) triples, with no guarantee that the host matches the user-specified hostname, and with no apparent way to query a broker's own brokerId.

    Now some of the issues with our current implementation:

    • If the user provides three addresses, only the last of which is valid, we wait for the first two connections to completely fail when fetching metadata before we try the third. Since the TCP connection timeout can be very long (many minutes) this can take a long time. Trying them all in parallel and using the first valid one would be better.
    • Once we've had an address fail, we discard it forever. Over the lifetime of a long-running cluster, all of the nodes may be down at one point or another, meaning that eventually we may 'run out' of nodes and abort.

    Thoughts on potential future design:

    • We should connect to brokers lazily, ie only when we actually want to talk to them, not just when we know they exist.
    • Since there is only ever one leader for a partition, if the connection fails it doesn't make sense to just try another broker. Therefore the caller should decide how to proceed, which means that the client's leader function shouldn't guarantee that the broker it returns is connected, only connecting.
    • However, when fetching metadata, any broker will do, so the any function should guarantee that the broker it returns is actually connected. Walking through bad brokers to find a good one could be slow, but connecting to all the brokers just to pick one and drop the others is gross. We could try to connect to one, and if it hasn't succeeded in 5 seconds, start another, etc. (see the sketch after this list). That should cover the common case, since if it hasn't responded after 5 seconds it probably won't, and if it does then we'll drop it right away, oh well. More importantly, we won't spawn hundreds of connections if the first broker does respond normally.
    • User-provided addresses should be managed separately (which they already are) since they don't have brokerIds. They're only useful for metadata requests, so they should be disconnected entirely 99% of the time. If one of them fails, we should probably just mark it as recently-failed, and retry it after some time has passed. Only if all user-supplied nodes have recently failed (and we have no metadata from the cluster itself) should we abort somehow.
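
    A hedged sketch of that staggered-dial idea in plain Go (not Sarama code): start one attempt, start the next candidate only after a fixed delay, and take whichever connection succeeds first.

    package main

    import (
        "errors"
        "log"
        "net"
        "time"
    )

    // dialStaggered starts a dial to each address, each attempt delayed by one
    // more `stagger` interval than the previous, and returns the first success.
    func dialStaggered(addrs []string, stagger, timeout time.Duration) (net.Conn, error) {
        conns := make(chan net.Conn, len(addrs))
        errs := make(chan error, len(addrs))

        for i, addr := range addrs {
            go func(addr string, delay time.Duration) {
                time.Sleep(delay)
                c, err := net.DialTimeout("tcp", addr, timeout)
                if err != nil {
                    errs <- err
                    return
                }
                conns <- c
            }(addr, time.Duration(i)*stagger)
        }

        for range addrs {
            select {
            case c := <-conns:
                return c, nil // late extra connections are leaked here; a real version would close them
            case err := <-errs:
                log.Println("dial failed:", err)
            }
        }
        return nil, errors.New("all brokers failed")
    }

    func main() {
        conn, err := dialStaggered([]string{"broker1:9092", "broker2:9092"}, 5*time.Second, 30*time.Second)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
    }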

    Open Questions:

    • If a user-supplied broker is used to satisfy a call to any, when should it be disconnected? The extraBroker field in client currently is a hack to work around this, and it still leaves one connection floating around for no reason. Presumably disconnectBroker could be used and made smarter? However, if a metadata-supplied broker is used, calling disconnectBroker will pull it out from underneath other code that could be using it as a leader. Do we need reference-counting?
    • There is currently no way to tell that a Broker is 'Connecting'; you can only tell that the lock is currently held. This means that if two concurrent leader calls would return the same broker and both call Open, the second one will block when we don't want it to. Do we need a second lock in the brokers? That gets really messy...
  • Async producer can overflow itself at high rate

    Versions

    Sarama Version: b1da1753dedcf77d053613b7eae907b98a2ddad5, Kafka Version: Irrelevant, Go Version: b1da1753dedcf77d053613b7eae907b98a2ddad5

    Configuration

    What configuration values are you using for Sarama and Kafka?

    Sarama async producer:

    	conf := sarama.NewConfig()
    	conf.Metadata.Retry.Max = 1
    	conf.Metadata.Retry.Backoff = 250 * time.Millisecond
    	conf.Producer.RequiredAcks = sarama.RequiredAcks(sarama.WaitForLocal)
    	conf.Producer.Timeout = 1 * time.Second
    	conf.Producer.MaxMessageBytes = 16 << 20 // 16MB
    	conf.Producer.Flush.Bytes = 16 << 20 // 16MB
    	conf.Producer.Flush.Frequency = time.Minute
    	conf.Producer.Compression = sarama.CompressionNone // otherwise Kafka goes nuts
    	conf.Producer.Return.Errors = true
    	conf.Producer.Partitioner = NewIdentityPartitioner
    
    Logs

    Sarama logs:

    2017-01-09T23:30:21.504 myhost 2017/01/09 23:30:19 Kafka producer err: kafka: Failed to produce message to topic requests: kafka server: Message was too large, server rejected it to avoid allocation error.
    
    Problem Description

    Problem is in this function:

    • https://github.com/Shopify/sarama/blob/0fb560e5f7fbcaee2f75e3c34174320709f69944/async_producer.go#L583-L648

    We produce messages at such a rate (100K/s+) that select always picks processing new incoming messages over rolling over and flushing the existing buffer.

    I applied the following patch:

    diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
    index e7ae8c2..13a888b 100644
    --- a/vendor/github.com/Shopify/sarama/async_producer.go
    +++ b/vendor/github.com/Shopify/sarama/async_producer.go
    @@ -2,6 +2,7 @@ package sarama
     
     import (
     	"fmt"
    +	"log"
     	"sync"
     	"time"
     
    @@ -249,6 +250,7 @@ func (p *asyncProducer) dispatcher() {
     		}
     
     		if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
    +			log.Printf("Got message size bigger than allowed max message bytes %d > %d", msg.byteSize(), p.conf.Producer.MaxMessageBytes)
     			p.returnError(msg, ErrMessageSizeTooLarge)
     			continue
     		}
    @@ -577,9 +579,12 @@ func (bp *brokerProducer) run() {
     	var output chan<- *produceSet
     	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
     
    +	wasReadyTimes := 0
    +
     	for {
     		select {
     		case msg := <-bp.input:
    +			log.Println("INPUT MESSAGE")
     			if msg == nil {
     				bp.shutdown()
     				return
    @@ -625,14 +630,23 @@ func (bp *brokerProducer) run() {
     				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
     			}
     		case <-bp.timer:
    +			log.Println("TIMER FIRED")
     			bp.timerFired = true
     		case output <- bp.buffer:
    +			wasReadyTimes = 0
    +			log.Println("ROLL OVER")
     			bp.rollOver()
     		case response := <-bp.responses:
    +			log.Println("HANDLING RESPONSE")
     			bp.handleResponse(response)
     		}
     
     		if bp.timerFired || bp.buffer.readyToFlush() {
    +			log.Println("READY TO FLUSH YAY")
    +			wasReadyTimes++
    +			if wasReadyTimes > 10 {
    +				log.Fatal("We were ready for a long time, but it did not happen. Exiting.")
    +			}
     			output = bp.output
     		} else {
     			output = nil
    diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
    index 9fe5f79..91a127f 100644
    --- a/vendor/github.com/Shopify/sarama/produce_set.go
    +++ b/vendor/github.com/Shopify/sarama/produce_set.go
    @@ -1,6 +1,9 @@
     package sarama
     
    -import "time"
    +import (
    +	"log"
    +	"time"
    +)
     
     type partitionSet struct {
     	msgs        []*ProducerMessage
    @@ -147,6 +150,7 @@ func (ps *produceSet) readyToFlush() bool {
     		return true
     	// If we've passed the byte trigger-point
     	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
    +		log.Printf("ready to flush because buffer bytes are big enough: %d >= %d", ps.bufferBytes, ps.parent.conf.Producer.Flush.Bytes)
     		return true
     	default:
     		return false
    
    

    The output looks like this: https://gist.github.com/bobrik/27071d61d5ec98ed15ffd1cb5331f3f4.

    Without log.Fatal I've seen buffer sizes go as high as 40MB, which is bigger than message.max.bytes=33554432 in our cluster.

    The solution is probably to do rollOver outside of select.
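
    A hedged sketch of that idea in generic Go (not Sarama's code): keep the flush check outside the select, so a full buffer is always flushed before the loop goes back to racing input against output.

    package main

    import (
        "fmt"
        "time"
    )

    const flushAtBytes = 64

    func main() {
        input := make(chan []byte, 1024)
        output := make(chan [][]byte)

        // Fake high-rate source of small messages.
        go func() {
            for i := 0; ; i++ {
                input <- []byte(fmt.Sprintf("msg-%d", i))
            }
        }()

        // Fake flusher standing in for the broker producer.
        go func() {
            for batch := range output {
                fmt.Println("flushed", len(batch), "messages")
            }
        }()

        var (
            buffer        [][]byte
            bufferedBytes int
            timerFired    bool
        )
        timer := time.NewTicker(time.Second)
        defer timer.Stop()

        for i := 0; i < 100; i++ { // bounded so the example terminates
            select {
            case msg := <-input:
                buffer = append(buffer, msg)
                bufferedBytes += len(msg)
            case <-timer.C:
                timerFired = true
            }

            // The flush happens outside the select, so a full buffer cannot be
            // starved by a constant stream of ready input cases.
            if timerFired || bufferedBytes >= flushAtBytes {
                output <- buffer
                buffer, bufferedBytes, timerFired = nil, 0, false
            }
        }
    }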

  • Memory leak sometimes when restarting kafka partitions

    I know this isn't a lot to go on, but hope you can give me pointers to debug this.

    Restarting Kafka brokers occasionally produces ever-increasing memory usage until the Go process dies OOM. This isn't consistent, but it especially happens when we have to restart multiple Kafka nodes at once. When I run pprof/heap I see the memory used by Kafka messages, but I cannot tell where inside the Kafka code the memory is being held onto :( It only happens occasionally and only on our production tier. I also don't see increased goroutine counts. We're running with 256 brokers and about 12 Kafka hosts, using the Kafka revision just after you fixed the WaitGroup bug. The log has messages like

    Failed to connect to XYZ: getsocopt: connection refused client/brokers deregistering XYZ client/brokers deregistered broker #-1 at kafkaXYZ:9092 ...

    Even when kafka is stable, and the messages go away, memory usage still increases forever until we OOM. A restart and things work great.

    Here is the config we use

        conf.Net.DialTimeout = time.Second * 30
    
        conf.ChannelBufferSize = 256
    
        conf.Producer.RequiredAcks = sarama.WaitForLocal
        conf.Producer.Return.Errors = true
        conf.Producer.Return.Successes = true
        conf.Producer.Partitioner = partitioner
        conf.Producer.Flush.Frequency = time.Millisecond * 250
        conf.Producer.Flush.Messages = 3000
        conf.Producer.Flush.Bytes = 83886080
    

    The biggest issue for us is that even when Kafka recovers, memory usage still increases forever in a straight line upward.

    When kafka failures happen, is it possible that multiple buffers could be created internally that end up buffering points?

    Note we also don't see increases in messages on the error channel.

  • consume slow

    Versions

    Sarama Version: 1.10.0, Kafka Version: 2.11-0.10.0.0, Go Version: 1.6.2

    Configuration

    What configuration values are you using for Sarama and Kafka?

    default

    Problem Description

    my code is as follows:

    consumer, err := sarama.NewConsumer([]string{kafka_host}, nil)
    if err != nil {
        panic(err)
    }
    
    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()
    
    partitionConsumer, err := consumer.ConsumePartition(raw_topic, 0, sarama.OffsetOldest) // from head
    if err != nil {
        panic(err)
    }
    
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()
    
    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    
    consumed := 0
    fmt.Printf("start at %v\n", time.Now())
    
        ConsumerLoop:
    for {
        select {
        case <-partitionConsumer.Messages():
            consumed++
            if consumed%10000 == 0 {
                fmt.Printf("consume %v at %v\n", consumed, time.Now())
            }
        case <-signals:
            break ConsumerLoop
        }
    }
    
    log.Printf("Consumed: %d\n", consumed)
    

    my kafka is full of messages, and the output is as follows:

          start at 2016-08-12 09:56:28.704023291 +0800 CST
          consume 10000 at 2016-08-12 09:56:50.771316475 +0800 CST
          consume 20000 at 2016-08-12 09:57:12.237980514 +0800 CST
          consume 30000 at 2016-08-12 09:57:33.174455169 +0800 CST
          ....
    

    It takes about 2~3 ms to consume a message. Is this normal? It is too slow for me.

  • fix(metrics): fix race when accessing metric registry

    A race condition was introduced in 5b04c98528ddcbb2e23cfaea937428e31ba232b2 (feat(metrics): track consumer-fetch-response-size) when passing the metric registry around to get additional metrics. Notably, handleResponsePromise() could access the registry after the broker has been closed and is tentatively being reopened. This triggers a data race because b.metricRegistry is being set during Open() (as it is part of the configuration).

    We fix this by reverting the addition of handleResponsePromise() as a method on Broker. Instead, we provide it with the metric registry as an argument. An alternative would have been to get the metric registry before the select call. However, removing it as a method makes it clearer that this function is not allowed to access the broker's internals, as they are not protected by the lock and the broker may no longer be alive.

    All the following calls to b.metricRegistry are done while the lock is held:

    • inside Open(), the lock is held, including inside the goroutine
    • inside Close(), the lock is held
    • AsyncProduce() has a contract that it must be called while the broker is open, we keep a copy of the metric registry to use inside the callback
    • sendInternal(), has a contract that the lock should be held
    • authenticateViaSASLv1() is called from Open() and sendWithPromise(), both of them holding the lock
    • sendAndReceiveSASLHandshake() is called from authenticateViaSASLv0/v1(), which are called from Open() and sendWithPromise()

    I am unsure about responseReceiver(), however, it is also calling b.readFull() which accesses b.conn, so I suppose it is safe.

    This leaves sendAndReceive(), which calls send(), which calls sendWithPromise(), which takes the lock. We move the lock to sendAndReceive() instead; send() is only called from sendAndReceive(), and we take the lock in sendWithPromise()'s other caller.
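
    A hedged sketch of the pattern being applied here (generic code, not the actual diff): snapshot the registry while the lock is held and pass it into the helper, so the helper never reads the broker field that Open() may be rewriting.

    package main

    import (
        "sync"

        metrics "github.com/rcrowley/go-metrics"
    )

    type broker struct {
        lock           sync.Mutex
        metricRegistry metrics.Registry // reset by Open(), so only read under lock
    }

    func (b *broker) Open(registry metrics.Registry) {
        b.lock.Lock()
        defer b.lock.Unlock()
        b.metricRegistry = registry
    }

    // handleResponsePromise is a plain function (not a method), so it can only
    // use the registry snapshot it was handed while the lock was held.
    func handleResponsePromise(registry metrics.Registry) {
        metrics.GetOrRegisterCounter("responses", registry).Inc(1)
    }

    func (b *broker) sendAndReceive() {
        b.lock.Lock()
        registry := b.metricRegistry // snapshot under the lock
        b.lock.Unlock()

        handleResponsePromise(registry) // safe even if Open() runs concurrently
    }

    func main() {
        b := &broker{}
        b.Open(metrics.NewRegistry())
        b.sendAndReceive()
    }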

    The test has been stolen from #2393 from @samuelhewitt. #2393 is an alternative proposal using a RW lock to protect b.metricRegistry.

    Fix #2320

  • Consider switching default Kafka version to at least 2.1

    The default Kafka version was set to 1.0 via #1787. This was a huge improvement over the previous default and the hope was that proper api versions support would be supported some time later.

    2.5 years have passed and the default remains 1.0 if I understand correctly (please correct me if this is wrong).
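
    For reference, clients can already opt in to a newer protocol baseline regardless of the default; a minimal sketch (the broker address is an assumption):

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        config := sarama.NewConfig()
        config.Version = sarama.V2_1_0_0 // negotiate newer request versions than the 1.0 default

        client, err := sarama.NewClient([]string{"localhost:9092"}, config)
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()
    }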

    I am working on a KIP to remove older protocol versions in Apache Kafka 4.0 and the proposed baseline is Apache Kafka 2.1:

    https://cwiki.apache.org/confluence/display/KAFKA/KIP-896%3A+Remove+old+client+protocol+API+versions+in+Kafka+4.0

    The KIP is still under discussion (i.e. it hasn't been approved yet), but I wanted to:

    1. Give an early heads up to this project since it would impact your users with the current default
    2. Get your input on whether bumping the default Kafka version in the near term is something you would consider

    Thanks!

  • OffsetManager func constructRequest() should not set offsetCommitRequestBlock.timestamp when groupInstanceId is not nil

    Versions

    | Sarama | Kafka | Go   |
    |--------|-------|------|
    | 1.37.2 | 2.4.2 | 1.18 |

    Configuration

    What configuration values are you using for Sarama and Kafka?

    conf.Consumer.Group.InstanceId = someInstanceId
    conf.Consumer.Offsets.Retention == 0 // default value
    
    Logs
    Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored
    Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored
    
    Problem Description

    When calling func constructRequest() with conf.Consumer.Offsets.Retention == 0, an OffsetCommitRequest with version 1 will be created. After the offsetCommitRequestBlock is created with a perPartitionTimestamp of ReceiveTime, OffsetCommitRequest.version is changed to 7. Thus, in func (b *offsetCommitRequestBlock) encode, the following check prints an extra log line every time.

    	if version == 1 {
    		pe.putInt64(b.timestamp)
    	} else if b.timestamp != 0 {
    		Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored")
    	}
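
    A hedged sketch of the guard the reporter seems to be asking for (hypothetical code, not the actual constructRequest; the ReceiveTime constant below only mirrors Sarama's sentinel): populate the per-block timestamp only when the request will really be encoded as v1.

    package main

    import "fmt"

    // ReceiveTime mirrors Sarama's sentinel meaning "let the broker assign the commit time".
    const ReceiveTime int64 = -1

    type offsetCommitRequestBlock struct {
        offset    int64
        timestamp int64
    }

    // newBlock only sets the per-partition timestamp for v1 requests, so a later
    // bump to v7 (e.g. because groupInstanceId is set) never trips the warning.
    func newBlock(version int16, offset int64) *offsetCommitRequestBlock {
        b := &offsetCommitRequestBlock{offset: offset}
        if version == 1 {
            b.timestamp = ReceiveTime
        }
        return b
    }

    func main() {
        fmt.Println(newBlock(7, 42).timestamp) // 0, so encode stays quiet
    }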
    
  • Timeline on supporting kafka version 3.3.1 (Kraft)

    Versions

    latest (v1.37.2)

    Configuration

    What configuration values are you using for Sarama and Kafka?

    V3_2_3_0  = newKafkaVersion(3, 2, 3, 0)
    
    Problem Description

    What is the timeline on supporting the latest Kraft production-ready Kafka version 3.3.1?

    The latest Sarama only has version variables up to v3.2.3.

  • Add an ErrorLogger to distinguish info from errors

    Versions

    All

    Configuration

    What configuration values are you using for Sarama and Kafka?

    default

    Logs

    n/a

    Problem Description

    Sarama has a StdLogger and a DebugLogger, but no ErrorLogger. We override the existing loggers and push the messages to our logging stack, with either debug or info log level. However we'd like to know what messages are actually errors and push those with an error level so our systems can notify and record them correctly. It looks like Sarama uses the StdLogger for both informational and error messages, so we cannot easily extract the errors.

    Examples of error messages:

    • Failed to connect to broker %s: %s
    • Error while performing SASL handshake %s

    Examples of informational messages:

    • consumer/broker/%d added subscription to %s/%d
    • producer/leader/%s/%d selected broker %d
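
    For reference, the current workaround looks roughly like this: implement sarama.StdLogger, assign it to sarama.Logger, and classify messages by substring (the substrings and log levels are heuristics of my own, not part of Sarama):

    package main

    import (
        "fmt"
        "log"
        "strings"

        "github.com/Shopify/sarama"
    )

    // levelLogger implements sarama.StdLogger and guesses a level per message.
    type levelLogger struct{}

    func (levelLogger) write(msg string) {
        // The substrings come from the example messages above.
        if strings.Contains(msg, "Failed") || strings.Contains(msg, "Error") {
            log.Print("ERROR ", msg)
            return
        }
        log.Print("INFO ", msg)
    }

    func (l levelLogger) Print(v ...interface{})                 { l.write(fmt.Sprint(v...)) }
    func (l levelLogger) Printf(format string, v ...interface{}) { l.write(fmt.Sprintf(format, v...)) }
    func (l levelLogger) Println(v ...interface{})               { l.write(fmt.Sprintln(v...)) }

    func main() {
        sarama.Logger = levelLogger{} // all Sarama output now flows through the classifier
    }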

    One alternative I can see is to shift the informational logs to the debug logger, since most of the messages do seem to be errors.

    If you think this is reasonable, I'm happy to contribute a PR.

    Thanks!

Pub/sub controller using Kafka, based on Sarama. Easy control flow for action streaming, event driven.

Psub is a helper for building systems that use Kafka for streaming and event-driven workloads. Install: go get github.com/teng231/psub. It has 3 env variables for config.

Sep 26, 2022
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.

Kowl - Apache Kafka Web UI Kowl (previously known as Kafka Owl) is a web application that helps you to explore messages in your Apache Kafka cluster a

Jan 3, 2023
Study project that uses Apache Kafka as syncing mechanism between two databases, with producers and consumers written in Go.

Kafka DB Sync Study project that uses Apache Kafka as syncing mechanisms between a monolith DB and a microservice. The main purpose of this project is

Dec 5, 2021
A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application.

Apache Kafka in 6 minutes A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application. In thi

Oct 27, 2021
Confluent's Apache Kafka Golang client

Confluent's Golang Client for Apache KafkaTM confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform. Features: Hi

Dec 30, 2022
Modern CLI for Apache Kafka, written in Go.

Kaf Kafka CLI inspired by kubectl & docker Install Install from source: go get -u github.com/birdayz/kaf/cmd/kaf Install binary: curl https://raw.git

Dec 31, 2022
CLI Tool to Stress Apache Kafka Clusters

Kafka Stress - Stress Test Tool for Kafka Clusters, Producers and Consumers Tunning Installation Docker docker pull fidelissauro/kafka-stress:latest d

Nov 13, 2022
franz-go - A complete Apache Kafka client written in Go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 2.8.0+. Producing, consuming, transacting, administrating, etc.

Dec 29, 2022
Testing Apache Kafka using Go.

Apache Kafka Go Testing Apache Kafka using Go. Instructions Provision the single node Kafka cluster using Docker: docker-compose -p apache-kafka-go up

Dec 17, 2021
Higher level abstraction for Sarama.

kafka-do v0.1.5 kafka-do What Higher level abstraction for Sarama. Why We want to be able to write our kafka applications without making the same thin

Sep 30, 2021
provider-kafka is a Crossplane Provider that is used to manage Kafka resources.

provider-kafka provider-kafka is a Crossplane Provider that is used to manage Kafka resources. Usage Create a provider secret containing a json like t

Oct 29, 2022
A CLI tool for interacting with Kafka through the Confluent Kafka Rest Proxy

kafkactl Table of contents kafkactl Table of contents Overview Build Development Overview kafkactl is a CLI tool to interact with Kafka through the Co

Nov 1, 2021
Apache Pulsar Go Client Library

Apache Pulsar Go Client Library A Go client library for the Apache Pulsar project. Goal This projects is developing a pure-Go client library for Pulsa

Jan 4, 2023
Producer x Consumer example using Kafka library and Go.

Go - Kafka - Example Apache Kafka Commands First of all, run the docker-compose docker-compose up, than run docker exec -it kafka_kafka_1 bash Topics

Dec 8, 2021
franz-go contains a high performance, pure Go library for interacting with Kafka from 0.8.0 through 2.7.0+. Producing, consuming, transacting, administrating, etc.

franz-go - Apache Kafka client written in Go Franz-go is an all-encompassing Apache Kafka client fully written Go. This library aims to provide every

Dec 29, 2022
kafka watcher for casbin library

Casbin Kafka Watcher Casbin watcher for kafka This watcher library will enable users to dynamically change casbin policies through kakfa messages Infl

May 8, 2021
Implementation of the NELI leader election protocol for Go and Kafka

goNELI Implementation of the NELI leader election protocol for Go and Kafka. goNELI encapsulates the 'fast' variation of the protocol, running in excl

Dec 8, 2022
Kafka producer and consumer tool in protobuf format.

protokaf Kafka producer and consumer tool in protobuf format. Features Consume and produce messages using Protobuf protocol Trace messages with Jaeger

Nov 15, 2022
ChizBroker is a fast and simple GRPC based implementation of kafka.

Chiz Broker: a broker for fun ChizBroker is a fast and simple GRPC based implementation of kafka. Features: Ready to be deployed on kubernetes Prometh

Oct 30, 2022