Advanced Microservices Patterns and Anti-Patterns
Introduction
Building resilient microservices requires more than just breaking down a monolith. This post explores advanced patterns and anti-patterns I’ve encountered while architecting large-scale microservices systems, with practical implementations and lessons learned.
Circuit Breaker Pattern
Implementation with Hystrix-style Circuit Breaker
// CircuitBreaker guards calls to a dependency: it counts failures, rejects
// calls while open, and lets calls through again once resetTimeout has
// elapsed since the last failure. All mutable state is held in atomics.
type CircuitBreaker struct {
	// name is embedded in the metric counter names emitted by recordResult.
	name string
	// maxFailures is the failure count at which the breaker opens.
	maxFailures int32
	// resetTimeout is how long after the last failure an open breaker waits
	// before admitting requests again.
	resetTimeout time.Duration
	// failureCount is incremented on every failed call and reset to zero on
	// every successful one.
	failureCount atomic.Int32
	// lastFailure is the time of the most recent failure, in Unix nanoseconds.
	lastFailure atomic.Int64
	// state always holds a State value.
	state atomic.Value
	// metrics receives the success/failure counters. NewCircuitBreaker does
	// not set this field.
	metrics *MetricsCollector
}
// State is the position of a circuit breaker in its lifecycle.
type State int

