Skip to content
Maintained by AxonOps — production-grade documentation from engineers who operate distributed databases at scale

Kafka CLI Tools

Command-line tools for Apache Kafka administration and operations.


Tool Overview

Tool Purpose
kafka-topics.sh Topic management
kafka-consumer-groups.sh Consumer group management
kafka-configs.sh Configuration management
kafka-acls.sh ACL management
kafka-reassign-partitions.sh Partition reassignment
kafka-leader-election.sh Leader election
kafka-metadata-shell.sh KRaft metadata inspection
kafka-dump-log.sh Log segment inspection

kafka-topics.sh

List Topics

# List all topics
kafka-topics.sh --bootstrap-server kafka:9092 --list

# List with details
kafka-topics.sh --bootstrap-server kafka:9092 --describe

Create Topic

kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic my-topic \
  --partitions 12 \
  --replication-factor 3

# With configuration
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic my-topic \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=compact

Describe Topic

# Single topic
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --topic my-topic

# Show under-replicated partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --under-replicated-partitions

# Show unavailable partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --unavailable-partitions

Alter Topic

# Increase partitions (cannot decrease)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --alter --topic my-topic --partitions 24

Delete Topic

kafka-topics.sh --bootstrap-server kafka:9092 \
  --delete --topic my-topic

kafka-consumer-groups.sh

List Consumer Groups

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list

# With state
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --list --state

Describe Consumer Group

# Show members and offsets
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-group

# Show only members
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-group --members

# Show verbose member details
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-group --members --verbose

# Show state
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-group --state

Reset Offsets

# Reset to earliest (dry run)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group my-group \
  --reset-offsets \
  --to-earliest \
  --topic my-topic \
  --dry-run

# Reset to earliest (execute)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group my-group \
  --reset-offsets \
  --to-earliest \
  --topic my-topic \
  --execute

# Reset to specific offset
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group my-group \
  --reset-offsets \
  --to-offset 1000 \
  --topic my-topic:0 \
  --execute

# Reset to timestamp
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group my-group \
  --reset-offsets \
  --to-datetime "2024-01-15T10:00:00.000" \
  --all-topics \
  --execute

# Reset by shifting
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group my-group \
  --reset-offsets \
  --shift-by -100 \
  --topic my-topic \
  --execute

Delete Consumer Group

# Group must be empty (no active members)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --delete --group my-group

kafka-configs.sh

Describe Configuration

# Broker config
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 --describe

# All brokers, all configs (--all includes static and default values, not just dynamic overrides)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --describe --all

# Topic config
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name my-topic --describe

# Client quotas
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --entity-name my-user --describe

Alter Configuration

# Add/update broker config
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config log.cleaner.threads=4

# Add/update topic config
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name my-topic \
  --alter --add-config retention.ms=86400000

# Delete config (revert to default)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name my-topic \
  --alter --delete-config retention.ms

Client Quotas

# Set producer quota
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --entity-name my-user \
  --alter --add-config producer_byte_rate=1048576

# Set consumer quota
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --entity-name my-user \
  --alter --add-config consumer_byte_rate=2097152

kafka-acls.sh

List ACLs

# All ACLs
kafka-acls.sh --bootstrap-server kafka:9092 --list

# For specific topic
kafka-acls.sh --bootstrap-server kafka:9092 \
  --list --topic my-topic

# For specific principal
kafka-acls.sh --bootstrap-server kafka:9092 \
  --list --principal User:my-user

Add ACLs

# Producer access
kafka-acls.sh --bootstrap-server kafka:9092 \
  --add \
  --allow-principal User:producer-app \
  --operation Write \
  --operation Describe \
  --topic my-topic

# Consumer access
kafka-acls.sh --bootstrap-server kafka:9092 \
  --add \
  --allow-principal User:consumer-app \
  --operation Read \
  --operation Describe \
  --topic my-topic \
  --group my-group

