Backpressure

Backpressure occurs when downstream operators can't keep up with upstream throughput. Flink's credit-based flow control propagates backpressure naturally through the pipeline, preventing data loss without unbounded buffering.

01

What Backpressure Is

Backpressure is the situation where a downstream operator processes data slower than the upstream operator produces it. Without flow control, buffers would grow unbounded until OOM. With flow control, the slow operator signals upstream to slow down — pressure propagates backward through the pipeline.

🚗

The Highway Traffic Analogy

Backpressure is like traffic on a highway. When there's an accident (slow operator) ahead, cars (events) pile up behind it. The congestion propagates backward — cars 5 miles back slow down even though they can't see the accident. Eventually, the on-ramp (source) gets backed up too. Flink's flow control is like traffic signals on the on-ramp — they meter the flow to match downstream capacity.

backpressure-scenario.txt (text)
Backpressure scenario:

Source (100K events/sec) → Map → KeyBy → Window → Sink (50K events/sec)
                                                         ↑
                                                    BOTTLENECK!

Without flow control:
  Source produces 100K/sec
  Sink can only handle 50K/sec
  Buffer grows by 50K events/sec
  After 60 seconds: 3M events buffered
  After 5 minutes: OOM crash 💥

With Flink's credit-based flow control:
  Sink signals "I'm full" to Window operator
  Window signals "I'm full" to KeyBy
  KeyBy signals "I'm full" to Map
  Map signals "I'm full" to Source
  Source slows down to 50K/sec (matches sink capacity)
  → No unbounded buffering, no OOM
  → But: end-to-end latency increases
  → And: Kafka consumer lag grows (source reads slower)
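
The arithmetic in the scenario above can be checked with a few lines of Java. This is a minimal sketch, not Flink code: the class and method names are invented for illustration, and the rates are simply the example's numbers.

```java
/** Back-of-the-envelope model of the scenario above (hypothetical helper, not a Flink API). */
final class BufferGrowth {

    /** Events queued after {@code seconds} when nothing throttles the source. */
    static long backlogWithoutFlowControl(long sourceRate, long sinkRate, long seconds) {
        long surplusPerSecond = Math.max(0, sourceRate - sinkRate);
        return surplusPerSecond * seconds;
    }

    /** With flow control, the source ends up throttled to the slowest stage. */
    static long effectiveSourceRate(long sourceRate, long sinkRate) {
        return Math.min(sourceRate, sinkRate);
    }

    public static void main(String[] args) {
        long source = 100_000;
        long sink = 50_000;
        System.out.println(backlogWithoutFlowControl(source, sink, 60));   // 3000000
        System.out.println(backlogWithoutFlowControl(source, sink, 300));  // 15000000
        System.out.println(effectiveSourceRate(source, sink));             // 50000
    }
}
```

Whether 15M buffered events actually exhausts memory depends on event size and heap; the point is only that the backlog grows linearly and never stops.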

Backpressure is Normal

Some backpressure is normal and expected — it means your flow control is working. Sustained backpressure is the problem — it means your pipeline can't keep up with input rate and you need to either increase capacity or reduce input.

02

How Flink Handles Backpressure

Flink uses a credit-based flow control mechanism. Each receiver tells the sender how many buffers it can accept (credits). When credits run out, the sender stops sending. This propagates naturally through the entire pipeline without any central coordinator.

credit-based-flow.txt (text)
Credit-Based Flow Control:

Sender (upstream)                    Receiver (downstream)
┌──────────────┐                    ┌──────────────┐
│ Output Buffer│ ──── data ────→    │ Input Buffer │
│              │ ←── credits ───    │              │
└──────────────┘                    └──────────────┘

1. Receiver has N input buffers available (e.g., 4)
2. Receiver sends credits=4 to sender
3. Sender sends up to 4 buffers of data
4. Receiver processes buffers, frees them
5. Receiver sends new credits as buffers are freed
6. If receiver is slow → no credits sent → sender blocks

Network buffer pool per TaskManager:
  - Configured via taskmanager.memory.network.fraction (default 10%)
  - Split into input buffers and output buffers
  - Each channel (connection between operators) gets exclusive buffers
  - Floating buffers shared across channels for flexibility

Backpressure propagation:
  Sink full → no credits to Window → Window blocks
  Window full → no credits to KeyBy → KeyBy blocks
  KeyBy full → no credits to Map → Map blocks
  Map full → no credits to Source → Source stops reading Kafka
  → Kafka consumer lag increases (visible externally)

1. Receiver runs out of input buffers

The downstream operator is processing slowly. Its input buffer pool fills up completely.

2. No credits sent upstream

Since no buffers are free, the receiver can't grant credits to the sender.

3. Sender's output buffers fill

The sender can't send data (no credits), so its output buffers fill up.

4. Sender blocks on output

The sender's thread blocks when trying to write to a full output buffer. It stops processing input.

5. Pressure propagates upstream

The blocked sender now can't consume from ITS input buffers, causing the same cascade upstream.
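
The credit handshake described above can be sketched as a toy single-channel model. This is an illustration under simplifying assumptions, not Flink's implementation: `CreditChannel` and its methods are invented names, there is one sender and one receiver, and exclusive versus floating buffers are ignored.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy single-channel model of credit-based flow control (hypothetical, not Flink's classes). */
final class CreditChannel {
    private final Deque<Integer> inputQueue = new ArrayDeque<>(); // receiver's filled input buffers
    private int credits;                                          // credits the sender currently holds

    CreditChannel(int receiverBuffers) {
        // The receiver announces all of its free input buffers as credits.
        this.credits = receiverBuffers;
    }

    /** The sender may transmit only while it holds a credit; returns false when it must block. */
    boolean trySend(int buffer) {
        if (credits == 0) {
            return false; // no credit left: the sender is backpressured
        }
        credits--;
        inputQueue.addLast(buffer);
        return true;
    }

    /** The receiver processes one buffer, frees it, and grants one new credit. */
    boolean processOne() {
        if (inputQueue.isEmpty()) {
            return false;
        }
        inputQueue.removeFirst();
        credits++;
        return true;
    }

    int queued() { return inputQueue.size(); }

    int credits() { return credits; }

    public static void main(String[] args) {
        CreditChannel channel = new CreditChannel(4);
        int sent = 0;
        int blocked = 0;
        // The sender attempts 2 buffers per tick; the receiver only processes 1 per tick.
        for (int tick = 0; tick < 10; tick++) {
            for (int i = 0; i < 2; i++) {
                if (channel.trySend(tick * 2 + i)) sent++; else blocked++;
            }
            channel.processOne();
        }
        System.out.println("sent=" + sent + " blocked=" + blocked + " queued=" + channel.queued());
    }
}
```

Once the four buffers are full, the sender succeeds only as often as the receiver frees a buffer, so its effective rate converges to the receiver's rate while the queue stays bounded.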

03

Detecting Backpressure

Flink provides multiple ways to detect and diagnose backpressure: the Web UI backpressure indicator, task metrics, and checkpoint duration analysis.

detecting-backpressure.txt (text)
Detection Methods:

1. Flink Web UI — Backpressure Tab
   Each operator shows: OK / LOW / HIGH
   - OK: < 10% of time backpressured
   - LOW: 10-50% of time backpressured
   - HIGH: > 50% of time backpressured

   Reading the UI:
   Source [OK] → Map [OK] → Window [HIGH] → Sink [OK]
                                    ↑
   The most downstream operator showing HIGH sits directly UPSTREAM of the bottleneck
   The actual bottleneck is the operator AFTER the last backpressured one
   → Sink is the bottleneck (Window is backpressured because Sink is slow)

2. Metrics (Prometheus/Grafana)
   - outPoolUsage: % of output buffers in use (high = sending blocked)
   - inPoolUsage: % of input buffers in use (high = receiving backed up)
   - numRecordsOutPerSecond: throughput (dropping = backpressure)
   - busyTimeMsPerSecond: time operator is busy (1000 = fully saturated)

3. Checkpoint Duration
   - Long checkpoint duration often indicates backpressure
   - Barrier alignment takes longer when buffers are full
   - Check "Alignment Duration" in checkpoint details

4. Kafka Consumer Lag
   - Growing lag = source is reading slower than production rate
   - Often caused by downstream backpressure slowing the source
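
The Web UI reading rule above can be written down as a small helper. This is a sketch with invented names (`BackpressureReport`, `classify`, `suspectedBottleneck`); it assumes a linear pipeline and takes the backpressured-time ratio as a fraction between 0 and 1, using the thresholds listed above.

```java
import java.util.List;

/** Hypothetical helper that applies the OK / LOW / HIGH thresholds and the "operator after" rule. */
final class BackpressureReport {
    enum Status { OK, LOW, HIGH }

    /** Maps a backpressured-time ratio in [0, 1] to the label shown in the Web UI. */
    static Status classify(double ratio) {
        if (ratio > 0.5) return Status.HIGH;
        if (ratio >= 0.1) return Status.LOW;
        return Status.OK;
    }

    /**
     * Returns the index of the suspected bottleneck: the operator right after the most
     * downstream HIGH operator. Returns -1 when nothing is HIGH or no operator follows it.
     */
    static int suspectedBottleneck(List<Status> pipeline) {
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            if (pipeline.get(i) == Status.HIGH) {
                return i + 1 < pipeline.size() ? i + 1 : -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Source [OK] → Map [OK] → Window [HIGH] → Sink [OK]
        List<Status> pipeline = List.of(Status.OK, Status.OK, Status.HIGH, Status.OK);
        System.out.println(suspectedBottleneck(pipeline)); // 3, i.e. the Sink
    }
}
```

Real job graphs branch and merge, so treat the result as a starting point for checking the metrics rather than a verdict.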

Backpressure Indicators

  • ✅ Web UI shows HIGH backpressure on operators upstream of the bottleneck
  • ✅ outPoolUsage > 90% on the operator before the bottleneck
  • ✅ busyTimeMsPerSecond = 1000 on the bottleneck operator (100% busy)
  • ✅ Checkpoint alignment duration increasing over time
  • ✅ Kafka consumer lag growing steadily
  • ✅ numRecordsOutPerSecond dropping below input rate

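
The two metric-based indicators above can be combined into a rough per-operator verdict. The sketch below is illustrative only: `OperatorDiagnosis` is a made-up name, `outPoolUsage` is assumed to arrive as a fraction between 0 and 1, and the 950 ms cutoff for "close to 1000" is an arbitrary choice rather than a Flink default.

```java
/** Hypothetical classifier over two metrics sampled for one operator. */
final class OperatorDiagnosis {
    enum Verdict { BOTTLENECK, BACKPRESSURED, HEALTHY }

    // Assumed cutoffs: the 0.9 figure comes from the indicator list, 950 is a guess at "near 1000".
    static final double OUT_POOL_BLOCKED = 0.9;
    static final double BUSY_SATURATED_MS = 950.0;

    static Verdict diagnose(double busyTimeMsPerSecond, double outPoolUsage) {
        if (outPoolUsage > OUT_POOL_BLOCKED) {
            return Verdict.BACKPRESSURED; // output buffers full: blocked on a slower downstream
        }
        if (busyTimeMsPerSecond >= BUSY_SATURATED_MS) {
            return Verdict.BOTTLENECK;    // busy all the time but not blocked on sending
        }
        return Verdict.HEALTHY;
    }

    public static void main(String[] args) {
        System.out.println(diagnose(1000, 0.20)); // BOTTLENECK
        System.out.println(diagnose(300, 0.95));  // BACKPRESSURED
        System.out.println(diagnose(300, 0.20));  // HEALTHY
    }
}
```
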
04

Resolving Backpressure

Resolving backpressure requires identifying the bottleneck and either increasing its capacity or reducing the load. The approach depends on whether the bottleneck is CPU-bound, I/O-bound, or state-access-bound.

| Bottleneck Type | Symptoms | Solutions |
| --- | --- | --- |
| CPU-bound operator | busyTime=1000, high CPU usage | Increase parallelism, optimize UDF code, enable chaining |
| I/O-bound sink | Low CPU but high outPoolUsage | Batch writes, async I/O, increase sink parallelism |
| State access (RocksDB) | High disk I/O, slow processElement | Tune RocksDB (cache, write buffer), use SSD, reduce state size |
| Skewed keys | Some subtasks HIGH, others OK | Pre-aggregate before keyBy, use rebalance(), salt hot keys |
| GC pressure | Periodic spikes in latency | Increase heap, tune GC, reduce object creation, use RocksDB |

resolution-strategies.java (java)
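// Imports for the snippets below. Event, ExpensiveFunction, AsyncDatabaseLookup,
// ExpensiveSink, merge(), and reducer are placeholders defined elsewhere.
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Strategy 1: Increase parallelism of bottleneck operator
stream
    .keyBy(Event::getUserId)
    .process(new ExpensiveFunction())
    .setParallelism(32)  // increase just this operator
    .name("Expensive Processing");

// Strategy 2: Async I/O for slow external calls
AsyncDataStream.unorderedWait(
    stream,
    new AsyncDatabaseLookup(),
    30, TimeUnit.SECONDS,  // timeout per request
    200                    // up to 200 in-flight requests (default is 100)
);

// Strategy 3: Pre-aggregate to reduce downstream load
stream
    .keyBy(Event::getRegion)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    .reduce((a, b) -> merge(a, b))  // pre-aggregate; reduce() emits the input type (Event)
    .keyBy(Event::getRegion)        // so re-key with the Event accessor
    .process(new ExpensiveSink());  // less data to process

// Strategy 4: Handle data skew with salting
stream
    .map(event -> {
        // Add a random salt (0-9) so one hot key spreads over up to 10 salted keys
        String saltedKey = event.getKey() + "_" + ThreadLocalRandom.current().nextInt(10);
        return event.withKey(saltedKey);  // assumed to retain the original key as well
    })
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(reducer)
    // Second stage: aggregate salt partitions
    .keyBy(result -> result.getOriginalKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(reducer);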
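
To see why the two-stage salting in Strategy 4 still produces correct totals, here is a plain-Java sketch with no Flink dependency. Everything in it is illustrative: `SaltedAggregation`, the bucket count of 10, and the use of simple counts as the aggregate are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Plain-Java illustration of two-stage aggregation with salted keys (hypothetical helper). */
final class SaltedAggregation {
    static final int SALT_BUCKETS = 10;

    /** Stage 1: count per salted key "key_salt", so a hot key is split across several groups. */
    static Map<String, Long> stageOne(List<String> keys, Random random) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            String salted = key + "_" + random.nextInt(SALT_BUCKETS);
            partial.merge(salted, 1L, Long::sum);
        }
        return partial;
    }

    /** Stage 2: strip the salt and combine the partial counts per original key. */
    static Map<String, Long> stageTwo(Map<String, Long> partial) {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, Long> entry : partial.entrySet()) {
            String salted = entry.getKey();
            String original = salted.substring(0, salted.lastIndexOf('_'));
            totals.merge(original, entry.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) keys.add("hot");
        for (int i = 0; i < 10; i++) keys.add("cold");

        Map<String, Long> partial = stageOne(keys, new Random(42));
        System.out.println("stage-1 groups: " + partial.size());
        System.out.println("totals: " + stageTwo(partial));
    }
}
```

The first stage spreads the hot key's 1000 events over up to 10 groups, and the second stage only has to merge at most 10 partial results per key. This works because counting (like sum, min, and max) can be combined from partial results; an aggregate without that property would need a different approach.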

Find the Bottleneck First

Don't increase parallelism blindly. First identify which operator is the actual bottleneck (the one AFTER the backpressured operators). Then determine WHY it's slow (CPU, I/O, state, skew). The fix depends on the root cause — more parallelism doesn't help if the problem is a slow external system or data skew.

05

Backpressure vs Kafka Consumer Lag

Kafka consumer lag and Flink backpressure are related but distinct concepts. Backpressure is the internal flow control mechanism. Consumer lag is the external symptom visible in Kafka when the source can't read fast enough.

lag-vs-backpressure.txt (text)
Relationship between backpressure and Kafka lag:

Normal operation (no backpressure):
  Kafka production rate: 100K events/sec
  Flink consumption rate: 100K events/sec
  Consumer lag: ~0 (caught up)
  Backpressure: none

Backpressure scenario:
  Kafka production rate: 100K events/sec
  Flink processing capacity: 60K events/sec (bottleneck downstream)
  Flink consumption rate: 60K events/sec (throttled by backpressure)
  Consumer lag: growing by 40K events/sec
  Backpressure: HIGH on operators upstream of bottleneck

Key insight:
  Backpressure → Source reads slower → Lag grows
  Lag is the SYMPTOM, backpressure is the MECHANISM

Lag without backpressure (different problem):
  - Source parallelism too low (each subtask reads several partitions and can't keep up)
  - Deserialization is the bottleneck (source itself is slow)
  - Network issues between Flink and Kafka

Monitoring both:
  - Kafka lag: tells you HOW FAR BEHIND you are
  - Backpressure metrics: tell you WHERE the bottleneck is
  - Together: diagnose both the severity and the cause
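
The lag numbers in the scenario above follow from simple rate arithmetic, sketched below. `LagMath` is a made-up helper, the rates are assumed constant, and the 120K events/sec scaled-up capacity in the example is a hypothetical figure that does not appear in the scenario.

```java
/** Rate arithmetic for consumer lag, assuming constant rates (hypothetical helper). */
final class LagMath {

    /** Lag growth in events per second; 0 when the job keeps up. */
    static long lagGrowthPerSecond(long productionRate, long consumptionRate) {
        return Math.max(0, productionRate - consumptionRate);
    }

    /** Seconds needed to drain the backlog once capacity exceeds production; -1 if it never drains. */
    static long catchUpSeconds(long lagEvents, long productionRate, long capacity) {
        long headroom = capacity - productionRate;
        if (headroom <= 0) {
            return -1;
        }
        return (lagEvents + headroom - 1) / headroom; // ceiling division
    }

    public static void main(String[] args) {
        long growth = lagGrowthPerSecond(100_000, 60_000);
        System.out.println(growth);                       // 40000 events/sec

        long lagAfterTenMinutes = growth * 600;
        System.out.println(lagAfterTenMinutes);           // 24000000 events

        // After scaling the job to a hypothetical 120K events/sec:
        System.out.println(catchUpSeconds(lagAfterTenMinutes, 100_000, 120_000)); // 1200 seconds
    }
}
```

Ten minutes of backpressure takes twenty minutes to recover from in this example, because only the headroom above the production rate goes toward draining the backlog.
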

| Metric | What It Tells You | Where to Look |
| --- | --- | --- |
| Kafka consumer lag | How far behind real-time you are | Kafka monitoring (Burrow, Confluent) |
| Backpressure (Web UI) | Which operator is the bottleneck | Flink Web UI → Backpressure tab |
| busyTimeMsPerSecond | How saturated each operator is | Flink metrics (Prometheus) |
| Checkpoint duration | Overall pipeline health | Flink Web UI → Checkpoints |

Acceptable Lag

Some consumer lag is acceptable depending on your SLA. If your requirement is "results within 5 minutes of event time," then 2 minutes of lag is fine. Monitor lag trends — steady lag is okay, growing lag means you're falling further behind and need to scale.
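
One way to turn this rule of thumb into a check is sketched below. It is only an illustration: `LagSla` and its methods are invented, lag is converted to seconds by dividing by a steady production rate, and the trend is judged crudely from the first and last sample with a tolerance you would have to pick yourself.

```java
/** Hypothetical SLA and trend check over periodically sampled consumer lag. */
final class LagSla {
    enum Trend { SHRINKING, STEADY, GROWING }

    /** Approximate lag in seconds, assuming a steady production rate. */
    static double lagSeconds(long lagEvents, long productionRatePerSecond) {
        if (productionRatePerSecond <= 0) {
            throw new IllegalArgumentException("production rate must be positive");
        }
        return (double) lagEvents / productionRatePerSecond;
    }

    /** Compares the newest sample with the oldest; changes within the tolerance count as steady. */
    static Trend trend(long[] lagSamples, long toleranceEvents) {
        if (lagSamples.length < 2) {
            return Trend.STEADY;
        }
        long delta = lagSamples[lagSamples.length - 1] - lagSamples[0];
        if (delta > toleranceEvents) return Trend.GROWING;
        if (delta < -toleranceEvents) return Trend.SHRINKING;
        return Trend.STEADY;
    }

    /** Scale when the latest lag already breaks the SLA or the lag keeps growing. */
    static boolean needsScaling(long[] lagSamples, long productionRate, double slaSeconds, long toleranceEvents) {
        if (lagSamples.length == 0) {
            return false;
        }
        long latest = lagSamples[lagSamples.length - 1];
        return lagSeconds(latest, productionRate) > slaSeconds
                || trend(lagSamples, toleranceEvents) == Trend.GROWING;
    }

    public static void main(String[] args) {
        // About 2 minutes of lag at 100K events/sec against a 5 minute SLA, roughly flat.
        long[] steady = {12_000_000L, 12_050_000L, 11_980_000L};
        System.out.println(needsScaling(steady, 100_000, 300, 500_000));  // false

        long[] growing = {12_000_000L, 14_000_000L, 16_000_000L};
        System.out.println(needsScaling(growing, 100_000, 300, 500_000)); // true
    }
}
```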

06

Interview Questions

Q: What is backpressure and how does Flink handle it?

A: Backpressure occurs when downstream operators can't keep up with upstream throughput. Flink handles it with credit-based flow control: each receiver tells the sender how many buffers (credits) it can accept. When credits run out, the sender blocks. This propagates naturally upstream — eventually the source slows down to match the slowest operator's capacity. No data is lost, no unbounded buffering occurs. The trade-off: end-to-end latency increases and Kafka consumer lag grows.

Q: How do you identify the bottleneck operator causing backpressure?

A: In the Flink Web UI, look at the backpressure indicators. The bottleneck is the operator AFTER the last backpressured operator. Example: Source[OK] → Map[HIGH] → Window[HIGH] → Sink[OK]. The Sink is the bottleneck — Map and Window are backpressured because Sink is slow. Confirm with metrics: the bottleneck has busyTimeMsPerSecond=1000 (100% busy) and low outPoolUsage (it's not blocked sending, it's busy processing). Upstream operators have high outPoolUsage (blocked trying to send).

Q: What's the relationship between Flink backpressure and Kafka consumer lag?

A: They're cause and effect. When downstream backpressure propagates to the Kafka source, the source reads slower than the production rate. This causes consumer lag to grow. Lag is the external symptom; backpressure is the internal mechanism. Important: lag can exist without backpressure (source parallelism too low, deserialization bottleneck). And backpressure can exist without lag (if the source isn't Kafka, or if the bottleneck is between two internal operators). Monitor both: lag tells you severity, backpressure tells you location.

Q: How would you resolve backpressure caused by data skew?

A: Data skew means some keys carry much more data than others, so a few subtasks are overloaded while the rest sit idle. Solutions: (1) Two-stage aggregation with key salting: add a random suffix to hot keys, aggregate per salt, then aggregate across salts. (2) Pre-aggregation: use a small tumbling window before the expensive operation to reduce volume. (3) Rebalance: if the operation doesn't need keying, use .rebalance() for round-robin distribution. (4) Custom partitioning: spread hot keys across multiple subtasks with partitionCustom() and a custom Partitioner (the result is no longer a keyed stream, so keyed state and windows aren't available downstream).

07

Common Mistakes

🎯

Increasing parallelism of the wrong operator

Seeing backpressure on the Map operator and increasing Map's parallelism. But Map is backpressured because the downstream Sink is slow. More Map subtasks don't raise throughput (the sink still caps it); they only consume extra slots and add more senders blocked on the same slow sink.

✅ Always identify the actual bottleneck (operator AFTER the backpressured ones) before scaling. Increase parallelism of the bottleneck operator, not the backpressured upstream operators.

🔧

Adding network buffers to fix backpressure

Increasing taskmanager.memory.network.fraction thinking more buffers will fix backpressure. More buffers just delay the onset — they fill up eventually and backpressure returns, now with higher memory usage.

✅ More buffers don't fix throughput problems; they just add latency before backpressure kicks in. Fix the root cause: optimize the bottleneck operator, increase its parallelism, or reduce input rate.
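
A quick calculation shows why extra buffers only postpone the problem. The sketch below is a simplification with invented names: it measures buffer capacity in events rather than bytes and assumes constant rates.

```java
/** Rough model of how long extra buffering delays backpressure (hypothetical helper). */
final class BufferDelay {

    /** Seconds until buffers of the given capacity fill; infinite when output keeps up with input. */
    static double secondsUntilFull(long bufferCapacityEvents, long inRate, long outRate) {
        long surplus = inRate - outRate;
        if (surplus <= 0) {
            return Double.POSITIVE_INFINITY;
        }
        return (double) bufferCapacityEvents / surplus;
    }

    /** Sustained throughput is set by the slower side, regardless of buffer size. */
    static long sustainedThroughput(long inRate, long outRate) {
        return Math.min(inRate, outRate);
    }

    public static void main(String[] args) {
        System.out.println(secondsUntilFull(100_000, 100_000, 50_000)); // 2.0
        System.out.println(secondsUntilFull(200_000, 100_000, 50_000)); // 4.0
        System.out.println(sustainedThroughput(100_000, 50_000));       // 50000
    }
}
```

Doubling the buffer capacity doubles the time before backpressure appears, from 2 to 4 seconds in this example, while sustained throughput stays at 50K events/sec.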

📊

Ignoring data skew

All subtasks of an operator look fine except one that is saturated. This is the hot key problem: one key has 100x more data than the others and overloads a single subtask.

✅ Check per-subtask metrics in the Web UI. If one subtask is saturated while others are idle, you have skew. Use key salting (two-stage aggregation), pre-aggregation, or a custom partitioner to distribute hot keys.

⚡

Not monitoring backpressure proactively

Only noticing backpressure when Kafka lag alerts fire or results are delayed. By then, you're already behind and catching up takes time.

✅ Set up Grafana dashboards with busyTimeMsPerSecond and outPoolUsage per operator. Alert when busyTime > 800ms/sec (80% saturated). This gives you warning before full backpressure develops. Also monitor checkpoint duration trends.
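
The 800 ms/sec alert can be expressed as a small rule. The sketch below is an assumption-laden illustration, not a Grafana or Flink feature: `SaturationAlert` is a made-up name, and requiring several consecutive samples above the threshold is an added design choice to avoid firing on short spikes.

```java
/** Hypothetical early-warning rule over sampled busyTimeMsPerSecond values. */
final class SaturationAlert {
    static final double BUSY_THRESHOLD_MS = 800.0; // 80% saturated

    /** Fires when the most recent {@code window} samples all exceed the threshold. */
    static boolean shouldAlert(double[] busyTimeMsPerSecond, int window) {
        if (window <= 0 || busyTimeMsPerSecond.length < window) {
            return false;
        }
        for (int i = busyTimeMsPerSecond.length - window; i < busyTimeMsPerSecond.length; i++) {
            if (busyTimeMsPerSecond[i] <= BUSY_THRESHOLD_MS) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        double[] spike = {400, 950, 500, 450};
        double[] sustained = {600, 820, 870, 910};
        System.out.println(shouldAlert(spike, 3));     // false: a single spike
        System.out.println(shouldAlert(sustained, 3)); // true: three samples above 800
    }
}
```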