Fault Tolerance & Checkpointing
Flink achieves fault tolerance through periodic distributed snapshots (checkpoints) based on a variant of the Chandy-Lamport algorithm. Combined with two-phase commit for sinks, it provides end-to-end exactly-once semantics even across failures.
The Challenge
Stream processing runs continuously, 24/7, for months or years. During that time, machines crash, networks partition, and disks fail. The challenge: how do you recover from failures without losing data or producing duplicate results, while processing millions of events per second?
The Group Photo Analogy
Imagine taking a group photo of 100 people who are constantly moving and talking. You need a consistent snapshot: everyone frozen at the same logical instant. You can't actually stop everyone (that would halt processing). Instead, you send a 'say cheese' signal that propagates through the group. When each person hears it, they freeze their pose (save state). The photo is consistent even though people froze at slightly different wall-clock times; the signal ensures logical consistency.
```
What can go wrong in a streaming job:

Hardware failures:
  - TaskManager JVM crashes (OOM, segfault)
  - Machine dies (power, hardware failure)
  - Disk corruption or full

Software failures:
  - User code throws exception (NPE, parsing error)
  - Serialization failure (schema mismatch)
  - Timeout (checkpoint takes too long)

Infrastructure failures:
  - Network partition between TaskManagers
  - Kafka broker unavailable
  - S3/HDFS checkpoint storage unreachable

Without fault tolerance:
  - All in-flight state is LOST
  - Events processed since last output are LOST
  - Kafka offsets not committed → reprocessing → DUPLICATES
  - Hours of computation wasted

With Flink checkpointing:
  ✓ State restored to last consistent snapshot
  ✓ Source rewound to checkpoint position
  ✓ Processing resumes from consistent point
  ✓ No data loss, no duplicates (exactly-once)
```
Continuous vs Batch Recovery
Unlike batch jobs that can simply restart from scratch, streaming jobs have accumulated state (counters, windows, sessions) that took hours or days to build. Restarting from zero means losing all that state. Checkpoints preserve state so recovery takes seconds, not hours.
Checkpointing
Checkpointing is Flink's mechanism for creating consistent distributed snapshots of the entire job state. At configurable intervals, Flink snapshots all operator state and source positions to durable storage. On failure, the job restores from the latest completed checkpoint.
```java
// Enable checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 60 seconds
env.enableCheckpointing(60_000);

// Checkpoint configuration
CheckpointConfig config = env.getCheckpointConfig();

// Exactly-once vs at-least-once
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Minimum time between checkpoints (prevents overlap)
config.setMinPauseBetweenCheckpoints(30_000);

// Checkpoint timeout (fail if not complete in 10 min)
config.setCheckpointTimeout(600_000);

// Max concurrent checkpoints
config.setMaxConcurrentCheckpoints(1);

// Tolerate N checkpoint failures before job fails
config.setTolerableCheckpointFailureNumber(3);

// Keep checkpoints on cancellation (for manual recovery)
config.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Checkpoint storage
config.setCheckpointStorage("s3://my-bucket/checkpoints/");
```
Checkpoint Lifecycle
1. JobManager triggers a checkpoint at the configured interval
2. Barriers are injected into the source streams (one per checkpoint)
3. Barriers flow through the DAG alongside regular events
4. Each operator snapshots its state when the barrier arrives
5. State is written asynchronously to checkpoint storage
6. The JobManager confirms the checkpoint once all operators report success
7. Old checkpoints are cleaned up (keep the last N)
Checkpoint Interval Trade-off
Shorter intervals (e.g., 10s) mean less data to reprocess on failure but more I/O overhead and potential backpressure from frequent snapshots. Longer intervals (e.g., 5min) mean less overhead but more reprocessing on failure. Start with 60 seconds and adjust based on your recovery time requirements and checkpoint duration metrics.
Chandy-Lamport Algorithm
Flink uses a variant of the Chandy-Lamport distributed snapshot algorithm. The key mechanism is checkpoint barriers: special markers injected into the stream that separate events belonging to different checkpoints. Barriers ensure a consistent cut across all operators without stopping processing.
```
Aligned Checkpoint Barriers:

Source 1: [e1] [e2] [B_n] [e3] [e4] ...
Source 2: [e5] [e6] [e7] [B_n] [e8] ...

Operator with 2 inputs:
1. Receives B_n from Source 1 first
2. BLOCKS Source 1 input (buffers events after barrier)
3. Continues processing Source 2 until B_n arrives
4. Receives B_n from Source 2
5. NOW: takes state snapshot (consistent point)
6. Emits B_n downstream
7. Unblocks Source 1, processes buffered events

Timeline:
Source 1: ──[e1]──[e2]──[B]──[e3]──[e4]───────────
                         │
Operator: processes e1,e2 → BLOCKS → snapshot → resume
                              │
Source 2: ──[e5]──[e6]──[e7]──[B]───────────────── (still processing)

Problem: blocking causes backpressure during alignment!
```
| Property | Aligned Barriers | Unaligned Barriers (1.11+) |
|---|---|---|
| Mechanism | Block fast input until slow input's barrier arrives | Snapshot in-flight data between barriers |
| Backpressure during checkpoint | Yes: the blocked input causes upstream backpressure | No: no blocking, immediate snapshot |
| Checkpoint size | Smaller (only operator state) | Larger (state + in-flight buffers) |
| Checkpoint speed | Slower (waits for alignment) | Faster (immediate) |
| Best for | Low-latency jobs with balanced inputs | Jobs with backpressure or skewed inputs |
```java
// Enable unaligned checkpoints (Flink 1.11+)
env.getCheckpointConfig().enableUnalignedCheckpoints();

// Unaligned checkpoints:
// - Don't block inputs during barrier alignment
// - Instead, snapshot in-flight data (buffers) as part of the checkpoint
// - Faster checkpoint completion under backpressure
// - Larger checkpoint size (includes network buffers)
// - Recommended when checkpoint duration is high due to alignment
```
When to Use Unaligned Checkpoints
Use unaligned checkpoints when your checkpoint duration is high due to barrier alignment (visible in the Web UI as "alignment duration"). This typically happens with backpressure or when input rates are very skewed. The trade-off: faster checkpoints but larger checkpoint size (includes in-flight network buffers).
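There is also a middle ground: keep checkpoints aligned while alignment is cheap and switch to the unaligned mode only when a barrier has waited too long. A minimal sketch, assuming `env` from the earlier snippets and a Flink version new enough to expose the aligned-checkpoint timeout (roughly 1.13+):

```java
CheckpointConfig config = env.getCheckpointConfig();

// Permit unaligned checkpoints...
config.enableUnalignedCheckpoints();

// ...but only switch a checkpoint to the unaligned mode after its barrier
// has spent 30 seconds waiting for alignment; checkpoints that align quickly
// stay aligned and keep their smaller size. (Duration is java.time.Duration.)
config.setAlignedCheckpointTimeout(Duration.ofSeconds(30));
```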
Checkpoint Storage
Checkpoint data must be stored durably, surviving TaskManager crashes and machine failures. Flink supports multiple storage backends, with S3 and HDFS being the most common in production. Incremental checkpoints dramatically reduce I/O for large state.
```
Checkpoint Storage Options:

1. JobManager Heap (development only):
   state.checkpoint-storage: jobmanager
   - Stored in JobManager memory
   - Lost if JobManager crashes
   - Only for testing!

2. Filesystem (production):
   state.checkpoint-storage: filesystem
   state.checkpoints.dir: s3://bucket/checkpoints/
   - Durable, survives any single failure
   - Supports S3, HDFS, GCS, Azure Blob

Full vs Incremental Checkpoints:

Full checkpoint (HashMap backend):
  Checkpoint 1: [############] 10 GB (full state)
  Checkpoint 2: [############] 10 GB (full state again)
  Checkpoint 3: [############] 10 GB (full state again)
  Total I/O per checkpoint: 10 GB

Incremental checkpoint (RocksDB backend):
  Checkpoint 1: [############] 10 GB (initial full)
  Checkpoint 2: [##]            2 GB (only changed SST files)
  Checkpoint 3: [#]             1 GB (only changed SST files)
  Total I/O per checkpoint: 1-2 GB (90% reduction!)
```
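As a concrete sketch of the production setup described above (the bucket path is a placeholder), incremental RocksDB checkpoints plus durable storage can be wired up like this:

```java
// RocksDB state backend with incremental checkpoints (the boolean flag):
// each checkpoint uploads only the SST files that changed since the last one.
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

// Durable checkpoint storage outside the cluster
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints/");
```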
Checkpoint Storage Best Practices
- ✓ Always use S3/HDFS/GCS in production, never JobManager heap
- ✓ Enable incremental checkpoints with RocksDB (90%+ I/O reduction for large state)
- ✓ Set checkpoint retention: keep the last 3-5 checkpoints for recovery options (see the sketch after this list)
- ✓ Use externalized checkpoints (RETAIN_ON_CANCELLATION) for manual recovery
- ✓ Monitor checkpoint size and duration; growing size can indicate a state leak
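A hedged sketch of the retention-related settings from the list above. The retained-checkpoint count is the `state.checkpoints.num-retained` option; the snippet assumes the `CheckpointingOptions.MAX_RETAINED_CHECKPOINTS` constant for it, and the value 3 is illustrative:

```java
// Keep the last 3 completed checkpoints instead of only the latest one
Configuration conf = new Configuration();
conf.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 3);

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.enableCheckpointing(60_000);

// Keep the latest checkpoint around even if the job is cancelled
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```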
Savepoints
Savepoints are manually triggered, portable snapshots of the entire job state. Unlike checkpoints (automatic, for failure recovery), savepoints are for planned operations: upgrades, rescaling, A/B testing, and migration.
| Property | Checkpoint | Savepoint |
|---|---|---|
| Trigger | Automatic (periodic) | Manual (CLI/API) |
| Purpose | Failure recovery | Planned operations |
| Format | Backend-specific (optimized) | Portable, canonical format |
| Lifecycle | Auto-cleaned by Flink | User-managed (never auto-deleted) |
| Incremental | Yes (RocksDB) | No (always full) |
| Use case | Crash recovery | Upgrades, rescaling, migration |
```bash
# Trigger savepoint (job keeps running)
flink savepoint <job-id> s3://savepoints/

# Stop job with savepoint (graceful shutdown)
flink stop --savepointPath s3://savepoints/ <job-id>

# Cancel job with savepoint (immediate)
flink cancel --withSavepoint s3://savepoints/ <job-id>

# Start job from savepoint
flink run -s s3://savepoints/savepoint-abc123 job.jar

# Start with different parallelism (rescaling)
flink run -s s3://savepoints/savepoint-abc123 -p 32 job.jar

# Common workflow: upgrade job version
# 1. Stop old version with savepoint
flink stop --savepointPath s3://savepoints/ <old-job-id>
# 2. Deploy new version from savepoint
flink run -s s3://savepoints/savepoint-xyz789 new-job-v2.jar
```
Operator UIDs Are Critical
Flink maps savepoint state to operators using operator UIDs. If you don't set explicit UIDs, Flink generates them from the job graph structure ā any topology change breaks the mapping. Always set .uid("my-operator") on every stateful operator to ensure savepoint compatibility across code changes.
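A minimal sketch of what that looks like in practice; the operator names, the `CountPerKey` function, and the `kafkaSource`/`kafkaSink` variables are placeholders:

```java
DataStream<String> events = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "events")
    .uid("events-source");        // state: Kafka partition offsets

events
    .keyBy(line -> line.split(",")[0])
    .process(new CountPerKey())   // hypothetical stateful KeyedProcessFunction
    .uid("count-per-key")         // state: per-key counters
    .sinkTo(kafkaSink)
    .uid("kafka-sink");           // state: pending transactions
```

With stable UIDs like these, the job graph can be refactored freely and savepoint state still finds its operators.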
Exactly-Once Semantics
Exactly-once means each event affects the final result exactly once: no data loss (the at-least-once guarantee) and additionally no duplicates. Flink achieves this internally via checkpoints, but end-to-end exactly-once requires cooperation from sources and sinks.
```
End-to-End Exactly-Once Requirements:

Layer 1: Source (replayable)
  ✓ Kafka: can replay from committed offset
  ✓ Kinesis: can replay from sequence number
  ✗ Socket source: cannot replay (at-most-once)

Layer 2: Internal Processing (Flink checkpoints)
  - Guaranteed by checkpoint + barrier alignment
  - On failure: restore state + rewind source to checkpoint
  - Events between checkpoint and failure are reprocessed
  - State is restored → reprocessing produces the same result

Layer 3: Sink (idempotent or transactional)
  Option A: Idempotent sink
    - Writes are idempotent (same write twice = same result)
    - Example: upsert to database with primary key
    - Simple but requires idempotent operations
  Option B: Transactional sink (two-phase commit)
    - Writes committed only when checkpoint completes
    - Example: Kafka sink with transactions
    - True exactly-once but higher latency

End-to-end exactly-once =
  replayable source + Flink checkpoints + transactional/idempotent sink
```
Checkpoint triggers
JobManager initiates checkpoint N. Barriers injected at sources.
Source records position
Kafka source records current offset per partition as part of checkpoint N.
Operators snapshot state
All operators snapshot their state when barrier N passes through.
Sink pre-commits
Transactional sink pre-commits its pending writes (Kafka: flush to transaction).
Checkpoint completes
JobManager confirms all operators have snapshotted successfully.
Sink commits
Sink commits the transaction (Kafka: commitTransaction()). Data now visible to consumers.
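A sketch of the replayable source (Layer 1) using the Kafka connector; topic, servers, and group id are placeholders:

```java
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input-topic")
    .setGroupId("flink-job-1")
    // The starting position is only used when there is NO checkpoint or
    // savepoint to restore from; after a restore, Flink rewinds to the
    // offsets recorded in the checkpoint, not Kafka's committed group offsets.
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> events =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
```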
Exactly-Once ≠ Exactly-Once Delivery
Flink's exactly-once is about the EFFECT on state and output, not about message delivery. Events may be delivered multiple times (after failure recovery), but the state and output reflect each event exactly once. This is sometimes called "effectively once": the observable result is as if each event was processed exactly once.
Two-Phase Commit for Sinks
The two-phase commit (2PC) protocol enables exactly-once writes to external systems. The sink pre-commits data during checkpoint and only commits when the checkpoint is confirmed successful. If the checkpoint fails, the pre-committed data is rolled back.
```
Two-Phase Commit Protocol (Kafka Sink Example):

Phase 1: Pre-commit (during checkpoint)
  Checkpoint N triggered
  Kafka Sink:
    1. Flush all buffered records to Kafka
    2. Records written but NOT committed
    3. Start new transaction for next records
    4. Report "pre-commit success" to JobManager

Phase 2: Commit (after checkpoint confirmed)
  JobManager confirms checkpoint N complete
  Kafka Sink:
    1. commitTransaction() on pre-committed tx
    2. Records now visible to consumers
    3. (read_committed consumers see them now)

On failure (between pre-commit and commit):
  TaskManager crashes after pre-commit
  Recovery:
    1. Restore from checkpoint N-1
    2. Abort uncommitted transaction
    3. Reprocess events from checkpoint N-1
    4. No duplicates! (aborted tx was invisible to read_committed consumers)
```
```java
// Kafka sink with exactly-once (two-phase commit)
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-job-1")
    .setProperty("transaction.timeout.ms", "900000") // 15 min
    .build();

// IMPORTANT: Kafka transaction.timeout.ms must be >
// checkpoint interval + checkpoint timeout.
// Otherwise Kafka aborts the transaction before Flink commits!

// Consumer side: must use read_committed isolation
// props.put("isolation.level", "read_committed");
```
Transaction Timeout
The Kafka transaction timeout must exceed your checkpoint interval plus maximum checkpoint duration. If a checkpoint takes 2 minutes and your interval is 1 minute, set transaction.timeout.ms to at least 5 minutes. If Kafka times out the transaction before Flink commits, you lose data.
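For external systems without a ready-made transactional connector, the same protocol can be implemented by hand. The sketch below uses the hook points of Flink's (older, since superseded by the unified sink API) TwoPhaseCommitSinkFunction with a hypothetical temp-file "transaction" that only becomes visible through an atomic rename on commit; it illustrates the phases rather than a production-grade sink:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.UUID;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class TmpFileTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, TmpFileTwoPhaseCommitSink.Txn, Void> {

    /** The "transaction": a hidden temp file plus its final, visible path. */
    public static class Txn {
        public String tmpPath;
        public String finalPath;
    }

    public TmpFileTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        // Start a fresh, invisible output location for this checkpoint interval
        Txn txn = new Txn();
        txn.tmpPath = "/data/out/.tmp-" + UUID.randomUUID();
        txn.finalPath = txn.tmpPath.replace("/.tmp-", "/part-");
        return txn;
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) throws Exception {
        // Records go to the invisible temp file
        Files.write(Paths.get(txn.tmpPath),
                (value + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(Txn txn) {
        // Phase 1: flush; after this point the data must survive a failover.
        // Nothing extra to do for a plain local file in this sketch.
    }

    @Override
    protected void commit(Txn txn) {
        // Phase 2: atomically make the data visible to readers
        try {
            if (Files.exists(Paths.get(txn.tmpPath))) {
                Files.move(Paths.get(txn.tmpPath), Paths.get(txn.finalPath),
                        StandardCopyOption.ATOMIC_MOVE);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(Txn txn) {
        // Checkpoint failed: discard the never-visible temp file
        try {
            Files.deleteIfExists(Paths.get(txn.tmpPath));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```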
Interview Questions
Q: How does Flink's checkpointing work at a high level?
A: Flink periodically creates consistent distributed snapshots using the Chandy-Lamport algorithm. (1) JobManager injects checkpoint barriers into source streams. (2) Barriers flow through the DAG with regular events. (3) When an operator receives barriers from all inputs, it snapshots its state to durable storage (S3/HDFS). (4) Source operators record their position (Kafka offsets). (5) When all operators report success, the checkpoint is complete. On failure: restore all operator state from the last checkpoint and rewind sources to the recorded positions.
Q: What's the difference between aligned and unaligned checkpoints?
A: Aligned: when an operator receives a barrier from one input but not others, it BLOCKS the fast input until all barriers arrive. This ensures consistency but causes backpressure during alignment. Unaligned (Flink 1.11+): instead of blocking, the operator immediately snapshots and includes in-flight buffered data as part of the checkpoint. No blocking = no backpressure during checkpoints, but larger checkpoint size. Use unaligned when checkpoint duration is high due to alignment (visible as 'alignment duration' in Web UI).
Q: How does Flink achieve end-to-end exactly-once semantics?
A: Three layers must cooperate: (1) Replayable source (Kafka) ā can rewind to checkpoint position on failure. (2) Internal processing ā checkpoints ensure state reflects each event exactly once; on failure, state restored + source rewound. (3) Transactional sink (2PC) ā pre-commits during checkpoint, commits only after checkpoint confirmed. If failure occurs between pre-commit and commit, transaction is aborted and reprocessed. Alternative: idempotent sink (upsert with primary key) ā duplicates have no effect.
Q: What are savepoints and how do they differ from checkpoints?
A: Savepoints are manually triggered, portable snapshots for planned operations (upgrades, rescaling, migration). Differences: (1) Trigger: savepoints are manual; checkpoints are automatic. (2) Format: savepoints use a portable canonical format; checkpoints use backend-optimized format. (3) Lifecycle: savepoints are never auto-deleted; checkpoints are cleaned up by Flink. (4) Incremental: savepoints are always full; checkpoints can be incremental. Use savepoints for: version upgrades, changing parallelism, A/B testing, migrating between clusters.
Q: Why are operator UIDs important for savepoint compatibility?
A: Flink maps savepoint state to operators using UIDs. Without explicit UIDs, Flink auto-generates them from the job graph topology (operator position). Any change to the graph (adding/removing/reordering operators) changes auto-generated UIDs, breaking the mapping ā Flink can't find the state for operators. With explicit UIDs (.uid('my-op')), the mapping is stable across topology changes. Rule: always set .uid() on every stateful operator from day one. Retrofitting UIDs later requires a state migration.
Common Mistakes
Not setting operator UIDs
Deploying without explicit .uid() on stateful operators. The first upgrade that changes the job graph breaks savepoint compatibility; state can't be mapped to operators.
✓ Set .uid("descriptive-name") on EVERY stateful operator from day one. Use stable, descriptive names. Missing UIDs are the #1 cause of failed production upgrades in Flink.
Checkpoint interval too short
Setting checkpoint interval to 1 second for 'better recovery.' Checkpoints overlap, cause constant I/O pressure, and the job spends more time checkpointing than processing.
✓ Start with 60 seconds. Set minPauseBetweenCheckpoints to at least half the interval. Monitor checkpoint duration; if it approaches the interval, increase the interval or optimize state size.
Kafka transaction timeout too short
Using exactly-once Kafka sink with default transaction.timeout.ms (60s). If a checkpoint takes longer than 60s, Kafka aborts the transaction and data is lost.
✓ Set transaction.timeout.ms > checkpoint_interval + max_checkpoint_duration + buffer. For a 60s interval with checkpoints taking up to 2 minutes, set the timeout to at least 300000 ms (5 minutes). Also set the Kafka broker's transaction.max.timeout.ms accordingly.
Not using incremental checkpoints
Using full checkpoints with RocksDB and 100 GB of state. Every checkpoint writes 100 GB to S3, taking minutes and causing backpressure.
✓ Enable incremental checkpoints: new EmbeddedRocksDBStateBackend(true). Only changed RocksDB SST files are uploaded, typically 1-5% of total state per checkpoint. This reduces checkpoint I/O by 90%+ for large state.
Not retaining checkpoints on cancellation
Job is cancelled (intentionally or by accident) and all checkpoints are deleted. There is no way to recover the state; the job must restart from scratch.
✓ Set ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION. This keeps the last checkpoint even when the job is cancelled. Also take regular savepoints before any planned operation as an additional safety net.