
Fault Tolerance & Checkpointing

Flink achieves fault tolerance through periodic distributed snapshots (checkpoints) using the Chandy-Lamport algorithm. Combined with two-phase commit for sinks, it provides end-to-end exactly-once semantics even across failures.

45 min read · 9 sections
01

The Challenge

Stream processing runs continuously — 24/7, for months or years. During that time, machines crash, networks partition, and disks fail. The challenge: how do you recover from failures without losing data or producing duplicate results, while processing millions of events per second?

šŸ“ø

The Group Photo Analogy

Imagine taking a group photo of 100 people who are constantly moving and talking. You need a consistent snapshot — everyone frozen at the same logical instant. You can't actually stop everyone (that would halt processing). Instead, you send a 'say cheese' signal that propagates through the group. When each person hears it, they freeze their pose (save state). The photo is consistent even though people froze at slightly different wall-clock times — the signal ensures logical consistency.

failure-scenarios.txt
What can go wrong in a streaming job:

Hardware failures:
  - TaskManager JVM crashes (OOM, segfault)
  - Machine dies (power, hardware failure)
  - Disk corruption or full

Software failures:
  - User code throws exception (NPE, parsing error)
  - Serialization failure (schema mismatch)
  - Timeout (checkpoint takes too long)

Infrastructure failures:
  - Network partition between TaskManagers
  - Kafka broker unavailable
  - S3/HDFS checkpoint storage unreachable

Without fault tolerance:
  - All in-flight state is LOST
  - Events processed since last output are LOST
  - Kafka offsets not committed → reprocessing → DUPLICATES
  - Hours of computation wasted

With Flink checkpointing:
  āœ… State restored to last consistent snapshot
  āœ… Source rewound to checkpoint position
  āœ… Processing resumes from consistent point
  āœ… No data loss, no duplicates (exactly-once)

Continuous vs Batch Recovery

Unlike batch jobs that can simply restart from scratch, streaming jobs have accumulated state (counters, windows, sessions) that took hours or days to build. Restarting from zero means losing all that state. Checkpoints preserve state so recovery takes seconds, not hours.

02

Checkpointing

Checkpointing is Flink's mechanism for creating consistent distributed snapshots of the entire job state. At configurable intervals, Flink snapshots all operator state and source positions to durable storage. On failure, the job restores from the latest completed checkpoint.

checkpoint-config.java
// Enable checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 60 seconds
env.enableCheckpointing(60_000);

// Checkpoint configuration
CheckpointConfig config = env.getCheckpointConfig();

// Exactly-once vs at-least-once
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Minimum time between checkpoints (prevents overlap)
config.setMinPauseBetweenCheckpoints(30_000);

// Checkpoint timeout (fail if not complete in 10 min)
config.setCheckpointTimeout(600_000);

// Max concurrent checkpoints
config.setMaxConcurrentCheckpoints(1);

// Tolerate N checkpoint failures before job fails
config.setTolerableCheckpointFailureNumber(3);

// Keep checkpoints on cancellation (for manual recovery)
config.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Checkpoint storage
config.setCheckpointStorage("s3://my-bucket/checkpoints/");

Checkpoint Lifecycle

  1. JobManager triggers checkpoint at configured interval
  2. Barriers injected into source streams (one per checkpoint)
  3. Barriers flow through the DAG with regular events
  4. Each operator snapshots state when barrier arrives
  5. State written asynchronously to checkpoint storage
  6. JobManager confirms checkpoint when all operators report success
  7. Old checkpoints cleaned up (keep last N)

Checkpoint Interval Trade-off

Shorter intervals (e.g., 10s) mean less data to reprocess on failure but more I/O overhead and potential backpressure from frequent snapshots. Longer intervals (e.g., 5min) mean less overhead but more reprocessing on failure. Start with 60 seconds and adjust based on your recovery time requirements and checkpoint duration metrics.
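The trade-off can be put into rough numbers. The sketch below is illustrative arithmetic only (plain Java, no Flink dependency, all numbers are assumptions): expected reprocessing after a failure is about half the interval plus the duration of the in-flight checkpoint, and overhead is roughly checkpoint duration divided by interval.

```java
// Illustrative arithmetic only, not a Flink API. Tune real jobs against
// checkpoint-duration metrics from the Web UI, not these made-up numbers.
public class CheckpointIntervalTradeoff {

    // Expected reprocessing after a failure: on average half an interval of
    // progress is lost, plus the work done while the next checkpoint was
    // still in flight.
    static double expectedReprocessingSec(double intervalSec, double checkpointSec) {
        return intervalSec / 2.0 + checkpointSec;
    }

    // Rough fraction of time spent on checkpoint I/O.
    static double overheadFraction(double intervalSec, double checkpointSec) {
        return checkpointSec / intervalSec;
    }

    public static void main(String[] args) {
        // 10 s interval, 5 s checkpoints: quick recovery, but 50% overhead
        System.out.printf("10s interval: reprocess ~%.0fs, overhead ~%.0f%%%n",
                expectedReprocessingSec(10, 5), 100 * overheadFraction(10, 5));
        // 60 s interval, 5 s checkpoints: ~35 s reprocessing, under 10% overhead
        System.out.printf("60s interval: reprocess ~%.0fs, overhead ~%.0f%%%n",
                expectedReprocessingSec(60, 5), 100 * overheadFraction(60, 5));
    }
}
```

With these assumed numbers, a 60-second interval keeps overhead below 10% while bounding reprocessing to about half a minute, which is one reason 60 seconds is a common starting point.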

03

Chandy-Lamport Algorithm

Flink uses a variant of the Chandy-Lamport distributed snapshot algorithm. The key mechanism is checkpoint barriers — special markers injected into the stream that separate events belonging to different checkpoints. Barriers ensure a consistent cut across all operators without stopping processing.

barrier-alignment.txt
Aligned Checkpoint Barriers:

Source 1: [e1] [e2] [B_n] [e3] [e4] ...
Source 2: [e5] [e6] [e7] [B_n] [e8] ...

Operator with 2 inputs:
  1. Receives B_n from Source 1 first
  2. BLOCKS Source 1 input (buffers events after barrier)
  3. Continues processing Source 2 until B_n arrives
  4. Receives B_n from Source 2
  5. NOW: takes state snapshot (consistent point)
  6. Emits B_n downstream
  7. Unblocks Source 1, processes buffered events

Timeline:
  Source 1: ──[e1]──[e2]──[B]──[e3]──[e4]──────────→
                              ↓
  Operator:  processes e1,e2  │ BLOCKS  │ snapshot │ resume
                              ↓         ↓          ↓
  Source 2: ──[e5]──[e6]──[e7]──[B]────────────────→
                                   ↑
                              still processing

Problem: blocking causes backpressure during alignment!
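The alignment behavior above can be simulated in a few lines of plain Java. This is a toy model, not Flink internals: event names and the "B" barrier marker are invented, each input is assumed to contain exactly one barrier, and pre-barrier events are processed input-by-input rather than interleaved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of aligned barrier handling in a two-input operator.
public class AlignedBarrierSim {
    static final String BARRIER = "B";

    // Returns the processing order; "SNAPSHOT" marks the consistent point
    // where the operator snapshots state and forwards the barrier.
    static List<String> run(List<String> input1, List<String> input2) {
        Deque<String> in1 = new ArrayDeque<>(input1);
        Deque<String> in2 = new ArrayDeque<>(input2);
        List<String> log = new ArrayList<>();

        // Process input 1 until its barrier arrives, then BLOCK it:
        // events behind the barrier stay queued (buffered).
        while (!BARRIER.equals(in1.peek())) log.add(in1.poll());
        in1.poll(); // consume barrier; input 1 is now blocked

        // Keep processing input 2 until its barrier completes the alignment.
        while (!BARRIER.equals(in2.peek())) log.add(in2.poll());
        in2.poll();

        log.add("SNAPSHOT"); // all inputs aligned: snapshot, emit barrier downstream

        // Unblock input 1 and drain the events buffered during alignment.
        while (!in1.isEmpty()) log.add(in1.poll());
        while (!in2.isEmpty()) log.add(in2.poll());
        return log;
    }

    public static void main(String[] args) {
        // Same streams as the diagram above.
        System.out.println(run(
                List.of("e1", "e2", "B", "e3", "e4"),
                List.of("e5", "e6", "e7", "B", "e8")));
        // → [e1, e2, e5, e6, e7, SNAPSHOT, e3, e4, e9 is not present: e8]
    }
}
```

Note how e3 and e4 are processed only after the snapshot: that buffering delay is exactly the backpressure that unaligned checkpoints eliminate.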
Property                       | Aligned Barriers                                    | Unaligned Barriers (1.11+)
-------------------------------|-----------------------------------------------------|------------------------------------------
Mechanism                      | Block fast input until slow input's barrier arrives | Snapshot in-flight data between barriers
Backpressure during checkpoint | Yes — blocked input causes upstream backpressure    | No — no blocking, immediate snapshot
Checkpoint size                | Smaller (only operator state)                       | Larger (state + in-flight buffers)
Checkpoint speed               | Slower (waits for alignment)                        | Faster (immediate)
Best for                       | Low-latency jobs with balanced inputs               | Jobs with backpressure or skewed inputs
unaligned-checkpoints.java
// Enable unaligned checkpoints (Flink 1.11+)
env.getCheckpointConfig().enableUnalignedCheckpoints();

// Unaligned checkpoints:
// - Don't block inputs during barrier alignment
// - Instead, snapshot in-flight data (buffers) as part of checkpoint
// - Faster checkpoint completion under backpressure
// - Larger checkpoint size (includes network buffers)
// - Recommended when checkpoint duration is high due to alignment

When to Use Unaligned Checkpoints

Use unaligned checkpoints when your checkpoint duration is high due to barrier alignment (visible in the Web UI as "alignment duration"). This typically happens with backpressure or when input rates are very skewed. The trade-off: faster checkpoints but larger checkpoint size (includes in-flight network buffers).

04

Checkpoint Storage

Checkpoint data must be stored durably — surviving TaskManager crashes and machine failures. Flink supports multiple storage backends, with S3 and HDFS being the most common in production. Incremental checkpoints dramatically reduce I/O for large state.

checkpoint-storage.txt
Checkpoint Storage Options:

1. JobManager Heap (development only):
   state.checkpoint-storage: jobmanager
   - Stored in JobManager memory
   - Lost if JobManager crashes
   - Only for testing!

2. Filesystem (production):
   state.checkpoint-storage: filesystem
   state.checkpoints.dir: s3://bucket/checkpoints/
   - Durable, survives any single failure
   - Supports S3, HDFS, GCS, Azure Blob

Full vs Incremental Checkpoints:
═══════════════════════════════════════════════════
Full checkpoint (HashMap backend):
  Checkpoint 1: [ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆ] 10 GB (full state)
  Checkpoint 2: [ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆ] 10 GB (full state again)
  Checkpoint 3: [ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆ] 10 GB (full state again)
  Total I/O per checkpoint: 10 GB

Incremental checkpoint (RocksDB backend):
  Checkpoint 1: [ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆ] 10 GB (initial full)
  Checkpoint 2: [ā–ˆā–ˆ]           2 GB  (only changed SST files)
  Checkpoint 3: [ā–ˆ]            1 GB  (only changed SST files)
  Total I/O per checkpoint: 1-2 GB (90% reduction!)
═══════════════════════════════════════════════════
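The incremental scheme boils down to "upload only files that checkpoint storage has not seen yet." A toy model in plain Java (file names are invented; real RocksDB incremental checkpoints also reference-count shared files and garbage-collect unreferenced ones):

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of incremental checkpointing: each checkpoint uploads only the
// "SST files" not already present in checkpoint storage.
public class IncrementalCheckpointSim {
    private final Set<String> inStorage = new HashSet<>(); // files already uploaded

    // Returns how many files this checkpoint actually uploads.
    int checkpoint(Set<String> liveFiles) {
        int uploads = 0;
        for (String file : liveFiles) {
            if (inStorage.add(file)) uploads++; // new file: upload it
        }
        return uploads;
    }

    public static void main(String[] args) {
        IncrementalCheckpointSim sim = new IncrementalCheckpointSim();
        // Checkpoint 1: everything is new, so it is effectively a full upload.
        System.out.println(sim.checkpoint(Set.of("sst-1", "sst-2", "sst-3", "sst-4"))); // 4
        // Checkpoint 2: compaction replaced one file; only the new one is uploaded.
        System.out.println(sim.checkpoint(Set.of("sst-1", "sst-2", "sst-3", "sst-5"))); // 1
    }
}
```

In Flink this mode is enabled with `new EmbeddedRocksDBStateBackend(true)` or `state.backend.incremental: true`.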

Checkpoint Storage Best Practices

  • āœ…Always use S3/HDFS/GCS in production — never JobManager heap
  • āœ…Enable incremental checkpoints with RocksDB (90%+ I/O reduction for large state)
  • āœ…Set checkpoint retention: keep last 3-5 checkpoints for recovery options
  • āœ…Use externalized checkpoints (RETAIN_ON_CANCELLATION) for manual recovery
  • āœ…Monitor checkpoint size and duration — growing size indicates state leak

05

Savepoints

Savepoints are manually triggered, portable snapshots of the entire job state. Unlike checkpoints (automatic, for failure recovery), savepoints are for planned operations: upgrades, rescaling, A/B testing, and migration.

Property    | Checkpoint                   | Savepoint
------------|------------------------------|----------------------------------
Trigger     | Automatic (periodic)         | Manual (CLI/API)
Purpose     | Failure recovery             | Planned operations
Format      | Backend-specific (optimized) | Portable, canonical format
Lifecycle   | Auto-cleaned by Flink        | User-managed (never auto-deleted)
Incremental | Yes (RocksDB)                | No (always full)
Use case    | Crash recovery               | Upgrades, rescaling, migration
savepoints.sh
# Trigger savepoint (job keeps running)
flink savepoint <job-id> s3://savepoints/

# Stop job with savepoint (graceful shutdown)
flink stop --savepointPath s3://savepoints/ <job-id>

# Cancel job with savepoint (deprecated; prefer 'flink stop')
flink cancel --withSavepoint s3://savepoints/ <job-id>

# Start job from savepoint
flink run -s s3://savepoints/savepoint-abc123 job.jar

# Start with different parallelism (rescaling)
flink run -s s3://savepoints/savepoint-abc123 -p 32 job.jar

# Common workflow: upgrade job version
# 1. Stop old version with savepoint
flink stop --savepointPath s3://savepoints/ <old-job-id>
# 2. Deploy new version from savepoint
flink run -s s3://savepoints/savepoint-xyz789 new-job-v2.jar

Operator UIDs Are Critical

Flink maps savepoint state to operators using operator UIDs. If you don't set explicit UIDs, Flink generates them from the job graph structure — any topology change breaks the mapping. Always set .uid("my-operator") on every stateful operator to ensure savepoint compatibility across code changes.
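The failure mode can be illustrated with a toy model in plain Java. Nothing here is Flink code: the position-based "auto-i" ID stands in for Flink's topology-derived operator hash, and the operator names are invented. Inserting one operator shifts every auto-generated ID, so restored state no longer lines up.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of savepoint-to-operator state mapping with auto vs explicit UIDs.
public class UidMappingSim {

    // Snapshot: one state entry per operator, keyed by its ID.
    static Map<String, String> takeSavepoint(List<String> pipeline, boolean explicitUids) {
        Map<String, String> savepoint = new HashMap<>();
        for (int i = 0; i < pipeline.size(); i++) {
            String id = explicitUids ? pipeline.get(i) : "auto-" + i;
            savepoint.put(id, "state-of-" + pipeline.get(i));
        }
        return savepoint;
    }

    // Restore: count operators that get back THEIR OWN state.
    static long restored(Map<String, String> savepoint, List<String> pipeline, boolean explicitUids) {
        long matched = 0;
        for (int i = 0; i < pipeline.size(); i++) {
            String name = pipeline.get(i);
            String id = explicitUids ? name : "auto-" + i;
            if (("state-of-" + name).equals(savepoint.get(id))) matched++;
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> v1 = List.of("source", "dedup", "window", "sink");
        List<String> v2 = List.of("source", "filter", "dedup", "window", "sink"); // one operator added

        // Auto IDs shift by position: only "source" still lines up.
        System.out.println(restored(takeSavepoint(v1, false), v2, false)); // 1
        // Explicit UIDs are stable: everything except the new "filter" restores.
        System.out.println(restored(takeSavepoint(v1, true), v2, true));   // 4
    }
}
```

In real Flink, mismatched state does not silently vanish: the restore fails outright unless you pass --allowNonRestoredState.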

06

Exactly-Once Semantics

Exactly-once means each event affects the final result exactly once — no data loss (at-least-once) and no duplicates. Flink achieves this internally via checkpoints, but end-to-end exactly-once requires cooperation from sources and sinks.

exactly-once-layers.txt
End-to-End Exactly-Once Requirements:

Layer 1: Source (replayable)
  āœ… Kafka — can replay from committed offset
  āœ… Kinesis — can replay from sequence number
  āŒ Socket source — cannot replay (at-most-once)

Layer 2: Internal Processing (Flink checkpoints)
  āœ… Guaranteed by checkpoint + barrier alignment
  āœ… On failure: restore state + rewind source to checkpoint
  āœ… Events between checkpoint and failure are reprocessed
  āœ… State is restored → reprocessing produces same result

Layer 3: Sink (idempotent or transactional)
  Option A: Idempotent sink
    āœ… Writes are idempotent (same write twice = same result)
    āœ… Example: upsert to database with primary key
    āœ… Simple but requires idempotent operations

  Option B: Transactional sink (two-phase commit)
    āœ… Writes committed only when checkpoint completes
    āœ… Example: Kafka sink with transactions
    āœ… True exactly-once but higher latency

End-to-end exactly-once = replayable source
                        + Flink checkpoints
                        + transactional/idempotent sink
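Option A (the idempotent sink) is easy to see in a toy upsert store (plain Java, with invented keys and values; a real example would be a JDBC upsert on a primary key): replaying events after recovery rewrites the same keys with the same values, so the final state is unchanged.

```java
import java.util.HashMap;
import java.util.Map;

// Toy idempotent sink: an upsert keyed by primary key.
public class IdempotentSinkSim {
    private final Map<String, Integer> table = new HashMap<>();

    void upsert(String key, int value) {
        table.put(key, value); // applying the same write twice yields the same row
    }

    Map<String, Integer> contents() {
        return new HashMap<>(table);
    }

    public static void main(String[] args) {
        IdempotentSinkSim sink = new IdempotentSinkSim();
        sink.upsert("user-1", 5);
        sink.upsert("user-2", 3);
        Map<String, Integer> beforeFailure = sink.contents();

        // Failure after the last checkpoint: Flink rewinds the source and
        // the SAME events are written again on recovery.
        sink.upsert("user-1", 5);
        sink.upsert("user-2", 3);

        // The duplicate writes had no effect on the final result.
        System.out.println(sink.contents().equals(beforeFailure)); // true
    }
}
```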
1. Checkpoint triggers: JobManager initiates checkpoint N. Barriers injected at sources.

2. Source records position: Kafka source records current offset per partition as part of checkpoint N.

3. Operators snapshot state: all operators snapshot their state when barrier N passes through.

4. Sink pre-commits: transactional sink pre-commits its pending writes (Kafka: flush to transaction).

5. Checkpoint completes: JobManager confirms all operators have snapshotted successfully.

6. Sink commits: sink commits the transaction (Kafka: commitTransaction()). Data now visible to consumers.
Exactly-Once ≠ Exactly-Once Delivery

Flink's exactly-once is about the EFFECT on state and output, not about message delivery. Events may be delivered multiple times (after failure recovery), but the state and output reflect each event exactly once. This is sometimes called "effectively once" — the observable result is as if each event was processed exactly once.

07

Two-Phase Commit for Sinks

The two-phase commit (2PC) protocol enables exactly-once writes to external systems. The sink pre-commits data during checkpoint and only commits when the checkpoint is confirmed successful. If the checkpoint fails, the pre-committed data is rolled back.

two-phase-commit.txt
Two-Phase Commit Protocol (Kafka Sink Example):

Phase 1: Pre-commit (during checkpoint)
  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
  │ Checkpoint N triggered                       │
  │ Kafka Sink:                                  │
  │   1. Flush all buffered records to Kafka     │
  │   2. Records written but NOT committed       │
  │   3. Start new transaction for next records  │
  │   4. Report "pre-commit success" to JM       │
  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜

Phase 2: Commit (after checkpoint confirmed)
  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
  │ JobManager confirms checkpoint N complete    │
  │ Kafka Sink:                                  │
  │   1. commitTransaction() on pre-committed tx │
  │   2. Records now visible to consumers        │
  │   3. (read_committed consumers see them now) │
  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜

On failure (between pre-commit and commit):
  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
  │ TaskManager crashes after pre-commit         │
  │ Recovery:                                    │
  │   1. Restore from checkpoint N-1             │
  │   2. Abort uncommitted transaction           │
  │   3. Reprocess events from checkpoint N-1    │
  │   4. No duplicates! (aborted tx was invisible│
  │      to read_committed consumers)            │
  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
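The protocol can be sketched as a minimal state machine in plain Java. This is not the real KafkaSink implementation (which delegates to Kafka transactions); the method names simply mirror the phases in the diagram above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy two-phase-commit sink: records become visible to "read_committed"
// consumers only after commit.
public class TwoPhaseCommitSim {
    private final List<String> committed = new ArrayList<>(); // visible to consumers
    private List<String> pending = new ArrayList<>();         // current open transaction
    private List<String> preCommitted = null;                 // flushed, awaiting checkpoint ack

    void write(String record) { pending.add(record); }

    // Phase 1, on the checkpoint barrier: flush the open transaction
    // and start a new one for records arriving after the barrier.
    void preCommit() {
        preCommitted = pending;
        pending = new ArrayList<>();
    }

    // Phase 2, on "checkpoint complete": make the records visible.
    void commit() {
        committed.addAll(preCommitted);
        preCommitted = null;
    }

    // Failure between pre-commit and commit: abort the transaction.
    // The records were flushed but never became visible.
    void abort() {
        preCommitted = null;
    }

    List<String> readCommitted() { return committed; }

    public static void main(String[] args) {
        TwoPhaseCommitSim sink = new TwoPhaseCommitSim();
        sink.write("a"); sink.write("b");
        sink.preCommit();                         // checkpoint N: barrier reaches sink
        System.out.println(sink.readCommitted()); // [] (flushed but invisible)
        sink.commit();                            // checkpoint N confirmed by JobManager
        System.out.println(sink.readCommitted()); // [a, b]

        sink.write("c");
        sink.preCommit();
        sink.abort();                             // crash before the commit notification
        System.out.println(sink.readCommitted()); // still [a, b]; "c" is reprocessed later
    }
}
```

On recovery, the reprocessed "c" lands in a fresh transaction, so read_committed consumers observe it exactly once.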
kafka-exactly-once.java
// Kafka sink with exactly-once (two-phase commit)
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(
                new SimpleStringSchema())
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-job-1")
    .setProperty("transaction.timeout.ms", "900000") // 15 min
    .build();

// IMPORTANT: Kafka transaction.timeout.ms must be >
// checkpoint interval + checkpoint timeout
// Otherwise Kafka aborts the transaction before Flink commits!

// Consumer side: must use read_committed isolation
// props.put("isolation.level", "read_committed");

Transaction Timeout

The Kafka transaction timeout must exceed your checkpoint interval plus maximum checkpoint duration. If a checkpoint takes 2 minutes and your interval is 1 minute, set transaction.timeout.ms to at least 5 minutes. If Kafka times out the transaction before Flink commits, you lose data.

08

Interview Questions

Q: How does Flink's checkpointing work at a high level?

A: Flink periodically creates consistent distributed snapshots using the Chandy-Lamport algorithm. (1) JobManager injects checkpoint barriers into source streams. (2) Barriers flow through the DAG with regular events. (3) When an operator receives barriers from all inputs, it snapshots its state to durable storage (S3/HDFS). (4) Source operators record their position (Kafka offsets). (5) When all operators report success, the checkpoint is complete. On failure: restore all operator state from the last checkpoint and rewind sources to the recorded positions.

Q: What's the difference between aligned and unaligned checkpoints?

A: Aligned: when an operator receives a barrier from one input but not others, it BLOCKS the fast input until all barriers arrive. This ensures consistency but causes backpressure during alignment. Unaligned (Flink 1.11+): instead of blocking, the operator immediately snapshots and includes in-flight buffered data as part of the checkpoint. No blocking = no backpressure during checkpoints, but larger checkpoint size. Use unaligned when checkpoint duration is high due to alignment (visible as 'alignment duration' in Web UI).

Q: How does Flink achieve end-to-end exactly-once semantics?

A: Three layers must cooperate: (1) Replayable source (Kafka) — can rewind to checkpoint position on failure. (2) Internal processing — checkpoints ensure state reflects each event exactly once; on failure, state restored + source rewound. (3) Transactional sink (2PC) — pre-commits during checkpoint, commits only after checkpoint confirmed. If failure occurs between pre-commit and commit, transaction is aborted and reprocessed. Alternative: idempotent sink (upsert with primary key) — duplicates have no effect.

Q: What are savepoints and how do they differ from checkpoints?

A: Savepoints are manually triggered, portable snapshots for planned operations (upgrades, rescaling, migration). Differences: (1) Trigger: savepoints are manual; checkpoints are automatic. (2) Format: savepoints use a portable canonical format; checkpoints use backend-optimized format. (3) Lifecycle: savepoints are never auto-deleted; checkpoints are cleaned up by Flink. (4) Incremental: savepoints are always full; checkpoints can be incremental. Use savepoints for: version upgrades, changing parallelism, A/B testing, migrating between clusters.

Q: Why are operator UIDs important for savepoint compatibility?

A: Flink maps savepoint state to operators using UIDs. Without explicit UIDs, Flink auto-generates them from the job graph topology (operator position). Any change to the graph (adding/removing/reordering operators) changes auto-generated UIDs, breaking the mapping — Flink can't find the state for operators. With explicit UIDs (.uid('my-op')), the mapping is stable across topology changes. Rule: always set .uid() on every stateful operator from day one. Retrofitting UIDs later requires a state migration.

09

Common Mistakes

šŸ·ļø

Not setting operator UIDs

Deploying without explicit .uid() on stateful operators. The first upgrade that changes the job graph breaks savepoint compatibility — state can't be mapped to operators.

āœ…Set .uid('descriptive-name') on EVERY stateful operator from day one. Use stable, descriptive names. This is the #1 cause of failed production upgrades in Flink.

ā±ļø

Checkpoint interval too short

Setting checkpoint interval to 1 second for 'better recovery.' Checkpoints overlap, cause constant I/O pressure, and the job spends more time checkpointing than processing.

āœ…Start with 60 seconds. Set minPauseBetweenCheckpoints to at least half the interval. Monitor checkpoint duration — if it approaches the interval, increase the interval or optimize state size.

šŸ”’

Kafka transaction timeout too short

Using exactly-once Kafka sink with default transaction.timeout.ms (60s). If a checkpoint takes longer than 60s, Kafka aborts the transaction and data is lost.

āœ…Set transaction.timeout.ms > checkpoint_interval + max_checkpoint_duration + buffer. For a 60s interval with checkpoints taking up to 2 minutes, set timeout to at least 300000 (5 minutes). Also set Kafka broker's transaction.max.timeout.ms accordingly.

šŸ’¾

Not using incremental checkpoints

Using full checkpoints with RocksDB and 100 GB of state. Every checkpoint writes 100 GB to S3, taking minutes and causing backpressure.

āœ…Enable incremental checkpoints: new EmbeddedRocksDBStateBackend(true). Only changed RocksDB SST files are uploaded — typically 1-5% of total state per checkpoint. This reduces checkpoint I/O by 90%+ for large state.

šŸ—‘ļø

Not retaining checkpoints on cancellation

Job is cancelled (intentionally or by accident) and all checkpoints are deleted. No way to recover state — must restart from scratch.

āœ…Set ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION. This keeps the last checkpoint even when the job is cancelled. Also take regular savepoints before any planned operation as an additional safety net.