12. Message Brokers and Event-Driven Architecture

Level 1: Foundation

RabbitMQ Fundamentals

RabbitMQ Architecture

RabbitMQ is an open-source message broker that implements the AMQP 0-9-1 protocol. It runs on the Erlang runtime, whose lightweight processes and supervision model underpin its concurrency and fault tolerance.

Core Concepts

Component | Description
Producer | Application that publishes messages
Exchange | Router that receives messages from producers and routes them to queues
Queue | Buffer that stores messages until they are consumed
Binding | Rule that links an Exchange to a Queue
Consumer | Application that receives messages from a Queue
vhost | Virtual host: an isolated namespace (user permissions, exchanges, queues)

Producer → Exchange → Binding → Queue → Consumer

Exchange Types

An Exchange defines the rules by which messages are routed to Queues.

Direct Exchange

Routes a message to every Queue whose binding key exactly matches the message's routing key.

Exchange (direct) ──routing key: "order.created"──→ Queue: "orders"
Exchange (direct) ──routing key: "order.updated"──→ Queue: "order-updates"
Exchange (direct) ──routing key: "order.created"──→ Queue: "order-analytics"  // second binding on the same key: this queue also gets a copy

Use case: point-to-point communication where routing must be controlled exactly.

C# (RabbitMQ.Client):

var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = await factory.CreateConnectionAsync();
        using var channel = await connection.CreateChannelAsync();

        await channel.ExchangeDeclareAsync(
            exchange: "orders",
            type: ExchangeType.Direct,
            durable: true);

        await channel.QueueDeclareAsync(queue: "order-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
        await channel.QueueBindAsync(queue: "order-queue", exchange: "orders", routingKey: "order.created");

Fanout Exchange

Delivers a copy of every message to all bound Queues. The routing key is ignored.

Exchange (fanout) → Queue A
Exchange (fanout) → Queue B
Exchange (fanout) → Queue C

Use case: broadcasting events when every subscriber must receive every message.

await channel.ExchangeDeclareAsync(
            exchange: "events",
            type: ExchangeType.Fanout,
            durable: true);

        // All three queues receive every message
        await channel.QueueBindAsync(queue: "queue-a", exchange: "events", routingKey: "");
        await channel.QueueBindAsync(queue: "queue-b", exchange: "events", routingKey: "");
        await channel.QueueBindAsync(queue: "queue-c", exchange: "events", routingKey: "");

Topic Exchange

Routes messages by matching the routing key against a binding pattern. Routing keys are dot-separated words, and two wildcards are supported:

  • * (star) matches exactly one word
  • # (hash) matches zero or more words

routing key: "order.created"      → matches "order.*"
routing key: "order.created"      → matches "order.#"
routing key: "order.created.v1"   → matches "order.#"
routing key: "payment.processed"  → does NOT match "order.#" (different prefix)
routing key: "order.created"      → matches "*.created"
routing key: "order.updated"      → does NOT match "*.created"

Use case: Pub/Sub with topic-based filtering.

await channel.ExchangeDeclareAsync(
            exchange: "topic-exchange",
            type: ExchangeType.Topic,
            durable: true);

        // Receives all order events, at any depth
        await channel.QueueBindAsync(queue: "all-orders", exchange: "topic-exchange", routingKey: "order.#");

        // Receives events with exactly one word after "order." (order.created, order.deleted, ...)
        await channel.QueueBindAsync(queue: "order-lifecycle", exchange: "topic-exchange", routingKey: "order.*");

        // Receives only order.created
        await channel.QueueBindAsync(queue: "order-created", exchange: "topic-exchange", routingKey: "order.created");
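
The wildcard rules for topic exchanges can be checked with a small matcher. This is only a sketch for illustration: TopicMatcher is a hypothetical helper, not part of RabbitMQ.Client, and the broker performs this matching itself. It may be useful in unit tests that verify binding patterns before they are deployed.

```csharp
using System;

// The same cases as in the matching examples for topic exchanges
Console.WriteLine(TopicMatcher.Matches("order.*", "order.created"));      // True
Console.WriteLine(TopicMatcher.Matches("order.#", "order.created.v1"));   // True
Console.WriteLine(TopicMatcher.Matches("order.#", "payment.processed"));  // False
Console.WriteLine(TopicMatcher.Matches("*.created", "order.updated"));    // False

// Hypothetical helper (assumption): mirrors the documented '*' and '#' semantics
public static class TopicMatcher
{
    public static bool Matches(string pattern, string routingKey) =>
        Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern used up: it matches only if the key is used up too
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = ki; next <= k.Length; next++)
                if (Match(p, pi + 1, k, next)) return true;
            return false;
        }

        // Key used up while the pattern still expects a word
        if (ki == k.Length) return false;

        // '*' accepts any single word; any other token must match literally
        return (p[pi] == "*" || p[pi] == k[ki]) && Match(p, pi + 1, k, ki + 1);
    }
}
```

The recursion is exponential in the worst case for patterns with many '#' tokens, which is acceptable for a test helper but not how a broker would implement it.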

Headers Exchange

Routes on message headers instead of the routing key. The x-match binding argument controls how the headers are compared:

  • all: every listed header must match
  • any: at least one listed header must match

await channel.ExchangeDeclareAsync(
            exchange: "headers-exchange",
            type: ExchangeType.Headers,
            durable: true);

        var args = new Dictionary<string, object>
        {
            ["x-match"] = "all",      // every header listed below must match
            ["level"] = "critical",
            ["region"] = "eu-west"
        };

        await channel.QueueDeclareAsync(queue: "critical-eu", durable: true, exclusive: false, autoDelete: false, arguments: null);
        // routingKey is a required parameter, but a headers exchange ignores its value
        await channel.QueueBindAsync(queue: "critical-eu", exchange: "headers-exchange", routingKey: "", arguments: args);

Use case: complex routing based on message metadata.
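
The difference between x-match "all" and "any" is easiest to see on concrete headers. The sketch below is a hypothetical HeadersMatcher written for illustration, not a RabbitMQ.Client API. It assumes the documented behaviour that "all" is the default mode and that binding arguments starting with "x-" are not compared against message headers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var binding = new Dictionary<string, object>
{
    ["x-match"] = "all",
    ["level"] = "critical",
    ["region"] = "eu-west"
};

var full = new Dictionary<string, object> { ["level"] = "critical", ["region"] = "eu-west" };
var partial = new Dictionary<string, object> { ["level"] = "critical", ["region"] = "us-east" };

Console.WriteLine(HeadersMatcher.Matches(binding, full));     // True
Console.WriteLine(HeadersMatcher.Matches(binding, partial));  // False

binding["x-match"] = "any";
Console.WriteLine(HeadersMatcher.Matches(binding, partial));  // True

// Hypothetical helper (assumption): the broker does this comparison itself
public static class HeadersMatcher
{
    public static bool Matches(
        IReadOnlyDictionary<string, object> bindingArgs,
        IReadOnlyDictionary<string, object> messageHeaders)
    {
        // "all" is assumed when x-match is absent
        bool matchAny = bindingArgs.TryGetValue("x-match", out var mode) && (mode as string) == "any";

        // Arguments starting with "x-" (such as x-match itself) are not compared
        var required = bindingArgs.Where(kv => !kv.Key.StartsWith("x-", StringComparison.Ordinal));

        bool IsMatch(KeyValuePair<string, object> kv) =>
            messageHeaders.TryGetValue(kv.Key, out var value) && object.Equals(value, kv.Value);

        return matchAny ? required.Any(IsMatch) : required.All(IsMatch);
    }
}
```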


Queue Declaration

Queue Parameters

Parameter | Type | Description
durable | bool | The queue definition survives a broker restart
exclusive | bool | The queue can be used only by the connection that declared it and is deleted when that connection closes
autoDelete | bool | The queue is deleted when its last consumer unsubscribes
arguments | Dictionary | Optional arguments (x-message-ttl, x-max-length, x-dead-letter-exchange)

Types of Queues

Durable Queue          : the queue survives a broker restart; its messages survive only if they were published as persistent
Exclusive Queue        : private to the declaring connection and deleted when that connection closes
Auto-Delete Queue      : deleted once its last consumer unsubscribes
Normal Queue           : durable=true, exclusive=false, autoDelete=false

// Durable queue: the queue definition is stored on disk
await channel.QueueDeclareAsync(
    queue: "persistent-orders",
    durable: true,          // survives a broker restart
    exclusive: false,       // usable from any connection
    autoDelete: false,      // not deleted when consumers disconnect
    arguments: null);

// Bounded queue (applies backpressure to publishers)
await channel.QueueDeclareAsync(
    queue: "bounded-queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: new Dictionary<string, object>
    {
        ["x-max-length"] = 10000,         // at most 10k messages
        ["x-overflow"] = "reject-publish" // reject new messages when the queue is full
    });

// TTL queue: messages live for at most 5 minutes
await channel.QueueDeclareAsync(
    queue: "temp-queue",
    durable: false,
    exclusive: false,
    autoDelete: true,
    arguments: new Dictionary<string, object>
    {
        ["x-message-ttl"] = 300000  // 5 minutes in ms
    });

Binding Keys и Routing Patterns

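A binding key is the key (or pattern) attached to a binding between an Exchange and a Queue. The exchange compares each message's routing key against it.

Direct Exchange:
  Binding key = "order.created"  → exact match only

Topic Exchange:
  Binding key = "order.#"        → "order" and every key that starts with "order." (order.created, order.created.v1)
  Binding key = "*.created"      → order.created, payment.created
  Binding key = "order.*.v1"     → order.created.v1, order.updated.v1 (wildcards replace whole words, so a partial pattern such as "v*" does not work)
  Binding key = "#"              → all messages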

Common Routing Patterns

1. Multiple queues, single binding key (Fanout):

// Every bound queue receives the message; the routing key is ignored
await channel.QueueBindAsync("queue-a", "fanout-exchange", "");
await channel.QueueBindAsync("queue-b", "fanout-exchange", "");

2. Multiple queues, topic filtering:

// Queue "logs-all" receives everything
await channel.QueueBindAsync("logs-all", "topic-exchange", "#");

// Queue "logs-errors" receives errors only
await channel.QueueBindAsync("logs-errors", "topic-exchange", "*.error");

// Queue "logs-warnings" receives warnings and errors
await channel.QueueBindAsync("logs-warnings", "topic-exchange", "*.warning");
await channel.QueueBindAsync("logs-warnings", "topic-exchange", "*.error");

3. Single queue, multiple binding keys (Direct):

// One queue bound with several routing keys
await channel.QueueBindAsync("orders", "direct-exchange", "order.created");
await channel.QueueBindAsync("orders", "direct-exchange", "order.updated");
await channel.QueueBindAsync("orders", "direct-exchange", "order.deleted");

Message Properties

BasicProperties

var properties = new BasicProperties
        {
            DeliveryMode = DeliveryModes.Persistent,  // 2 = persistent, 1 = transient
            ContentType = "application/json",
            CorrelationId = Guid.NewGuid().ToString(),
            MessageId = Guid.NewGuid().ToString(),
            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
            Type = "OrderCreatedEvent",
            Priority = 5,            // higher value = higher priority; honored only by queues declared with x-max-priority
            Headers = new Dictionary<string, object>
            {
                ["source"] = "order-service",
                ["environment"] = "production"
            },
            Expiration = "300000",   // per-message TTL: 5 minutes, in ms, passed as a string
            ReplyTo = "reply-queue"  // used by the Request/Reply pattern
        };

Delivery Mode

Mode | Value | Behavior
Transient | 1 or DeliveryModes.Transient | Not written to disk; lost on broker restart
Persistent | 2 or DeliveryModes.Persistent | Written to disk; survives a broker restart

Important: for a message to survive a broker restart, all three conditions must hold:

  1. Exchange declared with durable=true
  2. Queue declared with durable=true
  3. Message published with DeliveryModes.Persistent

// Full durability pipeline
        await channel.ExchangeDeclareAsync("orders", ExchangeType.Direct, durable: true);
        await channel.QueueDeclareAsync("order-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
        await channel.QueueBindAsync("order-queue", "orders", "order.created");

        var props = new BasicProperties
        {
            DeliveryMode = DeliveryModes.Persistent  // the key property
        };

        var body = Encoding.UTF8.GetBytes(jsonMessage);
        await channel.BasicPublishAsync(
            exchange: "orders",
            routingKey: "order.created",
            mandatory: true,  // return the message to the publisher if no queue matches
            basicProperties: props,
            body: body);

Consumer Prefetch (QoS)

Basic.QoS

Sets the maximum number of unacknowledged messages the broker will have outstanding for a consumer at any time.

// Up to 10 messages in flight; the broker delivers another one only after one of them is acked
        await channel.BasicQosAsync(
            prefetchSize: 0,       // 0 = no size limit
            prefetchCount: 10,     // max 10 unacked messages
            global: false);         // false = per-consumer, true = per-channel

Prefetch и Throughput

Prefetch | Throughput | Memory | Use Case
1 | Low | Minimal | Expensive processing; strict one-at-a-time handling (ordered only with a single consumer)
10-50 | Medium | Moderate | Balance between throughput and memory
100+ | High | High | Cheap processing, batch operations

// High-throughput consumer
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 100, global: false);

// Strict control: one message at a time
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);

Consumer Lifecycle

var consumer = new AsyncEventingBasicConsumer(channel);

        consumer.ReceivedAsync += async (model, ea) =>
        {
            try
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
        
                // Process the message
                await ProcessMessageAsync(message);
        
                // Acknowledge: the broker removes the message from the queue
                await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                // Nack with requeue=false: the message is dead-lettered if the queue has a DLX configured, otherwise it is discarded
                await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
        
                // Alternative: nack with requeue=true returns the message to the queue for an immediate retry
                // await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
        
                Logger.LogError(ex, "Error processing message");
            }
        };

        // Start consuming
        await channel.BasicConsumeAsync(queue: "order-queue", autoAck: false, consumer: consumer);

Practice

Topology

                  +--------------------------------------------------------------+
                  |                       RabbitMQ Broker                        |
                  |                                                              |
Order Service --> |  exchange: "orders" (direct)                                 |
                  |    |                                                         |
                  |    +--[order.created]--> queue: "orders"                     | --> consumer
                  |    |                                                         |
                  |    +--[order.created]--> exchange: "order-events" (fanout)   |
                  |                            |                                 |
                  |                            +--> queue: "order-events"        |
                  |                            +--> queue: "notifications"       | --> consumer
                  |                            +--> queue: "analytics"           | --> consumer
                  +--------------------------------------------------------------+

Implementation

// 1. Order Processing Topology Setup
        public class OrderTopology
        {
            public static async Task SetupAsync(IChannel channel)
            {
                // Main orders exchange
                await channel.ExchangeDeclareAsync("orders", ExchangeType.Direct, durable: true);
        
                // Fanout exchange for order events
                await channel.ExchangeDeclareAsync("order-events", ExchangeType.Fanout, durable: true);
        
                // Queues
                await channel.QueueDeclareAsync("orders", durable: true, exclusive: false, autoDelete: false, arguments: null);
                await channel.QueueDeclareAsync("order-events", durable: true, exclusive: false, autoDelete: false, arguments: null);
                await channel.QueueDeclareAsync("notifications", durable: true, exclusive: false, autoDelete: false, arguments: null);
                await channel.QueueDeclareAsync("analytics", durable: true, exclusive: false, autoDelete: false, arguments: null);
        
                // Bindings
                await channel.QueueBindAsync("orders", "orders", "order.created");
                await channel.QueueBindAsync("orders", "orders", "order.updated");
                await channel.QueueBindAsync("orders", "orders", "order.deleted");
        
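                // Exchange-to-exchange binding: forwards order.created from the direct exchange
                // into the fanout exchange, so its subscribers actually receive the event
                await channel.ExchangeBindAsync(destination: "order-events", source: "orders", routingKey: "order.created");

                await channel.QueueBindAsync("order-events", "order-events", "");
                await channel.QueueBindAsync("notifications", "order-events", "");
                await channel.QueueBindAsync("analytics", "order-events", "");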
            }
        }

        // 2. Reliable Producer with Publisher Confirms
        public class OrderProducer
        {
            private readonly IChannel _channel;
    
            public OrderProducer(IChannel channel)
            {
                _channel = channel;
            }
    
            // Assumes RabbitMQ.Client 7.x and a channel created with publisher confirmations and
            // confirmation tracking enabled through CreateChannelOptions. In that mode
            // BasicPublishAsync completes only after the broker has confirmed the message.
            public async Task PublishOrderCreatedAsync(OrderCreatedEvent order)
            {
                var props = new BasicProperties
                {
                    DeliveryMode = DeliveryModes.Persistent,
                    ContentType = "application/json",
                    Type = nameof(OrderCreatedEvent),
                    MessageId = Guid.NewGuid().ToString(),
                    Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
                };
        
                var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order));
        
                // Bound the wait for the broker's confirm to 5 seconds
                using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        
                try
                {
                    await _channel.BasicPublishAsync(
                        exchange: "orders",
                        routingKey: "order.created",
                        mandatory: true,
                        basicProperties: props,
                        body: body,
                        cancellationToken: cts.Token);
            
                    Logger.LogInformation("Message confirmed: {MessageId}", props.MessageId);
                }
                catch (OperationCanceledException)
                {
                    throw new TimeoutException("Publisher confirm timeout");
                }
                catch (PublishException ex)
                {
                    // Thrown when the broker nacks the message or returns it as unroutable
                    Logger.LogError(ex, "Message publication failed: {MessageId}", props.MessageId);
                    throw;
                }
            }
        }

        // 3. Consumer with Manual Ack and Retry Logic
        public class OrderConsumer
        {
            private readonly IChannel _channel;
            private const int MaxRetries = 3;
    
            public OrderConsumer(IChannel channel)
            {
                _channel = channel;
            }
    
            public async Task StartAsync(CancellationToken ct)
            {
                // QoS: 10 messages in flight per consumer
                await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);
        
                var consumer = new AsyncEventingBasicConsumer(_channel);
        
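                consumer.ReceivedAsync += async (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var retryCount = GetRetryCount(ea.BasicProperties);
            
                    try
                    {
                        await ProcessOrderAsync(message);
                        await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                    // TransientException stands for an application-defined exception type for recoverable failures
                    catch (TransientException ex) when (retryCount < MaxRetries)
                    {
                        // Retry with exponential backoff: 1s, 2s, 4s
                        var backoff = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
                        Logger.LogWarning(ex, "Transient error, retry {Count}/{Max}. Backoff: {Backoff}", 
                            retryCount + 1, MaxRetries, backoff);
                
                        // Note: this delay keeps one prefetch slot occupied while it waits
                        await Task.Delay(backoff, ct);
                
                        // A plain requeue redelivers the message unchanged, so the retry count would never grow.
                        // Instead, republish a copy with an incremented x-retry-count header and ack the original.
                        var headers = new Dictionary<string, object?>(
                            ea.BasicProperties.Headers ?? new Dictionary<string, object?>())
                        {
                            ["x-retry-count"] = retryCount + 1
                        };
                        var retryProps = new BasicProperties(ea.BasicProperties) { Headers = headers };
                
                        await _channel.BasicPublishAsync(
                            exchange: "",
                            routingKey: "orders",
                            mandatory: true,
                            basicProperties: retryProps,
                            body: body);
                        await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                    catch (Exception ex)
                    {
                        // Unrecoverable error or retries exhausted: reject without requeue.
                        // The message reaches a DLQ only if the queue declares x-dead-letter-exchange.
                        await _channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
                        Logger.LogError(ex, "Message rejected: {MessageId}", ea.BasicProperties.MessageId);
                    }
                };
        
                await _channel.BasicConsumeAsync("orders", autoAck: false, consumer: consumer);
            }
    
            // Reads the retry counter written by the retry branch above; 0 when the header is absent
            private static int GetRetryCount(IReadOnlyBasicProperties props)
            {
                if (props.Headers is { } headers &&
                    headers.TryGetValue("x-retry-count", out var countObj) &&
                    countObj is int count)
                {
                    return count;
                }
                return 0;
            }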
    
            private Task ProcessOrderAsync(string message) => Task.CompletedTask;
        }

Order DTO

public record OrderCreatedEvent(
            Guid OrderId,
            Guid CustomerId,
            decimal Total,
            string Currency,
            IReadOnlyList<OrderItem> Items,
            DateTimeOffset CreatedAt);

        public record OrderItem(
            Guid ProductId,
            string Name,
            int Quantity,
            decimal Price);

Verifying the Topology

// Check via the RabbitMQ Management API (%2F is the URL-encoded default vhost "/")
        // GET http://localhost:15672/api/queues/%2F/orders
        // GET http://localhost:15672/api/exchanges/%2F/orders

Exchange Types Summary

Exchange | Routing | Use Case
Direct | Exact routing key match | Point-to-point, precise routing
Fanout | Broadcast to all bound Queues | Notifications, broadcast
Topic | Pattern matching (*, #) | Pub/Sub with filtering
Headers | Match on message headers | Complex routing on metadata
Default | Pre-declared direct exchange with an empty name (""); every queue is bound to it by its queue name | Publishing straight to a queue by name
Consistent Hash, Random, Modulus Hash (sharding) | Plugin-based | Specialized scenarios

Checklist for Order Processing

  • [x] Exchange types: Direct for routing, Fanout for broadcast
  • [x] Queue declaration: durable=true for production
  • [x] Binding keys: exact for direct, patterns for topic
  • [x] Message properties: DeliveryModes.Persistent, ContentType, MessageId
  • [x] Consumer prefetch: QoS 10 to balance throughput and memory
  • [x] Reliable producer: publisher confirms enabled, each publish awaited until the broker confirms it
  • [x] Consumer: manual ack/nack + retry logic with exponential backoff

Message Patterns

Point-to-Point (Queue-Based)

Each message is delivered to exactly one of the queue's consumers.

Producer → Queue → Consumer A  (receives message 1)
                 → Consumer B  (receives message 2)
                 → Consumer C  (receives message 3)

Characteristics

  • Delivery: one message goes to one consumer
  • Load balancing: the broker dispatches messages round-robin across consumers, subject to each consumer's prefetch limit
  • Ordering: not guaranteed once there are multiple consumers
  • Use case: task distribution, job processing

Implementation

// Producer
        await channel.BasicPublishAsync(
            exchange: "",
            routingKey: "task-queue",
            mandatory: true,
            basicProperties: new BasicProperties { DeliveryMode = DeliveryModes.Persistent },
            body: Encoding.UTF8.GetBytes(taskJson));

        // Multiple consumers: the broker distributes messages between them
        var consumer1 = new AsyncEventingBasicConsumer(channel);
        consumer1.ReceivedAsync += HandleTask1;
        await channel.BasicConsumeAsync("task-queue", false, consumer1);

        var consumer2 = new AsyncEventingBasicConsumer(channel);
        consumer2.ReceivedAsync += HandleTask2;
        await channel.BasicConsumeAsync("task-queue", false, consumer2);
        // Each message is delivered to only one of the consumers
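
To make the round-robin distribution concrete, here is a small simulation. It is a sketch under simplifying assumptions: RoundRobin.Dispatch is a hypothetical helper, and it ignores prefetch limits and slow consumers, both of which change the distribution on a real broker.

```csharp
using System;
using System.Collections.Generic;

// Three consumers on one queue, nine messages
var consumers = new[] { "A", "B", "C" };
var assignment = RoundRobin.Dispatch(9, consumers);

foreach (var c in consumers)
    Console.WriteLine($"Consumer {c}: {string.Join(", ", assignment[c])}");
// Consumer A: 1, 4, 7
// Consumer B: 2, 5, 8
// Consumer C: 3, 6, 9

// Hypothetical helper (assumption): models idealized round-robin dispatch only
public static class RoundRobin
{
    public static Dictionary<string, List<int>> Dispatch(int messageCount, string[] consumers)
    {
        var result = new Dictionary<string, List<int>>();
        foreach (var c in consumers) result[c] = new List<int>();

        // Message n goes to consumer (n - 1) mod consumerCount
        for (int msg = 1; msg <= messageCount; msg++)
            result[consumers[(msg - 1) % consumers.Length]].Add(msg);

        return result;
    }
}
```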

Pub/Sub (Topic-Based)

A message is delivered to every subscribed Queue.

Producer → Topic Exchange → Queue A (subscriber 1)
                          → Queue B (subscriber 2)
                          → Queue C (subscriber 3)

Characteristics

  • Delivery: one message goes to every bound Queue
  • Isolation: each subscriber's queue holds its own copy of the message
  • Decoupling: the producer does not know about the consumers
  • Use case: event broadcasting, notifications, audit logging

Implementation

// Fanout: every subscriber gets every message
await channel.ExchangeDeclareAsync("events", ExchangeType.Fanout, durable: true);

await channel.QueueBindAsync("notifications", "events", "");
await channel.QueueBindAsync("audit-log", "events", "");
await channel.QueueBindAsync("analytics", "events", "");

// Topic: filtering by routing key
await channel.ExchangeDeclareAsync("topic-events", ExchangeType.Topic, durable: true);

// subscriber 1: order events only
await channel.QueueBindAsync("order-sub", "topic-events", "order.#");

// subscriber 2: payment events only
await channel.QueueBindAsync("payment-sub", "topic-events", "payment.#");

// subscriber 3: everything
await channel.QueueBindAsync("all-sub", "topic-events", "#");

Request/Reply (Correlation ID Pattern)

An asynchronous alternative to RPC. The client publishes a request that carries a correlationId and a replyTo queue name, and receives the response on that separate reply queue.

Service A                    RabbitMQ                    Service B
   |                            |                           |
   |-- Request ---------------> |  rpc-queue                |
   |   correlationId: abc123    |  -----------------------> |
   |   replyTo: reply-queue     |                           |
   |                            |                           |
   |<-- Response -------------- |  reply-queue              |
   |   correlationId: abc123    |  <----------------------- |

Implementation

public class RequestReplyClient
        {
            private readonly IChannel _channel;

            public RequestReplyClient(IChannel channel)
            {
                _channel = channel;
            }

            public async Task<ReplyMessage> SendRequestAsync(RequestMessage request, TimeSpan timeout)
            {
                // Reply queue: exclusive to this client, removed when the connection closes
                var queueName = $"rr-{Guid.NewGuid()}";
                await _channel.QueueDeclareAsync(queueName, durable: false, exclusive: true, autoDelete: true, arguments: null);
        
                var correlationId = Guid.NewGuid().ToString();

                // Completed by the consumer callback when the matching reply arrives
                var tcs = new TaskCompletionSource<ReplyMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
        
                // Start listening on the reply queue before publishing, so a fast reply is not missed
                var consumer = new AsyncEventingBasicConsumer(_channel);
                consumer.ReceivedAsync += async (model, ea) =>
                {
                    // Ignore replies that belong to a different request
                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {
                        var response = JsonSerializer.Deserialize<ReplyMessage>(ea.Body.ToArray());
                        await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
                        tcs.TrySetResult(response!);
                    }
                };
        
                await _channel.BasicConsumeAsync(queueName, false, consumer);

                // Send the request
                var requestProps = new BasicProperties
                {
                    CorrelationId = correlationId,
                    ReplyTo = queueName,
                    ContentType = "application/json",
                    MessageId = Guid.NewGuid().ToString()
                };
        
                // Assumes "rpc-queue" is bound to "rpc-exchange" with the routing key "process"
                await _channel.BasicPublishAsync(
                    exchange: "rpc-exchange",
                    routingKey: "process",
                    mandatory: true,
                    basicProperties: requestProps,
                    body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request)));

                // Wait for the response, with a timeout
                var completed = await Task.WhenAny(tcs.Task, Task.Delay(timeout));
                if (completed != tcs.Task)
                {
                    throw new TimeoutException($"Request {correlationId} timed out");
                }

                return await tcs.Task;
            }
        }

        public class RequestReplyServer
        {
            private readonly IChannel _channel;

            public RequestReplyServer(IChannel channel)
            {
                _channel = channel;
            }

            public async Task StartAsync()
            {
                await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
        
                var consumer = new AsyncEventingBasicConsumer(_channel);
                consumer.ReceivedAsync += async (model, ea) =>
                {
                    var request = JsonSerializer.Deserialize<RequestMessage>(ea.Body.ToArray());
            
                    // Process
                    var response = await ProcessRequestAsync(request!);
            
                    // Reply
                    var replyProps = new BasicProperties
                    {
                        CorrelationId = ea.BasicProperties.CorrelationId,
                        ContentType = "application/json"
                    };
            
                    await _channel.BasicPublishAsync(
                        exchange: "",
                        routingKey: ea.BasicProperties.ReplyTo,
                        mandatory: true,
                        basicProperties: replyProps,
                        body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(response)));
            
                    await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
                };
        
                await _channel.BasicConsumeAsync("rpc-queue", false, consumer);
            }
    
            // ReplyMessage is a positional record, so it is created through its constructor
            private Task<ReplyMessage> ProcessRequestAsync(RequestMessage request) =>
                Task.FromResult(new ReplyMessage("processed"));
        }

        public record RequestMessage(string Action, string Payload);
        public record ReplyMessage(string Result, string? Error = null);
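
The client above declares a new reply queue and consumer for every request. When one client sends many requests concurrently, a common alternative is a single long-lived reply queue plus a map from correlation id to the waiting caller. The sketch below shows only that map: PendingReplies is a hypothetical helper introduced here for illustration, and wiring it to a consumer callback is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var pending = new PendingReplies<string>();

// Client side: register before publishing the request
var (correlationId, replyTask) = pending.Register();

// Consumer callback side: an unknown id is ignored, a known id completes the waiting task
Console.WriteLine(pending.TryComplete("unknown-id", "stale"));  // False
Console.WriteLine(pending.TryComplete(correlationId, "done"));  // True
Console.WriteLine(await replyTask);                             // done

// Hypothetical helper (assumption): maps correlation ids to callers waiting for a reply
public sealed class PendingReplies<TReply>
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<TReply>> _pending = new();

    public (string CorrelationId, Task<TReply> Reply) Register()
    {
        var id = Guid.NewGuid().ToString();
        var tcs = new TaskCompletionSource<TReply>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;
        return (id, tcs.Task);
    }

    // Removes the entry so each reply completes its request at most once
    public bool TryComplete(string correlationId, TReply reply) =>
        _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(reply);

    // Call on timeout so abandoned requests do not accumulate
    public bool Cancel(string correlationId) =>
        _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetCanceled();
}
```

In the consumer callback, the value of ea.BasicProperties.CorrelationId would be passed to TryComplete, and the timeout path would call Cancel.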

Competing Consumers

Multiple consumers read from the same queue, and each message goes to only one of them.

Producer → Queue ──→ Consumer A (receives msg 1, 4, 7...)
                 ──→ Consumer B (receives msg 2, 5, 8...)
                 ──→ Consumer C (receives msg 3, 6, 9...)

Characteristics

  • Scaling: adding consumers raises throughput
  • Load balancing: the broker distributes messages automatically
  • Ordering: processing order is not preserved across multiple consumers
  • Fault tolerance: if a consumer dies before acking, the broker redelivers its unacked messages

Implementation

public class CompetingConsumerPool
        {
            private readonly IChannel _channel;
            private const int WorkerCount = 4;
            private readonly List<Task> _workers = new();

            public CompetingConsumerPool(IChannel channel)
            {
                _channel = channel;
            }

            public async Task StartAsync(CancellationToken ct)
            {
                // QoS so that work is spread evenly across workers
                await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);

                for (int i = 0; i < WorkerCount; i++)
                {
                    var workerId = i;
                    var consumer = new AsyncEventingBasicConsumer(_channel);
            
                    consumer.ReceivedAsync += async (model, ea) =>
                    {
                        try
                        {
                            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                            Logger.LogInformation("Worker {WorkerId} processing message {MsgId}", 
                                workerId, ea.BasicProperties.MessageId);
                    
                            await ProcessMessageAsync(workerId, message);
                    
                            await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                        catch (Exception ex)
                        {
                            await _channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
                            Logger.LogError(ex, "Worker {WorkerId} failed", workerId);
                        }
                    };
            
                    await _channel.BasicConsumeAsync("task-queue", false, consumer);
                    Logger.LogInformation("Worker {WorkerId} started", workerId);
                }
            }
    
            private Task ProcessMessageAsync(int workerId, string message) => Task.CompletedTask;
        }

Scaling Considerations

Consumers | Throughput | Latency | Memory | Notes
1 | Low | Low | Low | Simple, ordered
N = CPU cores | High | Low | Medium | Optimal for CPU-bound
N >> CPU cores | Very High | Higher | High | Diminishing returns

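Dead Letter Exchange (DLX) and Dead Letter Queue (DLQ)

Messages that cannot be processed are republished to a separate exchange (the DLX) and end up in a dead letter queue (DLQ) for analysis.

Reasons a message is dead-lettered

  • The message TTL expired while it was in the queue
  • The queue reached its max length
  • A consumer explicitly nacked it with requeue=false

Retry queue chain with exponential backoff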

[orders] ──(fail #1)──→ [orders.retry.1] ──(TTL 5s)───→ [orders]
[orders] ──(fail #2)──→ [orders.retry.2] ──(TTL 10s)──→ [orders]
[orders] ──(fail #3)──→ [orders.retry.3] ──(TTL 20s)──→ [orders]
[orders] ──(fail #4)──→ [orders.dlq]
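
The backoff schedule in the diagram can be sketched as a small calculation. This is a minimal, library-free illustration: the helper names `RetryDelay` and `QueueAfterFailure` are assumptions made for this example, and the 5-second base delay and limit of three retries are taken from the diagram.

```csharp
using System;

// Delay before retry attempt `attempt` (1-based): baseDelay * 2^(attempt - 1).
static TimeSpan RetryDelay(int attempt, TimeSpan baseDelay) =>
    TimeSpan.FromTicks(baseDelay.Ticks * (1L << (attempt - 1)));

// Queue that should receive a message after its N-th failure:
// one retry queue per attempt, then the DLQ once retries are exhausted.
static string QueueAfterFailure(int failures, int maxRetries) =>
    failures <= maxRetries ? $"orders.retry.{failures}" : "orders.dlq";

for (var failures = 1; failures <= 4; failures++)
{
    var queue = QueueAfterFailure(failures, maxRetries: 3);
    var delay = failures <= 3
        ? $" (TTL {RetryDelay(failures, TimeSpan.FromSeconds(5)).TotalSeconds}s)"
        : "";
    Console.WriteLine($"failure #{failures} -> {queue}{delay}");
}
```

With a 5-second base the delays come out as 5, 10 and 20 seconds, and the fourth failure is routed to `orders.dlq`.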

Implementation

public class DltTopology
        {
            public static async Task SetupAsync(IChannel channel)
            {
                // Main exchange and dead letter exchange
                await channel.ExchangeDeclareAsync("orders", ExchangeType.Direct, durable: true);
                await channel.ExchangeDeclareAsync("orders-dlx", ExchangeType.Direct, durable: true);

                // Retry level N waits 5s * 2^(N-1): 5s, 10s, 20s
                var retryTtlsMs = new[] { 5000, 10000, 20000 };

                for (var level = 1; level <= retryTtlsMs.Length; level++)
                {
                    var name = $"orders.retry.{level}";

                    await channel.ExchangeDeclareAsync(name, ExchangeType.Direct, durable: true);

                    // A retry queue has no consumers: when the TTL expires, the message is
                    // dead-lettered back to the main exchange and redelivered to "orders"
                    await channel.QueueDeclareAsync(name, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object?>
                    {
                        ["x-message-ttl"] = retryTtlsMs[level - 1],
                        ["x-dead-letter-exchange"] = "orders",
                        ["x-dead-letter-routing-key"] = "order.created"
                    });

                    await channel.QueueBindAsync(name, name, name);
                }

                // Main queue: messages nack'ed with requeue=false go to the DLX
                await channel.QueueDeclareAsync("orders", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object?>
                {
                    ["x-dead-letter-exchange"] = "orders-dlx",
                    ["x-dead-letter-routing-key"] = "orders.dlq"
                });

                // DLQ
                await channel.QueueDeclareAsync("orders.dlq", durable: true, exclusive: false, autoDelete: false, arguments: null);

                // Bindings
                await channel.QueueBindAsync("orders", "orders", "order.created");
                await channel.QueueBindAsync("orders.dlq", "orders-dlx", "orders.dlq");
            }
        }

        // Consumer with retry logic
        public class RetryConsumer
        {
            private const int MaxRetries = 3;
            private const string RetryHeader = "x-retry-count";

            public async Task StartAsync(IChannel channel, CancellationToken ct)
            {
                await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);

                var consumer = new AsyncEventingBasicConsumer(channel);

                consumer.ReceivedAsync += async (model, ea) =>
                {
                    var retryCount = GetRetryCount(ea.BasicProperties);

                    try
                    {
                        await ProcessMessageAsync(Encoding.UTF8.GetString(ea.Body.ToArray()));
                        await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                    catch (Exception)
                    {
                        if (retryCount < MaxRetries)
                        {
                            // Republish a copy to the next retry level with an incremented counter,
                            // then ack the original. Nack with requeue=true would redeliver
                            // immediately and bypass the TTL-based delay.
                            var next = retryCount + 1;
                            var headers = ea.BasicProperties.Headers is { } existing
                                ? new Dictionary<string, object?>(existing)
                                : new Dictionary<string, object?>();
                            headers[RetryHeader] = next;
                            var props = new BasicProperties(ea.BasicProperties) { Headers = headers };

                            await channel.BasicPublishAsync(
                                exchange: $"orders.retry.{next}",
                                routingKey: $"orders.retry.{next}",
                                mandatory: true,
                                basicProperties: props,
                                body: ea.Body);
                            await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                        else
                        {
                            // Max retries exceeded: reject so the broker routes the message to the DLQ
                            await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
                        }
                    }
                };

                await channel.BasicConsumeAsync("orders", false, consumer);
            }

            private static int GetRetryCount(IReadOnlyBasicProperties props)
            {
                // The header is set by this consumer when it republishes; absent on first delivery
                if (props.Headers is { } headers &&
                    headers.TryGetValue(RetryHeader, out var count) && count is not null)
                {
                    return Convert.ToInt32(count);
                }
                return 0;
            }

            private Task ProcessMessageAsync(string message) => Task.CompletedTask;
        }

Patterns Summary

Pattern | Delivery | Ordering | Use Case
Point-to-Point | 1:1 | No (round-robin) | Task distribution
Pub/Sub | 1:N | No | Event broadcasting
Request/Reply | 1:1 async | Replies matched by correlationId | Async RPC
Competing Consumers | 1:N (each message goes to exactly one consumer) | No | Horizontal scaling
DLQ | N:1 (failed) | No | Error handling

Checklist

  • [x] Point-to-point: queue-based, single consumer per message
  • [x] Pub/Sub: topic-based, multiple subscribers
  • [x] Request/Reply: correlationId + replyTo pattern
  • [x] Competing Consumers: parallel processing with load balancing
  • [x] Dead Letter Exchange: retry queue chain with exponential backoff (TTL + DLX)

Practice


MassTransit / RabbitMQ.Client

MassTransit Overview

MassTransit is an open-source message bus framework for .NET that abstracts the underlying message broker (RabbitMQ, Azure Service Bus, Kafka, Amazon SQS/SNS).

Application
            ↓
        MassTransit Bus
            ├── IBus (publish/send)
            ├── Consumer<TMessage> (receive)
            ├── Saga<TState> (long-running workflows)
            └── Pipeline (validation, routing, error handling)
            ↓
        RabbitMQ / Azure Service Bus / Kafka

MassTransit vs RabbitMQ.Client

Feature | RabbitMQ.Client | MassTransit
Abstraction | Low-level AMQP | High-level messaging
Serialization | Manual | Automatic (JSON, MessagePack, Protobuf)
Message contracts | Plain objects | Interfaces/classes with conventions
Error handling | Manual | Built-in retry, fault, dead letter
Sagas | Not supported | Full state machine support
Pipeline | Manual | Configurable pipeline steps
Learning curve | Steep | Moderate
Performance | Slightly higher | Minimal overhead
Use case | Fine-grained control | Enterprise messaging

MassTransit Abstractions

IBus

The entry point for publishing and sending messages.

// DI registration
        builder.Services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host("rabbitmq://localhost", host =>
                {
                    host.Username("guest");
                    host.Password("guest");
                });
        
                cfg.ConfigureEndpoints(context);
            });
        });

IPublishEndpoint vs ISendEndpoint

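Method | Scope | Exchange (RabbitMQ transport)
IPublishEndpoint.Publish<T> | Every endpoint subscribed to the message type | Message-type exchange (fanout by default)
IBus.Send<T> | Single endpoint, resolved through an endpoint convention | Exchange of the destination endpoint
ISendEndpoint.Send<T> | Specific endpoint | Exchange of the destination endpoint

public class OrderService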
        {
            private readonly IBus _bus;
    
            public OrderService(IBus bus)
            {
                _bus = bus;
            }
    
            // Publish: delivered to every subscriber of the message type
            public async Task CreateOrderAsync(CreateOrderCommand command)
            {
                var message = new OrderCreated
                {
                    OrderId = Guid.NewGuid(),
                    CustomerId = command.CustomerId,
                    Total = command.Total,
                    Timestamp = DateTimeOffset.UtcNow
                };
        
                await _bus.Publish(message);
            }
    
            // Send: delivered to one specific endpoint (queue)
            public async Task NotifyAsync(Guid orderId)
            {
                var message = new OrderNotificationRequested
                {
                    OrderId = orderId,
                    Template = "order-confirmed"
                };

                // Resolve the destination queue explicitly, then send to it
                var endpoint = await _bus.GetSendEndpoint(new Uri("queue:notification-service"));
                await endpoint.Send(message);
            }
        }

Message Contracts

Interfaces vs Classes

Interfaces are recommended for contracts because they carry fewer dependencies.

// Recommended: interface-based contract
        public interface IOrderCreated
        {
            Guid OrderId { get; }
            Guid CustomerId { get; }
            decimal Total { get; }
            DateTimeOffset Timestamp { get; }
        }

        // Consumer
        public class OrderCreatedConsumer : IConsumer<IOrderCreated>
        {
            public async Task Consume(ConsumeContext<IOrderCreated> context)
            {
                var message = context.Message;
                Logger.LogInformation("Order {OrderId} created", message.OrderId);
            }
        }

        // Classes: use when a more complex structure is needed
        public class OrderCreated
        {
            public Guid OrderId { get; set; }
            public Guid CustomerId { get; set; }
            public decimal Total { get; set; }
            public DateTimeOffset Timestamp { get; set; }
        }

Message Versioning

// V1
        public interface IOrderCreatedV1
        {
            Guid OrderId { get; }
            decimal Total { get; }
        }

        // V2 (backward compatible: only adds properties)
        public interface IOrderCreatedV2 : IOrderCreatedV1
        {
            string Currency { get; }
            IReadOnlyList<OrderItemV2> Items { get; }
        }

        // V3 (still additive because it extends V2; a truly breaking change
        // would need a new contract type that does not inherit from V2)
        public interface IOrderCreatedV3 : IOrderCreatedV2
        {
            string Channel { get; }  // "web", "mobile", "api"
        }
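
The additive versioning above relies on readers tolerating fields they do not know and defaulting fields that are missing. A minimal sketch of that behaviour using only `System.Text.Json`; the helper names `ReadV1` and `ReadCurrency`, the camelCase JSON property names and the `"USD"` fallback are assumptions made for this example, not part of any contract in the text.

```csharp
using System;
using System.Text.Json;

// V1 reader: uses only the fields it knows (orderId, total) and ignores the rest.
static (Guid OrderId, decimal Total) ReadV1(string json)
{
    using var doc = JsonDocument.Parse(json);
    var root = doc.RootElement;
    return (root.GetProperty("orderId").GetGuid(), root.GetProperty("total").GetDecimal());
}

// V2 reader: tolerates V1 payloads by falling back to a default currency.
static string ReadCurrency(string json, string fallback = "USD")
{
    using var doc = JsonDocument.Parse(json);
    return doc.RootElement.TryGetProperty("currency", out var currency)
        ? currency.GetString() ?? fallback
        : fallback;
}

var v1Payload = "{\"orderId\":\"3f2504e0-4f89-11d3-9a0c-0305e82c3301\",\"total\":99.5}";
var v2Payload = "{\"orderId\":\"3f2504e0-4f89-11d3-9a0c-0305e82c3301\",\"total\":99.5,\"currency\":\"EUR\"}";

Console.WriteLine(ReadV1(v2Payload).Total);   // an old reader handles a new message
Console.WriteLine(ReadCurrency(v1Payload));   // a new reader handles an old message
```

An old consumer keeps working when a producer upgrades to V2, and a new consumer keeps working while old producers still emit V1.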

Inheritance Handling

// Base event
        public interface IOrderEvent
        {
            Guid OrderId { get; }
            DateTimeOffset CreatedAt { get; }
        }

        // Derived events
        public interface IOrderCreated : IOrderEvent { }
        public interface IOrderUpdated : IOrderEvent { string Changes { get; } }
        public interface IOrderShipped : IOrderEvent { string TrackingNumber { get; } }

        // A consumer can subscribe to the base contract
        public class OrderEventConsumer : IConsumer<IOrderEvent>
        {
            public async Task Consume(ConsumeContext<IOrderEvent> context)
            {
                // context.Message is typed as the base contract, so pattern matching on it
                // may not see the derived interfaces. TryGetMessage asks the context for a
                // typed view and succeeds when the message supports the requested type.
                if (context.TryGetMessage<IOrderCreated>(out var created))
                    await HandleCreated(created.Message);
                else if (context.TryGetMessage<IOrderUpdated>(out var updated))
                    await HandleUpdated(updated.Message);
                else if (context.TryGetMessage<IOrderShipped>(out var shipped))
                    await HandleShipped(shipped.Message);
            }

            private Task HandleCreated(IOrderCreated message) => Task.CompletedTask;
            private Task HandleUpdated(IOrderUpdated message) => Task.CompletedTask;
            private Task HandleShipped(IOrderShipped message) => Task.CompletedTask;
        }

Consumer Configuration

Concurrency

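x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("rabbitmq://localhost");

            // "order-created" is an illustrative queue name
            cfg.ReceiveEndpoint("order-created", e =>
            {
                // Concurrency limit
                e.ConcurrentMessageLimit = 10;         // max 10 parallel messages

                // Second-level retry: redeliver through the broker after in-process retries fail.
                // Intervals are illustrative; on RabbitMQ this relies on the delayed message exchange plugin.
                e.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15)));

                // First-level retry: in-process, while the message is still held by the consumer
                e.UseMessageRetry(retryConfig =>
                {
                    retryConfig.Intervals(
                        TimeSpan.FromSeconds(5),
                        TimeSpan.FromSeconds(30),
                        TimeSpan.FromMinutes(1));
                });

                e.ConfigureConsumer<OrderCreatedConsumer>(context);
            });
        });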

Retry Policies

// In receive endpoint configuration (config is the endpoint configurator)
        config.UseMessageRetry(retryConfig =>
        {
            // Fixed intervals
            retryConfig.Intervals(
                TimeSpan.FromSeconds(5),
                TimeSpan.FromSeconds(10),
                TimeSpan.FromSeconds(30));
        });

        // Exponential backoff
        config.UseMessageRetry(retryConfig =>
        {
            retryConfig.Exponential(
                5,                           // retry limit
                TimeSpan.FromSeconds(1),     // minimum interval
                TimeSpan.FromMinutes(5),     // maximum interval
                TimeSpan.FromSeconds(10));   // interval delta
        });

        // Circuit breaker
        config.UseCircuitBreaker(cbConfig =>
        {
            cbConfig.TrackingPeriod = TimeSpan.FromMinutes(1);  // window over which failures are counted
            cbConfig.ActiveThreshold = 5;                       // minimum messages in the window before the breaker can trip
            cbConfig.TripThreshold = 15;                        // failure percentage that opens the breaker (illustrative value)
            cbConfig.ResetInterval = TimeSpan.FromMinutes(5);   // wait before letting a trial message through
        });

Error Handling Pipeline

x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("rabbitmq://localhost");

            // Global consume-side retry, applied to every receive endpoint configured below
            cfg.UseMessageRetry(retryConfig =>
            {
                retryConfig.Exponential(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5));
            });

            // Dead letter handling: a message that still fails after all retries is moved
            // to the endpoint's error queue (<queue-name>_error), which plays the DLQ role

            cfg.ConfigureEndpoints(context);
        });

Saga State Machine

Long-running workflow orchestration.

public class OrderSaga :
            MassTransitStateMachine<OrderSagaState>
        {
            public State Ordering { get; set; } = null!;
            public State Paid { get; set; } = null!;
            public State Completed { get; set; } = null!;
            public State Cancelled { get; set; } = null!;

            public Event<IOrderCreated> OrderCreated { get; set; } = null!;
            public Event<IPaymentCompleted> PaymentCompleted { get; set; } = null!;
            public Event<IPaymentFailed> PaymentFailed { get; set; } = null!;
            public Event<IOrderShipped> OrderShipped { get; set; } = null!;
            public Event<IOrderCancelled> OrderCancelled { get; set; } = null!;

            // Payment timeout; requires a message scheduler to be configured on the bus
            public Schedule<OrderSagaState, IOrderTimeout> PaymentTimeout { get; set; } = null!;

            public OrderSaga()
            {
                InstanceState(x => x.CurrentState);

                Event(() => OrderCreated, e => e.CorrelateById(ctx => ctx.Message.OrderId));
                Event(() => PaymentCompleted, e => e.CorrelateById(ctx => ctx.Message.OrderId));
                Event(() => PaymentFailed, e => e.CorrelateById(ctx => ctx.Message.OrderId));
                Event(() => OrderShipped, e => e.CorrelateById(ctx => ctx.Message.OrderId));
                Event(() => OrderCancelled, e => e.CorrelateById(ctx => ctx.Message.OrderId));

                // Timeout: if not paid within 15 minutes, cancel
                Schedule(() => PaymentTimeout, x => x.PaymentTimeoutTokenId, s =>
                {
                    s.Delay = TimeSpan.FromMinutes(15);
                    s.Received = e => e.CorrelateById(ctx => ctx.Message.OrderId);
                });

                Initially(
                    When(OrderCreated)
                        .Then(context =>
                        {
                            context.Saga.OrderId = context.Message.OrderId;
                            context.Saga.CustomerId = context.Message.CustomerId;
                            context.Saga.Total = context.Message.Total;
                        })
                        // OrderTimeout is an assumed class implementing IOrderTimeout
                        .Schedule(PaymentTimeout, context => new OrderTimeout { OrderId = context.Saga.OrderId })
                        .Publish(context => new PaymentRequested
                        {
                            OrderId = context.Saga.OrderId,
                            Amount = context.Saga.Total,
                            Currency = "USD"
                        })
                        .TransitionTo(Ordering));

                // Payment events are only handled while the order is awaiting payment
                During(Ordering,
                    When(PaymentCompleted)
                        .Unschedule(PaymentTimeout)
                        .Then(context => context.Saga.PaidAt = DateTime.UtcNow)
                        .Publish(context => new OrderPrepared
                        {
                            OrderId = context.Saga.OrderId
                        })
                        .TransitionTo(Paid),
                    When(PaymentFailed)
                        .Unschedule(PaymentTimeout)
                        .Then(context => context.Saga.FailedAt = DateTime.UtcNow)
                        .Publish(context => new OrderCancelled
                        {
                            OrderId = context.Saga.OrderId,
                            Reason = "Payment failed"
                        })
                        .TransitionTo(Cancelled),
                    When(PaymentTimeout.Received)
                        .Then(context => context.Saga.CancelledAt = DateTime.UtcNow)
                        .TransitionTo(Cancelled));

                // Shipping is only handled after payment; it is the last step in this workflow
                During(Paid,
                    When(OrderShipped)
                        .Then(context => context.Saga.ShippedAt = DateTime.UtcNow)
                        .TransitionTo(Completed));

                // Finalize on cancellation from any state
                DuringAny(
                    When(OrderCancelled)
                        .Then(context => context.Saga.CancelledAt ??= DateTime.UtcNow)
                        .Finalize());
            }
        }

        public class OrderSagaState :
            SagaStateMachineInstance
        {
            public Guid CorrelationId { get; set; }
            public string CurrentState { get; set; } = null!;
            public Guid OrderId { get; set; }
            public Guid CustomerId { get; set; }
            public decimal Total { get; set; }
            public Guid? PaymentTimeoutTokenId { get; set; }  // token of the scheduled timeout message
            public DateTime? PaidAt { get; set; }
            public DateTime? ShippedAt { get; set; }
            public DateTime? CancelledAt { get; set; }
            public DateTime? FailedAt { get; set; }
        }

Saga Persistence

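// Register the state machine once and pick one repository

        // In-memory (testing)
        x.AddSagaStateMachine<OrderSaga, OrderSagaState>()
            .InMemoryRepository();

        // Entity Framework Core with PostgreSQL (production)
        // OrderSagaDbContext is an assumed DbContext that maps OrderSagaState
        x.AddSagaStateMachine<OrderSaga, OrderSagaState>()
            .EntityFrameworkRepository(r =>
            {
                r.ExistingDbContext<OrderSagaDbContext>();
                r.UsePostgres();
            });

        // Redis (the saga state must also implement ISagaVersion)
        x.AddSagaStateMachine<OrderSaga, OrderSagaState>()
            .RedisRepository(r => r.DatabaseConfiguration("localhost:6379"));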

In-memory vs RabbitMQ Transport

// In-memory: for unit and integration tests
        x.UsingInMemory((context, cfg) =>
        {
            // Creates a receive endpoint for every registered consumer and saga
            cfg.ConfigureEndpoints(context);
        });

        // RabbitMQ: for production
        x.UsingRabbitMq((context, cfg) =>
        {
            // Host name and virtual host
            cfg.Host("localhost", "/", host =>
            {
                host.Username("guest");
                host.Password("guest");
            });

            // Retry applied to every receive endpoint
            cfg.UseMessageRetry(retryConfig =>
            {
                retryConfig.Exponential(3, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(5));
            });

            cfg.ConfigureEndpoints(context);
        });

Message Pipeline

Pipeline Steps

Outgoing Message:
          → Validation → Enrichment → Serialization → Routing → Broker

        Incoming Message:
          → Broker → Deserialization → Routing → Validation → Consumer
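
The incoming chain above is a classic filter pipeline: each step does its work and then calls the next one. A minimal, library-free sketch of that composition follows; `BuildPipeline` and the string-based message are assumptions made for this example and are not MassTransit types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Compose filters so the first one in the list runs first and the consumer runs last.
static Func<string, Task> BuildPipeline(
    IEnumerable<Func<string, Func<string, Task>, Task>> filters,
    Func<string, Task> consumer)
{
    var next = consumer;
    foreach (var filter in filters.Reverse())
    {
        var inner = next;                       // capture the current tail of the chain
        next = message => filter(message, inner);
    }
    return next;
}

var log = new List<string>();

var pipeline = BuildPipeline(
    new Func<string, Func<string, Task>, Task>[]
    {
        async (msg, next) => { log.Add("deserialize"); await next(msg); },
        async (msg, next) =>
        {
            log.Add("validate");
            if (msg.Length == 0) throw new ArgumentException("empty message");
            await next(msg);                    // only valid messages reach the consumer
        },
    },
    msg => { log.Add($"consume:{msg}"); return Task.CompletedTask; });

await pipeline("order-1");
Console.WriteLine(string.Join(" -> ", log));    // deserialize -> validate -> consume:order-1
```

A filter that throws stops the chain, so the consumer never sees a message that failed validation.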

Custom Pipeline Steps

// Validation step: an open generic filter, as UseConsumeFilter expects
        public class ValidationStep<T> : IFilter<ConsumeContext<T>>
            where T : class
        {
            private readonly ILogger<ValidationStep<T>> _logger;

            public ValidationStep(ILogger<ValidationStep<T>> logger)
            {
                _logger = logger;
            }

            public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
            {
                // Only order-created messages are validated; other types pass through
                if (context.Message is IOrderCreated message)
                {
                    if (message.Total <= 0)
                    {
                        throw new ArgumentException("Order total must be positive", nameof(message.Total));
                    }

                    if (message.CustomerId == Guid.Empty)
                    {
                        throw new ArgumentException("Customer ID is required", nameof(message.CustomerId));
                    }

                    _logger.LogDebug("Validation passed for order {OrderId}", message.OrderId);
                }

                await next.Send(context);
            }

            public void Probe(ProbeContext context) => context.CreateFilterScope("validation");
        }

        // Processing metadata attached by the enrichment step (illustrative type)
        public record ProcessingMetadata(DateTime ProcessedAt, string ProcessedBy);

        // Enrichment step: attaches metadata as a payload instead of replacing the message
        public class EnrichmentStep<T> : IFilter<ConsumeContext<T>>
            where T : class
        {
            public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
            {
                // Consumers can read it with context.TryGetPayload<ProcessingMetadata>(out var metadata)
                context.GetOrAddPayload(() => new ProcessingMetadata(DateTime.UtcNow, Environment.MachineName));

                await next.Send(context);
            }

            public void Probe(ProbeContext context) => context.CreateFilterScope("enrichment");
        }

        // Registration
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("rabbitmq://localhost");

            cfg.UseConsumeFilter(typeof(ValidationStep<>), context);
            cfg.UseConsumeFilter(typeof(EnrichmentStep<>), context);

            cfg.ConfigureEndpoints(context);
        });

Practice

Message Contracts

public interface IOrderCreated
        {
            Guid OrderId { get; }
            Guid CustomerId { get; }
            decimal Total { get; }
            string Currency { get; }
            DateTimeOffset Timestamp { get; }
        }

        public interface IPaymentRequested
        {
            Guid OrderId { get; }
            decimal Amount { get; }
            string Currency { get; }
        }

        public interface IPaymentCompleted
        {
            Guid OrderId { get; }
            DateTime CompletedAt { get; }
            string TransactionId { get; }
        }

        public interface IPaymentFailed
        {
            Guid OrderId { get; }
            string Reason { get; }
            DateTime FailedAt { get; }
        }

        public interface IOrderShipped
        {
            Guid OrderId { get; }
            string TrackingNumber { get; }
            DateTime ShippedAt { get; }
        }

        public interface IOrderCancelled
        {
            Guid OrderId { get; }
            string Reason { get; }
            DateTime CancelledAt { get; }
        }

Saga Configuration

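public static class OrderSagaRegistration
        {
            public static void Configure(IBusRegistrationConfigurator configurator)
            {
                // Registers the state machine together with its instance type;
                // the in-memory repository is for testing, use a durable one in production
                configurator.AddSagaStateMachine<OrderSaga, OrderSagaState>()
                    .InMemoryRepository();
            }
        }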

Consumer

public class OrderCreatedConsumer : IConsumer<IOrderCreated>
        {
            private readonly ILogger<OrderCreatedConsumer> _logger;

            public OrderCreatedConsumer(ILogger<OrderCreatedConsumer> logger)
            {
                _logger = logger;
            }

            public async Task Consume(ConsumeContext<IOrderCreated> context)
            {
                _logger.LogInformation("Processing order {OrderId}", context.Message.OrderId);

                // Publish through the consume context (not IBus) so the outgoing message
                // keeps the conversation and correlation headers of the incoming one.
                // The payment request is handled by whichever service consumes PaymentRequested.
                await context.Publish(new PaymentRequested
                {
                    OrderId = context.Message.OrderId,
                    Amount = context.Message.Total,
                    Currency = context.Message.Currency
                }, context.CancellationToken);
            }
        }

Checklist

  • [x] MassTransit abstractions: IBus, IPublishEndpoint, ISendEndpoint
  • [x] Message contracts: interfaces vs classes, inheritance handling
  • [x] Consumer configuration: concurrency, error handling, retry policies
  • [x] Saga state machine: long-running workflow orchestration with persistence
  • [x] In-memory vs RabbitMQ transport
  • [x] Message pipeline: validation, enrichment, routing steps

Apache Kafka for .NET

Kafka Fundamentals

Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant stream processing.

Architecture

Producer → Broker 1 (Topic: orders, Partition 0) → Replicas: [1, 2, 3]
                        → Broker 2 (Topic: orders, Partition 1) → Replicas: [2, 3, 1]
                        → Broker 3 (Topic: orders, Partition 2) → Replicas: [3, 1, 2]

        Consumer Group:
          Consumer A ← Partition 0
          Consumer B ← Partition 1
          Consumer C ← Partition 2

Key Terms

Term | Description
Topic | Logical channel for messages
Partition | Physical segment of a topic; an ordered, immutable sequence of messages
Broker | A server in the Kafka cluster
Replica | Copy of a partition kept for fault tolerance
Leader | The replica (and its broker) that serves reads and writes for a partition
Follower | A replica that copies data from the leader
Consumer Group | Group of consumers that share the load of a topic
Offset | Position of a message within a partition (monotonically increasing)
Producer | Client that writes to a topic

Topics, Partitions, Replicas

Data Distribution Model

Topic: "orders"
        ├── Partition 0 (Leader: Broker 1, Replicas: [1, 2, 3], ISR: [1, 2])
        │   ├── Offset 0: Order#1
        │   ├── Offset 1: Order#2
        │   └── Offset 2: Order#3
        ├── Partition 1 (Leader: Broker 2, Replicas: [2, 3, 1], ISR: [2, 3, 1])
        │   ├── Offset 0: Order#4
        │   └── Offset 1: Order#5
        └── Partition 2 (Leader: Broker 3, Replicas: [3, 1, 2], ISR: [3, 1, 2])
            ├── Offset 0: Order#6
            └── Offset 1: Order#7

Partitioning Strategy

// Confluent.Kafka Producer
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            Acks = Acks.All,                        // Wait for all in-sync replicas to acknowledge
            EnableIdempotence = true,               // Broker discards duplicates caused by producer retries
            MaxInFlight = 5,                        // With idempotence: 1-5
            MessageSendMaxRetries = int.MaxValue,   // Automatic retries
            RetryBackoffMs = 100
        };

        using var producer = new ProducerBuilder<string, string>(producerConfig)
            .Build();

        // The message key selects the partition: hash(key) % numPartitions
        await producer.ProduceAsync(
            "orders",
            new Message<string, string>
            {
                Key = orderId.ToString(),       // One order always maps to the same partition
                Value = JsonSerializer.Serialize(order),
                Headers = new Headers
                {
                    { "event-type", Encoding.UTF8.GetBytes("OrderCreated") },
                    { "source", Encoding.UTF8.GetBytes("order-service") }
                }
            });
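
The `hash(key) % numPartitions` rule can be sketched without a Kafka client. The example below uses FNV-1a only because it is short and deterministic; it is an assumption made for illustration, real Kafka clients use other hash functions, and `PartitionFor` is a hypothetical helper.

```csharp
using System;
using System.Text;

// FNV-1a over the key bytes (illustrative; not the hash a Kafka client uses).
static uint Fnv1a(byte[] data)
{
    unchecked
    {
        uint hash = 2166136261;
        foreach (var b in data)
        {
            hash ^= b;
            hash *= 16777619;
        }
        return hash;
    }
}

// Same key always yields the same partition index in [0, numPartitions).
static int PartitionFor(string key, int numPartitions) =>
    (int)(Fnv1a(Encoding.UTF8.GetBytes(key)) % (uint)numPartitions);

foreach (var key in new[] { "order-1", "order-2", "order-1" })
    Console.WriteLine($"{key} -> partition {PartitionFor(key, 3)}");
```

Because the mapping depends on `numPartitions`, adding partitions to a topic changes where existing keys land, which is why per-key ordering is only stable while the partition count stays fixed.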

Replication Factor

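Factor | Broker failures survived (all replicas in sync) | Min ISR | Notes
1 | 0 brokers | 1 | No fault tolerance
2 | 1 broker | 1 | Minimum for HA
3 | 2 brokers | 2 | Standard production
5 | 4 brokers | 3 | Maximum recommended

With min.insync.replicas=2 and acks=all, a write is acknowledged only after at least two in-sync replicas have it, so an acknowledged message survives the loss of one broker.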
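
The relationship between replication factor, min.insync.replicas and tolerated failures can be written down as two small formulas. This is a sketch under the assumption that producers use acks=all; the helper names are made up for this example.

```csharp
using System;

// Broker failures after which producers with acks=all can still write:
// enough replicas must remain to satisfy min.insync.replicas.
static int FailuresToleratedForWrites(int replicationFactor, int minInSyncReplicas) =>
    replicationFactor - minInSyncReplicas;

// Broker failures an acknowledged message is guaranteed to survive:
// it was on at least minInSyncReplicas brokers when it was acknowledged.
static int FailuresSurvivedByAckedData(int minInSyncReplicas) =>
    minInSyncReplicas - 1;

foreach (var (rf, minIsr) in new[] { (3, 2), (5, 3) })
{
    Console.WriteLine(
        $"RF={rf}, min ISR={minIsr}: writes tolerate {FailuresToleratedForWrites(rf, minIsr)} failure(s), " +
        $"acked data survives {FailuresSurvivedByAckedData(minIsr)} failure(s)");
}
```

For the standard production setup (factor 3, min ISR 2) both numbers are 1: one broker can fail without blocking writes or losing acknowledged data.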

Consumer Groups

Parallel Consumption

Topic: "orders" (3 partitions)

        Consumer Group: "order-processors"
          Consumer A (instance 1) → Partitions [0, 1]
          Consumer B (instance 2) → Partition [2]
  
        Consumer Group: "analytics"
          Consumer X (instance 1) → Partitions [0, 1, 2]  (read-only, separate group)

Offset Management

// Consumer config
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "order-processors",
            AutoOffsetReset = AutoOffsetReset.Earliest,  // Used when the group has no committed offset
            EnableAutoCommit = false,                    // Manual commit
            SessionTimeoutMs = 10000,
            HeartbeatIntervalMs = 3000,
            MaxPollIntervalMs = 300000,
            EnablePartitionEof = true                    // Emit an event at the end of each partition
        };

        using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                // Log the assignment; the client applies it automatically
                Logger.LogInformation("Assigned partitions: {Partitions}", string.Join(", ", partitions));
            })
            .SetErrorHandler((_, error) =>
            {
                Logger.LogError("Kafka error: {Code} {Reason}", error.Code, error.Reason);
            })
            .Build();

        // Subscribe and poll
        consumer.Subscribe("orders");

        try
        {
            while (!ct.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(ct);

                    // End-of-partition events carry no message
                    if (result.IsPartitionEOF) continue;

                    // Process message
                    await ProcessMessageAsync(result.Message);

                    // Manual commit AFTER successful processing
                    consumer.Commit(result);
                }
                catch (ConsumeException ex)
                {
                    Logger.LogError(ex, "Consume error");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested: fall through to a clean shutdown
        }
        finally
        {
            consumer.Close();  // Leave the group so partitions are reassigned promptly
        }

Offset Commit: Auto vs Manual

StrategyProsCons
Auto commitSimpleМожет потерять сообщения (commit до обработки)
Manual commitExactly-once processingСложнее, risk of stuck offsets
Store + CommitTransactionalRequires transaction support
// Manual commit, recommended for production
consumer.Subscribe("orders");

while (!ct.IsCancellationRequested)
{
    var result = consumer.Consume(ct);

    try
    {
        await ProcessMessageAsync(result.Message);
        consumer.Commit(result);  // Commit AFTER processing
    }
    catch (ProcessingException)
    {
        // Skipping the commit is not enough: the next Consume() returns the following
        // message. Seek back so this offset is fetched again.
        consumer.Seek(result.TopicPartitionOffset);
        Logger.LogError("Processing failed, message will be retried");
    }
}
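
The effect of committing after processing can be shown without a broker. The sketch below is a minimal in-memory model, not the Confluent.Kafka API: `PartitionSimulator`, `RunSession`, and `crashBeforeCommitAt` are hypothetical names introduced for illustration. It models one partition and one committed offset, and shows why a crash between processing and commit produces a duplicate rather than a loss.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for one partition plus its committed offset.
public sealed class PartitionSimulator
{
    private readonly IReadOnlyList<string> _log;

    // Offset of the next message to read after a restart.
    public long CommittedOffset { get; private set; }

    public PartitionSimulator(IReadOnlyList<string> log) => _log = log;

    // One consumer session: read from the committed offset, process, then commit.
    // crashBeforeCommitAt simulates a crash after processing but before the commit.
    public void RunSession(Action<string> process, long? crashBeforeCommitAt = null)
    {
        for (long offset = CommittedOffset; offset < _log.Count; offset++)
        {
            process(_log[(int)offset]);          // side effect happens here

            if (offset == crashBeforeCommitAt)
            {
                return;                          // processed, but the offset was never committed
            }

            CommittedOffset = offset + 1;        // commit AFTER processing
        }
    }
}
```

Running a session that crashes at offset 1 and then a second session makes the handler see the message at offset 1 twice. This is the at-least-once behaviour that the idempotent consumer patterns later in this chapter are meant to absorb.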

Producer Semantics

At-Most-Once

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    Acks = Acks.None,              // Do not wait for broker acknowledgement
    EnableIdempotence = false,
    MessageSendMaxRetries = 0      // Never resend
};
// A message can be lost if the broker or the network fails

At-Least-Once

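var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    Acks = Acks.All,                       // All in-sync replicas acknowledge
    EnableIdempotence = true,              // Broker discards duplicates caused by producer retries
    MessageSendMaxRetries = int.MaxValue,
    MaxInFlight = 5                        // Idempotence requires at most 5 in-flight requests
};
// Guarantees delivery. Idempotence removes duplicates created by the producer's own retries
// (within one producer session). Duplicates can still appear if the application resends
// after a restart or if a consumer reprocesses a message.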

Exactly-Once

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    Acks = Acks.All,
    EnableIdempotence = true,
    MessageSendMaxRetries = int.MaxValue,
    MaxInFlight = 5,
    // Transactional: the id must stay stable across restarts of this producer instance
    TransactionalId = "order-service-1"
};

using var producer = new ProducerBuilder<string, string>(config).Build();

producer.InitTransactions(TimeSpan.FromSeconds(10));
producer.BeginTransaction();

try
{
    // Write to topic; CommitTransaction flushes outstanding messages before committing
    producer.Produce("orders", new Message<string, string>
    {
        Key = orderId.ToString(),
        Value = json
    });

    // Commit transaction: all messages in it become visible atomically
    // to consumers that use IsolationLevel.ReadCommitted
    producer.CommitTransaction(TimeSpan.FromSeconds(30));
}
catch (Exception)
{
    producer.AbortTransaction(TimeSpan.FromSeconds(30));
    throw;
}

Offset Commits

Automatic vs Manual

// Auto commit: simple, but risky
var autoCommitConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "simple-consumer",
    EnableAutoCommit = true,
    AutoCommitIntervalMs = 5000  // Every 5 seconds
};

// Manual commit: safe, but more code
var manualCommitConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "reliable-consumer",
    EnableAutoCommit = false  // The application calls Commit() itself
};

Transactional Offsets

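// Consumer that writes to a database. A Kafka offset commit and a database transaction cannot
// be committed atomically, so the DB commits first and the write must be idempotent.
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "transactional-consumer",
    EnableAutoCommit = false,
    IsolationLevel = IsolationLevel.ReadCommitted  // Read only committed transactional messages
};

using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe("orders");

while (!ct.IsCancellationRequested)
{
    var result = consumer.Consume(ct);

    await using var transaction = await _dbContext.Database.BeginTransactionAsync(ct);
    try
    {
        // Process and write to database
        await _dbContext.Orders.AddAsync(DeserializeOrder(result.Message), ct);
        await _dbContext.SaveChangesAsync(ct);
        await transaction.CommitAsync(ct);
    }
    catch
    {
        await transaction.RollbackAsync(ct);
        consumer.Seek(result.TopicPartitionOffset);  // Fetch the same message again
        continue;
    }

    // Commit the offset only after the DB commit. A crash between the two steps
    // redelivers the message, so the insert must be idempotent (e.g. a unique order id).
    consumer.Commit(result);
}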

Schema Registry

Schema Evolution

// Confluent.SchemaRegistry + Confluent.SchemaRegistry.Serdes.Json
var schemaRegistryConfig = new SchemaRegistryConfig
{
    Url = "http://localhost:8081",
    MaxCachedSchemas = 1000
};

var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

// Producer with JSON Schema serialization; the serializer is set on the builder, not in ProducerConfig
var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092"
};

using var producer = new ProducerBuilder<string, Order>(producerConfig)
    .SetValueSerializer(new JsonSerializer<Order>(schemaRegistry, new JsonSerializerConfig
    {
        SubjectNameStrategy = SubjectNameStrategy.TopicRecord  // Subject: <topic>-<record name>
    }))
    .Build();

Schema Evolution Strategies

Backward compatible (upgrade consumers first):
  V1: {id, name, price}
  V2: {id, name, price, discount = 0}  // Field added with a default → V2 consumers can read V1 data

Forward compatible (upgrade producers first):
  V1: {id, name, price}
  V2: {id, name, price, discount}      // Field added → V1 consumers ignore it and can read V2 data

Fully compatible:
  V1: {id, name, price}
  V2: {id, name, price, discount?}     // Only optional fields added or removed → works in both directions

Breaking change:
  V1: {id, name, price}
  V3: {id, name, total, items}         // Structure changed → new topic or explicit versioning

Backward-Compatible Changes

// V1
        public record OrderV1(Guid Id, string Name, decimal Price);

        // V2: backward compatible (adds optional fields with defaults)
        public record OrderV2(
            Guid Id,
            string Name,
            decimal Price,
            decimal? Discount = null,
            string Currency = "USD");

        // V3: backward compatible (adds an optional collection)
        public record OrderV3(
            Guid Id,
            string Name,
            decimal Price,
            decimal? Discount = null,
            string Currency = "USD",
            IReadOnlyList<OrderItemV3>? Items = null);

        public record OrderItemV3(Guid ProductId, string ProductName, int Quantity, decimal UnitPrice);
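
Both compatibility directions can be checked with plain JSON round-trips. The sketch below uses System.Text.Json rather than Schema Registry, and `SampleOrderV1`, `SampleOrderV2`, and `SchemaCompatibilityDemo` are hypothetical, trimmed-down names introduced for illustration. It assumes the default serializer behaviour: unknown JSON properties are skipped, and a missing nullable constructor parameter ends up as null.

```csharp
using System;
using System.Text.Json;

// Trimmed-down copies of the records above, renamed to avoid clashing with them.
public record SampleOrderV1(Guid Id, string Name, decimal Price);
public record SampleOrderV2(Guid Id, string Name, decimal Price, decimal? Discount = null);

public static class SchemaCompatibilityDemo
{
    // Backward compatibility: a V2 reader handles data written by a V1 producer.
    public static SampleOrderV2 ReadV1DataWithV2(SampleOrderV1 written)
    {
        string json = JsonSerializer.Serialize(written);
        return JsonSerializer.Deserialize<SampleOrderV2>(json)!;
    }

    // Forward compatibility: a V1 reader handles data written by a V2 producer.
    // The extra "Discount" property is skipped by the default serializer options.
    public static SampleOrderV1 ReadV2DataWithV1(SampleOrderV2 written)
    {
        string json = JsonSerializer.Serialize(written);
        return JsonSerializer.Deserialize<SampleOrderV1>(json)!;
    }
}
```

A check like this fits well in a unit test next to the event contracts. Schema Registry enforces the same rules centrally when a new schema version is registered.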

Practice

public class KafkaOrderProducer
        {
            private readonly IProducer<string, string> _producer;
            private readonly ILogger<KafkaOrderProducer> _logger;
    
            public KafkaOrderProducer(ILogger<KafkaOrderProducer> logger)
            {
                _logger = logger;   // Assign first: the error handler below uses it

                var config = new ProducerConfig
                {
                    BootstrapServers = "localhost:9092",     // host:port list, no URI scheme
                    Acks = Acks.All,
                    EnableIdempotence = true,
                    MaxInFlight = 5,
                    MessageSendMaxRetries = int.MaxValue,
                    RetryBackoffMs = 100,
                    CompressionType = CompressionType.Lz4,   // Saves bandwidth
                    BatchSize = 64000,                       // ~64 KB batch
                    LingerMs = 10,                           // Wait 10 ms for batching
                    QueueBufferingMaxKbytes = 32768          // 32 MB local send queue
                };
        
                // Delivery results are returned by ProduceAsync below, so no delivery handler is registered
                _producer = new ProducerBuilder<string, string>(config)
                    .SetErrorHandler((_, error) =>
                        _logger.LogError("Kafka error: {Code} {Reason}", error.Code, error.Reason))
                    .Build();
        
                _logger.LogInformation("Kafka producer initialized");
            }
    
            public async Task PublishOrderCreatedAsync(OrderCreatedEvent order)
            {
                var message = new Message<string, string>
                {
                    Key = order.OrderId.ToString(),
                    Value = JsonSerializer.Serialize(order),
                    Headers = new Headers
                    {
                        { "event-type", Encoding.UTF8.GetBytes("OrderCreated") },
                        { "event-version", Encoding.UTF8.GetBytes("v1") },
                        { "timestamp", Encoding.UTF8.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString()) }
                    },
                    Timestamp = new Timestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), TimestampType.CreateTime)
                };
        
                try
                {
                    var result = await _producer.ProduceAsync("orders", message);
                    _logger.LogInformation(
                        "Order {OrderId} published: topic={Topic} partition={Partition} offset={Offset}",
                        order.OrderId, result.Topic, result.Partition, result.Offset);
                }
                catch (ProduceException<string, string> ex)
                {
                    _logger.LogError(ex, "Failed to publish order {OrderId}", order.OrderId);
                    throw;
                }
            }
        }

Kafka Consumer with Manual Offset Commit

public class KafkaOrderConsumer : BackgroundService
        {
            private readonly IConsumer<string, string> _consumer;
            private readonly ILogger<KafkaOrderConsumer> _logger;
            private readonly IProcessOrder _processor;
    
            public KafkaOrderConsumer(
                IConsumer<string, string> consumer,
                ILogger<KafkaOrderConsumer> logger,
                IProcessOrder processor)
            {
                _consumer = consumer;
                _logger = logger;
                _processor = processor;
            }
    
            protected override Task ExecuteAsync(CancellationToken stoppingToken)
            {
                _consumer.Subscribe("orders");
                return Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);
            }
    
            private void ConsumeLoop(CancellationToken ct)
            {
                try
                {
                    while (!ct.IsCancellationRequested)
                    {
                        try
                        {
                            var result = _consumer.Consume(ct);

                            try
                            {
                                _processor.ProcessOrderAsync(result.Message.Value, ct).GetAwaiter().GetResult();
                                _consumer.Commit(result);  // Commit AFTER processing
                            }
                            catch (Exception ex) when (ex is not OperationCanceledException)
                            {
                                _logger.LogError(ex, "Processing failed for offset {Offset}", result.Offset);
                                // Not committing is not enough: rewind so the same message is fetched again
                                _consumer.Seek(result.TopicPartitionOffset);
                            }
                        }
                        catch (ConsumeException ex)
                        {
                            _logger.LogError(ex, "Consume error: {Code}", ex.Error.Code);
                            if (ex.Error.IsFatal)
                            {
                                break;
                            }
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Shutdown requested
                }
                finally
                {
                    _consumer.Close();  // Leave the consumer group on every exit path
                }
            }
        }

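Summary Table: Kafka vs RabbitMQ

| Feature | Kafka | RabbitMQ |
|---|---|---|
| Model | Log-based | Queue-based |
| Retention | Time/size-based | Until consumed and acknowledged (classic and quorum queues) |
| Throughput | Very high (batched, sequential log writes) | High, but typically lower per node |
| Latency | Typically milliseconds; batching trades latency for throughput | Typically lower at moderate load |
| Ordering | Per-partition | Per-queue |
| Message size | Best with small messages (default broker limit is about 1 MB) | Best with small messages |
| Use case | Event streaming, analytics | Task queues, RPC |
| Consumer groups | Built-in | Competing consumers on one queue |
| Replay | Yes (seek to offset) | No for queues (stream queues support replay) |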

Checklist

  • [x] Kafka producer with Confluent.Kafka and idempotent delivery
  • [x] Consumer group with manual offset commit and error handling
  • [x] Schema evolution strategy with backward-compatible changes

Reliable Messaging Patterns

Outbox Pattern

Problem

A typical problem: writing to the database and publishing a message to the broker are two separate operations that cannot be made atomic together.

// BAD: dual write
Transaction {
    db.Save(order);           // Succeeds
}
// CRASH here → the message is never sent
broker.Publish(orderCreated); // Never runs

Solution: Outbox Table

Transaction {
    db.Save(order);                // Write to the main table
    db.Save(outboxMessage);        // Write to the outbox table
}                                  // Atomic!

A separate relay then reads the outbox table and publishes to the broker, which gives at-least-once delivery.

Architecture

┌─────────────┐     ┌──────────────┐     ┌─────────────┐     ┌───────────┐
│  Order      │     │  Database    │     │  Outbox     │     │  Message  │
│  Service    │────▶│  (EF Core)   │────▶│  Poller     │────▶│  Broker   │
│             │     │              │     │  (CDC/Poll) │     │  (Rabbit) │
└─────────────┘     └──────────────┘     └─────────────┘     └───────────┘
                           │
                           ├── Orders table
                           └── OutboxMessages table

Implementation with EF Core + MassTransit

// 1. Outbox Message Entity
        public class OutboxMessage
        {
            public Guid Id { get; set; }
            public string Type { get; set; } = null!;
            public string ContentType { get; set; } = "application/json";
            public string Payload { get; set; } = null!;
            public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
            public DateTime? PublishedAt { get; set; }
            public int? RetryCount { get; set; }
            public string? Error { get; set; }
    
            // EF Core mapping
            public static void Configure(ModelBuilder modelBuilder)
            {
                modelBuilder.Entity<OutboxMessage>(entity =>
                {
                    entity.HasKey(e => e.Id);
                    entity.HasIndex(e => new { e.PublishedAt, e.Id });
                    entity.Property(e => e.Payload).HasColumnType("nvarchar(max)");
                });
            }
        }

        // 2. DbContext with Outbox
        public class AppDbContext : DbContext
        {
            private readonly IBus _bus;

            public AppDbContext(DbContextOptions<AppDbContext> options, IBus bus) : base(options)
            {
                _bus = bus;
            }

            public DbSet<Order> Orders { get; set; }
            public DbSet<OutboxMessage> OutboxMessages { get; set; }
    
            public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
            {
                var result = await base.SaveChangesAsync(cancellationToken);
        
                // Optional fast path: publish right after the commit. If the process crashes here,
                // the rows stay unpublished and a standalone poller delivers them later.
                await PublishOutboxMessagesAsync(cancellationToken);
        
                return result;
            }
    
            private async Task PublishOutboxMessagesAsync(CancellationToken ct)
            {
                var unpublished = await OutboxMessages
                    .Where(m => m.PublishedAt == null && (m.RetryCount == null || m.RetryCount < 5))
                    .OrderBy(m => m.CreatedAt)
                    .ToListAsync(ct);
        
                foreach (var message in unpublished)
                {
                    try
                    {
                        var envelope = new OutboxEnvelope
                        {
                            MessageId = message.Id,
                            MessageType = message.Type,
                            ContentType = message.ContentType,
                            Payload = message.Payload,
                            CreatedAt = message.CreatedAt
                        };
                
                        // Publish through the injected MassTransit bus
                        await _bus.Publish(envelope, ct);
                
                        message.PublishedAt = DateTime.UtcNow;
                    }
                    catch (Exception ex)
                    {
                        // Keep PublishedAt null: a failed message must not look published.
                        // After 5 attempts the query above stops selecting it.
                        message.RetryCount = (message.RetryCount ?? 0) + 1;
                        message.Error = ex.Message;
                    }
                }
        
                await base.SaveChangesAsync(ct);
            }
        }

        // 3. Usage
        public class OrderService
        {
            private readonly AppDbContext _context;

            public OrderService(AppDbContext context) => _context = context;
    
            public async Task CreateOrderAsync(CreateOrderCommand cmd)
            {
                var order = new Order
                {
                    Id = Guid.NewGuid(),
                    CustomerId = cmd.CustomerId,
                    Total = cmd.Total,
                    Status = OrderStatus.Created
                };
        
                _context.Orders.Add(order);
        
                // Outbox message goes into the same transaction!
                _context.OutboxMessages.Add(new OutboxMessage
                {
                    Id = Guid.NewGuid(),
                    Type = nameof(OrderCreated),
                    Payload = JsonSerializer.Serialize(new
                    {
                        order.Id,
                        order.CustomerId,
                        order.Total,
                        Timestamp = DateTime.UtcNow
                    })
                });
        
                await _context.SaveChangesAsync();
                // Atomic: order + outbox message are committed together
            }
        }

Outbox Poller (Standalone)

// Standalone outbox poller for high throughput
        public class OutboxPoller : BackgroundService
        {
            private readonly IServiceScopeFactory _scopeFactory;
            private readonly ILogger<OutboxPoller> _logger;
            private readonly TimeSpan _pollInterval = TimeSpan.FromMilliseconds(100);
    
            public OutboxPoller(IServiceScopeFactory scopeFactory, ILogger<OutboxPoller> logger)
            {
                _scopeFactory = scopeFactory;
                _logger = logger;
            }
    
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        await ProcessOutboxAsync(stoppingToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Outbox poller error");
                    }
            
                    await Task.Delay(_pollInterval, stoppingToken);
                }
            }
    
            private async Task ProcessOutboxAsync(CancellationToken ct)
            {
                using var scope = _scopeFactory.CreateScope();
                var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
                var bus = scope.ServiceProvider.GetRequiredService<IBus>();
        
                var batchSize = 100;
                var unpublished = await context.OutboxMessages
                    .Where(m => m.PublishedAt == null && (m.RetryCount == null || m.RetryCount < 5))
                    .OrderBy(m => m.CreatedAt).ThenBy(m => m.Id)  // Guid ids are not time-ordered
                    .Take(batchSize)
                    .ToListAsync(ct);
        
                foreach (var message in unpublished)
                {
                    try
                    {
                        var envelope = new OutboxEnvelope
                        {
                            MessageId = message.Id,
                            MessageType = message.Type,
                            Payload = message.Payload
                        };
                
                        await bus.Publish(envelope, ct);
                        message.PublishedAt = DateTime.UtcNow;
                    }
                    catch (Exception ex)
                    {
                        message.RetryCount = (message.RetryCount ?? 0) + 1;
                        message.Error = ex.Message;
                        // Continue processing other messages
                    }
                }
        
                if (unpublished.Any())
                {
                    await context.SaveChangesAsync(ct);
                }
            }
        }

Change Data Capture (CDC)

CDC vs Outbox

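| Aspect | Outbox Table | CDC (Debezium/Maxwell) |
|---|---|---|
| Complexity | Low (application code) | High (infrastructure) |
| Performance | Extra DB write per message, plus polling load | No extra application writes; reads the transaction log |
| Vendor lock-in | None | CDC connector specific |
| Ordering | Insertion order, if the poller sorts by a monotonic column | Transaction log order, then per Kafka partition |
| Latency | Poll interval | Near real-time (typically milliseconds) |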

CDC Pipeline

┌──────────┐     ┌────────────┐     ┌───────────┐     ┌──────────┐
│ Database │────▶│  CDC       │────▶│ Kafka     │────▶│ Consumers│
│ (MySQL/  │     │ (Debezium) │     │ Topic     │     │          │
│  PG)     │     │            │     │           │     │          │
└──────────┘     └────────────┘     └───────────┘     └──────────┘
                       │
                       ├── Reads binlog/WAL
                       ├── Emits change events
                       └── Schema evolution support
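
On the consumer side, each change event has to be mapped to an operation. The sketch below assumes Debezium's JSON envelope with schemas disabled, where the top-level object carries `before`, `after`, and `op` (`c` = create, `u` = update, `d` = delete, `r` = snapshot read). `ChangeKind`, `ChangeEvent`, and `ChangeEventParser` are hypothetical names introduced for illustration, and other connectors may use a different layout.

```csharp
using System;
using System.Text.Json;

public enum ChangeKind { Create, Update, Delete, Snapshot }

// Row images are kept as raw JSON because their shape depends on the source table.
public sealed record ChangeEvent(ChangeKind Kind, JsonElement? Before, JsonElement? After);

public static class ChangeEventParser
{
    public static ChangeEvent Parse(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        var kind = root.GetProperty("op").GetString() switch
        {
            "c" => ChangeKind.Create,
            "u" => ChangeKind.Update,
            "d" => ChangeKind.Delete,
            "r" => ChangeKind.Snapshot,
            var other => throw new FormatException($"Unsupported op '{other}'")
        };

        return new ChangeEvent(kind, CloneIfObject(root, "before"), CloneIfObject(root, "after"));
    }

    // Returns the named property if it is a JSON object, otherwise null (e.g. "before" on inserts).
    private static JsonElement? CloneIfObject(JsonElement root, string name) =>
        root.TryGetProperty(name, out var value) && value.ValueKind == JsonValueKind.Object
            ? (JsonElement?)value.Clone()   // Clone so the element outlives the disposed JsonDocument
            : null;
}
```

A consumer would typically switch on `Kind` and apply `After` as an upsert or `Before` as a delete key, which keeps the handler idempotent under redelivery.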

Idempotent Consumers

Problem

With at-least-once delivery, a consumer can receive the same message more than once.

Consumer receives message #42
    ↓
Process message (takes 5s)
    ↓
Consumer crashes BEFORE ack
    ↓
Broker redelivers message #42 (duplicate!)

Deduplication Strategies

1. MessageId-based deduplication table

public class IdempotentProcessor
        {
            // Static so the cache outlives the scoped lifetime that AppDbContext imposes on this class
            private static readonly ConcurrentDictionary<string, DateTime> Cache = new();
            private static readonly TimeSpan CacheTtl = TimeSpan.FromHours(24);

            private readonly AppDbContext _context;
            private readonly ILogger<IdempotentProcessor> _logger;
    
            public IdempotentProcessor(AppDbContext context, ILogger<IdempotentProcessor> logger)
            {
                _context = context;
                _logger = logger;
            }
    
            public async Task<bool> TryProcessAsync(string messageId, Func<Task> processAsync)
            {
                // Fast path: in-memory cache
                if (Cache.TryGetValue(messageId, out var processedAt) &&
                    processedAt > DateTime.UtcNow - CacheTtl)
                {
                    _logger.LogDebug("Duplicate message skipped (cache): {MessageId}", messageId);
                    return false;
                }
        
                // Slow path: database check. Only "Completed" counts as a duplicate, so a
                // "Processing" or "Failed" row left by an earlier attempt is retried.
                var marker = await _context.ProcessedMessages
                    .FirstOrDefaultAsync(m => m.MessageId == messageId);
        
                if (marker?.Status == "Completed")
                {
                    _logger.LogDebug("Duplicate message skipped (DB): {MessageId}", messageId);
                    return false;
                }
        
                if (marker == null)
                {
                    marker = new ProcessedMessage { Id = Guid.NewGuid(), MessageId = messageId };
                    _context.ProcessedMessages.Add(marker);
                }

                // Mark as processing
                marker.ProcessedAt = DateTime.UtcNow;
                marker.Status = "Processing";

                try
                {
                    await _context.SaveChangesAsync();
                }
                catch (DbUpdateException)
                {
                    // UNIQUE(MessageId) violated: another consumer claimed this message concurrently
                    _context.Entry(marker).State = EntityState.Detached;
                    return false;
                }
        
                try
                {
                    await processAsync();
            
                    // Mark as completed, then remember it in the cache
                    marker.Status = "Completed";
                    await _context.SaveChangesAsync();
                    Cache[messageId] = DateTime.UtcNow;
                    return true;
                }
                catch
                {
                    marker.Status = "Failed";
                    await _context.SaveChangesAsync();
                    throw;
                }
            }
        }

        // EF Core entity
        public class ProcessedMessage
        {
            public Guid Id { get; set; }
            public string MessageId { get; set; } = null!;
            public DateTime ProcessedAt { get; set; }
            public string Status { get; set; } = null!;
    
            public static void Configure(ModelBuilder modelBuilder)
            {
                modelBuilder.Entity<ProcessedMessage>(entity =>
                {
                    entity.HasKey(e => e.Id);
                    entity.HasIndex(e => e.MessageId).IsUnique();  // UNIQUE constraint!
                });
            }
        }

2. Consumer с deduplication

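public class OrderConsumer : IConsumer<IOrderCreated>
{
    private readonly IdempotentProcessor _idempotentProcessor;
    private readonly IProcessOrder _orderProcessor;

    public OrderConsumer(IdempotentProcessor idempotentProcessor, IProcessOrder orderProcessor)
    {
        _idempotentProcessor = idempotentProcessor;
        _orderProcessor = orderProcessor;
    }

    public async Task Consume(ConsumeContext<IOrderCreated> context)
    {
        // A random fallback id would make every redelivery look new, so fail instead
        var messageId = context.MessageId?.ToString()
            ?? throw new InvalidOperationException("MessageId is required for deduplication");

        var processed = await _idempotentProcessor.TryProcessAsync(messageId, async () =>
        {
            await _orderProcessor.ProcessOrderAsync(context.Message);
        });

        if (!processed)
        {
            // Duplicate: already processed, returning acknowledges the message
            return;
        }
    }
}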

3. Database-level idempotency

-- PostgreSQL: INSERT ... ON CONFLICT for idempotency
INSERT INTO processed_messages (message_id, processed_at, status)
VALUES (@messageId, @timestamp, 'Completed')
ON CONFLICT (message_id) DO NOTHING;
-- Affects 0 rows if the id already exists → duplicate

-- SQL Server: MERGE (HOLDLOCK closes the race between the match check and the insert)
MERGE INTO processed_messages WITH (HOLDLOCK) AS target
USING (SELECT @messageId AS message_id) AS src
ON target.message_id = src.message_id
WHEN NOT MATCHED THEN
    INSERT (message_id, processed_at, status)
    VALUES (@messageId, @timestamp, 'Completed');
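
The claim-first idea behind these statements (try to insert the id, and treat a conflict as a duplicate) can be sketched in memory. `InMemoryDeduplicator` is a hypothetical helper in which `ConcurrentDictionary.TryAdd` stands in for the UNIQUE constraint. It is a single-process illustration only, since the claims vanish on restart.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for a table with UNIQUE(message_id).
public sealed class InMemoryDeduplicator
{
    private readonly ConcurrentDictionary<string, bool> _claimed = new();

    // Returns true if the handler ran, false if the message id was already claimed.
    public bool TryProcess(string messageId, Action handler)
    {
        // TryAdd is atomic: exactly one caller wins the claim, like an INSERT under a unique index.
        if (!_claimed.TryAdd(messageId, true))
        {
            return false;   // duplicate delivery
        }

        try
        {
            handler();
            return true;
        }
        catch
        {
            // Release the claim so a redelivery can retry the failed message.
            _claimed.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

Calling `TryProcess("42", handler)` twice runs the handler once. If the handler throws, the id is released and the next delivery runs it again.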

Message Ordering Guarantees

Per-Partition Ordering

Kafka: guaranteed per partition
  Partition 0: [msg1, msg2, msg3] (strict order)
  Partition 1: [msg4, msg5, msg6] (strict order)

RabbitMQ: guaranteed per queue
  Queue "orders": [msg1, msg2, msg3] (strict order)

Global Ordering Trade-offs

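// Kafka: global ordering = a topic with a single partition
// PRO: Strict global order
// CON: No parallelism, bottleneck
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    // The partitioner cannot create global order: it maps hash(key) to one of the topic's partitions.
    // Create the topic with 1 partition (or produce every message with the same key) instead.
    Partitioner = Partitioner.Consistent,
    EnableIdempotence = true   // Keeps order intact when a batch is retried
};

// RabbitMQ: per-queue ordering
// PRO: Simple, guaranteed
// CON: Single consumer bottleneck
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
// prefetch=1 delivers the next message only after the previous ack. Order is preserved
// only with a single consumer; competing consumers on the same queue process in parallel.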

Partial Ordering (Recommended)

// Partition by aggregate root
// Same aggregate: guaranteed order
// Different aggregates: no ordering guarantee (parallel)

// Kafka: Key = OrderId
// Messages of the same order → same partition
// Different orders → spread across partitions → parallel

await producer.ProduceAsync("orders", new Message<string, string>
{
    Key = orderId.ToString(),  // Same order → same partition
    Value = json
});
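
Why a key pins an aggregate to one partition can be seen from the mapping itself: a deterministic hash of the key, taken modulo the partition count. The sketch below is illustrative only. `KeyPartitioner` is a hypothetical helper, and FNV-1a is used here for simplicity, while real Kafka clients use their own hash functions, so the concrete partition numbers will differ.

```csharp
using System;
using System.Text;

public static class KeyPartitioner
{
    // FNV-1a 32-bit hash: deterministic across processes, unlike string.GetHashCode().
    public static uint Fnv1a(string key)
    {
        unchecked
        {
            uint hash = 2166136261u;
            foreach (byte b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619u;
            }
            return hash;
        }
    }

    // Same key and same partition count always give the same partition.
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(partitionCount));
        }
        return (int)(Fnv1a(key) % (uint)partitionCount);
    }
}
```

The mapping depends on the partition count, so adding partitions to a topic moves some keys to other partitions. Per-key ordering holds only while the count stays fixed.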

Transactional Outbox vs Two-Phase Commit

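| Feature | Transactional Outbox | 2PC (XA) |
|---|---|---|
| Complexity | Low | High |
| Performance | High (local transaction only) | Low (distributed locks) |
| Availability | High | Low (coordinator dependency) |
| Vendor support | Any database with local transactions | Limited (database and broker must both support XA) |
| Latency | Poll interval | Synchronous with the commit |
| Recommended | Yes (most cases) | Rarely |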

Checklist

  • [x] Outbox Pattern with EF Core + MassTransit
  • [x] Idempotent message processor with deduplication table
  • [x] CDC pipeline for database change event generation


Event Sourcing Integration

Event Store as Message Source

Core Concepts

Aggregate Root: Order
    │
    ├── Stream: order-{OrderId}
    │   ├── Event: OrderCreated (version 1)
    │   ├── Event: OrderUpdated (version 2)
    │   ├── Event: OrderShipped (version 3)
    │   └── Event: OrderCompleted (version 4)
    │
    └── Snapshot: version 100 (full state)

Append-Only, Stream per Aggregate

// Event Store entity
        public class DomainEvent
        {
            public Guid Id { get; set; }
            public string Type { get; set; } = null!;
            public string StreamId { get; set; } = null!;
            public int StreamPosition { get; set; }
            public int Version { get; set; }
            public string Data { get; set; } = null!;
            public string? Metadata { get; set; }
            public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
            public string? CreatedBy { get; set; }
        }

        // Append-only: never update/delete events
        public class EventStore
        {
            private readonly AppDbContext _context;
    
            public async Task AppendAsync<TEvent>(string streamId, TEvent @event, int expectedVersion, string? userId = null)
                where TEvent : class
            {
                // Current version of the stream; 0 for a new stream
                var currentVersion = await _context.Events
                    .Where(e => e.StreamId == streamId)
                    .MaxAsync(e => (int?)e.Version) ?? 0;
        
                if (currentVersion != expectedVersion)
                {
                    throw new ConcurrencyException(
                        $"Expected version {expectedVersion}, but was {currentVersion}");
                }
        
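                var domainEvent = new DomainEvent
                {
                    Id = Guid.NewGuid(),
                    Type = typeof(TEvent).Name,
                    StreamId = streamId,
                    // Global position: next free slot. Under concurrent writers a database
                    // sequence or identity column is safer than max + 1.
                    StreamPosition = await GetMaxStreamPositionAsync() + 1,
                    Version = currentVersion + 1,
                    Data = JsonSerializer.Serialize(@event),
                    CreatedAt = DateTimeOffset.UtcNow,
                    CreatedBy = userId
                };
        
                // A unique index on (StreamId, Version) makes the loser of a concurrent append fail here
                _context.Events.Add(domainEvent);
                await _context.SaveChangesAsync();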
            }
    
            private async Task<int> GetMaxStreamPositionAsync()
            {
                return await _context.Events
                    .MaxAsync(e => (int?)e.StreamPosition) ?? 0;
            }
        }
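
Reading an aggregate back is the mirror image of appending: load the stream in version order and fold the events into state. The sketch below is a minimal illustration with hypothetical event and state types (`OrderEvent`, `OrderState`, and their members are assumptions and are not part of the `EventStore` class above). It skips deserialization and works on already typed events.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event types mirroring the stream shown under Core Concepts.
public abstract record OrderEvent;
public sealed record OrderCreated(Guid OrderId, decimal Total) : OrderEvent;
public sealed record OrderUpdated(decimal NewTotal) : OrderEvent;
public sealed record OrderShipped : OrderEvent;
public sealed record OrderCompleted : OrderEvent;

public sealed record OrderState(Guid OrderId, decimal Total, string Status, int Version)
{
    public static readonly OrderState Empty = new(Guid.Empty, 0m, "None", 0);

    // Pure transition: current state + one event -> next state. Every event bumps the version.
    public OrderState Apply(OrderEvent e) => e switch
    {
        OrderCreated c => this with { OrderId = c.OrderId, Total = c.Total, Status = "Created", Version = Version + 1 },
        OrderUpdated u => this with { Total = u.NewTotal, Version = Version + 1 },
        OrderShipped => this with { Status = "Shipped", Version = Version + 1 },
        OrderCompleted => this with { Status = "Completed", Version = Version + 1 },
        _ => throw new InvalidOperationException($"Unknown event type {e.GetType().Name}")
    };

    // Rebuilds the state by replaying the events in stream order.
    public static OrderState Replay(IEnumerable<OrderEvent> events)
    {
        var state = Empty;
        foreach (var e in events)
        {
            state = state.Apply(e);
        }
        return state;
    }
}
```

The `Version` produced by the replay is the value a caller would pass as `expectedVersion` on the next append. A snapshot only replaces `Empty` with a stored state so that fewer events need to be replayed.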

Projection Builders

Catch-up Subscription Pattern

Event Stream          Projection Builder        Read Model
    │                        │                        │
    ├── OrderCreated ──────▶│──▶ INSERT INTO orders ──▶ SELECT *
    ├── OrderUpdated ──────▶│──▶ UPDATE orders ───────▶ JOIN ...
    ├── OrderShipped ──────▶│──▶ UPDATE orders ───────▶ LEFT JOIN shipments
    └── OrderCompleted ────▶│──▶ UPDATE orders ───────▶ WHERE status = 'completed'

Real-time: Event Store → Stream → Projection
Catch-up: Event Store → Replay all → Projection

Catch-up Subscription

public class OrderProjectionBuilder : BackgroundService
        {
            private readonly EventStore _eventStore;
            private readonly IMediator _mediator;
            private readonly ILogger<OrderProjectionBuilder> _logger;
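            // In-memory checkpoint: a restart replays from position 0, so either persist it
            // or keep the projection handlers idempotent
            private int _lastProcessedPosition = 0;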
    
            public OrderProjectionBuilder(
                EventStore eventStore,
                IMediator mediator,
                ILogger<OrderProjectionBuilder> logger)
            {
                _eventStore = eventStore;
                _mediator = mediator;
                _logger = logger;
            }
    
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                _logger.LogInformation("Projection builder starting from position {Position}", _lastProcessedPosition);
        
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        // Catch-up: get all events since last processed
                        var events = await _eventStore.GetEventsAsync(
                            fromPosition: _lastProcessedPosition,
                            batchSize: 100,
                            stoppingToken);
                
                        if (!events.Any())
                        {
                            await Task.Delay(100, stoppingToken);
                            continue;
                        }
                
                        foreach (var @event in events)
                        {
                            try
                            {
                                await ProcessEventAsync(@event, stoppingToken);
                                _lastProcessedPosition = @event.StreamPosition;
                            }
                            catch (Exception ex)
                            {
                                _logger.LogError(ex, "Failed to process event at position {Position}", @event.StreamPosition);
                                // Don't advance — will retry
                                await Task.Delay(1000, stoppingToken);
                                break;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Projection builder error");
                        await Task.Delay(5000, stoppingToken);
                    }
                }
            }
    
            private async Task ProcessEventAsync(DomainEvent @event, CancellationToken ct)
            {
                // AppendAsync stores typeof(TEvent).Name, so match on the short type name.
                // Type.GetType would need an assembly-qualified name and returns null for a short one.
                object? deserialized = @event.Type switch
                {
                    nameof(OrderCreatedEvent) => JsonSerializer.Deserialize<OrderCreatedEvent>(@event.Data),
                    nameof(OrderUpdatedEvent) => JsonSerializer.Deserialize<OrderUpdatedEvent>(@event.Data),
                    nameof(OrderShippedEvent) => JsonSerializer.Deserialize<OrderShippedEvent>(@event.Data),
                    nameof(OrderCompletedEvent) => JsonSerializer.Deserialize<OrderCompletedEvent>(@event.Data),
                    _ => throw new InvalidOperationException($"Unknown event type: {@event.Type}")
                };
        
                // Route to appropriate projection handler
                switch (deserialized)
                {
                    case OrderCreatedEvent created:
                        await _mediator.Send(new CreateOrderProjection(created), ct);
                        break;
                    case OrderUpdatedEvent updated:
                        await _mediator.Send(new UpdateOrderProjection(updated), ct);
                        break;
                    case OrderShippedEvent shipped:
                        await _mediator.Send(new ShipOrderProjection(shipped), ct);
                        break;
                    case OrderCompletedEvent completed:
                        await _mediator.Send(new CompleteOrderProjection(completed), ct);
                        break;
                }
        
                _logger.LogDebug("Processed event: {Type} at position {Position}", @event.Type, @event.StreamPosition);
            }
        }

        // Real-time subscription (alternative: using MassTransit or event store native)
        public class RealTimeProjectionConsumer : IConsumer<IDomainEvent>
        {
            public async Task Consume(ConsumeContext<IDomainEvent> context)
            {
                var @event = context.Message;
                // Process in real-time
                await ApplyEventAsync(@event);
            }
    
            private Task ApplyEventAsync(IDomainEvent @event) => Task.CompletedTask;
        }

Read Model Builder

// CQRS: Events → Read Model
        public class OrderReadModel
        {
            public Guid Id { get; set; }
            public Guid CustomerId { get; set; }
            public decimal Total { get; set; }
            public string Status { get; set; } = null!;
            public string? TrackingNumber { get; set; }
            public DateTime? ShippedAt { get; set; }
            public DateTime? CompletedAt { get; set; }
            public List<OrderItemReadModel> Items { get; set; } = new();
        }

        // Projection handlers
        public class CreateOrderProjection : IRequest
        {
            public OrderCreatedEvent Event { get; }
            public CreateOrderProjection(OrderCreatedEvent @event) => Event = @event;
        }

        public class CreateOrderProjectionHandler : IRequestHandler<CreateOrderProjection>
        {
            private readonly AppDbContext _context;

            public CreateOrderProjectionHandler(AppDbContext context) => _context = context;
    
            public async Task Handle(CreateOrderProjection request, CancellationToken ct)
            {
                var readModel = new OrderReadModel
                {
                    Id = request.Event.OrderId,
                    CustomerId = request.Event.CustomerId,
                    Total = request.Event.Total,
                    Status = "Created",
                    Items = request.Event.Items.Select(i => new OrderItemReadModel
                    {
                        ProductId = i.ProductId,
                        Name = i.Name,
                        Quantity = i.Quantity,
                        Price = i.Price
                    }).ToList()
                };
        
                _context.OrderReadModels.Add(readModel);
                await _context.SaveChangesAsync(ct);
            }
        }

        public class UpdateOrderProjection : IRequest
        {
            public OrderUpdatedEvent Event { get; }
            public UpdateOrderProjection(OrderUpdatedEvent @event) => Event = @event;
        }

        public class UpdateOrderProjectionHandler : IRequestHandler<UpdateOrderProjection>
        {
            private readonly AppDbContext _context;

            public UpdateOrderProjectionHandler(AppDbContext context) => _context = context;
    
            public async Task Handle(UpdateOrderProjection request, CancellationToken ct)
            {
                var readModel = await _context.OrderReadModels
                    .FirstOrDefaultAsync(m => m.Id == request.Event.OrderId, ct);
        
                if (readModel != null)
                {
                    readModel.Status = request.Event.NewStatus;
                    readModel.Total = request.Event.Total;
                    await _context.SaveChangesAsync(ct);
                }
            }
        }

Event Versioning

Handling Breaking Changes

// Version 1
        public record OrderCreatedV1(Guid OrderId, Guid CustomerId, decimal Total);

        // Version 2 — backward compatible (added optional field)
        public record OrderCreatedV2(Guid OrderId, Guid CustomerId, decimal Total, string? Currency);

        // Version 3 — breaking change (new structure)
        public record OrderCreatedV3(
            Guid OrderId, 
            Guid CustomerId, 
            IReadOnlyList<OrderItemV3> Items,
            string Currency);

        public record OrderItemV3(Guid ProductId, string Name, int Quantity, decimal UnitPrice);

        // Event versioning handler
        public class EventVersioningHandler
        {
            // Maps the stored event type name to a deserializer for that version
            private readonly Dictionary<string, Func<string, object>> _deserializers = new()
            {
                ["OrderCreatedV1"] = json => JsonSerializer.Deserialize<OrderCreatedV1>(json)!,
                ["OrderCreatedV2"] = json => JsonSerializer.Deserialize<OrderCreatedV2>(json)!,
                ["OrderCreatedV3"] = json => JsonSerializer.Deserialize<OrderCreatedV3>(json)!,
            };
    
            public object DeserializeEvent(string eventType, string json)
            {
                if (_deserializers.TryGetValue(eventType, out var deserialize))
                {
                    return deserialize(json);
                }
                throw new InvalidOperationException($"Unknown event type: {eventType}");
            }
    
            // Migration: V1 → V2
            public OrderCreatedV2 MigrateV1ToV2(OrderCreatedV1 v1)
            {
                return new OrderCreatedV2(
                    v1.OrderId,
                    v1.CustomerId,
                    v1.Total,
                    Currency: "USD"  // Default value
                );
            }
    
            // Migration: V2 → V3
            public OrderCreatedV3 MigrateV2ToV3(OrderCreatedV2 v2)
            {
                return new OrderCreatedV3(
                    v2.OrderId,
                    v2.CustomerId,
                    Items: new List<OrderItemV3>(),  // Empty: V2 carries no line items (and V3 has no Total)
                    v2.Currency ?? "USD"
                );
            }
        }

Backward Compatibility Layer

public class BackwardCompatibilityLayer
        {
            private readonly EventVersioningHandler _versioning;
    
            public BackwardCompatibilityLayer(EventVersioningHandler versioning)
            {
                _versioning = versioning;
            }
    
            public T DeserializeAndNormalize<T>(string eventType, string json) where T : class
            {
                // Deserialize original version
                var original = _versioning.DeserializeEvent(eventType, json);
        
                // Migrate to latest by chaining the per-version migrations
                OrderCreatedV3 normalized = original switch
                {
                    OrderCreatedV1 v1 => _versioning.MigrateV2ToV3(_versioning.MigrateV1ToV2(v1)),
                    OrderCreatedV2 v2 => _versioning.MigrateV2ToV3(v2),
                    OrderCreatedV3 v3 => v3,  // Already latest
                    _ => throw new InvalidOperationException($"Cannot migrate {eventType}")
                };
        
                // Round-trip through JSON to map the latest version onto the requested type T
                return JsonSerializer.Deserialize<T>(JsonSerializer.Serialize(normalized))!;
            }
        }

Snapshot Strategy

Problem: Large Stream Replay

Stream with 50,000 events:
          Replay all 50k events → Apply each → Build state → Very slow!

        With snapshot at every 1000 events:
          Load snapshot (version 49,000) → Replay 1,000 events → Build state → Fast!
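
The replay cost after a snapshot depends only on how far the stream has moved past the last snapshot. A minimal sketch of that arithmetic, assuming a snapshot is written at every multiple of the interval (SnapshotMath and its method names are hypothetical helpers, not part of the code in this section):

```csharp
using System;

public static class SnapshotMath
{
    // Version of the most recent snapshot, assuming one is written at every multiple of `interval`.
    public static int LatestSnapshotVersion(int streamVersion, int interval)
    {
        if (interval <= 0) throw new ArgumentOutOfRangeException(nameof(interval));
        return streamVersion / interval * interval; // integer division rounds down
    }

    // Number of events that still have to be replayed on top of that snapshot.
    public static int EventsToReplay(int streamVersion, int interval) =>
        streamVersion - LatestSnapshotVersion(streamVersion, interval);
}
```

For a stream at version 49,999 with an interval of 1,000 this gives a snapshot at 49,000 and 999 events to replay; the worst case is always one less than the interval, regardless of the total stream length.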

Snapshot Implementation

public class SnapshotStore
        {
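            private readonly AppDbContext _context;
            private readonly int _snapshotInterval = 1000;

            public SnapshotStore(AppDbContext context) => _context = context;
    
            // No I/O is needed for the decision, so return a completed task.
            // Version 0 (empty stream) never triggers a snapshot.
            public Task<bool> ShouldSnapshotAsync(string streamId, int currentVersion)
            {
                return Task.FromResult(currentVersion > 0 && currentVersion % _snapshotInterval == 0);
            }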
    
            public async Task SaveSnapshotAsync(string streamId, int version, byte[] state)
            {
                var snapshot = await _context.Snapshots
                    .FirstOrDefaultAsync(s => s.StreamId == streamId, CancellationToken.None);
        
                if (snapshot != null)
                {
                    snapshot.Version = version;
                    snapshot.Data = state;
                    snapshot.CreatedAt = DateTimeOffset.UtcNow;
                }
                else
                {
                    _context.Snapshots.Add(new SnapshotRecord
                    {
                        StreamId = streamId,
                        Version = version,
                        Data = state,
                        CreatedAt = DateTimeOffset.UtcNow
                    });
                }
        
                await _context.SaveChangesAsync(CancellationToken.None);
            }
    
            public async Task<byte[]?> LoadSnapshotAsync(string streamId)
            {
                var snapshot = await _context.Snapshots
                    .Where(s => s.StreamId == streamId)
                    .OrderByDescending(s => s.Version)
                    .FirstOrDefaultAsync(CancellationToken.None);
        
                return snapshot?.Data;
            }
        }

        public class SnapshotRecord
        {
            public Guid Id { get; set; }
            public string StreamId { get; set; } = null!;
            public int Version { get; set; }
            public byte[] Data { get; set; } = default!;
            public DateTimeOffset CreatedAt { get; set; }
        }

Snapshot-based Aggregate Replay

public class OrderAggregate
        {
            private readonly List<IDomainEvent> _uncommittedEvents = new();
            private int _version = 0;
            private Guid _orderId;
            private Guid _customerId;
            private decimal _total;
            private string _status = string.Empty;

            public string StreamId { get; private set; } = string.Empty;
    
            public async Task LoadAsync(string streamId, EventStore eventStore, SnapshotStore snapshotStore)
            {
                StreamId = streamId;

                // Load snapshot first
                var snapshot = await snapshotStore.LoadSnapshotAsync(streamId);
        
                if (snapshot != null)
                {
                    // Deserialize snapshot state; ApplyState also restores _version
                    var state = JsonSerializer.Deserialize<OrderState>(snapshot);
                    ApplyState(state!);
            
                    // Load only events after snapshot
                    var events = await eventStore.GetEventsAfterVersionAsync(streamId, _version);
                    foreach (var @event in events)
                    {
                        ApplyEvent(@event);
                        _version++;  // keep the expected version in step with the stream
                    }
                }
                else
                {
                    // No snapshot: replay all
                    var events = await eventStore.GetAllEventsAsync(streamId);
                    foreach (var @event in events)
                    {
                        ApplyEvent(@event);
                        _version++;
                    }
                }
            }
    
            public async Task SaveAsync(EventStore eventStore, SnapshotStore snapshotStore)
            {
                foreach (var @event in _uncommittedEvents)
                {
                    await eventStore.AppendAsync(
                        StreamId, 
                        @event, 
                        _version, 
                        userId: null);
                    _version++;
                }
        
                _uncommittedEvents.Clear();
        
                // Create snapshot if needed
                if (await snapshotStore.ShouldSnapshotAsync(StreamId, _version))
                {
                    var state = SerializeState();
                    await snapshotStore.SaveSnapshotAsync(StreamId, _version, state);
                }
            }
    
            private void ApplyEvent(IDomainEvent @event)
            {
                switch (@event.Type)
                {
                    case "OrderCreated":
                        var created = JsonSerializer.Deserialize<OrderCreatedEvent>(@event.Data);
                        _orderId = created!.OrderId;
                        _customerId = created.CustomerId;
                        _total = created.Total;
                        _status = "Created";
                        break;
                    case "OrderUpdated":
                        var updated = JsonSerializer.Deserialize<OrderUpdatedEvent>(@event.Data);
                        _total = updated!.Total;
                        _status = updated.NewStatus;
                        break;
                    // ... more event types
                }
            }
    
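            // OrderState is a positional record, so it is built through its constructor
            private byte[] SerializeState() => 
                JsonSerializer.SerializeToBytes(
                    new OrderState(_version, _orderId, _customerId, _total, _status));
    
            private void ApplyState(OrderState state)
            {
                _version = state.Version;
                _orderId = state.OrderId;
                _customerId = state.CustomerId;
                _total = state.Total;
                _status = state.Status;
            }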
        }

        public record OrderState(
            int Version,
            Guid OrderId,
            Guid CustomerId,
            decimal Total,
            string Status);

Snapshot Performance

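Illustrative figures, assuming about 10ms per 1,000 replayed events and 1ms to load a snapshot:

Events   | Full Replay | With Snapshot (every 1k)
100      | 1ms         | 1ms (no snapshot yet)
1,000    | 10ms        | 10ms (snapshot taken at 1k)
10,000   | 100ms       | 1ms (snapshot) + up to 10ms (at most 1k events)
50,000   | 500ms       | 1ms (snapshot) + up to 10ms (at most 1k events)
100,000  | 1s          | 1ms (snapshot) + up to 10ms (at most 1k events)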

Checklist

  • [x] Projection builder with the catch-up subscription pattern
  • [x] Event versioning handler with a backward compatibility layer
  • [x] Snapshot mechanism for aggregates with 10k+ events

Practice


Distributed Event Architecture

Multi-Broker Topology

Federation

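Federation links several RabbitMQ clusters into one loosely coupled topology: a federated exchange or queue on the downstream cluster receives messages from an upstream cluster over AMQP, and the clusters stay independent (no shared cluster membership).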

Data Center 1              Data Center 2
        ┌─────────────────┐       ┌─────────────────┐
        │  DC1-RabbitMQ   │       │  DC2-RabbitMQ   │
        │                 │       │                 │
        │  Exchange: A ───┼──FED──┼──▶ Exchange: A  │
        │  Queue: Q1      │       │  Queue: Q2      │
        │                 │       │                 │
        └─────────────────┘       └─────────────────┘
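
// Enable the federation plugins on each node (a CLI command, not an HTTP call)
        // rabbitmq-plugins enable rabbitmq_federation rabbitmq_federation_management

        // Federation upstream: defined on the downstream cluster, the URI points at the upstream.
        // vhost and name are part of the command, not of the JSON value:
        // rabbitmqctl set_parameter -p / federation-upstream dc2-upstream '<JSON below>'
        {
            "uri": "amqp://dc2-rabbitmq.example.com:5672",
            "exchange": "orders",
            "queue": "orders",
            "ack-mode": "on-confirm",
            "max-hops": 1,
            "prefetch-count": 1000,
            "reconnect-delay": 5
        }

        // A policy then marks which exchanges are federated:
        // rabbitmqctl set_policy -p / --apply-to exchanges federate-orders "^orders$" '{"federation-upstream-set":"all"}'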

Mirroring (Classic Mirrors)

Primary Queue: orders
        ├── Replica on Broker 1 (Leader)
        ├── Replica on Broker 2 (Follower)
        └── Replica on Broker 3 (Follower)

        Write: Leader only → replicate to followers
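        Read: clients may connect to any node, but every operation is routed to the leader

// Policy-based mirroring (classic mirrored queues are deprecated and were removed in RabbitMQ 4.0)
        // rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}' --apply-to queues

        // Alternative: Quorum Queues (the replacement for classic mirrors)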
        await channel.QueueDeclareAsync(
            queue: "orders",
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: new Dictionary<string, object>
            {
                ["x-queue-type"] = "quorum"  // Raft-based quorum queue
            });

Cross-Datacenter Replication

DC1 (Primary)                    DC2 (Secondary)
        ┌──────────────┐                 ┌──────────────┐
        │ RabbitMQ     │────async──────▶│ RabbitMQ     │
        │ Cluster      │  replication   │ Cluster      │
        │              │                 │              │
        │ Write here   │                 │ Read-only    │
        └──────────────┘                 └──────────────┘
            │                                  │
            ▼                                  ▼
        Services                           Services

Event Mesh

Azure Event Grid

// Azure Event Grid publisher
        var client = new EventGridPublisherClient(
            new Uri("https://<topic>.westus2-1.eventgrid.azure.net/api/events"),
            new DefaultAzureCredential());

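        var events = new[]
        {
            // Azure.Messaging.CloudEvent takes source, type and payload as constructor
            // arguments; Id, Subject and Time can be set in the initializer
            new CloudEvent(
                source: "order-service",
                type: "Order.Created",
                jsonSerializableData: new OrderCreatedEvent
                {
                    OrderId = Guid.NewGuid(),
                    CustomerId = customerId,
                    Total = total
                })
            {
                Id = Guid.NewGuid().ToString(),
                Subject = "orders/123",
                Time = DateTimeOffset.UtcNow
            }
        };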

        await client.SendEventsAsync(events);

AWS EventBridge

// AWS EventBridge publisher
        var eventBridgeClient = new AmazonEventBridgeClient();

        var request = new PutEventsRequest
        {
            Entries = new List<PutEventsRequestEntry>
            {
                new PutEventsRequestEntry
                {
                    Source = "order.service",
                    DetailType = "OrderCreated",
                    Detail = JsonSerializer.Serialize(new OrderCreatedEvent
                    {
                        OrderId = orderId,
                        CustomerId = customerId,
                        Total = total
                    }),
                    Time = DateTime.UtcNow,
                    EventBusName = "production"
                }
            }
        };

        await eventBridgeClient.PutEventsAsync(request);

Event Mesh Abstraction

// Abstraction over multiple event platforms
        public interface IEventMesh
        {
            Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default) 
                where TEvent : class;
    
            Task SubscribeAsync<TEvent>(Func<TEvent, CancellationToken, Task> handler, 
                CancellationToken ct = default) where TEvent : class;
        }

        public class MultiBrokerEventMesh : IEventMesh
        {
            private readonly IBus _massTransitBus;
            private readonly EventGridPublisherClient? _eventGrid;
            private readonly AmazonEventBridgeClient? _eventBridge;
    
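            // SubscribeAsync is omitted here for brevity.
            // Note: the three publishes are independent; a failure after the first one
            // leaves the brokers out of sync unless the caller retries or uses an outbox.
            public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
                where TEvent : class
            {
                // Primary: MassTransit (RabbitMQ)
                await _massTransitBus.Publish(@event, ct);
        
                // Secondary: Event Grid (for external consumers)
                if (_eventGrid != null)
                {
                    // CloudEvent takes source, type and payload as constructor arguments
                    var cloudEvent = new CloudEvent(
                        source: typeof(TEvent).Name,
                        type: typeof(TEvent).Name,
                        jsonSerializableData: @event)
                    {
                        Id = Guid.NewGuid().ToString(),
                        Time = DateTimeOffset.UtcNow
                    };
                    await _eventGrid.SendEventsAsync(new[] { cloudEvent }, ct);
                }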
        
                // Tertiary: EventBridge (for AWS consumers)
                if (_eventBridge != null)
                {
                    var request = new PutEventsRequest
                    {
                        Entries = new List<PutEventsRequestEntry>
                        {
                            new PutEventsRequestEntry
                            {
                                Source = typeof(TEvent).Name,
                                DetailType = typeof(TEvent).Name,
                                Detail = JsonSerializer.Serialize(@event),
                                Time = DateTime.UtcNow
                            }
                        }
                    };
                    await _eventBridge.PutEventsAsync(request, ct);
                }
            }
        }

Cross-Service Event Contracts

Governance Process

┌──────────┐     ┌──────────────┐     ┌──────────┐     ┌──────────┐
        │ Service  │     │ Event        │     │ Schema   │     │ CI/CD    │
        │ Team A   │────▶│ Contract     │────▶│ Registry │────▶│ Pipeline │
        │          │     │ Definition   │     │          │     │          │
        └──────────┘     └──────────────┘     └──────────┘     └──────────┘
                                │                      │              │
                                ▼                      ▼              ▼
                           Review                  Validate       Deploy
                           (Team B)               (Breaking?)    (All services)

Automated Validation

// Schema validation for event contracts
        public class EventContractValidator
        {
            private readonly ISchemaRegistryClient _schemaRegistry;
            private readonly ILogger<EventContractValidator> _logger;
    
            public EventContractValidator(ISchemaRegistryClient schemaRegistry, ILogger<EventContractValidator> logger)
            {
                _schemaRegistry = schemaRegistry;
                _logger = logger;
            }
    
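            public async Task<ValidationResult> ValidateAsync<TEvent>(TEvent @event, string topic)
            {
                // Assumes the registry holds a JSON Schema under the subject "<topic>-value"
                var schema = await _schemaRegistry.GetLatestSchemaAsync($"{topic}-value");
        
                using var schemaDoc = JsonDocument.Parse(schema.SchemaString);
                using var eventDoc = JsonDocument.Parse(JsonSerializer.Serialize(@event));
                var payload = eventDoc.RootElement;
        
                // Validate against schema
                var errors = new List<string>();
        
                // 1. Required fields: compare parsed property names, not substrings of the JSON text
                if (schemaDoc.RootElement.TryGetProperty("required", out var required))
                {
                    foreach (var field in required.EnumerateArray())
                    {
                        var name = field.GetString()!;
                        if (!payload.TryGetProperty(name, out _))
                        {
                            errors.Add($"Missing required field: {name}");
                        }
                    }
                }
        
                // 2. Type validation (only "integer" is checked in this example)
                if (schemaDoc.RootElement.TryGetProperty("properties", out var properties))
                {
                    foreach (var property in properties.EnumerateObject())
                    {
                        var isInteger = property.Value.TryGetProperty("type", out var type)
                            && type.ValueKind == JsonValueKind.String
                            && type.GetString() == "integer";
        
                        if (isInteger
                            && payload.TryGetProperty(property.Name, out var value)
                            && !(value.ValueKind == JsonValueKind.Number && value.TryGetInt64(out _)))
                        {
                            errors.Add($"Field {property.Name} must be an integer");
                        }
                    }
                }
        
                // ValidationResult is a positional record, so it is built through its constructor
                return new ValidationResult(errors.Count == 0, errors, schema.Version);
            }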
        }

        public record ValidationResult(bool IsValid, IReadOnlyList<string> Errors, int SchemaVersion);

        // CI pipeline integration
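        public class CiEventValidator
        {
            private readonly EventContractValidator _validator;

            public CiEventValidator(EventContractValidator validator) => _validator = validator;

            // GetChangedFilesAsync and LoadEventContractAsync are pipeline-specific helpers (not shown)
            public async Task<bool> ValidateEventContractsAsync(string pullRequestPath)
            {
                var changedFiles = await GetChangedFilesAsync(pullRequestPath);
                var eventFiles = changedFiles.Where(f => f.Contains("/events/"));
        
                var results = new List<ValidationResult>();
        
                foreach (var file in eventFiles)
                {
                    var contract = await LoadEventContractAsync(file);
                    // The file name (without extension) is used as the topic name
                    var result = await _validator.ValidateAsync(contract, Path.GetFileNameWithoutExtension(file));
                    results.Add(result);
                }
        
                // The PR passes only if every changed contract is valid
                return results.All(r => r.IsValid);
            }
        }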

Event Deprecation

// Mark event as deprecated
        [Obsolete("Use IOrderCreatedV2 instead. Will be removed in v3.0.")]
        public interface IOrderCreatedV1
        {
            Guid OrderId { get; }
            decimal Total { get; }
        }

        // Version lifecycle
        // V1: Deprecated (active 2024-01-01 to 2024-06-01)
        // V2: Active (2024-06-01 to present)
        // V3: Beta (preview)

        // Deprecation notice in consumer
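        public class OrderCreatedV1Consumer : IConsumer<IOrderCreatedV1>
        {
            private readonly ILogger<OrderCreatedV1Consumer> _logger;

            public OrderCreatedV1Consumer(ILogger<OrderCreatedV1Consumer> logger) => _logger = logger;

            public async Task Consume(ConsumeContext<IOrderCreatedV1> context)
            {
                _logger.LogWarning("Received deprecated event IOrderCreatedV1. " +
                    "Please migrate to IOrderCreatedV2.");
        
                // Migrate to V2 and republish. MassTransit builds the IOrderCreatedV2 message
                // from the anonymous object (a Currency property on V2 is assumed here).
                await context.Publish<IOrderCreatedV2>(new
                {
                    context.Message.OrderId,
                    context.Message.Total,
                    Currency = "USD"  // Default for legacy events
                });
            }
        }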

Zero Data Loss Configuration

Ack Strategies

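// RabbitMQ: Publisher confirms + Consumer manual ack
        // 1. Publisher confirms (in RabbitMQ.Client 7.x they are enabled when the channel is created;
        //    the exact shape of CreateChannelOptions may differ between 7.x releases)
        var channelOptions = new CreateChannelOptions(
            publisherConfirmationsEnabled: true,
            publisherConfirmationTrackingEnabled: true);
        using var channel = await connection.CreateChannelAsync(channelOptions);
        await channel.BasicPublishAsync(...);  // with tracking on, completes once the broker confirms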

        // 2. Consumer manual ack
        await channel.BasicConsumeAsync(queue, false, consumer);  // autoAck=false
        await channel.BasicAckAsync(deliveryTag, multiple: false);

        // 3. Queue durability
        await channel.QueueDeclareAsync(queue, durable: true, ...);

        // 4. Message persistence
        var props = new BasicProperties { DeliveryMode = DeliveryModes.Persistent };

Replication Factor Tuning

# Kafka: production config
        replication.factor: 3
        min.insync.replicas: 2
        acks: all
        # Guarantees: message written to 3 replicas, at least 2 must confirm
        # Survives: 1 broker failure without data loss

        # RabbitMQ: Quorum Queues
        x-queue-type: quorum
        x-quorum-initial-group-size: 3
        # Raft-based consensus
        # Survives: floor((n-1)/2) failures
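
The two tolerance rules in the comments above can be written out as arithmetic. A small sketch, assuming the standard majority rule for Raft and the acks=all semantics for Kafka (ReplicationMath and its method names are hypothetical helpers for illustration):

```csharp
using System;

public static class ReplicationMath
{
    // Raft needs a majority of members, so n members tolerate floor((n - 1) / 2) failures.
    public static int QuorumFaultTolerance(int members)
    {
        if (members < 1) throw new ArgumentOutOfRangeException(nameof(members));
        return (members - 1) / 2; // integer division rounds down
    }

    // With acks=all, writes keep succeeding while at least min.insync.replicas
    // replicas remain in sync, so this many replicas may be lost.
    public static int KafkaWriteFaultTolerance(int replicationFactor, int minInSyncReplicas)
    {
        if (minInSyncReplicas < 1 || minInSyncReplicas > replicationFactor)
            throw new ArgumentOutOfRangeException(nameof(minInSyncReplicas));
        return replicationFactor - minInSyncReplicas;
    }
}
```

A 3-member quorum queue tolerates 1 failure and a 5-member one tolerates 2; a 4th member adds no tolerance over 3, which is why odd group sizes are the usual choice. For Kafka, replication.factor 3 with min.insync.replicas 2 keeps accepting writes with 1 replica down.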

End-to-End Data Loss Prevention

Producer                          Broker                         Consumer
          │                                  │                              │
          │── Confirm Select ──────────────▶│                              │
          │── Publish (persistent) ────────▶│── Persist to disk ──────────▶│
          │◀── Confirm ─────────────────────│                              │
          │                                  │                              │── Process
          │                                  │                              │── Manual ACK ──────▶│
          │                                  │◀── ACK ──────────────────────│

// Complete zero-loss configuration (Confluent.Kafka property names)
        public class ZeroLossConfiguration
        {
            // Producer
            public static ProducerConfig GetKafkaProducerConfig() => new()
            {
                BootstrapServers = "broker1:9092,broker2:9092,broker3:9092",  // host:port pairs, no URI scheme
                Acks = Acks.All,
                EnableIdempotence = true,
                MessageSendMaxRetries = int.MaxValue,
                RetryBackoffMs = 100,
                MaxInFlight = 5  // idempotence requires at most 5 in-flight requests
                // min.insync.replicas=2 is a topic/broker setting, not a producer property
            };
    
            // Consumer
            public static ConsumerConfig GetKafkaConsumerConfig() => new()
            {
                BootstrapServers = "broker1:9092,broker2:9092,broker3:9092",
                GroupId = "zero-loss-consumer",
                EnableAutoCommit = false,  // Manual commit after processing
                AutoOffsetReset = AutoOffsetReset.Error,  // Fail instead of silently resetting the offset
                IsolationLevel = IsolationLevel.ReadCommitted,
                EnablePartitionEof = true
            };
        }

Checklist

  • [x] Cross-datacenter event replication topology
  • [x] Event contract governance process with automated validation
  • [x] Zero data loss configuration with tuned ack/replication settings

Practice


Stream Processing

Windowing

Types of Windows

Time stream:  |---t1---|---t2---|---t3---|---t4---|---t5---|---t6---|

        Tumbling (1min, no overlap):
                      |---------|---------|---------|
                      t1-t2     t3-t4     t5-t6

        Hopping (1min, 30s slide):
                      |---------|
                           |---------|
                                |---------|

        Sliding (based on event time, 1min, 30s slide):
                      |---------|
                           |---------|
                                |---------|

        Session (gap=30s):
                      |----|          |----|             |--------|
                      session1       session2          session3
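
The window types above differ mainly in how an event timestamp is mapped to one or more windows. The following is a minimal sketch of that mapping, not part of the original listings: `WindowAssigner` and its method names are hypothetical, windows are assumed to be aligned to UTC, the window size is assumed to be a multiple of the slide, and the session helper expects timestamps sorted in ascending order.

```csharp
using System;
using System.Collections.Generic;

public static class WindowAssigner
{
    // Tumbling: every timestamp belongs to exactly one window [start, start + size).
    public static DateTimeOffset TumblingStart(DateTimeOffset eventTime, TimeSpan size)
    {
        long startTicks = eventTime.UtcTicks / size.Ticks * size.Ticks;
        return new DateTimeOffset(startTicks, TimeSpan.Zero);
    }

    // Hopping: a new window starts every `slide`, so a timestamp belongs to size/slide windows.
    public static List<DateTimeOffset> HoppingStarts(DateTimeOffset eventTime, TimeSpan size, TimeSpan slide)
    {
        var starts = new List<DateTimeOffset>();
        long latest = eventTime.UtcTicks / slide.Ticks * slide.Ticks;
        for (long s = latest - size.Ticks + slide.Ticks; s <= latest; s += slide.Ticks)
        {
            starts.Add(new DateTimeOffset(s, TimeSpan.Zero));
        }
        return starts;
    }

    // Session: a new session begins whenever the pause since the previous event exceeds `gap`.
    public static List<List<DateTimeOffset>> Sessions(IEnumerable<DateTimeOffset> sortedTimes, TimeSpan gap)
    {
        var sessions = new List<List<DateTimeOffset>>();
        foreach (var t in sortedTimes)
        {
            if (sessions.Count == 0 || t - sessions[^1][^1] > gap)
            {
                sessions.Add(new List<DateTimeOffset>());
            }
            sessions[^1].Add(t);
        }
        return sessions;
    }
}
```

For an event at 10:00:45 with a 1-minute window and a 30-second slide, `TumblingStart` yields 10:00:00 and `HoppingStarts` yields the two windows starting at 10:00:00 and 10:00:30.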

Tumbling Window

// MassTransit with windowed aggregation
        public class TumblingWindowAggregator : IConsumer<IOrderEvent>
        {
            private readonly ConcurrentDictionary<string, WindowState> _windows = new();
            private readonly ILogger<TumblingWindowAggregator> _logger;
            private readonly IBus _bus;  // used by ProcessWindowAsync to publish window results
            private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(1);
            private readonly Timer _timer;
    
            public TumblingWindowAggregator(ILogger<TumblingWindowAggregator> logger, IBus bus)
            {
                _logger = logger;
                _bus = bus;
                _timer = new Timer(ProcessExpiredWindows, null, _windowSize, _windowSize);
            }
    
            public async Task Consume(ConsumeContext<IOrderEvent> context)
            {
                var windowKey = GetWindowKey(DateTimeOffset.UtcNow, _windowSize);
                var window = _windows.GetOrAdd(windowKey, _ => new WindowState { Key = windowKey });
        
                lock (window)
                {
                    switch (context.Message)
                    {
                        case IOrderCreated created:
                            window.OrderCount++;
                            window.TotalAmount += created.Total;
                            window.Orders.Add(created);
                            break;
                        case IOrderCompleted completed:
                            window.CompletedCount++;
                            break;
                    }
                }
        
                _logger.LogDebug("Window {WindowKey}: {Count} orders", windowKey, window.OrderCount);
            }
    
            private void ProcessExpiredWindows(object? state)
            {
                var now = DateTimeOffset.UtcNow;
                var expiredKeys = _windows
                    .Where(kvp => kvp.Value.CreatedAt < now.AddTicks(-_windowSize.Ticks))
                    .Select(kvp => kvp.Key)
                    .ToList();
        
                foreach (var key in expiredKeys)
                {
                    if (_windows.TryRemove(key, out var window))
                    {
                        ProcessWindowAsync(window).GetAwaiter().GetResult();
                    }
                }
            }
    
            private async Task ProcessWindowAsync(WindowState window)
            {
                var avgOrderValue = window.OrderCount > 0 
                    ? window.TotalAmount / window.OrderCount 
                    : 0;
        
                var analytics = new WindowAnalytics
                {
                    WindowStart = window.CreatedAt,
                    WindowEnd = window.CreatedAt.Add(_windowSize),
                    OrderCount = window.OrderCount,
                    TotalAmount = window.TotalAmount,
                    AverageOrderValue = avgOrderValue,
                    CompletedCount = window.CompletedCount,
                    CompletionRate = window.OrderCount > 0 
                        ? (double)window.CompletedCount / window.OrderCount 
                        : 0
                };
        
                // Publish window result
                await _bus.Publish(analytics);
                _logger.LogInformation("Window {Start}-{End}: {Count} orders, ${Total}",
                    analytics.WindowStart, analytics.WindowEnd, analytics.OrderCount, analytics.TotalAmount);
            }
    
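            private string GetWindowKey(DateTimeOffset time, TimeSpan windowSize)
            {
                // Round down to the window start; DateTimeOffset needs an explicit offset when built from ticks
                var windowStartTicks = time.UtcTicks / windowSize.Ticks * windowSize.Ticks;
                return new DateTimeOffset(windowStartTicks, TimeSpan.Zero).ToString("yyyy-MM-dd-HH-mm");
            }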
        }

        public class WindowState
        {
            public string Key { get; set; } = null!;
            public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
            public int OrderCount { get; set; }
            public decimal TotalAmount { get; set; }
            public int CompletedCount { get; set; }
            public List<IOrderCreated> Orders { get; set; } = new();
        }

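        // Init-only properties, so the object initializer in ProcessWindowAsync compiles
        public record WindowAnalytics
        {
            public DateTimeOffset WindowStart { get; init; }
            public DateTimeOffset WindowEnd { get; init; }
            public int OrderCount { get; init; }
            public decimal TotalAmount { get; init; }
            public decimal AverageOrderValue { get; init; }  // decimal, to match TotalAmount / OrderCount
            public int CompletedCount { get; init; }
            public double CompletionRate { get; init; }
        }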

Hopping Window

public class HoppingWindowAggregator
        {
            private readonly ConcurrentDictionary<string, WindowState> _windows = new();
            private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(5);
            private readonly TimeSpan _slide = TimeSpan.FromMinutes(1);
    
            public void Consume(IOrderEvent @event)
            {
                // Calculate all windows this event belongs to
                var affectedWindows = GetHoppingWindows(@event.Timestamp, _windowSize, _slide);
        
                foreach (var windowKey in affectedWindows)
                {
                    var window = _windows.GetOrAdd(windowKey, _ => new WindowState { Key = windowKey });
                    lock (window)
                    {
                        window.OrderCount++;
                        if (@event is IOrderCreated created)
                        {
                            window.TotalAmount += created.Total;
                        }
                    }
                }
            }
    
            private IEnumerable<string> GetHoppingWindows(DateTimeOffset eventTime, TimeSpan windowSize, TimeSpan slide)
            {
                var windows = new List<string>();
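                // Align to the slide boundary in UTC; DateTimeOffset has no ticks-only constructor
                var windowStart = new DateTimeOffset(eventTime.UtcTicks / slide.Ticks * slide.Ticks, TimeSpan.Zero);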
        
                // Event belongs to windows starting from (eventTime - windowSize) to eventTime
                var earliestStart = windowStart.AddTicks(-(windowSize.Ticks - slide.Ticks));
        
                for (var start = earliestStart; start <= windowStart; start = start.Add(slide))
                {
                    windows.Add(start.ToString("o"));
                }
        
                return windows;
            }
        }

Aggregation

Stateful Processing

public class StatefulAggregator
        {
            private readonly ConcurrentDictionary<Guid, CustomerState> _states = new();
            private readonly ILogger<StatefulAggregator> _logger;
    
            public StatefulAggregator(ILogger<StatefulAggregator> logger)
            {
                _logger = logger;
            }
    
            public async Task<AggregationResult> AggregateAsync(IOrderEvent @event)
            {
                var customerId = GetCustomerId(@event);
                var state = _states.AddOrUpdate(
                    customerId,
                    _ => new CustomerState { CustomerId = customerId },
                    (_, existing) => existing);
        
                lock (state)
                {
                    switch (@event)
                    {
                        case IOrderCreated created:
                            state.OrderCount++;
                            state.TotalSpent += created.Total;
                            state.LastOrderDate = created.Timestamp;
                            state.OrderIds.Add(created.OrderId);
                            break;
                        case IOrderCompleted completed:
                            state.CompletedCount++;
                            break;
                        case IOrderCancelled cancelled:
                            state.CancelledCount++;
                            break;
                    }
            
                    return new AggregationResult
                    {
                        CustomerId = customerId,
                        OrderCount = state.OrderCount,
                        TotalSpent = state.TotalSpent,
                        CompletedCount = state.CompletedCount,
                        CancelledCount = state.CancelledCount,
                        LastOrderDate = state.LastOrderDate,
                        AverageOrderValue = state.OrderCount > 0 
                            ? state.TotalSpent / state.OrderCount 
                            : 0
                    };
                }
            }
    
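            // Only IOrderCreated carries CustomerId here; other event types fall back to Guid.Empty
            // and would need an OrderId-to-CustomerId lookup to be attributed to the right customer.
            private Guid GetCustomerId(IOrderEvent @event) => (@event as IOrderCreated)?.CustomerId ?? Guid.Empty;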
        }

        public class CustomerState
        {
            public Guid CustomerId { get; set; }
            public int OrderCount { get; set; }
            public decimal TotalSpent { get; set; }
            public int CompletedCount { get; set; }
            public int CancelledCount { get; set; }
            public DateTimeOffset? LastOrderDate { get; set; }
            public List<Guid> OrderIds { get; set; } = new();
        }

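        // Init-only properties, so the object initializer in AggregateAsync compiles
        public record AggregationResult
        {
            public Guid CustomerId { get; init; }
            public int OrderCount { get; init; }
            public decimal TotalSpent { get; init; }
            public int CompletedCount { get; init; }
            public int CancelledCount { get; init; }
            public DateTimeOffset? LastOrderDate { get; init; }
            public decimal AverageOrderValue { get; init; }  // decimal, to match TotalSpent / OrderCount
        }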

Watermark Handling

// Watermark: tracks event time progress
        // Handles late events by waiting before finalizing results

        public class WatermarkManager
        {
            private readonly ConcurrentDictionary<string, DateTimeOffset> _watermarks = new();
            private readonly TimeSpan _allowedLateness = TimeSpan.FromMinutes(10);
            private readonly ILogger<WatermarkManager> _logger;
    
            public WatermarkManager(ILogger<WatermarkManager> logger)
            {
                _logger = logger;
            }
    
            public DateTimeOffset GetWatermark(string windowKey)
            {
                return _watermarks.GetOrAdd(windowKey, _ => DateTimeOffset.UtcNow);
            }
    
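            public void UpdateWatermark(string windowKey, DateTimeOffset eventTime)
            {
                // Math.Max has no DateTimeOffset overload; compare directly so the watermark never moves backwards
                _watermarks.AddOrUpdate(
                    windowKey,
                    eventTime,
                    (_, current) => eventTime > current ? eventTime : current);
            }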
    
            public bool IsEventLate(string windowKey, DateTimeOffset eventTime)
            {
                var watermark = GetWatermark(windowKey);
                var isLate = eventTime < watermark.AddTicks(-_allowedLateness.Ticks);
        
                if (isLate)
                {
                    _logger.LogWarning("Late event detected: eventTime={EventTime}, watermark={Watermark}, allowedLateness={Lateness}",
                        eventTime, watermark, _allowedLateness);
                }
        
                return isLate;
            }
    
            public bool IsWindowFinalized(string windowKey, DateTimeOffset windowEnd)
            {
                var watermark = GetWatermark(windowKey);
                return watermark > windowEnd.AddTicks(_allowedLateness.Ticks);
            }
        }
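
A common alternative formulation keeps a single watermark per stream (or partition) and defines it as the highest event time seen so far minus the allowed lateness. The sketch below illustrates that idea under stated assumptions: `BoundedLatenessWatermark` is a hypothetical name, the class is not thread-safe, and it is not the API of any particular stream processing framework.

```csharp
using System;

// Hypothetical helper: one watermark per stream, derived purely from event time.
public sealed class BoundedLatenessWatermark
{
    private readonly TimeSpan _allowedLateness;
    private DateTimeOffset _maxEventTime = DateTimeOffset.MinValue;

    public BoundedLatenessWatermark(TimeSpan allowedLateness) => _allowedLateness = allowedLateness;

    // Watermark = highest event time seen so far minus the allowed lateness.
    public DateTimeOffset Current =>
        _maxEventTime == DateTimeOffset.MinValue
            ? DateTimeOffset.MinValue
            : _maxEventTime - _allowedLateness;

    // Advances the watermark; an older event never moves it backwards.
    public void Observe(DateTimeOffset eventTime)
    {
        if (eventTime > _maxEventTime) _maxEventTime = eventTime;
    }

    // An event is late when its timestamp is already behind the watermark.
    public bool IsLate(DateTimeOffset eventTime) => eventTime < Current;

    // A window can be finalized once the watermark has reached its end.
    public bool CanFinalize(DateTimeOffset windowEnd) => Current >= windowEnd;
}
```

With 10 minutes of allowed lateness, observing an event stamped 10:30 moves the watermark to 10:20: an event stamped 10:15 is then late, while one stamped 10:25 is still accepted.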

Join Patterns

Stream-Table Join

// Enrich order events with customer reference data
        public class StreamTableJoinEnricher
        {
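            private readonly ConcurrentDictionary<Guid, CustomerReference> _customers = new();
            // Assumed repository abstraction exposing GetAllAsync(); it is used by LoadReferenceDataAsync
            private readonly ICustomerRepository _customerRepository;
            private readonly ILogger<StreamTableJoinEnricher> _logger;
    
            public StreamTableJoinEnricher(ICustomerRepository customerRepository, ILogger<StreamTableJoinEnricher> logger)
            {
                _customerRepository = customerRepository;
                _logger = logger;
            }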
    
            // Load reference data (from database, cache, etc.)
            public async Task LoadReferenceDataAsync()
            {
                var customers = await _customerRepository.GetAllAsync();
                foreach (var customer in customers)
                {
                    _customers[customer.Id] = new CustomerReference
                    {
                        Id = customer.Id,
                        Name = customer.Name,
                        Tier = customer.Tier,
                        Country = customer.Country,
                        DiscountRate = customer.DiscountRate
                    };
                }
                _logger.LogInformation("Loaded {Count} customer references", _customers.Count);
            }
    
            // Join stream event with table lookup
            public async Task<EnrichedOrderEvent?> EnrichAsync(IOrderCreated @event)
            {
                if (!_customers.TryGetValue(@event.CustomerId, out var customer))
                {
                    _logger.LogWarning("Customer {CustomerId} not found in reference data", @event.CustomerId);
                    return null;
                }
        
                var discount = @event.Total * customer.DiscountRate;
        
                return new EnrichedOrderEvent
                {
                    OrderId = @event.OrderId,
                    CustomerId = @event.CustomerId,
                    CustomerName = customer.Name,
                    CustomerTier = customer.Tier,
                    CustomerCountry = customer.Country,
                    Total = @event.Total,
                    Discount = discount,
                    NetAmount = @event.Total - discount,
                    Currency = @event.Currency,
                    Timestamp = @event.Timestamp
                };
            }
        }

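        // Init-only properties, so the object initializers in the enricher compile
        public record CustomerReference
        {
            public Guid Id { get; init; }
            public string Name { get; init; } = "";
            public string Tier { get; init; } = "";
            public string Country { get; init; } = "";
            public decimal DiscountRate { get; init; }
        }

        public record EnrichedOrderEvent
        {
            public Guid OrderId { get; init; }
            public Guid CustomerId { get; init; }
            public string CustomerName { get; init; } = "";
            public string CustomerTier { get; init; } = "";
            public string CustomerCountry { get; init; } = "";
            public decimal Total { get; init; }
            public decimal Discount { get; init; }
            public decimal NetAmount { get; init; }
            public string Currency { get; init; } = "";
            public DateTimeOffset Timestamp { get; init; }
        }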

Stream-Stream Join

// Join OrderCreated with OrderShipped events
        public class StreamStreamJoiner
        {
            private readonly ConcurrentDictionary<Guid, OrderCreatedState> _pendingOrders = new();
            private readonly ILogger<StreamStreamJoiner> _logger;
    
            public StreamStreamJoiner(ILogger<StreamStreamJoiner> logger)
            {
                _logger = logger;
            }
    
            public async Task<OrderJourney?> OnOrderCreated(IOrderCreated @event)
            {
                var state = new OrderCreatedState { Event = @event, CreatedAt = DateTimeOffset.UtcNow };
                _pendingOrders[@event.OrderId] = state;
                return null;
            }
    
            public async Task<OrderJourney?> OnOrderShipped(IOrderShipped @event)
            {
                if (!_pendingOrders.TryRemove(@event.OrderId, out var state))
                {
                    _logger.LogWarning("Order {OrderId} shipped but no created event found", @event.OrderId);
                    return null;
                }
        
                var journey = new OrderJourney
                {
                    OrderId = @event.OrderId,
                    CreatedAt = state.Event.Timestamp,
                    ShippedAt = @event.ShippedAt,
                    TrackingNumber = @event.TrackingNumber,
                    Duration = @event.ShippedAt - state.Event.Timestamp
                };
        
                _logger.LogInformation("Order {OrderId}: created={Created} shipped={Shipped} (duration={Duration})",
                    journey.OrderId, journey.CreatedAt, journey.ShippedAt, journey.Duration);
        
                return journey;
            }
        }

        public class OrderCreatedState
        {
            public IOrderCreated Event { get; set; } = null!;
            public DateTimeOffset CreatedAt { get; set; }
        }

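        // Init-only properties, so the object initializer in OnOrderShipped compiles
        public record OrderJourney
        {
            public Guid OrderId { get; init; }
            public DateTimeOffset CreatedAt { get; init; }
            public DateTimeOffset ShippedAt { get; init; }
            public string TrackingNumber { get; init; } = "";
            public TimeSpan Duration { get; init; }
        }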
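
A stream-stream join usually bounds how long one side waits for the other. Without such a bound, buffered events that never find a match accumulate indefinitely. The sketch below shows one way to add a join window and eviction. `JoinBuffer` is a hypothetical helper, it is not thread-safe, and the choice of comparing event times (rather than arrival times) is an assumption of this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical buffer for the "left" side of a stream-stream join with a bounded join window.
public sealed class JoinBuffer<TKey, TLeft> where TKey : notnull
{
    private readonly Dictionary<TKey, (TLeft Value, DateTimeOffset EventTime)> _pending = new();
    private readonly TimeSpan _joinWindow;

    public JoinBuffer(TimeSpan joinWindow) => _joinWindow = joinWindow;

    public int PendingCount => _pending.Count;

    // Buffers a left-side event until the matching right-side event arrives.
    public void AddLeft(TKey key, TLeft value, DateTimeOffset eventTime) =>
        _pending[key] = (value, eventTime);

    // Matches a right-side event with a buffered left-side event inside the join window.
    public bool TryJoin(TKey key, DateTimeOffset rightEventTime, out TLeft left)
    {
        left = default!;
        if (!_pending.TryGetValue(key, out var entry)) return false;
        if (rightEventTime - entry.EventTime > _joinWindow) return false;

        _pending.Remove(key);
        left = entry.Value;
        return true;
    }

    // Drops entries that can no longer be joined; returns how many were removed.
    public int EvictExpired(DateTimeOffset watermark)
    {
        var expired = _pending
            .Where(kvp => watermark - kvp.Value.EventTime > _joinWindow)
            .Select(kvp => kvp.Key)
            .ToList();

        foreach (var key in expired) _pending.Remove(key);
        return expired.Count;
    }
}
```

`EvictExpired` would typically be called from a periodic timer, with evicted entries routed to a dead-letter or "unmatched" output instead of being dropped silently.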

Late Events

Watermark Strategy

// Watermark-based late event handling
        public class LateEventStrategy
        {
            private readonly WatermarkManager _watermarkManager;
            private readonly ILogger<LateEventStrategy> _logger;
    
            public LateEventStrategy(WatermarkManager watermarkManager, ILogger<LateEventStrategy> logger)
            {
                _watermarkManager = watermarkManager;
                _logger = logger;
            }
    
            public async Task<EventOutcome> ProcessWithWatermark(string windowKey, IOrderEvent @event)
            {
                var eventTime = GetEventTime(@event);
                var windowEnd = GetWindowEnd(windowKey);
        
                _watermarkManager.UpdateWatermark(windowKey, eventTime);
        
                if (_watermarkManager.IsEventLate(windowKey, eventTime))
                {
                    // Late event: send to DLQ or separate late-events topic
                    _logger.LogWarning("Late event: {EventType} at {EventTime}", 
                        @event.GetType().Name, eventTime);
                    return new EventOutcome { IsLate = true, Event = @event };
                }
        
                // On-time event: process normally
                await ProcessEventAsync(@event);
                return new EventOutcome { IsLate = false, Event = @event };
            }
    
            private DateTimeOffset GetEventTime(IOrderEvent @event) => (@event as IOrderCreated)?.Timestamp ?? DateTimeOffset.UtcNow;
            private DateTimeOffset GetWindowEnd(string windowKey) => DateTimeOffset.Parse(windowKey);
    
            private Task ProcessEventAsync(IOrderEvent @event) => Task.CompletedTask;
        }

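        // Init-only properties, so the object initializers in ProcessWithWatermark compile
        public record EventOutcome
        {
            public bool IsLate { get; init; }
            public IOrderEvent Event { get; init; } = null!;
        }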

Event Time vs Processing Time

Event Time:      When the event occurred (taken from the event's timestamp)
        Processing Time: When the event is processed (current wall-clock time)

        Example:
          10:00 - event created (event time)
          10:05 - event delivered (processing time, 5 min delay)
          10:05 - event is assigned to the 10:00-10:01 window (by event time)
  
        With event-time windowing: the 10:00-10:01 window closes at 10:01 + watermark delay, so the delayed event still counts if it arrives within that delay
        With processing-time windowing: the 10:00-10:01 window closes at 10:01 wall-clock time, so the event delivered at 10:05 is counted in the 10:05-10:06 window
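
The 10:00 / 10:05 example can be expressed as a small sketch. `TimeSemantics` and its methods are hypothetical names introduced for illustration, and 1-minute tumbling windows aligned to UTC are assumed.

```csharp
using System;

public static class TimeSemantics
{
    // Start of the tumbling window that contains the given instant.
    public static DateTimeOffset WindowStart(DateTimeOffset instant, TimeSpan size) =>
        new DateTimeOffset(instant.UtcTicks / size.Ticks * size.Ticks, TimeSpan.Zero);

    // Event-time windowing: the window is chosen from the timestamp carried by the event.
    public static DateTimeOffset EventTimeWindow(
        DateTimeOffset eventTime, DateTimeOffset arrivalTime, TimeSpan size) =>
        WindowStart(eventTime, size);

    // Processing-time windowing: the window is chosen from the moment the event is handled.
    public static DateTimeOffset ProcessingTimeWindow(
        DateTimeOffset eventTime, DateTimeOffset arrivalTime, TimeSpan size) =>
        WindowStart(arrivalTime, size);
}
```

For an event created at 10:00 and delivered at 10:05, `EventTimeWindow` returns the window starting at 10:00, while `ProcessingTimeWindow` returns the window starting at 10:05. Event-time results are reproducible on replay, at the cost of having to wait for late data.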

State Backend

Options

Backend           Use Case                      Durability  Scale
In-memory         Testing, stateless            No          Limited
Filesystem        Development                   Yes         Small
RocksDB           Production, large state       Yes         Large
External (Redis)  Shared state, cross-instance  Yes         Medium

State Backend Configuration

// In-memory state (testing)
        public class InMemoryStateBackend : IStateBackend
        {
            private readonly ConcurrentDictionary<string, byte[]> _state = new();
    
            public Task<byte[]> GetAsync(string key) => 
                Task.FromResult(_state.GetValueOrDefault(key, Array.Empty<byte>()));
    
            public Task SetAsync(string key, byte[] value)
            {
                _state[key] = value;
                return Task.CompletedTask;
            }
    
            public Task DeleteAsync(string key)
            {
                _state.TryRemove(key, out _);
                return Task.CompletedTask;
            }
        }

        // Redis state (production)
        public class RedisStateBackend : IStateBackend
        {
            private readonly IDatabase _redis;
            private readonly TimeSpan _ttl = TimeSpan.FromHours(24);
    
            public RedisStateBackend(IDatabase redis)
            {
                _redis = redis;
            }
    
            public async Task<byte[]> GetAsync(string key)
            {
                var data = await _redis.StringGetAsync(key);
                // RedisValue converts to byte[] through a cast
                return data.HasValue ? (byte[])data! : Array.Empty<byte>();
            }
    
            public async Task SetAsync(string key, byte[] value)
            {
                await _redis.StringSetAsync(key, value, _ttl);
            }
    
            public async Task DeleteAsync(string key)
            {
                await _redis.KeyDeleteAsync(key);
            }
        }

Practice

// Real-time analytics dashboard
        public class RealTimeAnalyticsProcessor
        {
            private readonly ConcurrentDictionary<string, WindowState> _windows = new();
            private readonly WatermarkManager _watermarkManager;
            private readonly IBus _bus;
            private readonly ILogger<RealTimeAnalyticsProcessor> _logger;
            private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(5);
            private readonly Timer _cleanupTimer;
    
            public RealTimeAnalyticsProcessor(
                WatermarkManager watermarkManager,
                IBus bus,
                ILogger<RealTimeAnalyticsProcessor> logger)
            {
                _watermarkManager = watermarkManager;
                _bus = bus;
                _logger = logger;
                _cleanupTimer = new Timer(CleanupExpiredWindows, null, _windowSize, _windowSize);
            }
    
            public async Task ProcessOrderAsync(IOrderCreated @event)
            {
                var windowKey = GetWindowKey(@event.Timestamp);
                var window = _windows.GetOrAdd(windowKey, _ => new WindowState { Key = windowKey });
        
                lock (window)
                {
                    window.OrderCount++;
                    window.TotalAmount += @event.Total;
                    window.UniqueCustomers.Add(@event.CustomerId);
                    window.Categories.AddRange(@event.Items.Select(i => i.Category));
                }
        
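                // Track the watermark per stream, not per window: a per-window watermark can never
                // pass that window's own end, so the window would never be finalized.
                // "orders" is an illustrative stream key.
                _watermarkManager.UpdateWatermark("orders", @event.Timestamp);
            }
    
            private void CleanupExpiredWindows(object? state)
            {
                // The window key is the ISO 8601 window start, so the event-time end is start + size
                var expired = _windows
                    .Where(kvp => _watermarkManager.IsWindowFinalized(
                        "orders", DateTimeOffset.Parse(kvp.Key).Add(_windowSize)))
                    .ToList();
        
                foreach (var kvp in expired)
                {
                    if (_windows.TryRemove(kvp.Key, out var window))
                    {
                        // Timer callbacks are synchronous: block so publish failures are not silently dropped
                        PublishWindowResult(window).GetAwaiter().GetResult();
                    }
                }
            }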
    
            private async Task PublishWindowResult(WindowState window)
            {
                var analytics = new RealTimeAnalytics
                {
                    WindowStart = window.CreatedAt,
                    WindowEnd = window.CreatedAt.Add(_windowSize),
                    OrderCount = window.OrderCount,
                    TotalRevenue = window.TotalAmount,
                    UniqueCustomers = window.UniqueCustomers.Count,
                    AvgOrderValue = window.OrderCount > 0 ? window.TotalAmount / window.OrderCount : 0,
                    TopCategories = window.Categories
                        .GroupBy(c => c)
                        .OrderByDescending(g => g.Count())
                        .Take(5)
                        .Select(g => new CategoryStats { Category = g.Key, Count = g.Count() })
                        .ToList()
                };
        
                await _bus.Publish(analytics);
                _logger.LogInformation("Published analytics: {Count} orders, ${Revenue}", 
                    analytics.OrderCount, analytics.TotalRevenue);
            }
    
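            private string GetWindowKey(DateTimeOffset time)
            {
                // Round down to the window start; DateTimeOffset needs an explicit offset when built from ticks
                var ticks = time.UtcTicks / _windowSize.Ticks * _windowSize.Ticks;
                return new DateTimeOffset(ticks, TimeSpan.Zero).ToString("o");
            }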
        }

        public class WindowState
        {
            public string Key { get; set; } = null!;
            public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
            public int OrderCount { get; set; }
            public decimal TotalAmount { get; set; }
            public HashSet<Guid> UniqueCustomers { get; set; } = new();
            public List<string> Categories { get; set; } = new();
        }

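        // Init-only properties, so the object initializers in PublishWindowResult compile
        public record RealTimeAnalytics
        {
            public DateTimeOffset WindowStart { get; init; }
            public DateTimeOffset WindowEnd { get; init; }
            public int OrderCount { get; init; }
            public decimal TotalRevenue { get; init; }
            public int UniqueCustomers { get; init; }
            public decimal AvgOrderValue { get; init; }  // decimal, to match TotalAmount / OrderCount
            public IReadOnlyList<CategoryStats> TopCategories { get; init; } = Array.Empty<CategoryStats>();
        }

        public record CategoryStats
        {
            public string Category { get; init; } = "";
            public int Count { get; init; }
        }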

Checklist

  • [x] Windowed aggregation for a real-time analytics dashboard (tumbling windows)
  • [x] Stream-table join for enrichment with reference data
  • [x] Watermark strategy for handling late events

Monitoring and Operations

Consumer Lag Monitoring

What Is Consumer Lag

Consumer Lag (per partition) = (Latest Offset in Partition) - (Committed Consumer Offset)

        Example:
          Topic "orders" (latest offset in each partition: 100,000):
            Consumer group "processors":
              Consumer A: offset 40,000 (partition 0)
              Consumer B: offset 35,000 (partition 1)
              Consumer C: offset 25,000 (partition 2)
  
          Lag per partition:
            Partition 0: 100,000 - 40,000 = 60,000
            Partition 1: 100,000 - 35,000 = 65,000
            Partition 2: 100,000 - 25,000 = 75,000
  
          Total lag: 200,000 messages
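
The same arithmetic as a minimal sketch. `LagCalculator` is a hypothetical helper that works on plain offset maps (partition id to offset) rather than on a Kafka client, and treating a partition without a committed offset as fully unread is an assumption of this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LagCalculator
{
    // Lag per partition = latest (end) offset minus the group's committed offset.
    public static Dictionary<int, long> PerPartition(
        IReadOnlyDictionary<int, long> endOffsets,
        IReadOnlyDictionary<int, long> committedOffsets)
    {
        return endOffsets.ToDictionary(
            kvp => kvp.Key,
            // A partition without a commit is treated as fully unread (assumption of this sketch).
            kvp => Math.Max(0, kvp.Value - (committedOffsets.TryGetValue(kvp.Key, out var c) ? c : 0)));
    }

    // Total lag of the consumer group across all partitions of the topic.
    public static long Total(IReadOnlyDictionary<int, long> lagByPartition) =>
        lagByPartition.Values.Sum();
}
```

Fed with the numbers from the example (end offsets of 100,000 and committed offsets of 40,000, 35,000 and 25,000), `PerPartition` gives 60,000, 65,000 and 75,000, and `Total` gives 200,000.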

Detection and Alerting

public class ConsumerLagMonitor : BackgroundService
        {
            private readonly IConsumer<string, string> _consumer;
            private readonly ILogger<ConsumerLagMonitor> _logger;
            private readonly IScalingService _scalingService;
            private readonly TimeSpan _monitoringInterval = TimeSpan.FromMinutes(1);
    
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var lagData = await CollectLagAsync(stoppingToken);
                
                        foreach (var lag in lagData)
                        {
                            _logger.LogInformation(
                                "Consumer lag: group={Group} topic={Topic} partition={Partition} lag={Lag}",
                                lag.Group, lag.Topic, lag.Partition, lag.Lag);
                    
                            // Alert on high lag
                            if (lag.Lag > 10000)
                            {
                                _logger.LogWarning("HIGH LAG ALERT: {Group}/{Topic}/{Partition} = {Lag}",
                                    lag.Group, lag.Topic, lag.Partition, lag.Lag);
                        
                                await _scalingService.TriggerScalingAsync(lag.Group);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Error collecting consumer lag");
                    }
            
                    await Task.Delay(_monitoringInterval, stoppingToken);
                }
            }
    
            private async Task<IReadOnlyList<ConsumerLag>> CollectLagAsync(CancellationToken ct)
            {
                var lags = new List<ConsumerLag>();
                var timeout = TimeSpan.FromSeconds(5);
        
                // Kafka: reuse the consumer's connection for admin calls
                using var adminClient = new DependentAdminClientBuilder(_consumer.Handle).Build();
        
                // GetMetadata is synchronous in Confluent.Kafka
                var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
        
                foreach (var topic in metadata.Topics)
                {
                    if (topic.Error.IsError) continue;
            
                    var partitions = topic.Partitions
                        .Select(p => new TopicPartition(topic.Topic, p.PartitionId))
                        .ToList();
            
                    foreach (var group in await GetConsumerGroupsAsync(topic.Topic))
                    {
                        ct.ThrowIfCancellationRequested();
                
                        // Committed offsets of this consumer group (admin API, Confluent.Kafka 2.x)
                        var results = await adminClient.ListConsumerGroupOffsetsAsync(
                            new[] { new ConsumerGroupTopicPartitions(group, partitions) });
                
                        foreach (var committed in results.SelectMany(r => r.Partitions))
                        {
                            // Skip partitions with errors or without a committed offset yet
                            if (committed.Error.IsError || committed.Offset.IsSpecial) continue;
                    
                            // High watermark = offset that the next produced message will receive
                            var watermarks = _consumer.QueryWatermarkOffsets(committed.TopicPartition, timeout);
                            var lag = watermarks.High.Value - committed.Offset.Value;
                    
                            lags.Add(new ConsumerLag(
                                group,
                                topic.Topic,
                                committed.Partition.Value,
                                lag,
                                DateTimeOffset.UtcNow));
                        }
                    }
                }
        
                return lags;
            }
    
            // Placeholder: a real implementation would discover the groups, e.g. through the admin API
            private Task<IReadOnlyList<string>> GetConsumerGroupsAsync(string topic) =>
                Task.FromResult<IReadOnlyList<string>>(new List<string> { "order-processors", "analytics" });
        }

        public record ConsumerLag(
            string Group,
            string Topic,
            int Partition,
            long Lag,
            DateTimeOffset Timestamp);

Message Throughput Metrics

Producer/Consumer Rates

public class MessagingMetrics
        {
            private readonly ILogger<MessagingMetrics> _logger;
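            // System.Diagnostics.Metrics instruments are generic over the measurement type
            private readonly Histogram<double> _messageLatency;
            private readonly Counter<long> _messagesSent;
            private readonly Counter<long> _messagesReceived;
            private readonly Counter<long> _messagesFailed;
            private readonly Counter<long> _messagesDlq;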
    
            public MessagingMetrics(ILogger<MessagingMetrics> logger, IMeterFactory meterFactory)
            {
                _logger = logger;
        
                var meter = meterFactory.Create("messaging");
        
                _messageLatency = meter.CreateHistogram<double>("message.latency.ms", 
                    unit: "ms", description: "Message processing latency");
                _messagesSent = meter.CreateCounter<long>("messages.sent.total", 
                    description: "Total messages sent");
                _messagesReceived = meter.CreateCounter<long>("messages.received.total", 
                    description: "Total messages received");
                _messagesFailed = meter.CreateCounter<long>("messages.failed.total", 
                    description: "Total messages failed");
                _messagesDlq = meter.CreateCounter<long>("messages.dead-lettered.total", 
                    description: "Total messages sent to DLQ");
            }
    
            public void RecordMessageSent(double latencyMs)
            {
                _messagesSent.Add(1);
                _messageLatency.Record(latencyMs);
            }
    
            public void RecordMessageReceived(double latencyMs)
            {
                _messagesReceived.Add(1);
                _messageLatency.Record(latencyMs);
            }
    
            public void RecordMessageFailed() => _messagesFailed.Add(1);
            public void RecordMessageDlq() => _messagesDlq.Add(1);
        }

        // Prometheus metrics export
        // GET /metrics
        // messaging_messages_sent_total 150000
        // messaging_messages_received_total 148500
        // messaging_messages_failed_total 1500
        // messaging_messages_dead_lettered_total 350
        // messaging_message_latency_ms_sum 45000.5
        // messaging_message_latency_ms_count 148500
        // messaging_message_latency_ms_bucket{le="10"} 120000
        // messaging_message_latency_ms_bucket{le="50"} 145000
        // messaging_message_latency_ms_bucket{le="100"} 148000

Queue Depth Alerting

Unbounded Growth Prevention

public class QueueDepthMonitor : BackgroundService
        {
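            private readonly IConnection _connection;
            private readonly ILogger<QueueDepthMonitor> _logger;
            private readonly IAlertService _alertService;
    
            public QueueDepthMonitor(IConnection connection, ILogger<QueueDepthMonitor> logger, IAlertService alertService)
            {
                _connection = connection;
                _logger = logger;
                _alertService = alertService;
            }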
    
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var queueStats = await GetQueueStatsAsync(stoppingToken);
                
                        foreach (var queue in queueStats)
                        {
                            _logger.LogInformation(
                                "Queue depth: {Queue} messages={Depth} ready={Ready} unack={Unacked}",
                                queue.Name, queue.Total, queue.Ready, queue.Unacked);
                    
                            // Alert thresholds
                            if (queue.Total > 100000)
                            {
                                await _alertService.SendAlertAsync(new QueueAlert
                                {
                                    Queue = queue.Name,
                                    Type = AlertType.CriticalDepth,
                                    Depth = queue.Total,
                                    Message = $"Queue {queue.Name} has {queue.Total} messages"
                                });
                            }
                            else if (queue.Total > 50000)
                            {
                                await _alertService.SendAlertAsync(new QueueAlert
                                {
                                    Queue = queue.Name,
                                    Type = AlertType.WarningDepth,
                                    Depth = queue.Total,
                                    Message = $"Queue {queue.Name} approaching capacity: {queue.Total} messages"
                                });
                            }
                    
                            // Unacked messages alert
                            if (queue.Unacked > 1000)
                            {
                                await _alertService.SendAlertAsync(new QueueAlert
                                {
                                    Queue = queue.Name,
                                    Type = AlertType.HighUnacked,
                                    Depth = queue.Unacked,
                                    Message = $"Queue {queue.Name} has {queue.Unacked} unacked messages"
                                });
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Error monitoring queue depths");
                    }
            
                    await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
                }
            }
    
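            private async Task<IReadOnlyList<QueueStats>> GetQueueStatsAsync(CancellationToken ct)
            {
                // RabbitMQ Management API uses HTTP basic auth; guest/guest is the default local-only account.
                // In production, inject HttpClient via IHttpClientFactory instead of creating one per poll.
                using var http = new HttpClient();
                http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(
                    "Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes("guest:guest")));
                var response = await http.GetStringAsync("http://localhost:15672/api/queues", ct);
        
                // Parse the JSON array; field names are mapped on QueueStats below
                return JsonSerializer.Deserialize<List<QueueStats>>(response) ?? new List<QueueStats>();
            }
        }

        // JSON names follow the Management API payload for /api/queues
        public record QueueStats(
            [property: JsonPropertyName("name")] string Name,
            [property: JsonPropertyName("messages")] long Total,                  // Ready + Unacked
            [property: JsonPropertyName("messages_ready")] long Ready,            // Messages waiting to be delivered
            [property: JsonPropertyName("messages_unacknowledged")] long Unacked); // Delivered but not acknowledged

        // Init-only properties so the object initializers in QueueDepthMonitor compile
        public record QueueAlert
        {
            public required string Queue { get; init; }
            public required AlertType Type { get; init; }
            public required long Depth { get; init; }
            public required string Message { get; init; }
        }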

        public enum AlertType
        {
            WarningDepth,
            CriticalDepth,
            HighUnacked,
            DlqGrowth
        }
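
The monitor above reacts only to absolute thresholds, while the heading asks about unbounded growth. As a minimal sketch (the names `QueueDepthRules`, `DepthLevel`, and `IsGrowingSteadily`, and the `minSamples` parameter, are assumptions for illustration and not part of the original code), the threshold logic can be extracted into a pure function and paired with a simple check over the most recent polls:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum DepthLevel { Normal, Warning, Critical }

public static class QueueDepthRules
{
    // Defaults mirror the monitor: > 50,000 is a warning, > 100,000 is critical.
    public static DepthLevel Classify(long total, long warning = 50_000, long critical = 100_000) =>
        total > critical ? DepthLevel.Critical
        : total > warning ? DepthLevel.Warning
        : DepthLevel.Normal;

    // Hypothetical growth check: true when each of the last `minSamples` polls
    // is strictly larger than the one before it.
    public static bool IsGrowingSteadily(IReadOnlyList<long> samples, int minSamples = 4)
    {
        if (samples.Count < minSamples) return false;

        var window = samples.Skip(samples.Count - minSamples).ToList();
        for (var i = 1; i < window.Count; i++)
        {
            if (window[i] <= window[i - 1]) return false;
        }
        return true;
    }
}
```

With a 30-second poll interval, four consecutive increases correspond to roughly two minutes of sustained growth. One possible use is to raise a warning on steady growth even while the depth is still below 50,000.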

Dead Letter Queue Analysis

Identifying Systematic Failures

public class DlqAnalyzer
        {
            private readonly AppDbContext _context;
            private readonly ILogger<DlqAnalyzer> _logger;
    
            public DlqAnalyzer(AppDbContext context, ILogger<DlqAnalyzer> logger)
            {
                _context = context;
                _logger = logger;
            }
    
            public async Task<DlqAnalysis> AnalyzeAsync(TimeSpan? lookback = null)
            {
                // Default to the last 24 hours; compute the cutoff once so the query receives a plain parameter
                var cutoff = DateTimeOffset.UtcNow - (lookback ?? TimeSpan.FromHours(24));
        
                var dlqMessages = await _context.DeadLetterMessages
                    .Where(m => m.CreatedAt > cutoff)
                    .ToListAsync();
        
                // Pattern analysis
                var errorPatterns = dlqMessages
                    .GroupBy(m => m.ErrorType)
                    .Select(g => new ErrorPattern
                    {
                        ErrorType = g.Key,
                        Count = g.Count(),
                        Percentage = (double)g.Count() / dlqMessages.Count * 100,
                        FirstOccurrence = g.Min(m => m.CreatedAt),
                        LastOccurrence = g.Max(m => m.CreatedAt),
                        SampleMessageId = g.First().MessageId
                    })
                    .OrderByDescending(p => p.Count)
                    .ToList();
        
                // Time series
                var hourlyDistribution = dlqMessages
                    .GroupBy(m => m.CreatedAt.Date.AddHours(m.CreatedAt.Hour))
                    .Select(g => new HourlyDistribution
                    {
                        Hour = g.Key,
                        Count = g.Count()
                    })
                    .OrderBy(d => d.Hour)
                    .ToList();
        
                // Systematic failure detection
                var isSystematic = errorPatterns.Any(p => p.Percentage > 50);
        
                return new DlqAnalysis
                {
                    TotalMessages = dlqMessages.Count,
                    ErrorPatterns = errorPatterns,
                    HourlyDistribution = hourlyDistribution,
                    IsSystematicFailure = isSystematic,
                    RecommendedAction = GetRecommendedAction(errorPatterns, isSystematic)
                };
            }
    
            private string GetRecommendedAction(IReadOnlyList<ErrorPattern> patterns, bool isSystematic)
            {
                if (!isSystematic) return "No systematic failure pattern detected";
        
                var topPattern = patterns.First();
                return topPattern.ErrorType switch
                {
                    "TimeoutException" => "Check downstream service health and increase timeout",
                    "InvalidOperationException" => "Review message processing logic",
                    "DbUpdateException" => "Check database connectivity and constraints",
                    _ => $"Investigate {topPattern.ErrorType} — {topPattern.Count} occurrences"
                };
            }
        }

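        // Init-only properties so the object initializers in DlqAnalyzer compile
        public record DlqAnalysis
        {
            public required int TotalMessages { get; init; }
            public required IReadOnlyList<ErrorPattern> ErrorPatterns { get; init; }
            public required IReadOnlyList<HourlyDistribution> HourlyDistribution { get; init; }
            public required bool IsSystematicFailure { get; init; }
            public required string RecommendedAction { get; init; }
        }

        public record ErrorPattern
        {
            public required string ErrorType { get; init; }
            public required int Count { get; init; }
            public required double Percentage { get; init; }
            public required DateTimeOffset FirstOccurrence { get; init; }
            public required DateTimeOffset LastOccurrence { get; init; }
            public required string SampleMessageId { get; init; }
        }

        public record HourlyDistribution
        {
            public required DateTime Hour { get; init; }
            public required int Count { get; init; }
        }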

Message Tracing

Correlation ID Propagation

Request Flow:
          API Gateway ──▶ Order Service ──▶ Payment Service ──▶ Notification Service
               │               │                   │                      │
               │ correlationId: abc123             │                      │
               │ traceId: xyz789                   │ traceId: xyz789      │ traceId: xyz789
               │ spanId: span1                     │ spanId: span2        │ spanId: span3
               │                                     │                      │
               ▼                                     ▼                      ▼
             All messages carry correlationId + traceId across services

Implementation

// Correlation ID middleware
        public class CorrelationIdMiddleware
        {
            private readonly RequestDelegate _next;
            private const string CorrelationIdHeader = "X-Correlation-ID";
            private const string TraceIdHeader = "X-Trace-ID";
    
            public CorrelationIdMiddleware(RequestDelegate next)
            {
                _next = next;
            }
    
            public async Task InvokeAsync(HttpContext context, ILogger<CorrelationIdMiddleware> logger)
            {
                // Extract or generate correlation ID
                var correlationId = context.Request.Headers[CorrelationIdHeader].FirstOrDefault() 
                    ?? Guid.NewGuid().ToString();
                var traceId = context.Request.Headers[TraceIdHeader].FirstOrDefault() 
                    ?? Guid.NewGuid().ToString();
        
                // Set in current activity
                Activity.Current?.SetTag("correlation.id", correlationId);
                Activity.Current?.SetTag("trace.id", traceId);
        
                // Add to response headers
                context.Response.Headers[CorrelationIdHeader] = correlationId;
                context.Response.Headers[TraceIdHeader] = traceId;
        
                // Set in ambient context
                CorrelationContext.Current = new CorrelationContext
                {
                    CorrelationId = correlationId,
                    TraceId = traceId,
                    Timestamp = DateTimeOffset.UtcNow
                };
        
                await _next(context);
            }
        }
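
The middleware assigns `CorrelationContext.Current`, a type this section does not define. One way it could look is sketched below, assuming an `AsyncLocal<T>`-backed ambient context. The property names are taken from the object initializer in the middleware; the storage choice is an assumption, not the author's confirmed implementation.

```csharp
using System;
using System.Threading;

public sealed class CorrelationContext
{
    // AsyncLocal flows with the async execution context: a value set at the start
    // of a request stays visible after every await within that request, and
    // concurrent requests do not see each other's values.
    private static readonly AsyncLocal<CorrelationContext?> _current = new();

    public static CorrelationContext? Current
    {
        get => _current.Value;
        set => _current.Value = value;
    }

    public string CorrelationId { get; init; } = string.Empty;
    public string TraceId { get; init; } = string.Empty;
    public DateTimeOffset Timestamp { get; init; }
}
```

A value assigned inside a child task does not flow back to the caller, so the context has to be set before `await _next(context)` is called, as the middleware does.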

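// MassTransit filters for correlation ID propagation.
        // Filters implement IFilter<TContext>; the open generic types are registered per message type below.
        public class CorrelationSendFilter<T> : IFilter<SendContext<T>> where T : class
        {
            public Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
            {
                CorrelationHeaders.Apply(context.Headers);
                return next.Send(context);
            }
    
            public void Probe(ProbeContext context) => context.CreateFilterScope("correlationIdSend");
        }

        public class CorrelationPublishFilter<T> : IFilter<PublishContext<T>> where T : class
        {
            public Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next)
            {
                CorrelationHeaders.Apply(context.Headers);
                return next.Send(context);
            }
    
            public void Probe(ProbeContext context) => context.CreateFilterScope("correlationIdPublish");
        }

        public class CorrelationConsumeFilter<T> : IFilter<ConsumeContext<T>> where T : class
        {
            private readonly ILogger<CorrelationConsumeFilter<T>> _logger;
    
            public CorrelationConsumeFilter(ILogger<CorrelationConsumeFilter<T>> logger) => _logger = logger;
    
            public Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
            {
                // Incoming headers are read-only: restore the ambient context from them
                // so that messages sent while handling this one carry the same IDs.
                var correlationId = context.Headers.Get<string>("correlation-id") ?? Guid.NewGuid().ToString();
                var traceId = context.Headers.Get<string>("trace-id") ?? Guid.NewGuid().ToString();
        
                CorrelationContext.Current = new CorrelationContext
                {
                    CorrelationId = correlationId,
                    TraceId = traceId,
                    Timestamp = DateTimeOffset.UtcNow
                };
        
                _logger.LogDebug("Correlation: {CorrelationId} Trace: {TraceId}", correlationId, traceId);
                return next.Send(context);
            }
    
            public void Probe(ProbeContext context) => context.CreateFilterScope("correlationIdConsume");
        }

        internal static class CorrelationHeaders
        {
            // Copies the ambient IDs onto an outgoing message, generating new ones if none are set
            public static void Apply(SendHeaders headers)
            {
                headers.Set("correlation-id", CorrelationContext.Current?.CorrelationId ?? Guid.NewGuid().ToString());
                headers.Set("trace-id", CorrelationContext.Current?.TraceId ?? Guid.NewGuid().ToString());
            }
        }

        // Registration
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("rabbitmq://localhost");
    
            cfg.UseSendFilter(typeof(CorrelationSendFilter<>), context);
            cfg.UsePublishFilter(typeof(CorrelationPublishFilter<>), context);
            cfg.UseConsumeFilter(typeof(CorrelationConsumeFilter<>), context);
    
            cfg.ConfigureEndpoints(context);
        });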

        // Distributed tracing with OpenTelemetry
        builder.Services.AddOpenTelemetry()
            .WithTracing(tracing =>
            {
                tracing.AddSource("MassTransit")
                       .AddAspNetCoreInstrumentation()
                       .AddHttpClientInstrumentation()
                       // OTLP endpoint, e.g. Jaeger or an OpenTelemetry Collector;
                       // Zipkin has its own exporter (AddZipkinExporter)
                       .AddOtlpExporter();
            });

Trace Correlation Query

-- Find all messages in a trace
        SELECT * FROM message_logs
        WHERE correlation_id = 'abc123'
        ORDER BY timestamp;

        -- Result:
        -- | correlation_id | trace_id | service          | event_type       | timestamp           |
        -- | abc123         | xyz789   | api-gateway      | RequestReceived  | 2024-01-15 10:00:00 |
        -- | abc123         | xyz789   | order-service    | OrderCreated     | 2024-01-15 10:00:01 |
        -- | abc123         | xyz789   | payment-service  | PaymentCompleted | 2024-01-15 10:00:03 |
        -- | abc123         | xyz789   | notification-svc | EmailSent        | 2024-01-15 10:00:05 |

Dashboard and Alerting

Consumer Lag Dashboard

# Grafana dashboard (simplified YAML sketch, not the actual Grafana JSON schema)
        dashboard:
          title: "Messaging Platform Overview"
          panels:
            - title: "Consumer Lag by Group"
              type: graph
              targets:
                - query: "kafka_consumer_lag{group='order-processors'}"
                - query: "kafka_consumer_lag{group='analytics'}"
              thresholds:
                - value: 1000
                  color: yellow
                - value: 5000
                  color: red
    
            - title: "Message Throughput"
              type: graph
              targets:
                - query: "rate(messaging_messages_sent_total[5m])"
                - query: "rate(messaging_messages_received_total[5m])"
    
            - title: "Queue Depth"
              type: graph
              targets:
                - query: "rabbitmq_queue_messages{queue='orders'}"
                - query: "rabbitmq_queue_messages{queue='notifications'}"
    
            - title: "Error Rate"
              type: stat
              targets:
                - query: "rate(messaging_messages_failed_total[5m])"
    
            - title: "DLQ Messages"
              type: stat
              targets:
                - query: "messaging_messages_dead_lettered_total"

Automatic Scaling Trigger

public class ScalingService
        {
            private readonly IScalingClient _scalingClient;
            private readonly ILogger<ScalingService> _logger;
    
            public async Task TriggerScalingAsync(string consumerGroup)
            {
                _logger.LogInformation("Triggering scaling for consumer group: {Group}", consumerGroup);
        
                // Get current lag
                var lag = await GetConsumerLagAsync(consumerGroup);
        
                // Calculate desired instance count
                var messagesPerInstance = 5000;  // Target messages per instance
                var desiredInstances = (int)Math.Ceiling((double)lag / messagesPerInstance);
                desiredInstances = Math.Max(desiredInstances, 2);  // Minimum 2 instances
                desiredInstances = Math.Min(desiredInstances, 50);  // Maximum 50 instances
        
                // Scale (read the current count first so the log reports the real transition)
                var currentInstances = await GetCurrentInstanceCount(consumerGroup);
                await _scalingClient.SetInstanceCountAsync(consumerGroup, desiredInstances);
        
                _logger.LogInformation(
                    "Scaled {Group} from {Current} to {Desired} instances (lag: {Lag})",
                    consumerGroup, currentInstances, desiredInstances, lag);
            }
    
            private Task<long> GetConsumerLagAsync(string group) => Task.FromResult(0L);
            private Task<int> GetCurrentInstanceCount(string group) => Task.FromResult(1);
        }
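
The instance calculation above can be isolated into a pure function, which makes the rounding and clamping easy to check. The sketch below is an assumption-laden illustration: `ScalingMath`, `DesiredInstances`, and the optional `partitionCount` parameter are hypothetical names, not part of the original service. The partition cap reflects how Kafka assigns each partition to at most one consumer in a group, so instances beyond the partition count would sit idle.

```csharp
using System;

public static class ScalingMath
{
    // Returns ceil(lag / messagesPerInstance), clamped to [min, max].
    // When partitionCount is supplied, it further caps the upper bound.
    public static int DesiredInstances(
        long lag,
        int messagesPerInstance = 5000,
        int min = 2,
        int max = 50,
        int? partitionCount = null)
    {
        if (messagesPerInstance <= 0)
            throw new ArgumentOutOfRangeException(nameof(messagesPerInstance));

        // Negative lag is treated as zero; the double result is capped before the int cast
        var raw = (int)Math.Min(int.MaxValue, Math.Ceiling((double)Math.Max(lag, 0L) / messagesPerInstance));

        var upper = partitionCount is int p ? Math.Min(max, p) : max;
        var lower = Math.Min(min, upper);   // keeps the clamp range valid when partitions < min
        return Math.Clamp(raw, lower, upper);
    }
}
```

For example, a lag of 12,000 with the default target of 5,000 messages per instance gives ceil(2.4) = 3 instances, while a lag of 1,000,000 is capped at 50, or at 12 when the topic has 12 partitions.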

Operational Runbook

Common Messaging Failure Scenarios

## Runbook: Message Processing Failures

        ### Scenario 1: Consumer Lag Growing

        **Symptoms:**
        - Consumer lag > 10,000 messages
        - Dashboard shows yellow/red threshold

        **Diagnosis:**
        1. Check consumer logs for errors
        2. Verify downstream dependencies (DB, APIs)
        3. Check CPU/memory on consumer hosts

        **Actions:**
        1. Scale consumers horizontally
        2. Increase prefetch count temporarily
        3. Check for slow queries or external API timeouts

        **Escalation:** If lag > 100,000 after scaling → Page on-call

        ### Scenario 2: Messages Stuck in DLQ

        **Symptoms:**
        - DLQ depth increasing
        - Error pattern analysis shows systematic failures

        **Diagnosis:**
        1. Run DLQ analyzer for error patterns
        2. Check sample messages for data issues
        3. Review error logs for root cause

        **Actions:**
        1. Fix root cause (code fix, config change)
        2. Replay healthy messages from DLQ
        3. Purge corrupted messages

        **Replay Command:**

rabbitmqadmin publish exchange=amq.default routing_key=orders < replay_message.json


        ### Scenario 3: Connection Loss to Broker

        **Symptoms:**
        - Producer: "Connection closed" errors
        - Consumer: "Consumer closed channel" errors

        **Diagnosis:**
        1. Check network connectivity
        2. Verify broker health (`systemctl status rabbitmq-server`, `rabbitmq-diagnostics status`)
        3. Check broker logs

        **Actions:**
        1. Restart broker if needed
        2. Verify publisher confirms are re-enabled
        3. Verify consumers re-subscribe

        ### Scenario 4: Outbox Not Publishing

        **Symptoms:**
        - Outbox table growing
        - No messages appearing in broker

        **Diagnosis:**
        1. Check outbox poller service status
        2. Check database connectivity
        3. Check broker connectivity from poller

        **Actions:**
        1. Restart outbox poller
        2. Verify database connection string
        3. Check broker credentials

Checklist

  • [x] Consumer lag dashboard with an automatic scaling trigger
  • [x] End-to-end message tracing with distributed correlation IDs
  • [x] Operational runbook for common messaging failure scenarios

Practice


Module 12 Checkpoint

Project: Event-driven microservice platform
  • RabbitMQ/Kafka for async inter-service communication
  • Outbox Pattern for reliable event publishing
  • Idempotent consumers with deduplication
  • Dead letter queue handling with a retry chain and alerting
  • Consumer lag monitoring and automatic scaling
  • Event contract governance with schema validation
Passing criteria:
  • Zero message loss under normal operation (verified by a test)
  • Consumer lag < 1000 messages under steady-state load
  • All consumers are idempotent (duplicate processing verified safe)
  • Dead letter queue alerting triggers on a systematic failure pattern
  • Event contract changes validated through CI pipeline