Introduction

Scaling distributed systems beyond their initial design capacity often requires fundamental architectural changes. This post explores advanced scalability patterns I’ve implemented in high-throughput systems, with practical examples and performance implications.

Command Query Responsibility Segregation (CQRS)

Implementation Example

// Command side
type OrderCommand struct {
    OrderID     string
    CustomerID  string
    Items       []OrderItem
    TotalAmount decimal.Decimal
}

// OrderCommandHandler processes order commands: it appends domain
// events to the event store and hands them to the outbox for
// asynchronous publication to the read side.
type OrderCommandHandler struct {
    eventStore EventStore
    outbox     MessageOutbox
}

// Handle validates the order described by cmd, persists its domain
// events, and publishes them via the outbox. It returns the first
// error encountered; on success the read side will converge
// eventually once the outbox is drained.
func (h *OrderCommandHandler) Handle(ctx context.Context, cmd OrderCommand) error {
    // Build the aggregate from the command and reject invalid orders early.
    order := domain.NewOrder(cmd.OrderID, cmd.CustomerID, cmd.Items)
    if err := order.Validate(); err != nil {
        return err
    }

    // Persist the uncommitted domain events produced by the aggregate.
    pending := order.Changes()
    if err := h.eventStore.Store(ctx, pending); err != nil {
        return err
    }

    // NOTE(review): Store and Publish are separate steps, not one
    // transaction — a crash between them drops the outbox write.
    // Confirm the outbox implementation tolerates this.
    return h.outbox.Publish(ctx, pending)
}

// Query side
type OrderQuery struct {
    repository *OrderReadRepository
    cache      *redis.Client
}

// GetOrderSummary returns the read-model summary for orderID.
// It is a cache-aside read: a valid cache hit is served directly;
// a miss, a cache error, or a corrupt cached value all fall back
// to the read repository.
func (q *OrderQuery) GetOrderSummary(ctx context.Context, orderID string) (*OrderSummary, error) {
    // Try the cache first. Any Get error (including a miss) routes
    // to the repository below.
    cached, err := q.cache.Get(ctx, "order:"+orderID).Result()
    if err == nil {
        if summary, uerr := unmarshalOrderSummary(cached); uerr == nil {
            return summary, nil
        }
        // A corrupt or stale cache entry should not fail the query;
        // treat it as a miss and serve from the read model.
    }

    return q.repository.GetOrderSummary(ctx, orderID)
}

Event Sourcing with Snapshots

Event Store Implementation

// Event is a single persisted change to an aggregate, identified by
// the aggregate it belongs to and its position in that aggregate's
// version sequence.
type Event struct {
    AggregateID   string
    Version       int    // monotonically increasing per aggregate
    Type          string
    Data          []byte // serialized event payload; format not shown here
    Timestamp     time.Time
}

// EventStore persists events to a SQL database and maintains
// periodic snapshots to bound rebuild time.
type EventStore struct {
    db        *sql.DB
    snapshots *SnapshotStore
}

// Store appends events atomically in a single transaction and,
// when the batch crosses the snapshot threshold, records a snapshot
// at the last event in the same transaction.
func (es *EventStore) Store(ctx context.Context, events []Event) error {
    // Empty batch: nothing to persist. This also guards the
    // events[len(events)-1] access below against a panic.
    if len(events) == 0 {
        return nil
    }

    tx, err := es.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    // No-op after a successful Commit; unwinds on any error path.
    defer tx.Rollback()

    for _, event := range events {
        if err := es.insertEvent(ctx, tx, event); err != nil {
            return err
        }
    }

    // Snapshot inside the same transaction so events and snapshot
    // commit or roll back together.
    if shouldCreateSnapshot(events) {
        if err := es.snapshots.Create(ctx, tx, events[len(events)-1]); err != nil {
            return err
        }
    }

    return tx.Commit()
}

// Rebuild reconstructs the aggregate state for aggregateID, using the
// latest snapshot when one exists and replaying all events otherwise.
func (es *EventStore) Rebuild(ctx context.Context, aggregateID string) (interface{}, error) {
    // Try loading from snapshot first
    snapshot, err := es.snapshots.GetLatest(ctx, aggregateID)
    if err == nil {
        // NOTE(review): this returns the snapshot as-is. Events written
        // AFTER the snapshot are never replayed on this path, so the
        // result can be stale — confirm whether GetLatest already folds
        // them in, or whether a "replay events since snapshot version"
        // step is missing here.
        return snapshot, nil
    }

    // Rebuild from events if no snapshot.
    // NOTE(review): ANY GetLatest error lands here, not just
    // "no snapshot found" — a transient store failure silently
    // triggers a full replay. Consider distinguishing the two.
    events, err := es.GetEvents(ctx, aggregateID)
    if err != nil {
        return nil, err
    }

    return es.applyEvents(events)
}

