
Windows

Windows are the mechanism for bounding infinite streams into finite chunks for computation. Flink provides tumbling, sliding, session, and global windows — each suited to different aggregation patterns.

01

Why Windows

Streams are unbounded — they have no end. But most computations need a finite set of data to produce a result. "What's the average order value?" is meaningless on an infinite stream. "What's the average order value in the last 5 minutes?" is answerable. Windows provide that boundary.

🪟

The Train Window

Imagine looking out a train window at a continuous landscape. You can't describe 'the entire view' because it never ends. But you can describe what you see in a specific frame — the current window. Tumbling windows are like taking a photo every 5 seconds (non-overlapping snapshots). Sliding windows are like a video camera with a 10-second buffer that updates every 2 seconds (overlapping). Session windows are like looking at distinct scenes separated by tunnels (gaps).

window-need.txt
Without windows (per-event processing):
  Event 1: count=1, sum=50,  avg=50.00
  Event 2: count=2, sum=120, avg=60.00
  Event 3: count=3, sum=180, avg=60.00
  ... (running aggregate, never "completes")

With tumbling window (1-minute):
  Window [10:00, 10:01): 45 events, avg=$62.30 → EMIT
  Window [10:01, 10:02): 52 events, avg=$58.10 → EMIT
  Window [10:02, 10:03): 38 events, avg=$71.20 → EMIT
  ... (discrete results, each window is complete)

Windows answer questions like:
  "How many orders per minute?" → Tumbling(1 min)
  "Moving average over last 5 min, updated every 30s?" → Sliding(5 min, 30s)
  "Total spend per user session?" → Session(30 min gap)
  "Alert after 3 failed logins?" → Global + Count trigger

Windows Are Per-Key

In Flink, windows are usually applied after a keyBy operation, and each key gets its own independent set of windows. (Non-keyed streams can be windowed with windowAll, but that is generally not a parallel operation.) User A's 1-minute window is completely independent of User B's. This means millions of windows can be active simultaneously: one per key per active time interval.
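
To make the per-key bookkeeping concrete, here is a minimal plain-Java sketch with no Flink dependency. The class `PerKeyWindowCounter` and its methods are hypothetical names chosen for illustration; the sketch only mimics how each (key, window) pair holds its own count and is not how Flink's window operator is implemented.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for keyed tumbling windows; not Flink code. */
class PerKeyWindowCounter {
    private final long sizeMillis;
    // key -> (window start -> event count)
    private final Map<String, Map<Long, Integer>> counts = new TreeMap<>();

    PerKeyWindowCounter(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /** Adds one event to the window of its key that covers the timestamp. */
    void add(String key, long timestampMillis) {
        long windowStart = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
        counts.computeIfAbsent(key, k -> new TreeMap<>())
              .merge(windowStart, 1, Integer::sum);
    }

    /** Returns the count for one key and window start, or 0 if there is none. */
    int count(String key, long windowStart) {
        Map<Long, Integer> perKey = counts.get(key);
        return perKey == null ? 0 : perKey.getOrDefault(windowStart, 0);
    }

    /** Number of (key, window) pairs currently held in memory. */
    int activeWindows() {
        int total = 0;
        for (Map<Long, Integer> perKey : counts.values()) {
            total += perKey.size();
        }
        return total;
    }

    public static void main(String[] args) {
        PerKeyWindowCounter counter = new PerKeyWindowCounter(60_000L);
        counter.add("userA", 5_000L);
        counter.add("userA", 62_000L);
        counter.add("userB", 10_000L);
        System.out.println(counter.activeWindows()); // 3
    }
}
```

Two keys active in the same minute produce two separate windows, which is why the number of live windows scales with the number of active keys.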

02

Tumbling Windows

Tumbling windows are fixed-size, non-overlapping time intervals. Every event belongs to exactly one window. They're the simplest and most common window type — ideal for periodic aggregations like "events per minute" or "revenue per hour."

tumbling-windows.txt
Tumbling Window (size = 1 minute):

Timeline:
  |----Window 1----|----Window 2----|----Window 3----|
  10:00           10:01           10:02           10:03

Events:
  [10:00:05] → Window 1
  [10:00:45] → Window 1
  [10:01:02] → Window 2
  [10:01:30] → Window 2
  [10:01:58] → Window 2
  [10:02:10] → Window 3

Properties:
  Non-overlapping: no event counted twice
  Fixed size: every window is exactly 1 minute
  Aligned: windows start at minute boundaries (configurable)
  Complete: every event belongs to exactly one window

tumbling-example.java
// Count events per user per minute
DataStream<Result> result = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate());

// Revenue per product per hour, with offset
// (windows aligned to 30-minute mark instead of hour boundary)
DataStream<Revenue> revenue = orders
    .keyBy(Order::getProductId)
    .window(TumblingEventTimeWindows.of(
        Time.hours(1),
        Time.minutes(30)))  // offset: windows start at :30
    .reduce((a, b) -> new Revenue(
        a.getProductId(),
        a.getAmount() + b.getAmount()));

Window Alignment

By default, tumbling windows align to epoch (Unix time 0). A 1-hour window starts at :00, not when the first event arrives. Use the offset parameter to shift alignment — useful when your business day starts at a non-standard time or you're in a non-UTC timezone.
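
The alignment arithmetic can be sketched in plain Java. This is an illustrative helper under the assumption that windows are aligned to the epoch and shifted by an offset; `WindowAlignment.windowStart` is a hypothetical name, not a Flink API.

```java
/** Sketch of epoch-aligned window-start arithmetic; names are hypothetical. */
class WindowAlignment {
    /**
     * Start of the tumbling window that contains timestampMillis, for windows
     * of sizeMillis shifted by offsetMillis relative to the epoch.
     * floorMod keeps the result correct for negative offsets as well.
     */
    static long windowStart(long timestampMillis, long sizeMillis, long offsetMillis) {
        return timestampMillis - Math.floorMod(timestampMillis - offsetMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long tenFortyFive = 10 * hour + 45 * 60_000L;
        System.out.println(windowStart(tenFortyFive, hour, 0L));           // 36000000 (10:00)
        System.out.println(windowStart(tenFortyFive, hour, 30 * 60_000L)); // 37800000 (10:30)
    }
}
```

An event at 10:45 falls into the 10:00 window without an offset and into the 10:30 window with a 30-minute offset. A negative offset (for example minus eight hours with a one-day window) shifts day boundaries to local midnight in a UTC+8 timezone.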

03

Sliding Windows

Sliding windows have a fixed size but overlap — they "slide" forward by a configurable step. An event can belong to multiple windows. They're ideal for moving averages and rolling aggregations where you need smoother, more frequent updates.

sliding-windows.txt
Sliding Window (size = 5 min, slide = 1 min):

Timeline:
  |─────── Window A (10:00-10:05) ───────|
       |─────── Window B (10:01-10:06) ───────|
            |─────── Window C (10:02-10:07) ───────|

Event at 10:03:30 belongs to:
  Window   (09:59-10:04)  (not shown)
  Window A (10:00-10:05)
  Window B (10:01-10:06)
  Window C (10:02-10:07)
  Window D (10:03-10:08)  (not shown)
Each event belongs to (size / slide) = 5 windows!

Properties:
  ⚠️ Overlapping: events counted in multiple windows
  Smooth updates: new result every slide interval
  ⚠️ Higher state cost: (size / slide)× more state than tumbling
  Great for moving averages and trend detection

sliding-example.java
// 5-minute moving average, updated every 30 seconds
DataStream<Average> movingAvg = metrics
    .keyBy(Metric::getSensorId)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(5),    // window size
        Time.seconds(30)))  // slide interval
    .aggregate(new AverageAggregate());

// State cost: each event stored in 5min/30s = 10 windows
// For 1M events/min → 10M window assignments
// Consider: is the smoothness worth 10x state?

State Cost Warning

Sliding windows with a small slide relative to size are expensive. A 1-hour window sliding every 1 second means each event belongs to 3,600 windows. This multiplies state by 3,600x. If you need very frequent updates, consider using a tumbling window with a ProcessFunction that maintains a rolling buffer instead.
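
The size/slide multiplier can be checked with a small plain-Java sketch that enumerates the windows covering one timestamp. `SlidingAssignment.windowStarts` is a hypothetical helper written for this illustration, assuming epoch-aligned windows with no offset; it is not Flink's assigner.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of sliding-window assignment; names are hypothetical. */
class SlidingAssignment {
    /** Start times of all sliding windows that contain the timestamp, latest first. */
    static List<Long> windowStarts(long timestampMillis, long sizeMillis, long slideMillis) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the event.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, slideMillis);
        // Walk backwards while the window [start, start + size) still covers the event.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long tenOhThreeThirty = 36_210_000L; // 10:03:30 as millis since midnight
        System.out.println(windowStarts(tenOhThreeThirty, 300_000L, 60_000L).size());  // 5
        System.out.println(windowStarts(tenOhThreeThirty, 3_600_000L, 1_000L).size()); // 3600
    }
}
```

With incremental aggregation each of those windows holds only an accumulator, but the number of accumulators touched per event still grows with size/slide.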

04

Session Windows

Session windows group events by activity periods separated by gaps of inactivity. They don't have a fixed size — they grow as long as events keep arriving within the gap threshold. They're perfect for user session analysis, where a "session" ends when the user stops interacting.

session-windows.txt
Session Window (gap = 30 minutes):

User A's events:
  10:00  10:05  10:12  10:25          11:30  11:35  11:40
  |──────── Session 1 ────────|      |──── Session 2 ────|
  (25 min duration)       65 min gap       (10 min duration)

User B's events:
  10:00  10:10  10:20  10:30  10:40  10:50  11:00  11:10
  |──────────────── Session 1 (70 min) ─────────────────|
  (no gap > 30 min, so one continuous session)

Properties:
  Dynamic size: grows with activity
  Per-key: each user has independent sessions
  ⚠️ Merging: new events can merge previously separate sessions
  ⚠️ Late firing: session only fires after gap timeout passes
  Perfect for user behavior analysis

session-example.java
// User session analysis with 30-minute gap
DataStream<SessionSummary> sessions = clickEvents
    .keyBy(ClickEvent::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregator());

// Dynamic gap based on event properties
DataStream<SessionSummary> dynamicSessions = clickEvents
    .keyBy(ClickEvent::getUserId)
    .window(EventTimeSessionWindows.withDynamicGap(
        (event) -> {
            // Premium users get longer session timeout
            if (event.isPremium()) {
                return Time.minutes(60).toMilliseconds();
            }
            return Time.minutes(30).toMilliseconds();
        }))
    .aggregate(new SessionAggregator());

Session Window Merging

Session windows are "merging windows." When a new event arrives that bridges two previously separate sessions, Flink merges them into one. This means session window state can grow unpredictably. Monitor state size and consider setting a maximum session duration using a custom trigger to prevent unbounded growth.
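
The merging behaviour can be illustrated without Flink. In this sketch every event opens a candidate session covering the event time plus the gap, and any existing session that overlaps or touches it is absorbed. `SessionMerger` is a hypothetical class for one key; treating touching intervals as mergeable is an assumption of the sketch, so check Flink's behaviour for that exact boundary case.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of gap-based session merging for a single key; not Flink code. */
class SessionMerger {
    private final long gapMillis;
    // Each session is {start, end}, where end = last event time + gap.
    private final List<long[]> sessions = new ArrayList<>();

    SessionMerger(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Adds an event (in any order) and merges every session it overlaps or touches. */
    void add(long timestampMillis) {
        long start = timestampMillis;
        long end = timestampMillis + gapMillis;
        List<long[]> kept = new ArrayList<>();
        for (long[] s : sessions) {
            if (s[0] <= end && s[1] >= start) {
                // Absorb the existing session into the new, wider one.
                start = Math.min(start, s[0]);
                end = Math.max(end, s[1]);
            } else {
                kept.add(s);
            }
        }
        kept.add(new long[] {start, end});
        sessions.clear();
        sessions.addAll(kept);
    }

    int sessionCount() {
        return sessions.size();
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        SessionMerger merger = new SessionMerger(30 * minute);
        merger.add(0);
        merger.add(50 * minute);
        System.out.println(merger.sessionCount()); // 2
        merger.add(25 * minute);                   // late event bridges both sessions
        System.out.println(merger.sessionCount()); // 1
    }
}
```

The late event at minute 25 is within 30 minutes of both neighbours, so two sessions collapse into one. This is the reason session state cannot be sized in advance.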

05

Global & Count Windows

Global windows assign all events for a key to a single window that never closes on its own. You must provide a custom trigger to define when results are emitted. Count windows are a common pattern built on global windows — they fire after N events.

global-count-windows.java
// Count window: fire every 100 events per key
DataStream<Result> countWindow = events
    .keyBy(Event::getUserId)
    .countWindow(100)
    .aggregate(new BatchAggregate());

// Sliding count window: last 100 events, updated every 10
DataStream<Result> slidingCount = events
    .keyBy(Event::getUserId)
    .countWindow(100, 10)
    .aggregate(new BatchAggregate());

// Global window with custom trigger
DataStream<Alert> alerts = loginAttempts
    .keyBy(LoginAttempt::getUserId)
    .window(GlobalWindows.create())
    .trigger(new ThreeFailedLoginsTrigger())
    .process(new AlertFunction());

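// Custom trigger: fire on the third failed login per key.
// Note: this sketch only counts failures; enforcing the 5-minute bound
// would additionally need timestamps or a timer.
public class ThreeFailedLoginsTrigger
    extends Trigger<LoginAttempt, GlobalWindow> {

    @Override
    public TriggerResult onElement(LoginAttempt event,
            long timestamp, GlobalWindow window, TriggerContext ctx)
            throws Exception {
        // Count failed attempts in state
        // (getPartitionedState expects a state descriptor, elided here)
        ValueState<Integer> count = ctx.getPartitionedState(...);
        int current = count.value() == null ? 0 : count.value();
        if (event.isFailed()) {
            current++;
            count.update(current);
        }
        if (current >= 3) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    // Trigger is an abstract class: onProcessingTime, onEventTime, and
    // clear must be overridden as well (omitted in this excerpt).
}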

Window Type   Size                      Trigger                   Use Case
Tumbling      Fixed time                Watermark passes end      Periodic reports
Sliding       Fixed time, overlapping   Watermark passes end      Moving averages
Session       Dynamic (gap-based)       Gap timeout               User sessions
Global        Infinite (all events)     Custom trigger required   Complex patterns, count-based
Count         N events                  Every N events            Batch processing, micro-batching

Global Windows Need Triggers

A global window without a trigger will never fire — it accumulates events forever until you run out of memory. Always pair global windows with a custom trigger. Common triggers: count-based, time-based (processing time timer), or condition-based (specific event pattern detected).
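
A condition-based rule such as "three failed logins within five minutes" needs a time bound in addition to a count. The following plain-Java sketch shows that logic for a single key, assuming attempts arrive in timestamp order. `FailedLoginDetector` is a hypothetical class; inside Flink the same idea would live in trigger or function state rather than in a field.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch of a time-bounded failure counter for one key; not Flink code. */
class FailedLoginDetector {
    private static final long WINDOW_MILLIS = 5 * 60 * 1000L;
    private static final int THRESHOLD = 3;
    // Timestamps of recent failed attempts, oldest first.
    private final Deque<Long> failures = new ArrayDeque<>();

    /** Returns true when this attempt is the third failure within five minutes. */
    boolean onAttempt(long timestampMillis, boolean failed) {
        if (!failed) {
            return false;
        }
        failures.addLast(timestampMillis);
        // Drop failures that fall outside the five-minute bound.
        while (!failures.isEmpty()
                && failures.peekFirst() <= timestampMillis - WINDOW_MILLIS) {
            failures.removeFirst();
        }
        if (failures.size() >= THRESHOLD) {
            failures.clear(); // comparable to FIRE_AND_PURGE: start counting afresh
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FailedLoginDetector detector = new FailedLoginDetector();
        System.out.println(detector.onAttempt(0L, true));       // false
        System.out.println(detector.onAttempt(60_000L, true));  // false
        System.out.println(detector.onAttempt(120_000L, true)); // true
    }
}
```

Clearing the deque after an alert keeps the state bounded, which is the same concern the warning above raises for global windows.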

06

Window Functions

Window functions define what computation happens when a window fires. Flink provides three types with different trade-offs between efficiency and flexibility: ReduceFunction, AggregateFunction, and ProcessWindowFunction.

Function                Incremental?   Access to All Elements?   Performance   Use Case
ReduceFunction          ✅ Yes         ❌ No                     Best          Simple reductions (sum, min, max)
AggregateFunction       ✅ Yes         ❌ No                     Great         Complex aggregations (avg, percentiles)
ProcessWindowFunction   ❌ No          ✅ Yes (Iterable)         Worst         Need all elements or window metadata
Aggregate + Process     ✅ Yes         ✅ Yes (pre-aggregated)   Great         Incremental + window metadata

window-functions.java
// ReduceFunction — most efficient, same input/output type
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce((a, b) -> new Event(a.getKey(),
        a.getValue() + b.getValue()));

// AggregateFunction — flexible accumulator pattern
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AggregateFunction<Event, Accumulator, Result>() {
        public Accumulator createAccumulator() {
            return new Accumulator(0L, 0.0);
        }
        public Accumulator add(Event event, Accumulator acc) {
            return new Accumulator(acc.count + 1,
                acc.sum + event.getValue());
        }
        public Result getResult(Accumulator acc) {
            return new Result(acc.sum / acc.count); // average
        }
        public Accumulator merge(Accumulator a, Accumulator b) {
            return new Accumulator(a.count + b.count,
                a.sum + b.sum);
        }
    });

// ProcessWindowFunction — access all elements + window metadata
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
        public void process(String key, Context ctx,
                Iterable<Event> elements, Collector<Result> out) {
            TimeWindow window = ctx.window();
            long count = StreamSupport.stream(
                elements.spliterator(), false).count();
            out.collect(new Result(key, window.getStart(), count));
        }
    });
// ⚠️ Buffers ALL elements in state — expensive for large windows!

// Best of both: Aggregate + Process (incremental + metadata)
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate(), new AddWindowInfo());

Always Prefer Incremental

Use ReduceFunction or AggregateFunction whenever possible. They process elements incrementally as they arrive — only the accumulator is stored in state. ProcessWindowFunction buffers ALL elements until the window fires, which can cause OOM for large windows. If you need window metadata, combine Aggregate + Process.
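
The difference in state size can be seen in a plain-Java comparison. Both classes below are hypothetical stand-ins: `AverageAccumulator` mirrors the add/merge/getResult shape of an incremental aggregate, while `BufferedAverage` mimics buffering every element until the window fires.

```java
import java.util.ArrayList;
import java.util.List;

/** Incremental style: state is two numbers, however many events arrive. */
class AverageAccumulator {
    long count;
    double sum;

    void add(double value) {
        count++;
        sum += value;
    }

    /** Combines two partial results, as needed when windows merge. */
    void merge(AverageAccumulator other) {
        count += other.count;
        sum += other.sum;
    }

    double result() {
        return count == 0 ? 0.0 : sum / count;
    }
}

/** Buffering style: state grows by one entry per event. */
class BufferedAverage {
    final List<Double> buffer = new ArrayList<>();

    void add(double value) {
        buffer.add(value);
    }

    double result() {
        double sum = 0.0;
        for (double v : buffer) {
            sum += v;
        }
        return buffer.isEmpty() ? 0.0 : sum / buffer.size();
    }

    public static void main(String[] args) {
        AverageAccumulator incremental = new AverageAccumulator();
        BufferedAverage buffered = new BufferedAverage();
        for (double v : new double[] {50, 70, 60}) {
            incremental.add(v);
            buffered.add(v);
        }
        System.out.println(incremental.result()); // 60.0
        System.out.println(buffered.result());    // 60.0
        System.out.println(buffered.buffer.size()); // 3 entries held vs. one accumulator
    }
}
```

Both produce the same average, but only the buffering version keeps state proportional to the number of events in the window.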

07

Window Triggers

Triggers determine when a window fires (emits results). The default trigger fires when the watermark passes the window end time. Custom triggers enable early firings, count-based firing, or complex conditions.

triggers.java
// Default: EventTimeTrigger — fires when watermark >= window.end
// (This is what you get automatically with event-time windows)

// Early firings: emit partial results every 30 seconds
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(30)))
    .aggregate(new CountAggregate());
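// Emits a partial count every 30s of processing time. Because .trigger() replaces
// the default EventTimeTrigger, this trigger alone does not fire when the watermark
// passes the window end; ContinuousEventTimeTrigger covers that case.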

// Count trigger: fire every 1000 events within the window
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .trigger(CountTrigger.of(1000))
    .aggregate(new SumAggregate());
// Emits the running sum every 1000 events. The default end-of-window firing is
// replaced, so a trailing batch of fewer than 1000 events is never emitted.

// Purging trigger: clear state after firing
stream.keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .trigger(PurgingTrigger.of(CountTrigger.of(100)))
    .aggregate(new CountAggregate());
// Fires and clears accumulator every 100 events

Trigger Types

  • EventTimeTrigger (default for event-time windows): fires when the watermark passes the window end time
  • ProcessingTimeTrigger (default for processing-time windows): fires when wall-clock time passes the window end time
  • CountTrigger: fires after N elements arrive in the window
  • ContinuousProcessingTimeTrigger: fires periodically on wall-clock time (early partial results)
  • PurgingTrigger: wraps another trigger and clears window state after it fires
  • Custom triggers: extend the Trigger abstract class for complex logic
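
The effect of purging is easiest to see in a small simulation. The plain-Java sketch below is a simplified model, not Flink code: a running count stands in for the window contents, and the hypothetical method `emittedCounts` fires every N events with or without clearing that count.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a count trigger with and without purging. */
class CountTriggerSimulation {
    /**
     * Feeds totalEvents into one window holding a running count and returns
     * the value emitted each time the count trigger fires.
     */
    static List<Long> emittedCounts(int totalEvents, int fireEvery, boolean purgeOnFire) {
        List<Long> emitted = new ArrayList<>();
        long accumulator = 0;   // window contents (here: a running count)
        int sinceLastFire = 0;  // the trigger's own counter
        for (int i = 0; i < totalEvents; i++) {
            accumulator++;
            sinceLastFire++;
            if (sinceLastFire >= fireEvery) {
                emitted.add(accumulator);
                sinceLastFire = 0;
                if (purgeOnFire) {
                    accumulator = 0; // purge: the next firing starts from empty
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emittedCounts(250, 100, false)); // [100, 200]
        System.out.println(emittedCounts(250, 100, true));  // [100, 100]
    }
}
```

Without purging each firing reports a cumulative value; with purging each firing reports only the latest batch. In both runs the last 50 events are never emitted, because nothing in this model fires at the end of the window.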

Early Firings Pattern

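A common pattern: use a 1-hour tumbling window with a periodic trigger that fires every 10 seconds. This gives you approximate results every 10 seconds (low latency) plus a final result when the window closes (correctness). Because a custom trigger replaces the default one, the final firing only happens if the chosen trigger also fires at the window end: ContinuousEventTimeTrigger does, whereas ContinuousProcessingTimeTrigger on an event-time window fires only on its processing-time interval. Downstream consumers can distinguish early from final results only if the output carries a marker, for example the window end timestamp added in a ProcessWindowFunction.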

08

Interview Questions

Q: What's the difference between tumbling and sliding windows?

A: Tumbling windows are fixed-size, non-overlapping intervals. Each event belongs to exactly one window. Sliding windows have a fixed size but overlap — they advance by a 'slide' interval smaller than the window size. Each event belongs to (size/slide) windows. Example: 5-min tumbling → each event in 1 window. 5-min sliding with 1-min slide → each event in 5 windows. Tumbling is cheaper (less state) and simpler. Sliding gives smoother, more frequent updates but costs (size/slide)× more state.

Q: How do session windows work and what makes them special?

A: Session windows group events by periods of activity separated by inactivity gaps. They don't have a fixed size — they grow as long as events keep arriving within the gap threshold. What makes them special: (1) They're 'merging windows' — a new event can bridge two previously separate sessions into one. (2) Size is dynamic and per-key. (3) They only fire after the gap timeout passes (confirmed by watermark). Use case: user session analysis where a 'session' is defined by continuous activity with no gap > 30 minutes.

Q: Why should you prefer AggregateFunction over ProcessWindowFunction?

A: AggregateFunction is incremental — it processes each element as it arrives and only stores the accumulator in state. ProcessWindowFunction buffers ALL elements until the window fires, then iterates over them. For a 1-hour window with 1M events: AggregateFunction stores one accumulator object; ProcessWindowFunction stores 1M events. This difference causes OOM for large windows. Use ProcessWindowFunction only when you need access to all elements or window metadata. Best practice: combine Aggregate + Process to get incremental aggregation with window metadata access.

Q: What are window triggers and when would you use a custom trigger?

A: Triggers determine when a window emits results. Default: EventTimeTrigger fires when the watermark passes the window end. Reasons to set a different trigger: (1) Early firings: emit partial results every N seconds for low-latency dashboards. (2) Count-based: fire every N events for micro-batching. (3) Complex conditions: fire when a specific pattern is detected (e.g., 3 failed logins). (4) Purging: clear state after firing to limit memory. Setting a trigger replaces the default one, so a final result at window end is only produced if the chosen trigger also fires there. The early-firing pattern (a continuous trigger such as ContinuousProcessingTimeTrigger) is the most common use case.

Q: What's the state cost of sliding windows and how can you mitigate it?

A: Each event belongs to (window_size / slide_interval) windows. A 1-hour window sliding every 1 second means each event is in 3,600 windows — 3,600× the state of a tumbling window. Mitigation: (1) Use AggregateFunction (not ProcessWindowFunction) so only accumulators are stored per window. (2) Increase the slide interval — do you really need updates every second? (3) Consider a tumbling window + ProcessFunction with a manual rolling buffer for very frequent updates. (4) Use RocksDB state backend for large state that exceeds heap.

09

Common Mistakes

💾

Using ProcessWindowFunction for simple aggregations

Using ProcessWindowFunction to compute a sum or count. This buffers all events in state until the window fires, causing OOM for large windows or high-throughput streams.

Use ReduceFunction or AggregateFunction for any computation that can be expressed incrementally. Only use ProcessWindowFunction when you genuinely need access to all elements or window metadata. Combine Aggregate + Process if you need both.

📈

Sliding windows with tiny slide intervals

Creating a 1-hour sliding window with a 1-second slide. Each event belongs to 3,600 windows, multiplying state by 3,600×. This quickly exhausts memory and causes checkpoint timeouts.

Keep the ratio of size/slide reasonable (≤10). If you need very frequent updates, use a tumbling window with early-firing triggers (ContinuousProcessingTimeTrigger) or a ProcessFunction with manual state management.

🔄

Forgetting that session windows merge

Assuming session windows have a maximum size. A user who is continuously active for 8 hours creates one massive session window that accumulates all events in state.

Set a maximum session duration using a custom trigger that fires and purges after a time limit. Or use a ProcessFunction with manual session management that caps session length.

⏱️

Not understanding window alignment

Expecting a 1-hour tumbling window to start when the first event arrives. Windows align to epoch by default — a 1-hour window always starts at :00 regardless of when events arrive.

Use the offset parameter to shift window alignment: TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(30)) starts windows at :30. For per-key alignment, use session windows or a ProcessFunction.