##### Versions

*Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.*

Sarama Version: 1.15.0
Kafka Version: 0.11.0
Go Version: 1.9.2 darwin/amd64
##### Configuration

What configuration values are you using for Sarama and Kafka?

Sarama configuration:

```go
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Version = sarama.V0_11_0_0
```
Kafka configuration:

```properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/Cellar/kafka/kafka-log-1
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
auto.create.topics.enable=false
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
```
##### Logs

When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set `sarama.Logger` to a `log.Logger` to capture Sarama debug output.

```
sarama: client.go:115: Initializing new client
sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
sarama: broker.go:146: Connected to broker at localhost:9092 (unregistered)
sarama: client.go:429: client/brokers registered new broker #0 at localhost:9092
sarama: client.go:655: client/metadata found some partitions to be leaderless
sarama: client.go:635: client/metadata retrying after 250ms... (3 attempts remaining)
sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
sarama: client.go:655: client/metadata found some partitions to be leaderless
sarama: client.go:635: client/metadata retrying after 250ms... (2 attempts remaining)
sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
sarama: client.go:655: client/metadata found some partitions to be leaderless
sarama: client.go:635: client/metadata retrying after 250ms... (1 attempts remaining)
sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
sarama: client.go:655: client/metadata found some partitions to be leaderless
sarama: client.go:161: Successfully initialized new client
sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
sarama: async_producer.go:601: producer/broker/0 starting up
sarama: async_producer.go:612: producer/broker/0 state change to [open] on people/0
sarama: broker.go:144: Connected to broker at localhost:9092 (registered as #0)
```
##### Problem Description

When calling `SendMessage` on a single `SyncProducer` instance from multiple goroutines, some messages never seem to make it to Kafka. Inspecting the stream with Apache's `kafka-console-consumer` shows only a fraction of the messages — anywhere from about half down to none. A consumer I wrote with Sarama shows the same gap, but it also returns the error below. I'm using `SyncProducer` because I need to guarantee that messages are published to the stream in the order my application receives them. Maybe I've just implemented it wrong, but right now I'm out of ideas and hoping someone here can help.

```
sarama: consumer.go:755: consumer/broker/0 abandoned subscription to people/0 because kafka: response did not contain all the expected topic/partition blocks
Error: kafka: error while consuming people/0: kafka: response did not contain all the expected topic/partition blocks
sarama: consumer.go:345: consumer/people/0 finding new broker
sarama: client.go:644: client/metadata fetching metadata for [people] from broker localhost:9092
sarama: consumer.go:711: consumer/broker/0 added subscription to people/0
```

Here's how I created the topic:

```shell
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic people
```

I'm running a single broker on my local machine. The sample program below reproduces the issue. It's also worth noting that none of the calls to `SendMessage` return errors when I run the code.
main.go

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"strconv"
	"sync"

	"syncProducer/streamer"

	"github.com/Shopify/sarama"
	"github.com/linkedin/goavro"
	uuid "github.com/satori/go.uuid"
)

const personSchema = `{
	"type":"record",
	"name":"Person",
	"namespace":"com.example.people",
	"fields":[
		{
			"name":"Name",
			"type":"string"
		},
		{
			"name":"Address",
			"type":"string"
		},
		{
			"name":"City",
			"type":"string"
		},
		{
			"name":"State",
			"type":"string"
		},
		{
			"name":"ZIP",
			"type":"long"
		}
	]
}`

var (
	personCodec *goavro.Codec
	buf         bytes.Buffer
)

type (
	person struct {
		Name    string
		Address string
		City    string
		State   string
		ZIP     int64
	}
)

func main() {
	var err error
	personCodec, err = goavro.NewCodec(personSchema)
	if err != nil {
		panic(err)
	}
	producer, err := newSyncProducer()
	if err != nil {
		panic(err)
	}
	defer producer.Close()
	streamer := streamer.New(producer)

	// Create 10 avro message bodies.
	var people [][]byte
	for i := 1; i < 11; i++ {
		aPerson := person{
			Name:    "Bob #" + strconv.Itoa(i),
			Address: strconv.Itoa(i) + " Main St.",
			City:    "SomeTown",
			State:   "CA",
			ZIP:     90210,
		}
		data, err := convertToAvro(aPerson)
		if err != nil {
			panic("Could not convert aPerson " + strconv.Itoa(i) + " to avro.")
		}
		people = append(people, data)
	}

	errc := make(chan error, 10)
	var wg sync.WaitGroup

	// Send messages.
	for _, person := range people {
		wg.Add(1)
		go func(person []byte, c chan error, wg *sync.WaitGroup) {
			uuid := uuid.NewV4().String()
			err := streamer.SendActivity("people", uuid, "CreatePerson", person, nil)
			c <- err
			wg.Done()
		}(person, errc, &wg)
	}
	wg.Wait()
	close(errc)

	fmt.Println("Completed!")
	for i := range errc {
		fmt.Println(i)
		if i != nil {
			fmt.Printf("Exit: %v\n", i)
		}
	}
	fmt.Print(&buf)
}

func convertToAvro(aPerson person) ([]byte, error) {
	data, err := personCodec.BinaryFromNative(nil, map[string]interface{}{
		"Name":    aPerson.Name,
		"Address": aPerson.Address,
		"City":    aPerson.City,
		"State":   aPerson.State,
		"ZIP":     aPerson.ZIP,
	})
	if err != nil {
		return nil, err
	}
	return data, nil
}

func newSyncProducer() (sarama.SyncProducer, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	config.Producer.Return.Successes = true          // Required when using SyncProducer
	config.Producer.Return.Errors = true
	config.Version = sarama.V0_11_0_0
	sarama.Logger = log.New(&buf, "sarama: ", log.Lshortfile)
	return sarama.NewSyncProducer([]string{"localhost:9092"}, config)
}
```
streamer.go

```go
package streamer

import (
	"github.com/Shopify/sarama"
	"github.com/pkg/errors"
)

const (
	MessageTypeHeaderKey = "message-type"
	MessageIDHeaderKey   = "message-id"
)

type (
	// Metadata contains metadata for a given activity.
	Metadata map[string][]string

	// Streamer handles streaming activities to a topic.
	Streamer struct {
		producer sarama.SyncProducer
	}
)

var (
	// ErrNoSubjects denotes that no subjects were provided.
	ErrNoSubjects = errors.New("At least one subject is required")
)

// New creates a new streamer.
func New(producer sarama.SyncProducer) *Streamer {
	return &Streamer{
		producer: producer,
	}
}

// SendActivity encapsulates the provided metadata and data in a message and sends it to a topic.
func (s *Streamer) SendActivity(topic string, messageID string, messageHeaderValue string, data []byte, metadata Metadata) error {
	_, _, err := s.producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(messageID),
		Value: sarama.ByteEncoder(data),
		Headers: []sarama.RecordHeader{
			{
				Key:   []byte(MessageIDHeaderKey),
				Value: []byte(messageID),
			},
			{
				Key:   []byte(MessageTypeHeaderKey),
				Value: []byte(messageHeaderValue),
			},
		},
	})
	if err != nil {
		return errors.Wrapf(err, "Error sending message to topic %s for ID %s", topic, messageID)
	}
	return nil
}
```