DataStream APIProcessFunctionTable APITimersSide OutputsUnified Batch/Stream

Programming Model

Flink provides multiple API layers — from high-level SQL to low-level ProcessFunction. Understanding when to use each layer is key to building efficient, maintainable streaming applications.

40 min read8 sections
01

DataStream API

The DataStream API is Flink's core streaming API. It provides a fluent interface for defining data transformations on unbounded streams. Programs are lazy — they build a dataflow graph that executes only when env.execute() is called.

datastream-api.javajava
// Complete DataStream API example: real-time order analytics
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Source: read from Kafka
DataStream<Order> orders = env
    .fromSource(kafkaSource,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((order, ts) -> order.getTimestamp()),
        "Orders Source");

// Transform: filter, map, enrich
DataStream<EnrichedOrder> enriched = orders
    .filter(order -> order.getAmount() > 0)        // filter invalid
    .map(order -> enrichWithUserData(order))        // enrich
    .name("Enrich Orders");                         // operator name

// Key by region for parallel processing
KeyedStream<EnrichedOrder, String> keyed = enriched
    .keyBy(EnrichedOrder::getRegion);

// Window aggregation: revenue per region per minute
DataStream<RegionRevenue> revenue = keyed
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new RevenueAggregate())
    .name("Revenue Aggregation");

// Sink: write to Kafka
revenue.sinkTo(kafkaSink);

// Execute (nothing runs until this is called!)
env.execute("Order Analytics Job");
TransformationTypeDescription
map()1:1Transform each element
flatMap()1:NTransform each element into 0 or more
filter()1:0/1Keep or discard elements
keyBy()PartitionPartition stream by key
window()GroupGroup elements into windows
union()MergeMerge multiple streams (same type)
connect()MergeConnect two streams (different types)
iterate()LoopFeed output back as input

Lazy Evaluation

DataStream programs are lazily evaluated. Calling map(), filter(), or window() doesn't process any data — it builds a dataflow graph. Only env.execute() submits the graph to the cluster for execution. This allows Flink to optimize the entire graph before running it (operator chaining, resource allocation).

02

ProcessFunction

ProcessFunction is Flink's most powerful API — it gives you access to event time, state, timers, and side outputs. Use it when the higher-level APIs (map, window) can't express your logic. It's the escape hatch for complex stateful processing.

🔧

The Swiss Army Knife

If DataStream API operators are specialized tools (hammer, screwdriver), ProcessFunction is the Swiss Army Knife. It can do everything — access state, set timers, emit to multiple outputs, read the current watermark. It's more verbose but infinitely flexible. Use specialized tools when they fit; reach for ProcessFunction when they don't.

