Skip to content
Maintained by AxonOps — production-grade documentation from engineers who operate distributed databases at scale

Kafka Core Protocol APIs

This document specifies the core Kafka protocol APIs used for message production, consumption, and cluster metadata. These APIs form the foundation of all Kafka client operations.


API Key Reference

Core APIs

API Key Name Purpose
0 Produce Send records to partitions
1 Fetch Retrieve records from partitions
2 ListOffsets Query offset by timestamp
3 Metadata Discover cluster topology
18 ApiVersions Query supported API versions

Produce API (Key 0)

Overview

The Produce API sends record batches to topic partitions. It is the primary API for message production.

Version History

Version Kafka Key Changes
0 0.8.0 Initial version (removed in 4.0)
3 0.11.0 Transactional ID + message format v2 (4.0 baseline)
4 0.11.0 KAFKA_STORAGE_ERROR
5 1.0.0 Log start offset in response
7 2.1.0 Zstandard compression (KIP-110)
8 2.3.0 Record errors + error message (KIP-467)
9 2.4.0 Flexible versions
10 3.7.0 Current leader + node endpoints (KIP-951)
11 3.8.0 TRANSACTION_ABORTABLE (KIP-890)
13 4.0.0 Topic IDs (KIP-516)

Request Schema

ProduceRequest =>
    transactional_id: NULLABLE_STRING
    acks: INT16
    timeout_ms: INT32
    topic_data: [TopicData]

TopicData =>
    name: STRING
    topic_id: UUID
    partition_data: [PartitionData]

PartitionData =>
    index: INT32
    records: RECORDS

Topic names are used through v12; v13+ uses topic_id instead.

Field Type Description
transactional_id NULLABLE_STRING Transaction ID (null for non-transactional)
acks INT16 Required acknowledgments (-1, 0, 1)
timeout_ms INT32 Request timeout in milliseconds
topic_data ARRAY Per-topic record data
partition_data ARRAY Per-partition record batches
records RECORDS Record batch data

Response Schema

ProduceResponse =>
    responses: [TopicResponse]
    throttle_time_ms: INT32
    node_endpoints: [NodeEndpoint]

TopicResponse =>
    name: STRING
    topic_id: UUID
    partition_responses: [PartitionResponse]

PartitionResponse =>
    index: INT32
    error_code: INT16
    base_offset: INT64
    log_append_time_ms: INT64
    log_start_offset: INT64
    record_errors: [RecordError]
    error_message: NULLABLE_STRING
    current_leader: LeaderIdAndEpoch

NodeEndpoint =>
    node_id: INT32
    host: STRING
    port: INT32
    rack: NULLABLE_STRING
Field Type Description
error_code INT16 Partition-level error
base_offset INT64 Offset of first record in batch
log_append_time_ms INT64 Timestamp (-1 if CreateTime)
log_start_offset INT64 Log start offset
record_errors ARRAY Per-record errors (v8+)
throttle_time_ms INT32 Quota throttle time
current_leader STRUCT Suggested leader for future requests (v10+)
node_endpoints ARRAY Endpoint list for leaders in response (v10+)

acks Semantics

acks Name Guarantee Response
0 Fire-and-forget None No response sent
1 Leader Leader wrote to local log After leader persist
-1 All All ISR replicas wrote After ISR persist

uml diagram

Behavioral Contract

Aspect Guarantee
Ordering Records within a batch must be written in batch order
Atomicity Batch to single partition must succeed or fail atomically
Durability Depends on acks setting and min.insync.replicas
Idempotence With enable.idempotence, duplicates are prevented

acks=0 No Response

With acks=0, the broker must not send a response. The client must not wait for a response. Delivery is not confirmed.

Error Handling

Error Code Retriable Cause Recovery
NOT_LEADER_OR_FOLLOWER Stale leader Refresh metadata, retry
REQUEST_TIMED_OUT Broker timeout Retry with backoff
NOT_ENOUGH_REPLICAS ISR too small Wait, retry
MESSAGE_TOO_LARGE Record exceeds limit Reduce message size
TOPIC_AUTHORIZATION_FAILED No Write permission Check ACLs
OUT_OF_ORDER_SEQUENCE_NUMBER Sequence gap Fatal for idempotent

Fetch API (Key 1)

Overview

