Skip to content
Maintained by AxonOps — production-grade documentation from engineers who operate distributed databases at scale

Kafka Consumer Group Protocol APIs

This document specifies the Kafka protocol APIs used for consumer group management, coordination, and offset tracking. These APIs implement the consumer group protocol that enables dynamic partition assignment and coordinated consumption.


Consumer Group API Reference

API Key Name Purpose
8 OffsetCommit Commit consumer offsets
9 OffsetFetch Retrieve committed offsets
10 FindCoordinator Locate group coordinator
11 JoinGroup Join or create consumer group
12 Heartbeat Maintain group membership
13 LeaveGroup Leave consumer group
14 SyncGroup Synchronize partition assignments
15 DescribeGroups Describe group state
16 ListGroups List all groups
42 DeleteGroups Delete consumer groups
47 OffsetDelete Delete committed offsets

Group Membership Protocol

Protocol Overview

uml diagram


FindCoordinator API (Key 10)

Overview

The FindCoordinator API locates the broker serving as coordinator for a consumer group, transaction, or share group.

Version History

Version Kafka Key Changes
0 0.8.2 Initial version (group only)
1 0.10.0 Throttle time
2 0.11.0 Key type (transaction support)
3 2.4.0 Flexible versions
4 3.6.0 Batched requests (KIP-699)
5 3.8.0 TRANSACTION_ABORTABLE errors (KIP-890)
6 4.0.0 Share groups (KIP-932)

Request Schema

FindCoordinatorRequest =>
    key: STRING
    key_type: INT8
    coordinator_keys: [STRING]
Field Type Description
key STRING Coordinator key (v0-3)
key_type INT8 0=GROUP, 1=TRANSACTION, 2=SHARE
coordinator_keys ARRAY Multiple keys (v4+)

For key_type=SHARE, the coordinator key format is groupId:topicId:partition.

Response Schema

FindCoordinatorResponse =>
    throttle_time_ms: INT32
    error_code: INT16
    error_message: NULLABLE_STRING
    node_id: INT32
    host: STRING
    port: INT32
    coordinators: [Coordinator]

Coordinator =>
    key: STRING
    node_id: INT32
    host: STRING
    port: INT32
    error_code: INT16
    error_message: NULLABLE_STRING
Field Type Description
node_id INT32 Coordinator broker ID (v0-3)
host STRING Coordinator hostname (v0-3)
port INT32 Coordinator port (v0-3)

Coordinator Selection

The coordinator is deterministically selected based on the coordinator key and internal topic:

partition = abs(hash(key)) % internal_topic.partitions
coordinator = leader(internal_topic, partition)

Key types map to internal topics:

  • GROUP -> __consumer_offsets
  • TRANSACTION -> __transaction_state
  • SHARE -> __share_group_state

Behavioral Contract

Aspect Guarantee
Determinism Same group ID always maps to same partition
Consistency Coordinator stable unless broker fails
Failover On coordinator failure, new leader elected

Error Handling

Error Code Retriable Cause Recovery
COORDINATOR_NOT_AVAILABLE Coordinator initializing Wait, retry
NOT_COORDINATOR Coordinator moved Retry FindCoordinator
GROUP_AUTHORIZATION_FAILED No Describe permission Check ACLs

JoinGroup API (Key 11)

Overview

The JoinGroup API joins a consumer to a group, triggering rebalancing if necessary. The coordinator selects a group leader.

Version History

Version Kafka Key Changes
0 0.9.0 Initial version
1 0.10.1 Rebalance timeout
4 2.1.0 Second join with assigned member ID
5 2.3.0 Group instance ID
6 2.4.0 Flexible versions
7 2.7.0 Protocol type in response (KIP-559)
8 3.0.0 Reason field (KIP-800)
9 3.5.0 Skip assignment in response

Request Schema

JoinGroupRequest =>
    group_id: STRING
    session_timeout_ms: INT32
    rebalance_timeout_ms: INT32
    member_id: STRING
    group_instance_id: NULLABLE_STRING
    protocol_type: STRING
    protocols: [Protocol]
    reason: NULLABLE_STRING

Protocol =>
    name: STRING
    metadata: BYTES
