Maintained by AxonOps: production-grade documentation from engineers who operate distributed databases at scale

Kafka Cluster Management

Operational procedures for managing Apache Kafka clusters.


Cluster Operations Overview

[Diagram: cluster operations overview]


Adding Brokers

Pre-Addition Checklist

  • [ ] New broker has same Kafka version
  • [ ] Network connectivity to all existing brokers
  • [ ] Sufficient disk space
  • [ ] Proper configuration (broker.id, listeners, etc.)
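
The configuration item in the checklist can be checked mechanically before the broker is started. The sketch below is a minimal helper, not part of Kafka: `check_required_keys` is a hypothetical function name, and the list of keys passed to it is an assumption to adapt to your deployment.

```shell
# check_required_keys FILE KEY...
# Prints "missing: KEY" for every KEY that has no "KEY=value" line in FILE.
# Returns 1 if at least one key is missing, 0 otherwise.
check_required_keys() {
  local file="$1"
  shift
  local key missing=0
  for key in "$@"; do
    # Compare the text before the first "=" exactly; comment lines never match
    if ! awk -F= -v k="$key" '
        { name = $1; gsub(/^[ \t]+|[ \t]+$/, "", name) }
        NF > 1 && name == k { found = 1 }
        END { exit !found }
      ' "$file"; then
      echo "missing: $key"
      missing=1
    fi
  done
  return "$missing"
}

# Example (hypothetical key list):
# check_required_keys /etc/kafka/server.properties node.id listeners advertised.listeners log.dirs
```

The helper only confirms that a key is present; it does not validate the value.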

Addition Process

  1. Configure the new broker
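# server.properties
# Unique ID (ZooKeeper mode)
broker.id=4
# Unique ID and role (KRaft mode; node.id must match broker.id if both are set)
node.id=4
process.roles=broker

# Controller connection (KRaft mode)
controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093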

# Listeners
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker4:9092

# Log directories
log.dirs=/var/kafka-logs

# Rack (if applicable)
broker.rack=rack-2
  2. Start the broker
# Format storage (KRaft mode, first time only)
kafka-storage.sh format -t <cluster-id> -c /etc/kafka/server.properties

# Start broker
kafka-server-start.sh /etc/kafka/server.properties
  3. Verify registration
# Check broker is registered
kafka-broker-api-versions.sh --bootstrap-server broker4:9092

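# Check cluster membership from a metadata log segment (run on a host that holds the metadata log)
# Assumption: the shell's directory layout varies by Kafka version, so adjust the /brokers path if needed
kafka-metadata-shell.sh --snapshot /var/kafka-logs/__cluster_metadata-0/<segment>.log \
  ls /brokers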
  4. Reassign partitions to new broker
# Generate reassignment plan including new broker
cat > topics.json << 'EOF'
{
  "topics": [{"topic": "topic1"}, {"topic": "topic2"}],
  "version": 1
}
EOF

kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate

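# Save the JSON printed under "Proposed partition reassignment configuration" as reassignment.json,
# then execute it (--throttle is in bytes/sec; 100 MB/s here)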
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 100000000 \
  --execute
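
The `--generate` step prints both the current assignment and the proposed plan, and the proposed plan has to be saved before `--execute` can use it. The sketch below pulls either JSON document out of captured output. `extract_plan` and the file names are hypothetical, and the two header strings are an assumption about the tool's output format that should be checked against your Kafka version.

```shell
# extract_plan HEADER
# Reads captured --generate output on stdin and prints the first non-empty
# line that follows the line starting with HEADER.
extract_plan() {
  awk -v header="$1" '
    found && NF { print; exit }
    index($0, header) == 1 { found = 1 }
  '
}

# Example (hypothetical file names):
# kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
#   --topics-to-move-json-file topics.json --broker-list "1,2,3,4" --generate > generate-output.txt
# extract_plan "Current partition replica assignment" < generate-output.txt > rollback.json
# extract_plan "Proposed partition reassignment configuration" < generate-output.txt > reassignment.json
```

Keeping the current assignment in a separate file gives a plan that can be executed later to roll the move back.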

Removing Brokers

Pre-Removal Checklist

  • [ ] All partitions have replicas on other brokers
  • [ ] Reassignment completed successfully
  • [ ] No under-replicated partitions

Decommission Process

  1. Move all partitions off the broker
# Generate plan without the broker to remove (broker 4)
# topics.json must list every topic that has a replica on broker 4
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3" \
  --generate

# Save the proposed plan as reassignment.json, then execute it
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 100000000 \
  --execute

# Verify completion (also removes the throttle once the reassignment has finished)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify
  2. Verify no partitions remain
# Check broker has no partitions
kafka-log-dirs.sh --bootstrap-server kafka:9092 \
  --describe --broker-list 4
  3. Graceful shutdown
# Stop broker gracefully (controlled shutdown)
kafka-server-stop.sh

# Or
kill -TERM <kafka-pid>
  4. Verify removal
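# KRaft mode: a stopped broker stays registered until it is unregistered explicitly
kafka-cluster.sh unregister --bootstrap-server kafka:9092 --id 4

# Check broker is no longer registered (the /brokers path may differ by Kafka version)
kafka-metadata-shell.sh --snapshot /var/kafka-logs/__cluster_metadata-0/<segment>.log \
  ls /brokers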
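
Before shutting the broker down, it can help to confirm from topic metadata that no replica list still names it. The sketch below filters `kafka-topics.sh --describe` output. `partitions_on_broker` is a hypothetical helper, and it assumes the describe output labels its fields `Topic:`, `Partition:` and `Replicas:`.

```shell
# partitions_on_broker BROKER_ID
# Reads kafka-topics.sh --describe output on stdin and prints "topic-partition"
# for every partition whose replica list contains BROKER_ID.
partitions_on_broker() {
  awk -v id="$1" '
    {
      topic = ""; part = ""; replicas = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic = $(i + 1)
        if ($i == "Partition:") part = $(i + 1)
        if ($i == "Replicas:")  replicas = $(i + 1)
      }
      # Topic summary lines carry no Partition:/Replicas: fields
      if (part == "" || replicas == "") next
      n = split(replicas, r, ",")
      for (j = 1; j <= n; j++) {
        if (r[j] == id) { print topic "-" part; break }
      }
    }
  '
}

# Example: empty output means broker 4 no longer holds any replica
# kafka-topics.sh --bootstrap-server kafka:9092 --describe | partitions_on_broker 4
```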

Rolling Upgrades

Upgrade Process

[Diagram: rolling upgrade process]

Step-by-Step

  1. Prepare for upgrade
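# Before upgrade, pin both settings to the currently running version in server.properties
inter.broker.protocol.version=3.6
log.message.format.version=3.6

# Note: in Kafka 4.x, inter.broker.protocol.version and log.message.format.version are removed.
# KRaft clusters track the equivalent setting as metadata.version, which is changed with kafka-features.sh rather than server.properties.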
  2. Upgrade each broker
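#!/bin/bash
# upgrade-broker.sh: upgrade one broker, then wait for replication to recover
set -u

BROKER="${1:?usage: upgrade-broker.sh <broker-host>}"

echo "Upgrading $BROKER..."

# Stop broker
ssh "$BROKER" "sudo systemctl stop kafka" || exit 1

# Install new version (-y because the SSH session is non-interactive)
ssh "$BROKER" "sudo yum install -y kafka-3.7.0" || exit 1  # Or appropriate command

# Start broker
ssh "$BROKER" "sudo systemctl start kafka" || exit 1

# Wait until the broker answers API requests instead of sleeping for a fixed time
echo "Waiting for $BROKER to accept connections..."
until kafka-broker-api-versions.sh --bootstrap-server "$BROKER:9092" > /dev/null 2>&1; do
  sleep 5
done

# Wait for under-replicated partitions to clear
echo "Waiting for ISR to recover..."
while true; do
  # A failed query also prints nothing, so require a successful exit status
  # before treating empty output as "no under-replicated partitions"
  if OUT=$(kafka-topics.sh --bootstrap-server "$BROKER:9092" \
      --describe --under-replicated-partitions 2>/dev/null) && [ -z "$OUT" ]; then
    echo "ISR recovered"
    break
  fi
  sleep 10
done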
  3. After all brokers upgraded
# Update protocol version in server.properties on every broker
inter.broker.protocol.version=3.7
log.message.format.version=3.7
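  4. Rolling restart to apply
# Reuses upgrade-broker.sh for the restart; its install step should find nothing left to install
for broker in broker1 broker2 broker3; do
  ./upgrade-broker.sh "$broker" || break
done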
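
A rolling procedure that waits on ISR recovery is easier to automate when the wait is bounded, so that a broker that never recovers stops the rollout instead of hanging it. The sketch below is one way to do that. `wait_until` and `no_urp` are hypothetical helpers, and the timeout values in the example are placeholders.

```shell
# wait_until TIMEOUT_SECONDS INTERVAL_SECONDS COMMAND [ARGS...]
# Re-runs COMMAND until it succeeds. Returns 1 if it is still failing
# once TIMEOUT_SECONDS have elapsed.
wait_until() {
  local timeout="$1" interval="$2"
  shift 2
  local deadline
  deadline=$(( $(date +%s) + timeout ))
  until "$@"; do
    if [ "$(date +%s)" -ge "$deadline" ]; then
      return 1
    fi
    sleep "$interval"
  done
}

# no_urp BOOTSTRAP: succeeds only when the query itself works and prints nothing
no_urp() {
  local out
  out=$(kafka-topics.sh --bootstrap-server "$1" \
    --describe --under-replicated-partitions 2>/dev/null) && [ -z "$out" ]
}

# Example: give each broker 10 minutes to rejoin the ISR, polling every 10 seconds
# wait_until 600 10 no_urp broker1:9092 || { echo "broker1 did not recover"; exit 1; }
```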

Partition Reassignment

Generate Reassignment Plan

# Create topics file
cat > topics.json << 'EOF'
{
  "topics": [
    {"topic": "orders"},
    {"topic": "events"}
  ],
  "version": 1
}
EOF

# Generate plan
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate
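
For clusters with many topics, writing topics.json by hand gets tedious. The sketch below builds the same document from a list of topic names. `topics_to_json` is a hypothetical helper; it assumes one topic name per line and names that need no JSON escaping.

```shell
# topics_to_json
# Reads topic names on stdin (one per line) and prints a document in the
# format expected by --topics-to-move-json-file.
topics_to_json() {
  awk '
    NF { entries = entries (entries == "" ? "" : ", ") "{\"topic\": \"" $1 "\"}" }
    END { printf "{\"topics\": [%s], \"version\": 1}\n", entries }
  '
}

# Example: include every non-internal topic in the plan
# kafka-topics.sh --bootstrap-server kafka:9092 --list --exclude-internal | topics_to_json > topics.json
```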

Custom Reassignment

{
  "version": 1,
  "partitions": [
    {"topic": "orders", "partition": 0, "replicas": [1,2,3]},
    {"topic": "orders", "partition": 1, "replicas": [2,3,4]},
    {"topic": "orders", "partition": 2, "replicas": [3,4,1]}
  ]
}
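
The custom plan above follows a round-robin pattern: each partition's replica list starts one broker further along the broker list. The sketch below generates that pattern for any topic. `round_robin_plan` is a hypothetical helper, not a Kafka tool, and it ignores rack placement and existing data location.

```shell
# round_robin_plan TOPIC PARTITION_COUNT REPLICATION_FACTOR BROKER_ID...
# Prints a reassignment JSON document with replicas spread round-robin.
round_robin_plan() {
  local topic="$1" partitions="$2" rf="$3"
  shift 3
  if [ "$rf" -gt "$#" ]; then
    echo "replication factor $rf exceeds broker count $#" >&2
    return 1
  fi
  # Pass the broker IDs to awk as one space-separated string
  awk -v topic="$topic" -v partitions="$partitions" -v rf="$rf" -v list="$*" '
    BEGIN {
      n = split(list, brokers, " ")
      printf "{\"version\": 1, \"partitions\": ["
      for (p = 0; p < partitions; p++) {
        replicas = ""
        for (r = 0; r < rf; r++) {
          # Replica r of partition p sits (p + r) positions into the broker list
          replicas = replicas (r ? "," : "") brokers[(p + r) % n + 1]
        }
        printf "%s{\"topic\": \"%s\", \"partition\": %d, \"replicas\": [%s]}", (p ? ", " : ""), topic, p, replicas
      }
      printf "]}\n"
    }
  '
}

# Example: reproduces the three-partition plan above
# round_robin_plan orders 3 3 1 2 3 4 > reassignment.json
```

The first broker in each replica list is the preferred leader, so rotating the starting broker also spreads leadership.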

Execute with Throttling

# Start reassignment with throttle (100 MB/s)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 100000000 \
  --execute

# Monitor progress
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify

# Adjust throttle if needed (--additional lets --execute run while the reassignment is still in progress)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --additional \
  --throttle 200000000 \
  --execute

# Remove throttle after completion (--verify clears it once every partition has finished moving)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify
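
The throttle is given in bytes per second, so it is worth computing it rather than counting zeros. The sketch below converts MB/s to the `--throttle` value and gives a rough lower bound on copy time. Both function names are hypothetical, MB means 10^6 bytes to match the 100 MB/s figure above, and the estimate assumes the throttle is the only bottleneck.

```shell
# mb_to_throttle MB_PER_SEC
# Converts MB/s (10^6 bytes) to the bytes/sec value passed to --throttle.
mb_to_throttle() {
  echo $(( $1 * 1000000 ))
}

# min_transfer_seconds BYTES_TO_MOVE THROTTLE_BYTES_PER_SEC
# Lower bound on copy time in seconds, rounded up.
min_transfer_seconds() {
  echo $(( ($1 + $2 - 1) / $2 ))
}

# Example: moving 500 GB onto one broker at 100 MB/s
# min_transfer_seconds 500000000000 "$(mb_to_throttle 100)"
```

Real reassignments usually take longer than this bound because disks, network, and live produce traffic compete for the same resources.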

Leader Election

Preferred Leader Election

# All partitions
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --all-topic-partitions

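# Specific topic (--topic requires --partition, so list the topic's partitions in a JSON file)
cat > election.json << 'EOF'
{"partitions": [{"topic": "my-topic", "partition": 0}, {"topic": "my-topic", "partition": 1}]}
EOF
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --path-to-json-file election.json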

# Specific partition
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --topic my-topic \
  --partition 0
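
To run an election for a chosen set of partitions, `kafka-leader-election.sh` also accepts a JSON file through `--path-to-json-file`. The sketch below writes such a file for partitions 0 to N-1 of one topic. `election_json` is a hypothetical helper, and the partition count in the example is a placeholder.

```shell
# election_json TOPIC PARTITION_COUNT
# Prints a JSON document naming partitions 0..PARTITION_COUNT-1 of TOPIC.
election_json() {
  local topic="$1" count="$2"
  awk -v topic="$topic" -v count="$count" '
    BEGIN {
      printf "{\"partitions\": ["
      for (p = 0; p < count; p++)
        printf "%s{\"topic\": \"%s\", \"partition\": %d}", (p ? ", " : ""), topic, p
      printf "]}\n"
    }
  '
}

# Example (hypothetical partition count):
# election_json my-topic 12 > election.json
# kafka-leader-election.sh --bootstrap-server kafka:9092 \
#   --election-type preferred --path-to-json-file election.json
```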

Unclean Leader Election

Data Loss Risk

Unclean leader election allows an out-of-sync replica to become leader. Any messages that replica had not yet copied from the previous leader are lost.

kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type unclean \
  --topic my-topic \
  --partition 0

Increasing Partitions

# Increase partition count (cannot decrease)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --alter \
  --topic my-topic \
  --partitions 24

Key-Based Ordering

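Increasing partitions changes the key-to-partition mapping, because the default partitioner derives the partition from the key's hash modulo the partition count. Existing keys may route to different partitions, so per-key ordering is not guaranteed across the change.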
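
The effect can be seen with a small stand-in for a keyed partitioner. This sketch is an illustration only: `partition_for` is a hypothetical function, and it uses `cksum` in place of Kafka's murmur2 hash, so the partition numbers it prints will not match a real cluster. The mechanism, hash modulo partition count, is the same.

```shell
# partition_for KEY PARTITION_COUNT
# Stand-in keyed partitioner: hash(key) mod partition count.
# Assumption: cksum (CRC) replaces Kafka's murmur2 hash for illustration.
partition_for() {
  local hash
  hash=$(printf '%s' "$1" | cksum | cut -d' ' -f1)
  echo $(( hash % $2 ))
}

# Example: compare where the same keys land before and after growing 12 -> 24
# for key in customer-1 customer-2 customer-3; do
#   echo "$key: $(partition_for "$key" 12) -> $(partition_for "$key" 24)"
# done
```

When the count doubles, each key either stays on its partition or moves to the partition 12 higher. With other ratios, most keys move.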


Topic Management

Create Topic

kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic new-topic \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

Delete Topic

# Delete topic (data is permanently removed)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --delete \
  --topic old-topic

Modify Configuration

# Add/update config
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config retention.ms=86400000

# Remove config (revert to default)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --delete-config retention.ms
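
Retention values are given in milliseconds, which are easy to mistype. A minimal sketch of a conversion helper follows; `days_to_ms` is a hypothetical name.

```shell
# days_to_ms DAYS
# Converts whole days to milliseconds for retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# Example: set one-day retention, matching the 86400000 used above
# kafka-configs.sh --bootstrap-server kafka:9092 \
#   --entity-type topics --entity-name my-topic \
#   --alter --add-config "retention.ms=$(days_to_ms 1)"
```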

Health Verification

Post-Operation Checks

#!/bin/bash
# verify-cluster-health.sh

BOOTSTRAP="kafka:9092"

echo "=== Cluster Health Check ==="

# Check broker connectivity
echo "Checking broker connectivity..."
if kafka-broker-api-versions.sh --bootstrap-server "$BOOTSTRAP" > /dev/null 2>&1; then
  echo "✓ Broker connectivity OK"
else
  echo "✗ Broker connectivity FAILED"
  exit 1
fi

# Check offline partitions
OFFLINE=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP \
  --describe --unavailable-partitions 2>/dev/null | grep -c "Topic:")
echo "Offline partitions: $OFFLINE"
if [ "$OFFLINE" -gt 0 ]; then
  echo "✗ CRITICAL: Offline partitions detected"
  exit 2
fi

# Check under-replicated partitions
URP=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP \
  --describe --under-replicated-partitions 2>/dev/null | grep -c "Topic:")
echo "Under-replicated partitions: $URP"
if [ "$URP" -gt 0 ]; then
  echo "⚠ WARNING: Under-replicated partitions"
fi

echo "=== Health Check Complete ==="