Advanced Scalability Patterns for Modern Distributed Systems
Introduction
Scaling distributed systems beyond their initial design capacity often requires fundamental architectural changes. This post explores advanced scalability patterns I’ve implemented in high-throughput systems, with practical examples and performance implications.
Command Query Responsibility Segregation (CQRS)
Implementation Example
// Command side
// OrderCommand carries the data needed to create an order on the write side
// of the CQRS split; it is consumed by OrderCommandHandler.Handle.
type OrderCommand struct {
OrderID string
CustomerID string
Items []OrderItem
TotalAmount decimal.Decimal // NOTE(review): not passed to domain.NewOrder in Handle — presumably derived from Items; confirm
}
// OrderCommandHandler validates incoming order commands, appends the
// resulting domain events to the event store, and forwards them to the
// outbox for asynchronous publication to the read side.
type OrderCommandHandler struct {
eventStore EventStore // append-only store for domain events
outbox MessageOutbox // carries stored events to downstream consumers
}
// Handle executes an order command: it builds the order aggregate, persists
// the uncommitted domain events, and enqueues them on the outbox so read
// models are updated eventually.
func (h *OrderCommandHandler) Handle(ctx context.Context, cmd OrderCommand) error {
	// Build the aggregate and reject invalid commands up front.
	order := domain.NewOrder(cmd.OrderID, cmd.CustomerID, cmd.Items)
	if err := order.Validate(); err != nil {
		return err
	}

	// Persist the pending events, then hand the same batch to the outbox.
	pending := order.Changes()
	if err := h.eventStore.Store(ctx, pending); err != nil {
		return err
	}
	return h.outbox.Publish(ctx, pending)
}
// Query side
// OrderQuery serves read-side lookups, consulting a Redis cache before
// falling back to the denormalized read repository.
type OrderQuery struct {
repository *OrderReadRepository // read-model store (cache fallback)
cache *redis.Client // keyed "order:<id>" (see GetOrderSummary)
}
// GetOrderSummary returns the denormalized summary for orderID, consulting
// the Redis cache before the read-model repository.
func (q *OrderQuery) GetOrderSummary(ctx context.Context, orderID string) (*OrderSummary, error) {
	cacheKey := fmt.Sprintf("order:%s", orderID)

	// Any cache error — miss or outage — falls through to the repository.
	if cached, err := q.cache.Get(ctx, cacheKey).Result(); err == nil {
		return unmarshalOrderSummary(cached)
	}

	// NOTE(review): the repository result is never written back to the
	// cache here — presumably a projector populates it; confirm.
	return q.repository.GetOrderSummary(ctx, orderID)
}
Event Sourcing with Snapshots
Event Store Implementation
// Event is a single persisted domain event within an aggregate's stream.
type Event struct {
AggregateID string // owning aggregate
Version int // position within the aggregate's stream
Type string // event-type discriminator
Data []byte // serialized payload; encoding not shown in this excerpt
Timestamp time.Time
}
// EventStore persists event streams in a SQL database and maintains
// periodic snapshots to bound rebuild cost (see Store and Rebuild).
type EventStore struct {
db *sql.DB
snapshots *SnapshotStore
}
// Store appends a batch of events atomically and, when the batch qualifies,
// writes a snapshot within the same transaction so events and snapshot
// commit (or roll back) together.
func (es *EventStore) Store(ctx context.Context, events []Event) error {
	tx, err := es.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// No-op once Commit succeeds; undoes partial work on any error path.
	defer tx.Rollback()

	for i := range events {
		if err := es.insertEvent(ctx, tx, events[i]); err != nil {
			return err
		}
	}

	if shouldCreateSnapshot(events) {
		latest := events[len(events)-1]
		if err := es.snapshots.Create(ctx, tx, latest); err != nil {
			return err
		}
	}
	return tx.Commit()
}
// Rebuild reconstructs the state of the aggregate identified by aggregateID,
// preferring the latest snapshot over a full event replay.
func (es *EventStore) Rebuild(ctx context.Context, aggregateID string) (interface{}, error) {
// Try loading from snapshot first
// NOTE(review): returning the snapshot alone drops every event recorded
// AFTER the snapshot was taken, so the rebuilt state can be stale. The
// snapshot should be the starting point, with events since its version
// applied on top — this needs an events-since-version query not shown here.
snapshot, err := es.snapshots.GetLatest(ctx, aggregateID)
if err == nil {
return snapshot, nil
}
// Rebuild from events if no snapshot
// NOTE(review): any snapshot-store error (not only "no snapshot") silently
// triggers a full replay; consider distinguishing the two cases.
events, err := es.GetEvents(ctx, aggregateID)
if err != nil {
return nil, err
}
return es.applyEvents(events)
}
Sharding Strategies
Dynamic Sharding Implementation
// ShardManager maps keys to shards via FNV-1a hashing (see hashKey) and
// coordinates cluster-wide rebalancing through an etcd lock.
type ShardManager struct {
shardMap *concurrent.Map // hash -> shard lookup table
coordinator *etcd.Client // distributed lock for rebalancing
rebalancer *ShardRebalancer
}
// ShardInfo describes one shard: the key range it owns, the node hosting
// it, and its lifecycle status.
type ShardInfo struct {
ID string
KeyRange KeyRange // hash/key bounds — whether ends are inclusive is not shown here
NodeID string
Status ShardStatus
}
// GetShard resolves the shard whose hash range covers key.
func (sm *ShardManager) GetShard(key []byte) (*ShardInfo, error) {
	// Hash once, then locate the owning shard.
	return sm.findShardByHash(sm.hashKey(key))
}
// Rebalance redistributes shards across nodes. The whole operation runs
// under a cluster-wide etcd lock so only one rebalance happens at a time.
func (sm *ShardManager) Rebalance(ctx context.Context) error {
	const lockKey = "shard-rebalance"

	if err := sm.coordinator.Lock(ctx, lockKey); err != nil {
		return err
	}
	defer sm.coordinator.Unlock(ctx, lockKey)

	// Compute the target placement, then move shards to match it.
	plan := sm.rebalancer.CalculateOptimalDistribution()
	return sm.applyRebalancing(ctx, plan)
}
// hashKey maps a key onto the 64-bit hash space using FNV-1a, which is what
// the shard lookup keys on.
func (sm *ShardManager) hashKey(key []byte) uint64 {
	hasher := fnv.New64a()
	hasher.Write(key) // hash.Hash's Write never returns an error
	return hasher.Sum64()
}
Load Balancing and Rate Limiting
Adaptive Rate Limiter
// AdaptiveRateLimiter is a token-bucket limiter whose refill rate is tuned
// at runtime from observed latency (see Allow and AdjustRate).
// NOTE(review): atomic.Float64 is not in the standard sync/atomic package —
// presumably this is go.uber.org/atomic; confirm the import.
type AdaptiveRateLimiter struct {
tokens atomic.Int64 // currently available tokens
refillRate atomic.Float64 // tokens credited per second
lastRefill atomic.Int64 // UnixNano timestamp of the last refill
metrics *MetricsCollector
}
// Allow reports whether one request may proceed, lazily refilling the token
// bucket from the time elapsed since the last refill.
//
// Fixes over the previous version:
//   - the refill is guarded by a CompareAndSwap on lastRefill, so concurrent
//     callers can no longer each credit the same elapsed interval;
//   - the token is taken with a CAS loop, making the check and the decrement
//     one atomic step so the count cannot be driven negative.
//
// NOTE(review): there is still no burst cap — after a long idle period the
// bucket accumulates unbounded tokens; add a capacity field if that matters.
func (l *AdaptiveRateLimiter) Allow() bool {
	now := time.Now().UnixNano()
	last := l.lastRefill.Load()
	elapsed := float64(now-last) / float64(time.Second)
	if credit := int64(elapsed * l.refillRate.Load()); credit > 0 {
		// Only the goroutine that wins the CAS applies the credit.
		if l.lastRefill.CompareAndSwap(last, now) {
			l.tokens.Add(credit)
		}
	}
	for {
		available := l.tokens.Load()
		if available <= 0 {
			return false
		}
		if l.tokens.CompareAndSwap(available, available-1) {
			return true
		}
	}
}
// AdjustRate nudges the refill rate from observed latency: -5% when latency
// exceeds the target, +5% (capped at maxRate) otherwise.
func (l *AdaptiveRateLimiter) AdjustRate(latency time.Duration) {
// Adjust rate based on system performance
// NOTE(review): Load-then-Store is not atomic — two concurrent calls can
// lose one adjustment. If AdjustRate runs concurrently, wrap the
// read-modify-write in a CAS loop or a mutex.
// NOTE(review): there is no lower bound on the decrease path, so the rate
// can decay toward zero under sustained high latency — confirm intended.
currentRate := l.refillRate.Load()
if latency > targetLatency {
// Decrease rate if system is struggling
newRate := currentRate * 0.95
l.refillRate.Store(newRate)
} else {
// Increase rate if system is healthy
newRate := currentRate * 1.05
l.refillRate.Store(math.Min(newRate, maxRate))
}
}
Data Partitioning and Replication
Multi-Region Data Sync
// ReplicationManager fans writes out to other regions via Kafka, with the
// target set chosen by the configured consistency level.
type ReplicationManager struct {
regions map[string]*RegionInfo
syncQueue *kafka.Producer // messages are published to a topic named after the region ID
consistency ConsistencyLevel
}
// RegionInfo holds per-region replication metadata.
type RegionInfo struct {
ID string // also used as the Kafka topic name (see executeReplication)
Priority int // NOTE(review): not read in this excerpt — presumably used by getTargetRegions
Latency time.Duration
Status RegionStatus
}
// Replicate fans data out to every region required by the configured
// consistency level.
func (rm *ReplicationManager) Replicate(ctx context.Context, data *Data) error {
	// The consistency level decides how many (and which) regions must
	// receive this write.
	targets := rm.getTargetRegions(rm.consistency)

	// One task per target region, all carrying the same payload.
	tasks := make([]*ReplicationTask, 0, len(targets))
	for _, region := range targets {
		tasks = append(tasks, &ReplicationTask{Region: region, Data: data})
	}
	return rm.executeReplication(ctx, tasks)
}
// executeReplication publishes every task to its region's Kafka topic in
// parallel; the first failure cancels the shared context and is returned.
func (rm *ReplicationManager) executeReplication(ctx context.Context, tasks []*ReplicationTask) error {
	g, ctx := errgroup.WithContext(ctx)
	for i := range tasks {
		t := tasks[i] // pin a per-goroutine value (required before Go 1.22)
		g.Go(func() error {
			msg := &kafka.Message{
				Topic: t.Region.ID,
				Key:   []byte(t.Data.ID),
				Value: t.Data.Payload,
			}
			return rm.syncQueue.Produce(ctx, msg)
		})
	}
	return g.Wait()
}
Performance Optimization
Connection Pooling
// ConnectionPool keeps a bounded channel of idle connections plus a map of
// connections currently checked out.
type ConnectionPool struct {
idle chan *Connection // buffered; its capacity bounds the idle pool
active *sync.Map // conn.ID -> *Connection currently in use
factory ConnectionFactory
config PoolConfig // NOTE(review): never consulted in Get/Release — no max-connections limit is enforced here
}
// Get returns a connection from the pool, preferring a valid idle one and
// falling back to creating a fresh connection via the factory.
//
// Fix over the previous version: an idle connection that failed IsValid was
// silently dropped without Close, leaking the underlying resource. Stale
// connections are now closed, and the idle channel is drained until a valid
// connection is found or it is empty.
//
// NOTE(review): nothing bounds the total number of connections created —
// consider enforcing p.config's limits here.
func (p *ConnectionPool) Get(ctx context.Context) (*Connection, error) {
	for {
		select {
		case conn := <-p.idle:
			if conn.IsValid() {
				p.active.Store(conn.ID, conn)
				return conn, nil
			}
			// Stale idle connection: release its resources and keep
			// draining the channel.
			conn.Close()
		default:
			// No usable idle connection — dial a new one.
			conn, err := p.factory.Create(ctx)
			if err != nil {
				return nil, err
			}
			p.active.Store(conn.ID, conn)
			return conn, nil
		}
	}
}
// Release returns conn to the pool: valid connections go back onto the idle
// channel (or are closed when it is full); invalid ones are closed.
//
// Fix over the previous version: an invalid connection was removed from the
// active set but never closed, leaking the underlying resource.
func (p *ConnectionPool) Release(conn *Connection) {
	p.active.Delete(conn.ID)

	if !conn.IsValid() {
		conn.Close()
		return
	}
	select {
	case p.idle <- conn:
		// Parked in the idle pool for reuse.
	default:
		// Idle pool full — shed the connection.
		conn.Close()
	}
}
Conclusion
These scalability patterns represent battle-tested solutions for handling high-scale distributed systems. The key is understanding not just how to implement them, but when each pattern is appropriate and what trade-offs it introduces. Remember Knuth's dictum that premature optimization is the root of all evil — adopt these patterns only when metrics and monitoring indicate they're needed.