| Shard | Users | Status |
|---|---|---|
| Shard 0 | 233,831 | ✅ Primary + Replica |
| Shard 1 | 249,883 | ✅ Primary + Replica |
| Shard 2 | 259,295 | ✅ Primary + Replica |
| Shard 3 | 259,363 | ✅ Primary + Replica |
| Total | 1,002,372 | Evenly distributed |
| Concurrent Users | Throughput | Success Rate | Mean Response |
|---|---|---|---|
| 1,000 | 384 req/sec | 100% | 2ms |
| 5,000 | 7,783 req/sec | 99.7% | 514ms |
| 10,000 | 6,245 req/sec | 95.0% | 1,250ms |
Observation: Sharding with consistent hashing achieves near-perfect data distribution and handles 6K+ writes/sec with all replicas in sync.
┌─────────────────────────────────────────────────────────────────────┐
│ DATABASE SCALING PYRAMID │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │SHARDING │ ← 100M+ rows │
│ ─┴─────────┴─ │
│ ┌───────────────┐ │
│ │ PARTITIONING │ ← 10M+ rows │
│ ─┴───────────────┴─ │
│ ┌─────────────────────┐ │
│ │ READ REPLICAS │ ← 1M+ rows │
│ ─┴─────────────────────┴─ │
│ ┌─────────────────────────────┐ │
│ │ INDEXING & QUERY TUNING │ ← 100K+ rows │
│ ─┴─────────────────────────────┴─ │
│ ┌───────────────────────────────────┐ │
│ │ CONNECTION POOLING & CACHING │ ← Any size │
│ ─┴───────────────────────────────────┴─ │
│ │
└─────────────────────────────────────────────────────────────────────┘
You're currently at 1M+ users with 4-shard architecture and streaming replication. Sharding is already implemented!
Partitioning splits a large table into smaller physical pieces while keeping it logically as one table.
┌─────────────────────────────────────────────────────────────────────┐
│ PARTITIONING STRATEGIES │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ RANGE │ │ LIST │ │ HASH │ │
│ │ (by date) │ │ (by region) │ │ (by user_id) │ │
│ ├──────────────┤ ├──────────────┤ ├──────────────┤ │
│ │ Jan 2024 │ │ US │ │ hash(id) % 4 │ │
│ │ Feb 2024 │ │ EU │ │ = 0,1,2,3 │ │
│ │ Mar 2024 │ │ ASIA │ │ │ │
│ │ ... │ │ ... │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ Best for: Best for: Best for: │
│ - Time-series - Multi-tenant - Even data │
│ - Archiving old - Regional data - distribution │
│ data - Categories - No natural key │
│ │
└─────────────────────────────────────────────────────────────────────┘
-- Step 1: Create partitioned table
CREATE TABLE users_partitioned (
id UUID NOT NULL DEFAULT uuidv7(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
PRIMARY KEY (id, created_at) -- Partition key must be in PK
) PARTITION BY RANGE (created_at);
-- Step 2: Create partitions for each month
CREATE TABLE users_2024_01 PARTITION OF users_partitioned
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE users_2024_02 PARTITION OF users_partitioned
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
CREATE TABLE users_2024_03 PARTITION OF users_partitioned
FOR VALUES FROM ('2024-03-01') TO ('2024-04-01');
-- ... more partitions
-- Step 3: Create default partition for overflow
CREATE TABLE users_default PARTITION OF users_partitioned DEFAULT;
-- Step 4: Create indexes on partitions (automatically inherited)
CREATE INDEX idx_users_part_name ON users_partitioned (first_name, last_name);
CREATE INDEX idx_users_part_created ON users_partitioned (created_at DESC);-- Better for even distribution when you don't query by date
CREATE TABLE users_hash_partitioned (
id UUID NOT NULL DEFAULT uuidv7(),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
PRIMARY KEY (id)
) PARTITION BY HASH (id);
-- Create 8 hash partitions
CREATE TABLE users_hash_p0 PARTITION OF users_hash_partitioned
FOR VALUES WITH (MODULUS 8, REMAINDER 0);
CREATE TABLE users_hash_p1 PARTITION OF users_hash_partitioned
FOR VALUES WITH (MODULUS 8, REMAINDER 1);
CREATE TABLE users_hash_p2 PARTITION OF users_hash_partitioned
FOR VALUES WITH (MODULUS 8, REMAINDER 2);
CREATE TABLE users_hash_p3 PARTITION OF users_hash_partitioned
FOR VALUES WITH (MODULUS 8, REMAINDER 3);
CREATE TABLE users_hash_p4 PARTITION OF users_hash_partitioned
FOR VALUES WITH (MODULUS 8, REMAINDER 4);
CREATE TABLE users_hash_p5 PARTITION OF users_hash_partitioned
FOR VALUES WITH (MODULUS 8, REMAINDER 5);
CREATE TABLE users_hash_p6 PARTITION OF users_hash_partitioned
FOR VALUES WITH (MODULUS 8, REMAINDER 6);
CREATE TABLE users_hash_p7 PARTITION OF users_hash_partitioned
FOR VALUES WITH (MODULUS 8, REMAINDER 7);| Benefit | Description |
|---|---|
| Partition Pruning | Queries automatically skip irrelevant partitions |
| Parallel Scans | PostgreSQL can scan partitions in parallel |
| Easy Archiving | Drop old partitions instead of DELETE (instant!) |
| Smaller Indexes | Each partition has its own smaller index |
| Maintenance | VACUUM/ANALYZE on smaller chunks |
Sharding distributes data across multiple database servers.
┌─────────────────────────────────────────────────────────────────────┐
│ SHARDING ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ Client │ │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Router │ ← Determines which shard │
│ │ (App/Proxy)│ │
│ └──────┬──────┘ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ Shard 1 │ │ Shard 2 │ │ Shard 3 │ │
│ │ (0-33%) │ │ (34-66%) │ │ (67-100%) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ Replica 1 │ │ Replica 2 │ │ Replica 3 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
// shard_router.go
package sharding
import (
"crypto/sha256"
"encoding/binary"
"fmt"
"github.com/google/uuid"
"gorm.io/gorm"
)
// ShardConfig holds configuration for sharding
type ShardConfig struct {
ShardCount int
Shards map[int]*gorm.DB
}
// ShardRouter routes queries to the correct shard
type ShardRouter struct {
config *ShardConfig
}
// NewShardRouter creates a new shard router
func NewShardRouter(shards map[int]*gorm.DB) *ShardRouter {
return &ShardRouter{
config: &ShardConfig{
ShardCount: len(shards),
Shards: shards,
},
}
}
// GetShardForUser returns the shard for a given user ID
func (r *ShardRouter) GetShardForUser(userID uuid.UUID) *gorm.DB {
shardIndex := r.hashToShard(userID)
return r.config.Shards[shardIndex]
}
// hashToShard converts a UUID to a shard index
func (r *ShardRouter) hashToShard(id uuid.UUID) int {
hash := sha256.Sum256(id[:])
// Use first 4 bytes as uint32
num := binary.BigEndian.Uint32(hash[:4])
return int(num % uint32(r.config.ShardCount))
}
// GetAllShards returns all shards for scatter-gather queries
func (r *ShardRouter) GetAllShards() []*gorm.DB {
shards := make([]*gorm.DB, r.config.ShardCount)
for i, db := range r.config.Shards {
shards[i] = db
}
return shards
}
// Example usage in repository
type ShardedUserRepository struct {
router *ShardRouter
}
func (r *ShardedUserRepository) Create(user *User) error {
shard := r.router.GetShardForUser(user.ID)
return shard.Create(user).Error
}
func (r *ShardedUserRepository) FindByID(id uuid.UUID) (*User, error) {
shard := r.router.GetShardForUser(id)
var user User
err := shard.First(&user, "id = ?", id).Error
return &user, err
}
// Scatter-gather for queries that need all shards
func (r *ShardedUserRepository) CountAll() (int64, error) {
var totalCount int64
// Query all shards in parallel
results := make(chan int64, len(r.router.GetAllShards()))
errors := make(chan error, len(r.router.GetAllShards()))
for _, shard := range r.router.GetAllShards() {
go func(db *gorm.DB) {
var count int64
if err := db.Model(&User{}).Count(&count).Error; err != nil {
errors <- err
return
}
results <- count
}(shard)
}
for range r.router.GetAllShards() {
select {
case count := <-results:
totalCount += count
case err := <-errors:
return 0, err
}
}
return totalCount, nil
}Citus turns PostgreSQL into a distributed database:
-- On coordinator node
CREATE EXTENSION citus;
-- Add worker nodes
SELECT citus_add_node('worker1', 5432);
SELECT citus_add_node('worker2', 5432);
SELECT citus_add_node('worker3', 5432);
-- Distribute the users table by ID
SELECT create_distributed_table('users', 'id');
-- Now queries automatically route to correct shard!
INSERT INTO users (first_name, last_name) VALUES ('John', 'Doe');
-- Citus automatically routes to correct worker
SELECT * FROM users WHERE id = '123e4567-e89b-12d3-a456-426614174000';
-- Citus routes to the shard containing this IDFor MySQL-compatible sharding, Vitess is popular (used by YouTube, Slack):
# vitess topology
keyspaces:
- name: messenger
shards:
- name: "-80" # IDs hashing to 0x00-0x7F
tablets:
- type: primary
host: shard1-primary
- type: replica
host: shard1-replica
- name: "80-" # IDs hashing to 0x80-0xFF
tablets:
- type: primary
host: shard2-primary
- type: replica
host: shard2-replica┌─────────────────────────────────────────────────────────────────────┐
│ PARTITIONING vs SHARDING │
├─────────────────┬────────────────────┬──────────────────────────────┤
│ Aspect │ Partitioning │ Sharding │
├─────────────────┼────────────────────┼──────────────────────────────┤
│ Location │ Single server │ Multiple servers │
│ Complexity │ Low (built-in) │ High (distributed system) │
│ Transactions │ Full ACID │ Distributed transactions │
│ JOINs │ Easy │ Cross-shard = complex │
│ Scale limit │ Single server RAM │ Unlimited horizontal │
│ Maintenance │ Standard PG tools │ Custom tooling needed │
│ Best for │ 10M - 500M rows │ 500M+ rows │
│ When to use │ First! Try this │ When partitioning isn't │
│ │ before sharding │ enough │
└─────────────────┴────────────────────┴──────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ CONNECTION POOLING │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ WITHOUT POOLING WITH PGBOUNCER │
│ │
│ ┌─────┐ ┌─────┐ │
│ │App 1│──┐ │App 1│──┐ │
│ └─────┘ │ └─────┘ │ │
│ ┌─────┐ │ 1000 ┌─────┐ │ 100 │
│ │App 2│──┼─connections──► │App 2│──┼─connections──► │
│ └─────┘ │ └─────┘ │ ┌─────────┐ │
│ ┌─────┐ │ ┌─────┐ │ │PgBouncer│ │
│ │App 3│──┘ │App 3│──┘ └────┬────┘ │
│ └─────┘ └─────┘ │ │
│ │ │ 50 │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │PostgreSQL│ (overwhelmed!) │PostgreSQL│ │
│ │ 1000 │ │ 50 │ │
│ │connections│ │connections││
│ └─────────┘ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
# docker-compose.yml - Add PgBouncer
pgbouncer:
image: edoburu/pgbouncer:latest
environment:
DATABASE_URL: postgres://root:root@messenger_db_primary:5432/postgres
POOL_MODE: transaction # Important for GORM
MAX_CLIENT_CONN: 1000
DEFAULT_POOL_SIZE: 50
MIN_POOL_SIZE: 10
ports:
- "6432:5432"
depends_on:
- messenger_db_primary// cache/user_cache.go
package cache
import (
"context"
"encoding/json"
"time"
"github.com/redis/go-redis/v9"
)
type UserCache struct {
client *redis.Client
ttl time.Duration
}
func NewUserCache(client *redis.Client) *UserCache {
return &UserCache{
client: client,
ttl: 15 * time.Minute,
}
}
func (c *UserCache) Get(ctx context.Context, id string) (*User, error) {
data, err := c.client.Get(ctx, "user:"+id).Bytes()
if err == redis.Nil {
return nil, nil // Cache miss
}
if err != nil {
return nil, err
}
var user User
if err := json.Unmarshal(data, &user); err != nil {
return nil, err
}
return &user, nil
}
func (c *UserCache) Set(ctx context.Context, user *User) error {
data, err := json.Marshal(user)
if err != nil {
return err
}
return c.client.Set(ctx, "user:"+user.ID.String(), data, c.ttl).Err()
}
func (c *UserCache) Invalidate(ctx context.Context, id string) error {
return c.client.Del(ctx, "user:"+id).Err()
}
// Cache-aside pattern in service
type CachedUserService struct {
cache *UserCache
repo UserRepository
}
func (s *CachedUserService) GetByID(ctx context.Context, id string) (*User, error) {
// Try cache first
user, err := s.cache.Get(ctx, id)
if err != nil {
return nil, err
}
if user != nil {
return user, nil // Cache hit!
}
// Cache miss - get from DB
user, err = s.repo.FindByID(id)
if err != nil {
return nil, err
}
// Store in cache for next time
_ = s.cache.Set(ctx, user)
return user, nil
}// Already implemented in your CQRS pattern
// Writes → Primary
// Reads → Replica-- Create materialized view for expensive aggregations
CREATE MATERIALIZED VIEW user_stats AS
SELECT
DATE_TRUNC('day', created_at) as date,
COUNT(*) as users_created,
COUNT(*) FILTER (WHERE first_name LIKE 'A%') as a_names
FROM users
GROUP BY DATE_TRUNC('day', created_at);
-- Refresh periodically (not real-time)
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats;
-- Query the view (instant!)
SELECT * FROM user_stats WHERE date > NOW() - INTERVAL '7 days';-- Instead of full index, create partial index for common case
-- If 90% of queries are for recent users:
CREATE INDEX idx_users_recent ON users (created_at DESC)
WHERE created_at > NOW() - INTERVAL '30 days';
-- Much smaller index, faster queries for recent data- Read replicas ✅
- Connection pooling at app level ✅
- Query optimization ✅
- PgBouncer available (profile-based)
- Redis caching layer ✅
- Implemented consistent hashing with MD5 + 150 virtual nodes ✅
- 4-shard architecture with primaries and replicas ✅
- ShardManager for automatic routing ✅
- Parallel scatter-gather queries ✅
- Streaming replication for all shards ✅
- Add more shards as data grows (8, 16, etc.)
- Implement read routing to replicas for read-heavy workloads
- Add cross-shard query caching
- Consider Citus for advanced distributed queries
# Start 4-shard cluster with replicas
docker-compose up -d
# Verify all containers are healthy
docker ps --format "table {{.Names}}\t{{.Status}}"
# Run API with sharding enabled
cd server
SHARDING_ENABLED=true ./messenger-api
# Check shard stats
curl http://localhost:8080/api/v1/shards/stats | jq
# Verify replication
for i in 0 1 2 3; do
PRIMARY=$(docker exec shard-$i psql -U postgres -d messenger -t -c "SELECT COUNT(*) FROM users")
REPLICA=$(docker exec shard-$i-replica psql -U postgres -d messenger -t -c "SELECT COUNT(*) FROM users")
echo "Shard $i: PRIMARY=$PRIMARY REPLICA=$REPLICA"
doneSee SHARDING_GUIDE.md for detailed implementation documentation.
- PostgreSQL Partitioning: https://www.postgresql.org/docs/current/ddl-partitioning.html
- Citus Data: https://www.citusdata.com/
- Designing Data-Intensive Applications (Book) - Martin Kleppmann
- How Discord Stores Billions of Messages: https://discord.com/blog/how-discord-stores-billions-of-messages
Implemented and tested:
- ✅ 4-shard PostgreSQL cluster with consistent hashing (MD5 + 150 virtual nodes)
- ✅ Streaming replication on all shards (primaries + hot standby replicas)
- ✅ 1M+ users evenly distributed across shards
- ✅ 7,783 req/sec throughput at 5K concurrent users
- ✅ 99.7% success rate under heavy load
- ✅ Replicas perfectly synced during 6K+ writes/second
Next steps for scaling:
- Add more shards (8, 16) as data grows beyond 10M users per shard
- Route reads to replicas for read-heavy workloads
- Consider Citus for advanced distributed SQL features