# Wildcard topic access
kafka-acls.sh --bootstrap-server kafka:9092 \
  --add \
  --allow-principal User:admin \
  --operation All \
  --topic '*'

Remove ACLs

kafka-acls.sh --bootstrap-server kafka:9092 \
  --remove \
  --allow-principal User:producer-app \
  --operation Write \
  --topic my-topic

kafka-reassign-partitions.sh

Generate Reassignment Plan

# Create topics JSON file
cat > topics.json << 'EOF'
{
  "topics": [
    {"topic": "my-topic"}
  ],
  "version": 1
}
EOF

# Generate plan
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate

Execute Reassignment

# Save generated plan to file, then execute
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --execute

# With throttle
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 50000000 \
  --execute

Verify Reassignment

kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify

kafka-leader-election.sh

# Preferred leader election for all partitions
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --all-topic-partitions

# For a specific topic partition (--partition is required when --topic is used)
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --topic my-topic \
  --partition 0

# Unclean election (data loss risk)
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type unclean \
  --topic my-topic \
  --partition 0

kafka-metadata-shell.sh (KRaft)

# Open an interactive shell on a metadata log segment or snapshot
# (pass a single file, not a glob)
kafka-metadata-shell.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log

# Run a single command non-interactively: list the top-level metadata nodes
kafka-metadata-shell.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log \
  ls /

# Inside the shell, use ls, cd, cat and find to inspect cluster, broker and
# topic records (for example my-topic); node paths differ between Kafka versions

kafka-dump-log.sh

# Dump log segment
kafka-dump-log.sh --files /var/kafka-logs/my-topic-0/00000000000000000000.log \
  --print-data-log

# Dump index
kafka-dump-log.sh --files /var/kafka-logs/my-topic-0/00000000000000000000.index

# Verify indexes (the sanity check applies to index files, not the .log segment)
kafka-dump-log.sh --files /var/kafka-logs/my-topic-0/00000000000000000000.index \
  --index-sanity-check

Console Producer/Consumer

Console Producer

kafka-console-producer.sh --bootstrap-server kafka:9092 \
  --topic my-topic

# With key
kafka-console-producer.sh --bootstrap-server kafka:9092 \
  --topic my-topic \
  --property "parse.key=true" \
  --property "key.separator=:"

Console Consumer

# From beginning
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic my-topic \
  --from-beginning

# With keys
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic my-topic \
  --property print.key=true \
  --property print.timestamp=true

# Specific partition and offset
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic my-topic \
  --partition 0 \
  --offset 100

# Max messages
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic my-topic \
  --max-messages 10

kafka-share-groups.sh

Share groups (KIP-932) let multiple consumers cooperatively consume records from the same partitions, with each record acknowledged individually (queue-like semantics).

List Share Groups

kafka-share-groups.sh --bootstrap-server kafka:9092 --list

Describe Share Group

# Show current state
kafka-share-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-share-group

# Show members
kafka-share-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-share-group --members

# Show state summary
kafka-share-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-share-group --state

Reset Share Group Offsets

# Reset to latest (dry run)
kafka-share-groups.sh --bootstrap-server kafka:9092 \
  --reset-offsets --group my-share-group \
  --topic my-topic --to-latest --dry-run

# Reset to latest (execute)
kafka-share-groups.sh --bootstrap-server kafka:9092 \
  --reset-offsets --group my-share-group \
  --topic my-topic --to-latest --execute

Delete Share Group

kafka-share-groups.sh --bootstrap-server kafka:9092 \
  --delete --group my-share-group

kafka-groups.sh

List all group types (consumer, share, streams).

# List all groups with type
kafka-groups.sh --bootstrap-server kafka:9092 --list

# Output shows group type
# GROUP                    TYPE                     PROTOCOL
# my-consumer-group        Consumer                 consumer
# my-share-group           Share                    share

Partition Reassignment with Throttling

Limit bandwidth during data migration to reduce impact on production traffic.

