Kafka Producers & Consumers
How data flows in and out of Kafka — batching, partitioning strategies, consumer groups, rebalancing, and delivery guarantees.
Producer Fundamentals
A Kafka producer sends records to topics. The producer is responsible for choosing which partition a record goes to (via key hashing or explicit assignment), serializing the data, and handling retries on failure.
```
Producer send() flow:

1. Serialize key and value (using configured serializer)
2. Determine target partition:
   - If key is set: hash(key) % numPartitions
   - If key is null: sticky partitioner (batch to one, then rotate)
   - If partition is explicitly set: use that partition
3. Add record to internal batch buffer (per partition)
4. Background sender thread ships batches to brokers
5. Broker acknowledges (based on acks config)
6. On failure: retry up to retries times with retry.backoff.ms delay

Key producer configs:

bootstrap.servers   = "broker1:9092,broker2:9092,broker3:9092"
key.serializer      = StringSerializer (or Avro, Protobuf, JSON)
value.serializer    = StringSerializer
acks                = "all"
retries             = 2147483647   (Integer.MAX_VALUE — effectively infinite)
delivery.timeout.ms = 120000       (2 min — outer bound on retries)
enable.idempotence  = true
```
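To make the flow above concrete, here is a minimal Java producer sketch. The broker addresses, topic name ("orders"), and key/value strings are placeholder assumptions rather than values from this guide, and error handling is reduced to printing the exception.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MinimalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("enable.idempotence", "true");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The key "user-42" is hashed to pick the partition; the callback fires on broker ack or failure.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("orders", "user-42", "order-created");
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();   // client-side retries already exhausted at this point
                } else {
                    System.out.printf("Written to partition %d at offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        } // close() flushes any buffered batches
    }
}
```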
Partitioning Strategies
| Strategy | How It Works | Ordering Guarantee | Use Case |
|---|---|---|---|
| Key-based (default) | hash(key) % numPartitions | All records with same key → same partition → ordered | Per-entity ordering (user events, order events) |
| Sticky (null key, Kafka 2.4+) | Batch to one partition, rotate on batch completion | No ordering guarantee | High-throughput, no ordering needed (metrics, logs) |
| Round-robin (null key, legacy) | Rotate partition per record | No ordering guarantee | Even distribution, small batches (less efficient) |
| Custom partitioner | Application logic determines partition | Application-defined | Geo-routing, priority lanes, custom load balancing |
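As an illustration of the last row, here is a hedged sketch of a custom partitioner built on Kafka's Partitioner interface. The "priority lane" rule (keys starting with "priority-" go to partition 0) is an invented example, and the sketch assumes the topic has at least two partitions.

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hypothetical "priority lane" partitioner: priority keys go to partition 0,
// everything else is spread over the remaining partitions.
public class PriorityPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();   // assumes >= 2 partitions
        if (key != null && key.toString().startsWith("priority-")) {
            return 0;
        }
        if (keyBytes == null) {
            // Null keys: pick one of the non-priority partitions pseudo-randomly
            return 1 + ThreadLocalRandom.current().nextInt(numPartitions - 1);
        }
        // Hash normal traffic over partitions 1..N-1, similar to the default key hashing
        return 1 + Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override public void configure(Map<String, ?> configs) {}
    @Override public void close() {}
}
```

The producer would pick this up via the partitioner.class config, set to the fully qualified class name.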
Batching & Compression
Kafka producers batch records together before sending to brokers. Batching amortizes network overhead and enables compression — both critical for high throughput.
```
# How batching works:
# Producer accumulates records in per-partition buffers.
# A batch is sent when EITHER condition is met:
#   1. batch.size bytes accumulated (default: 16 KB)
#   2. linger.ms time elapsed since the first record in the batch (default: 0 ms)

# Throughput-optimized settings:
batch.size       = 65536    # 64 KB batches
linger.ms        = 20       # Wait up to 20 ms to fill a batch
compression.type = lz4      # Compress batches before sending

# Latency-optimized settings:
batch.size       = 16384    # 16 KB (default)
linger.ms        = 0        # Send immediately (no waiting)
compression.type = none     # No compression overhead

# The trade-off:
# Higher linger.ms → larger batches → better throughput, higher latency
# Lower linger.ms  → smaller batches → lower latency, more network calls
```
Compression Algorithms
| Algorithm | Compression Ratio | CPU Cost | Best For |
|---|---|---|---|
| none | 1:1 (no compression) | Zero | Already-compressed data, ultra-low latency |
| gzip | Best ratio (~70% reduction) | Highest | Bandwidth-constrained, archival topics |
| snappy | Moderate (~50% reduction) | Low | Balanced — good default for most workloads |
| lz4 | Good (~60% reduction) | Very low | High-throughput, low-latency — recommended default |
| zstd | Excellent (~65% reduction) | Moderate | Best ratio-to-speed trade-off for large messages |
💡 Compression Happens at Batch Level
Kafka compresses entire batches, not individual records. Larger batches compress better because similar records share patterns. This is why increasing batch.size and linger.ms improves both throughput AND compression ratio.
Idempotent Producers
Without idempotency, producer retries can cause duplicate records. If the broker writes a record but the ack is lost (network timeout), the producer retries and the record is written again. Idempotent producers solve this.
```
# Enable idempotency (default since Kafka 3.0):
enable.idempotence = true

# How it works:
# 1. Producer gets a unique Producer ID (PID) on initialization
# 2. Each record gets a sequence number (per partition, monotonically increasing)
# 3. Broker tracks: (PID, partition) → last sequence number
# 4. If broker receives a record with sequence ≤ last seen → duplicate, discard
# 5. If broker receives a record with sequence > last + 1 → out of order, reject

# What this guarantees:
# ✓ Exactly-once delivery from producer to broker (per partition)
# ✓ No duplicates from retries
# ✓ In-order delivery within a partition

# What this requires:
# acks = all (enforced automatically)
# retries > 0 (enforced automatically)
# max.in.flight.requests.per.connection ≤ 5 (enforced automatically)

# What this does NOT cover:
# ✗ Cross-partition atomicity (use transactions for that)
# ✗ Consumer-side deduplication (consumer may process twice on rebalance)
```
The Numbered Envelope
Imagine sending numbered envelopes to a mailroom. Each envelope has your sender ID and a sequence number. If the mailroom receives envelope #5 from you but already has #5 filed, it discards the duplicate. If it receives #7 but only has up to #4, it knows #5 and #6 are missing and rejects #7. This is exactly how Kafka's idempotent producer works — the broker deduplicates based on (ProducerID, SequenceNumber).
⚠️ max.in.flight.requests.per.connection
Without idempotency, if max.in.flight.requests > 1, retries can reorder records (batch 2 succeeds, batch 1 fails and retries, landing after batch 2). With idempotency enabled, Kafka handles this correctly for up to 5 in-flight requests — the broker rejects out-of-sequence writes and the producer retries in order.
Consumer Groups
A consumer group is the fundamental unit of consumption in Kafka. Each partition is assigned to exactly one consumer within a group. Multiple consumer groups can read the same topic independently — this is how Kafka achieves fan-out.
Topic: "orders" (6 partitions) Consumer Group "order-processing" (3 consumers): Consumer A → [P0, P1] (handles 2 partitions) Consumer B → [P2, P3] (handles 2 partitions) Consumer C → [P4, P5] (handles 2 partitions) Consumer Group "analytics" (2 consumers): Consumer X → [P0, P1, P2] (handles 3 partitions) Consumer Y → [P3, P4, P5] (handles 3 partitions) Key rules: 1. Each partition → exactly ONE consumer in a group (no sharing) 2. One consumer can handle MULTIPLE partitions 3. If consumers > partitions → some consumers are idle 4. If consumers < partitions → some consumers handle multiple 5. Different groups are completely independent (different offsets) 6. Max parallelism = number of partitions
Pizza Slices at a Party
A topic with 6 partitions is a pizza cut into 6 slices. A consumer group is a group of friends sharing the pizza. With 3 friends, each gets 2 slices. With 6 friends, each gets 1 slice. With 8 friends, 2 are left without pizza (idle consumers). A second group of friends (another consumer group) gets their own identical pizza — they don't compete with the first group.
Partition Assignment Strategies
| Strategy | How It Works | Pros | Cons |
|---|---|---|---|
| RangeAssignor | Assigns contiguous partitions per topic to each consumer | Simple, predictable | Can be uneven across multiple topics |
| RoundRobinAssignor | Distributes partitions round-robin across consumers | More even distribution | More partition movement on rebalance |
| StickyAssignor | Balanced assignment that minimizes partition movement | Fewer reassignments on rebalance | Still stop-the-world rebalance |
| CooperativeStickyAssignor | Incremental rebalancing — only moves necessary partitions | No stop-the-world; consumers keep processing | May take multiple rebalance rounds to converge |
🎯 Use CooperativeStickyAssignor in Production
The default RangeAssignor causes stop-the-world rebalances where ALL consumers stop processing. CooperativeStickyAssignor only revokes the partitions that need to move — other consumers continue processing uninterrupted. Set partition.assignment.strategy = org.apache.kafka.clients.consumer.CooperativeStickyAssignor.
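A small sketch of where that setting lives in consumer code, using the ConsumerConfig constant; the rest of the consumer configuration is assumed to be as in the earlier group example.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class AssignorConfig {
    static Properties cooperativeProps() {
        Properties props = new Properties();
        // Use the class constant instead of hand-typing the fully qualified name
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        return props;
    }
}
```

Note that moving an existing group from an eager assignor to the cooperative protocol generally requires a rolling-bounce procedure; the Kafka upgrade documentation describes the safe sequence.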
Rebalancing
A rebalance is the process of redistributing partitions among consumers in a group. It's triggered when the group membership changes — a consumer joins, leaves, crashes, or the topic's partition count changes.
```
Triggers:
- Consumer joins the group (new instance deployed)
- Consumer leaves gracefully (shutdown, close())
- Consumer crashes (missed heartbeats → session.timeout.ms exceeded)
- Consumer takes too long to process (max.poll.interval.ms exceeded)
- Partition count changes (topic altered)

Classic (Eager) Rebalance:
1. Group Coordinator detects membership change
2. ALL consumers revoke ALL their partitions (stop processing)
3. Group Leader computes new assignment
4. All consumers receive new assignment and resume
Problem: ENTIRE group stops processing during rebalance

Cooperative (Incremental) Rebalance:
1. Group Coordinator detects membership change
2. Only affected partitions are revoked from current owners
3. Revoked partitions are assigned to new owners
4. Non-affected consumers continue processing uninterrupted
Benefit: Minimal disruption — most consumers keep working
```
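To see these steps from the client side, here is a hedged sketch of a ConsumerRebalanceListener. Committing in onPartitionsRevoked before ownership moves is a common pattern; the logging is purely illustrative. Under the cooperative protocol, onPartitionsRevoked receives only the partitions that are actually moving.

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;

    public LoggingRebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked: " + partitions);
        consumer.commitSync();   // flush progress before another consumer takes over
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);
    }
}

// Usage: consumer.subscribe(List.of("orders"), new LoggingRebalanceListener(consumer));
```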
Critical Consumer Timeouts
| Config | Default | Purpose | If Exceeded |
|---|---|---|---|
| session.timeout.ms | 45000 (45s) | How long before a consumer is considered dead (no heartbeat) | Consumer removed from group → rebalance triggered |
| heartbeat.interval.ms | 3000 (3s) | How often the consumer sends heartbeats to the coordinator (keep ≤ 1/3 of session.timeout.ms) | Missed heartbeats accumulate toward session.timeout.ms |
| max.poll.interval.ms | 300000 (5 min) | Max time between poll() calls (processing time budget) | Consumer considered stuck → removed from group → rebalance |
⚠️ Rebalance Storms
If your processing takes longer than max.poll.interval.ms, the consumer is kicked out, triggering a rebalance. The rebalance reassigns its partitions to another consumer, which may also exceed the timeout — cascading into a "rebalance storm" where the group never stabilizes. Fix: increase max.poll.interval.ms or reduce max.poll.records to process smaller batches.
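A small sketch of those two knobs in consumer configuration; the specific values (a 10-minute poll budget and 100 records per poll) are illustrative assumptions, not recommendations.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class PollTuning {
    // Properties tuned so that processing a batch fits inside the poll interval.
    static Properties slowProcessingProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000); // 10 min between poll() calls
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);         // smaller batches (default 500)
        return props;
    }
}
```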
Offset Management
Kafka stores committed offsets in an internal topic called __consumer_offsets. This is how Kafka tracks each consumer group's progress per partition. Understanding offset management is critical for delivery guarantees.
```
# Pattern 1: Auto-commit (default, dangerous)
enable.auto.commit = true
auto.commit.interval.ms = 5000
# Problem: commits happen on a timer, not after processing
# If consumer crashes after auto-commit but before processing → data loss
# If consumer crashes after processing but before auto-commit → duplicates

# Pattern 2: Manual commit after processing (recommended)
enable.auto.commit = false
while (true) {
    records = consumer.poll(Duration.ofMillis(1000))
    for (record in records) {
        process(record)        // Process first
    }
    consumer.commitSync()      // Then commit (blocks until confirmed)
}

# Pattern 3: Manual async commit (higher throughput)
enable.auto.commit = false
while (true) {
    records = consumer.poll(Duration.ofMillis(1000))
    for (record in records) {
        process(record)
    }
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) log.error("Commit failed", exception)
    })
}

# Pattern 4: Commit per partition (fine-grained)
# Useful when processing is slow and you want progress per partition
for (partition in records.partitions()) {
    partitionRecords = records.records(partition)
    for (record in partitionRecords) {
        process(record)
    }
    lastOffset = partitionRecords.last().offset()
    consumer.commitSync(Map.of(partition, new OffsetAndMetadata(lastOffset + 1)))
}
```
Consumer Lag
Consumer lag = latest offset in partition − last committed offset by consumer group. It's the primary health metric for Kafka consumers — it tells you how far behind a consumer is from real-time.
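The same subtraction can be done from inside a consumer; this sketch assumes the consumer has already joined its group and reports lag only for its own assignment (external tools such as Burrow compute the equivalent for the whole group).

```java
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LagReporter {
    // Prints "lag = end offset - committed offset" for every partition this consumer owns.
    static void reportLag(KafkaConsumer<String, String> consumer) {
        Set<TopicPartition> assignment = consumer.assignment();
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assignment);

        for (TopicPartition tp : assignment) {
            long end = endOffsets.getOrDefault(tp, 0L);
            OffsetAndMetadata meta = committed.get(tp);
            long committedOffset = (meta == null) ? 0L : meta.offset();   // no commit yet → treat as 0
            System.out.printf("%s lag=%d%n", tp, end - committedOffset);
        }
    }
}
```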
Lag Monitoring Best Practices
- ✅ Alert when lag exceeds a threshold (e.g., > 10,000 records or > 5 minutes behind)
- ✅ Monitor lag per partition — a single slow partition indicates a hot key or slow processing
- ✅ Increasing lag = consumer is slower than producer; needs more consumers or faster processing
- ✅ Stable non-zero lag = consumer is keeping up but with a fixed delay (may be acceptable)
- ✅ Tools: kafka-consumer-groups.sh --describe, Burrow, Confluent Control Center, Prometheus exporters
Delivery Guarantees & Exactly-Once
| Guarantee | How to Achieve | Risk | Use Case |
|---|---|---|---|
| At-most-once | Commit offset BEFORE processing | Data loss if crash during processing | Metrics where occasional loss is acceptable |
| At-least-once | Commit offset AFTER processing | Duplicates if crash after processing but before commit | Most use cases — make consumers idempotent |
| Exactly-once (within Kafka) | Idempotent producer + transactions + read_committed | Higher latency, more complexity | Financial systems, read-process-write pipelines |
Kafka Transactions
```
# Transactional producer + consumer for exactly-once processing:

# Producer config:
transactional.id = "order-processor-1"   # Unique per instance
enable.idempotence = true                # Required for transactions

# Consumer config:
isolation.level = "read_committed"       # Only see committed transaction records
enable.auto.commit = false               # Offsets committed within transaction

# The read-process-write loop:
producer.initTransactions()
while (true) {
    records = consumer.poll(Duration.ofMillis(1000))
    producer.beginTransaction()
    try {
        for (record in records) {
            result = transform(record)
            producer.send(new ProducerRecord("output-topic", result))
        }
        // Commit consumer offsets within the same transaction
        producer.sendOffsetsToTransaction(currentOffsets, consumerGroupId)
        producer.commitTransaction()
    } catch (exception) {
        producer.abortTransaction()
        // Offsets not committed → records will be re-consumed
    }
}

# What this guarantees:
# ✓ Output records + offset commit are atomic
# ✓ Either both happen or neither happens
# ✓ No duplicates in output topic, no records skipped

# What this does NOT guarantee:
# ✗ Exactly-once to external systems (DB writes, API calls)
# ✗ For external side effects, make them idempotent
```
🔑 Exactly-Once Has Boundaries
Kafka's exactly-once semantics (EOS) only applies within Kafka — from input topic through processing to output topic. If your consumer writes to a database or calls an external API, Kafka cannot guarantee those side effects happen exactly once. You must make external operations idempotent (e.g., use unique IDs, upserts, or deduplication tables).
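One common way to make an external write idempotent is to key it on something unique per record. This plain-JDBC sketch uses (topic, partition, offset) as that key; the table name, column names, and the PostgreSQL ON CONFLICT syntax are assumptions for illustration.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class IdempotentSink {
    // Upsert keyed on (topic, partition, offset): replaying the same record becomes a no-op.
    static void write(Connection db, ConsumerRecord<String, String> record) throws SQLException {
        String sql = "INSERT INTO processed_events (topic, kafka_partition, record_offset, payload) "
                   + "VALUES (?, ?, ?, ?) "
                   + "ON CONFLICT (topic, kafka_partition, record_offset) DO NOTHING"; // PostgreSQL syntax
        try (PreparedStatement stmt = db.prepareStatement(sql)) {
            stmt.setString(1, record.topic());
            stmt.setInt(2, record.partition());
            stmt.setLong(3, record.offset());
            stmt.setString(4, record.value());
            stmt.executeUpdate();
        }
    }
}
```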
Interview Questions
Q: How do consumer groups enable both load balancing and fan-out?
A: Within a consumer group, partitions are distributed among consumers — this is load balancing (each record processed by one consumer). Across consumer groups, each group gets ALL records independently — this is fan-out (same event triggers order-processing AND analytics AND notifications). One topic, multiple groups, each with their own offsets.
Q: What causes a rebalance and how do you minimize its impact?
A: Rebalances are triggered by: consumer join/leave/crash, exceeding max.poll.interval.ms, or partition count changes. Minimize impact by: (1) Use CooperativeStickyAssignor for incremental rebalancing. (2) Tune max.poll.interval.ms to match your processing time. (3) Use static group membership (group.instance.id) to avoid rebalances on restarts. (4) Reduce max.poll.records if processing is slow.
Q: Explain the difference between idempotent producers and transactions.
A: Idempotent producers prevent duplicates within a single partition from retries (using PID + sequence numbers). Transactions provide atomicity across multiple partitions and topics — either all records in a transaction are committed or none are. Transactions also allow atomic offset commits, enabling exactly-once read-process-write pipelines. Idempotency is a prerequisite for transactions.
Q: Why is consumer lag the most important Kafka metric?
A: Consumer lag (latest offset - committed offset) tells you how far behind a consumer is from real-time. Increasing lag means the consumer can't keep up with the producer — data is getting stale. This directly impacts user experience (delayed notifications, stale dashboards). It's actionable: add more consumers, optimize processing, or increase partitions. Zero lag means real-time processing.
Q: How does Kafka achieve exactly-once semantics?
A: EOS requires three components working together: (1) Idempotent producer — prevents duplicate writes from retries. (2) Transactions — atomic writes across partitions + atomic offset commits. (3) read_committed isolation on consumers — only reads records from committed transactions. Together, this ensures each input record produces exactly one set of output records with no duplicates or gaps. But EOS only works within Kafka — external side effects need separate idempotency.
Common Mistakes
Using auto-commit in production
Leaving enable.auto.commit=true and assuming records are processed before commit. Auto-commit is time-based, not processing-based — it commits on a timer regardless of whether processing succeeded.
✅ Disable auto-commit and commit manually after successful processing; because commits happen on a timer, auto-commit can cause both data loss and duplicates.
More consumers than partitions
Scaling to 12 consumers for a topic with 6 partitions, expecting 2x throughput. Extra consumers sit completely idle — they receive no partitions.
✅ Max parallelism = partition count. If you need more throughput, increase partitions first, then add consumers.
Ignoring max.poll.interval.ms
Processing large batches that take 10+ minutes without adjusting max.poll.interval.ms. The consumer gets kicked out, triggering a rebalance storm.
✅ Either increase max.poll.interval.ms to match your processing time, or reduce max.poll.records so each batch finishes within the timeout.
Not making consumers idempotent
Assuming at-least-once delivery means exactly-once processing. Rebalances and crashes WILL cause duplicate delivery — it's a guarantee, not an edge case.
✅ Design consumers to handle duplicates: use unique IDs, upserts, or deduplication tables. Idempotent processing is your responsibility.
Synchronous processing in the poll loop
Making blocking HTTP calls or DB writes inside the poll loop, causing max.poll.interval.ms timeout and cascading rebalances.
✅ Use async processing with a bounded work queue, or pause() partitions while processing and resume() when ready. Keep the poll loop responsive to avoid rebalances.
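A hedged sketch of that pause()/resume() shape: the batch is handed to an executor, the owned partitions are paused so poll() keeps heartbeating without delivering more records, and they are resumed once the batch completes. The topic name is a placeholder, and rebalance and error handling are omitted for brevity.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PauseResumeLoop {
    static void run(KafkaConsumer<String, String> consumer, ExecutorService workers) {
        consumer.subscribe(List.of("orders"));                        // placeholder topic
        CompletableFuture<Void> inFlight = CompletableFuture.completedFuture(null);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
            if (!records.isEmpty() && inFlight.isDone()) {
                consumer.pause(consumer.assignment());                // keep heartbeating, stop new deliveries
                inFlight = CompletableFuture.runAsync(
                        () -> records.forEach(r -> { /* slow HTTP call or DB write */ }), workers);
            }
            if (!consumer.paused().isEmpty() && inFlight.isDone()) {
                consumer.commitSync();                                // commit only after the batch finished
                consumer.resume(consumer.paused());                   // start receiving records again
            }
        }
    }
}
```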