The Fetch API retrieves record batches from topic partitions. It supports long-polling, session-based fetching, and transactional isolation.

Version History

Version Kafka Key Changes
0 0.8.0 Initial version (removed in 4.0)
4 0.11.0 Isolation level (4.0 baseline)
5 1.0.0 Log start offset
7 1.1.0 Fetch sessions
9 2.1.0 Current leader epoch (KIP-320)
10 2.1.0 Zstandard support (KIP-110)
11 2.3.0 Rack ID
12 2.4.0 Flexible versions + last fetched epoch
13 2.8.0 Topic IDs (KIP-516)
14 3.4.0 Tiered storage offset moved (KIP-405)
15 3.5.0 Replica state (KIP-903)
16 3.7.0 Node endpoints (KIP-951)
17 3.8.0 Replica directory ID (KIP-853)
18 4.1.0 High-watermark in request (KIP-1166)

Request Schema

FetchRequest =>
    cluster_id: NULLABLE_STRING
    replica_id: INT32
    replica_state: ReplicaState
    max_wait_ms: INT32
    min_bytes: INT32
    max_bytes: INT32
    isolation_level: INT8
    session_id: INT32
    session_epoch: INT32
    topics: [TopicRequest]
    forgotten_topics_data: [ForgottenTopic]
    rack_id: STRING

TopicRequest =>
    topic: STRING
    topic_id: UUID
    partitions: [PartitionRequest]

PartitionRequest =>
    partition: INT32
    current_leader_epoch: INT32
    fetch_offset: INT64
    last_fetched_epoch: INT32
    log_start_offset: INT64
    partition_max_bytes: INT32
    replica_directory_id: UUID
    high_watermark: INT64

ReplicaState =>
    replica_id: INT32
    replica_epoch: INT64

Topic names are used through v12; v13+ uses topic_id instead.

Field Type Description
cluster_id NULLABLE_STRING Cluster ID for validation (v12+)
replica_id INT32 Replica ID (-1 for consumers, v0-14)
replica_state STRUCT Replica ID + epoch (v15+)
max_wait_ms INT32 Maximum wait time for data
min_bytes INT32 Minimum bytes to return
max_bytes INT32 Maximum bytes to return
isolation_level INT8 0=read_uncommitted, 1=read_committed
session_id INT32 Fetch session ID (0 for new)
fetch_offset INT64 Offset to fetch from
last_fetched_epoch INT32 Last fetched epoch for fencing (v12+)
partition_max_bytes INT32 Maximum bytes per partition
forgotten_topics_data ARRAY Partitions to remove from session (v7+)
rack_id STRING Consumer rack ID (v11+)

Response Schema

FetchResponse =>
    throttle_time_ms: INT32
    error_code: INT16
    session_id: INT32
    responses: [TopicResponse]
    node_endpoints: [NodeEndpoint]

TopicResponse =>
    topic: STRING
    topic_id: UUID
    partitions: [PartitionResponse]

PartitionResponse =>
    partition: INT32
    error_code: INT16
    high_watermark: INT64
    last_stable_offset: INT64
    log_start_offset: INT64
    diverging_epoch: EpochEndOffset
    current_leader: LeaderIdAndEpoch
    snapshot_id: SnapshotId
    aborted_transactions: [AbortedTransaction]
    preferred_read_replica: INT32
    records: RECORDS

EpochEndOffset =>
    epoch: INT32
    end_offset: INT64

SnapshotId =>
    end_offset: INT64
    epoch: INT32
Field Type Description
high_watermark INT64 End offset of committed data
last_stable_offset INT64 End of non-transactional or committed data
log_start_offset INT64 Log start offset
aborted_transactions ARRAY Aborted transaction markers
preferred_read_replica INT32 Suggested follower for reads
records RECORDS Fetched record batches

Isolation Levels

Level Value Behavior
read_uncommitted 0 Returns all records up to high watermark
read_committed 1 Returns only committed records (filters aborted transactions)

uml diagram

Long Polling

The Fetch API supports long polling via min_bytes and max_wait_ms:

uml diagram

Behavioral Contract

Aspect Guarantee
Ordering Records returned in offset order per partition
Completeness All records in requested range (up to size limits)
Isolation With read_committed, no uncommitted transactional records
Freshness May return slightly stale data after leader change

