Common Patterns
Real-world Flink applications follow recurring architectural patterns. From real-time aggregation pipelines to fraud detection with stateful pattern matching, these patterns form the building blocks of production streaming systems.
Real-Time Aggregation Pipeline
The most common Flink pattern: consume events from Kafka, aggregate in real-time windows, and write results to a serving layer (Redis, Elasticsearch, or a database) for low-latency queries.
Ingest from Kafka
Kafka source with event-time watermarks and per-partition tracking. Multiple topics if needed.
Filter & Transform
Validate events, parse fields, filter invalid data. Stateless operations chained for efficiency.
Key & Window
keyBy dimension (user, region, product), apply tumbling/sliding window, aggregate with incremental function.
Write to Serving Layer
Sink aggregated results to Redis (low-latency lookups), Elasticsearch (search/dashboards), or PostgreSQL (reporting).
```java
// Real-time revenue dashboard: per-region, per-minute
DataStream<RegionRevenue> pipeline = env
    .fromSource(kafkaSource, watermarkStrategy, "Orders")
    .filter(order -> order.getStatus().equals("COMPLETED"))
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AggregateFunction<Order, RevenueAcc, RegionRevenue>() {
        public RevenueAcc createAccumulator() {
            // count, revenue, maxOrder (maxOrder is a double to match getAmount())
            return new RevenueAcc(0L, 0.0, 0.0);
        }
        public RevenueAcc add(Order order, RevenueAcc acc) {
            return new RevenueAcc(
                acc.count + 1,
                acc.revenue + order.getAmount(),
                Math.max(acc.maxOrder, order.getAmount()));
        }
        public RegionRevenue getResult(RevenueAcc acc) {
            return new RegionRevenue(acc.count, acc.revenue, acc.maxOrder);
        }
        public RevenueAcc merge(RevenueAcc a, RevenueAcc b) {
            return new RevenueAcc(
                a.count + b.count,
                a.revenue + b.revenue,
                Math.max(a.maxOrder, b.maxOrder));
        }
    });

// Dual sink: Redis for real-time + Kafka for downstream
pipeline.addSink(new RedisSink());
pipeline.sinkTo(kafkaResultSink);
```
Dual Sink Pattern
Write aggregated results to both a serving layer (Redis for real-time dashboards) and a message bus (Kafka for downstream consumers). This decouples the real-time serving from further processing and provides a replay mechanism.
Streaming ETL
Streaming ETL continuously transforms, validates, and enriches data as it flows through the pipeline. Unlike batch ETL that runs periodically, streaming ETL provides near-real-time data freshness in the target system.
```java
// Streaming ETL: raw events → cleaned, enriched, partitioned data lake
DataStream<RawEvent> raw = env.fromSource(kafkaSource, wms, "Raw Events");

// Extract: parse and validate
DataStream<ParsedEvent> parsed = raw
    .flatMap(new EventParser())     // parse JSON, handle malformed
    .name("Parse Events");

// Transform: normalize, deduplicate, enrich
OutputTag<ParsedEvent> deadLetterTag = new OutputTag<>("dead-letter") {};
SingleOutputStreamOperator<EnrichedEvent> enriched = parsed
    .keyBy(ParsedEvent::getEventId)
    .process(new DeduplicationFunction(Duration.ofHours(1)))  // dedup
    .keyBy(ParsedEvent::getUserId)
    .process(new EnrichmentFunction())  // add user metadata from state
    .name("Enrich");

// Load: write to multiple destinations based on event type
enriched
    .filter(event -> event.getType().equals("PURCHASE"))
    .sinkTo(purchasesSink);         // → purchases topic

enriched
    .filter(event -> event.getType().equals("PAGEVIEW"))
    .sinkTo(pageviewsSink);         // → pageviews topic

// Dead letter queue for failed events
enriched.getSideOutput(deadLetterTag)
    .sinkTo(deadLetterSink);        // → DLQ for investigation
```
Streaming ETL Best Practices
- ✅ Dead letter queue — route unparseable/invalid events to a DLQ instead of dropping them
- ✅ Deduplication — use keyed state with TTL to detect and remove duplicate events
- ✅ Schema validation — validate against a schema registry before processing
- ✅ Idempotent writes — use upsert sinks so reprocessing doesn't create duplicates
- ✅ Monitoring — track parse failure rate, enrichment cache hit rate, end-to-end latency
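The deduplication item above comes down to one question per event: has this ID been seen within the TTL window? In Flink, the seen-IDs map would live in keyed state with a TTL configuration; this plain-Java sketch (class and method names are illustrative, not a Flink API) shows the core check:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Core of TTL-based deduplication: remember event IDs for a bounded window.
// In Flink this map would be keyed MapState with a StateTtlConfig; this is
// plain Java purely to illustrate the logic.
class TtlDeduplicator {
    private final long ttlMillis;
    private final Map<String, Long> seen = new HashMap<>();

    TtlDeduplicator(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if the event is new and should be processed. */
    boolean checkAndRecord(String eventId, long nowMillis) {
        // Evict expired entries (Flink's state TTL does this for you).
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= ttlMillis) {
                it.remove();
            }
        }
        if (seen.containsKey(eventId)) {
            return false;               // duplicate within the TTL window
        }
        seen.put(eventId, nowMillis);
        return true;
    }
}
```

Flink's TTL mechanism expires entries lazily on access or via background cleanup, so you never write the eviction loop yourself; it is spelled out here only to make the expiry semantics explicit.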
Fraud Detection
Fraud detection requires stateful pattern matching across events — detecting sequences like "small transaction followed by large transaction at a new merchant within 5 minutes." Flink's keyed state and timers make this natural.
```java
// Fraud pattern: small tx → large tx at new merchant within 5 min
public class FraudDetector
        extends KeyedProcessFunction<String, Transaction, Alert> {

    private ValueState<Transaction> lastSmallTxState;
    private ValueState<Long> timerState;
    private MapState<String, Boolean> knownMerchantsState;

    @Override
    public void open(Configuration params) {
        lastSmallTxState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("small-tx", Transaction.class));
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Long.class));
        knownMerchantsState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("merchants", String.class, Boolean.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx,
                               Collector<Alert> out) throws Exception {
        Transaction lastSmall = lastSmallTxState.value();

        if (tx.getAmount() < 1.00) {
            // Small transaction — potential probe
            lastSmallTxState.update(tx);
            // Set 5-minute timer
            long timer = tx.getTimestamp() + 5 * 60 * 1000;
            ctx.timerService().registerEventTimeTimer(timer);
            timerState.update(timer);
        } else if (tx.getAmount() > 500.00 && lastSmall != null) {
            // Large transaction after small — check merchant
            if (!knownMerchantsState.contains(tx.getMerchant())) {
                out.collect(new Alert(ctx.getCurrentKey(),
                    "PROBE_THEN_LARGE_NEW_MERCHANT", lastSmall, tx));
            }
            cleanup();
        }

        // Track known merchants
        knownMerchantsState.put(tx.getMerchant(), true);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Alert> out) {
        // 5 minutes passed without large tx — clear state
        cleanup();
    }

    private void cleanup() {
        lastSmallTxState.clear();
        timerState.clear();
    }
}
```
Fraud Detection at Scale
At companies like Alibaba and PayPal, Flink processes millions of transactions per second for fraud detection. The key: state is partitioned by user/account, so each parallel instance handles a subset of users independently. Patterns are evaluated per-key with sub-millisecond latency.
Change Data Capture Processing
CDC captures database changes (INSERT, UPDATE, DELETE) as a stream of events. Flink processes these changes in real-time — materializing views, syncing to search indexes, or triggering downstream workflows.
CDC Pipeline Architecture: PostgreSQL → Debezium → Kafka → Flink → Elasticsearch/Redis

1. Debezium captures WAL changes from PostgreSQL
2. Publishes change events to Kafka (one topic per table)
3. Flink consumes change events with CDC format
4. Processes: join tables, transform, aggregate
5. Writes materialized view to serving layer

Change event format (Debezium):

```
{
  "op": "u",    // c=create, u=update, d=delete
  "before": { "id": 1, "status": "pending", "amount": 50 },
  "after":  { "id": 1, "status": "shipped", "amount": 50 },
  "source": { "table": "orders", "lsn": "0/1234" }
}
```
```sql
-- Flink SQL: CDC source from Kafka (Debezium format)
CREATE TABLE orders_cdc (
    id INT,
    user_id INT,
    status STRING,
    amount DECIMAL(10, 2),
    updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.public.orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'debezium-json'
);

-- Materialized view: real-time order stats per user
CREATE TABLE user_order_stats (
    user_id INT,
    total_orders BIGINT,
    total_revenue DECIMAL(10, 2),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://host:5432/analytics',
    'table-name' = 'user_order_stats'
);

INSERT INTO user_order_stats
SELECT
    user_id,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_revenue
FROM orders_cdc
WHERE status = 'completed'
GROUP BY user_id;
```
CDC + Flink SQL
Flink SQL with CDC connectors (debezium-json, canal-json) is the simplest way to build real-time materialized views. Define CDC source tables, write a GROUP BY query, and Flink continuously maintains the aggregation as the source database changes. No custom code needed.
Session Analysis
Session analysis groups user events into activity sessions separated by inactivity gaps. It answers questions like "how long do users spend per session?" and "what's the conversion funnel within a session?"
```java
// Session analysis: compute session metrics
DataStream<SessionSummary> sessions = clickEvents
    .keyBy(ClickEvent::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new AggregateFunction<ClickEvent, SessionAcc, SessionSummary>() {
        public SessionAcc createAccumulator() {
            return new SessionAcc();
        }
        public SessionAcc add(ClickEvent event, SessionAcc acc) {
            acc.eventCount++;
            acc.pages.add(event.getPage());
            if (acc.firstEvent == 0) acc.firstEvent = event.getTimestamp();
            acc.lastEvent = event.getTimestamp();
            if (event.getPage().equals("/checkout/complete")) {
                acc.converted = true;
            }
            return acc;
        }
        public SessionSummary getResult(SessionAcc acc) {
            return new SessionSummary(
                acc.eventCount,
                acc.lastEvent - acc.firstEvent,  // duration
                acc.pages.size(),                // unique pages
                acc.converted                    // conversion
            );
        }
        public SessionAcc merge(SessionAcc a, SessionAcc b) {
            // Required for session window merging
            a.eventCount += b.eventCount;
            a.pages.addAll(b.pages);
            a.firstEvent = Math.min(a.firstEvent, b.firstEvent);
            a.lastEvent = Math.max(a.lastEvent, b.lastEvent);
            a.converted = a.converted || b.converted;
            return a;
        }
    });
```
Real-Time Joining
Joining streams is one of the most complex operations in stream processing. Flink supports multiple join types depending on whether you're joining two streams, a stream with a table, or events within a time interval.
| Join Type | Description | State Cost | Use Case |
|---|---|---|---|
| Interval Join | Match events within time range | Bounded (time window) | Correlate events within N minutes |
| Window Join | Join events in same window | Bounded (window size) | Combine data from same time period |
| Temporal Join | Join stream with versioned table | Table state | Enrich with latest dimension data |
| Lookup Join | Join with external table (point-in-time) | None (external lookup) | Enrich with slowly-changing data |
```java
// Interval Join: match orders with payments within 30 minutes
DataStream<OrderPayment> matched = orders
    .keyBy(Order::getOrderId)
    .intervalJoin(payments.keyBy(Payment::getOrderId))
    .between(Time.seconds(0), Time.minutes(30))
    .process(new ProcessJoinFunction<Order, Payment, OrderPayment>() {
        public void processElement(Order order, Payment payment,
                                   Context ctx, Collector<OrderPayment> out) {
            out.collect(new OrderPayment(order, payment));
        }
    });

// Temporal Join (SQL): enrich with latest exchange rate
// SELECT o.amount * r.rate AS usd_amount
// FROM orders o
// JOIN exchange_rates FOR SYSTEM_TIME AS OF o.order_time AS r
// ON o.currency = r.currency;
```
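The table above also lists the lookup join, which the examples don't show. A minimal Flink SQL sketch, assuming hypothetical `orders` and `user_profiles` tables, where `orders` declares a processing-time attribute (`proc_time AS PROCTIME()`) and `user_profiles` is backed by a lookup-capable connector such as JDBC:

```sql
-- Lookup join: point-in-time enrichment from an external table
-- (table and column names are illustrative)
SELECT o.order_id, o.amount, u.tier
FROM orders AS o
JOIN user_profiles FOR SYSTEM_TIME AS OF o.proc_time AS u
  ON o.user_id = u.id;
```

Because each row triggers an external lookup rather than buffering state, lookup joins keep Flink state small but add a runtime dependency on the external system.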
Join State Management
Stream-stream joins buffer events from both sides in state. For interval joins, state is bounded by the time interval. For unbounded joins (no time constraint), state grows forever. Always use time-bounded joins in production or set state TTL to prevent unbounded growth.
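Where a time-bounded join isn't possible, state TTL caps how long events stay buffered. A sketch using Flink's `StateTtlConfig` (the descriptor name and `Order` type are illustrative); in Flink SQL, the equivalent knob is the `table.exec.state.ttl` option:

```java
// Cap buffered join state at 1 hour; expired entries stop being returned
// and are eventually cleaned up by the state backend.
StateTtlConfig ttl = StateTtlConfig
    .newBuilder(Time.hours(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<Order> bufferedOrder =
    new ValueStateDescriptor<>("buffered-order", Order.class);
bufferedOrder.enableTimeToLive(ttl);
```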
Complex Event Processing (CEP)
FlinkCEP is a library for detecting complex event patterns in streams. It provides a pattern API for defining sequences, conditions, and time constraints — more expressive than manual ProcessFunction state machines for complex patterns.
```java
// CEP: detect login → failed_payment → login from different IP
// (potential account takeover pattern)
Pattern<Event, ?> accountTakeover = Pattern
    .<Event>begin("login")
    .where(new SimpleCondition<Event>() {
        public boolean filter(Event event) {
            return event.getType().equals("LOGIN_SUCCESS");
        }
    })
    .followedBy("failed-payment")
    .where(new SimpleCondition<Event>() {
        public boolean filter(Event event) {
            return event.getType().equals("PAYMENT_FAILED");
        }
    })
    .times(3)   // 3 failed payments
    .followedBy("suspicious-login")
    .where(new IterativeCondition<Event>() {
        public boolean filter(Event event, Context<Event> ctx) throws Exception {
            Event firstLogin = ctx.getEventsForPattern("login")
                .iterator().next();
            // Different IP than original login
            return event.getType().equals("LOGIN_SUCCESS")
                && !event.getIp().equals(firstLogin.getIp());
        }
    })
    .within(Time.minutes(10));   // All within 10 minutes

// Apply pattern to stream
PatternStream<Event> patternStream = CEP.pattern(
    events.keyBy(Event::getUserId), accountTakeover);

// Extract matches
DataStream<Alert> alerts = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        public void processMatch(Map<String, List<Event>> match,
                                 Context ctx, Collector<Alert> out) {
            Event login = match.get("login").get(0);
            Event suspicious = match.get("suspicious-login").get(0);
            out.collect(new Alert("ACCOUNT_TAKEOVER", login.getUserId(),
                login.getIp(), suspicious.getIp()));
        }
    });
```
CEP Pattern Operators
- ✅ begin() — start of the pattern sequence
- ✅ followedBy() — relaxed contiguity (non-matching events may occur in between)
- ✅ followedByAny() — relaxed non-deterministic contiguity (matches every qualifying occurrence)
- ✅ next() — strict contiguity (immediately after, no events in between)
- ✅ times(n) — exactly n occurrences
- ✅ within(time) — entire pattern must complete within the time window
- ✅ where() — condition an event must satisfy; until() — stop condition for looping patterns like oneOrMore()
Interview Questions
Q: Design a real-time fraud detection system using Flink.
A: Architecture: Kafka (transactions) → Flink → Alerts topic + Database. Flink job: (1) Source: Kafka with event-time watermarks. (2) Key by user/account ID. (3) ProcessFunction with keyed state: store recent transaction history, known merchants, velocity counters. (4) Pattern detection: small probe → large purchase at new merchant within 5 min (timers for expiry). (5) ML scoring: async I/O to model serving endpoint. (6) Multi-output: alerts to Kafka topic, blocked transactions to enforcement service, all decisions to audit log. Key: sub-100ms latency requirement means state must be local (RocksDB), not external lookups.
Q: How would you implement a streaming ETL pipeline with exactly-once guarantees?
A: Source: Kafka with checkpointed offsets. Processing: (1) Parse/validate with dead letter queue (side output for failures). (2) Deduplicate using keyed state with TTL (store event IDs for N hours). (3) Enrich via broadcast state (dimension data) or async I/O (external lookups). (4) Transform and route by event type. Sink: Kafka with exactly-once (transactions) or JDBC with upsert (idempotent). Monitoring: track parse failure rate, dedup hit rate, enrichment latency, end-to-end lag. Checkpoints every 60s with RocksDB incremental.
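The exactly-once Kafka sink in the answer above relies on Kafka transactions tied to Flink checkpoints. A sketch using Flink's `KafkaSink` builder (topic name, id prefix, and the `EnrichedEvent` type are illustrative); note that `EXACTLY_ONCE` requires a transactional-id prefix, and the broker's transaction timeout must exceed the checkpoint interval:

```java
// Transactional Kafka sink: records become visible to read_committed
// consumers only after the enclosing checkpoint completes.
KafkaSink<EnrichedEvent> sink = KafkaSink.<EnrichedEvent>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.<EnrichedEvent>builder()
        .setTopic("enriched-events")                 // illustrative topic
        .setValueSerializationSchema(new JsonSerializationSchema<EnrichedEvent>())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("etl-pipeline")        // required for EXACTLY_ONCE
    .build();

enriched.sinkTo(sink);
```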
Q: What are the different types of stream joins in Flink?
A: (1) Interval Join: match events from two streams within a time range (e.g., order + payment within 30 min). Bounded state. (2) Window Join: join events that fall in the same window. Both streams windowed identically. (3) Temporal Join: join stream with a versioned table (FOR SYSTEM_TIME AS OF). Gets the table value valid at the event's timestamp. (4) Lookup Join: point-in-time lookup to external system (async I/O). No state, but external dependency. Key consideration: all stream-stream joins require buffering — use time bounds to limit state growth.
Q: Explain the CDC processing pattern with Flink.
A: Pattern: Database → Debezium → Kafka → Flink → Serving Layer. Debezium captures WAL changes (INSERT/UPDATE/DELETE) and publishes to Kafka in a structured format. Flink consumes with CDC format (debezium-json), which understands change operations. Use cases: (1) Real-time materialized views — Flink SQL GROUP BY continuously maintains aggregations. (2) Search index sync — transform and write to Elasticsearch. (3) Cross-database replication — transform and write to different database. Key advantage: Flink SQL treats CDC streams as updating tables, making complex joins and aggregations natural.
Q: When would you use FlinkCEP vs a custom ProcessFunction?
A: FlinkCEP: when you need to detect SEQUENCES of events with complex ordering (followedBy, next, times), time constraints (within), and iterative conditions (reference earlier events in pattern). Examples: account takeover patterns, multi-step fraud, SLA violation sequences. ProcessFunction: when patterns are simpler (single condition + timer), when you need full control over state management, or when CEP's pattern language can't express your logic. ProcessFunction is also better for high-throughput simple patterns (less overhead than CEP's NFA engine).
Common Mistakes
Unbounded state in stream-stream joins
Joining two streams without a time bound. Both sides buffer all events forever, waiting for potential matches. State grows until OOM.
✅ Always use time-bounded joins: an interval join with an explicit time range, or a window join. If you need unbounded joins, set state TTL to limit how long events are buffered. Accept that very late matches will be missed.
Dropping invalid events silently
Using filter() to discard unparseable or invalid events. These events disappear without trace — you can't debug data quality issues or recover them later.
✅ Route invalid events to a dead letter queue (side output → separate Kafka topic). Monitor DLQ volume. This gives you visibility into data quality issues and the ability to reprocess fixed events later.
Not deduplicating in streaming ETL
Assuming events are unique. In practice, Kafka producers retry, upstream systems send duplicates, and Flink reprocessing after failures creates duplicates. Without dedup, downstream counts are inflated.
✅ Add a deduplication step: keyBy(eventId), store seen IDs in MapState with TTL. Check if the ID exists before processing. Set TTL to your maximum expected duplicate window (e.g., 1-24 hours depending on source guarantees).
CEP patterns without time constraints
Defining a CEP pattern without .within(time). The pattern matcher keeps partial matches in state indefinitely, waiting for the next event in the sequence. State grows unbounded.
✅ Always add .within(Time.minutes(N)) to CEP patterns. This bounds how long partial matches are kept. Choose N based on your business logic — if the pattern hasn't completed in N minutes, it's not a valid match.