Time & Watermarks
Time is the hardest problem in stream processing. Events arrive late, out of order, and from different time zones. Watermarks are Flink's mechanism to make sense of event time in an unbounded, disordered world.
Why Time is Hard in Streaming
In batch processing, all data is available before computation starts. In streaming, data arrives continuously and you must decide when you have "enough" data to produce a result. The fundamental challenge: events don't arrive in the order they occurred.
The International Mail Analogy
Imagine receiving letters from around the world. A letter mailed from Tokyo on Monday might arrive after one mailed from London on Wednesday. You can't sort your mail by 'date sent' until you're confident no more old letters are in transit. But how long do you wait? Forever means no results. Too short means missing letters. Watermarks are Flink's way of saying 'I'm reasonably confident no letters older than X are still coming.'
```
Real-world event ordering problems:

Mobile app events:
  Event generated: 10:00:01 (user taps button)
  Phone offline for 5 minutes
  Event arrives at server: 10:05:03  → 5 minutes late!

Multi-region events:
  US event:   generated 10:00:00, arrives 10:00:05 (5s network delay)
  Asia event: generated 09:59:58, arrives 10:00:15 (17s network delay)
  → Asia event is EARLIER but arrives LATER

Kafka partition lag:
  Partition 0: consumer caught up, latest event time 10:05:00
  Partition 3: consumer lagging,  latest event time 10:02:00
  → 3-minute skew between partitions of the same topic

Without event-time processing:
  Window [10:00, 10:01) closes at wall-clock 10:01
  Late events silently dropped       → WRONG RESULTS
  Results depend on processing speed → NON-DETERMINISTIC
```
The Core Insight
Processing time is easy but wrong — results depend on when events happen to arrive, not when they actually occurred. Event time is correct but hard — you need a mechanism (watermarks) to track progress in event time and decide when windows can close.
Event Time vs Processing Time vs Ingestion Time
Flink supports three notions of time. Each has different trade-offs between correctness, latency, and complexity. Understanding when to use each is critical for building correct streaming applications.
| Time Notion | Definition | Deterministic? | Latency | Use Case |
|---|---|---|---|---|
| Event Time | When the event actually occurred (embedded in data) | ✅ Yes | Higher (waits for watermarks) | Correct analytics, billing, fraud detection |
| Processing Time | When the event is processed by Flink | ❌ No | Lowest | Monitoring, approximate metrics, low-latency alerts |
| Ingestion Time | When the event enters Flink (deprecated in 1.12+) | ❌ No | Medium | Legacy — use event time instead |
```java
// Event Time — correct, deterministic results
DataStream<Event> stream = env
    .fromSource(kafkaSource,
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp()),
        "Kafka Source");

// Window uses event time — same input always produces same output
stream
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate());

// Processing Time — simple but non-deterministic
stream
    .keyBy(Event::getUserId)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate());
// Same input can produce different results depending on processing speed!
```
When to Use Each
- ✅Event time — billing, fraud detection, analytics, any case where correctness matters more than latency
- ✅Processing time — real-time monitoring dashboards, approximate counts, alerting where speed beats accuracy
- ✅Never use ingestion time — it's deprecated and provides no benefit over event time with proper watermarks
Default to Event Time
Unless you have a specific reason to use processing time (sub-millisecond latency requirements, no event timestamps available), always use event time. The overhead of watermarks is minimal, and you get deterministic, reproducible results that are correct regardless of processing delays or reprocessing.
Watermarks — The Key to Event Time
A watermark is a special marker in the stream that declares: "No events with a timestamp less than or equal to this value will arrive after this point." Watermarks flow through the dataflow graph alongside regular events and trigger window computations.
The Rising Tide
Think of watermarks as a rising tide. As the tide rises to level T, it means all events below T have 'washed ashore' (been processed). Any event that arrives after the tide has passed its timestamp is 'late' — it missed the boat. The tide never goes backward, only forward. Operators watch the tide level to know when they can safely close windows and emit results.
```
How watermarks flow through a pipeline:

Stream of events (event_time):
  [10:01] [10:03] [10:02] [W:10:00] [10:04] [10:05] [W:10:03] [10:06]

W:10:00 means: "All events with time ≤ 10:00 have arrived"
W:10:03 means: "All events with time ≤ 10:03 have arrived"

Window [10:00, 10:05) behavior:
  1. Receives [10:01], [10:03], [10:02] → buffers them
  2. Receives W:10:00 → not enough to close (need W ≥ 10:05)
  3. Receives [10:04], [10:05] → buffers them
  4. Receives W:10:03 → still not enough
  5. Eventually receives W:10:05 → FIRES! Emits result for window
  6. Any event with time < 10:05 arriving after this is LATE

Key properties:
  ✅ Watermarks are monotonically non-decreasing
  ✅ They flow as part of the stream (not out-of-band)
  ✅ Each operator tracks its own watermark
  ✅ Multi-input operators use MIN of input watermarks
```
Source generates watermarks
The source operator (e.g., Kafka consumer) generates watermarks based on the timestamps it observes, using a WatermarkStrategy.
Watermarks flow downstream
Watermarks are broadcast to all downstream operators. They flow through the DAG just like regular events.
Operators advance their event-time clock
When an operator receives a watermark W(t), it knows all events with timestamp ≤ t have been received. It advances its internal clock.
Windows fire
When the watermark passes a window's end time, the window fires — it computes and emits its result, then cleans up state.
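The firing decision in the steps above can be sketched in plain Java. This is a minimal illustration of the mechanism, not Flink's actual window operator; the class and method names are invented for this sketch, and it fires when the watermark reaches the window end rather than mirroring Flink's internal `end - 1` bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of an event-time window: buffer events until the watermark
// passes the window end, then fire once and treat later arrivals as late.
public class EventTimeWindowSketch {
    private final long windowStart;
    private final long windowEnd;            // exclusive
    private final List<Long> buffered = new ArrayList<>();
    private boolean fired = false;

    public EventTimeWindowSketch(long windowStart, long windowEnd) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    // Buffers the event if the window is still open; returns false for
    // out-of-range events or events arriving after the window fired (late).
    public boolean onEvent(long eventTime) {
        if (fired || eventTime < windowStart || eventTime >= windowEnd) {
            return false;
        }
        buffered.add(eventTime);
        return true;
    }

    // Fires when the watermark passes the window end: returns the buffered
    // count (the "aggregate"); returns -1 while the window must stay open.
    public int onWatermark(long watermark) {
        if (!fired && watermark >= windowEnd) {
            fired = true;
            return buffered.size();          // emit result, clean up state
        }
        return -1;
    }
}
```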
Watermarks Are Heuristic
Watermarks are a best-effort declaration. They can be wrong — late events can still arrive after the watermark has passed. This is why Flink provides mechanisms for handling late data (allowed lateness, side outputs). The watermark strategy is a trade-off between completeness and latency.
Watermark Strategies
Flink provides built-in watermark strategies and allows custom implementations. The choice depends on your data characteristics — how out-of-order your events are and how much latency you can tolerate.
```java
// Strategy 1: Monotonously increasing timestamps
// Use when events arrive perfectly in order (rare in practice)
WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, ts) -> event.getTimestamp());
// Watermark = max_timestamp_seen - 1
// Zero tolerance for out-of-order → any late event is dropped

// Strategy 2: Bounded out-of-orderness (MOST COMMON)
// Use when events can be up to N seconds late
WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.getTimestamp());
// Watermark = max_timestamp_seen - 5 seconds
// Tolerates up to 5 seconds of out-of-orderness

// Strategy 3: Custom watermark generator
WatermarkStrategy
    .<Event>forGenerator(ctx -> new WatermarkGenerator<Event>() {
        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            // Could emit per-event watermarks for low latency
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Emit watermark every 200ms (default interval)
            output.emitWatermark(new Watermark(maxTimestamp - 5000));
        }
    });
```
| Strategy | Latency | Completeness | When to Use |
|---|---|---|---|
| Monotonous | Lowest | Drops any out-of-order event | Perfectly ordered sources (rare) |
| Bounded (5s) | 5s delay | Handles ≤5s out-of-order | Most streaming applications |
| Bounded (30s) | 30s delay | Handles ≤30s out-of-order | Mobile apps, unreliable networks |
| Custom | Varies | Application-specific | Complex sources, multi-source alignment |
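The core logic behind bounded out-of-orderness can be reduced to a few lines of plain Java. This is a sketch of the idea only, with invented names, not Flink's `WatermarkGenerator` interface: the watermark trails the maximum timestamp seen so far by the configured bound, so an out-of-order event never moves it backward.

```java
// Sketch: the watermark lags the max timestamp seen by a fixed bound.
public class BoundedOutOfOrdernessSketch {
    private final long boundMs;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long boundMs) {
        this.boundMs = boundMs;
    }

    // Track the highest event timestamp observed so far.
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Watermark = max_timestamp_seen - bound (monotonically non-decreasing,
    // because maxTimestamp never decreases).
    public long currentWatermark() {
        return maxTimestamp - boundMs;
    }
}
```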
Choosing the Bound
Set the out-of-orderness bound to the 99th percentile of your event delay. If 99% of events arrive within 5 seconds of their event time, use 5 seconds. The remaining 1% will be handled by allowed lateness or side outputs. Don't set it too high — it directly adds to your end-to-end latency.
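One way to measure that 99th percentile is to sample the delay (arrival time minus event time) over a window of traffic and take a nearest-rank percentile. The snippet below is an illustrative sketch with invented names, not part of any Flink API.

```java
import java.util.Arrays;

// Sketch: estimate the out-of-orderness bound from observed delays.
public class DelayPercentile {
    // Nearest-rank percentile (p in (0, 100]) over a sample of delays in ms.
    public static long percentile(long[] delaysMs, double p) {
        long[] sorted = delaysMs.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(0, rank - 1)];
    }
}
```

Feed it delays sampled at your sink or a metrics pipeline; the p99 value is a reasonable starting point for `forBoundedOutOfOrderness`, refined with allowed lateness for the tail.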
Watermarks in Parallel Pipelines
In a parallel pipeline, each source instance generates its own watermarks independently. Downstream operators that receive from multiple inputs must reconcile these watermarks — they use the minimum across all inputs.
```
Parallel watermark propagation:

Source (parallelism=3, reading from Kafka):
  Source[0] (partition 0): watermark = 10:05:00
  Source[1] (partition 1): watermark = 10:03:00  ← lagging!
  Source[2] (partition 2): watermark = 10:04:30

Downstream operator (receives from all 3):
  Effective watermark = MIN(10:05:00, 10:03:00, 10:04:30) = 10:03:00

Problem: One slow partition holds back the ENTIRE pipeline!

The idle source problem:
  Source[0]: watermark = 10:05:00 (active, receiving events)
  Source[1]: watermark = 10:01:00 (idle — no events for 5 min)
  Source[2]: watermark = 10:04:30 (active)

  Effective watermark = MIN(10:05:00, 10:01:00, 10:04:30) = 10:01:00
  ← STUCK because of idle source!

Solution: Mark idle sources

  WatermarkStrategy
      .forBoundedOutOfOrderness(Duration.ofSeconds(5))
      .withIdleness(Duration.ofMinutes(2));
  // After 2 min of no events, source is marked idle
  // Its watermark is excluded from the MIN calculation
```
Parallel Watermark Rules
- ✅Each parallel source instance generates watermarks independently based on its own events
- ✅Downstream operators take the MINIMUM watermark across all input channels
- ✅One slow or idle source holds back the entire pipeline's event-time progress
- ✅Use withIdleness() to exclude sources that haven't produced events recently
- ✅Kafka per-partition watermarks ensure one slow partition doesn't block others within the same source instance
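The MIN rule with idleness exclusion can be sketched in a few lines. This is an illustration of the semantics, not Flink's internal combiner; the names are invented for the sketch.

```java
// Sketch: combine per-input watermarks downstream. Take the MIN over
// inputs that are not marked idle; idle inputs are excluded so they
// cannot hold back event-time progress.
public class CombinedWatermark {
    public static long combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // If every input is idle, there is no watermark to propagate.
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```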
Per-Partition Watermarks
Since Flink 1.12, the Kafka source tracks watermarks per-partition within each source instance. This means if one partition is idle or slow, it only affects the watermark for that partition, not the entire source instance. This is a major improvement for topics with uneven partition activity.
Late Data Handling
Despite watermarks, some events will arrive after their window has already fired. Flink provides three strategies for handling late data: drop it, allow lateness (re-fire windows), or redirect to a side output for separate processing.
```java
// Strategy 1: Drop late data (default)
// Events arriving after watermark passes window end are silently dropped
stream
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate());
// Late events → gone forever

// Strategy 2: Allowed lateness (window stays open longer)
stream
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.minutes(5))
    .aggregate(new CountAggregate());
// Window fires at watermark, but stays open for 5 more minutes
// Late events trigger re-firing with updated results
// State kept for 5 extra minutes (memory cost!)

// Strategy 3: Side output (redirect late data)
OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> result = stream
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.minutes(5))
    .sideOutputLateData(lateTag)
    .aggregate(new CountAggregate());

// Process late data separately
DataStream<Event> lateStream = result.getSideOutput(lateTag);
lateStream.addSink(new LateDataAlertSink());
```
| Strategy | Data Loss | State Cost | Result Updates | Use Case |
|---|---|---|---|---|
| Drop (default) | Late events lost | Minimal | No updates | Approximate metrics, monitoring |
| Allowed Lateness | None within window | Higher (state kept longer) | Window re-fires | Billing, analytics needing corrections |
| Side Output | None | Same as allowed lateness | Separate stream | Audit trail, manual correction, alerting |
Combining Strategies
The best practice is to combine allowed lateness with side outputs. Set allowed lateness to handle the common case (events up to 5 minutes late get incorporated). Events beyond that go to the side output for manual review or separate processing. This gives you correctness for most data and visibility into the rest.
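The lifecycle of the combined strategy can be sketched in plain Java. This is a simplified model with invented names, not Flink internals: it tracks only the watermark relative to the window end and ignores details like window membership checks, so treat it as an illustration of when each path (buffer, re-fire, side output) is taken.

```java
// Sketch: allowed lateness + side output. The window fires at the
// watermark, re-fires for late events within the lateness period, and
// routes events to the side output once state has been cleared.
public class AllowedLatenessSketch {
    private final long windowEnd;
    private final long allowedLatenessMs;
    private long currentWatermark = Long.MIN_VALUE;
    private int count = 0;

    public AllowedLatenessSketch(long windowEnd, long allowedLatenessMs) {
        this.windowEnd = windowEnd;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    public void onWatermark(long watermark) {
        currentWatermark = watermark;
    }

    // "BUFFERED" before the first firing, "REFIRE" for late events within
    // the lateness period, "SIDE_OUTPUT" once state has been cleaned up.
    public String onEvent(long eventTime) {
        if (currentWatermark >= windowEnd + allowedLatenessMs) {
            return "SIDE_OUTPUT";   // too late: window state already gone
        }
        count++;
        return currentWatermark >= windowEnd ? "REFIRE" : "BUFFERED";
    }

    public int count() {
        return count;
    }
}
```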
Kafka + Event Time
Kafka is the most common source for Flink streaming jobs. Getting event-time semantics right with Kafka requires understanding per-partition watermarks, idle partitions, and the relationship between Kafka timestamps and event time.
```java
// Kafka source with event-time watermarks
KafkaSource<Event> source = KafkaSource.<Event>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("user-events")
    .setGroupId("flink-analytics")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST))
    .setDeserializer(new EventDeserializer())
    .build();

// Watermark strategy with per-partition tracking
WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, kafkaTimestamp) ->
        event.getEventTime())             // Use YOUR event time, not Kafka's
    .withIdleness(Duration.ofMinutes(5)); // Handle idle partitions

DataStream<Event> stream = env.fromSource(
    source, watermarkStrategy, "Kafka Source");

// Per-partition watermarks (Flink 1.12+):
// - Each Kafka partition tracked independently
// - Watermark = MIN across all active partitions in this source instance
// - Idle partitions excluded after 5 minutes
// - Prevents one slow partition from blocking the entire pipeline
```
Kafka + Event Time Best Practices
- ✅Always use your own event timestamp, not Kafka's CreateTime — Kafka timestamps reflect the producer's clock, which may differ from when the event actually occurred
- ✅Set withIdleness() to handle partitions that stop receiving events (common with key-based partitioning)
- ✅Match source parallelism to partition count for optimal watermark tracking
- ✅Monitor watermark lag metric — if it grows, you have a lateness problem
- ✅Consider partition-aware watermarks for topics with uneven partition activity
Kafka Timestamp vs Event Time
Kafka records have a built-in timestamp (CreateTime or LogAppendTime). This is NOT the same as your event time. CreateTime is when the producer sent the record — which could be different from when the event actually occurred (e.g., mobile app buffering). Always extract event time from your event payload for correct results.
Interview Questions
Q: What is a watermark in Flink and why is it needed?
A: A watermark is a special timestamp marker that flows through the stream declaring 'no events with timestamp ≤ W will arrive after this point.' It's needed because in event-time processing, events arrive out of order. Without watermarks, window operators would never know when to fire — they'd wait forever for potentially late events. Watermarks provide the mechanism to advance event-time progress and trigger window computations. They're a trade-off: too aggressive (small delay) means more late data; too conservative (large delay) means higher latency.
Q: Explain the difference between event time and processing time. When would you use each?
A: Event time is when the event actually occurred (embedded in the data). Processing time is when Flink processes the event (wall clock). Event time gives deterministic, reproducible results regardless of processing speed or reprocessing — use it for billing, analytics, fraud detection. Processing time gives lowest latency but non-deterministic results — use it for monitoring dashboards or approximate metrics where speed matters more than exactness. Key insight: replaying the same data with event time gives identical results; with processing time, results change every run.
Q: How do watermarks work in parallel pipelines? What's the idle source problem?
A: Each parallel source generates watermarks independently. Downstream operators take the MINIMUM watermark across all input channels. Problem: one slow or idle source holds back the entire pipeline. If partition 3 stops receiving events, its watermark stays at the last seen timestamp while others advance — the MIN stays stuck. Solution: withIdleness(Duration) marks sources as idle after a timeout, excluding them from the MIN calculation. Flink 1.12+ also tracks per-partition watermarks within each source instance.
Q: What strategies does Flink provide for handling late data?
A: Three strategies: (1) Drop (default) — events arriving after the watermark passes the window end are silently discarded. Simple but loses data. (2) Allowed lateness — window stays open for an additional period after firing. Late events trigger re-firing with updated results. Costs extra state. (3) Side output — late events are redirected to a separate stream for independent processing (alerting, manual correction, audit). Best practice: combine allowed lateness (handles common late events) with side output (captures extremely late events for review).
Q: How should you configure watermarks when reading from Kafka?
A: (1) Use your own event timestamp from the payload, not Kafka's CreateTime. (2) Set bounded out-of-orderness to your 99th percentile delay (e.g., 5-10 seconds). (3) Set withIdleness() to handle partitions that stop producing (common with key-based partitioning). (4) Match source parallelism to partition count. (5) Flink 1.12+ tracks per-partition watermarks automatically — one slow partition within a source instance won't block others. Monitor the watermark lag metric; if it grows continuously, your bound is too tight or you have a data pipeline issue.
Common Mistakes
Using processing time when event time is needed
Using processing time windows for billing or analytics. Results change depending on processing speed, cluster load, and reprocessing. A backlog replay produces completely different numbers.
✅Use event time for any computation where correctness matters. The latency overhead of watermarks (typically seconds) is negligible compared to the cost of wrong results.
Not handling idle sources
A Kafka topic with 100 partitions where only 20 are active. The 80 idle partitions hold back the watermark indefinitely, causing windows to never fire.
✅Always set withIdleness(Duration.ofMinutes(N)) on your watermark strategy. Choose N based on your longest expected gap between events on a single partition.
Setting out-of-orderness too high
Setting bounded out-of-orderness to 5 minutes 'just to be safe.' This adds 5 minutes of latency to ALL window results, even though 99% of events arrive within 5 seconds.
✅Set the bound to your actual 99th percentile delay (measure it!). Use allowed lateness + side outputs for the remaining 1% of extremely late events. This gives low latency for most results while still handling stragglers.
Assigning watermarks after a shuffle
Assigning watermarks after a keyBy or rebalance operation. By this point, the per-source ordering information is lost and watermarks become less accurate.
✅Always assign watermarks as close to the source as possible — ideally directly on the source using the WatermarkStrategy parameter in fromSource(). This ensures per-partition tracking works correctly.
Ignoring watermark lag metrics
Not monitoring the watermark lag (difference between current processing time and current watermark). A growing lag means windows are firing later and later, and results are increasingly delayed.
✅Monitor currentOutputWatermark metric per source. Alert if lag exceeds your SLA. Common causes: one slow Kafka partition, consumer group rebalancing, or network issues to a specific broker.