State Management
State is what makes Flink powerful — it enables computations that remember information across events. From simple counters to terabytes of keyed state, Flink provides fault-tolerant, scalable state management with multiple backends and lifecycle controls.
What State Is
State is any information that an operator needs to remember between processing individual events. Without state, each event is processed in isolation — you can filter and transform, but you can't count, aggregate, detect patterns, or join streams.
Short-Term Memory
State is like short-term memory for your stream processor. A stateless operator is like a person who forgets everything after each sentence — they can repeat what you said (map) or ignore boring parts (filter), but they can't summarize a conversation (aggregate) or notice you've said the same thing three times (pattern detection). State gives operators memory that persists across events.
Stateless operations (no memory needed):
- map: event → transform(event)
- filter: event → keep/discard
- flatMap: event → [event1, event2, ...]

Stateful operations (need memory):
- count: need to remember running count
- sum: need to remember running total
- average: need count + sum
- distinct: need set of seen values
- join: need to buffer one stream while waiting for matches
- pattern: need to remember sequence of events
- session: need to track activity per user
- dedup: need to remember processed IDs

State in Flink:
- ✅ Local to each operator instance (fast access)
- ✅ Fault-tolerant (included in checkpoints)
- ✅ Scalable (distributed across TaskManagers)
- ✅ Queryable (can be read externally)
- ✅ Supports TTL (automatic expiry)
State is Local
Each parallel instance of an operator has its own state. State is never shared between parallel instances — this is what makes Flink scalable. For keyed state, Flink guarantees that all events with the same key go to the same operator instance, so the state for that key is always local.
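To make the locality guarantee concrete, here is a minimal, self-contained sketch in plain Java. It is not Flink's actual key-group hashing, just an illustration of the idea: deterministic routing sends every event with the same key to the same parallel instance, so that key's state never needs to be shared.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyRouting {

    // Sketch (not Flink's real hash): route each key to one of N
    // parallel instances. Same key always yields the same instance.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // One private state map per parallel instance — never shared
        List<Map<String, Integer>> instanceState = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instanceState.add(new HashMap<>());
        }

        for (String userId : List.of("alice", "bob", "alice", "carol", "alice")) {
            int instance = instanceFor(userId, parallelism);
            instanceState.get(instance).merge(userId, 1, Integer::sum);
        }

        // "alice" was always routed to the same instance, so her
        // complete count lives in exactly one place
        int aliceInstance = instanceFor("alice", parallelism);
        System.out.println(instanceState.get(aliceInstance).get("alice")); // prints 3
    }
}
```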
Keyed State
Keyed state is the most common state type. It's partitioned by key — each key has its own independent state. You access it in functions that operate on keyed streams (after keyBy). Flink provides several state primitives for different data structures.
| State Type | Structure | Use Case | Access Pattern |
|---|---|---|---|
| ValueState<T> | Single value per key | Running total, last seen event | get(), update(), clear() |
| ListState<T> | List of values per key | Event buffer, history | add(), get() → Iterable, clear() |
| MapState<K,V> | Map of key-value pairs per key | Per-key lookup table, counters by category | get(k), put(k,v), entries(), clear() |
| ReducingState<T> | Single value, auto-reduced on add | Running aggregation | add() triggers reduce, get() |
| AggregatingState<IN,OUT> | Accumulator pattern | Complex aggregations | add(IN), get() → OUT |
```java
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    // State: last transaction amount for this user
    private ValueState<Double> lastAmountState;
    // State: count of transactions in current window
    private ValueState<Integer> txCountState;
    // State: set of merchants seen by this user
    private MapState<String, Boolean> seenMerchantsState;

    @Override
    public void open(Configuration params) {
        // Declare state with descriptors (name + type)
        lastAmountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-amount", Double.class));
        txCountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("tx-count", Integer.class));
        seenMerchantsState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("seen-merchants", String.class, Boolean.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx, Collector<Alert> out)
            throws Exception {
        Double lastAmount = lastAmountState.value();
        Integer count = txCountState.value();

        // Fraud pattern: small tx followed by large tx at new merchant
        if (lastAmount != null && lastAmount < 1.0
                && tx.getAmount() > 500.0
                && !seenMerchantsState.contains(tx.getMerchant())) {
            out.collect(new Alert(tx.getUserId(), "FRAUD_SUSPECTED"));
        }

        // Update state
        lastAmountState.update(tx.getAmount());
        txCountState.update((count == null ? 0 : count) + 1);
        seenMerchantsState.put(tx.getMerchant(), true);
    }
}
```
State Descriptor Names Matter
State descriptor names (e.g., "last-amount") are used to identify state during checkpointing and recovery. If you rename a state descriptor, Flink cannot restore the old state — it's treated as new, empty state. Plan your state names carefully and never rename them in production.
Operator State
Operator state is not partitioned by key — it belongs to the operator instance as a whole. It's used in source/sink connectors and non-keyed operators. During rescaling, operator state must be explicitly redistributed.
```java
// Operator state: used in sources/sinks for offset tracking
public class CountingSource implements SourceFunction<Long>, CheckpointedFunction {

    private long count = 0;
    private volatile boolean running = true;

    // Operator state: survives failures
    private transient ListState<Long> checkpointedCount;

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Save current count to state during checkpoint
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedCount = ctx.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));

        // On recovery, restore from checkpoint
        if (ctx.isRestored()) {
            for (Long value : checkpointedCount.get()) {
                count = value;
            }
        }
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count++);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```
| Redistribution Mode | Behavior on Rescale | Use Case |
|---|---|---|
| Even-split (ListState) | List split evenly across new instances | Kafka offsets, file positions |
| Union (UnionListState) | Each instance gets the FULL list | Global configuration, lookup tables |
| Broadcast | Each instance gets identical copy | Rules, ML models, config updates |
Keyed vs Operator State
Use keyed state (after keyBy) for per-entity data — user profiles, session state, per-device counters. Use operator state for infrastructure concerns — source offsets, sink buffers, connector metadata. Most application logic uses keyed state; operator state is primarily for connector development.
Broadcast State Pattern
The broadcast state pattern connects a low-throughput control stream (rules, config) with a high-throughput data stream. The control stream is broadcast to all parallel instances, ensuring every instance has the same rules. The data stream is processed against these rules.
```java
// Pattern: Dynamic fraud rules applied to transaction stream

// 1. Define the broadcast state descriptor
MapStateDescriptor<String, FraudRule> ruleStateDesc =
    new MapStateDescriptor<>("fraud-rules", String.class, FraudRule.class);

// 2. Create broadcast stream from rules topic
BroadcastStream<FraudRule> ruleStream = env
    .fromSource(kafkaRulesSource, WatermarkStrategy.noWatermarks(), "Rules Source")
    .broadcast(ruleStateDesc);

// 3. Connect data stream with broadcast stream.
// A keyed stream connected to a broadcast stream requires a
// KeyedBroadcastProcessFunction (keyed here on the user ID).
DataStream<Alert> alerts = transactions
    .keyBy(Transaction::getUserId)
    .connect(ruleStream)
    .process(new KeyedBroadcastProcessFunction<String, Transaction, FraudRule, Alert>() {

        @Override
        public void processElement(Transaction tx, ReadOnlyContext ctx,
                                   Collector<Alert> out) throws Exception {
            // Read-only access to broadcast state
            ReadOnlyBroadcastState<String, FraudRule> rules =
                ctx.getBroadcastState(ruleStateDesc);

            // Apply all active rules to this transaction
            for (Map.Entry<String, FraudRule> entry : rules.immutableEntries()) {
                if (entry.getValue().matches(tx)) {
                    out.collect(new Alert(tx.getUserId(), entry.getKey()));
                }
            }
        }

        @Override
        public void processBroadcastElement(FraudRule rule, Context ctx,
                                            Collector<Alert> out) throws Exception {
            // Read-write access to broadcast state
            BroadcastState<String, FraudRule> state =
                ctx.getBroadcastState(ruleStateDesc);
            state.put(rule.getId(), rule);
        }
    });
```
Broadcast State Use Cases
- ✅ Dynamic fraud rules — update detection rules without restarting the job
- ✅ Feature flags — enable/disable features across all parallel instances
- ✅ ML model updates — push new model weights to all scoring instances
- ✅ Configuration changes — update thresholds, timeouts, or routing rules at runtime
- ✅ Blocklists — broadcast updated IP/user blocklists to all filter instances
Broadcast State Guarantees
Broadcast state ensures all parallel instances see the same rules, but the ORDER in which rules and data events are processed is not guaranteed across instances. If rule ordering matters, include a version/timestamp in your rules and handle ordering in your process function.
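One hedged way to handle ordering, sketched below in plain Java (the `Rule` record and `applyUpdate` helper are illustrative, not a Flink API): carry a version on each rule and ignore stale updates when writing to broadcast state, the same check you would perform inside `processBroadcastElement`.

```java
import java.util.HashMap;
import java.util.Map;

public class VersionedRules {

    // Hypothetical rule carrying a version so out-of-order updates
    // across parallel instances resolve deterministically.
    public record Rule(String id, long version, double threshold) {}

    // Keep an incoming rule only if it is newer than what we already
    // have — mirrors the guard you would put in processBroadcastElement().
    public static void applyUpdate(Map<String, Rule> state, Rule incoming) {
        Rule current = state.get(incoming.id());
        if (current == null || incoming.version() > current.version()) {
            state.put(incoming.id(), incoming);
        }
    }

    public static void main(String[] args) {
        Map<String, Rule> state = new HashMap<>();
        applyUpdate(state, new Rule("r1", 2, 500.0));
        applyUpdate(state, new Rule("r1", 1, 100.0)); // stale update, ignored
        System.out.println(state.get("r1").version()); // prints 2
    }
}
```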
State Backends
The state backend determines how state is stored and accessed. Flink provides two production backends: HashMapStateBackend (in-memory, fast) and EmbeddedRocksDBStateBackend (on-disk, scalable). The choice depends on your state size and latency requirements.
| Property | HashMapStateBackend | EmbeddedRocksDBStateBackend |
|---|---|---|
| Storage | JVM heap (in-memory) | Local disk (RocksDB embedded DB) |
| State size limit | Limited by heap (typically <10 GB) | Limited by disk (TBs possible) |
| Access latency | ~nanoseconds (memory access) | ~microseconds (disk + deserialization) |
| Serialization | Only during checkpoint | On every read/write (state stored serialized) |
| Checkpoint type | Full snapshot | Full or incremental |
| GC pressure | High (objects on heap) | Low (data off-heap in RocksDB) |
| Best for | Small state, low latency | Large state, production workloads |
```java
// Option 1: HashMapStateBackend (small state, fast access)
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/job-1");

// Option 2: EmbeddedRocksDBStateBackend (large state, production)
EmbeddedRocksDBStateBackend rocksDB =
    new EmbeddedRocksDBStateBackend(true); // incremental checkpoints
env.setStateBackend(rocksDB);
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/job-1");

// RocksDB tuning via flink-conf.yaml:
//   state.backend.rocksdb.memory.managed: true
//   state.backend.rocksdb.block.cache-size: 256mb
//   state.backend.rocksdb.writebuffer.size: 128mb
//   state.backend.rocksdb.writebuffer.count: 4
```
Production Default: RocksDB
For production, always use EmbeddedRocksDBStateBackend with incremental checkpoints. It handles state growth gracefully (spills to disk), produces smaller checkpoints (only diffs), and doesn't cause GC pressure. The microsecond access overhead is negligible for most streaming workloads. Use HashMapStateBackend only for development or when state is guaranteed to be tiny.
State TTL & Cleanup
Without TTL, state grows forever — every key that was ever seen remains in state. For use cases with unbounded key spaces (user IDs, session IDs), this leads to unbounded state growth and eventual OOM. State TTL automatically expires entries after a configurable duration.
```java
// Configure TTL on a state descriptor
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24)) // Expire after 24 hours
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(
        StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()                // Clean during full checkpoints
    .cleanupInRocksdbCompactFilter(1000)  // Clean during RocksDB compaction
    .build();

ValueStateDescriptor<UserProfile> descriptor =
    new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);

// UpdateType options:
//   OnCreateAndWrite — reset TTL on create and every update
//   OnReadAndWrite   — reset TTL on any access (read or write)

// StateVisibility options:
//   NeverReturnExpired          — expired state returns null (safe default)
//   ReturnExpiredIfNotCleanedUp — may return stale data (faster)

// Cleanup strategies:
//   cleanupFullSnapshot()               — remove during checkpoint (batch)
//   cleanupIncrementally(10, true)      — check N entries per access
//   cleanupInRocksdbCompactFilter(1000) — remove during RocksDB compaction
```
When to Use State TTL
- ✅ Unbounded key spaces — user IDs, session IDs, device IDs that grow forever
- ✅ Deduplication state — remember processed IDs for N hours, then forget
- ✅ Session state — expire inactive sessions after timeout
- ✅ Cache state — enrichment data that should refresh periodically
- ✅ Fraud detection — only consider recent transaction history
TTL + RocksDB Compaction
The most efficient cleanup strategy for RocksDB is cleanupInRocksdbCompactFilter(). It piggybacks on RocksDB's natural compaction process — expired entries are dropped by a compaction filter as SST files are compacted, adding no extra I/O. Combine with cleanupIncrementally() for faster cleanup of hot keys.
State Serialization & Schema Evolution
State must be serialized for checkpoints and (with RocksDB) for every access. When you update your application, state schemas may change. Flink supports schema evolution for certain serializers, allowing you to add fields without losing existing state.
State Schema Evolution Support:

POJO serializer (recommended for evolvable state):
- ✅ Add new fields (initialized to default value)
- ✅ Remove fields (silently dropped)
- ❌ Cannot change field types
- ❌ Cannot rename fields
- Rules: must have no-arg constructor, public fields or getters/setters

Avro serializer:
- ✅ Full Avro schema evolution rules
- ✅ Add fields with defaults
- ✅ Remove fields
- ✅ Promote types (int → long)
- Best for complex, frequently-evolving state schemas

Kryo serializer (default fallback):
- ❌ No schema evolution support
- ❌ Any change requires state migration
- ⚠️ Avoid for production state — use POJOs or Avro

TypeInformation (Flink-native types):
- ✅ Primitives, Tuples, Row types
- ❌ Limited evolution support
- Best for simple state (counters, timestamps)

Migration strategy when schema evolution isn't supported:
1. Take a savepoint with the old job
2. Write a state migration tool (read old format, write new)
3. Start the new job from the migrated savepoint
Use POJOs for State
Define your state types as POJOs (plain Java objects with no-arg constructor and public fields/getters). Flink's POJO serializer supports adding and removing fields — the most common schema changes. Avoid Kryo (the fallback serializer) for any state that needs to survive upgrades. Check logs for "falling back to Kryo" warnings.
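A sketch of what such a state type looks like (the UserProfile fields here are illustrative, not from any particular job):

```java
// A state type Flink's POJO serializer can recognize and evolve:
// public class, public no-arg constructor, and public fields
// (or standard getter/setter pairs).
public class UserProfile {
    public String userId;
    public long lastSeenMillis;

    // Field added in a later version of the job: when old state is
    // restored, the POJO serializer initializes it to the Java
    // default (0 here) instead of failing.
    public int loginCount;

    public UserProfile() {} // no-arg constructor required
}
```

If any of these rules are violated, Flink silently falls back to Kryo for the type, which is exactly the situation the warning logs flag.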
Interview Questions
Q: What's the difference between keyed state and operator state?
A: Keyed state is partitioned by key — each key has independent state, accessed after keyBy(). It's for per-entity data (user profiles, session state). Operator state belongs to the operator instance as a whole, not partitioned by key. It's for infrastructure concerns (source offsets, sink buffers). On rescale: keyed state is automatically redistributed by key; operator state requires explicit redistribution strategy (even-split, union, or broadcast). Most application logic uses keyed state.
Q: Compare HashMapStateBackend and EmbeddedRocksDBStateBackend.
A: HashMapStateBackend: stores state as Java objects on JVM heap. Nanosecond access, but limited by heap size (typically <10 GB), causes GC pressure, and only supports full checkpoints. EmbeddedRocksDBStateBackend: stores state serialized in RocksDB (local disk). Microsecond access (serialization overhead), but supports TBs of state, minimal GC pressure, and incremental checkpoints. Production recommendation: always RocksDB with incremental checkpoints. Use HashMap only for development or guaranteed-tiny state.
Q: How does the broadcast state pattern work?
A: It connects a low-throughput control stream (rules, config) with a high-throughput data stream. The control stream is broadcast to ALL parallel instances via BroadcastStream, ensuring every instance has identical rules. Data stream elements are processed against these rules. Key detail: the data-processing side has READ-ONLY access to broadcast state; only the broadcast-processing side can write. Use cases: dynamic fraud rules, feature flags, ML model updates, blocklists — anything where you need to update processing logic without restarting the job.
Q: Why is state TTL important and how does it work?
A: Without TTL, state grows forever — every key ever seen remains in state. For unbounded key spaces (user IDs), this causes OOM. State TTL automatically expires entries after a configurable duration. Configuration: (1) Duration (e.g., 24 hours). (2) UpdateType — reset TTL on write only, or on any access. (3) Visibility — return expired data or null. (4) Cleanup strategy — during checkpoints, incrementally per access, or during RocksDB compaction. Best practice: use TTL for any state with unbounded keys, and cleanupInRocksdbCompactFilter() for efficient cleanup.
Q: How does Flink handle state schema evolution?
A: When you update your application, state types may change. Flink supports evolution for: (1) POJOs — can add/remove fields (not rename or change types). (2) Avro — full Avro evolution rules (add with defaults, promote types). (3) Kryo — NO evolution support (any change breaks). Strategy: use POJOs for state types, ensure no-arg constructors, avoid Kryo. For breaking changes: take savepoint, write migration tool to transform state, restore from migrated savepoint. Always test state compatibility before production upgrades.
Common Mistakes
Not setting state TTL for unbounded key spaces
Processing events keyed by user_id without TTL. Over months, state accumulates entries for millions of users who will never return. State grows until OOM or checkpoint timeout.
✅ Always set state TTL for unbounded key spaces. Choose a TTL based on your business logic — 24 hours for session data, 7 days for fraud detection history, etc. Use cleanupInRocksdbCompactFilter() for efficient background cleanup.
Using HashMapStateBackend in production
Deploying with the default HashMap backend. State grows beyond heap, causing GC storms and OOM. Full checkpoints become huge and slow.
✅ Always use EmbeddedRocksDBStateBackend with incremental checkpoints in production. It handles state growth gracefully, produces smaller checkpoint diffs, and doesn't cause GC pressure.
Renaming state descriptors
Renaming a state descriptor from 'user-count' to 'user-event-count' during an upgrade. Flink can't find the old state under the new name — it starts fresh, losing all accumulated state.
✅ Never rename state descriptors in production. If you must restructure state, use a savepoint-based migration: read old state with the old descriptor name, write to the new descriptor, then deploy the new version.
Storing large objects in ValueState
Storing a List<Event> with thousands of elements in a single ValueState. With RocksDB, the entire list is serialized/deserialized on every access — even to add one element.
✅ Use ListState for collections (append-only, no full deserialization on add) or MapState for key-value lookups. Only use ValueState for single values or small objects that are always accessed as a whole.
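The cost difference can be sketched without Flink at all. In the toy below, plain Java serialization stands in for RocksDB's per-key blob storage: the ValueState-style layout must rewrite the whole serialized list on every event, while the ListState-style layout writes only the new element.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

public class AppendCost {

    static int serializedSize(List<String> list) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new ArrayList<>(list));
        }
        return bos.size();
    }

    // Returns {bytesRewritten, bytesAppended} after n buffered events.
    static int[] costs(int n) throws IOException {
        List<String> buffer = new ArrayList<>();
        int rewritten = 0; // ValueState<List<T>> style: rewrite blob per event
        int appended = 0;  // ListState<T> style: write only the new element
        for (int i = 0; i < n; i++) {
            buffer.add("event-" + i);
            rewritten += serializedSize(buffer);
            appended += serializedSize(List.of("event-" + i));
        }
        return new int[] { rewritten, appended };
    }

    public static void main(String[] args) throws IOException {
        int[] c = costs(1000);
        // Rewriting the whole list grows quadratically with buffer size,
        // while appending grows linearly
        System.out.println(c[0] > c[1]); // prints true
    }
}
```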
Falling back to Kryo serialization
Using complex types (third-party classes, generics) that Flink can't serialize natively. It silently falls back to Kryo — slower serialization, no schema evolution, larger checkpoints.
✅ Check logs for 'Falling back to Kryo' warnings. Define state types as POJOs (no-arg constructor, public fields). Register custom serializers for third-party types. Use TypeInformation hints for generics.
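To fail fast instead of silently degrading, Flink's ExecutionConfig lets you disable the generic-type fallback entirely. With this setting, any type that would need Kryo makes the job fail at job-graph construction time rather than run slowly in production:

```java
// Throw when building the job graph whenever a type would fall back
// to Kryo, instead of silently using Kryo at runtime.
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableGenericTypes();
```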