provider-kafka

provider-kafka is a Crossplane Provider that is used to manage Kafka resources.

Usage

  1. Create a provider secret containing a JSON document like the following (see the expected schema here):

    {
      "brokers":[
        "kafka-dev-0.kafka-dev-headless:9092"
      ],
      "sasl":{
        "mechanism":"PLAIN",
        "username":"user",
        "password":"<your-password>"
      }
    }

  2. Create a Kubernetes secret containing the above config:

    kubectl -n crossplane-system create secret generic kafka-creds --from-file=credentials=kc.json
    
  3. Create a ProviderConfig; see this for an example (an illustrative manifest is also sketched after this list).

  4. Create a managed resource; see this for an example creating a Kafka topic (an illustrative manifest is also sketched below).
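
For reference, the manifests for steps 3 and 4 look roughly like the following. This is an illustrative sketch only: the API groups (kafka.crossplane.io, topic.kafka.crossplane.io) and parameter names follow common Crossplane provider conventions and should be checked against the examples shipped with this repository.

    apiVersion: kafka.crossplane.io/v1alpha1
    kind: ProviderConfig
    metadata:
      name: default
    spec:
      credentials:
        source: Secret
        secretRef:
          namespace: crossplane-system
          name: kafka-creds
          key: credentials
    ---
    apiVersion: topic.kafka.crossplane.io/v1alpha1
    kind: Topic
    metadata:
      name: example-topic
    spec:
      forProvider:
        partitions: 1
        replicationFactor: 1
      providerConfigRef:
        name: default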

Development

Setting up a Development Kafka Cluster

The following instructions will set up a development environment with a locally running Kafka installation (SASL-Plain enabled). To change the configuration of your instance further, please see the available helm parameters here.

  1. (Optional) Create a local kind cluster unless you want to develop against an existing k8s cluster.

  2. Install the Kafka helm chart:

    helm repo add bitnami https://charts.bitnami.com/bitnami
    kubectl create ns kafka-cluster
    helm upgrade --install kafka-dev -n kafka-cluster bitnami/kafka --set auth.clientProtocol=sasl --set deleteTopicEnable=true --wait
    

    The username is "user"; obtain the password using the following command:

    kubectl -n kafka-cluster exec kafka-dev-0 -- cat /opt/bitnami/kafka/config/kafka_jaas.conf
    
  3. Install kubefwd.

  4. Run kubefwd for the kafka-cluster namespace, which will make internal k8s services locally accessible:

    sudo kubefwd svc -n kafka-cluster
    
  5. (Optional) Install the kafka CLI.

  6. (Optional) Configure the kafka CLI to talk to the local Kafka installation:

    1. Create a config file for the client with the following content at ~/.kaf/config:

      current-cluster: local
      clusteroverride: ""
      clusters:
      - name: local
        version: ""
        brokers:
        - kafka-dev-0.kafka-dev-headless:9092
        SASL:
          mechanism: PLAIN
          username: user
          password: <your-password>
        TLS: null
        security-protocol: ""
        schema-registry-url: ""

      2. Verify that the CLI can talk to the Kafka cluster:

        kaf nodes
      

Building and Running the provider locally

Run against a Kubernetes cluster:

make run

Build, push, and install:

make all

Build image:

make image

Push image:

make push

Build binary:

make build

Comments
  • add support for SASL mechanism AWS_MSK_IAM. fixes #34

    add support for SASL mechanism AWS_MSK_IAM. fixes #34

    The PR is not completely done; I'm opening this more for discussion.

    Description of your changes

    Enables users to authenticate towards AWS MSK Kafka cluster with injected AWS credentials.

    Fixes #34

    I have:

    • [x] Read and followed Crossplane's contribution process.
    • [x] Run make reviewable test to ensure this PR is ready for review.

    How has this code been tested

    Currently only tested locally by "simulating" an EKS cluster environment, i.e. by "injecting" the token and setting the corresponding env vars that would otherwise be "automagically" injected.

    The plan is to do a "real" test on an EKS cluster before merge.
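
    For illustration only, a credentials Secret for this mechanism might look like the following. This is a guess extrapolated from the existing sasl block; the actual schema is whatever this (unfinished) PR defines:

        {
          "brokers": [
            "b-1.example.kafka.eu-central-1.amazonaws.com:9098"
          ],
          "sasl": {
            "mechanism": "AWS_MSK_IAM"
          }
        }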

  • feat: adding TLS and SCRAM-SHA-512 support for AWS MSK

    feat: adding TLS and SCRAM-SHA-512 support for AWS MSK

    Description of your changes

    This PR adds support for the provider to interface with an AWS Managed Kafka (MSK) cluster, featuring:

    • encrypted transport using TLS
    • SASL authentication method SCRAM-SHA-512

    Quoting https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-limitations:

    Amazon MSK only supports SCRAM-SHA-512 authentication.

    The following ProviderConfig Secret enables the features:

        {
          "brokers": [
            "b-1.aaa.bbb.c6.kafka.eu-central-1.amazonaws.com:9096",
            "b-2.aaa.bbb.c6.kafka.eu-central-1.amazonaws.com:9096",
            "b-3.aaa.bbb.c6.kafka.eu-central-1.amazonaws.com:9096"
          ],
          "tls": { },
          "sasl": {
            "mechanism": "scram-sha-512",
            "username": "$USERNAME",
            "password": "$PASSWORD"
          }
        }
    

    Note I have not added any additional parameters to the TLS configuration block, since MSK certificates are signed by a public AWS CA that is trusted by standard CA bundles.

    (Off topic: for mTLS support this block can be extended, but then the provider needs to be able to read k8s Secrets for the key pair as I believe we should not include the keypair in this config Secret, but use refs, to support cert-manager created keypair secrets.)

    I have:

    • [X] Read and followed Crossplane's contribution process.
    • [X] Run make reviewable test to ensure this PR is ready for review. <--- client_test.go does not yet exist, so could not add tests.

    How has this code been tested

    I've used an MSK cluster with TLS and SCRAM auth enabled. The provider works.

  • Provider unhealthy (UnhealthyPackageRevision)

    Provider unhealthy (UnhealthyPackageRevision)

    What happened?

    crossplane-provider-kafka-controller gets Unhealthy status right after Helm install (clean install)

    How can we reproduce it?

    Install Helm Chart crossplane-stable/crossplane (1.9.1) with provider-kafka in values:

    provider:
      packages:
        - crossplane/provider-kafka-controller:stable
    

    The Provider is assigned Installed status, but is unhealthy. No pods except crossplane and crossplane-rbac-manager are running.

    From ProviderRevision:

    status:
      conditions:
        - lastTransitionTime: '2022-09-26T13:42:33Z'
          reason: UnhealthyPackageRevision
          status: 'False'
          type: Healthy
    

    k get Provider -o yaml:

    apiVersion: v1
    items:
    - apiVersion: pkg.crossplane.io/v1
      kind: Provider
      metadata:
        creationTimestamp: "2022-09-26T13:42:09Z"
        generation: 1
        name: crossplane-provider-kafka-controller
        resourceVersion: "1099151477"
        uid: 1dcb7352-2ac7-453a-8dfa-83530b03650e
      spec:
        ignoreCrossplaneConstraints: false
        package: crossplane/provider-kafka-controller:stable
        packagePullPolicy: IfNotPresent
        revisionActivationPolicy: Automatic
        revisionHistoryLimit: 1
        skipDependencyResolution: false
      status:
        conditions:
        - lastTransitionTime: "2022-09-26T13:42:33Z"
          reason: UnhealthyPackageRevision
          status: "False"
          type: Healthy
        - lastTransitionTime: "2022-09-26T13:42:27Z"
          reason: ActivePackageRevision
          status: "True"
          type: Installed
        currentIdentifier: crossplane/provider-kafka-controller:stable
        currentRevision: crossplane-provider-kafka-controller-711b31489f8d
    kind: List
    metadata:
      resourceVersion: ""
      selfLink: ""
    

    What environment did it happen in?

    Crossplane version: 1.9.1. K8s version: 1.21.5 (on-premise).

  • Support to Add, Remove and List ACLs (Fixes #6)

    Support to Add, Remove and List ACLs (Fixes #6)

    Description of your changes

    This Pull Request adds feature complete ACL support to Add, Remove and List ACLs. Fixes #6

    I have:

    • [ x ] Read and followed Crossplane's contribution process.
    • [ x ] Run make reviewable test to ensure this PR is ready for review.

    How has this code been tested

    Automated tests are forthcoming

    1. Created an ACL by applying examples/acl/acl.yaml
    2. Verified by running kcl admin acl describe --type any --pattern match --op any --perm any
    3. Updated ACL by changing a Principal in examples/acl/acl.yml
    4. Applied the ACL and noticed the Update threw an error
    5. Deleted ACL by deleting examples/acl/acl.yaml
    6. Verified by same process as bullet 2
  • Refactoring Controller to Utilize a Topic Client (Resolves #5)

    Refactoring Controller to Utilize a Topic Client (Resolves #5)

    Description of your changes

    This PR is a result of input on changes made in PR #11. PR #12 was opened with initial recommendations, which are incorporated here.

    In this PR, controller logic in PR #11 is refactored to make use of a Kafka Topic Client. This is tied to the GitHub issue #5.

    I have:

    • [ x ] Read and followed Crossplane's contribution process.
    • [ x ] Run make reviewable test to ensure this PR is ready for review.

    How has this code been tested

    Testing was performed manually as of 11/23.

  • Support SASL mechanism AWS_MSK_IAM for use with MSK

    Support SASL mechanism AWS_MSK_IAM for use with MSK

    What problem are you facing?

    Would like to be able to use AWS IAM when authenticating against an MSK cluster.

    How could Crossplane help solve your problem?

    Users would be able to attach a role to the provider and therefore follow the same pattern that many other tools use when running in EKS.

    I would be happy to make this contribution myself but can't find a contribution doc, so just let me know what approach you like.

  • Client helpers for Kafka Topic

    Client helpers for Kafka Topic

    This PR demonstrates how to use late initialization in the managed reconciler and defines function signatures of common client helpers in Crossplane providers.

    As a response to https://github.com/crossplane-contrib/provider-kafka/pull/11#issuecomment-969107228
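
    As background, a late-initialization helper in a Crossplane provider typically fills unset spec fields from the observed external resource and reports whether anything changed. The sketch below is illustrative only (local stand-in types, not this provider's actual API):

        // Topic is a minimal stand-in for the observed Kafka topic.
        type Topic struct {
        	Name              string
        	Partitions        int32
        	ReplicationFactor int16
        }

        // Parameters is a minimal stand-in for the managed resource spec.
        type Parameters struct {
        	Partitions        *int32
        	ReplicationFactor *int16
        }

        // LateInitialize fills unset (nil) spec fields from the observed topic and
        // reports whether anything changed, so the managed reconciler knows to
        // persist the update back to the custom resource.
        func LateInitialize(in *Parameters, observed *Topic) bool {
        	changed := false
        	if in.Partitions == nil {
        		p := observed.Partitions
        		in.Partitions = &p
        		changed = true
        	}
        	if in.ReplicationFactor == nil {
        		rf := observed.ReplicationFactor
        		in.ReplicationFactor = &rf
        		changed = true
        	}
        	return changed
        }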

  • mTLS support

    mTLS support

    What problem are you facing?

    I'd like to use this provider to manage resources in an MSK cluster with mTLS auth enabled. Currently the provider does not support this, nor TLS. Before I start thinking about submitting this feature, I'd like to reach out to hear of ideas about implementation/configuration etc.

    The used Kafka library https://github.com/twmb/franz-go/ supports a pluggable dialer, which is used by the Kafka CLI kcl https://github.com/twmb/kcl/ to provide mTLS. Based on this I expect adding mTLS support to this provider should be possible and straightforward.

    I'd like to hear your ideas about how to add the needed configuration in https://github.com/crossplane-contrib/provider-kafka/blob/main/internal/clients/kafka/config.go
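
    For context, here is a minimal sketch of how a TLS/mTLS dialer can be plugged into franz-go, assuming the client key pair and CA bundle are available as PEM files (paths and broker address are placeholders; this is not the provider's actual configuration code in config.go):

        package main

        import (
        	"crypto/tls"
        	"crypto/x509"
        	"log"
        	"os"

        	"github.com/twmb/franz-go/pkg/kgo"
        )

        func main() {
        	// Client certificate and key used for mTLS.
        	cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
        	if err != nil {
        		log.Fatal(err)
        	}
        	// CA bundle used to verify the brokers.
        	caPEM, err := os.ReadFile("ca.crt")
        	if err != nil {
        		log.Fatal(err)
        	}
        	pool := x509.NewCertPool()
        	pool.AppendCertsFromPEM(caPEM)

        	tlsCfg := &tls.Config{
        		Certificates: []tls.Certificate{cert},
        		RootCAs:      pool,
        	}

        	cl, err := kgo.NewClient(
        		kgo.SeedBrokers("b-1.example.kafka:9094"),
        		// franz-go accepts a pluggable dialer; tls.Dialer performs the TLS handshake.
        		kgo.Dialer((&tls.Dialer{Config: tlsCfg}).DialContext),
        	)
        	if err != nil {
        		log.Fatal(err)
        	}
        	defer cl.Close()
        }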

    How could Crossplane help solve your problem?

    Implement the feature, or provide the information needed for submitting a successful PR. Thanks!

  • release first version

    release first version

    What problem are you facing?

    There is no release of provider-kafka yet. No API documentation shows up at https://doc.crds.dev/github.com/crossplane-contrib/provider-kafka

    How could Crossplane help solve your problem?

    Release a first version, based on https://crossplane.io/docs/v1.7/contributing/release-process.html

  • ArgoCD-Crossplane: Topic finalizer prevents deletion

    ArgoCD-Crossplane: Topic finalizer prevents deletion

    Hi team,

    Is there a way to override/remove Topic default finalizer (finalizer.managedresource.crossplane.io) declaratively?

    The use case: ArgoCD cannot remove the Topic object after the manifest is deleted in Git, and synchronization never ends.

    If the finalizer is removed from the Topic manually, everything proceeds as expected immediately.

    Already tried:

    metadata:
      finalizers:
        - resources-finalizer.argocd.argoproj.io
    

    and

    metadata:
      finalizers: []
    

    but no luck; the finalizer.managedresource.crossplane.io finalizer is still added automatically.

    crossplane: v1.9.1, provider-kafka-controller: v0.1.0, argocd: v2.4.12

  • DevTools: Kafka Env Refresh

    DevTools: Kafka Env Refresh

    Description of your changes

    We've created a super fast script to refresh the Kafka development environment running locally. It deletes relevant keys and namespaces, reinstalls Kafka on Kubernetes, and sets up the passwords using generic files.

    May be better applied in the Makefile, but for now it works and is efficient.

    The prerequisites are having everything installed. This does not break the process of manual installs using the README, or alter the process in any other way.

    This is for development only, and won't work in other environments, nor is it intended to. I placed them in a subdirectory devutils to prevent confusion. Can easily be deleted and deprecated at release time.
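
    As a rough equivalent built only from the commands already in this README (the actual devutils script may differ), the refresh flow amounts to:

        #!/usr/bin/env bash
        set -euo pipefail

        # Tear down and recreate the development namespace.
        kubectl delete namespace kafka-cluster --ignore-not-found --wait
        kubectl create ns kafka-cluster

        # Reinstall the Bitnami Kafka chart with SASL enabled.
        helm repo add bitnami https://charts.bitnami.com/bitnami
        helm upgrade --install kafka-dev -n kafka-cluster bitnami/kafka \
          --set auth.clientProtocol=sasl --set deleteTopicEnable=true --wait

        # Print the generated SASL password for the "user" account.
        kubectl -n kafka-cluster exec kafka-dev-0 -- cat /opt/bitnami/kafka/config/kafka_jaas.conf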

  • Topic controller Observe function doesn't distinguish between non-existing topics and errors from the client in retrieving the topic

    Topic controller Observe function doesn't distinguish between non-existing topics and errors from the client in retrieving the topic

    From controller/topic/topic.go Observe() function:

        tpc, err := topic.Get(ctx, c.kafkaClient, meta.GetExternalName(cr))
        if tpc == nil {
        	return managed.ExternalObservation{ResourceExists: false}, nil
        }
        if err != nil {
        	return managed.ExternalObservation{}, errors.Wrapf(err, "cannot get topic spec from topic client")
        }
    

    topic.Get either returns a topic or an error. In case of an error, the topic will always be nil, so the function returns from the first check and reports the resource as non-existent. If the topic is not nil, err will always be nil, so the code in the error-handling block is unreachable.

    This code was probably written because topic.Get() returns an error both when a topic does not exist and when something went wrong, and it is not possible from the error itself to discern which of the two happened.

    As a result, errors from the client performing a Get are interpreted as "topic does not exist", triggering a topic creation when it is not necessary.
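
    One possible shape for the fix, assuming topic.Get were changed to wrap a sentinel error when the topic is absent (errTopicNotFound below is illustrative, not existing code):

        var errTopicNotFound = errors.New("topic not found")

        // In Observe():
        tpc, err := topic.Get(ctx, c.kafkaClient, meta.GetExternalName(cr))
        if errors.Is(err, errTopicNotFound) {
        	// The topic genuinely does not exist, so report that and let it be created.
        	return managed.ExternalObservation{ResourceExists: false}, nil
        }
        if err != nil {
        	// Any other error (connectivity, auth, ...) must not be mistaken for "missing".
        	return managed.ExternalObservation{}, errors.Wrapf(err, "cannot get topic spec from topic client")
        }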

  • ACLs cannot be deleted

    ACLs cannot be deleted

    What happened?

    For the last few weeks I've been working on engineering a Kafka GitOps feature for our MSK clusters. For this I use Crossplane and this provider. To make the provider work with MSK I've contributed TLS + SCRAM authentication support.

    Now managing Topics works great. Creation of ACLs works too, but deletion is not possible. To make this work I changed:

    https://github.com/crossplane-contrib/provider-kafka/blob/d085e96353054c2807f37c86435130b8a5569f5c/internal/clients/kafka/acl/acl.go#L59-L68

    ... into:

            resp, err := cl.DescribeACLs(ctx, ab)
            if err != nil {
                    return nil, errors.Wrap(err, "describe ACLs response is empty")
            }
            if exists := resp[0].Described; len(exists) == 0 {
                    return nil, nil // no matching ACLs found
            }
    

    The original code returns an error if no ACLs exist for the given criteria. My code treats this as a valid, empty result.

    Now my code works flawlessly (for MSK), but since it is a significant change to the logic and it implies 'delete ACL' never worked, I wonder if I am missing something. So, I'd like a discussion before I submit a patch.

    Cheers.

    How can we reproduce it?

    In short:

    • create an ACL (this works)
    • attempt to delete it, which fails

    In detail:

    I create the ACL. Here is the resource and its good health:

    $ kubectl get accesscontrollist.acl.kafka.crossplane.io
    NAME                                               READY   SYNCED   EXTERNAL-NAME                                                                                                                                                                                                                                      AGE
    acl-managed-by-crossplane-kafka-provider-acltest   True    True     {"ResourceName":"topic-managed-by-crossplane-kafka-provider-acltest","ResourceType":"Topic","ResourcePrincipal":"User:Foo","ResourceHost":"*","ResourceOperation":"Read","ResourcePermissionType":"Allow","ResourcePatternTypeFilter":"Literal"}   92s
    

    With kcl I check the Kafka side of things:

    $ kcl admin acl describe --type any --pattern match --op any --perm any --name topic-managed-by-crossplane-kafka-provider-acltest
    TYPE   NAME                                                PATTERN  PRINCIPAL                                                           HOST  OPERATION  PERMISSION  ERROR  ERROR MESSAGE
    TOPIC  topic-managed-by-crossplane-kafka-provider-acltest  LITERAL  User:Foo                                                            *     READ       ALLOW
    

    So, indeed: the ACL exists.

    Now let's delete the ACL:

    $ kubectl delete accesscontrollist.acl.kafka.crossplane.io/acl-managed-by-crossplane-kafka-provider-acltest
    accesscontrollist.acl.kafka.crossplane.io "acl-managed-by-crossplane-kafka-provider-acltest" deleted
    <hangs>
    

    The delete command hangs. Meanwhile in Kafka the ACL has been removed.

    Checking the to-be-deleted ACL resource from another terminal shows it's now unREADY and unSYNCED. Both as expected:

    $ k get accesscontrollist.acl.kafka.crossplane.io
    NAME                                               READY   SYNCED   EXTERNAL-NAME                                                                                                                                                                                                                                      AGE
    acl-managed-by-crossplane-kafka-provider-acltest   False   False    {"ResourceName":"topic-managed-by-crossplane-kafka-provider-acltest","ResourceType":"Topic","ResourcePrincipal":"User:Foo","ResourceHost":"*","ResourceOperation":"Read","ResourcePermissionType":"Allow","ResourcePatternTypeFilter":"Literal"}   16m
    

    Checking the kafka provider (running in debug mode) logs I see this:

    2022-07-13T11:43:26.954+0200	DEBUG	provider-kafka	Cannot observe external resource	{"controller": "managed/accesscontrollist.acl.kafka.crossplane.io", "request": "/acl-managed-by-crossplane-kafka-provider-acltest", "uid": "90943fdb-d5d6-44ab-83b6-77f9dc66a15a", "version": "179916", "external-name": "{\"ResourceName\":\"topic-managed-by-crossplane-kafka-provider-acltest\",\"ResourceType\":\"Topic\",\"ResourcePrincipal\":\"User:Foo\",\"ResourceHost\":\"*\",\"ResourceOperation\":\"Read\",\"ResourcePermissionType\":\"Allow\",\"ResourcePatternTypeFilter\":\"Literal\"}", "error": "cannot List ACLs: no create response for acl", "errorVerbose": "no create response for acl\ngithub.com/crossplane-contrib/provider-kafka/internal/clients/kafka/acl.List\n\t/Users/rtoma/redacted/projects/provider-kafka-fork/internal/clients/kafka/acl/acl.go:66\ngithub.com/crossplane-contrib/provider-kafka/internal/controller/acl.(*external).Observe\n\t/Users/rtoma/redacted/projects/provider-kafka-fork/internal/controller/acl/acl.go:158\ngithub.com/crossplane/crossplane-runtime/pkg/reconciler/managed.(*Reconciler).Reconcile\n\t/Users/rtoma/go/pkg/mod/github.com/crossplane/[email protected]/pkg/reconciler/managed/reconciler.go:620\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler\n\t/Users/rtoma/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:298\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem\n\t/Users/rtoma/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:253\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2\n\t/Users/rtoma/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:214\nruntime.goexit\n\t/opt/homebrew/Cellar/go/1.18.3/libexec/src/runtime/asm_arm64.s:1263\ncannot List ACLs\ngithub.com/crossplane-contrib/provider-kafka/internal/controller/acl.(*external).Observe\n\t/Users/rtoma/redacted/projects/provider-kafka-fork/internal/controller/acl/acl.go:161\ngithub.com/crossplane/crossplane-runtime/pkg/reconciler/managed.(*Reconciler).Reconcile\n\t/Users/rtoma/go/pkg/mod/github.com/crossplane/[email protected]/pkg/reconciler/managed/reconciler.go:620\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler\n\t/Users/rtoma/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:298\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem\n\t/Users/rtoma/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:253\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2\n\t/Users/rtoma/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:214\nruntime.goexit\n\t/opt/homebrew/Cellar/go/1.18.3/libexec/src/runtime/asm_arm64.s:1263"}
    2022-07-13T11:43:26.955+0200	DEBUG	controller-runtime.manager.events	Warning	{"object": {"kind":"AccessControlList","name":"acl-managed-by-crossplane-kafka-provider-acltest","uid":"90943fdb-d5d6-44ab-83b6-77f9dc66a15a","apiVersion":"acl.kafka.crossplane.io/v1alpha1","resourceVersion":"179916"}, "reason": "CannotObserveExternalResource", "message": "cannot List ACLs: no create response for acl"}
    

    From above debug blob I'd like to highlight:

    "errorVerbose": "no create response for acl
      github.com/crossplane-contrib/provider-kafka/internal/clients/kafka/acl.List
      /Users/rtoma/redacted/projects/provider-kafka-fork/internal/clients/kafka/acl/acl.go:66
    

    This is why I believe 'delete ACL' is flawed. The acl.List method throws an error when no ACLs exist. Now to me finding no matching ACLs seems like the expected result of a delete ACL action. But maybe I'm missing something?

    What environment did it happen in?

    Crossplane version: 1.8.1. Kafka provider: 0.1.0 with TLS/SCRAM support. Kubernetes: 1.22.8 (OpenShift on AWS).

  • Kafka connect and connectors

    Kafka connect and connectors

    The https://strimzi.io/ operators allow the management of Kafka Connect and connectors. That works well if we're managing everything with Strimzi, but not so much if we're using AWS MSK, for example. It would be nice if this provider had the same capabilities as Strimzi.

    These are the custom resources they provide: https://strimzi.io/docs/operators/in-development/overview.html#configuration-points-resources_str

  • Support ACL configuration via Zookeeper

    Support ACL configuration via Zookeeper

    What problem are you facing?

    A fully feature complete provider-kafka needs to be able to use zookeeper to store and configure ACLs on the server per Kafka’s ACL structure specification and resource patterns.

    How could Crossplane help solve your problem?

    Users of the provider would be able to have the control plane store ACL configurations on the server in accordance with Kafka's ACL structure specification.
