
Core Architecture

Flink's distributed runtime consists of a JobManager that orchestrates execution and TaskManagers that run the actual data processing. Understanding this architecture is key to tuning, debugging, and scaling Flink applications.

01

JobManager

The JobManager is the brain of a Flink cluster. It receives jobs, transforms them into execution graphs, schedules tasks onto TaskManagers, coordinates checkpoints, and handles failure recovery. In production, you run it in high-availability mode with ZooKeeper or Kubernetes leader election.

🧠

The Air Traffic Controller

The JobManager is like an air traffic controller. It doesn't fly planes (process data) itself — it coordinates which planes land on which runways (which tasks run on which slots), monitors their progress, and reroutes traffic when something goes wrong. If the controller goes down, a standby takes over using the flight logs (checkpoint metadata).

JobManager Responsibilities

  • āœ… Receives job submissions and transforms the logical plan into a physical execution graph
  • āœ… Schedules tasks onto available TaskManager slots based on resource requirements
  • āœ… Coordinates distributed snapshots (checkpoints) across all tasks
  • āœ… Detects task failures and triggers recovery from the latest checkpoint
  • āœ… Manages savepoints — manual snapshots for upgrades and rescaling
  • āœ… Serves the Flink Web UI and REST API for monitoring and management
jobmanager-ha.yaml
# High Availability Configuration (Kubernetes)
high-availability: kubernetes
high-availability.storageDir: s3://flink-ha/cluster-1
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 60s

# JobManager resources
jobmanager.memory.process.size: 4096m
jobmanager.memory.jvm-overhead.fraction: 0.1

# HA ensures:
# - Leader election via Kubernetes ConfigMaps
# - Checkpoint metadata persisted to S3
# - Standby JobManager takes over in seconds
# - Running tasks continue from last checkpoint

JobManager Components

Internally, the JobManager consists of three components: the Dispatcher (accepts jobs, launches JobMasters), the ResourceManager (manages TaskManager slots), and the JobMaster (one per job — manages execution, checkpointing, and recovery for that specific job).

02

TaskManager

TaskManagers are the worker processes that actually execute the dataflow operators. Each TaskManager is a JVM process that offers a fixed number of task slots. Data flows between TaskManagers via network buffers, and state is stored locally on each TaskManager.

taskmanager-memory.txt
TaskManager Memory Model:
═══════════════════════════════════════════════════════════
Total Process Memory (e.g., 8 GB)
ā”œā”€ā”€ JVM Heap
│   ā”œā”€ā”€ Framework Heap (128 MB default)
│   └── Task Heap (for user code objects)
ā”œā”€ā”€ Off-Heap (Managed Memory)
│   ā”œā”€ā”€ RocksDB state backend buffers
│   ā”œā”€ā”€ Batch sorting/hashing
│   └── Python UDF memory
ā”œā”€ā”€ Network Memory (for shuffle buffers)
│   ā”œā”€ā”€ Input buffers (receiving data)
│   └── Output buffers (sending data)
ā”œā”€ā”€ JVM Metaspace (256 MB default)
└── JVM Overhead (GC, threads, etc.)
═══════════════════════════════════════════════════════════

Key sizing rules:
  - Task Heap: depends on your UDF complexity
  - Managed Memory: 40% default, increase for RocksDB
  - Network Memory: 10% default, increase for high parallelism
  - More slots per TM = more memory sharing but less isolation
Component       | Purpose                                   | Sizing Guidance
Task Heap       | User code objects, serialization buffers  | Depends on UDF complexity; 1-4 GB typical
Managed Memory  | RocksDB, batch ops, Python UDFs           | 40% of total; increase for large state
Network Memory  | Shuffle buffers between operators         | 10% of total; increase for high parallelism
JVM Overhead    | GC, threads, native libraries             | 10% of total; rarely needs adjustment

TaskManager Sizing

A common production setup is 4-8 GB per TaskManager with 2-4 task slots each. Fewer, larger TaskManagers generally beat many small ones: they reduce network overhead and give slot sharing more room to work. Go too large, though, and a single TaskManager failure impacts more tasks.
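
As a starting point, a hedged flink-conf.yaml sketch for one such TaskManager (the values are illustrative, not prescriptive; size against your actual state and shuffle volume):

taskmanager-sizing.yaml
# Illustrative sizing: one 8 GB TaskManager with 4 slots
taskmanager.memory.process.size: 8192m
taskmanager.numberOfTaskSlots: 4

# RocksDB-heavy jobs: raise managed memory above the 0.4 default fraction
taskmanager.memory.managed.fraction: 0.5

# High-parallelism shuffles: raise network memory above the 0.1 default fraction
taskmanager.memory.network.fraction: 0.15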

03

Task Slots & Parallelism

Task slots are the unit of resource allocation in Flink. Each slot represents a fixed portion of a TaskManager's memory. Parallelism determines how many parallel instances of each operator run. Slot sharing allows multiple operators from the same job to share a slot.

parallelism-slots.txt
Parallelism vs Task Slots:

Job: Source(Kafka) → Map → KeyBy → Window → Sink
Parallelism: 4

Without slot sharing (needs 20 slots):
  Slot 1: Source[0]    Slot 5: Map[0]    Slot 9:  KeyBy[0]   ...
  Slot 2: Source[1]    Slot 6: Map[1]    Slot 10: KeyBy[1]   ...
  Slot 3: Source[2]    Slot 7: Map[2]    Slot 11: KeyBy[2]   ...
  Slot 4: Source[3]    Slot 8: Map[3]    Slot 12: KeyBy[3]   ...

With slot sharing (needs only 4 slots):
  Slot 1: Source[0] → Map[0] → KeyBy[0] → Window[0] → Sink[0]
  Slot 2: Source[1] → Map[1] → KeyBy[1] → Window[1] → Sink[1]
  Slot 3: Source[2] → Map[2] → KeyBy[2] → Window[2] → Sink[2]
  Slot 4: Source[3] → Map[3] → KeyBy[3] → Window[3] → Sink[3]

Benefits of slot sharing:
  āœ… Fewer slots needed (slots = max parallelism of any operator)
  āœ… Better resource utilization (light + heavy operators share)
  āœ… Full pipeline in one slot = less network shuffling

Rule: Total slots needed = max(operator parallelism) across all operators

Parallelism Configuration Levels

  • āœ… Operator level — set parallelism per operator: source.setParallelism(12) (see the sketch after this list)
  • āœ… Environment level — default for all operators: env.setParallelism(8)
  • āœ… Client level — override at submission: flink run -p 16 job.jar
  • āœ… Cluster level — flink-conf.yaml: parallelism.default: 4
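
A minimal Java sketch of how the first two levels interact (class and job names are illustrative):

ParallelismLevels.java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismLevels {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8); // environment level: default for every operator below

        env.fromElements("a", "b", "c")   // fromElements is a non-parallel source
            .map(String::toUpperCase)
            .setParallelism(12)           // operator level: overrides the default of 8
            .print();                     // inherits the environment default of 8

        // Note: the client-level `flink run -p 16` only takes effect when the
        // program does not call env.setParallelism() itself.
        env.execute("parallelism-demo");
    }
}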

Kafka Source Parallelism

For Kafka sources, set parallelism equal to the number of Kafka partitions. Each parallel source instance reads from one or more partitions. If parallelism exceeds partition count, extra instances sit idle. If it's less, some instances read multiple partitions.
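
A hedged sketch against a hypothetical 12-partition topic named events (broker address, group id, and class name are illustrative); the .rebalance() shows how to fan out beyond the partition count:

KafkaParallelismJob.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaParallelismJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("events")
            .setGroupId("fraud-detection")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
            .setParallelism(12)   // one source instance per partition, none idle
            .rebalance()          // round-robin so more downstream instances share the load
            .map(String::toUpperCase)
            .setParallelism(24)   // downstream parallelism can exceed the partition count
            .print();

        env.execute("kafka-parallelism");
    }
}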

04

Job Execution Model

When you submit a Flink job, it goes through several transformation stages: from your high-level API code to an optimized physical execution plan distributed across the cluster.

1

StreamGraph

Your API calls (map, filter, keyBy, window) are translated into a StreamGraph — a logical DAG of operators and their connections.

2

JobGraph

The StreamGraph is optimized into a JobGraph. Operators that can be chained (same parallelism, no shuffle) are fused into a single task for efficiency.

3

ExecutionGraph

The JobManager expands the JobGraph into an ExecutionGraph — one vertex per parallel instance of each operator. This is the physical execution plan.

4

Task Deployment

The JobManager schedules ExecutionGraph vertices onto available TaskManager slots and deploys the task code.

operator-chaining.txt
Operator Chaining (Fusion):

Before chaining:
  Source → Map → Filter → KeyBy → Reduce → Sink
  (6 separate tasks, network transfer between each)

After chaining:
  [Source → Map → Filter] → KeyBy → [Reduce → Sink]
  (3 task groups, network only at shuffle boundaries)

Chaining rules:
  āœ… Same parallelism
  āœ… Forward connection (one-to-one, no rebalance)
  āœ… Same slot sharing group
  āŒ Cannot chain across keyBy (requires hash shuffle)
  āŒ Cannot chain if parallelism differs
  āŒ Cannot chain if user disabled it: .disableChaining()

Why chaining matters:
  - Eliminates serialization/deserialization between operators
  - Eliminates network buffer copies
  - Reduces thread context switches
  - Can improve throughput by 2-5x for simple operators
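
Chaining can also be tuned per operator rather than globally. A minimal sketch (the functions are deliberately trivial; whether an operator actually chains still depends on the rules above):

ChainingControl.java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingControl {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String s) { return s.trim(); }
            })
            .startNewChain()          // start a fresh chain here: breaks the link upstream
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String s) { return s.toUpperCase(); }
            })
            .disableChaining()        // isolate this operator in its own task (debugging)
            .slotSharingGroup("io")   // also move it (and downstream) to a separate slot group
            .print();

        env.execute("chaining-demo");
    }
}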

Viewing the Execution Plan

Use env.getExecutionPlan() to get the JSON execution plan before submitting. The Flink Web UI also shows the execution graph with parallelism, bytes sent/received, and backpressure per operator — essential for debugging performance issues.
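
A minimal sketch of dumping the plan (the keyBy forces a shuffle boundary, so the JSON shows a chain break; names are illustrative):

ShowPlan.java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShowPlan {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer n) { return n * 2; }
            })
            .keyBy(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer n) { return n % 2; }
            })
            .sum(0)
            .print();

        // Print the JSON plan instead of executing; paste it into the Flink
        // plan visualizer or compare against the Web UI's execution graph.
        System.out.println(env.getExecutionPlan());
    }
}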

05

Deployment Modes

Flink supports three deployment modes that differ in cluster lifecycle, resource isolation, and how the main() method is executed. Choosing the right mode depends on your operational requirements.

Mode                      | Cluster Lifecycle                                | Resource Isolation             | Best For
Session Mode              | Long-running, shared cluster                     | Low — jobs share TaskManagers  | Development, many short jobs
Per-Job Mode (deprecated) | Cluster per job, spun up on submit               | High — dedicated resources     | Production on YARN (legacy)
Application Mode          | Cluster per application, main() runs on cluster  | High — dedicated resources     | Production on K8s/YARN
deployment-modes.txt
Session Mode:
  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
  │  Flink Session Cluster (always running) │
  │  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”            │
  │  │  Job A   │  │  Job B   │  ← shared  │
  │  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜    cluster  │
  │  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”                           │
  │  │  Job C   │                           │
  │  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜                           │
  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
  Pros: Fast job startup, resource sharing
  Cons: Noisy neighbor, single point of failure

Application Mode:
  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
  │ Cluster for  │  │ Cluster for  │  ← isolated
  │   Job A      │  │   Job B      │    clusters
  │ main() here  │  │ main() here  │
  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
  Pros: Full isolation, main() doesn't need client resources
  Cons: Slower startup, more cluster overhead

Application Mode is the recommended production deployment.

Application Mode Advantage

In Application Mode, the main() method runs on the cluster, not the client. This means the job JAR doesn't need to be downloaded to a client machine, and the client doesn't need enough memory to build the job graph. This is critical for large jobs with complex graphs.
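
For example, submitting in Application Mode on native Kubernetes (cluster id, image, and namespace are illustrative; the JAR is baked into the image, so the local:// path refers to the container filesystem):

flink-k8s-application.sh
# Application Mode on native Kubernetes (illustrative values)
./bin/flink run-application -t kubernetes-application \
  -Dkubernetes.cluster-id=fraud-detection \
  -Dkubernetes.container.image=registry.example.com/fraud-detection:1.0 \
  -Dkubernetes.namespace=streaming \
  local:///opt/flink/usrlib/fraud-detection-1.0.jar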

06

Flink on Kubernetes

Kubernetes is the recommended deployment platform for Flink in production. The Flink Kubernetes Operator provides native integration with declarative job management, automated upgrades, and reactive scaling.

flink-deployment.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fraud-detection-job
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: rocksdb
    state.checkpoints.dir: s3://checkpoints/fraud-detection
    execution.checkpointing.interval: "60000"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "4096m"
      cpu: 2
  taskManager:
    resource:
      memory: "8192m"
      cpu: 4
    replicas: 6
  job:
    jarURI: s3://jars/fraud-detection-1.0.jar
    entryClass: com.example.FraudDetection
    parallelism: 24
    upgradeMode: savepoint
    state: running

Flink Kubernetes Operator Features

  • āœ… Declarative job management — define desired state, operator handles the rest (see the workflow sketch after this list)
  • āœ… Automated savepoint on upgrade — takes savepoint, stops job, deploys new version, restores
  • āœ… Reactive mode — automatically scales TaskManagers based on workload
  • āœ… Job lifecycle management — handles restarts, failures, and status reporting
  • āœ… Native HA — uses Kubernetes ConfigMaps for leader election (no ZooKeeper needed)
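
A hedged sketch of the declarative workflow with the manifest above (resource names follow metadata.name; exact status output varies by operator version):

operator-workflow.sh
# Submit or update: the operator reconciles toward the declared spec
kubectl apply -f flink-deployment.yaml

# Inspect the operator-reported job status
kubectl get flinkdeployment fraud-detection-job

# Upgrade: bump the image or spec and re-apply; with upgradeMode: savepoint
# the operator snapshots, stops, redeploys, and restores automatically
kubectl apply -f flink-deployment.yaml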
07

Flink on YARN

For organizations with existing Hadoop infrastructure, Flink integrates with YARN as a resource manager. YARN allocates containers for the JobManager and TaskManagers, and Flink manages the job execution within those containers.

flink-yarn.sh
# Application Mode on YARN (recommended)
./bin/flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=4096m \
  -Dtaskmanager.memory.process.size=8192m \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dparallelism.default=24 \
  -Dyarn.application.name="fraud-detection" \
  -Dyarn.application.queue="streaming" \
  s3://jars/fraud-detection-1.0.jar

# Session Mode on YARN
./bin/yarn-session.sh \
  -jm 4096m -tm 8192m \
  -s 4 \
  -d  # detached mode

# HA with ZooKeeper on YARN (these keys go in conf/flink-conf.yaml):
#   high-availability: zookeeper
#   high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
#   high-availability.storageDir: hdfs:///flink/ha
#   yarn.application-attempts: 3

YARN vs Kubernetes

If you're starting fresh, choose Kubernetes. It offers better resource isolation, faster scaling, and the Flink Kubernetes Operator provides superior lifecycle management. YARN is the right choice only if you have existing Hadoop infrastructure and need to share resources with MapReduce/Spark/Hive workloads.

08

Interview Questions

Q: What is the role of the JobManager in Flink?

A: The JobManager is the control plane of a Flink cluster. It: (1) Receives job submissions and transforms the logical plan into a physical ExecutionGraph. (2) Schedules tasks onto TaskManager slots. (3) Coordinates distributed checkpoints using asynchronous barrier snapshotting, a variant of the Chandy-Lamport algorithm. (4) Detects task failures and triggers recovery from the latest checkpoint. (5) Manages savepoints for upgrades and rescaling. Internally it has three components: Dispatcher (accepts jobs), ResourceManager (manages slots), and JobMaster (one per job, handles execution and checkpointing).

Q: Explain the difference between parallelism and task slots.

A: Task slots are the physical resource containers — each slot gets a fixed share of a TaskManager's memory. Parallelism is the logical concept — how many parallel instances of an operator run. With slot sharing (default), an entire pipeline fits in one slot, so you need slots = max(operator parallelism). Without slot sharing, you need slots = sum(all operator parallelisms). Example: 4 operators each with parallelism 8 needs only 8 slots with sharing, but 32 without.

Q: What is operator chaining and why does it matter?

A: Operator chaining fuses multiple operators into a single task when they have the same parallelism and a forward (one-to-one) connection. Chained operators run in the same thread, eliminating serialization, network buffers, and thread context switches between them. This can improve throughput 2-5x for simple operators. Chaining breaks at shuffle boundaries (keyBy, rebalance) or when parallelism differs. You can disable it per-operator with .disableChaining() for debugging.

Q: Compare Flink's three deployment modes.

A: (1) Session Mode: long-running shared cluster, fast job startup, but noisy-neighbor issues and shared failure domain. Good for development. (2) Per-Job Mode (deprecated): dedicated cluster per job on YARN, good isolation but slow startup. (3) Application Mode: dedicated cluster where main() runs on the cluster (not client). Best isolation, no client resource requirements for graph building. Recommended for production on both K8s and YARN.

Q: How does Flink achieve high availability for the JobManager?

A: Flink HA uses leader election (ZooKeeper or Kubernetes ConfigMaps) with multiple standby JobManagers. The active leader persists checkpoint metadata and job graphs to a distributed filesystem (S3, HDFS). On failure: (1) Standby wins leader election. (2) Recovers job metadata from storage. (3) Reconnects to running TaskManagers. (4) Restores from the latest completed checkpoint. TaskManagers continue running during failover — only the coordination is interrupted, typically for seconds.

09

Common Mistakes

šŸŽ°

Setting parallelism higher than Kafka partitions

Setting source parallelism to 32 when the Kafka topic has only 12 partitions. The extra 20 source instances sit completely idle, wasting slots.

āœ… Set Kafka source parallelism equal to the number of partitions. If you need higher downstream parallelism, use .rebalance() after the source to redistribute across more instances.

šŸ“¦

Using Session Mode in production

Running multiple production jobs on a shared session cluster. One job's memory leak or GC storm crashes the entire cluster, taking down all jobs.

āœ… Use Application Mode for production. Each job gets its own isolated cluster. A failure in one job cannot affect others. The Flink Kubernetes Operator makes this easy to manage.

🧮

Ignoring the TaskManager memory model

Setting taskmanager.memory.process.size without understanding the internal breakdown. Results in OOM kills because managed memory, network buffers, and JVM overhead weren't accounted for.

āœ… Understand the memory model: Task Heap + Managed Memory (40%) + Network (10%) + JVM Overhead (10%) + Metaspace. Use the Flink memory calculator or set individual components explicitly for production.

šŸ”—

Disabling operator chaining globally

Calling env.disableOperatorChaining() for debugging and forgetting to remove it. This forces every operator into its own task, multiplying network overhead and slot requirements.

āœ… Only disable chaining on specific operators for debugging: operator.disableChaining(). In production, always keep chaining enabled — it's one of Flink's key performance optimizations.