Skip to content
Maintained by AxonOps — production-grade documentation from engineers who operate distributed databases at scale

Kafka Share Group Protocol APIs

This document specifies the Kafka protocol APIs used for Share Groups, a new consumer model introduced in KIP-932 (Kafka 4.0+). Share Groups provide queue-like semantics where messages are distributed among consumers without client-side partition assignment.


Share Group Overview

Share Groups differ fundamentally from traditional Consumer Groups:

Aspect Consumer Groups Share Groups
Partition assignment Each partition assigned to one consumer No partition assignment
Message delivery All messages in partition to assigned consumer Messages distributed across consumers
Ordering Per-partition ordering guaranteed No ordering guarantee
Acknowledgment Offset-based Per-record acknowledgment
Use case Stream processing Work queue, task distribution

Share Group API Reference

Group Coordination

API Key Name Purpose
76 ShareGroupHeartbeat Maintain membership and receive assignments
77 ShareGroupDescribe Describe share group state

Data Operations

API Key Name Purpose
78 ShareFetch Fetch records from share group
79 ShareAcknowledge Acknowledge record processing

State Management

API Key Name Purpose
83 InitializeShareGroupState Initialize share partition state
84 ReadShareGroupState Read share partition state
85 WriteShareGroupState Write share partition state
86 DeleteShareGroupState Delete share partition state
87 ReadShareGroupStateSummary Read share partition state summary

Offset Management

API Key Name Purpose
90 DescribeShareGroupOffsets Describe share group offsets
91 AlterShareGroupOffsets Alter share group offsets
92 DeleteShareGroupOffsets Delete share group offsets

ShareGroupHeartbeat API (Key 76)

Overview

The ShareGroupHeartbeat API maintains share group membership. Unlike consumer groups which use separate JoinGroup/SyncGroup/Heartbeat APIs, share groups use a single heartbeat API for all coordination.

Request Schema

ShareGroupHeartbeatRequest =>
    group_id: STRING
    member_id: STRING
    member_epoch: INT32
    rack_id: NULLABLE_STRING
    subscribed_topic_names: [STRING]
Field Type Description
group_id STRING Share group identifier
member_id STRING Member ID (client-generated)
member_epoch INT32 Member epoch (0=join, -1=leave)
rack_id NULLABLE_STRING Rack ID (null if unchanged)
subscribed_topic_names ARRAY Subscribed topic names (null if unchanged)

Response Schema

ShareGroupHeartbeatResponse =>
    throttle_time_ms: INT32
    error_code: INT16
    error_message: NULLABLE_STRING
    member_id: NULLABLE_STRING
    member_epoch: INT32
    heartbeat_interval_ms: INT32
    assignment: Assignment

Assignment =>
    topic_partitions: [TopicPartition]

TopicPartition =>
    topic_id: UUID
    partitions: [INT32]

Behavioral Contract

Aspect Guarantee
Epoch fencing Stale members rejected via epoch
Assignment Server-side assignment, no leader election
Heartbeat interval Server specifies next heartbeat timing
Versioning v0 early access removed in 4.1; v1 stable

ShareGroupDescribe API (Key 77)

Overview

The ShareGroupDescribe API retrieves detailed information about share groups.

Request Schema

ShareGroupDescribeRequest =>
    group_ids: [STRING]
    include_authorized_operations: BOOLEAN

Response Schema

ShareGroupDescribeResponse =>
    throttle_time_ms: INT32
    groups: [Group]

Group =>
    error_code: INT16
    error_message: NULLABLE_STRING
    group_id: STRING
    group_state: STRING
    group_epoch: INT32
    assignment_epoch: INT32
    assignor_name: STRING
    members: [Member]
    authorized_operations: INT32

Member =>
    member_id: STRING
    rack_id: NULLABLE_STRING
    member_epoch: INT32
    client_id: STRING
    client_host: STRING
    subscribed_topic_names: [STRING]
    assignment: Assignment

Assignment =>
    topic_partitions: [TopicPartition]

TopicPartition =>
    topic_id: UUID
    topic_name: STRING
    partitions: [INT32]

ShareFetch API (Key 78)

Overview

The ShareFetch API retrieves records from a share group. Unlike regular Fetch, records are distributed across consumers and require acknowledgment.

Request Schema

ShareFetchRequest =>
    group_id: NULLABLE_STRING
    member_id: NULLABLE_STRING
    share_session_epoch: INT32
    max_wait_ms: INT32
    min_bytes: INT32
    max_bytes: INT32
    max_records: INT32
    batch_size: INT32
    share_acquire_mode: INT8
    is_renew_ack: BOOLEAN
    topics: [Topic]
    forgotten_topics_data: [ForgottenTopic]

Topic =>
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition_index: INT32
    partition_max_bytes: INT32
    acknowledgement_batches: [AcknowledgementBatch]

AcknowledgementBatch =>
    first_offset: INT64
    last_offset: INT64
    acknowledge_types: [INT8]
Field Type Description
group_id STRING Share group identifier
share_session_epoch INT32 Session epoch for incremental fetch
share_acquire_mode INT8 0=batch-optimized, 1=record-limit (v2+)
is_renew_ack BOOLEAN True if renew acks are present (v2+)
acknowledgement_batches ARRAY Inline acknowledgments (optional)