// Breaker states. The numeric values are part of the contract:
// closed = 0, half-open = 1, open = 2.
const (
	// StateClosed admits every request.
	StateClosed = State(iota)
	// StateHalfOpen admits requests again after the reset timeout so the
	// dependency can be probed.
	StateHalfOpen
	// StateOpen rejects requests until the reset timeout has elapsed.
	StateOpen
)
// NewCircuitBreaker builds a breaker that starts in StateClosed.
//
// name is used in emitted metric names, maxFailures is the failure count at
// which the breaker opens, and resetTimeout is how long an open breaker waits
// after the last failure before admitting requests again. The metrics field
// is left unset.
func NewCircuitBreaker(name string, maxFailures int32, resetTimeout time.Duration) *CircuitBreaker {
	breaker := new(CircuitBreaker)
	breaker.name = name
	breaker.maxFailures = maxFailures
	breaker.resetTimeout = resetTimeout
	// state must hold a State before the first Load type-asserts it.
	breaker.state.Store(StateClosed)
	return breaker
}
// Execute runs command under the breaker's protection.
//
// It returns ErrCircuitOpen without calling command when the breaker is
// rejecting requests. Otherwise it calls command, records the outcome, and
// returns command's error unchanged. ctx is accepted but not consulted here.
func (cb *CircuitBreaker) Execute(ctx context.Context, command func() error) error {
	if cb.allowRequest() {
		result := command()
		cb.recordResult(result)
		return result
	}
	return ErrCircuitOpen
}
// allowRequest reports whether a call may proceed in the breaker's current
// state.
//
// Closed and half-open breakers admit every call. An open breaker rejects
// calls until resetTimeout has elapsed since the last recorded failure; the
// first call after that moves the breaker to half-open and is admitted.
func (cb *CircuitBreaker) allowRequest() bool {
	switch cb.state.Load().(State) {
	case StateClosed, StateHalfOpen:
		return true
	case StateOpen:
		lastFailure := time.Unix(0, cb.lastFailure.Load())
		if time.Since(lastFailure) <= cb.resetTimeout {
			return false
		}
		// Compare-and-swap instead of a blind Store: if another goroutine has
		// already moved the breaker on (to half-open, or through a successful
		// probe to closed), that newer state must not be overwritten with
		// half-open. The call is admitted either way, as before.
		cb.state.CompareAndSwap(StateOpen, StateHalfOpen)
		return true
	default:
		return false
	}
}
// recordResult updates the breaker after a call has finished.
//
// A non-nil err counts as a failure: the failure counter and last-failure
// timestamp are updated, and the breaker opens when the counter reaches
// maxFailures or when the failure happened while half-open. A nil err resets
// the failure counter and closes a half-open breaker. A success/failure
// counter is emitted when a metrics collector is configured.
func (cb *CircuitBreaker) recordResult(err error) {
	if err != nil {
		failures := cb.failureCount.Add(1)
		cb.lastFailure.Store(time.Now().UnixNano())
		// A failed probe must reopen the breaker even when the counter is
		// below the threshold. The counter can have been reset by a success
		// that completed after the breaker opened; without this check the
		// breaker would stay half-open and admit every call.
		probing := cb.state.Load().(State) == StateHalfOpen
		if probing || failures >= cb.maxFailures {
			cb.state.Store(StateOpen)
		}
		// metrics is not set by NewCircuitBreaker, so it may be nil.
		if cb.metrics != nil {
			cb.metrics.IncrementCounter("circuit_breaker." + cb.name + ".failure")
		}
		return
	}

	cb.failureCount.Store(0)
	// Close only if still half-open; a concurrent failure that reopened the
	// breaker must not be overwritten.
	cb.state.CompareAndSwap(StateHalfOpen, StateClosed)
	if cb.metrics != nil {
		cb.metrics.IncrementCounter("circuit_breaker." + cb.name + ".success")
	}
}
Service Mesh Implementation
Custom Service Mesh Controller
// ServiceMesh bundles the collaborators used to route a request to a service
// instance and send it under circuit-breaker protection.
type ServiceMesh struct {
	// discovery looks up the instances of a target service.
	discovery *ServiceDiscovery
	// loadBalancer chooses one instance among the healthy ones.
	loadBalancer *LoadBalancer
	// circuitBreaker wraps the outgoing call made by HandleRequest.
	circuitBreaker *CircuitBreaker
	// tracing starts the span recorded while routing.
	tracing *TracingProvider
	// metrics records which instance each routed request was sent to.
	metrics *MetricsCollector
}
// ServiceInstance describes one running instance of a service.
type ServiceInstance struct {
	// ID identifies the instance; it is what routing reports to metrics.
	ID string
	// Name is the service name; the registry groups instances by it.
	Name string
	// Version is the instance's version string.
	Version string
	// Endpoints lists the instance's endpoints.
	// NOTE(review): the expected format (URL vs host:port) is not visible
	// here — confirm against the code that dials them.
	Endpoints []string
	// Metadata holds free-form key/value attributes.
	Metadata map[string]string
	// Health is the instance's health status.
	// NOTE(review): presumably what filterHealthyInstances inspects — confirm.
	Health HealthStatus
}
// RouteRequest selects the instance of target that should receive the next
// request.
//
// It looks up target's instances, drops unhealthy ones, lets the load
// balancer choose among the rest, and records the choice in metrics. It
// returns a wrapped discovery error when the lookup fails, and
// ErrNoHealthyInstances when no healthy instance can be chosen.
func (sm *ServiceMesh) RouteRequest(ctx context.Context, target string) (*ServiceInstance, error) {
	// Start the span first so it covers discovery, filtering and balancing,
	// and is also ended on the error paths.
	span := sm.tracing.StartSpan(ctx, "route_request")
	defer span.End()

	instances, err := sm.discovery.GetInstances(target)
	if err != nil {
		return nil, fmt.Errorf("service discovery error: %w", err)
	}

	healthy := filterHealthyInstances(instances)
	if len(healthy) == 0 {
		return nil, ErrNoHealthyInstances
	}

	instance := sm.loadBalancer.Choose(healthy)
	// Guard against a balancer that declines to pick; dereferencing a nil
	// instance below would panic.
	if instance == nil {
		return nil, ErrNoHealthyInstances
	}

	sm.metrics.RecordRequest(target, instance.ID)
	return instance, nil
}
// HandleRequest routes req to an instance of req.Service and sends it under
// circuit-breaker protection.
//
// It returns the routing error when no instance can be selected,
// ErrCircuitOpen when the breaker rejects the call, or the error from sending
// the request. On success it returns the response from the instance.
func (sm *ServiceMesh) HandleRequest(ctx context.Context, req *Request) (*Response, error) {
	instance, err := sm.RouteRequest(ctx, req.Service)
	if err != nil {
		return nil, err
	}

	// Execute only reports an error, so the response is captured through the
	// closure rather than returned from it.
	// NOTE(review): assumes sendRequest returns (*Response, error); its
	// definition is not visible here — confirm.
	var resp *Response
	err = sm.circuitBreaker.Execute(ctx, func() error {
		var sendErr error
		resp, sendErr = sm.sendRequest(ctx, instance, req)
		return sendErr
	})
	if err != nil {
		return nil, err
	}
	return resp, nil
}
Distributed Transaction Management
Saga Pattern Implementation
// Saga is an ordered list of steps run as one logical transaction; when a
// step fails, the steps that already succeeded are compensated.
type Saga struct {
	// steps are executed in slice order.
	steps []SagaStep
	// coordinator begins and ends the transaction around a run.
	coordinator *SagaCoordinator
	// logger reports compensation failures.
	logger *Logger
}
// SagaStep is one unit of work in a Saga together with its undo action.
//
// Execute performs the step. Compensate reverses it and is called only for
// steps whose Execute returned nil, when a later step fails. Name identifies
// the step in error and log messages.
type SagaStep struct {
	Execute, Compensate func(ctx context.Context) error
	Name                string
}
// Execute runs the saga's steps in order inside a coordinator transaction.
//
// When a step fails, the steps that already succeeded are compensated in
// reverse order and the step's error is returned, wrapped with the step name.
// It returns nil when every step succeeds.
func (s *Saga) Execute(ctx context.Context) error {
	txID := s.coordinator.Begin()
	// Always pair Begin with End. Ending only after at least one successful
	// step would leave the transaction open when the first step fails or the
	// saga has no steps.
	defer s.coordinator.End(txID)

	executed := make([]int, 0, len(s.steps))
	for i, step := range s.steps {
		if err := step.Execute(ctx); err != nil {
			// Undo what already succeeded, newest first.
			s.compensate(ctx, executed)
			return fmt.Errorf("saga step %s failed: %w", step.Name, err)
		}
		executed = append(executed, i)
	}
	return nil
}
// compensate undoes the given steps, newest first.
//
// executedSteps holds indexes into s.steps in the order the steps ran. A
// failing compensation is logged and does not stop the remaining ones.
func (s *Saga) compensate(ctx context.Context, executedSteps []int) {
	for remaining := len(executedSteps); remaining > 0; remaining-- {
		step := s.steps[executedSteps[remaining-1]]
		err := step.Compensate(ctx)
		if err == nil {
			continue
		}
		s.logger.Error("compensation failed",
			"step", step.Name,
			"error", err)
	}
}
API Gateway Pattern
Implementation with Dynamic Routing
// APIGateway is the HTTP front door: it rate-limits, authenticates, serves
// from cache, and forwards requests to backend services.
type APIGateway struct {
	// router resolves an incoming request to its Route.
	router *Router
	// auth authenticates requests on routes that require it.
	auth *Authenticator
	// rateLimit decides whether a request may proceed.
	rateLimit *RateLimiter
	// cache stores response bodies for routes with a cache TTL.
	cache *Cache
	// monitoring records errors per backend service.
	monitoring *Monitor
}
// Route is the gateway's configuration for one endpoint.
//
// Service names the backend the request is forwarded to. Timeout bounds the
// forwarded call, a positive CacheTTL enables response caching for that long,
// and AuthRequired makes the gateway authenticate the request first.
// NOTE(review): Path, Method and RateLimit are not read by the code visible
// here; presumably the router and rate limiter consume them — confirm.
type Route struct {
	Path, Service, Method string
	Timeout               time.Duration
	RateLimit             int
	CacheTTL              time.Duration
	AuthRequired          bool
}
// HandleRequest is the gateway's HTTP entry point.
//
// In order, it applies rate limiting (429 on rejection), resolves the route,
// authenticates when the route requires it (401 on failure), serves a cached
// body when one exists, and otherwise forwards the request to the backend
// (503 on failure) and writes the backend's response.
func (g *APIGateway) HandleRequest(w http.ResponseWriter, r *http.Request) {
	if !g.rateLimit.Allow(r) {
		http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
		return
	}

	// NOTE(review): what FindRoute returns for a request that matches no
	// route is not visible here; if it can be nil, a 404 is needed before
	// the field accesses below — confirm.
	route := g.router.FindRoute(r)
	if route.AuthRequired {
		if err := g.auth.Authenticate(r); err != nil {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
	}

	// Cache only GET responses and key them by path plus query string, so a
	// cached body is never served for a different method or different query
	// parameters on the same path.
	cacheable := route.CacheTTL > 0 && r.Method == http.MethodGet
	cacheKey := r.URL.RequestURI()
	if cacheable {
		if cached, ok := g.cache.Get(cacheKey); ok {
			if _, err := w.Write(cached); err != nil {
				g.monitoring.RecordError(route.Service, err)
			}
			return
		}
	}

	// Apply a deadline only when the route configures one: WithTimeout with
	// a zero duration returns a context that has already expired, which
	// would fail every request on such a route.
	ctx := r.Context()
	if route.Timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, route.Timeout)
		defer cancel()
	}

	resp, err := g.forwardRequest(ctx, route, r)
	if err != nil {
		g.monitoring.RecordError(route.Service, err)
		http.Error(w, "service unavailable", http.StatusServiceUnavailable)
		return
	}

	if cacheable {
		g.cache.Set(cacheKey, resp, route.CacheTTL)
	}
	// A failed write cannot be reported to the client any more; record it so
	// it is not lost silently.
	if _, err := w.Write(resp); err != nil {
		g.monitoring.RecordError(route.Service, err)
	}
}
Service Discovery and Registration
Dynamic Service Registry
// ServiceRegistry tracks the registered instances of each service and the
// channels watching them.
type ServiceRegistry struct {
	// mu is held by Register and Watch while they touch the maps below.
	mu sync.RWMutex
	// services maps a service name to its registered instances.
	services map[string][]ServiceInstance
	// watchers maps a service name to the channels subscribed to its events.
	watchers map[string][]chan ServiceEvent
	// health monitors each instance after it is registered.
	health *HealthChecker
}
// ServiceEvent is a change notification delivered to watchers of a service.
type ServiceEvent struct {
	// Type is the kind of change, e.g. EventTypeAdd for a new instance.
	Type EventType
	// Service is the name of the service the event belongs to.
	Service string
	// Instance is the instance the event is about.
	Instance ServiceInstance
}
// Register adds instance to the registry under its service name.
//
// The instance is validated first and the validation error is returned
// unchanged on failure. On success the watchers are notified with an
// EventTypeAdd event and health monitoring of the instance is started in a
// new goroutine that receives ctx. The registry lock is held for the whole
// call.
func (sr *ServiceRegistry) Register(ctx context.Context, instance ServiceInstance) error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	err := sr.validateInstance(instance)
	if err != nil {
		return err
	}

	name := instance.Name
	sr.services[name] = append(sr.services[name], instance)

	added := ServiceEvent{Type: EventTypeAdd, Service: name, Instance: instance}
	sr.notifyWatchers(added)

	go sr.health.Monitor(ctx, instance)
	return nil
}
// Watch subscribes to events for service.
//
// The returned channel is first filled with one EventTypeAdd event per
// instance currently registered for service, and is then added to the
// service's watcher list.
func (sr *ServiceRegistry) Watch(service string) <-chan ServiceEvent {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	existing := sr.services[service]

	// Size the buffer for the whole snapshot plus 100 slots of headroom, so
	// the sends below can never block while the lock is held.
	ch := make(chan ServiceEvent, len(existing)+100)

	// Deliver the initial state under the lock instead of from a goroutine.
	// A goroutine would read sr.services after the lock is released (a data
	// race with Register), and its sends could interleave with later events.
	for _, instance := range existing {
		ch <- ServiceEvent{
			Type:     EventTypeAdd,
			Service:  service,
			Instance: instance,
		}
	}

	sr.watchers[service] = append(sr.watchers[service], ch)
	return ch
}
Conclusion
These patterns solve common challenges in microservices architectures, but each comes with its own complexity. The key is to understand when to apply a pattern and how to manage the complexity it brings. Remember that every pattern introduces operational overhead — use them judiciously, based on your specific requirements and scale.