Kafka - CLI and GUI tools

Connecting to Kafka for administration

kafka shell

(GUI access tools are an alternative to using the shell)

Once Zookeeper and Kafka containers are running, you can execute the following Terminal command to start a Kafka shell:

Just replace kafka with the value of container_name or container_id, if you’ve decided to name it differently in the docker-compose.yml file.

kafka-topics.sh

$ docker ls
$ docker exec -it <container-name or container-id> bash
root@ba7f577efce3:/learning# kafka-topics.sh
root@ba7f577efce3:/learning# kafka-topics.sh --bootstrap-server localhost:9092 --topic hello-world --create

All Kafka shell scripts are located in `/opt/kafka_<version>/bin` If you are not running kafka in docker, but have it downloaded to your local,

cd /opt/kafka_2.13-2.8.1/bin
  • Creating topics

    Here’s the command to create a Kafka topic:

    (If you are using docker without zookeeper server setup)
    kafka-topics.sh --bootstrap-server localhost:9092 --topic hello-world --create
    
    (If you are using zookeeper)
    kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic first_kafka_topic
    
    # create topic with default partitons
    kafka-topics.sh --bootstrap-server localhost:9092 --topic order-events
    
    # create topic with partitons
    kafka-topics.sh --bootstrap-server localhost:9092 --topic order-events --create --partitions 2
    
    # create topic with replicaiton factor
    kafka-topics.sh --bootstrap-server localhost:9092 --topic order-events --create --replication-factor 3
    
    # alter the number of partitions of an existing topic
    kafka-topics.sh --bootstrap-server xx.xx.xx.xx:9092 --alter --topic order-events --partitions 3
    kafka-topics.sh --bootstrap-server xx.xx.xx.xx:9092 --describe --topic order-events
    
    # check the number of message in a topic
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 20.150.137.17:39092 --topic order-events
    
  • List topics

    kafka-topics.sh --bootstrap-server localhost:9092 --list
    
    kafka-topics.sh --list --zookeeper zookeeper:2181
    
    # create a kafka topic called hello-world
    # we assume that directory which contains 'kafka-topics.sh' is included in the PATH
    kafka-topics.sh --bootstrap-server localhost:9092 --topic hello-world --create
    
    # list all topics
    kafka-topics.sh --bootstrap-server localhost:9092 --list
    
  • Describe an existing topic

    kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello-world
    Topic: hello-world	TopicId: 41eEX-5tSjWyBVjKGn7_6A	PartitionCount: 1	ReplicationFactor: 1	Configs:
            Topic: hello-world	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
    
    ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic usage-detail
    
    kafka-topics.sh --bootstrap-server localhost:9092 --topic hello-world --describe
    
  • Delete an existing topic

    kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic hello-world
    
    kafka-topic.sh --bootstrap-server localhost:9092 --topic hello-world --delete
    

kafka-console-producer.sh

To lauch the producer window:

# to produce messages
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello-world

root@ba7f577efce3:/kafka/bin# ./kafka-console-producer.sh
--batch-size <Integer: size>             Number of messages to send in a single
                                           batch if they are not being sent
                                           synchronously. please note that this
                                           option will be replaced if max-
                                           partition-memory-bytes is also set
                                           (default: 16384)
--timeout <Long: timeout_ms>             If set and the producer is running in
                                           asynchronous mode, this gives the
                                           maximum amount of time a message
                                           will queue awaiting sufficient batch
                                           size. The value is given in ms. This
                                           is the option to control `linger.ms`
                                           in producer configs. (default: 1000)

# For immediate delivery, set the values for these two parameters to minimal values.

# linger.ms
# default value for timeout is 1 second - meaning, all the messages that are meant to be written to the topic in that one second are batched together by the producer.
# change that to 100 milli second - the messages will be written to the topics quicker than they used to before
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello-world --timeout 100
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello-world --batch-size 10000



./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic usage-detail

kafka-console-consumer.sh

To lauch the consumer window and monitor it (launch this in a sepate terminal from the producer. We should see the messeges that are being produced and consumed in real-time)

# to consume messages
# default behavior is, it will only consume new messages
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello-world

# to consume from beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello-world --from-beginning
# Remember, no other consumer in the consumer-group should've consumed the messages already. If some other consumer already consumed the messages, this consumer will not read messages even though we use the --from-beginning option.

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic usage-detail --from-beginning
# to print offset, time etc
kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic hello-world \
    --property print.offset=true \
    --property print.timestamp=true

consumer groups

# create console producer
kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic hello-world \
    --property key.separator=: \
    --property parse.key=true

# create console consumer with a group
kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic hello-world \
    --property print.offset=true \
    --property print.key=true \
    --group name

# list all the consumer groups
 kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

 # describe a consumer group
 kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-cg1 \
    --describe

 # describe consumer groups for a given topic
kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --all-groups \
    --bootstrap-server 20.150.137.17:39092 \
    --describe | grep mytopic1

reset offset

# stop the consumers before you enter this command

# dry-run
 kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group cg \
    --topic hello-world \
    --reset-offsets \
    --shift-by -3 \
    --dry-run

# reset offset by shifting the offset
 kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group cg \
    --topic hello-world \
    --reset-offsets \
    --shift-by -3 \
    --execute

# reset by duration
kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --topic hello-world \
    --group cg \
    --reset-offsets \
    --by-duration PT5M \
    --execute

# -- to the beginning
kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --topic hello-world \
    --group cg \
    --reset-offsets \
    --to-earliest \
    --execute

# -- to the end
kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --topic hello-world \
    --group cg \
    --reset-offsets \
    --to-latest \
    --execute

# -- to date-time
kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --topic hello-world \
    --group cg \
    --reset-offsets \
    --to-datetime 2023-01-01T01:00:00.000 \
    --execute

To test transactions

kafka-topics.sh --bootstrap-server localhost:9092 --topic transfer-requests --create

kafka-topics.sh --bootstrap-server localhost:9092 --topic transaction-events --create

kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic transfer-requests \
    --property key.separator=: \
    --property parse.key=true

// Read the messages that are committed after successful acknowledging of the message
kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic transaction-events \
    --property print.key=true \
    --isolation-level=read_committed \
    --from-beginning

// Read all messages regardless of whether they are are committed after successful acknowledging of the message or not
kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic transaction-events \
    --property print.key=true \
    --from-beginning

GUI Tools

  1. Use this tool for GUI views of local kafka cluster: https://github.com/redpanda-data/console

    Command to run it:

    docker run --network=host -p 8080:8080 -e KAFKA_BROKERS=localhost:9092 docker.redpanda.com/vectorized/console:latest
    

    It runs at port 8080

    And use the browser: http://localhost:8080/topics

  2. GUI tool to explore kafka setup: https://www.kafkamagic.com/ (never used this myself - don’t know how this works)

  3. There are more tools listed here: https://dev.to/dariusx/recommend-a-simple-kafka-ui-tool-5gob


Links to this note