Field Type Description
group_id STRING Consumer group identifier
session_timeout_ms INT32 Session timeout for heartbeats
rebalance_timeout_ms INT32 Maximum time to join
member_id STRING Member ID (empty for new members)
group_instance_id NULLABLE_STRING Static membership ID
protocol_type STRING Protocol type (e.g., "consumer")
protocols ARRAY Supported assignment protocols

Response Schema

JoinGroupResponse =>
    throttle_time_ms: INT32
    error_code: INT16
    generation_id: INT32
    protocol_type: NULLABLE_STRING
    protocol_name: NULLABLE_STRING
    leader: STRING
    skip_assignment: BOOLEAN
    member_id: STRING
    members: [Member]

Member =>
    member_id: STRING
    group_instance_id: NULLABLE_STRING
    metadata: BYTES
Field Type Description
generation_id INT32 Group generation (increments per rebalance)
protocol_name NULLABLE_STRING Selected assignment protocol
leader STRING Group leader member ID
member_id STRING Assigned member ID
members ARRAY Member list (leader only)

Rebalance Process

uml diagram

Member Assignment

uml diagram

Behavioral Contract

Aspect Guarantee
Leader election Coordinator selects leader (often the first to join)
Generation ID Increments on each successful rebalance
Member ID Assigned by coordinator, must be used in subsequent requests
Timeout Members not completing join within rebalance_timeout are removed

Static Group Membership

With group.instance.id:

Behavior Static Membership
Rejoin after restart Preserves member ID
Session timeout Longer grace period
Rebalance avoidance No rebalance on transient failures

Heartbeat API (Key 12)

Overview

The Heartbeat API maintains consumer group membership and detects failures.

Version History

Version Kafka Key Changes
0 0.9.0 Initial version
1 0.10.1 Throttle time
2 0.11.0 Response before throttling
3 2.3.0 Group instance ID
4 2.4.0 Flexible versions

Request Schema

HeartbeatRequest =>
    group_id: STRING
    generation_id: INT32
    member_id: STRING
    group_instance_id: NULLABLE_STRING
Field Type Description
group_id STRING Consumer group ID
generation_id INT32 Current group generation
member_id STRING Member's assigned ID
group_instance_id NULLABLE_STRING Static membership ID

Response Schema

HeartbeatResponse =>
    throttle_time_ms: INT32
    error_code: INT16

Behavioral Contract

Aspect Guarantee
Frequency Must send within session.timeout.ms
Failure detection Missing heartbeats trigger rebalance
Rebalance signal REBALANCE_IN_PROGRESS indicates pending rebalance

Error Handling

Error Code Meaning Recovery
NONE Success Continue heartbeating
REBALANCE_IN_PROGRESS Rebalance started Rejoin group
ILLEGAL_GENERATION Stale generation Rejoin group
UNKNOWN_MEMBER_ID Member removed Rejoin group
FENCED_INSTANCE_ID Static member fenced Exit or rejoin

SyncGroup API (Key 14)

Overview

The SyncGroup API distributes partition assignments after a successful rebalance.

Version History

Version Kafka Key Changes
0 0.9.0 Initial version
1 0.10.1 Throttle time
2 0.11.0 Response improvements
3 2.3.0 Group instance ID
4 2.4.0 Flexible versions
5 2.7.0 Protocol type/name

Request Schema

SyncGroupRequest =>
    group_id: STRING
    generation_id: INT32
    member_id: STRING
    group_instance_id: NULLABLE_STRING
    protocol_type: NULLABLE_STRING
    protocol_name: NULLABLE_STRING
    assignments: [Assignment]

Assignment =>
    member_id: STRING
    assignment: BYTES
Field Type Description
assignments ARRAY Partition assignments (leader only)

Response Schema

SyncGroupResponse =>
    throttle_time_ms: INT32
    error_code: INT16
    protocol_type: NULLABLE_STRING
    protocol_name: NULLABLE_STRING
    assignment: BYTES
Field Type Description
assignment BYTES Member's partition assignment

Assignment Format

The assignment bytes follow a protocol-specific format. For the "consumer" protocol:

ConsumerProtocolAssignment =>
    assigned_partitions: [TopicPartition]
    user_data: BYTES

TopicPartition =>
    topic: STRING
    partitions: [INT32]

Behavioral Contract

Aspect Guarantee
Leader responsibility Only leader must include assignments
Follower behavior Non-leaders must send empty assignments
Atomicity All members receive assignments when coordinator responds
Consistency Same assignment for same generation