Execute with Throttle

# Throttle inter-broker replication to 50 MB/s
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 50000000 \
  --execute

# Also throttle disk-to-disk moves
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 50000000 \
  --replica-alter-log-dirs-throttle 100000000 \
  --execute

Adjust Throttle During Migration

# Increase throttle if migration is too slow
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 100000000 \
  --additional \
  --execute

Verify and Remove Throttle

# Verify completion - this also removes throttle
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify

Throttle Removal

Always run --verify after reassignment completes. Failing to remove the throttle can limit normal replication traffic.

View Current Throttle Settings

# View broker throttle
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --describe

# View topic throttle
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name my-topic --describe

Increasing Replication Factor

Create Reassignment Plan

cat > increase-rf.json << 'EOF'
{
  "version": 1,
  "partitions": [
    {"topic": "my-topic", "partition": 0, "replicas": [1, 2, 3]}
  ]
}
EOF

Execute and Verify

# Execute
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file increase-rf.json \
  --execute

# Verify
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file increase-rf.json \
  --verify

# Confirm new replication factor
kafka-topics.sh --bootstrap-server kafka:9092 \
  --topic my-topic --describe

Quota Management

Set User Quotas

# Set producer and consumer byte rate for user
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --entity-name my-user \
  --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152'

# Set request rate quota (percentage of time on each broker's request handler and network threads)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --entity-name my-user \
  --alter --add-config 'request_percentage=50'

Set Client-ID Quotas

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type clients --entity-name my-client \
  --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152'

Set Combined User+Client Quotas

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --entity-name my-user \
  --entity-type clients --entity-name my-client \
  --alter --add-config 'producer_byte_rate=1048576'

Set Default Quotas

# Default for all users
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --entity-default \
  --alter --add-config 'producer_byte_rate=1048576'

# Default for all clients
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type clients --entity-default \
  --alter --add-config 'producer_byte_rate=1048576'

Describe Quotas

# Describe user quota
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --entity-name my-user --describe

# Describe all user quotas
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --describe

# Describe combined user+client quotas
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users --entity-type clients --describe

Cluster Operations

Graceful Shutdown

Enable controlled shutdown for graceful broker restarts:

# server.properties
controlled.shutdown.enable=true

With controlled shutdown enabled, the broker:

  1. Syncs all logs to disk (avoiding recovery on restart)
  2. Migrates leadership to other replicas before stopping
  3. Minimizes partition unavailability to milliseconds

Preferred Leader Election

# Trigger preferred leader election for all partitions
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --all-topic-partitions

# Auto-rebalance can also be enabled in broker config
# auto.leader.rebalance.enable=true

Rack Awareness

Configure brokers for rack-aware replica placement:

# server.properties on each broker
broker.rack=rack-1

When rack awareness is configured:

  • Replicas of each partition spread across different racks
  • Each partition spans min(#racks, replication-factor) racks
  • Survives rack-level failures

Broker API Versions

Check supported API versions for compatibility:

kafka-broker-api-versions.sh --bootstrap-server kafka:9092

Log Verification

Verify Log Integrity

# Verify indexes (the sanity check applies to index files, not the .log segment)
kafka-dump-log.sh --files /var/kafka-logs/my-topic-0/00000000000000000000.index \
  --index-sanity-check

# Dump transaction state (decode records as transaction metadata)
kafka-dump-log.sh --files /var/kafka-logs/__transaction_state-0/00000000000000000000.log \
  --transaction-log-decoder

Offset Explorer

# Get offsets for all partitions
kafka-get-offsets.sh --bootstrap-server kafka:9092 \
  --topic my-topic

# Get earliest and latest offsets
kafka-get-offsets.sh --bootstrap-server kafka:9092 \
  --topic my-topic --time -2  # earliest

kafka-get-offsets.sh --bootstrap-server kafka:9092 \
  --topic my-topic --time -1  # latest