Kafka Consumer Group Protocol APIs
This document specifies the Kafka protocol APIs used for consumer group management, coordination, and offset tracking. These APIs implement the consumer group protocol that enables dynamic partition assignment and coordinated consumption.
Consumer Group API Reference
| API Key | Name | Purpose |
|---------|------|---------|
| 8 | OffsetCommit | Commit consumer offsets |
| 9 | OffsetFetch | Retrieve committed offsets |
| 10 | FindCoordinator | Locate group coordinator |
| 11 | JoinGroup | Join or create consumer group |
| 12 | Heartbeat | Maintain group membership |
| 13 | LeaveGroup | Leave consumer group |
| 14 | SyncGroup | Synchronize partition assignments |
| 15 | DescribeGroups | Describe group state |
| 16 | ListGroups | List all groups |
| 42 | DeleteGroups | Delete consumer groups |
| 47 | OffsetDelete | Delete committed offsets |
Group Membership Protocol
Protocol Overview

FindCoordinator API (Key 10)
Overview
The FindCoordinator API locates the broker serving as coordinator for a consumer group, transaction, or share group.
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 0.8.2 | Initial version (group only) |
| 1 | 0.10.0 | Throttle time |
| 2 | 0.11.0 | Key type (transaction support) |
| 3 | 2.4.0 | Flexible versions |
| 4 | 3.0.0 | Batched requests (KIP-699) |
| 5 | 3.8.0 | TRANSACTION_ABORTABLE errors (KIP-890) |
| 6 | 4.0.0 | Share groups (KIP-932) |
Request Schema
FindCoordinatorRequest =>
key: STRING
key_type: INT8
coordinator_keys: [STRING]
| Field | Type | Description |
|-------|------|-------------|
| key | STRING | Coordinator key (v0-3) |
| key_type | INT8 | 0=GROUP, 1=TRANSACTION, 2=SHARE |
| coordinator_keys | ARRAY | Multiple keys (v4+) |
For key_type=SHARE, the coordinator key format is groupId:topicId:partition.
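The split between the single `key` field and the batched `coordinator_keys` field can be sketched as a small builder. This is only an illustration: the function names and the dictionary layout are hypothetical, real clients serialize these fields in Kafka's binary wire format, and the earliest versions that predate `key_type` are ignored here.

```python
GROUP, TRANSACTION, SHARE = 0, 1, 2  # key_type values from the field table


def share_coordinator_key(group_id: str, topic_id: str, partition: int) -> str:
    """Format a SHARE coordinator key as groupId:topicId:partition."""
    return f"{group_id}:{topic_id}:{partition}"


def build_find_coordinator_request(version: int, key_type: int, keys: list[str]) -> dict:
    """Return the fields a request of this version would carry (hypothetical layout)."""
    if version <= 3:
        # v0-3 can look up exactly one key per request.
        if len(keys) != 1:
            raise ValueError("versions 0-3 accept exactly one key")
        return {"key": keys[0], "key_type": key_type}
    # v4+ batches any number of keys of the same type.
    return {"key_type": key_type, "coordinator_keys": list(keys)}
```

For example, `build_find_coordinator_request(4, GROUP, ["orders", "billing"])` yields a single batched request, while the same lookup at v3 needs one request per group.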
Response Schema
FindCoordinatorResponse =>
throttle_time_ms: INT32
error_code: INT16
error_message: NULLABLE_STRING
node_id: INT32
host: STRING
port: INT32
coordinators: [Coordinator]
Coordinator =>
key: STRING
node_id: INT32
host: STRING
port: INT32
error_code: INT16
error_message: NULLABLE_STRING
| Field | Type | Description |
|-------|------|-------------|
| node_id | INT32 | Coordinator broker ID (v0-3) |
| host | STRING | Coordinator hostname (v0-3) |
| port | INT32 | Coordinator port (v0-3) |
Coordinator Selection
The coordinator is deterministically selected based on the coordinator key and internal topic:
partition = abs(hash(key)) % internal_topic.partitions
coordinator = leader(internal_topic, partition)
Key types map to internal topics:
- GROUP -> __consumer_offsets
- TRANSACTION -> __transaction_state
- SHARE -> __share_group_state
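The selection formula can be tried out with a short sketch. The text above only says `hash(key)`; the code assumes the hash is Java's `String.hashCode()` and that the absolute value is taken by masking the sign bit, which is how the broker is commonly described as doing it. The partition count of 50 used in the example is the usual default for `__consumer_offsets` and is likewise an assumption about the cluster.

```python
def java_string_hash(s: str) -> int:
    """Emulate Java's String.hashCode() (signed 32-bit) over UTF-16 code units."""
    h = 0
    data = s.encode("utf-16-be")
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed Java int.
    return h - (1 << 32) if h & 0x80000000 else h


def coordinator_partition(key: str, num_partitions: int) -> int:
    """Map a coordinator key to a partition of the internal topic."""
    # Masking the sign bit keeps the result non-negative for every input.
    return (java_string_hash(key) & 0x7FFFFFFF) % num_partitions


print(coordinator_partition("abc", 50))  # prints 4
```

The broker leading the resulting partition of the internal topic is the coordinator for that key.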
Behavioral Contract
| Aspect | Guarantee |
|--------|-----------|
| Determinism | Same group ID always maps to same partition |
| Consistency | Coordinator stable unless broker fails |
| Failover | On coordinator failure, new leader elected |
Error Handling
| Error Code | Retriable | Cause | Recovery |
|------------|-----------|-------|----------|
| COORDINATOR_NOT_AVAILABLE | ✅ | Coordinator initializing | Wait, retry |
| NOT_COORDINATOR | ✅ | Coordinator moved | Retry FindCoordinator |
| GROUP_AUTHORIZATION_FAILED | ❌ | No Describe permission | Check ACLs |
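A client might turn the error table above into a retry loop along the following lines. This is a minimal sketch, not a real client: `lookup` is a hypothetical callable that performs one FindCoordinator round trip and returns `(error_name, node_id)`, and the linear backoff is an arbitrary choice.

```python
import time

# Errors the table above marks as retriable.
RETRIABLE_ERRORS = {"COORDINATOR_NOT_AVAILABLE", "NOT_COORDINATOR"}


def find_coordinator_with_retry(lookup, max_attempts=5, backoff_s=0.1, sleep=time.sleep):
    """Call lookup() until it succeeds, hits a fatal error, or attempts run out."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        error, node_id = lookup()
        if error == "NONE":
            return node_id
        if error not in RETRIABLE_ERRORS:
            # For example GROUP_AUTHORIZATION_FAILED: retrying cannot help.
            raise RuntimeError(f"non-retriable FindCoordinator error: {error}")
        last_error = error
        if attempt < max_attempts:
            sleep(backoff_s * attempt)  # linear backoff between attempts
    raise TimeoutError(f"gave up after {max_attempts} attempts: {last_error}")
```

Passing `sleep` as a parameter keeps the loop easy to test without real delays.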
JoinGroup API (Key 11)
Overview
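The JoinGroup API adds a consumer to a group, creating the group if it does not exist and triggering a rebalance if necessary. The coordinator selects one member as group leader and returns the full member list to that leader only.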
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 0.9.0 | Initial version |
| 1 | 0.10.1 | Rebalance timeout |
| 4 | 2.1.0 | Second join with assigned member ID |
| 5 | 2.3.0 | Group instance ID |
| 6 | 2.4.0 | Flexible versions |
| 7 | 2.7.0 | Protocol type in response (KIP-559) |
| 8 | 3.0.0 | Reason field (KIP-800) |
| 9 | 3.5.0 | Skip assignment in response |
Request Schema
JoinGroupRequest =>
group_id: STRING
session_timeout_ms: INT32
rebalance_timeout_ms: INT32
member_id: STRING
group_instance_id: NULLABLE_STRING
protocol_type: STRING
protocols: [Protocol]
reason: NULLABLE_STRING
Protocol =>
name: STRING
metadata: BYTES
| Field | Type | Description |
|-------|------|-------------|
| group_id | STRING | Consumer group identifier |
| session_timeout_ms | INT32 | Session timeout for heartbeats |
| rebalance_timeout_ms | INT32 | Maximum time to join |
| member_id | STRING | Member ID (empty for new members) |
| group_instance_id | NULLABLE_STRING | Static membership ID |
| protocol_type | STRING | Protocol type (e.g., "consumer") |
| protocols | ARRAY | Supported assignment protocols |
Response Schema
JoinGroupResponse =>
throttle_time_ms: INT32
error_code: INT16
generation_id: INT32
protocol_type: NULLABLE_STRING
protocol_name: NULLABLE_STRING
leader: STRING
skip_assignment: BOOLEAN
member_id: STRING
members: [Member]
Member =>
member_id: STRING
group_instance_id: NULLABLE_STRING
metadata: BYTES
| Field | Type | Description |
|-------|------|-------------|
| generation_id | INT32 | Group generation (increments per rebalance) |
| protocol_name | NULLABLE_STRING | Selected assignment protocol |
| leader | STRING | Group leader member ID |
| member_id | STRING | Assigned member ID |
| members | ARRAY | Member list (leader only) |
Rebalance Process

Member Assignment

Behavioral Contract
| Aspect | Guarantee |
|--------|-----------|
| Leader election | Coordinator selects leader (often the first to join) |
| Generation ID | Increments on each successful rebalance |
| Member ID | Assigned by coordinator, must be used in subsequent requests |
| Timeout | Members not completing join within rebalance_timeout are removed |
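The guarantees above can be illustrated with a toy in-memory model of the join phase. It is a deliberately simplified sketch: `ToyCoordinator` and its methods are hypothetical, timeouts and protocol selection are left out, and the leader is simply the first member to join, which the table describes as common but not guaranteed.

```python
import itertools


class ToyCoordinator:
    """In-memory stand-in for one group's join phase; not a real coordinator."""

    def __init__(self):
        self.generation_id = 0
        self.members = {}  # member_id -> subscription metadata bytes
        self._ids = itertools.count(1)

    def join(self, member_id: str, metadata: bytes) -> str:
        """Register a member; an empty member_id requests a newly assigned one."""
        if member_id == "":
            member_id = f"member-{next(self._ids)}"
        self.members[member_id] = metadata
        return member_id

    def complete_rebalance(self) -> dict:
        """Bump the generation and build one JoinGroup response per member."""
        self.generation_id += 1
        leader = next(iter(self.members))  # first member to join
        responses = {}
        for member_id in self.members:
            responses[member_id] = {
                "generation_id": self.generation_id,
                "leader": leader,
                "member_id": member_id,
                # Only the leader receives the full member list.
                "members": dict(self.members) if member_id == leader else {},
            }
        return responses
```

Each member compares `leader` with its own `member_id` to learn whether it must compute the assignment for the new generation.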
Static Group Membership
With group.instance.id:
| Behavior | Static Membership |
|----------|-------------------|
| Rejoin after restart | Preserves member ID |
| Session timeout | Longer grace period |
| Rebalance avoidance | No rebalance on transient failures |
Heartbeat API (Key 12)
Overview
The Heartbeat API maintains consumer group membership and detects failures.
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 0.9.0 | Initial version |
| 1 | 0.10.1 | Throttle time |
| 2 | 0.11.0 | Response before throttling |
| 3 | 2.3.0 | Group instance ID |
| 4 | 2.4.0 | Flexible versions |
Request Schema
HeartbeatRequest =>
group_id: STRING
generation_id: INT32
member_id: STRING
group_instance_id: NULLABLE_STRING
| Field | Type | Description |
|-------|------|-------------|
| group_id | STRING | Consumer group ID |
| generation_id | INT32 | Current group generation |
| member_id | STRING | Member's assigned ID |
| group_instance_id | NULLABLE_STRING | Static membership ID |
Response Schema
HeartbeatResponse =>
throttle_time_ms: INT32
error_code: INT16
Behavioral Contract
| Aspect | Guarantee |
|--------|-----------|
| Frequency | Must send within session.timeout.ms |
| Failure detection | Missing heartbeats trigger rebalance |
| Rebalance signal | REBALANCE_IN_PROGRESS indicates pending rebalance |
Error Handling
| Error Code | Meaning | Recovery |
|------------|---------|----------|
| NONE | Success | Continue heartbeating |
| REBALANCE_IN_PROGRESS | Rebalance started | Rejoin group |
| ILLEGAL_GENERATION | Stale generation | Rejoin group |
| UNKNOWN_MEMBER_ID | Member removed | Rejoin group |
| FENCED_INSTANCE_ID | Static member fenced | Exit or rejoin |
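The recovery column maps naturally onto a small dispatch function in a client's heartbeat loop. The sketch below is illustrative only: the action names are hypothetical, and for FENCED_INSTANCE_ID it picks "exit" although the table allows either exiting or rejoining.

```python
REJOIN_ERRORS = {"REBALANCE_IN_PROGRESS", "ILLEGAL_GENERATION", "UNKNOWN_MEMBER_ID"}


def next_action(error_code: str) -> str:
    """Translate a HeartbeatResponse error into the client's next step."""
    if error_code == "NONE":
        return "continue"  # keep heartbeating on schedule
    if error_code in REJOIN_ERRORS:
        return "rejoin"    # send a new JoinGroup request
    if error_code == "FENCED_INSTANCE_ID":
        # Another instance holds this group.instance.id; this sketch stops.
        return "exit"
    return "fail"          # anything else is surfaced to the application
```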
SyncGroup API (Key 14)
Overview
The SyncGroup API distributes the partition assignments computed by the group leader to every member once the join phase of a rebalance has completed.
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 0.9.0 | Initial version |
| 1 | 0.10.1 | Throttle time |
| 2 | 0.11.0 | Response improvements |
| 3 | 2.3.0 | Group instance ID |
| 4 | 2.4.0 | Flexible versions |
| 5 | 2.7.0 | Protocol type/name |
Request Schema
SyncGroupRequest =>
group_id: STRING
generation_id: INT32
member_id: STRING
group_instance_id: NULLABLE_STRING
protocol_type: NULLABLE_STRING
protocol_name: NULLABLE_STRING
assignments: [Assignment]
Assignment =>
member_id: STRING
assignment: BYTES
| Field | Type | Description |
|-------|------|-------------|
| assignments | ARRAY | Partition assignments (leader only) |
Response Schema
SyncGroupResponse =>
throttle_time_ms: INT32
error_code: INT16
protocol_type: NULLABLE_STRING
protocol_name: NULLABLE_STRING
assignment: BYTES
| Field | Type | Description |
|-------|------|-------------|
| assignment | BYTES | Member's partition assignment |
The assignment bytes follow a protocol-specific format. For the "consumer" protocol:
ConsumerProtocolAssignment =>
assigned_partitions: [TopicPartition]
user_data: BYTES
TopicPartition =>
topic: STRING
partitions: [INT32]
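To make the byte layout concrete, the following sketch encodes and decodes an assignment with Python's `struct` module. It rests on assumptions that the schema above does not state: the payload is prefixed with an INT16 version, arrays and BYTES carry INT32 lengths, STRING carries an INT16 length, and everything is big-endian. The function names are hypothetical.

```python
import struct


def encode_assignment(assigned: dict[str, list[int]], user_data: bytes = b"", version: int = 0) -> bytes:
    """Serialize {topic: [partitions]} plus user_data into assignment bytes."""
    out = [struct.pack(">h", version), struct.pack(">i", len(assigned))]
    for topic, partitions in assigned.items():
        name = topic.encode("utf-8")
        out.append(struct.pack(">h", len(name)) + name)
        out.append(struct.pack(f">i{len(partitions)}i", len(partitions), *partitions))
    out.append(struct.pack(">i", len(user_data)) + user_data)
    return b"".join(out)


def decode_assignment(data: bytes):
    """Parse assignment bytes back into (version, {topic: [partitions]}, user_data)."""
    pos = 0

    def read(fmt):
        nonlocal pos
        values = struct.unpack_from(fmt, data, pos)
        pos += struct.calcsize(fmt)
        return values

    (version,) = read(">h")
    (topic_count,) = read(">i")
    assigned = {}
    for _ in range(topic_count):
        (name_len,) = read(">h")
        name = data[pos:pos + name_len].decode("utf-8")
        pos += name_len
        (partition_count,) = read(">i")
        assigned[name] = list(read(f">{partition_count}i"))
    (user_data_len,) = read(">i")
    # A negative length marks null user_data.
    user_data = data[pos:pos + user_data_len] if user_data_len >= 0 else None
    return version, assigned, user_data
```

A round trip such as `decode_assignment(encode_assignment({"orders": [0, 1]}))` should return the original mapping, which is a convenient sanity check when debugging a custom assignor.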
Behavioral Contract
| Aspect | Guarantee |
|--------|-----------|
| Leader responsibility | Only the leader includes assignments in its request |
| Follower behavior | Non-leaders send an empty assignments array |
| Atomicity | All members receive assignments when coordinator responds |
| Consistency | Same assignment for same generation |
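The leader and follower roles can be sketched as follows. This is an illustration under stated assumptions: the round-robin strategy is just one possible assignor, the helper names are hypothetical, and partitions are kept as `(topic, partition)` tuples rather than the encoded BYTES a real request carries.

```python
def round_robin_assign(member_ids, partitions):
    """Spread (topic, partition) tuples over members in sorted order."""
    members = sorted(member_ids)
    if not members:
        raise ValueError("cannot assign partitions to an empty group")
    plan = {member: [] for member in members}
    for i, tp in enumerate(sorted(partitions)):
        plan[members[i % len(members)]].append(tp)
    return plan


def sync_group_assignments(my_id, leader_id, plan):
    """Build the assignments array: full plan for the leader, empty otherwise."""
    if my_id != leader_id:
        return []
    return [{"member_id": m, "assignment": tps} for m, tps in plan.items()]
```

Every member, leader or not, then receives only its own entry back in the SyncGroup response.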
LeaveGroup API (Key 13)
Overview
The LeaveGroup API gracefully removes members from a consumer group.
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 0.9.0 | Initial version |
| 1 | 0.10.1 | Throttle time |
| 2 | 0.11.0 | Response before throttling |
| 3 | 2.3.0 | Batch member removal + group instance ID |
| 4 | 2.4.0 | Flexible versions |
| 5 | 3.0.0 | Reason field (KIP-800) |
Request Schema
LeaveGroupRequest =>
group_id: STRING
member_id: STRING
members: [MemberIdentity]
MemberIdentity =>
member_id: STRING
group_instance_id: NULLABLE_STRING
reason: NULLABLE_STRING
Response Schema
LeaveGroupResponse =>
throttle_time_ms: INT32
error_code: INT16
members: [MemberResponse]
MemberResponse =>
member_id: STRING
group_instance_id: NULLABLE_STRING
error_code: INT16
Behavioral Contract
| Aspect | Guarantee |
|--------|-----------|
| Immediate effect | Member removed immediately |
| Rebalance trigger | Remaining members notified via heartbeat |
| Static members | May leave without triggering immediate rebalance |
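Based on the version history above, the request shape changes at v3 (batched `members`) and v5 (`reason`). A hedged sketch of a builder that respects those boundaries, using a hypothetical function name and a plain dictionary in place of the wire format:

```python
def build_leave_group_request(version, group_id, members, reason=None):
    """Build LeaveGroup fields; members is a list of (member_id, group_instance_id)."""
    if version < 3:
        # Before batching, a request removes exactly one member by ID.
        if len(members) != 1:
            raise ValueError("versions 0-2 remove exactly one member")
        return {"group_id": group_id, "member_id": members[0][0]}
    entries = []
    for member_id, instance_id in members:
        entry = {"member_id": member_id, "group_instance_id": instance_id}
        if version >= 5:
            entry["reason"] = reason  # free-form text, added by KIP-800
        entries.append(entry)
    return {"group_id": group_id, "members": entries}
```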
OffsetCommit API (Key 8)
Overview
The OffsetCommit API stores consumer offsets in the __consumer_offsets topic.
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 0.8.1 | Initial version (removed in 4.0) |
| 1 | 0.8.2 | Commit timestamp (removed in 4.0) |
| 2 | 0.9.0 | Retention time (4.0 baseline) |
| 3 | 0.11.0 | Throttle time |
| 5 | 2.1.0 | Retention time removed |
| 6 | 2.1.0 | Leader epoch |
| 7 | 2.3.0 | Group instance ID |
| 8 | 2.4.0 | Flexible versions |
| 9 | 3.5.0 | Consumer group protocol (KIP-848) |
| 10 | 4.0.0 | Topic IDs |
Request Schema
OffsetCommitRequest =>
group_id: STRING
generation_id_or_member_epoch: INT32
member_id: STRING
group_instance_id: NULLABLE_STRING
topics: [Topic]
Topic =>
name: STRING
topic_id: UUID
partitions: [Partition]
Partition =>
partition_index: INT32
committed_offset: INT64
committed_leader_epoch: INT32
committed_metadata: NULLABLE_STRING
Topic names are used through v9; v10+ uses topic_id instead.
| Field | Type | Description |
|-------|------|-------------|
| generation_id_or_member_epoch | INT32 | Group generation (classic) or member epoch (consumer protocol) |
| committed_offset | INT64 | Offset to commit |
| committed_leader_epoch | INT32 | Leader epoch of committed offset |
| committed_metadata | NULLABLE_STRING | Application metadata |
Response Schema
OffsetCommitResponse =>
throttle_time_ms: INT32
topics: [Topic]
Topic =>
name: STRING
topic_id: UUID
partitions: [Partition]
Partition =>
partition_index: INT32
error_code: INT16
Behavioral Contract
| Aspect | Guarantee |
|--------|-----------|
| Durability | Committed to replicated __consumer_offsets topic |
| Visibility | Available immediately after successful commit |
| Generation check | Must match current generation (if group member) |
| Retention | Subject to offsets.retention.minutes |
Error Handling
| Error Code | Retriable | Cause | Recovery |
|------------|-----------|-------|----------|
| ILLEGAL_GENERATION | ❌ | Stale generation | Rejoin group |
| UNKNOWN_MEMBER_ID | ❌ | Not a group member | Rejoin group |
| OFFSET_METADATA_TOO_LARGE | ❌ | Metadata too large | Reduce metadata |
| GROUP_AUTHORIZATION_FAILED | ❌ | No Read permission | Check ACLs |
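The causes in the table can be pictured as a sequence of checks on the coordinator. The sketch below is a toy model, not broker code: the function name, the `group` dictionary, the order of the checks, the 4096-byte metadata limit, and the treatment of an empty member ID with generation -1 as a commit from outside the group are all assumptions made for illustration.

```python
def validate_commit(group, member_id, generation_id, metadata, max_metadata_bytes=4096):
    """Return the error name a toy coordinator would give for one commit."""
    standalone = member_id == "" and generation_id == -1
    if not standalone:
        if member_id not in group["members"]:
            return "UNKNOWN_MEMBER_ID"
        if generation_id != group["generation_id"]:
            return "ILLEGAL_GENERATION"
    if metadata is not None and len(metadata.encode("utf-8")) > max_metadata_bytes:
        return "OFFSET_METADATA_TOO_LARGE"
    return "NONE"
```

A member that sees either of the first two errors has fallen out of the current generation, so retrying the same commit cannot succeed until it rejoins.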
OffsetFetch API (Key 9)
Overview
The OffsetFetch API retrieves committed offsets for a consumer group.
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 0.8.1 | Initial version (removed in 4.0) |
| 1 | 0.8.2 | Kafka-stored offsets (4.0 baseline) |
| 2 | 0.10.2 | All partitions + top-level error |
| 3 | 0.11.0 | Throttle time |
| 5 | 2.1.0 | Leader epoch |
| 6 | 2.4.0 | Flexible versions |
| 7 | 2.5.0 | Require stable |
| 8 | 3.0.0 | Multiple groups |
| 9 | 3.5.0 | Consumer group protocol (KIP-848) |
| 10 | 4.0.0 | Topic IDs |
Request Schema
OffsetFetchRequest =>
group_id: STRING
topics: [Topic]
groups: [Group]
require_stable: BOOLEAN
Topic =>
name: STRING
partition_indexes: [INT32]
Group =>
group_id: STRING
member_id: NULLABLE_STRING
member_epoch: INT32
topics: [Topic]
| Field | Type | Description |
|-------|------|-------------|
| topics | ARRAY | Topics/partitions to fetch (null for all) |
| require_stable | BOOLEAN | If true, unstable offsets return a retriable error |
Response Schema
OffsetFetchResponse =>
throttle_time_ms: INT32
topics: [Topic]
error_code: INT16
groups: [Group]
Topic =>
name: STRING
partitions: [Partition]
Partition =>
partition_index: INT32
committed_offset: INT64
committed_leader_epoch: INT32
metadata: NULLABLE_STRING
error_code: INT16
| Field | Type | Description |
|-------|------|-------------|
| committed_offset | INT64 | Last committed offset (-1 if none) |
| committed_leader_epoch | INT32 | Leader epoch of committed offset |
| metadata | NULLABLE_STRING | Application metadata |
Behavioral Contract
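| Aspect | Guarantee |
|--------|-----------|
| Consistency | Returns latest committed offset |
| No offset | Returns -1 if no offset committed |
| require_stable | When true, partitions with pending transactional offset commits return the retriable UNSTABLE_OFFSET_COMMIT error; the client retries until those transactions complete |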
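A client typically has to decide where to start reading when the fetch returns -1. The sketch below shows one way to do that; the fallback to a reset policy mirrors the consumer's `auto.offset.reset` setting, which lies outside this API, and the function name and parameters are hypothetical.

```python
def starting_position(committed_offset, log_start, log_end, reset_policy="latest"):
    """Pick the offset to resume from, given an OffsetFetch result."""
    if committed_offset >= 0:
        return committed_offset  # resume exactly where the group left off
    # -1 means the group has never committed for this partition.
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    raise LookupError("no committed offset and reset policy forbids a fallback")
```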
DescribeGroups API (Key 15)
Overview
The DescribeGroups API retrieves detailed information about consumer groups.
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 0.9.0 | Initial version |
| 1 | 0.10.1 | Throttle time |
| 2 | 0.11.0 | Response improvements |
| 3 | 2.3.0 | Authorized operations |
| 4 | 2.4.0 | Group instance ID (KIP-345) |
| 5 | 2.4.0 | Flexible versions |
| 6 | 3.9.0 | GROUP_ID_NOT_FOUND (KIP-1043) |
Request Schema
DescribeGroupsRequest =>
groups: [STRING]
include_authorized_operations: BOOLEAN
Response Schema
DescribeGroupsResponse =>
throttle_time_ms: INT32
groups: [Group]
Group =>
error_code: INT16
group_id: STRING
group_state: STRING
protocol_type: STRING
protocol_data: STRING
members: [Member]
authorized_operations: INT32
Member =>
member_id: STRING
group_instance_id: NULLABLE_STRING
client_id: STRING
client_host: STRING
member_metadata: BYTES
member_assignment: BYTES
| Field | Type | Description |
|-------|------|-------------|
| group_state | STRING | Current group state |
| protocol_type | STRING | Protocol type (e.g., "consumer") |
| protocol_data | STRING | Selected protocol name |
| member_assignment | BYTES | Current partition assignment |
Group States
| State | Description |
|-------|-------------|
| Empty | No active members |
| PreparingRebalance | Rebalance in progress |
| CompletingRebalance | Waiting for SyncGroup |
| Stable | Active and stable |
| Dead | Group being deleted |
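Tools that consume `group_state` often parse the string into an enumeration before acting on it. The sketch below does that and adds a check for the DeleteGroups precondition stated later in this document (the group must be Empty or Dead). The class and function names are hypothetical.

```python
from enum import Enum


class GroupState(Enum):
    EMPTY = "Empty"
    PREPARING_REBALANCE = "PreparingRebalance"
    COMPLETING_REBALANCE = "CompletingRebalance"
    STABLE = "Stable"
    DEAD = "Dead"


def can_delete(state_name: str) -> bool:
    """True if a group in this state meets the DeleteGroups precondition."""
    # GroupState(...) raises ValueError for a state string it does not know.
    return GroupState(state_name) in {GroupState.EMPTY, GroupState.DEAD}
```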
ListGroups API (Key 16)
Overview
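The ListGroups API lists the groups managed by the broker that receives the request. To enumerate every group in a cluster, a client sends the request to each broker and merges the results.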
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 0.9.0 | Initial version |
| 1 | 0.10.1 | Throttle time |
| 3 | 2.4.0 | Flexible versions |
| 4 | 2.6.0 | States filter (KIP-518) |
| 5 | 3.5.0 | Types filter (KIP-848) |
Request Schema
ListGroupsRequest =>
states_filter: [STRING]
types_filter: [STRING]
| Field | Type | Description |
|-------|------|-------------|
| states_filter | ARRAY | Filter by group states |
| types_filter | ARRAY | Filter by group types |
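The effect of the two filters can be sketched as a plain list comprehension. The sketch assumes that an empty filter matches everything and that the two filters combine with a logical AND; the function name and the example type names `classic` and `consumer` are illustrative.

```python
def filter_groups(groups, states_filter=(), types_filter=()):
    """Keep groups whose state and type pass both (possibly empty) filters."""
    states = set(states_filter)
    types = set(types_filter)
    return [
        g for g in groups
        if (not states or g["group_state"] in states)
        and (not types or g["group_type"] in types)
    ]


groups = [
    {"group_id": "a", "group_state": "Stable", "group_type": "classic"},
    {"group_id": "b", "group_state": "Empty", "group_type": "consumer"},
]
print([g["group_id"] for g in filter_groups(groups, states_filter=["Stable"])])  # prints ['a']
```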
Response Schema
ListGroupsResponse =>
throttle_time_ms: INT32
error_code: INT16
groups: [Group]
Group =>
group_id: STRING
protocol_type: STRING
group_state: STRING
group_type: STRING
DeleteGroups API (Key 42)
Overview
The DeleteGroups API deletes consumer groups.
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 1.1.0 | Initial version |
| 1 | 2.0.0 | Response improvements |
| 2 | 2.4.0 | Flexible versions |
Request Schema
DeleteGroupsRequest =>
groups_names: [STRING]
Response Schema
DeleteGroupsResponse =>
throttle_time_ms: INT32
results: [Result]
Result =>
group_id: STRING
error_code: INT16
Behavioral Contract
| Aspect | Guarantee |
|--------|-----------|
| Precondition | Group must be Empty or Dead |
| Atomicity | Each group deletion is independent |
| Effect | Removes group and all committed offsets |
OffsetDelete API (Key 47)
Overview
The OffsetDelete API deletes committed offsets for a group and set of partitions.
Version History
| Version | Kafka | Key Changes |
|---------|-------|-------------|
| 0 | 2.4.0 | Initial version (KIP-496) |
Request Schema
OffsetDeleteRequest =>
group_id: STRING
topics: [Topic]
Topic =>
name: STRING
partitions: [Partition]
Partition =>
partition_index: INT32
Response Schema
OffsetDeleteResponse =>
error_code: INT16
throttle_time_ms: INT32
topics: [Topic]
Topic =>
name: STRING
partitions: [Partition]
Partition =>
partition_index: INT32
error_code: INT16
Consumer Group Protocol Summary
Complete Flow
