Level 3 · 30 min

Advanced Kafka

Advanced Kafka covers exactly-once semantics (EOS), idempotent producers, transactions, and log compaction. These features enable Kafka to be used as a transactional messaging backbone — not just a best-effort fire-and-forget bus.

Exactly-Once Semantics

Exactly-once semantics (EOS) means the effect of each message is applied exactly once, with no duplicates and no losses, even when producers retry or instances restart. Achieving EOS requires three components working together: (1) idempotent producers to prevent duplicates from producer retries, (2) Kafka transactions for atomic multi-partition writes, and (3) consumers that read only committed messages (`isolation.level=read_committed`). The read-process-write pattern combines them: consume from the input topic, process, then write to the output topic and commit the input offsets atomically in one transaction. EOS was introduced in Kafka 0.11, and Kafka Streams has supported it since that release via `processing.guarantee=exactly_once`. Kafka 2.5 added a more scalable mode (originally `exactly_once_beta`, renamed `exactly_once_v2` in Kafka 3.0): EOS-V2 uses one transactional producer per StreamThread, replacing the EOS-V1 producer-per-task approach that scaled poorly at high partition counts. Key detail: with `isolation.level=read_committed`, a consumer reads only up to the last stable offset (LSO) of a partition, so records behind an open transaction are held back until that transaction commits or aborts. This adds latency roughly equal to the transaction duration, typically 100-500ms in practice.
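The consumer-side and Kafka Streams settings named above can be sketched as plain `Properties`. This is a minimal illustration, not a complete client configuration: the class name `EosConfigSketch` and its method names are hypothetical, string keys are used instead of the `ConsumerConfig`/`StreamsConfig` constants so the snippet compiles without the Kafka client on the classpath, and serializers and other required settings are omitted.

```java
import java.util.Properties;

// Config sketch using plain string keys (the same names the Kafka clients accept).
class EosConfigSketch {

    // Consumer side of read-process-write: see only committed records,
    // and leave offset commits to the transactional producer.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        p.setProperty("isolation.level", "read_committed"); // the default is read_uncommitted
        p.setProperty("enable.auto.commit", "false");       // offsets are committed inside the transaction
        return p;
    }

    // Kafka Streams: a single setting switches the application to EOS-V2.
    static Properties streamsProps(String bootstrapServers, String applicationId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("application.id", applicationId);
        p.setProperty("processing.guarantee", "exactly_once_v2");
        return p;
    }
}
```

Disabling auto-commit matters here: if the consumer committed offsets on its own schedule, the offsets and the output records would no longer succeed or fail together.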

Idempotent Producers and Transactions

During a peak traffic window, a payment processing service lost network connectivity to Kafka for 800ms. The producer retried the in-flight batch 3 times before the connection recovered. Without idempotence the broker had no way to recognize the resends, and the batch was appended three times: 3 copies of 47 payment-completed events. The fraud detection system counted 47 transactions as 141, triggering account freezes for 47 customers. Root cause: `enable.idempotence=false` (the default before Kafka 3.0).

With an idempotent producer, each batch carries a producer ID (PID) assigned by the broker and a sequence number that increases monotonically per partition. The broker rejects a batch whose sequence number it has already seen from the same PID, so exactly one copy is stored regardless of how many retries occur.

Producer epoch: when a transactional producer initializes, the transaction coordinator increments the epoch for its `transactional.id` and fences any zombie producers holding a lower epoch, which prevents a stale instance from appending after it has been replaced (for example, following a restart or failover). For this to work, `transactional.id` must be unique per producer instance and stable across restarts (e.g., `payment-processor-pod-0`). If two producers use the same `transactional.id`, the newer one fences the older one.

Key insight from Designing Data-Intensive Applications: "Distributed transactions use atomic commit to ensure that changes take effect exactly once, while log-based systems are often based on deterministic retry and idempotence." Applied to Kafka, exactly-once semantics is not a single feature but a guarantee achieved through composition: idempotent producers eliminate producer-side duplicates, transactions make multi-partition writes atomic, and `isolation.level=read_committed` on consumers prevents reading uncommitted or aborted data. Each layer addresses a distinct failure mode.
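The duplicate check described above can be illustrated with a toy model of a single partition. This is a sketch under simplifying assumptions, not the broker's implementation: the class `SequenceDedupSketch`, its `Result` enum, and the PID value 42 are all hypothetical, and the real broker tracks metadata for several recent batches per producer rather than one integer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of duplicate detection for ONE partition. Not Kafka's implementation.
class SequenceDedupSketch {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // producer ID -> sequence number of the last record appended for that producer
    private final Map<Long, Integer> lastSeqByPid = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    // A batch carries the PID, the sequence number of its first record, and its records.
    Result append(long pid, int baseSeq, List<String> batch) {
        int last = lastSeqByPid.getOrDefault(pid, -1);
        if (baseSeq <= last) return Result.DUPLICATE;        // already stored: do not append again
        if (baseSeq != last + 1) return Result.OUT_OF_ORDER; // gap: an earlier batch is missing
        log.addAll(batch);
        lastSeqByPid.put(pid, baseSeq + batch.size() - 1);
        return Result.APPENDED;
    }

    int logSize() { return log.size(); }

    // Replays the incident: the same batch of 47 events arrives three times.
    static int replayIncident() {
        SequenceDedupSketch partition = new SequenceDedupSketch();
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 47; i++) batch.add("payment-completed-" + i);
        for (int attempt = 0; attempt < 3; attempt++) partition.append(42L, 0, batch);
        return partition.logSize();
    }
}
```

In this model `replayIncident()` leaves 47 records in the log rather than 141, because the second and third deliveries carry a base sequence the partition has already recorded for that PID.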
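The practical overhead of EOS in Kafka Streams with `exactly_once_v2` is often quoted at roughly 3-5% throughput reduction versus at-least-once, though the actual cost depends on commit interval, message size, and partition count, so it is worth measuring on your own workload. For most financial workloads, duplicate prevention justifies that cost.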
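The epoch-based fencing described earlier in this section can be modeled in a few lines. This is a toy sketch, not the transaction coordinator's logic: `EpochFencingSketch` and its method names are hypothetical, and the model ignores PIDs, timeouts, and in-flight transactions.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of zombie fencing by producer epoch. Not Kafka's implementation.
class EpochFencingSketch {
    // transactional.id -> latest epoch handed out
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    // Models initTransactions(): bump the epoch and hand it to the new instance.
    int initTransactions(String transactionalId) {
        return currentEpoch.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // A write is accepted only from the instance holding the latest epoch.
    boolean accepts(String transactionalId, int producerEpoch) {
        Integer latest = currentEpoch.get(transactionalId);
        return latest != null && producerEpoch == latest;
    }
}
```

When a replacement pod calls `initTransactions("payment-processor-pod-0")`, the old instance's epoch is no longer the latest, so `accepts` returns false for it. In the real client the fenced instance sees this as a `ProducerFencedException`.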

Log Compaction and Tiered Storage

Log compaction keeps the latest value for each key: instead of deleting old records by time, it retains the most recent record per key. It is enabled per topic with `cleanup.policy=compact`. Tombstone: a record with a null value marks its key for deletion. Compaction drops the older values for that key, and the tombstone itself is removed once `delete.retention.ms` (default 24 hours) has passed, which gives consumers time to observe the delete. The log cleaner runs in the background, rewriting log segments and retaining only the latest record per key; `min.cleanable.dirty.ratio=0.5` controls when compaction kicks in (when 50% of the log is dirty, that is, not yet compacted). `min.compaction.lag.ms` prevents compaction of records newer than this value, which is useful if consumers need a guaranteed window to see every version of a key.

Tiered storage (early access in Kafka 3.6, production-ready since Kafka 3.9): brokers offload older log segments to object storage (S3, GCS) while keeping recent segments local. This decouples storage scaling from broker compute: you can retain 1 year of events without adding broker disk. Brokers serve recent reads from local disk (e.g., NVMe); older reads are fetched from remote storage transparently to consumers. This is critical for analytics use cases where long retention was previously cost-prohibitive. Config: `remote.log.storage.system.enable=true` on the brokers, `remote.storage.enable=true` on each topic that should be tiered, and `remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager` (the default). A `RemoteStorageManager` plugin for the chosen object store must also be supplied via `remote.log.storage.manager.class.name`. Compacted topics cannot be tiered.
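The compaction rule (latest record per key, tombstones eventually dropped) can be sketched over an in-memory list. This is a toy model, not the broker's log cleaner: `CompactionSketch`, `Rec`, and the `dropTombstones` flag are hypothetical names, the input is assumed to be in offset order, and the flag stands in for a tombstone having outlived `delete.retention.ms`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of log compaction over an in-memory log. Not the broker's cleaner.
class CompactionSketch {
    static final class Rec {
        final long offset;
        final String key;
        final String value; // null means tombstone
        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Keeps the highest-offset record per key. Offsets are preserved, not renumbered.
    // Assumes the input list is in ascending offset order.
    static List<Rec> compact(List<Rec> log, boolean dropTombstones) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key); // re-insert so iteration order follows surviving offsets
            latest.put(r.key, r);
        }
        List<Rec> out = new ArrayList<>();
        for (Rec r : latest.values()) {
            if (r.value == null && dropTombstones) continue; // tombstone past its retention
            out.add(r);
        }
        return out;
    }
}
```

Because surviving records keep their original offsets, a compacted log has gaps in its offset sequence, and a consumer replaying it from the beginning ends up with the latest state for every live key.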

Key Takeaways

  • EOS requires all three components: idempotent producers + Kafka transactions + read_committed isolation on consumers.
  • Idempotent producers alone prevent duplicates from retries — a significant improvement over at-least-once with minimal overhead.
  • Log-compacted topics are ideal for state changelogs: they retain the latest value per key indefinitely, enabling state reconstruction.

Code example

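// Transactional producer: exactly-once read-process-write.
// Fragment: assumes props already holds bootstrap servers and serializers, and that
// consumer runs with enable.auto.commit=false and isolation.level=read_committed.
// processPayments and toOffsets are application helpers.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.*;
import java.time.Duration;
import java.util.*;

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
  "payment-processor-" + instanceId);  // unique per pod, stable across restarts
props.put(ProducerConfig.TRANSACTION_TIMEOUT_MS_CONFIG, 60_000);
KafkaProducer<K,V> producer = new KafkaProducer<>(props);

producer.initTransactions(); // call once: registers with transaction coordinator, fences zombies
// Poll before beginning the transaction so a slow or empty poll never holds one open.
ConsumerRecords<K,V> records = consumer.poll(Duration.ofMillis(500));
try {
  producer.beginTransaction();
  List<ProducerRecord<K,V>> results = processPayments(records);
  results.forEach(producer::send);
  // atomically commit offsets within the transaction (next offset to read, per partition)
  Map<TopicPartition, OffsetAndMetadata> offsets = toOffsets(records);
  producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
  producer.commitTransaction();  // consumers with read_committed now see the records
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  // Fatal errors: a fenced instance is a zombie and cannot abort or continue.
  log.error("Unrecoverable producer error, shutting down", e);
  producer.close();  // do NOT abort, just close and exit
  throw e;
} catch (Exception e) {
  producer.abortTransaction();   // atomically discards all sends and offset commits
  throw e;  // before retrying, rewind the consumer to its last committed offsets
}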
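The example above relies on a `toOffsets` helper that it does not define. One plausible shape is sketched below with plain types standing in for `TopicPartition` and `OffsetAndMetadata`, so it compiles without the Kafka client; `OffsetsSketch`, `Rec`, and the `"topic-partition"` string key are hypothetical. The point it illustrates is that the committed offset is the position of the next record to read, which is the last processed offset plus one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for a toOffsets helper, using plain types instead of Kafka's classes.
class OffsetsSketch {
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;
        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each partition, returns (highest processed offset + 1):
    // the position of the NEXT record the group should read.
    static Map<String, Long> toOffsets(List<Rec> processed) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : processed) {
            next.merge(r.topic + "-" + r.partition, r.offset + 1, Long::max);
        }
        return next;
    }
}
```

Committing the last processed offset itself, rather than that offset plus one, would cause the final record of each batch to be reprocessed after a restart.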