14. Fintech: Data-Handling Specifics
This module covers the aspects of system design and implementation that are specific to the fintech industry: financial data integrity, the CAP theorem and its practical application, distributed consistency patterns, financial ledger architecture, and regulatory requirements for data.
Take this module after modules 5 (Databases), 3 (Concurrency), and 12 (Message Brokers). The CAP theorem and distributed consistency patterns are the foundation for understanding module 10 (Cloud-Native).
Data Integrity in Fintech
Introduction
In financial systems, data integrity is not merely a technical requirement; it is a business imperative and a regulatory obligation. An error in a balance calculation can cost millions of dollars, and a lost transaction can lead to lawsuits and regulatory fines.
> Key idea: ACID guarantees the correctness of a single transaction in a single database. Moving money, however, is a contract between services, caches, brokers, and external systems. Production-grade correctness lives above the database level.
ACID in Financial Systems
Atomicity
Every financial operation must either complete in full or be rolled back in full. There are no intermediate states.
// CORRECT: Atomic double-entry posting
public async Task<PostJournalEntryResult> PostAsync(JournalEntry entry, CancellationToken ct)
{
using var txn = await _connection.BeginTransactionAsync(IsolationLevel.Serializable, ct);
try
{
// 1. Idempotency check
var existing = await txn.QueryFirstOrDefaultAsync<Guid?>(
"SELECT id FROM journal_entries WHERE idempotency_key = @Key",
new { Key = entry.IdempotencyKey });
if (existing.HasValue)
return PostJournalEntryResult.Duplicate(existing.Value);
// 2. Validate double-entry invariant: signed postings must net to exactly zero
// (decimal arithmetic is exact, and decimal has no Epsilon member)
var net = entry.Postings.Sum(p => p.Amount * (p.IsDebit ? 1m : -1m));
if (net != 0m)
throw new InvalidOperationException("Double-entry invariant violated: debits must equal credits");
// 3. Write all postings atomically
await txn.ExecuteAsync("""
INSERT INTO journal_entries (id, idempotency_key, timestamp, description)
VALUES (@Id, @Key, @Ts, @Desc)
""", new { entry.Id, entry.IdempotencyKey, entry.Timestamp, entry.Description });
await txn.ExecuteAsync("""
INSERT INTO journal_postings (entry_id, account_id, amount, is_debit, version)
VALUES (@EntryId, @AccountId, @Amount, @IsDebit, @Version)
""", entry.Postings.Select(p => new { EntryId = entry.Id, p.AccountId, p.Amount, p.IsDebit, p.Version }));
// 4. Emit outbox event (same transaction!)
await txn.ExecuteAsync("""
INSERT INTO outbox_events (id, aggregate_id, event_type, payload, created_at)
VALUES (@Id, @AggId, @Type, @Payload, @Created)
""", new {
Id = Guid.NewGuid(),
AggId = entry.Id,
Type = "JournalEntryPosted",
Payload = JsonSerializer.Serialize(entry),
Created = DateTimeOffset.UtcNow
});
await txn.CommitAsync();
return PostJournalEntryResult.Success(entry.Id);
}
catch
{
await txn.RollbackAsync();
throw;
}
}
Consistency
The system moves from one valid state to another, and all business invariants hold.
Key invariants in fintech:
| Invariant | Description | Enforcement |
|---|---|---|
| Double-entry | sum(debits) = sum(credits) for every entry | Enforced at the database level |
| Non-negative balance | A balance cannot go negative (unless overdraft is allowed) | CHECK constraint or trigger |
| No double-spend | The same funds cannot be transferred twice | Idempotency key + UNIQUE constraint |
| Order preservation | Transactions are processed in the correct order | Monotonic timestamps / sequence numbers |
| Total conservation | The total amount of money in the system does not change | Double-entry invariant |
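-- Database-level enforcement of the double-entry invariant.
-- PostgreSQL does not allow subqueries in CHECK constraints, so a deferred
-- constraint trigger validates each entry at commit time.
CREATE OR REPLACE FUNCTION check_double_entry() RETURNS TRIGGER AS $$
BEGIN
IF (SELECT COALESCE(SUM(amount * CASE WHEN is_debit THEN 1 ELSE -1 END), 0)
FROM journal_postings WHERE entry_id = NEW.entry_id) <> 0 THEN
RAISE EXCEPTION 'Double-entry invariant violated for entry %', NEW.entry_id;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER trg_double_entry
AFTER INSERT ON journal_postings
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE FUNCTION check_double_entry();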
-- Non-negative balance enforcement
ALTER TABLE accounts ADD CONSTRAINT chk_non_negative_balance
CHECK (balance >= 0);
-- Idempotency key uniqueness
CREATE UNIQUE INDEX uq_journal_idempotency_key ON journal_entries(idempotency_key);
Isolation
Concurrent transactions must not affect one another. In financial systems this is critical for preventing race conditions during simultaneous transfers.
Isolation levels in the fintech context:
| Level | Dirty Read | Non-Repeatable Read | Phantom Read | Use Case | Risk |
|---|---|---|---|---|---|
| Read Uncommitted | Possible | Possible | Possible | Analytics only | Critical |
| Read Committed | No | Possible | Possible | Reporting | Medium |
| Repeatable Read | No | No | Possible | Internal operations | Low |
| Serializable | No | No | No | Ledger postings | None |
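Serializable transactions can be aborted by the database when it detects a conflict (PostgreSQL reports this as SQLSTATE 40001), so callers are expected to retry the whole transaction. The sketch below shows one way to wrap such a retry with exponential backoff. `TransientRetry` and the `isTransient` predicate are hypothetical names introduced for illustration; with Npgsql, the predicate would typically inspect `PostgresException.SqlState`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: retries an operation that fails with a transient error,
// such as a serialization failure under SERIALIZABLE isolation.
public static class TransientRetry
{
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        Func<Exception, bool> isTransient,
        int maxAttempts = 3,
        TimeSpan? baseDelay = null,
        CancellationToken ct = default)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        var delay = baseDelay ?? TimeSpan.FromMilliseconds(50);

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                // The operation must open, run, and commit its own transaction,
                // so that every retry starts from a fresh snapshot.
                return await operation(ct);
            }
            catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
            {
                // Exponential backoff: delay, 2x delay, 4x delay, ...
                var wait = TimeSpan.FromTicks(delay.Ticks * (1L << (attempt - 1)));
                await Task.Delay(wait, ct);
            }
        }
    }
}
```

Once the attempt limit is reached, the last exception propagates to the caller, which can then surface a "conflict, retry later" result instead of silently dropping the transfer.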
// SERIALIZABLE isolation for balance-critical operations
public async Task<TransferResult> TransferAsync(TransferCommand cmd, CancellationToken ct)
{
using var txn = await _connection.BeginTransactionAsync(IsolationLevel.Serializable, ct);
try
{
// Pessimistic locking: SELECT ... FOR UPDATE locks the source row until commit
var sourceBalance = await txn.QueryFirstOrDefaultAsync<decimal>(
"SELECT balance FROM accounts WHERE id = @Id FOR UPDATE",
new { Id = cmd.SourceAccountId });
if (sourceBalance < cmd.Amount)
return TransferResult.InsufficientFunds;
// Deduct from source (parameter names must match the @placeholders)
await txn.ExecuteAsync(
"UPDATE accounts SET balance = balance - @Amount WHERE id = @Id",
new { Id = cmd.SourceAccountId, cmd.Amount });
// Add to destination
await txn.ExecuteAsync(
"UPDATE accounts SET balance = balance + @Amount WHERE id = @Id",
new { Id = cmd.DestinationAccountId, cmd.Amount });
// Post journal entry (atomic with updates)
await PostJournalEntryAsync(cmd, txn, ct);
await txn.CommitAsync();
return TransferResult.Success;
}
catch (DbException)
{
await txn.RollbackAsync();
return TransferResult.ConflictRetry;
}
}
Durability
Once a transaction is committed, its data must not be lost, even if hardware fails.
// WAL (Write-Ahead Log) configuration in PostgreSQL
// Requires:
// - fsync = on
// - wal_level = replica
// - synchronous_commit = on
// - max_wal_senders > 0 (for replication)
// - replication slot created
// The .NET connection string does not affect durability, which is configured on the server.
// It should still enforce TLS and sane timeouts, for example:
// "Host=db;Database=ledger;Username=app;Password=xxx;SslMode=VerifyFull;
// Timeout=30;CommandTimeout=30;
// Pooling=true;MinPoolSize=5;MaxPoolSize=100;"
// Npgsql data source; SslMode=VerifyFull makes the client validate the server certificate
var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
var dataSource = dataSourceBuilder.Build();
// Verify synchronous commit
using var conn = await dataSource.OpenConnectionAsync();
using var cmd = new NpgsqlCommand("SHOW synchronous_commit", conn);
var value = await cmd.ExecuteScalarAsync();
Console.WriteLine($"Synchronous commit: {value}"); // Expected: 'on'
Double-Entry Bookkeeping
Double-entry bookkeeping is not just an accounting tradition. It is a mathematical invariant that guarantees money is neither created nor destroyed by architectural mistakes.
How It Works
Every transaction is recorded as a journal entry with two or more postings. The sum of all debits equals the sum of all credits.
Transaction: Transfer $100 from Account A to Account B
Journal Entry #1001
├── Posting 1: Account A → Debit $100 (money leaves A)
├── Posting 2: Account B → Credit $100 (money arrives at B)
└── Total: Debit $100 = Credit $100 ✓
Important: in accounting terminology:
- A debit to an asset account is an increase
- A credit to an asset account is a decrease
In a payment system, however, it is simpler to think of it this way (this is the convention used by the code in this module):
- is_debit = true → money leaves the account
- is_debit = false → money arrives in the account
public record JournalEntry(
Guid Id,
string IdempotencyKey,
DateTimeOffset Timestamp,
string Description,
IReadOnlyList<JournalPosting> Postings);
public record JournalPosting(
Guid AccountId,
decimal Amount,
bool IsDebit,
int Version);
// Validation: double-entry invariant (a member of JournalEntry)
public bool IsValid()
{
var net = Postings.Sum(p => p.Amount * (p.IsDebit ? 1m : -1m));
return net == 0m; // decimal arithmetic is exact; decimal has no Epsilon member
}
Append-Only Journal
Journal entries never change after posting. If a mistake has to be corrected, a reversing entry is created.
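The reversal rule can be sketched in memory as follows. This is an illustration under stated assumptions, not the module's ledger code: `Posting`, `Entry`, and `Reversals` are hypothetical types, and the sign convention is the one used in this module (`IsDebit = true` means money leaves the account). The reversing entry keeps every amount and flips the direction, so the original and the reversal net to zero on every account.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified types for illustration only.
public record Posting(Guid AccountId, decimal Amount, bool IsDebit);
public record Entry(Guid Id, string IdempotencyKey, string Description, IReadOnlyList<Posting> Postings);

public static class Reversals
{
    // Builds a new entry that cancels `original`: same accounts and amounts,
    // opposite direction. The original entry is never modified.
    public static Entry Reverse(Entry original) =>
        new Entry(
            Id: Guid.NewGuid(),
            // Deterministic key, so reversing the same entry twice is detectable.
            IdempotencyKey: $"reversal:{original.Id}",
            Description: $"Reversal of entry {original.Id}",
            Postings: original.Postings
                .Select(p => p with { IsDebit = !p.IsDebit })
                .ToList());

    // Net effect per account: debits subtract, credits add.
    public static Dictionary<Guid, decimal> NetEffect(IEnumerable<Entry> entries) =>
        entries.SelectMany(e => e.Postings)
               .GroupBy(p => p.AccountId)
               .ToDictionary(g => g.Key, g => g.Sum(p => p.IsDebit ? -p.Amount : p.Amount));
}
```

Flipping the direction while also negating the amount would cancel itself out and repeat the original movement, which is why the sketch changes only `IsDebit`.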
-- Zero UPDATE/DELETE on journal entries
-- PostgreSQL permissions:
-- GRANT INSERT, SELECT ON journal_entries TO posting_engine_role;
-- REVOKE UPDATE, DELETE ON journal_entries FROM posting_engine_role;
-- Reversal entry (a new entry, not an UPDATE of the existing one).
-- The postings keep their amounts, flip is_debit, and reference the new entry's id.
WITH reversal AS (
INSERT INTO journal_entries (id, idempotency_key, timestamp, description)
VALUES (gen_random_uuid(), gen_random_uuid(), NOW(), 'Reversal of entry #1001')
RETURNING id
)
INSERT INTO journal_postings (entry_id, account_id, amount, is_debit, version)
SELECT reversal.id, jp.account_id, jp.amount, NOT jp.is_debit, jp.version
FROM journal_postings jp CROSS JOIN reversal
WHERE jp.entry_id = '1001-guid';
Idempotency
In financial systems, retries are the norm rather than the exception. Networks lag, timeouts happen, and clients resend requests. Idempotency guarantees that processing the same request again does not create a duplicate transaction.
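The core idea can be shown with an in-memory stand-in before looking at a database-backed version. This is a minimal sketch, assuming a single process: `InMemoryIdempotencyStore` is a hypothetical class, and a real system would persist keys and results so that they survive restarts and are shared across instances.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// In-memory stand-in for an idempotency table (illustration only).
public sealed class InMemoryIdempotencyStore<T>
{
    private readonly ConcurrentDictionary<string, Lazy<Task<T>>> _results = new();

    public Task<T> ExecuteOnceAsync(string key, Func<Task<T>> operation)
    {
        // GetOrAdd may construct several Lazy wrappers under contention, but only
        // one is stored, and Lazy runs the operation at most once for that wrapper.
        var lazy = _results.GetOrAdd(key, _ => new Lazy<Task<T>>(operation));
        return lazy.Value;
    }
}
```

Every caller that presents the same key receives the same task, so concurrent retries observe one execution and one result. Note that this sketch also caches a failed task; whether a failure should release the key is a design decision that depends on whether the operation may have had side effects.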
Idempotency Key Pattern
public class IdempotencyService
{
private readonly NpgsqlDataSource _dataSource;
public IdempotencyService(NpgsqlDataSource dataSource)
{
_dataSource = dataSource;
}
public async Task<T> ExecuteAsync<T>(
string key,
Func<Task<T>> operation,
TimeSpan? ttl = null,
CancellationToken ct = default)
{
ttl ??= TimeSpan.FromHours(24);
using var conn = await _dataSource.OpenConnectionAsync(ct);
using var txn = await conn.BeginTransactionAsync(ct);
// Check existing result
var existing = await conn.QueryFirstOrDefaultAsync<IdempotencyRecord>(
"""
SELECT result FROM idempotency_keys
WHERE key = @Key AND expires_at > NOW()
""", new { Key = key }, transaction: txn);
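if (existing != null)
return JsonSerializer.Deserialize<T>(existing.Result)!; // the result column holds JSON
// Execute operation
T result;
try
{
result = await operation();
}
catch
{
await txn.RollbackAsync(ct);
throw;
}
// Store the serialized result; a PRIMARY KEY or UNIQUE constraint on key
// makes a concurrent duplicate fail here instead of committing twice
await conn.ExecuteAsync(
"""
INSERT INTO idempotency_keys (key, result, expires_at)
VALUES (@Key, @Result, @Expires)
""",
new { Key = key, Result = JsonSerializer.Serialize(result), Expires = DateTimeOffset.UtcNow + ttl.Value },
transaction: txn);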
await txn.CommitAsync(ct);
return result;
}
}
public record IdempotencyRecord(string Result);
Idempotency in the HTTP API
// Client sends idempotency key in header
// POST /api/transfers
// Idempotency-Key: abc-123-def
// { "from": "acc-1", "to": "acc-2", "amount": 100 }
[HttpPost("transfers")]
public async Task<IActionResult> CreateTransfer([FromBody] TransferRequest request,
[FromHeader(Name = "Idempotency-Key")] string idempotencyKey)
{
if (string.IsNullOrWhiteSpace(idempotencyKey))
return BadRequest("Idempotency-Key header is required");
var result = await _transferService.ExecuteWithIdempotencyAsync(
idempotencyKey,
() => _transferService.ProcessTransferAsync(request));
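// Assumption: the service returns an outcome object exposing Status and TransferId
switch (result.Status)
{
case TransferStatus.Success:
case TransferStatus.Duplicate: // a replay returns the original transfer
return Ok(new { transferId = result.TransferId });
case TransferStatus.InsufficientFunds:
return BadRequest("Insufficient funds");
case TransferStatus.ConflictRetry:
Response.Headers["Retry-After"] = "2"; // ask the client to retry in 2 seconds
return StatusCode(StatusCodes.Status409Conflict);
default:
return StatusCode(StatusCodes.Status500InternalServerError);
}
}
Immutability and Audit Trail
Immutable Audit Log
Every write to the financial system must be recorded in an immutable audit log.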
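One common way to make such a log tamper-evident is a hash chain, in which each record's hash covers both its own fields and the hash of the previous record. The in-memory sketch below illustrates the idea under simplifying assumptions: `AuditRecord` and `AuditChain` are hypothetical names, the record carries only a few fields, and the code relies on `SHA256.HashData` and `Convert.ToHexString`, which are available from .NET 5 onward.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Hypothetical, simplified audit record for illustration.
public record AuditRecord(long Id, string EventType, string Payload, string PrevHash, string Hash);

public static class AuditChain
{
    private const string Genesis = "genesis";

    private static string ComputeHash(string prevHash, long id, string eventType, string payload)
    {
        // The previous hash is part of the input, which is what links the records.
        var bytes = Encoding.UTF8.GetBytes($"{prevHash}|{id}|{eventType}|{payload}");
        return Convert.ToHexString(SHA256.HashData(bytes));
    }

    // Creates the next record for the given log without mutating the log.
    public static AuditRecord Append(IReadOnlyList<AuditRecord> log, string eventType, string payload)
    {
        var prev = log.Count == 0 ? Genesis : log[log.Count - 1].Hash;
        var id = log.Count + 1L;
        return new AuditRecord(id, eventType, payload, prev, ComputeHash(prev, id, eventType, payload));
    }

    // Recomputes every hash; returns the index of the first bad record, or -1.
    public static int FindFirstTampered(IReadOnlyList<AuditRecord> log)
    {
        var prev = Genesis;
        for (var i = 0; i < log.Count; i++)
        {
            var r = log[i];
            if (r.PrevHash != prev || r.Hash != ComputeHash(prev, r.Id, r.EventType, r.Payload))
                return i;
            prev = r.Hash;
        }
        return -1;
    }
}
```

Changing any stored field invalidates that record's hash, and replacing the hash as well breaks the link from the next record, so an edit cannot be hidden without rewriting the rest of the chain.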
-- Audit log table: zero UPDATE/DELETE
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
actor_id UUID NOT NULL,
entity_type VARCHAR(50) NOT NULL,
entity_id UUID NOT NULL,
action VARCHAR(50) NOT NULL, -- CREATED, POSTED, REVERSED
old_state JSONB, -- state BEFORE the operation
new_state JSONB, -- state AFTER the operation
metadata JSONB, -- additional data
ip_address INET,
user_agent TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Cryptographic hash chain for tamper evidence
ALTER TABLE audit_log ADD COLUMN hash TEXT;
ALTER TABLE audit_log ADD COLUMN prev_hash TEXT;
-- Trigger that computes the hash
CREATE OR REPLACE FUNCTION compute_audit_hash()
RETURNS TRIGGER AS $$
DECLARE
prev_hash_val TEXT;
data_to_hash TEXT;
BEGIN
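-- Get previous hash (concurrent inserts must be serialized, e.g. with an advisory lock, or the chain can fork)
SELECT hash INTO prev_hash_val FROM audit_log ORDER BY id DESC LIMIT 1;
prev_hash_val := COALESCE(prev_hash_val, 'genesis');
-- Hash this record together with the previous hash; COALESCE keeps a NULL state from nulling the whole input
data_to_hash := concat_ws('|', prev_hash_val, NEW.id, NEW.event_type, NEW.actor_id, NEW.action,
COALESCE(NEW.old_state::text, 'null'), COALESCE(NEW.new_state::text, 'null'), NEW.created_at);
NEW.hash := encode(sha256(convert_to(data_to_hash, 'UTF8')), 'hex');
NEW.prev_hash := prev_hash_val;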
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_audit_hash
BEFORE INSERT ON audit_log
FOR EACH ROW EXECUTE FUNCTION compute_audit_hash();
Reconciliation
Reconciliation is the process of comparing journal entries against materialized balances in order to detect discrepancies.
public class ReconciliationService
{
private readonly NpgsqlDataSource _dataSource;
private readonly ILogger<ReconciliationService> _logger;
public ReconciliationService(NpgsqlDataSource dataSource,
ILogger<ReconciliationService> logger)
{
_dataSource = dataSource;
_logger = logger;
}
public async Task<ReconciliationReport> RunAsync(CancellationToken ct)
{
var discrepancies = new List<Discrepancy>();
using var conn = await _dataSource.OpenConnectionAsync(ct);
// Get all accounts
var accounts = await conn.QueryAsync<Account>(
"SELECT id, currency FROM accounts WHERE is_active = true", ct: ct);
foreach (var account in accounts)
{
// Materialized balance
var storedBalance = await conn.QueryFirstOrDefaultAsync<decimal?>(
"SELECT balance FROM account_balances WHERE account_id = @Id",
new { Id = account.Id }, ct: ct);
// Derived balance from journal entries
var derivedBalance = await conn.QueryFirstOrDefaultAsync<decimal>(
@"SELECT COALESCE(SUM(amount * CASE WHEN is_debit = true THEN -1 ELSE 1 END), 0)
FROM journal_postings jp
JOIN journal_entries je ON jp.entry_id = je.id
WHERE jp.account_id = @Id AND je.status = 'POSTED'",
new { Id = account.Id }, ct: ct);
if (storedBalance.GetValueOrDefault() != derivedBalance) // decimals compare exactly
{
discrepancies.Add(new Discrepancy(
account.Id,
account.Currency,
storedBalance.GetValueOrDefault(),
derivedBalance,
DateTimeOffset.UtcNow));
_logger.LogWarning(
"Balance discrepancy detected: Account={AccountId} Stored={Stored} Derived={Derived}",
account.Id, storedBalance, derivedBalance);
}
}
return new ReconciliationReport(DateTimeOffset.UtcNow, discrepancies);
}
}
public record ReconciliationReport(
DateTimeOffset RanAt,
IReadOnlyList<Discrepancy> Discrepancies);
public record Discrepancy(
Guid AccountId,
string Currency,
decimal StoredBalance,
decimal DerivedBalance,
DateTimeOffset DetectedAt);
Materialized Balances vs Derived Balances
The Problem with a Mutable Balance
WRONG: Mutable balance column
┌─────────────────────────────────────────────────────┐
│ UPDATE accounts SET balance = balance - 100 WHERE id = 'A';
│ UPDATE accounts SET balance = balance + 100 WHERE id = 'B';
│ -- If the second UPDATE fails outside a transaction → A is reduced, B is not │
│ -- Concurrent writes → lost update │
│ -- Balance drift over time │
└─────────────────────────────────────────────────────┘
The Right Approach: Derived Balance + Materialized Cache
CORRECT: Derived from journal entries, cached as materialized view
┌──────────────────────────────────────────────────────────────┐
│ Journal Entries (source of truth): │
│ Entry 1: A -100, B +100 │
│ Entry 2: A -50, C +50 │
│ │
│ Derived balance for A: -150 (always correct) │
│ Materialized balance: -150 (cached, invalidated on new entry)│
│ │
│ Reconciliation: periodic comparison of derived vs materialized│
└──────────────────────────────────────────────────────────────┘
-- Materialized view for fast reads
CREATE MATERIALIZED VIEW account_balances AS
SELECT
jp.account_id,
SUM(jp.amount * CASE WHEN jp.is_debit = true THEN -1 ELSE 1 END) AS balance
FROM journal_postings jp
JOIN journal_entries je ON jp.entry_id = je.id
WHERE je.status = 'POSTED'
GROUP BY jp.account_id;
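-- A unique index is required by REFRESH MATERIALIZED VIEW CONCURRENTLY
CREATE UNIQUE INDEX uq_account_balances_account_id ON account_balances(account_id);
-- Refresh on new journal entries (via trigger or event handler).
-- Note: a full refresh per statement is expensive; high-volume systems usually
-- refresh on a schedule or maintain balances incrementally.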
CREATE OR REPLACE FUNCTION refresh_account_balances()
RETURNS TRIGGER AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY account_balances;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_refresh_balances
AFTER INSERT ON journal_entries
FOR EACH STATEMENT EXECUTE FUNCTION refresh_account_balances();
// Balance types for different consistency needs
public enum BalanceType
{
/// <summary>
/// Real-time balance from writer — strongest consistency
/// Use for: financial decisions, transfer validation
/// </summary>
Writer,
/// <summary>
/// Session-level consistency — read from writer after write
/// Use for: "check balance right after transfer"
/// </summary>
Session,
/// <summary>
/// Materialized view — fast reads, eventually consistent
/// Use for: UI display, reporting
/// </summary>
Materialized
}
public async Task<decimal> GetBalanceAsync(Guid accountId, BalanceType type, CancellationToken ct)
{
return type switch
{
BalanceType.Writer => await GetWriterBalanceAsync(accountId, ct),
BalanceType.Session => await GetSessionBalanceAsync(accountId, ct),
BalanceType.Materialized => await GetMaterializedBalanceAsync(accountId, ct),
_ => throw new ArgumentOutOfRangeException(nameof(type), type, null)
};
}
private async Task<decimal> GetWriterBalanceAsync(Guid accountId, CancellationToken ct)
{
// Direct from journal entries — always correct, but slower
using var conn = await _dataSource.OpenConnectionAsync(ct);
return await conn.QueryFirstOrDefaultAsync<decimal>(
@"SELECT COALESCE(SUM(amount * CASE WHEN is_debit = true THEN -1 ELSE 1 END), 0)
FROM journal_postings jp
JOIN journal_entries je ON jp.entry_id = je.id
WHERE jp.account_id = @Id AND je.status = 'POSTED'",
new { Id = accountId }, ct: ct);
}
private async Task<decimal> GetMaterializedBalanceAsync(Guid accountId, CancellationToken ct)
{
// From materialized view — fast, eventually consistent
using var conn = await _dataSource.OpenConnectionAsync(ct);
return await conn.QueryFirstOrDefaultAsync<decimal>(
"SELECT balance FROM account_balances WHERE account_id = @Id",
new { Id = accountId }, ct: ct);
}
Outbox Pattern
The outbox pattern guarantees that a journal entry and the event destined for the event bus are written atomically: either both are persisted or neither is.
-- Outbox table: written in the same transaction as the journal entry
CREATE TABLE outbox_events (
id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
retry_count INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX idx_outbox_unpublished ON outbox_events(created_at)
WHERE published_at IS NULL;
// Publisher — polls outbox and publishes to event bus
public class OutboxPublisher
{
private readonly NpgsqlDataSource _dataSource;
private readonly IEventBus _eventBus;
private readonly ILogger<OutboxPublisher> _logger;
public OutboxPublisher(NpgsqlDataSource dataSource, IEventBus eventBus,
ILogger<OutboxPublisher> logger)
{
_dataSource = dataSource;
_eventBus = eventBus;
_logger = logger;
}
public async Task PublishBatchAsync(int batchSize = 100, CancellationToken ct = default)
{
using var conn = await _dataSource.OpenConnectionAsync(ct);
var events = await conn.QueryAsync<OutboxEvent>(
"""
SELECT id, aggregate_id, event_type, payload
FROM outbox_events
WHERE published_at IS NULL
ORDER BY created_at
LIMIT @Limit
""",
new { Limit = batchSize }, ct: ct);
foreach (var evt in events)
{
try
{
await _eventBus.PublishAsync(evt.EventType, evt.Payload, ct);
await conn.ExecuteAsync(
"UPDATE outbox_events SET published_at = NOW() WHERE id = @Id",
new { evt.Id }, ct: ct);
}
catch (Exception ex)
{
await conn.ExecuteAsync(
"""
UPDATE outbox_events
SET retry_count = retry_count + 1
WHERE id = @Id
""",
new { evt.Id }, ct: ct);
_logger.LogWarning(ex, "Failed to publish outbox event {EventId}; retry_count incremented", evt.Id);
// Rethrow to stop the batch so that later events are not published out of order
throw;
}
}
}
}
public record OutboxEvent(
Guid Id,
Guid AggregateId,
string EventType,
string Payload);
Concurrency Control
Pessimistic Locking (SELECT FOR UPDATE)
// Pessimistic: lock the row, block other transactions
public async Task<TransferResult> TransferPessimisticAsync(
Guid sourceId, Guid destId, decimal amount, CancellationToken ct)
{
using var txn = await _connection.BeginTransactionAsync(IsolationLevel.Serializable, ct);
try
{
// Lock source account row
var balance = await txn.QueryFirstOrDefaultAsync<decimal>(
"SELECT balance FROM accounts WHERE id = @Id FOR UPDATE",
new { Id = sourceId }, ct: ct);
if (balance < amount)
return TransferResult.InsufficientFunds;
// Perform transfers
await txn.ExecuteAsync("UPDATE accounts SET balance = balance - @A WHERE id = @S",
new { A = amount, S = sourceId }, ct: ct);
await txn.ExecuteAsync("UPDATE accounts SET balance = balance + @A WHERE id = @D",
new { A = amount, D = destId }, ct: ct);
await txn.CommitAsync();
return TransferResult.Success;
}
catch (DbException)
{
await txn.RollbackAsync();
return TransferResult.ConflictRetry;
}
}
Optimistic Concurrency Control (OCC)
// Optimistic: no locks, retry on conflict
public async Task<TransferResult> TransferOptimisticAsync(
Guid sourceId, Guid destId, decimal amount, int currentVersion, CancellationToken ct)
{
using var txn = await _connection.BeginTransactionAsync(IsolationLevel.Serializable, ct);
try
{
// Update with version check
var rowsAffected = await txn.ExecuteAsync(
"""
UPDATE accounts
SET balance = balance - @Amount, version = version + 1
WHERE id = @Id AND version = @Version
""",
new { Amount = amount, Id = sourceId, Version = currentVersion }, ct: ct);
if (rowsAffected == 0)
return TransferResult.ConflictRetry;
// Similar for destination...
await txn.CommitAsync();
return TransferResult.Success;
}
catch
{
await txn.RollbackAsync();
return TransferResult.ConflictRetry;
}
}
Checklist for Data Integrity in Fintech
- [ ] ACID transactions for all financial operations
- [ ] Double-entry invariant enforced at the database level
- [ ] Append-only journal: zero UPDATE/DELETE on entries
- [ ] Idempotency keys on every state-changing endpoint
- [ ] SERIALIZABLE isolation for balance-critical writes
- [ ] Synchronous WAL commit enabled
- [ ] Materialized balances refreshed when new journal entries are posted
- [ ] Reconciliation pipeline: automated comparison of journal vs balances
- [ ] Outbox pattern: events written atomically with journal entries
- [ ] Audit log: immutable, tamper-evident with a cryptographic hash chain
- [ ] Pessimistic or optimistic locking for concurrent transfers
- [ ] Balance types: Writer (strong) vs Materialized (fast)
Further Reading
- [ACID Is a Contract, Not a Religion — Gothar](https://gothar.com/en/insights/acid-ledger-correctness-2026)
- [Payment Ledger Architecture — Trio](https://trio.dev/payment-ledger-architecture-fintech/)
- [TigerBeetle Design Document](https://github.com/tigerbeetledb/tigerbeetle/blob/main/docs/DESIGN.md)
- [The Twisp Financial Ledger Database](https://www.twisp.com/docs/infrastructure/ledger-database)
- Martin Kleppmann, "Designing Data-Intensive Applications", chapters 3, 6, 10
Practice
The CAP Theorem in Fintech
Introduction
The CAP theorem (Consistency, Availability, Partition Tolerance) is one of the fundamental results in distributed systems theory. Eric Brewer stated it as a conjecture in 2000, and Gilbert and Lynch proved it formally in 2002.
> Statement: when a network partition (P) occurs in a distributed system, it is impossible to guarantee both Consistency (C) and Availability (A). Only one of the two can be chosen.
In the fintech context this theorem takes on special significance, because:
- In ordinary systems, eventual consistency is often an acceptable trade-off
- In financial systems, the ledger of record must almost always choose consistency
- Money must not be created or destroyed by an architectural error
The Three Pillars of CAP
Consistency (C): Strong Consistency
Every read sees the result of the most recent write. All nodes see the same data at the same moment (linearizability).
Client → Node A (write: $100) → Node B → Node C
↓
Client → read → Node B → returns $100 (fresh data)
Characteristics:
- Linearizable reads: all nodes are in sync
- A write requires a quorum (majority) of nodes
- During a partition: the side without a quorum stops accepting writes
- Examples: Google Spanner, CockroachDB, PostgreSQL with synchronous streaming replication
Availability (A)
Every valid request receives a response, even if some nodes are unavailable. There are no guarantees about data freshness.
Client → Node A (write: $100): OK ✓
Client → Node B (read): returns $0 (stale data, but the system responds)
Characteristics:
- Every node responds independently
- A write can go to any node
- During a partition: both sides keep working
- Examples: DynamoDB, Cassandra, Couchbase
Partition Tolerance (P)
The system keeps working when the network between nodes is broken.
[Data Center US] ────✗✗✗──── [Data Center EU]
(Node A, B) (Node C, D)
Partition: US ↔ EU network broken
Important: in real distributed systems, P is not a choice but a given. Networks fail sooner or later. In practice, CAP therefore comes down to choosing CP or AP behavior during a partition.
CAP in Financial Systems: The Key Constraint
> The most important rule of fintech architecture: the ledger of record always chooses Consistency. Downstream systems may choose Availability and tolerate eventual consistency, but the ledger itself may not.
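The consequence of this rule can be illustrated with a toy simulation. It is a sketch under simplifying assumptions, not a model of any real database: `Replica`, `Mode`, and `PartitionDemo` are hypothetical names, the cluster has five nodes split 3/2 by a partition, and both sides start from the same $150 balance and receive the same $100 deduction.

```csharp
using System;

public enum Mode { AP, CP }

// Hypothetical model of one side of a network partition.
public sealed class Replica
{
    public string Name { get; }
    public int ReachableNodes { get; }   // nodes on this side of the partition, including itself
    public decimal Balance { get; private set; }
    public decimal PaidOut { get; private set; }

    public Replica(string name, int reachableNodes, decimal balance)
        => (Name, ReachableNodes, Balance) = (name, reachableNodes, balance);

    // Returns true if the deduction was accepted by this side of the partition.
    public bool TryDeduct(decimal amount, Mode mode, int clusterSize)
    {
        var hasQuorum = ReachableNodes > clusterSize / 2;
        if (mode == Mode.CP && !hasQuorum) return false;  // CP: refuse writes without a majority
        if (Balance < amount) return false;               // local balance check
        Balance -= amount;
        PaidOut += amount;
        return true;
    }
}

public static class PartitionDemo
{
    // Applies the same $100 deduction on both sides of a 3/2 partition and
    // returns the total amount paid out from an account that holds $150.
    public static decimal TotalPaidOut(Mode mode)
    {
        const int clusterSize = 5;
        var usEast = new Replica("US-East", 3, 150m);
        var euWest = new Replica("EU-West", 2, 150m);
        usEast.TryDeduct(100m, mode, clusterSize);
        euWest.TryDeduct(100m, mode, clusterSize);
        return usEast.PaidOut + euWest.PaidOut;
    }
}
```

In AP mode each side passes its local balance check, so $200 leaves an account that held $150. In CP mode the minority side refuses the write and only $100 is paid out.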
Why the Ledger Cannot Choose AP
Scenario: partition between US-East and EU-West
Account balance: $150
US-East (AP): deduct $100 → balance = $50 ✓
EU-West (AP): deduct $100 → balance = $50 ✓
After the partition heals:
Account balance: $50 (for example after a last-write-wins merge), although $200 has been paid out; applying both deductions would give -$50, which is also invalid
Result: double-spend, $100 created out of nothing
Alternative: CP (Consistency)
Account balance: $150
US-East (3 nodes, quorum): deduct $100 → balance = $50 ✓
EU-West (2 nodes, no quorum): WRITE BLOCKED ⛔
After the partition heals:
Account balance: $50, which is correct ✓
CAP Matrix: Databases in the Fintech Context
| Database | CAP position | Suitable for fintech? | Use Case |
|---|---|---|---|
| PostgreSQL (single) | CP (P not applicable) | ✅ Core ledger | Transactions, journal entries |
| PostgreSQL (streaming replication) | CP | ✅ Core ledger (read replicas) | Read replicas for reporting |
| CockroachDB | CP | ✅ Multi-region ledger | Global ledger with strong consistency |
| Google Spanner | CP | ✅ Multi-region ledger | Global ledger with TrueTime |
| MongoDB (single) | CP (P not applicable) | ✅ Single-region ledger | Transactional data |
| MongoDB (replica set) | CP/AP configurable | ⚠️ With caution | Ledger with manual configuration |
| Cassandra | AP | ❌ Ledger | Analytics, cache, search |
| DynamoDB | AP | ❌ Ledger | Cache, session data, event store |
| Redis | CP (single) / AP (cluster) | ⚠️ Cache only | Hot balances, idempotency keys |
| Elasticsearch | AP | ❌ Transactional | Search, analytics, log storage |
CAP Boundary Pattern
The right approach to CAP in fintech is not to pick one position for the whole system, but to define a CAP boundary between its components.
Architecture with a CAP Boundary
┌─────────────────────────────────────────────────────────────────┐
│ Edge Tier (AP — Availability) │
│ │
│ API Gateway ──→ Kafka (AP, partition-tolerant) │
│ - Accepts requests during any partition │
│ - Guarantees at-least-once delivery │
│ - Idempotency keys for duplicate protection │
│ │
│ │ │
│ ▼ │
│ Orchestration Tier (Hybrid) │
│ │
│ Saga Orchestrator │
│ - Reads from Kafka (AP) │
│ - Writes to the Ledger (CP) │
│ - Bridge between the AP and CP worlds │
│ │
│ │ │
│ ▼ │
│ Core Tier (CP — Consistency) │
│ │
│ Ledger Database (CockroachDB / PostgreSQL) │
│ - Strong consistency для financial records │
│ - Quorum-based writes │
│ - During a partition: the quorum side keeps working, the rest is blocked │
│ - Double-entry invariant enforced │
│ │
│ │ │
│ ▼ │
│ Periphery Tier (AP — eventual consistency) │
│ │
│ Read Models / Materialized Views │
│ - UI balances, reporting, analytics │
│ - Tolerate eventual consistency │
│ - Reconciliation pipeline to detect discrepancies │
└─────────────────────────────────────────────────────────────────┘
Implementing the CAP Boundary with Kafka + PostgreSQL
// Edge: AP. Kafka accepts all requests
public class PaymentEdgeService
{
private readonly IKafkaProducer _producer;
public async Task<ReceiveResult> ReceivePaymentAsync(PaymentRequest request,
string idempotencyKey, CancellationToken ct)
{
// The request is accepted even if the backend is unavailable
var eventMsg = new PaymentReceivedEvent(
Id: Guid.NewGuid(),
IdempotencyKey: idempotencyKey,
Amount: request.Amount,
Currency: request.Currency,
SourceAccount: request.SourceAccount,
DestinationAccount: request.DestinationAccount,
ReceivedAt: DateTimeOffset.UtcNow
);
await _producer.PublishAsync("payment-events", idempotencyKey,
JsonSerializer.Serialize(eventMsg), ct);
return ReceiveResult.Accepted; // "Received, processing"
}
}
// Core: CP. The ledger guarantees consistency
public class LedgerService
{
private readonly NpgsqlDataSource _dataSource;
public async Task<PaymentProcessedResult> ProcessPaymentAsync(
PaymentReceivedEvent evt, CancellationToken ct)
{
// NpgsqlDataSource has no BeginTransactionAsync; open a connection first
await using var conn = await _dataSource.OpenConnectionAsync(ct);
await using var txn = await conn.BeginTransactionAsync(IsolationLevel.Serializable, ct);
try
{
// Idempotency check (backed by a UNIQUE constraint on idempotency_key)
var existing = await txn.QueryFirstOrDefaultAsync<Guid?>(
"SELECT id FROM transfers WHERE idempotency_key = @Key",
new { Key = evt.IdempotencyKey }, ct: ct);
if (existing.HasValue)
return PaymentProcessedResult.Duplicate(existing.Value);
// Balance check + transfer (SERIALIZABLE isolation)
var sourceBalance = await txn.QueryFirstOrDefaultAsync<decimal>(
"SELECT balance FROM accounts WHERE id = @Id FOR UPDATE",
new { Id = evt.SourceAccount }, ct: ct);
if (sourceBalance < evt.Amount)
return PaymentProcessedResult.InsufficientFunds;
// Atomic transfer + journal entry
await txn.ExecuteAsync(
"UPDATE accounts SET balance = balance - @A WHERE id = @Id",
new { A = evt.Amount, Id = evt.SourceAccount }, ct: ct);
await txn.ExecuteAsync(
"UPDATE accounts SET balance = balance + @A WHERE id = @Id",
new { A = evt.Amount, Id = evt.DestinationAccount }, ct: ct);
var transferId = Guid.NewGuid();
await txn.ExecuteAsync(
"INSERT INTO transfers (id, idempotency_key, amount, source, destination, status) " +
"VALUES (@Id, @Key, @Amt, @Src, @Dst, 'COMPLETED')",
new { Id = transferId, Key = evt.IdempotencyKey, Amt = evt.Amount,
Src = evt.SourceAccount, Dst = evt.DestinationAccount }, ct: ct);
await txn.CommitAsync();
return PaymentProcessedResult.Success(transferId);
}
catch (DbException)
{
await txn.RollbackAsync(ct);
return PaymentProcessedResult.Conflict;
}
}
}
Dynamic CAP
Dynamic CAP is the idea that a system adapts its CAP position dynamically, depending on the context of the transaction (amount, risk, operation type).
Dynamic CAP in Action
public class DynamicCapRouter
{
private const decimal HighRiskThreshold = 10_000m;
private const decimal CriticalRiskThreshold = 100_000m;
public CapMode DetermineCapMode(decimal amount, string transactionType)
{
// Critical transactions: always CP
if (amount >= CriticalRiskThreshold)
return CapMode.Consistency;
// High risk: CP
if (amount >= HighRiskThreshold)
return CapMode.Consistency;
// Low risk, read-only: AP is acceptable
if (transactionType == "balance_check")
return CapMode.Availability;
// Standard transactions: CP by default
return CapMode.Consistency;
}
}
public enum CapMode
{
Consistency, // CP — ledger writes, high-value transfers
Availability // AP — balance checks, reporting, analytics
}Примеры Dynamic CAP:
| Transaction | Amount | CAP mode | Why |
|---|---|---|---|
| Balance check | $1 | AP | Read-only, no risk |
| Transfer | $50 | CP | Standard financial operation |
| Transfer | $5,000 | CP | Standard financial operation |
| Transfer | $50,000 | CP | High-value, strict consistency |
| Merchant payment | $20 | AP (edge) → CP (core) | AP for acceptance, CP for processing |
| Settlement (inter-bank) | $10M | CP | Inter-bank settlement, zero risk tolerance |
Quorum-based Consensus
A quorum is the minimum number of replicas that must take part in an operation. If every write is acknowledged by W replicas and every read consults R replicas, and R + W > N, then each read set overlaps the most recent write set, so a read always sees the latest acknowledged write without synchronizing all N nodes.
Quorum math
N = total number of nodes (replicas)
R = number of nodes in the read quorum
W = number of nodes in the write quorum
Consistency condition: R + W > N
Examples:
N=3, R=2, W=2 → 2+2 > 3 ✓ (standard majority quorum)
N=5, R=3, W=3 → 3+3 > 5 ✓ (strong quorum)
N=5, R=2, W=3 → 2+3 = 5, not > 5 ✗ (consistency is not guaranteed)
Quorum in CockroachDB / Spanner
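Before looking at a specific database, the quorum arithmetic can be checked mechanically. The following is a minimal sketch, not any database's API: the `Quorum` class and its method names are hypothetical and exist only to illustrate the R + W > N rule and what happens to each side of a partition.

```csharp
using System;

// Hypothetical helper for illustration; not part of any database client library.
public static class Quorum
{
    // Smallest majority of n replicas, e.g. 2 of 3 or 3 of 5.
    public static int Majority(int n) => n / 2 + 1;

    // Read and write sets are guaranteed to intersect only if R + W > N.
    public static bool Overlaps(int n, int r, int w)
    {
        if (n <= 0 || r <= 0 || w <= 0 || r > n || w > n)
            throw new ArgumentOutOfRangeException(nameof(n), "Require 1 <= R, W <= N.");
        return r + w > n;
    }

    // During a partition, a side can accept writes only if it still reaches W replicas.
    public static bool CanWrite(int reachableReplicas, int w) => reachableReplicas >= w;
}
```

With N = 5 and W = `Quorum.Majority(5)`, a partition that splits the cluster 3/2 leaves exactly one side able to write, which is the CP behavior this section describes.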
// Illustrative CockroachDB-style setup: 5 nodes, replication factor 5 (N = 5 replicas per range)
// A write commits once a Raft majority acknowledges it (W = 3)
// A quorum read would need R = 3 replicas: 3 + 3 > 5 → consistency guaranteed
// (in practice CockroachDB serves consistent reads from the range's leaseholder rather than polling R replicas)
// During a network partition:
// US-East (3 nodes): holds a write quorum → keeps working
// EU-West (2 nodes): no write quorum → writes are blocked
// This is CP behavior: consistency is preferred over availability
CAP and distributed transactions
2PC (Two-Phase Commit): CP
Coordinator                  Participant A          Participant B
    │                             │                      │
    │─── PREPARE ────────────────>│                      │
    │─── PREPARE ───────────────────────────────────────>│
    │                             │                      │
    │<── VOTE: COMMIT ────────────│                      │
    │<── VOTE: COMMIT ───────────────────────────────────│
    │                             │                      │
    │─── COMMIT ─────────────────>│                      │
    │─── COMMIT ────────────────────────────────────────>│
Pros:
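- Atomic commit across participants: either every participant commits or none does
- Strong consistency of the committed result across the participating databases
Cons:
- Blocking: every participant holds its locks from PREPARE until the final decision arrives
- The coordinator is a single point of blocking: if it fails after PREPARE, prepared participants must wait for it to recover
- During a partition: participants that have voted but cannot reach the coordinator stay blocked
- Low availability under network issues
In fintech: 2PC is rarely used for high-throughput payment systems because of lock contention on accounts.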
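The two phases can be sketched in a few lines. This is an in-memory illustration under simplifying assumptions (no timeouts, no coordinator log, no crash recovery); `IParticipant`, `InMemoryParticipant`, and `TwoPhaseCommit` are hypothetical names introduced here, not a real transaction manager API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IParticipant
{
    bool Prepare();   // vote: true = ready to commit (locks would be held from here on)
    void Commit();
    void Rollback();
}

// Hypothetical participant whose vote is fixed up front, for demonstration only.
public sealed class InMemoryParticipant : IParticipant
{
    private readonly bool _voteYes;
    public string State { get; private set; } = "INIT";

    public InMemoryParticipant(bool voteYes) => _voteYes = voteYes;

    public bool Prepare()
    {
        State = _voteYes ? "PREPARED" : "ABORTED";
        return _voteYes;
    }

    public void Commit() => State = "COMMITTED";
    public void Rollback() => State = "ABORTED";
}

public static class TwoPhaseCommit
{
    // Returns true if the transaction committed on every participant.
    public static bool Run(IReadOnlyList<IParticipant> participants)
    {
        // Phase 1: ask every participant to prepare and collect the votes.
        var votes = participants.Select(p => p.Prepare()).ToList();
        var commit = votes.All(v => v);

        // Phase 2: broadcast the single global decision.
        foreach (var p in participants)
        {
            if (commit) p.Commit();
            else p.Rollback();
        }
        return commit;
    }
}
```

A single "no" vote aborts the transaction for everyone. The sketch does not model the window between the two phases, and that window is where the blocking described in the list of drawbacks comes from.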
Saga Pattern: eventual consistency with guarantees
Transfer Saga:
Step 1: [LockService] Reserve $100 on Account A ✓
Step 2: [LedgerService] Post journal entry ✓
Step 3: [PaymentService] Charge external gateway ✗ (timeout)
Compensation:
Step 3c: [PaymentService] Cancel charge (idempotent; a no-op if the charge never went through)
Step 2c: [LedgerService] Reverse journal entry ✓
Step 1c: [LockService] Release reservation ✓
Result: the system returns to a consistent state
public class TransferSaga : ISaga
{
private readonly ILedgerService _ledger;
private readonly IPaymentService _payment;
private readonly ILockService _lock;
private readonly ILogger<TransferSaga> _logger;
public async Task<TransferResult> ExecuteAsync(TransferCommand cmd,
CancellationToken ct)
{
var sagaState = new SagaState
{
TransferId = Guid.NewGuid(),
IdempotencyKey = cmd.IdempotencyKey,
SourceAccount = cmd.SourceAccount,
DestinationAccount = cmd.DestinationAccount,
Amount = cmd.Amount,
Currency = cmd.Currency,
Step = 0
};
try
{
// Step 1: Reserve funds
sagaState.Step = 1;
var reservationId = await _lock.ReserveAsync(
cmd.SourceAccount, cmd.Amount, cmd.Currency, sagaState.TransferId, ct);
// Step 2: Post journal entry
sagaState.Step = 2;
var journalId = await _ledger.PostAsync(new JournalEntry(
sagaState.TransferId, sagaState.IdempotencyKey, DateTimeOffset.UtcNow,
$"Transfer {cmd.SourceAccount} → {cmd.DestinationAccount}",
new[]
{
new JournalPosting(cmd.SourceAccount, cmd.Amount, IsDebit: true, 1),
new JournalPosting(cmd.DestinationAccount, cmd.Amount, IsDebit: false, 1)
}), ct);
// Step 3: External payment
sagaState.Step = 3;
var paymentResult = await _payment.ChargeAsync(new ChargeRequest(
sagaState.TransferId, cmd.SourceAccount, cmd.DestinationAccount,
cmd.Amount, cmd.Currency), ct);
if (!paymentResult.Success)
throw new PaymentFailedException(paymentResult.Error);
// Complete
return TransferResult.Success(sagaState.TransferId);
}
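catch (Exception ex)
{
// Compensating transactions (in reverse order).
// Step is set before each step runs, so the failing step is compensated too;
// this is safe only because every compensation is idempotent.
// CancellationToken.None: if the failure was a cancellation, the already-cancelled
// token would abort every compensation call immediately.
await CompensateAsync(sagaState, ex, CancellationToken.None);
return TransferResult.FailedCompensated(sagaState.TransferId);
}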
}
private async Task CompensateAsync(SagaState state, Exception reason, CancellationToken ct)
{
_logger.LogWarning(reason, "Saga {TransferId} failed at step {Step}, compensating",
state.TransferId, state.Step);
// Compensate in reverse order
if (state.Step >= 3)
{
try { await _payment.CancelAsync(state.TransferId, ct); }
catch (Exception ex) { _logger.LogError(ex, "Compensation failed: cancel payment"); }
}
if (state.Step >= 2)
{
try { await _ledger.ReverseAsync(state.TransferId, ct); }
catch (Exception ex) { _logger.LogError(ex, "Compensation failed: reverse journal"); }
}
if (state.Step >= 1)
{
try { await _lock.ReleaseAsync(state.SourceAccount, state.TransferId, ct); }
catch (Exception ex) { _logger.LogError(ex, "Compensation failed: release lock"); }
}
}
}
public record SagaState
{
public Guid TransferId { get; set; }
public string IdempotencyKey { get; set; } = string.Empty;
public Guid SourceAccount { get; set; }
public Guid DestinationAccount { get; set; }
public decimal Amount { get; set; }
public string Currency { get; set; } = string.Empty;
public int Step { get; set; }
}
CAP Theorem: practical takeaways for fintech
What the CAP theorem says (and does not say)
| Statement | True? | Explanation |
|---|---|---|
| You can choose C and A without P | ✅ | Only on a single node (for example, one PostgreSQL instance): there is no network between replicas to partition, so the system is effectively CA. A distributed system cannot opt out of P |
| A distributed system must always sacrifice C or A during a partition | ✅ | This is the theorem itself |
| A ledger may choose AP | ❌ | A financial ledger must be CP |
| Every part of the system must have the same CAP position | ❌ | Dynamic CAP: different parts make different trade-offs |
| Eventual consistency is suitable for the core ledger | ❌ | Only for the periphery (reporting, analytics) |
| 2PC is the best choice for distributed financial transactions | ❌ | The Saga pattern is preferable for high-throughput flows |
| CAP is a binary choice | ⚠️ | In practice it is a spectrum, from strong to eventual consistency |
| A quorum provides consistency | ✅ | When R + W > N |
Decision Framework
1. Define the CAP boundaries of your system:
┌────────────────────────────────────────────────────┐
│ Edge: AP (accept requests during any partition)    │
│ Core: CP (ledger, strong consistency)              │
│ Periphery: AP (reporting, analytics)               │
└────────────────────────────────────────────────────┘
2. Choose a consistency mechanism for each layer:
- Core: CP with quorum-based consensus (CockroachDB, Spanner) or PostgreSQL with synchronous replication
- Edge: AP with Kafka (at-least-once delivery, idempotent consumers)
- Periphery: AP with eventual consistency (Cassandra, Elasticsearch)
3. Orchestration between layers:
- Saga pattern for cross-service transactions
- Outbox pattern for event publishing
- Idempotency keys for duplicate protection
4. Reconciliation:
- Automated comparison of journal entries against materialized balances
- Alerting when discrepancies are detected
- Remediation pipeline for corrections
Additional materials
- Eric Brewer, "CAP Writings" — https://cs.brown.edu/~mph/JavaPY2W2002/p12-brewer.pdf
- Gilbert & Lynch, "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services" (2002)
- Martin Kleppmann, "Designing Data-Intensive Applications", chapters 6, 7
- RIG Model: "Guaranteed Data-Consistent Microservice Systems" — https://www.infoq.com/articles/rig-data-consistent-microservices/
- TechBullion, "Distributed Systems in U.S. Finance" — https://techbullion.com/distributed-systems-in-u-s-finance-the-patterns-that-survive-production/
Practice
Distributed consistency patterns
Introduction
In distributed fintech systems there is no single database whose ACID transactions cover all the data, so consistency is achieved by composing patterns. Each pattern solves a specific problem, and choosing the right one is key to building a correct system.
Saga Pattern
A saga is a sequence of local transactions, each of which commits independently; if a step fails, compensating transactions undo the steps that have already committed.
Choreography-based Saga
OrderService ──→ OrderCreatedEvent
│
├──→ InventoryService (reserve stock)
│ │
│ ├──→ StockReservedEvent
│ │ │
│ │ └──→ PaymentService (charge)
│ │ │
│ │ ├──→ PaymentChargedEvent
│ │ │ │
│ │ │ └──→ OrderService (confirm)
│ │ │
│ │ └──→ PaymentFailedEvent ──→ InventoryService (release stock)
│ │
│ └──→ StockReservationFailedEvent ──→ OrderService (cancel)
│
└──→ NotificationService (send confirmation)
// Choreography-based: events drive the saga
public class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
{
// Dependencies are assumed to be constructor-injected (constructors omitted for brevity)
private readonly IInventoryService _inventory;
private readonly IEventBus _eventBus;
private readonly ILogger<OrderCreatedEventHandler> _logger;
public async Task HandleAsync(OrderCreatedEvent evt, CancellationToken ct)
{
try
{
await _inventory.ReserveStockAsync(evt.OrderId, evt.Items, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Stock reservation failed for order {OrderId}", evt.OrderId);
await _eventBus.PublishAsync(new StockReservationFailedEvent(evt.OrderId), ct);
return;
}
// Publish outside the try block: a failed publish must not be reported as a failed reservation
await _eventBus.PublishAsync(new StockReservedEvent(evt.OrderId), ct);
}
}
public class PaymentChargedEventHandler : IEventHandler<PaymentChargedEvent>
{
private readonly IOrderService _order;
private readonly IEventBus _eventBus;
public async Task HandleAsync(PaymentChargedEvent evt, CancellationToken ct)
{
await _order.ConfirmOrderAsync(evt.OrderId, ct);
await _eventBus.PublishAsync(new OrderConfirmedEvent(evt.OrderId), ct);
}
}
public class PaymentFailedEventHandler : IEventHandler<PaymentFailedEvent>
{
private readonly IInventoryService _inventory;
private readonly IEventBus _eventBus;
public async Task HandleAsync(PaymentFailedEvent evt, CancellationToken ct)
{
// Compensating: release reserved stock
await _inventory.ReleaseStockAsync(evt.OrderId, ct);
await _eventBus.PublishAsync(new StockReleasedEvent(evt.OrderId), ct);
}
}
Orchestration-based Saga (recommended for fintech)
// Orchestration-based: central orchestrator controls the flow
public class PaymentSagaOrchestrator
{
private readonly ILedgerService _ledger;
private readonly IPaymentGateway _gateway;
private readonly IKafkaProducer _producer;
private readonly ILogger<PaymentSagaOrchestrator> _logger;
public async Task<SagaResult> ExecuteAsync(PaymentSagaCommand cmd,
CancellationToken ct)
{
var saga = new SagaInstance
{
Id = Guid.NewGuid(),
IdempotencyKey = cmd.IdempotencyKey,
SourceAccount = cmd.SourceAccount,
DestinationAccount = cmd.DestinationAccount,
Amount = cmd.Amount,
Currency = cmd.Currency,
Step = 0,
State = SagaState.Active
};
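// Persist saga state for recovery after a crash.
// For step-level recovery, also persist after every change to saga.Step (omitted here for brevity).
await PersistSagaStateAsync(saga, ct);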
try
{
// Step 1: Validate
saga.Step = 1;
var isValid = await ValidateAsync(saga, ct);
if (!isValid)
return SagaResult.Invalid(saga.Id);
// Step 2: Reserve funds
saga.Step = 2;
await ReserveFundsAsync(saga, ct);
// Step 3: Post journal entry
saga.Step = 3;
await PostJournalEntryAsync(saga, ct);
// Step 4: Execute external payment
saga.Step = 4;
var paymentResult = await ExecuteExternalPaymentAsync(saga, ct);
if (!paymentResult.Success)
throw new ExternalPaymentFailedException(paymentResult.Error);
// Step 5: Confirm
saga.Step = 5;
await ConfirmPaymentAsync(saga, ct);
saga.State = SagaState.Completed;
await PersistSagaStateAsync(saga, ct);
return SagaResult.Success(saga.Id);
}
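catch (Exception ex)
{
// CancellationToken.None below: compensation and state persistence must still run
// when the failure was caused by ct being cancelled.
saga.State = SagaState.Compensating;
await PersistSagaStateAsync(saga, CancellationToken.None);
// Compensate in reverse
await CompensateAsync(saga, ex, CancellationToken.None);
saga.State = SagaState.Compensated;
await PersistSagaStateAsync(saga, CancellationToken.None);
return SagaResult.Compensated(saga.Id);
}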
}
private async Task CompensateAsync(SagaInstance saga, Exception reason, CancellationToken ct)
{
_logger.LogWarning(reason, "Saga {SagaId} compensating from step {Step}",
saga.Id, saga.Step);
// Compensate in REVERSE order
if (saga.Step >= 5)
try { await UnconfirmPaymentAsync(saga, ct); }
catch (Exception ex) { await LogCompensationFailureAsync(saga.Id, ex); }
if (saga.Step >= 4)
try { await CancelExternalPaymentAsync(saga, ct); }
catch (Exception ex) { await LogCompensationFailureAsync(saga.Id, ex); }
if (saga.Step >= 3)
try { await ReverseJournalEntryAsync(saga, ct); }
catch (Exception ex) { await LogCompensationFailureAsync(saga.Id, ex); }
if (saga.Step >= 2)
try { await ReleaseFundsAsync(saga, ct); }
catch (Exception ex) { await LogCompensationFailureAsync(saga.Id, ex); }
}
}
public record SagaInstance
{
public Guid Id { get; set; }
public string IdempotencyKey { get; set; } = string.Empty;
public Guid SourceAccount { get; set; }
public Guid DestinationAccount { get; set; }
public decimal Amount { get; set; }
public string Currency { get; set; } = string.Empty;
public int Step { get; set; }
public SagaState State { get; set; }
}
public enum SagaState { Active, Compensating, Completed, Compensated, Failed }
Saga: rules for fintech
| Rule | Description |
|---|---|
| Idempotent compensations | Every compensating transaction must be idempotent |
| Persist saga state | Saga state is stored in the database for recovery after a crash |
| No interleaving | Sagas must not interleave on the same data |
| Audit every step | Every step and every compensation is logged |
| Timeout handling | Long-running sagas that time out go to manual review |
| No 2PC | The saga replaces distributed 2PC in microservices |
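The "idempotent compensations" rule is the one most often violated in practice, so a small illustration may help. The sketch below is an in-memory stand-in for a reservation service; `ReservationStore` and its members are hypothetical names, and a real implementation would keep this state in a database.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory reservation store keyed by saga id.
public sealed class ReservationStore
{
    private readonly Dictionary<Guid, decimal> _reserved = new();

    public decimal Available { get; private set; }

    public ReservationStore(decimal available) => Available = available;

    // Forward step: replaying it for the same saga does not reserve twice.
    public bool Reserve(Guid sagaId, decimal amount)
    {
        if (_reserved.ContainsKey(sagaId)) return true;
        if (amount <= 0 || amount > Available) return false;
        Available -= amount;
        _reserved[sagaId] = amount;
        return true;
    }

    // Compensation: safe to call any number of times, including for a saga
    // whose reservation never succeeded.
    public void Release(Guid sagaId)
    {
        if (_reserved.Remove(sagaId, out var amount))
            Available += amount;
    }
}
```

Because `Release` only returns funds when a reservation record still exists, a retried or duplicated compensation cannot credit the account twice.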
Outbox Pattern
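The outbox guarantees atomicity between writing business data and recording the event to publish: both rows are written in the same local transaction, and a separate relay later delivers the event to the broker at least once.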
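The mechanics can be sketched without a database. In the example below a lock stands in for the local transaction and a delegate stands in for the broker; `InMemoryLedgerDb`, `OutboxEvent`, and `OutboxRelay` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record OutboxEvent(Guid Id, string Type, string Payload)
{
    public DateTimeOffset? PublishedAt { get; set; }
}

// Hypothetical stand-in for a database with a business table and an outbox table.
public sealed class InMemoryLedgerDb
{
    private readonly object _gate = new();
    public List<string> Transfers { get; } = new();
    public List<OutboxEvent> Outbox { get; } = new();

    // Stand-in for one local transaction: both rows are written together.
    public void RecordTransfer(string transfer)
    {
        lock (_gate)
        {
            Transfers.Add(transfer);
            Outbox.Add(new OutboxEvent(Guid.NewGuid(), "TransferRecorded", transfer));
        }
    }

    public List<OutboxEvent> Unpublished()
    {
        lock (_gate) return Outbox.Where(e => e.PublishedAt is null).ToList();
    }
}

public static class OutboxRelay
{
    // Publishes pending events in order; returns how many were published in this pass.
    public static int PublishPending(InMemoryLedgerDb db, Action<OutboxEvent> publish)
    {
        var published = 0;
        foreach (var evt in db.Unpublished())
        {
            try
            {
                publish(evt);
                evt.PublishedAt = DateTimeOffset.UtcNow; // mark only after a successful publish
                published++;
            }
            catch (Exception)
            {
                break; // keep ordering; the event stays pending and is retried next pass
            }
        }
        return published;
    }
}
```

If the relay crashes after publishing but before marking the row, the event is sent again on the next pass. That is why the outbox gives at-least-once delivery and consumers still need deduplication.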
Inbox Pattern (for effectively-once processing)
-- Inbox: prevents duplicate processing of events
CREATE TABLE inbox_events (
id UUID NOT NULL,
source VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Event ids are assumed unique only per source, so the key is (id, source)
PRIMARY KEY (id, source)
);
-- On receipt of an event:
-- INSERT ... ON CONFLICT DO NOTHING: if the row already exists, the event is skipped
public class InboxProcessor<TEvent>
{
private readonly NpgsqlDataSource _dataSource;
private readonly IEventHandler<TEvent> _handler;
private readonly ILogger<InboxProcessor<TEvent>> _logger;
public async Task ProcessAsync(InboxMessage<TEvent> message, CancellationToken ct)
{
await using var conn = await _dataSource.OpenConnectionAsync(ct);
await using var txn = await conn.BeginTransactionAsync(ct);
// Try to insert into inbox. ON CONFLICT DO NOTHING turns a duplicate into
// "0 rows affected" instead of a unique-violation exception.
var inserted = await conn.ExecuteAsync(
"""
INSERT INTO inbox_events (id, source, event_type, payload)
VALUES (@Id, @Source, @Type, @Payload)
ON CONFLICT DO NOTHING
""",
new { message.Id, Source = message.Source, Type = message.EventType,
Payload = message.Payload },
transaction: txn, ct: ct);
if (inserted == 0)
{
// Already processed: skip (disposing the transaction rolls it back)
_logger.LogDebug("Inbox event {EventId} already processed, skipping", message.Id);
return;
}
// Process the event. For the dedup record and the handler's writes to commit atomically,
// the handler must write through this same connection and transaction.
var evt = JsonSerializer.Deserialize<TEvent>(message.Payload);
if (evt != null)
await _handler.HandleAsync(evt, ct);
await txn.CommitAsync(ct);
}
}
CQRS (Command Query Responsibility Segregation)
CQRS separates the write (command) model from the read (query) model so that each can be scaled and given its own consistency level independently.
CQRS in fintech
┌──────────────────────────────────────────────────────────────────┐
│ Write Path │
│ │
│ API ──→ Command Validator ──→ Saga Orchestrator ──→ PostgreSQL │
│ │ │
│ ▼ │
│ Journal Entry │
│ │ │
│ ▼ │
│ Outbox Event │
│ │ │
└──────────────────────────────────────────────────────────────────┘
│
┌──────────────────────────────────────────────────────────────────┐
│ Read Path │
│ │
│ Event Bus │
│ │ │
│ ▼ │
│ Read Processor │
│ │ │
│ ▼ │
│ Materialized Views (PostgreSQL) │
│ │ │
│ ▼ │
│ API ──→ Balance / History / Reports │
└──────────────────────────────────────────────────────────────────┘
// Command side: strongly consistent
public class TransferCommandHandler
{
public async Task<TransferResult> Handle(TransferCommand cmd, CancellationToken ct)
{
using var txn = await _db.BeginTransactionAsync(IsolationLevel.Serializable, ct);
// Validate: check balance
var balance = await GetBalanceAsync(cmd.SourceAccount, BalanceType.Writer, txn, ct);
if (balance < cmd.Amount)
return TransferResult.InsufficientFunds;
// Execute: atomic debit + credit + journal
await DebitAsync(cmd.SourceAccount, cmd.Amount, txn, ct);
await CreditAsync(cmd.DestinationAccount, cmd.Amount, txn, ct);
await PostJournalEntryAsync(cmd, txn, ct);
// Publish event to outbox (same transaction)
await PublishOutboxEventAsync(new FundsTransferredEvent(
cmd.SourceAccount, cmd.DestinationAccount, cmd.Amount), txn, ct);
await txn.CommitAsync(ct);
return TransferResult.Success;
}
}
// Query side: eventually consistent, optimized for reads
public class BalanceQueryHandler
{
public async Task<BalanceResult> Handle(GetBalanceQuery query, CancellationToken ct)
{
// Fast read from materialized view
var balance = await _readDb.QueryFirstOrDefaultAsync<decimal>(
"SELECT balance FROM account_balances WHERE account_id = @Id",
new { query.AccountId }, ct: ct);
return new BalanceResult(query.AccountId, balance, DateTimeOffset.UtcNow);
}
}
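// Read Processor: consumes events and updates the read model
// (account_balances is treated here as a regular projection table owned by this processor)
public class FundsTransferredReadProcessor : IEventHandler<FundsTransferredEvent>
{
public async Task HandleAsync(FundsTransferredEvent evt, CancellationToken ct)
{
await using var conn = await _readDb.OpenConnectionAsync(ct);
// Apply both updates in one transaction so readers never see a half-applied transfer.
// These increments are not idempotent: deduplicate deliveries (for example with the
// Inbox pattern) before applying them.
await using var txn = await conn.BeginTransactionAsync(ct);
await conn.ExecuteAsync(
"UPDATE account_balances SET balance = balance - @Amount WHERE account_id = @Source",
new { evt.Amount, Source = evt.SourceAccount }, transaction: txn, ct: ct);
await conn.ExecuteAsync(
"UPDATE account_balances SET balance = balance + @Amount WHERE account_id = @Dest",
new { evt.Amount, Dest = evt.DestinationAccount }, transaction: txn, ct: ct);
await txn.CommitAsync(ct);
}
}
Event Sourcing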
Event Sourcing is a pattern in which the state of the system is reconstructed from a sequence of immutable events rather than stored as mutable rows.
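At its core, reconstructing state is a fold over the event stream. The sketch below shows that idea in isolation; the event types (`Opened`, `Deposited`, `Withdrawn`) and the `BalanceProjection` class are simplified, hypothetical stand-ins rather than the ledger events used elsewhere in this module.

```csharp
using System;
using System.Collections.Generic;

public abstract record LedgerEvent;
public sealed record Opened(decimal OpeningBalance) : LedgerEvent;
public sealed record Deposited(decimal Amount) : LedgerEvent;
public sealed record Withdrawn(decimal Amount) : LedgerEvent;

public static class BalanceProjection
{
    // Replays events in order and returns the resulting balance.
    public static decimal Replay(IEnumerable<LedgerEvent> events)
    {
        var balance = 0m;
        foreach (var e in events)
        {
            balance = e switch
            {
                Opened o => o.OpeningBalance,
                Deposited d => balance + d.Amount,
                Withdrawn w => balance - w.Amount,
                _ => throw new InvalidOperationException($"Unknown event {e.GetType().Name}")
            };
        }
        return balance;
    }
}
```

Replaying only a prefix of the stream yields the balance as of that point in history, which is what makes audit and "time-travel" queries cheap with this pattern.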
Event Sourcing for a ledger
// Events (immutable, append-only)
public record FundsTransferred(
Guid AggregateId,
Guid SourceAccount,
Guid DestinationAccount,
decimal Amount,
string Currency,
string IdempotencyKey,
DateTimeOffset OccurredAt);
public record AccountOpened(
Guid AggregateId,
string AccountNumber,
string Currency,
decimal OpeningBalance,
DateTimeOffset OccurredAt);
// Aggregate root
public class Account : EntityGuid
{
private readonly List<object> _uncommittedEvents = new();
private decimal _balance;
public string Number { get; private set; } = string.Empty;
public string Currency { get; private set; } = string.Empty;
// Balance is derived purely from the events applied so far
public decimal Balance => _balance;
public int Version { get; private set; }
public void Open(string accountNumber, string currency, decimal openingBalance,
Guid createdBy)
{
Ensure.NotEmpty(accountNumber, nameof(accountNumber));
Raise(new AccountOpened(Id, accountNumber, currency, openingBalance,
DateTimeOffset.UtcNow));
}
public void Transfer(Guid sourceId, Guid destId, decimal amount, string currency,
string idempotencyKey, Guid createdBy)
{
if (amount <= 0) throw new ArgumentException("Amount must be positive");
if (Balance < amount) throw new InsufficientFundsException(Balance, amount);
Raise(new FundsTransferred(Id, sourceId, destId, amount, currency,
idempotencyKey, DateTimeOffset.UtcNow));
}
// New event: mutate state and remember the event for persistence
private void Raise(object @event)
{
Apply(@event);
_uncommittedEvents.Add(@event);
}
// Mutates state only; public so the repository can replay stored history
public void Apply(object @event)
{
switch (@event)
{
case AccountOpened e:
Number = e.AccountNumber;
Currency = e.Currency;
_balance = e.OpeningBalance;
break;
case FundsTransferred e:
if (e.SourceAccount == Id) _balance -= e.Amount;
if (e.DestinationAccount == Id) _balance += e.Amount;
break;
}
Version++;
}
public IReadOnlyList<object> GetUncommittedEvents() => _uncommittedEvents.AsReadOnly();
public void ClearUncommittedEvents() => _uncommittedEvents.Clear();
}
-- Event store table
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
event_type VARCHAR(200) NOT NULL,
payload JSONB NOT NULL,
version INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata JSONB
);
-- The unique index enforces optimistic concurrency (one event per aggregate version)
-- and also serves lookups by (aggregate_id, version), so no separate index is needed
CREATE UNIQUE INDEX uq_event_store_aggregate_version ON event_store(aggregate_id, version);
-- Append-only: no UPDATE/DELETE
-- GRANT INSERT, SELECT ON event_store TO event_store_writer;
-- REVOKE UPDATE, DELETE ON event_store FROM event_store_writer;
// Event Sourcing repository
public class EventSourcingRepository<TAggregate>
where TAggregate : EntityGuid, IAggregateRoot
{
private readonly NpgsqlDataSource _dataSource;
// Maps the stored event_type name back to a CLR type for deserialization
private static readonly IReadOnlyDictionary<string, Type> EventTypes = new Dictionary<string, Type>
{
[nameof(AccountOpened)] = typeof(AccountOpened),
[nameof(FundsTransferred)] = typeof(FundsTransferred)
};
public async Task<TAggregate> LoadAsync(Guid id, CancellationToken ct)
{
var events = await _dataSource.QueryAsync<EventRecord>(
"""
SELECT event_type, payload, version
FROM event_store
WHERE aggregate_id = @Id
ORDER BY version
""",
new { Id = id }, ct: ct);
// Create an empty aggregate and replay events (Apply increments Version once per event;
// IAggregateRoot is assumed to expose Apply, Version and the uncommitted-event methods)
var aggregate = (TAggregate)Activator.CreateInstance(typeof(TAggregate), id)!;
foreach (var evt in events)
{
var deserialized = JsonSerializer.Deserialize(evt.Payload, EventTypes[evt.EventType]);
if (deserialized != null)
aggregate.Apply(deserialized);
}
return aggregate;
}
public async Task SaveAsync(TAggregate aggregate, CancellationToken ct)
{
var uncommitted = aggregate.GetUncommittedEvents();
if (uncommitted.Count == 0) return;
using var txn = await _dataSource.BeginTransactionAsync(IsolationLevel.Serializable, ct);
// Version before the first uncommitted event; the unique (aggregate_id, version)
// index rejects a concurrent writer that tries to append the same version
var version = aggregate.Version - uncommitted.Count;
foreach (var evt in uncommitted)
{
version++;
var payload = JsonSerializer.Serialize(evt);
var eventType = evt.GetType().Name;
await txn.ExecuteAsync(
"""
INSERT INTO event_store (aggregate_id, event_type, payload, version, created_at)
VALUES (@Id, @Type, @Payload, @Version, @Created)
""",
new { Id = aggregate.Id, Type = eventType, Payload = payload,
Version = version, Created = DateTimeOffset.UtcNow },
ct: ct);
}
await txn.CommitAsync(ct);
// Clear only after a successful commit so a failed save can be retried
aggregate.ClearUncommittedEvents();
}
}
public record EventRecord(string EventType, string Payload, int Version);
RIG Model
RIG (Run-Isolate-Group) is a model for designing microservices with guaranteed eventual consistency via the saga pattern.
RIG categories
| Category | Description | Rule | Example |
|---|---|---|---|
| Run | Transactions that may interleave | No restrictions | Analytics, reporting |
| Isolate | Transactions that lock data for the duration of the saga | Data locked during saga | Order processing |
| Group | Transactions that work on shared data without locking | Requires careful saga design | Shared inventory |
RIG in action
Payment Saga:
┌────────────────────────────────────────────────────────────┐
│ │
│ Run (no isolation needed): │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Notification │ │ Analytics │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ Isolate (data locked during saga): │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Ledger │→│ Payment │→│ Settlement │ │
│ │ (lock acct)│ │ (lock acct)│ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Group (shared data, careful saga): │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Fraud │ │ Compliance │ │
│ │ Detection │ │ Check │ │
│ └─────────────┘ └─────────────┘ │
│ │
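└────────────────────────────────────────────────────────────┘
Idempotency Keys
Idempotency is a mandatory pattern for every state-changing operation in fintech: a client that retries with the same key must receive the original result instead of triggering the operation again.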
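The contract can be shown in memory before looking at an HTTP-level implementation. The sketch below is illustrative only: `IdempotentExecutor<TResult>` is a hypothetical name, and a production system would persist results in a database with an expiry rather than hold them in a dictionary.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory executor: runs an operation at most once per key.
public sealed class IdempotentExecutor<TResult>
{
    private readonly ConcurrentDictionary<string, Lazy<TResult>> _results = new();

    public TResult Execute(string idempotencyKey, Func<TResult> operation)
    {
        if (string.IsNullOrWhiteSpace(idempotencyKey))
            throw new ArgumentException("Idempotency key is required.", nameof(idempotencyKey));

        // GetOrAdd may invoke the factory more than once under contention, but only one
        // Lazy instance is stored, and Lazy runs the operation a single time.
        return _results.GetOrAdd(idempotencyKey, _ => new Lazy<TResult>(operation)).Value;
    }
}
```

One design consequence worth noting: with the default `Lazy<T>` mode an exception thrown by the operation is cached as well, so a failed attempt is replayed for that key. Whether failures should be replayed or retried is a policy decision each API has to make explicitly.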
Implementation
// Convention-based ASP.NET Core middleware: RequestDelegate in the constructor,
// HttpContext in InvokeAsync
public class IdempotencyMiddleware
{
private readonly NpgsqlDataSource _dataSource;
private readonly RequestDelegate _next;
public IdempotencyMiddleware(RequestDelegate next, NpgsqlDataSource dataSource)
{
_next = next;
_dataSource = dataSource;
}
public async Task InvokeAsync(HttpContext context)
{
var ct = context.RequestAborted;
var idempotencyKey = context.Request.Headers["Idempotency-Key"].ToString();
if (string.IsNullOrEmpty(idempotencyKey))
{
await _next(context);
return;
}
// Replay the stored response if this key has already completed
var cached = await GetCachedResultAsync(idempotencyKey, ct);
if (cached != null)
{
context.Response.StatusCode = cached.StatusCode;
context.Response.ContentType = cached.ContentType;
await context.Response.WriteAsync(cached.Body, ct);
return;
}
// Buffer the response so the real body is stored, not a placeholder.
// Two concurrent requests with the same key can both reach the handler here;
// the UNIQUE constraint on the handler's own idempotency key remains the final guard.
var originalBody = context.Response.Body;
using var buffer = new MemoryStream();
context.Response.Body = buffer;
try
{
await _next(context);
buffer.Position = 0;
var body = await new StreamReader(buffer).ReadToEndAsync();
await StoreResultAsync(idempotencyKey, context.Response, body, ct);
buffer.Position = 0;
await buffer.CopyToAsync(originalBody, ct);
}
finally
{
context.Response.Body = originalBody;
}
}
private async Task<CachedResult?> GetCachedResultAsync(string key, CancellationToken ct)
{
using var conn = await _dataSource.OpenConnectionAsync(ct);
// Aliases map snake_case columns onto the record's constructor parameters
return await conn.QueryFirstOrDefaultAsync<CachedResult>(
"""
SELECT status_code AS StatusCode, content_type AS ContentType, body AS Body
FROM idempotency_cache
WHERE key = @Key AND expires_at > NOW()
""", new { Key = key }, ct: ct);
}
private async Task StoreResultAsync(string key, HttpResponse response, string body,
CancellationToken ct)
{
using var conn = await _dataSource.OpenConnectionAsync(ct);
await conn.ExecuteAsync(
"""
INSERT INTO idempotency_cache (key, status_code, content_type, body, expires_at)
VALUES (@Key, @Status, @Type, @Body, @Expires)
ON CONFLICT (key) DO NOTHING
""",
new {
Key = key,
Status = response.StatusCode,
Type = response.ContentType ?? "application/json",
Body = body,
Expires = DateTimeOffset.UtcNow + TimeSpan.FromHours(24)
}, ct: ct);
}
}
public record CachedResult(int StatusCode, string ContentType, string Body);
Pattern comparison
| Pattern | Consistency | Complexity | Use case | Fintech fit |
|---|---|---|---|---|
| 2PC | Strong | High | Monolithic distributed | ❌ lock contention |
| Saga (choreography) | Eventual | Medium | Simple workflows | ⚠️ hard to debug |
| Saga (orchestration) | Eventual | Medium | Complex financial flows | ✅ recommended |
| Outbox + Inbox | Effectively-once (at-least-once delivery plus deduplication) | Low | Event publishing/consuming | ✅ essential |
| CQRS | Strong (write) / Eventual (read) | Medium | High-throughput reads | ✅ recommended |
| Event Sourcing | Strong | High | Audit trails, time-travel | ✅ for ledger |
| Idempotency keys | Effectively-once | Low | All state-changing APIs | ✅ essential |
Checklist for distributed consistency
- [ ] Saga pattern for all cross-service transactions
- [ ] Saga state persisted for crash recovery
- [ ] Idempotent compensating transactions
- [ ] Outbox pattern for event publishing
- [ ] Inbox pattern for effectively-once event processing
- [ ] Idempotency keys on all state-changing APIs
- [ ] CQRS split: strong consistency (write) + eventual (read)
- [ ] Reconciliation pipeline for verification
- [ ] No interleaving sagas on the same data
- [ ] Distributed tracing for saga debugging
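The reconciliation item in the checklist is the least self-explanatory, so here is a minimal sketch of what such a check might compute. All names (`Posting`, `Discrepancy`, `Reconciler`) are hypothetical, and the sign convention (a debit decreases the balance, a credit increases it) is an assumption that must match however the read model derives balances.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record Posting(Guid AccountId, decimal Amount, bool IsDebit);
public sealed record Discrepancy(Guid AccountId, decimal Expected, decimal Actual);

public static class Reconciler
{
    // Recomputes balances from journal postings and compares them with stored balances.
    public static IReadOnlyList<Discrepancy> Compare(
        IEnumerable<Posting> postings,
        IReadOnlyDictionary<Guid, decimal> materializedBalances)
    {
        // Assumed convention: debit decreases the balance, credit increases it.
        var expected = postings
            .GroupBy(p => p.AccountId)
            .ToDictionary(g => g.Key, g => g.Sum(p => p.IsDebit ? -p.Amount : p.Amount));

        // Check accounts present on either side; a missing entry counts as zero.
        return expected.Keys.Union(materializedBalances.Keys)
            .Select(id => new Discrepancy(id, Get(expected, id), Get(materializedBalances, id)))
            .Where(d => d.Expected != d.Actual)
            .ToList();
    }

    private static decimal Get(IReadOnlyDictionary<Guid, decimal> source, Guid id) =>
        source.TryGetValue(id, out var value) ? value : 0m;
}
```

An empty result means the journal and the materialized balances agree; any returned `Discrepancy` would feed the alerting and remediation steps.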
Additional materials
- Chris Richardson, "Microservices Patterns" — Saga chapter
- Sam Newman, "Building Microservices" — Chapter 6: Transactional Messaging
- RIG Model: https://www.infoq.com/articles/rig-data-consistent-microservices/
- Martin Fowler, "Saga Pattern": https://martinfowler.com/articles/saga.html
Practice
Financial ledger architecture
Introduction
A financial ledger is the heart of any fintech system. It is an append-only, double-entry journal that serves as the source of truth for all financial operations. The ledger's architecture determines the correctness, scalability, and auditability of the whole system.
Ledger Architecture: Core Principles
7 principles of a financial ledger
| # | Principle | Description |
|---|---|---|
| 1 | Double-entry | sum(debits) = sum(credits) for every entry |
| 2 | Append-only | Zero UPDATE/DELETE on journal entries |
| 3 | Idempotent | Idempotency keys prevent duplicates |
| 4 | Atomic | A journal entry and its events are written in one transaction |
| 5 | Immutable audit | Tamper-evident audit log with cryptographic hashes |
| 6 | Reconcilable | Automated reconciliation of journal vs balances |
| 7 | Strongly consistent | Serializable isolation for posting operations |
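Principle 5 is usually implemented as a hash chain: each audit record stores the hash of its predecessor, so changing any earlier record invalidates every later hash. The following is a minimal in-memory sketch under that assumption; `AuditRecord` and `AuditChain` are hypothetical names, and the way the previous hash and payload are concatenated is a choice made for this example, not a standard.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public sealed record AuditRecord(string Payload, string PrevHash, string Hash);

public static class AuditChain
{
    // Hash value used as the predecessor of the very first record.
    public const string GenesisHash = "";

    public static string ComputeHash(string prevHash, string payload)
    {
        using var sha = SHA256.Create();
        // A hex hash never contains '\n', so the separator keeps the encoding unambiguous.
        var bytes = sha.ComputeHash(Encoding.UTF8.GetBytes(prevHash + "\n" + payload));
        return Convert.ToHexString(bytes);
    }

    public static AuditRecord Append(IList<AuditRecord> log, string payload)
    {
        var prev = log.Count == 0 ? GenesisHash : log[log.Count - 1].Hash;
        var record = new AuditRecord(payload, prev, ComputeHash(prev, payload));
        log.Add(record);
        return record;
    }

    // Recomputes the chain from the start; false means some record was altered or reordered.
    public static bool Verify(IReadOnlyList<AuditRecord> log)
    {
        var prev = GenesisHash;
        foreach (var record in log)
        {
            if (record.PrevHash != prev || record.Hash != ComputeHash(prev, record.Payload))
                return false;
            prev = record.Hash;
        }
        return true;
    }
}
```

A chain like this makes tampering evident but does not prevent it: someone with write access could recompute every hash, so the latest hash is typically anchored somewhere the writer cannot modify.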
Ledger Schema Design
PostgreSQL Schema
-- Accounts table
CREATE TABLE accounts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
account_number VARCHAR(50) NOT NULL UNIQUE,
name VARCHAR(200) NOT NULL,
currency CHAR(3) NOT NULL CHECK (currency ~ '^[A-Z]{3}$'),
type VARCHAR(20) NOT NULL CHECK (type IN ('ASSET', 'LIABILITY', 'EQUITY', 'REVENUE', 'EXPENSE')),
status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE' CHECK (status IN ('ACTIVE', 'SUSPENDED', 'CLOSED')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_accounts_currency ON accounts(currency);
CREATE INDEX idx_accounts_status ON accounts(status);
-- Journal entries (immutable, append-only)
CREATE TABLE journal_entries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
idempotency_key VARCHAR(100) NOT NULL UNIQUE,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
description TEXT,
metadata JSONB,
status VARCHAR(20) NOT NULL DEFAULT 'POSTED' CHECK (status IN ('POSTED', 'REVERSED')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
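-- Double-entry invariant: PostgreSQL does not allow subqueries in CHECK constraints,
-- so sum(debits) = sum(credits) is enforced by a deferred constraint trigger on journal_postings.
-- Journal postings
CREATE TABLE journal_postings (
id BIGSERIAL PRIMARY KEY,
entry_id UUID NOT NULL REFERENCES journal_entries(id) ON DELETE RESTRICT,
account_id UUID NOT NULL REFERENCES accounts(id) ON DELETE RESTRICT,
amount DECIMAL(20,8) NOT NULL CHECK (amount > 0),
is_debit BOOLEAN NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_postings_account ON journal_postings(account_id);
CREATE INDEX idx_postings_entry ON journal_postings(entry_id);
-- Runs at COMMIT, once all postings of the entry have been inserted
CREATE OR REPLACE FUNCTION check_double_entry()
RETURNS TRIGGER AS $$
BEGIN
IF (SELECT COALESCE(SUM(CASE WHEN is_debit THEN amount ELSE -amount END), 0)
FROM journal_postings WHERE entry_id = NEW.entry_id) <> 0 THEN
RAISE EXCEPTION 'Double-entry invariant violated for entry %', NEW.entry_id;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE CONSTRAINT TRIGGER trg_double_entry
AFTER INSERT ON journal_postings
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE FUNCTION check_double_entry();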
-- Materialized balances (read-optimized)
CREATE MATERIALIZED VIEW account_balances AS
SELECT
jp.account_id,
a.currency,
SUM(jp.amount * CASE WHEN jp.is_debit = true THEN -1 ELSE 1 END) AS balance
FROM journal_postings jp
JOIN journal_entries je ON jp.entry_id = je.id
JOIN accounts a ON jp.account_id = a.id
WHERE je.status = 'POSTED'
GROUP BY jp.account_id, a.currency;
CREATE UNIQUE INDEX uq_account_balances_account ON account_balances(account_id);
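-- Refresh trigger. It fires on journal_postings (not journal_entries) so that the
-- postings of the new entry already exist when the view is rebuilt.
-- Note: a full refresh per statement is costly under load; a scheduled refresh or an
-- incrementally maintained balances table is a common alternative.
CREATE OR REPLACE FUNCTION refresh_balances()
RETURNS TRIGGER AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY account_balances;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_refresh_balances
AFTER INSERT ON journal_postings
FOR EACH STATEMENT EXECUTE FUNCTION refresh_balances();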
-- Outbox events
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
retry_count INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX idx_outbox_unpublished ON outbox_events(created_at)
WHERE published_at IS NULL;
-- Idempotency keys
CREATE TABLE idempotency_keys (
key VARCHAR(100) PRIMARY KEY,
result JSONB NOT NULL,
expires_at TIMESTAMPTZ NOT NULL
);
-- Plain index: NOW() is not IMMUTABLE, so it cannot appear in a partial-index predicate
CREATE INDEX idx_idempotency_expires ON idempotency_keys(expires_at);
-- Audit log (tamper-evident)
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
actor_id UUID,
entity_type VARCHAR(50) NOT NULL,
entity_id UUID,
action VARCHAR(50) NOT NULL,
old_state JSONB,
new_state JSONB,
metadata JSONB,
ip_address INET,
user_agent TEXT,
hash TEXT,
prev_hash TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_audit_log_entity ON audit_log(entity_type, entity_id);
CREATE INDEX idx_audit_log_created ON audit_log(created_at);
Ledger Service Implementation
Posting Engine
public record JournalEntry(
Guid Id,
string IdempotencyKey,
DateTimeOffset Timestamp,
string Description,
IReadOnlyList<JournalPosting> Postings,
Dictionary<string, object>? Metadata = null);
public record JournalPosting(
Guid AccountId,
decimal Amount,
bool IsDebit,
int Version);
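public record PostJournalEntryResult(
bool Success,
Guid? EntryId,
string? Error,
bool IsDuplicate)
{
public static PostJournalEntryResult Duplicate(Guid entryId) => new(true, entryId, null, true);
public static PostJournalEntryResult Failure(Guid? entryId, string error) => new(false, entryId, error, false);
}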
public class PostingEngine
{
private readonly NpgsqlDataSource _dataSource;
private readonly IEventBus _eventBus;
private readonly ILogger<PostingEngine> _logger;
public PostingEngine(NpgsqlDataSource dataSource, IEventBus eventBus,
ILogger<PostingEngine> logger)
{
_dataSource = dataSource;
_eventBus = eventBus;
_logger = logger;
}
public async Task<PostJournalEntryResult> PostAsync(JournalEntry entry,
CancellationToken ct)
{
// Validate double-entry invariant
var netAmount = entry.Postings.Sum(p => p.Amount * (p.IsDebit ? 1m : -1m));
// decimal arithmetic is exact here, so compare with zero directly (decimal has no Epsilon)
if (netAmount != 0m)
return PostJournalEntryResult.Failure(null,
$"Double-entry invariant violated: net amount = {netAmount}");
using var conn = await _dataSource.OpenConnectionAsync(ct);
using var txn = await conn.BeginTransactionAsync(IsolationLevel.Serializable, ct);
try
{
// 1. Idempotency check
var existingEntryId = await conn.QueryFirstOrDefaultAsync<Guid?>(
"SELECT id FROM journal_entries WHERE idempotency_key = @Key",
new { Key = entry.IdempotencyKey }, transaction: txn, ct: ct);
if (existingEntryId.HasValue)
return PostJournalEntryResult.Duplicate(existingEntryId.Value);
// 2. Insert journal entry
await conn.ExecuteAsync(
"""
INSERT INTO journal_entries (id, idempotency_key, timestamp, description, metadata, status)
VALUES (@Id, @Key, @Ts, @Desc, @Meta, 'POSTED')
""",
new {
Id = entry.Id,
Key = entry.IdempotencyKey,
Ts = entry.Timestamp,
Desc = entry.Description,
Meta = entry.Metadata != null ? JsonSerializer.Serialize(entry.Metadata) : (object)DBNull.Value
},
transaction: txn, ct: ct);
// 3. Insert postings
var postings = entry.Postings.Select(p => new
{
EntryId = entry.Id,
p.AccountId,
p.Amount,
p.IsDebit,
p.Version
});
await conn.ExecuteAsync(
"""
INSERT INTO journal_postings (entry_id, account_id, amount, is_debit, version)
VALUES (@EntryId, @AccountId, @Amount, @IsDebit, @Version)
""",
postings,
transaction: txn, ct: ct);
// 4. Publish outbox event (same transaction!)
var eventId = Guid.NewGuid();
var payload = JsonSerializer.Serialize(new
{
entry.Id,
entry.IdempotencyKey,
entry.Description,
entry.Postings.Count,
entry.Timestamp
});
await conn.ExecuteAsync(
"""
INSERT INTO outbox_events (id, aggregate_id, event_type, payload, created_at)
VALUES (@Id, @AggId, @Type, @Payload, @Created)
""",
new {
Id = eventId,
AggId = entry.Id,
Type = "JournalEntryPosted",
Payload = payload,
Created = DateTimeOffset.UtcNow
},
transaction: txn, ct: ct);
// 5. Audit log
await conn.ExecuteAsync(
"""
INSERT INTO audit_log (event_type, entity_type, entity_id, action, new_state, created_at)
VALUES ('LEDGER_POST', 'JOURNAL_ENTRY', @EntryId, 'POSTED', @State, NOW())
""",
new { EntryId = entry.Id, State = payload },
transaction: txn, ct: ct);
await txn.CommitAsync(ct);
_logger.LogInformation("Journal entry {EntryId} posted with {PostingCount} postings",
entry.Id, entry.Postings.Count);
return PostJournalEntryResult.Success(entry.Id);
}
// Match on the constraint name reported by PostgreSQL (Npgsql's PostgresException)
// instead of searching the message text, which depends on server locale and version.
catch (PostgresException ex) when (ex.ConstraintName == "chk_double_entry_invariant")
{
await txn.RollbackAsync(ct);
return PostJournalEntryResult.Failure(null, "Double-entry invariant violated");
}
catch (PostgresException ex) when (ex.ConstraintName == "uq_journal_idempotency")
{
// A concurrent request with the same key won the race: report it as a duplicate
await txn.RollbackAsync(ct);
var existingId = await conn.QueryFirstOrDefaultAsync<Guid?>(
"SELECT id FROM journal_entries WHERE idempotency_key = @Key",
new { Key = entry.IdempotencyKey }, ct: ct);
return PostJournalEntryResult.Duplicate(existingId ?? Guid.Empty);
}
catch
{
await txn.RollbackAsync(ct);
throw;
}
}
}
// The factories live in a second part of the record: a static class with the same name
// would not compile, and a static class cannot be used as a return type.
public partial record PostJournalEntryResult
{
public static PostJournalEntryResult Success(Guid entryId) =>
new(true, entryId, null, false);
public static PostJournalEntryResult Failure(Guid? entryId, string error) =>
new(false, entryId, error, false);
public static PostJournalEntryResult Duplicate(Guid entryId) =>
new(true, entryId, "Duplicate idempotency key", true);
}
Transfer Service
public record TransferCommand(
Guid SourceAccountId,
Guid DestinationAccountId,
decimal Amount,
string Currency,
string IdempotencyKey,
string Description);
// Named IsSuccess so the property does not collide with the static Success(...) factory
public partial record TransferResult(
bool IsSuccess,
Guid? EntryId,
string? Error,
bool IsDuplicate);
public class TransferService
{
private readonly PostingEngine _postingEngine;
private readonly NpgsqlDataSource _dataSource;
private readonly ILogger<TransferService> _logger;
public TransferService(PostingEngine postingEngine, NpgsqlDataSource dataSource,
ILogger<TransferService> logger)
{
_postingEngine = postingEngine;
_dataSource = dataSource;
_logger = logger;
}
public async Task<TransferResult> ExecuteAsync(TransferCommand cmd,
CancellationToken ct)
{
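// Validate amounts
if (cmd.Amount <= 0)
return TransferResult.Failure(null, "Amount must be positive");
// A self-transfer would otherwise fail below with a misleading "not found" error
if (cmd.SourceAccountId == cmd.DestinationAccountId)
return TransferResult.Failure(null, "Source and destination accounts must differ");
// Validate accounts exist
var accountsExist = await ValidateAccountsAsync(cmd, ct);
if (!accountsExist)
return TransferResult.Failure(null, "One or both accounts not found");
// Check balance on the writer. This read runs outside the posting transaction, so it is
// only a fast pre-check: two concurrent transfers can both pass it. The posting
// transaction must re-validate (or lock) the source balance to rule out an overdraft.
var sourceBalance = await GetWriterBalanceAsync(cmd.SourceAccountId, ct);
if (sourceBalance < cmd.Amount)
return TransferResult.Failure(null,
$"Insufficient funds: {sourceBalance} < {cmd.Amount}");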
// Create double-entry journal
var entry = new JournalEntry(
Id: Guid.NewGuid(),
IdempotencyKey: cmd.IdempotencyKey,
Timestamp: DateTimeOffset.UtcNow,
Description: cmd.Description ?? $"Transfer {cmd.SourceAccountId} → {cmd.DestinationAccountId}",
Postings: new List<JournalPosting>
{
new JournalPosting(cmd.SourceAccountId, cmd.Amount, IsDebit: true, 1),
new JournalPosting(cmd.DestinationAccountId, cmd.Amount, IsDebit: false, 1)
}
);
var result = await _postingEngine.PostAsync(entry, ct);
if (!result.IsSuccess)
return TransferResult.Failure(result.EntryId, result.Error ?? "Unknown error");
if (result.IsDuplicate)
return TransferResult.Duplicate(result.EntryId ?? Guid.Empty);
return TransferResult.Success(result.EntryId ?? Guid.Empty);
}
private async Task<bool> ValidateAccountsAsync(TransferCommand cmd, CancellationToken ct)
{
using var conn = await _dataSource.OpenConnectionAsync(ct);
var count = await conn.QueryFirstOrDefaultAsync<int>(
"""
SELECT COUNT(*) FROM accounts
WHERE id IN (@Source, @Dest) AND status = 'ACTIVE'
""",
new { Source = cmd.SourceAccountId, Dest = cmd.DestinationAccountId },
ct: ct);
return count == 2;
}
private async Task<decimal> GetWriterBalanceAsync(Guid accountId, CancellationToken ct)
{
using var conn = await _dataSource.OpenConnectionAsync(ct);
return await conn.QueryFirstOrDefaultAsync<decimal>(
"""
SELECT COALESCE(SUM(amount * CASE WHEN is_debit = true THEN -1 ELSE 1 END), 0)
FROM journal_postings jp
JOIN journal_entries je ON jp.entry_id = je.id
WHERE jp.account_id = @Id AND je.status = 'POSTED'
""",
new { Id = accountId }, ct: ct);
}
}
// The factories live in a second part of the record: a static class with the same name
// would not compile, and a static class cannot be used as a return type.
public partial record TransferResult
{
public static TransferResult Success(Guid entryId) =>
new(true, entryId, null, false);
public static TransferResult Failure(Guid? entryId, string error) =>
new(false, entryId, error, false);
public static TransferResult Duplicate(Guid entryId) =>
new(true, entryId, "Duplicate idempotency key", true);
}
Balance Types and Consistency Levels
Three balance types
| Type | Source | Consistency | Latency | Use Case |
|---|---|---|---|---|
| Writer | Direct journal query | Strong (serializable) | ~10ms | Transfer validation, financial decisions |
| Session | Writer, read after a write | Session-level | ~5ms | "Check balance after transfer" |
| Materialized | Materialized view | Eventually consistent | ~1ms | UI display, reporting |
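The Session row can be made concrete with a small routing rule: reads go to the writer while a replica might still be missing the session's latest write. The sketch below is an illustration under assumptions, not the module's implementation: `SessionReadRouter`, `ReadTarget`, and the fixed `maxReplicaLag` bound are hypothetical names, and a production system would more likely compare replication positions than wall-clock time.

```csharp
using System;
using System.Collections.Concurrent;

public enum ReadTarget { Writer, Replica }

// Hypothetical helper: remembers when each session last wrote.
public sealed class SessionReadRouter
{
    private readonly ConcurrentDictionary<string, DateTimeOffset> _lastWrite = new();
    private readonly TimeSpan _maxReplicaLag; // assumed upper bound on replication lag

    public SessionReadRouter(TimeSpan maxReplicaLag) => _maxReplicaLag = maxReplicaLag;

    // Call after a successful write in this session.
    public void RecordWrite(string sessionId, DateTimeOffset now) =>
        _lastWrite[sessionId] = now;

    // Route to the writer while the replica may not have caught up with the
    // session's last write; otherwise the cheaper replica read is safe.
    public ReadTarget Route(string sessionId, DateTimeOffset now)
    {
        if (_lastWrite.TryGetValue(sessionId, out var lastWrite)
            && now - lastWrite < _maxReplicaLag)
        {
            return ReadTarget.Writer;
        }
        return ReadTarget.Replica;
    }
}
```

Passing the clock in as a parameter keeps the rule deterministic and easy to test; sessions that have never written always read from the replica.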
public enum BalanceConsistency
{
Strong, // Writer: strongest consistency
Session, // Read-your-writes guarantee
Eventual // Materialized view
}
public class BalanceService
{
private readonly NpgsqlDataSource _writeDb;
private readonly NpgsqlDataSource _readDb;
public async Task<decimal> GetBalanceAsync(Guid accountId,
BalanceConsistency consistency, CancellationToken ct)
{
return consistency switch
{
BalanceConsistency.Strong => await GetStrongBalanceAsync(accountId, ct),
BalanceConsistency.Session => await GetSessionBalanceAsync(accountId, ct),
BalanceConsistency.Eventual => await GetEventualBalanceAsync(accountId, ct),
_ => throw new ArgumentOutOfRangeException(nameof(consistency), consistency, null)
};
}
private async Task<decimal> GetStrongBalanceAsync(Guid accountId, CancellationToken ct)
{
using var conn = await _writeDb.OpenConnectionAsync(ct);
return await conn.QueryFirstOrDefaultAsync<decimal>(
"""
SELECT COALESCE(SUM(amount * CASE WHEN is_debit = true THEN -1 ELSE 1 END), 0)
FROM journal_postings jp
JOIN journal_entries je ON jp.entry_id = je.id
WHERE jp.account_id = @Id AND je.status = 'POSTED'
""",
new { Id = accountId }, ct: ct);
}
private async Task<decimal> GetSessionBalanceAsync(Guid accountId,
CancellationToken ct)
{
// Use session-level consistency: read from writer after last write
// Implementation depends on connection pooling strategy
return await GetStrongBalanceAsync(accountId, ct);
}
private async Task<decimal> GetEventualBalanceAsync(Guid accountId,
CancellationToken ct)
{
using var conn = await _readDb.OpenConnectionAsync(ct);
return await conn.QueryFirstOrDefaultAsync<decimal>(
"SELECT balance FROM account_balances WHERE account_id = @Id",
new { Id = accountId }, ct: ct);
}
}
Reconciliation Pipeline
Automated Reconciliation
public class ReconciliationService
{
private readonly NpgsqlDataSource _dataSource;
private readonly IEventBus _eventBus;
private readonly ILogger<ReconciliationService> _logger;
public ReconciliationService(NpgsqlDataSource dataSource, IEventBus eventBus,
ILogger<ReconciliationService> logger)
{
_dataSource = dataSource;
_eventBus = eventBus;
_logger = logger;
}
public async Task<ReconciliationReport> RunAsync(CancellationToken ct)
{
var discrepancies = new List<Discrepancy>();
using var conn = await _dataSource.OpenConnectionAsync(ct);
// Get all active accounts
var accounts = await conn.QueryAsync<AccountInfo>(
"SELECT id, currency FROM accounts WHERE status = 'ACTIVE'", ct: ct);
foreach (var account in accounts)
{
// Derived balance from journal entries (source of truth)
var derivedBalance = await conn.QueryFirstOrDefaultAsync<decimal>(
"""
SELECT COALESCE(SUM(jp.amount * CASE WHEN jp.is_debit = true THEN -1 ELSE 1 END), 0)
FROM journal_postings jp
JOIN journal_entries je ON jp.entry_id = je.id
WHERE jp.account_id = @Id AND je.status = 'POSTED'
""",
new { Id = account.Id }, ct: ct);
// Materialized balance (cached, should match)
var materializedBalance = await conn.QueryFirstOrDefaultAsync<decimal?>(
"SELECT balance FROM account_balances WHERE account_id = @Id",
new { Id = account.Id }, ct: ct);
var materialized = materializedBalance.GetValueOrDefault();
// Compare exactly: decimal arithmetic has no rounding noise, and decimal.Epsilon does not exist
if (derivedBalance != materialized)
{
var discrepancy = new Discrepancy(
AccountId: account.Id,
Currency: account.Currency,
DerivedBalance: derivedBalance,
MaterializedBalance: materialized,
Difference: derivedBalance - materialized,
DetectedAt: DateTimeOffset.UtcNow
);
discrepancies.Add(discrepancy);
_logger.LogWarning(
"Reconciliation discrepancy: Account={AccountId} Currency={Currency} " +
"Derived={Derived} Materialized={Materialized} Diff={Diff}",
account.Id, account.Currency, derivedBalance, materialized,
discrepancy.Difference);
// Publish reconciliation alert
await _eventBus.PublishAsync(new ReconciliationAlertEvent(discrepancy), ct);
}
}
var report = new ReconciliationReport(
RanAt: DateTimeOffset.UtcNow,
AccountsScanned: accounts.Count(),
Discrepancies: discrepancies);
_logger.LogInformation(
"Reconciliation completed: {Accounts} scanned, {Discrepancies} discrepancies",
report.AccountsScanned, report.Discrepancies.Count);
return report;
}
}
public record AccountInfo(Guid Id, string Currency);
public record Discrepancy(
Guid AccountId,
string Currency,
decimal DerivedBalance,
decimal MaterializedBalance,
decimal Difference,
DateTimeOffset DetectedAt);
public record ReconciliationReport(
DateTimeOffset RanAt,
int AccountsScanned,
IReadOnlyList<Discrepancy> Discrepancies);
public record ReconciliationAlertEvent(Discrepancy Discrepancy);
Ledger Architecture: Implementation Comparison
TigerBeetle
TigerBeetle — high-performance financial accounting database:
- Strict serializability через replicated state machine
- Fixed-size data structures для performance
- Cryptographic checksums для tamper detection
- Leader-based timestamping (no NTP dependency)
- Synchronous replication
PostgreSQL (custom ledger)
PostgreSQL with a custom schema is the most common approach:
- ACID transactions, SERIALIZABLE isolation
- CHECK constraints for double-entry
- Materialized views for balances
- Outbox pattern for events
- Full control over schema and constraints
CockroachDB
CockroachDB for a multi-region ledger:
- Distributed SQL with strong consistency
- Automatic replication and rebalancing
- Raft consensus for replication underneath distributed transactions
- Global transactions without application-level sharding logic
Choosing an implementation
| Критерий | PostgreSQL | TigerBeetle | CockroachDB |
|---|---|---|---|
| Strong consistency | ✅ | ✅ | ✅ |
| Double-entry enforcement | ✅ CHECK | ✅ built-in | ✅ CHECK |
| Multi-region | ⚠️ manual | ⚠️ limited | ✅ native |
| Performance | 10k-50k TPS | 100k+ TPS | 10k-30k TPS |
| Operational complexity | Low | Medium | High |
| Ecosystem | Mature | Emerging | Mature |
| Best for | Most fintechs | High-throughput | Multi-region |
Checklist for Ledger Architecture
- [ ] Double-entry enforced at the database level (CHECK constraint)
- [ ] Append-only journal: zero UPDATE/DELETE
- [ ] Idempotency keys with UNIQUE constraint
- [ ] SERIALIZABLE isolation for posting
- [ ] Outbox pattern for event publishing
- [ ] Materialized balances with automatic refresh
- [ ] Reconciliation pipeline: automated + alerting
- [ ] Audit log: immutable, tamper-evident
- [ ] Balance types: Strong / Session / Eventual
- [ ] CAP boundary: CP for core ledger, AP for periphery
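Three of the checklist items (double-entry enforcement, an append-only journal, idempotency keys) can be seen working together in a few lines. The following is a minimal in-memory sketch, assuming hypothetical `Posting` and `InMemoryLedger` types; it illustrates the invariants only and is no substitute for enforcing them in the database.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record Posting(Guid AccountId, decimal Amount, bool IsDebit);

// Hypothetical in-memory ledger, for illustration only.
public sealed class InMemoryLedger
{
    private readonly List<Posting> _journal = new();     // append-only: never updated or deleted
    private readonly HashSet<string> _seenKeys = new();  // idempotency keys already applied

    // Returns true if the entry was posted, false if the key was seen before.
    public bool Post(string idempotencyKey, IReadOnlyList<Posting> postings)
    {
        // Double-entry invariant: debits and credits must net to exactly zero.
        var net = postings.Sum(p => p.IsDebit ? p.Amount : -p.Amount);
        if (net != 0m)
            throw new InvalidOperationException("Debits must equal credits");

        // Idempotency: a repeated key is a no-op, not a second posting.
        if (!_seenKeys.Add(idempotencyKey))
            return false;

        _journal.AddRange(postings);
        return true;
    }

    // Same sign convention as the balance queries in this module:
    // a debit decreases the balance, a credit increases it.
    public decimal Balance(Guid accountId) =>
        _journal.Where(p => p.AccountId == accountId)
                .Sum(p => p.IsDebit ? -p.Amount : p.Amount);
}
```

The invariant is checked before the key is recorded, so a rejected entry does not consume its idempotency key and can be retried once corrected.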
Further reading
- TigerBeetle Design: https://github.com/tigerbeetledb/tigerbeetle/blob/main/docs/DESIGN.md
- Trio, "Payment Ledger Architecture": https://trio.dev/payment-ledger-architecture-fintech/
- Gothar, "ACID Is a Contract, Not a Religion": https://gothar.com/en/insights/acid-ledger-correctness-2026
- Chronicle Ledger (Event Sourcing + CQRS): https://github.com/Kimosabey/chronicle-ledger
- AWS Aurora DSQL for Financial Transactions: https://aws.amazon.com/blogs/database/amazon-aurora-dsql-for-global-scale-financial-transactions/
Practice
Regulatory requirements and compliance
Introduction
Fintech companies operate under strict regulatory oversight. Compliance is not optional: it is an architectural constraint that has to be built into the architecture from day one.
Key regulatory standards
PCI DSS (Payment Card Industry Data Security Standard)
Requirements for systems that handle payment cards:
| Requirement | Description | Architectural Impact |
|---|---|---|
| 1 | Firewall для защиты cardholder data | Network segmentation, VPC isolation |
| 2 | No default passwords | Credential management, secrets rotation |
| 3 | Protect stored cardholder data | Encryption at rest, tokenization |
| 4 | Encrypt transmission of cardholder data | TLS 1.2+, mutual TLS |
| 5 | Antivirus protection | Endpoint security |
| 6 | Secure systems and applications | Secure SDLC, code scanning |
| 7 | Restrict access to cardholder data | RBAC, least privilege |
| 8 | Identify and authenticate access | MFA, session management |
| 9 | Physical access controls | Data center security |
| 10 | Track and monitor access | Audit logs, monitoring |
| 11 | Test security systems | Penetration testing, vulnerability scanning |
| 12 | Maintain an information security policy | Security policies, incident response plan |
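Requirement 3 is the one that most directly shapes application code: a full PAN should not reach logs, UIs, or support tools. A minimal masking sketch follows; the `PanMasking` class and the 12 to 19 digit length check are assumptions made for illustration, not something PCI DSS prescribes in this form.

```csharp
using System;
using System.Linq;

public static class PanMasking
{
    // Returns a display-safe form of the PAN that keeps only the last four digits.
    public static string Mask(string pan)
    {
        // Drop spaces, dashes, and any other non-digit separators.
        var digits = new string(pan.Where(char.IsDigit).ToArray());

        // Assumed sanity bound on PAN length; adjust to the card schemes you support.
        if (digits.Length < 12 || digits.Length > 19)
            throw new ArgumentException("PAN must contain 12 to 19 digits", nameof(pan));

        return "****" + digits[^4..];
    }
}
```

For example, `PanMasking.Mask("4111 1111 1111 1111")` yields `****1111`.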
-- PCI DSS: Encrypting stored cardholder data
-- NEVER store CVV2/CVC2
-- PAN (Primary Account Number) must be masked or truncated
ALTER TABLE payment_cards ADD COLUMN pan_hash TEXT NOT NULL;
ALTER TABLE payment_cards ADD COLUMN pan_last4 CHAR(4) NOT NULL;
ALTER TABLE payment_cards ADD COLUMN pan_encrypted BYTEA;
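-- Example: PAN hashing (PCI DSS Requirement 3.4)
-- A hash must be one-way and deterministic. pgp_sym_encrypt is neither: it is reversible
-- encryption and it is salted, so the same PAN yields a different value on every call
-- and can never match a UNIQUE lookup. Use a keyed hash (HMAC-SHA-256 from pgcrypto)
-- here and reserve reversible AES encryption for the pan_encrypted column.
CREATE OR REPLACE FUNCTION hash_pan(pan_text TEXT, key_id INTEGER)
RETURNS TEXT AS $$
BEGIN
-- get_encryption_key(key_id) is expected to return the secret for this key version
RETURN encode(
hmac(pan_text, get_encryption_key(key_id), 'sha256'),
'hex'
);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;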
-- Mask PAN for display (show only last 4 digits)
CREATE OR REPLACE FUNCTION mask_pan(pan_text TEXT)
RETURNS TEXT AS $$
BEGIN
RETURN '****' || right(pan_text, 4);
END;
$$ LANGUAGE plpgsql;
-- Tokenization for PCI compliance
CREATE TABLE payment_tokens (
token_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pan_hash TEXT NOT NULL UNIQUE,
token_value VARCHAR(255) NOT NULL UNIQUE,
customer_id UUID NOT NULL,
card_network VARCHAR(20) NOT NULL,
last4 CHAR(4) NOT NULL,
expiry_month INTEGER NOT NULL,
expiry_year INTEGER NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL
);
-- PCI DSS: No CVV2 storage
-- CVV2 is collected at transaction time only, never stored
-- Table: payment_cards should NOT have cvv2 column
GDPR (General Data Protection Regulation)
For fintechs with EU customers:
| Requirement | Description | Implementation |
|---|---|---|
| Right to access | User can request all personal data | Data export API |
| Right to erasure | User can request data deletion | Anonymization pipeline |
| Data portability | User can export data in structured format | CSV/JSON export |
| Consent management | Explicit consent for data processing | Consent tracking table |
| Data minimization | Collect only necessary data | Schema design review |
| Purpose limitation | Use data only for stated purpose | Data classification |
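Consent management boils down to one question at processing time: is there a consent of this type that was granted and not yet withdrawn? The sketch below answers it over an in-memory history. The `Consent` record and `ConsentCheck` class are hypothetical; their fields are chosen to mirror a consent-tracking table with grant and withdrawal timestamps.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record Consent(
    string ConsentType,
    bool ConsentGiven,
    DateTimeOffset GrantedAt,
    DateTimeOffset? WithdrawnAt);

public static class ConsentCheck
{
    // True if the most recent decision of this type, as of 'at', is a grant
    // that had not been withdrawn by that moment.
    public static bool IsActive(
        IEnumerable<Consent> history, string consentType, DateTimeOffset at)
    {
        var latest = history
            .Where(c => c.ConsentType == consentType && c.GrantedAt <= at)
            .OrderByDescending(c => c.GrantedAt)
            .FirstOrDefault();

        return latest is not null
            && latest.ConsentGiven
            && (latest.WithdrawnAt is null || latest.WithdrawnAt > at);
    }
}
```

Only the latest record per consent type counts, so a newer refusal overrides an older grant without deleting the history that auditors may ask for.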
-- GDPR: Consent tracking
CREATE TABLE user_consents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL,
consent_type VARCHAR(100) NOT NULL, -- 'MARKETING', 'DATA_PROCESSING', 'THIRD_PARTY_SHARING'
consent_given BOOLEAN NOT NULL,
consent_text TEXT NOT NULL, -- version of consent text
ip_address INET,
user_agent TEXT,
granted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
withdrawn_at TIMESTAMPTZ,
CONSTRAINT chk_withdrawn CHECK (withdrawn_at IS NULL OR granted_at < withdrawn_at)
);
CREATE INDEX idx_consents_user ON user_consents(user_id, consent_type);
-- GDPR: Right to erasure (anonymization)
CREATE OR REPLACE FUNCTION anonymize_user_data(target_user_id UUID)
RETURNS VOID AS $$
BEGIN
-- Anonymize personal data in consents
UPDATE user_consents
SET user_id = gen_random_uuid(), -- unlink from identity
ip_address = '0.0.0.0',
user_agent = NULL,
withdrawn_at = COALESCE(withdrawn_at, NOW()) -- keep an earlier withdrawal time
WHERE user_id = target_user_id;
-- Anonymize audit log (keep for compliance, remove PII).
-- Caveat: actor_id takes part in the audit hash chain, so rewriting it makes
-- verify_audit_chain() flag these rows unless PII is kept out of the hashed
-- fields or the chain is re-anchored after erasure.
UPDATE audit_log
SET actor_id = gen_random_uuid(),
metadata = jsonb_set(COALESCE(metadata, '{}'::jsonb), '{pii}', '"anonymized"'::jsonb)
WHERE entity_id = target_user_id AND entity_type = 'USER';
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
Data Residency
A requirement to store customer data within a specific jurisdiction:
// Data residency routing
public class DataResidencyRouter
{
private readonly Dictionary<string, string> _customerRegionMap;
private readonly Dictionary<string, NpgsqlDataSource> _regionDataSources;
public DataResidencyRouter(
Dictionary<string, string> customerRegionMap,
Dictionary<string, NpgsqlDataSource> regionDataSources)
{
_customerRegionMap = customerRegionMap;
_regionDataSources = regionDataSources;
}
public NpgsqlDataSource GetDataSourceForCustomer(Guid customerId)
{
var region = _customerRegionMap[customerId.ToString()];
return _regionDataSources[region];
}
// Example regions: 'EU', 'US', 'APAC'
// Each region has its own database instance
}
// Multi-region PostgreSQL configuration
var regionConfigurations = new Dictionary<string, string>
{
["EU"] = "Host=eu-postgres.internal;Database=ledger_eu;...",
["US"] = "Host=us-postgres.internal;Database=ledger_us;...",
["APAC"] = "Host=apac-postgres.internal;Database=ledger_apac;..."
};
Audit Trail Requirements
Regulatory audit requirements
| Regulation | Audit Requirement | Retention Period |
|---|---|---|
| PCI DSS | Track all access to cardholder data | Minimum 1 year (most recent 3 months immediately available) |
| GDPR | Record of processing activities | Duration + 3 years |
| SOX | Financial reporting integrity | 7 years |
| CFTC Rule 1.73 | Historical data retention | 5 years |
| BCBS 239 | Risk data aggregation | Ongoing |
| MiFID II | Transaction reporting | 5 years |
| AML/KYC | Customer due diligence | 5 years after relationship ends |
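When one record falls under several regimes, the longest retention period wins. A small sketch of that rule, using three of the fixed periods from the table above; the `RetentionPolicy` class and its dictionary are hypothetical, and a real policy would also handle periods that start at an event such as the end of a customer relationship.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RetentionPolicy
{
    // Minimum retention in years, taken from the table above (illustrative subset).
    private static readonly IReadOnlyDictionary<string, int> MinimumYears =
        new Dictionary<string, int>
        {
            ["PCI DSS"] = 1,
            ["SOX"] = 7,
            ["MiFID II"] = 5
        };

    // Earliest moment a record may be deleted when all listed regulations apply.
    // Throws if 'regulations' is empty or contains an unknown name.
    public static DateTimeOffset EarliestDeletion(
        DateTimeOffset createdAt, IEnumerable<string> regulations)
    {
        var years = regulations.Max(r => MinimumYears[r]);
        return createdAt.AddYears(years);
    }
}
```

An archival job can compare this date with the current time to decide between keeping a record online, archiving it, or deleting it.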
Tamper-Evident Audit Log
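The idea behind a tamper-evident log is a hash chain: each entry's hash covers its own content plus the previous entry's hash, so changing any entry breaks every link after it. Before the SQL version, here is a minimal in-memory sketch of the same idea. `AuditEntry`, `AuditChain`, and the `id|payload|prevHash` layout are assumptions made for illustration and do not match the exact byte format of the SQL functions.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public sealed record AuditEntry(long Id, string Payload, string PrevHash, string Hash);

public static class AuditChain
{
    public const string Genesis = "genesis";

    // Hash of the entry content together with the previous entry's hash.
    public static string ComputeHash(long id, string payload, string prevHash)
    {
        var text = FormattableString.Invariant($"{id}|{payload}|{prevHash}");
        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(text)));
    }

    // Builds the next entry; the caller appends it to the chain.
    public static AuditEntry Next(IReadOnlyList<AuditEntry> chain, string payload)
    {
        var prev = chain.Count == 0 ? Genesis : chain[chain.Count - 1].Hash;
        var id = chain.Count + 1L;
        return new AuditEntry(id, payload, prev, ComputeHash(id, payload, prev));
    }

    // Returns the id of the first entry that fails verification, or null if intact.
    public static long? FirstInvalid(IReadOnlyList<AuditEntry> chain)
    {
        var prev = Genesis;
        foreach (var entry in chain)
        {
            if (entry.PrevHash != prev
                || entry.Hash != ComputeHash(entry.Id, entry.Payload, prev))
            {
                return entry.Id;
            }
            prev = entry.Hash;
        }
        return null;
    }
}
```

Verification needs nothing but the entries themselves, so an auditor can run it against an exported copy of the log.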
-- Cryptographic hash chain for audit log
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
actor_id UUID,
entity_type VARCHAR(50) NOT NULL,
entity_id UUID,
action VARCHAR(50) NOT NULL,
old_state JSONB,
new_state JSONB,
metadata JSONB,
ip_address INET,
user_agent TEXT,
hash TEXT NOT NULL,
prev_hash TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
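-- Hash chain trigger
CREATE OR REPLACE FUNCTION compute_audit_hash()
RETURNS TRIGGER AS $$
DECLARE
prev_hash_val TEXT;
data_to_hash TEXT;
BEGIN
-- Serialize writers so two concurrent inserts cannot chain off the same predecessor.
-- This works at READ COMMITTED; at stricter isolation levels expect serialization
-- failures and retry.
PERFORM pg_advisory_xact_lock(hashtext('audit_log_hash_chain'));
-- Get previous hash. An empty table yields no row at all, so COALESCE has to wrap
-- the subquery, not the column.
prev_hash_val := COALESCE(
(SELECT a.hash FROM audit_log a ORDER BY a.id DESC LIMIT 1), 'genesis');
-- Create hash of record content + previous hash. format() renders NULL as an empty
-- string (plain || would turn the whole value into NULL), and the timestamp is
-- rendered in UTC so the result does not depend on the session time zone.
data_to_hash := format('%s|%s|%s|%s|%s|%s|%s|%s',
NEW.id, NEW.event_type, NEW.actor_id, NEW.action, NEW.old_state, NEW.new_state,
to_char(NEW.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US'),
prev_hash_val);
NEW.hash := encode(sha256(convert_to(data_to_hash, 'UTF8')), 'hex');
NEW.prev_hash := prev_hash_val;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_audit_hash
BEFORE INSERT ON audit_log
FOR EACH ROW EXECUTE FUNCTION compute_audit_hash();
-- Verification function: check entire hash chain
CREATE OR REPLACE FUNCTION verify_audit_chain()
RETURNS TABLE(id BIGINT, valid BOOLEAN, error TEXT) AS $$
DECLARE
prev_hash_val TEXT := 'genesis';
data_to_hash TEXT;
computed_hash TEXT;
current_record audit_log%ROWTYPE;
BEGIN
FOR current_record IN
-- Qualify columns: the output column "id" would otherwise be ambiguous
SELECT a.* FROM audit_log a ORDER BY a.id
LOOP
-- Recompute hash with the same formula as compute_audit_hash()
data_to_hash := format('%s|%s|%s|%s|%s|%s|%s|%s',
current_record.id, current_record.event_type, current_record.actor_id,
current_record.action, current_record.old_state, current_record.new_state,
to_char(current_record.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US'),
prev_hash_val);
computed_hash := encode(sha256(convert_to(data_to_hash, 'UTF8')), 'hex');
id := current_record.id;
valid := computed_hash = current_record.hash
AND current_record.prev_hash = prev_hash_val;
error := CASE WHEN valid THEN NULL
ELSE 'Hash mismatch at id ' || current_record.id END;
RETURN NEXT;
-- Continue from the stored hash so one altered row is not reported for every successor
prev_hash_val := current_record.hash;
END LOOP;
END;
$$ LANGUAGE plpgsql;
Compliance by Design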
Privacy by Design
// Data classification
public enum DataClassification
{
Public, // No restrictions
Internal, // Internal use only
Confidential, // Access restricted
Restricted, // PCI DSS / GDPR regulated
HighlyRestricted // Financial secrets, encryption keys
}
// Data classification annotation
public class DataClassifiedAttribute : Attribute
{
public DataClassification Classification { get; }
public string[] AllowedRegions { get; }
public DataClassifiedAttribute(DataClassification classification,
params string[] allowedRegions)
{
Classification = classification;
AllowedRegions = allowedRegions;
}
}
// Example: Customer PII
public class CustomerData
{
[DataClassified(DataClassification.Restricted, "EU")]
public string FullName { get; set; } = string.Empty;
[DataClassified(DataClassification.Restricted, "EU", "US")]
public string Address { get; set; } = string.Empty;
[DataClassified(DataClassification.HighlyRestricted)]
public string Ssn { get; set; } = string.Empty;
[DataClassified(DataClassification.Restricted)]
public List<PaymentCard> Cards { get; set; } = new();
}
Compliance Monitoring
public class ComplianceMonitor
{
private readonly NpgsqlDataSource _dataSource;
private readonly IEventBus _eventBus;
private readonly ILogger<ComplianceMonitor> _logger;
public async Task RunComplianceChecksAsync(CancellationToken ct)
{
await CheckPciComplianceAsync(ct);
await CheckGDPRComplianceAsync(ct); // implementation not shown in this listing
await CheckAuditLogIntegrityAsync(ct);
await CheckDataRetentionAsync(ct);
}
private async Task CheckPciComplianceAsync(CancellationToken ct)
{
using var conn = await _dataSource.OpenConnectionAsync(ct);
// Check: no CVV2 in database
var cvv2Columns = await conn.QueryFirstOrDefaultAsync<int>(
"""
SELECT COUNT(*) FROM information_schema.columns
WHERE column_name IN ('cvv2', 'cvc2', 'cvv')
AND table_name = 'payment_cards'
""", ct: ct);
if (cvv2Columns > 0)
{
_logger.LogCritical("PCI DSS VIOLATION: CVV2 columns found in payment_cards table");
await _eventBus.PublishAsync(new ComplianceViolationEvent(
"PCI_DSS_VIOLATION_CVV2", "Critical",
"CVV2 columns detected in payment_cards table"));
}
// Check: PAN encryption enabled
var unencryptedPan = await conn.QueryFirstOrDefaultAsync<int>(
"""
SELECT COUNT(*) FROM payment_cards
WHERE pan_encrypted IS NULL AND pan_hash IS NULL
""", ct: ct);
if (unencryptedPan > 0)
{
_logger.LogWarning("PCI DSS WARNING: {Count} unencrypted PANs found", unencryptedPan);
await _eventBus.PublishAsync(new ComplianceViolationEvent(
"PCI_DSS_WARNING_UNENCRYPTED_PAN", "Warning",
$"{unencryptedPan} unencrypted PANs in database"));
}
}
private async Task CheckAuditLogIntegrityAsync(CancellationToken ct)
{
using var conn = await _dataSource.OpenConnectionAsync(ct);
// Verify hash chain
var invalidRecords = await conn.QueryAsync<(long Id, bool Valid, string Error)>(
"SELECT * FROM verify_audit_chain() WHERE valid = false", ct: ct);
if (invalidRecords.Any())
{
_logger.LogCritical("AUDIT LOG INTEGRITY VIOLATION: {Count} invalid records",
invalidRecords.Count());
await _eventBus.PublishAsync(new ComplianceViolationEvent(
"AUDIT_LOG_INTEGRITY_VIOLATION", "Critical",
$"{invalidRecords.Count()} tampered audit log entries detected"));
}
}
private async Task CheckDataRetentionAsync(CancellationToken ct)
{
using var conn = await _dataSource.OpenConnectionAsync(ct);
// Check: records older than the longest retention window in the table above (7 years, SOX) must be archived
var oldData = await conn.QueryFirstOrDefaultAsync<int>(
"""
SELECT COUNT(*) FROM event_store
WHERE created_at < NOW() - INTERVAL '7 years'
AND NOT archived
""", ct: ct);
if (oldData > 0)
{
_logger.LogWarning("Data retention: {Count} records older than 7 years not archived",
oldData);
}
}
}
public record ComplianceViolationEvent(
string ViolationType,
string Severity,
string Description);
Checklist for Regulatory Compliance
- [ ] PCI DSS: no CVV2 storage, PAN encryption/tokenization
- [ ] GDPR: consent management, right to erasure, data portability
- [ ] Data residency: region-specific database instances
- [ ] Audit log: tamper-evident with a cryptographic hash chain
- [ ] Data retention: automated archival per regulatory requirements
- [ ] Compliance monitoring: automated checks + alerting
- [ ] RBAC: least privilege access to financial data
- [ ] MFA: multi-factor authentication for access
- [ ] Encryption: TLS 1.2+ for data in transit, AES-256 for data at rest
- [ ] Penetration testing: regular security assessments
Further reading
- PCI DSS v4.0: https://www.pcisecuritystandards.org/
- GDPR: https://gdpr.eu/
- CFTC Rule 1.73: https://www.law.cornell.edu/cfr/text/17/1.73
- BCBS 239: https://www.bis.org/bcbs/publ/d377.htm
Practice
Consistency tiering and hybrid models
Introduction
Consistency tiering is an approach in which a system applies different consistency levels to different parts of its data or to different operations. This makes it possible to optimize performance without compromising the critical paths.
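In code, tiering usually starts as an explicit policy that maps each operation to the weakest tier that is still safe for it. The sketch below shows such a mapping; `Tier`, `Operation`, and `TierPolicy` are hypothetical names, and the assignments are illustrative defaults rather than a fixed rule.

```csharp
using System;

public enum Tier { Strong, Session, Eventual, Analytics }

public enum Operation
{
    PostJournalEntry,
    TransferValidation,
    BalanceAfterTransfer,
    DashboardBalance,
    AccountHistory,
    FraudFeatures
}

public static class TierPolicy
{
    // Money-moving decisions get the strongest tier; display and analytics
    // reads accept staleness in exchange for latency and cost.
    public static Tier For(Operation op) => op switch
    {
        Operation.PostJournalEntry or Operation.TransferValidation => Tier.Strong,
        Operation.BalanceAfterTransfer => Tier.Session,
        Operation.DashboardBalance or Operation.AccountHistory => Tier.Eventual,
        Operation.FraudFeatures => Tier.Analytics,
        _ => throw new ArgumentOutOfRangeException(nameof(op), op, null)
    };
}
```

Keeping the mapping in one place makes the consistency decision reviewable: weakening the tier of an operation becomes a visible code change instead of a side effect of which connection string a query happened to use.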
Hybrid Consistency Architecture
Tiered consistency model
┌─────────────────────────────────────────────────────────────────┐
│ Tier 0: Strong Consistency (CP) │
│ │
│ Ledger journal entries │
│ Balance validation for transfers │
│ Idempotency keys │
│ │
│ Database: PostgreSQL / CockroachDB │
│ Isolation: SERIALIZABLE │
│ Latency: 5-50ms │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ Tier 1: Session Consistency │
│ │
│ Read-your-writes guarantee │
│ Balance check after transfer │
│ │
│ Mechanism: Session-affinity routing or writer-read │
│ Latency: 2-10ms │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ Tier 2: Eventual Consistency (AP) │
│ │
│ Materialized balances │
│ Account history (UI) │
│ Reporting dashboard │
│ │
│ Database: PostgreSQL materialized views / Redis │
│ Refresh: Event-driven (outbox → event bus → processor) │
│ Latency: <1ms (cached) │
│ Staleness: 10-500ms │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ Tier 3: Best-Effort (Analytics) │
│ │
│ Aggregates, ML features, fraud detection │
│ Data warehouse (Snowflake, BigQuery) │
│ │
│ Refresh: Batch (hourly/daily) │
│ Latency: N/A (analytical queries) │
└─────────────────────────────────────────────────────────────────┘
Implementation: Consistency Router
public enum ConsistencyTier
{
Strong, // Tier 0: ledger, balance validation
Session, // Tier 1: read-your-writes
Eventual, // Tier 2: materialized views
Analytics // Tier 3: data warehouse
}
public class ConsistencyRouter
{
private readonly NpgsqlDataSource _writeDb;
private readonly NpgsqlDataSource _readDb;
private readonly RedisConnection _cache;
private readonly AnalyticsDataSource _analytics;
public async Task<T> ExecuteWithConsistencyAsync<T>(
ConsistencyTier tier,
string cacheKey, // per-entity key, e.g. "balance:{accountId}"
Func<NpgsqlConnection, Task<T>> strongOperation,
Func<NpgsqlConnection, Task<T>> eventualOperation,
Func<Task<T>> analyticsOperation,
CancellationToken ct)
{
return tier switch
{
ConsistencyTier.Strong => await ExecuteStrongAsync(strongOperation, ct),
ConsistencyTier.Session => await ExecuteSessionAsync(strongOperation, ct),
ConsistencyTier.Eventual => await ExecuteEventualAsync(cacheKey, eventualOperation, ct),
ConsistencyTier.Analytics => await analyticsOperation(),
_ => throw new ArgumentOutOfRangeException(nameof(tier), tier, null)
};
}
private async Task<T> ExecuteStrongAsync<T>(
Func<NpgsqlConnection, Task<T>> operation, CancellationToken ct)
{
using var conn = await _writeDb.OpenConnectionAsync(ct);
return await operation(conn);
}
private async Task<T> ExecuteSessionAsync<T>(
Func<NpgsqlConnection, Task<T>> operation, CancellationToken ct)
{
// Session affinity: route to same node that handled the write
// Implementation depends on connection pooling strategy
return await ExecuteStrongAsync(operation, ct);
}
private async Task<T> ExecuteEventualAsync<T>(
string cacheKey, Func<NpgsqlConnection, Task<T>> operation, CancellationToken ct)
{
// Try cache first. The key must identify the entity being read: a shared constant
// key would hand one account's cached value to every caller.
// Caveat: for value types such as decimal, a cache miss must be distinguishable
// from a cached zero, so cache a nullable or wrapper type.
var cached = await _cache.GetAsync<T>(cacheKey);
if (cached != null)
return cached;
// Fallback to materialized view
using var conn = await _readDb.OpenConnectionAsync(ct);
return await operation(conn);
}
}
Quorum-Based Checkpoints
Quorum checkpoints are a periodic synchronization of state between nodes through quorum consensus.
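The decision at the heart of a checkpoint is small: did enough nodes answer, and do enough of them agree with the local snapshot? A minimal sketch of that decision follows, assuming a strict-majority quorum in which the local node counts as one vote; `CheckpointOutcome` and `QuorumDecision` are hypothetical names, and this rule is one reasonable choice rather than the only one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum CheckpointOutcome { Committed, Mismatch, NoQuorum }

public static class QuorumDecision
{
    // localHash: hash of this node's snapshot.
    // peerHashes: snapshot hashes from the peers that responded.
    // clusterSize: total number of nodes, including this one.
    public static CheckpointOutcome Decide(
        string localHash, IReadOnlyCollection<string> peerHashes, int clusterSize)
    {
        var quorum = clusterSize / 2 + 1;   // strict majority of the cluster
        var votes = peerHashes.Count + 1;   // responders plus the local node

        if (votes < quorum)
            return CheckpointOutcome.NoQuorum;

        // Commit only if a majority of the whole cluster holds the local hash.
        var agreeing = peerHashes.Count(h => h == localHash) + 1;
        return agreeing >= quorum
            ? CheckpointOutcome.Committed
            : CheckpointOutcome.Mismatch;
    }
}
```

Measuring agreement against the cluster size, not against the number of responders, prevents a minority partition from committing a checkpoint on its own.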
Quorum Checkpoint Pattern
Every N seconds (for example, 30s):
1. Each node creates a snapshot of its state
2. Snapshots are sent to a quorum of nodes (a majority: more than half of the cluster)
3. The quorum nodes compare snapshots
4. On mismatch: trigger reconciliation
5. On match: checkpoint committed
public class QuorumCheckpointService
{
private readonly List<CheckpointNode> _nodes;
private readonly NpgsqlDataSource _dataSource;
private readonly ILogger<QuorumCheckpointService> _logger;
private readonly TimeSpan _checkpointInterval = TimeSpan.FromSeconds(30);
public async Task RunCheckpointLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
await RunCheckpointAsync(ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Checkpoint failed");
}
await Task.Delay(_checkpointInterval, ct);
}
}
private async Task RunCheckpointAsync(CancellationToken ct)
{
// 1. Create local snapshot
var localSnapshot = await CreateSnapshotAsync(ct);
// 2. Send to quorum nodes
var responses = new List<(string NodeId, bool Accepted, string Hash)>();
foreach (var node in _nodes.Where(n => n.Id != LocalNodeId))
{
try
{
var response = await node.SendSnapshotAsync(localSnapshot, ct);
responses.Add((node.Id, response.Accepted, response.Hash));
}
catch
{
// Node unreachable — skip
}
}
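// 3. Check quorum: a strict majority of the cluster, counting this node's own vote
var acceptedCount = responses.Count(r => r.Accepted) + 1;
var quorumRequired = (_nodes.Count / 2) + 1;
if (acceptedCount >= quorumRequired)
{
// 4. Verify hash consistency (the local hash takes part in the majority vote,
// which also keeps the list non-empty in a single-node cluster)
var hashes = responses.Where(r => r.Accepted).Select(r => r.Hash)
.Append(localSnapshot.Hash).ToList();
var majorityHash = GetMajorityHash(hashes);
if (majorityHash != localSnapshot.Hash)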
{
// Mismatch detected — trigger reconciliation
await TriggerReconciliationAsync(localSnapshot, ct);
}
else
{
_logger.LogInformation("Checkpoint committed at {Timestamp}",
localSnapshot.Timestamp);
}
}
else
{
_logger.LogWarning("Checkpoint failed: {Accepted}/{Required} nodes accepted",
acceptedCount, quorumRequired);
}
}
private async Task<Snapshot> CreateSnapshotAsync(CancellationToken ct)
{
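await using var conn = await _dataSource.OpenConnectionAsync(ct);
// A single SELECT sees one consistent snapshot of account balances.
// Dapper takes the cancellation token through CommandDefinition.
var balances = await conn.QueryAsync<(Guid AccountId, decimal Balance)>(
new CommandDefinition(
"SELECT account_id, balance FROM account_balances",
cancellationToken: ct));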
var hash = ComputeSnapshotHash(balances);
return new Snapshot(
Timestamp: DateTimeOffset.UtcNow,
Balances: balances.ToList(),
Hash: hash
);
}
private string ComputeSnapshotHash(IEnumerable<(Guid, decimal)> balances)
{
// Sort by account ID and format with the invariant culture so that every node
// serializes the same balances to the same bytes.
var data = string.Join("|", balances.OrderBy(b => b.Item1)
.Select(b => FormattableString.Invariant($"{b.Item1}:{b.Item2}")));
var hashBytes = System.Security.Cryptography.SHA256.HashData(Encoding.UTF8.GetBytes(data));
return Convert.ToHexString(hashBytes);
}
private string GetMajorityHash(List<string> hashes)
{
var counts = hashes.GroupBy(h => h)
.OrderByDescending(g => g.Count())
.ToList();
return counts.First().Key;
}
private Task TriggerReconciliationAsync(Snapshot snapshot, CancellationToken ct)
{
_logger.LogWarning("Hash mismatch detected at {Timestamp}, triggering reconciliation",
snapshot.Timestamp);
// Trigger full reconciliation
return Task.CompletedTask;
}
}
public record Snapshot(
DateTimeOffset Timestamp,
List<(Guid AccountId, decimal Balance)> Balances,
string Hash);
Temporal Sequence Barrier
Temporal Sequence Barrier is a consistency model for asynchronous, high-throughput ledger systems. It combines logical vector clocks with epoch-based orchestration to provide causal consistency.
Vector clocks for causal ordering
public class VectorClock
{
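private readonly Dictionary<string, long> _clocks = new();
// Read-only view of the per-node counters (used when serializing or ordering events)
public IReadOnlyDictionary<string, long> GetClocks() => _clocks;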
public void Increment(string nodeId)
{
_clocks[nodeId] = _clocks.GetValueOrDefault(nodeId, 0) + 1;
}
public void Merge(VectorClock other)
{
foreach (var (nodeId, timestamp) in other._clocks)
{
_clocks[nodeId] = Math.Max(_clocks.GetValueOrDefault(nodeId, 0), timestamp);
}
}
public bool HappensBefore(VectorClock other)
{
bool atLeastOneLess = false;
foreach (var nodeId in _clocks.Keys.Union(other._clocks.Keys))
{
var myTime = _clocks.GetValueOrDefault(nodeId, 0);
var otherTime = other._clocks.GetValueOrDefault(nodeId, 0);
if (myTime > otherTime)
return false;
if (myTime < otherTime)
atLeastOneLess = true;
}
return atLeastOneLess;
}
public bool ConcurrentsWith(VectorClock other)
{
return !HappensBefore(other) && !other.HappensBefore(this);
}
}
// Event with vector clock for causal ordering
public record CausallyOrderedEvent(
Guid Id,
string EventType,
string Payload,
VectorClock VectorClock,
Guid? DependsOn, // explicit dependency
DateTimeOffset LogicalTime,
string SourceRegion);
Epoch-based orchestration
public class EpochOrchestrator
{
private readonly TimeSpan _epochDuration = TimeSpan.FromSeconds(5);
private long _currentEpoch = 0;
private readonly Dictionary<string, Queue<CausallyOrderedEvent>> _regionQueues = new();
public async Task ProcessEventsAsync(IEnumerable<CausallyOrderedEvent> events,
CancellationToken ct)
{
// Group events by epoch
var epochGroups = events.GroupBy(e =>
(long)(e.LogicalTime.ToUnixTimeMilliseconds() / _epochDuration.TotalMilliseconds));
foreach (var epochGroup in epochGroups.OrderBy(g => g.Key))
{
_currentEpoch = epochGroup.Key;
// Process events within epoch in causal order
var orderedEvents = TopologicalSortByVectorClock(epochGroup.ToList());
foreach (var evt in orderedEvents)
{
await ProcessEventAsync(evt, ct);
}
// Epoch checkpoint: ensure all dependencies resolved
await CheckEpochCompletenessAsync(_currentEpoch, ct);
}
}
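private List<CausallyOrderedEvent> TopologicalSortByVectorClock(
List<CausallyOrderedEvent> events)
{
// If A happens-before B, every component of A is <= the same component of B and
// at least one is strictly smaller, so sum(A) < sum(B). Ordering by the component
// sum is therefore a valid linear extension of the causal order. Comparing the
// serialized strings alone would not be ("a:10" sorts before "a:2"); the ordinal
// string is used only as a deterministic tie-breaker between concurrent events.
return events
.OrderBy(e => e.VectorClock.GetClocks().Values.Sum())
.ThenBy(e => SerializeVectorClock(e.VectorClock), StringComparer.Ordinal)
.ToList();
}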
private string SerializeVectorClock(VectorClock clock)
{
return string.Join(",", clock.GetClocks()
.OrderBy(kvp => kvp.Key)
.Select(kvp => $"{kvp.Key}:{kvp.Value}"));
}
private Task CheckEpochCompletenessAsync(long epoch, CancellationToken ct)
{
// Verify all causal dependencies for this epoch are resolved
return Task.CompletedTask;
}
}
Dynamic CAP in Fintech
Dynamic CAP adapts the system's CAP position depending on:
- Transaction amount
- Operation type
- Risk
- Jurisdiction
Dynamic CAP Implementation
public record CapDecision(
CapMode Mode,
string Reason,
decimal? Threshold,
string? RiskFactor);
public enum CapMode
{
Consistency, // CP: strong consistency required
Availability // AP: availability prioritized
}
public class DynamicCapEngine
{
private const decimal HighValueThreshold = 10_000m;
private const decimal CriticalValueThreshold = 100_000m;
private const decimal RegulatoryThreshold = 10_000m; // US CTR: reports are required for cash transactions over $10,000
public CapDecision DecideCapMode(
decimal amount,
string currency,
string transactionType,
string sourceRegion,
string destRegion,
decimal? customerRiskScore,
bool isSettlement)
{
// Rule 1: Settlement transactions — always CP
if (isSettlement)
return new CapDecision(CapMode.Consistency,
"Settlement transactions require strong consistency",
null, "settlement");
// Rule 2: Cross-jurisdiction — always CP
if (sourceRegion != destRegion)
return new CapDecision(CapMode.Consistency,
$"Cross-jurisdiction transfer ({sourceRegion} → {destRegion}) requires consistency",
null, "data_residency");
// Rule 3: High-value — CP
if (amount >= CriticalValueThreshold)
return new CapDecision(CapMode.Consistency,
$"Critical value: {amount} {currency}",
CriticalValueThreshold, "high_value");
// Rule 4: Regulatory threshold (CTR) — CP
if (amount > RegulatoryThreshold)
return new CapDecision(CapMode.Consistency,
$"Regulatory threshold exceeded: {amount} {currency} > {RegulatoryThreshold}",
RegulatoryThreshold, "regulatory");
// Rule 5: High risk customer — CP
if (customerRiskScore.HasValue && customerRiskScore.Value > 0.7m)
return new CapDecision(CapMode.Consistency,
$"High risk customer: score {customerRiskScore}",
null, "risk_score");
// Rule 6: Balance check (read-only) — AP
if (transactionType == "balance_check")
return new CapDecision(CapMode.Availability,
"Read-only balance check",
null, "read_only");
// Rule 7: Low-value domestic — CP by default (fintech baseline)
// Even small transactions in fintech default to CP
return new CapDecision(CapMode.Consistency,
$"Standard transaction: {amount} {currency}",
amount, "standard");
}
}
CAP-aware Transaction Router
public class CapAwareTransactionRouter
{
private readonly DynamicCapEngine _capEngine;
private readonly ILedgerService _ledgerService; // CP
private readonly IBalanceCacheService _balanceCache; // AP
private readonly IKafkaProducer _eventProducer; // AP
public async Task<TransferResult> ExecuteTransferAsync(
TransferCommand cmd, CancellationToken ct)
{
// Determine CAP mode
var capDecision = _capEngine.DecideCapMode(
cmd.Amount, cmd.Currency, "transfer",
cmd.SourceRegion, cmd.DestinationRegion,
cmd.CustomerRiskScore, isSettlement: false);
// Route based on CAP decision
return capDecision.Mode switch
{
CapMode.Consistency => await ExecuteConsistentAsync(cmd, ct),
CapMode.Availability => await ExecuteAvailableAsync(cmd, ct),
_ => await ExecuteConsistentAsync(cmd, ct) // default to CP
};
}
private async Task<TransferResult> ExecuteConsistentAsync(
TransferCommand cmd, CancellationToken ct)
{
// CP path: strong consistency through ledger
return await _ledgerService.ExecuteTransferAsync(cmd, ct);
}
private async Task<TransferResult> ExecuteAvailableAsync(
TransferCommand cmd, CancellationToken ct)
{
// AP path: accept at edge, process asynchronously
await _eventProducer.PublishAsync("transfer-events", cmd.IdempotencyKey,
JsonSerializer.Serialize(cmd), ct);
return TransferResult.AcceptedAsync;
}
}
Consistency Trade-offs Matrix
Decision Framework
┌──────────────────────────────────────────────────────────────┐
│ Choosing a consistency level │
│ │
│ 1. Transaction affects ledger balance? │
│ YES → Tier 0: Strong (SERIALIZABLE) │
│ NO ↓ │
│ │
│ 2. Read-your-writes required? │
│ YES → Tier 1: Session │
│ NO ↓ │
│ │
│ 3. Real-time balance display? │
│ YES → Tier 2: Eventual (materialized) │
│ NO ↓ │
│ │
│ 4. Analytics / reporting? │
│ YES → Tier 3: Analytics (batch) │
└──────────────────────────────────────────────────────────────┘
Trade-offs Summary
| Tier | Consistency | Latency | Throughput | Use Case | Risk |
|---|---|---|---|---|---|
| 0 - Strong | Serializable transactions | 5-50ms | 10k-50k TPS | Ledger, balance validation | No stale reads; the cost is latency and retries on serialization failures |
| 1 - Session | Session | 2-10ms | 50k-100k TPS | Post-transfer balance check | Minimal |
| 2 - Eventual | Eventual | <1ms | 100k+ TPS | UI display, reporting | Balance staleness 10-500ms |
| 3 - Analytics | Best-effort | N/A | 1M+ TPS | Analytics, ML | Hours of staleness |
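The decision framework above can be read as a first-match rule chain. The following is a minimal sketch of that reading, not the module's implementation; `ConsistencyTier`, `OperationProfile`, and `ConsistencyTierSelector` are hypothetical names introduced only for illustration.

```csharp
using System;

// Hypothetical types: they mirror the four tiers and the first three questions
// of the decision framework; they are not part of the module's codebase.
public enum ConsistencyTier { Strong = 0, Session = 1, Eventual = 2, Analytics = 3 }

public sealed record OperationProfile(
    bool AffectsLedgerBalance,
    bool NeedsReadYourWrites,
    bool IsRealTimeBalanceDisplay);

public static class ConsistencyTierSelector
{
    // Walks the questions top to bottom; the first "yes" decides the tier.
    public static ConsistencyTier Choose(OperationProfile op)
    {
        if (op.AffectsLedgerBalance) return ConsistencyTier.Strong;       // Tier 0
        if (op.NeedsReadYourWrites) return ConsistencyTier.Session;       // Tier 1
        if (op.IsRealTimeBalanceDisplay) return ConsistencyTier.Eventual; // Tier 2
        return ConsistencyTier.Analytics;                                 // Tier 3: analytics / reporting
    }
}
```

Because the checks run in order, an operation that both changes a ledger balance and feeds a UI widget still lands in Tier 0; the stricter requirement wins.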
Checklist for Consistency Tiering
- [ ] Consistency tiers are defined for all operations
- [ ] Tier 0 (Strong) for all ledger operations
- [ ] Tier 1 (Session) for read-your-writes
- [ ] Tier 2 (Eventual) for UI and reporting
- [ ] Tier 3 (Analytics) for the data warehouse
- [ ] Quorum checkpoints for verification
- [ ] Temporal ordering for async processing
- [ ] Dynamic CAP: adaptation by amount, risk, and jurisdiction
- [ ] Reconciliation between tiers
- [ ] Staleness monitoring between tiers
Further reading
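One way to approach the staleness item on this checklist is to compare the commit time of the newest write in the Tier 0 store with the commit time of the newest event already applied to a lower tier. The sketch below assumes that definition; `ReadModelStalenessTracker` and its members are hypothetical, and the class is not thread-safe as written.

```csharp
using System;

// Hypothetical helper: measures how far a read model lags behind the write store.
public sealed class ReadModelStalenessTracker
{
    private readonly TimeSpan _budget;
    private DateTimeOffset _lastAppliedCommitTime;

    public ReadModelStalenessTracker(TimeSpan budget, DateTimeOffset initialCommitTime)
    {
        _budget = budget;
        _lastAppliedCommitTime = initialCommitTime;
    }

    // Called by the projection after it applies an event to the read model.
    public void OnEventApplied(DateTimeOffset committedAt)
    {
        if (committedAt > _lastAppliedCommitTime)
            _lastAppliedCommitTime = committedAt;
    }

    // Gap between the newest commit in the write store and the newest applied commit.
    public TimeSpan Staleness(DateTimeOffset latestCommitInWriteStore)
    {
        var gap = latestCommitInWriteStore - _lastAppliedCommitTime;
        return gap > TimeSpan.Zero ? gap : TimeSpan.Zero;
    }

    public bool IsWithinBudget(DateTimeOffset latestCommitInWriteStore) =>
        Staleness(latestCommitInWriteStore) <= _budget;
}
```

With a 500 ms budget (the upper bound the summary table gives for Tier 2), a read model that has applied everything up to a commit 200 ms older than the latest write is within budget, while one that is a full second behind is not.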
- Gilbert & Lynch, "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services" (2002)
- RIG Model: https://www.infoq.com/articles/rig-data-consistent-microservices/
- Temporal Consistency Models for Financial Data: https://www.ijcesen.com/index.php/ijcesen/article/view/4967
- Martin Kleppmann, "Designing Data-Intensive Applications", Chapter 7: Transactions
Practice
Operational discipline for fintech
Introduction
In fintech, architecture is only half the job. Operational discipline is the set of practices, processes, and culture that makes the architecture work in production. Without it, even a perfectly designed system will fail.
> Key point: Architectural choices matter much less than operational discipline. Distributed tracing, structured logging, monitoring, and runbooks are a multiplier for the entire architecture.
Distributed Tracing
Correlation ID across all services
// Correlation ID middleware — propagates trace across service boundaries
public class CorrelationIdMiddleware
{
private const string CorrelationIdHeader = "X-Correlation-ID";
private readonly RequestDelegate _next;
private readonly ILogger<CorrelationIdMiddleware> _logger;
public CorrelationIdMiddleware(RequestDelegate next,
ILogger<CorrelationIdMiddleware> logger)
{
_next = next;
_logger = logger;
}
public async Task InvokeAsync(HttpContext context)
{
// Extract the incoming correlation ID or generate a new one
var correlationId = context.Request.Headers[CorrelationIdHeader].FirstOrDefault();
if (string.IsNullOrEmpty(correlationId))
{
correlationId = Guid.NewGuid().ToString("N");
}
// Always echo the ID back so the caller can correlate the response
context.Response.Headers[CorrelationIdHeader] = correlationId;
// Store in HttpContext.Items for downstream middleware and handlers
context.Items["CorrelationId"] = correlationId;
// Add to the logging scope for the duration of the request
using var scope = _logger.BeginScope(new Dictionary<string, object>
{
["CorrelationId"] = correlationId
});
var sw = Stopwatch.StartNew();
try
{
await _next(context);
}
finally
{
sw.Stop();
_logger.LogInformation(
"[{CorrelationId}] {Method} {Path} → {StatusCode} ({Elapsed}ms)",
correlationId, context.Request.Method, context.Request.Path,
context.Response.StatusCode, sw.ElapsedMilliseconds);
}
}
}
// Serilog enrichment with CorrelationId
Log.Logger = new LoggerConfiguration()
.Enrich.FromLogContext() // required for properties pushed via LogContext.PushProperty
.Enrich.WithExceptionDetails()
.Enrich.WithMachineName()
.WriteTo.Console()
.WriteTo.Seq("http://seq:5341")
.CreateLogger();
// ASP.NET Core: push the correlation ID into the Serilog LogContext for each request
app.Use(async (context, next) =>
{
var correlationId = context.Items["CorrelationId"]?.ToString() ?? "unknown";
// PushProperty returns an IDisposable; disposing it removes the property after the request
using (LogContext.PushProperty("CorrelationId", correlationId))
{
await next();
}
});
OpenTelemetry Integration
// OpenTelemetry setup for .NET
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource(builder.Environment.ApplicationName)
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
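.AddNpgsql() // extension from the Npgsql.OpenTelemetry package
.AddKafkaInstrumentation() // assumes a Kafka instrumentation package that exposes an extension with this name
.AddSource("PaymentSaga")
.AddSource("JournalPosting")
// Jaeger ingests OTLP natively; the dedicated Jaeger exporter package is deprecated
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://jaeger:4317");
})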
)
.WithMetrics(metrics => metrics
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddRuntimeInstrumentation()
.AddPrometheusExporter()
);
// Custom spans for financial operations
public class TransferService
{
private readonly ILogger<TransferService> _logger;
private readonly Tracer _tracer;
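public TransferService(ILogger<TransferService> logger, TracerProvider tracerProvider)
{
_logger = logger;
// The tracer name must match a source registered with AddSource(...) in the
// tracing setup; spans from unregistered sources are dropped
_tracer = tracerProvider.GetTracer("PaymentSaga");
}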
public async Task<TransferResult> ExecuteAsync(TransferCommand cmd,
CancellationToken ct)
{
using var span = _tracer.StartActiveSpan("transfer.execute");
span.SetAttribute("source_account", cmd.SourceAccount.ToString());
span.SetAttribute("destination_account", cmd.DestinationAccount.ToString());
span.SetAttribute("amount", cmd.Amount.ToString());
span.SetAttribute("currency", cmd.Currency);
span.SetAttribute("idempotency_key", cmd.IdempotencyKey);
try
{
var result = await ExecuteTransferInternalAsync(cmd, ct);
span.SetAttribute("result", result.Success ? "success" : "failed");
if (!result.Success)
span.SetStatus(Status.Error);
return result;
}
catch (Exception ex)
{
span.RecordException(ex);
span.SetStatus(Status.Error);
throw;
}
}
}
Structured Logging
Financial-specific log events
// Structured log events for financial operations
public static class FinancialLogEvents
{
// Informational
public const int JournalEntryPosted = 1001;
public const int TransferCompleted = 1002;
public const int BalanceChecked = 1003;
public const int IdempotencyHit = 1004;
// Warnings
public const int BalanceDiscrepancy = 2001;
public const int SagaCompensation = 2002;
public const int RetryAttempt = 2003;
public const int SlowQuery = 2004;
// Errors
public const int TransferFailed = 3001;
public const int JournalPostingFailed = 3002;
public const int ReconciliationFailed = 3003;
public const int OutboxPublishFailed = 3004;
public const int IdempotencyConflict = 3005;
}
public class FinancialLogger
{
private readonly ILogger<FinancialLogger> _logger;
public FinancialLogger(ILogger<FinancialLogger> logger) => _logger = logger;
public void LogJournalEntryPosted(Guid entryId, string idempotencyKey,
int postingCount, decimal totalAmount, string currency, double elapsedMs)
{
// ILogger.Log takes the level first, then the event ID (int converts implicitly to EventId)
_logger.Log(LogLevel.Information, FinancialLogEvents.JournalEntryPosted,
"Journal entry posted: EntryId={EntryId} IdempotencyKey={IdempotencyKey} " +
"Postings={PostingCount} Amount={Amount} Currency={Currency} ElapsedMs={ElapsedMs}",
entryId, idempotencyKey, postingCount, totalAmount, currency, elapsedMs);
}
public void LogBalanceDiscrepancy(Guid accountId, string currency,
decimal derived, decimal materialized, decimal difference)
{
_logger.Log(LogLevel.Warning, FinancialLogEvents.BalanceDiscrepancy,
"Balance discrepancy: Account={AccountId} Currency={Currency} " +
"Derived={Derived} Materialized={Materialized} Difference={Difference}",
accountId, currency, derived, materialized, difference);
}
public void LogTransferFailed(Guid? transferId, string reason,
string sourceAccount, string destAccount, decimal amount, string currency)
{
_logger.Log(LogLevel.Error, FinancialLogEvents.TransferFailed,
"Transfer failed: TransferId={TransferId} Reason={Reason} " +
"Source={Source} Dest={Dest} Amount={Amount} Currency={Currency}",
transferId, reason, sourceAccount, destAccount, amount, currency);
}
}
Monitoring and Alerting
Golden Metrics for Fintech
| Metric | Type | Description | Alert Threshold |
|---|---|---|---|
| ledger_journal_post_latency | Histogram | Time to post journal entry | p99 > 100ms |
| ledger_transfer_success_rate | Counter | Transfer success/failure rate | < 99.9% |
| ledger_balance_discrepancy_count | Counter | Balance reconciliation mismatches | > 0 |
| saga_compensation_rate | Counter | Saga compensation rate | > 1% |
| outbox_publish_lag | Gauge | Seconds behind real-time | > 30s |
| idempotency_hit_rate | Counter | Idempotency cache hit rate | informational |
| reconciliation_interval_seconds | Gauge | Time since last reconciliation | > 3600s (1h) |
| audit_log_hash_valid_count | Counter | Valid audit log entries | informational |
| audit_log_hash_invalid_count | Counter | Invalid audit log entries | > 0 → CRITICAL |
| db_connection_pool_active | Gauge | Active DB connections | > 80% max |
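The table describes `outbox_publish_lag` as "seconds behind real-time". One common way to derive such a value, sketched here under assumptions, is to take the age of the oldest outbox row that has not been published yet. `OutboxMessage` and `OutboxLag` are hypothetical names, and the outbox rows are passed in as an in-memory collection rather than read from a table.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape of an outbox row: PublishedAt stays null until the publisher sends it.
public sealed record OutboxMessage(Guid Id, DateTimeOffset CreatedAt, DateTimeOffset? PublishedAt);

public static class OutboxLag
{
    // Lag = age of the oldest unpublished message; zero when nothing is pending.
    public static TimeSpan Compute(IEnumerable<OutboxMessage> messages, DateTimeOffset now)
    {
        var pending = messages
            .Where(m => m.PublishedAt is null)
            .Select(m => m.CreatedAt)
            .ToList();

        if (pending.Count == 0)
            return TimeSpan.Zero;

        var lag = now - pending.Min();
        return lag > TimeSpan.Zero ? lag : TimeSpan.Zero; // guard against clock skew
    }
}
```

The resulting value is what a gauge such as the outbox lag metric would report; comparing it with the 30 s threshold from the table gives the alert condition.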
Prometheus Metrics
// Custom metrics for ledger
public class LedgerMetrics
{
// System.Diagnostics.Metrics instruments are generic; Gauge<T> requires .NET 9 or later
private readonly Histogram<double> _journalPostLatency;
private readonly Counter<long> _journalPostsTotal;
private readonly Counter<long> _journalPostsFailed;
private readonly Counter<long> _transfersTotal;
private readonly Counter<long> _transfersFailed;
private readonly Counter<long> _balanceDiscrepancies;
private readonly Counter<long> _sagaCompensations;
private readonly Gauge<double> _outboxLagSeconds;
private readonly Gauge<double> _reconciliationLastRun;
public LedgerMetrics(Meter meter)
{
_journalPostLatency = meter.CreateHistogram<double>(
"ledger.journal_post_latency_seconds",
unit: "s",
description: "Time to post a journal entry");
_journalPostsTotal = meter.CreateCounter<long>(
"ledger.journal_posts_total",
description: "Total journal posts");
_journalPostsFailed = meter.CreateCounter<long>(
"ledger.journal_posts_failed",
description: "Failed journal posts");
_transfersTotal = meter.CreateCounter<long>(
"ledger.transfers_total",
description: "Total transfers");
_transfersFailed = meter.CreateCounter<long>(
"ledger.transfers_failed",
description: "Failed transfers");
_balanceDiscrepancies = meter.CreateCounter<long>(
"ledger.balance_discrepancies_total",
description: "Balance reconciliation discrepancies");
_sagaCompensations = meter.CreateCounter<long>(
"ledger.saga_compensations_total",
description: "Saga compensations");
_outboxLagSeconds = meter.CreateGauge<double>(
"ledger.outbox_lag_seconds",
description: "Outbox publish lag in seconds");
_reconciliationLastRun = meter.CreateGauge<double>(
"ledger.reconciliation_last_run_timestamp",
description: "Unix timestamp of last reconciliation run");
}
public void RecordJournalPost(double elapsedSeconds, bool success)
{
_journalPostLatency.Record(elapsedSeconds);
_journalPostsTotal.Add(1);
if (!success)
_journalPostsFailed.Add(1);
}
public void RecordTransfer(bool success)
{
_transfersTotal.Add(1);
if (!success)
_transfersFailed.Add(1);
}
public void RecordBalanceDiscrepancy()
{
_balanceDiscrepancies.Add(1);
}
public void RecordSagaCompensation()
{
_sagaCompensations.Add(1);
}
public void SetOutboxLag(double seconds)
{
_outboxLagSeconds.Record(seconds); // Gauge<T> exposes Record, not Set
}
public void SetReconciliationLastRun()
{
_reconciliationLastRun.Record(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
}
}
Alerting Rules (Prometheus)
# prometheus/alerting_rules.yml
groups:
  - name: fintech-critical
    rules:
      # Balance discrepancy: immediate critical
      - alert: BalanceDiscrepancyDetected
        expr: ledger_balance_discrepancies_total > 0
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Balance discrepancy detected"
          description: "Account balance mismatch: {{ $value }} discrepancies found"
      # Audit log integrity violation
      - alert: AuditLogIntegrityViolation
        expr: audit_log_hash_invalid_count > 0
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Audit log integrity violation"
          description: "Tampered audit log entries detected"
      # Transfer success rate drop.
      # ledger_transfers_total already counts failed transfers, so it is the full denominator.
      - alert: TransferSuccessRateLow
        expr: rate(ledger_transfers_failed_total[5m]) / rate(ledger_transfers_total[5m]) > 0.001
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Transfer success rate below 99.9%"
          description: "Current failure rate: {{ $value | humanizePercentage }}"
      # Outbox lag
      - alert: OutboxLagHigh
        expr: ledger_outbox_lag_seconds > 30
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Outbox publish lag high"
          description: "Outbox lag: {{ $value }} seconds"
      # Saga compensation rate
      - alert: SagaCompensationRateHigh
        expr: rate(ledger_saga_compensations_total[5m]) / rate(ledger_transfers_total[5m]) > 0.01
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High saga compensation rate"
          description: "Compensation rate: {{ $value | humanizePercentage }}"
      # Reconciliation overdue
      - alert: ReconciliationOverdue
        expr: time() - ledger_reconciliation_last_run_timestamp > 3600
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Reconciliation not run in over 1 hour"
          description: "Last reconciliation: {{ $value }} seconds ago"
Chaos Engineering for Fintech
Chaos experiments for financial systems
| Experiment | Target | Expected Behavior | Success Criteria |
|---|---|---|---|
| Network partition | Ledger DB nodes | CP behavior: quorum works, non-quorum blocks | No data loss, consistent ledger |
| DB failover | Primary PostgreSQL | A synchronous streaming replica is promoted | Zero data loss, <30s failover |
| Kafka partition | Event bus topic | Events buffered, no loss | All events eventually processed |
| Idempotency key collision | Ledger service | Duplicate requests handled correctly | Zero duplicate transfers |
| Saga timeout | Saga orchestrator | Compensation executes correctly | System returns to consistent state |
| Outbox backlog | Outbox publisher | Messages processed, no loss | Lag returns to normal |
| Balance cache invalidation | Materialized views | Views refresh correctly | Derived = materialized after refresh |
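The "Idempotency key collision" row can be exercised without any infrastructure. The sketch below is an in-memory stand-in, not the ledger service from this module: `InMemoryLedger` and `IdempotencyCollisionExperiment` are hypothetical, and a `ConcurrentDictionary` plays the role that a UNIQUE constraint on the idempotency key would play in the database.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory ledger: TryAdd is atomic, so only one caller per key wins.
public sealed class InMemoryLedger
{
    private readonly ConcurrentDictionary<string, decimal> _applied = new();
    private int _transferCount;

    public int TransferCount => Volatile.Read(ref _transferCount);

    // Returns true if this call applied the transfer, false if the key was already used.
    public bool TryApply(string idempotencyKey, decimal amount)
    {
        if (!_applied.TryAdd(idempotencyKey, amount))
            return false;
        Interlocked.Increment(ref _transferCount);
        return true;
    }
}

public static class IdempotencyCollisionExperiment
{
    // Fires the same request concurrently and reports how many calls were applied
    // and how many were rejected as duplicates.
    public static async Task<(int Applied, int Duplicates)> RunAsync(
        InMemoryLedger ledger, string idempotencyKey, int concurrentRequests)
    {
        var tasks = Enumerable.Range(0, concurrentRequests)
            .Select(_ => Task.Run(() => ledger.TryApply(idempotencyKey, 100m)));
        var results = await Task.WhenAll(tasks);
        return (results.Count(applied => applied), results.Count(applied => !applied));
    }
}
```

The success criterion from the table ("Zero duplicate transfers") corresponds to exactly one applied call per key, regardless of how many concurrent retries arrive.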
Chaos Engineering Implementation
// Chaos engineering simulation for ledger
public interface IChaosExperiment
{
string Name { get; }
Task ExecuteAsync(CancellationToken ct);
Task<bool> ValidateAsync(CancellationToken ct);
}
// Network partition simulation
public class NetworkPartitionExperiment : IChaosExperiment
{
private readonly string _targetNode;
private readonly ILogger<NetworkPartitionExperiment> _logger;
public string Name => "Network Partition";
public NetworkPartitionExperiment(string targetNode,
ILogger<NetworkPartitionExperiment> logger)
{
_targetNode = targetNode;
_logger = logger;
}
public async Task ExecuteAsync(CancellationToken ct)
{
_logger.LogWarning("CHAOS: Simulating network partition for node {Node}", _targetNode);
// Simulate partition: block all writes to target node
// In production: use tc (traffic control) or cloud network ACLs
await Task.Delay(TimeSpan.FromSeconds(30), ct);
_logger.LogWarning("CHAOS: Restoring network for node {Node}", _targetNode);
// Restore network
}
public async Task<bool> ValidateAsync(CancellationToken ct)
{
// Verify: no data inconsistency after partition recovery
var report = await RunReconciliationAsync(ct);
return report.Discrepancies.Count == 0;
}
}
// Chaos testing pipeline
public class ChaosPipeline
{
private readonly List<IChaosExperiment> _experiments;
private readonly ILogger<ChaosPipeline> _logger;
public ChaosPipeline(List<IChaosExperiment> experiments,
ILogger<ChaosPipeline> logger)
{
_experiments = experiments;
_logger = logger;
}
public async Task RunAsync(CancellationToken ct)
{
foreach (var experiment in _experiments)
{
_logger.LogInformation("Running chaos experiment: {Name}", experiment.Name);
var preState = await CaptureStateAsync(ct);
await experiment.ExecuteAsync(ct);
var postState = await CaptureStateAsync(ct);
var valid = await experiment.ValidateAsync(ct);
if (!valid)
{
_logger.LogCritical("CHAOS FAILED: {Name}: system state inconsistent", experiment.Name);
await AlertOnChaosFailureAsync(experiment.Name, preState, postState, ct);
}
else
{
_logger.LogInformation("CHAOS PASSED: {Name}", experiment.Name);
}
}
}
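// Placeholder: a real implementation would query the ledger for these values
private Task<StateSnapshot> CaptureStateAsync(CancellationToken ct) =>
Task.FromResult(new StateSnapshot(DateTimeOffset.UtcNow, 0, 0m, 0));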
private Task AlertOnChaosFailureAsync(string experiment, StateSnapshot pre,
StateSnapshot post, CancellationToken ct) => Task.CompletedTask;
}
public record StateSnapshot(
DateTimeOffset CapturedAt,
int JournalEntries,
decimal TotalBalance,
int ActiveAccounts);
Runbooks
Standard Runbooks for Fintech
Runbook: Balance Discrepancy
## Balance Discrepancy Detected
### Symptoms
- Alert: BalanceDiscrepancyDetected
- Grafana: ledger_balance_discrepancies_total > 0
### Immediate Actions
1. [ ] Check reconciliation report: `GET /api/reconciliation/latest`
2. [ ] Identify affected accounts
3. [ ] Determine root cause:
- Materialized view refresh failure?
- Journal entry corruption?
- Bug in balance calculation?
4. [ ] If materialized view issue:
- Manually trigger refresh: `REFRESH MATERIALIZED VIEW CONCURRENTLY account_balances;`
- Verify discrepancy resolved
5. [ ] If journal entry issue:
- DO NOT modify journal entries
- Create correcting journal entry
- Escalate to engineering
### Resolution Criteria
- [ ] All discrepancies resolved
- [ ] Root cause identified and documented
- [ ] Fix deployed or workaround implemented
- [ ] Post-mortem scheduled if severity >= warning
Runbook: Outbox Backlog
## Outbox Events Backlog
### Symptoms
- Alert: OutboxLagHigh (lag > 30s)
- Grafana: ledger_outbox_lag_seconds increasing
### Immediate Actions
1. [ ] Check outbox publisher health
2. [ ] Check event bus connectivity
3. [ ] Check consumer processing rate
4. [ ] If publisher down:
- Restart outbox publisher
- Verify it picks up from last published offset
5. [ ] If consumer slow:
- Scale out consumers
- Check for blocking operations
6. [ ] If event bus issue:
- Check Kafka/Pulsar cluster health
- Verify topic partitions
### Resolution Criteria
- [ ] Outbox lag < 5s
- [ ] No event loss (verify outbox published count = event bus received count)
Runbook: Saga Compensation Storm
## High Saga Compensation Rate
### Symptoms
- Alert: SagaCompensationRateHigh (> 1%)
- Multiple saga compensations in logs
### Immediate Actions
1. [ ] Identify failing saga step
2. [ ] Check downstream service health
3. [ ] If external payment gateway down:
- Switch to backup gateway
- Queue pending payments
4. [ ] If internal service issue:
- Check service logs and metrics
- Restart if necessary
5. [ ] Monitor compensation rate
### Resolution Criteria
- [ ] Compensation rate < 0.1%
- [ ] Root cause identified
- [ ] Pending sagas completed or compensated
Reconciliation Pipeline
Automated Reconciliation Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Reconciliation Pipeline │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Scheduler │───→│ Journal │───→│ Materialized │ │
│ │ (every 1h) │ │ Query │ │ Balance Query │ │
│ └─────────────┘ └──────────────┘ └──────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────────┐ │
│ │ Compare │←───│ Discrepancy │ │
│ │ & Detect │ │ Alert │ │
│ └──────────────┘ └──────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────────┐ │
│ │ Report │───→│ Remediation │ │
│ │ & Metrics │ │ Pipeline │ │
│ └──────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
// Reconciliation scheduler
public class ReconciliationScheduler
{
private readonly ReconciliationService _reconciliation;
private readonly LedgerMetrics _metrics;
private readonly ILogger<ReconciliationScheduler> _logger;
private readonly CancellationTokenSource _cts;
private readonly TimeSpan _interval = TimeSpan.FromHours(1);
public ReconciliationScheduler(ReconciliationService reconciliation,
LedgerMetrics metrics, ILogger<ReconciliationScheduler> logger)
{
_reconciliation = reconciliation;
_metrics = metrics;
_logger = logger;
_cts = new CancellationTokenSource();
}
public void Start()
{
_ = Task.Run(async () =>
{
while (!_cts.Token.IsCancellationRequested)
{
try
{
_logger.LogInformation("Starting scheduled reconciliation");
var report = await _reconciliation.RunAsync(_cts.Token);
_metrics.SetReconciliationLastRun();
if (report.Discrepancies.Any())
{
_logger.LogWarning(
"Reconciliation found {Count} discrepancies",
report.Discrepancies.Count);
}
else
{
_logger.LogInformation(
"Reconciliation passed: {Accounts} accounts, zero discrepancies",
report.AccountsScanned);
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Reconciliation failed");
}
await Task.Delay(_interval, _cts.Token);
}
}, _cts.Token);
}
public void Stop()
{
_cts.Cancel();
}
}
Checklist for Operational Discipline
- [ ] Distributed tracing: correlation ID across all services
- [ ] Structured logging: financial-specific log events
- [ ] OpenTelemetry: metrics, traces, logs integrated
- [ ] Golden metrics: defined and monitored
- [ ] Alerting: critical + warning alerts configured
- [ ] Runbooks: standard procedures for common incidents
- [ ] Chaos engineering: regular experiments in staging
- [ ] Reconciliation: automated hourly + alerting
- [ ] Post-mortem culture: blameless incident review
- [ ] On-call rotation: 24/7 coverage for critical systems
Further reading
- Google SRE Workbook — chapters on monitoring, incident response
- Netflix Chaos Engineering principles
- Martin Kleppmann, "Designing Data-Intensive Applications", Chapter 11: Stream Processing
- TechBullion, "Distributed Systems in U.S. Finance": https://techbullion.com/distributed-systems-in-u-s-finance-the-patterns-that-survive-production/
Practice
Module 14 checkpoint
- Double-entry journal with atomic posting and a CHECK constraint: sum(debits) = sum(credits)
- Append-only journal: zero UPDATE/DELETE on journal entries, enforced at the database level
- Idempotency keys on every state-changing endpoint, backed by a UNIQUE constraint
- Saga pattern for cross-service transactions with idempotent compensating transactions
- CAP boundary: CP for the core ledger, AP for the edge/gateway with asynchronous processing through Kafka
- CQRS split: write store (PostgreSQL) → event bus → read store (materialized balances)
- Reconciliation pipeline: automated comparison of journal entries against balances, with alerting
- Audit log: immutable and tamper-evident, with a cryptographic hash chain
- Zero balance drift between journal entries and materialized balances under load
- All cross-service transactions end in an eventually consistent state (saga + compensation)
- Double-entry invariant enforced at the database level (CHECK constraint), not in application code
- Idempotency: zero duplicate charges on retry with the same idempotency key
- During a network partition the ledger is available only in CP mode (quorum-based)
- Reconciliation runs automatically every hour with zero unexplained discrepancies
- Audit log is tamper-evident: cryptographic verification passes for all entries
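The tamper-evident audit log items can be illustrated with a small hash chain. This is a minimal sketch under assumptions, not the module's audit log: `AuditEntry`, `AuditChain`, the `GENESIS` marker, and the `sequence|previousHash|payload` encoding are all hypothetical choices made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

// Hypothetical entry shape: each entry stores the hash of the one before it.
public sealed record AuditEntry(long Sequence, string Payload, string PreviousHash, string Hash);

public static class AuditChain
{
    public const string GenesisHash = "GENESIS"; // assumed marker for the first entry

    public static string ComputeHash(long sequence, string payload, string previousHash)
    {
        var text = sequence.ToString(CultureInfo.InvariantCulture) + "|" + previousHash + "|" + payload;
        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(text)));
    }

    // Builds the next entry; the caller appends it to the (append-only) log.
    public static AuditEntry Append(IReadOnlyList<AuditEntry> chain, string payload)
    {
        var previousHash = chain.Count == 0 ? GenesisHash : chain[chain.Count - 1].Hash;
        long sequence = chain.Count + 1;
        return new AuditEntry(sequence, payload, previousHash, ComputeHash(sequence, payload, previousHash));
    }

    // Recomputes every hash and checks each link; any edit to an earlier entry breaks the chain.
    public static bool Verify(IReadOnlyList<AuditEntry> chain)
    {
        var expectedPrevious = GenesisHash;
        for (var i = 0; i < chain.Count; i++)
        {
            var entry = chain[i];
            if (entry.Sequence != i + 1) return false;
            if (entry.PreviousHash != expectedPrevious) return false;
            if (entry.Hash != ComputeHash(entry.Sequence, entry.Payload, entry.PreviousHash)) return false;
            expectedPrevious = entry.Hash;
        }
        return true;
    }
}
```

A periodic job that runs a check like `Verify` over the whole log is one way to back the "cryptographic verification passes for all entries" criterion. A hash chain alone makes tampering detectable, not impossible, so the latest hash is usually also stored somewhere the writer cannot modify.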