Core Architecture
Flink's distributed runtime consists of a JobManager that orchestrates execution and TaskManagers that run the actual data processing. Understanding this architecture is key to tuning, debugging, and scaling Flink applications.
JobManager
The JobManager is the brain of a Flink cluster. It receives jobs, transforms them into execution graphs, schedules tasks onto TaskManagers, coordinates checkpoints, and handles failure recovery. In production, you run it in high-availability mode with ZooKeeper or Kubernetes leader election.
The Air Traffic Controller
The JobManager is like an air traffic controller. It doesn't fly planes (process data) itself; it coordinates which planes land on which runways (which tasks run on which slots), monitors their progress, and reroutes traffic when something goes wrong. If the controller goes down, a standby takes over using the flight logs (checkpoint metadata).
JobManager Responsibilities
- Receives job submissions and transforms the logical plan into a physical execution graph
- Schedules tasks onto available TaskManager slots based on resource requirements
- Coordinates distributed snapshots (checkpoints) across all tasks
- Detects task failures and triggers recovery from the latest checkpoint
- Manages savepoints: manual snapshots for upgrades and rescaling
- Serves the Flink Web UI and REST API for monitoring and management
```yaml
# High Availability Configuration (Kubernetes)
high-availability: kubernetes
high-availability.storageDir: s3://flink-ha/cluster-1

restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 60s

# JobManager resources
jobmanager.memory.process.size: 4096m
jobmanager.memory.jvm-overhead.fraction: 0.1

# HA ensures:
# - Leader election via Kubernetes ConfigMaps
# - Checkpoint metadata persisted to S3
# - Standby JobManager takes over in seconds
# - Running tasks continue from last checkpoint
```
JobManager Components
Internally, the JobManager consists of three components: the Dispatcher (accepts jobs and launches JobMasters), the ResourceManager (manages TaskManager slots), and the JobMaster (one per job; it manages execution, checkpointing, and recovery for that specific job).
TaskManager
TaskManagers are the worker processes that actually execute the dataflow operators. Each TaskManager is a JVM process that offers a fixed number of task slots. Data flows between TaskManagers via network buffers, and state is stored locally on each TaskManager.
TaskManager Memory Model:

```
Total Process Memory (e.g., 8 GB)
├── JVM Heap
│   ├── Framework Heap (128 MB default)
│   └── Task Heap (for user code objects)
├── Off-Heap (Managed Memory)
│   ├── RocksDB state backend buffers
│   ├── Batch sorting/hashing
│   └── Python UDF memory
├── Network Memory (for shuffle buffers)
│   ├── Input buffers (receiving data)
│   └── Output buffers (sending data)
├── JVM Metaspace (256 MB default)
└── JVM Overhead (GC, threads, etc.)

Key sizing rules:
- Task Heap: depends on your UDF complexity
- Managed Memory: 40% default, increase for RocksDB
- Network Memory: 10% default, increase for high parallelism
- More slots per TM = more memory sharing but less isolation
```
| Component | Purpose | Sizing Guidance |
|---|---|---|
| Task Heap | User code objects, serialization buffers | Depends on UDF complexity; 1-4 GB typical |
| Managed Memory | RocksDB, batch ops, Python UDFs | 40% of total; increase for large state |
| Network Memory | Shuffle buffers between operators | 10% of total; increase for high parallelism |
| JVM Overhead | GC, threads, native libraries | 10% of total; rarely needs adjustment |
TaskManager Sizing
A common production setup: 4-8 GB per TaskManager with 2-4 task slots each. Fewer large TaskManagers are generally better than many small ones: they reduce network overhead and allow slot sharing. But too large means a single TM failure impacts more tasks.
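In flink-conf.yaml terms, the larger end of that setup might look like the sketch below (the values are the illustrative 8 GB / 4-slot example from above, not universal defaults):

```yaml
# Example sizing for one TaskManager (illustrative values; tune for your workload)
taskmanager.memory.process.size: 8192m   # total process memory, incl. JVM overhead
taskmanager.numberOfTaskSlots: 4         # 4 slots => roughly 2 GB of memory budget per slot
```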
Task Slots & Parallelism
Task slots are the unit of resource allocation in Flink. Each slot represents a fixed portion of a TaskManager's memory. Parallelism determines how many parallel instances of each operator run. Slot sharing allows multiple operators from the same job to share a slot.
Parallelism vs Task Slots:

```
Job: Source(Kafka) → Map → KeyBy → Window → Sink
Parallelism: 4

Without slot sharing (needs 20 slots):
  Slot 1: Source[0]   Slot 5: Map[0]   Slot 9:  KeyBy[0] ...
  Slot 2: Source[1]   Slot 6: Map[1]   Slot 10: KeyBy[1] ...
  Slot 3: Source[2]   Slot 7: Map[2]   Slot 11: KeyBy[2] ...
  Slot 4: Source[3]   Slot 8: Map[3]   Slot 12: KeyBy[3] ...

With slot sharing (needs only 4 slots):
  Slot 1: Source[0] → Map[0] → KeyBy[0] → Window[0] → Sink[0]
  Slot 2: Source[1] → Map[1] → KeyBy[1] → Window[1] → Sink[1]
  Slot 3: Source[2] → Map[2] → KeyBy[2] → Window[2] → Sink[2]
  Slot 4: Source[3] → Map[3] → KeyBy[3] → Window[3] → Sink[3]

Benefits of slot sharing:
✓ Fewer slots needed (slots = max parallelism of any operator)
✓ Better resource utilization (light + heavy operators share)
✓ Full pipeline in one slot = less network shuffling

Rule: Total slots needed = max(operator parallelism) across all operators
```
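Slot sharing can also be steered explicitly per operator. A minimal DataStream sketch (the group name "heavy" and the toy pipeline are illustrative, not from the original):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
           .map((Integer x) -> x * 2).returns(Types.INT)
           // By default all operators share the "default" group and co-locate in slots.
           // A distinct group forces this operator's subtasks into their own slots.
           .slotSharingGroup("heavy")
           .print();

        env.execute("slot-sharing-sketch");
    }
}
```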
Parallelism Configuration Levels
- Operator level: per-operator setting, e.g. source.setParallelism(12) (see the sketch after this list)
- Environment level: default for all operators, e.g. env.setParallelism(8)
- Client level: override at submission, e.g. flink run -p 16 job.jar
- Cluster level: flink-conf.yaml default, e.g. parallelism.default: 4
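More specific levels win: an operator-level setting overrides the environment default, which overrides the client and cluster defaults. A minimal sketch of the first two levels side by side (the toy pipeline is illustrative):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismLevelsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);                          // environment-level default

        env.fromElements(1, 2, 3)
           .map((Integer x) -> x + 1).returns(Types.INT)
           .setParallelism(12)                          // operator-level override wins
           .print();

        env.execute("parallelism-levels-sketch");
    }
}
```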
Kafka Source Parallelism
For Kafka sources, set parallelism equal to the number of Kafka partitions. Each parallel source instance reads from one or more partitions. If parallelism exceeds partition count, extra instances sit idle. If it's less, some instances read multiple partitions.
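A hedged sketch of matching source parallelism to partitions with the KafkaSource connector (the broker address, topic, group id, and the partition count of 12 are all illustrative):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("events")                     // assume this topic has 12 partitions
            .setGroupId("fraud-detection")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .setParallelism(12)                       // one source subtask per partition
           .print();

        env.execute("kafka-parallelism-sketch");
    }
}
```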
Job Execution Model
When you submit a Flink job, it goes through several transformation stages: from your high-level API code to an optimized physical execution plan distributed across the cluster.
StreamGraph
Your API calls (map, filter, keyBy, window) are translated into a StreamGraph: a logical DAG of operators and their connections.
JobGraph
The StreamGraph is optimized into a JobGraph. Operators that can be chained (same parallelism, no shuffle) are fused into a single task for efficiency.
ExecutionGraph
The JobManager expands the JobGraph into an ExecutionGraph: one vertex per parallel instance of each operator. This is the physical execution plan.
Task Deployment
The JobManager schedules ExecutionGraph vertices onto available TaskManager slots and deploys the task code.
Operator Chaining (Fusion):

```
Before chaining:
  Source → Map → Filter → KeyBy → Reduce → Sink
  (6 separate tasks, network transfer between each)

After chaining:
  [Source → Map → Filter] → KeyBy → [Reduce → Sink]
  (3 task groups, network only at shuffle boundaries)

Chaining rules:
  ✓ Same parallelism
  ✓ Forward connection (one-to-one, no rebalance)
  ✓ Same slot sharing group
  ✗ Cannot chain across keyBy (requires hash shuffle)
  ✗ Cannot chain if parallelism differs
  ✗ Cannot chain if user disabled it: .disableChaining()

Why chaining matters:
  - Eliminates serialization/deserialization between operators
  - Eliminates network buffer copies
  - Reduces thread context switches
  - Can improve throughput by 2-5x for simple operators
```
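Chaining can be adjusted per operator in the DataStream API. A minimal sketch (the toy pipeline is illustrative):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingControlSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
           .map((Integer x) -> x + 1).returns(Types.INT)
           .filter(x -> x % 2 == 0)
           .startNewChain()                  // the filter above starts a fresh chain
           .map((Integer x) -> x * 10).returns(Types.INT)
           .disableChaining()                // this map never chains with anything
           .print();

        env.execute("chaining-control-sketch");
    }
}
```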
Viewing the Execution Plan
Use env.getExecutionPlan() to get the JSON execution plan before submitting. The Flink Web UI also shows the execution graph with parallelism, bytes sent/received, and backpressure per operator, which is essential for debugging performance issues.
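A minimal sketch of dumping the plan (the toy job is illustrative; the printed JSON can be pasted into the Flink plan visualizer):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionPlanSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
           .map(s -> s.toUpperCase()).returns(Types.STRING)
           .print();

        // Prints the JSON plan for the job as defined so far; no job is submitted here.
        System.out.println(env.getExecutionPlan());
    }
}
```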
Deployment Modes
Flink supports three deployment modes that differ in cluster lifecycle, resource isolation, and how the main() method is executed. Choosing the right mode depends on your operational requirements.
| Mode | Cluster Lifecycle | Resource Isolation | Best For |
|---|---|---|---|
| Session Mode | Long-running, shared cluster | Low ā jobs share TaskManagers | Development, many short jobs |
| Per-Job Mode (deprecated) | Cluster per job, spun up on submit | High ā dedicated resources | Production on YARN (legacy) |
| Application Mode | Cluster per application, main() runs on cluster | High ā dedicated resources | Production on K8s/YARN |
```
Session Mode:
┌───────────────────────────────────────────┐
│ Flink Session Cluster (always running)    │
│  ┌──────────┐  ┌──────────┐               │
│  │  Job A   │  │  Job B   │   shared      │
│  └──────────┘  └──────────┘   cluster     │
│  ┌──────────┐                             │
│  │  Job C   │                             │
│  └──────────┘                             │
└───────────────────────────────────────────┘
Pros: Fast job startup, resource sharing
Cons: Noisy neighbor, single point of failure

Application Mode:
┌──────────────┐   ┌──────────────┐
│ Cluster for  │   │ Cluster for  │   isolated
│ Job A        │   │ Job B        │   clusters
│ main() here  │   │ main() here  │
└──────────────┘   └──────────────┘
Pros: Full isolation, main() doesn't need client resources
Cons: Slower startup, more cluster overhead
```

Application Mode is the recommended production deployment.
Application Mode Advantage
In Application Mode, the main() method runs on the cluster, not the client. This means the job JAR doesn't need to be downloaded to a client machine, and the client doesn't need enough memory to build the job graph. This is critical for large jobs with complex graphs.
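For comparison with the YARN commands later in this section, a hedged sketch of an Application Mode submission on Kubernetes (the cluster-id, image, and JAR path are illustrative; local:// points to a path inside the container image):

```bash
./bin/flink run-application -t kubernetes-application \
  -Dkubernetes.cluster-id=fraud-detection \
  -Dkubernetes.container.image=my-registry/fraud-detection:1.0 \
  local:///opt/flink/usrlib/fraud-detection-1.0.jar
```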
Flink on Kubernetes
Kubernetes is the recommended deployment platform for Flink in production. The Flink Kubernetes Operator provides native integration with declarative job management, automated upgrades, and reactive scaling.
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fraud-detection-job
spec:
  image: flink:1.18
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: rocksdb
    state.checkpoints.dir: s3://checkpoints/fraud-detection
    execution.checkpointing.interval: "60000"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "4096m"
      cpu: 2
  taskManager:
    resource:
      memory: "8192m"
      cpu: 4
    replicas: 6
  job:
    jarURI: s3://jars/fraud-detection-1.0.jar
    entryClass: com.example.FraudDetection
    parallelism: 24
    upgradeMode: savepoint
    state: running
```
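Assuming the manifest above is saved as fraud-detection.yaml (filename illustrative), deploying and inspecting it follows the usual Kubernetes flow:

```bash
kubectl apply -f fraud-detection.yaml
kubectl get flinkdeployment fraud-detection-job   # the operator reports lifecycle status here
```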
Flink Kubernetes Operator Features
- Declarative job management: define the desired state, the operator handles the rest
- Automated savepoint on upgrade: takes a savepoint, stops the job, deploys the new version, restores
- Reactive mode: automatically scales TaskManagers based on workload
- Job lifecycle management: handles restarts, failures, and status reporting
- Native HA: uses Kubernetes ConfigMaps for leader election (no ZooKeeper needed)
Flink on YARN
For organizations with existing Hadoop infrastructure, Flink integrates with YARN as a resource manager. YARN allocates containers for the JobManager and TaskManagers, and Flink manages the job execution within those containers.
```bash
# Application Mode on YARN (recommended)
./bin/flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=4096m \
  -Dtaskmanager.memory.process.size=8192m \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dparallelism.default=24 \
  -Dyarn.application.name="fraud-detection" \
  -Dyarn.application.queue="streaming" \
  s3://jars/fraud-detection-1.0.jar

# Session Mode on YARN
./bin/yarn-session.sh \
  -jm 4096m -tm 8192m \
  -s 4 \
  -d   # detached mode
```

```yaml
# HA with ZooKeeper on YARN
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha
yarn.application-attempts: 3
```
YARN vs Kubernetes
If you're starting fresh, choose Kubernetes. It offers better resource isolation, faster scaling, and the Flink Kubernetes Operator provides superior lifecycle management. YARN is the right choice only if you have existing Hadoop infrastructure and need to share resources with MapReduce/Spark/Hive workloads.
Interview Questions
Q: What is the role of the JobManager in Flink?
A: The JobManager is the control plane of a Flink cluster. It: (1) Receives job submissions and transforms the logical plan into a physical ExecutionGraph. (2) Schedules tasks onto TaskManager slots. (3) Coordinates distributed checkpoints using a variant of the Chandy-Lamport algorithm (asynchronous barrier snapshotting). (4) Detects task failures and triggers recovery from the latest checkpoint. (5) Manages savepoints for upgrades and rescaling. Internally it has three components: Dispatcher (accepts jobs), ResourceManager (manages slots), and JobMaster (one per job, handles execution and checkpointing).
Q: Explain the difference between parallelism and task slots.
A: Task slots are the physical resource containers ā each slot gets a fixed share of a TaskManager's memory. Parallelism is the logical concept ā how many parallel instances of an operator run. With slot sharing (default), an entire pipeline fits in one slot, so you need slots = max(operator parallelism). Without slot sharing, you need slots = sum(all operator parallelisms). Example: 4 operators each with parallelism 8 needs only 8 slots with sharing, but 32 without.
Q: What is operator chaining and why does it matter?
A: Operator chaining fuses multiple operators into a single task when they have the same parallelism and a forward (one-to-one) connection. Chained operators run in the same thread, eliminating serialization, network buffers, and thread context switches between them. This can improve throughput 2-5x for simple operators. Chaining breaks at shuffle boundaries (keyBy, rebalance) or when parallelism differs. You can disable it per-operator with .disableChaining() for debugging.
Q: Compare Flink's three deployment modes.
A: (1) Session Mode: long-running shared cluster, fast job startup, but noisy-neighbor issues and shared failure domain. Good for development. (2) Per-Job Mode (deprecated): dedicated cluster per job on YARN, good isolation but slow startup. (3) Application Mode: dedicated cluster where main() runs on the cluster (not client). Best isolation, no client resource requirements for graph building. Recommended for production on both K8s and YARN.
Q: How does Flink achieve high availability for the JobManager?
A: Flink HA uses leader election (ZooKeeper or Kubernetes ConfigMaps) with multiple standby JobManagers. The active leader persists checkpoint metadata and job graphs to a distributed filesystem (S3, HDFS). On failure: (1) Standby wins leader election. (2) Recovers job metadata from storage. (3) Reconnects to running TaskManagers. (4) Restores from the latest completed checkpoint. TaskManagers continue running during failover; only the coordination is interrupted, typically for seconds.
Common Mistakes
Setting parallelism higher than Kafka partitions
Setting source parallelism to 32 when the Kafka topic has only 12 partitions. The extra 20 source instances sit completely idle, wasting slots.
Fix: Set Kafka source parallelism equal to the number of partitions. If you need higher downstream parallelism, use .rebalance() after the source to redistribute across more instances.
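A minimal sketch of the fix, extending the KafkaSource example from earlier in this section (env and source as defined there; the 12-partition topic and downstream parallelism of 32 are illustrative):

```java
// `source` reads a 12-partition topic, as in the earlier KafkaSource sketch.
DataStream<String> records = env
    .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
    .setParallelism(12);                  // one source subtask per partition

records
    .rebalance()                          // round-robin across all 32 downstream subtasks
    .map(value -> value.toUpperCase())    // stand-in for real per-record work
    .returns(Types.STRING)
    .setParallelism(32);
```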
Using Session Mode in production
Running multiple production jobs on a shared session cluster. One job's memory leak or GC storm crashes the entire cluster, taking down all jobs.
Fix: Use Application Mode for production. Each job gets its own isolated cluster. A failure in one job cannot affect others. The Flink Kubernetes Operator makes this easy to manage.
Ignoring the TaskManager memory model
Setting taskmanager.memory.process.size without understanding the internal breakdown. Results in OOM kills because managed memory, network buffers, and JVM overhead weren't accounted for.
Fix: Understand the memory model: Task Heap + Managed Memory (40%) + Network (10%) + JVM Overhead (10%) + Metaspace. Use the Flink memory calculator or set individual components explicitly for production.
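One hedged way to pin the components explicitly rather than relying on fractions (the values are illustrative for an 8 GB TaskManager; Flink derives the remaining components from what you set):

```yaml
taskmanager.memory.process.size: 8192m      # hard upper bound for the whole container
taskmanager.memory.task.heap.size: 2048m    # user code objects
taskmanager.memory.managed.size: 3072m      # RocksDB / batch operators
taskmanager.memory.network.fraction: 0.1    # shuffle buffers
taskmanager.memory.jvm-metaspace.size: 256m
```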
Disabling operator chaining globally
Calling env.disableOperatorChaining() for debugging and forgetting to remove it. This forces every operator into its own task, multiplying network overhead and slot requirements.
Fix: Only disable chaining on specific operators for debugging: operator.disableChaining(). In production, always keep chaining enabled; it's one of Flink's key performance optimizations.