LeaveGroup API (Key 13)

Overview

The LeaveGroup API gracefully removes members from a consumer group.

Version History

Version Kafka Key Changes
0 0.9.0 Initial version
1 0.10.1 Throttle time
2 0.11.0 Response before throttling
3 2.3.0 Batch member removal + group instance ID
4 2.4.0 Flexible versions
5 3.0.0 Reason field (KIP-800)

Request Schema

LeaveGroupRequest =>
    group_id: STRING
    member_id: STRING
    members: [MemberIdentity]

MemberIdentity =>
    member_id: STRING
    group_instance_id: NULLABLE_STRING
    reason: NULLABLE_STRING

Response Schema

LeaveGroupResponse =>
    throttle_time_ms: INT32
    error_code: INT16
    members: [MemberResponse]

MemberResponse =>
    member_id: STRING
    group_instance_id: NULLABLE_STRING
    error_code: INT16

Behavioral Contract

Aspect Guarantee
Immediate effect Member removed immediately
Rebalance trigger Remaining members notified via heartbeat
Static members May leave without triggering immediate rebalance

OffsetCommit API (Key 8)

Overview

The OffsetCommit API stores consumer offsets in the __consumer_offsets topic.

Version History

Version Kafka Key Changes
0 0.8.1 Initial version (removed in 4.0)
1 0.8.2 Commit timestamp (removed in 4.0)
2 0.9.0 Retention time (4.0 baseline)
3 0.11.0 Throttle time
5 2.1.0 Retention time removed
6 2.1.0 Leader epoch
7 2.3.0 Group instance ID
8 2.4.0 Flexible versions
9 3.5.0 Consumer group protocol (KIP-848)
10 4.0.0 Topic IDs

Request Schema

OffsetCommitRequest =>
    group_id: STRING
    generation_id_or_member_epoch: INT32
    member_id: STRING
    group_instance_id: NULLABLE_STRING
    topics: [Topic]

Topic =>
    name: STRING
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition_index: INT32
    committed_offset: INT64
    committed_leader_epoch: INT32
    committed_metadata: NULLABLE_STRING

Topic names are used through v9; v10+ uses topic_id instead.

Field Type Description
generation_id_or_member_epoch INT32 Group generation (classic) or member epoch (consumer protocol)
committed_offset INT64 Offset to commit
committed_leader_epoch INT32 Leader epoch of committed offset
committed_metadata NULLABLE_STRING Application metadata

Response Schema

OffsetCommitResponse =>
    throttle_time_ms: INT32
    topics: [Topic]

Topic =>
    name: STRING
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition_index: INT32
    error_code: INT16

Behavioral Contract

Aspect Guarantee
Durability Committed to replicated __consumer_offsets topic
Visibility Available immediately after successful commit
Generation check Must match current generation (if group member)
Retention Subject to offsets.retention.minutes

Error Handling

Error Code Retriable Cause Recovery
ILLEGAL_GENERATION Stale generation Rejoin group
UNKNOWN_MEMBER_ID Not a group member Rejoin group
OFFSET_METADATA_TOO_LARGE Metadata too large Reduce metadata
GROUP_AUTHORIZATION_FAILED No Read permission Check ACLs

OffsetFetch API (Key 9)

Overview

The OffsetFetch API retrieves committed offsets for a consumer group.

Version History

Version Kafka Key Changes
0 0.8.1 Initial version (removed in 4.0)
1 0.8.2 Kafka-stored offsets (4.0 baseline)
2 0.10.2 All partitions + top-level error
3 0.11.0 Throttle time
5 2.1.0 Leader epoch
6 2.4.0 Flexible versions
7 2.5.0 Require stable
8 3.0.0 Multiple groups
9 3.5.0 Consumer group protocol (KIP-848)
10 4.0.0 Topic IDs

Request Schema

OffsetFetchRequest =>
    group_id: STRING
    topics: [Topic]
    groups: [Group]
    require_stable: BOOLEAN

Topic =>
    name: STRING
    partition_indexes: [INT32]

Group =>
    group_id: STRING
    member_id: NULLABLE_STRING
    member_epoch: INT32
    topics: [Topic]
Field Type Description
topics ARRAY Topics/partitions to fetch (null for all)
require_stable BOOLEAN If true, unstable offsets return a retriable error