Error Handling

Error Code Retriable Cause Recovery
OFFSET_OUT_OF_RANGE Invalid fetch offset Reset to valid offset
NOT_LEADER_OR_FOLLOWER Stale leader Refresh metadata, retry
UNKNOWN_TOPIC_OR_PARTITION Topic not found Wait, retry
KAFKA_STORAGE_ERROR Disk error Wait, retry different replica

ListOffsets API (Key 2)

Overview

The ListOffsets API retrieves offsets by timestamp or special offset positions (earliest, latest).

Version History

Version Kafka Key Changes
0 0.8.0 Initial version (removed in 4.0)
1 0.10.1 Single-offset response (4.0 baseline)
2 0.11.0 Isolation level
4 2.1.0 Leader epoch
5 2.2.0 OFFSET_NOT_AVAILABLE
6 2.4.0 Flexible versions
7 2.8.0 MAX_TIMESTAMP (KIP-734)
8 3.4.0 EARLIEST_LOCAL (KIP-405)
9 3.6.0 LATEST_TIERED (KIP-1005)
10 3.8.0 Remote list offsets (KIP-1075)
11 4.0.0 EARLIEST_PENDING_UPLOAD (KIP-1023)

Request Schema

ListOffsetsRequest =>
    replica_id: INT32
    isolation_level: INT8
    topics: [TopicRequest]
    timeout_ms: INT32

TopicRequest =>
    name: STRING
    partitions: [PartitionRequest]

PartitionRequest =>
    partition_index: INT32
    current_leader_epoch: INT32
    timestamp: INT64
Field Type Description
replica_id INT32 Replica ID (-1 for consumers)
isolation_level INT8 0=read_uncommitted, 1=read_committed
timestamp INT64 Target timestamp or special value
timeout_ms INT32 Timeout for remote tiered reads (v10+)

Special Timestamp Values

Value Name Meaning
-1 LATEST Latest offset (log end offset)
-2 EARLIEST Earliest offset (log start offset)
-3 MAX_TIMESTAMP Offset of record with max timestamp (v7+)
-4 EARLIEST_LOCAL Earliest local log offset (v8+)
-5 LATEST_TIERED Latest tiered storage offset (v9+)
-6 EARLIEST_PENDING_UPLOAD Earliest pending upload offset (v11+)
≥0 Timestamp First offset with timestamp ≥ value

Response Schema

ListOffsetsResponse =>
    throttle_time_ms: INT32
    topics: [TopicResponse]

TopicResponse =>
    name: STRING
    partitions: [PartitionResponse]

PartitionResponse =>
    partition_index: INT32
    error_code: INT16
    timestamp: INT64
    offset: INT64
    leader_epoch: INT32
Field Type Description
timestamp INT64 Timestamp of returned offset (-1 if none)
offset INT64 Found offset
leader_epoch INT32 Leader epoch of returned offset

Behavioral Contract

Aspect Guarantee
Timestamp lookup Returns first offset where record timestamp ≥ requested
EARLIEST Returns log start offset
LATEST Returns high watermark (read_uncommitted) or LSO (read_committed)
Not found Returns -1 for offset if no matching record

Metadata API (Key 3)

Overview

The Metadata API retrieves cluster topology, broker information, and topic/partition metadata.

Version History

Version Kafka Key Changes
0 0.8.0 Initial version
1 0.10.0 Rack ID support
2 0.10.1 Cluster ID
3 0.10.2 Throttle time
4 0.11.0 Topic-level errors
5 1.0.0 Offline replicas
6 1.1.0 Response before throttling
7 2.0.0 Leader epoch
8 2.1.0 Allow topic auto-create control
9 2.4.0 Flexible versions
10 2.8.0 Topic ID field (not implemented)
11 3.0.0 Cluster authorized ops deprecated
12 3.4.0 Topic ID supported
13 4.0.0 Top-level error code

Request Schema

MetadataRequest =>
    topics: [TopicRequest]
    allow_auto_topic_creation: BOOLEAN
    include_cluster_authorized_operations: BOOLEAN
    include_topic_authorized_operations: BOOLEAN

TopicRequest =>
    topic_id: UUID
    name: NULLABLE_STRING
