12. Message Brokers and Event-Driven Architecture
RabbitMQ Fundamentals
RabbitMQ Architecture
RabbitMQ is an open-source message broker that implements AMQP 0-9-1. It runs on the Erlang runtime, whose lightweight processes and supervision model support high concurrency and fault tolerance.
Core Concepts
| Component | Description |
|---|---|
| Producer | Application that sends messages |
| Exchange | Router that receives messages from producers |
| Queue | Buffer that stores messages |
| Binding | Link between an Exchange and a Queue |
| Consumer | Application that receives messages from a Queue |
| vhost | Virtual host: an isolated namespace (exchanges, queues, bindings, permissions) |
Producer → Exchange → Binding → Queue → Consumer
Exchange Types
An Exchange defines the rules for routing messages to Queues.
Direct Exchange
Routes a message to every Queue whose binding key exactly matches the message's routing key.
Exchange (direct) ──routing key: "order.created"──→ Queue: "orders"
Exchange (direct) ──routing key: "order.updated"──→ Queue: "order-updates"
Exchange (direct) ──routing key: "order.created"──→ Queue: "order-analytics" // second binding on the same key
Usage: point-to-point communication where routing must be controlled precisely.
C# (RabbitMQ.Client 7.x async API):
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(
exchange: "orders",
type: ExchangeType.Direct,
durable: true);
await channel.QueueDeclareAsync(queue: "order-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueBindAsync(queue: "order-queue", exchange: "orders", routingKey: "order.created");
Fanout Exchange
Delivers every message to all bound Queues and ignores the routing key.
Exchange (fanout) → Queue A
Exchange (fanout) → Queue B
Exchange (fanout) → Queue C
Usage: broadcasting events when every subscriber must receive every message.
await channel.ExchangeDeclareAsync(
exchange: "events",
type: ExchangeType.Fanout,
durable: true);
// All three queues receive every message
await channel.QueueBindAsync(queue: "queue-a", exchange: "events", routingKey: "");
await channel.QueueBindAsync(queue: "queue-b", exchange: "events", routingKey: "");
await channel.QueueBindAsync(queue: "queue-c", exchange: "events", routingKey: "");
Topic Exchange
Routes messages by matching the routing key against a binding pattern. A routing key is a list of words separated by dots, and two wildcards are supported:
- * (star) matches exactly one word
- # (hash) matches zero or more words
routing key: "order.created" → matches "order.*"
routing key: "order.created" → matches "order.#"
routing key: "order.created.v1" → matches "order.#"
routing key: "payment.processed" → does NOT match "order.#" (different prefix)
routing key: "order.created" → matches "*.created"
routing key: "order.updated" → does NOT match "*.created"
Usage: Pub/Sub with topic-based filtering.
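The wildcard rules above can be checked with a small matcher. This is only a sketch of the matching semantics: `TopicMatcher` is a hypothetical helper, not part of RabbitMQ.Client, and the broker performs this matching itself.

```csharp
using System;

// Reproduces the examples from the list above.
Console.WriteLine(TopicMatcher.Matches("order.*", "order.created"));     // True
Console.WriteLine(TopicMatcher.Matches("order.*", "order.created.v1"));  // False
Console.WriteLine(TopicMatcher.Matches("order.#", "order.created.v1"));  // True
Console.WriteLine(TopicMatcher.Matches("order.#", "payment.processed")); // False

// Hypothetical helper that mimics topic exchange matching on dot-separated words.
public static class TopicMatcher
{
    public static bool Matches(string bindingKey, string routingKey) =>
        Match(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        // Pattern exhausted: it is a match only if every word was consumed too.
        if (p == pattern.Length) return w == words.Length;

        if (pattern[p] == "#")
        {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = w; next <= words.Length; next++)
                if (Match(pattern, p + 1, words, next)) return true;
            return false;
        }

        if (w == words.Length) return false;

        // '*' matches exactly one word; anything else must match literally.
        return (pattern[p] == "*" || pattern[p] == words[w])
            && Match(pattern, p + 1, words, w + 1);
    }
}
```

Because wildcards stand for whole words, a pattern such as "order.v*" would be compared literally and would not act as a prefix match.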
await channel.ExchangeDeclareAsync(
exchange: "topic-exchange",
type: ExchangeType.Topic,
durable: true);
// Receives every order event
await channel.QueueBindAsync(queue: "all-orders", exchange: "topic-exchange", routingKey: "order.#");
// Receives keys with exactly one word after "order." (order.created, order.updated, order.deleted)
await channel.QueueBindAsync(queue: "order-lifecycle", exchange: "topic-exchange", routingKey: "order.*");
// Receives only order.created
await channel.QueueBindAsync(queue: "order-created", exchange: "topic-exchange", routingKey: "order.created");
Headers Exchange
Routes on message headers rather than the routing key. The x-match binding argument selects the mode:
- all: every listed header must match
- any: at least one listed header must match
await channel.ExchangeDeclareAsync(
exchange: "headers-exchange",
type: ExchangeType.Headers,
durable: true);
var args = new Dictionary<string, object>
{
    ["x-match"] = "all", // every header below must match
    ["level"] = "critical",
    ["region"] = "eu-west"
};
await channel.QueueDeclareAsync(queue: "critical-eu", durable: true, exclusive: false, autoDelete: false, arguments: null);
// routingKey is required by the method signature but ignored by a headers exchange
await channel.QueueBindAsync(queue: "critical-eu", exchange: "headers-exchange", routingKey: "", arguments: args);
Usage: complex routing based on message metadata.
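To make the all/any modes concrete, here is a minimal sketch of the comparison a headers exchange performs. `HeadersMatcher` is a hypothetical helper written for illustration; the sketch assumes that binding arguments starting with "x-" are control arguments and are not compared.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var binding = new Dictionary<string, object>
{
    ["x-match"] = "all",
    ["level"] = "critical",
    ["region"] = "eu-west"
};
var euMessage = new Dictionary<string, object> { ["level"] = "critical", ["region"] = "eu-west", ["source"] = "billing" };
var usMessage = new Dictionary<string, object> { ["level"] = "critical", ["region"] = "us-east" };

Console.WriteLine(HeadersMatcher.Matches(binding, euMessage)); // True
Console.WriteLine(HeadersMatcher.Matches(binding, usMessage)); // False
binding["x-match"] = "any";
Console.WriteLine(HeadersMatcher.Matches(binding, usMessage)); // True

// Hypothetical helper that mimics headers exchange matching.
public static class HeadersMatcher
{
    public static bool Matches(
        IReadOnlyDictionary<string, object> bindingArgs,
        IReadOnlyDictionary<string, object> messageHeaders)
    {
        // "all" is used when x-match is absent.
        var mode = bindingArgs.TryGetValue("x-match", out var m) ? m as string : "all";

        // Skip control arguments such as x-match itself.
        var criteria = bindingArgs
            .Where(kv => !kv.Key.StartsWith("x-", StringComparison.Ordinal))
            .ToList();

        bool Hit(KeyValuePair<string, object> kv) =>
            messageHeaders.TryGetValue(kv.Key, out var value) && Equals(value, kv.Value);

        return mode == "any" ? criteria.Any(Hit) : criteria.All(Hit);
    }
}
```

Extra headers on the message (such as "source" above) do not prevent a match; only the headers listed in the binding are checked.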
Queue Declaration
Queue Parameters
| Parameter | Type | Description |
|---|---|---|
| durable | bool | The Queue definition survives a broker restart |
| exclusive | bool | The Queue is usable only by the declaring connection and is deleted when that connection closes |
| autoDelete | bool | The Queue is deleted when its last consumer unsubscribes |
| arguments | Dictionary | Optional arguments (x-message-ttl, x-max-length, x-dead-letter-exchange) |
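The x-max-length argument interacts with the overflow policy. The sketch below simulates the two policies most often contrasted: drop-head (the default, the oldest message is removed) and reject-publish (the new message is refused). `BoundedQueue` is a hypothetical in-memory model, not a client API.

```csharp
using System;
using System.Collections.Generic;

var dropHead = new BoundedQueue(maxLength: 3, rejectPublish: false);
var reject = new BoundedQueue(maxLength: 3, rejectPublish: true);

foreach (var msg in new[] { "m1", "m2", "m3", "m4" })
{
    dropHead.TryPublish(msg);
    reject.TryPublish(msg);
}

Console.WriteLine(string.Join(",", dropHead.Messages)); // m2,m3,m4
Console.WriteLine(string.Join(",", reject.Messages));   // m1,m2,m3

// Hypothetical in-memory model of a length-limited queue.
public sealed class BoundedQueue
{
    private readonly Queue<string> _messages = new();
    private readonly int _maxLength;
    private readonly bool _rejectPublish;

    public BoundedQueue(int maxLength, bool rejectPublish)
    {
        _maxLength = maxLength;
        _rejectPublish = rejectPublish;
    }

    public IReadOnlyCollection<string> Messages => _messages;

    public bool TryPublish(string message)
    {
        if (_messages.Count >= _maxLength)
        {
            // reject-publish: the new message is refused and the queue is unchanged.
            if (_rejectPublish) return false;

            // drop-head: the oldest message makes room for the new one.
            _messages.Dequeue();
        }

        _messages.Enqueue(message);
        return true;
    }
}
```

With reject-publish, a publisher only learns about the refusal if publisher confirms are enabled; with drop-head, dropped messages are dead-lettered when the queue has a dead letter exchange.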
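Types of Queues
- Durable Queue: the queue definition survives a broker restart; its messages survive only if they were also published as persistent
- Exclusive Queue: private to the declaring connection and deleted when that connection closes, so durability has no practical effect
- Auto-Delete Queue: deleted once it has had at least one consumer and the last one unsubscribes
- Normal Queue: durable=true, exclusive=false, autoDelete=false
// Durable queue: the definition and persistent messages are written to disk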
await channel.QueueDeclareAsync(
    queue: "persistent-orders",
    durable: true, // survives a broker restart
    exclusive: false, // available to every connection
    autoDelete: false, // not deleted when consumers disconnect
    arguments: null);
// Queue with a length limit (backpressure)
await channel.QueueDeclareAsync(
    queue: "bounded-queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: new Dictionary<string, object>
    {
        ["x-max-length"] = 10000, // at most 10k messages
        ["x-overflow"] = "reject-publish" // refuse new publishes when full (the default is drop-head)
    });
// Queue with a TTL: messages live no longer than 5 minutes
await channel.QueueDeclareAsync(
    queue: "temp-queue",
    durable: false,
    exclusive: false,
    autoDelete: true,
    arguments: new Dictionary<string, object>
    {
        ["x-message-ttl"] = 300000 // 5 minutes in ms
    });
Binding Keys and Routing Patterns
A binding key defines how a Queue is attached to an Exchange.
Direct Exchange:
Binding key = "order.created" → exact match
Topic Exchange:
Binding key = "order.#" → "order" and every key that starts with "order." (order.created, order.created.v1)
Binding key = "*.created" → order.created, payment.created
Binding key = "order.*.v1" → order.created.v1, order.updated.v1
Binding key = "#" → all messages
Wildcards replace whole words only, so a fragment such as "v*" is compared literally.
Common Routing Patterns
1. Multiple queues, single binding key (Fanout):
// Every queue receives the message; the routing key is ignored
await channel.QueueBindAsync("queue-a", "fanout-exchange", "");
await channel.QueueBindAsync("queue-b", "fanout-exchange", "");
2. Multiple queues, topic filtering:
// Queue "logs-all" receives everything
await channel.QueueBindAsync("logs-all", "topic-exchange", "#");
// Queue "logs-errors" receives errors only
await channel.QueueBindAsync("logs-errors", "topic-exchange", "*.error");
// Queue "logs-warnings" receives warnings and errors
await channel.QueueBindAsync("logs-warnings", "topic-exchange", "*.warning");
await channel.QueueBindAsync("logs-warnings", "topic-exchange", "*.error");
3. Single queue, multiple binding keys (Direct):
// One queue bound with several routing keys
await channel.QueueBindAsync("orders", "direct-exchange", "order.created");
await channel.QueueBindAsync("orders", "direct-exchange", "order.updated");
await channel.QueueBindAsync("orders", "direct-exchange", "order.deleted");
Message Properties
BasicProperties
var properties = new BasicProperties
{
DeliveryMode = DeliveryModes.Persistent, // 2 = persistent, 1 = transient
ContentType = "application/json",
CorrelationId = Guid.NewGuid().ToString(),
MessageId = Guid.NewGuid().ToString(),
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
Type = "OrderCreatedEvent",
Priority = 5, // honored only by queues declared with x-max-priority; a larger value means higher priority
Headers = new Dictionary<string, object>
{
["source"] = "order-service",
["environment"] = "production"
},
Expiration = "300000", // per-message TTL: 5 minutes, in milliseconds
ReplyTo = "reply-queue" // used by the Request/Reply pattern
};
Delivery Mode
| Mode | Value | Behavior |
|---|---|---|
| Transient | 1 or DeliveryModes.Transient | Not written to disk; lost on broker restart |
| Persistent | 2 or DeliveryModes.Persistent | Written to disk; survives a restart when stored in a durable Queue |
Important: for a message to survive a broker restart, all of the following are required:
- Exchange declared with durable=true
- Queue declared with durable=true
- Message published with DeliveryModes.Persistent
// End-to-end durability pipeline
await channel.ExchangeDeclareAsync("orders", ExchangeType.Direct, durable: true);
await channel.QueueDeclareAsync("order-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueBindAsync("order-queue", "orders", "order.created");
var props = new BasicProperties
{
DeliveryMode = DeliveryModes.Persistent // the key property for durability
};
var body = Encoding.UTF8.GetBytes(jsonMessage);
await channel.BasicPublishAsync(
exchange: "orders",
routingKey: "order.created",
mandatory: true, // return the message to the publisher if no queue matches
basicProperties: props,
body: body);
Consumer Prefetch (QoS)
Basic.QoS
Sets the maximum number of unacknowledged messages the broker keeps outstanding per consumer.
// Up to 10 messages in flight; the broker sends another only after one of them is acked
await channel.BasicQosAsync(
prefetchSize: 0, // 0 = no size limit
prefetchCount: 10, // max 10 unacked messages
global: false); // RabbitMQ semantics: false = per consumer, true = shared by all consumers on the channel
Prefetch and Throughput
| Prefetch | Throughput | Memory | Use Case |
|---|---|---|---|
| 1 | Low | Minimal | Slow or complex processing; strict ordering with a single consumer |
| 10-50 | Medium | Moderate | Balance between throughput and memory |
| 100+ | High | High | Cheap processing, batch operations |
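The effect of prefetchCount can be pictured as a window of unacknowledged deliveries. The following sketch models that window in memory; `PrefetchWindow` is a hypothetical class for illustration and does not talk to a broker.

```csharp
using System;
using System.Collections.Generic;

var window = new PrefetchWindow(prefetchCount: 2);
Console.WriteLine(window.TryDeliver(1)); // True
Console.WriteLine(window.TryDeliver(2)); // True
Console.WriteLine(window.TryDeliver(3)); // False: two deliveries are still unacked
window.Ack(1);
Console.WriteLine(window.TryDeliver(3)); // True: the ack freed a slot

// Hypothetical model of the per-consumer prefetch window.
public sealed class PrefetchWindow
{
    private readonly HashSet<ulong> _unacked = new();
    private readonly int _prefetchCount;

    public PrefetchWindow(int prefetchCount) => _prefetchCount = prefetchCount;

    // The broker delivers only while the number of unacked messages is below the limit.
    // A prefetch count of 0 means "no limit".
    public bool TryDeliver(ulong deliveryTag)
    {
        if (_prefetchCount > 0 && _unacked.Count >= _prefetchCount) return false;
        return _unacked.Add(deliveryTag);
    }

    public void Ack(ulong deliveryTag) => _unacked.Remove(deliveryTag);
}
```

A larger window keeps the consumer busy while acks travel back to the broker, at the cost of more messages held in consumer memory, which is the trade-off the table summarizes.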
// High-throughput consumer
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 100, global: false);
// Tight control: one message at a time
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
Consumer Lifecycle
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// Process the message
await ProcessMessageAsync(message);
// Acknowledge: the broker removes the message from the queue
await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
// Nack with requeue=false: the message is dead-lettered if the queue has a DLX, otherwise it is discarded
await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
// Nack with requeue=true: the message returns to the queue immediately, with no delay between attempts
// await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
Logger.LogError(ex, "Error processing message");
}
};
// Start consuming
await channel.BasicConsumeAsync(queue: "order-queue", autoAck: false, consumer: consumer);
Practice
Topology
+---------------------------------------------+
| RabbitMQ Broker |
| |
Order Service --> | exchange: "orders" (direct) |
| | |
| +--[order.created]--> queue: "orders" |
| | consumer 1|
| | |
| +--[order.created]--> queue: "order-events"|
| fanout |
| +-------------------------+|
| | |
| queue: "notifications" queue: "analytics"
| consumer consumer
+---------------------------------------------+
Implementation
// 1. Order Processing Topology Setup
public class OrderTopology
{
public static async Task SetupAsync(IChannel channel)
{
// Main orders exchange
await channel.ExchangeDeclareAsync("orders", ExchangeType.Direct, durable: true);
// Fanout exchange for order events
await channel.ExchangeDeclareAsync("order-events", ExchangeType.Fanout, durable: true);
// Queues
await channel.QueueDeclareAsync("orders", durable: true, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueDeclareAsync("order-events", durable: true, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueDeclareAsync("notifications", durable: true, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueDeclareAsync("analytics", durable: true, exclusive: false, autoDelete: false, arguments: null);
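// Bindings on the direct exchange
await channel.QueueBindAsync("orders", "orders", "order.created");
await channel.QueueBindAsync("orders", "orders", "order.updated");
await channel.QueueBindAsync("orders", "orders", "order.deleted");
// Exchange-to-exchange binding: order.created also flows into the fanout exchange.
// Without it nothing reaches "order-events", because the producer publishes only to "orders".
await channel.ExchangeBindAsync(destination: "order-events", source: "orders", routingKey: "order.created");
// Fanout subscribers
await channel.QueueBindAsync("order-events", "order-events", "");
await channel.QueueBindAsync("notifications", "order-events", "");
await channel.QueueBindAsync("analytics", "order-events", "");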
}
}
// 2. Reliable producer with publisher confirms
// Assumes RabbitMQ.Client 7.x and a channel created with confirms enabled, for example:
// await connection.CreateChannelAsync(new CreateChannelOptions(
//     publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true));
public class OrderProducer
{
    private readonly IChannel _channel;
    public OrderProducer(IChannel channel)
    {
        _channel = channel;
    }
    public async Task PublishOrderCreatedAsync(OrderCreatedEvent order)
    {
        var props = new BasicProperties
        {
            DeliveryMode = DeliveryModes.Persistent,
            ContentType = "application/json",
            Type = nameof(OrderCreatedEvent),
            MessageId = Guid.NewGuid().ToString(),
            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
        };
        var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order));
        // With confirmation tracking enabled, BasicPublishAsync completes only after the broker confirms.
        // The token bounds the wait to 5 seconds.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        try
        {
            await _channel.BasicPublishAsync(
                exchange: "orders",
                routingKey: "order.created",
                mandatory: true,
                basicProperties: props,
                body: body,
                cancellationToken: cts.Token);
            Logger.LogInformation("Message confirmed: {MessageId}", props.MessageId);
        }
        catch (PublicationException ex)
        {
            // The broker nacked the message or returned it as unroutable (mandatory: true)
            Logger.LogError(ex, "Message publication failed: {MessageId}", props.MessageId);
            throw;
        }
        catch (OperationCanceledException)
        {
            throw new TimeoutException("Publisher confirm timeout");
        }
    }
}
// 3. Consumer with manual ack and retry logic
// TransientException is an application-defined exception type for recoverable failures.
public class OrderConsumer
{
    private readonly IChannel _channel;
    private const int MaxRetries = 3;
    public OrderConsumer(IChannel channel)
    {
        _channel = channel;
    }
    public async Task StartAsync(CancellationToken ct)
    {
        // QoS: 10 messages in flight per consumer
        await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);
        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.ReceivedAsync += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            var retryCount = GetRetryCount(ea.BasicProperties);
            try
            {
                await ProcessOrderAsync(message);
                await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (TransientException ex) when (retryCount < MaxRetries)
            {
                // Retry with exponential backoff: 1s, 2s, 4s
                var backoff = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
                Logger.LogWarning(ex, "Transient error, retry {Count}/{Max}. Backoff: {Backoff}",
                    retryCount + 1, MaxRetries, backoff);
                // Note: the delay occupies one prefetch slot for its whole duration
                await Task.Delay(backoff, ct);
                // A plain requeue leaves the headers unchanged, so the counter would never grow.
                // Republish a copy with the counter incremented, then ack the original.
                var headers = new Dictionary<string, object?>(ea.BasicProperties.Headers ?? new Dictionary<string, object?>())
                {
                    ["x-retry-count"] = retryCount + 1
                };
                var retryProps = new BasicProperties(ea.BasicProperties) { Headers = headers };
                await _channel.BasicPublishAsync(exchange: ea.Exchange, routingKey: ea.RoutingKey,
                    mandatory: true, basicProperties: retryProps, body: body);
                await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                // Unrecoverable error or retries exhausted: reject.
                // The message reaches a DLQ only if the queue was declared with a DLX.
                await _channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
                Logger.LogError(ex, "Message rejected: {MessageId}", ea.BasicProperties.MessageId);
            }
        };
        await _channel.BasicConsumeAsync("orders", autoAck: false, consumer: consumer);
    }
    private static int GetRetryCount(IReadOnlyBasicProperties props)
    {
        if (props.Headers is { } headers && headers.TryGetValue("x-retry-count", out var value))
        {
            return value switch { int i => i, long l => (int)l, _ => 0 };
        }
        return 0;
    }
    private Task ProcessOrderAsync(string message) => Task.CompletedTask;
}
Order DTO
public record OrderCreatedEvent(
Guid OrderId,
Guid CustomerId,
decimal Total,
string Currency,
IReadOnlyList<OrderItem> Items,
DateTimeOffset CreatedAt);
public record OrderItem(
Guid ProductId,
string Name,
int Quantity,
decimal Price);
Verifying the topology
// Check via the RabbitMQ Management API (management plugin; %2F is the URL-encoded default vhost "/")
// GET http://localhost:15672/api/queues/%2F/orders
// GET http://localhost:15672/api/exchanges/%2F/orders
Exchange Types Summary
| Exchange | Routing | Use Case |
|---|---|---|
| Direct | Exact routing key match | Point-to-point, precise routing |
| Fanout | Broadcast to all bound Queues | Notifications, broadcast |
| Topic | Pattern matching (*, #) | Pub/Sub with filtering |
| Headers | Match on message headers | Complex routing by metadata |
| Default | Pre-declared nameless direct exchange; every Queue is bound to it under its own name | Publishing straight to a Queue by name |
| Consistent Hash, Random, Delayed Message | Plugin-based | Specialized scenarios |
Checklist for Order Processing
- [x] Exchange types: Direct for routing, Fanout for broadcast
- [x] Queue declaration: durable=true in production
- [x] Binding keys: exact for direct, patterns for topic
- [x] Message properties: DeliveryModes.Persistent, ContentType, MessageId
- [x] Consumer prefetch: QoS 10 to balance throughput and memory
- [x] Reliable producer: publisher confirms enabled on the channel
- [x] Consumer: manual ack/nack + retry logic with exponential backoff
Message Patterns
Point-to-Point (Queue-Based)
Each message is delivered to exactly one consumer of the queue.
Producer → Queue → Consumer A (receives message 1)
→ Consumer B (receives message 2)
→ Consumer C (receives message 3)
Characteristics
- Delivery: one message, one consumer
- Load balancing: the broker dispatches round-robin across consumers, subject to each consumer's prefetch limit
- Ordering: not guaranteed with multiple consumers
- Use case: task distribution, job processing
Implementation
// Producer
await channel.BasicPublishAsync(
exchange: "",
routingKey: "task-queue",
mandatory: true,
basicProperties: new BasicProperties { DeliveryMode = DeliveryModes.Persistent },
body: Encoding.UTF8.GetBytes(taskJson));
// Multiple consumers: the broker distributes messages between them
var consumer1 = new AsyncEventingBasicConsumer(channel);
consumer1.ReceivedAsync += HandleTask1;
await channel.BasicConsumeAsync("task-queue", false, consumer1);
var consumer2 = new AsyncEventingBasicConsumer(channel);
consumer2.ReceivedAsync += HandleTask2;
await channel.BasicConsumeAsync("task-queue", false, consumer2);
// Each message is delivered to only one of the consumers
Pub/Sub (Topic-Based)
A message is delivered to every subscribed Queue.
Producer → Topic Exchange → Queue A (subscriber 1)
→ Queue B (subscriber 2)
→ Queue C (subscriber 3)
Characteristics
- Delivery: one message, a copy in every bound Queue
- Isolation: each subscriber has its own Queue and therefore its own full copy of the stream
- Decoupling: the producer does not know about consumers
- Use case: event broadcasting, notifications, audit logging
Implementation
// Fanout: every subscriber
await channel.ExchangeDeclareAsync("events", ExchangeType.Fanout, durable: true);
await channel.QueueBindAsync("notifications", "events", "");
await channel.QueueBindAsync("audit-log", "events", "");
await channel.QueueBindAsync("analytics", "events", "");
// Topic: filter by routing key
await channel.ExchangeDeclareAsync("topic-events", ExchangeType.Topic, durable: true);
// subscriber 1: order events only
await channel.QueueBindAsync("order-sub", "topic-events", "order.#");
// subscriber 2: payment events only
await channel.QueueBindAsync("payment-sub", "topic-events", "payment.#");
// subscriber 3: everything
await channel.QueueBindAsync("all-sub", "topic-events", "#");
Request/Reply (Correlation ID Pattern)
An asynchronous alternative to RPC: the requester publishes a request and receives the response on a separate reply queue, matching the two by correlation ID.
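The core of the pattern is a table of pending requests keyed by correlation ID. The sketch below shows only that bookkeeping, without any broker calls; `PendingRequests` is a hypothetical class, and string payloads stand in for real reply messages.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var pending = new PendingRequests();
var (correlationId, reply) = pending.Register();

// A reply with an unknown correlation ID is ignored.
Console.WriteLine(pending.TryComplete("unknown-id", "stale reply")); // False
// The matching reply completes the waiting task.
Console.WriteLine(pending.TryComplete(correlationId, "processed"));  // True
Console.WriteLine(await reply);                                      // processed

// Hypothetical registry that pairs replies with the requests awaiting them.
public sealed class PendingRequests
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending = new();

    // Called before publishing: returns the ID to put into CorrelationId and a task to await.
    public (string CorrelationId, Task<string> Reply) Register()
    {
        var id = Guid.NewGuid().ToString();
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;
        return (id, tcs.Task);
    }

    // Called from the reply consumer: completes the matching request, if any.
    public bool TryComplete(string correlationId, string payload) =>
        _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(payload);
}
```

Keeping the table in one place lets a single long-lived reply queue serve many concurrent requests, instead of declaring a new queue per request.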
Service A                      RabbitMQ                      Service B
   |                              |                              |
   |-- Request ------------------>| rpc-queue ------------------>|
   |   correlationId: abc123      |                              |
   |   replyTo: reply-queue       |                              |
   |                              |                              |
   |<-- Response -----------------| reply-queue <----------------|
   |   correlationId: abc123      |                              |
Implementation
public class RequestReplyClient
{
    private readonly IChannel _channel;
    public RequestReplyClient(IChannel channel)
    {
        _channel = channel;
    }
    public async Task<ReplyMessage> SendRequestAsync(RequestMessage request, TimeSpan timeout)
    {
        // Reply queue: exclusive to this client
        var queueName = $"rr-{Guid.NewGuid()}";
        await _channel.QueueDeclareAsync(queueName, durable: false, exclusive: true, autoDelete: true, arguments: null);
        var correlationId = Guid.NewGuid().ToString();
        // One completion source per request, so concurrent calls do not share state
        var tcs = new TaskCompletionSource<ReplyMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
        // Start listening on the reply queue before publishing the request
        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.ReceivedAsync += async (model, ea) =>
        {
            await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
            // Ignore replies that do not belong to this request
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                var response = JsonSerializer.Deserialize<ReplyMessage>(ea.Body.ToArray());
                tcs.TrySetResult(response!);
            }
        };
        var consumerTag = await _channel.BasicConsumeAsync(queueName, false, consumer);
        try
        {
            // Send the request
            var requestProps = new BasicProperties
            {
                CorrelationId = correlationId,
                ReplyTo = queueName,
                ContentType = "application/json",
                MessageId = Guid.NewGuid().ToString()
            };
            await _channel.BasicPublishAsync(
                exchange: "rpc-exchange",
                routingKey: "process",
                mandatory: true,
                basicProperties: requestProps,
                body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request)));
            // Wait for the response; WaitAsync throws TimeoutException when the timeout elapses
            return await tcs.Task.WaitAsync(timeout);
        }
        finally
        {
            // Cancelling the only consumer lets the auto-delete reply queue be removed
            await _channel.BasicCancelAsync(consumerTag);
        }
    }
}
public class RequestReplyServer
{
private readonly IChannel _channel;
public RequestReplyServer(IChannel channel)
{
_channel = channel;
}
public async Task StartAsync()
{
await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (model, ea) =>
{
var request = JsonSerializer.Deserialize<RequestMessage>(ea.Body.ToArray());
// Process
var response = await ProcessRequestAsync(request!);
// Reply
var replyProps = new BasicProperties
{
CorrelationId = ea.BasicProperties.CorrelationId,
ContentType = "application/json"
};
await _channel.BasicPublishAsync(
exchange: "",
routingKey: ea.BasicProperties.ReplyTo,
mandatory: true,
basicProperties: replyProps,
body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(response)));
await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};
await _channel.BasicConsumeAsync("rpc-queue", false, consumer);
}
private Task<ReplyMessage> ProcessRequestAsync(RequestMessage request) =>
Task.FromResult(new ReplyMessage("processed"));
}
public record RequestMessage(string Action, string Payload);
public record ReplyMessage(string Result, string? Error = null);
Competing Consumers
Multiple consumers read from one queue, and each message goes to only one of them.
Producer → Queue ──→ Consumer A (receives msg 1, 4, 7...)
──→ Consumer B (receives msg 2, 5, 8...)
──→ Consumer C (receives msg 3, 6, 9...)
Characteristics
- Scaling: adding consumers raises throughput
- Load balancing: the broker distributes messages automatically
- Ordering: not preserved with multiple consumers
- Fault tolerance: if a consumer dies, the broker redelivers its unacked messages
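The distribution in the diagram corresponds to plain round-robin dispatch. The sketch below reproduces it in memory, assuming equally fast consumers and no prefetch limits; `RoundRobin` is a hypothetical helper, and real dispatch also depends on which consumers have free prefetch slots.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var assignments = RoundRobin.Dispatch(Enumerable.Range(1, 7), new[] { "A", "B", "C" });
foreach (var (consumer, messages) in assignments)
    Console.WriteLine($"{consumer}: {string.Join(",", messages)}");
// A: 1,4,7
// B: 2,5
// C: 3,6

// Hypothetical helper that hands messages to consumers in turn.
public static class RoundRobin
{
    public static IReadOnlyList<(string Consumer, List<int> Messages)> Dispatch(
        IEnumerable<int> messages, IReadOnlyList<string> consumers)
    {
        var result = consumers.Select(c => (Consumer: c, Messages: new List<int>())).ToList();
        var index = 0;
        foreach (var message in messages)
        {
            // Each message goes to exactly one consumer, then the turn moves on.
            result[index].Messages.Add(message);
            index = (index + 1) % result.Count;
        }
        return result;
    }
}
```

Consumer A sees 1, 4, 7 and consumer B sees 2, 5, so no single consumer observes the full sequence, which is why global ordering is lost.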
Implementation
public class CompetingConsumerPool
{
private readonly IChannel _channel;
private const int WorkerCount = 4;
private readonly List<Task> _workers = new();
public CompetingConsumerPool(IChannel channel)
{
_channel = channel;
}
public async Task StartAsync(CancellationToken ct)
{
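// QoS for fair distribution between workers
// Note: consumers that share one channel are dispatched according to the channel's consumer dispatch concurrency
await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);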
for (int i = 0; i < WorkerCount; i++)
{
var workerId = i;
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (model, ea) =>
{
try
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Logger.LogInformation("Worker {WorkerId} processing message {MsgId}",
workerId, ea.BasicProperties.MessageId);
await ProcessMessageAsync(workerId, message);
await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
await _channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
Logger.LogError(ex, "Worker {WorkerId} failed", workerId);
}
};
await _channel.BasicConsumeAsync("task-queue", false, consumer);
Logger.LogInformation("Worker {WorkerId} started", workerId);
}
}
private Task ProcessMessageAsync(int workerId, string message) => Task.CompletedTask;
}
Scaling Considerations
| Consumers | Throughput | Latency | Memory | Notes |
|---|---|---|---|---|
| 1 | Low | Low | Low | Simple, ordered |
| N = CPU cores | High | Low | Medium | Optimal for CPU-bound |
| N >> CPU cores | Very High | Higher | High | Diminishing returns |
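Dead Letter Exchange (DLX)
Messages that cannot be processed are routed to a separate exchange, and from there to a dead letter queue (DLQ), for later analysis.
Reasons a message is dead-lettered
- The message TTL expired in the queue
- The queue reached its max length
- A consumer explicitly nacked or rejected it with requeue=false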
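Each time a message is dead-lettered, the broker records the event in the x-death header, including the queue, the reason, and a count. The sketch below reads that count. It assumes the header arrives as a list of tables in which strings are byte arrays and the count is a long, which is how the .NET client is expected to surface it; `DeathHeader` is a hypothetical helper and the sample data is made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Sample headers shaped the way x-death is assumed to arrive.
var headers = new Dictionary<string, object?>
{
    ["x-death"] = new List<object>
    {
        new Dictionary<string, object>
        {
            ["queue"] = Encoding.UTF8.GetBytes("orders"),
            ["reason"] = Encoding.UTF8.GetBytes("rejected"),
            ["count"] = 2L
        },
        new Dictionary<string, object>
        {
            ["queue"] = Encoding.UTF8.GetBytes("orders.retry.1"),
            ["reason"] = Encoding.UTF8.GetBytes("expired"),
            ["count"] = 2L
        }
    }
};

Console.WriteLine(DeathHeader.CountFor(headers, "orders", "rejected")); // 2
Console.WriteLine(DeathHeader.CountFor(headers, "orders", "expired"));  // 0

// Hypothetical helper: how many times a message was dead-lettered from a queue for a given reason.
public static class DeathHeader
{
    public static long CountFor(IDictionary<string, object?>? headers, string queue, string reason)
    {
        if (headers is null
            || !headers.TryGetValue("x-death", out var raw)
            || raw is not IEnumerable<object> deaths)
            return 0;

        long total = 0;
        foreach (var entry in deaths.OfType<IDictionary<string, object>>())
        {
            if (Text(entry, "queue") == queue
                && Text(entry, "reason") == reason
                && entry.TryGetValue("count", out var count)
                && count is long n)
                total += n;
        }
        return total;
    }

    // Header strings may arrive as byte[]; accept both representations.
    private static string Text(IDictionary<string, object> entry, string key) =>
        entry.TryGetValue(key, out var value)
            ? value switch { byte[] bytes => Encoding.UTF8.GetString(bytes), string s => s, _ => "" }
            : "";
}
```

The reasons map to the list above: "expired" for TTL, "maxlen" for the length limit, and "rejected" for an explicit nack or reject.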
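Retry queue chain with exponential backoff
[orders] ──(attempt 1 failed)──→ [orders.retry.1, TTL 5s] ──(TTL expired)──→ [orders]
[orders] ──(attempt 2 failed)──→ [orders.retry.2, TTL 10s] ──(TTL expired)──→ [orders]
[orders] ──(attempt 3 failed)──→ [orders.retry.3, TTL 20s] ──(TTL expired)──→ [orders]
[orders] ──(attempt 4 failed)──→ [orders-dlx] ──→ [orders.dlq]
Implementation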
public class DltTopology
{
    public static async Task SetupAsync(IChannel channel)
    {
        // Main exchange and dead letter exchange
        await channel.ExchangeDeclareAsync("orders", ExchangeType.Direct, durable: true);
        await channel.ExchangeDeclareAsync("orders-dlx", ExchangeType.Direct, durable: true);
        // Retry levels: the TTL doubles each time (5s, 10s, 20s)
        var retryTtlMs = new[] { 5000, 10000, 20000 };
        for (var level = 1; level <= retryTtlMs.Length; level++)
        {
            var name = $"orders.retry.{level}";
            await channel.ExchangeDeclareAsync(name, ExchangeType.Direct, durable: true);
            // No consumer reads a retry queue: when the TTL expires,
            // the message is dead-lettered back to the main exchange
            await channel.QueueDeclareAsync(name, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object?>
            {
                ["x-message-ttl"] = retryTtlMs[level - 1],
                ["x-dead-letter-exchange"] = "orders",
                ["x-dead-letter-routing-key"] = "order.created"
            });
            await channel.QueueBindAsync(name, name, name);
        }
        // Main queue: messages rejected with requeue=false go to the DLX.
        // A queue named "orders" declared earlier with different arguments must be deleted first.
        await channel.QueueDeclareAsync("orders", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object?>
        {
            ["x-dead-letter-exchange"] = "orders-dlx",
            ["x-dead-letter-routing-key"] = "orders.dlq"
        });
        // DLQ
        await channel.QueueDeclareAsync("orders.dlq", durable: true, exclusive: false, autoDelete: false, arguments: null);
        // Bindings
        await channel.QueueBindAsync("orders", "orders", "order.created");
        await channel.QueueBindAsync("orders.dlq", "orders-dlx", "orders.dlq");
    }
}
// Consumer with retry logic
public class RetryConsumer
{
    private const int MaxRetries = 3;
    public async Task StartAsync(IChannel channel, CancellationToken ct)
    {
        await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);
        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (model, ea) =>
        {
            var retryCount = GetRetryCount(ea.BasicProperties);
            var body = ea.Body.ToArray();
            try
            {
                await ProcessMessageAsync(Encoding.UTF8.GetString(body));
                await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception) when (retryCount < MaxRetries)
            {
                // Publish a copy to the next retry level with the counter incremented, then ack the original.
                // requeue=true would put the message straight back on "orders" with no delay.
                var level = retryCount + 1;
                var headers = new Dictionary<string, object?>(ea.BasicProperties.Headers ?? new Dictionary<string, object?>())
                {
                    ["x-retry-count"] = level
                };
                var props = new BasicProperties(ea.BasicProperties) { Headers = headers };
                await channel.BasicPublishAsync(exchange: $"orders.retry.{level}", routingKey: $"orders.retry.{level}",
                    mandatory: true, basicProperties: props, body: body, cancellationToken: ct);
                await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception)
            {
                // Max retries exceeded: reject, and the queue's DLX routes the message to orders.dlq
                await channel.BasicNackAsync(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
            }
        };
        await channel.BasicConsumeAsync("orders", false, consumer);
    }
    private static int GetRetryCount(IReadOnlyBasicProperties props)
    {
        if (props.Headers is { } headers && headers.TryGetValue("x-retry-count", out var value))
        {
            return value switch { int i => i, long l => (int)l, _ => 0 };
        }
        return 0;
    }
    private Task ProcessMessageAsync(string message) => Task.CompletedTask;
}
Patterns Summary
| Pattern | Delivery | Ordering | Use Case |
|---|---|---|---|
| Point-to-Point | 1:1 | No (round-robin) | Task distribution |
| Pub/Sub | 1:N | No | Event broadcasting |
| Request/Reply | 1:1 async | Not applicable (replies are matched by correlationId) | Async RPC |
| Competing Consumers | 1 of N (each message to one consumer) | No | Horizontal scaling |
| DLQ | N:1 (failed) | No | Error handling |
Checklist
- [x] Point-to-point: queue-based, single consumer per message
- [x] Pub/Sub: topic-based, multiple subscribers
- [x] Request/Reply: correlationId + replyTo pattern
- [x] Competing Consumers: parallel processing with load balancing
- [x] Dead Letter Exchange: retry queue chain with exponential backoff (TTL + DLX)
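The TTLs of a retry chain follow a doubling schedule. A minimal sketch of that calculation, assuming a 5-second base delay and a cap chosen for illustration; `RetrySchedule` is a hypothetical helper.

```csharp
using System;

for (var attempt = 1; attempt <= 3; attempt++)
{
    var delay = RetrySchedule.DelayFor(attempt, TimeSpan.FromSeconds(5), TimeSpan.FromMinutes(1));
    Console.WriteLine($"attempt {attempt}: {delay.TotalSeconds}s");
}
// attempt 1: 5s
// attempt 2: 10s
// attempt 3: 20s

// Hypothetical helper: delay = baseDelay * 2^(attempt - 1), capped at maxDelay.
public static class RetrySchedule
{
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        var factor = Math.Pow(2, attempt - 1);
        var ms = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(ms);
    }
}
```

The resulting values in milliseconds are what would go into x-message-ttl for each retry queue. The cap keeps later attempts from waiting unreasonably long.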
Practice
MassTransit / RabbitMQ.Client
MassTransit Overview
MassTransit is an open-source message bus framework for .NET that abstracts the underlying message broker (RabbitMQ, Azure Service Bus, Kafka, AWS SNS/SQS).
Application
↓
MassTransit Bus
├── IBus (publish/send)
├── Consumer<TMessage> (receive)
├── Saga<TState> (long-running workflows)
└── Pipeline (validation, routing, error handling)
↓
RabbitMQ / Azure Service Bus / Kafka
MassTransit vs RabbitMQ.Client
| Feature | RabbitMQ.Client | MassTransit |
|---|---|---|
| Abstraction | Low-level AMQP | High-level messaging |
| Serialization | Manual | Automatic (JSON, MessagePack, Protobuf) |
| Message contracts | Plain objects | Interfaces/classes with conventions |
| Error handling | Manual | Built-in retry, fault, dead letter |
| Sagas | Not supported | Full state machine support |
| Pipeline | Manual | Configurable pipeline steps |
| Learning curve | Steep | Moderate |
| Performance | Slightly higher | Minimal overhead |
| Use case | Fine-grained control | Enterprise messaging |
MassTransit Abstractions
IBus
The entry point for sending and publishing messages.
// DI registration
builder.Services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost", host =>
{
host.Username("guest");
host.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
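});
IPublishEndpoint vs ISendEndpoint
| Method | Scope | Exchange |
|---|---|---|
| IPublishEndpoint.Publish<T> | All endpoints subscribed to the message type | Fanout exchange named after the message type (RabbitMQ transport default) |
| IBus.Send<T> | Single endpoint, resolved through EndpointConvention | Exchange bound to the target queue |
| ISendEndpoint.Send<T> | Specific endpoint | Exchange bound to the target queue |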
public class OrderService
{
private readonly IBus _bus;
public OrderService(IBus bus)
{
_bus = bus;
}
// Publish: delivered to every subscriber of the message type
public async Task CreateOrderAsync(CreateOrderCommand command)
{
var message = new OrderCreated
{
OrderId = Guid.NewGuid(),
CustomerId = command.CustomerId,
Total = command.Total,
Timestamp = DateTime.UtcNow
};
await _bus.Publish(message);
}
// Send: delivered to one specific endpoint
public async Task NotifyAsync(Guid orderId)
{
var message = new OrderNotificationRequested
{
OrderId = orderId,
Template = "order-confirmed"
};
// Resolve the endpoint by its queue address, then send to it
var endpoint = await _bus.GetSendEndpoint(new Uri("queue:notification-service"));
await endpoint.Send(message);
}
}
Message Contracts
Interfaces vs Classes
Interfaces are recommended for contracts because they carry fewer dependencies.
// Recommended: interface-based contract
public interface IOrderCreated
{
Guid OrderId { get; }
Guid CustomerId { get; }
decimal Total { get; }
DateTimeOffset Timestamp { get; }
}
// Consumer
public class OrderCreatedConsumer : IConsumer<IOrderCreated>
{
public async Task Consume(ConsumeContext<IOrderCreated> context)
{
var message = context.Message;
Logger.LogInformation("Order {OrderId} created", message.OrderId);
}
}
// Classes: for cases that need a more complex structure
public class OrderCreated
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal Total { get; set; }
public DateTimeOffset Timestamp { get; set; }
}
Message Versioning
// V1
public interface IOrderCreatedV1
{
Guid OrderId { get; }
decimal Total { get; }
}
// V2 (backward compatible: adds fields)
public interface IOrderCreatedV2 : IOrderCreatedV1
{
string Currency { get; }
IReadOnlyList<OrderItemV2> Items { get; }
}
// V3 (another additive change; a truly breaking change would need a new contract type that does not inherit from V2)
public interface IOrderCreatedV3 : IOrderCreatedV2
{
string Channel { get; } // "web", "mobile", "api"
}
Inheritance Handling
// Base event
public interface IOrderEvent
{
Guid OrderId { get; }
DateTimeOffset CreatedAt { get; }
}
// Derived events
public interface IOrderCreated : IOrderEvent { }
public interface IOrderUpdated : IOrderEvent { string Changes { get; } }
public interface IOrderShipped : IOrderEvent { string TrackingNumber { get; } }
// A consumer can subscribe to the base contract
public class OrderEventConsumer : IConsumer<IOrderEvent>
{
public async Task Consume(ConsumeContext<IOrderEvent> context)
{
// context.Message is materialized as IOrderEvent, so pattern matching on it
// may not see the derived contract. Ask the context for the derived type instead.
if (context.TryGetMessage<IOrderCreated>(out var created))
await HandleCreated(created.Message);
else if (context.TryGetMessage<IOrderUpdated>(out var updated))
await HandleUpdated(updated.Message);
else if (context.TryGetMessage<IOrderShipped>(out var shipped))
await HandleShipped(shipped.Message);
}
private Task HandleCreated(IOrderCreated message) => Task.CompletedTask;
private Task HandleUpdated(IOrderUpdated message) => Task.CompletedTask;
private Task HandleShipped(IOrderShipped message) => Task.CompletedTask;
}
Consumer Configuration
Concurrency
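x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost");
cfg.ReceiveEndpoint("order-created", e =>
{
// Concurrency limit: at most 10 messages processed in parallel on this endpoint
e.ConcurrentMessageLimit = 10;
// Second-level retry: redeliver later through the broker once immediate retries are exhausted
// (on RabbitMQ this relies on the delayed message exchange plugin; intervals are illustrative)
e.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15)));
// First-level retry: in-process, the message stays with this consumer
e.UseMessageRetry(r => r.Intervals(
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(30),
TimeSpan.FromMinutes(1)));
e.ConfigureConsumer<OrderCreatedConsumer>(context);
});
});
Retry Policies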
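Before looking at the MassTransit configuration, it can help to see what an exponential schedule looks like as plain numbers. The sketch below is a hypothetical helper (`BackoffSchedule` is not a MassTransit type) that uses simple doubling with a cap; MassTransit's own `Exponential` policy computes its intervals with a different formula, so treat this only as an illustration of the idea.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of MassTransit: doubling backoff with an upper bound.
public static class BackoffSchedule
{
    // Delay before retry number `attempt` (1-based): min * 2^(attempt - 1), capped at max.
    public static TimeSpan Delay(int attempt, TimeSpan min, TimeSpan max)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        double ms = min.TotalMilliseconds * Math.Pow(2, attempt - 1);
        return TimeSpan.FromMilliseconds(Math.Min(ms, max.TotalMilliseconds));
    }

    // Full schedule for `retryLimit` retries.
    public static IReadOnlyList<TimeSpan> Build(int retryLimit, TimeSpan min, TimeSpan max)
    {
        var delays = new List<TimeSpan>(retryLimit);
        for (int attempt = 1; attempt <= retryLimit; attempt++)
        {
            delays.Add(Delay(attempt, min, max));
        }
        return delays;
    }
}
```

Calling `BackoffSchedule.Build(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5))` yields delays that double until they reach the cap, which is the behaviour the `max` argument of a retry policy is meant to guarantee.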
// In consumer configuration
config.UseMessageRetry(retryConfig =>
{
// Fixed intervals
retryConfig.Intervals(
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(10),
TimeSpan.FromSeconds(30));
});
// Exponential backoff: retry limit, min interval, max interval, interval delta
config.UseMessageRetry(retryConfig =>
{
retryConfig.Exponential(
5,
TimeSpan.FromSeconds(1),
TimeSpan.FromMinutes(5),
TimeSpan.FromSeconds(10));
});
// Circuit breaker
config.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(1); // window over which failures are counted
cb.TripThreshold = 15; // open when 15% of attempts in the window fail
cb.ActiveThreshold = 10; // ...but only after at least 10 attempts
cb.ResetInterval = TimeSpan.FromMinutes(5); // wait before letting a trial message through
});
Error Handling Pipeline
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost");
// Bus-wide consume retry policies, applied to every receive endpoint.
// Delayed redelivery must be configured before the immediate retry.
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(5)));
cfg.UseMessageRetry(r => r.Exponential(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(2)));
// Error queue: once retries are exhausted MassTransit moves the message to
// "<queue-name>_error"; messages with no matching consumer go to "<queue-name>_skipped"
cfg.ConfigureEndpoints(context);
});
Saga State Machine
Long-running workflow orchestration.
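The core of any saga is a table of allowed transitions. The following dependency-free sketch shows that idea on its own; `OrderWorkflow`, `OrderState` and `OrderEvent` are hypothetical names for illustration and do not use MassTransit, persistence or messaging.

```csharp
using System;
using System.Collections.Generic;

public enum OrderState { Initial, Ordering, Paid, Completed, Cancelled }
public enum OrderEvent { OrderCreated, PaymentCompleted, PaymentFailed, OrderShipped, OrderCancelled }

// Hypothetical illustration of a state machine as a transition table.
public sealed class OrderWorkflow
{
    private static readonly Dictionary<(OrderState, OrderEvent), OrderState> Transitions = new()
    {
        [(OrderState.Initial, OrderEvent.OrderCreated)] = OrderState.Ordering,
        [(OrderState.Ordering, OrderEvent.PaymentCompleted)] = OrderState.Paid,
        [(OrderState.Ordering, OrderEvent.PaymentFailed)] = OrderState.Cancelled,
        [(OrderState.Ordering, OrderEvent.OrderCancelled)] = OrderState.Cancelled,
        [(OrderState.Paid, OrderEvent.OrderShipped)] = OrderState.Completed,
        [(OrderState.Paid, OrderEvent.OrderCancelled)] = OrderState.Cancelled,
    };

    public OrderState Current { get; private set; } = OrderState.Initial;

    // Returns false and leaves the state untouched when the event is not allowed right now.
    public bool TryHandle(OrderEvent evt)
    {
        if (!Transitions.TryGetValue((Current, evt), out var next))
        {
            return false;
        }
        Current = next;
        return true;
    }
}
```

Because only listed pairs are accepted, an out-of-order event such as `OrderShipped` while still in `Ordering` is rejected without any explicit guard. A saga framework adds correlation, persistence and message publishing on top of the same table.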
public class OrderSaga :
MassTransitStateMachine<OrderSagaState>
{
public State Ordering { get; set; } = null!;
public State Paid { get; set; } = null!;
public State Shipped { get; set; } = null!;
public State Completed { get; set; } = null!;
public State Cancelled { get; set; } = null!;
public Event<IOrderCreated> OrderCreated { get; set; } = null!;
public Event<IPaymentCompleted> PaymentCompleted { get; set; } = null!;
public Event<IPaymentFailed> PaymentFailed { get; set; } = null!;
public Event<IOrderShipped> OrderShipped { get; set; } = null!;
public Event<IOrderCancelled> OrderCancelled { get; set; } = null!;
public OrderSaga()
{
InstanceState(x => x.CurrentState);
Event(() => OrderCreated, e => e.CorrelateById(ctx => ctx.Message.OrderId));
Event(() => PaymentCompleted, e => e.CorrelateById(ctx => ctx.Message.OrderId));
Event(() => PaymentFailed, e => e.CorrelateById(ctx => ctx.Message.OrderId));
Event(() => OrderShipped, e => e.CorrelateById(ctx => ctx.Message.OrderId));
Event(() => OrderCancelled, e => e.CorrelateById(ctx => ctx.Message.OrderId));
// context.Saga is the MassTransit v8 name for the saga instance
// (older versions expose context.Instance and context.Data)
Initially(
When(OrderCreated)
.Then(context =>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.CustomerId = context.Message.CustomerId;
context.Saga.Total = context.Message.Total;
})
.Publish(context => new PaymentRequested
{
OrderId = context.Saga.OrderId,
Amount = context.Saga.Total,
Currency = "USD"
})
.TransitionTo(Ordering));
// Every When(...) must sit inside Initially/During/DuringAny. An event that is not
// listed for the current state is rejected, so "shipped before paid" needs no guard.
During(Ordering,
When(PaymentCompleted)
.Then(context => context.Saga.PaidAt = DateTime.UtcNow)
.Publish(context => new OrderPrepared
{
OrderId = context.Saga.OrderId
})
.TransitionTo(Paid),
When(PaymentFailed)
.Then(context => context.Saga.FailedAt = DateTime.UtcNow)
.Publish(context => new OrderCancelled
{
OrderId = context.Saga.OrderId,
Reason = "Payment failed"
})
.TransitionTo(Cancelled));
During(Paid,
When(OrderShipped)
.Then(context => context.Saga.ShippedAt = DateTime.UtcNow)
.TransitionTo(Completed));
// Cancellation is accepted in any non-initial, non-final state
DuringAny(
When(OrderCancelled)
.Then(context => context.Saga.CancelledAt = DateTime.UtcNow)
.TransitionTo(Cancelled));
// Payment timeout (cancel if not paid within 15 minutes) is modelled with
// Schedule<OrderSagaState, TMessage> plus a Guid? token property on the saga state;
// the state class here does not define one, so the timeout is left out.
}
}
public class OrderSagaState :
SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; } = null!;
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal Total { get; set; }
public DateTime? PaidAt { get; set; }
public DateTime? ShippedAt { get; set; }
public DateTime? CancelledAt { get; set; }
public DateTime? FailedAt { get; set; }
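}
Saga Persistence
// Register the state machine once and pick exactly one repository.
// In-memory (testing)
x.AddSagaStateMachine<OrderSaga, OrderSagaState>()
.InMemoryRepository();
// Entity Framework Core (production; the database is chosen inside the repository configuration)
x.AddSagaStateMachine<OrderSaga, OrderSagaState>()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext<OrderSagaDbContext>(); // hypothetical DbContext that maps OrderSagaState
r.UsePostgres(); // PostgreSQL; use r.UseSqlServer() for SQL Server
});
// Redis (the saga state must also implement ISagaVersion)
x.AddSagaStateMachine<OrderSaga, OrderSagaState>()
.RedisRepository(r => r.DatabaseConfiguration("localhost:6379"));
In-memory vs RabbitMQ Transport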
// In-memory: for unit and integration tests
x.UsingInMemory((context, cfg) =>
{
// Creates a receive endpoint for every registered consumer and saga
cfg.ConfigureEndpoints(context);
});
// RabbitMQ: for production
x.UsingRabbitMq((context, cfg) =>
{
// Host name, virtual host, credentials
cfg.Host("localhost", "/", host =>
{
host.Username("guest");
host.Password("guest");
});
cfg.UseMessageRetry(retryConfig =>
{
retryConfig.Exponential(3, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(5));
});
// Consumers are registered once (x.AddConsumer<OrderConsumer>()) and wired up here,
// so the consumer code is identical for both transports
cfg.ConfigureEndpoints(context);
});
Message Pipeline
Pipeline Steps
Outgoing Message:
→ Validation → Enrichment → Serialization → Routing → Broker
Incoming Message:
→ Broker → Deserialization → Routing → Validation → Consumer
Custom Pipeline Steps
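The chain of steps in a pipeline can be sketched with plain delegates before bringing in any framework types. In the sketch below `Step<T>`, `Next<T>` and `Pipeline` are hypothetical names, not MassTransit APIs; each step receives the message and a `next` delegate and decides whether to continue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Continuation: hands the message to the rest of the pipeline.
public delegate Task Next<T>(T message);

// A pipeline step: may inspect the message, then call next (or throw to stop).
public delegate Task Step<T>(T message, Next<T> next);

// Hypothetical illustration of how middleware steps are composed.
public static class Pipeline
{
    // The first step in the list runs first; `terminal` (the consumer) runs last.
    public static Next<T> Build<T>(IReadOnlyList<Step<T>> steps, Next<T> terminal)
    {
        Next<T> next = terminal;
        for (int i = steps.Count - 1; i >= 0; i--)
        {
            var step = steps[i];
            var inner = next; // capture the continuation built so far
            next = message => step(message, inner);
        }
        return next;
    }
}
```

A validation step that throws never calls `next`, so the consumer is not reached; this is the same contract a consume filter follows when it calls (or does not call) the next pipe.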
// Validation step: a scoped consume filter (open generic so it can be registered for all message types)
public class ValidationStep<T> : IFilter<ConsumeContext<T>>
where T : class
{
private readonly ILogger<ValidationStep<T>> _logger;
public ValidationStep(ILogger<ValidationStep<T>> logger)
{
_logger = logger;
}
public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
{
if (context.Message is IOrderCreated order)
{
if (order.Total <= 0)
{
throw new ArgumentException("Order total must be positive");
}
if (order.CustomerId == Guid.Empty)
{
throw new ArgumentException("Customer ID is required");
}
_logger.LogDebug("Validation passed for order {OrderId}", order.OrderId);
}
await next.Send(context);
}
public void Probe(ProbeContext context) => context.CreateFilterScope("validation");
}
// Enrichment step: a filter cannot swap the message, but it can attach a payload that
// later filters and the consumer read via context.TryGetPayload<ProcessingMetadata>(...)
public record ProcessingMetadata(DateTime ProcessedAt, string ProcessedBy);
public class EnrichmentStep<T> : IFilter<ConsumeContext<T>>
where T : class
{
public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
{
context.GetOrAddPayload(() => new ProcessingMetadata(DateTime.UtcNow, Environment.MachineName));
await next.Send(context);
}
public void Probe(ProbeContext context) => context.CreateFilterScope("enrichment");
}
// Registration
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost");
cfg.UseConsumeFilter(typeof(ValidationStep<>), context);
cfg.UseConsumeFilter(typeof(EnrichmentStep<>), context);
cfg.ConfigureEndpoints(context);
});
Practice
Message Contracts
public interface IOrderCreated
{
Guid OrderId { get; }
Guid CustomerId { get; }
decimal Total { get; }
string Currency { get; }
DateTimeOffset Timestamp { get; }
}
public interface IPaymentRequested
{
Guid OrderId { get; }
decimal Amount { get; }
string Currency { get; }
}
public interface IPaymentCompleted
{
Guid OrderId { get; }
DateTime CompletedAt { get; }
string TransactionId { get; }
}
public interface IPaymentFailed
{
Guid OrderId { get; }
string Reason { get; }
DateTime FailedAt { get; }
}
public interface IOrderShipped
{
Guid OrderId { get; }
string TrackingNumber { get; }
DateTime ShippedAt { get; }
}
public interface IOrderCancelled
{
Guid OrderId { get; }
string Reason { get; }
DateTime CancelledAt { get; }
}
Saga Configuration
public static class OrderSagaRegistration
{
public static void AddOrderSaga(this IBusRegistrationConfigurator configurator)
{
// Registers the state machine together with its repository
configurator.AddSagaStateMachine<OrderSaga, OrderSagaState>()
.InMemoryRepository(); // swap for a durable repository in production
}
}
Consumer
public class OrderCreatedConsumer : IConsumer<IOrderCreated>
{
private readonly ILogger<OrderCreatedConsumer> _logger;
public OrderCreatedConsumer(ILogger<OrderCreatedConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<IOrderCreated> context)
{
_logger.LogInformation("Processing order {OrderId}", context.Message.OrderId);
// Publish the payment request through the consume context (not IBus)
// so correlation and conversation ids flow to the outgoing message
await context.Publish(new PaymentRequested
{
OrderId = context.Message.OrderId,
Amount = context.Message.Total,
Currency = context.Message.Currency
}, context.CancellationToken);
}
}
Checklist
- [x] MassTransit abstractions: IBus, IPublishEndpoint, ISendEndpoint
- [x] Message contracts: interfaces vs classes, inheritance handling
- [x] Consumer configuration: concurrency, error handling, retry policies
- [x] Saga state machine: long-running workflow orchestration with persistence
- [x] In-memory vs RabbitMQ transport
- [x] Message pipeline: validation, enrichment, routing steps
Apache Kafka для .NET
Kafka Fundamentals
Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant stream processing.
Architecture
Producer → Broker 1 (Topic: orders, Partition 0) → Replicas: [1, 2, 3]
→ Broker 2 (Topic: orders, Partition 1) → Replicas: [2, 3, 1]
→ Broker 3 (Topic: orders, Partition 2) → Replicas: [3, 1, 2]
Consumer Group:
Consumer A ← Partition 0
Consumer B ← Partition 1
Consumer C ← Partition 2
Key Terms
| Term | Description |
|---|---|
| Topic | Logical channel for messages |
| Partition | Physical segment of a topic: an ordered, immutable sequence of records |
| Broker | Server in a Kafka cluster |
| Replica | Copy of a partition, kept for fault tolerance |
| Leader | Replica (and its broker) that serves writes and, by default, reads for a partition |
| Follower | Replica that copies data from the leader |
| Consumer Group | Set of consumers that share the partitions of a topic |
| Offset | Position of a record within a partition (monotonically increasing) |
| Producer | Client that writes to a topic |
Topics, Partitions, Replicas
Data Distribution Model
Topic: "orders"
├── Partition 0 (Leader: Broker 1, Replicas: [1, 2, 3], ISR: [1, 2])
│ ├── Offset 0: Order#1
│ ├── Offset 1: Order#2
│ └── Offset 2: Order#3
├── Partition 1 (Leader: Broker 2, Replicas: [2, 3, 1], ISR: [2, 3, 1])
│ ├── Offset 0: Order#4
│ └── Offset 1: Order#5
└── Partition 2 (Leader: Broker 3, Replicas: [3, 1, 2], ISR: [3, 1, 2])
├── Offset 0: Order#6
└── Offset 1: Order#7
Partitioning Strategy
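Key-based partitioning can be illustrated without a broker: hash the key, take the remainder by the partition count. The sketch below uses FNV-1a purely as an easy deterministic hash; `KeyPartitioner` is a hypothetical helper, and the real client uses its own hash function, so actual partition numbers will differ.

```csharp
using System;
using System.Text;

// Hypothetical illustration of hash(key) % partitionCount.
public static class KeyPartitioner
{
    // FNV-1a 32-bit: small deterministic hash, chosen only for illustration.
    public static uint Fnv1a(string key)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Same key and same partition count always give the same partition.
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        return (int)(Fnv1a(key) % (uint)partitionCount);
    }
}
```

Two consequences follow directly from the formula: all events of one order land in one partition (so their relative order is kept), and changing the partition count changes the mapping for existing keys.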
// Confluent.Kafka Producer
var producerConfig = new ProducerConfig
{
BootstrapServers = "localhost:9092",
Acks = Acks.All, // wait for all in-sync replicas
EnableIdempotence = true, // no duplicates from producer retries (not end-to-end exactly-once)
MaxInFlight = 5, // must be at most 5 when idempotence is enabled
MessageSendMaxRetries = int.MaxValue, // retry automatically
RetryBackoffMs = 100
};
using var producer = new ProducerBuilder<string, string>(producerConfig)
.Build();
// The message key selects the partition: a hash of the key modulo the partition count
await producer.ProduceAsync(
"orders",
new Message<string, string>
{
Key = orderId.ToString(), // the same order always goes to the same partition
Value = JsonSerializer.Serialize(order),
Headers = new Headers
{
{ "event-type", Encoding.UTF8.GetBytes("OrderCreated") },
{ "source", Encoding.UTF8.GetBytes("order-service") }
}
});
Replication Factor
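| Replication factor | Broker failures survived without losing the partition | Typical min.insync.replicas | Notes |
|---|---|---|---|
| 1 | 0 | 1 | No fault tolerance |
| 2 | 1 | 1 | Minimum for HA |
| 3 | 2 | 2 | Standard production |
| 5 | 4 | 3 | Rarely needed beyond this |
With replication factor 3, min.insync.replicas=2 and acks=all, every acknowledged write is on at least two replicas, and the partition keeps accepting writes with one broker down (with two down, acks=all writes are rejected until a replica rejoins).
Consumer Groups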
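Within one group every partition is owned by exactly one consumer. The sketch below shows a range-style split, where the first consumers take one extra partition when the count does not divide evenly. `GroupAssignment` is a hypothetical helper; the real assignment is done by the group coordinator and the configured assignor.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of a range-style partition assignment.
public static class GroupAssignment
{
    public static Dictionary<string, List<int>> Range(IReadOnlyList<string> consumers, int partitionCount)
    {
        var result = new Dictionary<string, List<int>>();
        if (consumers.Count == 0) return result;

        int baseShare = partitionCount / consumers.Count; // every consumer gets at least this many
        int extra = partitionCount % consumers.Count;     // the first `extra` consumers get one more
        int next = 0;

        for (int i = 0; i < consumers.Count; i++)
        {
            int share = baseShare + (i < extra ? 1 : 0);
            var owned = new List<int>();
            for (int k = 0; k < share; k++)
            {
                owned.Add(next++);
            }
            result[consumers[i]] = owned;
        }
        return result;
    }
}
```

With three partitions and two consumers the first consumer owns two partitions and the second owns one. With more consumers than partitions the surplus consumers own nothing and stay idle, which is why the partition count caps the useful parallelism of a group.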
Parallel Consumption
Topic: "orders" (3 partitions)
Consumer Group: "order-processors"
Consumer A (instance 1) → Partitions [0, 1]
Consumer B (instance 2) → Partition [2]
Consumer Group: "analytics"
Consumer X (instance 1) → Partitions [0, 1, 2] (read-only, separate group)
Offset Management
// Consumer config
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "order-processors",
AutoOffsetReset = AutoOffsetReset.Earliest, // used when the group has no committed offset
EnableAutoCommit = false, // commit manually
SessionTimeoutMs = 10000,
HeartbeatIntervalMs = 3000,
MaxPollIntervalMs = 300000,
EnablePartitionEof = true, // surface end-of-partition events
EnableAutoOffsetStore = false // fine-grained offset control
};
using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
.SetPartitionsAssignedHandler((c, partitions) =>
{
// Observe the assignment; the client applies it itself
Logger.LogInformation("Assigned: {Partitions}", string.Join(", ", partitions));
})
.SetErrorHandler((_, error) =>
{
Logger.LogError("Kafka error: {Code} {Reason}", error.Code, error.Reason);
})
.Build();
// Subscribe and poll
consumer.Subscribe("orders");
while (!ct.IsCancellationRequested)
{
try
{
var result = consumer.Consume(ct);
if (result.IsPartitionEOF)
{
continue; // end-of-partition marker, no message attached
}
// Process message
await ProcessMessageAsync(result.Message);
// Manual commit, only AFTER successful processing
consumer.Commit(result);
}
catch (ConsumeException ex)
{
Logger.LogError(ex, "Consume error");
}
}
Offset Commit: Auto vs Manual
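The difference between committing before and after processing shows up only when the consumer crashes. The simulation below is a hypothetical model (`PartitionLog` is not a Kafka type): a list stands in for one partition, and `Committed` is the offset a restarted consumer resumes from.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of one partition plus a committed offset.
public sealed class PartitionLog
{
    private readonly List<string> _records;

    // Next offset to read after a restart.
    public long Committed { get; private set; }

    public PartitionLog(IEnumerable<string> records) => _records = new List<string>(records);

    // Runs a consumer from the committed offset and returns what it processed.
    // `crashAtOffset` simulates a crash while that offset is being handled.
    public List<string> Run(bool commitBeforeProcessing, long? crashAtOffset)
    {
        var processed = new List<string>();
        for (long offset = Committed; offset < _records.Count; offset++)
        {
            if (commitBeforeProcessing) Committed = offset + 1;
            if (offset == crashAtOffset) return processed; // crash: record was not processed
            processed.Add(_records[(int)offset]);
            if (!commitBeforeProcessing) Committed = offset + 1;
        }
        return processed;
    }
}
```

With commit-before-processing, a crash at offset 1 leaves `Committed` at 2, so the restarted consumer never sees that record (at-most-once). With commit-after-processing, `Committed` stays at 1 and the record is read again after the restart (at-least-once).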
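| Strategy | Pros | Cons |
|---|---|---|
| Auto commit | Simple | Offsets are committed on a timer, so a crash can skip messages that were fetched but not yet processed |
| Manual commit after processing | At-least-once processing | More code; duplicates after a crash, so handlers should be idempotent |
| Transactional (offsets sent in a producer transaction) | Exactly-once for consume-transform-produce within Kafka | Requires a transactional producer |
// Manual commit: recommended for production
consumer.Subscribe("orders");
while (!ct.IsCancellationRequested)
{
var result = consumer.Consume(ct);
try
{
await ProcessMessageAsync(result.Message);
consumer.Commit(result); // commit AFTER processing
}
catch (ProcessingException)
{
// Skipping the commit is not enough: the next Consume() returns the next message,
// and a later commit would move past this one. Rewind to retry it.
Logger.LogError("Processing failed, message will be retried");
consumer.Seek(result.TopicPartitionOffset);
}
}
Producer Semantics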
At-Most-Once
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
Acks = Acks.None, // do not wait for acknowledgement
EnableIdempotence = false,
MessageSendMaxRetries = 0
};
// A message can be lost on failure
At-Least-Once
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
Acks = Acks.All, // wait for all in-sync replicas
EnableIdempotence = true, // broker discards duplicates caused by producer retries
MessageSendMaxRetries = int.MaxValue,
MaxInFlight = 5
};
// Delivery is guaranteed. Idempotence removes duplicates caused by the producer's own
// retries within a session; duplicates can still appear if the application re-sends
// a message after a restart.
Exactly-Once
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
Acks = Acks.All,
EnableIdempotence = true,
MessageSendMaxRetries = int.MaxValue,
MaxInFlight = 5,
// Transactional
TransactionalId = "order-service-1"
};
using var producer = new ProducerBuilder<string, string>(config).Build();
producer.InitTransactions(TimeSpan.FromSeconds(10));
producer.BeginTransaction();
try
{
// Write to topic
producer.ProduceAsync("orders", new Message<string, string>
{
Key = orderId.ToString(),
Value = json
}).GetAwaiter().GetResult();
// Commit transaction: atomic for everything produced inside it
producer.CommitTransaction(TimeSpan.FromSeconds(30));
}
catch (Exception)
{
producer.AbortTransaction(TimeSpan.FromSeconds(30));
throw;
}
Offset Commits
Automatic vs Manual
// Auto commit: simple but risky
var autoCommitConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "simple-consumer",
EnableAutoCommit = true,
AutoCommitIntervalMs = 5000 // every 5 seconds
};
// Manual commit: safer but more code
var manualCommitConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "reliable-consumer",
EnableAutoCommit = false // fully manual
};
Transactional Offsets
// Consumer that writes to a database and commits the offset afterwards
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "transactional-consumer",
EnableAutoCommit = false,
IsolationLevel = IsolationLevel.ReadCommitted // read only committed messages
};
using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe("orders");
while (!ct.IsCancellationRequested)
{
var result = consumer.Consume(ct);
await using var transaction = await _dbContext.Database.BeginTransactionAsync(ct);
try
{
// Process and write to database
await _dbContext.Orders.AddAsync(DeserializeOrder(result.Message), ct);
await _dbContext.SaveChangesAsync(ct);
await transaction.CommitAsync(ct);
}
catch
{
await transaction.RollbackAsync(ct);
_dbContext.ChangeTracker.Clear(); // drop the failed entity before retrying
consumer.Seek(result.TopicPartitionOffset); // read the same message again
continue;
}
// The offset commit is NOT atomic with the DB transaction: a crash between the two
// replays the message, so the write must be idempotent (or the offset stored in the DB)
consumer.Commit(result);
}
Schema Registry
Schema Evolution
// Confluent.SchemaRegistry
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:8081",
MaxCachedSchemas = 1000
};
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
// Producer with JSON Schema serialization (Confluent.SchemaRegistry.Serdes.Json)
var producerConfig = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
using var producer = new ProducerBuilder<string, Order>(producerConfig)
.SetValueSerializer(new JsonSerializer<Order>(schemaRegistry, new JsonSerializerConfig
{
SubjectNameStrategy = SubjectNameStrategy.TopicRecord // subject: <topic>-<record name>
}))
.Build();
Schema Evolution Strategies
Backward compatible (consumers on the new schema can read data written with the old one):
V1: {id, name, price}
V2: {id, name, price, discount?} // add a field with a default, or delete a field; upgrade consumers first
Forward compatible (consumers on the old schema can read data written with the new one):
V1: {id, name, price}
V2: {id, name, price, discount} // add a field, or delete an optional field; upgrade producers first
Full compatible (both directions):
V1: {id, name, price}
V2: {id, name, price, discount?} // only add or delete optional fields (fields with defaults)
Breaking change:
V1: {id, name, price}
V3: {id, name, total, items} // structure changes: use a NEW TOPIC or explicit VERSIONING
Backward-Compatible Changes
// V1
public record OrderV1(Guid Id, string Name, decimal Price);
// V2: backward compatible (adds optional fields)
public record OrderV2(
Guid Id,
string Name,
decimal Price,
decimal? Discount = null,
string Currency = "USD");
// V3: backward compatible (adds a collection)
public record OrderV3(
Guid Id,
string Name,
decimal Price,
decimal? Discount = null,
string Currency = "USD",
IReadOnlyList<OrderItemV3>? Items = null);
public record OrderItemV3(Guid ProductId, string ProductName, int Quantity, decimal UnitPrice);
Practice
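Additive record changes can be checked quickly by round-tripping JSON with `System.Text.Json`. The sketch below uses trimmed copies of the records (named `OrderV1Sample` and `OrderV2Sample` here so they do not clash with the originals) and only the nullable `Discount` field; `SchemaCompat` is a hypothetical helper, and no Schema Registry is involved.

```csharp
using System;
using System.Text.Json;

// Trimmed, renamed copies of the records, for illustration only.
public record OrderV1Sample(Guid Id, string Name, decimal Price);
public record OrderV2Sample(Guid Id, string Name, decimal Price, decimal? Discount = null);

public static class SchemaCompat
{
    // New reader, old data: the missing Discount property becomes null.
    public static OrderV2Sample ReadAsV2(string json) =>
        JsonSerializer.Deserialize<OrderV2Sample>(json)!;

    // Old reader, new data: the unknown Discount property is ignored by default.
    public static OrderV1Sample ReadAsV1(string json) =>
        JsonSerializer.Deserialize<OrderV1Sample>(json)!;
}
```

Serializing an `OrderV1Sample` and reading it with `ReadAsV2` exercises the backward direction; serializing an `OrderV2Sample` and reading it with `ReadAsV1` exercises the forward direction. Both work here only because the added field is optional.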
public class KafkaOrderProducer
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<KafkaOrderProducer> _logger;
public KafkaOrderProducer(ILogger<KafkaOrderProducer> logger)
{
_logger = logger;
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092", // host:port list, no URI scheme
Acks = Acks.All,
EnableIdempotence = true,
MaxInFlight = 5,
MessageSendMaxRetries = int.MaxValue,
RetryBackoffMs = 100,
CompressionType = CompressionType.Lz4, // saves bandwidth
BatchSize = 64000, // about 64 KB per batch
LingerMs = 10, // wait up to 10 ms to fill a batch
QueueBufferingMaxKbytes = 32768 // 32 MB producer queue
};
_producer = new ProducerBuilder<string, string>(config)
.SetErrorHandler((_, error) =>
_logger.LogError("Kafka error: {Code} {Reason}", error.Code, error.Reason))
.Build();
// Per-message delivery results are obtained by awaiting ProduceAsync
_logger.LogInformation("Kafka producer initialized");
}
public async Task PublishOrderCreatedAsync(OrderCreatedEvent order)
{
var message = new Message<string, string>
{
Key = order.OrderId.ToString(),
Value = JsonSerializer.Serialize(order),
Headers = new Headers
{
{ "event-type", Encoding.UTF8.GetBytes("OrderCreated") },
{ "event-version", Encoding.UTF8.GetBytes("v1") },
{ "timestamp", Encoding.UTF8.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString()) }
},
Timestamp = new Timestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), TimestampType.CreateTime)
};
try
{
var result = await _producer.ProduceAsync("orders", message);
_logger.LogInformation(
"Order {OrderId} published: topic={Topic} partition={Partition} offset={Offset}",
order.OrderId, result.Topic, result.Partition, result.Offset);
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, "Failed to publish order {OrderId}", order.OrderId);
throw;
}
}
}
Kafka Consumer with Manual Offset Commit
public class KafkaOrderConsumer : BackgroundService
{
private readonly IConsumer<string, string> _consumer;
private readonly ILogger<KafkaOrderConsumer> _logger;
private readonly IProcessOrder _processor;
public KafkaOrderConsumer(
IConsumer<string, string> consumer,
ILogger<KafkaOrderConsumer> logger,
IProcessOrder processor)
{
_consumer = consumer;
_logger = logger;
_processor = processor;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe("orders");
return Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);
}
private void ConsumeLoop(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
var result = _consumer.Consume(ct);
try
{
_processor.ProcessOrderAsync(result.Message.Value, ct).GetAwaiter().GetResult();
_consumer.Commit(result); // Commit AFTER processing
}
catch (Exception ex)
{
_logger.LogError(ex, "Processing failed for offset {Offset}", result.Offset);
// Not committing is not enough: rewind so the next Consume() returns this message again
_consumer.Seek(result.TopicPartitionOffset);
}
}
catch (ConsumeException ex)
{
_logger.LogError(ex, "Consume error: {Code}", ex.Error.Code);
if (ex.Error.IsFatal)
{
break;
}
}
catch (OperationCanceledException)
{
_consumer.Close();
break;
}
}
}
}
Summary Table: Kafka vs RabbitMQ
| Feature | Kafka | RabbitMQ |
|---|---|---|
| Model | Log-based | Queue-based |
| Retention | Time/size-based | Until consumed and acknowledged |
| Throughput | Very high (batched sequential log writes) | High, typically lower than Kafka |
| Latency | Low, but batching adds milliseconds | Very low for small messages |
| Ordering | Per-partition | Per-queue |
| Message size | Best kept small (default limit about 1 MB) | Best kept small (KB range) |
| Use case | Event streaming, analytics | Task queues, RPC |
| Consumer groups | Built-in | Competing consumers on a shared queue |
| Replay | Yes (seek to offset) | Not for classic queues (streams can replay) |
Checklist
- [x] Kafka producer с Confluent.Kafka и idempotent delivery
- [x] Consumer group с manual offset commit и error handling
- [x] Schema evolution strategy с backward-compatible changes
Reliable Messaging Patterns
Outbox Pattern
Problem
A typical problem: writing to the database and publishing to the broker are two separate operations, not one atomic one.
// BAD: dual write
Transaction {
db.Save(order); // succeeds
}
// CRASH here → the message is never sent
broker.Publish(orderCreated); // never runs
Solution: Outbox Table
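The idea can be sketched without a database. In the hypothetical model below, `InMemoryStore.SaveOrderWithEvent` stands in for one database transaction that writes both the order and its outbox record, and `OutboxRelay.Drain` stands in for the background publisher. None of these types come from EF Core or MassTransit.

```csharp
using System;
using System.Collections.Generic;

public sealed class OutboxRecord
{
    public Guid Id { get; } = Guid.NewGuid();
    public string Type { get; }
    public string Payload { get; }
    public bool Published { get; set; }

    public OutboxRecord(string type, string payload)
    {
        Type = type;
        Payload = payload;
    }
}

// Hypothetical stand-in for a database with an Orders and an Outbox table.
public sealed class InMemoryStore
{
    public List<string> Orders { get; } = new();
    public List<OutboxRecord> Outbox { get; } = new();

    // Models one transaction: the order and its event are stored together.
    public void SaveOrderWithEvent(string order, string payload)
    {
        Orders.Add(order);
        Outbox.Add(new OutboxRecord("OrderCreated", payload));
    }
}

public static class OutboxRelay
{
    // Publishes pending records. A record is marked only after publish succeeds,
    // so a failure leaves it pending for the next pass (at-least-once delivery).
    public static int Drain(InMemoryStore store, Action<OutboxRecord> publish)
    {
        int sent = 0;
        foreach (var record in store.Outbox)
        {
            if (record.Published) continue;
            try
            {
                publish(record);
                record.Published = true;
                sent++;
            }
            catch (Exception)
            {
                // Broker unavailable: keep the record pending.
            }
        }
        return sent;
    }
}
```

If the broker is down, `Drain` returns 0 and nothing is lost; the next pass sends the record. If the process dies after publishing but before the record is marked, the message is sent twice, which is why the outbox is normally paired with idempotent consumers.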
Transaction {
db.Save(order); // write to the main table
db.Save(outboxMessage); // write to the outbox table
} // Atomic!
Architecture
┌─────────────┐ ┌──────────────┐ ┌─────────────┐ ┌──────────┐
│ Order │ │ Database │ │ Outbox │ │ Message │
│ Service │────▶│ (EF Core) │────▶│ Poller │────▶│ Broker │
│ │ │ │ │ (CDC/Poll) │ │ (Rabbit) │
└─────────────┘ └──────────────┘ └─────────────┘ └──────────┘
│
├── Orders table
└── OutboxMessages table
Implementation with EF Core + MassTransit
// 1. Outbox Message Entity
public class OutboxMessage
{
public Guid Id { get; set; }
public string Type { get; set; } = null!;
public string ContentType { get; set; } = "application/json";
public string Payload { get; set; } = null!;
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? PublishedAt { get; set; }
public int? RetryCount { get; set; }
public string? Error { get; set; }
//EF Core mapping
public static void Configure(ModelBuilder modelBuilder)
{
modelBuilder.Entity<OutboxMessage>(entity =>
{
entity.HasKey(e => e.Id);
entity.HasIndex(e => new { e.PublishedAt, e.Id });
entity.Property(e => e.Payload).HasColumnType("nvarchar(max)");
});
}
}
// 2. DbContext с Outbox
public class AppDbContext : DbContext
{
public DbSet<Order> Orders { get; set; }
public DbSet<OutboxMessage> OutboxMessages { get; set; }
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
var result = await base.SaveChangesAsync(cancellationToken);
// После сохранения — publish outbox messages
await PublishOutboxMessagesAsync(cancellationToken);
return result;
}
private async Task PublishOutboxMessagesAsync(CancellationToken ct)
{
var unpublished = await OutboxMessages
.Where(m => m.PublishedAt == null)
.ToListAsync(ct);
foreach (var message in unpublished)
{
try
{
var envelope = new OutboxEnvelope
{
MessageId = message.Id,
MessageType = message.Type,
ContentType = message.ContentType,
Payload = message.Payload,
CreatedAt = message.CreatedAt
};
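// Publish through MassTransit. GetService is the EF Core extension from
// Microsoft.EntityFrameworkCore.Infrastructure; it resolves IBus from the application's service provider
var bus = this.GetService<IBus>();
await bus.Publish(envelope, ct);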
message.PublishedAt = DateTime.UtcNow;
}
catch (Exception ex)
{
message.RetryCount = (message.RetryCount ?? 0) + 1;
message.Error = ex.Message;
if (message.RetryCount >= 5)
{
// Max retries — mark as failed
message.PublishedAt = DateTime.UtcNow;
}
}
}
await base.SaveChangesAsync(ct);
}
}
// 3. Использование
public class OrderService
{
private readonly AppDbContext _context;
private readonly IBus _bus;
public async Task CreateOrderAsync(CreateOrderCommand cmd)
{
var order = new Order
{
Id = Guid.NewGuid(),
CustomerId = cmd.CustomerId,
Total = cmd.Total,
Status = OrderStatus.Created
};
_context.Orders.Add(order);
// Outbox message: saved in the same transaction as the order
_context.OutboxMessages.Add(new OutboxMessage
{
Id = Guid.NewGuid(),
Type = nameof(OrderCreated),
Payload = JsonSerializer.Serialize(new
{
order.Id,
order.CustomerId,
order.Total,
Timestamp = DateTime.UtcNow
})
});
await _context.SaveChangesAsync();
// Atomic: order + outbox message
}
}
Outbox Poller (Standalone)
// Standalone outbox poller: for high throughput
public class OutboxPoller : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<OutboxPoller> _logger;
private readonly TimeSpan _pollInterval = TimeSpan.FromMilliseconds(100);
public OutboxPoller(IServiceScopeFactory scopeFactory, ILogger<OutboxPoller> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await ProcessOutboxAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Outbox poller error");
}
await Task.Delay(_pollInterval, stoppingToken);
}
}
private async Task ProcessOutboxAsync(CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var bus = scope.ServiceProvider.GetRequiredService<IBus>();
var batchSize = 100;
var unpublished = await context.OutboxMessages
.Where(m => m.PublishedAt == null && (m.RetryCount == null || m.RetryCount < 5))
.OrderBy(m => m.CreatedAt) // random Guid keys carry no creation order
.Take(batchSize)
.ToListAsync(ct);
foreach (var message in unpublished)
{
try
{
var envelope = new OutboxEnvelope
{
MessageId = message.Id,
MessageType = message.Type,
Payload = message.Payload
};
await bus.Publish(envelope, ct);
message.PublishedAt = DateTime.UtcNow;
}
catch (Exception ex)
{
message.RetryCount = (message.RetryCount ?? 0) + 1;
message.Error = ex.Message;
// Continue processing other messages
}
}
if (unpublished.Any())
{
await context.SaveChangesAsync(ct);
}
}
}
Change Data Capture (CDC)
CDC vs Outbox
| Aspect | Outbox Table | CDC (Debezium/Maxwell) |
|---|---|---|
| Complexity | Low (application code) | High (infrastructure) |
| Performance | DB write overhead | Zero app overhead |
| Vendor lock-in | None | CDC connector specific |
| Ordering | Guaranteed (PK) | Partition-based |
| Latency | Poll interval | Milliseconds |
CDC Pipeline
┌──────────┐ ┌────────────┐ ┌───────────┐ ┌──────────┐
│ Database │────▶│ CDC │────▶│ Kafka │────▶│ Consumers│
│ (MySQL/ │ │ (Debezium) │ │ Topic │ │ │
│ PG) │ │ │ │ │ │ │
└──────────┘ └────────────┘ └───────────┘ └──────────┘
│
├── Reads binlog/wal
├── Emits change events
└── Schema evolution support
Idempotent Consumers
Problem
With at-least-once delivery, a consumer can receive the same message more than once.
Consumer receives message #42
↓
Process message (takes 5s)
↓
Consumer crashes BEFORE ack
↓
Broker redelivers message #42 (duplicate!)
Deduplication Strategies
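All the strategies in this section share one rule: remember which message ids were handled and skip the ones already seen. A minimal in-memory sketch of that rule is shown below; `InMemoryDeduplicator` is a hypothetical class, and unlike a database-backed version it forgets everything on restart.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory illustration of id-based deduplication.
public sealed class InMemoryDeduplicator
{
    private readonly HashSet<string> _seen = new();
    private readonly object _gate = new();

    // Runs the handler once per message id. Returns false for a duplicate.
    public bool TryHandle(string messageId, Action handler)
    {
        lock (_gate)
        {
            if (!_seen.Add(messageId))
            {
                return false; // already handled (or being handled)
            }
        }

        try
        {
            handler();
            return true;
        }
        catch
        {
            // Forget the id so a redelivery can retry the failed message.
            lock (_gate)
            {
                _seen.Remove(messageId);
            }
            throw;
        }
    }
}
```

The id is reserved before the handler runs so that two concurrent deliveries of the same message cannot both proceed, and it is released on failure so that a failed message is not mistaken for a processed one.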
1. MessageId-based deduplication table
public class IdempotentProcessor
{
private readonly AppDbContext _context;
private readonly ILogger<IdempotentProcessor> _logger;
private readonly ConcurrentDictionary<string, DateTime> _cache = new();
private static readonly TimeSpan CacheTtl = TimeSpan.FromHours(24);
public IdempotentProcessor(AppDbContext context, ILogger<IdempotentProcessor> logger)
{
_context = context;
_logger = logger;
}
public async Task<bool> TryProcessAsync(string messageId, Func<Task> processAsync)
{
// Fast path: in-memory cache
if (_cache.TryGetValue(messageId, out var processedAt) &&
processedAt > DateTime.UtcNow - CacheTtl)
{
_logger.LogDebug("Duplicate message skipped (cache): {MessageId}", messageId);
return false;
}
// Slow path: database check
var alreadySeen = await _context.ProcessedMessages
.AnyAsync(m => m.MessageId == messageId);
if (alreadySeen)
{
_logger.LogDebug("Duplicate message skipped (DB): {MessageId}", messageId);
return false;
}
// Mark as processing; the unique index on MessageId rejects a concurrent duplicate
var record = new ProcessedMessage
{
MessageId = messageId,
ProcessedAt = DateTime.UtcNow,
Status = "Processing"
};
_context.ProcessedMessages.Add(record);
await _context.SaveChangesAsync();
try
{
await processAsync();
// Mark as completed
record.Status = "Completed";
await _context.SaveChangesAsync();
// Add to cache
_cache[messageId] = DateTime.UtcNow;
return true;
}
catch
{
// Remove the marker so a redelivery is processed again instead of being skipped
_context.ProcessedMessages.Remove(record);
await _context.SaveChangesAsync();
throw;
}
}
}
// EF Core entity
public class ProcessedMessage
{
public Guid Id { get; set; }
public string MessageId { get; set; } = null!;
public DateTime ProcessedAt { get; set; }
public string Status { get; set; } = null!;
public static void Configure(ModelBuilder modelBuilder)
{
modelBuilder.Entity<ProcessedMessage>(entity =>
{
entity.HasKey(e => e.Id);
entity.HasIndex(e => e.MessageId).IsUnique(); // UNIQUE constraint!
});
}
}
2. Consumer with deduplication
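public class OrderConsumer : IConsumer<IOrderCreated>
{
private readonly IdempotentProcessor _idempotentProcessor;
private readonly IProcessOrder _orderProcessor;
public OrderConsumer(IdempotentProcessor idempotentProcessor, IProcessOrder orderProcessor)
{
_idempotentProcessor = idempotentProcessor;
_orderProcessor = orderProcessor;
}
public async Task Consume(ConsumeContext<IOrderCreated> context)
{
// Deduplication needs a stable id. A freshly generated Guid would never match a redelivery,
// so a message without MessageId is rejected instead of being given a random one.
var messageId = context.MessageId?.ToString()
?? throw new InvalidOperationException("MessageId is required for deduplication");
var processed = await _idempotentProcessor.TryProcessAsync(messageId, async () =>
{
await _orderProcessor.ProcessOrderAsync(context.Message);
});
if (!processed)
{
// Duplicate: already processed, so returning normally simply acks the message
return;
}
}
}
3. Database-level idempotency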
-- PostgreSQL: insert-if-absent for idempotency
INSERT INTO processed_messages (message_id, processed_at, status)
VALUES (@messageId, @timestamp, 'Completed')
ON CONFLICT (message_id) DO NOTHING;
-- Affects 0 rows if the message_id already exists → duplicate
-- SQL Server: MERGE (HOLDLOCK keeps the existence check and the insert atomic under concurrency)
MERGE INTO processed_messages WITH (HOLDLOCK) AS target
USING (SELECT @messageId AS message_id) AS src
ON target.message_id = src.message_id
WHEN NOT MATCHED THEN
INSERT (message_id, processed_at, status)
VALUES (@messageId, @timestamp, 'Completed');
Message Ordering Guarantees
Per-Partition Ordering
Kafka: ordering is guaranteed per partition
Partition 0: [msg1, msg2, msg3] (strict order)
Partition 1: [msg4, msg5, msg6] (strict order)
RabbitMQ: ordering is guaranteed per queue
Queue "orders": [msg1, msg2, msg3] (strict order)
Global Ordering Trade-offs
// Kafka: global ordering = a topic with a single partition
// PRO: Strict global order
// CON: No parallelism, bottleneck
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
// Partitioner.Consistent maps hash(key) to a partition; it does not force one partition by itself.
// For global order, create the topic with 1 partition (or produce every message with the same key).
Partitioner = Partitioner.Consistent
};
// RabbitMQ: per-queue ordering
// PRO: Simple, guaranteed
// CON: Single consumer bottleneck
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
// prefetch=1 with a single consumer: the next message is delivered only after the previous one is acked
Partial Ordering (Recommended)
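Why keying by aggregate id preserves per-aggregate order can be shown with a minimal sketch of hash partitioning. This is not the partitioner a Kafka client actually uses: the FNV-1a hash, the `KeyPartitioner.PartitionFor` helper, the order ids, and the partition count of 6 are all assumptions made for illustration. The only property it demonstrates is that equal keys always land in the same partition, so their relative order is kept.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical demo: route events for three orders across 6 partitions.
const int partitionCount = 6;
var events = new (string OrderId, string Event)[]
{
    ("order-1", "Created"), ("order-2", "Created"),
    ("order-1", "Updated"), ("order-3", "Created"),
    ("order-1", "Shipped"), ("order-2", "Cancelled"),
};

var partitions = new Dictionary<int, List<string>>();
foreach (var (orderId, evt) in events)
{
    var partition = KeyPartitioner.PartitionFor(orderId, partitionCount);
    if (!partitions.TryGetValue(partition, out var log))
        partitions[partition] = log = new List<string>();
    log.Add($"{orderId}:{evt}"); // append order inside a partition follows produce order
}

foreach (var entry in partitions)
    Console.WriteLine($"partition {entry.Key}: {string.Join(", ", entry.Value)}");

public static class KeyPartitioner
{
    // FNV-1a over the UTF-8 bytes of the key. It is deterministic across processes,
    // unlike string.GetHashCode(), which is randomized per process on modern .NET.
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        uint hash = 2166136261;
        unchecked
        {
            foreach (var b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619; // wraps modulo 2^32
            }
        }
        return (int)(hash % (uint)partitionCount);
    }
}
```

All three `order-1` events end up in one partition in the order Created, Updated, Shipped, while events of other orders may be processed in parallel from other partitions.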
// Partition by aggregate root
// Same aggregate: guaranteed order
// Different aggregates: no ordering guarantee (processed in parallel)
// Kafka: Key = OrderId
// Messages for the same order → same partition
// Different orders → possibly different partitions → parallel
await producer.ProduceAsync("orders", new Message<string, string>
{
Key = orderId.ToString(), // same order → same partition
Value = json
});
Transactional Outbox vs Two-Phase Commit
| Feature | Transactional Outbox | 2PC (XA) |
|---|---|---|
| Complexity | Low | High |
| Performance | High (no locking) | Low (distributed locks) |
| Availability | High | Low (coordinator dependency) |
| Vendor support | Universal | Limited |
| Latency | Poll interval | Real-time |
| Recommended | Yes (most cases) | Rarely |
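The "Poll interval" latency in the table comes from the relay step of the outbox. Below is a minimal in-memory sketch of that flow, assuming a lock stands in for a database transaction; `InMemoryDb`, `OutboxRow`, and `OutboxRelay` are hypothetical names, not MassTransit or EF Core types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var db = new InMemoryDb();
var published = new List<string>();

// The business write and the outbox row are committed together.
db.SaveOrderWithEvent("order-1", "OrderCreated:order-1");
db.SaveOrderWithEvent("order-2", "OrderCreated:order-2");

// One relay pass; a real relay would run this on a timer (the poll interval).
var relay = new OutboxRelay(db, published.Add);
Console.WriteLine(relay.RunOnce()); // prints 2
Console.WriteLine(relay.RunOnce()); // prints 0, nothing left to publish

public sealed record OutboxRow(int Id, string Payload)
{
    public bool Sent { get; set; }
}

public sealed class InMemoryDb
{
    private readonly object _tx = new();
    private readonly List<string> _orders = new();
    private readonly List<OutboxRow> _outbox = new();

    // The lock stands in for a single database transaction.
    public void SaveOrderWithEvent(string orderId, string payload)
    {
        lock (_tx)
        {
            _orders.Add(orderId);
            _outbox.Add(new OutboxRow(_outbox.Count + 1, payload));
        }
    }

    public List<OutboxRow> ReadUnsent(int batchSize)
    {
        lock (_tx) return _outbox.Where(r => !r.Sent).OrderBy(r => r.Id).Take(batchSize).ToList();
    }

    public void MarkSent(int id)
    {
        lock (_tx) _outbox.First(r => r.Id == id).Sent = true;
    }
}

public sealed class OutboxRelay
{
    private readonly InMemoryDb _db;
    private readonly Action<string> _publish;

    public OutboxRelay(InMemoryDb db, Action<string> publish) => (_db, _publish) = (db, publish);

    // Publishes unsent rows in id order and returns how many were published.
    public int RunOnce(int batchSize = 100)
    {
        var rows = _db.ReadUnsent(batchSize);
        foreach (var row in rows)
        {
            _publish(row.Payload); // publish first...
            _db.MarkSent(row.Id);  // ...then mark; a crash in between causes a re-publish
        }
        return rows.Count;
    }
}
```

Because a row is marked only after publishing, delivery is at-least-once, which is why the outbox is normally paired with an idempotent consumer.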
Checklist
- [x] Outbox Pattern with EF Core + MassTransit
- [x] Idempotent message processor with a deduplication table
- [x] CDC pipeline for generating events from database changes
Practice
Event Sourcing Integration
Event Store as Message Source
Core Concepts
Aggregate Root: Order
│
├── Stream: order-{OrderId}
│ ├── Event: OrderCreated (version 1)
│ ├── Event: OrderUpdated (version 2)
│ ├── Event: OrderShipped (version 3)
│ └── Event: OrderCompleted (version 4)
│
└── Snapshot: version 100 (full state)
Append-Only, Stream per Aggregate
// Event Store entity
public class DomainEvent
{
public Guid Id { get; set; }
public string Type { get; set; } = null!;
public string StreamId { get; set; } = null!;
public int StreamPosition { get; set; }
public int Version { get; set; }
public string Data { get; set; } = null!;
public string? Metadata { get; set; }
public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
public string? CreatedBy { get; set; }
}
// Append-only: never update/delete events
public class EventStore
{
private readonly AppDbContext _context;
public async Task AppendAsync<TEvent>(string streamId, TEvent @event, int expectedVersion, string? userId = null)
where TEvent : class
{
// 0 when the stream has no events yet; the cast to int? lets Max run on an empty set
var currentVersion = await _context.Events
.Where(e => e.StreamId == streamId)
.MaxAsync(e => (int?)e.Version) ?? 0;
if (currentVersion != expectedVersion)
{
throw new ConcurrencyException(
$"Expected version {expectedVersion}, but was {currentVersion}");
}
var domainEvent = new DomainEvent
{
Id = Guid.NewGuid(),
Type = typeof(TEvent).Name,
StreamId = streamId,
StreamPosition = await GetMaxStreamPositionAsync() + 1, // next global position, not the current maximum
Version = currentVersion + 1,
Data = JsonSerializer.Serialize(@event),
CreatedAt = DateTimeOffset.UtcNow,
CreatedBy = userId
};
_context.Events.Add(domainEvent);
await _context.SaveChangesAsync();
}
private async Task<int> GetMaxStreamPositionAsync()
{
// 0 when the store is empty; the cast to int? lets Max run on an empty set
return await _context.Events
.MaxAsync(e => (int?)e.StreamPosition) ?? 0;
}
}
Projection Builders
Catch-up Subscription Pattern
Event Stream Projection Builder Read Model
│ │ │
├── OrderCreated ──────▶│──▶ INSERT INTO orders ──▶ SELECT *
├── OrderUpdated ──────▶│──▶ UPDATE orders ───────▶ JOIN ...
├── OrderShipped ──────▶│──▶ UPDATE orders ───────▶ LEFT JOIN shipments
└── OrderCompleted ────▶│──▶ UPDATE orders ───────▶ WHERE status = 'completed'
Real-time: Event Store → Stream → Projection
Catch-up: Event Store → Replay all → Projection
Catch-up Subscription
public class OrderProjectionBuilder : BackgroundService
{
private readonly EventStore _eventStore;
private readonly IMediator _mediator;
private readonly ILogger<OrderProjectionBuilder> _logger;
private int _lastProcessedPosition = 0;
public OrderProjectionBuilder(
EventStore eventStore,
IMediator mediator,
ILogger<OrderProjectionBuilder> logger)
{
_eventStore = eventStore;
_mediator = mediator;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Projection builder starting from position {Position}", _lastProcessedPosition);
while (!stoppingToken.IsCancellationRequested)
{
try
{
// Catch-up: get all events since last processed
var events = await _eventStore.GetEventsAsync(
fromPosition: _lastProcessedPosition,
batchSize: 100,
stoppingToken);
if (!events.Any())
{
await Task.Delay(100, stoppingToken);
continue;
}
foreach (var @event in events)
{
try
{
await ProcessEventAsync(@event, stoppingToken);
_lastProcessedPosition = @event.StreamPosition;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process event at position {Position}", @event.StreamPosition);
// Don't advance — will retry
await Task.Delay(1000, stoppingToken);
break;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Projection builder error");
await Task.Delay(5000, stoppingToken);
}
}
}
private async Task ProcessEventAsync(DomainEvent @event, CancellationToken ct)
{
// @event.Type holds the short CLR name written by EventStore.AppendAsync (typeof(TEvent).Name).
// Type.GetType needs a namespace- or assembly-qualified name, so dispatch on the string instead.
switch (@event.Type)
{
case nameof(OrderCreatedEvent):
await _mediator.Send(new CreateOrderProjection(
JsonSerializer.Deserialize<OrderCreatedEvent>(@event.Data)!), ct);
break;
case nameof(OrderUpdatedEvent):
await _mediator.Send(new UpdateOrderProjection(
JsonSerializer.Deserialize<OrderUpdatedEvent>(@event.Data)!), ct);
break;
case nameof(OrderShippedEvent):
await _mediator.Send(new ShipOrderProjection(
JsonSerializer.Deserialize<OrderShippedEvent>(@event.Data)!), ct);
break;
case nameof(OrderCompletedEvent):
await _mediator.Send(new CompleteOrderProjection(
JsonSerializer.Deserialize<OrderCompletedEvent>(@event.Data)!), ct);
break;
default:
// Skip instead of throwing, so one unknown type does not stall the projection
_logger.LogWarning("Skipping unknown event type {Type} at position {Position}", @event.Type, @event.StreamPosition);
return;
}
_logger.LogDebug("Processed event: {Type} at position {Position}", @event.Type, @event.StreamPosition);
}
}
// Real-time subscription (alternative: using MassTransit or event store native)
public class RealTimeProjectionConsumer : IConsumer<IDomainEvent>
{
public async Task Consume(ConsumeContext<IDomainEvent> context)
{
var @event = context.Message;
// Process in real-time
await ApplyEventAsync(@event);
}
private Task ApplyEventAsync(IDomainEvent @event) => Task.CompletedTask;
}
Read Model Builder
// CQRS: Events → Read Model
public class OrderReadModel
{
public Guid Id { get; set; }
public Guid CustomerId { get; set; }
public decimal Total { get; set; }
public string Status { get; set; } = null!;
public string? TrackingNumber { get; set; }
public DateTime? ShippedAt { get; set; }
public DateTime? CompletedAt { get; set; }
public List<OrderItemReadModel> Items { get; set; } = new();
}
// Projection handlers
public class CreateOrderProjection : IRequest
{
public OrderCreatedEvent Event { get; }
public CreateOrderProjection(OrderCreatedEvent @event) => Event = @event;
}
public class CreateOrderProjectionHandler : IRequestHandler<CreateOrderProjection>
{
private readonly AppDbContext _context;
public async Task Handle(CreateOrderProjection request, CancellationToken ct)
{
var readModel = new OrderReadModel
{
Id = request.Event.OrderId,
CustomerId = request.Event.CustomerId,
Total = request.Event.Total,
Status = "Created",
Items = request.Event.Items.Select(i => new OrderItemReadModel
{
ProductId = i.ProductId,
Name = i.Name,
Quantity = i.Quantity,
Price = i.Price
}).ToList()
};
_context.OrderReadModels.Add(readModel);
await _context.SaveChangesAsync(ct);
}
}
public class UpdateOrderProjection : IRequest
{
public OrderUpdatedEvent Event { get; }
public UpdateOrderProjection(OrderUpdatedEvent @event) => Event = @event;
}
public class UpdateOrderProjectionHandler : IRequestHandler<UpdateOrderProjection>
{
private readonly AppDbContext _context;
public async Task Handle(UpdateOrderProjection request, CancellationToken ct)
{
var readModel = await _context.OrderReadModels
.FirstOrDefaultAsync(m => m.Id == request.Event.OrderId, ct);
if (readModel != null)
{
readModel.Status = request.Event.NewStatus;
readModel.Total = request.Event.Total;
await _context.SaveChangesAsync(ct);
}
}
}
Event Versioning
Handling Breaking Changes
// Version 1
public record OrderCreatedV1(Guid OrderId, Guid CustomerId, decimal Total);
// Version 2 — backward compatible (added optional field)
public record OrderCreatedV2(Guid OrderId, Guid CustomerId, decimal Total, string? Currency);
// Version 3 — breaking change (new structure)
public record OrderCreatedV3(
Guid OrderId,
Guid CustomerId,
IReadOnlyList<OrderItemV3> Items,
string Currency);
public record OrderItemV3(Guid ProductId, string Name, int Quantity, decimal UnitPrice);
// Event versioning handler
public class EventVersioningHandler
{
private readonly Dictionary<string, Func<string, object>> _deserializers = new()
{
["OrderCreatedV1"] = json => JsonSerializer.Deserialize<OrderCreatedV1>(json)!,
["OrderCreatedV2"] = json => JsonSerializer.Deserialize<OrderCreatedV2>(json)!,
["OrderCreatedV3"] = json => JsonSerializer.Deserialize<OrderCreatedV3>(json)!,
};
public object DeserializeEvent(string eventType, string json)
{
if (_deserializers.TryGetValue(eventType, out var deserialize))
{
return deserialize(json);
}
throw new InvalidOperationException($"Unknown event type: {eventType}");
}
// Migration: V1 → V2
public OrderCreatedV2 MigrateV1ToV2(OrderCreatedV1 v1)
{
return new OrderCreatedV2(
v1.OrderId,
v1.CustomerId,
v1.Total,
Currency: "USD" // Default value
);
}
// Migration: V2 → V3
public OrderCreatedV3 MigrateV2ToV3(OrderCreatedV2 v2)
{
return new OrderCreatedV3(
v2.OrderId,
v2.CustomerId,
new List<OrderItemV3>(), // Items: empty for legacy events (the parameter name is Items, not items)
v2.Currency ?? "USD"
);
}
}
Backward Compatibility Layer
public class BackwardCompatibilityLayer
{
private readonly EventVersioningHandler _versioning;
public BackwardCompatibilityLayer(EventVersioningHandler versioning)
{
_versioning = versioning;
}
public T DeserializeAndNormalize<T>(string eventType, string json) where T : class
{
// Deserialize original version
var original = _versioning.DeserializeEvent(eventType, json);
// Migrate to latest by chaining upgrades: V1 → V2 → V3
OrderCreatedV3 normalized = original switch
{
OrderCreatedV1 v1 => _versioning.MigrateV2ToV3(_versioning.MigrateV1ToV2(v1)),
OrderCreatedV2 v2 => _versioning.MigrateV2ToV3(v2),
OrderCreatedV3 v3 => v3, // Already latest
_ => throw new InvalidOperationException($"Cannot migrate {eventType}")
};
// Round-trip through JSON to map the latest version onto the caller's type T
return JsonSerializer.Deserialize<T>(JsonSerializer.Serialize(normalized))!;
}
}
Snapshot Strategy
Problem: Large Stream Replay
Stream with 50,000 events:
Replay all 50k events → Apply each → Build state → Very slow!
With snapshot at every 1000 events:
Load snapshot (version 49,000) → Replay 1,000 events → Build state → Fast!
Snapshot Implementation
public class SnapshotStore
{
private readonly AppDbContext _context;
private readonly int _snapshotInterval = 1000;
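public Task<bool> ShouldSnapshotAsync(string streamId, int currentVersion)
{
// Fires only when the version lands exactly on a multiple of the interval;
// a save that appends several events at once can step over it.
return Task.FromResult(currentVersion > 0 && currentVersion % _snapshotInterval == 0);
}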
public async Task SaveSnapshotAsync(string streamId, int version, byte[] state)
{
var snapshot = await _context.Snapshots
.FirstOrDefaultAsync(s => s.StreamId == streamId, CancellationToken.None);
if (snapshot != null)
{
snapshot.Version = version;
snapshot.Data = state;
snapshot.CreatedAt = DateTimeOffset.UtcNow;
}
else
{
_context.Snapshots.Add(new SnapshotRecord
{
StreamId = streamId,
Version = version,
Data = state,
CreatedAt = DateTimeOffset.UtcNow
});
}
await _context.SaveChangesAsync(CancellationToken.None);
}
public async Task<byte[]?> LoadSnapshotAsync(string streamId)
{
var snapshot = await _context.Snapshots
.Where(s => s.StreamId == streamId)
.OrderByDescending(s => s.Version)
.FirstOrDefaultAsync(CancellationToken.None);
return snapshot?.Data;
}
}
public class SnapshotRecord
{
public Guid Id { get; set; }
public string StreamId { get; set; } = null!;
public int Version { get; set; }
public byte[] Data { get; set; } = default!;
public DateTimeOffset CreatedAt { get; set; }
}
Snapshot-based Aggregate Replay
public class OrderAggregate
{
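private readonly List<IDomainEvent> _uncommittedEvents = new();
private int _version = 0;
// State fields used by ApplyEvent, ApplyState and SerializeState
private Guid _orderId;
private Guid _customerId;
private decimal _total;
private string _status = "";
public string StreamId => $"order-{_orderId}";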
public async Task LoadAsync(string streamId, EventStore eventStore, SnapshotStore snapshotStore)
{
// Load snapshot first
var snapshot = await snapshotStore.LoadSnapshotAsync(streamId);
if (snapshot != null)
{
// Deserialize snapshot state
var state = JsonSerializer.Deserialize<OrderState>(snapshot);
ApplyState(state!);
_version = state!.Version;
// Load only events after snapshot
var events = await eventStore.GetEventsAfterVersionAsync(streamId, _version);
foreach (var @event in events)
{
ApplyEvent(@event);
_version++; // keep the expected version in step with the replayed stream
}
}
else
{
// No snapshot — replay all
var events = await eventStore.GetAllEventsAsync(streamId);
foreach (var @event in events)
{
ApplyEvent(@event);
_version++; // keep the expected version in step with the replayed stream
}
}
}
public async Task SaveAsync(EventStore eventStore, SnapshotStore snapshotStore)
{
foreach (var @event in _uncommittedEvents)
{
await eventStore.AppendAsync(
StreamId,
@event,
_version,
userId: null);
_version++;
}
_uncommittedEvents.Clear();
// Create snapshot if needed
if (await snapshotStore.ShouldSnapshotAsync(StreamId, _version))
{
var state = SerializeState();
await snapshotStore.SaveSnapshotAsync(StreamId, _version, state);
}
}
private void ApplyEvent(IDomainEvent @event)
{
switch (@event.Type)
{
case "OrderCreated":
var created = JsonSerializer.Deserialize<OrderCreatedEvent>(@event.Data);
_orderId = created!.OrderId;
_customerId = created.CustomerId;
_total = created.Total;
_status = "Created";
break;
case "OrderUpdated":
var updated = JsonSerializer.Deserialize<OrderUpdatedEvent>(@event.Data);
_total = updated!.Total;
_status = updated.NewStatus;
break;
// ... more event types
}
}
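// OrderState is a positional record, so it is built through its constructor
private byte[] SerializeState() =>
JsonSerializer.SerializeToUtf8Bytes(new OrderState(_version, _orderId, _customerId, _total, _status));
private void ApplyState(OrderState state)
{
_version = state.Version;
// Restore state
_orderId = state.OrderId;
_customerId = state.CustomerId;
_total = state.Total;
_status = state.Status;
}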
}
public record OrderState(
int Version,
Guid OrderId,
Guid CustomerId,
decimal Total,
string Status);
Snapshot Performance
| Events | Full Replay (illustrative) | With Snapshot (every 1k) |
|---|---|---|
| 100 | 1ms | 1ms (no snapshot yet) |
| 1,000 | 10ms | 10ms (snapshot at 1k) |
| 10,000 | 100ms | 1ms (snapshot) + up to 10ms (at most 1k events) |
| 50,000 | 500ms | 1ms (snapshot) + up to 10ms (at most 1k events) |
| 100,000 | 1s | 1ms (snapshot) + up to 10ms (at most 1k events) |
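The bounded replay cost in the table follows from simple arithmetic. A minimal sketch, assuming a snapshot is written whenever the stream version is a multiple of the interval; `SnapshotMath.ReplayCost` is a hypothetical helper, not part of the classes above.

```csharp
using System;

foreach (var version in new[] { 100, 1_000, 10_000, 50_000, 50_999 })
{
    var (snapshotVersion, toReplay) = SnapshotMath.ReplayCost(version, interval: 1_000);
    Console.WriteLine($"version {version}: snapshot at {snapshotVersion}, replay {toReplay} events");
}

public static class SnapshotMath
{
    // Assumes a snapshot exists at every multiple of `interval` up to the current version.
    // SnapshotVersion == 0 means no snapshot yet, so the whole stream is replayed.
    public static (int SnapshotVersion, int EventsToReplay) ReplayCost(int streamVersion, int interval)
    {
        if (streamVersion < 0) throw new ArgumentOutOfRangeException(nameof(streamVersion));
        if (interval <= 0) throw new ArgumentOutOfRangeException(nameof(interval));
        var snapshotVersion = streamVersion / interval * interval; // latest multiple at or below the version
        return (snapshotVersion, streamVersion - snapshotVersion);
    }
}
```

Whatever the stream length, the number of events replayed after loading the snapshot stays below the interval (999 at most for an interval of 1,000).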
Checklist
- [x] Projection builder with the catch-up subscription pattern
- [x] Event versioning handler with a backward compatibility layer
- [x] Snapshot mechanism for aggregates with 10k+ events
Practice
Distributed Event Architecture
Multi-Broker Topology
Federation
Federation links exchanges and queues across several RabbitMQ clusters into a mesh, without merging the clusters into one.
Data Center 1 Data Center 2
┌─────────────────┐ ┌─────────────────┐
│ DC1-RabbitMQ │ │ DC2-RabbitMQ │
│ │ │ │
│ Exchange: A ───┼──FED──┼──▶ Exchange: A │
│ Queue: Q1 │ │ Queue: Q2 │
│ │ │ │
└─────────────────┘ └─────────────────┘
// Enable the federation plugins on each node (a CLI command, not an HTTP endpoint):
// rabbitmq-plugins enable rabbitmq_federation rabbitmq_federation_management
// Federation upstream, defined on the downstream broker for vhost "/" (URL-encoded as %2F):
// PUT /api/parameters/federation-upstream/%2F/dc2-upstream
{
"value": {
"uri": "amqp://dc2-rabbitmq.example.com:5672",
"exchange": "orders",
"queue": "orders",
"ack-mode": "on-confirm",
"max-hops": 1,
"prefetch-count": 1000,
"reconnect-delay": 5
}
}
Mirroring (Classic Mirrors)
Primary Queue: orders
├── Replica on Broker 1 (Leader)
├── Replica on Broker 2 (Follower)
└── Replica on Broker 3 (Follower)
Write: Leader only → replicate to followers
Read: served by the leader; clients connected to other nodes are routed to it
// Policy-based mirroring (classic mirrored queues were removed in RabbitMQ 4.0)
// rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}' --apply-to queues
// Alternative: Quorum Queues (recommended over classic mirrors)
await channel.QueueDeclareAsync(
queue: "orders",
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object?>
{
["x-queue-type"] = "quorum" // Raft-based quorum queue
});
Cross-Datacenter Replication
DC1 (Primary)                      DC2 (Secondary)
┌──────────────┐                   ┌──────────────┐
│ RabbitMQ     │──────async───────▶│ RabbitMQ     │
│ Cluster      │    replication    │ Cluster      │
│              │                   │              │
│ Write here   │                   │ Read-only    │
└──────────────┘                   └──────────────┘
       │                                  │
       ▼                                  ▼
   Services                           Services
Event Mesh
Azure Event Grid
// Azure Event Grid publisher
var client = new EventGridPublisherClient(
new Uri("https://<topic>.westus2-1.eventgrid.azure.net/api/events"),
new DefaultAzureCredential());
var events = new[]
{
// CloudEvent takes source, type and data through its constructor
new CloudEvent(
"order-service",
"Order.Created",
new OrderCreatedEvent
{
OrderId = Guid.NewGuid(),
CustomerId = customerId,
Total = total
})
{
Id = Guid.NewGuid().ToString(),
Subject = "orders/123",
Time = DateTimeOffset.UtcNow
}
};
await client.SendEventsAsync(events);
AWS EventBridge
// AWS EventBridge publisher
var eventBridgeClient = new AmazonEventBridgeClient();
var request = new PutEventsRequest
{
Entries = new List<PutEventsRequestEntry>
{
new PutEventsRequestEntry
{
Source = "order.service",
DetailType = "OrderCreated",
Detail = JsonSerializer.Serialize(new OrderCreatedEvent
{
OrderId = orderId,
CustomerId = customerId,
Total = total
}),
Time = DateTime.UtcNow,
EventBusName = "production"
}
}
};
await eventBridgeClient.PutEventsAsync(request);
Event Mesh Abstraction
// Abstraction over multiple event platforms
public interface IEventMesh
{
Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
where TEvent : class;
Task SubscribeAsync<TEvent>(Func<TEvent, CancellationToken, Task> handler,
CancellationToken ct = default) where TEvent : class;
}
public class MultiBrokerEventMesh : IEventMesh
{
private readonly IBus _massTransitBus;
private readonly EventGridPublisherClient? _eventGrid;
private readonly AmazonEventBridgeClient? _eventBridge;
public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
where TEvent : class // an implicit implementation must restate the interface constraint
{
// Primary: MassTransit (RabbitMQ)
await _massTransitBus.Publish(@event, ct);
// Secondary: Event Grid (for external consumers)
if (_eventGrid != null)
{
// CloudEvent takes source, type and data through its constructor
var cloudEvent = new CloudEvent(typeof(TEvent).Name, typeof(TEvent).Name, @event)
{
Id = Guid.NewGuid().ToString(),
Time = DateTimeOffset.UtcNow
};
await _eventGrid.SendEventsAsync(new[] { cloudEvent }, ct);
}
// Tertiary: EventBridge (for AWS consumers)
if (_eventBridge != null)
{
var request = new PutEventsRequest
{
Entries = new List<PutEventsRequestEntry>
{
new PutEventsRequestEntry
{
Source = typeof(TEvent).Name,
DetailType = typeof(TEvent).Name,
Detail = JsonSerializer.Serialize(@event),
Time = DateTime.UtcNow
}
}
};
await _eventBridge.PutEventsAsync(request, ct);
}
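}
// Subscription is broker-specific and is left unimplemented in this example
public Task SubscribeAsync<TEvent>(Func<TEvent, CancellationToken, Task> handler,
CancellationToken ct = default) where TEvent : class
=> throw new NotSupportedException("Subscribe through the broker-specific consumer API.");
}
Cross-Service Event Contracts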
Governance Process
┌──────────┐ ┌──────────────┐ ┌──────────┐ ┌──────────┐
│ Service │ │ Event │ │ Schema │ │ CI/CD │
│ Team A │────▶│ Contract │────▶│ Registry │────▶│ Pipeline │
│ │ │ Definition │ │ │ │ │
└──────────┘ └──────────────┘ └──────────┘ └──────────┘
│ │ │
▼ ▼ ▼
Review Validate Deploy
(Team B) (Breaking?) (All services)
Automated Validation
// Schema validation for event contracts
public class EventContractValidator
{
private readonly ISchemaRegistryClient _schemaRegistry;
private readonly ILogger<EventContractValidator> _logger;
public EventContractValidator(ISchemaRegistryClient schemaRegistry, ILogger<EventContractValidator> logger)
{
_schemaRegistry = schemaRegistry;
_logger = logger;
}
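public async Task<ValidationResult> ValidateAsync<TEvent>(TEvent @event, string topic)
{
var schema = await _schemaRegistry.GetLatestSchemaAsync($"{topic}-value");
// Assumes the registered schema is JSON Schema and that property names match its casing
using var schemaDoc = JsonDocument.Parse(schema.SchemaString);
using var payload = JsonDocument.Parse(JsonSerializer.Serialize(@event));
// Validate against schema
var errors = new List<string>();
// 1. Required fields (JSON Schema "required" array)
if (schemaDoc.RootElement.TryGetProperty("required", out var required))
{
foreach (var field in required.EnumerateArray().Select(f => f.GetString()!))
{
if (!payload.RootElement.TryGetProperty(field, out _))
{
errors.Add($"Missing required field: {field}");
}
}
}
// 2. Type validation (integers only, as an example)
if (schemaDoc.RootElement.TryGetProperty("properties", out var properties))
{
foreach (var property in properties.EnumerateObject())
{
var isInteger = property.Value.TryGetProperty("type", out var type)
&& type.ValueKind == JsonValueKind.String && type.GetString() == "integer";
if (isInteger
&& payload.RootElement.TryGetProperty(property.Name, out var value)
&& !(value.ValueKind == JsonValueKind.Number && value.TryGetInt64(out _)))
{
errors.Add($"Field {property.Name} must be an integer");
}
}
}
// ValidationResult is a positional record, so it is built through its constructor
return new ValidationResult(errors.Count == 0, errors, schema.Version);
}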
}
public record ValidationResult(bool IsValid, IReadOnlyList<string> Errors, int SchemaVersion);
// CI pipeline integration
public class CiEventValidator
{
public async Task<bool> ValidateEventContractsAsync(string pullRequestPath)
{
var changedFiles = await GetChangedFilesAsync(pullRequestPath);
var eventFiles = changedFiles.Where(f => f.Contains("/events/"));
var results = new List<ValidationResult>();
foreach (var file in eventFiles)
{
var contract = await LoadEventContractAsync(file);
var result = await ValidateAsync(contract, Path.GetFileNameWithoutExtension(file));
results.Add(result);
}
return results.All(r => r.IsValid);
}
}
Event Deprecation
// Mark event as deprecated
[Obsolete("Use IOrderCreatedV2 instead. Will be removed in v3.0.")]
public interface IOrderCreatedV1
{
Guid OrderId { get; }
decimal Total { get; }
}
// Version lifecycle
// V1: Active (2024-01-01 — 2024-06-01)
// V2: Active (2024-06-01 — present)
// V3: Beta (preview)
// Deprecation notice in consumer
public class OrderCreatedV1Consumer : IConsumer<IOrderCreatedV1>
{
public async Task Consume(ConsumeContext<IOrderCreatedV1> context)
{
Logger.LogWarning("Received deprecated event IOrderCreatedV1. " +
"Please migrate to IOrderCreatedV2.");
// Migrate to V2
var v2 = new OrderCreatedV2
{
OrderId = context.Message.OrderId,
Total = context.Message.Total,
Currency = "USD"
};
// Republish as V2
await context.Publish(v2);
}
}
Zero Data Loss Configuration
Ack Strategies
// RabbitMQ: Publisher confirms + Consumer manual ack
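// 1. Publisher confirms
// Version note: in RabbitMQ.Client 7.x confirms are switched on through CreateChannelOptions
// when the channel is created, and BasicPublishAsync completes once the broker has confirmed;
// the three calls below follow the older explicit select/wait style.
await channel.ConfirmSelectAsync();
await channel.BasicPublishAsync(...);
await channel.WaitForConfirmsAsync();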
// 2. Consumer manual ack
await channel.BasicConsumeAsync(queue, false, consumer); // autoAck=false
await channel.BasicAckAsync(deliveryTag, multiple: false);
// 3. Queue durability
await channel.QueueDeclareAsync(queue, durable: true, ...);
// 4. Message persistence
var props = new BasicProperties { DeliveryMode = DeliveryModes.Persistent };
Replication Factor Tuning
# Kafka: production config
replication.factor: 3    # topic setting
min.insync.replicas: 2   # topic/broker setting
acks: all                # producer setting
# Guarantees: each partition has 3 replicas; a write is acknowledged only when all in-sync replicas (at least 2) have it
# Survives: 1 broker failure without data loss
# RabbitMQ: Quorum Queues
x-queue-type: quorum
x-quorum-initial-group-size: 3
# Raft-based consensus
# Survives: floor((n-1)/2) failures
End-to-End Data Loss Prevention
Producer Broker Consumer
│ │ │
│── Confirm Select ──────────────▶│ │
│── Publish (persistent) ────────▶│── Persist to disk ──────────▶│
│◀── Confirm ─────────────────────│ │
│ │ │── Process
│ │ │── Manual ACK ──────▶│
│ │◀── ACK ──────────────────────│
// Complete zero-loss configuration
public class ZeroLossConfiguration
{
// Producer
public static ProducerConfig GetKafkaProducerConfig() => new()
{
// Plain host:port pairs, without a kafka:// prefix
BootstrapServers = "broker1:9092,broker2:9092,broker3:9092",
Acks = Acks.All,
EnableIdempotence = true,
MessageSendMaxRetries = int.MaxValue,
RetryBackoffMs = 100,
MaxInFlight = 5 // idempotence allows at most 5 in-flight requests
// min.insync.replicas = 2 is a topic/broker setting, not a producer property
};
// Consumer
public static ConsumerConfig GetKafkaConsumerConfig() => new()
{
BootstrapServers = "broker1:9092,broker2:9092,broker3:9092",
GroupId = "zero-loss-consumer",
EnableAutoCommit = false, // Manual commit
AutoOffsetReset = AutoOffsetReset.Error, // Fail instead of silently resetting when no committed offset exists
IsolationLevel = IsolationLevel.ReadCommitted,
EnablePartitionEof = true
};
}
Checklist
- [x] Cross-datacenter event replication topology
- [x] Event contract governance process with automated validation
- [x] Zero data loss configuration with tuned ack/replication settings
Practice
Stream Processing
Windowing
Types of Windows
Time stream: |---t1---|---t2---|---t3---|---t4---|---t5---|---t6---|
Tumbling (1min, no overlap):
|---------|---------|---------|
t1-t2 t3-t4 t5-t6
Hopping (1min, 30s slide):
|---------|
|---------|
|---------|
Sliding (based on event time, 1min, 30s slide):
|---------|
|---------|
|---------|
Session (gap=30s):
|----| |----| |--------|
session1 session2 session3
Tumbling Window
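The core of a tumbling window is the arithmetic that assigns a timestamp to exactly one window. A minimal sketch, assuming windows are aligned to multiples of the window size counted in UTC ticks; `Tumbling.WindowStart` is a hypothetical helper and the timestamps are made up for the demo.

```csharp
using System;

var size = TimeSpan.FromMinutes(1);
var justBefore = new DateTimeOffset(2024, 1, 1, 12, 0, 59, TimeSpan.Zero);
var onBoundary = new DateTimeOffset(2024, 1, 1, 12, 1, 0, TimeSpan.Zero);

// 12:00:59 belongs to the window starting at 12:00:00, 12:01:00 opens the next one.
Console.WriteLine(Tumbling.WindowStart(justBefore, size).ToString("HH:mm:ss"));
Console.WriteLine(Tumbling.WindowStart(onBoundary, size).ToString("HH:mm:ss"));

public static class Tumbling
{
    // Rounds the event time down to the start of its window [start, start + size).
    public static DateTimeOffset WindowStart(DateTimeOffset eventTime, TimeSpan size)
    {
        if (size <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(size));
        var ticks = eventTime.UtcTicks - eventTime.UtcTicks % size.Ticks;
        return new DateTimeOffset(ticks, TimeSpan.Zero);
    }
}
```

Because windows do not overlap, every event maps to a single window start, which can then serve as the dictionary key for per-window state.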
// MassTransit with windowed aggregation
public class TumblingWindowAggregator : IConsumer<IOrderEvent>
{
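private readonly ConcurrentDictionary<string, WindowState> _windows = new();
private readonly ILogger<TumblingWindowAggregator> _logger;
private readonly IBus _bus; // used by ProcessWindowAsync to publish window results
private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(1);
private readonly Timer _timer;
// Window state lives in this instance, so the consumer must outlive a single message
// (for example, be registered as a singleton); otherwise every message starts with empty windows.
public TumblingWindowAggregator(ILogger<TumblingWindowAggregator> logger, IBus bus)
{
_logger = logger;
_bus = bus;
_timer = new Timer(ProcessExpiredWindows, null, _windowSize, _windowSize);
}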
public async Task Consume(ConsumeContext<IOrderEvent> context)
{
var windowKey = GetWindowKey(DateTimeOffset.UtcNow, _windowSize);
var window = _windows.GetOrAdd(windowKey, _ => new WindowState { Key = windowKey });
lock (window)
{
switch (context.Message)
{
case IOrderCreated created:
window.OrderCount++;
window.TotalAmount += created.Total;
window.Orders.Add(created);
break;
case IOrderCompleted completed:
window.CompletedCount++;
break;
}
}
_logger.LogDebug("Window {WindowKey}: {Count} orders", windowKey, window.OrderCount);
}
private void ProcessExpiredWindows(object? state)
{
var now = DateTimeOffset.UtcNow;
var expiredKeys = _windows
.Where(kvp => kvp.Value.CreatedAt < now.AddTicks(-_windowSize.Ticks))
.Select(kvp => kvp.Key)
.ToList();
foreach (var key in expiredKeys)
{
if (_windows.TryRemove(key, out var window))
{
ProcessWindowAsync(window).GetAwaiter().GetResult();
}
}
}
private async Task ProcessWindowAsync(WindowState window)
{
var avgOrderValue = window.OrderCount > 0
? window.TotalAmount / window.OrderCount
: 0m;
// WindowAnalytics is a positional record, so it is built through its constructor
var analytics = new WindowAnalytics(
WindowStart: window.CreatedAt,
WindowEnd: window.CreatedAt.Add(_windowSize),
OrderCount: window.OrderCount,
TotalAmount: window.TotalAmount,
AverageOrderValue: (double)avgOrderValue, // the record declares this field as double
CompletedCount: window.CompletedCount,
CompletionRate: window.OrderCount > 0
? (double)window.CompletedCount / window.OrderCount
: 0);
// Publish window result
await _bus.Publish(analytics);
_logger.LogInformation("Window {Start}-{End}: {Count} orders, ${Total}",
analytics.WindowStart, analytics.WindowEnd, analytics.OrderCount, analytics.TotalAmount);
}
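private string GetWindowKey(DateTimeOffset time, TimeSpan windowSize)
{
// Round the UTC tick count down to the start of the window;
// DateTimeOffset has no constructor that takes ticks without an offset
var windowStartTicks = time.UtcTicks / windowSize.Ticks * windowSize.Ticks;
return new DateTimeOffset(windowStartTicks, TimeSpan.Zero).ToString("yyyy-MM-dd-HH-mm");
}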
}
public class WindowState
{
public string Key { get; set; } = null!;
public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
public int OrderCount { get; set; }
public decimal TotalAmount { get; set; }
public int CompletedCount { get; set; }
public List<IOrderCreated> Orders { get; set; } = new();
}
public record WindowAnalytics(
DateTimeOffset WindowStart,
DateTimeOffset WindowEnd,
int OrderCount,
decimal TotalAmount,
double AverageOrderValue,
int CompletedCount,
double CompletionRate);
Hopping Window
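With hopping windows one event belongs to several overlapping windows, size / slide of them when the size is a multiple of the slide. A minimal sketch of that assignment, assuming windows are half-open intervals [start, start + size) aligned to multiples of the slide in UTC ticks; `Hopping.WindowStarts` is a hypothetical helper and the timestamp is made up for the demo.

```csharp
using System;
using System.Collections.Generic;

var size = TimeSpan.FromMinutes(5);
var slide = TimeSpan.FromMinutes(1);
var eventTime = new DateTimeOffset(2024, 1, 1, 12, 7, 30, TimeSpan.Zero);

// Lists the five windows that contain 12:07:30, from [12:03:00, 12:08:00) to [12:07:00, 12:12:00).
foreach (var start in Hopping.WindowStarts(eventTime, size, slide))
    Console.WriteLine($"[{start:HH:mm:ss}, {start + size:HH:mm:ss})");

public static class Hopping
{
    // Returns the start of every window [start, start + size) that contains eventTime,
    // in ascending order.
    public static List<DateTimeOffset> WindowStarts(DateTimeOffset eventTime, TimeSpan size, TimeSpan slide)
    {
        if (slide <= TimeSpan.Zero || size < slide)
            throw new ArgumentException("Require 0 < slide <= size.");
        var t = eventTime.UtcTicks;
        var latest = t - t % slide.Ticks; // last window start at or before the event
        var result = new List<DateTimeOffset>();
        // Walk backwards while the window still covers the event (start + size > t).
        for (var s = latest; s > t - size.Ticks; s -= slide.Ticks)
            result.Add(new DateTimeOffset(s, TimeSpan.Zero));
        result.Reverse();
        return result;
    }
}
```

An aggregator would update the state of every returned window for each incoming event, which is the price of the overlap compared with tumbling windows.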
public class HoppingWindowAggregator
{
private readonly ConcurrentDictionary<string, WindowState> _windows = new();
private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(5);
private readonly TimeSpan _slide = TimeSpan.FromMinutes(1);
public void Consume(IOrderEvent @event)
{
// Calculate all windows this event belongs to
var affectedWindows = GetHoppingWindows(@event.Timestamp, _windowSize, _slide);
foreach (var windowKey in affectedWindows)
{
var window = _windows.GetOrAdd(windowKey, _ => new WindowState { Key = windowKey });
lock (window)
{
window.OrderCount++;
if (@event is IOrderCreated created)
{
window.TotalAmount += created.Total;
}
}
}
}
private IEnumerable<string> GetHoppingWindows(DateTimeOffset eventTime, TimeSpan windowSize, TimeSpan slide)
{
var windows = new List<string>();
var windowStart = new DateTimeOffset(eventTime.UtcTicks / slide.Ticks * slide.Ticks, TimeSpan.Zero); // ticks require an explicit offset
// Event belongs to windows starting from (eventTime - windowSize) to eventTime
var earliestStart = windowStart.AddTicks(-(windowSize.Ticks - slide.Ticks));
for (var start = earliestStart; start <= windowStart; start = start.Add(slide))
{
windows.Add(start.ToString("o"));
}
return windows;
}
}
Aggregation
Stateful Processing
public class StatefulAggregator
{
private readonly ConcurrentDictionary<Guid, CustomerState> _states = new();
private readonly ILogger<StatefulAggregator> _logger;
public StatefulAggregator(ILogger<StatefulAggregator> logger)
{
_logger = logger;
}
public async Task<AggregationResult> AggregateAsync(IOrderEvent @event)
{
var customerId = GetCustomerId(@event);
// GetOrAdd expresses "create once, then reuse" directly
var state = _states.GetOrAdd(customerId, id => new CustomerState { CustomerId = id });
lock (state)
{
switch (@event)
{
case IOrderCreated created:
state.OrderCount++;
state.TotalSpent += created.Total;
state.LastOrderDate = created.Timestamp;
state.OrderIds.Add(created.OrderId);
break;
case IOrderCompleted completed:
state.CompletedCount++;
break;
case IOrderCancelled cancelled:
state.CancelledCount++;
break;
}
// AggregationResult is a positional record, so it is built through its constructor
return new AggregationResult(
CustomerId: customerId,
OrderCount: state.OrderCount,
TotalSpent: state.TotalSpent,
CompletedCount: state.CompletedCount,
CancelledCount: state.CancelledCount,
LastOrderDate: state.LastOrderDate,
AverageOrderValue: state.OrderCount > 0
? (double)(state.TotalSpent / state.OrderCount) // the record declares this field as double
: 0);
}
}
private Guid GetCustomerId(IOrderEvent @event) => (@event as IOrderCreated)?.CustomerId ?? Guid.Empty;
}
public class CustomerState
{
public Guid CustomerId { get; set; }
public int OrderCount { get; set; }
public decimal TotalSpent { get; set; }
public int CompletedCount { get; set; }
public int CancelledCount { get; set; }
public DateTimeOffset? LastOrderDate { get; set; }
public List<Guid> OrderIds { get; set; } = new();
}
public record AggregationResult(
Guid CustomerId,
int OrderCount,
decimal TotalSpent,
int CompletedCount,
int CancelledCount,
DateTimeOffset? LastOrderDate,
double AverageOrderValue);
Watermark Handling
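The idea of a watermark can be reduced to a few lines. The sketch below is a simplification under stated assumptions: a single stream, a watermark defined as the highest event time seen minus the allowed lateness, and no thread safety. `WatermarkTracker` and its members are hypothetical names used only for this illustration.

```csharp
using System;

var tracker = new WatermarkTracker(allowedLateness: TimeSpan.FromMinutes(10));
var t0 = new DateTimeOffset(2024, 1, 1, 12, 0, 0, TimeSpan.Zero);

Console.WriteLine(tracker.Observe(t0));                // True: first event
Console.WriteLine(tracker.Observe(t0.AddMinutes(30))); // True: advances the watermark to 12:20
Console.WriteLine(tracker.Observe(t0.AddMinutes(25))); // True: out of order but within lateness
Console.WriteLine(tracker.Observe(t0.AddMinutes(5)));  // False: behind the watermark, late

public sealed class WatermarkTracker
{
    private readonly TimeSpan _allowedLateness;
    private DateTimeOffset? _maxEventTime; // null until the first event arrives

    public WatermarkTracker(TimeSpan allowedLateness) => _allowedLateness = allowedLateness;

    // Watermark = highest event time seen so far minus the allowed lateness.
    public DateTimeOffset? Watermark => _maxEventTime - _allowedLateness;

    // Returns true if the event is accepted, false if it is behind the watermark.
    public bool Observe(DateTimeOffset eventTime)
    {
        if (_maxEventTime is { } max && eventTime < max - _allowedLateness)
            return false;
        if (_maxEventTime is null || eventTime > _maxEventTime)
            _maxEventTime = eventTime; // the watermark only moves forward
        return true;
    }

    // A window can be finalized once the watermark has passed its end.
    public bool IsWindowClosed(DateTimeOffset windowEnd) =>
        Watermark is { } watermark && watermark >= windowEnd;
}
```

Driving the watermark from event times rather than the wall clock keeps the lateness decision reproducible when a stream is replayed.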
// Watermark: tracks event time progress
// Handles late events by waiting before finalizing results
public class WatermarkManager
{
private readonly ConcurrentDictionary<string, DateTimeOffset> _watermarks = new();
private readonly TimeSpan _allowedLateness = TimeSpan.FromMinutes(10);
private readonly ILogger<WatermarkManager> _logger;
public WatermarkManager(ILogger<WatermarkManager> logger)
{
_logger = logger;
}
public DateTimeOffset GetWatermark(string windowKey)
{
    // No events seen yet: report MinValue instead of seeding the watermark with
    // processing time, which would mark every older event as late
    return _watermarks.TryGetValue(windowKey, out var watermark)
        ? watermark
        : DateTimeOffset.MinValue;
}
public void UpdateWatermark(string windowKey, DateTimeOffset eventTime)
{
    // Math.Max has no DateTimeOffset overload; the watermark only moves forward
    _watermarks.AddOrUpdate(
        windowKey,
        eventTime,
        (_, current) => eventTime > current ? eventTime : current);
}
public bool IsEventLate(string windowKey, DateTimeOffset eventTime)
{
    var watermark = GetWatermark(windowKey);
    // Written as an addition so that a MinValue watermark cannot underflow
    var isLate = eventTime + _allowedLateness < watermark;
    if (isLate)
    {
        _logger.LogWarning("Late event detected: eventTime={EventTime}, watermark={Watermark}, allowedLateness={Lateness}",
            eventTime, watermark, _allowedLateness);
    }
    return isLate;
}
public bool IsWindowFinalized(string windowKey, DateTimeOffset windowEnd)
{
    var watermark = GetWatermark(windowKey);
    return watermark > windowEnd + _allowedLateness;
}
}
Join Patterns
Stream-Table Join
// Enrich order events with customer reference data
public class StreamTableJoinEnricher
{
    private readonly ConcurrentDictionary<Guid, CustomerReference> _customers = new();
    // ICustomerRepository is an assumed abstraction: any source exposing GetAllAsync() works
    private readonly ICustomerRepository _customerRepository;
    private readonly ILogger<StreamTableJoinEnricher> _logger;
    public StreamTableJoinEnricher(
        ICustomerRepository customerRepository,
        ILogger<StreamTableJoinEnricher> logger)
    {
        _customerRepository = customerRepository;
        _logger = logger;
    }
    // Load reference data (from database, cache, etc.)
    public async Task LoadReferenceDataAsync()
    {
        var customers = await _customerRepository.GetAllAsync();
        foreach (var customer in customers)
        {
            // CustomerReference is a positional record, so it is built through its constructor
            _customers[customer.Id] = new CustomerReference(
                Id: customer.Id,
                Name: customer.Name,
                Tier: customer.Tier,
                Country: customer.Country,
                DiscountRate: customer.DiscountRate);
        }
        _logger.LogInformation("Loaded {Count} customer references", _customers.Count);
    }
    // Join stream event with table lookup (in-memory, so no await is needed)
    public Task<EnrichedOrderEvent?> EnrichAsync(IOrderCreated @event)
    {
        if (!_customers.TryGetValue(@event.CustomerId, out var customer))
        {
            _logger.LogWarning("Customer {CustomerId} not found in reference data", @event.CustomerId);
            return Task.FromResult<EnrichedOrderEvent?>(null);
        }
        var discount = @event.Total * customer.DiscountRate;
        return Task.FromResult<EnrichedOrderEvent?>(new EnrichedOrderEvent(
            OrderId: @event.OrderId,
            CustomerId: @event.CustomerId,
            CustomerName: customer.Name,
            CustomerTier: customer.Tier,
            CustomerCountry: customer.Country,
            Total: @event.Total,
            Discount: discount,
            NetAmount: @event.Total - discount,
            Currency: @event.Currency,
            Timestamp: @event.Timestamp));
    }
}
public record CustomerReference(
Guid Id,
string Name,
string Tier,
string Country,
decimal DiscountRate);
public record EnrichedOrderEvent(
Guid OrderId,
Guid CustomerId,
string CustomerName,
string CustomerTier,
string CustomerCountry,
decimal Total,
decimal Discount,
decimal NetAmount,
string Currency,
DateTimeOffset Timestamp);
Stream-Stream Join
// Join OrderCreated with OrderShipped events
public class StreamStreamJoiner
{
private readonly ConcurrentDictionary<Guid, OrderCreatedState> _pendingOrders = new();
private readonly ILogger<StreamStreamJoiner> _logger;
public StreamStreamJoiner(ILogger<StreamStreamJoiner> logger)
{
_logger = logger;
}
public Task<OrderJourney?> OnOrderCreated(IOrderCreated @event)
{
    // Buffer the left side of the join until the matching shipped event arrives.
    // Entries are never expired here; production code needs a join window (TTL).
    var state = new OrderCreatedState { Event = @event, CreatedAt = DateTimeOffset.UtcNow };
    _pendingOrders[@event.OrderId] = state;
    return Task.FromResult<OrderJourney?>(null);
}
public Task<OrderJourney?> OnOrderShipped(IOrderShipped @event)
{
    if (!_pendingOrders.TryRemove(@event.OrderId, out var state))
    {
        // Also hit when the shipped event overtakes the created event
        _logger.LogWarning("Order {OrderId} shipped but no created event found", @event.OrderId);
        return Task.FromResult<OrderJourney?>(null);
    }
    // OrderJourney is a positional record, so it is built through its constructor
    var journey = new OrderJourney(
        OrderId: @event.OrderId,
        CreatedAt: state.Event.Timestamp,
        ShippedAt: @event.ShippedAt,
        TrackingNumber: @event.TrackingNumber,
        Duration: @event.ShippedAt - state.Event.Timestamp);
    _logger.LogInformation("Order {OrderId}: created={Created} shipped={Shipped} (duration={Duration})",
        journey.OrderId, journey.CreatedAt, journey.ShippedAt, journey.Duration);
    return Task.FromResult<OrderJourney?>(journey);
}
}
public class OrderCreatedState
{
public IOrderCreated Event { get; set; } = null!;
public DateTimeOffset CreatedAt { get; set; }
}
public record OrderJourney(
Guid OrderId,
DateTimeOffset CreatedAt,
DateTimeOffset ShippedAt,
string TrackingNumber,
TimeSpan Duration);
Late Events
Watermark Strategy
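The watermark rule used in this section can be tried in isolation. The following is a minimal sketch, assuming a single watermark per stream that tracks the maximum event time seen so far. `StreamWatermark` and its members are hypothetical names for illustration, and the class is not thread-safe.

```csharp
using System;

// Hypothetical helper: one watermark per stream, advanced by event time only.
public sealed class StreamWatermark
{
    private readonly TimeSpan _allowedLateness;
    private DateTimeOffset _watermark = DateTimeOffset.MinValue;

    public StreamWatermark(TimeSpan allowedLateness) => _allowedLateness = allowedLateness;

    public DateTimeOffset Current => _watermark;

    // Returns true when the event is on time, false when it arrived too late.
    public bool Observe(DateTimeOffset eventTime)
    {
        // Written as an addition so a MinValue watermark cannot underflow.
        var isLate = eventTime + _allowedLateness < _watermark;
        // The watermark only ever moves forward.
        if (eventTime > _watermark) _watermark = eventTime;
        return !isLate;
    }

    // A window may be finalized once the watermark has passed
    // its end plus the allowed lateness.
    public bool IsFinalized(DateTimeOffset windowEnd) =>
        _watermark > windowEnd + _allowedLateness;
}
```

With an allowed lateness of 10 minutes and a watermark at 10:30, an event stamped 10:25 is still accepted, while an event stamped 10:15 is treated as late.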
// Watermark-based late event handling
public class LateEventStrategy
{
private readonly WatermarkManager _watermarkManager;
private readonly ILogger<LateEventStrategy> _logger;
public LateEventStrategy(WatermarkManager watermarkManager, ILogger<LateEventStrategy> logger)
{
_watermarkManager = watermarkManager;
_logger = logger;
}
public async Task<EventOutcome> ProcessWithWatermark(string windowKey, IOrderEvent @event)
{
    var eventTime = GetEventTime(@event);
    // Advancing first is safe: the watermark is a maximum, so an event never makes itself late
    _watermarkManager.UpdateWatermark(windowKey, eventTime);
    if (_watermarkManager.IsEventLate(windowKey, eventTime))
    {
        // Late event: send to DLQ or separate late-events topic
        _logger.LogWarning("Late event: {EventType} at {EventTime}",
            @event.GetType().Name, eventTime);
        return new EventOutcome(IsLate: true, Event: @event);
    }
    // On-time event: process normally
    await ProcessEventAsync(@event);
    return new EventOutcome(IsLate: false, Event: @event);
}
// Events without their own timestamp fall back to processing time and are therefore never late
private DateTimeOffset GetEventTime(IOrderEvent @event) => (@event as IOrderCreated)?.Timestamp ?? DateTimeOffset.UtcNow;
private Task ProcessEventAsync(IOrderEvent @event) => Task.CompletedTask;
}
public record EventOutcome(bool IsLate, IOrderEvent Event);
Event Time vs Processing Time
Event Time: when the event occurred (taken from the event's timestamp)
Processing Time: when the event is processed (the current wall-clock time)
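The difference shows up as soon as an event is assigned to a window. A minimal sketch, assuming UTC-aligned tumbling windows; `TumblingWindows` and its methods are hypothetical names for illustration.

```csharp
using System;

public static class TumblingWindows
{
    // Start of the tumbling window (aligned on UTC ticks) that contains the given instant.
    public static DateTimeOffset WindowStart(DateTimeOffset time, TimeSpan size)
    {
        var ticks = time.UtcTicks - (time.UtcTicks % size.Ticks);
        return new DateTimeOffset(ticks, TimeSpan.Zero);
    }

    // Assigns the same event twice: once by its own timestamp, once by arrival time.
    public static (DateTimeOffset ByEventTime, DateTimeOffset ByProcessingTime) Assign(
        DateTimeOffset eventTime, DateTimeOffset processingTime, TimeSpan size) =>
        (WindowStart(eventTime, size), WindowStart(processingTime, size));
}
```

For an event stamped 10:00:30 that arrives at 10:05:10, one-minute windows give the 10:00 window by event time and the 10:05 window by processing time.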
Example:
10:00 - event created (event time)
10:05 - event delivered (processing time, 5 min delay)
10:05 - event assigned to the 10:00-10:01 window (by event time)
With event-time windowing, the window closes at 10:01 plus the watermark delay.
With processing-time windowing, the window closes at 10:01 wall-clock time, so the delayed event is counted in the 10:05-10:06 window instead.
State Backend
Options
| Backend | Use Case | Durability | Scale |
|---|---|---|---|
| In-memory | Testing, stateless | No | Limited |
| Filesystem | Development | Yes | Small |
| RocksDB | Production, large state | Yes | Large |
| External (Redis) | Shared state, cross-instance | Depends on persistence settings (RDB/AOF) | Medium |
State Backend Configuration
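The backends in this section implement an `IStateBackend` interface that is not shown. The sketch below assumes its shape from those implementations and adds a hypothetical typed wrapper, `JsonStateStore<T>`, that stores state objects as JSON bytes. `SketchBackend` and `CounterState` exist only so the example runs on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Tasks;

// Assumed shape of the interface, inferred from the implementations in this section.
public interface IStateBackend
{
    Task<byte[]> GetAsync(string key);       // empty array when the key is missing
    Task SetAsync(string key, byte[] value);
    Task DeleteAsync(string key);
}

// Hypothetical typed wrapper: serializes state objects to UTF-8 JSON.
public sealed class JsonStateStore<T> where T : class
{
    private readonly IStateBackend _backend;
    public JsonStateStore(IStateBackend backend) => _backend = backend;

    public async Task<T?> LoadAsync(string key)
    {
        var bytes = await _backend.GetAsync(key);
        return bytes.Length == 0 ? null : JsonSerializer.Deserialize<T>(bytes);
    }

    public Task SaveAsync(string key, T value) =>
        _backend.SetAsync(key, JsonSerializer.SerializeToUtf8Bytes(value));
}

// Stand-in backend so the sketch is self-contained.
public sealed class SketchBackend : IStateBackend
{
    private readonly ConcurrentDictionary<string, byte[]> _state = new();
    public Task<byte[]> GetAsync(string key) =>
        Task.FromResult(_state.TryGetValue(key, out var value) ? value : Array.Empty<byte>());
    public Task SetAsync(string key, byte[] value) { _state[key] = value; return Task.CompletedTask; }
    public Task DeleteAsync(string key) { _state.TryRemove(key, out _); return Task.CompletedTask; }
}

public sealed record CounterState(int OrderCount, decimal TotalSpent);
```

Keeping the interface at the byte level lets the same wrapper run against an in-memory dictionary in tests and against Redis in a shared deployment.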
// In-memory state (testing)
public class InMemoryStateBackend : IStateBackend
{
private readonly ConcurrentDictionary<string, byte[]> _state = new();
public Task<byte[]> GetAsync(string key) =>
Task.FromResult(_state.GetValueOrDefault(key, Array.Empty<byte>()));
public Task SetAsync(string key, byte[] value)
{
_state[key] = value;
return Task.CompletedTask;
}
public Task DeleteAsync(string key)
{
_state.TryRemove(key, out _);
return Task.CompletedTask;
}
}
// Redis state (production)
public class RedisStateBackend : IStateBackend
{
private readonly IDatabase _redis;
private readonly TimeSpan _ttl = TimeSpan.FromHours(24);
public RedisStateBackend(IDatabase redis)
{
_redis = redis;
}
public async Task<byte[]> GetAsync(string key)
{
    var data = await _redis.StringGetAsync(key);
    // RedisValue converts to byte[] through a cast
    return data.HasValue ? (byte[])data! : Array.Empty<byte>();
}
public async Task SetAsync(string key, byte[] value)
{
await _redis.StringSetAsync(key, value, _ttl);
}
public async Task DeleteAsync(string key)
{
await _redis.KeyDeleteAsync(key);
}
}
Practice
// Real-time analytics dashboard
public class RealTimeAnalyticsProcessor
{
private readonly ConcurrentDictionary<string, WindowState> _windows = new();
private readonly WatermarkManager _watermarkManager;
private readonly IBus _bus;
private readonly ILogger<RealTimeAnalyticsProcessor> _logger;
private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(5);
private readonly Timer _cleanupTimer;
public RealTimeAnalyticsProcessor(
WatermarkManager watermarkManager,
IBus bus,
ILogger<RealTimeAnalyticsProcessor> logger)
{
_watermarkManager = watermarkManager;
_bus = bus;
_logger = logger;
_cleanupTimer = new Timer(CleanupExpiredWindows, null, _windowSize, _windowSize);
}
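// One watermark for the whole stream: a watermark kept per window can never
// pass that window's own end, so no window would ever be finalized
private const string WatermarkKey = "orders";
public Task ProcessOrderAsync(IOrderCreated @event)
{
    var windowKey = GetWindowKey(@event.Timestamp);
    // CreatedAt holds the window start in event time, parsed back from the key
    var window = _windows.GetOrAdd(windowKey,
        key => new WindowState { Key = key, CreatedAt = DateTimeOffset.Parse(key) });
    lock (window)
    {
        window.OrderCount++;
        window.TotalAmount += @event.Total;
        window.UniqueCustomers.Add(@event.CustomerId);
        window.Categories.AddRange(@event.Items.Select(i => i.Category));
    }
    _watermarkManager.UpdateWatermark(WatermarkKey, @event.Timestamp);
    return Task.CompletedTask;
}
private void CleanupExpiredWindows(object? state)
{
    var expired = _windows
        .Where(kvp => _watermarkManager.IsWindowFinalized(
            WatermarkKey, kvp.Value.CreatedAt.Add(_windowSize)))
        .ToList();
    foreach (var kvp in expired)
    {
        if (_windows.TryRemove(kvp.Key, out var window))
        {
            // Timer callbacks cannot await, so failures are logged from a continuation
            _ = PublishWindowResult(window).ContinueWith(
                t => _logger.LogError(t.Exception, "Failed to publish window {Key}", window.Key),
                TaskContinuationOptions.OnlyOnFaulted);
        }
    }
}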
private async Task PublishWindowResult(WindowState window)
{
    RealTimeAnalytics analytics;
    // Snapshot under the same lock that ProcessOrderAsync uses
    lock (window)
    {
        // Positional records are built through their constructors
        analytics = new RealTimeAnalytics(
            WindowStart: window.CreatedAt,
            WindowEnd: window.CreatedAt.Add(_windowSize),
            OrderCount: window.OrderCount,
            TotalRevenue: window.TotalAmount,
            UniqueCustomers: window.UniqueCustomers.Count,
            AvgOrderValue: window.OrderCount > 0
                ? (double)(window.TotalAmount / window.OrderCount)
                : 0d,
            TopCategories: window.Categories
                .GroupBy(c => c)
                .OrderByDescending(g => g.Count())
                .Take(5)
                .Select(g => new CategoryStats(g.Key, g.Count()))
                .ToList());
    }
    await _bus.Publish(analytics);
    _logger.LogInformation("Published analytics: {Count} orders, ${Revenue}",
        analytics.OrderCount, analytics.TotalRevenue);
}
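private string GetWindowKey(DateTimeOffset time)
{
    // Align on UTC ticks; DateTimeOffset has no ticks-only constructor, so the offset is explicit
    var ticks = time.UtcTicks / _windowSize.Ticks * _windowSize.Ticks;
    return new DateTimeOffset(ticks, TimeSpan.Zero).ToString("o");
}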
}
public class WindowState
{
public string Key { get; set; } = null!;
public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
public int OrderCount { get; set; }
public decimal TotalAmount { get; set; }
public HashSet<Guid> UniqueCustomers { get; set; } = new();
public List<string> Categories { get; set; } = new();
}
public record RealTimeAnalytics(
DateTimeOffset WindowStart,
DateTimeOffset WindowEnd,
int OrderCount,
decimal TotalRevenue,
int UniqueCustomers,
double AvgOrderValue,
IReadOnlyList<CategoryStats> TopCategories);
public record CategoryStats(string Category, int Count);
Checklist
- [x] Windowed aggregation for a real-time analytics dashboard (tumbling windows)
- [x] Stream-table join for enrichment with reference data
- [x] Watermark strategy for handling late events
Monitoring and Operations
Consumer Lag Monitoring
What Is Consumer Lag
Consumer Lag = (Latest Offset in Topic) - (Current Consumer Offset)
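The formula applies per partition, and the totals are summed afterwards. A minimal sketch of that arithmetic; `LagCalculator` is a hypothetical helper, and treating a partition without a committed offset as fully unread is an assumption of this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LagCalculator
{
    // Lag per partition = latest (end) offset minus committed offset, never below zero.
    // Assumption: a partition without a committed offset counts from offset 0.
    public static IReadOnlyDictionary<int, long> PerPartition(
        IReadOnlyDictionary<int, long> endOffsets,
        IReadOnlyDictionary<int, long> committedOffsets) =>
        endOffsets.ToDictionary(
            kv => kv.Key,
            kv => Math.Max(0L, kv.Value - (committedOffsets.TryGetValue(kv.Key, out var c) ? c : 0L)));

    // Total lag of a consumer group on one topic.
    public static long Total(IReadOnlyDictionary<int, long> lagByPartition) =>
        lagByPartition.Values.Sum();
}
```

With end offsets of 100,000 on three partitions and committed offsets of 40,000, 35,000 and 25,000, the per-partition lag is 60,000, 65,000 and 75,000, for a total of 200,000.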
Example:
Topic "orders":
Latest offset (each of the 3 partitions): 100,000
Consumer group "processors":
Consumer A: offset 40,000 (partition 0)
Consumer B: offset 35,000 (partition 1)
Consumer C: offset 25,000 (partition 2)
Lag per partition:
Partition 0: 100,000 - 40,000 = 60,000
Partition 1: 100,000 - 35,000 = 65,000
Partition 2: 100,000 - 25,000 = 75,000
Total lag: 200,000 messages
Detection and Alerting
public class ConsumerLagMonitor : BackgroundService
{
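private readonly IConsumer<string, string> _consumer;
private readonly ILogger<ConsumerLagMonitor> _logger;
private readonly IScalingService _scalingService;
private readonly TimeSpan _monitoringInterval = TimeSpan.FromMinutes(1);
// Dependencies are injected; without a constructor the readonly fields stay null
public ConsumerLagMonitor(
    IConsumer<string, string> consumer,
    IScalingService scalingService,
    ILogger<ConsumerLagMonitor> logger)
{
    _consumer = consumer;
    _scalingService = scalingService;
    _logger = logger;
}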
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var lagData = await CollectLagAsync(stoppingToken);
foreach (var lag in lagData)
{
_logger.LogInformation(
"Consumer lag: group={Group} topic={Topic} partition={Partition} lag={Lag}",
lag.Group, lag.Topic, lag.Partition, lag.Lag);
// Alert on high lag
if (lag.Lag > 10000)
{
_logger.LogWarning("HIGH LAG ALERT: {Group}/{Topic}/{Partition} = {Lag}",
lag.Group, lag.Topic, lag.Partition, lag.Lag);
await _scalingService.TriggerScalingAsync(lag.Group);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error collecting consumer lag");
}
await Task.Delay(_monitoringInterval, stoppingToken);
}
}
private Task<IReadOnlyList<ConsumerLag>> CollectLagAsync(CancellationToken ct)
{
    // Assumptions for this example: _consumer is configured with this group.id
    // (Committed() only returns offsets of the consumer's own group) and the
    // monitored topic is "orders".
    const string group = "order-processors";
    const string topicName = "orders";
    var lags = new List<ConsumerLag>();
    // Reuse the consumer's connection to read topic metadata
    using var adminClient = new DependentAdminClientBuilder(_consumer.Handle).Build();
    var metadata = adminClient.GetMetadata(topicName, TimeSpan.FromSeconds(10));
    foreach (var topic in metadata.Topics)
    {
        if (topic.Error.IsError) continue;
        var partitions = topic.Partitions
            .Select(p => new TopicPartition(topic.Topic, p.PartitionId))
            .ToList();
        // Committed offsets of the consumer group, one entry per partition
        var committed = _consumer.Committed(partitions, TimeSpan.FromSeconds(5));
        foreach (var tpo in committed)
        {
            ct.ThrowIfCancellationRequested();
            // High watermark = latest offset in the partition
            var watermarks = _consumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(5));
            // Nothing committed yet (Offset.Unset): count from the low watermark
            var position = tpo.Offset.IsSpecial ? watermarks.Low.Value : tpo.Offset.Value;
            var lag = Math.Max(0L, watermarks.High.Value - position);
            lags.Add(new ConsumerLag(group, topic.Topic, tpo.Partition.Value, lag, DateTimeOffset.UtcNow));
        }
    }
    return Task.FromResult<IReadOnlyList<ConsumerLag>>(lags);
}
}
public record ConsumerLag(
string Group,
string Topic,
int Partition,
long Lag,
DateTimeOffset Timestamp);
Message Throughput Metrics
Producer/Consumer Rates
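Counters only ever grow, so a rate has to be derived from two samples. A minimal sketch of that calculation, a simplified version of what a metrics backend does; `RateCalculator` is a hypothetical helper, and treating a drop in the counter as a process restart is an assumption of this example.

```csharp
using System;

public static class RateCalculator
{
    // Average per-second rate between two samples of a monotonically increasing counter.
    public static double PerSecond(
        long earlierValue, DateTimeOffset earlierAt,
        long laterValue, DateTimeOffset laterAt)
    {
        var seconds = (laterAt - earlierAt).TotalSeconds;
        if (seconds <= 0)
            throw new ArgumentException("The later sample must be newer than the earlier one.");

        // A lower value means the counter was reset (process restart): count from zero.
        var delta = laterValue >= earlierValue ? laterValue - earlierValue : laterValue;
        return delta / seconds;
    }
}
```

Comparing the sent rate with the received rate over the same interval shows whether consumers keep up with producers.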
public class MessagingMetrics
{
private readonly ILogger<MessagingMetrics> _logger;
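// System.Diagnostics.Metrics instruments are generic over the measurement type
private readonly Histogram<double> _messageLatency;
private readonly Counter<long> _messagesSent;
private readonly Counter<long> _messagesReceived;
private readonly Counter<long> _messagesFailed;
private readonly Counter<long> _messagesDlq;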
public MessagingMetrics(ILogger<MessagingMetrics> logger, IMeterFactory meterFactory)
{
_logger = logger;
var meter = meterFactory.Create("messaging");
_messageLatency = meter.CreateHistogram<double>("message.latency.ms",
unit: "ms", description: "Message processing latency");
_messagesSent = meter.CreateCounter<long>("messages.sent.total",
description: "Total messages sent");
_messagesReceived = meter.CreateCounter<long>("messages.received.total",
description: "Total messages received");
_messagesFailed = meter.CreateCounter<long>("messages.failed.total",
description: "Total messages failed");
_messagesDlq = meter.CreateCounter<long>("messages.dead-lettered.total",
description: "Total messages sent to DLQ");
}
public void RecordMessageSent(double latencyMs)
{
_messagesSent.Add(1);
_messageLatency.Record(latencyMs);
}
public void RecordMessageReceived(double latencyMs)
{
_messagesReceived.Add(1);
_messageLatency.Record(latencyMs);
}
public void RecordMessageFailed() => _messagesFailed.Add(1);
public void RecordMessageDlq() => _messagesDlq.Add(1);
}
// Prometheus metrics export
// GET /metrics
// messaging_messages_sent_total 150000
// messaging_messages_received_total 148500
// messaging_messages_failed_total 1500
// messaging_messages_dead_lettered_total 350
// messaging_message_latency_ms_sum 45000.5
// messaging_message_latency_ms_count 148500
// messaging_message_latency_ms_bucket{le="10"} 120000
// messaging_message_latency_ms_bucket{le="50"} 145000
// messaging_message_latency_ms_bucket{le="100"} 148000
// messaging_message_latency_ms_bucket{le="+Inf"} 148500
Queue Depth Alerting
Unbounded Growth Prevention
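Monitoring reports growth after the fact; the broker can also cap a queue itself. A minimal sketch that builds the optional queue arguments for a length limit, using RabbitMQ's `x-max-length` and `x-overflow` arguments. `BoundedQueueArguments` is a hypothetical helper, and the limit value is chosen by the caller.

```csharp
using System;
using System.Collections.Generic;

public static class BoundedQueueArguments
{
    // Builds optional queue arguments that cap the queue length on the broker side.
    public static Dictionary<string, object?> Build(int maxLength, string overflow = "reject-publish")
    {
        if (maxLength <= 0) throw new ArgumentOutOfRangeException(nameof(maxLength));

        return new Dictionary<string, object?>
        {
            ["x-max-length"] = maxLength,
            // reject-publish: new publishes are refused once the limit is reached.
            // The broker default, drop-head, discards the oldest messages instead.
            ["x-overflow"] = overflow
        };
    }
}
```

The dictionary would be passed as the `arguments` parameter of the queue declaration. With `reject-publish`, a producer only notices the refusal when publisher confirms are enabled.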
public class QueueDepthMonitor : BackgroundService
{
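private readonly IConnection _connection;
private readonly ILogger<QueueDepthMonitor> _logger;
private readonly IAlertService _alertService;
// Dependencies are injected; without a constructor the readonly fields stay null
public QueueDepthMonitor(
    IConnection connection,
    IAlertService alertService,
    ILogger<QueueDepthMonitor> logger)
{
    _connection = connection;
    _alertService = alertService;
    _logger = logger;
}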
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var queueStats = await GetQueueStatsAsync(stoppingToken);
foreach (var queue in queueStats)
{
_logger.LogInformation(
"Queue depth: {Queue} messages={Depth} ready={Ready} unack={Unacked}",
queue.Name, queue.Total, queue.Ready, queue.Unacked);
// Alert thresholds (QueueAlert is a positional record, so it is built through its constructor)
if (queue.Total > 100000)
{
    await _alertService.SendAlertAsync(new QueueAlert(
        Queue: queue.Name,
        Type: AlertType.CriticalDepth,
        Depth: queue.Total,
        Message: $"Queue {queue.Name} has {queue.Total} messages"));
}
else if (queue.Total > 50000)
{
    await _alertService.SendAlertAsync(new QueueAlert(
        Queue: queue.Name,
        Type: AlertType.WarningDepth,
        Depth: queue.Total,
        Message: $"Queue {queue.Name} approaching capacity: {queue.Total} messages"));
}
// Unacked messages alert
if (queue.Unacked > 1000)
{
    await _alertService.SendAlertAsync(new QueueAlert(
        Queue: queue.Name,
        Type: AlertType.HighUnacked,
        Depth: queue.Unacked,
        Message: $"Queue {queue.Name} has {queue.Unacked} unacked messages"));
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error monitoring queue depths");
}
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
}
}
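private async Task<IReadOnlyList<QueueStats>> GetQueueStatsAsync(CancellationToken ct)
{
    // RabbitMQ Management API; production code should reuse one HttpClient (IHttpClientFactory)
    using var http = new HttpClient();
    // The API requires HTTP basic auth; guest/guest is the default local-only account
    var credentials = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes("guest:guest"));
    http.DefaultRequestHeaders.Authorization =
        new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials);
    var json = await http.GetStringAsync("http://localhost:15672/api/queues", ct);
    // Deserialize needs the response body as input
    return JsonSerializer.Deserialize<List<QueueStats>>(json) ?? new List<QueueStats>();
}
}
// JsonPropertyName (System.Text.Json.Serialization) maps the API's field names onto the record
public record QueueStats(
    [property: JsonPropertyName("name")] string Name,
    [property: JsonPropertyName("messages")] long Total, // Ready + Unacked
    [property: JsonPropertyName("messages_ready")] long Ready, // Messages waiting to be delivered
    [property: JsonPropertyName("messages_unacknowledged")] long Unacked); // Delivered but not acknowledged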
public record QueueAlert(
string Queue,
AlertType Type,
long Depth,
string Message);
public enum AlertType
{
WarningDepth,
CriticalDepth,
HighUnacked,
DlqGrowth
}
Dead Letter Queue Analysis
Identifying Systematic Failures
public class DlqAnalyzer
{
private readonly AppDbContext _context;
private readonly ILogger<DlqAnalyzer> _logger;
public DlqAnalyzer(AppDbContext context, ILogger<DlqAnalyzer> logger)
{
_context = context;
_logger = logger;
}
public async Task<DlqAnalysis> AnalyzeAsync(TimeSpan? lookback = null)
{
    // Compute the cutoff once so the query compares against a constant
    var cutoff = DateTimeOffset.UtcNow - (lookback ?? TimeSpan.FromHours(24));
    var dlqMessages = await _context.DeadLetterMessages
        .Where(m => m.CreatedAt > cutoff)
        .ToListAsync();
    // Pattern analysis (positional records are built through their constructors)
    var errorPatterns = dlqMessages
        .GroupBy(m => m.ErrorType)
        .Select(g => new ErrorPattern(
            ErrorType: g.Key,
            Count: g.Count(),
            Percentage: (double)g.Count() / dlqMessages.Count * 100,
            FirstOccurrence: g.Min(m => m.CreatedAt),
            LastOccurrence: g.Max(m => m.CreatedAt),
            SampleMessageId: g.First().MessageId))
        .OrderByDescending(p => p.Count)
        .ToList();
    // Time series
    var hourlyDistribution = dlqMessages
        .GroupBy(m => m.CreatedAt.Date.AddHours(m.CreatedAt.Hour))
        .Select(g => new HourlyDistribution(Hour: g.Key, Count: g.Count()))
        .OrderBy(d => d.Hour)
        .ToList();
    // Systematic failure detection: one error type accounts for more than half of the DLQ
    var isSystematic = errorPatterns.Any(p => p.Percentage > 50);
    return new DlqAnalysis(
        TotalMessages: dlqMessages.Count,
        ErrorPatterns: errorPatterns,
        HourlyDistribution: hourlyDistribution,
        IsSystematicFailure: isSystematic,
        RecommendedAction: GetRecommendedAction(errorPatterns, isSystematic));
}
private string GetRecommendedAction(IReadOnlyList<ErrorPattern> patterns, bool isSystematic)
{
if (!isSystematic) return "No systematic failure pattern detected";
var topPattern = patterns.First();
return topPattern.ErrorType switch
{
"TimeoutException" => "Check downstream service health and increase timeout",
"InvalidOperationException" => "Review message processing logic",
"DbUpdateException" => "Check database connectivity and constraints",
_ => $"Investigate {topPattern.ErrorType} — {topPattern.Count} occurrences"
};
}
}
public record DlqAnalysis(
int TotalMessages,
IReadOnlyList<ErrorPattern> ErrorPatterns,
IReadOnlyList<HourlyDistribution> HourlyDistribution,
bool IsSystematicFailure,
string RecommendedAction);
public record ErrorPattern(
string ErrorType,
int Count,
double Percentage,
DateTimeOffset FirstOccurrence,
DateTimeOffset LastOccurrence,
string SampleMessageId);
public record HourlyDistribution(
DateTime Hour,
int Count);
Message Tracing
Correlation ID Propagation
Request Flow:
API Gateway ──▶ Order Service ──▶ Payment Service ──▶ Notification Service
│ │ │ │
│ correlationId: abc123 │ │
│ traceId: xyz789 │ traceId: xyz789 │ traceId: xyz789
│ spanId: span1 │ spanId: span2 │ spanId: span3
│ │ │
▼ ▼ ▼
All messages carry correlationId + traceId across services
Implementation
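The code in this section reads and writes an ambient `CorrelationContext` that is not shown. The following is a minimal sketch of one possible shape, assuming it is backed by `AsyncLocal<T>` so the value follows the logical call chain across awaits. The property names match the way the type is used in this section; everything else is an assumption.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical ambient context. AsyncLocal makes the value flow across awaits
// within one logical call chain without leaking between concurrent requests.
public sealed class CorrelationContext
{
    private static readonly AsyncLocal<CorrelationContext?> Ambient = new();

    public string CorrelationId { get; init; } = "";
    public string TraceId { get; init; } = "";
    public DateTimeOffset Timestamp { get; init; }

    public static CorrelationContext? Current
    {
        get => Ambient.Value;
        set => Ambient.Value = value;
    }
}
```

A value set before `await _next(context)` in a middleware is visible to everything that request calls downstream, while changes made inside a child task do not flow back to the caller.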
// Correlation ID middleware
public class CorrelationIdMiddleware
{
private readonly RequestDelegate _next;
private const string CorrelationIdHeader = "X-Correlation-ID";
private const string TraceIdHeader = "X-Trace-ID";
public CorrelationIdMiddleware(RequestDelegate next)
{
_next = next;
}
public async Task InvokeAsync(HttpContext context, ILogger<CorrelationIdMiddleware> logger)
{
// Extract or generate correlation ID
var correlationId = context.Request.Headers[CorrelationIdHeader].FirstOrDefault()
?? Guid.NewGuid().ToString();
var traceId = context.Request.Headers[TraceIdHeader].FirstOrDefault()
?? Guid.NewGuid().ToString();
// Set in current activity
Activity.Current?.SetTag("correlation.id", correlationId);
Activity.Current?.SetTag("trace.id", traceId);
// Add to response headers
context.Response.Headers[CorrelationIdHeader] = correlationId;
context.Response.Headers[TraceIdHeader] = traceId;
// Set in ambient context
CorrelationContext.Current = new CorrelationContext
{
CorrelationId = correlationId,
TraceId = traceId,
Timestamp = DateTimeOffset.UtcNow
};
await _next(context);
}
}
// MassTransit filters for correlation ID propagation.
// Scoped filters implement IFilter<TContext> as open generic types, one per pipeline.
public class CorrelationSendFilter<T> : IFilter<SendContext<T>> where T : class
{
    public Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        CorrelationHeaders.Write(context.Headers);
        return next.Send(context);
    }
    public void Probe(ProbeContext context) => context.CreateFilterScope("correlation-send");
}
public class CorrelationPublishFilter<T> : IFilter<PublishContext<T>> where T : class
{
    public Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next)
    {
        CorrelationHeaders.Write(context.Headers);
        return next.Send(context);
    }
    public void Probe(ProbeContext context) => context.CreateFilterScope("correlation-publish");
}
public class CorrelationConsumeFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
    public Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Inbound: restore the ambient context from the message headers
        CorrelationContext.Current = new CorrelationContext
        {
            CorrelationId = context.Headers.Get<string>("correlation-id") ?? Guid.NewGuid().ToString(),
            TraceId = context.Headers.Get<string>("trace-id") ?? Guid.NewGuid().ToString(),
            Timestamp = DateTimeOffset.UtcNow
        };
        return next.Send(context);
    }
    public void Probe(ProbeContext context) => context.CreateFilterScope("correlation-consume");
}
public static class CorrelationHeaders
{
    // Outbound: copy the ambient IDs into the message headers
    public static void Write(SendHeaders headers)
    {
        headers.Set("correlation-id", CorrelationContext.Current?.CorrelationId ?? Guid.NewGuid().ToString());
        headers.Set("trace-id", CorrelationContext.Current?.TraceId ?? Guid.NewGuid().ToString());
    }
}
// Registration
x.UsingRabbitMq((context, cfg) =>
{
    cfg.Host("rabbitmq://localhost");
    cfg.UseSendFilter(typeof(CorrelationSendFilter<>), context);
    cfg.UsePublishFilter(typeof(CorrelationPublishFilter<>), context);
    cfg.UseConsumeFilter(typeof(CorrelationConsumeFilter<>), context);
    cfg.ConfigureEndpoints(context);
});
// Distributed tracing with OpenTelemetry
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing =>
    {
        tracing.AddSource("MassTransit")
            .AddAspNetCoreInstrumentation()
            .AddHttpClientInstrumentation()
            .AddOtlpExporter(); // OTLP endpoint, e.g. Jaeger or an OpenTelemetry Collector
    });
Trace Correlation Query
-- Find all messages in a trace
SELECT * FROM message_logs
WHERE correlation_id = 'abc123'
ORDER BY timestamp;
-- Result:
-- | correlation_id | trace_id | service | event_type | timestamp |
-- | abc123 | xyz789 | api-gateway | RequestReceived | 2024-01-15 10:00:00 |
-- | abc123 | xyz789 | order-service | OrderCreated | 2024-01-15 10:00:01 |
-- | abc123 | xyz789 | payment-service | PaymentCompleted | 2024-01-15 10:00:03 |
-- | abc123 | xyz789 | notification-svc | EmailSent | 2024-01-15 10:00:05 |
Dashboard and Alerting
Consumer Lag Dashboard
# Grafana dashboard definition (simplified, shown as YAML)
dashboard:
  title: "Messaging Platform Overview"
  panels:
    - title: "Consumer Lag by Group"
      type: graph
      targets:
        - query: "kafka_consumer_lag{group='order-processors'}"
        - query: "kafka_consumer_lag{group='analytics'}"
      thresholds:
        - value: 1000
          color: yellow
        - value: 5000
          color: red
    - title: "Message Throughput"
      type: graph
      targets:
        - query: "rate(messaging_messages_sent_total[5m])"
        - query: "rate(messaging_messages_received_total[5m])"
    - title: "Queue Depth"
      type: graph
      targets:
        - query: "rabbitmq_queue_messages{queue='orders'}"
        - query: "rabbitmq_queue_messages{queue='notifications'}"
    - title: "Error Rate"
      type: stat
      targets:
        - query: "rate(messaging_messages_failed_total[5m])"
    - title: "DLQ Messages"
      type: stat
      targets:
        - query: "messaging_messages_dead_lettered_total"
Automatic Scaling Trigger
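The scaling decision reduces to one formula: divide the lag by a per-instance target, round up, and clamp the result. A minimal sketch of that calculation in isolation; `ScalingMath` is a hypothetical helper, and the defaults mirror the values used in this section.

```csharp
using System;

public static class ScalingMath
{
    // Desired instances = ceil(lag / target per instance), clamped to [min, max].
    public static int DesiredInstances(
        long lag, int messagesPerInstance = 5000, int min = 2, int max = 50)
    {
        if (messagesPerInstance <= 0)
            throw new ArgumentOutOfRangeException(nameof(messagesPerInstance));

        // Negative lag is treated as zero before rounding up.
        var raw = (long)Math.Ceiling((double)Math.Max(0L, lag) / messagesPerInstance);
        return (int)Math.Clamp(raw, (long)min, (long)max);
    }
}
```

A lag of 200,000 messages gives 40 instances, and a lag of 0 still keeps the minimum of 2. For Kafka, consumers beyond the partition count of a topic stay idle, so the upper bound is usually set no higher than the number of partitions.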
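// Implements the IScalingService that ConsumerLagMonitor depends on
public class ScalingService : IScalingService
{
private readonly IScalingClient _scalingClient;
private readonly ILogger<ScalingService> _logger;
public ScalingService(IScalingClient scalingClient, ILogger<ScalingService> logger)
{
    _scalingClient = scalingClient;
    _logger = logger;
}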
public async Task TriggerScalingAsync(string consumerGroup)
{
    _logger.LogInformation("Triggering scaling for consumer group: {Group}", consumerGroup);
    // Get current lag
    var lag = await GetConsumerLagAsync(consumerGroup);
    // Read the current count before scaling so the log shows the real starting value
    var currentInstances = await GetCurrentInstanceCount(consumerGroup);
    // Calculate desired instance count
    const int messagesPerInstance = 5000; // Target messages per instance
    var desiredInstances = (int)Math.Ceiling((double)lag / messagesPerInstance);
    desiredInstances = Math.Clamp(desiredInstances, 2, 50); // Between 2 and 50 instances
    // Scale
    await _scalingClient.SetInstanceCountAsync(consumerGroup, desiredInstances);
    _logger.LogInformation(
        "Scaled {Group} from {Current} to {Desired} instances (lag: {Lag})",
        consumerGroup, currentInstances, desiredInstances, lag);
}
private Task<long> GetConsumerLagAsync(string group) => Task.FromResult(0L);
private Task<int> GetCurrentInstanceCount(string group) => Task.FromResult(1);
}
Operational Runbook
Common Messaging Failure Scenarios
## Runbook: Message Processing Failures
### Scenario 1: Consumer Lag Growing
**Symptoms:**
- Consumer lag > 10,000 messages
- Dashboard shows yellow/red threshold
**Diagnosis:**
1. Check consumer logs for errors
2. Verify downstream dependencies (DB, APIs)
3. Check CPU/memory on consumer hosts
**Actions:**
1. Scale consumers horizontally
2. Increase prefetch count temporarily
3. Check for slow queries or external API timeouts
**Escalation:** If lag > 100,000 after scaling → Page on-call
### Scenario 2: Messages Stuck in DLQ
**Symptoms:**
- DLQ depth increasing
- Error pattern analysis shows systematic failures
**Diagnosis:**
1. Run DLQ analyzer for error patterns
2. Check sample messages for data issues
3. Review error logs for root cause
**Actions:**
1. Fix root cause (code fix, config change)
2. Replay healthy messages from DLQ
3. Purge corrupted messages
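**Replay Command:** (publishes one message per invocation; the payload is read from stdin when `payload=` is omitted)
rabbitmqadmin publish exchange=amq.default routing_key=orders < replay_messages.json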
### Scenario 3: Connection Loss to Broker
**Symptoms:**
- Producer: "Connection closed" errors
- Consumer: "Consumer closed channel" errors
**Diagnosis:**
1. Check network connectivity
2. Verify broker health (for example `systemctl status rabbitmq-server`)
3. Check broker logs
**Actions:**
1. Restart broker if needed
2. Verify publisher confirms are re-enabled
3. Verify consumers re-subscribe
### Scenario 4: Outbox Not Publishing
**Symptoms:**
- Outbox table growing
- No messages appearing in broker
**Diagnosis:**
1. Check outbox poller service status
2. Check database connectivity
3. Check broker connectivity from poller
**Actions:**
1. Restart outbox poller
2. Verify database connection string
3. Check broker credentials
Checklist
- [x] Consumer lag dashboard with an automatic scaling trigger
- [x] End-to-end message tracing with distributed correlation IDs
- [x] Operational runbook for common messaging failure scenarios
Practice
Module 12 checkpoint
- RabbitMQ/Kafka for async inter-service communication
- Outbox Pattern for reliable event publishing
- Idempotent consumers with deduplication
- Dead letter queue handling with a retry chain and alerting
- Consumer lag monitoring and automatic scaling
- Event contract governance with schema validation
- Zero message loss under normal operation (verified by a test)
- Consumer lag < 1000 messages under steady-state load
- All consumers are idempotent (duplicate processing verified safe)
- Dead letter queue alerting triggers on a systematic failure pattern
- Event contract changes validated through the CI pipeline