Kafka Connect¶
Kafka Connect is a framework for streaming data between Apache Kafka and external systems using pre-built or custom connectors.
Overview¶
Kafka Connect eliminates the need to write custom integration code for common data sources and sinks. The framework handles:
- Parallelization and scaling
- Offset management and delivery guarantees, including exactly-once for source connectors
- Schema integration with Schema Registry
- Fault tolerance and automatic recovery
- Standardized monitoring and operations
Architecture¶
Components¶
| Component | Description |
|---|---|
| Worker | JVM process that executes connectors and tasks |
| Connector | Plugin that defines how to connect to an external system |
| Task | Unit of work; connectors are divided into tasks for parallelism |
| Converter | Serializes/deserializes data between Connect and Kafka |
| Transform | Modifies records in-flight (Single Message Transforms) |
Worker Architecture¶
Data Flow¶
Deployment Modes¶
Standalone Mode¶
Single worker process—suitable for development and simple use cases.
# Start standalone worker
connect-standalone.sh \
config/connect-standalone.properties \
config/file-source.properties \
config/file-sink.properties
Standalone properties:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Offset storage (local file)
offset.storage.file.filename=/tmp/connect.offsets
# REST API
rest.port=8083
| Characteristic | Standalone Mode |
|---|---|
| Workers | Single process |
| Offset storage | Local file |
| Fault tolerance | None |
| Scaling | Not supported |
| Use case | Development, testing |
Distributed Mode¶
Multiple workers forming a cluster—required for production.
# Start distributed worker (on each node)
connect-distributed.sh config/connect-distributed.properties
Distributed properties:
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
# Group coordination
group.id=connect-cluster
# Offset storage (Kafka topics)
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
# Config storage
config.storage.topic=connect-configs
config.storage.replication.factor=3
# Status storage
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
# Converters
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# REST API
rest.advertised.host.name=connect-worker-1
rest.port=8083
| Characteristic | Distributed Mode |
|---|---|
| Workers | Multiple processes (cluster) |
| Offset storage | Kafka topic (connect-offsets) |
| Fault tolerance | Automatic task redistribution |
| Scaling | Add workers to scale |
| Use case | Production |
Internal Topics¶
| Topic | Purpose | Recommended Config |
|---|---|---|
| `connect-offsets` | Source connector offsets | RF=3, partitions=25 |
| `connect-configs` | Connector configurations | RF=3, partitions=1, compacted |
| `connect-status` | Connector/task status | RF=3, partitions=5, compacted |
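Connect creates these topics automatically when it has permission to do so, but they can be pre-created to pin their settings explicitly (all three must use log compaction). A minimal sketch with kafka-topics.sh, assuming a three-broker cluster and the default topic names:
# Pre-create Connect internal topics with explicit settings
kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic connect-offsets \
  --partitions 25 --replication-factor 3 --config cleanup.policy=compact
kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic connect-configs \
  --partitions 1 --replication-factor 3 --config cleanup.policy=compact
kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic connect-status \
  --partitions 5 --replication-factor 3 --config cleanup.policy=compact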
Deployment Architectures¶
Standalone Architecture¶
Single worker process for development and simple integrations.
Distributed Architecture¶
Multi-worker cluster for production deployments.
Co-located Deployment¶
Connect workers on Kafka broker nodes—suitable for smaller clusters.
Dedicated Connect Cluster¶
Separate Connect cluster—recommended for production.
Sidecar Pattern¶
Connect worker per application—useful for application-specific integrations.
Kubernetes Deployment¶
Connect cluster on Kubernetes with horizontal scaling.
Kubernetes manifest example:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect
spec:
  replicas: 3
  selector:
    matchLabels:
      app: kafka-connect
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
        - name: connect
          image: confluentinc/cp-kafka-connect:7.5.0
          ports:
            - containerPort: 8083
          env:
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: "kafka:9092"
            - name: CONNECT_GROUP_ID
              value: "connect-cluster"
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          resources:
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2"
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-connect
spec:
  ports:
    - port: 8083
  selector:
    app: kafka-connect
Multi-Region Deployment¶
Separate Connect clusters per region for geo-distributed workloads.
Deployment Comparison¶
| Pattern | Scaling | Fault Tolerance | Resource Isolation | Use Case |
|---|---|---|---|---|
| Standalone | None | None | N/A | Development |
| Co-located | With brokers | Shared | Poor | Small clusters |
| Dedicated cluster | Independent | Independent | Good | Production |
| Sidecar | Per application | Per application | Excellent | App-specific |
| Kubernetes | HPA/VPA | Pod replacement | Good | Cloud-native |
| Multi-region | Per region | Regional | Excellent | Global deployments |
Sizing Guidelines¶
| Workload | Workers | Memory per Worker | CPU per Worker |
|---|---|---|---|
| Light (< 10 connectors) | 2-3 | 2-4 GB | 1-2 cores |
| Medium (10-50 connectors) | 3-5 | 4-8 GB | 2-4 cores |
| Heavy (50+ connectors) | 5-10+ | 8-16 GB | 4-8 cores |
Worker Sizing
Worker memory depends on:
- Number of tasks per worker
- Message size and throughput
- Converter complexity (Avro/Protobuf vs JSON)
- Transform chain depth
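Worker heap is set through the standard Kafka environment variable before starting the process; the values below are illustrative for a medium workload, not a recommendation:
# Fixed 6 GB heap for the Connect worker JVM
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
connect-distributed.sh config/connect-distributed.properties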
Connector Configuration¶
Creating Connectors via REST API¶
# Create a Cassandra Sink connector
curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "cassandra-sink",
    "config": {
      "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
      "tasks.max": "1",
      "topics": "events",
      "contactPoints": "cassandra1,cassandra2,cassandra3",
      "loadBalancing.localDc": "datacenter1",
      "port": "9042",
      "topic.events.keyspace.table.mapping": "events.events_by_time",
      "topic.events.keyspace.table.consistencyLevel": "LOCAL_QUORUM"
    }
  }'
Common Configuration Properties¶
| Property | Description |
|---|---|
| `name` | Unique connector name |
| `connector.class` | Fully qualified connector class |
| `tasks.max` | Maximum number of tasks |
| `key.converter` | Key converter (overrides worker default) |
| `value.converter` | Value converter (overrides worker default) |
| `transforms` | Comma-separated list of transforms |
| `errors.tolerance` | Error handling: none, all |
| `errors.deadletterqueue.topic.name` | Dead letter queue topic |
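These properties combine in a single connector configuration. A hedged sketch submitted with PUT (which creates or updates the connector); the connector class, topic, and names below are placeholders:
curl -X PUT http://connect:8083/connectors/my-sink/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "com.example.MySinkConnector",
    "tasks.max": "3",
    "topics": "orders",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my-sink-dlq"
  }'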
Connector Lifecycle¶
REST API Reference¶
Connector Management¶
| Endpoint | Method | Description |
|---|---|---|
| `/connectors` | GET | List all connectors |
| `/connectors` | POST | Create connector |
| `/connectors/{name}` | GET | Get connector info |
| `/connectors/{name}` | DELETE | Delete connector |
| `/connectors/{name}/config` | GET | Get connector config |
| `/connectors/{name}/config` | PUT | Update connector config |
| `/connectors/{name}/status` | GET | Get connector status |
| `/connectors/{name}/restart` | POST | Restart connector |
| `/connectors/{name}/pause` | PUT | Pause connector |
| `/connectors/{name}/resume` | PUT | Resume connector |
| `/connectors/{name}/stop` | PUT | Stop connector (deallocate resources) |
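A few representative calls against these endpoints, reusing the cassandra-sink connector from the earlier example:
# List connectors
curl http://connect:8083/connectors

# Check connector and task state
curl http://connect:8083/connectors/cassandra-sink/status

# Restart the connector and any failed tasks (Kafka 3.0+)
curl -X POST "http://connect:8083/connectors/cassandra-sink/restart?includeTasks=true&onlyFailed=true"

# Pause and resume
curl -X PUT http://connect:8083/connectors/cassandra-sink/pause
curl -X PUT http://connect:8083/connectors/cassandra-sink/resume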
Task Management¶
| Endpoint | Method | Description |
|---|---|---|
| `/connectors/{name}/tasks` | GET | List tasks |
| `/connectors/{name}/tasks/{id}/status` | GET | Get task status |
| `/connectors/{name}/tasks/{id}/restart` | POST | Restart task |
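For example, to inspect task state and restart a single failed task (jq is assumed to be installed):
# Show the state of each task
curl -s http://connect:8083/connectors/cassandra-sink/status | jq '.tasks[] | {id, state}'

# Restart task 0
curl -X POST http://connect:8083/connectors/cassandra-sink/tasks/0/restart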
Offset Management¶
Manage connector offsets (connector must be stopped):
| Endpoint | Method | Description |
|---|---|---|
| `/connectors/{name}/offsets` | GET | Get current offsets |
| `/connectors/{name}/offsets` | DELETE | Reset offsets |
| `/connectors/{name}/offsets` | PATCH | Alter offsets |
Alter source connector offsets:
# Stop connector first
curl -X PUT http://connect:8083/connectors/my-connector/stop
# Alter offsets
curl -X PATCH http://connect:8083/connectors/my-connector/offsets \
  -H "Content-Type: application/json" \
  -d '{
    "offsets": [
      {
        "partition": {"filename": "test.txt"},
        "offset": {"position": 30}
      }
    ]
  }'
# Resume connector
curl -X PUT http://connect:8083/connectors/my-connector/resume
Reset sink connector offsets:
# Stop and reset to re-consume from beginning
curl -X PUT http://connect:8083/connectors/my-sink/stop
curl -X DELETE http://connect:8083/connectors/my-sink/offsets
curl -X PUT http://connect:8083/connectors/my-sink/resume
Cluster Information¶
| Endpoint | Method | Description |
|---|---|---|
| `/` | GET | Cluster info |
| `/connector-plugins` | GET | List installed plugins |
| `/connector-plugins/{plugin}/config/validate` | PUT | Validate config |
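Validating a configuration before creating the connector surfaces missing or invalid properties; the response includes an error_count field and per-property errors. A sketch using the built-in FileStream sink plugin:
curl -s -X PUT http://connect:8083/connector-plugins/FileStreamSinkConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "events",
    "file": "/tmp/events.out"
  }' | jq '.error_count'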
Admin Logging¶
Dynamically adjust log levels:
| Endpoint | Method | Description |
|---|---|---|
| `/admin/loggers` | GET | List loggers with explicit levels |
| `/admin/loggers/{name}` | GET | Get logger level |
| `/admin/loggers/{name}` | PUT | Set logger level |
# Enable debug logging for connector
curl -X PUT http://connect:8083/admin/loggers/org.apache.kafka.connect \
-H "Content-Type: application/json" \
-d '{"level": "DEBUG"}'
Converters¶
Converters serialize and deserialize data between Connect's internal format and Kafka.
Available Converters¶
| Converter | Format | Schema Support |
|---|---|---|
| `JsonConverter` | JSON | Optional (schemas.enable) |
| `AvroConverter` | Avro binary | Yes (Schema Registry) |
| `ProtobufConverter` | Protobuf binary | Yes (Schema Registry) |
| `JsonSchemaConverter` | JSON with schema | Yes (Schema Registry) |
| `StringConverter` | Plain string | No |
| `ByteArrayConverter` | Raw bytes | No |
Converter Configuration¶
# JSON without schemas (simple)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Avro with Schema Registry (production)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
Converter Selection Guide¶
| Use Case | Recommended Converter |
|---|---|
| Development/debugging | JsonConverter (schemas.enable=false) |
| Production with schema evolution | AvroConverter or ProtobufConverter |
| Existing JSON consumers | JsonConverter or JsonSchemaConverter |
| Maximum compatibility | StringConverter (manual serialization) |
Single Message Transforms (SMTs)¶
SMTs modify records as they flow through Connect—useful for simple transformations without custom code.
Built-in Transforms¶
| Transform | Description |
|---|---|
| `InsertField` | Add field with static or metadata value |
| `ReplaceField` | Rename, include, or exclude fields |
| `MaskField` | Replace field value with a valid null value for its type |
| `ValueToKey` | Copy fields from value to key |
| `ExtractField` | Extract single field from struct |
| `SetSchemaMetadata` | Set schema name and version |
| `TimestampRouter` | Route to topic based on timestamp |
| `RegexRouter` | Route to topic based on regex |
| `Flatten` | Flatten nested structures |
| `Cast` | Cast field to a different type |
| `HeaderFrom` | Copy field to header |
| `InsertHeader` | Add static header |
| `DropHeaders` | Remove headers |
| `Filter` | Drop records matching predicate |
Transform Configuration¶
{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "transforms": "addTimestamp,route",
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "processed_at",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "(.*)_raw",
    "transforms.route.replacement": "$1_processed"
  }
}
Transform Chain¶
Predicates¶
Apply transforms conditionally based on message properties:
Built-in Predicates:
| Predicate | Description |
|---|---|
| `TopicNameMatches` | Match records where topic name matches regex |
| `HasHeaderKey` | Match records with specific header key |
| `RecordIsTombstone` | Match tombstone records (null value) |
Predicate Configuration:
{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "transforms": "FilterFoo,ExtractBar",
    "transforms.FilterFoo.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.FilterFoo.predicate": "IsFoo",
    "transforms.ExtractBar.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractBar.field": "other_field",
    "transforms.ExtractBar.predicate": "IsBar",
    "transforms.ExtractBar.negate": "true",
    "predicates": "IsFoo,IsBar",
    "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsFoo.pattern": "foo",
    "predicates.IsBar.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsBar.pattern": "bar"
  }
}
| Property | Description |
|---|---|
| `predicate` | Associate predicate alias with transform |
| `negate` | Invert predicate match (apply when NOT matched) |
Error Handling¶
Error Tolerance¶
{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my-connector-dlq",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}
| Configuration | Description |
|---|---|
| `errors.tolerance=none` | Fail on first error (default) |
| `errors.tolerance=all` | Log errors and continue |
| `errors.deadletterqueue.topic.name` | Topic for failed records |
| `errors.deadletterqueue.context.headers.enable` | Include error context in headers |
| `errors.log.enable` | Log errors to Connect log |
| `errors.log.include.messages` | Include record content in logs |
Dead Letter Queue¶
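Records routed to the DLQ carry __connect.errors.* headers (when errors.deadletterqueue.context.headers.enable is set) describing the original topic, partition, offset, and exception. A quick way to inspect them, assuming the my-connector-dlq topic from the example above:
kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
  --topic my-connector-dlq --from-beginning \
  --property print.headers=true --property print.key=true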
Exactly-Once Delivery¶
Kafka Connect supports exactly-once semantics for source connectors (Kafka 3.3+).
Source Connector EOS¶
# Worker configuration
exactly.once.source.support=enabled

# Source connector configuration (writes are then wrapped in transactions)
exactly.once.support=required
# Transaction boundary: poll (default), connector, or interval
transaction.boundary=poll
| Transaction Boundary | Behavior |
|---|---|
| `poll` | Transaction per poll() call |
| `connector` | Connector defines boundaries |
| `interval` | Transaction every N milliseconds |
Sink Connector EOS¶
Sink connectors achieve exactly-once through idempotent writes to external systems:
| Strategy | Implementation |
|---|---|
| Upsert | Use primary key for idempotent updates |
| Deduplication | Track processed offsets in sink |
| Transactions | Commit offset with sink transaction |
Monitoring¶
Key Metrics¶
| Metric | Description | Alert Threshold |
|---|---|---|
| `connector-count` | Number of connectors | Expected count |
| `task-count` | Number of running tasks | Expected count |
| `connector-startup-failure-total` | Connector startup failures | > 0 |
| `task-startup-failure-total` | Task startup failures | > 0 |
| `source-record-poll-total` | Records polled by source | Depends on workload |
| `sink-record-send-total` | Records sent by sink | Depends on workload |
| `offset-commit-failure-total` | Offset commit failures | > 0 |
| `deadletterqueue-produce-total` | Records sent to DLQ | > 0 (investigate) |
JMX MBeans¶
kafka.connect:type=connector-metrics,connector={connector}
kafka.connect:type=connector-task-metrics,connector={connector},task={task}
kafka.connect:type=source-task-metrics,connector={connector},task={task}
kafka.connect:type=sink-task-metrics,connector={connector},task={task}
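These MBeans are typically scraped by a JMX exporter, but they can also be inspected ad hoc with the JmxTool bundled with Kafka. A hedged sketch, assuming the worker was started with JMX enabled on port 9999; the tool's package has moved between releases, so adjust the class name for your version:
# Poll sink task metrics every 10 seconds
kafka-run-class.sh org.apache.kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://connect-worker-1:9999/jmxrmi \
  --object-name 'kafka.connect:type=sink-task-metrics,connector=cassandra-sink,task=0' \
  --reporting-interval 10000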
Scaling¶
Horizontal Scaling¶
Add workers to the Connect cluster to distribute load. Any new worker started with the same group.id and internal topic settings joins the group, and running tasks are rebalanced across all workers automatically.
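A hedged sketch of scaling out (host names are illustrative):
# On a new node: start another worker with the same distributed config
connect-distributed.sh config/connect-distributed.properties

# Verify that tasks are now spread across workers
curl -s http://connect-worker-1:8083/connectors/cassandra-sink/status | jq '.tasks[].worker_id'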
Task Parallelism¶
Increase tasks.max for connectors that support parallelism:
{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "10",
    "table.whitelist": "orders,customers,products"
  }
}
| Connector Type | Parallelism Model |
|---|---|
| HTTP Source | One task per endpoint (typically) |
| File Source | One task per file or directory |
| S3 Sink | Tasks share topic partitions |
| Cassandra Sink | Tasks share topic partitions |
Connector Development¶
Build custom connectors to integrate Kafka with proprietary or otherwise unsupported systems.
Connector Components¶
| Component | Interface | Purpose |
|---|---|---|
| SourceConnector | `SourceConnector` | Configuration and task distribution for imports |
| SourceTask | `SourceTask` | Read data from external system |
| SinkConnector | `SinkConnector` | Configuration and task distribution for exports |
| SinkTask | `SinkTask` | Write data to external system |
Source Connector Structure¶
public class MySourceConnector extends SourceConnector {

    private Map<String, String> configProps;

    @Override
    public void start(Map<String, String> props) {
        this.configProps = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MySourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Distribute work across tasks
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(configProps);
            taskConfig.put("task.id", String.valueOf(i));
            configs.add(taskConfig);
        }
        return configs;
    }

    @Override
    public void stop() {
        // Clean up resources
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("connection.url", Type.STRING, Importance.HIGH, "Connection URL");
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
Source Task Implementation¶
public class MySourceTask extends SourceTask {

    private String connectionUrl;

    @Override
    public void start(Map<String, String> props) {
        connectionUrl = props.get("connection.url");
        // Initialize connection
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        // Read from external system
        List<DataItem> items = fetchData();
        for (DataItem item : items) {
            Map<String, ?> sourcePartition = Collections.singletonMap("source", connectionUrl);
            Map<String, ?> sourceOffset = Collections.singletonMap("position", item.getOffset());
            records.add(new SourceRecord(
                sourcePartition,
                sourceOffset,
                "target-topic",
                Schema.STRING_SCHEMA,
                item.getKey(),
                Schema.STRING_SCHEMA,
                item.getValue()
            ));
        }
        return records;
    }

    @Override
    public void stop() {
        // Close connections
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
Sink Task Implementation¶
public class MySinkTask extends SinkTask {

    private ErrantRecordReporter reporter;

    @Override
    public void start(Map<String, String> props) {
        // Initialize connection
        try {
            reporter = context.errantRecordReporter();
        } catch (NoSuchMethodError e) {
            reporter = null; // Older Connect runtime
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                writeToDestination(record);
            } catch (Exception e) {
                if (reporter != null) {
                    reporter.report(record, e); // Send to DLQ
                } else {
                    throw new ConnectException("Write failed", e);
                }
            }
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Ensure all data is persisted before offset commit
    }

    @Override
    public void stop() {
        // Close connections
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}
Plugin Discovery¶
Register the connector classes in META-INF/services/ so the Connect runtime can discover them:
# META-INF/services/org.apache.kafka.connect.source.SourceConnector
com.example.MySourceConnector
# META-INF/services/org.apache.kafka.connect.sink.SinkConnector
com.example.MySinkConnector
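The packaged connector (an uber JAR or a directory of JARs) is then installed under one of the worker's plugin.path directories; the paths and artifact name below are illustrative:
# Install the plugin where plugin.path=/opt/connect-plugins points
mkdir -p /opt/connect-plugins/my-connector
cp target/my-connector-1.0.0-jar-with-dependencies.jar /opt/connect-plugins/my-connector/

# After restarting the worker, confirm the plugin is visible
curl -s http://connect:8083/connector-plugins | jq '.[].class'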
Resume from Offset¶
@Override
public void start(Map<String, String> props) {
    Map<String, Object> partition = Collections.singletonMap("source", connectionUrl);
    Map<String, Object> offset = context.offsetStorageReader().offset(partition);
    if (offset != null) {
        Long lastPosition = (Long) offset.get("position");
        seekToPosition(lastPosition);
    }
}
Kafka 4.2 Connect Enhancements¶
Kafka 4.2 includes several Kafka Connect improvements:
| Feature | KIP | Description |
|---|---|---|
| External schema in JsonConverter | KIP-1054 | schema.content configuration for external schemas, reducing message sizes |
| Allowlist override policy | KIP-1188 | New ConnectorClientConfigOverridePolicy based on allowlists, enabling fine-grained control over which client configurations connectors may override |
Connector Client Configuration Override Policy (Kafka 4.2+)¶
In Kafka 4.2+ (KIP-1188), the new "Allowlist" ConnectorClientConfigOverridePolicy provides a middle ground between None (no overrides permitted) and All (all overrides permitted). Administrators can specify an explicit list of client configuration properties that connectors are allowed to override, restricting access to sensitive settings while still enabling necessary customization.
connector.client.config.override.policy=org.apache.kafka.connect.connector.policy.AllowListConnectorClientConfigOverridePolicy
connector.client.config.override.policy.allowed.configs=batch.size,linger.ms,compression.type
Related Documentation¶
- Connectors - Connector guides (Cassandra, S3, HTTP)
- Transforms - Single Message Transforms
- Converters - Serialization configuration
- Error Handling - DLQ and error tolerance
- Exactly-Once - EOS configuration
- Operations - Monitoring and management
- Kafka Connect Concepts - Conceptual overview