Field Type Description
topics ARRAY Topics to fetch (null for all topics)
allow_auto_topic_creation BOOLEAN Allow auto-creation of missing topics
include_cluster_authorized_operations BOOLEAN Include cluster ACL info (v8-10)
include_topic_authorized_operations BOOLEAN Include topic ACL info

Topic IDs are supported in v12+; v10-11 include the field but brokers do not implement it.

Response Schema

MetadataResponse =>
    throttle_time_ms: INT32
    brokers: [BrokerMetadata]
    cluster_id: NULLABLE_STRING
    controller_id: INT32
    topics: [TopicMetadata]
    cluster_authorized_operations: INT32
    error_code: INT16

BrokerMetadata =>
    node_id: INT32
    host: STRING
    port: INT32
    rack: NULLABLE_STRING

TopicMetadata =>
    error_code: INT16
    name: STRING
    topic_id: UUID
    is_internal: BOOLEAN
    partitions: [PartitionMetadata]
    topic_authorized_operations: INT32

PartitionMetadata =>
    error_code: INT16
    partition_index: INT32
    leader_id: INT32
    leader_epoch: INT32
    replica_nodes: [INT32]
    isr_nodes: [INT32]
    offline_replicas: [INT32]
Field Type Description
controller_id INT32 Current controller broker ID
cluster_id NULLABLE_STRING Cluster identifier
leader_id INT32 Partition leader broker ID (-1 if none)
replica_nodes ARRAY All replica broker IDs
isr_nodes ARRAY In-sync replica broker IDs
offline_replicas ARRAY Offline replica broker IDs

Behavioral Contract

Aspect Guarantee
Completeness All brokers the client may need to contact
Freshness May be slightly stale after topology changes
Auto-creation May create topics if enabled and requested
Leader info Leader may have changed since response

Metadata Staleness

Metadata responses may be stale. Clients must handle NOT_LEADER_OR_FOLLOWER errors by refreshing metadata and retrying.

Client Caching

Behavior Recommendation
Cache metadata per cluster should
Refresh on NOT_LEADER errors must
Periodic refresh interval metadata.max.age.ms

ApiVersions API (Key 18)

Overview

The ApiVersions API queries the broker for supported API versions. It is the first API called during connection setup.

Version History

Version Kafka Key Changes
0 0.10.0 Initial version
1 0.10.1 Throttle time
2 2.1.0 Response before throttling
3 2.4.0 Flexible versions
4 4.0.0 SupportedFeatures min version fix

Request Schema

ApiVersionsRequest =>
    client_software_name: STRING
    client_software_version: STRING
Field Type Description
client_software_name STRING Client library name (v3+)
client_software_version STRING Client library version (v3+)

Response Schema

ApiVersionsResponse =>
    error_code: INT16
    api_versions: [ApiVersion]
    throttle_time_ms: INT32
    supported_features: [SupportedFeature]
    finalized_features_epoch: INT64
    finalized_features: [FinalizedFeature]
    zk_migration_ready: BOOLEAN

ApiVersion =>
    api_key: INT16
    min_version: INT16
    max_version: INT16
Field Type Description
api_key INT16 API identifier
min_version INT16 Minimum supported version
max_version INT16 Maximum supported version

Special Handling

Behavior Description
Pre-authentication Broker must respond before SASL authentication
Version tolerance Broker should accept any valid request version
Error fallback On UNSUPPORTED_VERSION, client may try older version

uml diagram

Behavioral Contract

Aspect Guarantee
Availability Must be available before authentication
Accuracy Version ranges must reflect actual capabilities
Completeness Must include all supported APIs
Consistency Should not change during connection lifetime

Complete API Key Table

All Kafka Protocol APIs

