Level 2 · 25 min
Consumer Groups
Consumer groups are Kafka's mechanism for scalable, fault-tolerant message consumption. A group of consumers shares the work of reading a topic — each partition is assigned to exactly one consumer per group. Understanding group coordination, rebalancing, lag, and offset strategies is essential for building reliable consumers.
Consumer Group Coordination
All consumers in a group share a `group.id`. Kafka assigns each partition to exactly one consumer in the group, so two consumers in the same group never read the same partition at the same time. This enables horizontal scaling (add consumers, up to the number of partitions) and fault tolerance (if one consumer fails, its partitions are reassigned). The group coordinator is a broker chosen by hashing `group.id` onto a partition of the internal `__consumer_offsets` topic: whichever broker leads that partition coordinates the group. On JOIN_GROUP, the coordinator picks one member as group leader and sends it the full member list; the leader runs the assignor algorithm locally and returns the assignment in SYNC_GROUP, which the coordinator then distributes to the other members. The partition assignment is therefore computed client-side, not broker-side; the broker only propagates and enforces the result. Static membership (`group.instance.id`, Kafka 2.3+) lets a consumer rejoin with the same identity after a restart, skipping the rebalance if it rejoins within `session.timeout.ms`.
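The coordinator lookup described above can be sketched in plain Java. This is a minimal illustration, assuming the broker maps a group to a partition as "non-negative `hashCode` of `group.id`, modulo the partition count of `__consumer_offsets`" (the broker setting `offsets.topic.num.partitions`, 50 by default). The class and method names are hypothetical and not part of the Kafka client API.

```java
public class CoordinatorPartition {
    // Math.abs(Integer.MIN_VALUE) is still negative, so that one value is mapped to 0.
    public static int safeAbs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // offsetsPartitions stands for offsets.topic.num.partitions on the broker.
    public static int partitionFor(String groupId, int offsetsPartitions) {
        return safeAbs(groupId.hashCode()) % offsetsPartitions;
    }

    public static void main(String[] args) {
        int p = partitionFor("order-processor", 50);
        // The broker leading this partition would act as the group's coordinator.
        System.out.println("order-processor -> __consumer_offsets-" + p);
    }
}
```

Because the mapping depends only on `group.id` and the partition count, every member of a group resolves to the same coordinator, and renaming the group can move it to a different broker.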
Partition Rebalancing
A payments team ran a rolling deployment of 12 consumers against a 12-partition topic. They used the default RangeAssignor, which follows the eager rebalance protocol: on every rebalance, all members give up all of their partitions before the new assignment is handed out. Each pod restart triggered a group rebalance. With 12 pods and a staggered 30-second deploy interval (12 × 30 s = 6 minutes), the group was in a continuous rebalance for 6 minutes straight, and no consumer processed a single message for that entire window. Lag climbed to 2.3M messages. Downstream order-fulfillment services read stale payment status, tentatively approved orders for accounts that had already hit credit limits, and sent 18,000 duplicate approval notifications.
Fix: switch to CooperativeStickyAssignor (Kafka 2.4+). Only the partitions that change owner, here those of the restarting pod, are revoked during each cycle; the other 11 consumers never pause. Total processing interruption during the same rolling deploy dropped from 6 minutes to under 4 seconds. Configuration: `partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. Combine with static membership (`group.instance.id`) so pods that restart quickly don't trigger a rebalance at all if they rejoin within `session.timeout.ms`.
Key insight from Designing Data-Intensive Applications: "The number of nodes sharing the work of consuming a topic can be at most the number of log partitions in that topic, because messages within the same partition are delivered to the same node. If a single message is slow to process, it holds up the processing of subsequent messages in that partition", which Kleppmann describes as a form of head-of-line blocking. This is one reason to tune `max.poll.records`: a smaller batch bounds how much work can queue behind a slow record between polls and helps the loop stay within `max.poll.interval.ms`, at the cost of more `poll()` calls. Kleppmann also notes that the Kafka consumer offset is similar to the log sequence number used in database replication: a consumer can be rewound to any offset still retained in the log to replay events, which is impossible with traditional queue-based brokers that delete messages once they are acknowledged.
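The difference between the two protocols can be made concrete with a small simulation. This is a simplified model and not the real assignor code: assignments are plain maps from member name to partition list, and the class, method, and pod names are hypothetical. It only counts which partitions stop being processed during one rebalance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RevocationSketch {
    // Eager protocol: every member gives up everything it owns before reassignment.
    public static List<Integer> revokedEager(Map<String, List<Integer>> current) {
        List<Integer> revoked = new ArrayList<>();
        for (List<Integer> owned : current.values()) {
            revoked.addAll(owned);
        }
        return revoked;
    }

    // Cooperative protocol: a member only gives up partitions that the target
    // assignment moves to a different member.
    public static List<Integer> revokedCooperative(Map<String, List<Integer>> current,
                                                   Map<String, List<Integer>> target) {
        List<Integer> revoked = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : current.entrySet()) {
            List<Integer> keep = target.getOrDefault(e.getKey(), List.of());
            for (int p : e.getValue()) {
                if (!keep.contains(p)) {
                    revoked.add(p);
                }
            }
        }
        return revoked;
    }

    public static void main(String[] args) {
        // Surviving members after a third pod (which owned partitions 4 and 5) restarts.
        Map<String, List<Integer>> current = new TreeMap<>();
        current.put("pod-0", List.of(0, 1));
        current.put("pod-1", List.of(2, 3));
        // A sticky target keeps existing ownership and only hands out the orphans.
        Map<String, List<Integer>> target = new TreeMap<>();
        target.put("pod-0", List.of(0, 1, 4));
        target.put("pod-1", List.of(2, 3, 5));

        System.out.println(revokedEager(current).size());               // prints 4
        System.out.println(revokedCooperative(current, target).size()); // prints 0
    }
}
```

In this model the surviving pods lose all four of their partitions under the eager protocol and none under the cooperative one, which mirrors why the other 11 consumers in the incident kept processing after the switch.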
Consumer Lag and Offset Strategies
Consumer lag = log end offset of the partition - the group's committed offset for that partition. High lag means the consumer is falling behind producers. Monitor with `kafka-consumer-groups.sh --bootstrap-server <host:port> --describe --group <group.id>` or the JMX attribute `records-lag-max` on the `kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<client-id>` MBean. Alert at two thresholds, for example warning at 10K records of lag and critical at 100K, scaled to the topic's throughput.
For offset commits: auto-commit (`enable.auto.commit=true`) commits in the background every `auto.commit.interval.ms` (default 5000), so if the consumer crashes between commits, the records since the last commit are reprocessed. Manual commit: call `commitSync()` after successful processing (at-least-once), or `commitAsync()` for higher throughput. `commitAsync()` does not retry a failed commit, so a final `commitSync()` on shutdown is a common complement.
Critical configs: `max.poll.interval.ms=300000` (max time between polls before Kafka considers the consumer dead and rebalances its partitions), `session.timeout.ms=45000` (heartbeat timeout; must be between the broker settings `group.min.session.timeout.ms` and `group.max.session.timeout.ms`), `heartbeat.interval.ms=15000` (should be no more than 1/3 of `session.timeout.ms`), `max.poll.records=500` (tune down if processing is slow), `fetch.min.bytes=50000` (the broker waits for about 50 KB of data, or until `fetch.max.wait.ms` expires, before returning; this reduces fetch round trips at the cost of a bit of latency). For strong guarantees: store the consumed offsets in the same database transaction as the state updates, and seek to the stored offsets on startup, so that state and position always advance together (the transactional exactly-once pattern).
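The lag formula and the two alert thresholds can be sketched as follows. This is a minimal illustration with made-up offsets; the class and method names are hypothetical, and classifying by the worst partition (rather than the sum across partitions) is an assumption chosen to line up with `records-lag-max`.

```java
public class LagCheck {
    public enum Severity { OK, WARNING, CRITICAL }

    // Lag for one partition: log end offset minus the group's committed offset.
    // Clamped at 0 in case the two values were sampled at slightly different times.
    public static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    public static Severity classify(long lag, long warnAt, long criticalAt) {
        if (lag >= criticalAt) return Severity.CRITICAL;
        if (lag >= warnAt) return Severity.WARNING;
        return Severity.OK;
    }

    public static void main(String[] args) {
        // {logEndOffset, committedOffset} per partition; illustrative numbers only.
        long[][] partitions = { {120_000, 118_500}, {98_000, 86_000}, {75_000, 74_900} };
        long worst = 0;
        for (long[] p : partitions) {
            worst = Math.max(worst, lag(p[0], p[1]));
        }
        System.out.println(worst + " -> " + classify(worst, 10_000, 100_000)); // prints 12000 -> WARNING
    }
}
```

In a real deployment the offsets would come from the monitoring source mentioned above, not from hard-coded arrays.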
Code example
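// Consumer with manual commit + dead letter queue
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45_000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15_000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50_000);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

// dlqProducer, processRecord and NonRetryableException are application code defined elsewhere.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("orders")); // assumed source topic name
    while (true) {
        ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> r : batch) {
            try {
                processRecord(r);
            } catch (NonRetryableException e) {
                // Park the poison record so it does not block the rest of the partition.
                dlqProducer.send(new ProducerRecord<>("orders-dlq", r.key(), r.value()));
            }
        }
        dlqProducer.flush(); // wait for pending DLQ sends before committing offsets
        consumer.commitSync(); // at-least-once; idempotent processing handles duplicates
    }
}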
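
The commit comment above relies on idempotent processing to absorb redeliveries. One way to get that property is to remember the highest offset already applied per partition and skip anything at or below it. The sketch below is a minimal in-memory illustration with hypothetical class and method names; a real service would keep this marker in the same durable store as the side effects so that both survive a crash together.

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentHandler {
    // Highest offset already applied, per partition (in-memory for illustration only).
    private final Map<Integer, Long> lastApplied = new HashMap<>();
    private int applied = 0;

    // Returns true if the record was applied, false if it was a redelivery and skipped.
    public boolean handle(int partition, long offset, String value) {
        long last = lastApplied.getOrDefault(partition, -1L);
        if (offset <= last) {
            return false; // already processed before the crash or rebalance
        }
        applied++; // stand-in for the real side effect
        lastApplied.put(partition, offset);
        return true;
    }

    public int appliedCount() {
        return applied;
    }

    public static void main(String[] args) {
        IdempotentHandler h = new IdempotentHandler();
        h.handle(0, 41, "a");
        h.handle(0, 42, "b");
        // Simulated crash before commitSync(): the same batch is delivered again.
        h.handle(0, 41, "a");
        h.handle(0, 42, "b");
        h.handle(0, 43, "c");
        System.out.println(h.appliedCount()); // prints 3
    }
}
```

This works because offsets within a partition only increase, so a single marker per partition is enough to recognize a replayed record.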