MicroServices Coordination – 2

Quick Review: The Coordination Playbook

  • Synchronous (Request-Response) 📞: One service makes a direct call, typically a REST API request over HTTP, to another service and waits for an immediate response. This creates temporal coupling, meaning the caller’s performance is directly tied to the availability and speed of the service it’s calling, making it potentially brittle.
  • Asynchronous (Event-Based) 🍾: Services communicate indirectly by publishing event messages to a central message broker (like RabbitMQ or Kafka) without waiting for a reply. This removes temporal coupling, allowing services to function independently and making the overall system more resilient and scalable.
  • Saga Pattern: Manages complex, multi-step business operations as a sequence of local transactions, one per service. If any step fails, the Saga executes compensating transactions to semantically undo the work of the preceding steps, ensuring data consistency without relying on distributed transactions that hold locks across services.
    • Choreography: Services publish and subscribe to events to trigger each other’s actions. This approach is highly decoupled, but the overall workflow logic is implicit and can be difficult to track.
    • Orchestration: A central controller service sends explicit commands to direct the participant services. This makes the workflow logic centralized and easy to manage but introduces a dependency on the orchestrator.
  • Two-Phase Commit (2PC) 🏛️: A protocol that ensures atomic commitment across all services in a transaction. It uses a coordinator that first asks all participants to “prepare” (vote) and then, based on the votes, issues a final “commit” or “abort” command. Its major drawback is that it’s a blocking protocol; if the coordinator fails, resources can remain locked.
  • Three-Phase Commit (3PC) 🛡️: An evolution of 2PC that adds a “pre-commit” phase. This extra step reduces the risk of blocking by allowing participants to proceed if the coordinator fails after the pre-commit decision has been made. However, its added complexity and network overhead mean it is rarely used in practice.

The Saga Pattern: Managing Complex Workflows

What if a business process involves multiple steps across different services? An e-commerce order might need to process payment, update inventory, and arrange shipping. If any of those steps fails, you need to undo the previous steps to avoid an inconsistent state (like taking a payment for an out-of-stock item). That’s what the Saga pattern is for.

A Saga is a sequence of local transactions. If any local transaction fails, the Saga executes compensating transactions to roll back the changes.
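
That definition can be sketched in memory before any messaging is involved. The SagaStep record and SagaRunner helper below are hypothetical names used only for this illustration, and they do not appear in the service code later in the article. Each step pairs an action with its compensation, and when a step throws, the steps that already completed are compensated in reverse order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical types for illustration only: one local transaction plus its undo.
public record SagaStep(string Name, Func<Task> Execute, Func<Task> Compensate);

public static class SagaRunner
{
    // Runs the steps in order. Returns true if every step succeeds; otherwise
    // compensates the completed steps in reverse order and returns false.
    public static async Task<bool> RunAsync(IEnumerable<SagaStep> steps, List<string> log)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            try
            {
                await step.Execute();
                log.Add($"done:{step.Name}");
                completed.Push(step);
            }
            catch (Exception)
            {
                log.Add($"failed:{step.Name}");
                while (completed.Count > 0)
                {
                    var undo = completed.Pop();
                    await undo.Compensate();
                    log.Add($"compensated:{undo.Name}");
                }
                return false;
            }
        }
        return true;
    }
}
```

With a "payment" step that succeeds and an "inventory" step that throws, RunAsync returns false and the log reads done:payment, failed:inventory, compensated:payment. The distributed versions that follow do the same thing, except that each step and each compensation lives in a different service and is triggered by a message.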

Scenario: An E-Commerce Order Process

We’ll model a simplified order process involving three microservices:

  1. Order Service: Creates an order and initiates the saga.
  2. Payment Service: Processes the payment for the order.
  3. Inventory Service: Reserves the items for the order.

The business rule is: An order is only confirmed if both payment is successful and inventory is reserved. If either fails, the entire transaction must be rolled back (e.g., refunding a payment if inventory is unavailable).

Saga Flavor 1: Choreography (The Dance) 💃

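In a choreographed saga, there’s no central director. Services subscribe to each other’s events to know when it’s their turn to act. The OrderService publishes OrderCreatedEvent, which the PaymentService hears. The PaymentService then processes the payment and publishes PaymentSuccessfulEvent, which the InventoryService hears. If the InventoryService cannot reserve the items, it publishes InventoryReservationFailedEvent, and the PaymentService reacts by refunding the payment.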

It’s very loosely coupled, but it can get hard to track the overall status of the business process, as the logic is spread across many services.

Project Structure

For both examples, we’ll assume a solution structure like this:

/SagaExample
|-- /Common                    (Shared Class Library for event/command contracts)
|-- /Order.API                 (ASP.NET Core Web API)
|-- /Payment.API               (ASP.NET Core Web API)
|-- /Inventory.API             (ASP.NET Core Web API)

Shared Contracts (Common Project)

Create these event classes in the Common project so they can be shared across services.

// Common/Events.cs
public abstract record IntegrationEvent(Guid CorrelationId);

public record OrderCreatedEvent(Guid CorrelationId, int OrderId, decimal Amount, string ProductName) : IntegrationEvent(CorrelationId);

// Events for Choreography
public record PaymentSuccessfulEvent(Guid CorrelationId, int OrderId) : IntegrationEvent(CorrelationId);
public record PaymentFailedEvent(Guid CorrelationId, int OrderId, string Reason) : IntegrationEvent(CorrelationId);
public record InventoryReservedEvent(Guid CorrelationId, int OrderId) : IntegrationEvent(CorrelationId);
public record InventoryReservationFailedEvent(Guid CorrelationId, int OrderId, string Reason) : IntegrationEvent(CorrelationId);

// Events for Orchestration (Replies)
public record OrderStatusUpdatedEvent(Guid CorrelationId, int OrderId, string Status) : IntegrationEvent(CorrelationId);

// Commands for Orchestration
public record ProcessPaymentCommand(Guid CorrelationId, int OrderId, decimal Amount) : IntegrationEvent(CorrelationId);
public record RefundPaymentCommand(Guid CorrelationId, int OrderId) : IntegrationEvent(CorrelationId);
public record ReserveInventoryCommand(Guid CorrelationId, int OrderId, string ProductName) : IntegrationEvent(CorrelationId);
public record ReleaseInventoryCommand(Guid CorrelationId, int OrderId) : IntegrationEvent(CorrelationId);

1. Choreography Saga Pattern

In this pattern, services communicate by publishing and subscribing to events. There is no central coordinator.

Azure Service Bus Setup (Choreography)

  1. Create a Topic named saga-events.
  2. Order Service will subscribe to saga-events to listen for final outcomes.
  3. Payment Service will create a subscription to saga-events to listen for OrderCreatedEvent and, so it can issue refunds, InventoryReservationFailedEvent.
  4. Inventory Service will create a subscription to saga-events to listen for PaymentSuccessfulEvent.
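
One possible way to script this setup is the ServiceBusAdministrationClient from the Azure.Messaging.ServiceBus.Administration namespace; the portal, CLI, or an infrastructure template would work as well. The sketch below rests on some assumptions: the SagaTopology class and the order_subscription and inventory_subscription names are hypothetical (only payment_subscription appears in the listener code), and the SQL filters assume every publisher sets a MessageType application property, as the controller and listeners in this article do.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper: creates the topic and one filtered subscription per service.
public static class SagaTopology
{
    public const string Topic = "saga-events";

    // Subscription name -> SQL filter over the MessageType application property.
    public static IReadOnlyDictionary<string, string> Filters { get; } = new Dictionary<string, string>
    {
        ["order_subscription"] =
            "MessageType IN ('InventoryReservedEvent', 'PaymentFailedEvent', 'InventoryReservationFailedEvent')",
        ["payment_subscription"] =
            "MessageType IN ('OrderCreatedEvent', 'InventoryReservationFailedEvent')",
        ["inventory_subscription"] =
            "MessageType = 'PaymentSuccessfulEvent'",
    };

    public static async Task EnsureCreatedAsync(ServiceBusAdministrationClient admin)
    {
        if (!(await admin.TopicExistsAsync(Topic)).Value)
            await admin.CreateTopicAsync(Topic);

        foreach (var (subscription, sql) in Filters)
        {
            if ((await admin.SubscriptionExistsAsync(Topic, subscription)).Value) continue;

            // Passing a rule at creation time replaces the default "match everything" rule.
            await admin.CreateSubscriptionAsync(
                new CreateSubscriptionOptions(Topic, subscription),
                new CreateRuleOptions("MessageTypeFilter", new SqlRuleFilter(sql)));
        }
    }
}
```

Filtering at the subscription means each listener only receives the message types it handles, instead of receiving every event on the topic and discarding most of them.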

Implementation

Each service will have a background service (IHostedService) to listen to its subscription on the saga-events topic.

Order.API

// Order.API/Controllers/OrdersController.cs
[ApiController]
[Route("[controller]")]
public class OrdersController : ControllerBase
{
    private readonly ServiceBusSender _sender;
    public OrdersController(ServiceBusClient client)
    {
        _sender = client.CreateSender("saga-events");
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request)
    {
        // In a real app, save order to DB with 'Pending' status
        int orderId = new Random().Next(1000, 9999);
        Console.WriteLine($"[OrderSvc] Creating Order {orderId}. Status: Pending");

        var orderCreatedEvent = new OrderCreatedEvent(
            CorrelationId: Guid.NewGuid(),
            OrderId: orderId,
            Amount: request.Amount,
            ProductName: request.ProductName);

        var message = new ServiceBusMessage(JsonSerializer.Serialize(orderCreatedEvent))
        {
            // Use ApplicationProperties to route message type
            ApplicationProperties = { ["MessageType"] = nameof(OrderCreatedEvent) }
        };
        
        await _sender.SendMessageAsync(message);
        return Ok(new { orderId, status = "Pending" });
    }
}

// Order.API/Listeners/SagaEventListener.cs - listens for final outcomes
public class SagaEventListener : IHostedService
{
    // ... setup ServiceBusProcessor to listen to a subscription on 'saga-events'
    // This listener would process InventoryReservedEvent or any failure event
    // to update the final order status (e.g., 'Confirmed' or 'Failed').
}
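
The elided handler in that listener mostly has to translate an incoming event type into a final order status. A minimal sketch of that mapping follows, assuming a hypothetical OrderOutcome helper and the 'Confirmed' and 'Failed' status names mentioned in the comments above; persisting the status is left out.

```csharp
using System;

// Hypothetical helper for the Order Service's listener.
public static class OrderOutcome
{
    // Returns the final order status for a saga event, or null when the event
    // does not end the saga from the Order Service's point of view.
    public static string? ToFinalStatus(string messageType) => messageType switch
    {
        "InventoryReservedEvent" => "Confirmed",
        "PaymentFailedEvent" => "Failed",
        "InventoryReservationFailedEvent" => "Failed",
        _ => null,
    };
}
```

Events such as OrderCreatedEvent or PaymentSuccessfulEvent map to null, so the order stays 'Pending' until a terminal event arrives.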

Payment.API

This service listens for OrderCreatedEvent and InventoryReservationFailedEvent (to process refunds).

// Payment.API/Listeners/SagaEventListener.cs
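// BackgroundService (an IHostedService implementation) provides the ExecuteAsync hook overridden below.
public class SagaEventListener : BackgroundService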
{
    private readonly ServiceBusProcessor _processor;
    private readonly ServiceBusSender _sender;

    public SagaEventListener(ServiceBusClient client)
    {
        // Create a subscription to 'saga-events' topic
        // Optionally use filters to only receive messages of interest
        var processorOptions = new ServiceBusProcessorOptions { /* ... */ };
        _processor = client.CreateProcessor("saga-events", "payment_subscription", processorOptions);
        _sender = client.CreateSender("saga-events"); // To publish reply events
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += MessageHandler;
        _processor.ProcessErrorAsync += ErrorHandler;
        await _processor.StartProcessingAsync(stoppingToken);
    }

    private async Task MessageHandler(ProcessMessageEventArgs args)
    {
        var messageType = args.Message.ApplicationProperties["MessageType"].ToString();

        if (messageType == nameof(OrderCreatedEvent))
        {
            var body = args.Message.Body.ToString();
            var orderEvent = JsonSerializer.Deserialize<OrderCreatedEvent>(body);
            Console.WriteLine($"[PaymentSvc] Received OrderCreatedEvent for Order {orderEvent.OrderId}. Processing payment...");
            
            // Simulate payment success/failure
            bool paymentSuccess = new Random().Next(1, 11) > 2; // 80% success rate (upper bound is exclusive, so values are 1..10)

            IntegrationEvent replyEvent = paymentSuccess
                ? new PaymentSuccessfulEvent(orderEvent.CorrelationId, orderEvent.OrderId)
                : new PaymentFailedEvent(orderEvent.CorrelationId, orderEvent.OrderId, "Insufficient funds");
            
            Console.WriteLine($"[PaymentSvc] Payment for Order {orderEvent.OrderId} {(paymentSuccess ? "Successful" : "Failed")}.");

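            // Serialize with the runtime type: the declared type (IntegrationEvent) would drop OrderId and Reason.
            var replyMessage = new ServiceBusMessage(JsonSerializer.Serialize(replyEvent, replyEvent.GetType()))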
            {
                ApplicationProperties = { ["MessageType"] = replyEvent.GetType().Name }
            };
            await _sender.SendMessageAsync(replyMessage);
        }
        else if (messageType == nameof(InventoryReservationFailedEvent))
        {
            // COMPENSATING TRANSACTION
            var body = args.Message.Body.ToString();
            var inventoryFailedEvent = JsonSerializer.Deserialize<InventoryReservationFailedEvent>(body);
            Console.WriteLine($"[PaymentSvc] Received InventoryReservationFailedEvent for Order {inventoryFailedEvent.OrderId}. Refunding payment.");
            // ... logic to refund payment ...
        }

        await args.CompleteMessageAsync(args.Message);
    }
    // ... ErrorHandler and other methods
}

Inventory.API

// Inventory.API/Listeners/SagaEventListener.cs
// This listener subscribes to 'saga-events' and listens for 'PaymentSuccessfulEvent'
// If inventory is available, it publishes 'InventoryReservedEvent'.
// If not, it publishes 'InventoryReservationFailedEvent', which triggers the payment refund.
// Code structure is very similar to Payment.API's listener.
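
The decision that this listener makes can be sketched without any Service Bus plumbing. The InventoryReserver class and its in-memory stock table below are hypothetical, and the two records are copied from Common/Events.cs only so the sketch compiles on its own. One more assumption: PaymentSuccessfulEvent does not carry the product name, so the Inventory Service would need to look it up by OrderId (for example from data it recorded when it saw OrderCreatedEvent); here it is simply passed in.

```csharp
using System;
using System.Collections.Generic;

// Copies of the shared contracts from Common/Events.cs.
public abstract record IntegrationEvent(Guid CorrelationId);
public record InventoryReservedEvent(Guid CorrelationId, int OrderId) : IntegrationEvent(CorrelationId);
public record InventoryReservationFailedEvent(Guid CorrelationId, int OrderId, string Reason) : IntegrationEvent(CorrelationId);

// Hypothetical in-memory stand-in for the Inventory Service's reservation logic.
public sealed class InventoryReserver
{
    private readonly Dictionary<string, int> _stock;
    private readonly object _gate = new();

    public InventoryReserver(IDictionary<string, int> initialStock) =>
        _stock = new Dictionary<string, int>(initialStock);

    // Reserves one unit if available and returns the event the listener should publish.
    public IntegrationEvent Reserve(Guid correlationId, int orderId, string productName)
    {
        lock (_gate) // check-and-decrement must be atomic across concurrent messages
        {
            if (_stock.TryGetValue(productName, out var available) && available > 0)
            {
                _stock[productName] = available - 1;
                return new InventoryReservedEvent(correlationId, orderId);
            }
        }
        return new InventoryReservationFailedEvent(correlationId, orderId, $"'{productName}' is out of stock");
    }
}
```

The listener would publish whichever event Reserve returns to saga-events, with MessageType set to the event's type name so that the Payment Service can pick up the failure case and refund.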

2. Orchestration Saga Pattern

In this pattern, a central orchestrator manages the workflow. It sends commands to services and reacts to their reply events. The orchestrator can be part of the originating service (e.g., Order Service) or a separate service.

Azure Service Bus Setup (Orchestration)

  1. Create Queues for commands: payment-commands, inventory-commands.
  2. Create a Topic for replies: saga-replies.
  3. The Orchestrator (in Order Service) will subscribe to the saga-replies topic to get all replies.
  4. Payment Service will listen on the payment-commands queue and publish replies to saga-replies.
  5. Inventory Service will listen on the inventory-commands queue and publish replies to saga-replies.

Implementation

Order.API (with Orchestrator)

The orchestrator needs to maintain the state of each saga. For this example, we use a simple in-memory dictionary. In a production system, this state must be persisted (e.g., in Azure Cosmos DB or SQL Server) to handle restarts and failures.
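
The SagaState type itself is not shown in this article, so the following is a sketch rather than the original definition. Its properties are inferred from how the orchestrator uses them; the default CurrentStep value and the ISagaStateStore abstraction are assumptions added to show where a database-backed store could later replace the dictionary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Properties inferred from the orchestrator code; the initial step name is an assumption.
public class SagaState
{
    public Guid CorrelationId { get; init; }
    public int OrderId { get; init; }
    public decimal Amount { get; init; }
    public string ProductName { get; init; } = "";
    public string CurrentStep { get; set; } = "ProcessingPayment";
}

// Hypothetical abstraction: swap the in-memory version for Cosmos DB or SQL Server in production.
public interface ISagaStateStore
{
    Task<bool> TryAddAsync(SagaState state);
    Task<SagaState?> FindAsync(Guid correlationId);
    Task SaveAsync(SagaState state);
    Task RemoveAsync(Guid correlationId);
}

public sealed class InMemorySagaStateStore : ISagaStateStore
{
    private readonly ConcurrentDictionary<Guid, SagaState> _states = new();

    public Task<bool> TryAddAsync(SagaState state) =>
        Task.FromResult(_states.TryAdd(state.CorrelationId, state));

    public Task<SagaState?> FindAsync(Guid correlationId) =>
        Task.FromResult<SagaState?>(_states.TryGetValue(correlationId, out var state) ? state : null);

    public Task SaveAsync(SagaState state)
    {
        _states[state.CorrelationId] = state; // overwrite with the latest step
        return Task.CompletedTask;
    }

    public Task RemoveAsync(Guid correlationId)
    {
        _states.TryRemove(correlationId, out _);
        return Task.CompletedTask;
    }
}
```

Keeping the store behind an interface means the orchestrator's message handler would not need to change when the in-memory dictionary is replaced by durable storage.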

// Order.API/Sagas/OrderSagaOrchestrator.cs
public class OrderSagaOrchestrator : IHostedService
{
    // In-memory state store. In production, use a database!
    private static readonly ConcurrentDictionary<Guid, SagaState> _sagaStates = new();
    
    private readonly ServiceBusProcessor _replyProcessor;
    private readonly ServiceBusSender _paymentCommandSender;
    private readonly ServiceBusSender _inventoryCommandSender;

    public OrderSagaOrchestrator(ServiceBusClient client)
    {
        _replyProcessor = client.CreateProcessor("saga-replies", "orchestrator_subscription");
        _paymentCommandSender = client.CreateSender("payment-commands");
        _inventoryCommandSender = client.CreateSender("inventory-commands");
    }

    public static void StartSaga(SagaState initialState, ServiceBusSender paymentSender)
    {
        _sagaStates.TryAdd(initialState.CorrelationId, initialState);
        Console.WriteLine($"[Orchestrator] Starting Saga for Order {initialState.OrderId}");

        var command = new ProcessPaymentCommand(initialState.CorrelationId, initialState.OrderId, initialState.Amount);
        // Blocks on the send to keep this sample's call site synchronous; prefer an awaited async method in real code.
        paymentSender.SendMessageAsync(ToMessage(command)).GetAwaiter().GetResult();
    }

    // Serializes with the runtime type and stamps MessageType so the receiving
    // service knows which concrete command to deserialize.
    private static ServiceBusMessage ToMessage(IntegrationEvent command) =>
        new ServiceBusMessage(JsonSerializer.Serialize(command, command.GetType()))
        {
            CorrelationId = command.CorrelationId.ToString(),
            ApplicationProperties = { ["MessageType"] = command.GetType().Name }
        };

    private async Task MessageHandler(ProcessMessageEventArgs args)
    {
        var correlationId = Guid.Parse(args.Message.CorrelationId);
        if (!_sagaStates.TryGetValue(correlationId, out var state)) return;

        var messageType = args.Message.ApplicationProperties["MessageType"].ToString();

        switch (messageType)
        {
            case nameof(PaymentSuccessfulEvent):
                Console.WriteLine($"[Orchestrator] Payment successful for Order {state.OrderId}. Reserving inventory...");
                state.CurrentStep = "InventoryReservation";
                var reserveCmd = new ReserveInventoryCommand(correlationId, state.OrderId, state.ProductName);
                await _inventoryCommandSender.SendMessageAsync(ToMessage(reserveCmd));
                break;

            case nameof(PaymentFailedEvent):
                Console.WriteLine($"[Orchestrator] Payment failed for Order {state.OrderId}. Saga failed.");
                _sagaStates.TryRemove(correlationId, out _); // End of saga
                // Update order status to 'Failed'
                break;

            case nameof(InventoryReservedEvent):
                Console.WriteLine($"[Orchestrator] Inventory reserved for Order {state.OrderId}. Saga successful!");
                _sagaStates.TryRemove(correlationId, out _); // End of saga
                // Update order status to 'Confirmed'
                break;

            case nameof(InventoryReservationFailedEvent):
                Console.WriteLine($"[Orchestrator] Inventory reservation failed for Order {state.OrderId}. Refunding payment...");
                state.CurrentStep = "RefundingPayment";
                // COMPENSATING TRANSACTION COMMAND
                var refundCmd = new RefundPaymentCommand(correlationId, state.OrderId);
                await _paymentCommandSender.SendMessageAsync(ToMessage(refundCmd));
                _sagaStates.TryRemove(correlationId, out _); // End of saga (the refund result is not tracked in this example)
                break;
        }
        await args.CompleteMessageAsync(args.Message);
    }
    // ... Other IHostedService methods
}

// Order.API/Controllers/OrdersController.cs
[HttpPost]
public IActionResult CreateOrder([FromBody] OrderRequest request)
{
    // Save order to DB with 'Pending' status
    int orderId = new Random().Next(1000, 9999);
    var initialState = new SagaState { /* ... set properties ... */ };

    OrderSagaOrchestrator.StartSaga(initialState, _paymentCommandSender);
    return Ok(new { orderId, status = "Orchestration Started" });
}

Payment.API (Orchestration Participant)

The participant services are now much simpler. They just execute commands and publish results.

// Payment.API/Listeners/CommandListener.cs
public class CommandListener : IHostedService
{
    private readonly ServiceBusProcessor _processor;
    private readonly ServiceBusSender _replySender;

    public CommandListener(ServiceBusClient client)
    {
        _processor = client.CreateProcessor("payment-commands"); // Listen on command queue
        _replySender = client.CreateSender("saga-replies"); // Publish to reply topic
    }
    
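    private async Task MessageHandler(ProcessMessageEventArgs args)
    {
        // IntegrationEvent is abstract, so System.Text.Json cannot deserialize into it.
        // Dispatch on the MessageType application property instead; this assumes the
        // orchestrator stamps it on every command, as the choreography publishers do.
        var messageType = args.Message.ApplicationProperties.TryGetValue("MessageType", out var value)
            ? value?.ToString()
            : null;
        var body = args.Message.Body.ToString();
        IntegrationEvent? replyEvent = null;

        if (messageType == nameof(ProcessPaymentCommand))
        {
            var cmd = JsonSerializer.Deserialize<ProcessPaymentCommand>(body)!;
            Console.WriteLine($"[PaymentSvc] Received ProcessPaymentCommand for Order {cmd.OrderId}.");
            bool paymentSuccess = new Random().Next(1, 11) > 2; // 80% success rate (values 1..10)
            replyEvent = paymentSuccess
                ? new PaymentSuccessfulEvent(cmd.CorrelationId, cmd.OrderId)
                : new PaymentFailedEvent(cmd.CorrelationId, cmd.OrderId, "Insufficient funds");
        }
        else if (messageType == nameof(RefundPaymentCommand))
        {
            var refundCmd = JsonSerializer.Deserialize<RefundPaymentCommand>(body)!;
            Console.WriteLine($"[PaymentSvc] Received RefundPaymentCommand for Order {refundCmd.OrderId}.");
            // ... logic to refund payment ...
            // For simplicity, we don't publish an event for a compensation's success
        }

        if (replyEvent != null)
        {
            // Serialize with the runtime type: the declared type (IntegrationEvent) would drop OrderId and Reason.
            var replyMessage = new ServiceBusMessage(JsonSerializer.Serialize(replyEvent, replyEvent.GetType()))
            {
                CorrelationId = replyEvent.CorrelationId.ToString(),
                ApplicationProperties = { ["MessageType"] = replyEvent.GetType().Name }
            };
            await _replySender.SendMessageAsync(replyMessage);
        }
        await args.CompleteMessageAsync(args.Message);
    }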
    // ... other methods
}

Choreography vs. Orchestration: Weighing the Trade-offs

Choosing between Choreography and Orchestration involves considering several factors and their respective trade-offs:

| Feature | Choreography-Based Saga | Orchestration-Based Saga |
|---|---|---|
| Coordination | Decentralized: Services react to events. | Centralized: A dedicated orchestrator directs the flow. |
| Coupling | Loose coupling: Services are unaware of each other, only the events they consume and produce. | Tighter coupling to the orchestrator: Services depend on the orchestrator’s API but not directly on each other. |
| Complexity | Easier for simple workflows: Adding services is straightforward. Harder to manage complex workflows: Flow is implicit. | Easier to manage complex workflows: Workflow is explicitly defined in the orchestrator. Participant services are simpler. |
| Failure Handling | Decentralized: Each service handles its own compensation based on failure events. | Centralized: The orchestrator manages all failure scenarios and initiates compensation. |
| Dependencies | Higher risk of cyclic dependencies if not carefully designed. | Lower risk of cyclic dependencies due to the centralized control of the orchestrator. |
| Scalability | Often scales well due to the decoupled nature and reliance on message brokers. | The orchestrator can become a bottleneck if not designed to be highly scalable. |
| Testing | More challenging to perform end-to-end testing as the flow is distributed. | Easier to test the overall flow by interacting with the orchestrator. |
| Visibility | Harder to get a holistic view of the entire Saga process as it’s spread across services. | Easier to monitor and track the progress of the Saga through the orchestrator. |

Making the Right Choice:

  • Opt for Choreography when you have simpler business processes involving a small number of services and desire maximum decoupling. It’s well-suited for event-driven architectures where services naturally react to changes.
  • Choose Orchestration for more complex workflows with numerous steps, conditional logic, or the need for better visibility and control over the entire process. It simplifies individual services and centralizes the Saga’s logic.

Conclusion: Achieving Harmony in Distributed Systems

The Saga pattern provides a crucial mechanism for ensuring data consistency in microservice architectures without the blocking and resource locking that distributed transactions such as 2PC bring. Understanding the nuances of Choreography and Orchestration, along with their respective trade-offs, empowers development teams in Brampton, Ontario, and beyond to design resilient and reliable distributed systems. By carefully considering the complexity of their business processes and the desired level of coupling and control, teams can select the Saga implementation that best suits their needs and helps them achieve harmony across their diverse set of microservices.