Monitoring & Operations
Running Flink in production requires comprehensive monitoring, intelligent alerting, and well-configured restart strategies. Understanding key metrics and common failure scenarios is essential for maintaining reliable streaming pipelines.
Flink Metrics System
Flink exposes thousands of metrics at the job, operator, task, and TaskManager level. These metrics can be reported to external systems (Prometheus, Datadog, InfluxDB) for dashboarding and alerting. The metrics system is pluggable and extensible.
```yaml
# Prometheus metrics reporter configuration (flink-conf.yaml)
metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prometheus.port: 9249

# Alternative: Prometheus PushGateway (for short-lived jobs)
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: http://pushgateway:9091
metrics.reporter.promgateway.jobName: flink-metrics
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false

# Metric scope format (how metrics are named)
metrics.scope.jm: <host>.jobmanager
metrics.scope.jm.job: <host>.jobmanager.<job_name>
metrics.scope.tm: <host>.taskmanager.<tm_id>
metrics.scope.tm.job: <host>.taskmanager.<tm_id>.<job_name>
metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

# Latency tracking (adds overhead — use selectively)
metrics.latency.interval: 5000
metrics.latency.granularity: operator
```
Metrics Architecture
- ✅Flink exposes metrics via reporters (Prometheus, Datadog, StatsD, JMX)
- ✅Each TaskManager runs a metrics HTTP endpoint (port 9249 for Prometheus)
- ✅Metrics are hierarchical: Job → Task → Operator → Subtask
- ✅Custom metrics can be added in user code via getRuntimeContext().getMetricGroup()
- ✅Prometheus + Grafana is the most common production monitoring stack
Custom Metrics
Add application-specific metrics in your operators using getRuntimeContext().getMetricGroup().counter("events-processed") or .gauge("queue-size", () -> queue.size()). These appear alongside Flink's built-in metrics in your dashboards.
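A minimal sketch of how these calls fit into a rich function; the class name, metric names, and the buffer field are illustrative, not part of any Flink API:

```java
import java.util.ArrayDeque;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;

// Illustrative operator: counts processed events and exposes an in-memory buffer size.
public class EnrichEvents extends RichMapFunction<String, String> {

    private transient Counter eventsProcessed;
    private transient ArrayDeque<String> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayDeque<>();

        // Registered once per subtask; reported under this operator's metric scope.
        eventsProcessed = getRuntimeContext()
                .getMetricGroup()
                .counter("events-processed");

        // A gauge is sampled on every metric report cycle.
        getRuntimeContext()
                .getMetricGroup()
                .gauge("buffer-size", (Gauge<Integer>) () -> buffer.size());
    }

    @Override
    public String map(String value) {
        eventsProcessed.inc();
        return value.trim(); // stand-in for real enrichment logic
    }
}
```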
Key Metrics to Monitor
Not all metrics are equally important. Focus on these categories: job health, operator performance, TaskManager resources, and source-specific metrics (Kafka lag).
| Category | Metric | What It Tells You | Alert Threshold |
|---|---|---|---|
| Job Health | uptime / restartingTime | Job stability | Restart count > 3 in 10 min |
| Job Health | lastCheckpointDuration | Checkpoint performance | > 2x normal duration |
| Job Health | numberOfFailedCheckpoints | Checkpoint reliability | > 0 in 5 min window |
| Operator | busyTimeMsPerSecond | Operator saturation | > 900 (90% busy) |
| Operator | numRecordsOutPerSecond | Throughput | Dropping below baseline |
| Operator | currentOutputWatermark | Event-time progress | Lag growing over time |
| TaskManager | Status.JVM.Memory.Heap.Used | Heap usage | > 85% of max |
| TaskManager | Status.JVM.GarbageCollector.*.Time | GC pressure | > 500ms/sec |
| Kafka Source | records_lag_max (KafkaConsumer) | Consumer lag | Growing continuously |
Key Prometheus Queries for Grafana:

```
# Throughput per operator
rate(flink_taskmanager_job_task_operator_numRecordsOut[1m])

# Backpressure indicator (busy time)
flink_taskmanager_job_task_busyTimeMsPerSecond

# Checkpoint duration trend
flink_jobmanager_job_lastCheckpointDuration

# Kafka consumer lag (records behind)
flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max

# Heap memory usage percentage
flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max * 100

# Watermark lag (processing time - watermark)
time() * 1000 - flink_taskmanager_job_task_operator_currentOutputWatermark

# State size (RocksDB)
flink_taskmanager_job_task_operator_state_size

# Checkpoint size trend
flink_jobmanager_job_lastCheckpointSize
```
The Big Three
If you can only monitor three things, monitor: (1) Kafka consumer lag — tells you if you're keeping up. (2) Checkpoint duration — tells you if state is healthy. (3) busyTimeMsPerSecond — tells you where bottlenecks are. These three metrics catch 90% of production issues.
Flink Web UI
The Flink Web UI provides real-time visibility into job execution, operator metrics, checkpoint history, and backpressure. It's the first place to look when debugging performance issues.
Flink Web UI Sections:

1. Overview
   - Running/completed/failed jobs
   - TaskManager count and available slots
   - JobManager memory and uptime

2. Jobs → Running Job → Overview
   - Execution graph (DAG visualization)
   - Per-operator: records in/out, bytes in/out
   - Parallelism per operator
   - Status of each subtask

3. Jobs → Running Job → Checkpoints
   - Checkpoint history (duration, size, trigger time)
   - Per-operator checkpoint stats
   - Alignment duration (indicator of backpressure)
   - Failed checkpoints with error messages

4. Jobs → Running Job → Backpressure
   - Per-operator backpressure status (OK/LOW/HIGH)
   - Sampling-based detection
   - Identifies bottleneck location

5. Jobs → Running Job → Subtasks
   - Per-subtask metrics (useful for detecting skew)
   - One subtask much busier than others = data skew
   - Bytes received/sent per subtask

6. Task Managers
   - Memory usage (heap, non-heap, managed)
   - GC statistics
   - Network buffer usage
   - Log file access
Web UI Debugging Workflow
- ✅Check Backpressure tab — find the first HIGH operator (bottleneck is downstream of it)
- ✅Check Checkpoints — growing duration or failures indicate state/storage issues
- ✅Check Subtasks — uneven metrics across subtasks indicate data skew
- ✅Check TaskManagers — high heap usage or GC time indicates memory pressure
- ✅Check operator metrics — records in vs out ratio shows filtering/expansion
Alerting
Effective alerting catches problems before they become outages. Focus on actionable alerts that indicate real problems, not transient spikes. Use multi-level severity (warning vs critical) to avoid alert fatigue.
```yaml
# Prometheus alerting rules for Flink
groups:
  - name: flink-critical
    rules:
      # Job has restarted multiple times
      - alert: FlinkJobUnstable
        expr: increase(flink_jobmanager_job_numRestarts[10m]) > 3
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Flink job restarting repeatedly"

      # Checkpoint failing consistently
      - alert: FlinkCheckpointFailing
        expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[10m]) > 2
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Flink checkpoints failing — data loss risk"

      # Kafka lag growing (falling behind)
      - alert: FlinkKafkaLagGrowing
        expr: rate(flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max[5m]) > 0
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag growing — pipeline falling behind"

      # Watermark stalled (event time not advancing)
      - alert: FlinkWatermarkStalled
        expr: changes(flink_taskmanager_job_task_operator_currentOutputWatermark[5m]) == 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Watermark not advancing — possible idle source or stuck job"

      # High GC pressure
      - alert: FlinkHighGCPressure
        expr: rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Time[1m]) > 500
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High GC time — risk of checkpoint timeout or instability"
```
| Alert | Severity | Action |
|---|---|---|
| Job restarting repeatedly | Critical | Check logs for root cause, may need code fix |
| Checkpoints failing | Critical | Check storage connectivity, state size, timeout |
| Kafka lag growing | Warning | Scale up, fix bottleneck, or accept higher latency |
| Watermark stalled | Warning | Check for idle sources, stuck operators, or data gap |
| High GC pressure | Warning | Increase heap, reduce object creation, check for memory leak |
| Checkpoint duration doubling | Warning | State growing — check TTL, add cleanup, tune RocksDB |
Restart Strategies
Restart strategies determine how Flink recovers from task failures. The right strategy balances fast recovery (minimize downtime) with protection against restart loops (don't burn through attempts on a persistent bug).
| Strategy | Behavior | Best For |
|---|---|---|
| Fixed Delay | Restart N times with fixed delay between | Simple jobs, known transient failures |
| Exponential Delay | Increasing delay between restarts (1s, 2s, 4s...) | Production — prevents thundering herd on recovery |
| Failure Rate | Max N failures within time window | Long-running jobs — resets counter over time |
| No Restart | Job fails immediately on any failure | Development, testing |
```yaml
# Exponential Delay (recommended for production)
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 60s
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10min
restart-strategy.exponential-delay.jitter-factor: 0.1
# Behavior:
#   Failure 1: wait 1s, restart
#   Failure 2: wait 2s, restart
#   Failure 3: wait 4s, restart
#   ...
#   Failure N: wait 60s (max), restart
#   After 10 min of stability: reset backoff to 1s

# Failure Rate (alternative for long-running jobs)
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 10
restart-strategy.failure-rate.failure-rate-interval: 5min
restart-strategy.failure-rate.delay: 10s
# Behavior:
#   Allow up to 10 failures per 5-minute window
#   Wait 10s between restarts
#   If 11th failure in 5 min → job fails permanently

# Fixed Delay (simple)
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
restart-strategy.fixed-delay.delay: 30s
```
Exponential Delay with Reset
The exponential delay strategy with reset-backoff-threshold is ideal for production. It backs off quickly during failure storms (preventing rapid restart loops) but resets after a period of stability. This handles both transient failures (quick recovery) and persistent issues (backs off to avoid resource waste).
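The same strategy can also be set per job in code. A minimal sketch using the RestartStrategies helper (newer Flink versions may deprecate this in favor of configuration-based setup); the values mirror the YAML above and the pipeline is a placeholder:

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exponential backoff with reset: same values as the flink-conf.yaml example above.
        env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
                Time.seconds(1),   // initial-backoff
                Time.seconds(60),  // max-backoff
                2.0,               // backoff-multiplier
                Time.minutes(10),  // reset-backoff-threshold
                0.1));             // jitter-factor

        env.fromElements("a", "b", "c").print(); // placeholder pipeline
        env.execute("restart-strategy-example");
    }
}
```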
Common Failure Scenarios
Understanding common failure scenarios helps you configure appropriate restart strategies, set up targeted alerts, and respond quickly when issues occur in production.
Common Failure Scenarios and Responses:

1. OutOfMemoryError (OOM)
   - Cause: State too large for heap, memory leak in UDF, GC overhead
   - Symptoms: TaskManager killed by OS, "java.lang.OutOfMemoryError"
   - Fix: Increase memory, use RocksDB, add state TTL, fix leak
   - Prevention: Monitor heap usage, set GC alerts

2. Checkpoint Timeout
   - Cause: State too large, storage slow, backpressure during alignment
   - Symptoms: "Checkpoint X expired before completing"
   - Fix: Increase timeout, enable incremental/unaligned checkpoints
   - Prevention: Monitor checkpoint duration trend

3. Kafka Broker Unavailable
   - Cause: Kafka broker crash, network partition, maintenance
   - Symptoms: "Failed to fetch metadata", consumer lag spike
   - Fix: Flink retries automatically; ensure restart strategy allows it
   - Prevention: Multi-broker cluster, monitor broker health

4. TaskManager Lost
   - Cause: Machine failure, container eviction, network partition
   - Symptoms: "TaskManager X lost", tasks rescheduled
   - Fix: Automatic — Flink reschedules tasks on remaining TMs
   - Prevention: Run enough TMs for N-1 survival, use K8s pod anti-affinity

5. Data Skew / Hot Key
   - Cause: One key has disproportionate traffic
   - Symptoms: One subtask at 100% CPU, others idle
   - Fix: Key salting, pre-aggregation, custom partitioner
   - Prevention: Monitor per-subtask metrics, alert on skew

6. Serialization Error
   - Cause: Schema change, corrupt data, incompatible types
   - Symptoms: "Could not deserialize", job restarts repeatedly
   - Fix: Fix schema, add error handling, use dead letter queue
   - Prevention: Schema registry, validation at source
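One common way to implement the dead-letter-queue fix from scenario 6 is a side output: route records that fail to deserialize to a separate stream instead of throwing and restarting the job. A sketch, in which the tag name and the Long-parsing stand-in for real deserialization are illustrative:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DeadLetterExample {

    // Side output for records that fail to parse (tag name is illustrative).
    static final OutputTag<String> DEAD_LETTER = new OutputTag<String>("dead-letter") {};

    public static SingleOutputStreamOperator<Long> parseOrSideOutput(DataStream<String> raw) {
        return raw.process(new ProcessFunction<String, Long>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Long> out) {
                try {
                    out.collect(Long.parseLong(value)); // stand-in for real deserialization
                } catch (NumberFormatException e) {
                    // Bad record: emit to the dead letter side output instead of failing the job.
                    ctx.output(DEAD_LETTER, value);
                }
            }
        });
    }
}
```

The dead-letter stream can then be retrieved with getSideOutput(DEAD_LETTER) on the returned operator and written to a separate topic or table for later inspection.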
Operational Runbook Essentials
- ✅OOM → Check heap metrics, state size, recent code changes. Increase memory or add TTL.
- ✅Checkpoint timeout → Check storage latency, state size growth, backpressure. Enable unaligned checkpoints (see the sketch after this list).
- ✅Restart loop → Check logs for root cause. If persistent bug, stop job, fix, redeploy from savepoint.
- ✅Kafka lag spike → Check backpressure location, scale bottleneck, or accept temporary lag.
- ✅Watermark stall → Check for idle partitions, data gaps, or stuck operators.
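For the checkpoint-timeout item above, a sketch of the relevant knobs set per job; the interval and timeout values are illustrative, and incremental checkpoints require the flink-statebackend-rocksdb dependency:

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000);                         // checkpoint every 60s (illustrative)
        env.getCheckpointConfig().setCheckpointTimeout(600_000); // allow 10 min before a checkpoint expires
        env.getCheckpointConfig().enableUnalignedCheckpoints();  // don't block on alignment under backpressure

        // Incremental checkpoints: upload only changed RocksDB files instead of full state.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("checkpoint-tuning-example");
    }
}
```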
Restart Loop Protection
If a job enters a restart loop (same error every restart), the exponential backoff gives you time to investigate. After max backoff (60s), you have a minute between restarts to check logs, take a thread dump, or cancel the job. Without backoff, the job restarts immediately and fails again before you can react.
Interview Questions
Q: What metrics would you monitor for a production Flink job?
A: Three categories: (1) Job health — restart count, checkpoint duration/failures, uptime. (2) Performance — busyTimeMsPerSecond (operator saturation), numRecordsOutPerSecond (throughput), currentOutputWatermark (event-time progress). (3) Resources — heap usage, GC time, Kafka consumer lag. The 'big three' for quick diagnosis: Kafka lag (are we keeping up?), checkpoint duration (is state healthy?), busyTime (where's the bottleneck?). Set up Prometheus + Grafana dashboards with these metrics and alert on sustained degradation.
Q: How would you debug a Flink job that's falling behind (growing Kafka lag)?
A: Step-by-step: (1) Check Flink Web UI backpressure tab — find the bottleneck operator (first one showing HIGH is upstream of it). (2) Check busyTimeMsPerSecond on the bottleneck — if 1000, it's saturated. (3) Determine bottleneck type: CPU-bound (high CPU) → increase parallelism. I/O-bound (external calls) → use async I/O, batch writes. State-bound (RocksDB slow) → tune cache/buffers. Skew (one subtask hot) → key salting. (4) Check if it's transient (GC pause, Kafka rebalance) or sustained. (5) Scale the bottleneck operator or optimize its code.
Q: Explain Flink's restart strategies and when to use each.
A: (1) Exponential Delay (recommended): backs off exponentially (1s, 2s, 4s... up to max). Resets after stability period. Best for production — handles both transient failures (quick recovery) and persistent issues (backs off). (2) Failure Rate: allows N failures per time window. Good for long-running jobs where occasional failures are expected. (3) Fixed Delay: N attempts with fixed delay. Simple but doesn't adapt. (4) No Restart: immediate failure. For development/testing. Key: exponential delay with reset-backoff-threshold gives the best balance of fast recovery and loop protection.
Q: What are common failure scenarios in production Flink jobs?
A: (1) OOM — state exceeds heap, memory leak, or GC overhead. Fix: RocksDB, state TTL, increase memory. (2) Checkpoint timeout — state too large or storage slow. Fix: incremental/unaligned checkpoints, increase timeout. (3) Restart loop — persistent bug causes immediate failure after restart. Fix: exponential backoff gives time to investigate; fix code, redeploy from savepoint. (4) Data skew — hot key overloads one subtask. Fix: key salting, pre-aggregation. (5) Serialization error — schema change or corrupt data. Fix: dead letter queue, schema validation.
Common Mistakes
No monitoring until something breaks
Running Flink jobs without Prometheus/Grafana dashboards or alerts. Problems are only discovered when downstream consumers complain about missing data or users report stale results.
✅Set up monitoring from day one. Minimum: Prometheus metrics reporter, Grafana dashboard with the big three (lag, checkpoint duration, busyTime), and alerts for restart count, checkpoint failures, and growing lag.
Using no-restart strategy in production
Deploying with restart-strategy: none or fixed-delay with attempts: 1. A single transient failure (network blip, GC pause) permanently kills the job.
✅Use exponential-delay strategy with generous limits. Most production failures are transient — a restart from checkpoint recovers in seconds. Set reset-backoff-threshold so the counter resets after stability.
Alerting on every metric fluctuation
Setting alerts with no 'for' duration or very tight thresholds. Every GC pause, every brief lag spike triggers a page. Team develops alert fatigue and ignores real problems.
✅Use 'for' duration (5-15 min) to filter transient spikes. Alert on trends (lag growing for 15 min) not instantaneous values. Use warning vs critical severity. Only page on-call for critical alerts that require immediate action.
No runbook for common failures
When a 3am alert fires, the on-call engineer has no documentation on how to diagnose or fix common Flink issues. They spend 30 minutes figuring out how to access logs.
✅Create a runbook covering: how to access Flink Web UI, how to check logs, common failure patterns and their fixes, how to take a savepoint, how to restart from savepoint, and escalation paths. Link it from every alert.