Apache Pulsar Go Client Library

GoDoc Go Report Card Language LICENSE

Apache Pulsar Go Client Library

A Go client library for the Apache Pulsar project.

Goal

This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.

Once feature parity and stability are reached, this will supersede the current CGo based library.

Requirements

  • Go 1.11+

Status

Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.

Usage

Import the client library:

import "github.com/apache/pulsar-client-go/pulsar"

Create a Producer:

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})

defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "my-topic",
})

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
	Payload: []byte("hello"),
})

defer producer.Close()

if err != nil {
    fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")

Create a Consumer:

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})

defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "my-topic",
        SubscriptionName: "my-sub",
        Type:             pulsar.Shared,
    })

defer consumer.Close()

msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
            msg.ID(), string(msg.Payload()))

Create a Reader:

client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
	log.Fatal(err)
}

defer client.Close()

reader, err := client.CreateReader(pulsar.ReaderOptions{
	Topic:          "topic-1",
	StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
	log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
	msg, err := reader.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
		msg.ID(), string(msg.Payload()))
}

Contributing

Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.

Contact

Mailing lists
Name Scope
[email protected] User-related discussions Subscribe Unsubscribe Archives
[email protected] Development-related discussions Subscribe Unsubscribe Archives
Slack

Pulsar slack channel #dev-go at https://apache-pulsar.slack.com/

You can self-register at https://apache-pulsar.herokuapp.com/

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