Sharding Strategies

Dynamic Sharding Implementation

// ShardManager routes keys to shards via consistent hashing and
// coordinates cluster-wide rebalancing through etcd.
// Fields realigned to be gofmt-clean (the original was not).
type ShardManager struct {
    shardMap    *concurrent.Map  // presumably hash -> *ShardInfo; confirm against findShardByHash
    coordinator *etcd.Client     // distributed lock provider for rebalancing
    rebalancer  *ShardRebalancer // computes target shard distributions
}

// ShardInfo describes one shard: the key range it owns, the node
// currently hosting it, and its lifecycle status.
type ShardInfo struct {
    ID        string
    KeyRange  KeyRange
    NodeID    string
    Status    ShardStatus
}

// GetShard resolves the shard responsible for key by hashing it onto
// the consistent-hash ring.
func (sm *ShardManager) GetShard(key []byte) (*ShardInfo, error) {
    return sm.findShardByHash(sm.hashKey(key))
}

// Rebalance recomputes the optimal shard distribution and applies it,
// holding a cluster-wide etcd lock so only one node rebalances at a
// time.
func (sm *ShardManager) Rebalance(ctx context.Context) error {
    const lockName = "shard-rebalance"

    // Serialize rebalancing across the cluster.
    if err := sm.coordinator.Lock(ctx, lockName); err != nil {
        return err
    }
    defer sm.coordinator.Unlock(ctx, lockName)

    // Compute the target distribution, then move shards to match it.
    return sm.applyRebalancing(ctx, sm.rebalancer.CalculateOptimalDistribution())
}

// Consistent hashing implementation
func (sm *ShardManager) hashKey(key []byte) uint64 {
    h := fnv.New64a()
    h.Write(key)
    return h.Sum64()
}

Load Balancing and Rate Limiting

Adaptive Rate Limiter

// AdaptiveRateLimiter is a token-bucket limiter whose refill rate is
// tuned at runtime from observed latency (see AdjustRate).
// NOTE(review): the standard library's sync/atomic has no Float64
// type — this presumably uses go.uber.org/atomic (or similar);
// confirm the import.
type AdaptiveRateLimiter struct {
    tokens         atomic.Int64
    refillRate     atomic.Float64 // tokens added per second
    lastRefill     atomic.Int64   // unix nanoseconds of the last refill
    metrics        *MetricsCollector
}

// Allow reports whether one request may proceed, refilling the token
// bucket based on wall time elapsed since the last refill.
//
// Fixes two races in the original: (1) concurrent callers could both
// compute the same refill and double-add tokens — guarded here by a
// CAS on lastRefill; (2) the Load-then-Add acquisition let the token
// count go negative under concurrency, admitting more requests than
// tokens existed — replaced with Add-then-undo.
func (l *AdaptiveRateLimiter) Allow() bool {
    now := time.Now().UnixNano()
    last := l.lastRefill.Load()

    // Refill proportionally to elapsed time. Only the caller that wins
    // the CAS adds tokens for this window, preventing double refills.
    elapsed := float64(now-last) / float64(time.Second)
    if refill := int64(elapsed * l.refillRate.Load()); refill > 0 {
        if l.lastRefill.CompareAndSwap(last, now) {
            // NOTE(review): tokens are uncapped, so an idle period
            // allows an arbitrarily large burst — confirm whether a
            // burst ceiling is wanted.
            l.tokens.Add(refill)
        }
    }

    // Take a token atomically; if that drove the count negative, no
    // token was actually available, so give it back and refuse.
    if l.tokens.Add(-1) >= 0 {
        return true
    }
    l.tokens.Add(1)
    return false
}

