
Kafka Connect Error Handling

Kafka Connect provides configurable error handling for record processing failures.


Error Tolerance

Fail-Fast (Default)

{
  "errors.tolerance": "none"
}

The failing task stops and is marked FAILED; it must be restarted once the underlying issue is resolved. Use this mode for critical data pipelines where any failure requires attention.
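
When a task fails under errors.tolerance=none, its state and error trace are exposed by the Connect REST API status endpoint. A minimal sketch using the Java 11 HTTP client; the worker URL (localhost:8083) and the connector name my-connector are assumptions:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectorStatusCheck {
    public static void main(String[] args) throws Exception {
        // Fetch connector and task status (worker URL and connector name are assumptions)
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/status"))
                .GET()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The JSON response reports each task's state (e.g. FAILED) and the exception trace
        System.out.println(response.body());
    }
}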

Skip Errors

{
  "errors.tolerance": "all"
}

Failed records are skipped and processing continues. Combine this with a dead letter queue so skipped records are not silently lost.


Dead Letter Queue

Route failed records to a DLQ topic for later analysis:

{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-my-connector",
    "errors.deadletterqueue.topic.replication.factor": "3",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}

DLQ Headers

When errors.deadletterqueue.context.headers.enable=true, each DLQ record carries headers describing the failure context:

| Header | Description |
|---|---|
| __connect.errors.topic | Original topic |
| __connect.errors.partition | Original partition |
| __connect.errors.offset | Original offset |
| __connect.errors.connector.name | Connector name |
| __connect.errors.task.id | Task ID |
| __connect.errors.exception.class.name | Exception class |
| __connect.errors.exception.message | Error message |



Error Logging

{
  "errors.log.enable": "true",
  "errors.log.include.messages": "true"
}

| Configuration | Description |
|---|---|
| errors.log.enable | Log errors to the Connect worker log |
| errors.log.include.messages | Include record content (may expose sensitive data) |

Error Retry

For transient failures:

{
  "errors.retry.timeout": "300000",
  "errors.retry.delay.max.ms": "60000"
}

| Configuration | Default | Description |
|---|---|---|
| errors.retry.timeout | 0 | Total time in ms to retry a failed operation (0 = no retries, -1 = retry indefinitely) |
| errors.retry.delay.max.ms | 60000 | Maximum delay in ms between consecutive retries |

Complete Configuration

{
  "name": "resilient-sink",
  "config": {
    "connector.class": "...",
    "topics": "events",

    "errors.tolerance": "all",
    "errors.retry.timeout": "300000",
    "errors.retry.delay.max.ms": "60000",

    "errors.deadletterqueue.topic.name": "dlq-resilient-sink",
    "errors.deadletterqueue.topic.replication.factor": "3",
    "errors.deadletterqueue.context.headers.enable": "true",

    "errors.log.enable": "true",
    "errors.log.include.messages": "false"
  }
}
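
The configuration can be submitted to a Connect worker through its REST API. A minimal sketch using the Java 11 HTTP client, assuming the worker listens on localhost:8083 and the JSON above is saved as resilient-sink.json (both assumptions):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class DeployConnector {
    public static void main(String[] args) throws Exception {
        // POST the connector definition to the Connect REST API
        // (worker URL and JSON file name are assumptions for this sketch)
        String body = Files.readString(Path.of("resilient-sink.json"));

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}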

Processing DLQ Records

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;

// Consume the DLQ topic and report the error context carried in each record's headers.
// Assumes an existing KafkaConsumer<byte[], byte[]> named consumer and an SLF4J logger named log.
consumer.subscribe(Collections.singletonList("dlq-my-connector"));

while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<byte[], byte[]> record : records) {
        String originalTopic = headerValue(record, "__connect.errors.topic");
        String errorClass = headerValue(record, "__connect.errors.exception.class.name");
        String errorMessage = headerValue(record, "__connect.errors.exception.message");

        log.error("Failed record from {} - {}: {}",
            originalTopic, errorClass, errorMessage);

        // Analyze and potentially replay
    }
}

// Read a Connect error-context header as a UTF-8 string (null if the header is absent)
static String headerValue(ConsumerRecord<byte[], byte[]> record, String key) {
    Header header = record.headers().lastHeader(key);
    return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
}
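
After diagnosis, a failed record can be replayed to its original topic using the __connect.errors.topic header. A minimal sketch, assuming a configured KafkaProducer<byte[], byte[]> named producer and the headerValue helper shown above:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Replay a DLQ record to its original topic (producer and headerValue are assumed from above)
static void replay(KafkaProducer<byte[], byte[]> producer, ConsumerRecord<byte[], byte[]> record) {
    String originalTopic = headerValue(record, "__connect.errors.topic");
    if (originalTopic == null) {
        return; // context headers were not enabled for this record
    }
    // Re-send only the original key and value; the __connect.errors.* headers are not copied
    producer.send(new ProducerRecord<>(originalTopic, record.key(), record.value()));
}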

Error Handling Strategy

| Scenario | Configuration |
|---|---|
| Critical data, no loss | errors.tolerance=none |
| High volume, some loss OK | errors.tolerance=all + DLQ |
| Transient failures expected | errors.retry.timeout > 0 |
| Debug failures | errors.log.include.messages=true |