Goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting them in Kafka. Goka provides sane defaults and a pluggable architecture.

Features

  • Message Input and Output

    Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.

  • Scaling

    Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.

  • Fault Tolerance

    In case of a failure, Goka will redistribute the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka and messages delivered with at-least-once semantics.

  • Built-in Monitoring and Introspection

    Goka provides a web interface for monitoring performance and querying values in the state.

  • Modularity

    Goka fosters a pluggable architecture which enables you to replace, for example, the storage layer or the Kafka communication layer (see the sketch below).
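
    As a minimal sketch (assuming the WithStorageBuilder processor option and the storage package's MemoryBuilder, as provided in recent Goka versions), swapping the default LevelDB storage for an in-memory one could look like this:

    // illustrative only; requires: import "github.com/lovoo/goka/storage"
    goka.NewProcessor(// ...,
    	goka.WithStorageBuilder(storage.MemoryBuilder()),
    	// ...
    )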

Documentation

This README provides a brief, high-level overview of the ideas behind Goka. A more detailed introduction to the project can be found in this blog post.

Package API documentation is available at GoDoc, and the Wiki provides several tips for configuring, extending, and deploying Goka applications.

Installation

You can install Goka by running the following command:

$ go get -u github.com/lovoo/goka

Configuration

Goka relies on Sarama to perform the actual communication with Kafka; Sarama offers many configuration settings, which are documented here.

In most cases, you need to modify the config, e.g. to set the Kafka version.

cfg := goka.DefaultConfig()
cfg.Version = sarama.V2_4_0_0
goka.ReplaceGlobalConfig(cfg)

This makes all goka components use the updated config.

If different components need specific configurations, pass customized builders to the component's constructor, e.g.

cfg := goka.DefaultConfig()
// modify the config with component-specific settings


// use the config by creating a builder which allows overriding the global config
goka.NewProcessor(// ...,
	goka.WithConsumerGroupBuilder(
		goka.ConsumerGroupBuilderWithConfig(cfg),
	),
	// ...
)
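
Similarly, and only as an illustration (assuming the analogous WithEmitterProducerBuilder option and the ProducerBuilderWithConfig helper), an emitter can be given its own producer configuration:

// sketch: pass a customized Sarama config to a single emitter
goka.NewEmitter(brokers, topic, new(codec.String),
	goka.WithEmitterProducerBuilder(
		goka.ProducerBuilderWithConfig(cfg),
	),
)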

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.

  • Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

  • Processor is a set of callback functions that consume and perform state transformations upon delivery of these emitted messages. Processor groups are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

  • Group table is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

  • Views are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.

  • Local storage keeps a local copy of the group table partitions to speed up recovery and reduce memory utilization. By default, the local storage uses LevelDB, but in-memory map and Redis-based storage are also available.
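
To make Views and local storage concrete, here is a minimal sketch (assuming the NewView, Run, and Get functions of the current API) of serving read-only lookups against the group table used in the example below:

// sketch: a read-only view over the "example-group" group table
brokers := []string{"localhost:9092"}
view, err := goka.NewView(brokers,
	goka.GroupTable(goka.Group("example-group")),
	new(codec.Int64),
)
if err != nil {
	log.Fatalf("error creating view: %v", err)
}
go func() {
	if err := view.Run(context.Background()); err != nil {
		log.Fatalf("error running view: %v", err)
	}
}()

// once recovered, the view can answer queries, e.g. from an HTTP handler
val, err := view.Get("some-key")
if err != nil {
	log.Fatalf("error getting value: %v", err)
}
log.Printf("current counter: %v", val)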

Get Started

An example Goka application could look like the following. An emitter emits messages with key "some-key" and value "some-value" into the "example-stream" topic every second. A processor processes the "example-stream" topic, counting the number of messages delivered for "some-key". The counter is persisted in the "example-group-table" topic. To locally start dockerized Zookeeper and Kafka instances, execute make start with the Makefile in the examples folder.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "example-stream"
	group   goka.Group  = "example-group"
)

// Emit a message every second, forever
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}
	defer emitter.Finish()
	for {
		time.Sleep(1 * time.Second)
		err = emitter.EmitSync("some-key", "some-value")
		if err != nil {
			log.Fatalf("error emitting message: %v", err)
		}
	}
}

// process messages until ctrl-c is pressed
func runProcessor() {
	// process callback is invoked for each message delivered from
	// "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() gets from the group table the value that is stored for
		// the message's key.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table for
		// the message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-table".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan bool)
	go func() {
		defer close(done)
		if err = p.Run(ctx); err != nil {
			log.Fatalf("error running processor: %v", err)
		} else {
			log.Printf("Processor shutdown cleanly")
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processor
	<-done
}

func main() {
	go runEmitter() // emits a message every second
	runProcessor()  // press ctrl-c to stop
}

A very similar example is also in 1-simplest. Just run go run examples/1-simplest/main.go.

Note that tables have to be configured in Kafka with log compaction. For details check the Wiki.
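
If you create table topics yourself, make sure cleanup.policy is set to compact. As a rough sketch (assuming Goka's TopicManager and its EnsureTopicExists method, as found in recent versions), the table topic of the example above could be created programmatically like this:

// sketch: ensure the group-table topic exists with log compaction enabled
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), goka.NewTopicManagerConfig())
if err != nil {
	log.Fatalf("error creating topic manager: %v", err)
}
defer tm.Close()

// 10 partitions, replication factor 1, compaction enabled
err = tm.EnsureTopicExists("example-group-table", 10, 1, map[string]string{
	"cleanup.policy": "compact",
})
if err != nil {
	log.Fatalf("error ensuring topic: %v", err)
}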

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.

Owner

LOVOO

Comments
  • Processor failures: expected rebalance OK but received rebalance error

    I'm running into an issue where upon starting up some goka processors, I get an error that looks like this:

    kafka error: expected rebalance OK but received rebalance error

    It appears to be coming from here: https://github.com/lovoo/goka/blob/b49460590708a8a0fe3f64840926095a85e7dc8f/kafka/group_consumer.go#L102

    I've noticed it never happens when I run it locally, happens sometimes when I run it in our sandbox environment, and happens consistently in our production environment. I'm thinking this may be because these environments differ in the number of partitions per topic (I believe I'm using 2 locally, 8 in sandbox, and 32 in production).

    Does anyone know what could be causing this or have some ideas of what I could check to investigate further?

  • ctx.Value() and View#Get(ctx.Key()) constantly do not match

    From my understanding, when you run ctx.SetValue(x), x is set in the table and then emitted to Kafka.

    That means that when a subsequent message comes in to a goka.Input ProcessCallback, ctx.Value() should already be set to the latest value emitted. So if I initialize a view like this

    view, err := goka.NewView([]string{"localhost:9092"}, goka.GroupTable("some-group"), new(SomeCodec))
    

    and, in the ProcessCallback I call

    view.Get(ctx.Key())
    

    it should be equal to the return value of ctx.Value().

    I have a case where that does not hold. I have a group with 5 inputs and one persist, and it is getting all kinds of stale reads. For example:

    • Message 1 from topic 1 comes in and gets persisted.
    • Message 2 from topic 2 comes in, ctx.Value() returns nil, so I fall back to view.Get(ctx.Key()), which returns a value from the table that I can work with. It gets persisted.
    • Message 3 comes in, ctx.Value() returns the result of persisting message 1 but not message 2. view.Get(ctx.Key()) returns the result of persisting messages 1 and 2.

    Any ideas why is this inconsistency present? Is it a bug? Or am I missing something here?

  • View: expose view and connection state

    @mattburman in #239

    Additionally, it would be nice to expose the view state: possibly ViewStateIdle, ViewStateCatchUp, or ViewStateRunning? Ideally I want to be able to differentiate between being behind and a connection that can't currently be established.

    I have also found that the view.Recovered method returns true whilst it is struggling to connect to Kafka. Is this intended? I need some way of determining whether the app is connected to Kafka for metrics. It looks like partitionStats.Status also returns 4 (PartitionRunning) when no connection to Kafka can be established. Stalled returns false too.

  • #318 avoid losing an element in leveldb seeker

    This fix covers iterators for partition tables. Views use the merge iterator, which was not affected by the bug because it calls Next upon initialization and caches the pair before calling Next again (which was perfect for Seek).

    That means the merge iterator would have panicked with other implementations, such as the in-memory one used in the tester package. I have added a test case to demonstrate the panic and the fix.

  • Emit messages with headers

    Since Kafka v0.11, messages can contain headers. It would be nice to be able to specify them when emitting messages and to receive them in our processors. I'm interested in submitting a PR if that's something you'd like to consider adding to goka.

  • Lost lots of messages due to async produce

    I recently wrote a test script which produces 20 million messages to Kafka with Goka, but there are only about 14 million messages after producing completes. There is enough disk space, and there are no error messages in the Kafka or client logs. The script and Kafka run in Docker, on the same subnet.

    test scripts:

    func GokaAsyncProduce() {
    	emitter, err := goka.NewEmitter(
    		viper.GetStringSlice("kafkaConfig.brokerUrls"),
    		goka.Stream(viper.GetString("kafkaConfig.topic")),
    		new(codec.String),
    	)
    	if err != nil {
    		log.Fatalf("error creating emitter: %v", err)
    	}
    
    	startTime = time.Now().UnixNano()
    
    	preTime := time.Now().UnixNano()
    	preN := 0
    
    	for n := 0; n < count; n++ {
    		bs := getPkg()
    		_, err = emitter.Emit("", string(bs))
    		if err != nil {
    			log.Fatalf("error emitting message: %v", err)
    		}
    
    		currTime := time.Now().UnixNano()
    		if float64(currTime-preTime) > float64(collectInterval)*math.Pow10(9) {
    			currN := n - preN
    			currSpeed := currN / collectInterval
    			fmt.Printf("produce speed: %v pps", currSpeed)
    
    			preTime = currTime
    			preN = n
    
    			PrintMemUsage()
    			PrintCPUUsage()
    		}
    	}
    	emitter.Finish()
    
    	endTime = time.Now().UnixNano()
    }
    

    Count messages using docker-compose exec kafka kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic test --time -1 --offsets 1

  • Using a tester with an emitter doesn't seem to be working

    Am I doing something wrong? The following test fails...

    package emitme
    
    import (
    	"bytes"
    	"io/ioutil"
    	"log"
    	"net/http"
    	"net/http/httptest"
    	"testing"
    
    	"github.com/lovoo/goka"
    	"github.com/lovoo/goka/codec"
    	"github.com/lovoo/goka/tester"
    	"github.com/stretchr/testify/assert"
    )
    
    func handle(emitter *goka.Emitter) func(http.ResponseWriter, *http.Request) {
    	return func(w http.ResponseWriter, r *http.Request) {
    		body, err := ioutil.ReadAll(r.Body)
    		if err != nil {
    			w.WriteHeader(http.StatusBadRequest)
    			return
    		}
    
                    log.Printf("emitting %s\n", string(body))
    
    		if _, err := emitter.Emit("", body); err != nil {
    			log.Printf("error emitting: %v", err)
    			w.WriteHeader(http.StatusInternalServerError)
    			return
    		}
    
    		w.WriteHeader(http.StatusNoContent)
    	}
    }
    
    var job = []byte("hello, world!")
    
    func TestEmitter(t *testing.T) {
    	var gkt = tester.New(t)
    
    	em, _ := goka.NewEmitter(nil, goka.Stream("test"), new(codec.Bytes), goka.WithEmitterTester(gkt))
    	est := gkt.NewQueueTracker("test")
    
    	r, _ := http.NewRequest("POST", "http://test.me/job", bytes.NewReader(job))
    	w := httptest.NewRecorder()
    
    	handle(em)(w, r)
    
    	em.Finish()
    
    	_, _, ok := est.Next()
    	assert.True(t, ok)
    }
    
    

    Using the tester with a Goka Processor works with similar test code (i.e., the return value of est.Next() is what I expect it to be). Does using the tester to provide topics for the Emitter to write to not work like it does with Processors, or am I just doing something wrong?

  • Proposal: replace Start() and Stop() with Run(context.Context)

    Currently the Start() methods of processors and views use errgroup to create goroutines for each partition and pass a context.Context to them. The Stop() method simply cancels the context to stop all goroutines.

    I think it would be nice to use the same mechanism to control multiple processors and views running on the same program. For that I'd propose to replace the Start() and Stop() methods with a Run(context.Context) method. The usage would be something like this:

    // create the context and cancel function
    ctx, cancel := context.WithCancel(context.Background())
    
    // create error group
    g, ctx := errgroup.WithContext(ctx)
    
    // create processors and views
    p, _ := goka.NewProcessor(brokers, graph)
    ...
    
    // start processor and views passing the context   
    g.Go(func() error { return p.Run(ctx) })
    
    // catch signals
    go func() {
      wait := make(chan os.Signal, 1)
      signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
      <-wait   // wait for SIGINT/SIGTERM
      cancel() // gracefully stop processors and views
    }()
    
    if err := g.Wait(); err != nil {
      log.Fatalln(err)
    }
    

    Perhaps we could still support Start() and Stop() via some wrapper... something like this:

            p, err := goka.StartStopper(goka.NewProcessor(brokers, graph))
    	if err != nil {
    		log.Fatalf("error creating processor: %v", err)
    	}
    	go func() {
    		if err = p.Start(); err != nil {
    			log.Fatalf("error running processor: %v", err)
    		}
    	}()
    
    	wait := make(chan os.Signal, 1)
    	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
    	<-wait   // wait for SIGINT/SIGTERM
    	p.Stop() // gracefully stop processor
    

    Any opinions?

  • Propose: expose processor state

    It would be nice to add a method exposing the processor.state field in order to have more fine-grained information about the processor state. Examples could be:

    func (g *Processor) GetState() *Signal { return g.state }

    func (g *Processor) IsInState(state State) bool { return g.state.IsState(state) }

  • Examples `simplest` and `clicks` panic with "non-positive interval for NewTicker"

    Simplest and clicks are the only examples I tried so I can't speak for the others.

    I copied the Makefile from ./examples and ran make start. I copied the main.go from the examples and ran go run .

    Output for "simplest" is

    message emitted
    2020/07/06 14:36:32 Processor [example-group]: starting
    2020/07/06 14:36:32 Processor [example-group]: creating consumer
    2020/07/06 14:36:32 Processor [example-group]: creating producer
    2020/07/06 14:36:32 Processor: rebalancing: map[]
    2020/07/06 14:36:32 Processor: dispatcher started
    panic: non-positive interval for NewTicker
    
    goroutine 118 [running]:
    time.NewTicker(0x0, 0x0)
            /home/linuxbrew/.linuxbrew/Cellar/go/1.14.4/libexec/src/time/tick.go:23 +0x147
    github.com/bsm/sarama-cluster.(*Consumer).cmLoop(0xc0002e0000, 0xc000328ba0)
            /home/pofl/Documents/go/pkg/mod/github.com/bsm/[email protected]+incompatible/consumer.go:452 +0x5a
    github.com/bsm/sarama-cluster.(*loopTomb).Go.func1(0xc00030ace0, 0xc0003108f0)
            /home/pofl/Documents/go/pkg/mod/github.com/bsm/[email protected]+incompatible/util.go:73 +0x7b
    created by github.com/bsm/sarama-cluster.(*loopTomb).Go
            /home/pofl/Documents/go/pkg/mod/github.com/bsm/[email protected]+incompatible/util.go:69 +0x66
    exit status 2
    

    Output for "clicks" is

    2020/07/06 14:25:13 Table mini-group-table has 10 partitions
    2020/07/06 14:25:13 Processor [mini-group]: starting
    2020/07/06 14:25:13 Processor [mini-group]: creating consumer
    2020/07/06 14:25:13 Processor [mini-group]: creating producer
    2020/07/06 14:25:13 Processor: rebalancing: map[]
    2020/07/06 14:25:13 Processor: dispatcher started
    View opened at http://localhost:9095/
    2020/07/06 14:25:13 view [mini-group-table]: starting
    2020/07/06 14:25:13 view [mini-group-table]: partition 9 started
    2020/07/06 14:25:13 view [mini-group-table]: partition 3 started
    2020/07/06 14:25:13 view [mini-group-table]: partition 5 started
    2020/07/06 14:25:13 view [mini-group-table]: partition 6 started
    2020/07/06 14:25:13 view [mini-group-table]: partition 7 started
    2020/07/06 14:25:13 view [mini-group-table]: partition 4 started
    2020/07/06 14:25:13 view [mini-group-table]: partition 8 started
    2020/07/06 14:25:13 view [mini-group-table]: partition 1 started
    2020/07/06 14:25:13 view [mini-group-table]: partition 0 started
    2020/07/06 14:25:13 view [mini-group-table]: partition 2 started
    2020/07/06 14:25:13 Processor: dispatcher stopped
    panic: non-positive interval for NewTicker
    
    goroutine 243 [running]:
    time.NewTicker(0x0, 0x0)
            /home/linuxbrew/.linuxbrew/Cellar/go/1.14.4/libexec/src/time/tick.go:23 +0x147
    github.com/bsm/sarama-cluster.(*Consumer).cmLoop(0xc0001da000, 0xc000182ba0)
            /home/pofl/Documents/go/pkg/mod/github.com/bsm/[email protected]+incompatible/consumer.go:452 +0x5a
    github.com/bsm/sarama-cluster.(*loopTomb).Go.func1(0xc0000ac9e0, 0xc005596250)
            /home/pofl/Documents/go/pkg/mod/github.com/bsm/[email protected]+incompatible/util.go:73 +0x7b
    created by github.com/bsm/sarama-cluster.(*loopTomb).Go
            /home/pofl/Documents/go/pkg/mod/github.com/bsm/[email protected]+incompatible/util.go:69 +0x66
    exit status 2
    

    The problem seems to be with the sarama-cluster config. Panic occurs here https://github.com/bsm/sarama-cluster/blob/master/consumer.go#L452

    sarama-cluster is deprecated btw.

  • Sarama-cluster dependency seems to have an issue

    I'm getting a sarama-cluster issue when I try to go get and when I try to run the sample code. Both go and dep are freshly installed, OSX 10.13.6.

    ➜  gokatest go get -u github.com/lovoo/goka       
    # github.com/bsm/sarama-cluster
    ../github.com/bsm/sarama-cluster/consumer.go:452:59: c.client.config.Config.Consumer.Offsets.CommitInterval undefined (type struct { AutoCommit struct { Enable bool; Interval time.Duration }; Initial int64; Retention time.Duration; Retry struct { Max int } } has no field or method CommitInterval)
    
  • Pause and Resume Functionality in Processor

    Would it be possible to add functionality to pause and resume the Processor?

    The sarama.Consumer already has these methods: Pause, PauseAll, Resume, ResumeAll.

    Thanks

  • Question: Goka compatibility with Kafka 3.3.1 (KRaft)

    Setup

    Goka: 1.1.7, Sarama v1.37.2

    Kafka: 3.3.1 (in KRaft mode)

    Cluster config: num.partitions=2, default.replication.factor=3, number of brokers=3

    Problem description

    The TopicManager can successfully create topics on the newly created KRaft Kafka cluster. However, the following error occurs:

    "Error: 1 error occurred:
      * error setting up (partition=0): Setup failed. Cannot start processor for partition 0: 1 error occurred:
      * kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic p3-p3-dev-6-procs-group-table, partition 0, hwm 0, local offset 6350632"
    

    I tried to remove the cache files stored under storagePath,

    goka.WithStorageBuilder(storage.CachedStorageBuilder(10000, storagePath, nil))

    which contains:

    p3-p3-dev-6-procs-group-table.0
       000022.log  CURRENT  CURRENT.bak  LOCK  LOG  MANIFEST-000023
    p3-p3-dev-6-procs-group-table.1
       000030.log  CURRENT  CURRENT.bak  LOCK  LOG  MANIFEST-000031
    

    and restarted the service, but it did not resolve the issue. The local offset always seems to be an unreasonable number.

    Topic detail

    Topic: p3-p3-dev-6-procs-group-table    TopicId: sS43h6seRae2Bk3umNsBBA PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
            Topic: p3-p3-dev-6-procs-group-table    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
            Topic: p3-p3-dev-6-procs-group-table    Partition: 1    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
    
    $ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic p3-p3-dev-6-procs-group-table
    p3-p3-dev-6-procs-group-table:0:0
    p3-p3-dev-6-procs-group-table:1:0
    

    Questions

    1. Any further advice on determining whether the issue falls on our service side or goka incompatibility?

    2. If goka is not compatible with KRaft cluster (no Zookeeper) yet, is there a foreseeable timeline on its support?

    P.S.

    Will the blog post be back again? The link is currently deleted: https://lovoodotblog.wordpress.com/2017/05/23/goka/

  • check partition mismatch

    If the group table already exists, it is not checked for copartitioning with the input topics, which is dangerous. This needs to be checked when starting the processor.

  • clean tests

    The processor integration test is unclean and has many commented-out test cases. Implement them or remove them: https://github.com/lovoo/goka/blob/master/integrationtest/processor_test.go#L227