process-function.javajava
// ProcessFunction: detect users inactive for 30 minutes
public class InactivityDetector
    extends KeyedProcessFunction<String, UserEvent, Alert> {

    // State: timestamp of last activity
    private ValueState<Long> lastActivityState;

    @Override
    public void open(Configuration params) {
        lastActivityState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-activity", Long.class));
    }

    @Override
    public void processElement(UserEvent event, Context ctx,
                               Collector<Alert> out) throws Exception {
        // Update last activity time
        lastActivityState.update(event.getTimestamp());

        // Delete old timer and set new one (30 min from now)
        long timerTime = event.getTimestamp() + Duration.ofMinutes(30).toMillis();

        // Register event-time timer
        ctx.timerService().registerEventTimeTimer(timerTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Alert> out) throws Exception {
        // Timer fired — check if user is still inactive
        Long lastActivity = lastActivityState.value();

        if (lastActivity != null &&
            timestamp >= lastActivity + Duration.ofMinutes(30).toMillis()) {
            // No activity for 30 minutes — emit alert
            out.collect(new Alert(ctx.getCurrentKey(),
                "USER_INACTIVE_30MIN"));
        }
        // If user was active since timer was set, do nothing
        // (the new activity set a new timer)
    }
}

ProcessFunction Capabilities

  • Access to keyed state (ValueState, ListState, MapState)
  • Event-time and processing-time timers (onTimer callback)
  • Side outputs — emit to multiple output streams
  • Access to current watermark and processing time
  • Custom windowing logic without the Window API
  • Complex event patterns that CEP can't express

When to Use ProcessFunction

Use ProcessFunction when you need: (1) Timers — delayed actions based on event time. (2) Multiple outputs — side outputs for different event types. (3) Custom windowing — logic that doesn't fit standard window types. (4) Complex state access patterns — reading/writing multiple state variables conditionally. For simple transformations, stick with map/filter/window.

03

Table API & SQL

The Table API and Flink SQL provide a relational abstraction over streams. They treat streams as dynamic tables that change over time. Queries are continuous — they produce updating results as new data arrives. The same query works for both batch and streaming.

table-api-sql.javajava
// Table API: relational operations on streams
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register Kafka source as a table
tableEnv.executeSql("""
    CREATE TABLE orders (
        order_id STRING,
        user_id STRING,
        amount DECIMAL(10,2),
        order_time TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""");

// SQL query: continuous aggregation
Table result = tableEnv.sqlQuery("""
    SELECT
        user_id,
        TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
        COUNT(*) AS order_count,
        SUM(amount) AS total_revenue
    FROM orders
    GROUP BY
        user_id,
        TUMBLE(order_time, INTERVAL '1' HOUR)
""");

// Convert back to DataStream if needed
DataStream<Row> resultStream = tableEnv.toDataStream(result);

// Or write directly to a sink table
tableEnv.executeSql("""
    INSERT INTO revenue_summary
    SELECT user_id, window_start, order_count, total_revenue
    FROM ...
""");
API LayerAbstractionBest ForFlexibility
SQLDeclarative queriesAnalytics, ETL, ad-hoc queriesLow (SQL expressiveness)
Table APIRelational (programmatic)Type-safe relational opsMedium
DataStream APIStream transformationsCustom logic, complex flowsHigh
ProcessFunctionLow-level events + stateTimers, side outputs, custom windowsHighest

Unified Batch and Streaming

The same Table API / SQL query works for both bounded (batch) and unbounded (streaming) data. In streaming mode, queries produce continuous updating results. In batch mode, they produce a final result. This unification means you can develop and test with batch data, then deploy the same query on a live stream.

04

Connectors & Catalogs

Connectors are pluggable components that read from and write to external systems. Catalogs provide metadata management — they store table schemas, making them available across jobs without re-declaration.

Built-in Connectors

  • Kafka — source and sink with exactly-once support
  • Filesystem — source and sink with file rolling and exactly-once
  • JDBC — source and sink with upsert mode for databases
  • Elasticsearch — sink with bulk indexing
  • Kinesis — source and sink for AWS streaming
  • HBase — source and sink for wide-column store
  • Hive — source, sink, and catalog integration
catalogs.javajava
// Hive Catalog: persistent table metadata
tableEnv.executeSql("""
    CREATE CATALOG hive_catalog WITH (
        'type' = 'hive',
        'hive-conf-dir' = '/etc/hive/conf'
    )
""");

// Use catalog — tables defined once, available to all jobs
tableEnv.useCatalog("hive_catalog");
tableEnv.useDatabase("analytics");

// Query tables without re-declaring schema
Table result = tableEnv.sqlQuery(
    "SELECT * FROM orders WHERE amount > 100");

// JDBC Catalog: read table schemas from PostgreSQL
tableEnv.executeSql("""
    CREATE CATALOG pg_catalog WITH (
        'type' = 'jdbc',
        'base-url' = 'jdbc:postgresql://host:5432/',
        'default-database' = 'mydb',
        'username' = 'flink',
        'password' = '...'
    )
""");
05

Timer Service

The Timer Service allows ProcessFunctions to register callbacks that fire at a specific event time or processing time. Timers are per-key, fault-tolerant (included in checkpoints), and deduplicated (registering the same timestamp twice results in one callback).

timer-service.javajava
// Timer patterns in KeyedProcessFunction

@Override
public void processElement(Event event, Context ctx,
                           Collector<Result> out) throws Exception {

    // Event-time timer: fires when watermark passes timestamp
    ctx.timerService().registerEventTimeTimer(
        event.getTimestamp() + 60_000); // 1 min after event

    // Processing-time timer: fires after wall-clock time
    ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + 30_000); // 30s

    // Delete a timer (cancel scheduled callback)
    ctx.timerService().deleteEventTimeTimer(oldTimerTimestamp);

    // Get current time references
    long eventTime = ctx.timestamp();           // current event's timestamp
    long watermark = ctx.timerService().currentWatermark();
    long procTime = ctx.timerService().currentProcessingTime();
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx,
                    Collector<Result> out) throws Exception {
    // Determine which type of timer fired
    if (ctx.timeDomain() == TimeDomain.EVENT_TIME) {
        // Event-time timer fired (watermark advanced past timestamp)
    } else {
        // Processing-time timer fired (wall clock reached timestamp)
    }

    // Access state (same key as when timer was registered)
    MyState state = myStateHandle.value();

    // Can register new timers from within onTimer
    ctx.timerService().registerEventTimeTimer(timestamp + 60_000);
}

Timer Use Cases

  • Inactivity detection — alert if no event for key within N minutes
  • Delayed actions — trigger action N seconds after an event
  • Session timeout — close session after gap of inactivity
  • Periodic cleanup — purge stale state entries
  • SLA monitoring — alert if response not received within deadline
  • Batching — collect events for N seconds then emit batch

Timer Deduplication

Timers are deduplicated per key and timestamp. Registering the same timestamp multiple times results in only one onTimer call. This is useful for the "reset timer on activity" pattern — you don't need to delete the old timer before registering a new one at the same timestamp.

06

DataSet API (Legacy)

The DataSet API was Flink's original batch processing API. It has been deprecated since Flink 1.12 in favor of the unified DataStream API, which handles both batch and streaming with the same code.

