
State Management

State is what makes Flink powerful — it enables computations that remember information across events. From simple counters to terabytes of keyed state, Flink provides fault-tolerant, scalable state management with multiple backends and lifecycle controls.

45 min read · 9 sections
01

What State Is

State is any information that an operator needs to remember between processing individual events. Without state, each event is processed in isolation — you can filter and transform, but you can't count, aggregate, detect patterns, or join streams.

🧠

Short-Term Memory

State is like short-term memory for your stream processor. A stateless operator is like a person who forgets everything after each sentence — they can repeat what you said (map) or ignore boring parts (filter), but they can't summarize a conversation (aggregate) or notice you've said the same thing three times (pattern detection). State gives operators memory that persists across events.

stateful-vs-stateless.txt
Stateless operations (no memory needed):
  map:     event → transform(event)
  filter:  event → keep/discard
  flatMap: event → [event1, event2, ...]

Stateful operations (need memory):
  count:     need to remember running count
  sum:       need to remember running total
  average:   need count + sum
  distinct:  need set of seen values
  join:      need to buffer one stream while waiting for matches
  pattern:   need to remember sequence of events
  session:   need to track activity per user
  dedup:     need to remember processed IDs

State in Flink:
Local to each operator instance (fast access)
Fault-tolerant (included in checkpoints)
Scalable (distributed across TaskManagers)
Queryable (can be read externally)
Supports TTL (automatic expiry)

State is Local

Each parallel instance of an operator has its own state. State is never shared between parallel instances — this is what makes Flink scalable. For keyed state, Flink guarantees that all events with the same key go to the same operator instance, so the state for that key is always local.
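To make the distinction concrete, here is a plain-Java sketch (no Flink APIs) of a stateless map next to a stateful per-key count. The `HashMap` plays the role that Flink's keyed state plays inside an operator instance: memory that persists across events, kept separately for each key.

```java
import java.util.HashMap;
import java.util.Map;

public class StatefulVsStateless {

    // Stateless: the output depends only on the current event.
    static String toUpper(String event) {
        return event.toUpperCase();
    }

    // Stateful: the output depends on everything seen so far for the key.
    static final Map<String, Integer> countsByKey = new HashMap<>();

    static int countForKey(String key) {
        // merge() increments the stored count and returns the new value
        return countsByKey.merge(key, 1, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(toUpper("click"));   // CLICK — same result every time
        System.out.println(countForKey("u1"));  // 1
        System.out.println(countForKey("u1"));  // 2 — remembered the first event
        System.out.println(countForKey("u2"));  // 1 — independent state per key
    }
}
```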

02

Keyed State

Keyed state is the most common state type. It's partitioned by key — each key has its own independent state. You access it in functions that operate on keyed streams (after keyBy). Flink provides several state primitives for different data structures.

State Type               | Structure                         | Use Case                                   | Access Pattern
ValueState<T>            | Single value per key              | Running total, last seen event             | value(), update(), clear()
ListState<T>             | List of values per key            | Event buffer, history                      | add(), get() → Iterable, clear()
MapState<K,V>            | Map of key-value pairs per key    | Per-key lookup table, counters by category | get(k), put(k,v), entries(), clear()
ReducingState<T>         | Single value, auto-reduced on add | Running aggregation                        | add() triggers reduce, get()
AggregatingState<IN,OUT> | Accumulator pattern               | Complex aggregations                       | add(IN), get() → OUT
keyed-state.java
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    // State: last transaction amount for this user
    private ValueState<Double> lastAmountState;

    // State: count of transactions in current window
    private ValueState<Integer> txCountState;

    // State: set of merchants seen by this user
    private MapState<String, Boolean> seenMerchantsState;

    @Override
    public void open(Configuration params) {
        // Declare state with descriptors (name + type)
        lastAmountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-amount", Double.class));

        txCountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("tx-count", Integer.class));

        seenMerchantsState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("seen-merchants",
                String.class, Boolean.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx,
                               Collector<Alert> out) throws Exception {
        Double lastAmount = lastAmountState.value();
        Integer count = txCountState.value();

        // Fraud pattern: small tx followed by large tx at new merchant
        if (lastAmount != null && lastAmount < 1.0
            && tx.getAmount() > 500.0
            && !seenMerchantsState.contains(tx.getMerchant())) {
            out.collect(new Alert(tx.getUserId(), "FRAUD_SUSPECTED"));
        }

        // Update state
        lastAmountState.update(tx.getAmount());
        txCountState.update((count == null ? 0 : count) + 1);
        seenMerchantsState.put(tx.getMerchant(), true);
    }
}

State Descriptor Names Matter

State descriptor names (e.g., "last-amount") are used to identify state during checkpointing and recovery. If you rename a state descriptor, Flink cannot restore the old state — it's treated as new, empty state. Plan your state names carefully and never rename them in production.
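The table above lists ReducingState<T>, which the example doesn't exercise. Its contract — add() folds the new value into the stored one with a reduce function, get() returns the result — can be modeled in plain Java. This is a conceptual model of the semantics, not the Flink API:

```java
import java.util.function.BinaryOperator;

// Model of ReducingState: one stored value per key, folded on every add().
public class ReducingStateModel<T> {
    private final BinaryOperator<T> reduceFn;
    private T value; // null means "no state yet", like an unset ValueState

    ReducingStateModel(BinaryOperator<T> reduceFn) {
        this.reduceFn = reduceFn;
    }

    // add() applies the reduce function instead of appending
    void add(T in) {
        value = (value == null) ? in : reduceFn.apply(value, in);
    }

    T get() {
        return value;
    }

    public static void main(String[] args) {
        // Running maximum, kept as a single stored value per key
        ReducingStateModel<Long> maxAmount = new ReducingStateModel<>(Math::max);
        maxAmount.add(10L);
        maxAmount.add(99L);
        maxAmount.add(7L);
        System.out.println(maxAmount.get()); // 99
    }
}
```

The advantage over ValueState is that only the reduced value is stored and checkpointed, never the individual inputs.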

03

Operator State

Operator state is not partitioned by key — it belongs to the operator instance as a whole. It's used in source/sink connectors and non-keyed operators. During rescaling, operator state must be explicitly redistributed.

operator-state.java
// Operator state: used in sources/sinks for offset tracking
public class CountingSource implements
        SourceFunction<Long>, CheckpointedFunction {

    private long count = 0;
    private volatile boolean running = true;

    // Operator state: survives failures
    private ListState<Long> checkpointedCount;

    @Override
    public void snapshotState(FunctionSnapshotContext ctx)
            throws Exception {
        // Save current count to state during checkpoint
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx)
            throws Exception {
        // Restore state on recovery
        checkpointedCount = ctx.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>(
                "count", Long.class));

        // On recovery, restore from checkpoint
        if (ctx.isRestored()) {
            for (Long value : checkpointedCount.get()) {
                count = value;
            }
        }
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count++);
            }
        }
    }
}
Redistribution Mode    | Behavior on Rescale                    | Use Case
Even-split (ListState) | List split evenly across new instances | Kafka offsets, file positions
Union (UnionListState) | Each instance gets the FULL list       | Global configuration, lookup tables
Broadcast              | Each instance gets identical copy      | Rules, ML models, config updates

Keyed vs Operator State

Use keyed state (after keyBy) for per-entity data — user profiles, session state, per-device counters. Use operator state for infrastructure concerns — source offsets, sink buffers, connector metadata. Most application logic uses keyed state; operator state is primarily for connector development.
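A toy simulation of even-split redistribution — not Flink's actual assignment code — shows the idea: on rescale, all ListState entries are pooled and dealt round-robin to the new parallel instances:

```java
import java.util.ArrayList;
import java.util.List;

public class EvenSplit {

    // Deal the pooled operator-state entries round-robin to the new instances.
    static List<List<Integer>> redistribute(List<Integer> pooled, int newParallelism) {
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < pooled.size(); i++) {
            result.get(i % newParallelism).add(pooled.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Six partition offsets held before rescaling to parallelism 3
        List<Integer> offsets = List.of(100, 200, 300, 400, 500, 600);
        System.out.println(redistribute(offsets, 3));
        // → [[100, 400], [200, 500], [300, 600]]
        // Union mode, by contrast, would hand all six entries to every instance.
    }
}
```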

04

Broadcast State Pattern

The broadcast state pattern connects a low-throughput control stream (rules, config) with a high-throughput data stream. The control stream is broadcast to all parallel instances, ensuring every instance has the same rules. The data stream is processed against these rules.

broadcast-state.java
// Pattern: Dynamic fraud rules applied to transaction stream

// 1. Define the broadcast state descriptor
MapStateDescriptor<String, FraudRule> ruleStateDesc =
    new MapStateDescriptor<>("fraud-rules",
        String.class, FraudRule.class);

// 2. Create broadcast stream from rules topic
BroadcastStream<FraudRule> ruleStream = env
    .fromSource(kafkaRulesSource, WatermarkStrategy.noWatermarks(),
        "Rules Source")
    .broadcast(ruleStateDesc);

// 3. Connect data stream with broadcast stream
DataStream<Alert> alerts = transactions
    .keyBy(Transaction::getUserId)
    .connect(ruleStream)
    .process(new BroadcastProcessFunction<Transaction, FraudRule, Alert>() {

        @Override
        public void processElement(Transaction tx, ReadOnlyContext ctx,
                Collector<Alert> out) throws Exception {
            // Read-only access to broadcast state
            ReadOnlyBroadcastState<String, FraudRule> rules =
                ctx.getBroadcastState(ruleStateDesc);

            // Apply all active rules to this transaction
            for (Map.Entry<String, FraudRule> entry : rules.immutableEntries()) {
                if (entry.getValue().matches(tx)) {
                    out.collect(new Alert(tx.getUserId(),
                        entry.getKey()));
                }
            }
        }

        @Override
        public void processBroadcastElement(FraudRule rule,
                Context ctx, Collector<Alert> out) throws Exception {
            // Read-write access to broadcast state
            BroadcastState<String, FraudRule> state =
                ctx.getBroadcastState(ruleStateDesc);
            state.put(rule.getId(), rule);
        }
    });

Broadcast State Use Cases

  • Dynamic fraud rules — update detection rules without restarting the job
  • Feature flags — enable/disable features across all parallel instances
  • ML model updates — push new model weights to all scoring instances
  • Configuration changes — update thresholds, timeouts, or routing rules at runtime
  • Blocklists — broadcast updated IP/user blocklists to all filter instances

Broadcast State Guarantees

Broadcast state ensures all parallel instances see the same rules, but the ORDER in which rules and data events are processed is not guaranteed across instances. If rule ordering matters, include a version/timestamp in your rules and handle ordering in your process function.
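A minimal sketch of that versioning advice, using a hypothetical Rule type: before writing a broadcast update into state, compare versions and drop stale updates so all instances converge on the same rule regardless of arrival order:

```java
import java.util.HashMap;
import java.util.Map;

public class RuleStore {

    // Hypothetical rule type — only the version field matters here.
    record Rule(String id, long version) {}

    private final Map<String, Rule> rules = new HashMap<>();

    // What processBroadcastElement would do before state.put(...):
    // keep the newest version, ignore updates that arrive out of order.
    boolean applyUpdate(Rule incoming) {
        Rule current = rules.get(incoming.id());
        if (current != null && current.version() >= incoming.version()) {
            return false; // stale or duplicate update: keep what we have
        }
        rules.put(incoming.id(), incoming);
        return true;
    }

    public static void main(String[] args) {
        RuleStore store = new RuleStore();
        System.out.println(store.applyUpdate(new Rule("r1", 2))); // true
        System.out.println(store.applyUpdate(new Rule("r1", 1))); // false — stale
        System.out.println(store.applyUpdate(new Rule("r1", 3))); // true
    }
}
```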

05

State Backends

The state backend determines how state is stored and accessed. Flink provides two production backends: HashMapStateBackend (in-memory, fast) and EmbeddedRocksDBStateBackend (on-disk, scalable). The choice depends on your state size and latency requirements.

Property         | HashMapStateBackend                | EmbeddedRocksDBStateBackend
Storage          | JVM heap (in-memory)               | Local disk (embedded RocksDB)
State size limit | Limited by heap (typically <10 GB) | Limited by disk (TBs possible)
Access latency   | ~nanoseconds (memory access)       | ~microseconds (disk + deserialization)
Serialization    | Only during checkpoint             | On every read/write (state stored serialized)
Checkpoint type  | Full snapshot                      | Full or incremental
GC pressure      | High (objects on heap)             | Low (data off-heap in RocksDB)
Best for         | Small state, low latency           | Large state, production workloads
state-backend-config.java
// Option 1: HashMapStateBackend (small state, fast access)
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
    "s3://checkpoints/job-1");

// Option 2: EmbeddedRocksDBStateBackend (large state, production)
EmbeddedRocksDBStateBackend rocksDB =
    new EmbeddedRocksDBStateBackend(true); // incremental checkpoints
env.setStateBackend(rocksDB);
env.getCheckpointConfig().setCheckpointStorage(
    "s3://checkpoints/job-1");

// RocksDB tuning via flink-conf.yaml:
// state.backend.rocksdb.memory.managed: true
// state.backend.rocksdb.block.cache-size: 256mb
// state.backend.rocksdb.writebuffer.size: 128mb
// state.backend.rocksdb.writebuffer.count: 4

Production Default: RocksDB

For production, always use EmbeddedRocksDBStateBackend with incremental checkpoints. It handles state growth gracefully (spills to disk), produces smaller checkpoints (only diffs), and doesn't cause GC pressure. The microsecond access overhead is negligible for most streaming workloads. Use HashMapStateBackend only for development or when state is guaranteed to be tiny.

06

State TTL & Cleanup

Without TTL, state grows forever — every key that was ever seen remains in state. For use cases with unbounded key spaces (user IDs, session IDs), this leads to unbounded state growth and eventual OOM. State TTL automatically expires entries after a configurable duration.

state-ttl.java
// Configure TTL on a state descriptor
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))  // Expire after 24 hours
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(
        StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()       // Clean during full checkpoints
    .cleanupInRocksdbCompactFilter(1000) // Clean during RocksDB compaction
    .build();

ValueStateDescriptor<UserProfile> descriptor =
    new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);

// UpdateType options:
//   OnCreateAndWrite — reset TTL on create and every update
//   OnReadAndWrite   — reset TTL on any access (read or write)

// StateVisibility options:
//   NeverReturnExpired — expired state returns null (safe default)
//   ReturnExpiredIfNotCleanedUp — may return stale data (faster)

// Cleanup strategies:
//   cleanupFullSnapshot()        — remove during checkpoint (batch)
//   cleanupIncrementally(10, true) — check N entries per access (heap backend)
//   cleanupInRocksdbCompactFilter(1000) — remove during RocksDB compaction

When to Use State TTL

  • Unbounded key spaces — user IDs, session IDs, device IDs that grow forever
  • Deduplication state — remember processed IDs for N hours, then forget
  • Session state — expire inactive sessions after timeout
  • Cache state — enrichment data that should refresh periodically
  • Fraud detection — only consider recent transaction history
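The deduplication case can be modeled in plain Java to show the semantics configured above — OnCreateAndWrite resets the clock on every write, and NeverReturnExpired makes expired entries read as absent. This is a conceptual model, not Flink's implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class TtlDedup {
    final long ttlMillis;
    final Map<String, Long> lastWrite = new HashMap<>();

    TtlDedup(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true if `id` was seen within the TTL window ending at `now`.
    boolean isDuplicate(String id, long now) {
        Long written = lastWrite.get(id);
        // NeverReturnExpired: an expired entry behaves as if it were absent
        boolean duplicate = written != null && now - written < ttlMillis;
        lastWrite.put(id, now); // OnCreateAndWrite: every write resets the TTL
        return duplicate;
    }

    public static void main(String[] args) {
        TtlDedup dedup = new TtlDedup(1000); // 1-second TTL
        System.out.println(dedup.isDuplicate("tx-1", 0L));    // false — first sighting
        System.out.println(dedup.isDuplicate("tx-1", 500L));  // true — within TTL
        System.out.println(dedup.isDuplicate("tx-1", 2000L)); // false — expired, looks new
    }
}
```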

TTL + RocksDB Compaction

The most efficient cleanup strategy for RocksDB is cleanupInRocksdbCompactFilter(). It piggybacks on RocksDB's natural compaction process — expired entries are dropped when SST files are compacted, with no additional I/O. Note that cleanupIncrementally() applies only to the heap-based backend, where it checks a few entries on every state access.

07

State Serialization & Schema Evolution

State must be serialized for checkpoints and (with RocksDB) for every access. When you update your application, state schemas may change. Flink supports schema evolution for certain serializers, allowing you to add fields without losing existing state.

schema-evolution.txt
State Schema Evolution Support:

POJO serializer (recommended for evolvable state):
Add new fields (initialized to default value)
Remove fields (silently dropped)
Cannot change field types
Cannot rename fields
  Rules: must have no-arg constructor, public fields or getters/setters

Avro serializer:
Full Avro schema evolution rules
Add fields with defaults
Remove fields
Promote types (int → long)
  Best for complex, frequently-evolving state schemas

Kryo serializer (default fallback):
No schema evolution support
Any change requires state migration
  ⚠️ Avoid for production state: use POJOs or Avro

TypeInformation (Flink-native types):
Primitives, Tuples, Row types
Limited evolution support
  Best for simple state (counters, timestamps)

Migration strategy when schema evolution isn't supported:
  1. Take savepoint with old job
  2. Write state migration tool (read old format, write new)
  3. Start new job from migrated savepoint

Use POJOs for State

Define your state types as POJOs (plain Java objects with no-arg constructor and public fields/getters). Flink's POJO serializer supports adding and removing fields — the most common schema changes. Avoid Kryo (the fallback serializer) for any state that needs to survive upgrades. Check logs for "falling back to Kryo" warnings.
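A state type that follows the POJO serializer's rules might look like this (UserProfile is illustrative; the tier field stands in for one added in a later version of the job):

```java
// Follows Flink's POJO rules: public class, public no-arg constructor,
// and public fields (or getters/setters) — so the POJO serializer applies
// and fields can be added or removed across upgrades.
public class UserProfile {
    public String userId;
    public long eventCount;

    // Field added in v2 of the job: when restoring from a v1 savepoint,
    // the POJO serializer initializes it to its default value (null here).
    public String tier;

    public UserProfile() {} // required no-arg constructor
}
```

If any rule is violated (e.g., no no-arg constructor), Flink silently falls back to Kryo and evolution support is lost — which is exactly the warning to watch for in the logs.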

08

Interview Questions

Q: What's the difference between keyed state and operator state?

A: Keyed state is partitioned by key — each key has independent state, accessed after keyBy(). It's for per-entity data (user profiles, session state). Operator state belongs to the operator instance as a whole, not partitioned by key. It's for infrastructure concerns (source offsets, sink buffers). On rescale: keyed state is automatically redistributed by key; operator state requires explicit redistribution strategy (even-split, union, or broadcast). Most application logic uses keyed state.

Q: Compare HashMapStateBackend and EmbeddedRocksDBStateBackend.

A: HashMapStateBackend: stores state as Java objects on JVM heap. Nanosecond access, but limited by heap size (typically <10 GB), causes GC pressure, and only supports full checkpoints. EmbeddedRocksDBStateBackend: stores state serialized in RocksDB (local disk). Microsecond access (serialization overhead), but supports TBs of state, minimal GC pressure, and incremental checkpoints. Production recommendation: always RocksDB with incremental checkpoints. Use HashMap only for development or guaranteed-tiny state.

Q: How does the broadcast state pattern work?

A: It connects a low-throughput control stream (rules, config) with a high-throughput data stream. The control stream is broadcast to ALL parallel instances via BroadcastStream, ensuring every instance has identical rules. Data stream elements are processed against these rules. Key detail: the data-processing side has READ-ONLY access to broadcast state; only the broadcast-processing side can write. Use cases: dynamic fraud rules, feature flags, ML model updates, blocklists — anything where you need to update processing logic without restarting the job.

Q: Why is state TTL important and how does it work?

A: Without TTL, state grows forever — every key ever seen remains in state. For unbounded key spaces (user IDs), this causes OOM. State TTL automatically expires entries after a configurable duration. Configuration: (1) Duration (e.g., 24 hours). (2) UpdateType — reset TTL on write only, or on any access. (3) Visibility — return expired data or null. (4) Cleanup strategy — during checkpoints, incrementally per access, or during RocksDB compaction. Best practice: use TTL for any state with unbounded keys, and cleanupInRocksdbCompactFilter() for efficient cleanup with the RocksDB backend.

Q: How does Flink handle state schema evolution?

A: When you update your application, state types may change. Flink supports evolution for: (1) POJOs — can add/remove fields (not rename or change types). (2) Avro — full Avro evolution rules (add with defaults, promote types). (3) Kryo — NO evolution support (any change breaks). Strategy: use POJOs for state types, ensure no-arg constructors, avoid Kryo. For breaking changes: take savepoint, write migration tool to transform state, restore from migrated savepoint. Always test state compatibility before production upgrades.

09

Common Mistakes

📈

Not setting state TTL for unbounded key spaces

Processing events keyed by user_id without TTL. Over months, state accumulates entries for millions of users who will never return. State grows until OOM or checkpoint timeout.

Always set state TTL for unbounded key spaces. Choose a TTL based on your business logic — 24 hours for session data, 7 days for fraud detection history, etc. Use cleanupInRocksdbCompactFilter() for efficient background cleanup.

🏔️

Using HashMapStateBackend in production

Deploying with the default HashMap backend. State grows beyond heap, causing GC storms and OOM. Full checkpoints become huge and slow.

Always use EmbeddedRocksDBStateBackend with incremental checkpoints in production. It handles state growth gracefully, produces smaller checkpoint diffs, and doesn't cause GC pressure.

🔄

Renaming state descriptors

Renaming a state descriptor from 'user-count' to 'user-event-count' during an upgrade. Flink can't find the old state under the new name — it starts fresh, losing all accumulated state.

Never rename state descriptors in production. If you must restructure state, use a savepoint-based migration: read old state with old descriptor name, write to new descriptor, then deploy the new version.

Storing large objects in ValueState

Storing a List<Event> with thousands of elements in a single ValueState. With RocksDB, the entire list is serialized/deserialized on every access — even to add one element.

Use ListState for collections (append-only, no full deserialization on add) or MapState for key-value lookups. Only use ValueState for single values or small objects that are always accessed as a whole.

🐌

Falling back to Kryo serialization

Using complex types (third-party classes, generics) that Flink can't serialize natively. It silently falls back to Kryo — slower serialization, no schema evolution, larger checkpoints.

Check logs for 'Falling back to Kryo' warnings. Define state types as POJOs (no-arg constructor, public fields). Register custom serializers for third-party types. Use TypeInformation hints for generics.