14. Fintech: Data-Handling Specifics

This module covers the aspects of designing and implementing systems that are specific to the fintech industry: financial data integrity, the CAP theorem and its practical application, distributed consistency patterns, financial ledger architecture, and regulatory requirements for data.

Take this module after modules 5 (Databases), 3 (Concurrency) and 12 (Message Brokers). The CAP theorem and distributed consistency patterns are the foundation for module 10 (Cloud-Native).

Level 1: Foundation

Data Integrity in Fintech

Introduction

In financial systems, data integrity is more than a technical requirement. It is a business imperative and a regulatory obligation. An error in a balance calculation can cost millions of dollars, and a lost transaction can lead to lawsuits and regulatory fines.

> Key idea: ACID guarantees the correctness of a single transaction in a single database. Moving money, however, is a contract between services, caches, brokers, and external systems. Production-grade correctness lives above the database layer.


ACID in Financial Systems

Atomicity

Every financial operation must either complete in full or be rolled back in full. There are no intermediate states.

// CORRECT: Atomic double-entry posting (Dapper; `_connection` is an open DbConnection)
public async Task<PostJournalEntryResult> PostAsync(JournalEntry entry, CancellationToken ct)
{
    await using var txn = await _connection.BeginTransactionAsync(IsolationLevel.Serializable, ct);

    try
    {
        // 1. Idempotency check. Dapper extensions live on the connection, so the
        //    transaction is passed explicitly; the UNIQUE index catches concurrent duplicates
        var existingId = await _connection.QueryFirstOrDefaultAsync<Guid?>(new CommandDefinition(
            "SELECT id FROM journal_entries WHERE idempotency_key = @Key",
            new { Key = entry.IdempotencyKey }, txn, cancellationToken: ct));

        if (existingId.HasValue)
            return PostJournalEntryResult.Duplicate(existingId.Value);

        // 2. Validate double-entry invariant: decimal arithmetic is exact,
        //    so the net must be exactly zero (decimal has no Epsilon)
        var net = entry.Postings.Sum(p => p.IsDebit ? p.Amount : -p.Amount);
        if (net != 0m)
            throw new InvalidOperationException("Double-entry invariant violated: debits must equal credits");

        // 3. Write the entry and all postings in the same transaction
        //    (SQL parameter names must match the property names passed in)
        await _connection.ExecuteAsync(new CommandDefinition("""
            INSERT INTO journal_entries (id, idempotency_key, timestamp, description)
            VALUES (@Id, @IdempotencyKey, @Timestamp, @Description)
            """, new { entry.Id, entry.IdempotencyKey, entry.Timestamp, entry.Description },
            txn, cancellationToken: ct));

        await _connection.ExecuteAsync(new CommandDefinition("""
            INSERT INTO journal_postings (entry_id, account_id, amount, is_debit, version)
            VALUES (@EntryId, @AccountId, @Amount, @IsDebit, @Version)
            """, entry.Postings.Select(p => new { EntryId = entry.Id, p.AccountId, p.Amount, p.IsDebit, p.Version }),
            txn, cancellationToken: ct));

        // 4. Emit outbox event (same transaction!); payload is a JSONB column
        await _connection.ExecuteAsync(new CommandDefinition("""
            INSERT INTO outbox_events (id, aggregate_id, event_type, payload, created_at)
            VALUES (@Id, @AggId, @Type, @Payload::jsonb, @Created)
            """, new
            {
                Id = Guid.NewGuid(),
                AggId = entry.Id,
                Type = "JournalEntryPosted",
                Payload = JsonSerializer.Serialize(entry),
                Created = DateTimeOffset.UtcNow
            }, txn, cancellationToken: ct));

        await txn.CommitAsync(ct);
        return PostJournalEntryResult.Success(entry.Id);
    }
    catch
    {
        await txn.RollbackAsync(CancellationToken.None);
        throw;
    }
}

Consistency

The system moves from one valid state to another, and all business invariants hold.

Key invariants in fintech:

| Invariant | Description | Enforcement |
|---|---|---|
| Double-entry | sum(debits) = sum(credits) for every entry | Deferred constraint trigger at the DB level (PostgreSQL does not allow subqueries in CHECK) |
| Non-negative balance | A balance cannot go negative (unless overdraft is allowed) | CHECK constraint or trigger |
| No double-spend | The same funds cannot be transferred twice | Idempotency key + UNIQUE constraint |
| Order preservation | Transactions are processed in the correct order | Monotonic timestamps / sequence numbers |
| Total conservation | The total amount of money in the system does not change | Double-entry invariant |
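-- Double-entry invariant at the database level. PostgreSQL rejects subqueries in
-- CHECK constraints, and postings are inserted after the entry row, so the check
-- is a constraint trigger deferred until COMMIT.
CREATE OR REPLACE FUNCTION check_double_entry()
RETURNS TRIGGER AS $$
BEGIN
    IF (SELECT SUM(CASE WHEN is_debit THEN amount ELSE -amount END)
        FROM journal_postings WHERE entry_id = NEW.entry_id) <> 0 THEN
        RAISE EXCEPTION 'Double-entry invariant violated for entry %', NEW.entry_id;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE CONSTRAINT TRIGGER trg_double_entry
    AFTER INSERT ON journal_postings
    DEFERRABLE INITIALLY DEFERRED
    FOR EACH ROW EXECUTE FUNCTION check_double_entry();

-- Non-negative balance enforcement
ALTER TABLE accounts ADD CONSTRAINT chk_non_negative_balance
CHECK (balance >= 0);

-- Idempotency key uniqueness
CREATE UNIQUE INDEX uq_journal_idempotency_key ON journal_entries(idempotency_key);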
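The same invariants can also be checked in application code before anything reaches the database. This produces clearer error messages, while the database constraints remain the last line of defence. Below is a minimal in-memory sketch. It assumes postings are `(Account, Amount, IsDebit)` tuples with positive amounts and that balances are held in a dictionary. `ValidateEntry` and the overdraft set are hypothetical names, not part of the module's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical validator: returns the violated invariants for one journal entry.
// Convention: IsDebit = true means money leaves the account; amounts are positive.
List<string> ValidateEntry(
    IReadOnlyList<(string Account, decimal Amount, bool IsDebit)> postings,
    IReadOnlyDictionary<string, decimal> current,
    ISet<string> allowOverdraft)
{
    var errors = new List<string>();

    if (postings.Any(p => p.Amount <= 0m))
        errors.Add("Posting amounts must be positive");

    // Double-entry: decimal arithmetic is exact, so debits must equal credits exactly
    var net = postings.Sum(p => p.IsDebit ? p.Amount : -p.Amount);
    if (net != 0m)
        errors.Add($"Debits and credits differ by {net}");

    // Non-negative balance after applying the entry, unless overdraft is allowed
    foreach (var group in postings.GroupBy(p => p.Account))
    {
        var delta = group.Sum(p => p.IsDebit ? -p.Amount : p.Amount);
        var after = current.GetValueOrDefault(group.Key) + delta;
        if (after < 0m && !allowOverdraft.Contains(group.Key))
            errors.Add($"Account {group.Key} would go negative: {after}");
    }

    return errors;
}

var balances = new Dictionary<string, decimal> { ["A"] = 150m, ["B"] = 0m };
var noOverdraft = new HashSet<string>();

var ok = ValidateEntry(new[] { ("A", 100m, true), ("B", 100m, false) }, balances, noOverdraft);
var overdraft = ValidateEntry(new[] { ("A", 200m, true), ("B", 200m, false) }, balances, noOverdraft);
var unbalanced = ValidateEntry(new[] { ("A", 100m, true), ("B", 90m, false) }, balances, noOverdraft);

Console.WriteLine($"{ok.Count} {overdraft.Count} {unbalanced.Count}"); // 0 1 1
```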

Isolation

Concurrent transactions must not interfere with each other. In financial systems this is critical for preventing race conditions between simultaneous transfers.

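Isolation levels in a fintech context. The table uses the ANSI SQL definitions; in PostgreSQL, Read Uncommitted behaves like Read Committed, and Repeatable Read also prevents phantom reads.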

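| Level | Dirty Read | Non-Repeatable Read | Phantom Read | Use Case | Risk |
|---|---|---|---|---|---|
| Read Uncommitted | Possible | Possible | Possible | Analytics only | Critical |
| Read Committed | No | Possible | Possible | Reporting | Medium |
| Repeatable Read | No | No | Possible | Internal operations | Low |
| Serializable | No | No | No | Ledger postings | Minimal (conflicting transactions are aborted and must be retried) |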
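// SERIALIZABLE isolation for balance-critical operations
public async Task<TransferResult> TransferAsync(TransferCommand cmd, CancellationToken ct)
{
    await using var txn = await _connection.BeginTransactionAsync(IsolationLevel.Serializable, ct);

    try
    {
        // Pessimistic locking: SELECT FOR UPDATE on both accounts, always in the same
        // order, so concurrent A->B and B->A transfers cannot deadlock
        foreach (var id in new[] { cmd.SourceAccountId, cmd.DestinationAccountId }.OrderBy(g => g))
        {
            await _connection.ExecuteAsync(new CommandDefinition(
                "SELECT 1 FROM accounts WHERE id = @Id FOR UPDATE",
                new { Id = id }, txn, cancellationToken: ct));
        }

        var sourceBalance = await _connection.QuerySingleAsync<decimal>(new CommandDefinition(
            "SELECT balance FROM accounts WHERE id = @Id",
            new { Id = cmd.SourceAccountId }, txn, cancellationToken: ct));

        if (sourceBalance < cmd.Amount)
            return TransferResult.InsufficientFunds; // disposing the transaction rolls it back

        // Deduct from source
        await _connection.ExecuteAsync(new CommandDefinition(
            "UPDATE accounts SET balance = balance - @Amount WHERE id = @Id",
            new { Id = cmd.SourceAccountId, cmd.Amount }, txn, cancellationToken: ct));

        // Add to destination
        await _connection.ExecuteAsync(new CommandDefinition(
            "UPDATE accounts SET balance = balance + @Amount WHERE id = @Id",
            new { Id = cmd.DestinationAccountId, cmd.Amount }, txn, cancellationToken: ct));

        // Post journal entry (atomic with updates)
        await PostJournalEntryAsync(cmd, txn, ct);

        await txn.CommitAsync(ct);
        return TransferResult.Success;
    }
    catch (PostgresException ex) when (ex.SqlState is "40001" or "40P01")
    {
        // Serialization failure or deadlock: roll back; the caller retries the whole transaction
        await txn.RollbackAsync(CancellationToken.None);
        return TransferResult.ConflictRetry;
    }
}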
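Under SERIALIZABLE, PostgreSQL resolves conflicts by aborting one of the transactions with SQLSTATE 40001 (`serialization_failure`). Deadlocks surface as 40P01. That is why a method like `TransferAsync` reports `ConflictRetry` instead of failing: the caller is expected to rerun the whole transaction. Below is a minimal sketch of such a retry loop with exponential backoff and jitter. It assumes the attempt callback returns `false` on a conflict, and that the retried operation is safe to repeat (for example, because it carries the same idempotency key). `RetryOnConflictAsync` is a hypothetical helper.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: re-runs the WHOLE transaction while it reports a conflict
// (e.g. TransferResult.ConflictRetry after SQLSTATE 40001 or 40P01).
async Task<bool> RetryOnConflictAsync(
    Func<int, Task<bool>> attemptTransaction, // receives the attempt number; false = conflict
    int maxAttempts = 5,
    int baseDelayMs = 10)
{
    for (var attempt = 1; attempt <= maxAttempts; attempt++)
    {
        if (await attemptTransaction(attempt))
            return true;
        if (attempt == maxAttempts)
            break;

        // Exponential backoff with full jitter, so competing clients do not retry in lockstep
        var maxDelayMs = baseDelayMs * (1 << (attempt - 1));
        await Task.Delay(Random.Shared.Next(0, maxDelayMs + 1));
    }
    return false; // give up and surface the conflict to the caller
}

// Simulated transaction that conflicts twice, then commits
var calls = 0;
var committed = await RetryOnConflictAsync(attempt =>
{
    calls++;
    return Task.FromResult(attempt >= 3);
});
Console.WriteLine($"committed={committed}, attempts={calls}"); // committed=True, attempts=3

var gaveUp = await RetryOnConflictAsync(_ => Task.FromResult(false), maxAttempts: 2);
```

The backoff caps the number of attempts. A caller that still gets `false` should report the conflict (for example, as HTTP 409) rather than loop forever.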

Durability

Once a transaction is committed, its data must not be lost, even if the hardware fails.

// Durability is a server-side property of PostgreSQL's WAL (Write-Ahead Log).
// postgresql.conf requires:
// - fsync = on                     (never disable in production)
// - synchronous_commit = on        (COMMIT returns only after the WAL is flushed)
// - wal_level = replica            (for streaming replication)
// - max_wal_senders > 0            (for replication)
// - replication slot created
// - synchronous_standby_names set  (only then does COMMIT also wait for a standby)

// The connection string cannot add durability: it only secures the transport and
// tunes pooling. SslMode=VerifyFull validates the server certificate and host name:
// "Host=db;Database=ledger;Username=app;Password=xxx;SslMode=VerifyFull;
//  Timeout=30;CommandTimeout=30;Pooling=true;MinPoolSize=5;MaxPoolSize=100;"

await using var dataSource = NpgsqlDataSource.Create(connectionString);

// Verify synchronous commit at startup
await using var conn = await dataSource.OpenConnectionAsync();
await using var cmd = new NpgsqlCommand("SHOW synchronous_commit", conn);
var value = await cmd.ExecuteScalarAsync();
Console.WriteLine($"Synchronous commit: {value}"); // Expected: 'on'
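The idea behind the WAL is that a change is acknowledged only after its log record has reached stable storage. The toy sketch below illustrates that ordering with a plain file; it is not how PostgreSQL implements its WAL. `FileStream.Flush(flushToDisk: true)` asks the OS to flush its buffers to the device, similar to fsync. The file name and record format are assumptions for illustration.

```csharp
using System;
using System.IO;
using System.Text;

// Toy write-ahead log: append a record, force it to stable storage, only then acknowledge.
void AppendDurably(string path, string record)
{
    using var stream = new FileStream(path, FileMode.Append, FileAccess.Write, FileShare.Read);
    var bytes = Encoding.UTF8.GetBytes(record + "\n");
    stream.Write(bytes, 0, bytes.Length);

    // Flush(true) also flushes OS buffers to the device (fsync-like); without it,
    // a power loss could drop a record that was already acknowledged
    stream.Flush(flushToDisk: true);
}

var walPath = Path.Combine(Path.GetTempPath(), $"toy-wal-{Guid.NewGuid():N}.log");
AppendDurably(walPath, "entry=1001;A:-100;B:+100");
AppendDurably(walPath, "entry=1002;A:-50;C:+50");
Console.WriteLine("ack"); // acknowledge only after both appends returned

// Recovery replays the log in order
var replayed = File.ReadAllLines(walPath);
File.Delete(walPath);
```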

Double-Entry Bookkeeping

Double-entry bookkeeping is more than an accounting tradition. It is a mathematical invariant that guarantees money is neither created nor destroyed by architectural mistakes.

How it works

Every transaction is recorded as a journal entry with two or more postings. The sum of all debits equals the sum of all credits.

Transaction: Transfer $100 from Account A to Account B

Journal Entry #1001
├── Posting 1: Account A  → Debit  $100  (A's balance decreases)
├── Posting 2: Account B  → Credit $100  (B's balance increases)
└── Total:                Debit $100 = Credit $100 ✓

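Note on terminology. In accounting, for an asset account:

  • Debit = increase
  • Credit = decrease

Customer accounts, however, are liabilities on the bank's books. For a liability account, a debit decreases the balance and a credit increases it. That is why the entry above debits A and credits B, and why a payment system can use a simpler rule:

  • is_debit = true → money leaves the account
  • is_debit = false → money arrives in the account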
public record JournalEntry(
    Guid Id,
    string IdempotencyKey,
    DateTimeOffset Timestamp,
    string Description,
    IReadOnlyList<JournalPosting> Postings)
{
    // Validation: double-entry invariant. decimal arithmetic is exact,
    // so the net must be exactly zero (decimal has no Epsilon field)
    public bool IsValid() =>
        Postings.Count >= 2 &&
        Postings.Sum(p => p.IsDebit ? p.Amount : -p.Amount) == 0m;
}

public record JournalPosting(
    Guid AccountId,
    decimal Amount,
    bool IsDebit,
    int Version);
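With the payment-system convention above, an account's balance is simply its credits minus its debits. Below is a small worked example. It assumes an opening entry that funds account A from a hypothetical `Bank` account, followed by the $100 transfer from A to B. Postings are modeled as tuples rather than the `JournalPosting` record, to keep the sketch self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Journal modeled as (EntryId, Account, Amount, IsDebit) postings.
// Convention: IsDebit = true means money leaves the account.
var postings = new List<(int EntryId, string Account, decimal Amount, bool IsDebit)>
{
    (1000, "Bank", 150m, true), (1000, "A", 150m, false), // hypothetical opening entry: Bank -> A
    (1001, "A", 100m, true),    (1001, "B", 100m, false), // Journal Entry #1001: $100 from A to B
};

// Every entry must balance on its own
var allEntriesBalanced = postings
    .GroupBy(p => p.EntryId)
    .All(e => e.Sum(p => p.IsDebit ? p.Amount : -p.Amount) == 0m);

// Derived balance per account = credits - debits
var balances = postings
    .GroupBy(p => p.Account)
    .ToDictionary(g => g.Key, g => g.Sum(p => p.IsDebit ? -p.Amount : p.Amount));

foreach (var (account, balance) in balances.OrderBy(kv => kv.Key, StringComparer.Ordinal))
    Console.WriteLine($"{account}: {balance}");
// Expected values: A = 50, B = 100, Bank = -150; they sum to zero (total conservation)
```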

Append-Only Journal

Journal entries NEVER change after posting. To correct an error, a reversing entry is created.

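-- Zero UPDATE/DELETE on the journal
-- PostgreSQL permissions:
-- GRANT INSERT, SELECT ON journal_entries, journal_postings TO posting_engine_role;
-- REVOKE UPDATE, DELETE ON journal_entries, journal_postings FROM posting_engine_role;

-- Reversal entry (NOT an UPDATE of the existing entry!).
-- Same accounts and amounts with is_debit flipped, posted under a NEW entry id.
-- Negating the amount as well would cancel the flip and double the original effect.
-- The deterministic idempotency key makes a retried reversal hit the UNIQUE index.
WITH reversal AS (
    INSERT INTO journal_entries (id, idempotency_key, timestamp, description)
    VALUES (gen_random_uuid(), 'reversal-of-1001', NOW(), 'Reversal of entry #1001')
    RETURNING id
)
INSERT INTO journal_postings (entry_id, account_id, amount, is_debit, version)
SELECT reversal.id, jp.account_id, jp.amount, NOT jp.is_debit, jp.version
FROM journal_postings jp CROSS JOIN reversal
WHERE jp.entry_id = '1001-guid';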
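A correct reversal keeps each amount and flips only its direction, so the original and the reversal cancel out per account. Negating the amount as well would cancel the flip and double the original effect. The sketch below checks that property in memory. The deterministic key format `reversal-of-{id}` is an assumption: it makes a retried reversal collide with the UNIQUE idempotency index instead of posting twice.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical reversal builder: same accounts and amounts, direction flipped.
(string IdempotencyKey, List<(string Account, decimal Amount, bool IsDebit)> Postings) Reverse(
    string originalEntryId,
    IEnumerable<(string Account, decimal Amount, bool IsDebit)> original)
{
    // Deterministic key (assumed format): retrying the same reversal is rejected
    // by the UNIQUE idempotency index instead of posting twice
    var key = $"reversal-of-{originalEntryId}";
    var postings = original.Select(p => (p.Account, p.Amount, IsDebit: !p.IsDebit)).ToList();
    return (key, postings);
}

var entry1001 = new List<(string Account, decimal Amount, bool IsDebit)>
{
    ("A", 100m, true), ("B", 100m, false)
};
var (reversalKey, reversal) = Reverse("1001", entry1001);

// Net effect of original + reversal per account (credits - debits)
var net = entry1001.Concat(reversal)
    .GroupBy(p => p.Account)
    .ToDictionary(g => g.Key, g => g.Sum(p => p.IsDebit ? -p.Amount : p.Amount));

Console.WriteLine(reversalKey); // reversal-of-1001
```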

Idempotency

In financial systems, retries are the norm, not the exception. Networks lag, timeouts happen, and clients repeat requests. Idempotency guarantees that processing the same request again does not create a duplicate transaction.

Idempotency Key Pattern

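public class IdempotencyService
{
    private readonly NpgsqlDataSource _dataSource;

    public IdempotencyService(NpgsqlDataSource dataSource)
    {
        _dataSource = dataSource;
    }

    // The operation receives the same connection and transaction, so its side effects
    // and the stored result commit or roll back together
    public async Task<T> ExecuteAsync<T>(
        string key,
        Func<NpgsqlConnection, NpgsqlTransaction, Task<T>> operation,
        TimeSpan? ttl = null,
        CancellationToken ct = default)
    {
        ttl ??= TimeSpan.FromHours(24);

        await using var conn = await _dataSource.OpenConnectionAsync(ct);
        await using var txn = await conn.BeginTransactionAsync(ct);

        // Serialize concurrent requests with the same key: a second caller blocks here
        // until the first commits, then finds the stored result below
        await conn.ExecuteAsync(new CommandDefinition(
            "SELECT pg_advisory_xact_lock(hashtextextended(@Key, 0))",
            new { Key = key }, txn, cancellationToken: ct));

        // Check existing (non-expired) result
        var existing = await conn.QueryFirstOrDefaultAsync<IdempotencyRecord>(new CommandDefinition(
            """
            SELECT result FROM idempotency_keys
            WHERE key = @Key AND expires_at > NOW()
            """, new { Key = key }, txn, cancellationToken: ct));

        if (existing != null)
            return JsonSerializer.Deserialize<T>(existing.Result)!;

        // Execute the operation inside the same transaction; if it throws,
        // disposing the uncommitted transaction rolls everything back
        var result = await operation(conn, txn);

        // Store the serialized result atomically (assumes key is the primary key);
        // an expired row for the same key is overwritten
        await conn.ExecuteAsync(new CommandDefinition(
            """
            INSERT INTO idempotency_keys (key, result, expires_at)
            VALUES (@Key, @Result, @Expires)
            ON CONFLICT (key) DO UPDATE
            SET result = EXCLUDED.result, expires_at = EXCLUDED.expires_at
            """,
            new { Key = key, Result = JsonSerializer.Serialize(result), Expires = DateTimeOffset.UtcNow + ttl.Value },
            txn, cancellationToken: ct));

        await txn.CommitAsync(ct);
        return result;
    }
}

public record IdempotencyRecord(string Result);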
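Inside a single process, concurrent duplicates with the same key can also be collapsed before they reach the database. Below is a minimal sketch using `ConcurrentDictionary<string, Lazy<Task<T>>>`: only the first caller runs the operation, and every other caller awaits the same task. It complements the database table rather than replacing it. It does not survive restarts, span several instances, or expire keys, and a failed task stays cached. `ExecuteOnce` and `Transfer` are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// In-process idempotency (single-flight): the first caller with a key runs the
// operation; concurrent and later callers with the same key await the same Task.
// No expiry, and a faulted Task stays cached: both would need handling in production.
var inFlight = new ConcurrentDictionary<string, Lazy<Task<string>>>();

Task<string> ExecuteOnce(string key, Func<Task<string>> operation) =>
    inFlight.GetOrAdd(key, _ => new Lazy<Task<string>>(operation)).Value;

var executions = 0;
async Task<string> Transfer() // hypothetical operation
{
    Interlocked.Increment(ref executions);
    await Task.Delay(50); // simulated database round-trip
    return "transfer-42";
}

// Ten concurrent retries of the same request
var results = await Task.WhenAll(Enumerable.Range(0, 10)
    .Select(_ => ExecuteOnce("abc-123-def", Transfer)));

Console.WriteLine($"executions={executions}, distinct results={results.Distinct().Count()}");
// executions=1, distinct results=1
```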

Idempotency in an HTTP API

// The client sends the idempotency key in a header
// POST /api/transfers
// Idempotency-Key: abc-123-def
// { "from": "acc-1", "to": "acc-2", "amount": 100 }

// Assumes TransferResult exposes Status (a TransferStatus enum) and TransferId
[HttpPost("transfers")]
public async Task<IActionResult> CreateTransfer([FromBody] TransferRequest request,
    [FromHeader(Name = "Idempotency-Key")] string? idempotencyKey)
{
    if (string.IsNullOrWhiteSpace(idempotencyKey))
        return BadRequest("Idempotency-Key header is required");

    var result = await _transferService.ExecuteWithIdempotencyAsync(
        idempotencyKey,
        () => _transferService.ProcessTransferAsync(request));

    // Tell the client when to retry; it must reuse the SAME Idempotency-Key
    if (result.Status == TransferStatus.ConflictRetry)
        Response.Headers["Retry-After"] = "2";

    return result.Status switch
    {
        TransferStatus.Success => Ok(new { transferId = result.TransferId }),
        TransferStatus.InsufficientFunds => BadRequest("Insufficient funds"),
        // Replayed request: return the original outcome, not a new transfer
        TransferStatus.Duplicate => Ok(new { transferId = result.TransferId }),
        TransferStatus.ConflictRetry => StatusCode(StatusCodes.Status409Conflict),
        _ => StatusCode(StatusCodes.Status500InternalServerError)
    };
}

Immutability and Audit Trail

Immutable Audit Log

Every write operation in a financial system must be recorded in an immutable audit log.

-- Audit log table: zero UPDATE/DELETE (enforce with REVOKE, as for the journal)
CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    event_type VARCHAR(100) NOT NULL,
    actor_id UUID NOT NULL,
    entity_type VARCHAR(50) NOT NULL,
    entity_id UUID NOT NULL,
    action VARCHAR(50) NOT NULL,       -- CREATED, POSTED, REVERSED
    old_state JSONB,                    -- state BEFORE the operation
    new_state JSONB,                    -- state AFTER the operation
    metadata JSONB,                     -- additional data
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Cryptographic hash chain for tamper evidence
ALTER TABLE audit_log ADD COLUMN hash TEXT;
ALTER TABLE audit_log ADD COLUMN prev_hash TEXT;

-- Trigger that computes the hash
CREATE OR REPLACE FUNCTION compute_audit_hash()
RETURNS TRIGGER AS $$
DECLARE
    prev_hash_val TEXT;
    data_to_hash TEXT;
BEGIN
    -- Serialize appends until COMMIT, so two concurrent inserts cannot link to the
    -- same predecessor; take the id under the lock so id order matches chain order
    PERFORM pg_advisory_xact_lock(hashtext('audit_log_chain'));
    NEW.id := nextval('audit_log_id_seq');

    -- Get previous hash
    SELECT hash INTO prev_hash_val FROM audit_log ORDER BY id DESC LIMIT 1;
    prev_hash_val := COALESCE(prev_hash_val, 'genesis');

    -- Hash this record TOGETHER with the previous hash. concat_ws() skips NULLs
    -- (e.g. old_state on CREATED); plain || would turn the whole string into NULL
    data_to_hash := concat_ws('|', prev_hash_val, NEW.id, NEW.event_type, NEW.actor_id,
                              NEW.entity_type, NEW.entity_id, NEW.action,
                              NEW.old_state, NEW.new_state, NEW.created_at);
    NEW.hash := encode(sha256(convert_to(data_to_hash, 'UTF8')), 'hex');
    NEW.prev_hash := prev_hash_val;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_audit_hash
    BEFORE INSERT ON audit_log
    FOR EACH ROW EXECUTE FUNCTION compute_audit_hash();
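A hash chain only provides tamper evidence if something verifies it. Below is a minimal verifier sketch that recomputes each hash from its predecessor and reports the first broken record. It assumes a simplified `prevHash|data` payload rather than the exact string the PostgreSQL trigger hashes. It uses `SHA256.HashData` and `Convert.ToHexString` (available since .NET 5), lowercased to match `encode(..., 'hex')`.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Toy audit chain: Hash = SHA-256(PrevHash | Data), lowercase hex.
// The "prev|data" payload is a simplification of what the SQL trigger hashes.
string ComputeHash(string prevHash, string data) =>
    Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes($"{prevHash}|{data}"))).ToLowerInvariant();

var chain = new List<(string Data, string PrevHash, string Hash)>();

void Append(string data)
{
    var prev = chain.Count == 0 ? "genesis" : chain[^1].Hash;
    chain.Add((data, prev, ComputeHash(prev, data)));
}

// Returns the index of the first broken record, or -1 if the chain is intact
int Verify(IReadOnlyList<(string Data, string PrevHash, string Hash)> records)
{
    var expectedPrev = "genesis";
    for (var i = 0; i < records.Count; i++)
    {
        var r = records[i];
        if (r.PrevHash != expectedPrev || r.Hash != ComputeHash(r.PrevHash, r.Data))
            return i;
        expectedPrev = r.Hash;
    }
    return -1;
}

Append("POSTED entry 1001");
Append("POSTED entry 1002");
Append("REVERSED entry 1001");
var intact = Verify(chain); // -1

// Tamper with the middle record without recomputing its hash
var tampered = new List<(string Data, string PrevHash, string Hash)>(chain);
tampered[1] = ("POSTED entry 9999", tampered[1].PrevHash, tampered[1].Hash);
var broken = Verify(tampered); // 1
```

Someone able to rewrite the whole table could also recompute every later hash. For that reason, the latest hash is typically anchored outside the database as well (for example, exported periodically to separate storage).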

Reconciliation

Reconciliation is the process of comparing journal entries with materialized balances to detect discrepancies.

public class ReconciliationService
{
    private readonly NpgsqlDataSource _dataSource;
    private readonly ILogger<ReconciliationService> _logger;

    public ReconciliationService(NpgsqlDataSource dataSource,
        ILogger<ReconciliationService> logger)
    {
        _dataSource = dataSource;
        _logger = logger;
    }

    public async Task<ReconciliationReport> RunAsync(CancellationToken ct)
    {
        var discrepancies = new List<Discrepancy>();

        await using var conn = await _dataSource.OpenConnectionAsync(ct);

        // Get all active accounts (Dapper takes the CancellationToken via CommandDefinition)
        var accounts = await conn.QueryAsync<Account>(new CommandDefinition(
            "SELECT id, currency FROM accounts WHERE is_active = true", cancellationToken: ct));

        foreach (var account in accounts)
        {
            // Materialized balance
            var storedBalance = await conn.QueryFirstOrDefaultAsync<decimal?>(new CommandDefinition(
                "SELECT balance FROM account_balances WHERE account_id = @Id",
                new { account.Id }, cancellationToken: ct));

            // Derived balance from journal entries (is_debit = true: money leaves the account)
            var derivedBalance = await conn.QuerySingleAsync<decimal>(new CommandDefinition(
                """
                SELECT COALESCE(SUM(CASE WHEN jp.is_debit THEN -jp.amount ELSE jp.amount END), 0)
                FROM journal_postings jp
                JOIN journal_entries je ON jp.entry_id = je.id
                WHERE jp.account_id = @Id AND je.status = 'POSTED'
                """, new { account.Id }, cancellationToken: ct));

            // decimal arithmetic is exact: any difference is a real discrepancy
            if (storedBalance.GetValueOrDefault() != derivedBalance)
            {
                discrepancies.Add(new Discrepancy(
                    account.Id,
                    account.Currency,
                    storedBalance.GetValueOrDefault(),
                    derivedBalance,
                    DateTimeOffset.UtcNow));

                _logger.LogWarning(
                    "Balance discrepancy detected: Account={AccountId} Stored={Stored} Derived={Derived}",
                    account.Id, storedBalance, derivedBalance);
            }
        }

        return new ReconciliationReport(DateTimeOffset.UtcNow, discrepancies);
    }
}

public record ReconciliationReport(
    DateTimeOffset RanAt,
    IReadOnlyList<Discrepancy> Discrepancies);

public record Discrepancy(
    Guid AccountId,
    string Currency,
    decimal StoredBalance,
    decimal DerivedBalance,
    DateTimeOffset DetectedAt);
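`RunAsync` issues two queries per account. The same comparison can be done in a single pass over the union of account ids. This also catches accounts that appear on only one side: postings without a stored balance, or a stored balance without postings. Below is an in-memory sketch. It assumes postings are `(Account, Amount, IsDebit)` tuples and stored balances are held in a dictionary. In a database, the equivalent would typically be one `GROUP BY` query combined with a `FULL OUTER JOIN`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One-pass reconciliation over the UNION of account ids: an account missing
// on either side is reported too. Convention: IsDebit = true -> money leaves.
List<(string Account, decimal? Stored, decimal Derived)> Reconcile(
    IEnumerable<(string Account, decimal Amount, bool IsDebit)> postings,
    IReadOnlyDictionary<string, decimal> stored)
{
    var derived = postings
        .GroupBy(p => p.Account)
        .ToDictionary(g => g.Key, g => g.Sum(p => p.IsDebit ? -p.Amount : p.Amount));

    return derived.Keys.Union(stored.Keys)
        .Select(id => (
            Account: id,
            Stored: stored.TryGetValue(id, out var s) ? s : (decimal?)null,
            Derived: derived.GetValueOrDefault(id)))
        .Where(r => r.Stored != r.Derived)   // exact decimal comparison, no epsilon
        .OrderBy(r => r.Account, StringComparer.Ordinal)
        .ToList();
}

var journal = new[]
{
    ("A", 150m, false), ("A", 100m, true), ("B", 100m, false), ("C", 20m, false)
};
var storedBalances = new Dictionary<string, decimal> { ["A"] = 50m, ["B"] = 90m, ["D"] = 5m };

var report = Reconcile(journal, storedBalances);
foreach (var d in report)
    Console.WriteLine($"{d.Account}: stored={d.Stored?.ToString() ?? "missing"}, derived={d.Derived}");
// B (stored 90 vs derived 100), C (no stored balance) and D (no postings) are reported
```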

Materialized Balances vs Derived Balances

The problem with a mutable balance

WRONG: Mutable balance column
┌──────────────────────────────────────────────────────────────┐
│ UPDATE accounts SET balance = balance - 100 WHERE id = 'A';  │
│ UPDATE accounts SET balance = balance + 100 WHERE id = 'B';  │
│ -- Without a wrapping transaction, if the second UPDATE      │
│ --   fails, A is debited but B is never credited             │
│ -- Concurrent read-modify-write in app code → lost update    │
│ -- Balance drifts over time; no history to rebuild it from   │
└──────────────────────────────────────────────────────────────┘

The right approach: derived balance + materialized cache

CORRECT: Derived from journal entries, cached as materialized view
        ┌──────────────────────────────────────────────────────────────┐
        │ Journal Entries (source of truth):                           │
        │   Entry 1: A -100, B +100                                   │
        │   Entry 2: A -50,  C +50                                    │
        │                                                              │
        │ Derived balance for A: -150 (always correct)                │
        │ Materialized balance: -150 (cached, invalidated on new entry)│
        │                                                              │
        │ Reconciliation: periodic comparison of derived vs materialized│
        └──────────────────────────────────────────────────────────────┘
-- Materialized view for fast reads
CREATE MATERIALIZED VIEW account_balances AS
SELECT
    jp.account_id,
    SUM(CASE WHEN jp.is_debit THEN -jp.amount ELSE jp.amount END) AS balance
FROM journal_postings jp
JOIN journal_entries je ON jp.entry_id = je.id
WHERE je.status = 'POSTED'
GROUP BY jp.account_id;

-- A unique index is required for REFRESH MATERIALIZED VIEW CONCURRENTLY
CREATE UNIQUE INDEX uq_account_balances_account_id ON account_balances(account_id);

-- Refresh on new postings (via trigger or event handler). The trigger sits on
-- journal_postings: postings are inserted after the entry row, so a trigger on
-- journal_entries would refresh before the new amounts exist.
-- Each refresh recomputes ALL balances inside the writing transaction, so at
-- higher volumes an event handler (e.g. an outbox consumer) is usually preferable.
CREATE OR REPLACE FUNCTION refresh_account_balances()
RETURNS TRIGGER AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY account_balances;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_refresh_balances
    AFTER INSERT ON journal_postings
    FOR EACH STATEMENT EXECUTE FUNCTION refresh_account_balances();
// Balance types for different consistency needs
public enum BalanceType
{
    /// <summary>
    /// Real-time balance from the writer: strongest consistency.
    /// Use for: financial decisions, transfer validation
    /// </summary>
    Writer,

    /// <summary>
    /// Session-level consistency: read from the writer after your own write.
    /// Use for: "check balance right after transfer"
    /// </summary>
    Session,

    /// <summary>
    /// Materialized view: fast reads, eventually consistent.
    /// Use for: UI display, reporting
    /// </summary>
    Materialized
}

public async Task<decimal> GetBalanceAsync(Guid accountId, BalanceType type, CancellationToken ct)
{
    return type switch
    {
        BalanceType.Writer => await GetWriterBalanceAsync(accountId, ct),
        BalanceType.Session => await GetSessionBalanceAsync(accountId, ct), // not shown
        BalanceType.Materialized => await GetMaterializedBalanceAsync(accountId, ct),
        _ => throw new ArgumentOutOfRangeException(nameof(type), type, null)
    };
}

private async Task<decimal> GetWriterBalanceAsync(Guid accountId, CancellationToken ct)
{
    // Directly from journal postings: always correct, but slower
    await using var conn = await _dataSource.OpenConnectionAsync(ct);
    return await conn.QuerySingleAsync<decimal>(new CommandDefinition(
        """
        SELECT COALESCE(SUM(CASE WHEN jp.is_debit THEN -jp.amount ELSE jp.amount END), 0)
        FROM journal_postings jp
        JOIN journal_entries je ON jp.entry_id = je.id
        WHERE jp.account_id = @Id AND je.status = 'POSTED'
        """, new { Id = accountId }, cancellationToken: ct));
}

private async Task<decimal> GetMaterializedBalanceAsync(Guid accountId, CancellationToken ct)
{
    // From the materialized view: fast, eventually consistent
    // (an account without postings has no row, so the default 0 is returned)
    await using var conn = await _dataSource.OpenConnectionAsync(ct);
    return await conn.QueryFirstOrDefaultAsync<decimal>(new CommandDefinition(
        "SELECT balance FROM account_balances WHERE account_id = @Id",
        new { Id = accountId }, cancellationToken: ct));
}
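A full `REFRESH MATERIALIZED VIEW` recomputes every balance on each refresh. A common alternative is an incremental projection. Each new entry's deltas are applied to the cached balances, and a checkpoint records the last applied journal sequence number, so redelivered events are skipped and gaps are detected. Below is a minimal in-memory sketch, assuming the journal assigns gap-free sequence numbers starting at 1. In a database, the cache update and the checkpoint would be written in one transaction.

```csharp
using System;
using System.Collections.Generic;

// Incremental projection: cached balances + checkpoint of the last applied
// journal sequence number (assumed gap-free, starting at 1).
var cache = new Dictionary<string, decimal>();
long lastAppliedSeq = 0;

bool Apply(long seq, IEnumerable<(string Account, decimal Amount, bool IsDebit)> postings)
{
    if (seq <= lastAppliedSeq)
        return false; // already applied, e.g. an event redelivered by the outbox publisher
    if (seq != lastAppliedSeq + 1)
        throw new InvalidOperationException($"Gap in journal: expected {lastAppliedSeq + 1}, got {seq}");

    // Convention: IsDebit = true -> money leaves the account
    foreach (var p in postings)
        cache[p.Account] = cache.GetValueOrDefault(p.Account) + (p.IsDebit ? -p.Amount : p.Amount);

    lastAppliedSeq = seq; // in a database: same transaction as the balance updates
    return true;
}

Apply(1, new[] { ("Bank", 150m, true), ("A", 150m, false) });
Apply(2, new[] { ("A", 100m, true), ("B", 100m, false) });
var duplicateApplied = Apply(2, new[] { ("A", 100m, true), ("B", 100m, false) }); // false
```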

Outbox Pattern

The outbox pattern guarantees that a journal entry and the corresponding event for the event bus are written atomically: either both are written or neither is.

-- Outbox table: written in the same transaction as the journal entry
CREATE TABLE outbox_events (
    id UUID PRIMARY KEY,
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ,
    retry_count INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_outbox_unpublished ON outbox_events(created_at)
    WHERE published_at IS NULL;

// Publisher: polls the outbox and publishes to the event bus.
// Delivery is at-least-once: if PublishAsync succeeds but the UPDATE fails,
// the event is published again, so consumers must deduplicate by event id.
// Assumes a single publisher instance; with several, claim rows with
// SELECT ... FOR UPDATE SKIP LOCKED inside a transaction.
public class OutboxPublisher
{
    private readonly NpgsqlDataSource _dataSource;
    private readonly IEventBus _eventBus;
    private readonly ILogger<OutboxPublisher> _logger;

    public OutboxPublisher(NpgsqlDataSource dataSource, IEventBus eventBus,
        ILogger<OutboxPublisher> logger)
    {
        _dataSource = dataSource;
        _eventBus = eventBus;
        _logger = logger;
    }

    public async Task PublishBatchAsync(int batchSize = 100, CancellationToken ct = default)
    {
        await using var conn = await _dataSource.OpenConnectionAsync(ct);

        // Column aliases match the OutboxEvent constructor parameters (in order)
        var events = await conn.QueryAsync<OutboxEvent>(new CommandDefinition(
            """
            SELECT id AS Id, aggregate_id AS AggregateId, event_type AS EventType,
                   payload::text AS Payload, retry_count AS RetryCount
            FROM outbox_events
            WHERE published_at IS NULL
            ORDER BY created_at
            LIMIT @Limit
            """,
            new { Limit = batchSize }, cancellationToken: ct));

        foreach (var evt in events)
        {
            try
            {
                await _eventBus.PublishAsync(evt.EventType, evt.Payload, ct);

                await conn.ExecuteAsync(new CommandDefinition(
                    "UPDATE outbox_events SET published_at = NOW() WHERE id = @Id",
                    new { evt.Id }, cancellationToken: ct));
            }
            catch (Exception ex)
            {
                await conn.ExecuteAsync(new CommandDefinition(
                    """
                    UPDATE outbox_events
                    SET retry_count = retry_count + 1
                    WHERE id = @Id
                    """,
                    new { evt.Id }, cancellationToken: CancellationToken.None));

                _logger.LogWarning(ex, "Failed to publish outbox event {EventId}, retry {RetryCount}",
                    evt.Id, evt.RetryCount + 1);

                // Stop the batch to preserve event order; the next poll retries from here
                throw;
            }
        }
    }
}

public record OutboxEvent(
    Guid Id,
    Guid AggregateId,
    string EventType,
    string Payload,
    int RetryCount);
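A publisher that marks an event as published only after the bus accepts it will republish that event if it crashes between the two steps. Delivery is therefore at-least-once, and consumers must deduplicate by event id. The sketch below replays that failure in memory. A lock stands in for the database transaction, and `PostEntry`, `PublishBatch`, and `Consume` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory outbox: rows are (Id, Payload, Published)
var outbox = new List<(Guid Id, string Payload, bool Published)>();
var ledger = new List<string>();
var gate = new object();

// State change and outbox row are written under one lock,
// standing in for a single database transaction
void PostEntry(string description)
{
    lock (gate)
    {
        ledger.Add(description);
        outbox.Add((Guid.NewGuid(), description, false));
    }
}

// Consumer: remembers processed event ids, so a redelivered event has no effect
var processed = new HashSet<Guid>();
var applied = new List<string>();
var deliveries = 0;
void Consume(Guid id, string payload)
{
    deliveries++;
    if (processed.Add(id))
        applied.Add(payload);
}

// Publisher: delivers each unpublished row, then marks it as published.
// crashBeforeMark simulates a crash after delivery but before the UPDATE.
void PublishBatch(bool crashBeforeMark)
{
    for (var i = 0; i < outbox.Count; i++)
    {
        var evt = outbox[i];
        if (evt.Published) continue;

        Consume(evt.Id, evt.Payload);   // the bus accepted the event
        if (crashBeforeMark) return;    // the row stays unpublished
        outbox[i] = (evt.Id, evt.Payload, true);
    }
}

PostEntry("JournalEntryPosted 1001");
PostEntry("JournalEntryPosted 1002");
PublishBatch(crashBeforeMark: true);   // 1001 delivered, not marked
PublishBatch(crashBeforeMark: false);  // 1001 delivered again, then 1002

Console.WriteLine($"deliveries={deliveries}, applied={applied.Count}"); // deliveries=3, applied=2
```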

Concurrency Control

Pessimistic Locking (SELECT FOR UPDATE)

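// Pessimistic: lock the rows, block other transactions
public async Task<TransferResult> TransferPessimisticAsync(
    Guid sourceId, Guid destId, decimal amount, CancellationToken ct)
{
    await using var txn = await _connection.BeginTransactionAsync(IsolationLevel.Serializable, ct);

    try
    {
        // Lock BOTH account rows, always in the same order, to avoid deadlocks
        // between concurrent A->B and B->A transfers
        foreach (var id in new[] { sourceId, destId }.OrderBy(g => g))
        {
            await _connection.ExecuteAsync(new CommandDefinition(
                "SELECT 1 FROM accounts WHERE id = @Id FOR UPDATE",
                new { Id = id }, txn, cancellationToken: ct));
        }

        var balance = await _connection.QuerySingleAsync<decimal>(new CommandDefinition(
            "SELECT balance FROM accounts WHERE id = @Id",
            new { Id = sourceId }, txn, cancellationToken: ct));

        if (balance < amount)
            return TransferResult.InsufficientFunds; // disposing the transaction rolls it back

        // Perform transfers
        await _connection.ExecuteAsync(new CommandDefinition(
            "UPDATE accounts SET balance = balance - @A WHERE id = @S",
            new { A = amount, S = sourceId }, txn, cancellationToken: ct));
        await _connection.ExecuteAsync(new CommandDefinition(
            "UPDATE accounts SET balance = balance + @A WHERE id = @D",
            new { A = amount, D = destId }, txn, cancellationToken: ct));

        await txn.CommitAsync(ct);
        return TransferResult.Success;
    }
    catch (PostgresException ex) when (ex.SqlState is "40001" or "40P01")
    {
        // Serialization failure or deadlock: roll back and let the caller retry
        await txn.RollbackAsync(CancellationToken.None);
        return TransferResult.ConflictRetry;
    }
}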
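Locking two rows in arbitrary order can deadlock. An A→B transfer holds A and waits for B, while a concurrent B→A transfer holds B and waits for A. A standard remedy is to acquire locks in one global order, for example by account id. The sketch below applies that idea in-process, with one `SemaphoreSlim` per account standing in for `SELECT ... FOR UPDATE` row locks. The account names and amounts are made up for the demonstration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var balances = new ConcurrentDictionary<string, decimal> { ["A"] = 1000m, ["B"] = 1000m };
var locks = balances.Keys.ToDictionary(k => k, _ => new SemaphoreSlim(1, 1));

// Pessimistic transfer: lock BOTH accounts, always in the same (ordinal) order,
// so concurrent A->B and B->A transfers can never wait on each other in a cycle.
async Task<bool> TransferAsync(string from, string to, decimal amount)
{
    if (from == to) throw new ArgumentException("Source and destination must differ");

    var ordered = new[] { from, to }.OrderBy(id => id, StringComparer.Ordinal).ToArray();
    await locks[ordered[0]].WaitAsync();
    await locks[ordered[1]].WaitAsync();
    try
    {
        if (balances[from] < amount)
            return false; // insufficient funds, nothing changed
        balances[from] -= amount;
        balances[to] += amount;
        return true;
    }
    finally
    {
        locks[ordered[1]].Release();
        locks[ordered[0]].Release();
    }
}

// 200 concurrent transfers in opposite directions
var outcomes = await Task.WhenAll(Enumerable.Range(0, 200).Select(i =>
    Task.Run(() => i % 2 == 0 ? TransferAsync("A", "B", 7m) : TransferAsync("B", "A", 5m))));

Console.WriteLine($"A={balances["A"]}, B={balances["B"]}"); // A=800, B=1200
```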

Optimistic Concurrency Control (OCC)

// Optimistic: no row locks up front, retry on conflict
public async Task<TransferResult> TransferOptimisticAsync(
    Guid sourceId, Guid destId, decimal amount, int currentVersion, CancellationToken ct)
{
    await using var txn = await _connection.BeginTransactionAsync(IsolationLevel.Serializable, ct);

    try
    {
        // Update with version check: the caller validated the balance when it read
        // currentVersion, so a matching version means the balance is unchanged since then
        var rowsAffected = await _connection.ExecuteAsync(new CommandDefinition(
            """
            UPDATE accounts
            SET balance = balance - @Amount, version = version + 1
            WHERE id = @Id AND version = @Version
            """,
            new { Amount = amount, Id = sourceId, Version = currentVersion }, txn, cancellationToken: ct));

        if (rowsAffected == 0)
            return TransferResult.ConflictRetry; // someone else won: re-read and retry

        // Similar for destination...
        await txn.CommitAsync(ct);
        return TransferResult.Success;
    }
    catch (PostgresException ex) when (ex.SqlState == "40001")
    {
        await txn.RollbackAsync(CancellationToken.None);
        return TransferResult.ConflictRetry;
    }
}
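With OCC, a conflict shows up as zero affected rows, and the caller re-reads the row and tries again. Below is a minimal in-memory sketch of that read, check, compare-and-set, retry loop, assuming a single account row `(Balance, Version)`. The short lock only makes the compare-and-set atomic, in the same way that a single `UPDATE ... WHERE version = @Version` statement is atomic. `Withdraw` and `CompareAndSet` are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// One account row (Balance, Version). The lock only makes the compare-and-set atomic,
// like a single UPDATE ... WHERE id = @Id AND version = @Version statement.
(decimal Balance, int Version) row = (100m, 0);
var rowGate = new object();
var conflicts = 0;

(decimal Balance, int Version) Read() { lock (rowGate) return row; }

// Returns false when the version changed since the read ("0 rows affected")
bool CompareAndSet(int expectedVersion, decimal newBalance)
{
    lock (rowGate)
    {
        if (row.Version != expectedVersion) return false;
        row = (newBalance, expectedVersion + 1);
        return true;
    }
}

// Optimistic withdrawal: read, check, try to write, re-read on conflict
bool Withdraw(decimal amount)
{
    while (true)
    {
        var (balance, version) = Read();
        if (balance < amount) return false;            // checked against the snapshot read
        if (CompareAndSet(version, balance - amount)) return true;
        Interlocked.Increment(ref conflicts);          // another writer won; retry
    }
}

// 150 concurrent withdrawals of 1 from a balance of 100
var results = await Task.WhenAll(Enumerable.Range(0, 150).Select(_ => Task.Run(() => Withdraw(1m))));
Console.WriteLine($"succeeded={results.Count(r => r)}, balance={Read().Balance}, conflicts={conflicts}");
```

Under heavy contention on a single hot account, most attempts end in a retry. In that situation, the pessimistic approach above tends to waste less work.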

Data integrity checklist for fintech

  • [ ] ACID transactions for all financial operations
  • [ ] Double-entry invariant enforced at the DB level (deferred constraint trigger)
  • [ ] Append-only journal: zero UPDATE/DELETE on entries
  • [ ] Idempotency keys on every state-changing endpoint
  • [ ] SERIALIZABLE isolation for balance-critical writes
  • [ ] Synchronous WAL commit enabled
  • [ ] Materialized balances refreshed on new journal entries
  • [ ] Reconciliation pipeline: automated comparison of journal vs balances
  • [ ] Outbox pattern: events written atomically with journal entries
  • [ ] Audit log: immutable and tamper-evident, with a cryptographic hash chain
  • [ ] Pessimistic or optimistic locking for concurrent transfers
  • [ ] Balance types: Writer (strong) vs Materialized (fast)

Further reading

  • [ACID Is a Contract, Not a Religion — Gothar](https://gothar.com/en/insights/acid-ledger-correctness-2026)
  • [Payment Ledger Architecture — Trio](https://trio.dev/payment-ledger-architecture-fintech/)
  • [TigerBeetle Design Document](https://github.com/tigerbeetledb/tigerbeetle/blob/main/docs/DESIGN.md)
  • [The Twisp Financial Ledger Database](https://www.twisp.com/docs/infrastructure/ledger-database)
  • Martin Kleppmann, "Designing Data-Intensive Applications", chapters 3, 6, 10

Practice


The CAP Theorem in Fintech

Introduction

The CAP theorem (Consistency, Availability, Partition Tolerance) is one of the fundamental results of distributed systems theory. Eric Brewer formulated it as a conjecture in 2000, and Gilbert and Lynch formally proved it in 2002.

> Statement: when a network partition (P) occurs in a distributed system, it is impossible to guarantee both Consistency (C) and Availability (A) at the same time. You can choose only one of the two.

In fintech this theorem takes on particular significance, because:

  • In ordinary systems, eventual consistency is often an acceptable trade-off
  • In financial systems, the ledger of record almost always has to choose consistency
  • Money must never be created or destroyed by an architectural mistake

The Three Pillars of CAP

Consistency (C): Strong Consistency

Every read returns the result of the most recent completed write, as if the system held a single copy of the data (linearizability).

Client → Node A (write: $100) → Node B → Node C
                                ↓
Client → read → Node B → returns $100 (fresh data)

Characteristics:

  • Linearizable reads: replicas stay synchronized
  • A write requires acknowledgement from a quorum (majority) of nodes
  • During a partition, the minority side stops accepting writes
  • Examples: Google Spanner, CockroachDB, PostgreSQL with synchronous streaming replication

Availability (A)

Every valid request receives a response, even if some nodes are unreachable. There is no guarantee that the data is fresh.

Client → Node A (write: $100): OK ✓
Client → Node B (read): returns $0 (stale data, but the system responds)

Characteristics:

  • Each node responds independently
  • A write can go to any node
  • During a partition: both sides keep working
  • Examples: Cassandra, DynamoDB (eventually consistent reads by default)

Partition Tolerance (P)

The system keeps working when the network between nodes is split.

[Data Center US] ────✗✗✗──── [Data Center EU]
  (Node A, B)                  (Node C, D)

Partition: US ↔ EU network broken

Important: in real distributed systems, P is not a choice but a given, because networks do fail. In practice, CAP therefore reduces to one question: CP or AP during a partition?


CAP in financial systems: the key constraint

> The most important rule of fintech architecture: the ledger of record always chooses Consistency. Downstream systems may choose Availability and tolerate eventual consistency; the ledger itself may not.

Why the ledger cannot choose AP

Scenario: partition between US-East and EU-West

Account balance: $150

US-East (AP):  deduct $100 → balance = $50  ✓
EU-West (AP):  deduct $100 → balance = $50  ✓

After the partition heals:
$200 has been paid out of a $150 account.
The true balance is -$50 (invariant violated),
but both replicas report $50.

Result: double-spend, $100 created out of thin air

Alternative: CP (Consistency)

Account balance: $150

US-East (3 nodes, quorum):     deduct $100 → balance = $50  ✓
EU-West (2 nodes, no quorum):  WRITE BLOCKED  ⛔

After the partition heals:
Account balance: $50 (correct ✓)
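Both scenarios can be replayed in a toy model. The sketch below is an in-memory illustration, not a database client: `PartitionedAccount` is a hypothetical class, each side of the partition keeps its own copy of the balance, AP debits are accepted whenever the local copy has funds, and CP debits are accepted only on the side that sees a majority of the 5 nodes (3 in US-East, 2 in EU-West, as above).

```csharp
using System.Collections.Generic;

// Toy model: one account replicated on both sides of a network partition.
// It illustrates the AP vs CP trade-off; it is not a database.
public sealed class PartitionedAccount
{
    private const int TotalNodes = 5;

    // Nodes reachable on each side of the partition (3 + 2 = 5).
    private readonly Dictionary<string, int> _nodesPerSide = new()
    {
        ["US-East"] = 3,
        ["EU-West"] = 2,
    };

    // While partitioned, each side only sees its own copy of the balance.
    private readonly Dictionary<string, decimal> _balancePerSide = new();

    // Money that actually left the account, summed over both sides.
    public decimal PaidOut { get; private set; }

    public PartitionedAccount(decimal openingBalance)
    {
        foreach (var side in _nodesPerSide.Keys)
            _balancePerSide[side] = openingBalance;
    }

    public decimal Balance(string side) => _balancePerSide[side];

    // AP: any side accepts the debit if its LOCAL copy has enough funds.
    public bool DebitAp(string side, decimal amount) => ApplyLocally(side, amount);

    // CP: only the side that sees a majority of all nodes may write;
    // the minority side refuses instead of risking a conflicting write.
    public bool DebitCp(string side, decimal amount)
    {
        bool hasQuorum = _nodesPerSide[side] > TotalNodes / 2;
        return hasQuorum && ApplyLocally(side, amount);
    }

    private bool ApplyLocally(string side, decimal amount)
    {
        if (_balancePerSide[side] < amount)
            return false;
        _balancePerSide[side] -= amount;
        PaidOut += amount;
        return true;
    }
}
```

Running both debits through `DebitAp` leaves each side reporting $50 while `PaidOut` is $200; through `DebitCp`, the EU-West call returns `false` and the books balance ($150 - $100 = $50).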

CAP matrix: databases in a fintech context

| Database | CAP position | Fit for fintech? | Use case |
|---|---|---|---|
| PostgreSQL (single node) | CP (P not applicable) | ✅ Core ledger | Transactions, journal entries |
| PostgreSQL (streaming replication) | CP | ✅ Core ledger (read replicas) | Read replicas for reporting |
| CockroachDB | CP | ✅ Multi-region ledger | Global ledger with strong consistency |
| Google Spanner | CP | ✅ Multi-region ledger | Global ledger with TrueTime |
| MongoDB (single node) | CP (P not applicable) | ✅ Single-region ledger | Transactional data |
| MongoDB (replica set) | CP/AP, configurable | ⚠️ With care | Ledger only with careful configuration |
| Cassandra | AP | ❌ Ledger | Analytics, cache, search |
| DynamoDB | AP | ❌ Ledger | Cache, session data, event store |
| Redis | CP (single) / AP (cluster) | ⚠️ Cache only | Hot balances, idempotency keys |
| Elasticsearch | AP | ❌ Transactional | Search, analytics, log storage |

CAP Boundary Pattern

The right way to apply CAP in fintech is not to pick one position for the whole system, but to draw a CAP boundary between components with different requirements.

Architecture with a CAP boundary

┌────────────────────────────────────────────────────────────────┐
│  Edge Tier (AP: Availability)                                  │
│                                                                │
│  API Gateway ──→ Kafka (AP, partition-tolerant)                │
│  - Accepts requests during any partition                       │
│  - Guarantees at-least-once delivery                           │
│  - Idempotency keys for duplicate protection                   │
│                                                                │
│                    │                                           │
│                    ▼                                           │
│  Orchestration Tier (Hybrid)                                   │
│                                                                │
│  Saga Orchestrator                                             │
│  - Reads from Kafka (AP)                                       │
│  - Writes to the Ledger (CP)                                   │
│  - Bridge between the AP and CP worlds                         │
│                                                                │
│                    │                                           │
│                    ▼                                           │
│  Core Tier (CP: Consistency)                                   │
│                                                                │
│  Ledger Database (CockroachDB / PostgreSQL)                    │
│  - Strong consistency for financial records                    │
│  - Quorum-based writes                                         │
│  - On partition: the quorum side works, the rest blocks writes │
│  - Double-entry invariant enforced                             │
│                                                                │
│                    │                                           │
│                    ▼                                           │
│  Periphery Tier (AP: eventual consistency)                     │
│                                                                │
│  Read Models / Materialized Views                              │
│  - UI balances, reporting, analytics                           │
│  - Tolerate eventual consistency                               │
│  - Reconciliation pipeline to detect discrepancies             │
└────────────────────────────────────────────────────────────────┘

Implementing the CAP boundary with Kafka + PostgreSQL

// Edge: AP. Kafka accepts every request
public class PaymentEdgeService
{
    private readonly IKafkaProducer _producer;

    public async Task<ReceiveResult> ReceivePaymentAsync(PaymentRequest request,
        string idempotencyKey, CancellationToken ct)
    {
        // The request is accepted even if the backend (ledger) is unavailable
        var eventMsg = new PaymentReceivedEvent(
            Id: Guid.NewGuid(),
            IdempotencyKey: idempotencyKey,
            Amount: request.Amount,
            Currency: request.Currency,
            SourceAccount: request.SourceAccount,
            DestinationAccount: request.DestinationAccount,
            ReceivedAt: DateTimeOffset.UtcNow
        );

        // The idempotency key is the message key, so retries of the same payment
        // land in the same Kafka partition and can be deduplicated downstream
        await _producer.PublishAsync("payment-events", idempotencyKey,
            JsonSerializer.Serialize(eventMsg), ct);

        return ReceiveResult.Accepted; // "Received, processing": not yet posted to the ledger
    }
}

// Core: CP. The ledger guarantees consistency
// (Npgsql + Dapper; Dapper accepts a CancellationToken only via CommandDefinition)
public class LedgerService
{
    private readonly NpgsqlDataSource _dataSource;

    public async Task<PaymentProcessedResult> ProcessPaymentAsync(
        PaymentReceivedEvent evt, CancellationToken ct)
    {
        await using var conn = await _dataSource.OpenConnectionAsync(ct);
        await using var txn = await conn.BeginTransactionAsync(IsolationLevel.Serializable, ct);

        try
        {
            // Idempotency check (a UNIQUE constraint on idempotency_key backs it up under races)
            var existing = await conn.QueryFirstOrDefaultAsync<Guid?>(new CommandDefinition(
                "SELECT id FROM transfers WHERE idempotency_key = @Key",
                new { Key = evt.IdempotencyKey }, txn, cancellationToken: ct));

            if (existing.HasValue)
                return PaymentProcessedResult.Duplicate(existing.Value);

            // Balance check under a row lock; a missing account reads as 0 and is rejected
            var sourceBalance = await conn.QueryFirstOrDefaultAsync<decimal>(new CommandDefinition(
                "SELECT balance FROM accounts WHERE id = @Id FOR UPDATE",
                new { Id = evt.SourceAccount }, txn, cancellationToken: ct));

            if (sourceBalance < evt.Amount)
                return PaymentProcessedResult.InsufficientFunds;

            // Debit, credit and the transfer record commit atomically
            await conn.ExecuteAsync(new CommandDefinition(
                "UPDATE accounts SET balance = balance - @A WHERE id = @Id",
                new { A = evt.Amount, Id = evt.SourceAccount }, txn, cancellationToken: ct));

            await conn.ExecuteAsync(new CommandDefinition(
                "UPDATE accounts SET balance = balance + @A WHERE id = @Id",
                new { A = evt.Amount, Id = evt.DestinationAccount }, txn, cancellationToken: ct));

            var transferId = Guid.NewGuid();
            await conn.ExecuteAsync(new CommandDefinition(
                "INSERT INTO transfers (id, idempotency_key, amount, source, destination, status) " +
                "VALUES (@Id, @Key, @Amt, @Src, @Dst, 'COMPLETED')",
                new { Id = transferId, Key = evt.IdempotencyKey, Amt = evt.Amount,
                      Src = evt.SourceAccount, Dst = evt.DestinationAccount },
                txn, cancellationToken: ct));

            await txn.CommitAsync(ct);
            return PaymentProcessedResult.Success(transferId);
        }
        catch (DbException)
        {
            // Serialization failure, deadlock or duplicate key: roll back, the caller retries
            await txn.RollbackAsync(CancellationToken.None);
            return PaymentProcessedResult.Conflict;
        }
    }
}

Dynamic CAP

Dynamic CAP is an approach in which the system adapts its CAP position to the context of each transaction (amount, risk, operation type).

Dynamic CAP in action

public class DynamicCapRouter
{
    private const decimal HighRiskThreshold = 10_000m;
    private const decimal CriticalRiskThreshold = 100_000m;

    public CapMode DetermineCapMode(decimal amount, string transactionType)
    {
        // Critical transactions: always CP
        if (amount >= CriticalRiskThreshold)
            return CapMode.Consistency;

        // High risk: CP
        if (amount >= HighRiskThreshold)
            return CapMode.Consistency;

        // Low risk, read-only: AP is acceptable
        if (transactionType == "balance_check")
            return CapMode.Availability;

        // Standard transactions: CP by default
        return CapMode.Consistency;
    }
}

public enum CapMode
{
    Consistency,   // CP: ledger writes, high-value transfers
    Availability   // AP: balance checks, reporting, analytics
}

Dynamic CAP examples:

| Transaction | Amount | CAP mode | Why |
|---|---|---|---|
| Balance check | $1 | AP | Read-only, no risk |
| Transfer | $50 | CP | Standard financial operation |
| Transfer | $5,000 | CP | Standard financial operation |
| Transfer | $50,000 | CP | High-value, strict consistency |
| Merchant payment | $20 | AP (edge) → CP (core) | AP for intake, CP for processing |
| Settlement (between banks) | $10M | CP | Inter-bank settlement, zero risk tolerance |

Quorum-based Consensus

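A quorum is a mechanism in which a write must be acknowledged by a majority of nodes and a read consults a majority of nodes. Because any two majorities overlap, every read reaches at least one node that holds the latest write, which provides consistency without synchronously updating every node.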

Quorum math

N = total number of nodes (replicas)
R = number of nodes in a read quorum
W = number of nodes in a write quorum

Consistency condition: R + W > N

Examples:
N=3, R=2, W=2 → 2+2 > 3 ✓  (standard quorum)
N=5, R=3, W=3 → 3+3 > 5 ✓  (strong quorum)
N=5, R=2, W=3 → 2+3 > 5 ✗  (no consistency guarantee!)
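Why R + W > N is the right condition: a read set and a write set together contain R + W node slots, and if that exceeds N they must share at least one node, which therefore holds the latest acknowledged write. The hypothetical `QuorumMath` helper below checks this by brute force over every subset of a small cluster, so the closed-form rule can be compared with an exhaustive search.

```csharp
using System;
using System.Linq;
using System.Numerics;

public static class QuorumMath
{
    // The closed-form rule: R + W > N.
    public static bool SatisfiesRule(int n, int r, int w) => r + w > n;

    // Exhaustive check for small clusters: nodes are bits 0..n-1, a quorum is a bitmask.
    // True if EVERY read set of size r shares at least one node with EVERY write set of size w.
    public static bool AllQuorumsIntersect(int n, int r, int w)
    {
        if (n < 1 || n > 16)
            throw new ArgumentOutOfRangeException(nameof(n), "brute force is only for small clusters");
        if (r < 1 || r > n || w < 1 || w > n)
            throw new ArgumentOutOfRangeException(nameof(r), "quorum sizes must be in 1..n");

        var subsets = Enumerable.Range(0, 1 << n).Select(m => (uint)m).ToArray();
        var readSets = subsets.Where(m => BitOperations.PopCount(m) == r).ToArray();
        var writeSets = subsets.Where(m => BitOperations.PopCount(m) == w).ToArray();

        // One disjoint pair is enough for a read to miss the latest acknowledged write.
        return readSets.All(rs => writeSets.All(ws => (rs & ws) != 0));
    }
}
```

For N=5, R=2, W=3 the search finds a disjoint pair, for example reads from nodes {0, 1} and a write acknowledged by {2, 3, 4}.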

Quorum in CockroachDB / Spanner

// CockroachDB: 5-node cluster, replication factor 5 (every range has 5 replicas;
// the default replication factor is 3)
// A write commits once a Raft majority of the replicas acknowledges it: 3 of 5
// In R/W terms, majority writes and majority reads give 3 + 3 > 5
// (in practice, consistent reads are served by the range's leaseholder
//  rather than by contacting a read quorum)

// During a network partition:
// US-East (3 nodes): holds a majority → keeps accepting writes
// EU-West (2 nodes): no majority → writes are blocked

// This is CP behaviour: consistency over availability

CAP and distributed transactions

2PC (Two-Phase Commit): CP

Coordinator              Participant A           Participant B
    │                          │                      │
    │─── PREPARE ─────────────>│                      │
    │─── PREPARE ────────────────────────────────────>│
    │<── VOTE: COMMIT ─────────│                      │
    │<── VOTE: COMMIT ────────────────────────────────│
    │                          │                      │
    │─── COMMIT ──────────────>│                      │
    │─── COMMIT ─────────────────────────────────────>│
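The message flow above reduces to one rule at the coordinator: commit only if every participant voted COMMIT in phase 1, otherwise abort everywhere. A minimal in-memory sketch with hypothetical types (`IParticipant`, `TwoPhaseCoordinator`, `AccountLeg`); it deliberately omits the durable decision log, timeouts and recovery that a real 2PC implementation needs, and those omissions are exactly where the blocking problems listed below come from.

```csharp
using System.Collections.Generic;
using System.Linq;

public enum Vote { Commit, Abort }
public enum Outcome { Committed, Aborted }

public interface IParticipant
{
    // Phase 1: acquire locks, stage the change durably, then vote.
    Vote Prepare(string txId);
    // Phase 2: apply or discard the staged change and release locks.
    void Commit(string txId);
    void Abort(string txId);
}

public static class TwoPhaseCoordinator
{
    public static Outcome Run(string txId, IReadOnlyList<IParticipant> participants)
    {
        // Phase 1: every participant votes (ToList forces all prepares to run).
        var votes = participants.Select(p => p.Prepare(txId)).ToList();

        // Decision: unanimous COMMIT, otherwise ABORT. A real coordinator logs this
        // durably before phase 2; participants that voted COMMIT wait until they learn it.
        var outcome = votes.All(v => v == Vote.Commit) ? Outcome.Committed : Outcome.Aborted;

        // Phase 2: broadcast the decision to every participant.
        foreach (var p in participants)
        {
            if (outcome == Outcome.Committed) p.Commit(txId);
            else p.Abort(txId);
        }
        return outcome;
    }
}

// Example participant: one leg of a transfer that votes ABORT if it would overdraw.
public sealed class AccountLeg : IParticipant
{
    private readonly decimal _delta;
    private bool _staged;

    public decimal Balance { get; private set; }

    public AccountLeg(decimal balance, decimal delta)
    {
        Balance = balance;
        _delta = delta;
    }

    public Vote Prepare(string txId)
    {
        _staged = Balance + _delta >= 0;
        return _staged ? Vote.Commit : Vote.Abort;
    }

    public void Commit(string txId)
    {
        if (_staged) Balance += _delta;
        _staged = false;
    }

    public void Abort(string txId) => _staged = false;
}
```

A transfer of $100 out of a $150 account commits both legs; the same transfer out of a $50 account makes the debit leg vote ABORT, and neither balance changes.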

Pros:

  • Strong consistency: all participants reach the same outcome
  • Atomicity for distributed transactions

Cons:

  • Blocking: participants hold locks from PREPARE until the decision arrives
  • The coordinator is a single point of blocking
  • During a partition: a participant that voted COMMIT but cannot reach the coordinator stays in doubt and keeps its locks
  • Low availability under network problems

In fintech, 2PC is rarely used in high-throughput payment systems because of lock contention on accounts.

Saga pattern: eventual consistency with guarantees

Transfer Saga:
Step 1: [LockService] Reserve $100 on Account A  ✓
Step 2: [LedgerService] Post journal entry        ✓
Step 3: [PaymentService] Charge external gateway  ✗ (timeout, outcome unknown)

Compensation (reverse order):
Step 3c: [PaymentService] Cancel charge (idempotent; a no-op if the charge never went through)
Step 2c: [LedgerService] Reverse journal entry     ✓
Step 1c: [LockService] Release reservation         ✓

Result: the system returns to a consistent state
public class TransferSaga : ISaga
        {
            private readonly ILedgerService _ledger;
            private readonly IPaymentService _payment;
            private readonly ILockService _lock;
            private readonly ILogger<TransferSaga> _logger;
    
            public async Task<TransferResult> ExecuteAsync(TransferCommand cmd, 
                CancellationToken ct)
            {
                var sagaState = new SagaState
                {
                    TransferId = Guid.NewGuid(),
                    IdempotencyKey = cmd.IdempotencyKey,
                    SourceAccount = cmd.SourceAccount,
                    DestinationAccount = cmd.DestinationAccount,
                    Amount = cmd.Amount,
                    Currency = cmd.Currency,
                    Step = 0
                };
        
                try
                {
                    // Step 1: Reserve funds
                    sagaState.Step = 1;
                    var reservationId = await _lock.ReserveAsync(
                        cmd.SourceAccount, cmd.Amount, cmd.Currency, sagaState.TransferId, ct);
            
                    // Step 2: Post journal entry
                    sagaState.Step = 2;
                    var journalId = await _ledger.PostAsync(new JournalEntry(
                        sagaState.TransferId, sagaState.IdempotencyKey, DateTimeOffset.UtcNow,
                        $"Transfer {cmd.SourceAccount} → {cmd.DestinationAccount}",
                        new[]
                        {
                            new JournalPosting(cmd.SourceAccount, cmd.Amount, IsDebit: true, 1),
                            new JournalPosting(cmd.DestinationAccount, cmd.Amount, IsDebit: false, 1)
                        }), ct);
            
                    // Step 3: External payment
                    sagaState.Step = 3;
                    var paymentResult = await _payment.ChargeAsync(new ChargeRequest(
                        sagaState.TransferId, cmd.SourceAccount, cmd.DestinationAccount,
                        cmd.Amount, cmd.Currency), ct);
            
                    if (!paymentResult.Success)
                        throw new PaymentFailedException(paymentResult.Error);
            
                    // Complete
                    return TransferResult.Success(sagaState.TransferId);
                }
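                catch (Exception ex)
                {
                    // Compensating transactions (in reverse order). CancellationToken.None:
                    // if the failure was a cancellation, ct is already cancelled,
                    // yet compensation must still run
                    await CompensateAsync(sagaState, ex, CancellationToken.None);
                    return TransferResult.FailedCompensated(sagaState.TransferId);
                }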
            }
    
            private async Task CompensateAsync(SagaState state, Exception reason, CancellationToken ct)
            {
                _logger.LogWarning(reason, "Saga {TransferId} failed at step {Step}, compensating",
                    state.TransferId, state.Step);
        
                // Compensate in reverse order
                if (state.Step >= 3)
                {
                    try { await _payment.CancelAsync(state.TransferId, ct); }
                    catch (Exception ex) { _logger.LogError(ex, "Compensation failed: cancel payment"); }
                }
        
                if (state.Step >= 2)
                {
                    try { await _ledger.ReverseAsync(state.TransferId, ct); }
                    catch (Exception ex) { _logger.LogError(ex, "Compensation failed: reverse journal"); }
                }
        
                if (state.Step >= 1)
                {
                    try { await _lock.ReleaseAsync(state.SourceAccount, state.TransferId, ct); }
                    catch (Exception ex) { _logger.LogError(ex, "Compensation failed: release lock"); }
                }
            }
        }

        public record SagaState
        {
            public Guid TransferId { get; set; }
            public string IdempotencyKey { get; set; } = string.Empty;
            public Guid SourceAccount { get; set; }
            public Guid DestinationAccount { get; set; }
            public decimal Amount { get; set; }
            public string Currency { get; set; } = string.Empty;
            public int Step { get; set; }
        }

CAP theorem: practical takeaways for fintech

What the CAP theorem says (and does not say)

| Claim | True? | Explanation |
|---|---|---|
| You can have C and A without P | ✅ | Only for a single-node DB (PostgreSQL), where P does not apply |
| A distributed system must sacrifice C or A during P | ✅ | This is the theorem itself |
| The ledger may choose AP | ❌ | A financial ledger must be CP |
| All parts of the system must share one CAP position | ❌ | Dynamic CAP: different parts, different CAP positions |
| Eventual consistency is acceptable for the core ledger | ❌ | Only for the periphery (reporting, analytics) |
| 2PC is the best choice for distributed financial transactions | ❌ | The saga pattern is preferable for high throughput |
| CAP is a binary choice | ⚠️ | In practice it is a spectrum, from strong consistency to eventual |
| Quorums provide consistency | ✅ | When R + W > N |

Decision Framework

1. Define the CAP boundary of your system:
   ┌──────────────────────────────────────────────────┐
   │ Edge: AP (accept requests during any partition)  │
   │ Core: CP (ledger, strong consistency)            │
   │ Periphery: AP (reporting, analytics)             │
   └──────────────────────────────────────────────────┘

2. Choose a consistency mechanism for each layer:
   - Core: CP with quorum-based consensus (CockroachDB, Spanner) or a single PostgreSQL primary
   - Edge: AP with Kafka (at-least-once, idempotency)
   - Periphery: AP with eventual consistency (Cassandra, Elasticsearch)

3. Orchestration between layers:
   - Saga pattern for cross-service transactions
   - Outbox pattern for event publishing
   - Idempotency keys for duplicate protection

4. Reconciliation:
   - Automated comparison of journal entries vs materialized balances
   - Alerting when discrepancies are found
   - Remediation pipeline to fix them
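Step 4 can be illustrated with a small reconciliation pass: recompute each account's balance from journal postings (the source of truth) and compare it with the materialized balance. The `Posting`, `Discrepancy` and `Reconciler` names are hypothetical, and the sign convention (credits increase, debits decrease, as for a customer liability account) is an assumption; a production job would run over the ledger database and feed alerting.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record Posting(string Account, decimal Amount, bool IsDebit);

public sealed record Discrepancy(string Account, decimal JournalBalance, decimal MaterializedBalance)
{
    public decimal Difference => MaterializedBalance - JournalBalance;
}

public static class Reconciler
{
    // Assumed sign convention: credits increase the balance, debits decrease it.
    public static IReadOnlyList<Discrepancy> Reconcile(
        IEnumerable<Posting> journal,
        IReadOnlyDictionary<string, decimal> materialized)
    {
        var fromJournal = journal
            .GroupBy(p => p.Account)
            .ToDictionary(g => g.Key, g => g.Sum(p => p.IsDebit ? -p.Amount : p.Amount));

        // Check every account present on either side, in a stable order.
        var accounts = fromJournal.Keys
            .Union(materialized.Keys)
            .OrderBy(a => a, StringComparer.Ordinal);

        var result = new List<Discrepancy>();
        foreach (var account in accounts)
        {
            decimal expected = fromJournal.GetValueOrDefault(account);
            decimal actual = materialized.GetValueOrDefault(account);
            if (expected != actual)
                result.Add(new Discrepancy(account, expected, actual));
        }
        return result;
    }
}
```

An account that exists only in the materialized view (no postings) is reported with a journal balance of 0, which is usually the most suspicious kind of drift.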

Further reading

  • [Eric Brewer, "CAP Writings"](https://cs.brown.edu/~mph/JavaPY2W2002/p12-brewer.pdf)
  • Gilbert & Lynch, "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services" (2002)
  • Martin Kleppmann, "Designing Data-Intensive Applications", chapters 6, 7
  • [RIG Model: "Guaranteed Data-Consistent Microservice Systems"](https://www.infoq.com/articles/rig-data-consistent-microservices/)
  • [TechBullion, "Distributed Systems in U.S. Finance"](https://techbullion.com/distributed-systems-in-u-s-finance-the-patterns-that-survive-production/)

Practice


Distributed consistency patterns

Introduction

In fintech distributed systems, where no single ACID database holds all the data, consistency is achieved by composing patterns. Each pattern solves a specific problem, and choosing the right combination is key to building a correct system.


Saga Pattern

A saga is a sequence of local transactions, each of which commits independently; when a step fails, compensating transactions undo the steps that have already completed.
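The core mechanic (run steps in order; on failure, compensate the completed steps in reverse order) fits in a few lines. `SagaStep` and `SagaRunner` below are a hypothetical in-memory sketch with no persistence, retries or timeouts. It assumes a step that throws made no change; a step whose outcome is unknown, such as a timed-out external charge, must also be compensated, as in the Transfer Saga example earlier.

```csharp
using System;
using System.Collections.Generic;

// One saga step: a local action plus the compensation that semantically undoes it.
public sealed record SagaStep(string Name, Action Execute, Action Compensate);

public static class SagaRunner
{
    // Returns true if every step succeeded, false if the saga was compensated.
    public static bool Run(IReadOnlyList<SagaStep> steps, List<string> log)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            try
            {
                step.Execute();
                completed.Push(step);
                log.Add($"done:{step.Name}");
            }
            catch (Exception)
            {
                log.Add($"failed:{step.Name}");
                // Undo completed steps, most recent first (stack order).
                while (completed.Count > 0)
                {
                    var done = completed.Pop();
                    done.Compensate();
                    log.Add($"compensated:{done.Name}");
                }
                return false;
            }
        }
        return true;
    }
}
```

If a compensation itself throws, this sketch lets the exception escape; a production orchestrator records the failure and escalates the saga to manual review instead.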

Choreography-based Saga

OrderService ──→ OrderCreatedEvent
                    │
                    ├──→ InventoryService (reserve stock)
                    │       │
                    │       ├──→ StockReservedEvent
                    │       │       │
                    │       │       └──→ PaymentService (charge)
                    │       │               │
                    │       │               ├──→ PaymentChargedEvent
                    │       │               │       │
                    │       │               │       └──→ OrderService (confirm)
                    │       │               │
                    │       │               └──→ PaymentFailedEvent ──→ InventoryService (release stock)
                    │       │
                    │       └──→ StockReservationFailedEvent ──→ OrderService (cancel)
                    │
                    └──→ NotificationService (send confirmation)
// Choreography-based: events drive the saga
// (IEventBus: the module's event-publishing abstraction)
public class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
{
    private readonly IInventoryService _inventory;
    private readonly IEventBus _eventBus;
    private readonly ILogger<OrderCreatedEventHandler> _logger;

    public async Task HandleAsync(OrderCreatedEvent evt, CancellationToken ct)
    {
        try
        {
            await _inventory.ReserveStockAsync(evt.OrderId, evt.Items, ct);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Stock reservation failed for order {OrderId}", evt.OrderId);
            await _eventBus.PublishAsync(new StockReservationFailedEvent(evt.OrderId), ct);
            return;
        }

        // Publish only after the reservation succeeded: a publish failure must not be
        // reported as a failed reservation (an outbox makes reserve + publish atomic)
        await _eventBus.PublishAsync(new StockReservedEvent(evt.OrderId), ct);
    }
}

public class PaymentChargedEventHandler : IEventHandler<PaymentChargedEvent>
{
    private readonly IOrderService _order;
    private readonly IEventBus _eventBus;

    public async Task HandleAsync(PaymentChargedEvent evt, CancellationToken ct)
    {
        await _order.ConfirmOrderAsync(evt.OrderId, ct);
        await _eventBus.PublishAsync(new OrderConfirmedEvent(evt.OrderId), ct);
    }
}

public class PaymentFailedEventHandler : IEventHandler<PaymentFailedEvent>
{
    private readonly IInventoryService _inventory;
    private readonly IEventBus _eventBus;

    public async Task HandleAsync(PaymentFailedEvent evt, CancellationToken ct)
    {
        // Compensating: release reserved stock
        await _inventory.ReleaseStockAsync(evt.OrderId, ct);
        await _eventBus.PublishAsync(new StockReleasedEvent(evt.OrderId), ct);
    }
}
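To see how the handlers above chain together without a coordinator, here is a self-contained in-memory version. `InMemoryBus`, `Inventory`, `Payments` and the event records are hypothetical stand-ins for a broker and the module's services; delivery is synchronous and in-process, so the sketch shows the control flow only, not broker semantics such as redelivery or ordering.

```csharp
using System;
using System.Collections.Generic;

public sealed record OrderCreated(Guid OrderId, int Quantity);
public sealed record StockReserved(Guid OrderId, int Quantity);
public sealed record PaymentFailed(Guid OrderId, int Quantity);
public sealed record StockReleased(Guid OrderId);

// Synchronous in-process bus: Publish invokes every handler subscribed to the event type.
public sealed class InMemoryBus
{
    private readonly Dictionary<Type, List<Action<object>>> _handlers = new();

    public void Subscribe<T>(Action<T> handler)
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
            _handlers[typeof(T)] = list = new List<Action<object>>();
        list.Add(e => handler((T)e));
    }

    public void Publish<T>(T evt) where T : notnull
    {
        if (_handlers.TryGetValue(typeof(T), out var list))
            foreach (var h in list.ToArray())
                h(evt);
    }
}

// Each service reacts to events and emits the next one; nobody orchestrates.
public sealed class Inventory
{
    public int Available { get; private set; }

    public Inventory(InMemoryBus bus, int available)
    {
        Available = available;
        bus.Subscribe<OrderCreated>(e =>
        {
            Available -= e.Quantity;                       // reserve stock
            bus.Publish(new StockReserved(e.OrderId, e.Quantity));
        });
        bus.Subscribe<PaymentFailed>(e =>
        {
            Available += e.Quantity;                       // compensation: release stock
            bus.Publish(new StockReleased(e.OrderId));
        });
    }
}

public sealed class Payments
{
    public Payments(InMemoryBus bus, decimal customerFunds, decimal price)
    {
        bus.Subscribe<StockReserved>(e =>
        {
            // Only the failure path is modelled; success would emit PaymentCharged.
            if (customerFunds < price * e.Quantity)
                bus.Publish(new PaymentFailed(e.OrderId, e.Quantity));
        });
    }
}
```

With $5 of funds and a $20 price, ordering 2 items reserves stock, the payment fails, and the `PaymentFailed` handler releases the stock again, so `Available` returns to 10.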

Orchestration-based Saga (recommended for fintech)

// Orchestration-based: a central orchestrator controls the flow
public class PaymentSagaOrchestrator
{
    private readonly ILedgerService _ledger;
    private readonly IPaymentGateway _gateway;
    private readonly IKafkaProducer _producer;
    private readonly ILogger<PaymentSagaOrchestrator> _logger;

    public async Task<SagaResult> ExecuteAsync(PaymentSagaCommand cmd,
        CancellationToken ct)
    {
        var saga = new SagaInstance
        {
            Id = Guid.NewGuid(),
            IdempotencyKey = cmd.IdempotencyKey,
            SourceAccount = cmd.SourceAccount,
            DestinationAccount = cmd.DestinationAccount,
            Amount = cmd.Amount,
            Currency = cmd.Currency,
            Step = 0,
            State = SagaState.Active
        };

        // Persist saga state (for recovery after a crash)
        await PersistSagaStateAsync(saga, ct);

        try
        {
            // Step 1: Validate (read-only, nothing to compensate)
            await AdvanceAsync(saga, 1, ct);
            if (!await ValidateAsync(saga, ct))
            {
                saga.State = SagaState.Failed;
                await PersistSagaStateAsync(saga, ct);
                return SagaResult.Invalid(saga.Id);
            }

            // Step 2: Reserve funds
            await AdvanceAsync(saga, 2, ct);
            await ReserveFundsAsync(saga, ct);

            // Step 3: Post journal entry
            await AdvanceAsync(saga, 3, ct);
            await PostJournalEntryAsync(saga, ct);

            // Step 4: Execute external payment
            await AdvanceAsync(saga, 4, ct);
            var paymentResult = await ExecuteExternalPaymentAsync(saga, ct);

            if (!paymentResult.Success)
                throw new ExternalPaymentFailedException(paymentResult.Error);

            // Step 5: Confirm
            await AdvanceAsync(saga, 5, ct);
            await ConfirmPaymentAsync(saga, ct);

            saga.State = SagaState.Completed;
            await PersistSagaStateAsync(saga, ct);

            return SagaResult.Success(saga.Id);
        }
        catch (Exception ex)
        {
            // CancellationToken.None: compensation must run even if ct was cancelled
            saga.State = SagaState.Compensating;
            await PersistSagaStateAsync(saga, CancellationToken.None);

            var fullyCompensated = await CompensateAsync(saga, ex, CancellationToken.None);

            // If any compensation failed, the persisted state is Failed (manual review)
            saga.State = fullyCompensated ? SagaState.Compensated : SagaState.Failed;
            await PersistSagaStateAsync(saga, CancellationToken.None);

            return SagaResult.Compensated(saga.Id);
        }
    }

    // Persist the step BEFORE running it: after a crash, recovery resumes
    // compensation from the recorded step (so compensations must be idempotent)
    private async Task AdvanceAsync(SagaInstance saga, int step, CancellationToken ct)
    {
        saga.Step = step;
        await PersistSagaStateAsync(saga, ct);
    }

    private async Task<bool> CompensateAsync(SagaInstance saga, Exception reason, CancellationToken ct)
    {
        _logger.LogWarning(reason, "Saga {SagaId} compensating from step {Step}",
            saga.Id, saga.Step);

        var ok = true;

        // Compensate in REVERSE order
        if (saga.Step >= 5)
            ok &= await TryCompensateAsync(saga, () => UnconfirmPaymentAsync(saga, ct));
        if (saga.Step >= 4)
            ok &= await TryCompensateAsync(saga, () => CancelExternalPaymentAsync(saga, ct));
        if (saga.Step >= 3)
            ok &= await TryCompensateAsync(saga, () => ReverseJournalEntryAsync(saga, ct));
        if (saga.Step >= 2)
            ok &= await TryCompensateAsync(saga, () => ReleaseFundsAsync(saga, ct));

        return ok;
    }

    private async Task<bool> TryCompensateAsync(SagaInstance saga, Func<Task> compensation)
    {
        try
        {
            await compensation();
            return true;
        }
        catch (Exception ex)
        {
            await LogCompensationFailureAsync(saga.Id, ex);
            return false;
        }
    }
}

public record SagaInstance
{
    public Guid Id { get; set; }
    public string IdempotencyKey { get; set; } = string.Empty;
    public Guid SourceAccount { get; set; }
    public Guid DestinationAccount { get; set; }
    public decimal Amount { get; set; }
    public string Currency { get; set; } = string.Empty;
    public int Step { get; set; }
    public SagaState State { get; set; }
}

public enum SagaState { Active, Compensating, Completed, Compensated, Failed }

Saga: rules for fintech

| Rule | Description |
|---|---|
| Idempotent compensations | Every compensating transaction must be idempotent |
| Persist saga state | Saga state is stored in the database for recovery after a crash |
| No interleaving | Concurrent sagas must not interleave on the same data: sagas provide no isolation, so use reservations |
| Audit every step | Every step and every compensation is logged |
| Timeout handling | A long-running saga that times out goes to manual review |
| No 2PC | Sagas replace distributed 2PC in microservices |
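The first rule matters because compensations get retried: after a crash the orchestrator replays them from the persisted saga state, and a broker may redeliver a command. One common approach is to key the operation by saga id and remember that it ran, so a repeat is a no-op. A minimal in-memory sketch; `FundsReservations` is hypothetical, and in a database the key would be a UNIQUE column written in the same transaction as the balance change.

```csharp
using System;
using System.Collections.Generic;

// Reserve/release keyed by saga id, so retries of either operation are harmless.
public sealed class FundsReservations
{
    private readonly Dictionary<Guid, decimal> _active = new();

    public decimal Available { get; private set; }

    public FundsReservations(decimal available) => Available = available;

    public bool Reserve(Guid sagaId, decimal amount)
    {
        if (_active.ContainsKey(sagaId))
            return true;                 // retry of a reservation that already happened
        if (Available < amount)
            return false;
        Available -= amount;
        _active[sagaId] = amount;
        return true;
    }

    // Compensation: releasing twice, or releasing something never reserved, is a no-op.
    public void Release(Guid sagaId)
    {
        if (_active.Remove(sagaId, out var amount))
            Available += amount;
    }
}
```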

Outbox Pattern

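The outbox guarantees atomicity between writing business data and publishing the corresponding event: the data change and the event row are written to the same database in one local transaction, and a separate relay publishes the event to the broker afterwards.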
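A minimal in-memory sketch of the mechanics. `OutboxStore`, `OutboxMessage` and `OutboxRelay` are hypothetical, and a lock stands in for the database transaction that would cover both tables. The relay publishes pending messages and then marks them; if it crashes in between, the message is published again, so the outbox delivers at least once and consumers still need deduplication (the inbox pattern that follows).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// An event waiting in the outbox table.
public sealed class OutboxMessage
{
    public Guid Id { get; } = Guid.NewGuid();
    public string Topic { get; }
    public string Payload { get; }
    public bool Dispatched { get; set; }

    public OutboxMessage(string topic, string payload)
    {
        Topic = topic;
        Payload = payload;
    }
}

// Toy "database" with a business table and an outbox table.
// The lock stands in for a transaction: both writes become visible together.
public sealed class OutboxStore
{
    private readonly object _tx = new();
    private readonly Dictionary<string, decimal> _balances = new();
    private readonly List<OutboxMessage> _outbox = new();

    public decimal Balance(string account)
    {
        lock (_tx) return _balances.GetValueOrDefault(account);
    }

    public void Deposit(string account, decimal amount)
    {
        if (amount <= 0) throw new ArgumentOutOfRangeException(nameof(amount));
        var message = new OutboxMessage("deposits", $"{account}:{amount}");
        lock (_tx)
        {
            _balances[account] = _balances.GetValueOrDefault(account) + amount;
            _outbox.Add(message);
        }
    }

    public List<OutboxMessage> Pending()
    {
        lock (_tx) return _outbox.Where(m => !m.Dispatched).ToList();
    }

    public void MarkDispatched(Guid id)
    {
        lock (_tx)
            foreach (var msg in _outbox.Where(x => x.Id == id))
                msg.Dispatched = true;
    }
}

public static class OutboxRelay
{
    // Publish pending messages, then mark them dispatched.
    public static int RunOnce(OutboxStore store, Action<OutboxMessage> publish)
    {
        var pending = store.Pending();
        foreach (var m in pending)
        {
            publish(m);                 // a crash right after this line...
            store.MarkDispatched(m.Id); // ...means m is published again on the next run
        }
        return pending.Count;
    }
}
```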

Inbox Pattern (for exactly-once processing)

-- Inbox: prevents duplicate processing of events
CREATE TABLE inbox_events (
    id UUID NOT NULL,
    source VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Event ids are unique per source, so the key is (id, source)
    PRIMARY KEY (id, source)
);

-- When an event arrives:
-- INSERT ... ON CONFLICT DO NOTHING; zero inserted rows means it was already processed
// Npgsql + Dapper. ITransactionalEventHandler<TEvent> is an assumed interface whose
// handler writes through the same connection and transaction as the inbox insert
public class InboxProcessor<TEvent>
{
    private readonly NpgsqlDataSource _dataSource;
    private readonly ITransactionalEventHandler<TEvent> _handler;
    private readonly ILogger<InboxProcessor<TEvent>> _logger;

    public async Task ProcessAsync(InboxMessage<TEvent> message, CancellationToken ct)
    {
        await using var conn = await _dataSource.OpenConnectionAsync(ct);
        await using var txn = await conn.BeginTransactionAsync(ct);

        // Record the event; ON CONFLICT turns a duplicate into "0 rows" instead of an error
        var inserted = await conn.ExecuteAsync(new CommandDefinition(
            """
            INSERT INTO inbox_events (id, source, event_type, payload)
            VALUES (@Id, @Source, @Type, CAST(@Payload AS jsonb))
            ON CONFLICT (id, source) DO NOTHING
            """,
            new { message.Id, message.Source, Type = message.EventType, message.Payload },
            txn, cancellationToken: ct));

        if (inserted == 0)
        {
            // Already processed: skip (disposing the transaction rolls it back)
            _logger.LogDebug("Inbox event {EventId} already processed, skipping", message.Id);
            return;
        }

        // Handle the event in the SAME transaction: the inbox row and the handler's
        // writes commit or roll back together
        var evt = JsonSerializer.Deserialize<TEvent>(message.Payload);
        if (evt is not null)
            await _handler.HandleAsync(evt, conn, txn, ct);

        await txn.CommitAsync(ct);
    }
}
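The guarantee rests on the inbox record and the handler's writes committing together. An in-memory sketch of that rule, with a hypothetical `InboxConsumer` and a snapshot/restore standing in for transaction rollback: a duplicate `(id, source)` is skipped, and a handler failure leaves no inbox record, so redelivery retries the event instead of losing it.

```csharp
using System;
using System.Collections.Generic;

public sealed class InboxConsumer
{
    private readonly HashSet<(Guid Id, string Source)> _processed = new();

    public decimal Balance { get; private set; }

    // Returns true if the event was applied, false if it was a duplicate.
    public bool Handle(Guid id, string source, decimal credit, bool failAfterApply = false)
    {
        if (_processed.Contains((id, source)))
            return false;                          // like ON CONFLICT DO NOTHING: skip

        var snapshot = Balance;                    // "BEGIN"
        try
        {
            Balance += credit;                     // the handler's side effect
            if (failAfterApply)
                throw new InvalidOperationException("handler failed");
            _processed.Add((id, source));          // inbox record in the same "transaction"
            return true;                           // "COMMIT"
        }
        catch
        {
            Balance = snapshot;                    // "ROLLBACK": effect and record both discarded
            throw;
        }
    }
}
```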

CQRS (Command Query Responsibility Segregation)

Separating the write (command) and read (query) models lets each side scale independently and carry its own consistency guarantees.
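A minimal in-memory sketch of the split, with hypothetical names: `TransferCommands` validates against its own authoritative balances and appends events, and `BalanceProjection` applies those events to a separate read model. Until the projection catches up it shows stale balances (eventual consistency), which is why fund checks belong on the command side.

```csharp
using System.Collections.Generic;

public sealed record MoneyMoved(string From, string To, decimal Amount);

// Write side: authoritative balances plus an append-only event log.
public sealed class TransferCommands
{
    private readonly Dictionary<string, decimal> _balances;

    public List<MoneyMoved> Log { get; } = new();

    public TransferCommands(Dictionary<string, decimal> opening) =>
        _balances = new Dictionary<string, decimal>(opening);

    public bool Transfer(string from, string to, decimal amount)
    {
        // Validation uses the strongly consistent write model, never the read model.
        if (amount <= 0 || _balances.GetValueOrDefault(from) < amount)
            return false;
        _balances[from] -= amount;
        _balances[to] = _balances.GetValueOrDefault(to) + amount;
        Log.Add(new MoneyMoved(from, to, amount));
        return true;
    }
}

// Read side: a projection built from events, possibly lagging the write side.
public sealed class BalanceProjection
{
    private readonly Dictionary<string, decimal> _view;
    private int _position;   // number of events already applied

    public BalanceProjection(Dictionary<string, decimal> opening) =>
        _view = new Dictionary<string, decimal>(opening);

    public decimal Balance(string account) => _view.GetValueOrDefault(account);

    public void CatchUp(IReadOnlyList<MoneyMoved> log)
    {
        for (; _position < log.Count; _position++)
        {
            var e = log[_position];
            _view[e.From] = Balance(e.From) - e.Amount;
            _view[e.To] = Balance(e.To) + e.Amount;
        }
    }
}
```

Tracking `_position` makes `CatchUp` safe to call repeatedly: events that were already applied are not applied twice.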

CQRS in fintech

        ┌──────────────────────────────────────────────────────────────────┐
        │                          Write Path                              │
        │                                                                  │
        │  API ──→ Command Validator ──→ Saga Orchestrator ──→ PostgreSQL  │
        │                                    │                             │
        │                                    ▼                             │
        │                              Journal Entry                       │
        │                                    │                             │
        │                                    ▼                             │
        │                              Outbox Event                        │
        │                                    │                             │
        └──────────────────────────────────────────────────────────────────┘
                                             │
        ┌──────────────────────────────────────────────────────────────────┐
        │                          Read Path                               │
        │                                                                  │
        │                              Event Bus                           │
        │                                    │                             │
        │                                    ▼                             │
        │                              Read Processor                      │
        │                                    │                             │
        │                                    ▼                             │
        │                              Materialized Views (PostgreSQL)     │
        │                                    │                             │
        │                                    ▼                             │
        │                              API ──→ Balance / History / Reports │
        └──────────────────────────────────────────────────────────────────┘
// Command side: strongly consistent
        public class TransferCommandHandler
        {
            public async Task<TransferResult> Handle(TransferCommand cmd, CancellationToken ct)
            {
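                if (cmd.Amount <= 0)
                    throw new ArgumentOutOfRangeException(nameof(cmd), "Amount must be positive");
        
                // Serializable: under contention PostgreSQL may abort with SQLSTATE 40001,
                // so the caller retries the whole command (safe with an idempotency key)
                using var txn = await _db.BeginTransactionAsync(IsolationLevel.Serializable, ct);
        
                // Validate: the balance is read in the same transaction as the debit,
                // so two concurrent transfers cannot both spend the same funds
                var balance = await GetBalanceAsync(cmd.SourceAccount, BalanceType.Writer, txn, ct);
                if (balance < cmd.Amount)
                    return TransferResult.InsufficientFunds;
        
                // Execute: atomic debit + credit + journal
                await DebitAsync(cmd.SourceAccount, cmd.Amount, txn, ct);
                await CreditAsync(cmd.DestinationAccount, cmd.Amount, txn, ct);
                await PostJournalEntryAsync(cmd, txn, ct);
        
                // Publish event to outbox (same transaction)
                await PublishOutboxEventAsync(new FundsTransferredEvent(
                    cmd.SourceAccount, cmd.DestinationAccount, cmd.Amount), txn, ct);
        
                await txn.CommitAsync(ct);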
                return TransferResult.Success;
            }
        }

        // Query side: eventually consistent, optimized for reads
        public class BalanceQueryHandler
        {
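            public async Task<BalanceResult?> Handle(GetBalanceQuery query, CancellationToken ct)
            {
                // Fast read from the read model; it may lag the write side by the projection delay
                var balance = await _readDb.QueryFirstOrDefaultAsync<decimal?>(
                    "SELECT balance FROM account_balances WHERE account_id = @Id",
                    new { Id = query.AccountId }, ct: ct);
        
                // null means "unknown or not yet projected", which is not a zero balance
                if (balance is null)
                    return null;
        
                // UtcNow is the read time, not the as-of time of the data: never authorize
                // a debit from this value, use the command side instead
                return new BalanceResult(query.AccountId, balance.Value, DateTimeOffset.UtcNow);
            }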
        }

        // Read Processor: consumes events and updates materialized views
        public class FundsTransferredReadProcessor : IEventHandler<FundsTransferredEvent>
        {
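            public async Task HandleAsync(FundsTransferredEvent evt, CancellationToken ct)
            {
                using var conn = await _readDb.OpenConnectionAsync(ct);
                // Both legs in one transaction: readers never see a debit without its credit
                using var txn = await conn.BeginTransactionAsync(ct);
        
                // Not idempotent on its own: a redelivered event would be applied twice.
                // Run it behind the inbox check above, on the same connection and transaction.
                await conn.ExecuteAsync(
                    "UPDATE account_balances SET balance = balance - @Amount WHERE account_id = @Source",
                    new { evt.Amount, Source = evt.SourceAccount }, transaction: txn, ct: ct);
        
                await conn.ExecuteAsync(
                    "UPDATE account_balances SET balance = balance + @Amount WHERE account_id = @Dest",
                    new { evt.Amount, Dest = evt.DestinationAccount }, transaction: txn, ct: ct);
        
                await txn.CommitAsync(ct);
            }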
        }
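A minimal in-memory sketch of the same flow shows the two properties the read side depends on. The read side lags the write side until the event is processed, and it must ignore redelivered events. The sketch assumes each event carries a unique `EventId`, which the `FundsTransferredEvent` above does not show. `TransferEvent`, `WriteModel` and `BalanceProjection` are hypothetical names, and a `Queue` stands in for the outbox and the broker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event: EventId is an assumption, it is what makes deduplication possible
public sealed record TransferEvent(Guid EventId, string Source, string Destination, decimal Amount);

// Write side: authoritative balances; every accepted transfer also enqueues an event
public sealed class WriteModel
{
    private readonly Dictionary<string, decimal> _balances = new();
    public Queue<TransferEvent> Outbox { get; } = new(); // stand-in for outbox + broker

    public void Open(string account, decimal openingBalance) => _balances[account] = openingBalance;

    public bool Transfer(string source, string destination, decimal amount)
    {
        if (amount <= 0 || !_balances.TryGetValue(source, out var balance) || balance < amount)
            return false;
        _balances[source] = balance - amount;
        _balances[destination] = _balances.GetValueOrDefault(destination) + amount;
        Outbox.Enqueue(new TransferEvent(Guid.NewGuid(), source, destination, amount));
        return true;
    }

    public decimal BalanceOf(string account) => _balances.GetValueOrDefault(account);
}

// Read side: applies each event at most once, even if the broker redelivers it
public sealed class BalanceProjection
{
    private readonly Dictionary<string, decimal> _view = new();
    private readonly HashSet<Guid> _processed = new();

    public void Seed(string account, decimal balance) => _view[account] = balance;

    public bool Apply(TransferEvent evt)
    {
        if (!_processed.Add(evt.EventId))
            return false; // duplicate delivery: ignore
        _view[evt.Source] = _view.GetValueOrDefault(evt.Source) - evt.Amount;
        _view[evt.Destination] = _view.GetValueOrDefault(evt.Destination) + evt.Amount;
        return true;
    }

    public decimal BalanceOf(string account) => _view.GetValueOrDefault(account);
}
```

In the database version, the inbox table plays the role of `_processed`. Keeping that record and the balance update in one transaction is what makes the projection safe to replay.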

Event Sourcing

Event Sourcing is a pattern in which system state is reconstructed by replaying a sequence of immutable events.

Event Sourcing for a ledger

// Events (immutable, append-only)
        public record FundsTransferred(
            Guid AggregateId,
            Guid SourceAccount,
            Guid DestinationAccount,
            decimal Amount,
            string Currency,
            string IdempotencyKey,
            DateTimeOffset OccurredAt);

        public record AccountOpened(
            Guid AggregateId,
            string AccountNumber,
            string Currency,
            decimal OpeningBalance,
            DateTimeOffset OccurredAt);

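        // Aggregate root: all state, including the balance, is derived from events
        public class Account : EntityGuid, IAggregateRoot
        {
            private readonly List<object> _uncommittedEvents = new();
            private decimal _balance;
    
            public string Number { get; private set; } = string.Empty;
            public string Currency { get; private set; } = string.Empty;
            public decimal Balance => _balance;
            // Version of the last applied event (persisted or pending); set by the repository on replay
            public int Version { get; set; }
    
            public void Open(string accountNumber, string currency, decimal openingBalance, 
                Guid createdBy)
            {
                Ensure.NotEmpty(accountNumber, nameof(accountNumber));
                if (Version != 0)
                    throw new InvalidOperationException("Account is already opened");
                if (openingBalance < 0)
                    throw new ArgumentException("Opening balance must not be negative");
        
                Raise(new AccountOpened(Id, accountNumber, currency, openingBalance, 
                    DateTimeOffset.UtcNow));
            }
    
            public void Transfer(Guid sourceId, Guid destId, decimal amount, string currency,
                string idempotencyKey, Guid createdBy)
            {
                if (sourceId != Id) throw new ArgumentException("This account must be the source");
                if (amount <= 0) throw new ArgumentException("Amount must be positive");
                if (currency != Currency) throw new ArgumentException("Currency mismatch");
                if (Balance < amount) throw new InsufficientFundsException(Balance, amount);
        
                Raise(new FundsTransferred(Id, sourceId, destId, amount, currency,
                    idempotencyKey, DateTimeOffset.UtcNow));
            }
    
            // Replay path (used by the repository): mutates state, records nothing
            public void Apply(object @event)
            {
                switch (@event)
                {
                    case AccountOpened e:
                        Number = e.AccountNumber;
                        Currency = e.Currency;
                        _balance = e.OpeningBalance;
                        break;
                    case FundsTransferred e when e.SourceAccount == Id:
                        _balance -= e.Amount;
                        break;
                    case FundsTransferred e when e.DestinationAccount == Id:
                        // Only reached if the credit is also appended to this account's stream
                        // (e.g. by a saga): one event cannot update two aggregates atomically
                        _balance += e.Amount;
                        break;
                    default:
                        throw new InvalidOperationException(
                            $"Unexpected event {@event.GetType().Name} for account {Id}");
                }
            }
    
            // New-event path: apply, then queue for SaveAsync
            private void Raise(object @event)
            {
                Apply(@event);
                _uncommittedEvents.Add(@event);
                Version++;
            }
    
            public IReadOnlyList<object> GetUncommittedEvents() => _uncommittedEvents.AsReadOnly();
            public void ClearUncommittedEvents() => _uncommittedEvents.Clear();
        }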
-- Event store table
        CREATE TABLE event_store (
            id BIGSERIAL PRIMARY KEY,
            aggregate_id UUID NOT NULL,
            event_type VARCHAR(200) NOT NULL,
            payload JSONB NOT NULL,
            version INTEGER NOT NULL,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            metadata JSONB
        );

        -- One unique index serves both per-aggregate reads and optimistic concurrency:
        -- two writers appending the same (aggregate_id, version) cannot both succeed
        CREATE UNIQUE INDEX uq_event_store_aggregate_version ON event_store(aggregate_id, version);

        -- Append-only: no UPDATE/DELETE
        -- GRANT INSERT, SELECT ON event_store TO event_store_writer;
        -- REVOKE UPDATE, DELETE ON event_store FROM event_store_writer;
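// Event Sourcing repository
        // Assumed contract: the members the repository calls on every aggregate
        public interface IAggregateRoot
        {
            int Version { get; set; }
            void Apply(object @event);
            IReadOnlyList<object> GetUncommittedEvents();
            void ClearUncommittedEvents();
        }

        public class EventSourcingRepository<TAggregate>
            where TAggregate : EntityGuid, IAggregateRoot
        {
            // event_type -> CLR type: the JSON payload alone does not say which record to create
            private static readonly Dictionary<string, Type> EventTypes = new()
            {
                [nameof(AccountOpened)] = typeof(AccountOpened),
                [nameof(FundsTransferred)] = typeof(FundsTransferred)
            };
    
            private readonly NpgsqlDataSource _dataSource;
    
            public EventSourcingRepository(NpgsqlDataSource dataSource) => _dataSource = dataSource;
    
            public async Task<TAggregate?> LoadAsync(Guid id, CancellationToken ct)
            {
                await using var conn = await _dataSource.OpenConnectionAsync(ct);
                var events = (await conn.QueryAsync<EventRecord>(
                    """
                    SELECT event_type AS EventType, payload::text AS Payload, version AS Version
                    FROM event_store
                    WHERE aggregate_id = @Id
                    ORDER BY version
                    """,
                    new { Id = id }, ct: ct)).ToList();
        
                if (events.Count == 0)
                    return null;
        
                // Assumes TAggregate has a constructor taking the aggregate id
                var aggregate = (TAggregate)Activator.CreateInstance(typeof(TAggregate), id)!;
        
                foreach (var evt in events)
                {
                    var deserialized = JsonSerializer.Deserialize(evt.Payload, EventTypes[evt.EventType])
                        ?? throw new InvalidOperationException($"Empty payload at version {evt.Version}");
                    aggregate.Apply(deserialized);
                    aggregate.Version = evt.Version;
                }
        
                return aggregate;
            }
    
            public async Task SaveAsync(TAggregate aggregate, CancellationToken ct)
            {
                var uncommitted = aggregate.GetUncommittedEvents();
                if (uncommitted.Count == 0) return;
        
                await using var conn = await _dataSource.OpenConnectionAsync(ct);
                await using var txn = await conn.BeginTransactionAsync(IsolationLevel.Serializable, ct);
        
                // Version already counts the pending events, so the first new one is:
                var firstVersion = aggregate.Version - uncommitted.Count + 1;
        
                try
                {
                    for (var i = 0; i < uncommitted.Count; i++)
                    {
                        var evt = uncommitted[i];
                        await conn.ExecuteAsync(
                            """
                            INSERT INTO event_store (aggregate_id, event_type, payload, version, created_at)
                            VALUES (@Id, @Type, CAST(@Payload AS jsonb), @Version, @Created)
                            """,
                            new { Id = aggregate.Id, Type = evt.GetType().Name,
                                  Payload = JsonSerializer.Serialize(evt, evt.GetType()),
                                  Version = firstVersion + i, Created = DateTimeOffset.UtcNow },
                            transaction: txn, ct: ct);
                    }
        
                    await txn.CommitAsync(ct);
                }
                catch (PostgresException ex) when (ex.SqlState == PostgresErrorCodes.UniqueViolation)
                {
                    // Another writer appended the same version first: reload and retry the command
                    throw new DBConcurrencyException(
                        $"Aggregate {aggregate.Id} was modified concurrently", ex);
                }
        
                // Clear only after a successful commit, so a failed save can be retried
                aggregate.ClearUncommittedEvents();
            }
        }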

        public record EventRecord(string EventType, string Payload, int Version);
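The replay and optimistic-concurrency rules can be tried without PostgreSQL. This is a minimal in-memory sketch with simplifying assumptions. There is one stream per account, debits and credits are separate events, and `expectedVersion` plays the role of the `(aggregate_id, version)` unique index. `InMemoryEventStore`, `ConcurrencyException`, `AccountProjection` and the event records are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public abstract record LedgerEvent(Guid AccountId);
public sealed record Opened(Guid AccountId, decimal OpeningBalance) : LedgerEvent(AccountId);
public sealed record Debited(Guid AccountId, decimal Amount) : LedgerEvent(AccountId);
public sealed record Credited(Guid AccountId, decimal Amount) : LedgerEvent(AccountId);

public sealed class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// Plays the role of event_store: append-only, one version sequence per aggregate
public sealed class InMemoryEventStore
{
    private readonly Dictionary<Guid, List<LedgerEvent>> _streams = new();

    public IReadOnlyList<LedgerEvent> Load(Guid id) =>
        _streams.TryGetValue(id, out var s) ? s.ToList() : new List<LedgerEvent>();

    // expectedVersion = number of events the caller saw when it loaded the stream
    public int Append(Guid id, int expectedVersion, IEnumerable<LedgerEvent> events)
    {
        if (!_streams.TryGetValue(id, out var stream))
            _streams[id] = stream = new List<LedgerEvent>();
        if (stream.Count != expectedVersion)
            throw new ConcurrencyException($"Expected version {expectedVersion}, actual {stream.Count}");
        stream.AddRange(events);
        return stream.Count;
    }
}

public static class AccountProjection
{
    // The balance is never stored: it is a left fold over the stream
    public static decimal Balance(IEnumerable<LedgerEvent> events) =>
        events.Aggregate(0m, (balance, e) => e switch
        {
            Opened o => o.OpeningBalance,
            Debited d => balance - d.Amount,
            Credited c => balance + c.Amount,
            _ => balance
        });
}
```

Suppose a writer loaded version 1 but someone else has appended since then. Its append fails with `ConcurrencyException`, and it must reload and re-validate. A unique violation on `event_store` signals the same situation.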

RIG Model

RIG (Run-Isolate-Group) is a model for designing microservices with guaranteed eventual consistency through the saga pattern.

RIG categories

Category | Description | Rule | Example
--- | --- | --- | ---
Run | Transactions that can be freely interleaved | No restrictions | Analytics, reporting
Isolate | Transactions that lock data for the duration of the saga | Data locked during saga | Order processing
Group | Transactions that work on shared data without locking | Requires careful saga design | Shared inventory

RIG in action

Payment Saga:
        ┌────────────────────────────────────────────────────────────┐
        │                                                            │
        │  Run (no isolation needed):                                │
        │  ┌──────────────┐  ┌──────────────┐                        │
        │  │ Notification │  │  Analytics   │                        │
        │  └──────────────┘  └──────────────┘                        │
        │                                                            │
        │  Isolate (data locked during saga):                        │
        │  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐    │
        │  │    Ledger    │ → │   Payment    │ → │  Settlement  │    │
        │  │ (lock acct)  │   │ (lock acct)  │   │              │    │
        │  └──────────────┘   └──────────────┘   └──────────────┘    │
        │                                                            │
        │  Group (shared data, careful saga):                        │
        │  ┌──────────────┐  ┌──────────────┐                        │
        │  │    Fraud     │  │  Compliance  │                        │
        │  │  Detection   │  │    Check     │                        │
        │  └──────────────┘  └──────────────┘                        │
        │                                                            │
        └────────────────────────────────────────────────────────────┘

Idempotency Keys

Idempotency is a mandatory pattern for every state-changing operation in fintech.

Implementation

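public sealed class IdempotencyMiddleware : IMiddleware
        {
            private static readonly TimeSpan Ttl = TimeSpan.FromHours(24);
            private readonly NpgsqlDataSource _dataSource;
    
            // IMiddleware instances come from DI: services.AddTransient<IdempotencyMiddleware>()
            // plus app.UseMiddleware<IdempotencyMiddleware>()
            public IdempotencyMiddleware(NpgsqlDataSource dataSource) => _dataSource = dataSource;
    
            // IMiddleware contract: next is a parameter, the token comes from RequestAborted
            public async Task InvokeAsync(HttpContext context, RequestDelegate next)
            {
                var ct = context.RequestAborted;
                var idempotencyKey = context.Request.Headers["Idempotency-Key"].ToString();
        
                if (string.IsNullOrEmpty(idempotencyKey))
                {
                    await next(context);
                    return;
                }
        
                // 1. Atomically claim the key: of two concurrent requests, only one wins
                if (!await TryClaimAsync(idempotencyKey, ct))
                {
                    var cached = await GetCachedResultAsync(idempotencyKey, ct);
                    if (cached?.StatusCode is int status)
                    {
                        // Completed earlier: replay the stored response
                        context.Response.StatusCode = status;
                        context.Response.ContentType = cached.ContentType ?? "application/json";
                        await context.Response.WriteAsync(cached.Body ?? string.Empty, ct);
                    }
                    else
                    {
                        // Claimed but unfinished: the first request is still in flight
                        context.Response.StatusCode = StatusCodes.Status409Conflict;
                    }
                    return;
                }
        
                // 2. Buffer the response so the real body can be cached
                var originalBody = context.Response.Body;
                using var buffer = new MemoryStream();
                context.Response.Body = buffer;
                try
                {
                    await next(context);
                }
                catch
                {
                    // Releasing lets the client retry; safe only if the handler's own writes
                    // are idempotent (e.g. the ledger's UNIQUE idempotency_key)
                    await ReleaseAsync(idempotencyKey, CancellationToken.None);
                    throw;
                }
                finally
                {
                    context.Response.Body = originalBody;
                }
        
                // 3. Persist the outcome even if the client disconnected (CancellationToken.None);
                //    5xx responses release the key so the client may retry
                var body = System.Text.Encoding.UTF8.GetString(buffer.ToArray());
                if (context.Response.StatusCode >= 500)
                    await ReleaseAsync(idempotencyKey, CancellationToken.None);
                else
                    await StoreResultAsync(idempotencyKey, context.Response, body, CancellationToken.None);
        
                buffer.Position = 0;
                await buffer.CopyToAsync(originalBody, ct);
            }
    
            // Inserts a pending row (status_code NULL) or re-claims an expired one.
            // Assumes status_code, content_type and body are nullable in idempotency_cache.
            private async Task<bool> TryClaimAsync(string key, CancellationToken ct)
            {
                await using var conn = await _dataSource.OpenConnectionAsync(ct);
                var rows = await conn.ExecuteAsync(
                    """
                    INSERT INTO idempotency_cache (key, expires_at)
                    VALUES (@Key, @Expires)
                    ON CONFLICT (key) DO UPDATE
                        SET status_code = NULL, content_type = NULL, body = NULL,
                            expires_at = EXCLUDED.expires_at
                        WHERE idempotency_cache.expires_at <= NOW()
                    """,
                    new { Key = key, Expires = DateTimeOffset.UtcNow + Ttl }, ct: ct);
                return rows == 1;
            }
    
            private async Task<CachedResult?> GetCachedResultAsync(string key, CancellationToken ct)
            {
                await using var conn = await _dataSource.OpenConnectionAsync(ct);
                return await conn.QueryFirstOrDefaultAsync<CachedResult>(
                    """
                    SELECT status_code AS StatusCode, content_type AS ContentType, body AS Body
                    FROM idempotency_cache
                    WHERE key = @Key AND expires_at > NOW()
                    """, new { Key = key }, ct: ct);
            }
    
            private async Task StoreResultAsync(string key, HttpResponse response, string body,
                CancellationToken ct)
            {
                await using var conn = await _dataSource.OpenConnectionAsync(ct);
                await conn.ExecuteAsync(
                    """
                    UPDATE idempotency_cache
                    SET status_code = @Status, content_type = @Type, body = @Body
                    WHERE key = @Key
                    """,
                    new {
                        Key = key,
                        Status = response.StatusCode,
                        Type = response.ContentType ?? "application/json",
                        Body = body
                    }, ct: ct);
            }
    
            private async Task ReleaseAsync(string key, CancellationToken ct)
            {
                await using var conn = await _dataSource.OpenConnectionAsync(ct);
                await conn.ExecuteAsync(
                    "DELETE FROM idempotency_cache WHERE key = @Key AND status_code IS NULL",
                    new { Key = key }, ct: ct);
            }
        }

        // StatusCode == null marks a claimed but unfinished request
        public record CachedResult(int? StatusCode, string? ContentType, string? Body);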
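The states an idempotency layer has to distinguish can be sketched in memory: first request, still in flight, completed, and key reused for a different request. The sketch assumes a SHA-256 fingerprint of the request body is stored with the key, so that reuse with a different payload is detected. That is a common practice, not something the middleware above does. `IdempotencyStore`, `ClaimOutcome` and `StoredResponse` are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;

public enum ClaimOutcome { Claimed, InFlight, Replay, KeyReuseMismatch }

public sealed record StoredResponse(int StatusCode, string Body);

// Hypothetical in-memory stand-in for the idempotency_cache table
public sealed class IdempotencyStore
{
    private sealed record Entry(string Fingerprint, StoredResponse? Response);
    private readonly ConcurrentDictionary<string, Entry> _entries = new();

    public static string Fingerprint(string requestBody) =>
        Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(requestBody)));

    // TryAdd is the atomic claim, like INSERT ... ON CONFLICT DO NOTHING
    public ClaimOutcome TryClaim(string key, string requestBody, out StoredResponse? replay)
    {
        replay = null;
        var fingerprint = Fingerprint(requestBody);
        while (true)
        {
            if (_entries.TryAdd(key, new Entry(fingerprint, null)))
                return ClaimOutcome.Claimed;
            if (!_entries.TryGetValue(key, out var existing))
                continue; // released concurrently: try to claim again
            if (existing.Fingerprint != fingerprint)
                return ClaimOutcome.KeyReuseMismatch; // same key, different request
            if (existing.Response is null)
                return ClaimOutcome.InFlight;         // first request still running
            replay = existing.Response;
            return ClaimOutcome.Replay;
        }
    }

    // Called only by the request that claimed the key
    public void Complete(string key, StoredResponse response) =>
        _entries[key] = _entries[key] with { Response = response };

    // After a server error, release the key so the client may retry
    public void Release(string key) => _entries.TryRemove(key, out _);
}
```

The atomic claim is the essential part. Without it, two concurrent requests carrying the same key can both see "no cached result" and both execute.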

Pattern comparison

Pattern | Consistency | Complexity | Use case | Fintech fit
--- | --- | --- | --- | ---
2PC | Strong | High | Monolithic distributed systems | ❌ lock contention
Saga (choreography) | Eventual | Medium | Simple workflows | ⚠️ hard to debug
Saga (orchestration) | Eventual | Medium | Complex financial flows | ✅ recommended
Outbox + Inbox | Effectively-once (at-least-once delivery + deduplication) | Low | Event publishing/consuming | ✅ essential
CQRS | Strong (write) / Eventual (read) | Medium | High-throughput reads | ✅ recommended
Event Sourcing | Strong (per aggregate stream) | High | Audit trails, time-travel | ✅ for ledger
Idempotency keys | Effectively-once | Low | All state-changing APIs | ✅ essential

Distributed consistency checklist

  • [ ] Saga pattern for all cross-service transactions
  • [ ] Saga state persisted for crash recovery
  • [ ] Idempotent compensating transactions
  • [ ] Outbox pattern for event publishing
  • [ ] Inbox pattern for exactly-once event processing
  • [ ] Idempotency keys on all state-changing APIs
  • [ ] CQRS split: strong consistency (write) + eventual (read)
  • [ ] Reconciliation pipeline for verification
  • [ ] No interleaved sagas on the same data
  • [ ] Distributed tracing for saga debugging
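Two checklist items, compensating transactions and recorded saga progress, can be illustrated with a small orchestrator. This is a minimal in-memory sketch. It assumes each step is an `Execute`/`Compensate` pair and that steps report failure by returning `false` rather than throwing. `SagaStep` and `SagaOrchestrator` are hypothetical, and the `Log` list stands in for persisted saga state.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical step contract; both actions should be idempotent (safe to repeat after a crash)
public sealed record SagaStep(string Name, Func<bool> Execute, Action Compensate);

public sealed class SagaOrchestrator
{
    // Stand-in for a persisted saga_state table: what happened, in order
    public List<string> Log { get; } = new();

    public bool Run(IReadOnlyList<SagaStep> steps)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            if (!step.Execute())
            {
                Log.Add($"FAILED {step.Name}");
                // Undo the completed steps in reverse order
                while (completed.Count > 0)
                {
                    var done = completed.Pop();
                    done.Compensate();
                    Log.Add($"COMPENSATED {done.Name}");
                }
                return false;
            }
            completed.Push(step);
            Log.Add($"DONE {step.Name}");
        }
        return true;
    }
}
```

In a production orchestrator, progress is persisted before each step runs, so that after a crash it can resume or compensate. The same step may then run twice, which is why compensations must be idempotent.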

Further reading

  • Chris Richardson, "Microservices Patterns" — Saga chapter
  • Sam Newman, "Building Microservices" — Chapter 6: Transactional Messaging
  • RIG Model: https://www.infoq.com/articles/rig-data-consistent-microservices/
  • Martin Fowler, "Saga Pattern": https://martinfowler.com/articles/saga.html

Practice


Financial Ledger Architecture

Introduction

A financial ledger is the heart of any fintech system: an append-only, double-entry journal that serves as the source of truth for all financial operations. The ledger's architecture determines the correctness, scalability, and auditability of the entire system.


Ledger Architecture: Core Principles

Seven principles of a financial ledger

# | Principle | Description
--- | --- | ---
1 | Double-entry | sum(debits) = sum(credits) for every entry
2 | Append-only | No UPDATE/DELETE on journal entries
3 | Idempotent | Idempotency keys prevent duplicate postings
4 | Atomic | A journal entry and its events are posted in one transaction
5 | Immutable audit | Tamper-evident audit log with cryptographic hashes
6 | Reconcilable | Automated reconciliation of the journal against balances
7 | Strongly consistent | Serializable isolation for posting operations
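Principles 1 and 6 can be checked mechanically. Every entry must net to zero, and balances rebuilt from the journal must match the stored ones. This is a minimal in-memory sketch that assumes the credit-normal sign convention of the `account_balances` view below, where credits add and debits subtract. `Posting` mirrors `journal_postings`, and `Reconciler` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record Posting(Guid EntryId, string AccountId, decimal Amount, bool IsDebit);

public static class Reconciler
{
    // Principle 1: each entry nets to zero (debits = credits)
    public static IEnumerable<Guid> UnbalancedEntries(IEnumerable<Posting> postings) =>
        postings.GroupBy(p => p.EntryId)
                .Where(g => g.Sum(p => p.IsDebit ? p.Amount : -p.Amount) != 0m)
                .Select(g => g.Key);

    // Principle 6: rebuild balances from the journal (credit-normal: credit +, debit -)
    public static Dictionary<string, decimal> RebuildBalances(IEnumerable<Posting> postings) =>
        postings.GroupBy(p => p.AccountId)
                .ToDictionary(g => g.Key, g => g.Sum(p => p.IsDebit ? -p.Amount : p.Amount));

    // Accounts whose stored balance differs from the journal-derived one
    public static List<string> Mismatches(IEnumerable<Posting> postings,
        IReadOnlyDictionary<string, decimal> stored)
    {
        var rebuilt = RebuildBalances(postings);
        return rebuilt.Keys.Union(stored.Keys)
            .Where(acc => rebuilt.GetValueOrDefault(acc) != stored.GetValueOrDefault(acc))
            .OrderBy(acc => acc)
            .ToList();
    }
}
```

The point of reconciliation is to compare two independently derived numbers. The journal fold and the stored balances should never disagree, and any mismatch is an incident, not a rounding issue.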

Ledger Schema Design

PostgreSQL Schema

-- Accounts table
        CREATE TABLE accounts (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            account_number VARCHAR(50) NOT NULL UNIQUE,
            name VARCHAR(200) NOT NULL,
            currency CHAR(3) NOT NULL CHECK (currency ~ '^[A-Z]{3}$'),
            type VARCHAR(20) NOT NULL CHECK (type IN ('ASSET', 'LIABILITY', 'EQUITY', 'REVENUE', 'EXPENSE')),
            status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE' CHECK (status IN ('ACTIVE', 'SUSPENDED', 'CLOSED')),
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );

        CREATE INDEX idx_accounts_currency ON accounts(currency);
        CREATE INDEX idx_accounts_status ON accounts(status);

        -- Journal entries (immutable, append-only)
        CREATE TABLE journal_entries (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            idempotency_key VARCHAR(100) NOT NULL CONSTRAINT uq_journal_idempotency UNIQUE,
            timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            description TEXT,
            metadata JSONB,
            status VARCHAR(20) NOT NULL DEFAULT 'POSTED' CHECK (status IN ('POSTED', 'REVERSED')),
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );

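        -- Journal postings
        CREATE TABLE journal_postings (
            id BIGSERIAL PRIMARY KEY,
            entry_id UUID NOT NULL REFERENCES journal_entries(id) ON DELETE RESTRICT,
            account_id UUID NOT NULL REFERENCES accounts(id) ON DELETE RESTRICT,
            amount DECIMAL(20,8) NOT NULL CHECK (amount > 0),
            is_debit BOOLEAN NOT NULL,
            version INTEGER NOT NULL DEFAULT 1,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );

        CREATE INDEX idx_postings_account ON journal_postings(account_id);
        CREATE INDEX idx_postings_entry ON journal_postings(entry_id);

        -- Double-entry invariant. PostgreSQL rejects subqueries in CHECK constraints, so the
        -- invariant is enforced by a deferred constraint trigger that runs at COMMIT, after all
        -- postings of the entry exist. An entry with no postings at all is not caught here and
        -- must be rejected by the application.
        CREATE OR REPLACE FUNCTION check_double_entry_invariant()
        RETURNS TRIGGER AS $$
        DECLARE
            net NUMERIC;
        BEGIN
            SELECT COALESCE(SUM(CASE WHEN is_debit THEN amount ELSE -amount END), 0)
              INTO net
              FROM journal_postings
             WHERE entry_id = NEW.entry_id;

            IF net <> 0 THEN
                RAISE EXCEPTION 'chk_double_entry_invariant: entry % is unbalanced by %',
                    NEW.entry_id, net
                    USING ERRCODE = 'check_violation', CONSTRAINT = 'chk_double_entry_invariant';
            END IF;
            RETURN NULL;
        END;
        $$ LANGUAGE plpgsql;

        CREATE CONSTRAINT TRIGGER trg_double_entry_invariant
            AFTER INSERT ON journal_postings
            DEFERRABLE INITIALLY DEFERRED
            FOR EACH ROW EXECUTE FUNCTION check_double_entry_invariant();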

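        -- Materialized balances (read-optimized). Sign convention: credit-normal
        -- (credits increase, debits decrease), which suits customer/liability accounts;
        -- asset accounts would use the opposite sign.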
        CREATE MATERIALIZED VIEW account_balances AS
        SELECT 
            jp.account_id,
            a.currency,
            SUM(jp.amount * CASE WHEN jp.is_debit = true THEN -1 ELSE 1 END) AS balance
        FROM journal_postings jp
        JOIN journal_entries je ON jp.entry_id = je.id
        JOIN accounts a ON jp.account_id = a.id
        WHERE je.status = 'POSTED'
        GROUP BY jp.account_id, a.currency;

        CREATE UNIQUE INDEX uq_account_balances_account ON account_balances(account_id);

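        -- Refresh: not from a trigger on journal_entries. When entries are inserted before their
        -- postings (as PostingEngine below does), such a trigger fires before the postings exist;
        -- it would also recompute the whole view on every posting and make concurrent postings
        -- queue on the view's lock. Refresh asynchronously instead (scheduler or outbox consumer).
        -- CONCURRENTLY keeps the view readable during the refresh and needs the unique index above:
        -- REFRESH MATERIALIZED VIEW CONCURRENTLY account_balances;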

        -- Outbox events
        CREATE TABLE outbox_events (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            aggregate_id UUID NOT NULL,
            event_type VARCHAR(100) NOT NULL,
            payload JSONB NOT NULL,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            published_at TIMESTAMPTZ,
            retry_count INTEGER NOT NULL DEFAULT 0
        );

        CREATE INDEX idx_outbox_unpublished ON outbox_events(created_at) 
            WHERE published_at IS NULL;

        -- Idempotency keys
        CREATE TABLE idempotency_keys (
            key VARCHAR(100) PRIMARY KEY,
            result JSONB NOT NULL,
            expires_at TIMESTAMPTZ NOT NULL
        );

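        -- Plain index for the cleanup job (DELETE ... WHERE expires_at < NOW()). A partial index
        -- WHERE expires_at > NOW() is rejected: index predicates may only use IMMUTABLE functions.
        CREATE INDEX idx_idempotency_expires ON idempotency_keys(expires_at);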

        -- Audit log (tamper-evident)
        CREATE TABLE audit_log (
            id BIGSERIAL PRIMARY KEY,
            event_type VARCHAR(100) NOT NULL,
            actor_id UUID,
            entity_type VARCHAR(50) NOT NULL,
            entity_id UUID,
            action VARCHAR(50) NOT NULL,
            old_state JSONB,
            new_state JSONB,
            metadata JSONB,
            ip_address INET,
            user_agent TEXT,
            hash TEXT,
            prev_hash TEXT,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );

        CREATE INDEX idx_audit_log_entity ON audit_log(entity_type, entity_id);
        CREATE INDEX idx_audit_log_created ON audit_log(created_at);
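The `hash` and `prev_hash` columns only make `audit_log` tamper-evident if each row's hash covers the previous row's hash. This is a minimal sketch that assumes SHA-256 over a `|`-joined string of the covered fields. The schema prescribes neither choice, and `AuditChain` and `AuditRecord` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public sealed record AuditRecord(long Id, string Action, string EntityId, string NewState,
    string PrevHash, string Hash);

public sealed class AuditChain
{
    private const string Genesis = "GENESIS";
    public List<AuditRecord> Records { get; } = new();

    public static string ComputeHash(string prevHash, long id, string action, string entityId,
        string newState)
    {
        // Simple canonical form; fields that may contain '|' would need escaping in practice
        var canonical = string.Join("|", prevHash, id, action, entityId, newState);
        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(canonical)));
    }

    public AuditRecord Append(string action, string entityId, string newState)
    {
        var prev = Records.Count == 0 ? Genesis : Records[^1].Hash;
        var id = Records.Count + 1L;
        var record = new AuditRecord(id, action, entityId, newState, prev,
            ComputeHash(prev, id, action, entityId, newState));
        Records.Add(record);
        return record;
    }

    // Returns the id of the first record that fails verification, or null if the chain is intact
    public static long? FirstBrokenLink(IReadOnlyList<AuditRecord> records)
    {
        var expectedPrev = Genesis;
        foreach (var r in records)
        {
            if (r.PrevHash != expectedPrev ||
                r.Hash != ComputeHash(r.PrevHash, r.Id, r.Action, r.EntityId, r.NewState))
                return r.Id;
            expectedPrev = r.Hash;
        }
        return null;
    }
}
```

Verification only catches tampering if the latest hash is also stored somewhere the attacker cannot rewrite, such as a separate system. Otherwise an attacker with write access can simply recompute the whole chain.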

Ledger Service Implementation

Posting Engine

public record JournalEntry(
            Guid Id,
            string IdempotencyKey,
            DateTimeOffset Timestamp,
            string Description,
            IReadOnlyList<JournalPosting> Postings,
            Dictionary<string, object>? Metadata = null);

        public record JournalPosting(
            Guid AccountId,
            decimal Amount,
            bool IsDebit,
            int Version);

        public record PostJournalEntryResult(
            bool Success,
            Guid? EntryId,
            string? Error,
            bool IsDuplicate)
        {
            // Named Posted, not Success: a member cannot share the Success property's name
            public static PostJournalEntryResult Posted(Guid entryId) => 
                new(true, entryId, null, false);
    
            public static PostJournalEntryResult Failure(Guid? entryId, string error) => 
                new(false, entryId, error, false);
    
            public static PostJournalEntryResult Duplicate(Guid entryId) => 
                new(true, entryId, "Duplicate idempotency key", true);
        }

        public class PostingEngine
        {
            private readonly NpgsqlDataSource _dataSource;
            private readonly IEventBus _eventBus;
            private readonly ILogger<PostingEngine> _logger;
    
            public PostingEngine(NpgsqlDataSource dataSource, IEventBus eventBus,
                ILogger<PostingEngine> logger)
            {
                _dataSource = dataSource;
                _eventBus = eventBus;
                _logger = logger;
            }
    
            // Serializable isolation: PostgreSQL may abort with SQLSTATE 40001 under contention.
            // Callers should retry PostAsync; the idempotency key makes the retry safe.
            public async Task<PostJournalEntryResult> PostAsync(JournalEntry entry, 
                CancellationToken ct)
            {
                // Validate before touching the database. decimal arithmetic is exact,
                // so compare with 0m (decimal.Epsilon is not a rounding tolerance)
                if (entry.Postings.Count < 2)
                    return PostJournalEntryResult.Failure(null, "An entry needs at least two postings");
                if (entry.Postings.Any(p => p.Amount <= 0))
                    return PostJournalEntryResult.Failure(null, "Posting amounts must be positive");
        
                var netAmount = entry.Postings.Sum(p => p.IsDebit ? p.Amount : -p.Amount);
                if (netAmount != 0m)
                    return PostJournalEntryResult.Failure(null, 
                        $"Double-entry invariant violated: net amount = {netAmount}");
        
                await using var conn = await _dataSource.OpenConnectionAsync(ct);
                // Disposing an uncommitted transaction rolls it back; no explicit Rollback needed
                await using var txn = await conn.BeginTransactionAsync(IsolationLevel.Serializable, ct);
        
                try
                {
                    // 1. Idempotency check
                    var existingEntryId = await conn.QueryFirstOrDefaultAsync<Guid?>(
                        "SELECT id FROM journal_entries WHERE idempotency_key = @Key",
                        new { Key = entry.IdempotencyKey }, transaction: txn, ct: ct);
            
                    if (existingEntryId.HasValue)
                        return PostJournalEntryResult.Duplicate(existingEntryId.Value);
            
                    // 2. Insert journal entry (jsonb values are sent as text and cast)
                    await conn.ExecuteAsync(
                        """
                        INSERT INTO journal_entries (id, idempotency_key, timestamp, description, metadata, status)
                        VALUES (@Id, @Key, @Ts, @Desc, CAST(@Meta AS jsonb), 'POSTED')
                        """,
                        new {
                            Id = entry.Id,
                            Key = entry.IdempotencyKey,
                            Ts = entry.Timestamp,
                            Desc = entry.Description,
                            Meta = entry.Metadata is null ? null : JsonSerializer.Serialize(entry.Metadata)
                        },
                        transaction: txn, ct: ct);
            
                    // 3. Insert postings
                    var postings = entry.Postings.Select(p => new
                    {
                        EntryId = entry.Id,
                        p.AccountId,
                        p.Amount,
                        p.IsDebit,
                        p.Version
                    });
            
                    await conn.ExecuteAsync(
                        """
                        INSERT INTO journal_postings (entry_id, account_id, amount, is_debit, version)
                        VALUES (@EntryId, @AccountId, @Amount, @IsDebit, @Version)
                        """,
                        postings,
                        transaction: txn, ct: ct);
            
                    // 4. Publish outbox event (same transaction!)
                    var eventId = Guid.NewGuid();
                    var payload = JsonSerializer.Serialize(new
                    {
                        entry.Id,
                        entry.IdempotencyKey,
                        entry.Description,
                        entry.Postings.Count,
                        entry.Timestamp
                    });
            
                    await conn.ExecuteAsync(
                        """
                        INSERT INTO outbox_events (id, aggregate_id, event_type, payload, created_at)
                        VALUES (@Id, @AggId, @Type, CAST(@Payload AS jsonb), @Created)
                        """,
                        new {
                            Id = eventId,
                            AggId = entry.Id,
                            Type = "JournalEntryPosted",
                            Payload = payload,
                            Created = DateTimeOffset.UtcNow
                        },
                        transaction: txn, ct: ct);
            
                    // 5. Audit log
                    await conn.ExecuteAsync(
                        """
                        INSERT INTO audit_log (event_type, entity_type, entity_id, action, new_state, created_at)
                        VALUES ('LEDGER_POST', 'JOURNAL_ENTRY', @EntryId, 'POSTED', CAST(@State AS jsonb), NOW())
                        """,
                        new { EntryId = entry.Id, State = payload },
                        transaction: txn, ct: ct);
            
                    // The deferred double-entry trigger runs here
                    await txn.CommitAsync(ct);
            
                    _logger.LogInformation("Journal entry {EntryId} posted with {PostingCount} postings",
                        entry.Id, entry.Postings.Count);
            
                    return PostJournalEntryResult.Posted(entry.Id);
                }
                catch (PostgresException ex) when (ex.ConstraintName == "chk_double_entry_invariant")
                {
                    return PostJournalEntryResult.Failure(null, "Double-entry invariant violated");
                }
                catch (PostgresException ex) when (ex.SqlState == PostgresErrorCodes.UniqueViolation
                                                   && ex.ConstraintName == "uq_journal_idempotency")
                {
                    // A concurrent request with the same key committed first. The current
                    // transaction is aborted, so look the entry up on a fresh connection.
                    await using var lookup = await _dataSource.OpenConnectionAsync(ct);
                    var existingId = await lookup.QueryFirstOrDefaultAsync<Guid?>(
                        "SELECT id FROM journal_entries WHERE idempotency_key = @Key",
                        new { Key = entry.IdempotencyKey }, ct: ct);
                    return PostJournalEntryResult.Duplicate(existingId ?? Guid.Empty);
                }
            }
        }
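Principle 7 (serializable posting) has an operational consequence. PostgreSQL may abort a serializable transaction with SQLSTATE 40001 (`serialization_failure`) and expects the client to retry it. Because posting is idempotent by key, retrying the whole call is safe. Below is a minimal retry helper in which the transient-error check is passed in as a predicate. `SerializableRetry` and its default values are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SerializableRetry
{
    // Retries an idempotent operation while isTransient(ex) is true, up to maxAttempts calls
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        Func<Exception, bool> isTransient,
        int maxAttempts = 5,
        TimeSpan? baseDelay = null,
        CancellationToken ct = default)
    {
        var delay = baseDelay ?? TimeSpan.FromMilliseconds(20);
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation(ct);
            }
            catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
            {
                // Exponential backoff with jitter (factor 0.5..1.5) so competing transactions spread out
                var jitter = 0.5 + Random.Shared.NextDouble();
                var wait = TimeSpan.FromMilliseconds(
                    delay.TotalMilliseconds * Math.Pow(2, attempt - 1) * jitter);
                await Task.Delay(wait, ct);
            }
        }
    }
}
```

With Npgsql, the predicate would typically be `ex => ex is PostgresException { SqlState: "40001" }`. Non-transient errors, such as a unique violation, propagate immediately.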

Transfer Service

public record TransferCommand(
            Guid SourceAccountId,
            Guid DestinationAccountId,
            decimal Amount,
            string Currency,
            string IdempotencyKey,
            string Description);

        public record TransferResult(
            bool Success,
            Guid? EntryId,
            string? Error,
            bool IsDuplicate);

        public class TransferService
        {
            private readonly PostingEngine _postingEngine;
            private readonly NpgsqlDataSource _dataSource;
            private readonly ILogger<TransferService> _logger;
    
            public TransferService(PostingEngine postingEngine, NpgsqlDataSource dataSource,
                ILogger<TransferService> logger)
            {
                _postingEngine = postingEngine;
                _dataSource = dataSource;
                _logger = logger;
            }
    
            public async Task<TransferResult> ExecuteAsync(TransferCommand cmd, 
                CancellationToken ct)
            {
                // Validate amounts
                if (cmd.Amount <= 0)
                    return TransferResult.Failure(null, "Amount must be positive");
        
                // Validate accounts exist
                var accountsExist = await ValidateAccountsAsync(cmd, ct);
                if (!accountsExist)
                    return TransferResult.Failure(null, "One or both accounts not found");
        
                // Check balance (strong consistency from writer)
                var sourceBalance = await GetWriterBalanceAsync(cmd.SourceAccountId, ct);
                if (sourceBalance < cmd.Amount)
                    return TransferResult.Failure(null, 
                        $"Insufficient funds: {sourceBalance} < {cmd.Amount}");
        
                // Create double-entry journal
                var entry = new JournalEntry(
                    Id: Guid.NewGuid(),
                    IdempotencyKey: cmd.IdempotencyKey,
                    Timestamp: DateTimeOffset.UtcNow,
                    Description: cmd.Description ?? $"Transfer {cmd.SourceAccountId} → {cmd.DestinationAccountId}",
                    Postings: new List<JournalPosting>
                    {
                        new JournalPosting(cmd.SourceAccountId, cmd.Amount, IsDebit: true, 1),
                        new JournalPosting(cmd.DestinationAccountId, cmd.Amount, IsDebit: false, 1)
                    }
                );
        
                var result = await _postingEngine.PostAsync(entry, ct);
        
                if (!result.Success)
                    return TransferResult.Failure(result.EntryId, result.Error ?? "Unknown error");
        
                if (result.IsDuplicate)
                    return TransferResult.Duplicate(result.EntryId ?? Guid.Empty);
        
                return TransferResult.Success(result.EntryId ?? Guid.Empty);
            }
    
            private async Task<bool> ValidateAccountsAsync(TransferCommand cmd, CancellationToken ct)
            {
                using var conn = await _dataSource.OpenConnectionAsync(ct);
                var count = await conn.QueryFirstOrDefaultAsync<int>(
                    """
                    SELECT COUNT(*) FROM accounts 
                    WHERE id IN (@Source, @Dest) AND status = 'ACTIVE'
                    """,
                    new { Source = cmd.SourceAccountId, Dest = cmd.DestinationAccountId },
                    ct: ct);
        
                return count == 2;
            }
    
            private async Task<decimal> GetWriterBalanceAsync(Guid accountId, CancellationToken ct)
            {
                using var conn = await _dataSource.OpenConnectionAsync(ct);
                return await conn.QueryFirstOrDefaultAsync<decimal>(
                    """
                    SELECT COALESCE(SUM(amount * CASE WHEN is_debit = true THEN -1 ELSE 1 END), 0)
                    FROM journal_postings jp
                    JOIN journal_entries je ON jp.entry_id = je.id
                    WHERE jp.account_id = @Id AND je.status = 'POSTED'
                    """,
                    new { Id = accountId }, ct: ct);
            }
        }

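        // A static class cannot be instantiated or used as a return type, so TransferResult
        // is a record with factory methods. The flag is named IsSuccess because a member
        // named Success would clash with the Success(...) factory.
        public sealed record TransferResult(bool IsSuccess, Guid? EntryId, string? Error, bool IsDuplicate)
        {
            public static TransferResult Success(Guid entryId) => 
                new(true, entryId, null, false);
    
            public static TransferResult Failure(Guid? entryId, string error) => 
                new(false, entryId, error, false);
    
            public static TransferResult Duplicate(Guid entryId) => 
                new(true, entryId, "Duplicate idempotency key", true);
        }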
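To see why the balance check has to live in the same atomic unit as the write, here is a minimal in-memory sketch. `InMemoryLedger` is a hypothetical type, not the PostingEngine used above; a single lock plays the role that the SERIALIZABLE transaction (or `SELECT ... FOR UPDATE` on the account rows) plays in PostgreSQL, covering the idempotency lookup, the balance check and both legs of the entry.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory ledger, used only to illustrate atomic check-and-post.
public sealed class InMemoryLedger
{
    private readonly object _gate = new();
    private readonly Dictionary<Guid, decimal> _balances = new();
    private readonly HashSet<string> _idempotencyKeys = new();

    public void Open(Guid accountId, decimal openingBalance)
    {
        lock (_gate) _balances[accountId] = openingBalance;
    }

    public decimal BalanceOf(Guid accountId)
    {
        lock (_gate) return _balances[accountId];
    }

    // Returns "posted", "duplicate" or "insufficient_funds".
    public string Transfer(string idempotencyKey, Guid source, Guid destination, decimal amount)
    {
        if (amount <= 0) throw new ArgumentOutOfRangeException(nameof(amount));
        if (source == destination) throw new ArgumentException("Source and destination must differ");

        lock (_gate)
        {
            // 1. Idempotency: a replayed command must not post twice
            if (_idempotencyKeys.Contains(idempotencyKey)) return "duplicate";

            // 2. Balance check under the same lock as the write: no check-then-act window
            if (_balances[source] < amount) return "insufficient_funds";

            // 3. Both legs of the double entry are applied together
            _balances[source] -= amount;
            _balances[destination] += amount;
            _idempotencyKeys.Add(idempotencyKey);
            return "posted";
        }
    }
}
```

Ten concurrent 30-unit transfers from a 100-unit account produce exactly three postings. If the check ran outside the lock, more than three could pass it and the source balance could go negative.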

Balance Types and Consistency Levels

Three balance types

| Type | Source | Consistency | Latency | Use case |
|---|---|---|---|---|
| Writer | Direct journal query | Strong (serializable) | ~10ms | Transfer validation, financial decisions |
| Session | Writer, after the session's own write | Session-level (read-your-writes) | ~5ms | "Check balance after transfer" |
| Materialized | Materialized view | Eventually consistent | ~1ms | UI display, reporting |
public enum BalanceConsistency
        {
            Strong,      // Writer: strongest consistency
            Session,     // Read-your-writes guarantee
            Eventual     // Materialized view
        }

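        public class BalanceService
        {
            private readonly NpgsqlDataSource _writeDb;   // primary (writer)
            private readonly NpgsqlDataSource _readDb;    // replica / materialized views

            public BalanceService(NpgsqlDataSource writeDb, NpgsqlDataSource readDb)
            {
                _writeDb = writeDb;
                _readDb = readDb;
            }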
    
            public async Task<decimal> GetBalanceAsync(Guid accountId, 
                BalanceConsistency consistency, CancellationToken ct)
            {
                return consistency switch
                {
                    BalanceConsistency.Strong => await GetStrongBalanceAsync(accountId, ct),
                    BalanceConsistency.Session => await GetSessionBalanceAsync(accountId, ct),
                    BalanceConsistency.Eventual => await GetEventualBalanceAsync(accountId, ct),
                    _ => throw new ArgumentOutOfRangeException(nameof(consistency), consistency, null)
                };
            }
    
            private async Task<decimal> GetStrongBalanceAsync(Guid accountId, CancellationToken ct)
            {
                using var conn = await _writeDb.OpenConnectionAsync(ct);
                return await conn.QueryFirstOrDefaultAsync<decimal>(
                    """
                    SELECT COALESCE(SUM(amount * CASE WHEN is_debit = true THEN -1 ELSE 1 END), 0)
                    FROM journal_postings jp
                    JOIN journal_entries je ON jp.entry_id = je.id
                    WHERE jp.account_id = @Id AND je.status = 'POSTED'
                    """,
                    new { Id = accountId }, ct: ct);
            }
    
            private async Task<decimal> GetSessionBalanceAsync(Guid accountId, 
                CancellationToken ct)
            {
                // Use session-level consistency: read from writer after last write
                // Implementation depends on connection pooling strategy
                return await GetStrongBalanceAsync(accountId, ct);
            }
    
            private async Task<decimal> GetEventualBalanceAsync(Guid accountId, 
                CancellationToken ct)
            {
                using var conn = await _readDb.OpenConnectionAsync(ct);
                return await conn.QueryFirstOrDefaultAsync<decimal>(
                    "SELECT balance FROM account_balances WHERE account_id = @Id",
                    new { Id = accountId }, ct: ct);
            }
        }
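One common way to implement the Session level without sending every read to the writer is LSN tracking: after a commit, the session remembers the primary's WAL position (`pg_current_wal_lsn()`), and a later read may go to a replica only once the replica's `pg_last_wal_replay_lsn()` has reached it. The sketch covers only the routing decision; `SessionReadRouter` is a hypothetical name, and how the two LSN values are fetched and stored per session is assumed.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: a session read may use a replica only if that replica has
// replayed WAL at least up to the session's last write.
public static class SessionReadRouter
{
    // Parses PostgreSQL's textual pg_lsn format "XXXXXXXX/YYYYYYYY" into a 64-bit position.
    public static ulong ParseLsn(string lsn)
    {
        var parts = lsn.Split('/');
        if (parts.Length != 2) throw new FormatException($"Invalid LSN: {lsn}");
        ulong high = ulong.Parse(parts[0], NumberStyles.HexNumber, CultureInfo.InvariantCulture);
        ulong low = ulong.Parse(parts[1], NumberStyles.HexNumber, CultureInfo.InvariantCulture);
        return (high << 32) | low;
    }

    // sessionWriteLsn: pg_current_wal_lsn() captured right after the session's last commit,
    //                  or null if the session has not written anything yet.
    // replicaReplayLsn: pg_last_wal_replay_lsn() as reported by the candidate replica.
    public static bool CanReadFromReplica(string? sessionWriteLsn, string? replicaReplayLsn)
    {
        if (sessionWriteLsn is null) return true;     // nothing of our own to read back
        if (replicaReplayLsn is null) return false;   // replica position unknown: use the writer
        return ParseLsn(replicaReplayLsn) >= ParseLsn(sessionWriteLsn);
    }
}
```

Capturing `pg_current_wal_lsn()` right after the commit is conservative: that position is at or past the commit record, so a replica that has replayed up to it also contains the session's write.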

Reconciliation Pipeline

Automated Reconciliation

public class ReconciliationService
        {
            private readonly NpgsqlDataSource _dataSource;
            private readonly IEventBus _eventBus;
            private readonly ILogger<ReconciliationService> _logger;
    
            public ReconciliationService(NpgsqlDataSource dataSource, IEventBus eventBus,
                ILogger<ReconciliationService> logger)
            {
                _dataSource = dataSource;
                _eventBus = eventBus;
                _logger = logger;
            }
    
            public async Task<ReconciliationReport> RunAsync(CancellationToken ct)
            {
                var discrepancies = new List<Discrepancy>();
        
                using var conn = await _dataSource.OpenConnectionAsync(ct);
        
                // Get all active accounts (materialized as a list: Count is used in the report below)
                var accounts = (await conn.QueryAsync<AccountInfo>(
                    "SELECT id, currency FROM accounts WHERE status = 'ACTIVE'", ct: ct)).ToList();
        
                foreach (var account in accounts)
                {
                    // Derived balance from journal entries (source of truth)
                    var derivedBalance = await conn.QueryFirstOrDefaultAsync<decimal>(
                        """
                        SELECT COALESCE(SUM(jp.amount * CASE WHEN jp.is_debit = true THEN -1 ELSE 1 END), 0)
                        FROM journal_postings jp
                        JOIN journal_entries je ON jp.entry_id = je.id
                        WHERE jp.account_id = @Id AND je.status = 'POSTED'
                        """,
                        new { Id = account.Id }, ct: ct);
            
                    // Materialized balance (cached, should match)
                    var materializedBalance = await conn.QueryFirstOrDefaultAsync<decimal?>(
                        "SELECT balance FROM account_balances WHERE account_id = @Id",
                        new { Id = account.Id }, ct: ct);
            
                    var materialized = materializedBalance.GetValueOrDefault();
            
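                    // decimal arithmetic is exact (and decimal has no Epsilon), so compare directly.
                    // account_balances is eventually consistent: a mismatch may be refresh lag,
                    // so re-check before treating it as a real break.
                    if (derivedBalance != materialized)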
                    {
                        var discrepancy = new Discrepancy(
                            AccountId: account.Id,
                            Currency: account.Currency,
                            DerivedBalance: derivedBalance,
                            MaterializedBalance: materialized,
                            Difference: derivedBalance - materialized,
                            DetectedAt: DateTimeOffset.UtcNow
                        );
                
                        discrepancies.Add(discrepancy);
                
                        _logger.LogWarning(
                            "Reconciliation discrepancy: Account={AccountId} Currency={Currency} " +
                            "Derived={Derived} Materialized={Materialized} Diff={Diff}",
                            account.Id, account.Currency, derivedBalance, materialized, 
                            discrepancy.Difference);
                
                        // Publish reconciliation alert
                        await _eventBus.PublishAsync(new ReconciliationAlertEvent(discrepancy), ct);
                    }
                }
        
                var report = new ReconciliationReport(
                    RanAt: DateTimeOffset.UtcNow,
                    AccountsScanned: accounts.Count,
                    Discrepancies: discrepancies);
        
                _logger.LogInformation(
                    "Reconciliation completed: {Accounts} scanned, {Discrepancies} discrepancies",
                    report.AccountsScanned, report.Discrepancies.Count);
        
                return report;
            }
        }

        public record AccountInfo(Guid Id, string Currency);
        public record Discrepancy(
            Guid AccountId,
            string Currency,
            decimal DerivedBalance,
            decimal MaterializedBalance,
            decimal Difference,
            DateTimeOffset DetectedAt);

        public record ReconciliationReport(
            DateTimeOffset RanAt,
            int AccountsScanned,
            IReadOnlyList<Discrepancy> Discrepancies);

        public record ReconciliationAlertEvent(Discrepancy Discrepancy);
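The loop above issues two queries per account; in production the comparison is usually a single set-based query (a `FULL OUTER JOIN` between aggregated postings and `account_balances`). The comparison logic itself can be sketched in memory. The hypothetical `Reconciler` below uses the same sign convention as the SQL (credit +1, debit -1) and, unlike `GetValueOrDefault()`, reports a missing materialized row as its own kind of discrepancy instead of treating it as zero. `Posting`, `ReconDiff` and `DiscrepancyKind` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public record Posting(Guid AccountId, decimal Amount, bool IsDebit);

public enum DiscrepancyKind { Mismatch, MissingMaterialized, OrphanMaterialized }

public record ReconDiff(Guid AccountId, DiscrepancyKind Kind, decimal Derived, decimal? Materialized);

public static class Reconciler
{
    public static List<ReconDiff> Compare(
        IEnumerable<Posting> postedPostings,
        IReadOnlyDictionary<Guid, decimal> materialized)
    {
        // Derived balance per account: credits add, debits subtract
        var derived = postedPostings
            .GroupBy(p => p.AccountId)
            .ToDictionary(g => g.Key, g => g.Sum(p => p.IsDebit ? -p.Amount : p.Amount));

        var diffs = new List<ReconDiff>();
        foreach (var (accountId, derivedBalance) in derived)
        {
            if (!materialized.TryGetValue(accountId, out var cached))
                diffs.Add(new ReconDiff(accountId, DiscrepancyKind.MissingMaterialized, derivedBalance, null));
            else if (cached != derivedBalance) // decimal is exact: no tolerance needed
                diffs.Add(new ReconDiff(accountId, DiscrepancyKind.Mismatch, derivedBalance, cached));
        }

        // Cached non-zero balances for accounts that have no postings at all
        foreach (var (accountId, cached) in materialized)
            if (!derived.ContainsKey(accountId) && cached != 0m)
                diffs.Add(new ReconDiff(accountId, DiscrepancyKind.OrphanMaterialized, 0m, cached));

        return diffs;
    }
}
```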

Ledger Architecture: Comparing Implementations

TigerBeetle

TigerBeetle — high-performance financial accounting database:

  • Strict serializability via a replicated state machine
  • Fixed-size data structures for performance
  • Cryptographic checksums for detecting data corruption (storage and network faults)
  • Leader-based timestamping (no NTP dependency)
  • Synchronous replication

PostgreSQL (custom ledger)

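PostgreSQL with a custom schema is the most common approach:

  • ACID transactions, SERIALIZABLE isolation
  • CHECK constraints for row-level rules (e.g. positive amounts); the per-entry debits = credits invariant spans several rows, so it needs a deferred constraint trigger or the posting function
  • Materialized views for balances
  • Outbox pattern for events
  • Full control over schema and constraints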
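A CHECK constraint only sees one row at a time, while "debits equal credits" is a property of all postings of an entry, so that rule has to be enforced by a deferred constraint trigger or by the posting code before the insert. A minimal validator for the posting-code variant; the `LedgerLine` record (in particular its per-line `Currency`) and `DoubleEntry` are assumptions, not the document's `JournalPosting`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public record LedgerLine(Guid AccountId, string Currency, decimal Amount, bool IsDebit);

public static class DoubleEntry
{
    // Returns null if the entry is balanced, otherwise a description of the first problem found.
    public static string? Validate(IReadOnlyList<LedgerLine> lines)
    {
        if (lines.Count < 2) return "An entry needs at least two postings";
        if (lines.Any(l => l.Amount <= 0)) return "Posting amounts must be positive";

        // Debits and credits must balance separately within each currency
        foreach (var byCurrency in lines.GroupBy(l => l.Currency))
        {
            decimal debits = byCurrency.Where(l => l.IsDebit).Sum(l => l.Amount);
            decimal credits = byCurrency.Where(l => !l.IsDebit).Sum(l => l.Amount);
            if (debits != credits)
                return $"{byCurrency.Key}: debits {debits} != credits {credits}";
        }
        return null;
    }
}
```

Balancing per currency matters: an entry with a 10 USD debit and a 10 EUR credit has equal totals but is not a valid double entry.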

CockroachDB

CockroachDB for a multi-region ledger:

  • Distributed SQL with strong consistency
  • Automatic replication and rebalancing
  • Raft consensus for distributed transactions
  • Global transactions without sharding logic

Choosing an implementation

| Criterion | PostgreSQL | TigerBeetle | CockroachDB |
|---|---|---|---|
| Strong consistency | ✅ | ✅ | ✅ |
| Double-entry enforcement | ✅ CHECK | ✅ built-in | ✅ CHECK |
| Multi-region | ⚠️ manual | ⚠️ limited | ✅ native |
| Performance | 10k-50k TPS | 100k+ TPS | 10k-30k TPS |
| Operational complexity | Low | Medium | High |
| Ecosystem | Mature | Emerging | Mature |
| Best for | Most fintechs | High-throughput | Multi-region |

Checklist for Ledger Architecture

  • [ ] Double-entry enforced at the database level (CHECK constraints for row-level rules; a deferred constraint trigger or the posting function for the debits = credits invariant)
  • [ ] Append-only journal: zero UPDATE/DELETE
  • [ ] Idempotency keys with a UNIQUE constraint
  • [ ] SERIALIZABLE isolation for posting
  • [ ] Outbox pattern for event publishing
  • [ ] Materialized balances with automatic refresh
  • [ ] Reconciliation pipeline: automated + alerting
  • [ ] Audit log: immutable, tamper-evident
  • [ ] Balance types: Strong / Session / Eventual
  • [ ] CAP boundary: CP for the core ledger, AP for the periphery

Further reading

  • TigerBeetle Design: https://github.com/tigerbeetledb/tigerbeetle/blob/main/docs/DESIGN.md
  • Trio, "Payment Ledger Architecture": https://trio.dev/payment-ledger-architecture-fintech/
  • Gothar, "ACID Is a Contract, Not a Religion": https://gothar.com/en/insights/acid-ledger-correctness-2026
  • Chronicle Ledger (Event Sourcing + CQRS): https://github.com/Kimosabey/chronicle-ledger
  • AWS Aurora DSQL for Financial Transactions: https://aws.amazon.com/blogs/database/amazon-aurora-dsql-for-global-scale-financial-transactions/

Practice


Regulatory requirements and compliance

Introduction

Fintech companies operate under strict regulatory oversight. Compliance is not optional: it is an architectural constraint that has to be built into the design from day one.


Key regulatory standards

PCI DSS (Payment Card Industry Data Security Standard)

Requirements for systems that handle payment cards:

| Requirement | Description | Architectural impact |
|---|---|---|
| 1 | Firewall to protect cardholder data | Network segmentation, VPC isolation |
| 2 | No default passwords | Credential management, secrets rotation |
| 3 | Protect stored cardholder data | Encryption at rest, tokenization |
| 4 | Encrypt transmission of cardholder data | TLS 1.2+, mutual TLS |
| 5 | Antivirus protection | Endpoint security |
| 6 | Secure systems and applications | Secure SDLC, code scanning |
| 7 | Restrict access to cardholder data | RBAC, least privilege |
| 8 | Identify and authenticate access | MFA, session management |
| 9 | Physical access controls | Data center security |
| 10 | Track and monitor access | Audit logs, monitoring |
| 11 | Test security systems | Penetration testing, vulnerability scanning |
| 12 | Maintain an information security policy | Security policies, risk assessment, incident response plan |
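-- PCI DSS: Protecting stored cardholder data
        -- NEVER store CVV2/CVC2 (sensitive authentication data) after authorization
        -- Stored PAN must be rendered unreadable (keyed hash, truncation, tokenization
        -- or strong encryption); PAN shown to users must be masked
        -- (on a populated table, add pan_hash as NULL, backfill, then SET NOT NULL)
        ALTER TABLE payment_cards ADD COLUMN pan_hash TEXT NOT NULL;
        ALTER TABLE payment_cards ADD COLUMN pan_last4 CHAR(4) NOT NULL;
        ALTER TABLE payment_cards ADD COLUMN pan_encrypted BYTEA;

        -- Reversible encryption of the full PAN for pan_encrypted (requires pgcrypto).
        -- pgp_sym_encrypt defaults to AES-128, so AES-256 is requested explicitly.
        -- The output is randomized (the same PAN encrypts differently each time),
        -- so it cannot serve as a lookup or UNIQUE key.
        CREATE OR REPLACE FUNCTION encrypt_pan(pan_text TEXT, key_id INTEGER)
        RETURNS BYTEA AS $$
        BEGIN
            RETURN pgp_sym_encrypt(pan_text, get_encryption_key(key_id), 'cipher-algo=aes256');
        END;
        $$ LANGUAGE plpgsql SECURITY DEFINER;

        -- Deterministic keyed one-way hash (HMAC-SHA256) for pan_hash: supports lookups and
        -- the UNIQUE constraint without decrypting. key_id should point at a dedicated
        -- HMAC key, not the key used by encrypt_pan.
        CREATE OR REPLACE FUNCTION hash_pan(pan_text TEXT, key_id INTEGER)
        RETURNS TEXT AS $$
        BEGIN
            RETURN encode(hmac(pan_text, get_encryption_key(key_id), 'sha256'), 'hex');
        END;
        $$ LANGUAGE plpgsql SECURITY DEFINER;

        -- Mask PAN for display (show only last 4 digits)
        CREATE OR REPLACE FUNCTION mask_pan(pan_text TEXT)
        RETURNS TEXT AS $$
        BEGIN
            RETURN '****' || right(pan_text, 4);
        END;
        $$ LANGUAGE plpgsql IMMUTABLE;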

        -- Tokenization for PCI compliance
        CREATE TABLE payment_tokens (
            token_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            pan_hash TEXT NOT NULL UNIQUE,
            token_value VARCHAR(255) NOT NULL UNIQUE,
            customer_id UUID NOT NULL,
            card_network VARCHAR(20) NOT NULL,
            last4 CHAR(4) NOT NULL,
            expiry_month INTEGER NOT NULL,
            expiry_year INTEGER NOT NULL,
            is_active BOOLEAN NOT NULL DEFAULT true,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            expires_at TIMESTAMPTZ NOT NULL
        );

        -- PCI DSS: No CVV2 storage
        -- CVV2 is collected at transaction time only, never stored
        -- Table: payment_cards should NOT have cvv2 column
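On the application side, the deterministic value behind `pan_hash` (used for lookups and the UNIQUE constraint in `payment_tokens`) should be a keyed hash rather than a plain SHA-256 of the PAN: the PAN space is small enough that an unkeyed hash can be brute-forced, and PCI DSS v4.0 (Requirement 3.5.1.1) calls for keyed cryptographic hashes of the entire PAN. A sketch using .NET's `HMACSHA256`; `PanProtection` is a hypothetical name, and obtaining the key from a KMS/HSM is assumed and not shown.

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

public static class PanProtection
{
    // Keyed, deterministic hash of the full PAN as lowercase hex. Same PAN + same key gives
    // the same value (usable for a UNIQUE index); without the key it cannot be brute-forced offline.
    public static string HashPan(string pan, byte[] hmacKey)
    {
        string digits = Normalize(pan);
        using var hmac = new HMACSHA256(hmacKey);
        byte[] mac = hmac.ComputeHash(Encoding.ASCII.GetBytes(digits));
        return Convert.ToHexString(mac).ToLowerInvariant();
    }

    // Display masking, same shape as mask_pan: only the last four digits survive.
    public static string MaskPan(string pan) => "****" + Normalize(pan)[^4..];

    // Strip spaces/dashes so "4111 1111 1111 1111" and "4111111111111111" hash identically.
    private static string Normalize(string pan)
    {
        string digits = new string(pan.Where(char.IsDigit).ToArray());
        // Hypothetical sanity check on length; adjust to the card networks you accept
        if (digits.Length is < 12 or > 19)
            throw new ArgumentException("PAN must contain 12-19 digits", nameof(pan));
        return digits;
    }
}
```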

GDPR (General Data Protection Regulation)

For fintechs with EU customers:

| Requirement | Description | Implementation |
|---|---|---|
| Right to access | User can request all personal data | Data export API |
| Right to erasure | User can request data deletion | Anonymization pipeline |
| Data portability | User can export data in a structured format | CSV/JSON export |
| Consent management | Explicit consent for data processing | Consent tracking table |
| Data minimization | Collect only necessary data | Schema design review |
| Purpose limitation | Use data only for the stated purpose | Data classification |
-- GDPR: Consent tracking
        CREATE TABLE user_consents (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            user_id UUID NOT NULL,
            consent_type VARCHAR(100) NOT NULL,  -- 'MARKETING', 'DATA_PROCESSING', 'THIRD_PARTY_SHARING'
            consent_given BOOLEAN NOT NULL,
            consent_text TEXT NOT NULL,           -- version of consent text
            ip_address INET,
            user_agent TEXT,
            granted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            withdrawn_at TIMESTAMPTZ,
            CONSTRAINT chk_withdrawn CHECK (withdrawn_at IS NULL OR granted_at < withdrawn_at)
        );

        CREATE INDEX idx_consents_user ON user_consents(user_id, consent_type);

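        -- GDPR: Right to erasure (anonymization)
        CREATE OR REPLACE FUNCTION anonymize_user_data(target_user_id UUID)
        RETURNS VOID AS $$
        BEGIN
            -- Anonymize personal data in consents; keep an earlier withdrawal timestamp intact
            UPDATE user_consents 
            SET user_id = gen_random_uuid(),  -- unlink from identity
                ip_address = '0.0.0.0',
                user_agent = NULL,
                withdrawn_at = COALESCE(withdrawn_at, NOW())
            WHERE user_id = target_user_id;
    
            -- Anonymize audit log (keep for compliance, remove PII).
            -- Only the user's own actor_id is replaced, so other actors stay accountable.
            -- Caution: actor_id is part of the audit_log hash chain, so rows whose actor_id
            -- changes will fail verify_audit_chain(); metadata is not hashed.
            UPDATE audit_log
            SET actor_id = CASE WHEN actor_id = target_user_id THEN gen_random_uuid() ELSE actor_id END,
                metadata = jsonb_set(COALESCE(metadata, '{}'::jsonb), '{pii}', '"anonymized"'::jsonb)
            WHERE (entity_id = target_user_id AND entity_type = 'USER')
               OR actor_id = target_user_id;
        END;
        $$ LANGUAGE plpgsql SECURITY DEFINER;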
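Reading the consent table correctly matters as much as writing it: a user may grant, withdraw and grant again, so the effective state comes from the latest record of that consent type. A sketch of that rule over records shaped like `user_consents`; the `ConsentRecord` and `ConsentPolicy` types and the "latest record wins" rule are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Mirrors the relevant columns of user_consents
public record ConsentRecord(string ConsentType, bool ConsentGiven,
    DateTimeOffset GrantedAt, DateTimeOffset? WithdrawnAt);

public static class ConsentPolicy
{
    // Effective consent: the most recent record of this type (as of `asOf`) is a grant
    // that had not been withdrawn by `asOf`.
    public static bool HasActiveConsent(IEnumerable<ConsentRecord> records, string consentType, DateTimeOffset asOf)
    {
        var latest = records
            .Where(r => r.ConsentType == consentType && r.GrantedAt <= asOf)
            .OrderByDescending(r => r.GrantedAt)
            .FirstOrDefault();

        return latest is not null
            && latest.ConsentGiven
            && (latest.WithdrawnAt is null || latest.WithdrawnAt > asOf);
    }
}
```

The `asOf` parameter lets the same rule answer historical questions ("was marketing consent valid when this email was sent?"), which is what an auditor will ask.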

Data Residency

The requirement to store customer data within a specific jurisdiction:

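// Data residency routing
        public class DataResidencyRouter
        {
            private readonly Dictionary<string, string> _customerRegionMap;   // customer id -> region
            private readonly Dictionary<string, NpgsqlDataSource> _regionDataSources;
    
            public DataResidencyRouter(
                Dictionary<string, string> customerRegionMap,
                Dictionary<string, NpgsqlDataSource> regionDataSources)
            {
                _customerRegionMap = customerRegionMap;
                _regionDataSources = regionDataSources;
            }
    
            public NpgsqlDataSource GetDataSourceForCustomer(Guid customerId)
            {
                // Fail closed: never fall back to a default region for an unmapped customer
                if (!_customerRegionMap.TryGetValue(customerId.ToString(), out var region))
                    throw new InvalidOperationException($"No data residency region for customer {customerId}");
                if (!_regionDataSources.TryGetValue(region, out var dataSource))
                    throw new InvalidOperationException($"No database configured for region {region}");
                return dataSource;
            }
    
            // Example regions: 'EU', 'US', 'APAC'
            // Each region has its own database instance
        }

        // Multi-region PostgreSQL configuration
        var regionConfigurations = new Dictionary<string, string>
        {
            ["EU"] = "Host=eu-postgres.internal;Database=ledger_eu;...",
            ["US"] = "Host=us-postgres.internal;Database=ledger_us;...",
            ["APAC"] = "Host=apac-postgres.internal;Database=ledger_apac;..."
        };

        // One data source per region, built from the connection strings above
        var regionDataSources = regionConfigurations.ToDictionary(
            kv => kv.Key, kv => NpgsqlDataSource.Create(kv.Value));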

Audit Trail Requirements

Regulatory audit requirements

| Regulation | Audit requirement | Retention period |
|---|---|---|
| PCI DSS | Track all access to cardholder data | At least 1 year, with the most recent 3 months immediately available |
| GDPR | Record of processing activities | Duration + 3 years |
| SOX | Financial reporting integrity | 7 years |
| CFTC Rule 1.31 | Retention of regulatory records | 5 years |
| BCBS 239 | Risk data aggregation | Ongoing |
| MiFID II | Transaction reporting | 5 years |
| AML/KYC | Customer due diligence | 5 years after relationship ends |
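When several rules apply to the same record, the binding deadline is the latest one, and rules anchored to the end of the customer relationship (like AML/KYC above) keep a record indefinitely while the relationship is open. A sketch of that calculation; `RetentionRule`, `RetentionAnchor` and `RetentionPolicy` are hypothetical types, and which rules apply to which record is the caller's responsibility.

```csharp
using System;
using System.Collections.Generic;

public enum RetentionAnchor { RecordCreated, RelationshipEnded }

public record RetentionRule(string Regulation, RetentionAnchor Anchor, int Years);

public static class RetentionPolicy
{
    // Earliest date a record may be purged: the latest deadline among all applicable rules.
    // Returns null while a RelationshipEnded rule applies and the relationship is still open.
    public static DateTimeOffset? EarliestPurgeDate(
        DateTimeOffset createdAt, DateTimeOffset? relationshipEndedAt, IEnumerable<RetentionRule> rules)
    {
        DateTimeOffset latest = createdAt;
        foreach (var rule in rules)
        {
            DateTimeOffset? anchor = rule.Anchor == RetentionAnchor.RecordCreated
                ? createdAt
                : relationshipEndedAt;
            if (anchor is null) return null; // relationship still active: keep the record

            var deadline = anchor.Value.AddYears(rule.Years);
            if (deadline > latest) latest = deadline;
        }
        return latest;
    }
}
```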

Tamper-Evident Audit Log

-- Cryptographic hash chain for audit log
        CREATE TABLE audit_log (
            id BIGSERIAL PRIMARY KEY,
            event_type VARCHAR(100) NOT NULL,
            actor_id UUID,
            entity_type VARCHAR(50) NOT NULL,
            entity_id UUID,
            action VARCHAR(50) NOT NULL,
            old_state JSONB,
            new_state JSONB,
            metadata JSONB,
            ip_address INET,
            user_agent TEXT,
            hash TEXT NOT NULL,
            prev_hash TEXT NOT NULL,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );

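        -- Canonical, NULL-safe serialization shared by the trigger and the verifier.
        -- Plain || yields NULL if any operand is NULL (actor_id, old_state, ...), which would
        -- make hash NULL; created_at is rendered in UTC so the text does not depend on the
        -- session TimeZone setting.
        CREATE OR REPLACE FUNCTION audit_payload(r audit_log, prev_link TEXT)
        RETURNS TEXT AS $$
            SELECT concat_ws('|',
                r.id::TEXT, r.event_type, COALESCE(r.actor_id::TEXT, ''), r.action,
                COALESCE(r.old_state::TEXT, ''), COALESCE(r.new_state::TEXT, ''),
                to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US'),
                prev_link);
        $$ LANGUAGE sql STABLE;

        -- Hash chain trigger
        CREATE OR REPLACE FUNCTION compute_audit_hash()
        RETURNS TRIGGER AS $$
        DECLARE
            prev_hash_val TEXT;
        BEGIN
            -- Serialize appends until commit: otherwise two concurrent inserts read the same
            -- "last" hash and the chain forks. 42 is an arbitrary application-chosen lock key.
            PERFORM pg_advisory_xact_lock(42);

            -- Re-assign the id under the lock so that id order equals chain order
            -- (the BIGSERIAL default was drawn before the lock; the resulting gaps are harmless)
            NEW.id := nextval('audit_log_id_seq');

            -- Previous hash, or 'genesis' for the first record. SELECT ... INTO on an empty
            -- table yields NULL, so COALESCE has to wrap the whole subquery.
            -- Assumes READ COMMITTED, where this query sees rows committed while we waited.
            prev_hash_val := COALESCE(
                (SELECT a.hash FROM audit_log a ORDER BY a.id DESC LIMIT 1), 'genesis');

            NEW.prev_hash := prev_hash_val;
            NEW.hash := encode(sha256(convert_to(audit_payload(NEW, prev_hash_val), 'UTF8')), 'hex');

            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER trg_audit_hash
            BEFORE INSERT ON audit_log
            FOR EACH ROW EXECUTE FUNCTION compute_audit_hash();

        -- Verification function: check entire hash chain
        CREATE OR REPLACE FUNCTION verify_audit_chain()
        RETURNS TABLE(id BIGINT, valid BOOLEAN, error_text TEXT) AS $$
        DECLARE
            expected_prev TEXT := 'genesis';
            rec audit_log%ROWTYPE;
        BEGIN
            -- The table alias keeps "id" from clashing with the output column of the same name
            FOR rec IN SELECT * FROM audit_log a ORDER BY a.id
            LOOP
                id := rec.id;
                IF rec.prev_hash IS DISTINCT FROM expected_prev THEN
                    valid := false;
                    error_text := 'Broken link at id ' || rec.id;
                ELSIF encode(sha256(convert_to(audit_payload(rec, rec.prev_hash), 'UTF8')), 'hex')
                        IS DISTINCT FROM rec.hash THEN
                    valid := false;
                    error_text := 'Hash mismatch at id ' || rec.id;
                ELSE
                    valid := true;
                    error_text := NULL;
                END IF;
                RETURN NEXT;

                -- Continue from the stored hash so that one tampered row is reported once
                expected_prev := rec.hash;
            END LOOP;
        END;
        $$ LANGUAGE plpgsql;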
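The same chain can also be checked outside the database, for example by an independent job that exports the log, so that a compromised database cannot vouch for itself. A minimal sketch with `SHA256.HashData`; the `AuditEntry` record, `AuditChain` class and the `id|payload|prev` format are assumptions, and a real verifier would have to reproduce the database's serialization byte for byte.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public record AuditEntry(long Id, string Payload, string PrevHash, string Hash);

public static class AuditChain
{
    public const string Genesis = "genesis";

    static string HashOf(long id, string payload, string prevHash) =>
        Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes($"{id}|{payload}|{prevHash}")))
            .ToLowerInvariant();

    public static AuditEntry Append(List<AuditEntry> chain, string payload)
    {
        long id = chain.Count + 1;
        string prev = chain.Count == 0 ? Genesis : chain[^1].Hash;
        var entry = new AuditEntry(id, payload, prev, HashOf(id, payload, prev));
        chain.Add(entry);
        return entry;
    }

    // Returns the ids of entries whose link or content hash does not check out.
    public static List<long> Verify(IReadOnlyList<AuditEntry> chain)
    {
        var bad = new List<long>();
        string expectedPrev = Genesis;
        foreach (var e in chain)
        {
            if (e.PrevHash != expectedPrev || e.Hash != HashOf(e.Id, e.Payload, e.PrevHash))
                bad.Add(e.Id);
            expectedPrev = e.Hash; // continue from the stored hash: one tampered row, one report
        }
        return bad;
    }
}
```

Editing a payload without re-hashing flags that row; editing it and re-hashing it flags the next row, whose stored `PrevHash` no longer matches. Either way the tampering is detected.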

Compliance by Design

Privacy by Design

// Data classification
        public enum DataClassification
        {
            Public,           // No restrictions
            Internal,         // Internal use only
            Confidential,     // Access restricted
            Restricted,       // PCI DSS / GDPR regulated
            HighlyRestricted  // Financial secrets, encryption keys
        }

        // Data classification annotation
        public class DataClassifiedAttribute : Attribute
        {
            public DataClassification Classification { get; }
            public string[] AllowedRegions { get; }
    
            public DataClassifiedAttribute(DataClassification classification, 
                params string[] allowedRegions)
            {
                Classification = classification;
                AllowedRegions = allowedRegions;
            }
        }

        // Example: Customer PII
        public class CustomerData
        {
            [DataClassified(DataClassification.Restricted, "EU")]
            public string FullName { get; set; } = string.Empty;
    
            [DataClassified(DataClassification.Restricted, "EU", "US")]
            public string Address { get; set; } = string.Empty;
    
            [DataClassified(DataClassification.HighlyRestricted)]
            public string Ssn { get; set; } = string.Empty;
    
            [DataClassified(DataClassification.Restricted)]
            public List<PaymentCard> Cards { get; set; } = new();
        }
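Classification attributes only help if something enforces them. One possible use is a pre-flight check that lists which properties of a type must not be stored in, or sent to, a given region. The sketch redeclares minimal copies of the enum and attribute so it compiles on its own; `ResidencyGuard` and `CustomerSample` are hypothetical, and treating an empty `AllowedRegions` list as "no regional rule" is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Minimal copies of the types above so this sketch compiles on its own
public enum DataClassification { Public, Internal, Confidential, Restricted, HighlyRestricted }

[AttributeUsage(AttributeTargets.Property)]
public class DataClassifiedAttribute : Attribute
{
    public DataClassification Classification { get; }
    public string[] AllowedRegions { get; }

    public DataClassifiedAttribute(DataClassification classification, params string[] allowedRegions)
    {
        Classification = classification;
        AllowedRegions = allowedRegions;
    }
}

public static class ResidencyGuard
{
    // Names of public properties on T that may not leave for `region`.
    // Assumption: an empty AllowedRegions list means the attribute sets no region rule.
    public static List<string> ForbiddenIn<T>(string region) =>
        typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance)
            .Select(p => (Property: p, Tag: p.GetCustomAttribute<DataClassifiedAttribute>()))
            .Where(x => x.Tag != null
                        && x.Tag.AllowedRegions.Length > 0
                        && !x.Tag.AllowedRegions.Contains(region, StringComparer.OrdinalIgnoreCase))
            .Select(x => x.Property.Name)
            .ToList();
}

// Example type (hypothetical), annotated like CustomerData above
public class CustomerSample
{
    [DataClassified(DataClassification.Restricted, "EU")]
    public string FullName { get; set; } = string.Empty;

    [DataClassified(DataClassification.Restricted, "EU", "US")]
    public string Address { get; set; } = string.Empty;

    [DataClassified(DataClassification.HighlyRestricted)]
    public string Ssn { get; set; } = string.Empty;

    public string Nickname { get; set; } = string.Empty;
}
```

A replication job or export endpoint can call `ForbiddenIn<T>(targetRegion)` and refuse to proceed if the list is not empty.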

Compliance Monitoring

public class ComplianceMonitor
        {
            private readonly NpgsqlDataSource _dataSource;
            private readonly IEventBus _eventBus;
            private readonly ILogger<ComplianceMonitor> _logger;
    
            public ComplianceMonitor(NpgsqlDataSource dataSource, IEventBus eventBus,
                ILogger<ComplianceMonitor> logger)
            {
                _dataSource = dataSource;
                _eventBus = eventBus;
                _logger = logger;
            }
    
            public async Task RunComplianceChecksAsync(CancellationToken ct)
            {
                await CheckPciComplianceAsync(ct);
                await CheckGdprComplianceAsync(ct);
                await CheckAuditLogIntegrityAsync(ct);
                await CheckDataRetentionAsync(ct);
            }
    
            private async Task CheckPciComplianceAsync(CancellationToken ct)
            {
                using var conn = await _dataSource.OpenConnectionAsync(ct);
        
                // Check: no CVV2 in database
                var cvv2Columns = await conn.QueryFirstOrDefaultAsync<int>(
                    """
                    SELECT COUNT(*) FROM information_schema.columns 
                    WHERE column_name IN ('cvv2', 'cvc2', 'cvv') 
                    AND table_name = 'payment_cards'
                    """, ct: ct);
        
                if (cvv2Columns > 0)
                {
                    _logger.LogCritical("PCI DSS VIOLATION: CVV2 columns found in payment_cards table");
                    await _eventBus.PublishAsync(new ComplianceViolationEvent(
                        "PCI_DSS_VIOLATION_CVV2", "Critical", 
                        "CVV2 columns detected in payment_cards table"), ct);
                }
        
                // Check: PAN encryption enabled
                var unencryptedPan = await conn.QueryFirstOrDefaultAsync<int>(
                    """
                    SELECT COUNT(*) FROM payment_cards 
                    WHERE pan_encrypted IS NULL AND pan_hash IS NULL
                    """, ct: ct);
        
                if (unencryptedPan > 0)
                {
                    _logger.LogWarning("PCI DSS WARNING: {Count} unencrypted PANs found", unencryptedPan);
                    await _eventBus.PublishAsync(new ComplianceViolationEvent(
                        "PCI_DSS_WARNING_UNENCRYPTED_PAN", "Warning",
                        $"{unencryptedPan} unencrypted PANs in database"), ct);
                }
            }
    
            // GDPR checks (e.g. overdue erasure requests) are not shown in this section;
            // this stub keeps RunComplianceChecksAsync compilable
            private Task CheckGdprComplianceAsync(CancellationToken ct) => Task.CompletedTask;
    
            private async Task CheckAuditLogIntegrityAsync(CancellationToken ct)
            {
                using var conn = await _dataSource.OpenConnectionAsync(ct);
        
                // Verify hash chain (materialized once instead of enumerating the result repeatedly)
                var invalidRecords = (await conn.QueryAsync<(long Id, bool Valid, string Error)>(
                    "SELECT * FROM verify_audit_chain() WHERE valid = false", ct: ct)).ToList();
        
                if (invalidRecords.Count > 0)
                {
                    _logger.LogCritical("AUDIT LOG INTEGRITY VIOLATION: {Count} invalid records", 
                        invalidRecords.Count);
                    await _eventBus.PublishAsync(new ComplianceViolationEvent(
                        "AUDIT_LOG_INTEGRITY_VIOLATION", "Critical",
                        $"{invalidRecords.Count} tampered audit log entries detected"), ct);
                }
            }
    
            private async Task CheckDataRetentionAsync(CancellationToken ct)
            {
                using var conn = await _dataSource.OpenConnectionAsync(ct);
        
                // Check: records older than 7 years (the longest period in the retention
                // table above, SOX) that have not yet been moved to archival storage
                var oldData = await conn.QueryFirstOrDefaultAsync<int>(
                    """
                    SELECT COUNT(*) FROM event_store 
                    WHERE created_at < NOW() - INTERVAL '7 years'
                    AND NOT archived
                    """, ct: ct);
        
                if (oldData > 0)
                {
                    _logger.LogWarning("Data retention: {Count} records older than 7 years not archived", 
                        oldData);
                }
            }
        }

        public record ComplianceViolationEvent(
            string ViolationType,
            string Severity,
            string Description);

Checklist for Regulatory Compliance

  • [ ] PCI DSS: no CVV2 storage, PAN encryption/tokenization
  • [ ] GDPR: consent management, right to erasure, data portability
  • [ ] Data residency: region-specific database instances
  • [ ] Audit log: tamper-evident with a cryptographic hash chain
  • [ ] Data retention: automated archival per regulatory requirements
  • [ ] Compliance monitoring: automated checks + alerting
  • [ ] RBAC: least-privilege access to financial data
  • [ ] MFA: multi-factor authentication for access
  • [ ] Encryption: TLS 1.2+ in transit, AES-256 at rest
  • [ ] Penetration testing: regular security assessments

Further reading

  • PCI DSS v4.0: https://www.pcisecuritystandards.org/
  • GDPR: https://gdpr.eu/
  • CFTC Rule 1.31 (regulatory records retention): https://www.law.cornell.edu/cfr/text/17/1.31
  • BCBS 239: https://www.bis.org/bcbs/publ/d377.htm

Practice


Consistency tiering and hybrid models

Introduction

Consistency tiering is an approach in which a system uses different consistency levels for different parts of its data or for different operations. This makes it possible to optimize performance without compromising the critical paths.


Hybrid Consistency Architecture

Tiered consistency model

┌─────────────────────────────────────────────────────────────────┐
        │  Tier 0: Strong Consistency (CP)                               │
        │                                                                  │
        │  Ledger journal entries                                          │
        │  Balance validation for transfers                                │
        │  Idempotency keys                                                │
        │                                                                  │
        │  Database: PostgreSQL / CockroachDB                              │
        │  Isolation: SERIALIZABLE                                         │
        │  Latency: 5-50ms                                                 │
        │                                                                  │
        │  ────────────────────────────────────────────────────────────   │
        │                                                                  │
        │  Tier 1: Session Consistency                                     │
        │                                                                  │
        │  Read-your-writes guarantee                                      │
        │  Balance check after transfer                                    │
        │                                                                  │
        │  Mechanism: Session-affinity routing or writer-read              │
        │  Latency: 2-10ms                                                 │
        │                                                                  │
        │  ────────────────────────────────────────────────────────────   │
        │                                                                  │
        │  Tier 2: Eventual Consistency (AP)                               │
        │                                                                  │
        │  Materialized balances                                           │
        │  Account history (UI)                                            │
        │  Reporting dashboard                                             │
        │                                                                  │
        │  Database: PostgreSQL materialized views / Redis                 │
        │  Refresh: Event-driven (outbox → event bus → processor)          │
        │  Latency: <1ms (cached)                                          │
        │  Staleness: 10-500ms                                             │
        │                                                                  │
        │  ────────────────────────────────────────────────────────────   │
        │                                                                  │
        │  Tier 3: Best-Effort (Analytics)                                 │
        │                                                                  │
        │  Aggregates, ML features, fraud detection                        │
        │  Data warehouse (Snowflake, BigQuery)                            │
        │                                                                  │
        │  Refresh: Batch (hourly/daily)                                   │
        │  Latency: N/A (analytical queries)                               │
        └─────────────────────────────────────────────────────────────────┘
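Tier 2 is only acceptable while its staleness stays within a budget (the diagram quotes 10-500ms). A bounded-staleness reader makes that budget explicit: it serves the cached value if it is young enough and otherwise falls back to a Tier 0 read. `BoundedStalenessReader` and `CachedValue` are illustrative names, and the two delegates stand in for the real cache/materialized-view and writer queries.

```csharp
using System;
using System.Threading.Tasks;

public sealed record CachedValue<T>(T Value, DateTimeOffset AsOf);

public sealed class BoundedStalenessReader<T>
{
    private readonly Func<Task<CachedValue<T>?>> _readCached;  // e.g. materialized view or Redis
    private readonly Func<Task<T>> _readStrong;                 // e.g. query against the writer
    private readonly TimeSpan _maxStaleness;
    private readonly Func<DateTimeOffset> _now;                 // injectable clock for testing

    public BoundedStalenessReader(Func<Task<CachedValue<T>?>> readCached, Func<Task<T>> readStrong,
        TimeSpan maxStaleness, Func<DateTimeOffset>? now = null)
    {
        _readCached = readCached;
        _readStrong = readStrong;
        _maxStaleness = maxStaleness;
        _now = now ?? (() => DateTimeOffset.UtcNow);
    }

    // Returns the value and whether it was served from the cache.
    public async Task<(T Value, bool FromCache)> ReadAsync()
    {
        var cached = await _readCached();
        if (cached is not null && _now() - cached.AsOf <= _maxStaleness)
            return (cached.Value, true);

        // Cache missing or older than the budget: pay for a strong (Tier 0) read instead
        return (await _readStrong(), false);
    }
}
```

The `AsOf` timestamp has to come from the producer of the cached value (for example, the commit time of the last event the projection applied), not from the moment it was written to the cache; otherwise a slow projection looks fresh.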

Implementation: Consistency Router

using Npgsql;

public enum ConsistencyTier
{
    Strong,      // Tier 0: ledger, balance validation
    Session,     // Tier 1: read-your-writes
    Eventual,    // Tier 2: materialized views / cache
    Analytics    // Tier 3: data warehouse
}

// Hypothetical cache abstraction. A Try-style API distinguishes a cache miss from a cached
// default value (e.g. a zero balance), which a "!= null" check cannot do for value types.
public interface IBalanceReadCache
{
    Task<(bool Found, T? Value)> TryGetAsync<T>(string key, CancellationToken ct);
}

public class ConsistencyRouter
{
    private readonly NpgsqlDataSource _writeDb;   // primary
    private readonly NpgsqlDataSource _readDb;    // read replica / materialized views
    private readonly IBalanceReadCache _cache;

    public ConsistencyRouter(NpgsqlDataSource writeDb, NpgsqlDataSource readDb, IBalanceReadCache cache)
    {
        _writeDb = writeDb;
        _readDb = readDb;
        _cache = cache;
    }

    public async Task<T> ExecuteWithConsistencyAsync<T>(
        ConsistencyTier tier,
        string cacheKey,                                   // e.g. $"balance:{accountId}:{currency}"
        Func<NpgsqlConnection, Task<T>> strongOperation,
        Func<NpgsqlConnection, Task<T>> eventualOperation,
        Func<CancellationToken, Task<T>> analyticsOperation,
        CancellationToken ct)
    {
        return tier switch
        {
            ConsistencyTier.Strong => await ExecuteStrongAsync(strongOperation, ct),
            ConsistencyTier.Session => await ExecuteSessionAsync(strongOperation, ct),
            ConsistencyTier.Eventual => await ExecuteEventualAsync(cacheKey, eventualOperation, ct),
            ConsistencyTier.Analytics => await analyticsOperation(ct),
            _ => throw new ArgumentOutOfRangeException(nameof(tier), tier, null)
        };
    }

    private async Task<T> ExecuteStrongAsync<T>(
        Func<NpgsqlConnection, Task<T>> operation, CancellationToken ct)
    {
        // Always the primary; a writing operation opens its own SERIALIZABLE transaction
        await using var conn = await _writeDb.OpenConnectionAsync(ct);
        return await operation(conn);
    }

    private Task<T> ExecuteSessionAsync<T>(
        Func<NpgsqlConnection, Task<T>> operation, CancellationToken ct)
    {
        // Simplest correct read-your-writes: read from the primary that accepted the write.
        // A replica is safe only once it has replayed this session's last write.
        return ExecuteStrongAsync(operation, ct);
    }

    private async Task<T> ExecuteEventualAsync<T>(
        string cacheKey, Func<NpgsqlConnection, Task<T>> operation, CancellationToken ct)
    {
        // 1. Cache first (the key must identify the account, not one global "balance" entry)
        var (found, cached) = await _cache.TryGetAsync<T>(cacheKey, ct);
        if (found)
            return cached!;

        // 2. Fallback: materialized view on the read replica
        await using var conn = await _readDb.OpenConnectionAsync(ct);
        return await operation(conn);
    }
}
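Serving every Tier 1 read from the primary is correct, but it puts all session-consistent load on one node. A common alternative is to track, per session, the log position of that session's last write. A read can then go to a replica only once the replica has replayed past that position.

The sketch below models positions as plain `long` values. In PostgreSQL they would correspond to WAL LSNs, for example `pg_current_wal_lsn()` on the primary and `pg_last_wal_replay_lsn()` on a replica. `SessionTokenRouter`, `RecordWrite`, and `ChooseReadTarget` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;

public enum ReadTarget { Primary, Replica }

// Hypothetical read-your-writes router; positions stand in for WAL LSNs.
public sealed class SessionTokenRouter
{
    // Highest write position acknowledged per session (e.g. per user or API token)
    private readonly ConcurrentDictionary<string, long> _lastWriteBySession = new();

    // Call after a successful commit on the primary with the commit position.
    // Max() keeps the token monotonic even if acknowledgements arrive out of order.
    public void RecordWrite(string sessionId, long commitPosition) =>
        _lastWriteBySession.AddOrUpdate(
            sessionId,
            commitPosition,
            (_, previous) => Math.Max(previous, commitPosition));

    // A replica may serve the read only if it has replayed the session's last write.
    public ReadTarget ChooseReadTarget(string sessionId, long replicaReplayPosition)
    {
        if (!_lastWriteBySession.TryGetValue(sessionId, out var lastWrite))
            return ReadTarget.Replica;              // session has not written anything yet

        return replicaReplayPosition >= lastWrite
            ? ReadTarget.Replica                    // replica has caught up with this session
            : ReadTarget.Primary;                   // replica is behind: fall back to the primary
    }
}
```

Keying the token by session rather than globally keeps one user's writes from forcing every other user's reads onto the primary.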

Quorum-Based Checkpoints

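Quorum checkpoints periodically verify that nodes agree on state through quorum consensus. Each node's snapshot is compared with the others, and a checkpoint is committed only when a majority agrees.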

Quorum Checkpoint Pattern

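Every N seconds (for example, 30 s):

1. Each node takes a snapshot of its state. For snapshots to be comparable, all nodes must take them at the same logical position, e.g. the same journal sequence number.
2. Snapshots are sent to the other nodes. A quorum is a strict majority (more than N/2 nodes).
3. The quorum nodes compare snapshots.
4. On a mismatch, reconciliation is triggered.
5. On a match, the checkpoint is committed.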
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using Dapper;
using Microsoft.Extensions.Logging;
using Npgsql;

public class QuorumCheckpointService
{
    private readonly List<CheckpointNode> _nodes;        // all N nodes, including this one
    private readonly string _localNodeId;
    private readonly NpgsqlDataSource _dataSource;
    private readonly ILogger<QuorumCheckpointService> _logger;
    private readonly TimeSpan _checkpointInterval = TimeSpan.FromSeconds(30);

    public QuorumCheckpointService(List<CheckpointNode> nodes, string localNodeId,
        NpgsqlDataSource dataSource, ILogger<QuorumCheckpointService> logger)
    {
        _nodes = nodes;
        _localNodeId = localNodeId;
        _dataSource = dataSource;
        _logger = logger;
    }

    public async Task RunCheckpointLoopAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            try
            {
                await RunCheckpointAsync(ct);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                _logger.LogError(ex, "Checkpoint failed");
            }

            await Task.Delay(_checkpointInterval, ct);   // throws on shutdown, ending the loop
        }
    }

    private async Task RunCheckpointAsync(CancellationToken ct)
    {
        // 1. Create local snapshot
        var localSnapshot = await CreateSnapshotAsync(ct);

        // 2. Send to the other nodes; each replies with the hash of its own state
        var responses = new List<(string NodeId, bool Accepted, string Hash)>();
        foreach (var node in _nodes.Where(n => n.Id != _localNodeId))
        {
            try
            {
                var response = await node.SendSnapshotAsync(localSnapshot, ct);
                responses.Add((node.Id, response.Accepted, response.Hash));
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Unreachable node: it simply does not count toward the quorum
                _logger.LogWarning(ex, "Checkpoint node {NodeId} unreachable", node.Id);
            }
        }

        // 3. Majority quorum over all N nodes: floor(N/2) + 1; the local node is one vote
        var quorumRequired = _nodes.Count / 2 + 1;
        var acceptedCount = responses.Count(r => r.Accepted) + 1;

        if (acceptedCount < quorumRequired)
        {
            _logger.LogWarning("Checkpoint failed: {Accepted}/{Required} nodes accepted",
                acceptedCount, quorumRequired);
            return;
        }

        // 4. Commit only if a majority (local node included) holds the same state hash
        var agreeing = responses.Count(r => r.Accepted && r.Hash == localSnapshot.Hash) + 1;
        if (agreeing >= quorumRequired)
        {
            _logger.LogInformation("Checkpoint committed at {Timestamp}", localSnapshot.Timestamp);
        }
        else
        {
            // Local state is not backed by a majority: trigger reconciliation
            await TriggerReconciliationAsync(localSnapshot, ct);
        }
    }

    private async Task<Snapshot> CreateSnapshotAsync(CancellationToken ct)
    {
        await using var conn = await _dataSource.OpenConnectionAsync(ct);

        // A single statement reads one consistent MVCC snapshot. Nodes must snapshot at the
        // same logical position (e.g. journal sequence number), or in-flight postings cause
        // false mismatches. Dapper takes cancellation via CommandDefinition.
        var balances = (await conn.QueryAsync<(Guid AccountId, decimal Balance)>(
            new CommandDefinition("SELECT account_id, balance FROM account_balances",
                cancellationToken: ct))).ToList();

        return new Snapshot(
            Timestamp: DateTimeOffset.UtcNow,
            Balances: balances,
            Hash: ComputeSnapshotHash(balances));
    }

    private static string ComputeSnapshotHash(IEnumerable<(Guid AccountId, decimal Balance)> balances)
    {
        // decimal.ToString() is culture-sensitive ("1,50" under ru-RU), so use the invariant
        // culture; otherwise identical balances hash differently on differently configured nodes.
        var data = string.Join("|", balances
            .OrderBy(b => b.AccountId)
            .Select(b => $"{b.AccountId:N}:{b.Balance.ToString(CultureInfo.InvariantCulture)}"));

        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(data)));
    }

    private Task TriggerReconciliationAsync(Snapshot snapshot, CancellationToken ct)
    {
        _logger.LogWarning("Hash mismatch detected at {Timestamp}, triggering reconciliation",
            snapshot.Timestamp);
        // Trigger full reconciliation (e.g. recompute balances from journal postings)
        return Task.CompletedTask;
    }
}

public record Snapshot(
    DateTimeOffset Timestamp,
    List<(Guid AccountId, decimal Balance)> Balances,
    string Hash);
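The quorum decision can be isolated into a pure function, which makes it easy to unit-test without a database or network. This sketch uses a hypothetical `CheckpointDecider`, which is not part of the service above. Its rules are as follows:

- The local node counts as one vote.
- A checkpoint commits only if a strict majority of all N nodes (local included) report the same hash as the local node.
- If too few nodes answered to form a majority, no decision is made.

The balance encoding uses the invariant culture so that nodes with different OS locales hash identical balances identically.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

public enum CheckpointOutcome { Committed, Reconcile, NoQuorum }

public static class CheckpointDecider
{
    // Canonical, order- and culture-independent hash of (account, balance) pairs.
    public static string Hash(IEnumerable<(Guid AccountId, decimal Balance)> balances)
    {
        var canonical = string.Join("|", balances
            .OrderBy(b => b.AccountId)
            .Select(b => $"{b.AccountId:N}:{b.Balance.ToString(CultureInfo.InvariantCulture)}"));
        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(canonical)));
    }

    // totalNodes = N including the local node; remoteHashes = hashes from nodes that answered.
    public static CheckpointOutcome Decide(int totalNodes, string localHash, IReadOnlyList<string> remoteHashes)
    {
        int quorum = totalNodes / 2 + 1;              // strict majority of all nodes
        int responders = remoteHashes.Count + 1;      // +1: the local node always "responds"
        if (responders < quorum)
            return CheckpointOutcome.NoQuorum;        // too few nodes reachable to decide

        int agreeing = remoteHashes.Count(h => h == localHash) + 1;
        return agreeing >= quorum
            ? CheckpointOutcome.Committed             // a majority shares the local state
            : CheckpointOutcome.Reconcile;            // local state is not backed by a majority
    }
}
```

For example, with N = 3 a local hash `A` and remote answers `A, B` commit, while remote answers `B, B` trigger reconciliation.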

Temporal Sequence Barrier

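Temporal Sequence Barrier is a consistency model for asynchronous, high-throughput ledger systems. It combines logical vector clocks with epoch-based orchestration to provide causal consistency. Events are grouped into time epochs, and within each epoch they are applied in an order that respects the happens-before relation.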

Vector Clocks для causal ordering

using System.Linq;

public class VectorClock
{
    private readonly Dictionary<string, long> _clocks = new();

    public IReadOnlyDictionary<string, long> GetClocks() => _clocks;

    // Local event on nodeId: advance that node's component
    public void Increment(string nodeId)
    {
        _clocks[nodeId] = _clocks.GetValueOrDefault(nodeId, 0) + 1;
    }

    // On receive: component-wise maximum with the sender's clock
    public void Merge(VectorClock other)
    {
        foreach (var (nodeId, timestamp) in other._clocks)
        {
            _clocks[nodeId] = Math.Max(_clocks.GetValueOrDefault(nodeId, 0), timestamp);
        }
    }

    // this → other iff every component is <= and at least one is strictly <
    public bool HappensBefore(VectorClock other)
    {
        bool atLeastOneLess = false;
        foreach (var nodeId in _clocks.Keys.Union(other._clocks.Keys))
        {
            var myTime = _clocks.GetValueOrDefault(nodeId, 0);
            var otherTime = other._clocks.GetValueOrDefault(nodeId, 0);

            if (myTime > otherTime)
                return false;
            if (myTime < otherTime)
                atLeastOneLess = true;
        }
        return atLeastOneLess;
    }

    // Neither happens before the other: the events are causally unrelated
    public bool IsConcurrentWith(VectorClock other)
    {
        return !HappensBefore(other) && !other.HappensBefore(this);
    }
}

// Event with vector clock for causal ordering
public record CausallyOrderedEvent(
    Guid Id,
    string EventType,
    string Payload,
    VectorClock VectorClock,        // must not be mutated after the event is created
    Guid? DependsOn,                // explicit dependency
    DateTimeOffset LogicalTime,     // wall-clock timestamp; used only to bucket events into epochs
    string SourceRegion);
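The following short, self-contained walk-through shows the happens-before relation in action:

1. Node A records an event and sends its clock to node B.
2. Node B merges that clock and records its own event.
3. An independent event on node C stays concurrent with both.

`VClock` is a compact restatement of the `VectorClock` above. It adds one assumption: a `Copy` helper, so that a message carries a snapshot of the sender's clock rather than a shared mutable instance.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class VClock
{
    private readonly Dictionary<string, long> _c = new();

    // Snapshot to attach to an outgoing event or message
    public VClock Copy()
    {
        var copy = new VClock();
        foreach (var (node, t) in _c) copy._c[node] = t;
        return copy;
    }

    public void Increment(string node) => _c[node] = _c.GetValueOrDefault(node) + 1;

    public void Merge(VClock other)
    {
        foreach (var (node, t) in other._c)
            _c[node] = Math.Max(_c.GetValueOrDefault(node), t);
    }

    // this → other iff every component is <= and at least one is strictly <
    public bool HappensBefore(VClock other)
    {
        bool strictlyLess = false;
        foreach (var node in _c.Keys.Union(other._c.Keys))
        {
            long mine = _c.GetValueOrDefault(node), theirs = other._c.GetValueOrDefault(node);
            if (mine > theirs) return false;
            if (mine < theirs) strictlyLess = true;
        }
        return strictlyLess;
    }

    public bool IsConcurrentWith(VClock other) => !HappensBefore(other) && !other.HappensBefore(this);
}
```

Copying the clock when an event is emitted matters because the class is mutable. If an event held a reference to the node's live clock, later increments would silently rewrite the event's recorded causal history.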

Epoch-based orchestration

using System.Linq;

public class EpochOrchestrator
{
    private readonly TimeSpan _epochDuration = TimeSpan.FromSeconds(5);
    private readonly Func<CausallyOrderedEvent, CancellationToken, Task> _processEvent;
    private long _currentEpoch;

    public EpochOrchestrator(Func<CausallyOrderedEvent, CancellationToken, Task> processEvent)
    {
        _processEvent = processEvent;
    }

    public async Task ProcessEventsAsync(IEnumerable<CausallyOrderedEvent> events,
        CancellationToken ct)
    {
        // Bucket events into fixed-size epochs by timestamp (integer division on milliseconds).
        // Assumes bounded clock skew: a cause must not land in a later epoch than its effect.
        long epochMs = (long)_epochDuration.TotalMilliseconds;
        var epochGroups = events.GroupBy(e => e.LogicalTime.ToUnixTimeMilliseconds() / epochMs);

        foreach (var epochGroup in epochGroups.OrderBy(g => g.Key))
        {
            _currentEpoch = epochGroup.Key;

            // Process events within the epoch in causal order
            foreach (var evt in TopologicalSortByVectorClock(epochGroup.ToList()))
            {
                await _processEvent(evt, ct);
            }

            // Epoch barrier: ensure all dependencies are resolved before the next epoch
            await CheckEpochCompletenessAsync(_currentEpoch, ct);
        }
    }

    private static List<CausallyOrderedEvent> TopologicalSortByVectorClock(
        List<CausallyOrderedEvent> events)
    {
        // Sorting by the clock's string form is not a causal order ("a:10" < "a:9" as text).
        // Instead, repeatedly emit the events that no remaining event happens-before.
        // O(n^2) per epoch; concurrent events are tie-broken by Id for determinism.
        var remaining = new List<CausallyOrderedEvent>(events);
        var ordered = new List<CausallyOrderedEvent>(events.Count);
        while (remaining.Count > 0)
        {
            var ready = remaining
                .Where(e => !remaining.Any(o => o.VectorClock.HappensBefore(e.VectorClock)))
                .OrderBy(e => e.Id)
                .ToList();
            ordered.AddRange(ready);
            remaining.RemoveAll(e => ready.Contains(e));
        }
        return ordered;
    }

    private Task CheckEpochCompletenessAsync(long epoch, CancellationToken ct)
    {
        // Verify all causal dependencies (including DependsOn) for this epoch are resolved
        return Task.CompletedTask;
    }
}
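Sorting events by the string form of their vector clocks does not produce a causal order in general. For example, `"A:10"` sorts before `"A:9"` as text, and clocks with different node sets are not comparable component-wise.

A minimal sketch of an actual causal (topological) ordering follows. It assumes each event carries a snapshot of its clock as a `Dictionary<string, long>`. The algorithm repeatedly emits the events that no remaining event happens-before, and breaks ties by event id. `Evt` and `CausalSort` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record Evt(int Id, Dictionary<string, long> Clock);

public static class CausalSort
{
    static bool HappensBefore(Dictionary<string, long> x, Dictionary<string, long> y)
    {
        bool strictlyLess = false;
        foreach (var node in x.Keys.Union(y.Keys))
        {
            long a = x.GetValueOrDefault(node), b = y.GetValueOrDefault(node);
            if (a > b) return false;
            if (a < b) strictlyLess = true;
        }
        return strictlyLess;
    }

    // Kahn-style topological sort over happens-before. Happens-before is a strict partial
    // order, so each round has at least one "ready" event and the loop terminates.
    public static List<Evt> Order(IReadOnlyCollection<Evt> events)
    {
        var remaining = events.ToList();
        var result = new List<Evt>(remaining.Count);
        while (remaining.Count > 0)
        {
            var ready = remaining
                .Where(e => !remaining.Any(o => HappensBefore(o.Clock, e.Clock)))
                .OrderBy(e => e.Id)          // deterministic order for concurrent events
                .ToList();
            result.AddRange(ready);
            remaining.RemoveAll(ready.Contains);
        }
        return result;
    }
}
```

Given events `1:{A:9}`, `2:{A:10}`, `3:{A:10,B:1}` and the concurrent `4:{C:1}`, the result is `1, 4, 2, 3`. Event 2 always follows 1, and event 3 always follows 2, whatever the input order.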

Dynamic CAP in Fintech

Dynamic CAP means choosing the CAP trade-off per request rather than once for the whole system, based on:

  • Transaction amount
  • Operation type
  • Risk
  • Jurisdiction

Dynamic CAP Implementation

public record CapDecision(
    CapMode Mode,
    string Reason,
    decimal? Threshold,
    string? RiskFactor);

public enum CapMode
{
    Consistency,   // CP: strong consistency required
    Availability   // AP: availability prioritized
}

public class DynamicCapEngine
{
    private const decimal CriticalValueThreshold = 100_000m;

    // US Currency Transaction Report (CTR): cash transactions of more than $10,000.
    // Assumes `amount` has already been converted to USD.
    private const decimal RegulatoryThreshold = 10_000m;

    public CapDecision DecideCapMode(
        decimal amount,
        string currency,
        string transactionType,
        string sourceRegion,
        string destRegion,
        decimal? customerRiskScore,
        bool isSettlement)
    {
        // Rules are evaluated in order; the first match wins.

        // Rule 1: Settlement transactions: always CP
        if (isSettlement)
            return new CapDecision(CapMode.Consistency,
                "Settlement transactions require strong consistency",
                null, "settlement");

        // Rule 2: Cross-jurisdiction: always CP (data residency)
        if (sourceRegion != destRegion)
            return new CapDecision(CapMode.Consistency,
                $"Cross-jurisdiction transfer ({sourceRegion} → {destRegion}) requires consistency",
                null, "data_residency");

        // Rule 3: Critical value: CP
        if (amount >= CriticalValueThreshold)
            return new CapDecision(CapMode.Consistency,
                $"Critical value: {amount} {currency}",
                CriticalValueThreshold, "high_value");

        // Rule 4: Regulatory threshold (CTR): CP
        if (amount > RegulatoryThreshold)
            return new CapDecision(CapMode.Consistency,
                $"Regulatory threshold exceeded: {amount} {currency} > {RegulatoryThreshold}",
                RegulatoryThreshold, "regulatory");

        // Rule 5: High-risk customer: CP
        if (customerRiskScore.HasValue && customerRiskScore.Value > 0.7m)
            return new CapDecision(CapMode.Consistency,
                $"High risk customer: score {customerRiskScore}",
                null, "risk_score");

        // Rule 6: Read-only balance check: AP
        if (transactionType == "balance_check")
            return new CapDecision(CapMode.Availability,
                "Read-only balance check",
                null, "read_only");

        // Rule 7: Everything else, including low-value domestic transfers: CP.
        // Fintech baseline: even small transactions default to CP.
        return new CapDecision(CapMode.Consistency,
            $"Standard transaction: {amount} {currency}",
            null, "standard");
    }
}
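Because the rules are evaluated top to bottom and the first match wins, they can also be kept as data. The rule set becomes an ordered list of (name, predicate, mode) entries that compliance and risk teams can review without reading control flow.

A minimal sketch follows, assuming a simplified hypothetical `TxContext` input and the same default-to-CP fallback. The thresholds (100,000 for critical value; more than USD 10,000 for the CTR rule) are written as literals, and the amount is assumed to be already converted to USD.

```csharp
using System;
using System.Collections.Generic;

public enum Mode { Consistency, Availability }

public sealed record TxContext(decimal AmountUsd, string Type, string SourceRegion,
                               string DestRegion, decimal? RiskScore, bool IsSettlement);

public sealed record CapRule(string Name, Func<TxContext, bool> Matches, Mode Mode);

public static class CapRules
{
    // Ordered: the first matching rule decides.
    public static readonly IReadOnlyList<CapRule> Rules = new List<CapRule>
    {
        new("settlement",     t => t.IsSettlement,                 Mode.Consistency),
        new("data_residency", t => t.SourceRegion != t.DestRegion, Mode.Consistency),
        new("high_value",     t => t.AmountUsd >= 100_000m,        Mode.Consistency),
        new("regulatory",     t => t.AmountUsd > 10_000m,          Mode.Consistency),
        new("risk_score",     t => t.RiskScore is > 0.7m,          Mode.Consistency),
        new("read_only",      t => t.Type == "balance_check",      Mode.Availability),
    };

    public static (Mode Mode, string Rule) Decide(TxContext tx)
    {
        foreach (var rule in Rules)
            if (rule.Matches(tx))
                return (rule.Mode, rule.Name);
        return (Mode.Consistency, "standard");     // fintech baseline: default to CP
    }
}
```

Returning the rule name alongside the mode gives an audit trail of why each request took the CP or AP path.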

CAP-aware Transaction Router

using System.Text.Json;

public class CapAwareTransactionRouter
{
    private readonly DynamicCapEngine _capEngine;
    private readonly ILedgerService _ledgerService;      // CP path
    private readonly IKafkaProducer _eventProducer;      // AP path (producer abstraction)

    public CapAwareTransactionRouter(DynamicCapEngine capEngine,
        ILedgerService ledgerService, IKafkaProducer eventProducer)
    {
        _capEngine = capEngine;
        _ledgerService = ledgerService;
        _eventProducer = eventProducer;
    }

    public async Task<TransferResult> ExecuteTransferAsync(
        TransferCommand cmd, CancellationToken ct)
    {
        // Determine CAP mode
        var capDecision = _capEngine.DecideCapMode(
            cmd.Amount, cmd.Currency, "transfer",
            cmd.SourceRegion, cmd.DestinationRegion,
            cmd.CustomerRiskScore, isSettlement: false);

        // Note: with the engine above every "transfer" resolves to CP (only "balance_check"
        // is AP), so the AP branch matters only for operation types the engine marks AP.
        return capDecision.Mode switch
        {
            CapMode.Availability => await ExecuteAvailableAsync(cmd, ct),
            _ => await ExecuteConsistentAsync(cmd, ct)   // Consistency or unknown: default to CP
        };
    }

    private async Task<TransferResult> ExecuteConsistentAsync(
        TransferCommand cmd, CancellationToken ct)
    {
        // CP path: strong consistency through the ledger
        return await _ledgerService.ExecuteTransferAsync(cmd, ct);
    }

    private async Task<TransferResult> ExecuteAvailableAsync(
        TransferCommand cmd, CancellationToken ct)
    {
        // AP path: accept at the edge, process asynchronously. The idempotency key is the
        // message key, so the consumer can deduplicate redeliveries.
        await _eventProducer.PublishAsync("transfer-events", cmd.IdempotencyKey,
            JsonSerializer.Serialize(cmd), ct);

        return TransferResult.Accepted;   // pending: not yet posted to the ledger
    }
}
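On the AP path the request is only accepted at the edge, so the asynchronous consumer must tolerate redelivery. Kafka consumers typically get at-least-once delivery, which means the same idempotency key can arrive more than once.

Below is a minimal in-memory sketch of a deduplicating consumer. In production the "seen keys" set would be a unique constraint on the idempotency key, written in the same database transaction as the ledger posting, not process memory. `IdempotentTransferConsumer` and `PendingTransfer` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public sealed record PendingTransfer(string IdempotencyKey, string Source, string Destination, decimal Amount);

public sealed class IdempotentTransferConsumer
{
    private readonly HashSet<string> _processedKeys = new(StringComparer.Ordinal);
    private readonly Dictionary<string, decimal> _balances = new();

    public decimal BalanceOf(string account) => _balances.GetValueOrDefault(account);

    // Returns true if the transfer was applied, false if it was a duplicate delivery.
    public bool Handle(PendingTransfer t)
    {
        // Stand-in for INSERT ... ON CONFLICT (idempotency_key) DO NOTHING inside the
        // ledger transaction: HashSet.Add returns false when the key was already seen.
        if (!_processedKeys.Add(t.IdempotencyKey))
            return false;

        _balances[t.Source] = BalanceOf(t.Source) - t.Amount;          // debit
        _balances[t.Destination] = BalanceOf(t.Destination) + t.Amount; // credit
        return true;
    }
}
```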

Consistency Trade-offs Matrix

Decision Framework

┌────────────────────────────────────────────────────────┐
│  Choosing a consistency level                          │
│                                                        │
│  1. Does the operation affect a ledger balance?        │
│     YES → Tier 0: Strong (SERIALIZABLE)                │
│     NO ↓                                               │
│                                                        │
│  2. Is read-your-writes required?                      │
│     YES → Tier 1: Session                              │
│     NO ↓                                               │
│                                                        │
│  3. Is this a real-time balance display?               │
│     YES → Tier 2: Eventual (materialized)              │
│     NO ↓                                               │
│                                                        │
│  4. Is it analytics / reporting?                       │
│     YES → Tier 3: Analytics (batch)                    │
└────────────────────────────────────────────────────────┘
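The decision tree translates directly into a small function. Making it explicit in code lets every new read or write path declare its tier once and be covered by a test.

In this sketch the flag names are hypothetical and mirror the first three questions. The diagram leaves the final "no" branch open; falling through to Tier 3 is an assumption.

```csharp
public enum Tier { Strong = 0, Session = 1, Eventual = 2, Analytics = 3 }

public static class TierSelector
{
    // Questions are asked in order; the first "yes" decides.
    public static Tier Choose(bool affectsLedgerBalance, bool needsReadYourWrites, bool realTimeBalanceDisplay)
    {
        if (affectsLedgerBalance) return Tier.Strong;        // 1 → Tier 0 (SERIALIZABLE)
        if (needsReadYourWrites) return Tier.Session;         // 2 → Tier 1
        if (realTimeBalanceDisplay) return Tier.Eventual;     // 3 → Tier 2 (materialized)
        return Tier.Analytics;                                // 4 → Tier 3 (batch)
    }
}
```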

Trade-offs Summary

| Tier | Consistency | Latency | Throughput | Use case | Risk |
|------|-------------|---------|------------|----------|------|
| 0 - Strong | Linearizable | 5-50 ms | 10k-50k TPS | Ledger, balance validation | None |
| 1 - Session | Session | 2-10 ms | 50k-100k TPS | Post-transfer balance check | Minimal |
| 2 - Eventual | Eventual | <1 ms | 100k+ TPS | UI display, reporting | Balance staleness 10-500 ms |
| 3 - Analytics | Best-effort | N/A | 1M+ TPS | Analytics, ML | Hours of staleness |

Checklist for Consistency Tiering

  • [ ] Consistency tiers are defined for every operation
  • [ ] Tier 0 (Strong) for all ledger operations
  • [ ] Tier 1 (Session) for read-your-writes
  • [ ] Tier 2 (Eventual) for UI and reporting
  • [ ] Tier 3 (Analytics) for the data warehouse
  • [ ] Quorum checkpoints for verification
  • [ ] Temporal ordering for asynchronous processing
  • [ ] Dynamic CAP: adapt by amount, risk, and jurisdiction
  • [ ] Reconciliation between tiers
  • [ ] Staleness monitoring between tiers

Further reading

  • Gilbert & Lynch, "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services" (2002)
  • RIG Model: https://www.infoq.com/articles/rig-data-consistent-microservices/
  • Temporal Consistency Models for Financial Data: https://www.ijcesen.com/index.php/ijcesen/article/view/4967
  • Martin Kleppmann, "Designing Data-Intensive Applications", Chapter 7 (Transactions) and Chapter 9 (Consistency and Consensus)

Practice


Operational discipline for fintech

Introduction

In fintech, architecture is only half the job. The other half is operational discipline: the practices, processes, and culture that make an architecture work in production. Without operational discipline, even a perfectly designed system will fail.

> Key idea: architectural choices matter much less than operational discipline. Distributed tracing, structured logging, monitoring, and runbooks are multipliers for the entire architecture.


Distributed Tracing

Correlation ID across all services

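using System.Diagnostics;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Serilog;
using Serilog.Context;
using Serilog.Exceptions;

// Correlation ID middleware: propagates a trace ID across service boundaries
public class CorrelationIdMiddleware
{
    private const string CorrelationIdHeader = "X-Correlation-ID";
    private readonly RequestDelegate _next;
    private readonly ILogger<CorrelationIdMiddleware> _logger;

    public CorrelationIdMiddleware(RequestDelegate next,
        ILogger<CorrelationIdMiddleware> logger)
    {
        _next = next;
        _logger = logger;
    }

    // Only HttpContext here: a non-generic ILogger is not registered in DI by default,
    // so the injected ILogger<CorrelationIdMiddleware> is used instead.
    public async Task InvokeAsync(HttpContext context)
    {
        // Extract or generate correlation ID
        var correlationId = context.Request.Headers[CorrelationIdHeader].FirstOrDefault();
        if (string.IsNullOrEmpty(correlationId))
            correlationId = Guid.NewGuid().ToString("N");

        // Always echo the ID back (set before the response starts), whether or not the caller sent it
        context.Response.Headers[CorrelationIdHeader] = correlationId;

        // Store per request for downstream use (HttpContext.Items lives for one request)
        context.Items["CorrelationId"] = correlationId;

        // Logging scope: every entry written during this request carries the ID
        using var scope = _logger.BeginScope(new Dictionary<string, object>
        {
            ["CorrelationId"] = correlationId
        });

        var sw = Stopwatch.StartNew();
        try
        {
            await _next(context);
        }
        finally
        {
            sw.Stop();
            _logger.LogInformation(
                "[{CorrelationId}] {Method} {Path} → {StatusCode} ({Elapsed}ms)",
                correlationId, context.Request.Method, context.Request.Path,
                context.Response.StatusCode, sw.ElapsedMilliseconds);
        }
    }
}

// Serilog enrichment with CorrelationId.
// FromLogContext() is required for LogContext.PushProperty to have any effect. Enrichers
// add a property only if it is absent, so the "unknown" fallback goes after it.
Log.Logger = new LoggerConfiguration()
    .Enrich.FromLogContext()
    .Enrich.WithProperty("CorrelationId", "unknown")
    .Enrich.WithExceptionDetails()      // Serilog.Exceptions
    .Enrich.WithMachineName()           // Serilog.Enrichers.Environment
    .WriteTo.Console()
    .WriteTo.Seq("http://seq:5341")     // Serilog.Sinks.Seq
    .CreateLogger();

// ASP.NET Core: push the ID into Serilog's LogContext for the rest of the pipeline.
// Register after CorrelationIdMiddleware so the ID is already in HttpContext.Items.
app.Use(async (context, next) =>
{
    var correlationId = context.Items.TryGetValue("CorrelationId", out var value)
        ? value?.ToString() ?? "unknown"
        : "unknown";

    // Dispose the property when the request leaves this middleware
    using (LogContext.PushProperty("CorrelationId", correlationId))
    {
        await next();
    }
});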
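Because `X-Correlation-ID` arrives from the client, it is untrusted input that ends up in every log line. The sketch below shows a hypothetical `CorrelationIds.Normalize` helper. It accepts only short IDs made of letters, digits, `-`, and `_`, and otherwise generates a fresh one. The 64-character limit and the character set are assumptions, not a standard.

```csharp
using System;
using System.Text.RegularExpressions;

public static class CorrelationIds
{
    // Assumed policy: 1-64 chars of [A-Za-z0-9_-]. "\z" (not "$") anchors at the true end
    // of the string; "$" would also accept a trailing newline, enabling log injection.
    private static readonly Regex Allowed = new(@"^[A-Za-z0-9_-]{1,64}\z", RegexOptions.Compiled);

    // Returns the incoming ID if it is acceptable, otherwise a new 32-char hex GUID.
    public static string Normalize(string? incoming) =>
        incoming is not null && Allowed.IsMatch(incoming)
            ? incoming
            : Guid.NewGuid().ToString("N");
}
```

In the middleware, the header value would be passed through such a helper before it is stored in `HttpContext.Items` and echoed in the response.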

OpenTelemetry Integration

using System.Globalization;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;

// OpenTelemetry setup for .NET
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddSource(builder.Environment.ApplicationName)
        .AddSource("PaymentSaga")
        .AddSource("JournalPosting")
        .AddSource("fintech-tracer")              // tracer used by TransferService below
        .AddAspNetCoreInstrumentation()
        .AddHttpClientInstrumentation()
        .AddNpgsql()                              // Npgsql.OpenTelemetry package
        // Kafka: Confluent.Kafka has no built-in tracing; use a contrib instrumentation
        // package or manual spans (the registration API differs between packages)
        .AddOtlpExporter(options =>
        {
            // Jaeger ingests OTLP directly (gRPC 4317, HTTP 4318);
            // the dedicated Jaeger exporter for .NET is deprecated
            options.Endpoint = new Uri("http://jaeger:4317");
        })
    )
    .WithMetrics(metrics => metrics
        .AddMeter("Fintech.Ledger")               // assumed name of the Meter passed to LedgerMetrics
        .AddAspNetCoreInstrumentation()
        .AddHttpClientInstrumentation()
        .AddRuntimeInstrumentation()
        .AddPrometheusExporter()                  // also requires app.MapPrometheusScrapingEndpoint()
    );

// Custom spans for financial operations
public class TransferService
{
    private readonly ILogger<TransferService> _logger;
    private readonly Tracer _tracer;

    public TransferService(ILogger<TransferService> logger)
    {
        _logger = logger;
        // ILoggerFactory cannot create tracers; they come from the OpenTelemetry API.
        // The tracer name must be registered with AddSource("fintech-tracer").
        _tracer = TracerProvider.Default.GetTracer("fintech-tracer");
    }

    public async Task<TransferResult> ExecuteAsync(TransferCommand cmd,
        CancellationToken ct)
    {
        using var span = _tracer.StartActiveSpan("transfer.execute");
        // Account identifiers in traces may fall under PCI DSS / GDPR rules; mask if required
        span.SetAttribute("source_account", cmd.SourceAccount.ToString());
        span.SetAttribute("destination_account", cmd.DestinationAccount.ToString());
        span.SetAttribute("amount", cmd.Amount.ToString(CultureInfo.InvariantCulture));
        span.SetAttribute("currency", cmd.Currency);
        span.SetAttribute("idempotency_key", cmd.IdempotencyKey);

        try
        {
            var result = await ExecuteTransferInternalAsync(cmd, ct);

            span.SetAttribute("result", result.Success ? "success" : "failed");
            if (!result.Success)
                span.SetStatus(Status.Error);

            return result;
        }
        catch (Exception ex)
        {
            span.RecordException(ex);
            span.SetStatus(Status.Error);
            throw;
        }
    }
}
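In .NET, OpenTelemetry spans are built on `System.Diagnostics.Activity`, so spans can be created and observed with the base class library alone. The sketch below registers an `ActivityListener`, which is roughly what the OpenTelemetry SDK does for each `AddSource` name. It then checks that a transfer span carries its tags and error status.

This is a convenient way to unit-test span attributes without running a collector. The source name `Fintech.Transfers` and the tag names are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class TransferTracing
{
    public static readonly ActivitySource Source = new("Fintech.Transfers");

    // Creates a span as instrumented code would; returns the stopped Activity for inspection.
    public static Activity? TraceTransfer(string idempotencyKey, string currency, bool succeed)
    {
        using var activity = Source.StartActivity("transfer.execute");   // null if nobody listens
        activity?.SetTag("idempotency_key", idempotencyKey);
        activity?.SetTag("currency", currency);
        if (!succeed)
            activity?.SetStatus(ActivityStatusCode.Error, "transfer rejected");
        return activity;
    }
}

public static class SpanCollector
{
    // Samples every activity of the named source and records it when it stops.
    public static (ActivityListener Listener, List<Activity> Stopped) Listen(string sourceName)
    {
        var stopped = new List<Activity>();
        var listener = new ActivityListener
        {
            ShouldListenTo = s => s.Name == sourceName,
            Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
            ActivityStopped = a => stopped.Add(a),
        };
        ActivitySource.AddActivityListener(listener);
        return (listener, stopped);
    }
}
```

Without any listener registered, `StartActivity` returns `null`. That is why the instrumentation code uses `?.` on every call: tracing stays close to free when it is disabled.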

Structured Logging

Financial-specific log events

using Microsoft.Extensions.Logging;

// Structured log events for financial operations
// (ID ranges: 1xxx informational, 2xxx warnings, 3xxx errors)
public static class FinancialLogEvents
{
    // Informational
    public const int JournalEntryPosted = 1001;
    public const int TransferCompleted = 1002;
    public const int BalanceChecked = 1003;
    public const int IdempotencyHit = 1004;

    // Warnings
    public const int BalanceDiscrepancy = 2001;
    public const int SagaCompensation = 2002;
    public const int RetryAttempt = 2003;
    public const int SlowQuery = 2004;

    // Errors
    public const int TransferFailed = 3001;
    public const int JournalPostingFailed = 3002;
    public const int ReconciliationFailed = 3003;
    public const int OutboxPublishFailed = 3004;
    public const int IdempotencyConflict = 3005;
}

public class FinancialLogger
{
    private readonly ILogger<FinancialLogger> _logger;

    public FinancialLogger(ILogger<FinancialLogger> logger) => _logger = logger;

    // int converts implicitly to EventId; the level comes from the LogXxx method
    public void LogJournalEntryPosted(Guid entryId, string idempotencyKey,
        int postingCount, decimal totalAmount, string currency, double elapsedMs)
    {
        _logger.LogInformation(FinancialLogEvents.JournalEntryPosted,
            "Journal entry posted: EntryId={EntryId} IdempotencyKey={IdempotencyKey} " +
            "Postings={PostingCount} Amount={Amount} Currency={Currency} ElapsedMs={ElapsedMs}",
            entryId, idempotencyKey, postingCount, totalAmount, currency, elapsedMs);
    }

    public void LogBalanceDiscrepancy(Guid accountId, string currency,
        decimal derived, decimal materialized, decimal difference)
    {
        _logger.LogWarning(FinancialLogEvents.BalanceDiscrepancy,
            "Balance discrepancy: Account={AccountId} Currency={Currency} " +
            "Derived={Derived} Materialized={Materialized} Difference={Difference}",
            accountId, currency, derived, materialized, difference);
    }

    public void LogTransferFailed(Guid? transferId, string reason,
        string sourceAccount, string destAccount, decimal amount, string currency)
    {
        _logger.LogError(FinancialLogEvents.TransferFailed,
            "Transfer failed: TransferId={TransferId} Reason={Reason} " +
            "Source={Source} Dest={Dest} Amount={Amount} Currency={Currency}",
            transferId, reason, sourceAccount, destAccount, amount, currency);
    }
}

Monitoring and Alerting

Golden Metrics for Fintech

| Metric | Type | Description | Alert threshold |
|--------|------|-------------|-----------------|
| ledger_journal_post_latency | Histogram | Time to post a journal entry | p99 > 100 ms |
| ledger_transfer_success_rate | Counter | Transfer success/failure rate | < 99.9% |
| ledger_balance_discrepancy_count | Counter | Balance reconciliation mismatches | > 0 |
| saga_compensation_rate | Counter | Saga compensation rate | > 1% |
| outbox_publish_lag | Gauge | Seconds behind real time | > 30 s |
| idempotency_hit_rate | Counter | Idempotency cache hit rate | informational |
| reconciliation_interval_seconds | Gauge | Time since last reconciliation | > 3600 s (1 h) |
| audit_log_hash_valid_count | Counter | Valid audit log entries | informational |
| audit_log_hash_invalid_count | Counter | Invalid audit log entries | > 0 → CRITICAL |
| db_connection_pool_active | Gauge | Active DB connections | > 80% of max |
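The p99-latency and 99.9%-success-rate thresholds reduce to the arithmetic below. This sketch uses the nearest-rank percentile on raw samples. Prometheus' `histogram_quantile` instead interpolates within histogram buckets, so its results are estimates and can differ. `GoldenChecks` and its methods are hypothetical names.

```csharp
using System;
using System.Linq;

public static class GoldenChecks
{
    // Nearest-rank percentile: the smallest sample such that at least p% of samples are <= it.
    public static double Percentile(double[] samples, double p)
    {
        if (samples.Length == 0)
            throw new ArgumentException("no samples", nameof(samples));
        var sorted = samples.OrderBy(x => x).ToArray();
        int rank = (int)Math.Ceiling(p * sorted.Length / 100.0);    // 1-based rank
        return sorted[Math.Clamp(rank, 1, sorted.Length) - 1];
    }

    // "journal post latency: p99 > 100 ms"
    public static bool LatencyAlert(double[] postLatenciesMs) =>
        Percentile(postLatenciesMs, 99) > 100.0;

    // "success rate < 99.9%" <=> failed / total > 0.001 <=> failed * 1000 > total.
    // Integer arithmetic avoids floating-point surprises exactly at the boundary.
    public static bool SuccessRateAlert(long total, long failed) =>
        total > 0 && failed * 1000 > total;
}
```

With 10,000 transfers, 10 failures is exactly 99.9% success and does not alert; 11 failures does.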

Prometheus Metrics

// Custom metrics for ledger
        public class LedgerMetrics
        {
            private readonly Histogram _journalPostLatency;
            private readonly Counter _journalPostsTotal;
            private readonly Counter _journalPostsFailed;
            private readonly Counter _transfersTotal;
            private readonly Counter _transfersFailed;
            private readonly Counter _balanceDiscrepancies;
            private readonly Counter _sagaCompensations;
            private readonly Gauge _outboxLagSeconds;
            private readonly Gauge _reconciliationLastRun;
    
            public LedgerMetrics(Meter meter)
            {
                _journalPostLatency = meter.CreateHistogram<double>(
                    "ledger.journal_post_latency_seconds",
                    unit: "s",
                    description: "Time to post a journal entry");
        
                _journalPostsTotal = meter.CreateCounter<long>(
                    "ledger.journal_posts_total",
                    description: "Total journal posts");
        
                _journalPostsFailed = meter.CreateCounter<long>(
                    "ledger.journal_posts_failed",
                    description: "Failed journal posts");
        
                _transfersTotal = meter.CreateCounter<long>(
                    "ledger.transfers_total",
                    description: "Total transfers");
        
                _transfersFailed = meter.CreateCounter<long>(
                    "ledger.transfers_failed",
                    description: "Failed transfers");
        
                _balanceDiscrepancies = meter.CreateCounter<long>(
                    "ledger.balance_discrepancies_total",
                    description: "Balance reconciliation discrepancies");
        
                _sagaCompensations = meter.CreateCounter<long>(
                    "ledger.saga_compensations_total",
                    description: "Saga compensations");
        
                _outboxLagSeconds = meter.CreateGauge<double>(
                    "ledger.outbox_lag_seconds",
                    description: "Outbox publish lag in seconds");
        
                _reconciliationLastRun = meter.CreateGauge<double>(
                    "ledger.reconciliation_last_run_timestamp",
                    description: "Unix timestamp of last reconciliation run");
            }
    
            public void RecordJournalPost(double elapsedSeconds, bool success)
            {
                _journalPostLatency.Record(elapsedSeconds);
                _journalPostsTotal.Add(1);
                if (!success)
                    _journalPostsFailed.Add(1);
            }
    
            public void RecordTransfer(bool success)
            {
                _transfersTotal.Add(1);
                if (!success)
                    _transfersFailed.Add(1);
            }
    
            public void RecordBalanceDiscrepancy()
            {
                _balanceDiscrepancies.Add(1);
            }
    
            public void RecordSagaCompensation()
            {
                _sagaCompensations.Add(1);
            }
    
            // Gauge<T> (System.Diagnostics.Metrics, .NET 9+) exposes Record, not Set
            public void SetOutboxLag(double seconds)
            {
                _outboxLagSeconds.Record(seconds);
            }
    
            public void SetReconciliationLastRun()
            {
                _reconciliationLastRun.Record(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
            }
        }
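`SetOutboxLag` takes a number of seconds, but the gauge is only useful once "lag" has a fixed definition. The sketch below assumes lag means the age of the oldest outbox row that is still unpublished, and zero when nothing is pending. The `(Id, CreatedAt, Published)` tuple is a hypothetical in-memory stand-in for the outbox table. In a real publisher the value would come from a database query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var now = new DateTimeOffset(2024, 1, 1, 12, 0, 0, TimeSpan.Zero);

// Hypothetical in-memory stand-in for outbox rows
var outbox = new List<(Guid Id, DateTimeOffset CreatedAt, bool Published)>
{
    (Guid.NewGuid(), now.AddSeconds(-120), true),   // already published: ignored
    (Guid.NewGuid(), now.AddSeconds(-45), false),   // oldest pending row defines the lag
    (Guid.NewGuid(), now.AddSeconds(-5), false),
};

// Lag = age of the oldest row still waiting to be published; 0 when nothing is pending
static double OutboxLagSeconds(
    IEnumerable<(Guid Id, DateTimeOffset CreatedAt, bool Published)> rows, DateTimeOffset asOf)
{
    var pending = rows.Where(r => !r.Published).Select(r => r.CreatedAt).ToList();
    return pending.Count == 0 ? 0.0 : (asOf - pending.Min()).TotalSeconds;
}

var lag = OutboxLagSeconds(outbox, now);
Console.WriteLine($"outbox lag: {lag} s");   // the value a publisher would pass to SetOutboxLag
```

Measuring the oldest pending row, rather than counting rows, turns the 30-second `OutboxLagHigh` threshold into a latency bound. A row count cannot express that bound, because the same count means different delays at different traffic levels.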

Alerting Rules (Prometheus)

# prometheus/alerting_rules.yml
        groups:
          - name: fintech-critical
            rules:
              # Balance discrepancy — immediate critical
              - alert: BalanceDiscrepancyDetected
                expr: ledger_balance_discrepancies_total > 0
                for: 0m
                labels:
                  severity: critical
                annotations:
                  summary: "Balance discrepancy detected"
                  description: "Account balance mismatch: {{ $value }} discrepancies found"
      
              # Audit log integrity violation
              - alert: AuditLogIntegrityViolation
                expr: audit_log_hash_invalid_count > 0
                for: 0m
                labels:
                  severity: critical
                annotations:
                  summary: "Audit log integrity violation"
                  description: "Tampered audit log entries detected"
      
              # Transfer success rate drop
              - alert: TransferSuccessRateLow
                expr: rate(ledger_transfers_failed_total[5m]) / rate(ledger_transfers_total[5m]) > 0.001
                for: 2m
                labels:
                  severity: critical
                annotations:
                  summary: "Transfer success rate below 99.9%"
                  description: "Current failure rate: {{ $value | humanizePercentage }}"
      
              # Outbox lag
              - alert: OutboxLagHigh
                expr: ledger_outbox_lag_seconds > 30
                for: 1m
                labels:
                  severity: warning
                annotations:
                  summary: "Outbox publish lag high"
                  description: "Outbox lag: {{ $value }} seconds"
      
              # Saga compensation rate
              - alert: SagaCompensationRateHigh
                expr: rate(ledger_saga_compensations_total[5m]) / rate(ledger_transfers_total[5m]) > 0.01
                for: 5m
                labels:
                  severity: warning
                annotations:
                  summary: "High saga compensation rate"
                  description: "Compensation rate: {{ $value | humanizePercentage }}"
      
              # Reconciliation overdue
              - alert: ReconciliationOverdue
                expr: time() - ledger_reconciliation_last_run_timestamp > 7200
                for: 1m
                labels:
                  severity: warning
                annotations:
                  summary: "Reconciliation missed a scheduled hourly run (none in over 2 hours)"
                  description: "Last successful reconciliation: {{ $value | humanizeDuration }} ago"
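PromQL `rate()` turns two samples of a monotonically increasing counter into a per-second rate. The ratio of two rates taken over the same window therefore reduces to the ratio of the counters' increases. `RecordTransfer` increments `ledger.transfers_total` for every attempt, failures included, so the failure ratio is simply Δfailed / Δtotal.

The sketch below works through that arithmetic for two hypothetical scrapes taken five minutes apart. It ignores counter resets and window extrapolation, both of which the real `rate()` handles.

```csharp
using System;

// Two hypothetical scrapes of the transfer counters, taken 300 s apart
var before = (Total: 1_000_000.0, Failed: 100.0);
var after  = (Total: 1_050_000.0, Failed: 180.0);
const double windowSeconds = 300;

// Simplified rate(): (end - start) / window, ignoring counter resets and extrapolation
double RatePerSecond(double start, double end) => (end - start) / windowSeconds;

var failedRate = RatePerSecond(before.Failed, after.Failed);   // failures per second
var totalRate  = RatePerSecond(before.Total, after.Total);     // attempts per second (failures included)

// The window cancels out: the ratio equals Δfailed / Δtotal = 80 / 50000
var failureRatio = totalRate == 0 ? 0 : failedRate / totalRate;
const double threshold = 0.001;   // more than 0.1 % failures means success rate below 99.9 %

Console.WriteLine($"failure ratio = {failureRatio:P3}, alert fires: {failureRatio > threshold}");
```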

Chaos Engineering for Fintech

Chaos experiments for financial systems

| Experiment | Target | Expected behavior | Success criteria |
|---|---|---|---|
| Network partition | Ledger DB nodes | CP behavior: the quorum side keeps serving writes, the non-quorum side blocks | No data loss, consistent ledger |
| DB failover | Primary PostgreSQL | Streaming-replication standby is promoted to primary | Zero data loss (requires synchronous replication), failover < 30 s |
| Kafka partition | Event bus topic | Events buffered, no loss | All events eventually processed |
| Idempotency key collision | Ledger service | Duplicate requests return the original result | Zero duplicate transfers |
| Saga timeout | Saga orchestrator | Compensation executes correctly | System returns to a consistent state |
| Outbox backlog | Outbox publisher | Backlog drains with no message loss | Lag returns to normal |
| Balance cache invalidation | Materialized views | Views refresh correctly | Journal-derived balances = materialized balances after refresh |
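The "Idempotency key collision" experiment passes only if a retried request with the same key returns the original result instead of moving money twice. Below is a minimal in-memory sketch of that behavior. A `ConcurrentDictionary` stands in for the UNIQUE-constrained idempotency table, and `ExecuteTransfer` is a hypothetical placeholder for the real posting logic. `Lazy<T>` ensures that concurrent duplicates trigger the transfer only once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the idempotency table; a real ledger uses a UNIQUE constraint on the key
var processed = new ConcurrentDictionary<string, Lazy<Guid>>();
var executions = 0;   // how many times money actually moved

// Hypothetical transfer: posts a journal entry and returns its id
Guid ExecuteTransfer(decimal amount)
{
    Interlocked.Increment(ref executions);
    Console.WriteLine($"posting transfer of {amount}");
    return Guid.NewGuid();
}

// The first caller for a key runs the transfer; retries receive the stored result.
// GetOrAdd may build several Lazy wrappers under contention, but only the stored one
// is ever evaluated, and ExecutionAndPublication makes that evaluation happen once.
Guid TransferIdempotent(string idempotencyKey, decimal amount) =>
    processed.GetOrAdd(idempotencyKey,
        _ => new Lazy<Guid>(() => ExecuteTransfer(amount),
                            LazyThreadSafetyMode.ExecutionAndPublication)).Value;

// Simulated retry storm: 50 concurrent requests carrying the same key
var results = await Task.WhenAll(
    Enumerable.Range(0, 50).Select(_ => Task.Run(() => TransferIdempotent("key-42", 100m))));

Console.WriteLine($"distinct results: {results.Distinct().Count()}, executions: {executions}");
```

An in-memory map is lost on restart and is not shared across instances, which is why the durable version relies on the database constraint. A production handler would also reject a reused key that arrives with a different payload.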

Chaos Engineering Implementation

// Chaos engineering simulation for ledger
        public interface IChaosExperiment
        {
            string Name { get; }
            Task ExecuteAsync(CancellationToken ct);
            Task<bool> ValidateAsync(CancellationToken ct);
        }

        // Network partition simulation
        public class NetworkPartitionExperiment : IChaosExperiment
        {
            private readonly string _targetNode;
            private readonly ReconciliationService _reconciliation;
            private readonly ILogger<NetworkPartitionExperiment> _logger;
    
            public string Name => "Network Partition";
    
            public NetworkPartitionExperiment(string targetNode,
                ReconciliationService reconciliation,
                ILogger<NetworkPartitionExperiment> logger)
            {
                _targetNode = targetNode;
                _reconciliation = reconciliation;
                _logger = logger;
            }
    
            public async Task ExecuteAsync(CancellationToken ct)
            {
                _logger.LogWarning("CHAOS: Simulating network partition for node {Node}", _targetNode);
        
                try
                {
                    // Placeholder: this only waits. The fault itself must be injected by
                    // infrastructure tooling, e.g. tc/iptables on the node or cloud network ACLs
                    await Task.Delay(TimeSpan.FromSeconds(30), ct);
                }
                finally
                {
                    // Always restore connectivity, even if the experiment is cancelled
                    _logger.LogWarning("CHAOS: Restoring network for node {Node}", _targetNode);
                }
            }
    
            public async Task<bool> ValidateAsync(CancellationToken ct)
            {
                // After recovery, journal-derived and materialized balances must agree
                var report = await _reconciliation.RunAsync(ct);
                return report.Discrepancies.Count == 0;
            }
        }

        // Chaos testing pipeline
        public class ChaosPipeline
        {
            private readonly List<IChaosExperiment> _experiments;
            private readonly ILogger<ChaosPipeline> _logger;
    
            public ChaosPipeline(List<IChaosExperiment> experiments,
                ILogger<ChaosPipeline> logger)
            {
                _experiments = experiments;
                _logger = logger;
            }
    
            public async Task RunAsync(CancellationToken ct)
            {
                foreach (var experiment in _experiments)
                {
                    _logger.LogInformation("Running chaos experiment: {Name}", experiment.Name);
            
                    var preState = await CaptureStateAsync(ct);
            
                    bool valid;
                    try
                    {
                        await experiment.ExecuteAsync(ct);
                        valid = await experiment.ValidateAsync(ct);
                    }
                    catch (Exception ex) when (ex is not OperationCanceledException)
                    {
                        // An experiment that crashes counts as a failed experiment
                        _logger.LogError(ex, "Chaos experiment {Name} threw", experiment.Name);
                        valid = false;
                    }
            
                    var postState = await CaptureStateAsync(ct);
            
                    if (!valid)
                    {
                        _logger.LogCritical("CHAOS FAILED: {Name}, system state inconsistent", experiment.Name);
                        await AlertOnChaosFailureAsync(experiment.Name, preState, postState, ct);
                    }
                    else
                    {
                        _logger.LogInformation("CHAOS PASSED: {Name}", experiment.Name);
                    }
                }
            }
    
            // Stubs: a real implementation would query journal entry count,
            // total balance and active accounts from the ledger database
            private Task<StateSnapshot> CaptureStateAsync(CancellationToken ct) =>
                Task.FromResult(new StateSnapshot(DateTimeOffset.UtcNow, 0, 0m, 0));
    
            private Task AlertOnChaosFailureAsync(string experiment, StateSnapshot pre,
                StateSnapshot post, CancellationToken ct) => Task.CompletedTask;
        }

        public record StateSnapshot(
            DateTimeOffset CapturedAt,
            int JournalEntries,
            decimal TotalBalance,
            int ActiveAccounts);
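`CaptureStateAsync` above is a stub. Pre/post snapshots become useful once invariants are checked between them. The sketch below rests on two assumptions:

- `TotalBalance` is the signed sum of all account balances, with debits positive and credits negative, so in a closed double-entry ledger it must stay at zero.
- The append-only journal can only grow.

The tuple mirrors the `StateSnapshot` fields, and `CheckInvariants` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;

// Same fields as the StateSnapshot record (hypothetical values)
var pre  = (CapturedAt: DateTimeOffset.UtcNow, JournalEntries: 1_000, TotalBalance: 0m, ActiveAccounts: 250);
var post = (CapturedAt: DateTimeOffset.UtcNow.AddSeconds(30), JournalEntries: 1_012, TotalBalance: 0m, ActiveAccounts: 250);

// Returns violated invariants; an empty list means the experiment left the ledger consistent
static List<string> CheckInvariants(
    (DateTimeOffset CapturedAt, int JournalEntries, decimal TotalBalance, int ActiveAccounts) before,
    (DateTimeOffset CapturedAt, int JournalEntries, decimal TotalBalance, int ActiveAccounts) after)
{
    var found = new List<string>();

    // Append-only journal: entries can be added during the experiment, never removed
    if (after.JournalEntries < before.JournalEntries)
        found.Add($"journal shrank: {before.JournalEntries} -> {after.JournalEntries}");

    // Assumption: TotalBalance is the signed sum of all balances, which stays 0 under double entry
    if (after.TotalBalance != 0m)
        found.Add($"ledger out of balance by {after.TotalBalance}");

    return found;
}

var violations = CheckInvariants(pre, post);
Console.WriteLine(violations.Count == 0 ? "invariants hold" : string.Join("; ", violations));
```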

Runbooks

Standard Runbooks for Fintech

Runbook: Balance Discrepancy
## Balance Discrepancy Detected

        ### Symptoms
        - Alert: BalanceDiscrepancyDetected
        - Grafana: ledger_balance_discrepancies_total > 0

        ### Immediate Actions
        1. [ ] Check reconciliation report: `GET /api/reconciliation/latest`
        2. [ ] Identify affected accounts
        3. [ ] Determine root cause:
           - Materialized view refresh failure?
           - Journal entry corruption?
           - Bug in balance calculation?
        4. [ ] If materialized view issue:
           - Manually trigger refresh: `REFRESH MATERIALIZED VIEW CONCURRENTLY account_balances;` (CONCURRENTLY requires a UNIQUE index on the view)
           - Verify discrepancy resolved
        5. [ ] If journal entry issue:
           - DO NOT modify journal entries
           - Post a correcting (reversing) journal entry
           - Escalate to engineering

        ### Resolution Criteria
        - [ ] All discrepancies resolved
        - [ ] Root cause identified and documented
        - [ ] Fix deployed or workaround implemented
        - [ ] Post-mortem scheduled if severity >= warning
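Step 5 of the runbook never edits a posted entry; it posts a new one. Below is a minimal sketch of one common approach, a full reversal. Every line of the original entry is mirrored (debit becomes credit and vice versa), so the erroneous entry and its reversal net to zero on every account. The correct entry can then be posted. The `(Account, Debit, Credit)` line shape and the account names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Original (erroneous) entry: 150.00 was posted instead of 105.00
var original = new List<(string Account, decimal Debit, decimal Credit)>
{
    ("acc-A", 150.00m, 0m),
    ("acc-B", 0m, 150.00m),
};

// Reversal: same accounts and amounts with debit and credit swapped
static List<(string Account, decimal Debit, decimal Credit)> Reverse(
    IEnumerable<(string Account, decimal Debit, decimal Credit)> lines) =>
    lines.Select(l => (l.Account, Debit: l.Credit, Credit: l.Debit)).ToList();

// Any entry, including a reversal or correction, must satisfy the double-entry invariant
static bool IsBalanced(IEnumerable<(string Account, decimal Debit, decimal Credit)> lines) =>
    lines.Sum(l => l.Debit) == lines.Sum(l => l.Credit);

var reversal = Reverse(original);

// Net effect per account of original + reversal must be zero
var net = original.Concat(reversal)
    .GroupBy(l => l.Account)
    .ToDictionary(g => g.Key, g => g.Sum(l => l.Debit - l.Credit));

Console.WriteLine($"reversal balanced: {IsBalanced(reversal)}");
foreach (var (account, amount) in net)
    Console.WriteLine($"{account}: net {amount}");
```

Reversing and re-posting keeps the audit trail explicit. Some ledgers instead post a single adjusting entry for the difference (45.00 here); either way, the original row stays untouched.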
Runbook: Outbox Backlog
## Outbox Events Backlog

        ### Symptoms
        - Alert: OutboxLagHigh (lag > 30s)
        - Grafana: ledger_outbox_lag_seconds increasing

        ### Immediate Actions
        1. [ ] Check outbox publisher health
        2. [ ] Check event bus connectivity
        3. [ ] Check consumer processing rate
        4. [ ] If publisher down:
           - Restart outbox publisher
           - Verify it resumes from the oldest unpublished outbox row (nothing skipped)
        5. [ ] If consumer slow:
           - Scale out consumers
           - Check for blocking operations
        6. [ ] If event bus issue:
           - Check Kafka/Pulsar cluster health
           - Verify topic partitions

        ### Resolution Criteria
        - [ ] Outbox lag < 5s
        - [ ] No event loss (verify outbox published count = event bus received count)
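Comparing counts can hide offsetting errors: one lost event plus one redelivered duplicate still gives equal totals. Comparing identifiers is safer. The sketch below assumes every outbox row carries a unique event ID that consumers record on receipt. The two ID arrays are hypothetical inputs; in practice they would be query results from the outbox table and the consumer's processed-events store.

```csharp
using System;
using System.Linq;

// Hypothetical inputs: IDs the outbox marked as published vs IDs consumers recorded
var publishedIds = new[] { "evt-1", "evt-2", "evt-3", "evt-4" };
var receivedIds  = new[] { "evt-1", "evt-2", "evt-2", "evt-4" };   // evt-3 lost, evt-2 redelivered

// Counts match, so a count-only check would pass
Console.WriteLine($"published={publishedIds.Length}, received={receivedIds.Length}");

// Lost: published but never received
var lost = publishedIds.Except(receivedIds).ToList();

// Unexpected: received but never published by this outbox
var unexpected = receivedIds.Except(publishedIds).ToList();

// Duplicates: delivered more than once (safe only if consumers are idempotent)
var duplicates = receivedIds.GroupBy(id => id)
    .Where(g => g.Count() > 1)
    .Select(g => g.Key)
    .ToList();

Console.WriteLine($"lost=[{string.Join(", ", lost)}] unexpected=[{string.Join(", ", unexpected)}] duplicates=[{string.Join(", ", duplicates)}]");
```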
Runbook: Saga Compensation Storm
## High Saga Compensation Rate

        ### Symptoms
        - Alert: SagaCompensationRateHigh (> 1%)
        - Multiple saga compensations in logs

        ### Immediate Actions
        1. [ ] Identify failing saga step
        2. [ ] Check downstream service health
        3. [ ] If external payment gateway down:
           - Switch to backup gateway
           - Queue pending payments
        4. [ ] If internal service issue:
           - Check service logs and metrics
           - Restart if necessary
        5. [ ] Monitor compensation rate

        ### Resolution Criteria
        - [ ] Compensation rate < 0.1%
        - [ ] Root cause identified
        - [ ] Pending sagas completed or compensated
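The resolution criterion "pending sagas completed or compensated" relies on the orchestrator undoing completed steps in reverse order when a later step fails. Below is a minimal synchronous sketch. The step names are hypothetical, and the compensations are assumed to be idempotent, so retrying them is safe. A real orchestrator would also persist saga state between steps so that compensation survives a crash.

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();

// Each step pairs a forward action with its compensating action
var steps = new List<(string Name, Func<bool> Execute, Action Compensate)>
{
    ("reserve-funds", () => { log.Add("reserve"); return true;  }, () => log.Add("release-reserve")),
    ("debit-account", () => { log.Add("debit");   return true;  }, () => log.Add("refund-debit")),
    ("call-gateway",  () => { log.Add("gateway"); return false; }, () => log.Add("void-gateway")),   // fails
};

bool RunSaga(IEnumerable<(string Name, Func<bool> Execute, Action Compensate)> saga)
{
    var compensations = new Stack<Action>();
    foreach (var step in saga)
    {
        if (!step.Execute())
        {
            // Undo completed steps, most recent first; the failed step itself did not complete
            while (compensations.Count > 0)
                compensations.Pop().Invoke();
            return false;
        }
        compensations.Push(step.Compensate);
    }
    return true;
}

var succeeded = RunSaga(steps);
Console.WriteLine($"saga succeeded: {succeeded}; log: {string.Join(" -> ", log)}");
```

One caveat applies to real gateways: a timed-out call may still have succeeded remotely. That is why the gateway step usually gets a compensation or status check too, instead of being treated as a clean failure.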

Reconciliation Pipeline

Automated Reconciliation Architecture

        ┌────────────────────────────────────────────────────────┐
        │  Reconciliation Pipeline                               │
        │                                                        │
        │  ┌─────────────┐                                       │
        │  │  Scheduler  │                                       │
        │  │  (every 1h) │                                       │
        │  └──────┬──────┘                                       │
        │         ├─────────────────────┐                        │
        │         ▼                     ▼                        │
        │  ┌──────────────┐    ┌──────────────────┐              │
        │  │  Journal     │    │  Materialized    │              │
        │  │  Query       │    │  Balance Query   │              │
        │  └──────┬───────┘    └────────┬─────────┘              │
        │         └──────────┬──────────┘                        │
        │                    ▼                                   │
        │             ┌──────────────┐    ┌──────────────────┐   │
        │             │  Compare     │───→│  Discrepancy     │   │
        │             │  & Detect    │    │  Alert           │   │
        │             └──────┬───────┘    └────────┬─────────┘   │
        │                    ▼                     ▼             │
        │             ┌──────────────┐    ┌──────────────────┐   │
        │             │  Report      │───→│  Remediation     │   │
        │             │  & Metrics   │    │  Pipeline        │   │
        │             └──────────────┘    └──────────────────┘   │
        └────────────────────────────────────────────────────────┘

// Reconciliation scheduler: hosted BackgroundService (Microsoft.Extensions.Hosting); PeriodicTimer needs .NET 6+
        public class ReconciliationScheduler : BackgroundService
        {
            private static readonly TimeSpan Interval = TimeSpan.FromHours(1);
    
            private readonly ReconciliationService _reconciliation;
            private readonly LedgerMetrics _metrics;
            private readonly ILogger<ReconciliationScheduler> _logger;
    
            public ReconciliationScheduler(ReconciliationService reconciliation,
                LedgerMetrics metrics, ILogger<ReconciliationScheduler> logger)
            {
                _reconciliation = reconciliation;
                _metrics = metrics;
                _logger = logger;
            }
    
            // The host cancels stoppingToken on shutdown; no manual Start/Stop is needed
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                // Fixed cadence: one tick per hour, and runs never overlap
                using var timer = new PeriodicTimer(Interval);
        
                do
                {
                    try
                    {
                        _logger.LogInformation("Starting scheduled reconciliation");
                        var report = await _reconciliation.RunAsync(stoppingToken);
                
                        // Updated only after a successful run, so repeated failures
                        // surface through the ReconciliationOverdue alert
                        _metrics.SetReconciliationLastRun();
                
                        // Feed the BalanceDiscrepancyDetected alert
                        for (var i = 0; i < report.Discrepancies.Count; i++)
                            _metrics.RecordBalanceDiscrepancy();
                
                        if (report.Discrepancies.Count > 0)
                        {
                            _logger.LogWarning(
                                "Reconciliation found {Count} discrepancies",
                                report.Discrepancies.Count);
                        }
                        else
                        {
                            _logger.LogInformation(
                                "Reconciliation passed: {Accounts} accounts, zero discrepancies",
                                report.AccountsScanned);
                        }
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        // Log and keep the schedule alive; the next tick retries
                        _logger.LogError(ex, "Reconciliation failed");
                    }
                }
                while (await timer.WaitForNextTickAsync(stoppingToken));
            }
        }
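The scheduler delegates the actual comparison to `ReconciliationService.RunAsync`, whose body is not shown here. Below is a sketch of the "Compare & Detect" step under the following assumptions:

- Journal lines are `(Account, Debit, Credit)` tuples.
- Materialized balances are a dictionary keyed by account.
- An account is a discrepancy when its journal-derived balance (debits minus credits) differs from the materialized value.
- An account missing on either side counts as having a zero balance.

All data is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical source of truth: journal lines
var journal = new List<(string Account, decimal Debit, decimal Credit)>
{
    ("acc-1", 100m, 0m), ("acc-2", 0m, 100m),
    ("acc-1", 0m, 30m),  ("acc-3", 30m, 0m),
};

// Hypothetical read model: acc-3 missed an update, acc-4 has no journal lines at all
var materialized = new Dictionary<string, decimal>
{
    ["acc-1"] = 70m, ["acc-2"] = -100m, ["acc-3"] = 0m, ["acc-4"] = 5m,
};

// Journal-derived balance: sum(debit - credit) per account
var derived = journal.GroupBy(l => l.Account)
    .ToDictionary(g => g.Key, g => g.Sum(l => l.Debit - l.Credit));

// Compare over the union of accounts; a missing row counts as a zero balance
var discrepancies = derived.Keys.Union(materialized.Keys)
    .Select(account => (Account: account,
                        Journal: derived.GetValueOrDefault(account),
                        Materialized: materialized.GetValueOrDefault(account)))
    .Where(d => d.Journal != d.Materialized)
    .OrderBy(d => d.Account, StringComparer.Ordinal)
    .ToList();

foreach (var d in discrepancies)
    Console.WriteLine($"{d.Account}: journal={d.Journal}, materialized={d.Materialized}");
```

Recomputing from the journal treats the journal as the only source of truth. A discrepancy therefore always points at the read model or the refresh path, never at the journal itself.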

Checklist for Operational Discipline

  • [ ] Distributed tracing: correlation ID propagated across all services
  • [ ] Structured logging: financial-specific log events
  • [ ] OpenTelemetry: metrics, traces, and logs integrated
  • [ ] Golden metrics: defined and monitored
  • [ ] Alerting: critical and warning alerts configured
  • [ ] Runbooks: standard procedures for common incidents
  • [ ] Chaos engineering: regular experiments in staging
  • [ ] Reconciliation: automated hourly runs with alerting
  • [ ] Post-mortem culture: blameless incident reviews
  • [ ] On-call rotation: 24/7 coverage for critical systems

Further Reading

  • Google SRE Workbook: chapters on monitoring and incident response
  • Netflix Chaos Engineering principles
  • Martin Kleppmann, "Designing Data-Intensive Applications", Chapter 11: Stream Processing
  • TechBullion, "Distributed Systems in U.S. Finance": https://techbullion.com/distributed-systems-in-u-s-finance-the-patterns-that-survive-production/

Practice


Module 14 Checkpoint

Project: Ledger System for a Payment Service
  • Double-entry journal with atomic posting and a database-enforced sum(debits) = sum(credits) invariant per entry (a CHECK constraint cannot aggregate across rows, so use per-entry totals with CHECK or a deferred constraint trigger)
  • Append-only journal: no UPDATE/DELETE on journal entries, enforced at the database level
  • Idempotency keys on every state-changing endpoint, backed by a UNIQUE constraint
  • Saga pattern for cross-service transactions, with idempotent compensating transactions
  • CAP boundary: CP for the core ledger, AP for the edge/gateway with asynchronous hand-off via Kafka
  • CQRS split: write store (PostgreSQL) → event bus → read store (materialized balances)
  • Reconciliation pipeline: automated comparison of journal entries against materialized balances, with alerting
  • Audit log: immutable and tamper-evident, with a cryptographic hash chain
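A tamper-evident audit log can be built by making each entry's hash cover the previous entry's hash, so editing any past entry breaks every link after it. Below is a minimal sketch using SHA-256 from `System.Security.Cryptography`. It assumes a simple `prevHash|payload` serialization and an all-zero genesis hash, and `Append`/`Verify` are hypothetical helpers. A production chain would also hash timestamps and sequence numbers, and would anchor the latest hash somewhere the database operator cannot rewrite.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Assumption: the first entry links to an all-zero "genesis" hash
var genesis = new string('0', 64);
var chain = new List<(string Payload, string Hash)>();

// Each hash covers the previous hash, so entries are chained together
static string HashEntry(string prevHash, string payload) =>
    Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes($"{prevHash}|{payload}")));

void Append(string payload)
{
    var prevHash = chain.Count == 0 ? genesis : chain[^1].Hash;
    chain.Add((payload, HashEntry(prevHash, payload)));
}

// Recompute every link from the genesis hash; the first mismatch means tampering
bool Verify(List<(string Payload, string Hash)> entries)
{
    var prevHash = genesis;
    foreach (var (payload, hash) in entries)
    {
        if (HashEntry(prevHash, payload) != hash) return false;
        prevHash = hash;
    }
    return true;
}

Append("transfer 100 acc-1 -> acc-2");
Append("transfer 30 acc-2 -> acc-3");
Append("refund 30 acc-3 -> acc-2");
Console.WriteLine($"intact chain valid: {Verify(chain)}");

// Edit the first payload but keep its stored hash: verification must fail
var tampered = new List<(string Payload, string Hash)>(chain);
tampered[0] = ("transfer 1000 acc-1 -> acc-2", tampered[0].Hash);
Console.WriteLine($"tampered chain valid: {Verify(tampered)}");
```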
Pass criteria:
  • Zero balance drift between journal entries and materialized balances under load
  • All cross-service transactions reach a consistent final state via saga + compensation (eventual consistency)
  • Double-entry invariant enforced at the database level (constraint or constraint trigger), not in application code
  • Idempotency: zero duplicate charges on retries with the same idempotency key
  • During a network partition the ledger operates only in CP mode (quorum-based): the minority side rejects writes
  • Reconciliation runs automatically every hour with zero unexplained discrepancies
  • Audit log is tamper-evident: cryptographic verification passes for all entries
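The CP criterion reduces to a majority rule. A node accepts ledger writes only while it can reach a strict majority of the cluster, so the two sides of a partition can never both accept writes. The sketch below shows that rule alone and assumes a fixed, known cluster size. Real systems get this behavior from the database's replication or consensus layer, not from application code. `QuorumSize` and `CanAcceptWrites` are hypothetical helpers.

```csharp
using System;

// Strict majority: more than half of all voting nodes
static int QuorumSize(int clusterSize) => clusterSize / 2 + 1;

// A partition side may accept writes only if it still sees a quorum (itself included)
static bool CanAcceptWrites(int reachableNodes, int clusterSize) =>
    reachableNodes >= QuorumSize(clusterSize);

const int cluster = 5;

// A 5-node cluster split 3 / 2: only the majority side keeps writing
var majoritySide = CanAcceptWrites(3, cluster);
var minoritySide = CanAcceptWrites(2, cluster);
Console.WriteLine($"quorum={QuorumSize(cluster)}, 3-node side writes: {majoritySide}, 2-node side writes: {minoritySide}");

// Even-sized clusters: a 4-node cluster split 2 / 2 leaves neither side with a quorum
Console.WriteLine($"4-node 2/2 split writes: {CanAcceptWrites(2, 4)}");
```

Refusing writes on the minority side is exactly the availability cost that CP accepts in exchange for never forking the ledger.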