
The Coordination Problem

Why distributed coordination is fundamentally hard — networks are unreliable, clocks drift, and processes crash. Understanding the problem is the first step to appreciating ZooKeeper's solution.

01

Why Coordination is Hard

In a single-process application, coordination is trivial — you use a mutex, a semaphore, or a simple variable. But the moment you distribute your system across multiple machines, three fundamental problems make coordination extraordinarily difficult.

⚔️

The Two Generals Problem

Two armies on opposite sides of a valley need to attack simultaneously. They can only communicate by sending messengers through the valley — but messengers can be captured. General A sends 'Attack at dawn.' Did General B receive it? B sends back 'Confirmed.' Did A receive the confirmation? Neither general can ever be 100% certain the other will attack. This is the fundamental impossibility of reliable communication over unreliable channels — and it's exactly what networks are.

The Three Fundamental Challenges

  • Network unreliability — messages can be lost, duplicated, delayed, or reordered. You cannot distinguish a slow node from a dead one.
  • Clock skew — no two machines have perfectly synchronized clocks. Even with NTP, drift of 10-100ms is common. You cannot use timestamps for ordering.
  • Process crashes — any process can crash at any point, including mid-operation. Partial failures are the norm, not the exception.
coordination-failure.txt
Scenario: Two services try to become leader simultaneously

Timeline:
  t=0ms   Service A: "I'll check if there's a leader..."
  t=1ms   Service B: "I'll check if there's a leader..."
  t=2ms   Service A: reads DB → no leader exists
  t=3ms   Service B: reads DB → no leader exists  (race condition!)
  t=4ms   Service A: writes "I am leader" → success
  t=5ms   Service B: writes "I am leader" → success

Result: SPLIT BRAIN → both think they're the leader
  - Service A processes orders 1-50
  - Service B processes orders 1-50 (duplicates!)
  - Data corruption, lost money, angry customers

This is why you need atomic coordination primitives.
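The split brain above comes from a non-atomic check-then-act: the read and the write are two separate steps, and another node can slip in between them. A single atomic compare-and-set closes that window. The sketch below demonstrates the idea with an in-memory `AtomicRegister` standing in for a coordination service; the class and names are illustrative, not a real ZooKeeper API.

```typescript
// An in-memory stand-in for a coordination service's atomic register.
// In a real distributed system, this atomicity must be provided by a
// consensus-backed service -- that is exactly ZooKeeper's job.
class AtomicRegister {
  private value: string | null = null;

  // Atomic: succeeds only if the register still holds `expected`.
  compareAndSet(expected: string | null, next: string): boolean {
    if (this.value !== expected) return false;
    this.value = next;
    return true;
  }

  get(): string | null {
    return this.value;
  }
}

// Both services attempt leadership with a single atomic step,
// instead of a separate read followed by a write.
const leaderSlot = new AtomicRegister();
const aWins = leaderSlot.compareAndSet(null, "service-A");
const bWins = leaderSlot.compareAndSet(null, "service-B");

console.log(aWins, bWins, leaderSlot.get());
// Exactly one claim succeeds -- no split brain.
```

The fix is not "check faster"; it is making check and act one indivisible operation.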

FLP Impossibility Result

Fischer, Lynch, and Paterson proved in 1985 that in an asynchronous system where even one process can crash, no deterministic algorithm can guarantee that consensus is ever reached. Every practical consensus system (including ZooKeeper) works around this by using timeouts and assuming partial synchrony.

02

The Fundamental Primitives

Despite the impossibility results, distributed systems need coordination. The key insight is that most coordination needs reduce to a small set of primitives. If you can implement these correctly once, every system can reuse them.

Primitive                  | What It Solves                           | Example Use Case
Leader Election            | Only one node performs a role at a time  | Kafka controller, HBase master
Group Membership           | Know which nodes are alive right now     | Service discovery, cluster membership
Configuration Management   | All nodes see the same config, instantly | Feature flags, connection strings
Distributed Locking        | Mutual exclusion across machines         | Preventing duplicate processing
Barriers / Synchronization | All nodes wait until a condition is met  | MapReduce phase transitions
1

Leader Election

Exactly one node is the leader at any time. If the leader dies, a new one is elected automatically. No split brain allowed.

2

Group Membership

The system knows exactly which nodes are alive. When a node crashes, it's removed from the group within seconds — not minutes.

3

Configuration Management

Change a config value once, and all nodes see the new value immediately via push notification — no polling, no stale reads.

4

Distributed Locking

A lock that works across machines. If the lock holder crashes, the lock is automatically released — no deadlocks from dead processes.

5

Barrier Synchronization

N processes wait at a barrier. Only when all N have arrived does the barrier open and all proceed. Essential for batch processing phases.

The Key Insight

ZooKeeper doesn't implement these primitives directly. Instead, it provides a small set of building blocks (znodes, watches, ephemeral nodes, sequential nodes) from which all these primitives can be constructed. This is what makes it so powerful and flexible.
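To make that composition concrete, here is a minimal sketch of the classic leader-election recipe: each candidate creates an ephemeral sequential znode under an election path, and whoever holds the lowest sequence number is the leader. `MockEnsemble` below is an in-memory stand-in invented for illustration, not a real ZooKeeper client API.

```typescript
// In-memory mock of the two ZooKeeper behaviors the recipe relies on:
// sequential naming and ephemeral nodes tied to a session.
class MockEnsemble {
  private seq = 0;
  // path -> owning session id (ephemeral nodes die with their session)
  private nodes = new Map<string, string>();

  createEphemeralSequential(prefix: string, sessionId: string): string {
    const path = `${prefix}${String(this.seq++).padStart(10, "0")}`;
    this.nodes.set(path, sessionId);
    return path;
  }

  endSession(sessionId: string): void {
    // All ephemeral nodes of the session vanish automatically.
    for (const [path, owner] of this.nodes) {
      if (owner === sessionId) this.nodes.delete(path);
    }
  }

  children(prefix: string): string[] {
    return [...this.nodes.keys()].filter((p) => p.startsWith(prefix)).sort();
  }
}

const zk = new MockEnsemble();
const a = zk.createEphemeralSequential("/election/n-", "session-A");
const b = zk.createEphemeralSequential("/election/n-", "session-B");

// Lowest sequence number wins: session A is the leader.
let leader = zk.children("/election/n-")[0];
console.log(leader === a); // true

// When A's session ends (crash, partition, disconnect), its ephemeral
// node is deleted and B takes over -- no manual cleanup, no stale lock.
zk.endSession("session-A");
leader = zk.children("/election/n-")[0];
console.log(leader === b); // true
```

Notice that the recipe needs no "is the leader dead?" logic of its own: session expiry does the failure detection, which is precisely what homegrown database-based schemes get wrong.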

03

Why Not Build It Into Your Application

Every team that builds a distributed system eventually needs coordination. The temptation is to build it yourself — "how hard can leader election be?" The answer: extremely hard, and almost everyone gets it wrong.

🏗️

Building Your Own Crypto

The security community has a saying: 'Don't roll your own crypto.' The distributed systems community should have the same: 'Don't roll your own coordination.' Just as homegrown encryption has subtle vulnerabilities that only experts catch, homegrown coordination has subtle race conditions that only manifest under load, during network partitions, or after GC pauses — exactly when you need it most.

Why Homegrown Coordination Fails

  • Database-based locks break under network partitions — connection drops but lock isn't released
  • Heartbeat-based leader election has race conditions during GC pauses — two nodes both think they're leader
  • File-based locks don't work across machines and have stale lock problems after crashes
  • Redis SETNX locks aren't safe without careful TTL management and clock synchronization
  • Custom protocols are never tested against the full spectrum of failure modes
naive-leader-election.ts
// ❌ BROKEN: Naive database-based leader election
async function tryBecomeLeader(nodeId: string): Promise<boolean> {
  const { rows } = await db.query(
    'SELECT leader_id FROM leaders WHERE role = $1',
    ['primary']
  );

  // RACE CONDITION: Another node can observe "no leader" at the same
  // moment, between our read and our write!
  if (rows.length === 0) {
    await db.query(
      'INSERT INTO leaders (role, leader_id) VALUES ($1, $2)',
      ['primary', nodeId]
    );
    return true; // We think we're leader...
  }

  // PROBLEM: What if the current leader is dead?
  // How long do we wait before declaring it dead?
  // What if it's just slow (GC pause)?
  return false;
}

// ❌ BROKEN: What happens when the leader crashes?
// - The row stays in the database forever
// - No other node can become leader
// - Manual intervention required at 3am

// ❌ BROKEN: What about network partitions?
// - Leader is alive but can't reach the database
// - Other nodes elect a new leader
// - Now TWO leaders exist (split brain)

The Cost of Getting It Wrong

Split brain in a payment system means double-charging customers. Split brain in a queue processor means duplicate message processing. Split brain in a database means data corruption. These bugs are nearly impossible to reproduce in testing and only appear in production under stress.

04

CAP Theorem Applied

The CAP theorem states that during a network partition, a distributed system must choose between Consistency and Availability. ZooKeeper explicitly chooses Consistency (CP) — it will refuse to serve requests rather than return stale or incorrect data.

Property            | ZooKeeper's Choice            | What This Means
Consistency         | ✅ Always consistent           | All clients see the same data in the same order
Availability        | ❌ Sacrificed during partition | If quorum is lost, ZK stops serving writes
Partition Tolerance | ✅ Handles partitions          | Continues operating with majority partition
🏦

The Bank Vault Analogy

ZooKeeper is like a bank vault, not a convenience store. A convenience store stays open 24/7 even if the security system is down (AP — available but potentially inconsistent). A bank vault refuses to open if the security system can't verify your identity (CP — consistent but potentially unavailable). For coordination data — who is the leader, who holds the lock — you NEED the bank vault. Wrong answers are worse than no answers.

cap-behavior.txt
Scenario: 5-node ZooKeeper ensemble, network partition splits 2|3

Majority partition (3 nodes):
  - Can elect a leader
  - Can serve reads and writes
  - Maintains all guarantees
  - Clients connected here continue normally

Minority partition (2 nodes):
  - Cannot form quorum (need 3 of 5)
  - Cannot serve writes
  - Cannot guarantee reads are current
  - Clients get CONNECTION_LOSS errors
  - Must reconnect to majority partition

Why this is CORRECT behavior:
  - If minority partition served writes, you'd have split brain
  - Two leaders, two versions of truth = data corruption
  - Better to be unavailable than inconsistent for coordination data
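The 2|3 behavior in the scenario above falls directly out of majority arithmetic, which can be written down in a few lines. This is a sketch of the rule, not ZooKeeper's actual implementation:

```typescript
// Quorum arithmetic behind the partition scenario: a partition can
// serve writes only if it holds a strict majority of the ensemble.
function quorumSize(ensembleSize: number): number {
  return Math.floor(ensembleSize / 2) + 1;
}

function canServeWrites(partitionSize: number, ensembleSize: number): boolean {
  return partitionSize >= quorumSize(ensembleSize);
}

// 5-node ensemble split 2|3:
console.log(quorumSize(5));        // 3
console.log(canServeWrites(3, 5)); // true  -- majority partition
console.log(canServeWrites(2, 5)); // false -- minority partition
```

Because two disjoint partitions can never both hold a strict majority, at most one side can ever accept writes, and split brain is ruled out by arithmetic alone.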

CP is Right for Coordination

For coordination data, CP is the only safe choice. If your leader election service returns "you are the leader" to two different nodes, you have a catastrophic failure. It's far better for the service to say "I don't know" (unavailable) than to give a wrong answer (inconsistent).

05

What ZooKeeper Provides

ZooKeeper provides a deceptively simple API — a hierarchical key-value store with a few special properties. But from these simple building blocks, you can construct all the coordination primitives a distributed system needs.

Core Building Blocks

  • Hierarchical namespace — tree of znodes like a filesystem (/app/config, /app/leader)
  • Small data storage — each znode holds up to 1MB (typically bytes to kilobytes)
  • Ephemeral nodes — automatically deleted when the creating session ends (client disconnects/crashes)
  • Sequential nodes — ZooKeeper appends a monotonically increasing counter to the name
  • Watches — one-time notifications when a znode changes (data or children)
  • Ordering guarantees — all updates are totally ordered with a global transaction ID (zxid)
  • Atomic operations — reads and writes are atomic; no partial updates
  • Session management — ZooKeeper tracks client liveness via heartbeats
zookeeper-api.txt
ZooKeeper API (simplified):

Create Operations:
  create(path, data, acl, flags)     → creates a znode
    flags: PERSISTENT, EPHEMERAL, SEQUENTIAL, CONTAINER

Read Operations:
  getData(path, watch)               → returns data + stat
  getChildren(path, watch)           → returns child names
  exists(path, watch)                → returns stat or null

Write Operations:
  setData(path, data, version)       → conditional update (CAS)
  delete(path, version)              → conditional delete

Session Operations:
  connect(hosts, timeout)            → establish session
  close()                            → end session cleanly

Multi Operations (3.4+):
  multi(ops[])                       → atomic batch of operations

That's it. ~10 operations. From these, you build:
  - Leader election (ephemeral + sequential)
  - Distributed locks (ephemeral + sequential + watches)
  - Service discovery (ephemeral + getChildren + watches)
  - Configuration (persistent + getData + watches)
  - Barriers (persistent + getChildren + watches)
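One subtlety worth seeing in code: watches fire exactly once, so the configuration pattern built on `getData(path, watch)` is "read, react, re-register." The sketch below simulates that loop with an in-memory `MockZnode`; the class is an illustrative stand-in, not a real client library.

```typescript
// In-memory mock of a single znode with one-shot watches.
class MockZnode {
  private data = "";
  private watchers: Array<() => void> = [];

  getData(watch?: () => void): string {
    if (watch) this.watchers.push(watch); // one-shot registration
    return this.data;
  }

  setData(next: string): void {
    this.data = next;
    const toFire = this.watchers;
    this.watchers = []; // each watch fires at most once
    toFire.forEach((w) => w());
  }
}

const configNode = new MockZnode();
const seen: string[] = [];

// Re-register the watch inside the callback, or every update after
// the first one is silently missed.
function watchConfig(): void {
  seen.push(configNode.getData(watchConfig));
}

watchConfig();            // initial read, watch set
configNode.setData("v2"); // watch fires: re-read, re-watch
configNode.setData("v3"); // fires again because we re-registered
console.log(seen);        // ["", "v2", "v3"]
```

Forgetting the re-registration step is one of the most common bugs in hand-rolled ZooKeeper client code.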

Simplicity is the Design

ZooKeeper's power comes from its simplicity. Rather than providing a complex API with built-in leader election, locking, etc., it provides minimal primitives that compose into any coordination pattern. This is why it has lasted 15+ years — the primitives don't need to change even as use cases evolve.

06

What ZooKeeper is NOT

Understanding what ZooKeeper is NOT is just as important as understanding what it is. Misusing ZooKeeper is a common source of production incidents.

ZooKeeper is NOT           | Why Not                                                        | Use Instead
A general-purpose database | 1MB per znode, not designed for large data, no query language  | PostgreSQL, MongoDB, DynamoDB
A message queue            | No consumer groups, no replay, watches are one-time            | Kafka, RabbitMQ, SQS
A cache                    | Not optimized for high-throughput reads, no eviction           | Redis, Memcached
A filesystem               | Hierarchical but not designed for large files or streaming     | HDFS, S3
A service mesh             | No traffic routing, no load balancing, no health checks        | Consul, Istio, Envoy

Anti-Patterns That Kill ZooKeeper

  • Storing large blobs (images, logs, serialized objects) — znodes have a 1MB limit for a reason
  • High-frequency writes (>1000/sec) — ZooKeeper is designed for coordination, not data throughput
  • Using it as a work queue — sequential nodes work but don't scale; use Kafka or SQS instead
  • Storing session data — too many ephemeral nodes overwhelm the ensemble during restarts
  • Client-side caching without watches — leads to stale coordination data, which is worse than no data

The Litmus Test

Ask yourself: "Is this coordination metadata or application data?" If it's "who is the leader" or "what nodes are alive" — ZooKeeper. If it's "what did the user order" or "what's in the shopping cart" — use a database.

07

Systems That Depend on ZooKeeper

ZooKeeper is the coordination backbone for some of the most critical distributed systems in production today. Understanding how these systems use ZooKeeper helps you understand what coordination primitives they need.

System                 | What It Uses ZooKeeper For                                                    | Moving Away?
Apache Kafka (pre-3.3) | Controller election, broker registration, topic config, partition assignment | Yes → KRaft (internal Raft)
Apache HBase           | Master election, region server tracking, schema management                   | No — still requires ZK
Apache Hadoop YARN     | ResourceManager HA, leader election                                           | No — still requires ZK
Apache Solr/SolrCloud  | Cluster state, shard leader election, config management                       | No — core dependency
Apache Flink           | Job manager HA, checkpoint coordination                                       | Moving to K8s-native HA
ClickHouse             | Replica coordination, distributed DDL                                         | Moving to ClickHouse Keeper
kafka-zookeeper-usage.txt
How Kafka Uses ZooKeeper (pre-KRaft):

/kafka
├── brokers
│   ├── ids
│   │   ├── 0 → ephemeral: broker 0 is alive
│   │   ├── 1 → ephemeral: broker 1 is alive
│   │   └── 2 → ephemeral: broker 2 is alive
│   └── topics
│       └── orders
│           └── partitions
│               ├── 0/state → leader: broker 1, ISR: [1,2]
│               └── 1/state → leader: broker 2, ISR: [2,0]
├── controller → ephemeral: current controller is broker 0
├── config
│   ├── topics/orders → topic configuration
│   └── brokers/0 → broker-level config overrides
└── admin
    └── reassign_partitions → partition reassignment state

When broker 1 crashes:
  1. Its ephemeral node /brokers/ids/1 is deleted
  2. Controller (broker 0) gets watch notification
  3. Controller reassigns partition 0 leadership to broker 2
  4. Controller updates /brokers/topics/orders/partitions/0/state
  5. All of this happens in seconds, automatically

Kafka's Move Away from ZooKeeper

Kafka 3.3+ introduced KRaft mode — an internal Raft-based consensus protocol that eliminates the ZooKeeper dependency. This reduces operational complexity (one less system to manage) and improves scalability (metadata stored in Kafka itself). But the coordination primitives are the same — just implemented differently.

08

Interview Questions

Q: Why is distributed coordination fundamentally hard? What makes it different from single-machine coordination?

A: Three things make it hard: (1) Network unreliability — messages can be lost, delayed, or reordered, and you can't distinguish a slow node from a dead one. (2) Clock skew — no two machines agree on the time, so you can't use timestamps for ordering. (3) Partial failures — any process can crash mid-operation. On a single machine, you have shared memory, a single clock, and atomic operations. Across machines, you have none of these. The FLP impossibility result proves that deterministic consensus is impossible in a fully asynchronous system with even one crash failure.

Q: What does ZooKeeper choose in the CAP theorem and why is that the right choice?

A: ZooKeeper chooses CP (Consistency + Partition tolerance). During a network partition, the minority partition becomes unavailable rather than serving potentially stale data. This is correct for coordination because: (1) A leader election that returns two leaders is catastrophic (split brain). (2) A lock service that grants the same lock twice defeats its purpose. (3) For coordination metadata, a wrong answer is always worse than no answer. Systems that need AP (like caches or social feeds) should use different tools.

Q: Why shouldn't you build coordination primitives into your application?

A: Because it's extremely hard to get right and the failure modes are subtle: (1) Database-based locks break during network partitions — the connection drops but the lock row persists. (2) Heartbeat-based detection has race conditions during GC pauses. (3) Custom protocols are never tested against all failure combinations. (4) Bugs only manifest under production load during partitions — exactly when you need coordination most. ZooKeeper has been battle-tested for 15+ years across thousands of production deployments.

Q: Name five systems that use ZooKeeper and what they use it for.

A: (1) Kafka (pre-3.3): controller election, broker registration, topic metadata, partition assignment. (2) HBase: master election, region server tracking, schema management. (3) Hadoop YARN: ResourceManager HA via leader election. (4) SolrCloud: cluster state management, shard leader election, distributed config. (5) ClickHouse: replica coordination, distributed DDL execution. Note: Kafka is moving to KRaft (internal Raft), and ClickHouse to its own Keeper — but the coordination primitives remain the same.

Q: What is ZooKeeper NOT suitable for? Give examples of misuse.

A: ZooKeeper is NOT: (1) A database — 1MB per znode limit, no query language, not designed for application data. (2) A message queue — no consumer groups, no replay, watches are one-time. (3) A cache — not optimized for high-throughput reads. Anti-patterns: storing large blobs, high-frequency writes (>1000/sec), using as a work queue, storing user session data. The litmus test: if it's coordination metadata (who is leader, what nodes are alive), use ZK. If it's application data (user orders, shopping carts), use a database.

09

Common Mistakes

🗄️

Using ZooKeeper as a general-purpose database

Storing application data, user sessions, or large objects in znodes. ZooKeeper's 1MB limit exists because it replicates all data to all nodes in memory.

Use ZooKeeper only for coordination metadata — leader info, config, service registry. Store application data in purpose-built databases (PostgreSQL, DynamoDB, etc.).

🔨

Building your own leader election with a database

Using SELECT FOR UPDATE or INSERT with unique constraints for leader election. This breaks during network partitions, GC pauses, and connection pool exhaustion.

Use ZooKeeper's ephemeral nodes for leader election. The session mechanism handles all the edge cases — if the leader crashes or gets partitioned, its ephemeral node is automatically deleted and a new leader is elected.

📊

High-frequency writes to ZooKeeper

Writing metrics, logs, or rapidly-changing state to ZooKeeper. Every write goes through consensus (Zab protocol) across all nodes — this is expensive by design.

ZooKeeper is designed for low-frequency coordination writes (tens per second, not thousands). For high-throughput data, use Kafka, Redis, or a time-series database.

🌐

Assuming ZooKeeper is always available

Not handling CONNECTION_LOSS or SESSION_EXPIRED in client code. When quorum is lost or the client is partitioned, ZooKeeper correctly becomes unavailable.

Always handle ZK connection events in your client. Implement retry logic for CONNECTION_LOSS and full re-initialization for SESSION_EXPIRED (all ephemeral nodes are gone).
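The CONNECTION_LOSS / SESSION_EXPIRED distinction is the crux, so here is the decision logic as a small sketch. The event names mirror ZooKeeper's error codes, but this dispatcher and its action names are illustrative, not a real client library API.

```typescript
// Sketch of client-side handling for ZooKeeper session events.
type ZkEvent = "CONNECTED" | "CONNECTION_LOSS" | "SESSION_EXPIRED";

type Action = "proceed" | "retry_pending_ops" | "reinitialize_state";

function handleSessionEvent(event: ZkEvent): Action {
  switch (event) {
    case "CONNECTED":
      return "proceed";
    case "CONNECTION_LOSS":
      // The session may still be alive on the server side:
      // retry pending operations, do NOT rebuild state.
      return "retry_pending_ops";
    case "SESSION_EXPIRED":
      // All ephemeral nodes are gone; any locks or leadership were
      // lost. The client must recreate ephemeral nodes, re-set
      // watches, and re-enter any elections from scratch.
      return "reinitialize_state";
  }
}

console.log(handleSessionEvent("CONNECTION_LOSS"));  // "retry_pending_ops"
console.log(handleSessionEvent("SESSION_EXPIRED")); // "reinitialize_state"
```

Treating these two events the same way, in either direction, is a classic production bug: retrying after expiry leaves you acting on leadership you no longer hold, while rebuilding on every blip churns the ensemble needlessly.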

Confusing ZooKeeper with a service mesh

Expecting ZooKeeper to do health checking, load balancing, or traffic routing. ZooKeeper provides the primitives for service discovery, but not the full service mesh functionality.

Use ZooKeeper for service registration (ephemeral nodes) and discovery (getChildren + watches). For health checks, load balancing, and traffic management, use Consul, Envoy, or Istio on top.