Command Line Tool for managing Apache Kafka

kafkactl

A command-line interface for interacting with Apache Kafka

Build Status | command docs

Features

  • command auto-completion for bash, zsh and fish, including dynamic completion of e.g. topics or consumer groups
  • support for Avro schemas
  • configuration of different contexts
  • direct access to Kafka clusters running inside a Kubernetes cluster
  • support for consuming and producing protobuf-encoded messages

(asciicast demo)

Installation

You can install the pre-compiled binary or compile from source.

Install the pre-compiled binary

snap:

snap install kafkactl

homebrew:

# install tap repository once
brew tap deviceinsight/packages
# install kafkactl
brew install deviceinsight/packages/kafkactl
# upgrade kafkactl
brew upgrade deviceinsight/packages/kafkactl

deb/rpm:

Download the .deb or .rpm from the releases page and install with dpkg -i and rpm -i respectively.

yay (AUR)

There's a kafkactl AUR package available for Arch. Install it with your AUR helper of choice (e.g. yay):

yay -S kafkactl

manually:

Download the pre-compiled binaries from the releases page and copy them to the desired location.

Compiling from source

go get -u github.com/deviceinsight/kafkactl

NOTE: make sure that kafkactl is on your PATH, otherwise auto-completion won't work.
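
A common way to ensure this, assuming a default Go setup where binaries are installed to $(go env GOPATH)/bin:

# add the Go binary directory to your PATH (e.g. in ~/.bashrc or ~/.zshrc)
export PATH="$PATH:$(go env GOPATH)/bin"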

Configuration

If no config file is found, a default config is generated in $HOME/.config/kafkactl/config.yml. This configuration is suitable to get started with a single node cluster on a local machine.

Create a config file

Create $HOME/.config/kafkactl/config.yml with a definition of the contexts that should be available:

contexts:
  default:
    brokers:
    - localhost:9092
  remote-cluster:
    brokers:
    - remote-cluster001:9092
    - remote-cluster002:9092
    - remote-cluster003:9092

    # optional: tls config
    tls:
      enabled: true
      ca: my-ca
      cert: my-cert
      certKey: my-key
      # set insecure to true to ignore all tls verification (defaults to false)
      insecure: false

    # optional: sasl support
    sasl:
      enabled: true
      username: admin
      password: admin
      # optional: sasl mechanism; one of plaintext, scram-sha256, scram-sha512 (defaults to plaintext)
      mechanism: scram-sha512
  
    # optional: access clusters running kubernetes
    kubernetes:
      enabled: false
      binary: kubectl #optional
      kubeConfig: ~/.kube/config #optional
      kubeContext: my-cluster
      namespace: my-namespace

    # optional: clientID config (defaults to kafkactl-{username})
    clientID: my-client-id
    
    # optional: kafkaVersion (defaults to 2.5.0)
    kafkaVersion: 1.1.1

    # optional: timeout for admin requests (defaults to 3s)
    requestTimeout: 10s

    # optional: avro schema registry
    avro:
      schemaRegistry: localhost:8081
    
    # optional: default protobuf messages search paths
    protobuf:
       importPaths:
          - "/usr/include/protobuf"
       protoFiles:
          - "someMessage.proto"
          - "otherMessage.proto"
       protosetFiles:
          - "/usr/include/protoset/other.protoset"
    
    # optional: changes the default partitioner
    defaultPartitioner: "hash"

    # optional: changes default required acks in produce request
    # see: https://pkg.go.dev/github.com/Shopify/sarama?utm_source=godoc#RequiredAcks
    requiredAcks: "WaitForAll"


current-context: default
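
The active context can also be switched from the command line; a quick sketch, assuming the config subcommands get-contexts, use-context and current-context are available (see the command docs):

# list all configured contexts
kafkactl config get-contexts
# switch to another context
kafkactl config use-context remote-cluster
# show the currently active context
kafkactl config current-context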

The config file location is resolved by

  • checking for a provided commandline argument: --config-file=$PATH_TO_CONFIG
  • or by evaluating the environment variable: export KAFKA_CTL_CONFIG=$PATH_TO_CONFIG
  • or as default the config file is looked up from one of the following locations:
    • $HOME/.config/kafkactl/config.yml
    • $HOME/.kafkactl/config.yml
    • $SNAP_REAL_HOME/.kafkactl/config.yml
    • $SNAP_DATA/kafkactl/config.yml
    • /etc/kafkactl/config.yml
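
For example, to point kafkactl at a non-default config file:

# use a specific config file for a single command
kafkactl get topics --config-file=/path/to/config.yml
# or export it for the whole shell session
export KAFKA_CTL_CONFIG=/path/to/config.yml
kafkactl get topics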

Auto completion

bash

NOTE: if you installed via snap, bash completion should work automatically.

source <(kafkactl completion bash)

To load completions for each session, execute once:

Linux:

kafkactl completion bash > /etc/bash_completion.d/kafkactl

MacOS:

kafkactl completion bash > /usr/local/etc/bash_completion.d/kafkactl

zsh

source <(kafkactl completion zsh)

To load completions for each session, execute once:

kafkactl completion zsh > "${fpath[1]}/_kafkactl"

Fish

kafkactl completion fish | source

To load completions for each session, execute once:

kafkactl completion fish > ~/.config/fish/completions/kafkactl.fish

Running in docker

Assuming your Kafka broker is accessible as kafka:9092, you can list topics by running:

docker run --env BROKERS=kafka:9092 deviceinsight/kafkactl:latest get topics

If a more elaborate config is needed, you can mount it as a volume:

docker run -v /absolute/path/to/config.yml:/etc/kafkactl/config.yml deviceinsight/kafkactl get topics

Configuration via environment variables

Every key in the config.yml can be overwritten via environment variables. The corresponding environment variable for a key can be found by applying the following rules:

  1. replace . by _
  2. replace - by _
  3. write the key name in ALL CAPS

e.g. the key contexts.default.tls.certKey has the corresponding environment variable CONTEXTS_DEFAULT_TLS_CERTKEY.

If environment variables for the default context should be set, the prefix CONTEXTS_DEFAULT_ can be omitted. So, instead of CONTEXTS_DEFAULT_TLS_CERTKEY one can also set TLS_CERTKEY. See root_test.go for more examples.
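
For example, the following sketch configures the default context entirely via environment variables, following the rules above (broker address and credentials are placeholders):

export BROKERS=broker1:9092
export SASL_ENABLED=true
export SASL_USERNAME=admin
export SASL_PASSWORD=secret
kafkactl get topics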

Running in Kubernetes

🚧 This feature is still experimental.

If your kafka cluster is not directly accessible from your machine, but it is accessible from a kubernetes cluster which in turn is reachable via kubectl from your machine, you can configure kubernetes support:

contexts:
  kafka-cluster:
    brokers:
      - broker1:9092
      - broker2:9092
    kubernetes:
      enabled: true
      binary: kubectl #optional
      kubeContext: k8s-cluster
      namespace: k8s-namespace

Instead of directly talking to kafka brokers, a kafkactl docker image is deployed as a pod into the kubernetes cluster, in the defined namespace. Standard input and standard output are then wired between the pod and your shell running kafkactl.

There are two options:

  1. You can run kafkactl attach with your kubernetes cluster configured. This will use kubectl run to create a pod in the configured kubeContext/namespace which runs a kafkactl image and gives you a bash shell inside the container. Standard input is piped to the pod, and standard output and standard error go directly to your shell. You even get auto-completion.

  2. You can run any other kafkactl command with your kubernetes cluster configured. Instead of directly querying the cluster, a pod is deployed, and input/output are wired between the pod and your shell (see the example below).
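
With such a context active, the two options look like this on the command line:

# option 1: interactive shell with kafkactl running inside the cluster
kafkactl attach
# option 2: run a single command through a pod in the cluster
kafkactl get topics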

The names of the brokers have to match the service names used to access kafka in your cluster. A command like the following should give you this information:

kubectl get svc | grep kafka

💡 The first option takes a bit longer to start up since an Ubuntu-based docker image is used in order to have a bash available. The second option uses a docker image built from scratch and should therefore be quicker. Which option is more suitable will depend on your use-case.

⚠️ Currently kafkactl must NOT be installed via snap for the kubernetes feature to work. The snap runs in a sandbox and is therefore unable to access the kubectl binary.

Command documentation

The documentation for all available commands can be found here:

command docs

Examples

Consuming messages

Consuming messages from a topic can be done with:

kafkactl consume my-topic

In order to consume starting from the oldest offset use:

kafkactl consume my-topic --from-beginning

The following example prints message key and timestamp as well as partition and offset in yaml format:

kafkactl consume my-topic --print-keys --print-timestamps -o yaml

Headers of kafka messages can be printed with the parameter --print-headers e.g.:

kafkactl consume my-topic --print-headers -o yaml

If one is only interested in the last n messages, this can be achieved with --tail, e.g.:

kafkactl consume my-topic --tail=5

The consumer can be stopped when the latest offset is reached using the --exit parameter, e.g.:

kafkactl consume my-topic --from-beginning --exit

The following example prints keys in hex and values in base64:

kafkactl consume my-topic --print-keys --key-encoding=hex --value-encoding=base64

The consumer can convert protobuf messages to JSON in keys (optional) and values:

kafkactl consume my-topic --value-proto-type MyTopicValue --key-proto-type MyTopicKey --proto-file kafkamsg.proto

Producing messages

Producing messages can be done in multiple ways. If we want to produce a message with key='my-key' and value='my-value' to the topic my-topic, this can be achieved with one of the following commands:

echo "my-key#my-value" | kafkactl produce my-topic --separator=#
echo "my-value" | kafkactl produce my-topic --key=my-key
kafkactl produce my-topic --key=my-key --value=my-value

If we have a file containing messages where each line contains key and value separated by #, the file can be used as input to produce messages to topic my-topic:

cat myfile | kafkactl produce my-topic --separator=#

The same can be accomplished without piping the file to stdin with the --file parameter:

kafkactl produce my-topic --separator=# --file=myfile

If the messages in the input file need to be split by a different delimiter than \n, a custom line separator can be provided:

kafkactl produce my-topic --separator=# --lineSeparator=|| --file=myfile

NOTE: if the file was generated with kafkactl consume --print-keys --print-timestamps my-topic the produce command is able to detect the message timestamp in the input and will ignore it.
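
A sketch of this round trip, assuming the consumed output uses # as the key/value separator:

# dump a topic to a file including keys and timestamps
kafkactl consume my-topic --from-beginning --exit --print-keys --print-timestamps > myfile
# replay the messages into another topic; the timestamp column is detected and ignored
kafkactl produce my-other-topic --separator=# --file=myfile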

The number of messages produced per second can be controlled with the --rate parameter:

cat myfile | kafkactl produce my-topic --separator=# --rate=200

It is also possible to specify the partition to insert the message into:

kafkactl produce my-topic --key=my-key --value=my-value --partition=2

Additionally, a different partitioning scheme can be used. When a key is provided, the default partitioner uses the hash of the key to assign a partition, so the same key will end up in the same partition:

# the following 3 messages will all be inserted to the same partition
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value

# the following 3 messages will probably be inserted to different partitions
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random

Message headers can also be written:

kafkactl produce my-topic --key=my-key --value=my-value --header key1:value1 --header key2:value\:2

The following example reads the key as base64 and the value as hex:

kafkactl produce my-topic --key=dGVzdC1rZXk= --key-encoding=base64 --value=0000000000000000 --value-encoding=hex

You can control how many replica acknowledgements are needed for a response:

kafkactl produce my-topic --key=my-key --value=my-value --required-acks=WaitForAll

Producing null values (tombstone record) is also possible:

 kafkactl produce my-topic --null-value

Producing protobuf message converted from JSON:

kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto

Avro support

In order to enable avro support you just have to add the schema registry to your configuration:

contexts:
  localhost:
    avro:
      schemaRegistry: localhost:8081

Producing to an avro topic

kafkactl will look up the topic in the schema registry to determine whether the key or the value needs to be avro encoded. If producing with the latest schemaVersion is sufficient, no additional configuration is needed and kafkactl handles this automatically.

If, however, an older schemaVersion needs to be produced, this can be achieved by providing the parameters keySchemaVersion and valueSchemaVersion.
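
A sketch, assuming these parameters are exposed as the flags --key-schema-version and --value-schema-version:

# produce using version 1 of the value schema instead of the latest one
kafkactl produce avro_topic --value '{"next":null}' --value-schema-version 1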

Example
# create a topic
kafkactl create topic avro_topic
# add a schema for the topic value
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"LongList\", \"fields\" : [{\"name\": \"next\", \"type\": [\"null\", \"LongList\"], \"default\": null}]}"}' \
http://localhost:8081/subjects/avro_topic-value/versions
# produce a message
kafkactl produce avro_topic --value {\"next\":{\"LongList\":{}}}
# consume the message
kafkactl consume avro_topic --from-beginning --print-schema -o yaml

Consuming from an avro topic

As with producing, kafkactl will also look up the topic in the schema registry to determine whether the key or value needs to be decoded with an avro schema.

The consume command handles this automatically and no configuration is needed.

The additional parameter --print-schema can be provided to display the schema used for decoding.

Protobuf support

kafkactl can consume and produce protobuf-encoded messages. In order to enable protobuf serialization/deserialization, add the flag --value-proto-type and optionally --key-proto-type (if keys are encoded in protobuf format) with the message type name. Protobuf-encoded messages are mapped with pbjson.

kafkactl will search for the message types in the following order:

  1. Protoset files specified in --protoset-file flag
  2. Protoset files specified in context.protobuf.protosetFiles config value
  3. Proto files specified in --proto-file flag
  4. Proto files specified in context.protobuf.protoFiles config value

Proto files may require some dependencies in import sections. To specify additional lookup paths use --proto-import-path flag or context.protobuf.importPaths config value.
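
For example, to consume with a proto file whose imports live in a separate directory (paths and type name are placeholders):

kafkactl consume my-topic --value-proto-type MyMessage --proto-file someMessage.proto --proto-import-path /usr/include/protobuf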

If the provided message types are not found, kafkactl returns an error.

Note that if you only use raw proto files, protoc does not need to be installed.

Also note that protoset files must be compiled with included imports:

protoc -o kafkamsg.protoset --include_imports kafkamsg.proto

Example

Assume you have the following proto schema in kafkamsg.proto:

syntax = "proto3";

import "google/protobuf/timestamp.proto";

message TopicMessage {
  google.protobuf.Timestamp produced_at = 1;
  int64 num = 2;
}

message TopicKey {
  float fvalue = 1;
}

"well-known" google/protobuf types are included so no additional proto files needed.

To produce a message run

kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicMessage --proto-file kafkamsg.proto

or with protoset

kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicMessage --protoset-file kafkamsg.protoset

To consume messages run

kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicMessage --proto-file kafkamsg.proto

or with protoset

kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicMessage --protoset-file kafkamsg.protoset

Altering topics

Using the alter topic command allows you to change the partition count, replication factor and topic-level configurations of an existing topic.

The partition count can be increased with:

kafkactl alter topic my-topic --partitions 32

The replication factor can be altered with:

kafkactl alter topic my-topic --replication-factor 2

ℹ️ When altering the replication factor, kafkactl tries to keep the number of replicas assigned to each broker balanced. If you need more control over the assigned replicas, use alter partition directly.

The topic configs can be edited by supplying key value pairs as follows:

kafkactl alter topic my-topic --config retention.ms=3600000 --config cleanup.policy=compact

💡 use the flag --validate-only to perform a dry-run without actually modifying the topic
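
For example, to preview a config change without applying it:

kafkactl alter topic my-topic --config retention.ms=3600000 --validate-only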

Altering partitions

The assigned replicas of a partition can directly be altered with:

# set brokers 102,103 as replicas for partition 3 of topic my-topic
kafkactl alter partition my-topic 3 -r 102,103

Consumer groups

In order to get a list of consumer groups the get consumer-groups command can be used:

# all available consumer groups
kafkactl get consumer-groups 
# only consumer groups for a single topic
kafkactl get consumer-groups --topic my-topic
# using command alias
kafkactl get cg

To get detailed information about a consumer group, use describe consumer-group. If the parameter --partitions is provided, details will be printed for each partition; otherwise the partitions are aggregated per client.

# describe a consumer group
kafkactl describe consumer-group my-group 
# show partition details only for partitions with lag
kafkactl describe consumer-group my-group --only-with-lag
# show details only for a single topic
kafkactl describe consumer-group my-group --topic my-topic
# using command alias
kafkactl describe cg my-group
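
To see the details broken down per partition instead of aggregated per client, add the --partitions flag:

kafkactl describe consumer-group my-group --partitions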

Reset consumer group offsets

In order to ensure the reset does what is expected, by default only the results are printed without actually executing the reset. Use the additional parameter --execute to perform it.

# reset offsets for all partitions to the oldest offset
kafkactl reset offset my-group --topic my-topic --oldest
# reset offsets for all partitions to the newest offset
kafkactl reset offset my-group --topic my-topic --newest
# reset offset for a single partition to specific offset
kafkactl reset offset my-group --topic my-topic --partition 5 --offset 100
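
Once the printed preview looks right, the same command with --execute applies the reset:

kafkactl reset offset my-group --topic my-topic --oldest --execute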

Delete consumer group offsets

In order to delete a consumer group offset use delete offset

# delete offset for all partitions of topic my-topic
kafkactl delete offset my-group --topic my-topic
# delete offset for partition 1 of topic my-topic
kafkactl delete offset my-group --topic my-topic --partition 1

Delete consumer groups

In order to delete a consumer group or a list of consumer groups use delete consumer-group

# delete consumer group my-group
kafkactl delete consumer-group my-group

ACL Management

Available ACL operations are documented here.

Create a new ACL

# create an acl that allows topic read for a user 'consumer'
kafkactl create acl --topic my-topic --operation read --principal User:consumer --allow
# create an acl that denies topic write for a user 'consumer' coming from a specific host
kafkactl create acl --topic my-topic --operation write --host 1.2.3.4 --principal User:consumer --deny
# allow multiple operations
kafkactl create acl --topic my-topic --operation read --operation describe --principal User:consumer --allow
# allow read on all topics with a common prefix
kafkactl create acl --topic my-prefix --pattern prefixed --operation read --principal User:consumer --allow

List ACLs

# list all acl
kafkactl get acl
# list all acl (alias command)
kafkactl get access-control-list
# filter only topic resources
kafkactl get acl --topics
# filter only consumer group resources with operation read
kafkactl get acl --groups --operation read

Delete ACLs

# delete all topic read acls
kafkactl delete acl --topics --operation read --pattern any
# delete all topic acls for any operation
kafkactl delete acl --topics --operation any --pattern any
# delete all cluster acls for any operation
kafkactl delete acl --cluster --operation any --pattern any
# delete all consumer-group acls with operation describe, patternType prefixed and permissionType allow
kafkactl delete acl --groups --operation describe --pattern prefixed --allow

Getting Brokers

To get the list of brokers of a kafka cluster use get brokers

# get the list of brokers
kafkactl get brokers

Describe Broker

To view configs for a single broker use describe broker

# describe broker
kafkactl describe broker 1

Development

In order to see linter errors before commit, add the following pre-commit hook:

pip install --user pre-commit
pre-commit install

Comments
  • kafkactl get consumer-groups fails

    Error message: failed to get group member assignment: kafka: insufficient data to decode packet, more bytes expected

    All other commands work just fine.

    kafkactl version: cmd.info{version:"v1.23.1", buildTime:"2021-11-23T13:08:18Z", gitCommit:"9cb9f72", goVersion:"go1.16.10", compiler:"gc", platform:"darwin/amd64"}

  • Possibility to produce avro encoded message to schema registry powered topic

    Hello

    Some tools allow producing Avro-encoded messages to a topic, using the schema from the schema registry, with a pure JSON file as the input message.

    For example: https://github.com/tchiotludo/akhq

  • Remove topic from consumer group

    When an application is refactored to replace one input topic with another, the old topic must be removed from the consumer group (we track consumer group lag using something similar to https://github.com/lightbend/kafka-lag-exporter).

    kafka-consumer-groups allows such an operation with a combination of --delete-offsets and --topic.

    ./kafka-consumer-groups.sh  --bootstrap-server 10.10.10.10:9092 --delete-offsets --group my_group --topic old_topic
    

    kafkactl doesn't seem to support this feature. Is this a deliberate choice or a feature gap?

  • Support starting off with the entire system root CA pool instead of only the passed in CA

    Description

    This change allows starting with the system CA pool and adding in the additional CA. AWS MSK with client auth using a private CA in ACM seems to require this to operate correctly (both the AWS trusted CAs and the private CA need to be in the pool).

    I'll update this PR with docs, provided it otherwise looks good.

    Type of change

    Please delete options that are not relevant.

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [X] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)

    Documentation

    • [x] the change is mentioned in the ## [Unreleased] section of CHANGELOG.md
    • [ ] the configuration yaml was changed and the example config in README.md was updated
    • [ ] a usage example was added to README.md
  • Add default config generation if no config found

    For issue #22 I am not quite sure about the implementation. I've tried to find a way to ask viper whether there is a config file, but found none. So it's a glob check here for a file. Maybe some constants like 'yml' and 'config' should be moved to variables.

  • extend the documentation of the command completion of zsh

    Description

    There is a small mistake in the documentation about the zsh completion. The zsh documentation recommends installing all completions into the $ZSH/cache/completions directory and reloading the completion plugin. The file .zprofile is executed in every new terminal session, so the completion is activated every time. It's more explicit.

    Type of change

    • [X] Bug fix (non-breaking change which fixes an issue)

    Documentation

    • [x] fix the zsh completion in README.md
  • It is not working with latest kubectl

    $ kafkactl get topics
    Error: unknown flag: --generator
    See 'kubectl run --help' for usage.
    command "/usr/local/bin/kubectl" exited with non-zero status:
    
    PATH:
      /usr/local/bin/kubectl
    
    $ kubectl version
    Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.0", GitCommit:"cb303e613a121a29364f75cc67d3d580833a7479", GitTreeState:"clean", BuildDate:"2021-04-08T21:16:14Z", GoVersion:"go1.16.3", Compiler:"gc", Platform:"darwin/amd64"}
    

    Am I doing something wrong? It used to work and it stopped recently.

  • Docker image does not include CA certificates

    First off: great work on allowing us to configure kafkactl using environment variables! It makes the tool very usable with Amazon MSK.

    The issue: the provided docker image does not seem to include any CA certificates. In order to connect to a TLS host (in this case an Amazon MSK cluster using certificates provided by Amazon) one needs to either set TLS_INSECURE=true or TLS_CA=/path/to/provided/ca.

    Unfortunately, in my use-case, adding the CA certificate manually does not work very well, as the container is running on a remote host (and is launched ad-hoc).

    Using TLS_INSECURE=true is a workaround, but it is not optimal, especially not for production.

    Command example:

    user@machine dsp % kubectl run -i --tty --rm kafkactl --image=deviceinsight/kafkactl:latest --restart=Never --env="TLS_ENABLED=true" --env="BROKERS=$(aws --profile saml kafka get-bootstrap-brokers --cluster-arn $(aws --profile saml kafka list-clusters --cluster-name-filter 'msk' --query 'ClusterInfoList[0].{arn:ClusterArn}' | jq -r .arn) | jq -r .BootstrapBrokerStringTls | sed 's/,/ /g')" -- kafkactl -V get topics
    If you don't see a command prompt, try pressing enter.
    [sarama  ] 2020/09/19 20:49:42 client/metadata fetching metadata for all topics from broker b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:49:42 Failed to connect to broker b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094: x509: certificate signed by unknown authority
    [sarama  ] 2020/09/19 20:49:42 client/metadata got error from broker -1 while fetching metadata: x509: certificate signed by unknown authority
    [sarama  ] 2020/09/19 20:49:42 client/metadata fetching metadata for all topics from broker b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:49:42 Failed to connect to broker b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094: x509: certificate signed by unknown authority
    [sarama  ] 2020/09/19 20:49:42 client/metadata got error from broker -1 while fetching metadata: x509: certificate signed by unknown authority
    [sarama  ] 2020/09/19 20:49:42 client/metadata fetching metadata for all topics from broker b-1.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:49:42 Failed to connect to broker b-1.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094: x509: certificate signed by unknown authority
    [sarama  ] 2020/09/19 20:49:42 client/metadata got error from broker -1 while fetching metadata: x509: certificate signed by unknown authority
    [sarama  ] 2020/09/19 20:49:42 client/metadata no available broker to send metadata request to
    [sarama  ] 2020/09/19 20:49:42 client/brokers resurrecting 3 dead seed brokers
    [sarama  ] 2020/09/19 20:49:42 Closing Client
    failed to create cluster admin: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    pod "kafkactl" deleted
    pod default/kafkactl terminated (Error)
    user@machine dsp % kubectl run -i --tty --rm kafkactl --image=deviceinsight/kafkactl:latest --restart=Never --env="TLS_INSECURE=true" --env="TLS_ENABLED=true" --env="BROKERS=$(aws --profile saml kafka get-bootstrap-brokers --cluster-arn $(aws --profile saml kafka list-clusters --cluster-name-filter 'msk' --query 'ClusterInfoList[0].{arn:ClusterArn}' | jq -r .arn) | jq -r .BootstrapBrokerStringTls | sed 's/,/ /g')" -- kafkactl -V get topics
    [kafkactl] 2020/09/19 20:50:11 generated default config at /home/kafkactl/.config/kafkactl/config.yml
    [kafkactl] 2020/09/19 20:50:11 Using config file: /home/kafkactl/.config/kafkactl/config.yml
    [kafkactl] 2020/09/19 20:50:11 Assuming kafkaVersion: 2.0.0
    [kafkactl] 2020/09/19 20:50:11 Assuming kafkaVersion: 2.0.0
    [kafkactl] 2020/09/19 20:50:11 TLS is enabled.
    [sarama  ] 2020/09/19 20:50:11 Initializing new client
    [sarama  ] 2020/09/19 20:50:11 client/metadata fetching metadata for all topics from broker b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:11 Connected to broker at b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
    [sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #5 at b-5.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #4 at b-4.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #1 at b-1.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #6 at b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #2 at b-2.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:11 client/brokers registered new broker #3 at b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:11 Successfully initialized new client
    [kafkactl] 2020/09/19 20:50:11 TLS is enabled.
    [sarama  ] 2020/09/19 20:50:11 Initializing new client
    [sarama  ] 2020/09/19 20:50:11 client/metadata fetching metadata for all topics from broker b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:12 Connected to broker at b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094 (registered as #3)
    [sarama  ] 2020/09/19 20:50:12 Connected to broker at b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094 (unregistered)
    [sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #5 at b-5.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #4 at b-4.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #1 at b-1.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #6 at b-6.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #2 at b-2.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:12 client/brokers registered new broker #3 at b-3.msk.jp2jsm.c4.kafka.eu-west-1.amazonaws.com:9094
    [sarama  ] 2020/09/19 20:50:12 Successfully initialized new client
    TOPIC                  PARTITIONS
    __consumer_offsets     50
    pod "kafkactl" deleted
    

    Please consider installing CA certificates in the docker ubuntu image. It should only use around 5801 kB more space.

  • Add parameter --rate to produce command

    Add a parameter --rate which controls the rate at which messages are produced. This is useful to limit the number of produced messages when inserting messages into a production environment. --rate should be an integer representing the number of messages that should be produced per second.

  • Add protobuf deserialization to consumer and serialization to producer

    Description

    Added protobuf serialization in the producer and deserialization in the consumer. To use this, the user must supply raw .proto schemas via the --proto-file flag, with the --proto-import-path flag to resolve dependencies mentioned in import sections. The user can also supply compiled .protoset file(s) using the --protoset-file flag. The values for the flags above can also be set in the config file.

    To enable protobuf serialization or deserialization the user must specify the --value-proto-message flag, which contains the message type name.

    The key may optionally be serialized/deserialized by using the flag --key-proto-message.

    Fixes #110

    Type of change

    • [x] New feature (non-breaking change which adds functionality)

    Documentation

    • [ ] the change is mentioned in the ## [Unreleased] section of CHANGELOG.md
    • [x] the configuration yaml was changed and the example config in README.md was updated
    • [x] a usage example was added to README.md
  • "topic % does not exist" error is given even when topic auto creation is enabled

    Trying to produce a message to a non-existent topic gives a "topic % does not exist" error in any case (it seems that the logic responsible for this is here), while it should actually be possible if auto.create.topics.enable is true in the kafka config.

    P.S. Thanks for the great CLI tool, btw. It's quite handy to use for pet projects as well as for the "real" work :)

  • Consume from a Topic and produce to another

    Hello, I find your tool very useful and complete for my purposes and I want to thank you for that. I have one question I could not find an answer to on my own. I would like to retrieve events from a DLQ topic and, after a manual check, submit them again to the original topic, including headers. I tried to consume messages with headers in json format but found no way to resubmit them.

  • Unable to provide multiple values for broker via env variable

    Using kafkactl via a docker container, when passing BROKERS="broker1:9094,broker2:9094" I get an error that boils down to: too many colons in address. I think it is not parsing this as a list, but instead treating it as one host/port combination.

    Is there a proper way to pass a list of hosts via a variable?

  • Add --print-partitions to consume

    Description

    Add --print-partitions to consume command.

    Sometimes you know the key of a message but not the partition. --print-partitions can be useful to discover the partition and run a subsequent query on the relevant partition. It can also be useful when a consumer partitioner is used.

    Type of change

    • [x] New feature (non-breaking change which adds functionality)

    Documentation

    • [ ] the change is mentioned in the ## [Unreleased] section of CHANGELOG.md
    • [ ] the configuration yaml was changed and the example config in README.md was updated
    • [ ] a usage example was added to README.md
    • [x] tests for the changes have been implemented (see: Testing your changes)
  • When increasing replication-factor on a topic, does it maintain rack awareness?

    We have rack-awareness on for our cluster. If I use kafkactl to increase the replication-factor on a topic, does the new replication-factor and assignment maintain rack awareness?

    We used kafkactl last week, and it now appears that one of the topics is no longer rack-safe. I'm checking to see if the topic that is no longer rack-safe is the topic that we used kafkactl on.

    I browsed through the code for kafkactl, and couldn't tell if you were taking into account broker racks, when increasing replication factor. https://github.com/deviceinsight/kafkactl/blob/105b48154bc0644d5aa33f194f0d870694dd4908/internal/topic/topic-operation.go#L501-L545

    Thanks!

  • Deleting multiple topics at once by regexp

    I have a group of topics with some prefix and would like to delete them all with one command. Something similar to this:

    ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic 'service1-.*'
    
  • Integrate with credential-storages

    Currently kafkactl stores passwords (for sasl, or maybe tls certs in the future) inside the config file. This is not secure because the passwords are available to any application running as the current user. I want to discuss integration with a dedicated credential storage. For example, Docker uses https://github.com/docker/docker-credential-helpers to interact with keychain, wincred, etc. I propose to use the https://github.com/99designs/keyring library because it has integrations with many system storages and a truly cross-platform encrypted-file based storage. But kafkactl is built with cgo disabled, and the keychain backend requires it. Users could specify global storage backend settings in the config. The flow may look like this: when something requires a password, kafkactl attempts to find it by "context key" (context name + auth method type or certificate fingerprint). If the credentials are not found, the user is prompted for them. Credentials may be reset with a command-line flag. Any thoughts?
