Mutual TLS Authentication (mTLS)¶
Mutual TLS (mTLS) is certificate-based authentication in which both the client and the broker present certificates to prove their identity. It removes passwords from the authentication exchange and relies on a public key infrastructure (PKI) for identity management.
Overview¶
When to Use mTLS¶
| Use Case | Recommendation |
|---|---|
| Existing PKI infrastructure | Recommended |
| Zero-trust environments | Recommended |
| Service-to-service auth | Recommended |
| IoT/device authentication | Recommended |
| User authentication | Consider OAuth/SCRAM |
| No PKI available | Consider SCRAM |
mTLS Benefits¶
| Feature | Benefit |
|---|---|
| No passwords | No shared secrets for brokers to store or clients to send |
| Strong identity | Cryptographic verification |
| Certificate lifecycle | Automated rotation via PKI |
| Mutual verification | Both parties authenticated |
| Network encryption | Built into TLS handshake |
mTLS vs Other Mechanisms¶
| Feature | mTLS | SCRAM | OAuth |
|---|---|---|---|
| Password required | No | Yes | No (tokens) |
| External infrastructure | PKI/CA | None | IdP |
| Credential rotation | Cert renewal | Manual | Token refresh |
| Identity source | Certificate | Broker config | IdP |
Version Requirements¶
| Feature | Kafka Version |
|---|---|
| SSL/TLS authentication | 0.9.0+ |
| Per-listener SSL config | 1.0.0+ |
| PEM format support | 2.7.0+ |
| Custom principal builder (KafkaPrincipalBuilder) | 1.0.0+ |
| SSL principal mapping rules | 2.2.0+ |
Certificate Architecture¶
Certificate requirements:
| Component | Certificate Purpose | Key Fields |
|---|---|---|
| CA | Trust anchor | Root or intermediate CA |
| Broker | Server identity | CN or SAN matching hostname |
| Client | Client identity | CN identifying application |
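The "CN or SAN matching hostname" requirement can be illustrated with a short Python sketch. It assumes simplified matching rules (case-insensitive exact match, plus `*` as a complete left-most label); `san_matches` is a hypothetical helper, not part of Kafka or the JDK, and real verifiers apply additional checks.

```python
def san_matches(hostname: str, san_dns_names: list[str]) -> bool:
    """Return True if hostname matches any SAN DNS entry (simplified rules)."""
    host_labels = hostname.lower().rstrip(".").split(".")
    for pattern in san_dns_names:
        pat_labels = pattern.lower().rstrip(".").split(".")
        if len(pat_labels) != len(host_labels):
            continue
        # A "*" is honored only as the complete left-most label.
        if pat_labels[0] == "*" and len(pat_labels) > 2:
            if pat_labels[1:] == host_labels[1:]:
                return True
        elif pat_labels == host_labels:
            return True
    return False


sans = ["kafka1.example.com", "localhost"]
print(san_matches("kafka1.example.com", sans))  # True
print(san_matches("kafka1", sans))              # False: short name is not in the SAN list
```

The second call shows why clients should connect with a name that appears in the broker certificate: a short host name that is missing from the SAN list fails hostname verification even though the certificate itself is valid.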
Certificate Generation¶
Using OpenSSL¶
1. Create CA:
# Generate CA private key
openssl genrsa -out ca-key.pem 4096
# Create CA certificate
openssl req -new -x509 -days 3650 -key ca-key.pem -out ca-cert.pem \
-subj "/C=US/ST=CA/L=San Francisco/O=MyOrg/CN=Kafka-CA"
2. Create broker certificate:
# Generate broker private key
openssl genrsa -out kafka1-key.pem 2048
# Create certificate signing request (CSR)
openssl req -new -key kafka1-key.pem -out kafka1.csr \
-subj "/C=US/ST=CA/L=San Francisco/O=MyOrg/CN=kafka1.example.com"
# Create SAN extension file
cat > kafka1-san.ext << EOF
subjectAltName=DNS:kafka1.example.com,DNS:kafka1,DNS:localhost,IP:192.168.1.10
EOF
# Sign certificate with CA
openssl x509 -req -days 365 -in kafka1.csr -CA ca-cert.pem -CAkey ca-key.pem \
-CAcreateserial -out kafka1-cert.pem -extfile kafka1-san.ext
3. Create client certificate:
# Generate client private key
openssl genrsa -out client-key.pem 2048
# Create CSR
openssl req -new -key client-key.pem -out client.csr \
-subj "/C=US/ST=CA/L=San Francisco/O=MyOrg/OU=Applications/CN=producer-app"
# Sign with CA
openssl x509 -req -days 365 -in client.csr -CA ca-cert.pem -CAkey ca-key.pem \
-CAcreateserial -out client-cert.pem
Create Keystores (PKCS12)¶
# Broker keystore (certificate + private key)
openssl pkcs12 -export -in kafka1-cert.pem -inkey kafka1-key.pem \
-out kafka1.keystore.p12 -name kafka1 \
-CAfile ca-cert.pem -caname root -password pass:keystore-password
# Truststore (CA certificate)
keytool -import -file ca-cert.pem -keystore kafka.truststore.p12 \
-storetype PKCS12 -alias ca -storepass truststore-password -noprompt
# Client keystore
openssl pkcs12 -export -in client-cert.pem -inkey client-key.pem \
-out client.keystore.p12 -name client \
-CAfile ca-cert.pem -caname root -password pass:keystore-password
Using cfssl¶
# Install cfssl
go install github.com/cloudflare/cfssl/cmd/cfssl@latest
go install github.com/cloudflare/cfssl/cmd/cfssljson@latest
# CA config
cat > ca-config.json << EOF
{
  "signing": {
    "default": {
      "expiry": "8760h"
    },
    "profiles": {
      "server": {
        "usages": ["signing", "key encipherment", "server auth", "client auth"],
        "expiry": "8760h"
      },
      "client": {
        "usages": ["signing", "key encipherment", "client auth"],
        "expiry": "8760h"
      }
    }
  }
}
EOF
# Generate CA
cfssl gencert -initca ca-csr.json | cfssljson -bare ca
# Generate broker cert
cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json \
-profile=server kafka1-csr.json | cfssljson -bare kafka1
# Generate client cert
cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json \
-profile=client client-csr.json | cfssljson -bare client
Broker Configuration¶
Basic mTLS Setup¶
# server.properties
# Listener using SSL protocol (mTLS)
listeners=SSL://0.0.0.0:9093
advertised.listeners=SSL://kafka1.example.com:9093
# Inter-broker communication
security.inter.broker.protocol=SSL
# Require client certificate authentication
ssl.client.auth=required
# Broker keystore (certificate + private key)
ssl.keystore.type=PKCS12
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.p12
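# ${...} values are placeholders: Kafka does not expand ${VAR} on its own, so substitute real values at deploy time or use a config provider
ssl.keystore.password=${KEYSTORE_PASSWORD}
ssl.key.password=${KEY_PASSWORD}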
# Truststore (CA certificates)
ssl.truststore.type=PKCS12
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.p12
ssl.truststore.password=${TRUSTSTORE_PASSWORD}
# TLS protocol settings
ssl.enabled.protocols=TLSv1.3,TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
Client Auth Modes¶
| Value | Behavior |
|---|---|
| required | Client must present a valid certificate |
| requested | Client certificate optional |
| none | No client certificate requested |
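As an analogy only, the three modes correspond closely to the verify modes of a server-side TLS context in Python's `ssl` module. The mapping and the `server_context` helper below are illustrative assumptions, not Kafka code.

```python
import ssl

# Assumed correspondence between Kafka's ssl.client.auth values and Python verify modes.
CLIENT_AUTH_MODES = {
    "required": ssl.CERT_REQUIRED,   # handshake fails without a valid client certificate
    "requested": ssl.CERT_OPTIONAL,  # certificate is verified only if the client sends one
    "none": ssl.CERT_NONE,           # client certificate is never requested
}


def server_context(client_auth: str) -> ssl.SSLContext:
    """Build a server-side TLS context for the given client auth mode."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    ctx.verify_mode = CLIENT_AUTH_MODES[client_auth]
    return ctx
```

Only `required` guarantees that every connection carries a verified certificate, which is why it is the mode to use when the certificate is the sole source of identity.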
PEM Format (Kafka 2.7+)¶
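# Use PEM files directly (no JKS/PKCS12 keystore)
ssl.keystore.type=PEM
# File containing the private key (PKCS#8) and the certificate chain,
# for example kafka1-key.pem and kafka1-cert.pem concatenated into one file
ssl.keystore.location=/etc/kafka/ssl/kafka1-keystore.pem
ssl.truststore.type=PEM
ssl.truststore.location=/etc/kafka/ssl/ca-cert.pem
# ssl.keystore.key, ssl.keystore.certificate.chain and ssl.truststore.certificates
# take the PEM text itself as their value, not a file path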
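Kafka's built-in PEM support expects private keys in PKCS#8 form, while older OpenSSL versions write `openssl genrsa` output in the traditional PKCS#1 form. A small Python sketch can tell the formats apart by their BEGIN marker; `pem_key_format` is a hypothetical helper and only inspects the header line.

```python
def pem_key_format(pem_text: str) -> str:
    """Classify a PEM private key by its BEGIN marker."""
    markers = {
        "-----BEGIN PRIVATE KEY-----": "pkcs8",
        "-----BEGIN ENCRYPTED PRIVATE KEY-----": "pkcs8-encrypted",
        "-----BEGIN RSA PRIVATE KEY-----": "pkcs1-rsa",
        "-----BEGIN EC PRIVATE KEY-----": "sec1-ec",
    }
    for marker, label in markers.items():
        if marker in pem_text:
            return label
    return "unknown"


sample = "-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----\n"
print(pem_key_format(sample))  # pkcs1-rsa
```

A key reported as `pkcs1-rsa` or `sec1-ec` can be converted with `openssl pkcs8 -topk8` before it is referenced from a PEM keystore.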
Multiple Listeners¶
Configure different authentication per listener:
# Define listeners
listeners=INTERNAL://0.0.0.0:9092,MTLS://0.0.0.0:9093
# Map to protocols
listener.security.protocol.map=INTERNAL:PLAINTEXT,MTLS:SSL
# mTLS listener requires client cert
listener.name.mtls.ssl.client.auth=required
# Internal listener is PLAINTEXT (trusted network only): it has no TLS layer,
# so ssl.* settings such as ssl.client.auth do not apply to it
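To see which listeners actually carry TLS, the two settings can be joined on the listener name. The sketch below is a simplified Python parser for illustration; `parse_listeners` is a hypothetical helper and does not reproduce Kafka's own validation.

```python
def parse_listeners(listeners: str, protocol_map: str) -> dict[str, dict]:
    """Join `listeners` with `listener.security.protocol.map` by listener name."""
    protocols = dict(entry.split(":", 1) for entry in protocol_map.split(","))
    result = {}
    for entry in listeners.split(","):
        name, address = entry.split("://", 1)
        _host, _, port = address.rpartition(":")
        protocol = protocols[name]
        result[name] = {
            "protocol": protocol,
            "port": int(port),
            # Only SSL and SASL_SSL listeners have a TLS layer to configure.
            "tls": protocol in ("SSL", "SASL_SSL"),
        }
    return result


parsed = parse_listeners(
    "INTERNAL://0.0.0.0:9092,MTLS://0.0.0.0:9093",
    "INTERNAL:PLAINTEXT,MTLS:SSL",
)
print(parsed["MTLS"])      # {'protocol': 'SSL', 'port': 9093, 'tls': True}
print(parsed["INTERNAL"])  # {'protocol': 'PLAINTEXT', 'port': 9092, 'tls': False}
```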
Principal Mapping¶
Default Principal¶
By default, the client's Distinguished Name (DN) becomes the Kafka principal:
Certificate DN: CN=producer-app,OU=Applications,O=MyOrg
Kafka Principal: User:CN=producer-app,OU=Applications,O=MyOrg
Custom Principal Builder¶
Extract specific certificate fields:
# Use Common Name only
ssl.principal.mapping.rules=RULE:^CN=([^,]+).*$/$1/
# Examples:
# CN=producer-app,OU=Applications -> producer-app
# CN=admin,OU=Admins,O=MyOrg -> admin
Multiple Mapping Rules¶
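ssl.principal.mapping.rules=\
RULE:^CN=([^,]+),OU=Services.*$/$1/,\
RULE:^CN=([^,]+),OU=Users.*$/$1/L,\
DEFAULT
Rules are evaluated in order and the first match wins. The replacement yields only the principal name; Kafka adds the User: type itself, so a replacement such as User:$1 would produce the principal User:User:alice.
| Rule | Input DN | Principal Name |
|---|---|---|
| Rule 1 | CN=producer,OU=Services,O=MyOrg | producer |
| Rule 2 (lowercased by /L) | CN=Alice,OU=Users,O=MyOrg | alice |
| DEFAULT | CN=other,OU=Other | CN=other,OU=Other |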
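The evaluation order can be emulated in a few lines of Python. This is a simplified sketch rather than Kafka's parser: rules are passed as already-split tuples, the rule set is hypothetical, and Python writes the back-reference as `\1` where Kafka uses `$1`.

```python
import re


def map_principal(dn, rules):
    """Return the principal name for a DN using ordered mapping rules.

    Each rule is either the string "DEFAULT" or a tuple
    (pattern, replacement, case) where case is None, "L" or "U".
    """
    for rule in rules:
        if rule == "DEFAULT":
            return dn
        pattern, replacement, case = rule
        # A rule applies only when its pattern matches the whole DN.
        if re.fullmatch(pattern, dn):
            name = re.sub(pattern, replacement, dn, count=1)
            if case == "L":
                name = name.lower()
            elif case == "U":
                name = name.upper()
            return name
    raise ValueError(f"no rule matched {dn!r}")


# Hypothetical rule set for illustration.
rules = [
    (r"^CN=([^,]+),OU=Services.*$", r"\1", None),
    (r"^CN=([^,]+),OU=Users.*$", r"\1", "L"),
    "DEFAULT",
]

print(map_principal("CN=producer,OU=Services,O=MyOrg", rules))  # producer
print(map_principal("CN=Alice,OU=Users,O=MyOrg", rules))        # alice
print(map_principal("CN=other,OU=Other", rules))                # CN=other,OU=Other
```

Testing rules offline like this before deploying them helps avoid ACLs that silently stop matching because a principal name changed.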
Custom Principal Builder Class¶
For complex logic, implement a custom builder:
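import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;

import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.security.cert.X509Certificate;

public class CustomPrincipalBuilder implements KafkaPrincipalBuilder {

    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        if (context instanceof SslAuthenticationContext) {
            SSLSession session = ((SslAuthenticationContext) context).session();
            try {
                // The first certificate in the chain is the client's own (leaf) certificate.
                X509Certificate cert = (X509Certificate) session.getPeerCertificates()[0];
                String dn = cert.getSubjectX500Principal().getName();
                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, extractCN(dn));
            } catch (SSLPeerUnverifiedException e) {
                // No verified client certificate (for example with ssl.client.auth=requested).
                return KafkaPrincipal.ANONYMOUS;
            }
        }
        // Non-SSL contexts (PLAINTEXT, SASL) are not handled by this builder.
        return KafkaPrincipal.ANONYMOUS;
    }

    // Returns the CN value, or the full DN if no CN is present or the DN cannot be parsed.
    private String extractCN(String dn) {
        try {
            // LdapName handles escaped commas, which a plain split(",") would break on.
            for (Rdn rdn : new LdapName(dn).getRdns()) {
                if ("CN".equalsIgnoreCase(rdn.getType())) {
                    return rdn.getValue().toString();
                }
            }
        } catch (InvalidNameException e) {
            // Fall through and return the unparsed DN.
        }
        return dn;
    }
}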
# Register custom builder
principal.builder.class=com.example.CustomPrincipalBuilder
Client Configuration¶
Java Client¶
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9093,kafka2:9093");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// SSL/mTLS configuration
props.put("security.protocol", "SSL");
// Client keystore (client certificate + private key)
props.put("ssl.keystore.type", "PKCS12");
props.put("ssl.keystore.location", "/etc/kafka/ssl/client.keystore.p12");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
// Truststore (CA certificate)
props.put("ssl.truststore.type", "PKCS12");
props.put("ssl.truststore.location", "/etc/kafka/ssl/client.truststore.p12");
props.put("ssl.truststore.password", "truststore-password");
// Verify broker hostname
props.put("ssl.endpoint.identification.algorithm", "HTTPS");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Spring Boot¶
application.yml:
spring:
  kafka:
    bootstrap-servers: kafka1:9093,kafka2:9093
    properties:
      security.protocol: SSL
      ssl.endpoint.identification.algorithm: HTTPS
    ssl:
      key-store-location: classpath:client.keystore.p12
      key-store-password: ${KEYSTORE_PASSWORD}
      key-store-type: PKCS12
      key-password: ${KEY_PASSWORD}
      trust-store-location: classpath:truststore.p12
      trust-store-password: ${TRUSTSTORE_PASSWORD}
      trust-store-type: PKCS12
Python (confluent-kafka)¶
from confluent_kafka import Producer
config = {
'bootstrap.servers': 'kafka1:9093,kafka2:9093',
'security.protocol': 'SSL',
# Client certificate and key
'ssl.certificate.location': '/etc/kafka/ssl/client-cert.pem',
'ssl.key.location': '/etc/kafka/ssl/client-key.pem',
'ssl.key.password': 'key-password',
# CA certificate
'ssl.ca.location': '/etc/kafka/ssl/ca-cert.pem',
# Verify broker hostname
'ssl.endpoint.identification.algorithm': 'https',
}
producer = Producer(config)
Go (confluent-kafka-go)¶
import (
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "kafka1:9093,kafka2:9093",
"security.protocol": "SSL",
"ssl.certificate.location": "/etc/kafka/ssl/client-cert.pem",
"ssl.key.location": "/etc/kafka/ssl/client-key.pem",
"ssl.key.password": "key-password",
"ssl.ca.location": "/etc/kafka/ssl/ca-cert.pem",
"ssl.endpoint.identification.algorithm": "https",
})
Command-Line Tools¶
client-ssl.properties:
security.protocol=SSL
ssl.keystore.type=PKCS12
ssl.keystore.location=/etc/kafka/ssl/client.keystore.p12
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.type=PKCS12
ssl.truststore.location=/etc/kafka/ssl/client.truststore.p12
ssl.truststore.password=truststore-password
ssl.endpoint.identification.algorithm=HTTPS
# List topics
kafka-topics.sh --bootstrap-server kafka:9093 \
--command-config client-ssl.properties \
--list
# Produce messages
kafka-console-producer.sh --bootstrap-server kafka:9093 \
--topic my-topic \
--producer.config client-ssl.properties
# Consume messages
kafka-console-consumer.sh --bootstrap-server kafka:9093 \
--topic my-topic \
--consumer.config client-ssl.properties \
--from-beginning
Certificate Rotation¶
Broker Certificate Rotation¶
- Generate new certificates signed by same CA
- Update keystore on each broker
- Rolling restart brokers
# Verify new certificate before deployment
openssl x509 -in new-kafka1-cert.pem -text -noout
# Update keystore
openssl pkcs12 -export -in new-kafka1-cert.pem -inkey new-kafka1-key.pem \
-out kafka1.keystore.p12 -name kafka1 \
-password pass:keystore-password
# Copy to the path the broker loads (ssl.keystore.location) and restart
scp kafka1.keystore.p12 kafka1:/etc/kafka/ssl/kafka.keystore.p12
ssh kafka1 'systemctl restart kafka'
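Deciding when to rotate usually starts with the certificate's expiry date. The Python sketch below assumes the date is read with `openssl x509 -enddate -noout -in <cert>`, which prints a line of the form `notAfter=...`; `days_until_expiry` is a hypothetical helper, and `now` is passed in explicitly so the calculation is reproducible.

```python
import ssl


def days_until_expiry(enddate_line: str, now: float) -> int:
    """Whole days between `now` (epoch seconds) and the notAfter date."""
    not_after = enddate_line.strip().removeprefix("notAfter=")
    remaining = ssl.cert_time_to_seconds(not_after) - now
    return int(remaining // 86400)


# Example with fixed dates: a certificate expiring on 1 Jan 2025, checked on 2 Dec 2024.
check_time = ssl.cert_time_to_seconds("Dec  2 00:00:00 2024 GMT")
print(days_until_expiry("notAfter=Jan  1 00:00:00 2025 GMT", check_time))  # 30
```

In a scheduled job, `now` would be `time.time()` and the result would be compared against an alerting threshold chosen by the operator.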
CA Rotation¶
Rotating the CA requires more care:
- Add new CA to all truststores (alongside old CA)
- Rolling restart all brokers and clients
- Issue new certificates signed by new CA
- Deploy new certificates with rolling restarts
- Remove old CA from truststores
- Final rolling restart
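The ordering matters because every certificate in use must chain to a CA that peers already trust. A toy Python model makes this explicit; the phase data and the `rotation_is_safe` helper are illustrative assumptions, not a tool that inspects real truststores.

```python
def rotation_is_safe(phases):
    """Each phase is (label, truststore_cas, issuers_of_certs_in_use).

    A phase is safe when every certificate still in use was issued by a CA
    that the truststores contain at that moment.
    """
    return all(issuers <= truststore for _label, truststore, issuers in phases)


# Order from the list above: trust the new CA first, retire the old CA last.
documented_order = [
    ("add new CA to truststores", {"old-ca", "new-ca"}, {"old-ca"}),
    ("deploy new certificates (mid-roll)", {"old-ca", "new-ca"}, {"old-ca", "new-ca"}),
    ("all certificates reissued", {"old-ca", "new-ca"}, {"new-ca"}),
    ("remove old CA", {"new-ca"}, {"new-ca"}),
]

# Hypothetical mistake: swapping the truststore before certificates are reissued.
swapped_too_early = [
    ("replace old CA with new CA", {"new-ca"}, {"old-ca"}),
]

print(rotation_is_safe(documented_order))   # True
print(rotation_is_safe(swapped_too_early))  # False
```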
Automated Rotation with cert-manager¶
# Kubernetes cert-manager Certificate
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: kafka-broker
spec:
  secretName: kafka-broker-tls
  issuerRef:
    name: kafka-ca-issuer
    kind: ClusterIssuer
  commonName: kafka1.example.com
  dnsNames:
    - kafka1.example.com
    - kafka1
  duration: 8760h # 1 year
  renewBefore: 720h # 30 days
  privateKey:
    algorithm: RSA
    size: 2048
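The `duration` and `renewBefore` values together determine when renewal is attempted: roughly at expiry minus `renewBefore`. A short Python calculation shows the timeline, assuming a hypothetical issue date of 1 January 2025.

```python
from datetime import datetime, timedelta, timezone

duration = timedelta(hours=8760)     # certificate lifetime (365 days)
renew_before = timedelta(hours=720)  # renewal window before expiry (30 days)

issued = datetime(2025, 1, 1, tzinfo=timezone.utc)  # assumed issue date
expires = issued + duration
renew_at = expires - renew_before

print(expires.date())            # 2026-01-01
print(renew_at.date())           # 2025-12-02
print((renew_at - issued).days)  # 335
```

Brokers still need to pick up the renewed secret, so the renewal window should be long enough to cover a rolling restart or keystore reload.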
Security Best Practices¶
Certificate Requirements¶
| Requirement | Recommendation |
|---|---|
| Key size | RSA 2048+ or ECDSA P-256+ |
| Validity period | 1 year or less for services; shorter where renewal is automated |
| SAN extension | Include all hostnames/IPs |
| Key usage | Server auth for brokers (plus client auth when brokers connect to each other over SSL), client auth for clients |
File Security¶
# Restrict keystore access
chmod 400 /etc/kafka/ssl/*.keystore.p12
chown kafka:kafka /etc/kafka/ssl/*.keystore.p12
# Private keys
chmod 400 /etc/kafka/ssl/*-key.pem
chown kafka:kafka /etc/kafka/ssl/*-key.pem
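The same restriction can be checked programmatically, for example in a deployment test. This Python sketch assumes a POSIX file system; `is_private` is a hypothetical helper, and the demo runs against a temporary file rather than a real keystore.

```python
import os
import stat
import tempfile


def is_private(path: str) -> bool:
    """True if group and others have no permissions on the file."""
    mode = os.stat(path).st_mode
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0


# Demo on a throwaway file.
with tempfile.NamedTemporaryFile(delete=False) as handle:
    demo_path = handle.name

os.chmod(demo_path, 0o400)
locked_down = is_private(demo_path)     # owner read-only

os.chmod(demo_path, 0o644)
world_readable = is_private(demo_path)  # group/others can read

os.remove(demo_path)
print(locked_down, world_readable)  # True False
```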
Certificate Validation¶
# Always verify broker hostname
ssl.endpoint.identification.algorithm=HTTPS
# Use TLS 1.2 or higher
ssl.enabled.protocols=TLSv1.3,TLSv1.2
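# Strong cipher suites (example): the first two are TLS 1.3 suites, the last covers TLS 1.2 clients
ssl.cipher.suites=TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384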
Troubleshooting¶
Common Errors¶
| Error | Cause | Solution |
|---|---|---|
| PKIX path building failed | CA not in truststore | Add CA to truststore |
| Certificate unknown | Client cert not trusted | Sign with trusted CA |
| Hostname verification failed | SAN doesn't match hostname | Add hostname to SAN |
| No cipher suites in common | Protocol/cipher mismatch | Check ssl.enabled.protocols |
| Keystore password incorrect | Wrong password | Verify password |
Debug TLS Handshake¶
# JVM debug parameter
-Djavax.net.debug=ssl:handshake
# Or more verbose
-Djavax.net.debug=all
Verify Certificates¶
# Check certificate details
openssl x509 -in kafka1-cert.pem -text -noout
# Verify certificate chain
openssl verify -CAfile ca-cert.pem kafka1-cert.pem
# Check keystore contents
keytool -list -v -keystore kafka.keystore.p12 -storepass password
# Test SSL connection
openssl s_client -connect kafka1:9093 -CAfile ca-cert.pem \
-cert client-cert.pem -key client-key.pem
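The same handshake test can be scripted with Python's standard `ssl` module, which is convenient in health checks. This is a minimal sketch: `build_client_context` and `probe` are hypothetical helpers, and the file paths in the commented usage are the ones generated earlier in this guide.

```python
import socket
import ssl


def build_client_context(ca_file=None, cert_file=None, key_file=None):
    """Client context that verifies the server certificate and hostname."""
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_file)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    if cert_file:
        # Present a client certificate, as required for mTLS.
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return ctx


def probe(host, port, ctx):
    """Connect, complete the TLS handshake, and report protocol and SAN entries."""
    with socket.create_connection((host, port), timeout=5) as sock:
        with ctx.wrap_socket(sock, server_hostname=host) as tls:
            return tls.version(), tls.getpeercert().get("subjectAltName", ())


# Usage (requires a reachable broker and the certificate files):
# ctx = build_client_context("ca-cert.pem", "client-cert.pem", "client-key.pem")
# print(probe("kafka1.example.com", 9093, ctx))
```

A failure here raises `ssl.SSLError` or `ssl.SSLCertVerificationError`, whose message usually points to the same causes listed in the Common Errors table.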
Check Broker TLS Status¶
# Test broker connection
kafka-broker-api-versions.sh --bootstrap-server kafka:9093 \
--command-config client-ssl.properties
# Check broker logs
grep -i "ssl\|tls\|certificate" /var/log/kafka/server.log | tail -50
Combining mTLS with SASL¶
Use TLS for transport encryption and SASL for authentication; a client certificate can still be requested as an additional check:
# Broker: SASL authentication over TLS
listeners=SASL_SSL://0.0.0.0:9093
# Client cert optional (SASL provides auth)
ssl.client.auth=requested
# SASL mechanism
sasl.enabled.mechanisms=SCRAM-SHA-512
This provides:
- TLS encryption and optional client certificate verification
- SASL-based identity for authorization
Related Documentation¶
- Authentication Overview - Mechanism comparison
- SASL/SCRAM - Password-based authentication
- Encryption - TLS configuration details
- Authorization - ACL configuration