Maintained by AxonOps. Production-grade documentation from engineers who operate distributed databases at scale.

Kafka Connect Converters

Converters handle serialization and deserialization between Connect's internal data format and Kafka's byte arrays.


Overview

[UML diagram]


Available Converters

| Converter | Format | Schema Support | Use Case |
|---|---|---|---|
| JsonConverter | JSON | Optional | Development, debugging |
| AvroConverter | Avro binary | Yes (Schema Registry) | Production, schema evolution |
| ProtobufConverter | Protobuf binary | Yes (Schema Registry) | gRPC ecosystems |
| JsonSchemaConverter | JSON | Yes (Schema Registry) | JSON with validation |
| StringConverter | Plain text | No | Simple text data |
| ByteArrayConverter | Raw bytes | No | Pre-serialized data |

JSON Converter

Without Schema

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Output:

{"user_id": 123, "event": "login", "timestamp": 1699900000}

With Embedded Schema

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Output:

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "user_id", "type": "int32"},
      {"field": "event", "type": "string"},
      {"field": "timestamp", "type": "int64"}
    ]
  },
  "payload": {
    "user_id": 123,
    "event": "login",
    "timestamp": 1699900000
  }
}
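
The cost of embedding the schema can be estimated with a short Python sketch. It reuses the payload and schema from the example above and compares the compact JSON size of the bare payload with the size of the schema/payload envelope. The helper name `encoded_size` is hypothetical, and real converter output may carry extra schema attributes, so treat the numbers as illustrative rather than exact.

```python
import json

# Payload and schema copied from the example above.
payload = {"user_id": 123, "event": "login", "timestamp": 1699900000}
schema = {
    "type": "struct",
    "fields": [
        {"field": "user_id", "type": "int32"},
        {"field": "event", "type": "string"},
        {"field": "timestamp", "type": "int64"},
    ],
}


def encoded_size(obj):
    """Size in bytes of the compact UTF-8 JSON encoding of obj."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


plain_size = encoded_size(payload)
envelope_size = encoded_size({"schema": schema, "payload": payload})

print(f"schemas.enable=false: {plain_size} bytes")
print(f"schemas.enable=true:  {envelope_size} bytes")
print(f"overhead per message: {envelope_size - plain_size} bytes")
```

The schema is repeated in every message, so the overhead scales with message count rather than with the number of distinct schemas.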

With External Schema (Kafka 4.2+)

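In Kafka 4.2+ (KIP-1054), JsonConverter accepts an external schema through the schema.content configuration. When schemas.enable=true and schema.content is set, the converter applies the configured schema to every message instead of expecting a schema embedded in each one, which significantly reduces message sizes for high-volume topics. As described in the KIP, the setting affects deserialization only, so it is relevant to sink connectors.

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
value.converter.schema.content={"type":"struct","fields":[{"field":"user_id","type":"int32"},{"field":"event","type":"string"},{"field":"timestamp","type":"int64"}]}

The value of schema.content is the schema itself, written as JSON in the same format as the embedded "schema" element shown above. The same property can be set with the key.converter. prefix when keys carry structured data.

With schema.content set, only the payload is included in the message body. The schema lives in the converter configuration rather than in Schema Registry, so sink connectors receive schema-aware records while the topic keeps plain JSON messages.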
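
The idea of pairing a bare payload with a schema held outside the message can be sketched in a few lines of Python. This is an illustration only, not JsonConverter's implementation: `check_payload` and `INT_RANGES` are hypothetical names, and only the three types used in the earlier example are handled.

```python
# Inclusive value ranges for the integer types used in the example schema.
INT_RANGES = {
    "int32": (-2**31, 2**31 - 1),
    "int64": (-2**63, 2**63 - 1),
}

# Schema kept outside the message, in the same format as the embedded example.
schema = {
    "type": "struct",
    "fields": [
        {"field": "user_id", "type": "int32"},
        {"field": "event", "type": "string"},
        {"field": "timestamp", "type": "int64"},
    ],
}


def check_payload(schema, payload):
    """Return a list of problems found when matching payload against schema."""
    problems = []
    for field in schema["fields"]:
        name, ftype = field["field"], field["type"]
        if name not in payload:
            problems.append(f"missing field: {name}")
            continue
        value = payload[name]
        if ftype in INT_RANGES:
            low, high = INT_RANGES[ftype]
            # bool is a subclass of int in Python, so exclude it explicitly.
            is_int = isinstance(value, int) and not isinstance(value, bool)
            if not is_int or not low <= value <= high:
                problems.append(f"{name}: expected {ftype}, got {value!r}")
        elif ftype == "string":
            if not isinstance(value, str):
                problems.append(f"{name}: expected string, got {value!r}")
        else:
            problems.append(f"{name}: type {ftype} not handled by this sketch")
    return problems


# A message body that carries only the payload.
message = {"user_id": 123, "event": "login", "timestamp": 1699900000}
print(check_payload(schema, message))
```

A check like this can be handy in tests for producers that write payload-only JSON, since a mismatch would otherwise surface only when the sink connector reads the record.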


Avro Converter

Requires Schema Registry.

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

# Optional: auto-register schemas (enabled by default; set to false to require pre-registered schemas)
value.converter.auto.register.schemas=true

Benefits:

- Compact binary format
- Schema evolution support
- Type safety
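
Part of the compactness comes from how the schema is referenced. Assuming the standard Confluent wire format, each serialized message begins with a 5-byte header (a magic byte of 0 followed by a 4-byte big-endian schema ID) and the Avro binary data follows. The Python sketch below splits such a message into its schema ID and Avro body. `split_wire_message` is a hypothetical helper for inspecting raw bytes, not part of any Kafka or Confluent library, and it does not decode the Avro data itself.

```python
import struct

MAGIC_BYTE = 0
# 1-byte magic value followed by a 4-byte big-endian schema ID.
HEADER = struct.Struct(">bI")


def split_wire_message(data: bytes):
    """Return (schema_id, avro_body) for a Confluent-framed message."""
    if len(data) < HEADER.size:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack_from(data)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, data[HEADER.size:]


# Example: a hand-built message that references schema ID 42.
sample = HEADER.pack(MAGIC_BYTE, 42) + b"\x02\x06"
schema_id, body = split_wire_message(sample)
print(schema_id, body)
```

Because only the ID travels with each record, consumers need access to Schema Registry to resolve the ID before they can decode the body.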


Protobuf Converter

key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://schema-registry:8081

String Converter

For simple text data without a schema:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Converter Selection

| Scenario | Recommended |
|---|---|
| Development | JsonConverter (schemas.enable=false) |
| Production | AvroConverter or ProtobufConverter |
| Existing JSON consumers | JsonConverter or JsonSchemaConverter |
| Text logs | StringConverter |
| Pre-serialized | ByteArrayConverter |

Per-Connector Override

Converter settings in a connector's configuration override the worker-level defaults for that connector only:

{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
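
One way to apply such a configuration is the Kafka Connect REST API, where `PUT /connectors/{name}/config` creates or updates a connector from its config map. The Python sketch below builds that request from the document above without sending it. The worker address `http://localhost:8083` and the helper name `build_config_request` are assumptions for illustration, and the `connector.class` value is left elided as in the original.

```python
import json
import urllib.request

# Assumption: a Connect worker listening on the default REST port.
CONNECT_URL = "http://localhost:8083"

connector = {
    "name": "my-connector",
    "config": {
        "connector.class": "...",  # elided in the original; fill in a real class
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
    },
}


def build_config_request(base_url, connector):
    """Build a PUT request that sends only the config map for the connector."""
    url = f"{base_url}/connectors/{connector['name']}/config"
    body = json.dumps(connector["config"]).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


request = build_config_request(CONNECT_URL, connector)
print(request.get_method(), request.full_url)

# To apply it against a running worker, uncomment:
# urllib.request.urlopen(request)
```

The PUT endpoint takes the flat config map rather than the name/config wrapper, which is why the sketch sends only `connector["config"]`.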