Response Schema

OffsetFetchResponse =>
    throttle_time_ms: INT32
    topics: [Topic]
    error_code: INT16
    groups: [Group]

Topic =>
    name: STRING
    partitions: [Partition]

Partition =>
    partition_index: INT32
    committed_offset: INT64
    committed_leader_epoch: INT32
    metadata: NULLABLE_STRING
    error_code: INT16
Field Type Description
committed_offset INT64 Last committed offset (-1 if none)
committed_leader_epoch INT32 Leader epoch of committed offset
metadata NULLABLE_STRING Application metadata

Behavioral Contract

Aspect Guarantee
Consistency Returns latest committed offset
No offset Returns -1 if no offset committed
require_stable With true, waits for pending transactions

DescribeGroups API (Key 15)

Overview

The DescribeGroups API retrieves detailed information about consumer groups.

Version History

Version Kafka Key Changes
0 0.9.0 Initial version
1 0.10.1 Throttle time
2 0.11.0 Response improvements
3 2.3.0 Authorized operations
4 2.4.0 KIP-345
5 2.4.0 Flexible versions
6 3.9.0 GROUP_ID_NOT_FOUND (KIP-1043)

Request Schema

DescribeGroupsRequest =>
    groups: [STRING]
    include_authorized_operations: BOOLEAN

Response Schema

DescribeGroupsResponse =>
    throttle_time_ms: INT32
    groups: [Group]

Group =>
    error_code: INT16
    group_id: STRING
    group_state: STRING
    protocol_type: STRING
    protocol_data: STRING
    members: [Member]
    authorized_operations: INT32

Member =>
    member_id: STRING
    group_instance_id: NULLABLE_STRING
    client_id: STRING
    client_host: STRING
    member_metadata: BYTES
    member_assignment: BYTES
Field Type Description
group_state STRING Current group state
protocol_type STRING Protocol type (e.g., "consumer")
protocol_data STRING Selected protocol name
member_assignment BYTES Current partition assignment

Group States

State Description
Empty No active members
PreparingRebalance Rebalance in progress
CompletingRebalance Waiting for SyncGroup
Stable Active and stable
Dead Group being deleted

ListGroups API (Key 16)

Overview

The ListGroups API lists all consumer groups on a broker.

Version History

Version Kafka Key Changes
0 0.9.0 Initial version
1 0.10.1 Throttle time
3 2.4.0 Flexible versions
4 2.6.0 States filter (KIP-518)
5 3.5.0 Types filter (KIP-848)

Request Schema

ListGroupsRequest =>
    states_filter: [STRING]
    types_filter: [STRING]
Field Type Description
states_filter ARRAY Filter by group states
types_filter ARRAY Filter by group types

Response Schema

ListGroupsResponse =>
    throttle_time_ms: INT32
    error_code: INT16
    groups: [Group]

Group =>
    group_id: STRING
    protocol_type: STRING
    group_state: STRING
    group_type: STRING

DeleteGroups API (Key 42)

Overview

The DeleteGroups API deletes consumer groups.

Version History

Version Kafka Key Changes
0 1.1.0 Initial version
1 2.0.0 Response improvements
2 2.4.0 Flexible versions

Request Schema

DeleteGroupsRequest =>
    groups_names: [STRING]

Response Schema

DeleteGroupsResponse =>
    throttle_time_ms: INT32
    results: [Result]

Result =>
    group_id: STRING
    error_code: INT16

Behavioral Contract

Aspect Guarantee
Precondition Group must be Empty or Dead
Atomicity Each group deletion is independent
Effect Removes group and all committed offsets

OffsetDelete API (Key 47)

Overview

The OffsetDelete API deletes committed offsets for a group and set of partitions.

Version History

Version Kafka Key Changes
0 0.11.0 Initial version

Request Schema

OffsetDeleteRequest =>
    group_id: STRING
    topics: [Topic]

Topic =>
    name: STRING
    partitions: [Partition]

Partition =>
    partition_index: INT32

Response Schema

OffsetDeleteResponse =>
    error_code: INT16
    throttle_time_ms: INT32
    topics: [Topic]

Topic =>
    name: STRING
    partitions: [Partition]

Partition =>
    partition_index: INT32
    error_code: INT16

Consumer Group Protocol Summary

Complete Flow

uml diagram