Response Schema

ShareFetchResponse =>
    throttle_time_ms: INT32
    error_code: INT16
    error_message: NULLABLE_STRING
    acquisition_lock_timeout_ms: INT32
    responses: [Response]
    node_endpoints: [NodeEndpoint]

Response =>
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition_index: INT32
    error_code: INT16
    error_message: NULLABLE_STRING
    acknowledge_error_code: INT16
    acknowledge_error_message: NULLABLE_STRING
    current_leader: LeaderIdAndEpoch
    records: RECORDS
    acquired_records: [AcquiredRecords]

AcquiredRecords =>
    first_offset: INT64
    last_offset: INT64
    delivery_count: INT16
Field Type Description
acquired_records ARRAY Records acquired by this consumer
delivery_count INT16 Number of delivery attempts

Behavioral Contract

Aspect Guarantee
Record locking Fetched records locked to consumer
Delivery tracking Delivery count tracks retries
Timeout Unacknowledged records released after timeout

ShareAcknowledge API (Key 79)

Overview

The ShareAcknowledge API acknowledges processing of records fetched via ShareFetch.

Request Schema

ShareAcknowledgeRequest =>
    group_id: NULLABLE_STRING
    member_id: NULLABLE_STRING
    share_session_epoch: INT32
    is_renew_ack: BOOLEAN
    topics: [Topic]

Topic =>
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition_index: INT32
    acknowledgement_batches: [AcknowledgementBatch]

AcknowledgementBatch =>
    first_offset: INT64
    last_offset: INT64
    acknowledge_types: [INT8]

Acknowledge Types

Value Type Description
0 GAP Placeholder for gaps
1 ACCEPT Successfully processed
2 RELEASE Release for redelivery
3 REJECT Reject permanently (DLQ)
4 RENEW Renew record lock (v2+)

Response Schema

ShareAcknowledgeResponse =>
    throttle_time_ms: INT32
    error_code: INT16
    error_message: NULLABLE_STRING
    acquisition_lock_timeout_ms: INT32
    responses: [Response]
    node_endpoints: [NodeEndpoint]

Response =>
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition_index: INT32
    error_code: INT16
    error_message: NULLABLE_STRING
    current_leader: LeaderIdAndEpoch

Behavioral Contract

Aspect Guarantee
ACCEPT Record marked complete, not redelivered
RELEASE Record released for another consumer
REJECT Record sent to dead letter queue (if configured)

State Management APIs

InitializeShareGroupState API (Key 83)

Initializes the share partition state when a share group first accesses a partition.

InitializeShareGroupStateRequest =>
    group_id: STRING
    topics: [Topic]

Topic =>
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition: INT32
    state_epoch: INT32
    start_offset: INT64

ReadShareGroupState API (Key 84)

Reads the current state of share partitions.

ReadShareGroupStateRequest =>
    group_id: STRING
    topics: [Topic]

Topic =>
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition: INT32
    leader_epoch: INT32

WriteShareGroupState API (Key 85)

Writes updated share partition state.

WriteShareGroupStateRequest =>
    group_id: STRING
    topics: [Topic]

Topic =>
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition: INT32
    state_epoch: INT32
    leader_epoch: INT32
    start_offset: INT64
    delivery_complete_count: INT32
    state_batches: [StateBatch]

StateBatch =>
    first_offset: INT64
    last_offset: INT64
    delivery_state: INT8
    delivery_count: INT16

DeleteShareGroupState API (Key 86)

Deletes share partition state when cleaning up a share group.

DeleteShareGroupStateRequest =>
    group_id: STRING
    topics: [Topic]

Topic =>
    topic_id: UUID
    partitions: [INT32]

ReadShareGroupStateSummary API (Key 87)

Reads a summary of share partition state for monitoring.

ReadShareGroupStateSummaryRequest =>
    group_id: STRING
    topics: [Topic]

Topic =>
    topic_id: UUID
    partitions: [Partition]

Partition =>
    partition: INT32
    leader_epoch: INT32

Offset Management APIs

DescribeShareGroupOffsets API (Key 90)

Describes the current offsets for a share group.

DescribeShareGroupOffsetsRequest =>
    groups: [Group]

Group =>
    group_id: STRING
    topics: [Topic]

Topic =>
    topic_name: STRING
    partitions: [INT32]

AlterShareGroupOffsets API (Key 91)

Alters the offsets for a share group (administrative operation).

AlterShareGroupOffsetsRequest =>
    group_id: STRING
    topics: [Topic]

Topic =>
    topic_name: STRING
    partitions: [Partition]

Partition =>
    partition_index: INT32
    start_offset: INT64

DeleteShareGroupOffsets API (Key 92)

Deletes offset information for a share group.

DeleteShareGroupOffsetsRequest =>
    group_id: STRING
    topics: [Topic]

Topic =>
    topic_name: STRING

Version Compatibility

Feature Minimum Kafka Version
Share Groups 4.0.0
All Share Group APIs 4.0.0

Kafka 4.0+ Feature

Share Groups are a new feature in Kafka 4.0. These APIs are not available in earlier versions.