Implementing Dead Letter Queues

This guide covers implementation patterns for Dead Letter Queues in Kafka applications. For conceptual background, see Dead Letter Queue Concepts.


Basic DLQ Producer

A DLQ producer wraps a standard Kafka producer: it preserves the failed record's raw payload, attaches error metadata as headers, and sends the result to a dead letter topic.


Java Implementation

public class DLQProducer {

    private static final Logger log = LoggerFactory.getLogger(DLQProducer.class);

    private final KafkaProducer<byte[], byte[]> producer;
    private final String dlqTopicSuffix;

    public DLQProducer(Properties props, String dlqTopicSuffix) {
        // Use byte[] serializers to preserve original payload
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class.getName());

        this.producer = new KafkaProducer<>(props);
        this.dlqTopicSuffix = dlqTopicSuffix;
    }

    public void sendToDLQ(ConsumerRecord<byte[], byte[]> original,
                          Exception error,
                          int retryCount) {

        String dlqTopic = original.topic() + dlqTopicSuffix;

        // Build headers with error metadata
        List<Header> headers = new ArrayList<>();

        // Preserve original headers
        for (Header h : original.headers()) {
            headers.add(h);
        }

        // Add DLQ metadata
        headers.add(header("dlq.original.topic", original.topic()));
        headers.add(header("dlq.original.partition", original.partition()));
        headers.add(header("dlq.original.offset", original.offset()));
        headers.add(header("dlq.original.timestamp", original.timestamp()));
        headers.add(header("dlq.error.class", error.getClass().getName()));
        headers.add(header("dlq.error.message", error.getMessage()));
        headers.add(header("dlq.retry.count", retryCount));
        headers.add(header("dlq.failed.timestamp", System.currentTimeMillis()));

        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
            dlqTopic,
            null,  // Let Kafka assign partition
            original.key(),
            original.value(),
            headers
        );

        producer.send(dlqRecord, (metadata, ex) -> {
            if (ex != null) {
                // DLQ send failed - log and alert
                log.error("Failed to send to DLQ: topic={}, offset={}",
                    original.topic(), original.offset(), ex);
            }
        });
    }

    private RecordHeader header(String key, Object value) {
        return new RecordHeader(key, String.valueOf(value).getBytes(UTF_8));
    }
}
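
Typical usage from a consumer's error path (a minimal sketch; producerProps, record, and process() are assumed from the surrounding examples):

DLQProducer dlq = new DLQProducer(producerProps, ".dlq");

try {
    process(record);
} catch (Exception e) {
    // Failed record goes to <topic>.dlq with full error metadata
    dlq.sendToDLQ(record, e, 0);
}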

Consumer with DLQ Integration

Basic Pattern


Implementation

public class DLQConsumer {

    private final KafkaConsumer<byte[], byte[]> consumer;
    private final DLQProducer dlqProducer;
    private final MessageProcessor processor;
    private final int maxRetries;
    private final Map<TopicPartitionOffset, Integer> retryCount = new HashMap<>();

    // Composite key identifying a single record for retry tracking
    record TopicPartitionOffset(String topic, int partition, long offset) {}

    public void consume() {
        while (running) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<byte[], byte[]> record : records) {
                processWithDLQ(record);
            }

            consumer.commitSync();
        }
    }

    private void processWithDLQ(ConsumerRecord<byte[], byte[]> record) {
        TopicPartitionOffset tpo = new TopicPartitionOffset(
            record.topic(), record.partition(), record.offset());

        int attempts = retryCount.getOrDefault(tpo, 0);

        try {
            processor.process(record);
            retryCount.remove(tpo);  // Success - clear retry count

        } catch (TransientException e) {
            // Transient error - retry or DLQ
            if (attempts < maxRetries) {
                retryCount.put(tpo, attempts + 1);
                // Seek back so the record is redelivered on the next poll
                // (later records from this partition in the current batch
                // will also be re-fetched, so keep processing idempotent)
                consumer.seek(new TopicPartition(record.topic(), record.partition()),
                    record.offset());
            } else {
                dlqProducer.sendToDLQ(record, e, attempts);
                retryCount.remove(tpo);
            }

        } catch (PermanentException e) {
            // Permanent error - straight to DLQ
            dlqProducer.sendToDLQ(record, e, attempts);
            retryCount.remove(tpo);

        } catch (Exception e) {
            // Unknown error - treat as permanent
            dlqProducer.sendToDLQ(record, e, attempts);
            retryCount.remove(tpo);
        }
    }
}
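
TransientException and PermanentException are application-defined marker types, not Kafka classes; a minimal sketch:

// Errors that may succeed on retry (timeouts, lock conflicts)
public class TransientException extends RuntimeException {
    public TransientException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Errors that will never succeed (bad payloads, failed validation)
public class PermanentException extends RuntimeException {
    public PermanentException(String message, Throwable cause) {
        super(message, cause);
    }
}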

Error Classification

Proper error classification determines whether to retry or send to DLQ immediately.


Implementation

public class ErrorClassifier {

    private static final Set<Class<? extends Exception>> TRANSIENT_ERRORS = Set.of(
        ConnectException.class,
        SocketTimeoutException.class,
        RetryableException.class,
        SQLTransientException.class,
        OptimisticLockException.class
    );

    private static final Set<Class<? extends Exception>> PERMANENT_ERRORS = Set.of(
        SerializationException.class,
        JsonParseException.class,
        ValidationException.class,
        ConstraintViolationException.class,
        IllegalArgumentException.class
    );

    public ErrorType classify(Throwable e) {
        // Poison errors can crash the JVM; they are Errors rather than
        // Exceptions, which is why classify() accepts Throwable
        if (e instanceof OutOfMemoryError ||
            e instanceof StackOverflowError) {
            return ErrorType.POISON;
        }

        // Check exception hierarchy
        for (Class<? extends Exception> transientType : TRANSIENT_ERRORS) {
            if (transientType.isAssignableFrom(e.getClass())) {
                return ErrorType.TRANSIENT;
            }
        }

        for (Class<? extends Exception> permanentType : PERMANENT_ERRORS) {
            if (permanentType.isAssignableFrom(e.getClass())) {
                return ErrorType.PERMANENT;
            }
        }

        // Check cause chain
        Throwable cause = e.getCause();
        if (cause != null && cause != e) {
            return classify(cause);
        }

        // Default to permanent (fail safe)
        return ErrorType.PERMANENT;
    }

    public boolean isTransient(Exception e) {
        return classify(e) == ErrorType.TRANSIENT;
    }
}
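
The ErrorType enum referenced above is not defined in this guide; a minimal version:

public enum ErrorType {
    TRANSIENT,  // Retry with backoff
    PERMANENT,  // Send straight to the DLQ
    POISON      // Quarantine immediately, never retry
}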

Retry with Backoff

Exponential Backoff Pattern


Implementation with Retry Topics

public class RetryableConsumer {

    private final KafkaConsumer<byte[], byte[]> consumer;
    private final KafkaProducer<byte[], byte[]> producer;
    private final String baseTopic;
    private final List<RetryTier> retryTiers;
    private final String dlqTopic;
    private final ErrorClassifier errorClassifier = new ErrorClassifier();

    record RetryTier(String topic, Duration delay, int maxAttempts) {}

    public RetryableConsumer(KafkaConsumer<byte[], byte[]> consumer,
                             KafkaProducer<byte[], byte[]> producer,
                             String baseTopic) {
        this.consumer = consumer;
        this.producer = producer;
        this.baseTopic = baseTopic;
        this.dlqTopic = baseTopic + ".dlq";
        this.retryTiers = List.of(
            new RetryTier(baseTopic + ".retry-1m", Duration.ofMinutes(1), 3),
            new RetryTier(baseTopic + ".retry-5m", Duration.ofMinutes(5), 2),
            new RetryTier(baseTopic + ".retry-30m", Duration.ofMinutes(30), 1)
        );

        // Subscribe to main topic and all retry topics
        List<String> topics = new ArrayList<>();
        topics.add(baseTopic);
        retryTiers.forEach(t -> topics.add(t.topic()));
        consumer.subscribe(topics);
    }

    public void processWithRetry(ConsumerRecord<byte[], byte[]> record) {
        int attempt = getAttemptNumber(record);

        try {
            process(record);
        } catch (Exception e) {
            if (!errorClassifier.isTransient(e)) {
                sendToDLQ(record, e, attempt);
                return;
            }

            // Find next retry tier
            Optional<RetryTier> nextTier = findNextTier(record.topic(), attempt);

            if (nextTier.isPresent()) {
                sendToRetry(record, nextTier.get(), attempt + 1);
            } else {
                sendToDLQ(record, e, attempt);
            }
        }
    }

    private void sendToRetry(ConsumerRecord<byte[], byte[]> record,
                             RetryTier tier,
                             int attempt) {
        List<Header> headers = new ArrayList<>();
        copyHeaders(record.headers(), headers);

        headers.add(header("retry.attempt", attempt));
        headers.add(header("retry.original.topic", getOriginalTopic(record)));
        headers.add(header("retry.scheduled.time",
            System.currentTimeMillis() + tier.delay().toMillis()));

        ProducerRecord<byte[], byte[]> retryRecord = new ProducerRecord<>(
            tier.topic(), null, record.key(), record.value(), headers);

        producer.send(retryRecord);
    }

    private int getAttemptNumber(ConsumerRecord<byte[], byte[]> record) {
        Header h = record.headers().lastHeader("retry.attempt");
        return h != null ? Integer.parseInt(new String(h.value())) : 1;
    }

    private String getOriginalTopic(ConsumerRecord<byte[], byte[]> record) {
        Header h = record.headers().lastHeader("retry.original.topic");
        return h != null ? new String(h.value()) : record.topic();
    }
}
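
The findNextTier helper is referenced but not defined above. One possible implementation as a method on RetryableConsumer, assuming a record stays in its tier until that tier's attempts are exhausted:

private Optional<RetryTier> findNextTier(String currentTopic, int attempt) {
    // Records failing on the main topic start at the first tier
    if (currentTopic.equals(baseTopic)) {
        return Optional.of(retryTiers.get(0));
    }

    for (int i = 0; i < retryTiers.size(); i++) {
        if (retryTiers.get(i).topic().equals(currentTopic)) {
            if (attempt < retryTiers.get(i).maxAttempts()) {
                return Optional.of(retryTiers.get(i));  // Stay in this tier
            }
            // Move to the next tier, or signal DLQ if none is left
            return i + 1 < retryTiers.size()
                ? Optional.of(retryTiers.get(i + 1))
                : Optional.empty();
        }
    }
    return Optional.empty();  // Unknown topic - let the caller DLQ it
}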

Delayed Consumption for Retry Topics

Retry topics need delayed consumption: a record must not be processed before its scheduled retry time. Two approaches:

Approach 1: Consumer Pause/Resume


public class DelayedRetryConsumer {

    // Partitions currently paused, mapped to the time they become ready
    private final Map<TopicPartition, Long> pausedUntil = new HashMap<>();

    public void consumeWithDelay() {
        while (running) {
            resumeReadyPartitions();

            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<byte[], byte[]> record : records) {
                TopicPartition tp = new TopicPartition(
                    record.topic(), record.partition());

                if (pausedUntil.containsKey(tp)) {
                    continue;  // Partition already rewound in this batch
                }

                long scheduledTime = getScheduledTime(record);

                if (scheduledTime > System.currentTimeMillis()) {
                    // Not ready yet: pause the partition and seek back so the
                    // record is redelivered once the delay has elapsed. Keep
                    // polling rather than sleeping, so the consumer stays
                    // within max.poll.interval.ms and avoids a rebalance.
                    consumer.pause(List.of(tp));
                    consumer.seek(tp, record.offset());
                    pausedUntil.put(tp, scheduledTime);
                    continue;
                }

                processWithRetry(record);
            }

            consumer.commitSync();
        }
    }

    private void resumeReadyPartitions() {
        long now = System.currentTimeMillis();
        pausedUntil.entrySet().removeIf(entry -> {
            if (entry.getValue() <= now) {
                consumer.resume(List.of(entry.getKey()));
                return true;
            }
            return false;
        });
    }

    private long getScheduledTime(ConsumerRecord<byte[], byte[]> record) {
        Header h = record.headers().lastHeader("retry.scheduled.time");
        return h != null ? Long.parseLong(new String(h.value())) : 0;
    }
}

Approach 2: Timestamp-Based Partitioning

// Use message timestamp + delay as the retry time
// Consumer only processes messages where timestamp + delay <= now

public class TimestampBasedRetryConsumer {

    private final Duration retryDelay;

    public void consume() {
        while (running) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

            List<ConsumerRecord<byte[], byte[]>> ready = new ArrayList<>();
            List<ConsumerRecord<byte[], byte[]>> notReady = new ArrayList<>();

            long now = System.currentTimeMillis();

            for (ConsumerRecord<byte[], byte[]> record : records) {
                if (record.timestamp() + retryDelay.toMillis() <= now) {
                    ready.add(record);
                } else {
                    notReady.add(record);
                }
            }

            // Process ready messages
            for (ConsumerRecord<byte[], byte[]> record : ready) {
                processWithRetry(record);
            }

            // Rewind partitions that have not-ready messages; pause them so
            // they are redelivered once the delay has elapsed (resume logic
            // as in Approach 1)
            Map<TopicPartition, Long> rewindTo = new HashMap<>();
            for (ConsumerRecord<byte[], byte[]> record : notReady) {
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                // Records arrive in offset order, so keep the first offset seen
                rewindTo.putIfAbsent(tp, record.offset());
            }

            rewindTo.forEach((tp, offset) -> {
                consumer.pause(List.of(tp));
                consumer.seek(tp, offset);
            });

            consumer.commitSync();
        }
    }
}

Spring Kafka Integration

Spring Kafka provides built-in DLQ support through DefaultErrorHandler and DeadLetterPublishingRecoverer.


Configuration

@Configuration
public class KafkaConfig {

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaTemplate<Object, Object> kafkaTemplate) {

        // DLQ recoverer - sends to {topic}.DLT
        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLT", record.partition()));

        // Backoff: 1s, 2s, 4s, then DLQ
        ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
        backOff.setMaxElapsedTime(10000L);

        DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);

        // Don't retry these exceptions
        handler.addNotRetryableExceptions(
            SerializationException.class,
            DeserializationException.class,
            ValidationException.class
        );

        return handler;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory,
                DefaultErrorHandler errorHandler) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}
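
The recoverer above routes each failure to the same partition number it came from, so the .DLT topic must have at least as many partitions as the source topic. One way to guarantee that is a NewTopic bean in the same configuration class (the partition and replica counts below are assumptions):

@Bean
public NewTopic ordersDlt() {
    // Match the source topic's partition count for same-partition routing
    return TopicBuilder.name("orders.DLT")
        .partitions(3)
        .replicas(3)
        .build();
}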

Listener

@Service
public class OrderConsumer {

    @KafkaListener(topics = "orders")
    public void consume(ConsumerRecord<String, Order> record) {
        // Exceptions thrown here trigger the error handler
        Order order = record.value();
        validateOrder(order);
        processOrder(order);
    }

    @KafkaListener(topics = "orders.DLT")  // DLQ listener
    public void consumeDLQ(ConsumerRecord<String, Order> record) {
        log.error("DLQ message: topic={}, partition={}, offset={}, error={}",
            record.topic(),
            record.partition(),
            record.offset(),
            new String(record.headers()
                .lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value()));

        // Alert, store for review, etc.
        alertService.sendDLQAlert(record);
    }
}

Kafka Streams DLQ

Kafka Streams has no built-in DLQ for records that fail during processing, so routing to a dead letter topic must be implemented in the topology itself.


Implementation

public class StreamsWithDLQ {

    // Carries the processing error from the branch predicate to the
    // DLQ transformer on the same stream thread
    private static final ThreadLocal<Exception> errorContext = new ThreadLocal<>();

    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, byte[]> orders = builder.stream("orders");

        // Branch into success and failure streams
        Map<String, KStream<String, byte[]>> branches = orders
            .split(Named.as("process-"))
            .branch((key, value) -> tryProcess(key, value),
                    Branched.as("success"))
            .defaultBranch(Branched.as("failure"));

        // Success stream to output topic
        branches.get("process-success")
            .to("orders.processed");

        // Failure stream to DLQ with error headers
        branches.get("process-failure")
            .transformValues(DLQTransformer::new)
            .to("orders.dlq");

        return builder.build();
    }

    private boolean tryProcess(String key, byte[] value) {
        try {
            // Attempt processing
            processOrder(key, value);
            return true;
        } catch (Exception e) {
            // Store error for DLQ transformer
            errorContext.set(e);
            return false;
        }
    }

    static class DLQTransformer implements ValueTransformer<byte[], byte[]> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public byte[] transform(byte[] value) {
            Exception error = errorContext.get();
            errorContext.remove();  // Clear the slot for the next record

            // Add error headers
            context.headers().add("dlq.error.class",
                error.getClass().getName().getBytes());
            context.headers().add("dlq.error.message",
                String.valueOf(error.getMessage()).getBytes());
            context.headers().add("dlq.failed.timestamp",
                String.valueOf(System.currentTimeMillis()).getBytes());

            return value;
        }

        @Override
        public void close() {}
    }
}
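
Wiring the topology into a running application (a minimal sketch; the application id and bootstrap servers are placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-dlq-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams = new KafkaStreams(new StreamsWithDLQ().buildTopology(), props);
streams.start();

// Close the streams application cleanly on JVM shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));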

Deserialization Error DLQ

Deserialization failures happen before your listener code ever runs, so they have to be intercepted at the deserializer level.


ErrorHandlingDeserializer Configuration

Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    ErrorHandlingDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    ErrorHandlingDeserializer.class.getName());

// Delegate deserializers
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
    StringDeserializer.class.getName());
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
    JsonDeserializer.class.getName());

// Consumer receives null + headers on deser failure

Handling Deserialization Failures

@KafkaListener(topics = "orders")
public void consume(ConsumerRecord<String, Order> record,
                    @Header(name = ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER,
                            required = false) byte[] errorHeader) {

    if (errorHeader != null) {
        // Deserialization failed: record.value() is null and the header
        // carries a serialized DeserializationException, not plain text
        dlqProducer.sendDeserializationError(record, errorHeader);
        return;
    }

    // Normal processing
    processOrder(record.value());
}

Testing DLQ Implementation

@EmbeddedKafka(partitions = 1,
    topics = {"orders", "orders.dlq"})
class DLQConsumerTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void shouldSendPoisonMessageToDLQ() throws Exception {
        // Send poison message (invalid JSON)
        kafkaTemplate.send("orders", "order-1", "{invalid json}");

        // Wait for DLQ
        Consumer<String, String> dlqConsumer = createConsumer("orders.dlq");
        ConsumerRecords<String, String> dlqRecords =
            KafkaTestUtils.getRecords(dlqConsumer, Duration.ofSeconds(10));

        assertThat(dlqRecords.count()).isEqualTo(1);

        ConsumerRecord<String, String> dlqRecord = dlqRecords.iterator().next();
        assertThat(dlqRecord.value()).isEqualTo("{invalid json}");
        assertThat(dlqRecord.headers().lastHeader("dlq.error.class"))
            .isNotNull();
    }

    @Test
    void shouldRetryTransientErrorsBeforeDLQ() throws Exception {
        // Configure mock to fail 2 times then succeed
        doThrow(new ConnectException("timeout"))
            .doThrow(new ConnectException("timeout"))
            .doNothing()
            .when(mockProcessor).process(any());

        kafkaTemplate.send("orders", "order-1", "{\"id\": 1}");

        // Verify no DLQ (succeeded after retry)
        Consumer<String, String> dlqConsumer = createConsumer("orders.dlq");
        ConsumerRecords<String, String> dlqRecords =
            KafkaTestUtils.getRecords(dlqConsumer, Duration.ofSeconds(5));

        assertThat(dlqRecords.count()).isEqualTo(0);

        // Verify processing happened 3 times
        verify(mockProcessor, times(3)).process(any());
    }
}
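
The createConsumer helper used above is not part of spring-kafka-test; a possible implementation as a method on the test class:

private Consumer<String, String> createConsumer(String topic) {
    // Consumer bound to the embedded broker with its own test group
    Map<String, Object> props = KafkaTestUtils.consumerProps(
        "dlq-test-group", "true", embeddedKafka);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    Consumer<String, String> consumer =
        new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), new StringDeserializer()).createConsumer();
    consumer.subscribe(List.of(topic));
    return consumer;
}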
}