
Common Patterns

Real-world Flink applications follow recurring architectural patterns. From real-time aggregation pipelines to fraud detection with stateful pattern matching, these patterns form the building blocks of production streaming systems.

45 min read · 9 sections
01

Real-Time Aggregation Pipeline

The most common Flink pattern: consume events from Kafka, aggregate in real-time windows, and write results to a serving layer (Redis, Elasticsearch, or a database) for low-latency queries.

1

Ingest from Kafka

Kafka source with event-time watermarks and per-partition tracking. Multiple topics if needed.

2

Filter & Transform

Validate events, parse fields, filter invalid data. Stateless operations chained for efficiency.

3

Key & Window

keyBy dimension (user, region, product), apply tumbling/sliding window, aggregate with incremental function.

4

Write to Serving Layer

Sink aggregated results to Redis (low-latency lookups), Elasticsearch (search/dashboards), or PostgreSQL (reporting).
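Step 3 can be illustrated without any Flink dependency. The following is a minimal plain-Java sketch, assuming epoch-millisecond timestamps and a one-minute window; the class and method names (`TumblingWindowSketch`, `windowStart`, `add`) are hypothetical and only show how an event maps to a window and how a running aggregate is updated per event instead of buffering the whole window.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink dependency) of keyed tumbling-window aggregation.
final class TumblingWindowSketch {

    // Start of the tumbling window that contains the timestamp (epoch millis).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Running revenue per "region@windowStart", updated incrementally per event.
    private final Map<String, Double> revenue = new TreeMap<>();

    void add(String region, long timestampMs, double amount) {
        String key = region + "@" + windowStart(timestampMs, 60_000L);
        revenue.merge(key, amount, Double::sum);
    }

    Map<String, Double> result() {
        return revenue;
    }

    public static void main(String[] args) {
        TumblingWindowSketch agg = new TumblingWindowSketch();
        agg.add("EU", 5_000L, 10.0);   // falls into window [0, 60000)
        agg.add("EU", 59_999L, 5.0);   // same window
        agg.add("EU", 60_000L, 7.0);   // next window [60000, 120000)
        System.out.println(agg.result());
    }
}
```

Only one number per key and window is kept, which is the reason incremental aggregate functions are cheaper in state than collecting every event until the window fires.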

aggregation-pipeline.java
// Real-time revenue dashboard: per-region, per-minute
DataStream<RegionRevenue> pipeline = env
    .fromSource(kafkaSource, watermarkStrategy, "Orders")
    .filter(order -> order.getStatus().equals("COMPLETED"))
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AggregateFunction<Order, RevenueAcc, RegionRevenue>() {
        public RevenueAcc createAccumulator() {
            return new RevenueAcc(0L, 0.0, 0.0);  // count, revenue, maxOrder
        }
        public RevenueAcc add(Order order, RevenueAcc acc) {
            return new RevenueAcc(
                acc.count + 1,
                acc.revenue + order.getAmount(),
                Math.max(acc.maxOrder, order.getAmount()));
        }
        public RegionRevenue getResult(RevenueAcc acc) {
            return new RegionRevenue(acc.count, acc.revenue, acc.maxOrder);
        }
        public RevenueAcc merge(RevenueAcc a, RevenueAcc b) {
            return new RevenueAcc(
                a.count + b.count, a.revenue + b.revenue,
                Math.max(a.maxOrder, b.maxOrder));
        }
    });

// Dual sink: Redis for real-time + Kafka for downstream
pipeline.addSink(redisSink);  // a configured Redis sink (connector-specific setup omitted)
pipeline.sinkTo(kafkaResultSink);

Dual Sink Pattern

Write aggregated results to both a serving layer (Redis for real-time dashboards) and a message bus (Kafka for downstream consumers). This decouples the real-time serving from further processing and provides a replay mechanism.

02

Streaming ETL

Streaming ETL continuously transforms, validates, and enriches data as it flows through the pipeline. Unlike batch ETL that runs periodically, streaming ETL provides near-real-time data freshness in the target system.

streaming-etl.java
// Streaming ETL: raw events → cleaned, enriched, partitioned data lake
DataStream<RawEvent> raw = env.fromSource(kafkaSource, wms, "Raw Events");

// Extract: parse and validate
DataStream<ParsedEvent> parsed = raw
    .flatMap(new EventParser())  // parse JSON, handle malformed
    .name("Parse Events");

// Transform: normalize, deduplicate, enrich
OutputTag<ParsedEvent> deadLetterTag = new OutputTag<ParsedEvent>("dead-letter") {};

SingleOutputStreamOperator<EnrichedEvent> enriched = parsed
    .keyBy(ParsedEvent::getEventId)
    .process(new DeduplicationFunction(Duration.ofHours(1)))  // dedup
    .keyBy(ParsedEvent::getUserId)
    .process(new EnrichmentFunction())  // add user metadata from state
    .name("Enrich");

// Load: write to multiple destinations based on event type
enriched
    .filter(event -> event.getType().equals("PURCHASE"))
    .sinkTo(purchasesSink);  // → purchases topic

enriched
    .filter(event -> event.getType().equals("PAGEVIEW"))
    .sinkTo(pageviewsSink);  // → pageviews topic

// Dead letter queue: events that EnrichmentFunction emitted to deadLetterTag via ctx.output(...)
enriched.getSideOutput(deadLetterTag)
    .sinkTo(deadLetterSink);  // → DLQ for investigation

Streaming ETL Best Practices

  • Dead letter queue — route unparseable/invalid events to a DLQ instead of dropping them
  • Deduplication — use keyed state with TTL to detect and remove duplicate events
  • Schema validation — validate against schema registry before processing
  • Idempotent writes — use upsert sinks so reprocessing doesn't create duplicates
  • Monitoring — track parse failure rate, enrichment cache hit rate, end-to-end latency
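The dead letter queue practice can be sketched in plain Java. This is a minimal illustration, assuming raw records are non-null strings that should parse as integers; the class `DeadLetterRoutingSketch` and its fields are hypothetical. In a Flink job the same split is typically done by emitting the failed record to a side output from a process function.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: keep unparseable records instead of dropping them.
final class DeadLetterRoutingSketch {
    final List<Integer> valid = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    // Parses one raw record; failures go to the dead-letter list with the original payload.
    void process(String raw) {
        try {
            valid.add(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            deadLetters.add(raw);
        }
    }

    public static void main(String[] args) {
        DeadLetterRoutingSketch router = new DeadLetterRoutingSketch();
        for (String raw : new String[] {"42", "oops", " 7 "}) {
            router.process(raw);
        }
        System.out.println("valid=" + router.valid + " deadLetters=" + router.deadLetters);
    }
}
```

Keeping the original payload in the dead-letter record is what makes later reprocessing possible once the upstream bug is fixed.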
03

Fraud Detection

Fraud detection requires stateful pattern matching across events — detecting sequences like "small transaction followed by large transaction at a new merchant within 5 minutes." Flink's keyed state and timers make this natural.

fraud-detection.java
// Fraud pattern: small tx → large tx at new merchant within 5 min
public class FraudDetector
    extends KeyedProcessFunction<String, Transaction, Alert> {

    private ValueState<Transaction> lastSmallTxState;
    private ValueState<Long> timerState;
    private MapState<String, Boolean> knownMerchantsState;

    @Override
    public void open(Configuration params) {
        lastSmallTxState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("small-tx", Transaction.class));
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Long.class));
        knownMerchantsState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("merchants", String.class, Boolean.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx,
                               Collector<Alert> out) throws Exception {
        Transaction lastSmall = lastSmallTxState.value();

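        if (tx.getAmount() < 1.00) {
            // Small transaction: potential probe
            lastSmallTxState.update(tx);
            // Drop any pending timer so a stale one cannot clear this newer probe
            Long pending = timerState.value();
            if (pending != null) {
                ctx.timerService().deleteEventTimeTimer(pending);
            }
            // Set 5-minute timer
            long timer = tx.getTimestamp() + 5 * 60 * 1000;
            ctx.timerService().registerEventTimeTimer(timer);
            timerState.update(timer);

        } else if (tx.getAmount() > 500.00 && lastSmall != null) {
            // Large transaction after small: check merchant
            if (!knownMerchantsState.contains(tx.getMerchant())) {
                out.collect(new Alert(ctx.getCurrentKey(),
                    "PROBE_THEN_LARGE_NEW_MERCHANT",
                    lastSmall, tx));
            }
            cleanup(ctx);
        }

        // Track known merchants
        knownMerchantsState.put(tx.getMerchant(), true);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Alert> out) throws Exception {
        // 5 minutes passed without large tx: clear state
        cleanup(ctx);
    }

    private void cleanup(Context ctx) throws Exception {
        // Delete the pending timer too; otherwise it fires later and wipes a newer probe
        Long pending = timerState.value();
        if (pending != null) {
            ctx.timerService().deleteEventTimeTimer(pending);
        }
        lastSmallTxState.clear();
        timerState.clear();
    }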
}

Fraud Detection at Scale

At companies like Alibaba and PayPal, Flink processes millions of transactions per second for fraud detection. The key: state is partitioned by user/account, so each parallel instance handles a subset of users independently. Patterns are evaluated per-key with sub-millisecond latency.
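How a key ends up on one parallel instance can be sketched as follows. This is a simplified illustration: as far as I know, Flink additionally runs the key's hash code through a murmur hash before taking the modulo, and the names here (`KeyPartitioningSketch`, `keyGroup`, `subtaskIndex`) are hypothetical.

```java
// Plain-Java sketch of key -> key group -> parallel subtask assignment.
final class KeyPartitioningSketch {

    // Simplified: uses hashCode directly; assumption, not Flink's exact hashing.
    static int keyGroup(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are split into contiguous ranges, one range per subtask.
    static int subtaskIndex(String key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        for (String user : new String[] {"user-1", "user-2", "user-42"}) {
            System.out.println(user + " -> subtask " + subtaskIndex(user, 128, 4));
        }
    }
}
```

Because the mapping depends only on the key, every transaction for one account reaches the same subtask, so its state never needs cross-instance coordination.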

04

Change Data Capture Processing

CDC captures database changes (INSERT, UPDATE, DELETE) as a stream of events. Flink processes these changes in real-time — materializing views, syncing to search indexes, or triggering downstream workflows.

cdc-pipeline.txt
CDC Pipeline Architecture:

PostgreSQL → Debezium → Kafka → Flink → Elasticsearch/Redis

1. Debezium captures WAL changes from PostgreSQL
2. Publishes change events to Kafka (one topic per table)
3. Flink consumes change events with CDC format
4. Processes: join tables, transform, aggregate
5. Writes materialized view to serving layer

Change event format (Debezium):
{
  "op": "u",           // c=create, u=update, d=delete
  "before": { "id": 1, "status": "pending", "amount": 50 },
  "after":  { "id": 1, "status": "shipped", "amount": 50 },
  "source": { "table": "orders", "lsn": "0/1234" }
}
cdc-flink-sql.sql
-- Flink SQL: CDC source from Kafka (Debezium format)
CREATE TABLE orders_cdc (
    id INT,
    user_id INT,
    status STRING,
    amount DECIMAL(10,2),
    updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.public.orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'debezium-json'
);

-- Materialized view: real-time order stats per user
CREATE TABLE user_order_stats (
    user_id INT,
    total_orders BIGINT,
    total_revenue DECIMAL(38,2),  -- SUM over DECIMAL(10,2) widens the precision
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://host:5432/analytics',
    'table-name' = 'user_order_stats'
);

INSERT INTO user_order_stats
SELECT
    user_id,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_revenue
FROM orders_cdc
WHERE status = 'completed'
GROUP BY user_id;

CDC + Flink SQL

Flink SQL with CDC formats (debezium-json, canal-json) is the simplest way to build real-time materialized views. Define CDC source tables, write a GROUP BY query, and Flink continuously maintains the aggregation as the source database changes. No custom code needed.
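What "continuously maintains the aggregation" means can be shown with a small plain-Java sketch. It assumes each change event carries a before and an after row image, as in the Debezium example, and that amounts are stored in cents; `CdcAggregationSketch` and `OrderRow` are hypothetical names. An update is handled by retracting the old image and adding the new one.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch: maintain per-user stats of completed orders from change events.
final class CdcAggregationSketch {

    // Minimal row image of an order; amounts in cents to avoid floating point.
    static final class OrderRow {
        final int userId;
        final String status;
        final long amountCents;

        OrderRow(int userId, String status, long amountCents) {
            this.userId = userId;
            this.status = status;
            this.amountCents = amountCents;
        }
    }

    final Map<Integer, Long> totalOrders = new HashMap<>();
    final Map<Integer, Long> totalRevenueCents = new HashMap<>();

    // before == null for a create, after == null for a delete.
    void apply(OrderRow before, OrderRow after) {
        if (before != null) accumulate(before, -1L);  // retract the old image
        if (after != null) accumulate(after, 1L);     // add the new image
    }

    private void accumulate(OrderRow row, long sign) {
        if (!"completed".equals(row.status)) return;  // mirrors WHERE status = 'completed'
        totalOrders.merge(row.userId, sign, Long::sum);
        totalRevenueCents.merge(row.userId, sign * row.amountCents, Long::sum);
    }

    public static void main(String[] args) {
        CdcAggregationSketch view = new CdcAggregationSketch();
        OrderRow pending = new OrderRow(7, "pending", 5000L);
        OrderRow completed = new OrderRow(7, "completed", 5000L);
        view.apply(null, pending);        // create
        view.apply(pending, completed);   // update
        System.out.println(view.totalOrders + " " + view.totalRevenueCents);
    }
}
```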

05

Session Analysis

Session analysis groups user events into activity sessions separated by inactivity gaps. It answers questions like "how long do users spend per session?" and "what's the conversion funnel within a session?"
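The gap rule itself fits in a few lines of plain Java. This sketch assumes epoch-millisecond timestamps for a single user and starts a new session whenever the distance between consecutive events exceeds the gap; `SessionGapSketch` and `sessions` are hypothetical names, and boundary handling in a real engine may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch: split one user's event timestamps into sessions by inactivity gap.
final class SessionGapSketch {

    static List<List<Long>> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);  // events may arrive out of order
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            // A gap larger than gapMs closes the current session.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] clicks = {0L, 10 * minute, 50 * minute, 55 * minute};
        System.out.println(sessions(clicks, 30 * minute));
    }
}
```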

session-analysis.java
// Session analysis: compute session metrics
DataStream<SessionSummary> sessions = clickEvents
    .keyBy(ClickEvent::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(
        new AggregateFunction<ClickEvent, SessionAcc, SessionSummary>() {
            public SessionAcc createAccumulator() {
                return new SessionAcc();
            }
            public SessionAcc add(ClickEvent event, SessionAcc acc) {
                acc.eventCount++;
                acc.pages.add(event.getPage());
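                long ts = event.getTimestamp();
                // Events can arrive out of order, so track min/max rather than arrival order
                acc.firstEvent = acc.firstEvent == 0 ? ts : Math.min(acc.firstEvent, ts);
                acc.lastEvent = Math.max(acc.lastEvent, ts);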
                if (event.getPage().equals("/checkout/complete")) {
                    acc.converted = true;
                }
                return acc;
            }
            public SessionSummary getResult(SessionAcc acc) {
                return new SessionSummary(
                    acc.eventCount,
                    acc.lastEvent - acc.firstEvent,  // duration
                    acc.pages.size(),                  // unique pages
                    acc.converted                     // conversion
                );
            }
            public SessionAcc merge(SessionAcc a, SessionAcc b) {
                // Required for session window merging
                a.eventCount += b.eventCount;
                a.pages.addAll(b.pages);
                a.firstEvent = Math.min(a.firstEvent, b.firstEvent);
                a.lastEvent = Math.max(a.lastEvent, b.lastEvent);
                a.converted = a.converted || b.converted;
                return a;
            }
        });
06

Real-Time Joining

Joining streams is one of the most complex operations in stream processing. Flink supports multiple join types depending on whether you're joining two streams, a stream with a table, or events within a time interval.

Join Type | Description | State Cost | Use Case
Interval Join | Match events within time range | Bounded (time window) | Correlate events within N minutes
Window Join | Join events in same window | Bounded (window size) | Combine data from same time period
Temporal Join | Join stream with versioned table | Table state | Enrich with latest dimension data
Lookup Join | Join with external table (point-in-time) | None (external lookup) | Enrich with slowly-changing data
stream-joins.java
// Interval Join: match orders with payments within 30 minutes
DataStream<OrderPayment> matched = orders
    .keyBy(Order::getOrderId)
    .intervalJoin(payments.keyBy(Payment::getOrderId))
    .between(Time.seconds(0), Time.minutes(30))
    .process(new ProcessJoinFunction<Order, Payment, OrderPayment>() {
        public void processElement(Order order, Payment payment,
                Context ctx, Collector<OrderPayment> out) {
            out.collect(new OrderPayment(order, payment));
        }
    });

// Temporal Join (SQL): enrich with latest exchange rate
// SELECT o.amount * r.rate AS usd_amount
// FROM orders o
// JOIN exchange_rates FOR SYSTEM_TIME AS OF o.order_time AS r
// ON o.currency = r.currency;

Join State Management

Stream-stream joins buffer events from both sides in state. For interval joins, state is bounded by the time interval. For unbounded joins (no time constraint), state grows forever. Always use time-bounded joins in production or set state TTL to prevent unbounded growth.
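Why an interval bounds the state can be seen from the match condition alone. The sketch below is plain Java with hypothetical names (`IntervalJoinSketch`, `inInterval`, `canEvictLeft`); it assumes inclusive bounds and a simplified eviction rule, so treat it as an illustration rather than the engine's exact cleanup logic.

```java
// Plain-Java sketch of the interval-join match condition and state eviction.
final class IntervalJoinSketch {

    // True when rightTs lies in [leftTs + lowerMs, leftTs + upperMs], bounds inclusive.
    static boolean inInterval(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }

    // A buffered left-side event can be dropped once the watermark has passed the
    // latest timestamp at which a matching right-side event could still occur.
    static boolean canEvictLeft(long leftTs, long upperMs, long watermark) {
        return watermark > leftTs + upperMs;
    }

    public static void main(String[] args) {
        long thirtyMin = 30 * 60 * 1000L;
        System.out.println(inInterval(0L, 10 * 60 * 1000L, 0L, thirtyMin));
        System.out.println(canEvictLeft(0L, thirtyMin, thirtyMin + 1));
    }
}
```

Without an upper bound there is no watermark value at which `canEvictLeft` becomes true, which is the unbounded-state case described above.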

07

Complex Event Processing (CEP)

FlinkCEP is a library for detecting complex event patterns in streams. It provides a pattern API for defining sequences, conditions, and time constraints — more expressive than manual ProcessFunction state machines for complex patterns.

flink-cep.java
// CEP: detect login → failed_payment → login from different IP
// (potential account takeover pattern)

Pattern<Event, ?> accountTakeover = Pattern
    .<Event>begin("login")
        .where(new SimpleCondition<Event>() {
            public boolean filter(Event event) {
                return event.getType().equals("LOGIN_SUCCESS");
            }
        })
    .followedBy("failed-payment")
        .where(new SimpleCondition<Event>() {
            public boolean filter(Event event) {
                return event.getType().equals("PAYMENT_FAILED");
            }
        })
        .times(3)  // 3 failed payments
    .followedBy("suspicious-login")
        .where(new IterativeCondition<Event>() {
            public boolean filter(Event event, Context<Event> ctx) throws Exception {
                Event firstLogin = ctx.getEventsForPattern("login")
                    .iterator().next();
                // Different IP than original login
                return event.getType().equals("LOGIN_SUCCESS")
                    && !event.getIp().equals(firstLogin.getIp());
            }
        })
    .within(Time.minutes(10));  // All within 10 minutes

// Apply pattern to stream
PatternStream<Event> patternStream = CEP.pattern(
    events.keyBy(Event::getUserId), accountTakeover);

// Extract matches
DataStream<Alert> alerts = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        public void processMatch(Map<String, List<Event>> match,
                Context ctx, Collector<Alert> out) {
            Event login = match.get("login").get(0);
            Event suspicious = match.get("suspicious-login").get(0);
            out.collect(new Alert("ACCOUNT_TAKEOVER",
                login.getUserId(), login.getIp(),
                suspicious.getIp()));
        }
    });

CEP Pattern Operators

  • begin() — start of pattern sequence
  • followedBy() — next event (allows non-matching events between)
  • followedByAny() — next event (non-deterministic, matches any occurrence)
  • next() — strict contiguity (immediately after, no events between)
  • times(n) — exactly n occurrences
  • within(time) — entire pattern must complete within time window
  • where() / until() — where() sets a condition on individual events; until() sets a stop condition for looping patterns such as oneOrMore()
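The difference between strict and relaxed contiguity in the list above can be illustrated on a plain list of event types. This sketch does not use FlinkCEP; `ContiguitySketch`, `matchesNext`, and `matchesFollowedBy` are hypothetical helpers for a two-step pattern only.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch: strict (next) versus relaxed (followedBy) contiguity.
final class ContiguitySketch {

    // Strict contiguity: "second" must come directly after "first".
    static boolean matchesNext(List<String> events, String first, String second) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                return true;
            }
        }
        return false;
    }

    // Relaxed contiguity: unrelated events may occur between "first" and "second".
    static boolean matchesFollowedBy(List<String> events, String first, String second) {
        int start = events.indexOf(first);
        return start >= 0 && events.subList(start + 1, events.size()).contains(second);
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("LOGIN", "PAGEVIEW", "PAYMENT_FAILED");
        System.out.println(matchesNext(events, "LOGIN", "PAYMENT_FAILED"));
        System.out.println(matchesFollowedBy(events, "LOGIN", "PAYMENT_FAILED"));
    }
}
```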
08

Interview Questions

Q: Design a real-time fraud detection system using Flink.

A: Architecture: Kafka (transactions) → Flink → Alerts topic + Database. Flink job: (1) Source: Kafka with event-time watermarks. (2) Key by user/account ID. (3) KeyedProcessFunction with keyed state: store recent transaction history, known merchants, velocity counters. (4) Pattern detection: small probe → large purchase at new merchant within 5 min (timers for expiry). (5) ML scoring: async I/O to model serving endpoint. (6) Multi-output: alerts to Kafka topic, blocked transactions to enforcement service, all decisions to audit log. Key: a sub-100ms latency budget means per-key state must be local (for example the RocksDB state backend) rather than fetched from an external store per event; the model call in step 5 is the one remote hop, which is why it goes through async I/O.

Q: How would you implement a streaming ETL pipeline with exactly-once guarantees?

A: Source: Kafka with checkpointed offsets. Processing: (1) Parse/validate with dead letter queue (side output for failures). (2) Deduplicate using keyed state with TTL (store event IDs for N hours). (3) Enrich via broadcast state (dimension data) or async I/O (external lookups). (4) Transform and route by event type. Sink: Kafka with exactly-once (transactions) or JDBC with upsert (idempotent). Monitoring: track parse failure rate, dedup hit rate, enrichment latency, end-to-end lag. Checkpoints every 60s with RocksDB incremental.
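The idempotent-sink point in this answer can be made concrete with a small plain-Java sketch. It assumes a record is replayed after a failure and compares an append-only target with a key-based upsert; `UpsertSinkSketch` and its fields are hypothetical and stand in for a real database table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch: replaying a record against append-only versus upsert targets.
final class UpsertSinkSketch {
    final List<String> appendLog = new ArrayList<>();
    final Map<String, Long> upsertTable = new HashMap<>();

    void write(String key, long value) {
        appendLog.add(key + "=" + value);  // append-only: a replay adds a second row
        upsertTable.put(key, value);       // upsert by key: a replay overwrites the same row
    }

    public static void main(String[] args) {
        UpsertSinkSketch sink = new UpsertSinkSketch();
        sink.write("order-1", 50L);
        sink.write("order-1", 50L);  // same record replayed after a restart
        System.out.println(sink.appendLog.size() + " appended rows, "
                + sink.upsertTable.size() + " upserted row");
    }
}
```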

Q: What are the different types of stream joins in Flink?

A: (1) Interval Join: match events from two streams within a time range (e.g., order + payment within 30 min). Bounded state. (2) Window Join: join events that fall in the same window. Both streams windowed identically. (3) Temporal Join: join stream with a versioned table (FOR SYSTEM_TIME AS OF). Gets the table value valid at the event's timestamp. (4) Lookup Join: point-in-time lookup to external system (async I/O). No state, but external dependency. Key consideration: all stream-stream joins require buffering — use time bounds to limit state growth.
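The temporal join in point (3) boils down to "find the table version valid at the event's timestamp". A minimal plain-Java sketch, assuming versions are keyed by their valid-from time in epoch milliseconds; `TemporalJoinSketch`, `updateRate`, and `rateAsOf` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch: look up the version of a dimension row valid at a given event time.
final class TemporalJoinSketch {

    // currency -> (valid-from timestamp -> rate)
    private final Map<String, TreeMap<Long, Double>> versions = new HashMap<>();

    void updateRate(String currency, long validFromMs, double rate) {
        versions.computeIfAbsent(currency, c -> new TreeMap<>()).put(validFromMs, rate);
    }

    // Rate valid at eventTimeMs, or null if no version existed yet.
    Double rateAsOf(String currency, long eventTimeMs) {
        TreeMap<Long, Double> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, Double> entry = history.floorEntry(eventTimeMs);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        TemporalJoinSketch rates = new TemporalJoinSketch();
        rates.updateRate("EUR", 0L, 1.10);
        rates.updateRate("EUR", 1_000L, 1.20);
        System.out.println(rates.rateAsOf("EUR", 500L));
        System.out.println(rates.rateAsOf("EUR", 1_000L));
    }
}
```

An order stamped at 500 ms is priced with the older rate even if it is processed after the newer rate arrived, which is what distinguishes a temporal join from a lookup of the latest value.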

Q: Explain the CDC processing pattern with Flink.

A: Pattern: Database → Debezium → Kafka → Flink → Serving Layer. Debezium captures WAL changes (INSERT/UPDATE/DELETE) and publishes to Kafka in a structured format. Flink consumes with CDC format (debezium-json), which understands change operations. Use cases: (1) Real-time materialized views — Flink SQL GROUP BY continuously maintains aggregations. (2) Search index sync — transform and write to Elasticsearch. (3) Cross-database replication — transform and write to different database. Key advantage: Flink SQL treats CDC streams as updating tables, making complex joins and aggregations natural.

Q: When would you use FlinkCEP vs a custom ProcessFunction?

A: FlinkCEP: when you need to detect SEQUENCES of events with complex ordering (followedBy, next, times), time constraints (within), and iterative conditions (reference earlier events in pattern). Examples: account takeover patterns, multi-step fraud, SLA violation sequences. ProcessFunction: when patterns are simpler (single condition + timer), when you need full control over state management, or when CEP's pattern language can't express your logic. ProcessFunction is also better for high-throughput simple patterns (less overhead than CEP's NFA engine).

09

Common Mistakes

♾️

Unbounded state in stream-stream joins

Joining two streams without a time bound. Both sides buffer all events forever, waiting for potential matches. State grows until OOM.

Always use time-bounded joins: interval join with explicit time range, or window join. If you need unbounded joins, set state TTL to limit how long events are buffered. Accept that very late matches will be missed.

🗑️

Dropping invalid events silently

Using filter() to discard unparseable or invalid events. These events disappear without trace — you can't debug data quality issues or recover them later.

Route invalid events to a dead letter queue (side output → separate Kafka topic). Monitor DLQ volume. This gives you visibility into data quality issues and the ability to reprocess fixed events later.

🔄

Not deduplicating in streaming ETL

Assuming events are unique. In practice, Kafka producers retry, upstream systems send duplicates, and Flink reprocessing after failures creates duplicates. Without dedup, downstream counts are inflated.

Add a deduplication step: keyBy(eventId), then keep a "seen" flag in keyed state with TTL (a ValueState is enough once the stream is keyed by event ID). Check whether the flag exists before processing. Set TTL to your maximum expected duplicate window (e.g., 1-24 hours depending on source guarantees).
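The deduplication step can be sketched in plain Java as a map from event ID to first-seen time. This is an illustration under assumptions: time is passed in explicitly as epoch milliseconds, and `DedupTtlSketch`, `firstTime`, and `evictExpired` are hypothetical names standing in for keyed state with a TTL.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch: drop events whose ID was already seen within the TTL.
final class DedupTtlSketch {
    private final long ttlMs;
    private final Map<String, Long> firstSeen = new HashMap<>();

    DedupTtlSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Returns true if the event should be processed, false if it is a duplicate.
    boolean firstTime(String eventId, long nowMs) {
        Long seenAt = firstSeen.get(eventId);
        if (seenAt != null && nowMs - seenAt < ttlMs) {
            return false;
        }
        firstSeen.put(eventId, nowMs);
        return true;
    }

    // Removes expired IDs so memory stays bounded.
    void evictExpired(long nowMs) {
        firstSeen.values().removeIf(seenAt -> nowMs - seenAt >= ttlMs);
    }

    int size() {
        return firstSeen.size();
    }

    public static void main(String[] args) {
        DedupTtlSketch dedup = new DedupTtlSketch(3_600_000L);  // 1 hour
        System.out.println(dedup.firstTime("evt-1", 0L));
        System.out.println(dedup.firstTime("evt-1", 60_000L));
    }
}
```

The TTL is the trade-off knob: a duplicate arriving after the TTL has expired is treated as new, so the value should cover the longest retry window the sources can produce.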

CEP patterns without time constraints

Defining a CEP pattern without .within(time). The pattern matcher keeps partial matches in state indefinitely, waiting for the next event in the sequence. State grows unbounded.

Always add .within(Time.minutes(N)) to CEP patterns. This bounds how long partial matches are kept. Choose N based on your business logic — if the pattern hasn't completed in N minutes, it's not a valid match.
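The effect of a time limit on partial matches can be sketched in plain Java. The class below is a hypothetical model, not FlinkCEP internals: it assumes partial matches are started in non-decreasing time order and drops each one once event time has moved more than the limit past its start.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java sketch: expire partial pattern matches once the time limit has passed.
final class PartialMatchExpirySketch {
    private final long withinMs;
    private final Deque<Long> partialMatchStarts = new ArrayDeque<>();  // oldest first

    PartialMatchExpirySketch(long withinMs) {
        this.withinMs = withinMs;
    }

    void startPartialMatch(long startTs) {
        partialMatchStarts.addLast(startTs);
    }

    // Drops partial matches that can no longer complete inside the time limit.
    void advanceTime(long eventTimeMs) {
        while (!partialMatchStarts.isEmpty()
                && eventTimeMs - partialMatchStarts.peekFirst() > withinMs) {
            partialMatchStarts.removeFirst();
        }
    }

    int openPartialMatches() {
        return partialMatchStarts.size();
    }

    public static void main(String[] args) {
        PartialMatchExpirySketch matcher = new PartialMatchExpirySketch(600_000L);  // 10 minutes
        matcher.startPartialMatch(0L);
        matcher.startPartialMatch(300_000L);
        matcher.advanceTime(700_000L);
        System.out.println(matcher.openPartialMatches());
    }
}
```

Without the limit nothing ever satisfies the loop condition in `advanceTime`, so the queue only grows, which is the unbounded-state failure described above.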