OAuth/OIDC Authentication (SASL/OAUTHBEARER)¶
SASL/OAUTHBEARER enables OAuth 2.0 and OpenID Connect (OIDC) authentication for Kafka. It integrates with cloud identity providers like Azure AD, Okta, Auth0, and Keycloak.
Overview¶
When to Use OAuth¶
| Use Case | Recommendation |
|---|---|
| Cloud-native deployments | Recommended |
| Azure AD / Entra ID integration | Recommended |
| Okta / Auth0 / Keycloak | Recommended |
| Microservices architecture | Recommended |
| No existing identity provider | Consider SCRAM |
| On-premises without IdP | Consider Kerberos or SCRAM |
OAuth Benefits¶
| Feature | Benefit |
|---|---|
| Token-based | Short-lived, automatically refreshed |
| Centralized identity | Single source of truth |
| Fine-grained scopes | Permission control via claims |
| No password storage | Tokens from IdP only |
| Standard protocol | Broad tooling support |
Version Requirements¶
| Feature | Kafka Version |
|---|---|
| SASL/OAUTHBEARER (custom callback) | 2.0.0+ |
| Built-in OIDC support | 3.1.0+ |
| Token refresh without reconnect | 2.2.0+ |
Use Kafka 3.1+ for OIDC
Kafka 3.1 introduced built-in OIDC support with OAuthBearerLoginCallbackHandler. Earlier versions require custom callback implementation.
OAuth Flow¶
Key concepts:
- Access Token - JWT containing identity and claims
- Client Credentials - Application identity (client_id/secret)
- Token Refresh - Automatic renewal before expiry
- Claims - Identity attributes (username, groups, scopes)
Identity Provider Setup¶
Azure AD / Entra ID¶
- Register application:
Azure Portal -> Azure Active Directory -> App Registrations -> New Registration
Name: kafka-cluster
Account types: Single tenant
- Configure API permissions:
API Permissions -> Add permission -> APIs my organization uses
Select your Kafka app -> Add permissions
- Create client secret:
Certificates & secrets -> New client secret
Note: Copy secret value immediately, shown only once
- Get endpoints:
Token endpoint: https://login.microsoftonline.com/{tenant-id}/oauth2/v2.0/token
JWKS URI: https://login.microsoftonline.com/{tenant-id}/discovery/v2.0/keys
Okta¶
- Create application:
Applications -> Create App Integration
Sign-in method: OIDC
Application type: Service (Machine-to-Machine)
- Configure:
Client authentication: Client secret
Grant type: Client Credentials
- Get endpoints:
Token endpoint: https://{your-domain}.okta.com/oauth2/default/v1/token
JWKS URI: https://{your-domain}.okta.com/oauth2/default/v1/keys
Keycloak¶
- Create client:
Clients -> Create Client
Client ID: kafka-broker
Client authentication: ON
Authentication flow: Service accounts roles
- Get endpoints:
Token endpoint: https://{keycloak}/realms/{realm}/protocol/openid-connect/token
JWKS URI: https://{keycloak}/realms/{realm}/protocol/openid-connect/certs
Broker Configuration (Kafka 3.1+)¶
Basic OIDC Setup¶
# server.properties
# Listener configuration
listeners=SASL_SSL://0.0.0.0:9093
advertised.listeners=SASL_SSL://kafka1.example.com:9093
# Security protocol
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
# Enable OAuth
sasl.enabled.mechanisms=OAUTHBEARER
# OIDC configuration
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=\
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="kafka-broker" \
clientSecret="${OAUTH_CLIENT_SECRET}" \
scope="kafka" ;
# Server callback handler (validates incoming tokens)
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=\
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
# Login callback handler (obtains tokens for inter-broker)
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class=\
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
# OIDC discovery
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token
sasl.oauthbearer.jwks.endpoint.url=https://login.microsoftonline.com/{tenant}/discovery/v2.0/keys
# Expected audience (your Kafka app's client ID)
sasl.oauthbearer.expected.audience=api://kafka-cluster
# Clock skew tolerance (seconds)
sasl.oauthbearer.clock.skew.seconds=30
TLS Configuration¶
# SSL settings
ssl.keystore.type=PKCS12
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.p12
ssl.keystore.password=${KEYSTORE_PASSWORD}
ssl.key.password=${KEY_PASSWORD}
ssl.truststore.type=PKCS12
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.p12
ssl.truststore.password=${TRUSTSTORE_PASSWORD}
ssl.enabled.protocols=TLSv1.3,TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
Azure AD Configuration¶
# Azure AD specific
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=\
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="${AZURE_CLIENT_ID}" \
clientSecret="${AZURE_CLIENT_SECRET}" \
scope="api://kafka-cluster/.default" ;
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/${AZURE_TENANT_ID}/oauth2/v2.0/token
sasl.oauthbearer.jwks.endpoint.url=https://login.microsoftonline.com/${AZURE_TENANT_ID}/discovery/v2.0/keys
sasl.oauthbearer.expected.audience=api://kafka-cluster
sasl.oauthbearer.expected.issuer=https://sts.windows.net/${AZURE_TENANT_ID}/
Okta Configuration¶
# Okta specific
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=\
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="${OKTA_CLIENT_ID}" \
clientSecret="${OKTA_CLIENT_SECRET}" \
scope="kafka" ;
sasl.oauthbearer.token.endpoint.url=https://${OKTA_DOMAIN}/oauth2/default/v1/token
sasl.oauthbearer.jwks.endpoint.url=https://${OKTA_DOMAIN}/oauth2/default/v1/keys
sasl.oauthbearer.expected.audience=api://default
sasl.oauthbearer.expected.issuer=https://${OKTA_DOMAIN}/oauth2/default
Keycloak Configuration¶
# Keycloak specific
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=\
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="${KEYCLOAK_CLIENT_ID}" \
clientSecret="${KEYCLOAK_CLIENT_SECRET}" \
scope="openid kafka" ;
sasl.oauthbearer.token.endpoint.url=https://${KEYCLOAK_HOST}/realms/${REALM}/protocol/openid-connect/token
sasl.oauthbearer.jwks.endpoint.url=https://${KEYCLOAK_HOST}/realms/${REALM}/protocol/openid-connect/certs
sasl.oauthbearer.expected.audience=${KEYCLOAK_CLIENT_ID}
sasl.oauthbearer.expected.issuer=https://${KEYCLOAK_HOST}/realms/${REALM}
Principal Extraction¶
Extract user identity from JWT claims for authorization:
Claim Mapping¶
# Extract principal from specific claim
sasl.oauthbearer.sub.claim.name=preferred_username
# Or use multiple claims
# Default: uses 'sub' claim
| Claim | Description | Example |
|---|---|---|
sub |
Subject (default) | UUID or email |
preferred_username |
Human-readable username | [email protected] |
email |
Email address | [email protected] |
azp |
Authorized party | client_id |
Group-Based Authorization¶
# Extract groups from token
sasl.oauthbearer.groups.claim.name=groups
# Maps to Kafka principals for ACLs
# Group claim: ["kafka-admins", "kafka-producers"]
# Results in User:alice being member of groups
Client Configuration¶
Java Client (Kafka 3.1+)¶
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9093,kafka2:9093");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Security configuration
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "OAUTHBEARER");
// OIDC configuration
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
"clientId=\"my-application\" " +
"clientSecret=\"" + System.getenv("OAUTH_CLIENT_SECRET") + "\" " +
"scope=\"kafka\";");
props.put("sasl.login.callback.handler.class",
"org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
props.put("sasl.oauthbearer.token.endpoint.url",
"https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token");
// TLS configuration
props.put("ssl.truststore.location", "/etc/kafka/ssl/client.truststore.p12");
props.put("ssl.truststore.password", "truststore-password");
props.put("ssl.truststore.type", "PKCS12");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Spring Boot¶
application.yml:
spring:
kafka:
bootstrap-servers: kafka1:9093,kafka2:9093
properties:
security.protocol: SASL_SSL
sasl.mechanism: OAUTHBEARER
sasl.jaas.config: >
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
clientId="${OAUTH_CLIENT_ID}"
clientSecret="${OAUTH_CLIENT_SECRET}"
scope="kafka";
sasl.login.callback.handler.class: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url: ${OAUTH_TOKEN_ENDPOINT}
ssl:
trust-store-location: classpath:truststore.p12
trust-store-password: ${TRUSTSTORE_PASSWORD}
trust-store-type: PKCS12
Python (confluent-kafka)¶
from confluent_kafka import Producer
import requests
# Get token from IdP
def get_oauth_token():
response = requests.post(
'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token',
data={
'client_id': 'my-application',
'client_secret': os.environ['OAUTH_CLIENT_SECRET'],
'grant_type': 'client_credentials',
'scope': 'api://kafka-cluster/.default'
}
)
return response.json()['access_token']
def oauth_cb(config_str):
return get_oauth_token(), time.time() + 3600
config = {
'bootstrap.servers': 'kafka1:9093,kafka2:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'oauth_cb': oauth_cb,
'ssl.ca.location': '/etc/kafka/ssl/ca-cert.pem',
}
producer = Producer(config)
Command-Line Tools¶
client.properties:
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="admin-tool" \
clientSecret="${OAUTH_CLIENT_SECRET}" \
scope="kafka";
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token
ssl.truststore.location=/etc/kafka/ssl/client.truststore.p12
ssl.truststore.password=truststore-password
kafka-topics.sh --bootstrap-server kafka:9093 \
--command-config client.properties \
--list
Token Refresh¶
Automatic Refresh¶
Kafka clients automatically refresh tokens before expiry:
# Client refresh settings
sasl.login.refresh.window.factor=0.8
sasl.login.refresh.window.jitter=0.05
sasl.login.refresh.min.period.seconds=60
sasl.login.refresh.buffer.seconds=5
| Property | Default | Description |
|---|---|---|
refresh.window.factor |
0.8 | Refresh at 80% of token lifetime |
refresh.window.jitter |
0.05 | Random jitter |
refresh.min.period.seconds |
60 | Minimum refresh interval |
refresh.buffer.seconds |
5 | Buffer before expiry |
Connection Re-authentication¶
Enable broker-initiated re-authentication (Kafka 2.2+):
Broker:
# Force re-authentication every hour
connections.max.reauth.ms=3600000
Custom Callback Handler (Pre-3.1)¶
For Kafka versions before 3.1 or custom token logic:
Login Callback (Token Acquisition)¶
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import javax.security.auth.callback.*;
import javax.security.auth.login.AppConfigurationEntry;
import java.util.*;
public class CustomOAuthLoginCallbackHandler implements AuthenticateCallbackHandler {
private String tokenEndpoint;
private String clientId;
private String clientSecret;
@Override
public void configure(Map<String, ?> configs, String saslMechanism,
List<AppConfigurationEntry> jaasConfigEntries) {
// Extract configuration
this.tokenEndpoint = (String) configs.get("oauth.token.endpoint");
this.clientId = (String) configs.get("oauth.client.id");
this.clientSecret = (String) configs.get("oauth.client.secret");
}
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerTokenCallback) {
handleTokenCallback((OAuthBearerTokenCallback) callback);
} else {
throw new UnsupportedCallbackException(callback);
}
}
}
private void handleTokenCallback(OAuthBearerTokenCallback callback) {
// Fetch token from IdP
String accessToken = fetchAccessToken();
// Parse JWT and create token
OAuthBearerToken token = new OAuthBearerTokenImpl(accessToken);
callback.token(token);
}
private String fetchAccessToken() {
// HTTP POST to token endpoint
// Return access_token from response
}
@Override
public void close() {}
}
Validator Callback (Token Validation)¶
public class CustomOAuthValidatorCallbackHandler implements AuthenticateCallbackHandler {
private String jwksEndpoint;
private JWKSet jwkSet;
@Override
public void configure(Map<String, ?> configs, String saslMechanism,
List<AppConfigurationEntry> jaasConfigEntries) {
this.jwksEndpoint = (String) configs.get("oauth.jwks.endpoint");
// Fetch JWKS for signature verification
this.jwkSet = fetchJWKS();
}
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerValidatorCallback) {
handleValidatorCallback((OAuthBearerValidatorCallback) callback);
} else {
throw new UnsupportedCallbackException(callback);
}
}
}
private void handleValidatorCallback(OAuthBearerValidatorCallback callback) {
String tokenValue = callback.tokenValue();
// Validate JWT signature using JWKS
// Validate expiration, audience, issuer
// Extract claims
if (isValid(tokenValue)) {
callback.token(new OAuthBearerTokenImpl(tokenValue));
} else {
callback.error("invalid_token", "Token validation failed", null);
}
}
@Override
public void close() {}
}
Register Custom Handlers¶
# Broker
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=\
com.example.CustomOAuthValidatorCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class=\
com.example.CustomOAuthLoginCallbackHandler
# Client
sasl.login.callback.handler.class=com.example.CustomOAuthLoginCallbackHandler
Security Best Practices¶
Token Security¶
| Best Practice | Description |
|---|---|
| Short-lived tokens | Use tokens with 1 hour or less lifetime |
| Secure client secrets | Store in secrets manager, not config files |
| Validate all claims | Audience, issuer, expiration |
| Use TLS | Always SASL_SSL, never SASL_PLAINTEXT |
| Least privilege scopes | Request only needed permissions |
Client Secret Management¶
Environment variables:
export OAUTH_CLIENT_SECRET=$(vault kv get -field=secret secret/kafka/oauth)
Kubernetes secrets:
apiVersion: v1
kind: Secret
metadata:
name: kafka-oauth
type: Opaque
stringData:
client-secret: your-client-secret
---
apiVersion: apps/v1
kind: Deployment
spec:
template:
spec:
containers:
- name: kafka
env:
- name: OAUTH_CLIENT_SECRET
valueFrom:
secretKeyRef:
name: kafka-oauth
key: client-secret
Network Security¶
# HTTPS for all OAuth endpoints
sasl.oauthbearer.token.endpoint.url=https://...
sasl.oauthbearer.jwks.endpoint.url=https://...
# TLS for Kafka
ssl.endpoint.identification.algorithm=HTTPS
Troubleshooting¶
Common Errors¶
| Error | Cause | Solution |
|---|---|---|
invalid_token |
Expired or malformed token | Check token expiry, IdP config |
invalid_client |
Wrong client credentials | Verify client_id/secret |
invalid_scope |
Requested scope not granted | Check IdP permission config |
Token validation failed |
Signature verification failed | Check JWKS endpoint, clock skew |
Connection refused |
Cannot reach IdP | Check network, firewall |
Debug Logging¶
Broker:
# log4j.properties
log4j.logger.org.apache.kafka.common.security.oauthbearer=DEBUG
log4j.logger.org.apache.kafka.common.security=DEBUG
Client:
props.put("sasl.login.connect.timeout.ms", "10000");
// Enable DEBUG logging in log4j/logback
Test Token Acquisition¶
# Test token endpoint directly
curl -X POST https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token \
-d "client_id={client_id}" \
-d "client_secret={client_secret}" \
-d "grant_type=client_credentials" \
-d "scope=api://kafka-cluster/.default"
# Decode JWT
echo $TOKEN | cut -d'.' -f2 | base64 -d | jq .
Verify JWKS Endpoint¶
# Fetch JWKS
curl https://login.microsoftonline.com/{tenant}/discovery/v2.0/keys | jq .
Check Clock Synchronization¶
OAuth tokens are time-sensitive:
# Check time
date
timedatectl status
# Compare with IdP
curl -I https://login.microsoftonline.com | grep Date
Related Documentation¶
- Authentication Overview - Mechanism comparison
- SASL/SCRAM - Password-based authentication
- Delegation Tokens - Lightweight tokens
- Authorization - ACL configuration
- Encryption - TLS setup