AspectDataSet API (Legacy)DataStream API (Unified)
StatusDeprecated (Flink 1.12+)Active, recommended
Processing modelBatch onlyBatch + Streaming
ExecutionSeparate runtimeUnified runtime
StateNo streaming stateFull state support
Fault toleranceTask-level restartCheckpointing
MigrationRewrite to DataStream/Table API
unified-batch-stream.javajava
// Unified API: same code for batch and streaming
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// For batch: set runtime mode
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// For streaming: (default)
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

// For automatic detection based on sources:
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// Same transformation code works for both!
DataStream<Result> result = env
    .fromSource(source, watermarkStrategy, "Source")
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new MyAggregate());

Migration Path

If you have existing DataSet API jobs, migrate to either the DataStream API (for programmatic control) or Table API/SQL (for relational operations). The Table API provides the smoothest migration for batch SQL workloads. The DataStream API with BATCH runtime mode replaces DataSet for programmatic batch jobs.

07

Interview Questions

Q:What are Flink's API layers and when would you use each?

A: Four layers, from highest to lowest: (1) SQL — declarative, best for analytics and ETL, least flexible. (2) Table API — programmatic relational operations, type-safe. (3) DataStream API — stream transformations (map, filter, window, keyBy), good balance of power and simplicity. (4) ProcessFunction — lowest level, access to state, timers, side outputs, watermarks. Use SQL/Table for standard analytics. DataStream for custom transformation logic. ProcessFunction for complex stateful patterns, timers, or when higher APIs can't express your logic.

Q:What is ProcessFunction and why is it called the most powerful API?

A: ProcessFunction gives access to everything: (1) Per-key state (ValueState, MapState, etc.). (2) Event-time and processing-time timers with onTimer callbacks. (3) Side outputs for emitting to multiple streams. (4) Current watermark and processing time. (5) The event's timestamp. It can implement any stateful computation — custom windows, complex patterns, delayed actions, session management. It's 'most powerful' because anything expressible in higher APIs can be implemented with ProcessFunction, plus things that can't (like timers or multi-output).

Q:How does Flink unify batch and stream processing?

A: Flink treats batch as a special case of streaming — a bounded stream. The same DataStream API and Table API/SQL work for both. You set RuntimeExecutionMode.BATCH or STREAMING. In batch mode: (1) No checkpointing needed (can restart from beginning). (2) Sort-based shuffles instead of hash-based. (3) Watermark is effectively infinity (all data available). (4) Windows fire once at the end. The unified model means you develop and test with batch data, then deploy the same code on live streams.

Q:Explain the Timer Service and give a use case.

A: The Timer Service lets ProcessFunctions register callbacks at future timestamps. Two types: event-time timers (fire when watermark passes the timestamp) and processing-time timers (fire at wall-clock time). Timers are per-key, fault-tolerant (checkpointed), and deduplicated. Use case — inactivity detection: on each user event, register a timer 30 minutes in the future. In onTimer, check if the user has been active since. If not, emit an alert. Each new event effectively 'resets' the timer by registering a new one.

Q:What's the difference between connect() and union() in DataStream?

A: union() merges multiple streams of the SAME type into one stream. Simple concatenation, no special processing logic. connect() joins two streams of POTENTIALLY DIFFERENT types into a ConnectedStream. You process each side with separate functions (processElement1, processElement2) that can share state. Use connect for: broadcast state pattern (rules + data), stream-stream joins, or any case where two different event types need to interact through shared state.

08

Common Mistakes

🔨

Using ProcessFunction for simple transformations

Writing a ProcessFunction to filter events or compute a sum — things that map(), filter(), or reduce() handle perfectly. ProcessFunction adds complexity (state management, timer cleanup) for no benefit.

Use the highest-level API that can express your logic. map() for 1:1 transforms, filter() for predicates, reduce()/aggregate() for aggregations. Only drop to ProcessFunction when you need timers, side outputs, or complex state patterns.

🔄

Not calling env.execute()

Building a DataStream program but forgetting to call env.execute() at the end. The program compiles and runs without errors but processes zero events — the graph is never submitted.

Always end your main() with env.execute('Job Name'). The name parameter appears in the Flink Web UI. For Table API, use tableEnv.executeSql() which triggers execution implicitly.

📛

Not naming operators

Leaving operators unnamed. The Flink Web UI shows auto-generated names like 'Map -> Filter -> Map' which are impossible to debug when you have 20 operators.

Name every operator with .name('descriptive-name') and set .uid('stable-uid') for stateful operators. Good names make the Web UI readable and UIDs ensure savepoint compatibility.

🗂️

Using DataSet API for new projects

Starting a new batch processing project with the deprecated DataSet API because old tutorials use it. The DataSet API won't receive new features and will eventually be removed.

Use DataStream API with RuntimeExecutionMode.BATCH for programmatic batch jobs, or Table API/SQL for relational batch processing. Both are actively maintained and support the unified batch/streaming model.