Key Name Category First Version
0 Produce Core 0.8.0
1 Fetch Core 0.8.0
2 ListOffsets Core 0.8.0
3 Metadata Core 0.8.0
4 LeaderAndIsr Controller 0.8.0
5 StopReplica Controller 0.8.0
6 UpdateMetadata Controller 0.8.0
7 ControlledShutdown Controller 0.8.0
8 OffsetCommit Consumer 0.8.1
9 OffsetFetch Consumer 0.8.1
10 FindCoordinator Consumer 0.8.2
11 JoinGroup Consumer 0.9.0
12 Heartbeat Consumer 0.9.0
13 LeaveGroup Consumer 0.9.0
14 SyncGroup Consumer 0.9.0
15 DescribeGroups Consumer 0.9.0
16 ListGroups Consumer 0.9.0
17 SaslHandshake Auth 0.10.0
18 ApiVersions Core 0.10.0
19 CreateTopics Admin 0.10.1
20 DeleteTopics Admin 0.10.1
21 DeleteRecords Admin 0.11.0
22 InitProducerId Transaction 0.11.0
23 OffsetForLeaderEpoch Replication 0.11.0
24 AddPartitionsToTxn Transaction 0.11.0
25 AddOffsetsToTxn Transaction 0.11.0
26 EndTxn Transaction 0.11.0
27 WriteTxnMarkers Transaction 0.11.0
28 TxnOffsetCommit Transaction 0.11.0
29 DescribeAcls Admin 0.11.0
30 CreateAcls Admin 0.11.0
31 DeleteAcls Admin 0.11.0
32 DescribeConfigs Admin 0.11.0
33 AlterConfigs Admin 0.11.0
34 AlterReplicaLogDirs Admin 0.11.0
35 DescribeLogDirs Admin 0.11.0
36 SaslAuthenticate Auth 1.0.0
37 CreatePartitions Admin 1.0.0
38 CreateDelegationToken Auth 1.1.0
39 RenewDelegationToken Auth 1.1.0
40 ExpireDelegationToken Auth 1.1.0
41 DescribeDelegationToken Auth 1.1.0
42 DeleteGroups Consumer 1.1.0
43 ElectLeaders Admin 2.2.0
44 IncrementalAlterConfigs Admin 2.3.0
45 AlterPartitionReassignments Admin 2.4.0
46 ListPartitionReassignments Admin 2.4.0
47 OffsetDelete Consumer 0.11.0
48 DescribeClientQuotas Admin 2.6.0
49 AlterClientQuotas Admin 2.6.0
50 DescribeUserScramCredentials Auth 2.7.0
51 AlterUserScramCredentials Auth 2.7.0
52 Vote KRaft 2.7.0
53 BeginQuorumEpoch KRaft 2.7.0
54 EndQuorumEpoch KRaft 2.7.0
55 DescribeQuorum KRaft 2.7.0
56 AlterPartition Controller 2.7.0
57 UpdateFeatures Admin 2.7.0
58 Envelope KRaft 2.7.0
59 FetchSnapshot KRaft 3.0.0
60 DescribeCluster Admin 3.0.0
61 DescribeProducers Admin 3.0.0
62 BrokerRegistration KRaft 3.0.0
63 BrokerHeartbeat KRaft 3.0.0
64 UnregisterBroker KRaft 3.0.0
65 DescribeTransactions Transaction 3.0.0
66 ListTransactions Transaction 3.0.0
67 AllocateProducerIds KRaft 3.0.0
68 ConsumerGroupHeartbeat Consumer 3.5.0
69 ConsumerGroupDescribe Consumer 3.5.0
70 ControllerRegistration KRaft 3.5.0
71 GetTelemetrySubscriptions Telemetry 3.5.0
72 PushTelemetry Telemetry 3.5.0
73 AssignReplicasToDirs KRaft 3.6.0
74 ListConfigResources Telemetry 3.6.0
75 DescribeTopicPartitions Admin 3.7.0
76 ShareGroupHeartbeat Share 4.0.0
77 ShareGroupDescribe Share 4.0.0
78 ShareFetch Share 4.0.0
79 ShareAcknowledge Share 4.0.0
80 AddRaftVoter KRaft 4.0.0
81 RemoveRaftVoter KRaft 4.0.0
82 UpdateRaftVoter KRaft 4.0.0
83 InitializeShareGroupState Share 4.0.0
84 ReadShareGroupState Share 4.0.0
85 WriteShareGroupState Share 4.0.0
86 DeleteShareGroupState Share 4.0.0
87 ReadShareGroupStateSummary Share 4.0.0
88 StreamsGroupHeartbeat Streams 4.0.0
89 StreamsGroupDescribe Streams 4.0.0
90 DescribeShareGroupOffsets Share 4.0.0
91 AlterShareGroupOffsets Share 4.0.0
92 DeleteShareGroupOffsets Share 4.0.0