Partitioning
Scale writes through partitioning — write sharding, hot partition avoidance, fan-out on write vs read, and append-only design patterns for high-throughput systems.
The Big Picture — Why Writes Don't Scale on One Machine
A single database server has a hard ceiling on write throughput. Every write must update indexes, flush to disk, replicate to followers, and maintain ACID guarantees. At some point — whether it's 10K or 100K writes per second — the machine maxes out on CPU, disk I/O, or lock contention. You can't buy a bigger machine forever.
The Supermarket Checkout
A supermarket with one checkout counter can handle 5 customers per minute. During rush hour, 50 customers arrive per minute. The line grows, wait times explode, and customers leave. The fix isn't a faster cashier — it's opening 10 checkout counters. Each counter handles a portion of customers in parallel. That's write partitioning: split the write load across multiple machines so each handles a fraction of the total.
🔥 Key Insight
Read scaling is relatively easy — add read replicas. Write scaling is fundamentally harder because every write must go to the source of truth. You can't just "copy" writes — you must split them. Partitioning is how you split writes across machines so each machine handles a manageable fraction.
Partitioning Overview
Partitioning (also called sharding) splits data across multiple nodes. Each node owns a subset of the data and handles writes only for its subset. Total write throughput scales roughly linearly with the number of partitions, provided the partition key spreads writes evenly.
Write path: Client (sends write) → Router (determines partition) → Partition (handles write) → Storage (persists data)
```
Single node: 10,000 writes/sec (max)
Requirement: 100,000 writes/sec

Option 1: Bigger machine (vertical scaling)
  → Maybe 20,000 writes/sec with 2x hardware
  → Still not enough. And 10x hardware doesn't exist.

Option 2: 10 partitions (horizontal scaling)
  → Each partition handles 10,000 writes/sec
  → Total: 10 × 10,000 = 100,000 writes/sec ✅
  → Need more? Add more partitions.

The key: a good partition key distributes writes EVENLY.
If 90% of writes go to 1 partition → you're back to square one.
```
✅ What Partitioning Gives You
- Linear write scaling (add partitions = add throughput)
- Smaller indexes per partition (faster writes)
- Independent failure domains (one partition down ≠ all down)
- Parallel processing across partitions
⚠️ What It Costs
- Cross-partition queries are expensive (scatter-gather)
- Transactions across partitions are hard or impossible
- Rebalancing when adding/removing partitions
- Operational complexity (more machines to manage)
Write Sharding
Write sharding distributes writes across multiple database instances based on a shard key. The shard key determines which partition owns each piece of data.
Sharding Methods
| Method | How It Works | Pros | Cons | Best For |
|---|---|---|---|---|
| Hash-Based | hash(userId) % N → shard | Even distribution, simple | Range queries span all shards, rebalancing moves data | User data, sessions, general CRUD |
| Range-Based | userId 1-1M → shard 0; userId 1M-2M → shard 1 | Range queries hit one shard, easy to understand | Hotspots if ranges are uneven, new users pile on last shard | Time-series, sequential IDs, alphabetical data |
| User-Based | All data for user X on one shard | User queries never cross shards, simple transactions | Power users create hotspots, uneven shard sizes | Social media, multi-tenant SaaS |
```
Write: INSERT user { id: "user_42", name: "Alice", ... }

Step 1: Compute shard
  shard = hash("user_42") % 4
  hash("user_42") = 2847291
  2847291 % 4 = 3 → Route to Shard 3

Step 2: Shard 3 handles the write
  → Updates its local index
  → Writes to its local WAL
  → Replicates to its followers

Step 3: Read for user_42
  → Same computation: hash("user_42") % 4 = 3
  → Route to Shard 3 → Single-shard read (fast)

Cross-shard query: "Find all users named Alice"
  → Must query ALL 4 shards (scatter)
  → Merge results (gather)
  → Slow — this is the cost of sharding
```
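To make the routing concrete, here is a minimal sketch in Python. The hash values above are illustrative; this version uses MD5 because Python's built-in `hash()` is salted per process and would route the same key differently across restarts. `shard_for` and `NUM_SHARDS` are hypothetical names, not any particular library's API.

```python
import hashlib

NUM_SHARDS = 4  # hypothetical cluster size

def shard_for(key: str) -> int:
    # A stable digest (MD5 here) is required: routing must be
    # deterministic across processes and restarts.
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_SHARDS

# The same key always routes to the same shard, for writes and reads alike.
print(shard_for("user_42"))  # deterministic, e.g. 1
print(shard_for("user_43"))  # likely a different shard
```

The modulo step is exactly what makes re-sharding painful later; the consistent hashing discussion below addresses that.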
Good Shard Keys
- ✅ userId — even distribution, user queries stay on one shard
- ✅ orderId (with hash) — distributes orders evenly
- ✅ tenantId — all tenant data co-located, simple isolation
- ✅ High cardinality — many distinct values = even spread
- ✅ Frequently used in WHERE clauses — queries hit one shard
Bad Shard Keys
- ❌ timestamp — all recent writes go to one shard (hot partition)
- ❌ country — 50% of users might be in one country
- ❌ boolean (is_active) — only 2 values, can't distribute
- ❌ Low cardinality — few distinct values = uneven shards (measurable up front; see the sketch below)
- ❌ Frequently JOINed across — forces cross-shard queries
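Because a key's real-world distribution is hard to eyeball, it pays to measure before deploying. A small sketch, assuming the same stable-hash routing as above (names are hypothetical); the 2x-average flag matches the per-shard monitoring rule suggested later in this guide:

```python
import hashlib
from collections import Counter

def shard_for(key: str, num_shards: int) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_shards

def check_distribution(keys, num_shards=10):
    # Count keys per shard; flag anything above 2x the average.
    counts = Counter(shard_for(k, num_shards) for k in keys)
    avg = len(keys) / num_shards
    for shard in range(num_shards):
        n = counts.get(shard, 0)
        print(f"shard {shard}: {n}{'  <-- hot' if n > 2 * avg else ''}")

# High-cardinality key (userId): spreads evenly across all 10 shards.
check_distribution([f"user_{i}" for i in range(100_000)])

# Low-cardinality key (country): a handful of shards get everything.
check_distribution(["US"] * 40_000 + ["IN"] * 30_000 + ["BR"] * 30_000)
```

Run this against a sample of real keys, not synthetic ones; skew usually only shows up in production-shaped data.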
🎯 Interview Insight
When asked "how would you scale writes?" — start with the shard key. Say: "I'd shard by userId using hash-based partitioning. This ensures even write distribution and keeps all of a user's data on one shard for fast single-user queries. The trade-off is that cross-user queries require scatter-gather across all shards."
Hot Partition Avoidance
A hot partition is a shard that receives disproportionately more writes than others. It becomes the bottleneck — the entire system's write throughput is limited by the slowest (busiest) partition.
The One Busy Counter
You opened 10 checkout counters, but counter #3 handles all returns, exchanges, AND regular purchases. The other 9 counters are idle while counter #3 has a 50-person line. Your 10x scaling investment is wasted because one counter is doing all the work. Hot partitions work the same way — one shard absorbs most writes while others sit idle.
Common Causes
Skewed Keys
Sharding by country when 40% of users are in one country. That shard gets 40% of all writes while others get 5-10% each.
Celebrity / Viral Content
A celebrity with 100M followers posts. All 100M 'like' writes go to the shard that owns that post. One shard is overwhelmed.
Temporal Patterns
Sharding by timestamp. All current writes go to the 'latest' shard. Historical shards are idle. The newest shard is always hot.
Solutions
Better Shard Key Selection
Choose a key with high cardinality and even distribution. userId is almost always better than country, timestamp, or status. If no single field works, use a composite key: hash(userId + postId).
Key Salting (Randomization)
For known hot keys, append a random suffix. 'post:viral_123' becomes 'post:viral_123_0' through 'post:viral_123_9'. Writes distribute across 10 shards. Reads fan out to all 10 and merge. Trade-off: reads are 10x more expensive.
Consistent Hashing with VNodes
Instead of hash(key) % N, use consistent hashing with virtual nodes. This ensures even distribution and minimal data movement when adding/removing shards.
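A minimal hash-ring sketch with virtual nodes, assuming MD5 as the ring hash. Cassandra and DynamoDB ship their own ring implementations, so treat this as an illustration of the idea rather than any system's actual code:

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Each physical shard is hashed onto the ring at many points
    (virtual nodes), so keys spread evenly and adding or removing a
    shard only moves the keys nearest its vnodes (~1/N of the total)."""

    def __init__(self, shards, vnodes=100):
        self.ring = sorted(
            (self._hash(f"{shard}#vnode{i}"), shard)
            for shard in shards
            for i in range(vnodes)
        )

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def shard_for(self, key: str) -> str:
        # The first vnode clockwise from the key's position owns it.
        i = bisect.bisect(self.ring, (self._hash(key), "")) % len(self.ring)
        return self.ring[i][1]

ring = ConsistentHashRing(["shard_a", "shard_b", "shard_c"])
print(ring.shard_for("user_42"))   # stable until ring membership changes
```

Adding a fourth shard to this ring reassigns only the keys closest to its vnodes, roughly a quarter of the total, instead of rehashing everything.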
Split Hot Shards
Monitor per-shard write rates. When a shard exceeds a threshold, split it into two. The hot shard's key range is divided, and half the data migrates to the new shard. DynamoDB does this automatically.
```
Problem: Celebrity post gets 1M likes/minute

All writes: INSERT INTO likes (post_id, user_id, ...)
  post_id = "post_viral_123"
  hash("post_viral_123") % 10 = shard 7
  → Shard 7 gets 1M writes/min, others get ~0

Solution: Salt the key
  salted_key = "post_viral_123_" + random(0, 9)
  → "post_viral_123_0" → shard 2
  → "post_viral_123_3" → shard 5
  → "post_viral_123_7" → shard 1
  → Writes spread across ~10 shards

Reading like count:
  → Query all 10 salted keys: post_viral_123_0 through _9
  → SUM the counts
  → 10 parallel reads instead of 1, but no hot partition
```
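The same salting scheme as a runnable sketch, with an in-memory dict standing in for counter rows spread across shards; all names here are hypothetical:

```python
import random
from collections import defaultdict

NUM_SALTS = 10
counters = defaultdict(int)   # stand-in for counter rows on many shards

def incr_like(post_id: str) -> None:
    # Write path: bump one randomly chosen salted sub-key, so writes
    # for a viral post land on ~NUM_SALTS shards instead of one.
    salt = random.randint(0, NUM_SALTS - 1)
    counters[f"like:{post_id}_{salt}"] += 1

def like_count(post_id: str) -> int:
    # Read path: fan out to every salted sub-key and sum the parts.
    return sum(counters[f"like:{post_id}_{i}"] for i in range(NUM_SALTS))

for _ in range(1_000):
    incr_like("post_viral_123")
print(like_count("post_viral_123"))   # 1000
```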
🎯 Interview Insight
Hot partitions are one of the most common real-world scaling problems. Always mention monitoring per-shard metrics. Say: "I'd monitor write QPS per shard. If one shard exceeds 2x the average, I'd investigate the key distribution and either re-key, salt the hot key, or split the shard."
Fan-out on Write vs Fan-out on Read
This is one of the most important trade-offs in system design. When a user creates content that many others will read, you have two choices: do the work at write time (push) or at read time (pull).
Fan-out on Write (Push Model)
```
Alice posts a photo. Alice has 500 followers.

At WRITE time:
  1. Store the post in posts table
  2. Look up Alice's 500 followers
  3. Write the post_id into each follower's feed:
     INSERT INTO feed (user_id, post_id) VALUES
       (follower_1, post_123),
       (follower_2, post_123),
       ...
       (follower_500, post_123)

Write cost: 1 post + 500 feed entries = 501 writes
Read cost:  SELECT * FROM feed WHERE user_id = ? ORDER BY time
            → Single partition read, pre-sorted. FAST.

When follower opens their feed:
  → Data is already there, pre-computed
  → One read, one partition, instant
```
Fan-out on Read (Pull Model)
```
Alice posts a photo. Alice has 500 followers.

At WRITE time:
  1. Store the post in posts table

Write cost: 1 write. Done.

When follower opens their feed:
  1. Look up who they follow: [Alice, Bob, Charlie, ...]
  2. Fetch recent posts from EACH followed user:
     SELECT * FROM posts WHERE author_id = 'alice' ORDER BY time LIMIT 20
     SELECT * FROM posts WHERE author_id = 'bob'   ORDER BY time LIMIT 20
     ...
  3. Merge and sort all results by time
  4. Return top 20

Read cost: N queries (one per followed user) + merge + sort
If user follows 500 people → 500 queries per feed load. SLOW.
```
| Factor | Fan-out on Write | Fan-out on Read |
|---|---|---|
| Write cost | High (N writes per post, N = followers) | Low (1 write per post) |
| Read cost | Low (pre-computed, single read) | High (N reads per feed load) |
| Latency | Fast reads, slow writes | Slow reads, fast writes |
| Storage | High (duplicated in every follower's feed) | Low (stored once) |
| Consistency | Feed may lag behind post (async fan-out) | Always fresh (reads from source) |
| Celebrity problem | 1 post → 100M writes (expensive) | No problem at write time |
| Best for | Most users (< 10K followers) | Celebrities (> 100K followers) |
The Hybrid Approach (Twitter/Instagram)
```
Rule: Use fan-out on write for normal users, fan-out on read for celebrities.

Alice posts (500 followers → fan-out on write):
  → Write post_id into 500 follower feeds
  → Followers see it instantly in their pre-built feed

Taylor Swift posts (100M followers → fan-out on read):
  → Store post once
  → Do NOT write to 100M feeds (would take minutes, waste storage)
  → When a follower loads their feed:
      1. Read pre-built feed (fan-out on write posts)
      2. Fetch latest posts from followed celebrities (fan-out on read)
      3. Merge the two lists
  → Slightly slower read, but avoids 100M writes

Threshold: typically ~5,000-10,000 followers
  Below → fan-out on write
  Above → fan-out on read
```
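A toy sketch of the hybrid, with in-memory dicts standing in for the posts table, feed cache, and follower graph. The 5,000-follower cutoff is the assumed threshold from above; timestamp-ordered merging and pagination are omitted:

```python
FANOUT_THRESHOLD = 5_000   # assumed cutoff; real systems tune this

posts = {}        # post_id -> {"author", "body"}   source of truth
feeds = {}        # user_id -> [post_ids]           pre-built feeds (push)
followers = {}    # author  -> set of follower ids
following = {}    # user    -> set of followed author ids

def is_celebrity(author: str) -> bool:
    return len(followers.get(author, ())) >= FANOUT_THRESHOLD

def publish(author: str, post_id: str, body: str) -> None:
    posts[post_id] = {"author": author, "body": body}
    if not is_celebrity(author):
        # Fan-out on write: push into every follower's pre-built feed.
        for f in followers.get(author, ()):
            feeds.setdefault(f, []).append(post_id)
    # Celebrity posts are stored once and pulled at read time instead.

def load_feed(user: str) -> list:
    # Merge the pre-built feed (push) with posts pulled from the few
    # celebrities this user follows.
    celebs = {a for a in following.get(user, ()) if is_celebrity(a)}
    pulled = [pid for pid, p in posts.items() if p["author"] in celebs]
    return feeds.get(user, []) + pulled

followers["alice"] = {"bob"}
following["bob"] = {"alice"}
publish("alice", "p1", "hello")
print(load_feed("bob"))   # ['p1'] arrives via the pre-built feed
```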
🎯 Interview Insight
This is a top-tier system design question. When designing a social feed, always mention both approaches and the hybrid. Say: "For users with fewer than 5K followers, I'd fan-out on write for instant feed reads. For celebrities, I'd fan-out on read to avoid millions of writes per post. The feed service merges both at read time."
Append-Only Design
Instead of updating existing records in place, append-only design writes new records for every change. The current state is derived by reading the latest entry or replaying the log. This is the fastest possible write pattern because sequential disk writes are orders of magnitude faster than random updates.
The Ledger vs The Whiteboard
A whiteboard (update-in-place): you erase the old balance and write the new one. If the power goes out mid-erase, you've lost data. A ledger (append-only): you never erase. You write a new line: 'Balance was $500, now $300 (withdrew $200).' The full history is preserved. To get the current balance, read the last line. To audit, read the whole ledger. This is how banks, blockchains, and high-write databases work.
```
Update-in-place (traditional):
  UPDATE accounts SET balance = 300 WHERE id = 42
  → Random disk seek to find row
  → Lock the row
  → Update in place
  → Update index
  → Write WAL entry
  → Slow: random I/O + locking

Append-only:
  INSERT INTO account_events (account_id, type, amount, balance, timestamp)
  VALUES (42, 'withdrawal', 200, 300, NOW())
  → Sequential disk append (fastest I/O pattern)
  → No locking (no row to lock)
  → No index update on existing data
  → Fast: sequential I/O, no contention

Current balance:
  SELECT balance FROM account_events
  WHERE account_id = 42 ORDER BY timestamp DESC LIMIT 1

Full history:
  SELECT * FROM account_events WHERE account_id = 42 ORDER BY timestamp
```
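The same pattern as a runnable sketch, using SQLite as a stand-in for whatever store you would actually use; the schema mirrors the `account_events` example above:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE account_events
              (account_id INT, type TEXT, amount INT, balance INT, ts INT)""")

def append_event(account_id, kind, amount, balance, ts):
    # Append-only: every change is a new row; nothing is ever UPDATEd.
    db.execute("INSERT INTO account_events VALUES (?, ?, ?, ?, ?)",
               (account_id, kind, amount, balance, ts))

append_event(42, "deposit", 500, 500, 1)
append_event(42, "withdrawal", 200, 300, 2)

# Current state is the latest entry...
(balance,) = db.execute("""SELECT balance FROM account_events
                           WHERE account_id = 42
                           ORDER BY ts DESC LIMIT 1""").fetchone()
print(balance)   # 300

# ...and the full audit trail is simply the whole log.
print(db.execute("SELECT * FROM account_events WHERE account_id = 42 "
                 "ORDER BY ts").fetchall())
```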
Benefits
- ✅ Extreme write throughput (sequential disk writes)
- ✅ No lock contention (no rows to lock)
- ✅ Full audit trail (every change is recorded)
- ✅ Easy recovery (replay the log from any point)
- ✅ Natural fit for event sourcing and CQRS
Trade-offs
- ❌ Storage grows continuously (every change is a new record)
- ❌ Reads may be slower (must find latest entry or aggregate)
- ❌ Compaction needed to reclaim space (merge old entries)
- ❌ Current state requires computation (not stored directly)
- ❌ More complex query patterns than simple SELECT by ID
Real-World Examples
Apache Kafka
Every message is appended to a partition log. Consumers read from any offset. Messages are never updated — only appended and eventually expired by retention policy.
Event Sourcing
Instead of storing current state, store the sequence of events that led to it. 'OrderCreated → ItemAdded → PaymentReceived → OrderShipped.' Replay events to rebuild state.
LSM Trees (RocksDB)
Writes go to an in-memory buffer (memtable), then flush to sorted files on disk (SSTables). Never update in place — always append. Background compaction merges old files.
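A heavily simplified sketch of that write path (no WAL, no bloom filters, linear scans instead of binary search); it exists only to show why writes never modify existing data:

```python
MEMTABLE_LIMIT = 4   # tiny, to force flushes in this toy example

memtable = {}        # in-memory writes (a real memtable is kept sorted)
sstables = []        # immutable sorted runs, as if flushed to disk

def put(key, value):
    # Writes only touch memory; data already on "disk" is never modified.
    memtable[key] = value
    if len(memtable) >= MEMTABLE_LIMIT:
        flush()

def flush():
    # Flush = one sequential append of a sorted run; old runs stay put
    # until background compaction merges them (omitted here).
    global memtable
    sstables.append(sorted(memtable.items()))
    memtable = {}

def get(key):
    if key in memtable:
        return memtable[key]
    for run in reversed(sstables):   # newest run wins
        for k, v in run:             # real SSTables are binary-searched
            if k == key:
                return v
    return None

for i in range(10):
    put(f"k{i}", i)
put("k0", 99)      # an "update" is just a newer entry shadowing the old one
print(get("k0"))   # 99
```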
🎯 Interview Insight
When designing any high-write system (logging, metrics, event processing), mention append-only design. Say: "I'd use an append-only log because sequential writes are 100x faster than random updates. The trade-off is storage growth, which I'd handle with compaction and retention policies."
End-to-End Scenario
Let's design a high-scale social media platform that handles 500K writes per second using every pattern from this guide.
System: Social Feed (500K writes/sec)
```
Traffic: 500K writes/sec (posts, likes, comments, follows)
Users:   500M total, 50M DAU
Celebrities: 10K users with > 100K followers

1. WRITE SHARDING
   Posts table: sharded by hash(userId) across 50 partitions
   Each partition handles ~10K writes/sec (within capacity)
   All of a user's posts on one shard → fast user profile reads

2. HOT PARTITION AVOIDANCE
   Likes on viral posts: salted key
   "like:post_viral_123" → "like:post_viral_123_{0-9}"
   Writes spread across 10 shards instead of 1
   Like count: SUM across all 10 salted keys (async counter)

3. FAN-OUT STRATEGY (Hybrid)
   Normal user posts (< 5K followers): fan-out on write
   → Post written + 5K feed entries = 5,001 writes
   → Followers see it instantly in pre-built feed
   Celebrity posts (> 100K followers): fan-out on read
   → Post written once
   → Feed service merges pre-built feed + celebrity posts at read time

4. APPEND-ONLY DESIGN
   All writes go to Kafka first (append-only log)
   Kafka partitioned by userId (same user → same partition → ordering)
   Consumers read from Kafka and write to:
   → Posts DB (sharded PostgreSQL)
   → Feed cache (Redis, pre-built feeds)
   → Search index (Elasticsearch)
   → Analytics (ClickHouse)

Flow: Client → API → Kafka (append) → Consumers → [DB, Cache, Search, Analytics]
      Fast: the Kafka append is sequential I/O
      Async: consumers are decoupled from the user request
```
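To show the shape of step 4 without a running Kafka cluster, here is a toy in-process stand-in: a partitioned append-only log with per-consumer offsets. This is not Kafka's API; names and numbers are illustrative:

```python
import hashlib
from collections import defaultdict

NUM_PARTITIONS = 50

log = defaultdict(list)     # partition -> append-only event list
offsets = defaultdict(int)  # (consumer_group, partition) -> next offset

def partition_for(user_id: str) -> int:
    # Same user -> same partition -> that user's events stay ordered.
    return int(hashlib.md5(user_id.encode()).hexdigest(), 16) % NUM_PARTITIONS

def produce(user_id: str, event: dict) -> None:
    log[partition_for(user_id)].append(event)  # sequential append, no locks

def consume(group: str, partition: int, batch: int = 100) -> list:
    # Each downstream system reads at its own pace from its own offset,
    # fully decoupled from the user-facing write path.
    start = offsets[(group, partition)]
    events = log[partition][start:start + batch]
    offsets[(group, partition)] += len(events)
    return events

produce("user_42", {"type": "post_created", "post_id": "p1"})
p = partition_for("user_42")
print(consume("feed_builder", p))     # [{'type': 'post_created', ...}]
print(consume("search_indexer", p))   # same event, independent offset
```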
💡 This Is How Instagram / Twitter Works
The combination of write sharding + hot key salting + hybrid fan-out + append-only ingestion via Kafka is the standard architecture for every high-scale social platform. Each pattern solves a specific bottleneck, and together they handle millions of writes per second.
Trade-offs & Decision Making
| Trade-off | Option A | Option B | Choose A When | Choose B When |
|---|---|---|---|---|
| Write vs Read complexity | Fan-out on write (complex writes, simple reads) | Fan-out on read (simple writes, complex reads) | Read-heavy, latency-sensitive feeds | Write-heavy, celebrity-heavy systems |
| Distribution vs query simplicity | More partitions (higher throughput) | Fewer partitions (simpler queries) | Write throughput is the bottleneck | Cross-partition queries are frequent |
| Storage vs performance | Append-only (fast writes, more storage) | Update-in-place (less storage, slower writes) | Write throughput > storage cost | Storage is expensive, writes are moderate |
| Consistency vs throughput | Async fan-out (eventual, high throughput) | Sync fan-out (immediate, lower throughput) | Feed can lag 1-5 seconds | Real-time consistency required |
🎯 Interview Framework
For any write-scaling question, walk through: (1) What's the shard key? (2) How do we handle hot partitions? (3) Fan-out on write or read? (4) Can we use append-only for throughput? This four-step framework covers every write-scaling scenario.
Interview Questions
Q: How do you scale writes beyond a single database?
A: Partition (shard) the data across multiple database instances. Choose a shard key with high cardinality and even distribution (typically userId or a hash of the primary key). Each partition handles a fraction of the total writes. With 10 partitions, each handling 10K writes/sec, you get 100K total. For hot keys (viral content), use key salting to spread writes across multiple partitions. For ingestion, use an append-only log (Kafka) as a buffer — it absorbs write spikes and feeds downstream systems asynchronously.
Q: What is a hot partition and how do you fix it?
A: A hot partition is a shard that receives disproportionately more writes than others, becoming the bottleneck. Causes: bad shard key (timestamp, low-cardinality field), celebrity/viral content, temporal patterns. Fixes: (1) Better shard key — high cardinality, even distribution. (2) Key salting — append random suffix to hot keys, spreading writes across shards. (3) Consistent hashing with vnodes — automatic even distribution. (4) Auto-splitting — monitor per-shard QPS and split hot shards (DynamoDB does this). Prevention: always monitor per-shard write rates, not just aggregate throughput.
Q: Fan-out on write vs fan-out on read — when do you use each?
A: Fan-out on write: when a user posts, push the post into every follower's pre-built feed. Reads are instant (pre-computed). Use for: most users with < 5-10K followers. Fan-out on read: store the post once, assemble the feed at read time by fetching from all followed users. Use for: celebrities with 100K+ followers (avoids millions of writes per post). Most production systems use a hybrid: fan-out on write for normal users, fan-out on read for celebrities, merge at read time.
You're designing a logging platform that ingests 1M events per second
How would you handle the write throughput?
Answer: (1) Kafka as the ingestion layer — append-only, partitioned by service_name or hash(event_id). Kafka handles 1M+ writes/sec per cluster. (2) Consumers read from Kafka and write to ClickHouse (columnar, append-only, optimized for analytics) sharded by time + service. (3) Hot services (generating 50% of events) get their own Kafka partitions to avoid hotspots. (4) Retention: raw events for 7 days, aggregated metrics for 1 year. The key insight: never write directly to the analytics DB — buffer through Kafka to decouple ingestion from storage.
Your sharded database has one shard at 95% capacity while others are at 30%
What happened and how do you fix it?
Answer: Hot partition — one shard is receiving disproportionate writes. Diagnosis: check the shard key distribution. Common causes: (1) Shard key is timestamp (all current writes go to one shard). Fix: use hash(userId) instead. (2) One user/entity generates massive writes (celebrity, viral content). Fix: salt the key. (3) Range-based sharding with uneven ranges. Fix: split the hot range. Immediate fix: split the hot shard into two (migrate half its key range to a new shard). Long-term fix: switch to consistent hashing with vnodes for automatic even distribution.
Common Pitfalls
Choosing a bad shard key
Sharding by timestamp (all writes go to the 'current' shard), by country (40% of users in one country), or by a boolean field (only 2 possible values). The result: one shard is overwhelmed while others are idle. Your 10-shard cluster performs like a 1-shard cluster.
✅ Choose a shard key with high cardinality (many distinct values) and even distribution. userId is almost always the right choice for user-facing systems. Test the distribution with your actual data before deploying. If no single field works, use a composite key: hash(userId + entityId).
Ignoring hotspots until production
The system works perfectly in testing with uniform data. In production, a celebrity posts and one shard gets 100x the normal write load. The shard's disk I/O maxes out, writes queue up, timeouts cascade, and the entire system degrades.
✅ Design for hotspots from day one. Implement key salting for known hot entities. Monitor per-shard write QPS with alerts. Have a runbook for splitting hot shards. Test with realistic skewed data, not uniform synthetic data.
Overloading a single partition
All writes for a feature go to one partition because the shard key doesn't distribute that feature's data. Example: a 'global counters' table with one row per counter — all updates hit the same shard regardless of the shard key.
✅ Identify write-hot tables and ensure their shard key distributes writes. For global counters, use distributed counting: maintain per-shard counters and aggregate periodically. For global sequences, use a distributed ID generator (Snowflake IDs) instead of a single auto-increment.
Not planning for re-sharding
Starting with 4 shards using hash(key) % 4. When you need 8 shards, switching to hash(key) % 8 remaps about half of all keys (an uneven change like % 5 remaps ~80%). The migration takes days, during which the system is degraded. Teams avoid re-sharding and instead over-provision, wasting resources.
✅ Use consistent hashing from the start — adding a shard moves only ~1/N of data. Or over-provision logical partitions: start with 256 logical partitions mapped to 4 physical nodes. When you need 8 nodes, reassign logical partitions without rehashing. DynamoDB and Cassandra do this automatically.
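A sketch of that logical-partition idea, assuming 256 logical partitions and a simple modulo assignment of partitions to nodes. Keys hash to a logical partition exactly once; only the partition-to-node mapping changes when the cluster resizes:

```python
import hashlib

NUM_LOGICAL = 256   # fixed at design time; keys hash to these forever

def logical_partition(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % NUM_LOGICAL

# Start: 256 logical partitions spread across 4 physical nodes.
node_map = {p: f"node_{p % 4}" for p in range(NUM_LOGICAL)}

key = "user_42"
p = logical_partition(key)
print(p, node_map[p])

# Scale to 8 nodes: keys never rehash; only whole logical partitions
# are reassigned (here, half of them move onto the four new nodes).
node_map = {p: f"node_{p % 8}" for p in range(NUM_LOGICAL)}
print(p, node_map[p])   # same logical partition, possibly a new node
```

Migration then becomes copying whole logical partitions between machines, which can be done one partition at a time with no rehashing of individual keys.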