Windows
Windows are the mechanism for bounding infinite streams into finite chunks for computation. Flink provides tumbling, sliding, session, and global windows — each suited to different aggregation patterns.
Why Windows
Streams are unbounded — they have no end. But most computations need a finite set of data to produce a result. "What's the average order value?" is meaningless on an infinite stream. "What's the average order value in the last 5 minutes?" is answerable. Windows provide that boundary.
The Train Window
Imagine looking out a train window at a continuous landscape. You can't describe 'the entire view' because it never ends. But you can describe what you see in a specific frame — the current window. Tumbling windows are like taking a photo every 5 seconds (non-overlapping snapshots). Sliding windows are like a video camera with a 10-second buffer that updates every 2 seconds (overlapping). Session windows are like looking at distinct scenes separated by tunnels (gaps).
```
Without windows (per-event processing):
  Event 1: count=1, sum=50,  avg=50.00
  Event 2: count=2, sum=120, avg=60.00
  Event 3: count=3, sum=180, avg=60.00
  ... (running aggregate, never "completes")

With tumbling window (1-minute):
  Window [10:00, 10:01): 45 events, avg=$62.30 → EMIT
  Window [10:01, 10:02): 52 events, avg=$58.10 → EMIT
  Window [10:02, 10:03): 38 events, avg=$71.20 → EMIT
  ... (discrete results, each window is complete)

Windows answer questions like:
  ✅ "How many orders per minute?"                         → Tumbling(1 min)
  ✅ "Moving average over last 5 min, updated every 30s?"  → Sliding(5 min, 30s)
  ✅ "Total spend per user session?"                       → Session(30 min gap)
  ✅ "Alert after 3 failed logins?"                        → Global + Count trigger
```
Windows Are Per-Key
In Flink, windows are typically applied after a keyBy operation. Non-keyed streams use windowAll instead, which runs with parallelism 1. On a keyed stream, each key gets its own independent set of windows: User A's 1-minute window is completely independent of User B's. This means millions of windows can be active at the same time, one per key per active time interval.
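To make "one set of windows per key" concrete, the sketch below models the state implied by keyBy plus a 1-minute tumbling window. It is plain Java with no Flink dependency. Every (key, window start) pair owns an independent counter. The class and method names are hypothetical. The HashMap stands in for Flink's keyed state backend, so the sketch shows the bookkeeping rather than Flink's actual window operator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (plain Java, not Flink): per-key tumbling-window counts.
// Every (key, window start) pair owns its own counter; keys never share state.
final class PerKeyTumblingCounts {
    private final long sizeMs;
    // key -> (window start -> count)
    private final Map<String, Map<Long, Long>> state = new HashMap<>();

    PerKeyTumblingCounts(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    void add(String key, long timestampMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, sizeMs);  // epoch-aligned
        state.computeIfAbsent(key, k -> new HashMap<>())
             .merge(windowStart, 1L, Long::sum);
    }

    long count(String key, long windowStart) {
        return state.getOrDefault(key, Map.of()).getOrDefault(windowStart, 0L);
    }

    /** Number of (key, window) pairs currently holding state. */
    int activeWindows() {
        int n = 0;
        for (Map<Long, Long> perKey : state.values()) {
            n += perKey.size();
        }
        return n;
    }

    public static void main(String[] args) {
        PerKeyTumblingCounts counts = new PerKeyTumblingCounts(60_000L);
        counts.add("userA", 5_000L);    // 00:00:05 -> A's window [00:00, 00:01)
        counts.add("userA", 45_000L);   // 00:00:45 -> same window
        counts.add("userB", 30_000L);   // 00:00:30 -> B's own [00:00, 00:01) window
        counts.add("userA", 62_000L);   // 00:01:02 -> A's window [00:01, 00:02)
        System.out.println(counts.count("userA", 0L));   // 2
        System.out.println(counts.count("userB", 0L));   // 1
        System.out.println(counts.activeWindows());      // 3
    }
}
```

User A and user B both have a window starting at 00:00, yet their counts never interact. This is why the number of live windows scales with the number of active keys.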
Tumbling Windows
Tumbling windows are fixed-size, non-overlapping time intervals. Every event belongs to exactly one window. They're the simplest and most common window type — ideal for periodic aggregations like "events per minute" or "revenue per hour."
```
Tumbling Window (size = 1 minute):

Timeline:
|----Window 1----|----Window 2----|----Window 3----|
10:00            10:01            10:02            10:03

Events:
  [10:00:05] → Window 1
  [10:00:45] → Window 1
  [10:01:02] → Window 2
  [10:01:30] → Window 2
  [10:01:58] → Window 2
  [10:02:10] → Window 3

Properties:
  ✅ Non-overlapping: no event counted twice
  ✅ Fixed size: every window is exactly 1 minute
  ✅ Aligned: windows start at minute boundaries (configurable via an offset)
  ✅ Complete: every event belongs to exactly one window
```
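```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event, Result, Order, Revenue and CountAggregate are user-defined types.

// Count events per user per minute
DataStream<Result> result = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate());

// Revenue per product per hour, with offset
// (windows aligned to the 30-minute mark instead of the hour boundary)
DataStream<Revenue> revenue = orders
    .map(o -> new Revenue(o.getProductId(), o.getAmount()))  // reduce needs input type == output type
    .keyBy(Revenue::getProductId)
    .window(TumblingEventTimeWindows.of(
        Time.hours(1),
        Time.minutes(30)))                                   // offset: windows start at :30
    .reduce((a, b) -> new Revenue(
        a.getProductId(), a.getAmount() + b.getAmount()));
```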
Window Alignment
By default, tumbling windows align to epoch (Unix time 0). A 1-hour window starts at :00, not when the first event arrives. Use the offset parameter to shift alignment — useful when your business day starts at a non-standard time or you're in a non-UTC timezone.
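The alignment rule comes down to one line of arithmetic. The sketch below approximates it in plain Java: it derives a window start from a timestamp, an offset, and a size. The class name is hypothetical. This should match the arithmetic behind Flink's `TimeWindow.getWindowStartWithOffset` helper, but treat that correspondence as an assumption. The last case uses the offset that Flink's documentation suggests for daily windows in UTC+8 (`Time.hours(-8)`).

```java
// Hypothetical sketch: epoch-aligned window start, with an optional offset.
final class WindowAlignment {
    /** Start of the tumbling window containing timestampMs (all values in epoch millis). */
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        // floorMod keeps the remainder non-negative, so negative offsets and
        // pre-epoch timestamps still land in the correct window
        return timestampMs - Math.floorMod(timestampMs - offsetMs, sizeMs);
    }

    public static void main(String[] args) {
        long min = 60_000L, hour = 60 * min, day = 24 * hour;
        long t = 10 * hour + 12 * min;                              // 10:12 UTC, 1970-01-01
        System.out.println(windowStart(t, 0, hour) / min);          // 600 -> window starts 10:00
        System.out.println(windowStart(t, 30 * min, hour) / min);   // 570 -> window starts 09:30
        // Daily window for UTC+8: midnight local time is 16:00 UTC the previous day
        System.out.println(windowStart(t, -8 * hour, day) / hour);  // -8
    }
}
```

No event's arrival time appears in the formula. Only the timestamp, size, and offset decide where a window starts.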
Sliding Windows
Sliding windows have a fixed size but overlap — they "slide" forward by a configurable step. An event can belong to multiple windows. They're ideal for moving averages and rolling aggregations where you need smoother, more frequent updates.
```
Sliding Window (size = 5 min, slide = 1 min):

Timeline:
|─────── Window A (10:00-10:05) ───────|
     |─────── Window B (10:01-10:06) ───────|
          |─────── Window C (10:02-10:07) ───────|

Event at 10:03:30 belongs to:
  ✅ Window A (10:00-10:05)
  ✅ Window B (10:01-10:06)
  ✅ Window C (10:02-10:07)
  ✅ Window D (10:03-10:08) (not shown)
  ✅ Window E (09:59-10:04) (not shown)
  → Each event belongs to (size / slide) = 5 windows!

Properties:
  ⚠️ Overlapping: events counted in multiple windows
  ✅ Smooth updates: new result every slide interval
  ⚠️ Higher state cost: up to size/slide × more window state than tumbling
  ✅ Great for moving averages and trend detection
```
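```java
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// 5-minute moving average, updated every 30 seconds
DataStream<Average> movingAvg = metrics
    .keyBy(Metric::getSensorId)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(5),      // window size
        Time.seconds(30)))    // slide interval
    .aggregate(new AverageAggregate());

// State cost: each event is assigned to 5min / 30s = 10 windows,
// i.e. 10 accumulator updates per event and ~10 open accumulators per key.
// For 1M events/min → 10M window assignments per minute.
// Consider: is the smoothness worth 10x the per-key window state?
```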
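You can compute directly which windows an event falls into. The plain-Java sketch below walks backwards from the latest slide-aligned window start, one slide at a time, for as long as the window still covers the event. The class name is hypothetical and there is no offset support. It is modeled on the approach Flink's SlidingEventTimeWindows takes, not copied from it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: all sliding windows [start, end) that contain one timestamp.
final class SlidingAssignment {
    static List<long[]> assign(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest slide-aligned window start at or before the event (no offset)
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step back one slide at a time while the window still covers the event
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long t = 10 * 60 * min + 3 * min + 30_000L;   // 10:03:30
        for (long[] w : assign(t, 5 * min, min)) {
            System.out.printf("[%d:%02d, %d:%02d)%n",
                w[0] / (60 * min), (w[0] / min) % 60,
                w[1] / (60 * min), (w[1] / min) % 60);
        }
    }
}
```

For 10:03:30 with a 5-minute size and a 1-minute slide, it prints five windows, from [10:03, 10:08) back to [9:59, 10:04). With a 30-second slide, the same event lands in 10 windows, which matches the 5min / 30s figure in the state-cost comment.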
State Cost Warning
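Sliding windows with a small slide relative to their size are expensive. A 1-hour window sliding every second means each event belongs to 3,600 windows, so it is written into 3,600 separate window states. That is 3,600 accumulator updates with an incremental function, or 3,600 buffered copies with a ProcessWindowFunction. If you need very frequent updates, consider pre-aggregating into small tumbling buckets (for example, 1 second) and keeping a rolling buffer of bucket partials in a KeyedProcessFunction. Each event then updates only one bucket.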
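One way to build such a rolling buffer is to pre-aggregate events into small fixed-size buckets and keep one partial sum per bucket in a ring. The plain-Java sketch below is hypothetical. In Flink, the arrays would live in keyed state inside a KeyedProcessFunction, and a timer would drive emission. Each event touches exactly one bucket, and a query sums the buckets that fall inside the window.

```java
import java.util.Arrays;

// Hypothetical sketch (not a Flink API): rolling sum over the last N fixed-size buckets.
final class RollingBucketSum {
    private final long bucketMs;
    private final double[] partials;   // partial sum per bucket, stored as a ring
    private final long[] bucketIds;    // bucket number currently held by each slot

    RollingBucketSum(long bucketMs, int numBuckets) {
        this.bucketMs = bucketMs;
        this.partials = new double[numBuckets];
        this.bucketIds = new long[numBuckets];
        Arrays.fill(bucketIds, Long.MIN_VALUE);     // every slot starts empty
    }

    /** O(1) per event: updates exactly one bucket. */
    void add(long timestampMs, double value) {
        long bucket = Math.floorDiv(timestampMs, bucketMs);
        int slot = (int) Math.floorMod(bucket, (long) partials.length);
        if (bucket < bucketIds[slot]) {
            return;                                 // too old: its slot was already reused
        }
        if (bucket > bucketIds[slot]) {             // slot holds an expired bucket: reset it
            bucketIds[slot] = bucket;
            partials[slot] = 0.0;
        }
        partials[slot] += value;
    }

    /** Sum of the N buckets ending with the bucket that contains nowMs. */
    double sum(long nowMs) {
        long current = Math.floorDiv(nowMs, bucketMs);
        double total = 0.0;
        for (int i = 0; i < partials.length; i++) {
            if (bucketIds[i] > current - partials.length && bucketIds[i] <= current) {
                total += partials[i];
            }
        }
        return total;
    }
}
```

For the 1-hour/1-second case, `new RollingBucketSum(1_000L, 3_600)` still keeps 3,600 partials per key, roughly the same state as 3,600 sliding-window accumulators. The savings are in per-event work (one update instead of 3,600). Coarser buckets trade freshness for less state.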
Session Windows
Session windows group events by activity periods separated by gaps of inactivity. They don't have a fixed size — they grow as long as events keep arriving within the gap threshold. They're perfect for user session analysis, where a "session" ends when the user stops interacting.
```
Session Window (gap = 30 minutes):

User A's events:
10:00  10:05  10:12  10:25        │        11:30  11:35  11:40
|──────── Session 1 ────────|                |──── Session 2 ────|
      (25 min duration)         65 min gap       (10 min duration)

User B's events:
10:00  10:10  10:20  10:30  10:40  10:50  11:00  11:10
|──────────────── Session 1 (70 min) ────────────────|
(no gap > 30 min, so one continuous session)

Properties:
  ✅ Dynamic size: grows with activity
  ✅ Per-key: each user has independent sessions
  ⚠️ Merging: new events can merge previously separate sessions
  ⚠️ Late firing: a session fires only after the gap timeout passes
  ✅ Perfect for user behavior analysis
```
```java
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// User session analysis with 30-minute gap
DataStream<SessionSummary> sessions = clickEvents
    .keyBy(ClickEvent::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregator());

// Dynamic gap based on event properties
DataStream<SessionSummary> dynamicSessions = clickEvents
    .keyBy(ClickEvent::getUserId)
    .window(EventTimeSessionWindows.withDynamicGap(
        (ClickEvent event) -> {   // explicit type helps generic inference
            // Premium users get a longer session timeout (gap in milliseconds)
            if (event.isPremium()) {
                return Time.minutes(60).toMilliseconds();
            }
            return Time.minutes(30).toMilliseconds();
        }))
    .aggregate(new SessionAggregator());
```
Session Window Merging
Session windows are "merging windows." When a new event arrives that bridges two previously separate sessions, Flink merges them into one. This means session window state can grow unpredictably. Monitor state size and consider setting a maximum session duration with a custom trigger to prevent unbounded growth. Any custom trigger used with session windows must support merging (override canMerge() and onMerge()).
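Merging is easiest to see when events arrive out of order. In the plain-Java model below, each event starts as its own window [t, t + gap), and the new window absorbs any existing session it overlaps or touches. The names are hypothetical, and this is not Flink's MergingWindowSet. Treating touching windows as mergeable is an assumption modeled on Flink's TimeWindow.intersects. With a 30-minute gap, events at 10:00 and 10:40 form two sessions. A late event at 10:20 then bridges them into a single session [10:00, 11:10).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of session-window merging for ONE key (plain Java, not Flink).
final class SessionMerger {
    private final long gapMs;
    private final List<long[]> sessions = new ArrayList<>();   // each entry: {start, end}

    SessionMerger(long gapMs) {
        this.gapMs = gapMs;
    }

    void add(long timestampMs) {
        long start = timestampMs;
        long end = timestampMs + gapMs;               // the event's own window [t, t + gap)
        List<long[]> kept = new ArrayList<>();
        for (long[] s : sessions) {
            if (s[0] <= end && s[1] >= start) {       // overlaps or touches: absorb it
                start = Math.min(start, s[0]);
                end = Math.max(end, s[1]);
            } else {
                kept.add(s);                          // disjoint session stays as-is
            }
        }
        kept.add(new long[] {start, end});
        kept.sort((a, b) -> Long.compare(a[0], b[0]));
        sessions.clear();
        sessions.addAll(kept);
    }

    List<long[]> sessions() {
        return sessions;
    }
}
```

A single pass is enough because existing sessions never overlap each other. Note also that a session window ends at the last event's timestamp plus the gap. User A's first session in the diagram above is therefore the window [10:00, 10:55), even though its activity spans 25 minutes.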
Global & Count Windows
Global windows assign all events for a key to a single window that never closes on its own. You must provide a custom trigger to define when results are emitted. Count windows are a common pattern built on global windows — they fire after N events.
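```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Count window: fire every 100 events per key
DataStream<Result> countWindow = events
    .keyBy(Event::getUserId)
    .countWindow(100)
    .aggregate(new BatchAggregate());

// Sliding count window: last 100 events, updated every 10
// (uses a CountEvictor internally, so elements are buffered, not pre-aggregated)
DataStream<Result> slidingCount = events
    .keyBy(Event::getUserId)
    .countWindow(100, 10)
    .aggregate(new BatchAggregate());

// Global window with custom trigger
DataStream<Alert> alerts = loginAttempts
    .keyBy(LoginAttempt::getUserId)
    .window(GlobalWindows.create())
    .trigger(new ThreeFailedLoginsTrigger())
    .process(new AlertFunction());

// Custom trigger: fire after 3 failed logins per key.
// (No time bound: a "within 5 minutes" rule would also need per-failure timestamps.)
public class ThreeFailedLoginsTrigger extends Trigger<LoginAttempt, GlobalWindow> {
    private final ValueStateDescriptor<Integer> failuresDesc =
        new ValueStateDescriptor<>("failed-logins", Integer.class);

    @Override
    public TriggerResult onElement(LoginAttempt event, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) throws Exception {
        // Count failed attempts in per-key trigger state
        ValueState<Integer> failures = ctx.getPartitionedState(failuresDesc);
        int current = failures.value() == null ? 0 : failures.value();
        if (event.isFailed()) {
            current++;
            failures.update(current);
        }
        if (current >= 3) {
            failures.clear();
            return TriggerResult.FIRE_AND_PURGE;   // emit alert, drop buffered attempts
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;             // this trigger registers no timers
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(failuresDesc).clear();
    }
}
```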
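In Flink's KeyedStream, countWindow(size) is shorthand for a global window with PurgingTrigger.of(CountTrigger.of(size)). countWindow(size, slide) instead combines CountEvictor.of(size) with CountTrigger.of(slide). The plain-Java model below mimics the resulting outputs for one key, using a sum as the window function. It also shows that a trailing partial batch is never emitted. The names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical plain-Java model of count windows for ONE key (sum as the window function).
final class CountWindows {
    /** countWindow(size): fire every `size` elements, then purge (tumbling). */
    static List<Long> tumbling(long[] values, int size) {
        List<Long> results = new ArrayList<>();
        long sum = 0;
        int seen = 0;
        for (long v : values) {
            sum += v;
            if (++seen == size) {       // count reached: FIRE, then PURGE
                results.add(sum);
                sum = 0;
                seen = 0;
            }
        }
        return results;                // a trailing partial batch is never emitted
    }

    /** countWindow(size, slide): every `slide` elements, emit over the last `size`. */
    static List<Long> sliding(long[] values, int size, int slide) {
        List<Long> results = new ArrayList<>();
        Deque<Long> buffer = new ArrayDeque<>();   // evictor-based: raw elements are kept
        int sinceFire = 0;
        for (long v : values) {
            buffer.addLast(v);
            if (++sinceFire == slide) {
                while (buffer.size() > size) {
                    buffer.removeFirst();          // evict down to the last `size` elements
                }
                long sum = 0;
                for (long x : buffer) {
                    sum += x;
                }
                results.add(sum);
                sinceFire = 0;
            }
        }
        return results;
    }
}
```

With values 1 through 7, `tumbling(values, 3)` yields sums for {1,2,3} and {4,5,6}, and 7 is left unemitted. `sliding(values, 4, 2)` fires after elements 2, 4, and 6, each time over at most the last four elements.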
| Window Type | Size | Trigger | Use Case |
|---|---|---|---|
| Tumbling | Fixed time | Watermark passes end | Periodic reports |
| Sliding | Fixed time, overlapping | Watermark passes end | Moving averages |
| Session | Dynamic (gap-based) | Gap timeout | User sessions |
| Global | Infinite (all events) | Custom trigger required | Complex patterns, count-based |
| Count | N events | Every N events | Batch processing, micro-batching |
Global Windows Need Triggers
A global window without a trigger will never fire — it accumulates events forever until you run out of memory. Always pair global windows with a custom trigger. Common triggers: count-based, time-based (processing time timer), or condition-based (specific event pattern detected).
Window Functions
Window functions define what computation happens when a window fires. Flink provides three types with different trade-offs between efficiency and flexibility: ReduceFunction, AggregateFunction, and ProcessWindowFunction.
| Function | Incremental? | Access to All Elements? | Performance | Use Case |
|---|---|---|---|---|
| ReduceFunction | ✅ Yes | ❌ No | Best | Simple reductions (sum, min, max) |
| AggregateFunction | ✅ Yes | ❌ No | Great | Custom accumulator aggregations (avg, approximate percentiles) |
| ProcessWindowFunction | ❌ No | ✅ Yes (Iterable) | Worst | Need all elements or window metadata |
| Aggregate + Process | ✅ Yes | ❌ No (receives only the pre-aggregated result) | Great | Incremental + window metadata |
```java
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// ReduceFunction: most efficient, same input/output type
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce((a, b) -> new Event(a.getKey(), a.getValue() + b.getValue()));

// AggregateFunction: flexible accumulator pattern (Accumulator is a user-defined class)
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AggregateFunction<Event, Accumulator, Result>() {
        @Override
        public Accumulator createAccumulator() {
            return new Accumulator(0L, 0.0);
        }
        @Override
        public Accumulator add(Event event, Accumulator acc) {
            return new Accumulator(acc.count + 1, acc.sum + event.getValue());
        }
        @Override
        public Result getResult(Accumulator acc) {
            return new Result(acc.sum / acc.count);  // average
        }
        @Override
        public Accumulator merge(Accumulator a, Accumulator b) {
            return new Accumulator(a.count + b.count, a.sum + b.sum);
        }
    });

// ProcessWindowFunction: access all elements + window metadata
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Event> elements,
                            Collector<Result> out) {
            TimeWindow window = ctx.window();
            long count = StreamSupport.stream(elements.spliterator(), false).count();
            out.collect(new Result(key, window.getStart(), count));
        }
    });
// ⚠️ Buffers ALL elements in state: expensive for large windows!

// Best of both: Aggregate + Process (incremental + metadata)
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate(), new AddWindowInfo());
```
Always Prefer Incremental
Use ReduceFunction or AggregateFunction whenever possible. They process elements incrementally as they arrive, so only the accumulator is stored in state. ProcessWindowFunction buffers ALL elements until the window fires. For large windows this can cause OOM on heap-based state backends, or very large state with RocksDB. If you need window metadata, combine Aggregate + Process.
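Why only the accumulator needs to live in state: for an average, the accumulator is two numbers no matter how many events the window sees. The plain-Java class below mirrors the four AggregateFunction methods. The class name is hypothetical, and this is not Flink's interface. The merge step is what lets accumulators be combined when windows merge, as session windows do.

```java
// Hypothetical plain-Java mirror of the AggregateFunction contract for an average.
final class AvgAccumulator {
    final long count;
    final double sum;

    AvgAccumulator(long count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    // createAccumulator(): empty state
    static AvgAccumulator create() {
        return new AvgAccumulator(0L, 0.0);
    }

    // add(): fold one element in; state size stays constant
    AvgAccumulator add(double value) {
        return new AvgAccumulator(count + 1, sum + value);
    }

    // merge(): combine two partial accumulators (e.g. when windows merge)
    AvgAccumulator merge(AvgAccumulator other) {
        return new AvgAccumulator(count + other.count, sum + other.sum);
    }

    // getResult(): compute the output only when the window fires
    double result() {
        return count == 0 ? Double.NaN : sum / count;
    }
}
```

Whether the window sees ten events or ten million, the state per key per window is one `long` and one `double`.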
Window Triggers
Triggers determine when a window fires (emits results). The default trigger fires when the watermark passes the window end time. Custom triggers enable early firings, count-based firing, or complex conditions.
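```java
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

// Default: EventTimeTrigger fires when watermark >= window.maxTimestamp() (end - 1).
// (This is what you get automatically with event-time windows.)
// Calling .trigger(...) REPLACES the default trigger; it does not add to it.

// Early firings: partial results roughly every 30 seconds of event time
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
    .aggregate(new CountAggregate());
// Fires as the watermark crosses 30s boundaries, plus once at window end.
// ContinuousProcessingTimeTrigger fires on wall-clock time instead, but on an
// event-time window it does NOT fire when the watermark passes the window end.

// Count trigger: fire every 1000 events within the window
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .trigger(CountTrigger.of(1000))
    .aggregate(new SumAggregate());
// Emits a cumulative partial sum every 1000 events. There is NO firing at
// window end: events after the last multiple of 1000 are never emitted.

// Purging trigger: clear state after firing
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .trigger(PurgingTrigger.of(CountTrigger.of(100)))
    .aggregate(new CountAggregate());
// Fires and clears the accumulator every 100 events (each result covers one batch)
```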
Trigger Types
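- ✅ EventTimeTrigger (default for event-time windows): fires when the watermark passes the window end time
- ✅ ProcessingTimeTrigger (default for processing-time windows): fires once when the wall clock passes the window end time
- ✅ CountTrigger: fires after N elements arrive in the window
- ✅ ContinuousEventTimeTrigger: fires periodically as the watermark advances, and again at window end (early partial results)
- ✅ ContinuousProcessingTimeTrigger: fires periodically on wall-clock time (early partial results, but no end-of-window firing on event-time windows)
- ✅ PurgingTrigger: wraps another trigger and clears window state after each firing
- ✅ Custom triggers: extend the abstract Trigger class for complex logic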
Early Firings Pattern
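A common pattern is to use a 1-hour tumbling window with an early-firing trigger every 10 seconds. This gives you approximate results every 10 seconds (low latency) plus a final correct result when the window closes (correctness). ContinuousEventTimeTrigger provides both kinds of firing. ContinuousProcessingTimeTrigger on its own does not fire at the end of an event-time window, so combining wall-clock early firings with a final event-time firing requires a custom trigger. Downstream consumers can only tell early results from final ones if you tag them. One way is to compare ctx.currentWatermark() with the window's maxTimestamp() in a ProcessWindowFunction.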
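To make the early/final distinction concrete, here is a rough plain-Java model for one key and one event-time window. It is not Flink code, and it only approximates ContinuousEventTimeTrigger-style behavior. It emits a partial result whenever the watermark crosses an interval boundary, and a final result once the watermark reaches the window's last millisecond. The class name and the "early:"/"final:" tags are hypothetical. The final-result check mirrors the simple tagging heuristic of comparing the watermark with the window's maxTimestamp(). That heuristic can mislabel early firings if the watermark jumps past the window end in a single step.

```java
import java.util.ArrayList;
import java.util.List;

// Rough, hypothetical model of early + final firings for ONE key and ONE window.
final class EarlyFiringModel {
    /**
     * eventTs[i] is an event timestamp; watermarks[i] is the (non-decreasing)
     * watermark observed right after it. Returns results tagged "early:" or "final:".
     */
    static List<String> run(long windowStart, long windowEnd, long intervalMs,
                            long[] eventTs, long[] watermarks) {
        List<String> out = new ArrayList<>();
        long count = 0;                            // incremental accumulator
        long maxTimestamp = windowEnd - 1;         // last millisecond inside [start, end)
        long nextFire = windowStart + intervalMs;  // next early-firing boundary
        for (int i = 0; i < eventTs.length; i++) {
            if (eventTs[i] >= windowStart && eventTs[i] < windowEnd) {
                count++;
            }
            long wm = watermarks[i];
            if (wm >= maxTimestamp) {
                // Tagging heuristic: watermark >= window.maxTimestamp() means final
                out.add("final:" + count);
                break;                             // the window is complete
            }
            if (wm >= nextFire) {
                out.add("early:" + count);         // partial result, may still change
                nextFire = windowStart + ((wm - windowStart) / intervalMs + 1) * intervalMs;
            }
        }
        return out;
    }
}
```

For a 1-minute window with 20-second early firings, a consumer would see two "early" counts followed by one "final" count. Only the final count is guaranteed not to change, which is why downstream code needs a tag to distinguish them.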
Interview Questions
Q: What's the difference between tumbling and sliding windows?
A: Tumbling windows are fixed-size, non-overlapping intervals. Each event belongs to exactly one window. Sliding windows have a fixed size but overlap — they advance by a 'slide' interval smaller than the window size. Each event belongs to (size/slide) windows. Example: 5-min tumbling → each event in 1 window. 5-min sliding with 1-min slide → each event in 5 windows. Tumbling is cheaper (less state) and simpler. Sliding gives smoother, more frequent updates but costs (size/slide)× more state.
Q: How do session windows work and what makes them special?
A: Session windows group events by periods of activity separated by inactivity gaps. They don't have a fixed size — they grow as long as events keep arriving within the gap threshold. What makes them special: (1) They're 'merging windows' — a new event can bridge two previously separate sessions into one. (2) Size is dynamic and per-key. (3) They only fire after the gap timeout passes (confirmed by watermark). Use case: user session analysis where a 'session' is defined by continuous activity with no gap > 30 minutes.
Q: Why should you prefer AggregateFunction over ProcessWindowFunction?
A: AggregateFunction is incremental: it processes each element as it arrives and stores only the accumulator in state. ProcessWindowFunction buffers ALL elements until the window fires, then iterates over them. For a 1-hour window with 1M events per key, AggregateFunction stores one accumulator object while ProcessWindowFunction stores 1M events. That difference can cause OOM for large windows. Use ProcessWindowFunction only when you need access to all elements or to window metadata. Best practice: combine Aggregate + Process to get incremental aggregation with window metadata access.
Q: What are window triggers and when would you use a custom trigger?
A: Triggers determine when a window emits results. By default, EventTimeTrigger fires when the watermark passes the window end. You replace it, with a built-in or custom trigger, for:
(1) Early firings: emit partial results periodically for low-latency dashboards while still getting a final correct result. ContinuousEventTimeTrigger does both; ContinuousProcessingTimeTrigger alone has no end-of-window firing.
(2) Count-based firing: fire every N events for micro-batching.
(3) Complex conditions: fire when a specific pattern is detected (e.g., 3 failed logins).
(4) Purging: clear state after firing to limit memory.
Early firing is the most common reason to change the trigger.
Q: What's the state cost of sliding windows and how can you mitigate it?
A: Each event belongs to (window_size / slide_interval) windows. A 1-hour window sliding every second puts each event in 3,600 windows, so each key holds up to 3,600× the window state of a tumbling window. Mitigations:
(1) Use AggregateFunction (not ProcessWindowFunction) so each window stores only an accumulator, not a copy of every element.
(2) Increase the slide interval. Do you really need updates every second?
(3) For very frequent updates, pre-aggregate into small tumbling buckets and keep a rolling buffer of bucket partials in a KeyedProcessFunction, so each event updates one bucket.
(4) Use the RocksDB state backend for state that exceeds the heap.
Common Mistakes
Using ProcessWindowFunction for simple aggregations
Using ProcessWindowFunction to compute a sum or count. This buffers all events in state until the window fires, which can cause OOM for large windows or high-throughput streams.
✅ Use ReduceFunction or AggregateFunction for any computation that can be expressed incrementally. Only use ProcessWindowFunction when you genuinely need access to all elements or window metadata. Combine Aggregate + Process if you need both.
Sliding windows with tiny slide intervals
Creating a 1-hour sliding window with a 1-second slide. Each event belongs to 3,600 windows, multiplying per-key window state by up to 3,600×. This can quickly exhaust memory and cause checkpoint timeouts.
✅ Keep the size/slide ratio reasonable (≤10 is a sensible rule of thumb). If you need very frequent updates, use a tumbling window with an early-firing trigger (ContinuousEventTimeTrigger) or a ProcessFunction with manual state management.
Forgetting that session windows merge
Assuming session windows have a maximum size. A user who is continuously active for 8 hours creates one massive session window that accumulates all events in state.
✅ Set a maximum session duration using a custom trigger that fires and purges after a time limit. The trigger must support merging, because session windows merge. Alternatively, use a ProcessFunction with manual session management that caps session length.
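Capping a session is one extra condition in the grouping rule. The batch sketch below is plain Java with hypothetical names. It assumes one key's timestamps are already sorted, which a streaming implementation in a KeyedProcessFunction could not assume. It only shows how a gap rule and a maximum duration combine. With a 30-minute gap and a 2-hour cap, a user active every 10 minutes for 8 hours ends up with four sessions instead of one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: sessionize ONE key's sorted timestamps with a gap AND a max duration.
final class CappedSessions {
    /** Returns {firstEvent, lastEvent} pairs; a session closes on a gap or at the cap. */
    static List<long[]> sessionize(long[] sortedTs, long gapMs, long maxDurationMs) {
        List<long[]> sessions = new ArrayList<>();
        if (sortedTs.length == 0) {
            return sessions;
        }
        long first = sortedTs[0];
        long last = sortedTs[0];
        for (int i = 1; i < sortedTs.length; i++) {
            long t = sortedTs[i];
            boolean gapExceeded = t - last > gapMs;             // inactivity ended the session
            boolean capExceeded = t - first >= maxDurationMs;   // session would outgrow the cap
            if (gapExceeded || capExceeded) {
                sessions.add(new long[] {first, last});
                first = t;                                      // t opens a new session
            }
            last = t;
        }
        sessions.add(new long[] {first, last});
        return sessions;
    }
}
```

Passing `Long.MAX_VALUE` as the cap restores plain gap-based sessions.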
Not understanding window alignment
Expecting a 1-hour tumbling window to start when the first event arrives. Windows align to epoch by default — a 1-hour window always starts at :00 regardless of when events arrive.
✅ Use the offset parameter to shift window alignment: TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(30)) starts windows at :30. For per-key alignment, use session windows or a ProcessFunction.