// AdjustRate tunes the refill rate from an observed latency sample:
// multiplicative decrease (×0.95) when latency exceeds the target,
// multiplicative increase (×1.05, capped at maxRate) when the system
// is healthy.
func (l *AdaptiveRateLimiter) AdjustRate(latency time.Duration) {
    rate := l.refillRate.Load()

    if latency > targetLatency {
        // System is struggling: back off by 5%.
        l.refillRate.Store(rate * 0.95)
        return
    }

    // System is healthy: grow by 5%, never beyond the ceiling.
    l.refillRate.Store(math.Min(rate*1.05, maxRate))
}

Data Partitioning and Replication

Multi-Region Data Sync

// ReplicationManager fans writes out to other regions through a Kafka
// producer, choosing targets according to the configured consistency
// level.
type ReplicationManager struct {
    regions     map[string]*RegionInfo
    syncQueue   *kafka.Producer
    consistency ConsistencyLevel
}

// RegionInfo describes one replication target region.
type RegionInfo struct {
    ID       string
    Priority int           // presumably used for target selection — confirm in getTargetRegions
    Latency  time.Duration
    Status   RegionStatus
}

// Replicate pushes data to every region required by the configured
// consistency level, building one replication task per target and
// executing them concurrently via executeReplication.
func (rm *ReplicationManager) Replicate(ctx context.Context, data *Data) error {
    // Consistency level decides how many (and which) regions must
    // receive the write.
    targets := rm.getTargetRegions(rm.consistency)

    tasks := make([]*ReplicationTask, 0, len(targets))
    for _, region := range targets {
        tasks = append(tasks, &ReplicationTask{
            Region: region,
            Data:   data,
        })
    }

    return rm.executeReplication(ctx, tasks)
}

// executeReplication produces one Kafka message per task in parallel
// and returns the first error, if any. The errgroup-derived context
// cancels the remaining sends once one fails.
func (rm *ReplicationManager) executeReplication(ctx context.Context, tasks []*ReplicationTask) error {
    g, gctx := errgroup.WithContext(ctx)

    for i := range tasks {
        task := tasks[i] // pin the per-iteration value for the closure (pre-Go 1.22)
        g.Go(func() error {
            msg := &kafka.Message{
                Topic: task.Region.ID,
                Key:   []byte(task.Data.ID),
                Value: task.Data.Payload,
            }
            return rm.syncQueue.Produce(gctx, msg)
        })
    }

    return g.Wait()
}

Performance Optimization

Connection Pooling

// ConnectionPool recycles connections through a bounded idle channel
// and tracks currently checked-out connections in a concurrent map.
type ConnectionPool struct {
    idle     chan *Connection // buffered; capacity bounds the idle pool
    active   *sync.Map        // conn.ID -> *Connection currently in use
    factory  ConnectionFactory
    config   PoolConfig
}

// Get returns a pooled idle connection when a valid one is available,
// otherwise dials a new one via the factory. The returned connection
// is tracked as active until Release is called.
//
// Fix: the original dropped an invalid idle connection without
// closing it, leaking its underlying resources.
func (p *ConnectionPool) Get(ctx context.Context) (*Connection, error) {
    // Non-blocking attempt to reuse an idle connection.
    select {
    case conn := <-p.idle:
        if conn.IsValid() {
            p.active.Store(conn.ID, conn)
            return conn, nil
        }
        // Stale connection: close it so it doesn't leak, then dial fresh.
        conn.Close()
    default:
        // No idle connections available.
    }

    // Create a new connection.
    conn, err := p.factory.Create(ctx)
    if err != nil {
        return nil, err
    }

    p.active.Store(conn.ID, conn)
    return conn, nil
}

// Release returns conn to the idle pool if it is still valid, and
// closes it otherwise (or when the idle pool is full).
//
// Fix: the original left an invalid connection neither pooled nor
// closed, leaking its underlying resources.
func (p *ConnectionPool) Release(conn *Connection) {
    p.active.Delete(conn.ID)

    if !conn.IsValid() {
        // Broken connections must be closed, not just dropped.
        conn.Close()
        return
    }

    select {
    case p.idle <- conn:
        // Successfully returned to the idle pool.
    default:
        // Idle pool full; close the surplus connection.
        conn.Close()
    }
}

Conclusion

These scalability patterns represent battle-tested solutions for handling high-scale distributed systems. The key is understanding not just how to implement them, but when each pattern is appropriate and what trade-offs it introduces. Remember Knuth's warning that premature optimization is the root of all evil — adopt these patterns only when metrics and monitoring indicate they're needed.