Flink SQL
Flink SQL brings the power of SQL to stream processing. Streams become dynamic tables, queries become continuous, and the same SQL works for both batch and real-time analytics, making streaming accessible to anyone who knows SQL.
Streaming SQL Concepts
Traditional SQL operates on static tables: you query and get a result. Flink SQL operates on dynamic tables, which change over time as new events arrive. Queries are continuous: they produce updating results as the input changes.
The Live Scoreboard
Traditional SQL is like checking the final score after a game. Flink SQL is like watching a live scoreboard: the numbers update continuously as the game progresses. The query 'SELECT team, SUM(points)' doesn't run once; it runs forever, updating the result every time a team scores. The 'table' is the stream of scoring events, and the 'result' is the continuously updating scoreboard.
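As a minimal sketch, assuming a hypothetical scoring_events stream with team and points columns, the scoreboard query is just ordinary SQL:

SELECT team, SUM(points) AS score
FROM scoring_events
GROUP BY team;

On a stream this query never finishes: every new scoring event updates the affected team's row in the result.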
Dynamic Tables Concept: Stream -> Dynamic Table -> Continuous Query -> Dynamic Result Table -> Stream

1. Stream to Table:
   Events: (10:01, Alice, click), (10:02, Bob, purchase), (10:03, Alice, click)
   Dynamic Table "events":
   Time  | User  | Action
   10:01 | Alice | click      <- row added
   10:02 | Bob   | purchase   <- row added
   10:03 | Alice | click      <- row added

2. Continuous Query:
   SELECT user, COUNT(*) AS cnt FROM events GROUP BY user
   Result table (updates over time):
   After event 1: { Alice: 1 }
   After event 2: { Alice: 1, Bob: 1 }
   After event 3: { Alice: 2, Bob: 1 }   <- Alice's count UPDATED

3. Result Table to Stream:
   Append stream:  (+Alice,1), (+Bob,1), (+Alice,2)               <- wrong! overwrites
   Retract stream: (+Alice,1), (+Bob,1), (-Alice,1), (+Alice,2)   <- correct!
   Upsert stream:  (Alice->1), (Bob->1), (Alice->2)               <- with primary key
Not All Queries Work on Streams
Some SQL operations require seeing all of the data, for example ORDER BY on a non-time column, or LIMIT on an unbounded stream. Flink SQL therefore restricts certain operations on unbounded inputs: aggregations with GROUP BY work (they produce updating results), but unbounded sorts don't; on a stream, ORDER BY must lead with an ascending time attribute.
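As an illustration (assuming an events table whose event_time column is declared as an event-time attribute), a sort on a regular column is typically rejected on an unbounded stream, while a sort leading with the ascending time attribute is accepted:

-- Typically rejected on an unbounded stream: would need all data before emitting anything
-- SELECT * FROM events ORDER BY amount DESC;

-- Accepted: results can be emitted as event time (watermarks) advances
SELECT * FROM events ORDER BY event_time ASC;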
Time in SQL
Flink SQL supports both processing time and event time. Time attributes are declared in the table schema and used in window functions, temporal joins, and time-based operations.
-- Processing time: computed column
CREATE TABLE pageviews (
    user_id STRING,
    page STRING,
    proc_time AS PROCTIME()   -- processing time attribute
) WITH ('connector' = 'kafka', ...);

-- Event time: from a column + watermark declaration
CREATE TABLE orders (
    order_id STRING,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    -- Watermark: tolerate 5 seconds of out-of-orderness
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', ...);

-- Tumbling window aggregation (event time)
SELECT
    user_id,
    TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
    TUMBLE_END(order_time, INTERVAL '1' HOUR) AS window_end,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM orders
GROUP BY user_id, TUMBLE(order_time, INTERVAL '1' HOUR);

-- Sliding window (HOP)
SELECT
    user_id,
    HOP_START(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) AS ws,
    COUNT(*) AS cnt
FROM orders
GROUP BY user_id, HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR);

-- Session window
SELECT
    user_id,
    SESSION_START(order_time, INTERVAL '30' MINUTE) AS session_start,
    COUNT(*) AS events_in_session
FROM orders
GROUP BY user_id, SESSION(order_time, INTERVAL '30' MINUTE);
| Window Function | SQL Syntax | Equivalent DataStream |
|---|---|---|
| Tumbling | TUMBLE(time_col, INTERVAL '1' HOUR) | TumblingEventTimeWindows.of(Time.hours(1)) |
| Sliding (Hop) | HOP(time_col, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) | SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)) |
| Session | SESSION(time_col, INTERVAL '30' MINUTE) | EventTimeSessionWindows.withGap(Time.minutes(30)) |
| Cumulate | CUMULATE(TABLE t, DESCRIPTOR(time_col), INTERVAL '1' MINUTE, INTERVAL '1' HOUR) (windowing TVF only) | Custom (expanding windows within period) |
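Unlike the other three, CUMULATE is only exposed through the windowing table-valued function (TVF) syntax rather than the older group-window functions. A sketch against the orders table defined above (column names as declared there):

-- Emit a partial result every minute, accumulating within each 1-hour window
SELECT
    window_start,
    window_end,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM TABLE(
    CUMULATE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' MINUTE, INTERVAL '1' HOUR))
GROUP BY window_start, window_end;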
Kafka + Flink SQL
Kafka is the most common connector for Flink SQL. You define Kafka topics as tables with CREATE TABLE DDL, specifying the format (JSON, Avro, Protobuf) and connection properties.
-- Source table: Kafka topic with JSON format
CREATE TABLE user_events (
    event_id STRING,
    user_id STRING,
    event_type STRING,
    page STRING,
    event_time TIMESTAMP(3),
    -- Metadata columns from Kafka
    kafka_partition INT METADATA FROM 'partition' VIRTUAL,
    kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL,
    -- Watermark for event-time processing
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-sql-analytics',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

-- Sink table: Kafka topic for results
-- (no PRIMARY KEY: the standard kafka connector is append-only, and
--  windowed aggregation results are final, so appends are sufficient)
CREATE TABLE hourly_stats (
    user_id STRING,
    window_start TIMESTAMP(3),
    event_count BIGINT,
    unique_pages BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'hourly-user-stats',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'sink.delivery-guarantee' = 'exactly-once',
    'sink.transactional-id-prefix' = 'sql-job-1'
);

-- Continuous query: insert results into sink
INSERT INTO hourly_stats
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
    COUNT(*) AS event_count,
    COUNT(DISTINCT page) AS unique_pages
FROM user_events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR);
Kafka Metadata Columns
Flink SQL can expose Kafka metadata (partition, offset, timestamp, headers) as virtual columns. This is useful for debugging, auditing, and routing logic without modifying the event payload.
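For example, a quick debugging query against the user_events table defined above (the filter value is just illustrative) reports exactly which partition and offset each matching record came from:

SELECT kafka_partition, kafka_offset, event_id, user_id
FROM user_events
WHERE event_type = 'purchase';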
Changelog Streams
When a continuous query produces updating results (e.g., GROUP BY), the output is a changelog stream: a sequence of insert, update, and delete operations. Flink SQL supports multiple changelog encodings for different sink requirements.
Changelog Modes:

1. Append-only (INSERT only):
   Used when results only grow (no updates/deletes)
   Example: SELECT * FROM events   (no aggregation)
   Output:
     +I[Alice, click, 10:01]
     +I[Bob, purchase, 10:02]

2. Retract (INSERT + DELETE pairs):
   Used when results update (old value retracted, new value inserted)
   Example: SELECT user, COUNT(*) FROM events GROUP BY user
   Output:
     +I[Alice, 1]   <- insert Alice=1
     +I[Bob, 1]     <- insert Bob=1
     -D[Alice, 1]   <- retract old Alice value
     +I[Alice, 2]   <- insert new Alice value

3. Upsert (UPSERT + DELETE):
   Used when results have a primary key (update by key)
   Example: same query, but sink has PRIMARY KEY (user)
   Output:
     +U[Alice, 1]   <- upsert Alice=1
     +U[Bob, 1]     <- upsert Bob=1
     +U[Alice, 2]   <- upsert Alice=2 (overwrites)

Sink compatibility:
  Kafka (append):  only append-only queries
  Kafka (upsert):  queries with PRIMARY KEY -> upsert-kafka connector
  JDBC:            supports all modes (INSERT, UPDATE, DELETE)
  Elasticsearch:   upsert mode with document ID
| Connector | Append | Retract | Upsert |
|---|---|---|---|
| kafka (standard) | ✅ | ❌ | ❌ |
| upsert-kafka | ❌ | ❌ | ✅ (with key) |
| jdbc | ✅ | ✅ | ✅ |
| elasticsearch | ✅ | ❌ | ✅ (with doc ID) |
| filesystem | ✅ | ❌ | ❌ |
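To check which changelog mode a query actually produces before picking a sink, Flink's EXPLAIN statement can include changelog details. A sketch using the user_events table from the Kafka example above (the exact plan output depends on the Flink version):

-- Annotates each operator with the change kinds it emits
-- (I = insert, UB/UA = update-before/after, D = delete)
EXPLAIN CHANGELOG_MODE
SELECT user_id, COUNT(*) AS event_count
FROM user_events
GROUP BY user_id;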
Upsert-Kafka Connector
For aggregation results written to Kafka, use the upsert-kafka connector. It produces a compacted topic where each key has only the latest value. Downstream consumers see the current state without processing the full changelog history.
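A sketch of the earlier hourly_stats sink redefined with upsert-kafka (topic name and bootstrap servers carried over from the example above; adjust to your setup):

CREATE TABLE hourly_stats_upsert (
    user_id STRING,
    window_start TIMESTAMP(3),
    event_count BIGINT,
    unique_pages BIGINT,
    PRIMARY KEY (user_id, window_start) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'hourly-user-stats',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',     -- serializes the PRIMARY KEY columns as the Kafka key
    'value.format' = 'json'    -- serializes the full row as the Kafka value
);

Because records are keyed, enabling log compaction on the topic keeps only the latest value per key for downstream consumers.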
Temporal Joins
Temporal joins allow you to join a stream with a versioned table at a specific point in time. This is essential for enrichment with slowly-changing dimension data: you want the value that was valid at the time of the event, not the current value.
-- Versioned table: exchange rates that change over time
CREATE TABLE exchange_rates (
    currency STRING,
    rate DECIMAL(10,6),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'exchange-rates',
    'format' = 'debezium-json'   -- CDC format for versioned table
);

-- Fact table: orders stream
CREATE TABLE orders (
    order_id STRING,
    currency STRING,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'format' = 'json'
);

-- Temporal join: get the exchange rate valid at order time
SELECT
    o.order_id,
    o.amount,
    o.currency,
    r.rate,
    o.amount * r.rate AS usd_amount
FROM orders AS o
JOIN exchange_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency;

-- This gives the rate that was valid WHEN the order was placed,
-- not the current rate (which may have changed since).
Temporal Join Requirements
- ✅ The versioned table must have a PRIMARY KEY declared
- ✅ The versioned table must have a time attribute (event time with watermark)
- ✅ The join condition must include the primary key of the versioned table
- ✅ FOR SYSTEM_TIME AS OF references the time attribute of the fact table
- ✅ The versioned table is typically a CDC source (Debezium) or a compacted Kafka topic
Flink SQL Gateway
The Flink SQL Gateway provides a REST API and JDBC interface for submitting SQL queries to a Flink cluster. It enables interactive SQL development, BI tool integration, and multi-tenant SQL execution without writing Java code.
Flink SQL Gateway Architecture:

  +--------------+      +------------------+      +----------------+
  |  SQL Client  | ---> |   SQL Gateway    | ---> | Flink Cluster  |
  |  (CLI/JDBC)  |      |   (REST API)     |      |                |
  +--------------+      +------------------+      +----------------+
                          | Session management
                          | Query parsing
                          | Result caching

Interfaces:
  - REST API: POST /v1/sessions/{id}/statements
  - JDBC: jdbc:flink://gateway-host:8083
  - HiveServer2 compatible: works with existing BI tools

Use cases:
  - Interactive SQL development (Flink SQL CLI)
  - BI tool integration (Tableau, Superset via JDBC)
  - Multi-tenant SQL execution (shared gateway, isolated sessions)
  - Ad-hoc queries on streaming data

Starting the gateway:
  ./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=0.0.0.0
SQL Gateway vs Embedded SQL
Use the SQL Gateway when you want to submit SQL without packaging a JAR: ideal for data analysts, BI tools, and interactive exploration. Use embedded SQL (TableEnvironment in Java) when SQL is part of a larger application with custom DataStream logic.
Interview Questions
Q: What are dynamic tables in Flink SQL?
A: Dynamic tables are Flink SQL's abstraction for streams. Unlike static tables (fixed data), dynamic tables change over time as new events arrive. A stream is converted to a dynamic table (each event becomes a row), queries run continuously producing updating results (another dynamic table), and the result table is converted back to a stream (changelog). This abstraction lets you write standard SQL that works on both bounded (batch) and unbounded (streaming) data.
Q: Explain changelog streams and the different modes.
A: When a continuous query produces updating results (e.g., GROUP BY), the output is a changelog: a sequence of changes. Three modes: (1) Append-only: only inserts, for queries without updates (simple SELECT, window aggregations). (2) Retract: delete old value + insert new value for each update. Works with any sink but verbose. (3) Upsert: update by primary key. More efficient but requires the sink to support upsert (JDBC, Elasticsearch, upsert-kafka). The mode determines which sinks are compatible with your query.
Q: What is a temporal join and when would you use it?
A: A temporal join enriches a fact stream with dimension data valid at the event's timestamp (FOR SYSTEM_TIME AS OF). Example: joining orders with exchange rates, where you want the rate when the order was placed, not the current rate. Requirements: versioned table with PRIMARY KEY and time attribute (typically a CDC source). Without temporal joins, you'd either use the current value (wrong for historical events) or maintain a full history table (expensive). Temporal joins give point-in-time correctness automatically.
Q: How does Flink SQL handle event time and watermarks?
A: In CREATE TABLE DDL, you declare: (1) A TIMESTAMP column as the event time attribute. (2) A WATERMARK clause defining the watermark strategy: WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND (bounded out-of-orderness). Window functions (TUMBLE, HOP, SESSION) then use this time attribute for event-time semantics. For processing time, use a computed column: proc_time AS PROCTIME(). The watermark declaration is equivalent to WatermarkStrategy.forBoundedOutOfOrderness() in the DataStream API.
Common Mistakes
Writing aggregation results to append-only Kafka
Using the standard kafka connector for GROUP BY query results. The query produces updates (changelog), but the kafka connector only supports appends, so you get duplicate/conflicting records.
✅ Use the upsert-kafka connector for aggregation results. It produces a compacted topic where each key has only the latest value. Declare PRIMARY KEY in the sink table definition.
Forgetting WATERMARK declaration
Defining a TIMESTAMP column but not declaring a WATERMARK. Event-time windows never fire because Flink doesn't know how to advance event time without watermarks.
✅ Always add WATERMARK FOR time_col AS time_col - INTERVAL 'N' SECOND for event-time processing. Without it, you can only use processing-time windows (PROCTIME()).
Unbounded GROUP BY without state TTL
Running SELECT user_id, COUNT(*) FROM events GROUP BY user_id on an unbounded stream. State grows forever: one entry per unique user_id, never cleaned up.
✅ Set table.exec.state.ttl in the table environment configuration. This automatically expires state entries that haven't been updated within the TTL period. Choose a TTL based on your business requirements.
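A sketch of setting the TTL in the SQL client or TableEnvironment configuration (the 24-hour value is just an example; pick a retention that matches how long a key stays relevant, shown here with the user_events table from the Kafka example):

-- Expire per-key aggregation state that has been idle for 24 hours
SET 'table.exec.state.ttl' = '24 h';

SELECT user_id, COUNT(*) AS event_count
FROM user_events
GROUP BY user_id;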
Regular join instead of temporal join for enrichment
Using a regular JOIN between a fact stream and a dimension stream. This creates a stream-stream join that buffers both sides in state, which is expensive and semantically wrong for dimension lookups.
✅ Use a temporal join (FOR SYSTEM_TIME AS OF) for enrichment with slowly-changing dimensions. It maintains only the latest version of the dimension table in state and gives point-in-time correct results.