Maintained by AxonOps — production-grade documentation from engineers who operate distributed databases at scale

Kafka Authentication

Authentication mechanisms for verifying client and broker identities in Apache Kafka.


Authentication Mechanisms

| Mechanism | Description | Use Case |
|---|---|---|
| SASL/PLAIN | Username/password | Development, simple setups |
| SASL/SCRAM | Salted challenge-response | Production without Kerberos |
| SASL/GSSAPI | Kerberos | Enterprise with KDC |
| SASL/OAUTHBEARER | OAuth 2.0 tokens | Cloud-native environments |
| mTLS | Mutual TLS certificates | Certificate-based auth |

SASL/SCRAM

SCRAM (Salted Challenge Response Authentication Mechanism) provides secure password-based authentication.

Broker Configuration

# server.properties

# Listeners
listeners=SASL_SSL://0.0.0.0:9093
advertised.listeners=SASL_SSL://kafka1:9093

# Inter-broker communication
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

# Enabled mechanisms
sasl.enabled.mechanisms=SCRAM-SHA-512

# JAAS configuration
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="kafka-broker" \
  password="broker-password";

# TLS (required for SASL_SSL)
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=truststore-password

Create SCRAM Credentials

# Create broker user
kafka-configs.sh --bootstrap-server kafka:9092 \
  --alter \
  --add-config 'SCRAM-SHA-512=[password=broker-password]' \
  --entity-type users \
  --entity-name kafka-broker

# Create application user
kafka-configs.sh --bootstrap-server kafka:9092 \
  --alter \
  --add-config 'SCRAM-SHA-512=[password=app-password]' \
  --entity-type users \
  --entity-name my-application

# List users
kafka-configs.sh --bootstrap-server kafka:9092 \
  --describe \
  --entity-type users

Client Configuration

# client.properties
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="my-application" \
  password="app-password";

ssl.truststore.location=/etc/kafka/ssl/client.truststore.jks
ssl.truststore.password=truststore-password

Java Client

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9093");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"my-application\" " +
    "password=\"app-password\";");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

// The same security properties apply to consumers and admin clients
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

SASL/PLAIN

Simple username/password authentication. Should only be used with TLS encryption.

Broker Configuration

# server.properties
listeners=SASL_SSL://0.0.0.0:9093
sasl.enabled.mechanisms=PLAIN

# JAAS configuration with users.
# username/password are the broker's own credentials for inter-broker connections;
# each user_<name>=<password> entry defines a client login the broker accepts.
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-password" \
  user_admin="admin-password" \
  user_producer="producer-password" \
  user_consumer="consumer-password";

Client Configuration

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="producer" \
  password="producer-password";

Security Note

SASL/PLAIN transmits credentials in plain text. Always use with TLS (SASL_SSL).


mTLS (Mutual TLS)

Certificate-based authentication where both client and server present certificates.

Broker Configuration

# server.properties
listeners=SSL://0.0.0.0:9093
advertised.listeners=SSL://kafka1:9093

security.inter.broker.protocol=SSL

# Keystore (broker identity)
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password

# Truststore (trusted CAs)
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=truststore-password

# Require client certificates
ssl.client.auth=required

# Principal mapping
ssl.principal.mapping.rules=RULE:^CN=([^,]+),.*$/$1/
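As a quick sanity check, the extraction performed by the rule above can be approximated with a sed substitution (the DN below is a hypothetical example, not from the original document):

```shell
# Approximate RULE:^CN=([^,]+),.*$/$1/ with an extended-regex substitution.
# Hypothetical client DN for illustration:
dn='CN=my-application,OU=Apps,O=Example,C=US'
principal=$(printf '%s' "$dn" | sed -E 's/^CN=([^,]+),.*$/\1/')
echo "$principal"   # -> my-application
```

With this rule in place, ACLs for the client can be granted to the short principal (`User:my-application`) instead of the full distinguished name.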

Client Configuration

security.protocol=SSL

# Client keystore (client identity)
ssl.keystore.location=/etc/kafka/ssl/client.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password

# Truststore (trusted CAs)
ssl.truststore.location=/etc/kafka/ssl/client.truststore.jks
ssl.truststore.password=truststore-password

Certificate Generation

# Generate CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
  -subj "/CN=Kafka-CA" -nodes

# Generate broker keystore
keytool -keystore kafka.keystore.jks -alias kafka-broker \
  -validity 365 -genkey -keyalg RSA -storepass changeit \
  -dname "CN=kafka1.example.com"

# Create CSR
keytool -keystore kafka.keystore.jks -alias kafka-broker \
  -certreq -file kafka-broker.csr -storepass changeit

# Sign certificate
openssl x509 -req -CA ca-cert -CAkey ca-key \
  -in kafka-broker.csr -out kafka-broker-signed.crt \
  -days 365 -CAcreateserial

# Import CA cert
keytool -keystore kafka.keystore.jks -alias CARoot \
  -import -file ca-cert -storepass changeit -noprompt

# Import signed cert
keytool -keystore kafka.keystore.jks -alias kafka-broker \
  -import -file kafka-broker-signed.crt -storepass changeit

# Create truststore with CA
keytool -keystore kafka.truststore.jks -alias CARoot \
  -import -file ca-cert -storepass changeit -noprompt
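Before distributing the stores, it is worth confirming that a certificate signed this way actually chains back to the CA. The following self-contained sketch repeats the openssl steps with throwaway names in a scratch directory (1-day validity) and runs `openssl verify`:

```shell
# Scratch directory so nothing collides with real key material
tmp=$(mktemp -d) && cd "$tmp"

# Throwaway CA and broker key/CSR (mirrors the steps above)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 1 \
  -subj "/CN=Test-CA" -nodes
openssl req -new -keyout broker-key -out broker.csr \
  -subj "/CN=kafka1.example.com" -nodes

# Sign the CSR with the CA, then verify the chain
openssl x509 -req -CA ca-cert -CAkey ca-key -in broker.csr \
  -out broker-signed.crt -days 1 -CAcreateserial
openssl verify -CAfile ca-cert broker-signed.crt
```

The same `openssl verify -CAfile` check can be run against the real signed certificate before it is imported into the keystore.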

SASL/OAUTHBEARER

OAuth 2.0 token-based authentication for modern identity systems.

Broker Configuration

# server.properties
listeners=SASL_SSL://0.0.0.0:9093
sasl.enabled.mechanisms=OAUTHBEARER

# Custom callback handler for token validation
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=\
  com.example.OAuthBearerValidatorCallbackHandler

# OIDC settings
listener.name.sasl_ssl.oauthbearer.sasl.oauthbearer.jwks.endpoint.url=\
  https://identity-provider/.well-known/jwks.json
listener.name.sasl_ssl.oauthbearer.sasl.oauthbearer.expected.audience=kafka

Client Configuration

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=\
  org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://identity-provider/oauth2/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="kafka-client" \
  clientSecret="client-secret";

Multiple Listeners

Configure different authentication per listener.

# Different auth for internal vs external
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_SSL

# Internal uses PLAIN
listener.name.internal.sasl.enabled.mechanisms=PLAIN
listener.name.internal.plain.sasl.jaas.config=...

# External uses SCRAM
listener.name.external.sasl.enabled.mechanisms=SCRAM-SHA-512
listener.name.external.scram-sha-512.sasl.jaas.config=...

inter.broker.listener.name=INTERNAL

Troubleshooting

| Issue | Cause | Solution |
|---|---|---|
| Authentication failed | Invalid credentials | Verify username/password |
| SSL handshake failed | Certificate mismatch | Check truststore contains CA |
| SCRAM user not found | User not created | Run kafka-configs to create user |
| Principal mapping failed | Invalid mapping rule | Check ssl.principal.mapping.rules |

SASL/GSSAPI (Kerberos)

Enterprise authentication using Kerberos Key Distribution Center (KDC).

Prerequisites

  1. Access to Kerberos KDC (Active Directory or MIT Kerberos)
  2. Principal for each broker: kafka/{hostname}@{REALM}
  3. Principal for each client application
  4. Keytab files for service accounts

Create Kerberos Principals

# Create broker principal
sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/kafka1.example.com@EXAMPLE.COM'

# Export to keytab
sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/kafka.keytab kafka/kafka1.example.com@EXAMPLE.COM"

Broker Configuration

# server.properties
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka

JAAS configuration (kafka_server_jaas.conf):

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    principal="kafka/kafka1.example.com@EXAMPLE.COM";
};

JVM parameters:

-Djava.security.krb5.conf=/etc/kafka/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

Client Configuration

security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useKeyTab=true \
  storeKey=true \
  keyTab="/etc/security/keytabs/client.keytab" \
  principal="my-application@EXAMPLE.COM";

For interactive use with kinit:

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useTicketCache=true;

JAAS Configuration

Configuration Precedence

JAAS configuration can be specified at multiple levels. The order of precedence:

  1. Broker property: listener.name.{listenerName}.{mechanism}.sasl.jaas.config
  2. Static JAAS file section: {listenerName}.KafkaServer
  3. Static JAAS file section: KafkaServer

Example with multiple mechanisms:

# server.properties
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=\
  org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" password="admin-secret";

listener.name.sasl_ssl.plain.sasl.jaas.config=\
  org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" password="admin-secret" \
  user_admin="admin-secret" user_alice="alice-secret";

Static JAAS File

For clients using static JAAS configuration:

kafka_client_jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="my-application"
    password="app-password";
};

JVM parameter:

-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf

SCRAM Credential Management

Initial Credentials (KRaft)

For KRaft clusters, create initial credentials during storage formatting:

# Format storage with initial SCRAM credentials
kafka-storage.sh format \
  -t $(kafka-storage.sh random-uuid) \
  -c config/kraft/server.properties \
  --add-scram 'SCRAM-SHA-512=[name="admin",password="admin-secret"]'

Runtime Credential Management

# Create user with custom iteration count
kafka-configs.sh --bootstrap-server kafka:9092 \
  --alter \
  --add-config 'SCRAM-SHA-512=[iterations=8192,password=user-password]' \
  --entity-type users \
  --entity-name my-user

# List user credentials (shows hash, not password)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --describe \
  --entity-type users \
  --entity-name my-user

# Delete user credentials
kafka-configs.sh --bootstrap-server kafka:9092 \
  --alter \
  --delete-config 'SCRAM-SHA-512' \
  --entity-type users \
  --entity-name my-user

Security Considerations

| Consideration | Recommendation |
|---|---|
| Storage | SCRAM credentials stored in metadata log (KRaft) |
| Hash functions | Only SHA-256 and SHA-512 supported |
| Iterations | Minimum 4096 (default); increase for stronger protection |
| Transport | Always use with TLS (SASL_SSL) |
| Controller security | Ensure KRaft controllers on secure, private network |

Delegation Tokens

Delegation tokens provide lightweight authentication for short-lived operations without exposing primary credentials.

Enable Delegation Tokens

# server.properties
# Note: comments must be on their own line; a trailing "# ..." on the same
# line would become part of the property value.
delegation.token.secret.key=<base64-encoded-secret>
# 7 days
delegation.token.max.lifetime.ms=604800000
# 1 day
delegation.token.expiry.time.ms=86400000
# 1 hour
delegation.token.expiry.check.interval.ms=3600000
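The secret itself can be any high-entropy value shared identically by all brokers. One way to produce one (assuming openssl is available; any equivalent random source works):

```shell
# 32 random bytes, base64-encoded (44 characters).
# Paste the output into delegation.token.secret.key on every broker.
openssl rand -base64 32
```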

Create Delegation Token

kafka-delegation-tokens.sh --bootstrap-server kafka:9092 \
  --command-config admin.properties \
  --create \
  --max-life-time-period 86400000 \
  --owner-principal User:my-user

Use Delegation Token

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
# The --create command prints the token ID and HMAC; authenticate with the
# token ID as the username and the HMAC as the password
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<token-id>" \
  password="<token-hmac>" \
  tokenauth="true";

Connection Performance

DNS Lookup Performance

Clients may perform DNS lookups during connection setup and the SASL handshake; Kerberos (GSSAPI) in particular resolves the broker hostname to build the service principal. Slow or missing reverse DNS records can stall handshakes, so use fully qualified domain names (FQDNs) in both bootstrap.servers and the broker's advertised.listeners.
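For example, assuming the broker's FQDN is kafka1.example.com with matching forward and reverse DNS records:

```properties
# Broker: advertise the FQDN, not an IP or short hostname
advertised.listeners=SASL_SSL://kafka1.example.com:9093

# Client: bootstrap with the same FQDN
bootstrap.servers=kafka1.example.com:9093
```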


Re-authentication

Enable periodic re-authentication for long-running connections:

# Broker configuration: re-authenticate sessions every hour
# (comments must be on their own line in a properties file)
connections.max.reauth.ms=3600000
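The millisecond values used in settings like this one (and in the delegation token configs above) are easy to derive with shell arithmetic:

```shell
# Common intervals expressed in milliseconds
echo $((60 * 60 * 1000))            # 1 hour  -> 3600000
echo $((24 * 60 * 60 * 1000))       # 1 day   -> 86400000
echo $((7 * 24 * 60 * 60 * 1000))   # 7 days  -> 604800000
```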

Connections are re-authenticated transparently in the background; the broker closes any connection that fails re-authentication or exceeds the configured session lifetime without re-authenticating.