Comments
  • HasNext returns false when reading last message inclusive

    HasNext returns false when reading last message inclusive

    Expected behavior

    Reader.HasNext() should return true when when reading last message inclusive.

    Actual behavior

    Reader.HasNext() returns false.

    Steps to reproduce

    package main
    
    import (
    	"context"
    	"fmt"
    	"testing"
    
    	"github.com/apache/pulsar-client-go/pulsar"
    	"github.com/stretchr/testify/assert"
    )
    
    const (
    	lookupURL = "pulsar://localhost:6650"
    	topic     = "persistent://public/default/test"
    )
    
    func TestReaderLatestInclusiveHasNext(t *testing.T) {
    	client, err := pulsar.NewClient(pulsar.ClientOptions{
    		URL: lookupURL,
    	})
    
    	assert.Nil(t, err)
    	defer client.Close()
    
    	ctx := context.Background()
    
    	// create producer
    	producer, err := client.CreateProducer(pulsar.ProducerOptions{
    		Topic:           topic,
    		DisableBatching: true,
    	})
    	assert.Nil(t, err)
    	defer producer.Close()
    
    	// send 10 messages
    	var lastMsgID pulsar.MessageID
    	for i := 0; i < 10; i++ {
    		lastMsgID, err = producer.Send(ctx, &pulsar.ProducerMessage{
    			Payload: []byte(fmt.Sprintf("hello-%d", i)),
    		})
    		assert.NoError(t, err)
    		assert.NotNil(t, lastMsgID)
    	}
    
    	// create reader on the last message (inclusive)
    	reader, err := client.CreateReader(pulsar.ReaderOptions{
    		Topic:                   topic,
    		StartMessageID:          pulsar.LatestMessageID(),
    		StartMessageIDInclusive: true,
    	})
    
    	assert.Nil(t, err)
    	defer reader.Close()
    
    	var msgID pulsar.MessageID
    	if reader.HasNext() {
    		msg, err := reader.Next(context.Background())
    		assert.NoError(t, err)
    
    		assert.Equal(t, []byte("hello-9"), msg.Payload()) // updated after original post
    		msgID = msg.ID()
    	}
    
    	assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize()) // updated after original post
    }
    

    System configuration

    Pulsar version: 2.6.0

  • Go build fails to compile

    Go build fails to compile

    Problem

    I'm tying to build the code, which just import "github.com/apache/pulsar-client-go/pulsar", and output is below.

    go: finding module for package github.com/apache/pulsar-client-go/pulsar
    go: downloading github.com/apache/pulsar-client-go v0.3.0
    go: found github.com/apache/pulsar-client-go/pulsar in github.com/apache/pulsar-client-go v0.3.0
    go: downloading github.com/prometheus/client_golang v1.7.1
    go: downloading github.com/pkg/errors v0.9.1
    go: downloading github.com/yahoo/athenz v1.8.55
    go: downloading github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb
    go: downloading github.com/sirupsen/logrus v1.4.2
    go: downloading github.com/gogo/protobuf v1.3.1
    go: downloading github.com/klauspost/compress v1.10.8
    go: downloading github.com/spaolacci/murmur3 v1.1.0
    go: downloading github.com/pierrec/lz4 v2.0.5+incompatible
    go: downloading github.com/linkedin/goavro/v2 v2.9.8
    go: downloading github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32
    go: downloading golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
    go: downloading golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1
    go: downloading github.com/golang/snappy v0.0.1
    go: downloading github.com/dgrijalva/jwt-go v3.2.0+incompatible
    go: downloading github.com/99designs/keyring v1.1.5
    go: downloading golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
    go: downloading github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a
    go: downloading github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d
    go: downloading github.com/mtibben/percent v0.2.1
    go: downloading golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
    go: downloading github.com/mitchellh/go-homedir v1.1.0
    go: downloading github.com/ardielle/ardielle-go v1.5.2
    go: downloading github.com/golang/protobuf v1.4.2
    go: downloading github.com/prometheus/client_model v0.2.0
    go: downloading github.com/prometheus/common v0.10.0
    go: downloading github.com/cespare/xxhash/v2 v2.1.1
    go: downloading github.com/beorn7/perks v1.0.1
    go: downloading github.com/prometheus/procfs v0.1.3
    go: downloading github.com/matttproud/golang_protobuf_extensions v1.0.1
    go: downloading google.golang.org/protobuf v1.23.0
    # github.com/keybase/go-keychain
    ../../../../pkg/mod/github.com/keybase/[email protected]/macos_1.10.go:89:13: could not determine kind of name for C.SecTrustedApplicationCreateFromPath
    

    It may be caused by referring github.com/keybase/[email protected]/macos_1.10.go because that is not latest one.

    Environment

    os: macOS Mojave 10.14.6 go: go1.15.3 darwin/amd64 pulsar-client-go: 0.3.0

  • [ISSUE #68][feat]add Option config for init (#68)

    [ISSUE #68][feat]add Option config for init (#68)

    Change-Id: I85a9c9f20e61e126b617eab919d2405a3ebda087

    Master Issue: #68

    Motivation

    simple initialization of Producer

    Modifications

    add ProducerOption for newProducer of client

    Verifying this change

    • [x] Make sure that the change passes the CI checks.

    Does this pull request potentially affect one of the following parts:

    If yes was chosen, please highlight the changes

    • Dependencies (does it add or upgrade a dependency): (yes / no)
    • The public API: (yes / no)
    • The schema: (yes / no / don't know)
    • The default values of configurations: (yes / no)
    • The wire protocol: (yes / no)

    Documentation

    • Does this pull request introduce a new feature? (yes / no)
    • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
    • If a feature is not applicable for documentation, explain why?
    • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
  • [Issue 662] Fix race in connection.go waitUntilReady()

    [Issue 662] Fix race in connection.go waitUntilReady()

    Fixes #662

    Motivation

    As described in #662, there appears to be a potential race condition in connection.go function waitUntilReady(): the cond.Broadcast() can occur before the cond.Wait().

    [EDIT:] To be explicit, this is the issue:

    [goroutine A] calls c.getState() and sees that it is not set to connectionReady [goroutine B] changes the state to connectionReady [goroutine B] sends a cond.Broadcast(), which goes nowhere because no goroutine is waiting. [goroutine A] calls cond.Wait(), which never completes

    Modifications

    Function waitUntilReady() was previously holding the global c.Lock() on the connection. From my reading of the code, this mutex is intended to protect the cnx variable. I think that the use of c.Lock() in waitUntilReady() was probably just a typo.

    Instead, I think the author probably intended to grab the lock associated with the sync.Cond, i.e. c.cond.L.Lock(). This looks like the correct thing to do when using a sync.Cond. I think there should be a corresponding lock around the cond.Broadcast() also. See https://stackoverflow.com/questions/36857167/how-to-correctly-use-sync-cond/42772799#42772799 for more details.

    Verifying this change

    I am unsure if this change is covered by existing tests. It fixes a rare race condition, so I think it would be quite difficult to test for.

    I have deployed this change on my own production system, and it doesn't obviously break anything. I will report back if I see any issues that might be related to it.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): no
    • The public API: no
    • The schema: no
    • The default values of configurations: no
    • The wire protocol: no

    Documentation

    • Does this pull request introduce a new feature? no
    • If yes, how is the feature documented? not applicable
  • feat: support multiple schema version for producer and consumer

    feat: support multiple schema version for producer and consumer

    Motivation

    Implement PIP-43. Support multiple schema version for producer and consumer.

    Modifications

    • add schema cache for producer and consumer
    • add DisableMultiSchema option for producer

    Verifying this change

    • [x] Make sure that the change passes the CI checks.

    (Please pick either of the following options)

    Does this pull request potentially affect one of the following parts:

    If yes was chosen, please highlight the changes

    • Dependencies (does it add or upgrade a dependency): (no)
    • The public API: (yes)
    • The schema: (yes)
    • The default values of configurations: (yes)
    • The wire protocol: (no)

    Documentation

    • Does this pull request introduce a new feature? (no)
    • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
    • If a feature is not applicable for documentation, explain why?
    • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
  • consumer.Receive () is blocking, availablePermits are decreasing, but msgBacklog is increasing

    consumer.Receive () is blocking, availablePermits are decreasing, but msgBacklog is increasing

    Expected behavior

    The topic stats as follows:

    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 0,
      "publishers" : [ ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 0,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1024,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 2967,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 1,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1021,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 5681,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 3,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1019,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 7169,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 5,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1017,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 8718,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 8,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1014,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 6,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 0,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1011,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    

    System configuration

    Pulsar version: 2.4.0 type : Failover

  • [oauth2] Remove oauth2 go.mod and go.sum

    [oauth2] Remove oauth2 go.mod and go.sum

    Motivation

    When the oauth2 sub-directory was added in PR #313 there wasn't any justification given (AFAICT) for creating a separate sub-module instead of just treating the oauth2 sources as a normal directory. Since the oauth2 module does not have a separate release cycle, treating it as a sub-module just adds unnecessary complexity.

    Modifications

    Removed go.mod and go.sum from the oauth2 sub-directory. Updated the main go.mod and go.sum to included the necessary dependencies.

  • [WIP][consumer] Deadlock with in-flight acknowledgements

    [WIP][consumer] Deadlock with in-flight acknowledgements

    Motivation

    This is an integration test built with the intent of replicating the bug reported in this issue. I opened this PR to gather some feedback before working on a fix.

    A possible solution would be to move the connectionClosed events out of the eventsCh channel and have them into their own channel (e.g. connectionEventsCh): https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L752

    This is because some of the events travelling through the eventsCh channel rely on an open connection to finish. So if the eventsCh channel gets full with, say, in-flight ackRequest events, the connectionClosed event cannot be processed until the ackRequest are finished and the ackRequest cannot finish because the connection is closed and the connectionClosed event (which should trigger a reconnection to the broker) is stuck waiting to be pushed to the eventsCh channel.

    Given that the connectionClosed event is needed to trigger a reconnection to the broker I think it could make sense to have it in a separate channel.

    If this approach sounds sensible to you I can start making modifications.

    Replicate the bug

    The test doesn't always fail, you'll have to try again. In my case it fails 1 out of 3 times (more or less). So if you run it just once and you see the test complete instead of failing, then please run it again.

    Once the test fails you should see that it timed out after five minutes and that it printed a lot of logs saying Trying to ack {messageID}.

    Please check the attached deadlock.zip which shows the logs of a failing test.

    Running the test

    • make sure you have docker running and usable from the user that runs the test
    • make sure you do docker pull apachepulsar/pulsar:2.6.1 first
    • then do: go test ./pulsar/ -test.run ^TestDeadlock$ -v -tags integration -count 1

    deadlock.zip

  • short read when reading frame size

    short read when reading frame size

    Hi. While using pulsar client, I got

    level=info message "Error reading from connection" error="Short read when reading frame size"
    

    Question is: what does this log mean? what can I do for it, or, How could I deal with it?

    Thx for your help. @sijie @wolfstudy

  • Catch up Java client function list

    Catch up Java client function list

    Non-Support Features:

    • [ ] Schema
    • [x] seek by time
    • [x] nack
    • [x] replication
    • [x] Interceptor
    • [x] DeadLetterPolicy
    • [x] receive with timeout
    • [x] multi topics
    • [x] topics pattern
    • [x] auto update partitions
    • [ ] read compact
    • [x] auth
  • define logger interface and add Logger field to ClientOptions

    define logger interface and add Logger field to ClientOptions

    Motivation

    enable users to configure the logger used by the client and use their own implementation. If no logger is provided, a wrapped logrus.StandardLogger() will be used. This PR only solved part of the problem mentioned in the issue https://github.com/apache/pulsar-client-go/issues/228.

    Modifications

    • define Logger and Entry interfaces used by the client
    • add Logger field to ClientOptions
    • add logger field to internal structures
    • provide a logger implementation backed by logrus
    • implement a no-op logger
  • [feature] Max Retry per msg feature added

    [feature] Max Retry per msg feature added

    (If this PR fixes a github issue, please add Fixes #<xyz>.)

    Fixes #890

    Motivation

    Add a feature to allow Max Retry per msg.

    Modifications

    Msg will have max reconsume times property. if not present use default property from consumer max deliveries

    Verifying this change

    • [ ] Make sure that the change passes the CI checks.

    This change added tests and can be verified as follows:

    • Added Integration tests for per msg retry login

    Does this pull request potentially affect one of the following parts:

    If yes was chosen, please highlight the changes

    • Dependencies (does it add or upgrade a dependency): no
    • The public API: (yes / no)
    • The schema: (yes / no / don't know)
    • The default values of configurations: no
    • The wire protocol: (yes / no)

    Documentation

    • Does this pull request introduce a new feature? yes
    • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
    • If a feature is not applicable for documentation, explain why?
    • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
  • [feature] Support batch index ACK

    [feature] Support batch index ACK

    Fixes https://github.com/apache/pulsar-client-go/issues/894

    Modifications

    Add an EnableBatchIndexAcknowledgment to specify whether batch index ACK is enabled. Since this feature requires the conversion between a bit set and its underlying long array, which is similar to Java's BitSet, this commit introduces github.com/bits-and-blooms/bitset dependency to replace the big.Int based implementation of the bit set.

    Add a BatchSize() method to MessageId to indicate the size of the ack_set field. When the batch index ACK happens, convert the []uint64 to the []int64 as the ack_set field in CommandAck. When receiving messages, convert the ack_set field in CommandMessage to filter the acknowledged single messages.

    Remove the duplicated code in AckID and AckIDWithResponse.

    Verifications

    TestBatchIndexAck is added to cover the case whether AckWithResponse is enabled and both individual and cumulative ACK.

  • [feature request] Support SNAPPY CompressionType

    [feature request] Support SNAPPY CompressionType

    Motivation

    As the title says. In go client, Snappy compression is not yet supported.

    https://github.com/apache/pulsar-client-go/blob/77c7ccbd144b00d17c320c5f67cadaedb53f6b1e/pulsar/producer.go#L38-L43

  • [feature request] Expose the chunk config of consumer to the reader

    [feature request] Expose the chunk config of consumer to the reader

    Motivation

    About the chunk config of the Consumer, we need to make it exposed in Reader.

    https://github.com/apache/pulsar-client-go/blob/23777362f3d132f98eff56c4f511f761227d8cf4/pulsar/consumer_partition.go#L112-L114

  • [feature request] Support Transaction API

    [feature request] Support Transaction API

    Motivation

    Now, the pulsar-client-go does not support the go-client, so I want to support transaction API for the go client.

    Solution

    Add Transaction API for go client.

Kafka, Beanstalkd, Pulsar Pub/Sub framework

go-queue Kafka, Beanstalkd, Pulsar Pub/Sub framework.

Sep 17, 2022
Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Dec 28, 2022
Confluent's Apache Kafka Golang client

Confluent's Golang Client for Apache KafkaTM confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform. Features: Hi

Dec 30, 2022
franz-go - A complete Apache Kafka client written in Go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 2.8.0+. Producing, consuming, transacting, administrating, etc.

Dec 29, 2022
Sarama is a Go library for Apache Kafka 0.8, and up.

sarama Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later). Getting started API documentation and examples are availa

Jan 1, 2023
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.

Kowl - Apache Kafka Web UI Kowl (previously known as Kafka Owl) is a web application that helps you to explore messages in your Apache Kafka cluster a

Jan 3, 2023
Modern CLI for Apache Kafka, written in Go.
Modern CLI for Apache Kafka, written in Go.

Kaf Kafka CLI inspired by kubectl & docker Install Install from source: go get -u github.com/birdayz/kaf/cmd/kaf Install binary: curl https://raw.git

Dec 31, 2022
CLI Tool to Stress Apache Kafka Clusters

Kafka Stress - Stress Test Tool for Kafka Clusters, Producers and Consumers Tunning Installation Docker docker pull fidelissauro/kafka-stress:latest d

Nov 13, 2022
Study project that uses Apache Kafka as syncing mechanism between two databases, with producers and consumers written in Go.

Kafka DB Sync Study project that uses Apache Kafka as syncing mechanisms between a monolith DB and a microservice. The main purpose of this project is

Dec 5, 2021
A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application.
A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application.

Apache Kafka in 6 minutes A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application. In thi

Oct 27, 2021
Testing Apache Kafka using Go.

Apache Kafka Go Testing Apache Kafka using Go. Instructions Provision the single node Kafka cluster using Docker: docker-compose -p apache-kafka-go up

Dec 17, 2021
golang client library to Viessmann Vitotrol web service

Package go-vitotrol provides access to the Viessmann™ Vitotrol™ cloud API for controlling/monitoring boilers. See https://www.viessmann.com/app_vitoda

Nov 16, 2022
Go client library SDK for Ably realtime messaging service

Ably Go A Go client library for www.ably.io, the realtime messaging service. Installation ~ $ go get -u github.com/ably/ably-go/ably Feature support T

Dec 2, 2022
It's client library written in Golang for interacting with Linkedin Cruise Control using its HTTP API.

go-cruise-control It's client library (written in Golang) for interacting with Linkedin Cruise Control using its HTTP API. Supported Cruise Control ve

Jan 10, 2022
Go client to reliable queues based on Redis Cluster Streams

Ami Go client to reliable queues based on Redis Cluster Streams. Consume/produce performance Performance is dependent from: Redis Cluster nodes count;

Dec 12, 2022
Golang client for NATS, the cloud native messaging system.

NATS - Go Client A Go client for the NATS messaging system. Installation # Go client go get github.com/nats-io/nats.go/ # Server go get github.com/na

Jan 5, 2023
RabbitMQ Reconnection client

rmqconn RabbitMQ Reconnection for Golang Wrapper over amqp.Connection and amqp.Dial. Allowing to do a reconnection when the connection is broken befor

Sep 27, 2022
An easy-to-use CLI client for RabbitMQ.

buneary, pronounced bun-ear-y, is an easy-to-use RabbitMQ command line client for managing exchanges, managing queues and publishing messages to exchanges.

Sep 3, 2022
An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`

Go RabbitMQ Client Library This is a Go AMQP 0.9.1 client maintained by the RabbitMQ core team. It was originally developed by Sean Treadway. Differen

Jan 1, 2023