Programming Model
Flink provides multiple API layers — from high-level SQL to low-level ProcessFunction. Understanding when to use each layer is key to building efficient, maintainable streaming applications.
DataStream API
The DataStream API is Flink's core streaming API. It provides a fluent interface for defining data transformations on unbounded streams. Programs are lazy — they build a dataflow graph that executes only when env.execute() is called.
```java
// Complete DataStream API example: real-time order analytics
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Source: read from Kafka (type witness <Order> needed for the chained builder)
DataStream<Order> orders = env
    .fromSource(kafkaSource,
        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((order, ts) -> order.getTimestamp()),
        "Orders Source");

// Transform: filter, map, enrich
DataStream<EnrichedOrder> enriched = orders
    .filter(order -> order.getAmount() > 0)    // drop invalid orders
    .map(order -> enrichWithUserData(order))   // enrich with user data
    .name("Enrich Orders");                    // operator name for the Web UI

// Key by region for parallel processing
KeyedStream<EnrichedOrder, String> keyed = enriched
    .keyBy(EnrichedOrder::getRegion);

// Window aggregation: revenue per region per minute
DataStream<RegionRevenue> revenue = keyed
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new RevenueAggregate())
    .name("Revenue Aggregation");

// Sink: write to Kafka
revenue.sinkTo(kafkaSink);

// Execute (nothing runs until this is called!)
env.execute("Order Analytics Job");
```
| Transformation | Type | Description |
|---|---|---|
| map() | 1:1 | Transform each element |
| flatMap() | 1:N | Transform each element into 0 or more |
| filter() | 1:0/1 | Keep or discard elements |
| keyBy() | Partition | Partition stream by key |
| window() | Group | Group elements into windows |
| union() | Merge | Merge multiple streams (same type) |
| connect() | Merge | Connect two streams (different types) |
| iterate() | Loop | Feed output back as input |
Lazy Evaluation
DataStream programs are lazily evaluated. Calling map(), filter(), or window() doesn't process any data — it builds a dataflow graph. Only env.execute() submits the graph to the cluster for execution. This allows Flink to optimize the entire graph before running it (operator chaining, resource allocation).
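A minimal, self-contained sketch of this behavior; the element values and job name are illustrative, and the .returns() hint is there because Flink cannot recover a lambda's output type after erasure:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Each call below only appends a node to the dataflow graph -- no data moves yet
DataStream<Integer> doubled = env
    .fromElements(1, 2, 3)
    .map(x -> x * 2)
    .returns(Types.INT); // type hint, since lambda generics are erased

doubled.print(); // still just graph building

// Only now is the graph optimized (operator chaining) and submitted
env.execute("Lazy Evaluation Demo");
```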
ProcessFunction
ProcessFunction is Flink's most powerful API — it gives you access to event time, state, timers, and side outputs. Use it when the higher-level APIs (map, window) can't express your logic. It's the escape hatch for complex stateful processing.
The Swiss Army Knife
If DataStream API operators are specialized tools (hammer, screwdriver), ProcessFunction is the Swiss Army Knife. It can do everything — access state, set timers, emit to multiple outputs, read the current watermark. It's more verbose but infinitely flexible. Use specialized tools when they fit; reach for ProcessFunction when they don't.
```java
// ProcessFunction: detect users inactive for 30 minutes
public class InactivityDetector
        extends KeyedProcessFunction<String, UserEvent, Alert> {

    // State: timestamp of last activity
    private ValueState<Long> lastActivityState;

    @Override
    public void open(Configuration params) {
        lastActivityState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-activity", Long.class));
    }

    @Override
    public void processElement(UserEvent event, Context ctx, Collector<Alert> out)
            throws Exception {
        // Update last activity time
        lastActivityState.update(event.getTimestamp());

        // Register an event-time timer 30 minutes after this event.
        // Old timers are not deleted; onTimer re-checks the state
        // before alerting, so stale timers fire harmlessly.
        long timerTime = event.getTimestamp() + Duration.ofMinutes(30).toMillis();
        ctx.timerService().registerEventTimeTimer(timerTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out)
            throws Exception {
        // Timer fired -- check if the user is still inactive
        Long lastActivity = lastActivityState.value();
        if (lastActivity != null
                && timestamp >= lastActivity + Duration.ofMinutes(30).toMillis()) {
            // No activity for 30 minutes -- emit alert
            out.collect(new Alert(ctx.getCurrentKey(), "USER_INACTIVE_30MIN"));
        }
        // If the user was active since this timer was set, do nothing:
        // the newer activity registered a fresh timer.
    }
}
```
ProcessFunction Capabilities
- ✅ Access to keyed state (ValueState, ListState, MapState)
- ✅ Event-time and processing-time timers (onTimer callback)
- ✅ Side outputs — emit to multiple output streams
- ✅ Access to current watermark and processing time
- ✅ Custom windowing logic without the Window API
- ✅ Complex event patterns that CEP can't express
When to Use ProcessFunction
Use ProcessFunction when you need: (1) Timers — delayed actions based on event time. (2) Multiple outputs — side outputs for different event types. (3) Custom windowing — logic that doesn't fit standard window types. (4) Complex state access patterns — reading/writing multiple state variables conditionally. For simple transformations, stick with map/filter/window.
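For point (2), here is a sketch of the side-output pattern; the Order type, the 10,000 amount threshold, and the tag name are illustrative assumptions:

```java
// The OutputTag must be created as an anonymous subclass so Flink
// can capture the element type despite erasure.
final OutputTag<Order> largeOrders = new OutputTag<Order>("large-orders") {};

SingleOutputStreamOperator<Order> mainStream = orders
    .process(new ProcessFunction<Order, Order>() {
        @Override
        public void processElement(Order order, Context ctx, Collector<Order> out)
                throws Exception {
            if (order.getAmount() > 10_000) {
                ctx.output(largeOrders, order); // routed to the side output
            } else {
                out.collect(order);             // routed to the main output
            }
        }
    });

// The side output becomes its own stream
DataStream<Order> large = mainStream.getSideOutput(largeOrders);
```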
Table API & SQL
The Table API and Flink SQL provide a relational abstraction over streams. They treat streams as dynamic tables that change over time. Queries are continuous — they produce updating results as new data arrives. The same query works for both batch and streaming.
```java
// Table API: relational operations on streams
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register Kafka source as a table
tableEnv.executeSql("""
    CREATE TABLE orders (
        order_id STRING,
        user_id STRING,
        amount DECIMAL(10,2),
        order_time TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
    """);

// SQL query: continuous aggregation
Table result = tableEnv.sqlQuery("""
    SELECT
        user_id,
        TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
        COUNT(*) AS order_count,
        SUM(amount) AS total_revenue
    FROM orders
    GROUP BY user_id, TUMBLE(order_time, INTERVAL '1' HOUR)
    """);

// Convert back to DataStream if needed
DataStream<Row> resultStream = tableEnv.toDataStream(result);

// Or write directly to a sink table
tableEnv.executeSql("""
    INSERT INTO revenue_summary
    SELECT user_id, window_start, order_count, total_revenue
    FROM ...
    """);
```
| API Layer | Abstraction | Best For | Flexibility |
|---|---|---|---|
| SQL | Declarative queries | Analytics, ETL, ad-hoc queries | Low (SQL expressiveness) |
| Table API | Relational (programmatic) | Type-safe relational ops | Medium |
| DataStream API | Stream transformations | Custom logic, complex flows | High |
| ProcessFunction | Low-level events + state | Timers, side outputs, custom windows | Highest |
Unified Batch and Streaming
The same Table API / SQL query works for both bounded (batch) and unbounded (streaming) data. In streaming mode, queries produce continuous updating results. In batch mode, they produce a final result. This unification means you can develop and test with batch data, then deploy the same query on a live stream.
Connectors & Catalogs
Connectors are pluggable components that read from and write to external systems. Catalogs provide metadata management — they store table schemas, making them available across jobs without re-declaration.
Built-in Connectors
- ✅ Kafka — source and sink with exactly-once support (see the sketch after this list)
- ✅ Filesystem — source and sink with file rolling and exactly-once
- ✅ JDBC — source and sink with upsert mode for databases
- ✅ Elasticsearch — sink with bulk indexing
- ✅ Kinesis — source and sink for AWS streaming
- ✅ HBase — source and sink for wide-column store
- ✅ Hive — source, sink, and catalog integration
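As a concrete illustration of the Kafka connector, a minimal source/sink setup might look like the following sketch; the broker address, topic names, group id, and transactional id prefix are placeholders:

```java
// Minimal Kafka source/sink sketch -- broker, topics, group id,
// and transactional id prefix are placeholder assumptions.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("orders")
    .setGroupId("order-analytics")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("revenue")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    // Exactly-once uses Kafka transactions and needs a stable id prefix
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("revenue-sink")
    .build();
```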
```java
// Hive Catalog: persistent table metadata
tableEnv.executeSql("""
    CREATE CATALOG hive_catalog WITH (
        'type' = 'hive',
        'hive-conf-dir' = '/etc/hive/conf'
    )
    """);

// Use catalog — tables defined once, available to all jobs
tableEnv.useCatalog("hive_catalog");
tableEnv.useDatabase("analytics");

// Query tables without re-declaring schema
Table result = tableEnv.sqlQuery(
    "SELECT * FROM orders WHERE amount > 100");

// JDBC Catalog: read table schemas from PostgreSQL
tableEnv.executeSql("""
    CREATE CATALOG pg_catalog WITH (
        'type' = 'jdbc',
        'base-url' = 'jdbc:postgresql://host:5432/',
        'default-database' = 'mydb',
        'username' = 'flink',
        'password' = '...'
    )
    """);
```
Timer Service
The Timer Service allows ProcessFunctions to register callbacks that fire at a specific event time or processing time. Timers are per-key, fault-tolerant (included in checkpoints), and deduplicated (registering the same timestamp twice results in one callback).
```java
// Timer patterns in KeyedProcessFunction
@Override
public void processElement(Event event, Context ctx, Collector<Result> out)
        throws Exception {
    // Event-time timer: fires when the watermark passes the timestamp
    ctx.timerService().registerEventTimeTimer(
        event.getTimestamp() + 60_000); // 1 min after the event

    // Processing-time timer: fires after wall-clock time
    ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + 30_000); // 30s

    // Delete a timer (cancel a scheduled callback)
    ctx.timerService().deleteEventTimeTimer(oldTimerTimestamp);

    // Get current time references
    long eventTime = ctx.timestamp(); // current event's timestamp
    long watermark = ctx.timerService().currentWatermark();
    long procTime = ctx.timerService().currentProcessingTime();
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out)
        throws Exception {
    // Determine which type of timer fired
    if (ctx.timeDomain() == TimeDomain.EVENT_TIME) {
        // Event-time timer fired (watermark advanced past timestamp)
    } else {
        // Processing-time timer fired (wall clock reached timestamp)
    }

    // Access state (same key as when the timer was registered)
    MyState state = myStateHandle.value();

    // New timers can be registered from within onTimer
    ctx.timerService().registerEventTimeTimer(timestamp + 60_000);
}
```
Timer Use Cases
- ✅ Inactivity detection — alert if no event for key within N minutes
- ✅ Delayed actions — trigger action N seconds after an event
- ✅ Session timeout — close session after gap of inactivity
- ✅ Periodic cleanup — purge stale state entries
- ✅ SLA monitoring — alert if response not received within deadline
- ✅ Batching — collect events for N seconds then emit batch
Timer Deduplication
Timers are deduplicated per key and timestamp. Registering the same timestamp multiple times results in only one onTimer call. This is useful for the "reset timer on activity" pattern — you don't need to delete the old timer before registering a new one at the same timestamp.
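One way to lean on this guarantee is timer coalescing: round timer timestamps to a coarse boundary so that repeated registrations collapse into a single callback. A sketch inside a KeyedProcessFunction's processElement; the one-minute bucket is an arbitrary choice for illustration:

```java
@Override
public void processElement(Event event, Context ctx, Collector<Result> out) {
    // Round up to the next full minute: every event for this key within
    // the same minute registers the SAME timestamp, and deduplication
    // collapses them into one onTimer call -- no deleteEventTimeTimer needed.
    long bucket = 60_000L; // 1 minute, illustrative
    long coalesced = ((ctx.timestamp() / bucket) + 1) * bucket;
    ctx.timerService().registerEventTimeTimer(coalesced);
}
```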
DataSet API (Legacy)
The DataSet API was Flink's original batch processing API. It has been deprecated since Flink 1.12 in favor of the unified DataStream API, which handles both batch and streaming with the same code.
| Aspect | DataSet API (Legacy) | DataStream API (Unified) |
|---|---|---|
| Status | Deprecated (Flink 1.12+) | Active, recommended |
| Processing model | Batch only | Batch + Streaming |
| Execution | Separate runtime | Unified runtime |
| State | No streaming state | Full state support |
| Fault tolerance | Task-level restart | Checkpointing |
| Migration | Rewrite to DataStream/Table API | — |
```java
// Unified API: same code for batch and streaming
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// For batch: set runtime mode
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// For streaming (the default):
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

// For automatic detection based on sources:
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// Same transformation code works for both!
DataStream<Result> result = env
    .fromSource(source, watermarkStrategy, "Source")
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new MyAggregate());
```
Migration Path
If you have existing DataSet API jobs, migrate to either the DataStream API (for programmatic control) or Table API/SQL (for relational operations). The Table API provides the smoothest migration for batch SQL workloads. The DataStream API with BATCH runtime mode replaces DataSet for programmatic batch jobs.
Interview Questions
Q: What are Flink's API layers and when would you use each?
A: Four layers, from highest to lowest: (1) SQL — declarative, best for analytics and ETL, least flexible. (2) Table API — programmatic relational operations, type-safe. (3) DataStream API — stream transformations (map, filter, window, keyBy), good balance of power and simplicity. (4) ProcessFunction — lowest level, access to state, timers, side outputs, watermarks. Use SQL/Table for standard analytics. DataStream for custom transformation logic. ProcessFunction for complex stateful patterns, timers, or when higher APIs can't express your logic.
Q: What is ProcessFunction and why is it called the most powerful API?
A: ProcessFunction gives access to everything: (1) Per-key state (ValueState, MapState, etc.). (2) Event-time and processing-time timers with onTimer callbacks. (3) Side outputs for emitting to multiple streams. (4) Current watermark and processing time. (5) The event's timestamp. It can implement any stateful computation — custom windows, complex patterns, delayed actions, session management. It's 'most powerful' because anything expressible in higher APIs can be implemented with ProcessFunction, plus things that can't (like timers or multi-output).
Q: How does Flink unify batch and stream processing?
A: Flink treats batch as a special case of streaming — a bounded stream. The same DataStream API and Table API/SQL work for both. You set RuntimeExecutionMode.BATCH or STREAMING. In batch mode: (1) No checkpointing needed (can restart from beginning). (2) Sort-based shuffles instead of hash-based. (3) Watermark is effectively infinity (all data available). (4) Windows fire once at the end. The unified model means you develop and test with batch data, then deploy the same code on live streams.
Q: Explain the Timer Service and give a use case.
A: The Timer Service lets ProcessFunctions register callbacks at future timestamps. Two types: event-time timers (fire when watermark passes the timestamp) and processing-time timers (fire at wall-clock time). Timers are per-key, fault-tolerant (checkpointed), and deduplicated. Use case — inactivity detection: on each user event, register a timer 30 minutes in the future. In onTimer, check if the user has been active since. If not, emit an alert. Each new event effectively 'resets' the timer by registering a new one.
Q: What's the difference between connect() and union() in DataStream?
A: union() merges multiple streams of the SAME type into one stream. Simple concatenation, no special processing logic. connect() joins two streams of POTENTIALLY DIFFERENT types into a ConnectedStreams. You process each side with separate functions (processElement1, processElement2) that can share state. Use connect for: broadcast state pattern (rules + data), stream-stream joins, or any case where two different event types need to interact through shared state.
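A sketch contrasting the two; the streams (clicks, views, rules, events) and the Rule, Event, and Alert types with their accessors are hypothetical:

```java
// union(): streams of the SAME type, merged into one
DataStream<Event> all = clicks.union(views);

// connect(): two different types interacting through shared keyed state
DataStream<Alert> alerts = rules
    .connect(events)
    .keyBy(Rule::getUserId, Event::getUserId)
    .process(new KeyedCoProcessFunction<String, Rule, Event, Alert>() {
        private ValueState<Rule> ruleState;

        @Override
        public void open(Configuration params) {
            ruleState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("rule", Rule.class));
        }

        @Override
        public void processElement1(Rule rule, Context ctx, Collector<Alert> out)
                throws Exception {
            ruleState.update(rule); // remember the latest rule for this key
        }

        @Override
        public void processElement2(Event event, Context ctx, Collector<Alert> out)
                throws Exception {
            Rule rule = ruleState.value();
            if (rule != null && rule.matches(event)) {
                out.collect(new Alert(ctx.getCurrentKey(), rule.getName()));
            }
        }
    });
```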
Common Mistakes
Using ProcessFunction for simple transformations
Writing a ProcessFunction to filter events or compute a sum — things that map(), filter(), or reduce() handle perfectly. ProcessFunction adds complexity (state management, timer cleanup) for no benefit.
✅ Use the highest-level API that can express your logic: map() for 1:1 transforms, filter() for predicates, reduce()/aggregate() for aggregations. Only drop to ProcessFunction when you need timers, side outputs, or complex state patterns.
Not calling env.execute()
Building a DataStream program but forgetting to call env.execute() at the end. The program compiles and runs without errors but processes zero events — the graph is never submitted.
✅ Always end your main() with env.execute("Job Name"). The name parameter appears in the Flink Web UI. For the Table API, statements submitted via tableEnv.executeSql() (such as INSERT INTO) trigger execution implicitly.
Not naming operators
Leaving operators unnamed. The Flink Web UI shows auto-generated names like 'Map -> Filter -> Map' which are impossible to debug when you have 20 operators.
✅ Name every operator with .name("descriptive-name") and set .uid("stable-uid") for stateful operators. Good names make the Web UI readable, and UIDs ensure savepoint compatibility.
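For instance, reusing the InactivityDetector from earlier (the name and uid values are illustrative):

```java
DataStream<Alert> alerts = events
    .keyBy(Event::getUserId)
    .process(new InactivityDetector())
    .name("inactivity-detector")     // label shown in the Web UI
    .uid("inactivity-detector-v1");  // stable ID for savepoint state mapping
```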
Using DataSet API for new projects
Starting a new batch processing project with the deprecated DataSet API because old tutorials use it. The DataSet API won't receive new features and will eventually be removed.
✅ Use the DataStream API with RuntimeExecutionMode.BATCH for programmatic batch jobs, or Table API/SQL for relational batch processing. Both are actively maintained and support the unified batch/streaming model.