03Потоки, Таски и Асинхронность

Уровень 1: Foundation

Основы многопоточности

Thread Lifecycle

Состояния потока

Unstarted → Running → WaitSleepJoin → Stopped
                        ↓
                    AbortRequested (.NET Framework only)
СостояниеОписание
UnstartedПоток создан, но Start() не вызван
RunningПоток выполняет код
WaitSleepJoinПоток заблокирован (Wait, Sleep, Join)
StoppedПоток завершил выполнение

Создание потока

// Базовое создание
        var thread = new Thread(Work);
        thread.Start("arg");

        // С параметром через lambda
        var thread2 = new Thread(() => WorkWithParam(42));
        thread2.Start();

        // С настройками
        var thread3 = new Thread(Work)
        {
            Name = "Worker-1",
            IsBackground = true,
            Priority = ThreadPriority.AboveNormal
        };
        thread3.Start();

        void Work(object? state) => Console.WriteLine($"Working: {state}");
        void WorkWithParam(int n) => Console.WriteLine($"Working: {n}");

Управление потоком

// Join — ожидание завершения
        thread.Join();           // ждать бесконечно
        thread.Join(1000);       // ждать 1 секунду, вернуть false если не завершился

        // Interrupt — прерывание WaitSleepJoin
        thread.Interrupt();      // бросает ThreadInterruptedException

        // Thread.Sleep — блокировка текущего потока
        Thread.Sleep(100);       // НЕ используйте в async коде!

        // Thread.Yield — уступить квант времени
        Thread.Yield();          // вернуть true если ОС переключила контекст

Thread.Sleep() — почему антипаттерн в async коде

// ПЛОХО — блокирует поток из ThreadPool
        public void BadSyncMethod()
        {
            Thread.Sleep(1000);  // Поток простаивает, не может выполнять другую работу
        }

        // ХОРОШО — асинхронное ожидание
        public async Task GoodAsyncMethod()
        {
            await Task.Delay(1000);  // Поток возвращается в ThreadPool
        }

Почему Thread.Sleep() плох:

  • Блокирует поток ThreadPool на всё время сна
  • Уменьшает доступные потоки для других задач
  • Может привести к thread pool starvation
  • Не освобождает ресурсы во время ожидания

Thread vs Task vs ThreadPool

Сравнение

ХарактеристикаThreadTaskThreadPool
overheadВысокий (~1MB стек)НизкийУправляемый
СозданиеДорогоеДешёвоеУже создан
УправлениеРучноеTPL управляетАвтоматическое
ComposabilityНетWhenAll, ContinueWithНет
CancellationРучноеCancellationTokenНет
Return valueНетTask<T>Нет

ThreadPool

// QueueUserWorkItem — базовый способ
        ThreadPool.QueueUserWorkItem(state =>
        {
            Console.WriteLine($"Working on thread: {Thread.CurrentThread.ManagedThreadId}");
        }, null);

        // RegisterWaitForSingleObject — ожидание с таймаутом
        var handle = new ManualResetEvent(false);
        ThreadPool.RegisterWaitForSingleObject(
            handle,
            (state, timedOut) => Console.WriteLine("Signaled!"),
            null,
            TimeSpan.FromSeconds(5),
            executeOnlyOnce: true
        );

Task — современный подход

// Task.Run — выполнение в ThreadPool
        Task.Run(() => ComputeHeavy());

        // Task.FromResult — уже завершённая задача
        Task<int> completed = Task.FromResult(42);

        // Task.FromException — задача с ошибкой
        Task<int> faulted = Task.FromException<int>(new InvalidOperationException());

        // Task.FromCanceled — отменённая задача
        var cts = new CancellationTokenSource();
        cts.Cancel();
        Task<int> canceled = Task.FromCanceled<int>(cts.Token);

ThreadPool Internals

Базовые параметры

// Получение текущих лимитов
        ThreadPool.GetMaxThreads(out int maxWorker, out int maxIO);
        ThreadPool.GetMinThreads(out int minWorker, out int minIO);

        Console.WriteLine($"Max Worker: {maxWorker}, Max IO: {maxIO}");
        Console.WriteLine($"Min Worker: {minWorker}, Min IO: {minIO}");

        // Установка минимального количества потоков
        ThreadPool.SetMinThreads(workerThreads: 100, completionPortThreads: 100);

Динамическое управление (.NET 5+)

ThreadPool использует hill-climbing алгоритм:
        1. Мониторит throughput и latency
        2. Увеличивает потоки если очередь растёт
        3. Уменьшает если потоки простаивают
        4. Hill-climbing period: ~500ms

Внутренняя структура

ThreadPool:
        ├── Global Queue (work items от Task.Run)
        ├── Local Queues (per-thread, work-stealing)
        │   ├── Thread 1 Queue
        │   ├── Thread 2 Queue
        │   └── Thread 3 Queue
        └── IO Completion Port (async I/O callbacks)

Work Stealing

// Каждый поток имеет локальную очередь (LIFO)
        // Если очередь пуста — ворует из другой очереди (FIFO)
        // Это обеспечивает балансировку нагрузки

        // Пример демонстрации
        var tasks = Enumerable.Range(0, 1000)
            .Select(i => Task.Run(() =>
            {
                Thread.Sleep(1); // имитация работы
                return Interlocked.Increment(ref _counter);
            }))
            .ToArray();

        await Task.WhenAll(tasks);

Влияние SetMinThreads на latency

// Сценарий: burst нагрузки
        // Без SetMinThreads: ThreadPool создаёт потоки постепенно (1 поток/500ms)
        // С SetMinThreads: все потоки доступны сразу

        // Benchmark результат (1000 задач по 10ms):
        // Без SetMinThreads: ~15 секунд (потоки создаются постепенно)
        // С SetMinThreads(1000): ~1 секунда (все потоки готовы)

        // Но: установка слишком большого minThreads ведёт к:
        // - Избыточному потреблению памяти (1MB на поток)
        // - Context switching overhead
        // - Degraded performance при низкой нагрузке

Race Conditions

Что такое Race Condition

// Классический race condition
        public class Counter
        {
            private int _count;

            public void Increment()
            {
                // НЕАТОМАРНАЯ операция:
                // 1. Прочитать _count
                // 2. Увеличить на 1
                // 3. Записать _count
                _count++;
            }

            public int Count => _count;
        }

        // Демонстрация проблемы
        var counter = new Counter();
        var tasks = Enumerable.Range(0, 1000)
            .Select(_ => Task.Run(() => counter.Increment()))
            .ToArray();

        await Task.WhenAll(tasks);
        Console.WriteLine(counter.Count); // Скорее всего < 1000!

Воспроизведение и исправление

// ПЛОХО — race condition
        public class BadCache
        {
            private Dictionary<string, string> _data = new();

            public string GetOrAdd(string key, Func<string> factory)
            {
                if (!_data.TryGetValue(key, out var value))
                {
                    value = factory();
                    _data[key] = value;  // Race: другой поток мог уже добавить
                }
                return value;
            }
        }

        // ХОРОШО — с lock
        public class GoodCache
        {
            private readonly Dictionary<string, string> _data = new();
            private readonly object _lock = new();

            public string GetOrAdd(string key, Func<string> factory)
            {
                lock (_lock)
                {
                    if (!_data.TryGetValue(key, out var value))
                    {
                        value = factory();
                        _data[key] = value;
                    }
                    return value;
                }
            }
        }

        // ЛУЧШЕ — ConcurrentDictionary
        public class BetterCache
        {
            private readonly ConcurrentDictionary<string, string> _data = new();

            public string GetOrAdd(string key, Func<string> factory)
            {
                return _data.GetOrAdd(key, _ => factory());
            }
        }

Deadlocks

Что такое Deadlock

Deadlock возникает когда:
        1. Mutual Exclusion — ресурс не может быть общим
        2. Hold and Wait — поток держит ресурс и ждёт другой
        3. No Preemption — ресурс нельзя отнять
        4. Circular Wait — цикл ожидания

Классический deadlock

// DEADLOCK — два потока, два lock'а в разном порядке
        var lockA = new object();
        var lockB = new object();

        var t1 = Task.Run(() =>
        {
            lock (lockA)
            {
                Thread.Sleep(100);
                lock (lockB)  // Ждёт lockB, который держит t2
                {
                    Console.WriteLine("T1 done");
                }
            }
        });

        var t2 = Task.Run(() =>
        {
            lock (lockB)
            {
                Thread.Sleep(100);
                lock (lockA)  // Ждёт lockA, который держит t1
                {
                    Console.WriteLine("T2 done");
                }
            }
        });

        await Task.WhenAll(t1, t2); // Никогда не завершится!

Async Deadlock

// DEADLOCK — sync over async
        public string GetData()
        {
            return GetDataAsync().Result;  // Блокирует поток
            // или .Wait()
        }

        public async Task<string> GetDataAsync()
        {
            await Task.Delay(100);
            return "data";
        }

        // Почему deadlock:
        // 1. GetData() блокирует UI/SynchronizationContext поток
        // 2. GetDataAsync() пытается вернуться на тот же поток после await
        // 3. Поток заблокирован → await никогда не завершится

        // ИСПРАВЛЕНИЕ:
        public async Task<string> GetDataFixed()
        {
            return await GetDataAsync();  // async all the way
        }

Исправление deadlock

// 1. Фиксированный порядок lock'ов
        public void SafeMethod()
        {
            // Всегда lockA, потом lockB
            lock (lockA)
            lock (lockB)
            {
                // работа
            }
        }

        // 2. Timeout с Monitor.TryEnter
        if (Monitor.TryEnter(lockA, TimeSpan.FromSeconds(1)))
        {
            try
            {
                if (Monitor.TryEnter(lockB, TimeSpan.FromSeconds(1)))
                {
                    try { /* работа */ }
                    finally { Monitor.Exit(lockB); }
                }
            }
            finally { Monitor.Exit(lockA); }
        }

        // 3. Async all the way
        public async Task<string> GetDataAsync()
        {
            await using var resource = await AcquireResourceAsync();
            return await resource.ProcessAsync();
        }

Livelocks и Starvation

Livelock

// Livelock — потоки активны, но не прогрессируют
        // Пример: два потока постоянно уступают друг другу

        public class LivelockDemo
        {
            private static bool _t1Active = true;

            public static void Run()
            {
                var t1 = Task.Run(() =>
                {
                    while (true)
                    {
                        if (_t1Active)
                        {
                            // Пытаемся работать, но уступаем
                            Thread.Sleep(1);
                            _t1Active = false;
                        }
                    }
                });

                var t2 = Task.Run(() =>
                {
                    while (true)
                    {
                        if (!_t1Active)
                        {
                            Thread.Sleep(1);
                            _t1Active = true;
                        }
                    }
                });
            }
        }

Starvation

// Starvation — поток никогда не получает ресурс
        // Пример: приоритетные потоки постоянно抢占 низкоприоритетные

        var lowPriority = Task.Factory.StartNew(
            () => Console.WriteLine("Low priority"),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default
        );

        // Высокоприоритетные задачи могут не дать выполниться низкоприоритетной

Memory Visibility и Happens-Before

Проблема видимости

// БЕЗ синхронизации — другой поток может не увидеть изменение
        private bool _flag = false;
        private int _data = 0;

        // Поток 1
        _data = 42;
        _flag = true;

        // Поток 2
        if (_flag)
        {
            Console.WriteLine(_data); // Может быть 0! (без синхронизации)
        }

Happens-Before Relationship

Happens-Before гарантирует:
        1. lock release → следующий lock acquire
        2. volatile write → следующий volatile read
        3. Thread.Start() → код в запущенном потоке
        4. Код в потоке → Thread.Join() завершение
        5. Monitor.Exit → Monitor.Enter на том же объекте

volatile

// volatile гарантирует видимость между потоками
        private volatile bool _shouldStop = false;

        public void Stop() => _shouldStop = true;

        public void Run()
        {
            while (!_shouldStop)  // Всегда видит последнее значение
            {
                // работа
            }
        }

Memory Barriers

// Thread.MemoryBarrier() — полный memory barrier
        // Запрещает reorder инструкций CPU и компилятора

        int x = 0, y = 0;
        bool ready = false;

        // Поток 1
        x = 1;
        Thread.MemoryBarrier();  // Запрещает reorder
        ready = true;

        // Поток 2
        if (ready)
        {
            Thread.MemoryBarrier();
            Console.WriteLine(x); // Всегда 1
        }

Практика

Задание 1: Воспроизвести deadlock и race condition

// 1. Создайте проект и воспроизведите race condition с counter
        // 2. Создайте deadlock с двумя lock'ами
        // 3. Исправьте оба с помощью lock и ConcurrentDictionary

Задание 2: Benchmark Thread vs ThreadPool vs Task.Run

// Используйте BenchmarkDotNet для сравнения:
        // - new Thread() + Start()
        // - ThreadPool.QueueUserWorkItem()
        // - Task.Run()
        // Измерьте overhead создания и выполнения

Задание 3: Измерить влияние SetMinThreads

// 1. Запустите 1000 задач по 10ms без SetMinThreads
        // 2. Запустите с ThreadPool.SetMinThreads(1000, 1000)
        // 3. Сравните total execution time

Контрольные вопросы

  1. Почему Thread.Sleep() антипаттерн в async коде?
  2. Что такое thread pool starvation и как его избежать?
  3. В чём разница между Task.Run и ConfigureAwait(false)?

Синхронизация

lock / Monitor

lock — синтаксический сахар

// Это:
        lock (obj)
        {
            // критическая секция
        }

        // Эквивалентно:
        bool lockTaken = false;
        try
        {
            Monitor.Enter(obj, ref lockTaken);
            // критическая секция
        }
        finally
        {
            if (lockTaken) Monitor.Exit(obj);
        }

Monitor — расширенные возможности

private readonly object _lock = new();

        public void WithTimeout()
        {
            // TryEnter с таймаутом — не блокирует бесконечно
            if (Monitor.TryEnter(_lock, TimeSpan.FromSeconds(1)))
            {
                try
                {
                    // критическая секция
                }
                finally
                {
                    Monitor.Exit(_lock);
                }
            }
            else
            {
                Console.WriteLine("Не удалось получить lock");
            }
        }

        public void WithWaitPulse()
        {
            lock (_lock)
            {
                while (!condition)
                {
                    Monitor.Wait(_lock);  // Освобождает lock и ждёт
                }
                // работа
                Monitor.Pulse(_lock);     // Уведомляет один ждущий поток
                Monitor.PulseAll(_lock);  // Уведомляет все ждущие потоки
            }
        }

Spin Lock Behavior

// Monitor использует spin wait перед блокировкой:
        // 1. Кратковременно "крутится" (spin) — не переключает контекст
        // 2. Если lock не доступен — переходит в реальную блокировку
        // 3. Spin count зависит от версии .NET и нагрузки

        // SpinLock — явный spin lock (для очень коротких критических секций)
        var spinLock = new SpinLock();
        bool lockTaken = false;
        spinLock.Enter(ref lockTaken);
        try
        {
            // ОЧЕНЬ короткая операция (< 100ns)
        }
        finally
        {
            if (lockTaken) spinLock.Exit();
        }

Когда использовать lock

Используйте lock когда:
        ✅ Критическая секция короткая (< 1ms)
        ✅ Нет риска deadlock
        ✅ Нужна простота

        НЕ используйте lock когда:
        ❌ Долгие операции в критической секции
        ❌ Нужен timeout
        ❌ Read-heavy workload (используйте ReaderWriterLockSlim)
        ❌ Нужна async совместимость (используйте SemaphoreSlim)

Mutex

Mutex — межпроцессная синхронизация

// Mutex — kernel object, работает между процессами
        // Медленнее lock (~100x), но跨процессный

        // Именованный Mutex — для межпроцессной синхронизации
        using var mutex = new Mutex(false, "Global\\MyAppSingleton");

        if (mutex.WaitOne(TimeSpan.Zero))
        {
            try
            {
                // Только один экземпляр приложения
                Console.WriteLine("Первый экземпляр");
                Console.ReadLine();
            }
            finally
            {
                mutex.ReleaseMutex();
            }
        }
        else
        {
            Console.WriteLine("Приложение уже запущено");
        }

Mutex vs lock

Характеристикаlock (Monitor)Mutex
СкоростьБыстрый (~25ns)Медленный (~2500ns)
МежпроцессныйНетДа
РекурсивныйДаДа (по умолчанию)
Может быть abandonedНетДа

Semaphore / SemaphoreSlim

SemaphoreSlim — асинхронный семафор

// SemaphoreSlim — ограничивает количество одновременных доступов
        // Поддерживает async WaitAsync()

        private readonly SemaphoreSlim _semaphore = new(3, 3); // max 3 параллельно

        public async Task ProcessAsync(int id)
        {
            await _semaphore.WaitAsync();
            try
            {
                Console.WriteLine($"[{id}] Начал обработку");
                await Task.Delay(1000);
                Console.WriteLine($"[{id}] Завершил обработку");
            }
            finally
            {
                _semaphore.Release();
            }
        }

        // Использование
        var tasks = Enumerable.Range(1, 10)
            .Select(i => ProcessAsync(i));
        await Task.WhenAll(tasks);

Semaphore — kernel object

// Semaphore — kernel object, межпроцессный
        using var semaphore = new Semaphore(3, 3, "MySemaphore");

        semaphore.WaitOne();
        try
        {
            // работа
        }
        finally
        {
            semaphore.Release();
        }

Когда использовать

ПримитивКогда использовать
SemaphoreSlimОграничение параллелизма, async код
SemaphoreМежпроцессное ограничение
lockMutual exclusion (1 поток)

ReaderWriterLockSlim

Read-heavy оптимизация

// ReaderWriterLockSlim — multiple readers OR single writer
        // Оптимален для read-heavy workloads

        public class ThreadSafeCache<TKey, TValue>
        {
            private readonly Dictionary<TKey, TValue> _data = new();
            private readonly ReaderWriterLockSlim _rwLock = new();

            public TValue Get(TKey key)
            {
                _rwLock.EnterReadLock();
                try
                {
                    return _data[key];
                }
                finally
                {
                    _rwLock.ExitReadLock();
                }
            }

            public void Set(TKey key, TValue value)
            {
                _rwLock.EnterWriteLock();
                try
                {
                    _data[key] = value;
                }
                finally
                {
                    _rwLock.ExitWriteLock();
                }
            }

            public TValue GetOrAdd(TKey key, Func<TKey, TValue> factory)
            {
                // Сначала пробуем read lock
                _rwLock.EnterUpgradeableReadLock();
                try
                {
                    if (_data.TryGetValue(key, out var value))
                        return value;

                    // Апгрейд до write lock
                    _rwLock.EnterWriteLock();
                    try
                    {
                        value = factory(key);
                        _data[key] = value;
                        return value;
                    }
                    finally
                    {
                        _rwLock.ExitWriteLock();
                    }
                }
                finally
                {
                    _rwLock.ExitUpgradeableReadLock();
                }
            }
        }

Режимы блокировки

EnterReadLock()        — множественные читатели
        EnterWriteLock()       — единственный писатель
        EnterUpgradeableReadLock() — читатель с возможностью апгрейда

Performance сравнение

1000 readers, 10 writers:
        - lock:                    ~500ms (сериализует всё)
        - ReaderWriterLockSlim:    ~50ms  (readers параллельны)
        - ConcurrentDictionary:    ~30ms  (lock-free internals)

Event Wait Handles

ManualResetEvent

// ManualResetEvent — остаётся сигнальным до Reset()
        // Как переключатель: ON/OFF

        var mre = new ManualResetEvent(false);

        // Поток-ожидатель
        Task.Run(() =>
        {
            Console.WriteLine("Жду сигнал...");
            mre.WaitOne();  // Блокируется до сигнала
            Console.WriteLine("Получил сигнал!");
        });

        // Поток-сигнализатор
        Thread.Sleep(2000);
        mre.Set();    // Сигнал — все ждущие продолжат
        // mre.Reset(); // Сброс — следующие WaitOne будут блокироваться

AutoResetEvent

// AutoResetEvent — автоматически сбрасывается после одного WaitOne
        // Как турникет: один прошёл — закрылся

        var are = new AutoResetEvent(false);

        // Только ОДИН поток получит сигнал
        Task.Run(() => { are.WaitOne(); Console.WriteLine("T1"); });
        Task.Run(() => { are.WaitOne(); Console.WriteLine("T2"); });

        are.Set();  // Только T1 или T2 продолжит, другой останется ждать

CountdownEvent

// CountdownEvent — ждёт N сигналов
        // Идеально для "ждём завершения N задач"

        var countdown = new CountdownEvent(3);

        Task.Run(() => { Thread.Sleep(1000); countdown.Signal(); });
        Task.Run(() => { Thread.Sleep(2000); countdown.Signal(); });
        Task.Run(() => { Thread.Sleep(1500); countdown.Signal(); });

        countdown.Wait();  // Ждёт пока Count станет 0
        Console.WriteLine("Все задачи завершены!");

        // С CancellationToken
        countdown.Wait(cancellationToken);

Сравнение Event Wait Handles

ПримитивПоведениеUse case
ManualResetEventОстаётся сигнальным"Готово, работайте все"
AutoResetEventСбрасывается после одного"Один за раз"
CountdownEventЖдёт N сигналов"Ждём N задач"
ManualResetEventSlimБыстрая user-mode версияКороткие ожидания

Barrier

Coordinated Iteration

// Barrier — синхронизирует N потоков на определённой фазе
        // Все потоки должны достичь Barrier прежде чем продолжить

        var barrier = new Barrier(3, b =>
        {
            // Post-phase action — выполняется когда все достигли barrier
            Console.WriteLine($"--- Фаза {b.CurrentPhaseNumber} завершена ---");
        });

        var tasks = new[]
        {
            Task.Run(() =>
            {
                for (int i = 0; i < 3; i++)
                {
                    Console.WriteLine($"T1: фаза {i}");
                    barrier.SignalAndWait();
                }
            }),
            Task.Run(() =>
            {
                for (int i = 0; i < 3; i++)
                {
                    Console.WriteLine($"T2: фаза {i}");
                    barrier.SignalAndWait();
                }
            }),
            Task.Run(() =>
            {
                for (int i = 0; i < 3; i++)
                {
                    Console.WriteLine($"T3: фаза {i}");
                    barrier.SignalAndWait();
                }
            })
        };

        await Task.WhenAll(tasks);

Barrier с добавлением/удалением участников

var barrier = new Barrier(2);

        // Динамическое изменение участников
        barrier.AddParticipant();   // Теперь 3 участника
        barrier.RemoveParticipant(); // Снова 2

Практика

Задание 1: Thread-safe cache с TTL и max size

// Требования:
        // - GetOrAdd с TTL
        // - Max size с eviction (LRU или oldest first)
        // - ReaderWriterLockSlim для оптимизации reads
        // - Background cleanup expired entries

        public class TtlCache<TKey, TValue>
        {
            private readonly int _maxSize;
            private readonly TimeSpan _ttl;
            private readonly Dictionary<TKey, (TValue Value, DateTime Expires)> _data = new();
            private readonly ReaderWriterLockSlim _lock = new();
            private readonly Timer _cleanupTimer;

            public TtlCache(int maxSize, TimeSpan ttl)
            {
                _maxSize = maxSize;
                _ttl = ttl;
                _cleanupTimer = new Timer(Cleanup, null, ttl, ttl);
            }

            public TValue GetOrAdd(TKey key, Func<TKey, TValue> factory)
            {
                // Реализуйте с UpgradeableReadLock
                // Проверьте TTL
                // Evict если maxSize reached
            }

            private void Cleanup(object? state)
            {
                // Удалите expired entries
                // Используйте WriteLock
            }
        }

Задание 2: Producer-Consumer queue с backpressure

// Требования:
        // - Bounded capacity
        // - Backpressure — producer ждёт если queue full
        // - Graceful shutdown
        // - Используйте SemaphoreSlim

        public class BoundedQueue<T>
        {
            private readonly Queue<T> _queue = new();
            private readonly SemaphoreSlim _spaceAvailable;
            private readonly SemaphoreSlim _itemAvailable;
            private readonly object _lock = new();
            private bool _completed;

            public BoundedQueue(int capacity)
            {
                _spaceAvailable = new SemaphoreSlim(capacity, capacity);
                _itemAvailable = new SemaphoreSlim(0, capacity);
            }

            public async Task EnqueueAsync(T item, CancellationToken ct = default)
            {
                await _spaceAvailable.WaitAsync(ct);
                lock (_lock)
                {
                    _queue.Enqueue(item);
                }
                _itemAvailable.Release();
            }

            public async Task<T> DequeueAsync(CancellationToken ct = default)
            {
                await _itemAvailable.WaitAsync(ct);
                lock (_lock)
                {
                    return _queue.Dequeue();
                }
            }

            public void Complete()
            {
                _completed = true;
                // Разбудите всех ждущих consumers
            }
        }

Задание 3: Connection pool с SemaphoreSlim

// Требования:
        // - Ограничение количества подключений
        // - Reuse connections
        // - Timeout на получение connection
        // - Health check

        public class ConnectionPool : IAsyncDisposable
        {
            private readonly SemaphoreSlim _semaphore;
            private readonly Queue<IDbConnection> _available = new();
            private readonly string _connectionString;
            private readonly int _maxSize;

            public ConnectionPool(string connectionString, int maxSize)
            {
                _connectionString = connectionString;
                _maxSize = maxSize;
                _semaphore = new SemaphoreSlim(maxSize, maxSize);
            }

            public async Task<PooledConnection> GetAsync(CancellationToken ct = default)
            {
                await _semaphore.WaitAsync(ct);
                // Возьмите connection из queue или создайте новое
            }

            public async ValueTask DisposeAsync()
            {
                // Закройте все connections
            }
        }

        public class PooledConnection : IAsyncDisposable
        {
            private readonly ConnectionPool _pool;
            private readonly IDbConnection _connection;

            public async ValueTask DisposeAsync()
            {
                // Верните connection в pool
            }
        }

Шпаргалка: Выбор примитива синхронизации

СценарийПримитив
Короткая критическая секцияlock
Async кодSemaphoreSlim
Read-heavy данныеReaderWriterLockSlim
Ограничение параллелизмаSemaphoreSlim
Producer-ConsumerChannel<T> или BlockingCollection<T>
Межпроцессная синхронизацияMutex или Semaphore
Signal одному потокуAutoResetEvent
Signal всем потокамManualResetEvent
Ждать N потоковCountdownEvent
Синхронизация фазBarrier
Lock-free счётчикInterlocked

Thread-Safe Коллекции

ConcurrentDictionary

Internals

ConcurrentDictionary использует:
        - Lock striping (сегментация lock'ов) — не один lock на всю коллекцию
        - Каждый bucket имеет свой lock
        - CAS операции для lock-free reads
        - Dynamic resizing с lock migration

Основные операции

var dict = new ConcurrentDictionary<string, int>();

        // Thread-safe добавление/получение
        dict.TryAdd("key", 42);
        dict.TryGetValue("key", out var value);
        dict.TryUpdate("key", 100, 42);  // Обновить только если current == 42
        dict.TryRemove("key", out var removed);

        // GetOrAdd — атомарно
        // ВАЖНО: factory вызывается ВНУТРИ lock, но значение может быть выброшено
        // если другой поток уже добавил значение
        int result = dict.GetOrAdd("key", k => ComputeExpensive(k));

        // AddOrUpdate — атомарно
        int updated = dict.AddOrUpdate(
            "key",
            k => 1,                        // add factory
            (k, v) => v + 1                // update factory
        );

GetOrAdd caveat

// ВАЖНО: factory в GetOrAdd может вызваться несколько раз!
        // Только одно значение будет сохранено, но factory вызывается для каждого конкурентного потока

        var dict = new ConcurrentDictionary<string, Lazy<ExpensiveObject>>();

        // ПЛОХО — factory может вызваться N раз
        var obj = dict.GetOrAdd("key", _ => new ExpensiveObject());

        // ХОРОШО — Lazy гарантирует однократное создание
        var lazy = dict.GetOrAdd("key", _ => new Lazy<ExpensiveObject>(() => new ExpensiveObject()));
        var obj = lazy.Value;  // Создается только один раз

Когда использовать ConcurrentDictionary

Используйте ConcurrentDictionary когда:
        ✅ Высокая конкурентность (много потоков)
        ✅ Частые reads, умеренные writes
        ✅ Нужна thread safety без ручных lock'ов

        НЕ используйте когда:
        ❌ Один поток (обычный Dictionary быстрее)
        ❌ Нужна атомарность нескольких операций (используйте lock)
        ❌ Нужен порядок элементов (ConcurrentDictionary не гарантирует порядок)

ConcurrentQueue

Lock-free очередь

// ConcurrentQueue — lock-free MPMC (multi-producer, multi-consumer)
        // Использует CAS и segmented array

        var queue = new ConcurrentQueue<int>();

        // Enqueue — lock-free
        queue.Enqueue(1);
        queue.Enqueue(2);
        queue.Enqueue(3);

        // TryDequeue — lock-free
        while (queue.TryDequeue(out var item))
        {
            Console.WriteLine(item);
        }

        // TryPeek — может быть stale (другой поток уже dequeued)
        if (queue.TryPeek(out var peeked))
        {
            Console.WriteLine($"Следующий: {peeked}");
        }

        // Count — approximate (может измениться сразу после чтения)
        Console.WriteLine($"Размер: {queue.Count}");

Producer-Consumer с ConcurrentQueue

var queue = new ConcurrentQueue<int>();
        var signal = new AutoResetEvent(false);
        bool completed = false;

        // Producer
        Task.Run(() =>
        {
            for (int i = 0; i < 1000; i++)
            {
                queue.Enqueue(i);
                signal.Set();  // Сигнал consumer
            }
            completed = true;
            signal.Set();
        });

        // Consumer
        Task.Run(() =>
        {
            while (true)
            {
                while (queue.TryDequeue(out var item))
                {
                    Process(item);
                }

                if (completed && queue.IsEmpty)
                    break;

                signal.WaitOne(100);  // Ждём с таймаутом
            }
        });

ConcurrentBag

Thread-local оптимизация

// ConcurrentBag оптимизирован для сценария:
        // - Один поток добавляет и забирает (thread-local)
        // - Другие потоки occasionally "steal" элементы

        var bag = new ConcurrentBag<int>();

        // Добавление — thread-local, очень быстрое
        bag.Add(1);
        bag.Add(2);
        bag.Add(3);

        // Извлечение — сначала из thread-local, потом steal
        bool success = bag.TryTake(out var item);
        bool peek = bag.TryPeek(out var peeked);

        // Когда использовать:
        // ✅ Producer и consumer — один поток
        // ✅ Work-stealing scenarios
        // ❌ Когда нужен порядок (bag не гарантирует FIFO)

BlockingCollection

Producer-Consumer из коробки

// BlockingCollection — обёртка над IProducerConsumerCollection
        // Блокирует когда пусто (для Take) или полно (для Add с bounded)

        // Unbounded
        var collection = new BlockingCollection<int>();

        // Bounded — с backpressure
        var bounded = new BlockingCollection<int>(boundedCapacity: 100);

        // Producer
        Task.Run(() =>
        {
            for (int i = 0; i < 1000; i++)
            {
                bounded.Add(i);  // Блокирует если capacity достигнут
            }
            bounded.CompleteAdding();  // Сигнал consumers
        });

        // Consumer
        Task.Run(() =>
        {
            foreach (var item in bounded.GetConsumingEnumerable())
            {
                Process(item);
            }
            // Выход после CompleteAdding + очередь пуста
        });

        // Multiple consumers
        var consumers = Enumerable.Range(0, 4)
            .Select(_ => Task.Run(() =>
            {
                foreach (var item in bounded.GetConsumingEnumerable())
                {
                    Process(item);
                }
            }))
            .ToArray();

        await Task.WhenAll(consumers);

TryAdd/TryTake с таймаутом

var collection = new BlockingCollection<int>(10);

        // TryAdd — не блокирует бесконечно
        if (collection.TryAdd(42, TimeSpan.FromSeconds(1)))
        {
            Console.WriteLine("Добавлено");
        }
        else
        {
            Console.WriteLine("Timeout — collection full");
        }

        // TryTake
        if (collection.TryTake(out var item, TimeSpan.FromSeconds(1)))
        {
            Console.WriteLine($"Взято: {item}");
        }
        else
        {
            Console.WriteLine("Timeout — collection empty");
        }

Channel

Modern Producer-Consumer

// Channel<T> — современный подход (.NET Core 3.0+)
        // Более производительный чем BlockingCollection
        // Поддерживает async I/O pattern

        using System.Threading.Channels;

        // Unbounded channel
        var unbounded = Channel.CreateUnbounded<int>();

        // Bounded channel — с backpressure
        var bounded = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
        {
            FullMode = BoundedChannelFullMode.Wait,      // Ждать (по умолчанию)
            // FullMode = BoundedChannelFullMode.DropOldest,  // Удалить oldest
            // FullMode = BoundedChannelFullMode.DropNewest,  // Удалить newest
            // FullMode = BoundedChannelFullMode.DropWrite,   // Пропустить write
            SingleReader = false,
            SingleWriter = false
        });

        // Single reader/writer оптимизация
        var singleReader = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
        {
            SingleReader = true,   // Оптимизация если один consumer
            SingleWriter = true    // Оптимизация если один producer
        });

Producer-Consumer с Channel

var channel = Channel.CreateBounded<int>(10);

        // Producer
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 1000; i++)
            {
                await channel.Writer.WriteAsync(i);
                Console.WriteLine($"Written: {i}");
            }
            channel.Writer.Complete();  // Сигнал consumers
        });

        // Consumer
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"Read: {item}");
                await ProcessAsync(item);
            }
        });

        await Task.WhenAll(producer, consumer);

Multiple Producers/Consumers

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1000)
        {
            SingleReader = false,
            SingleWriter = false
        });

        // Multiple producers
        var producers = Enumerable.Range(0, 4)
            .Select(i => Task.Run(async () =>
            {
                for (int j = 0; j < 250; j++)
                {
                    await channel.Writer.WriteAsync(i * 250 + j);
                }
            }))
            .ToArray();

        // Multiple consumers
        var consumers = Enumerable.Range(0, 4)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var item in channel.Reader.ReadAllAsync())
                {
                    await ProcessAsync(item);
                }
            }))
            .ToArray();

        await Task.WhenAll(producers);
        channel.Writer.Complete();
        await Task.WhenAll(consumers);

Channel vs BlockingCollection

ХарактеристикаChannelBlockingCollection
Async supportПолное (ValueTask)Нет (только sync)
PerformanceВышеНиже
BackpressureBoundedChannelFullModeBounded capacity
APIModern, async-firstLegacy, sync
CompletionWriter.Complete()CompleteAdding()

Compare-and-Swap (CAS) и Interlocked

CAS Operations

CAS (Compare-And-Swap) — атомарная операция:
        1. Сравнить текущее значение с ожидаемым
        2. Если равно — заменить на новое
        3. Вернуть старое значение

        Реализуется через CPU инструкцию CMPXCHG

Interlocked методы

private int _counter = 0;
        private long _total = 0;

        // Increment/Decrement
        Interlocked.Increment(ref _counter);
        Interlocked.Decrement(ref _counter);

        // Add
        Interlocked.Add(ref _total, 100);

        // Exchange (атомарная замена)
        int old = Interlocked.Exchange(ref _counter, 0);

        // CompareExchange (CAS)
        // Если _counter == expected, заменить на newValue
        // Возвращает original value
        int original = Interlocked.CompareExchange(ref _counter, newValue, expected);

        // Pattern: retry loop
        do
        {
            original = _counter;
            newValue = original + 1;
        } while (Interlocked.CompareExchange(ref _counter, newValue, original) != original);

Interlocked для объектов

private object? _cache = null;

        public object GetOrCreate()
        {
            var current = Volatile.Read(ref _cache);
            if (current != null)
                return current;

            var created = new object();
            var original = Interlocked.CompareExchange(ref _cache, created, null);

            // Если original != null, другой поток уже установил значение
            return original ?? created;
        }

Interlocked.Read для 64-битных значений

// На 32-битных системах чтение long не атомарно
        // Interlocked.Read гарантирует атомарность

        private long _value;

        public long GetValue()
        {
            return Interlocked.Read(ref _value);
        }

        public void SetValue(long newValue)
        {
            Interlocked.Exchange(ref _value, newValue);
        }

Практика

Задание 1: Rate Limiter через ConcurrentDictionary + Interlocked

// Token bucket algorithm
        public class RateLimiter
        {
            private readonly ConcurrentDictionary<string, Bucket> _buckets = new();
            private readonly int _maxTokens;
            private readonly TimeSpan _refillInterval;

            private record Bucket(int Tokens, DateTime LastRefill);

            public RateLimiter(int maxTokens, TimeSpan refillInterval)
            {
                _maxTokens = maxTokens;
                _refillInterval = refillInterval;
            }

            public bool TryConsume(string clientId)
            {
                // Реализуйте:
                // 1. GetOrAdd bucket для клиента
                // 2. Refill tokens если прошло достаточно времени
                // 3. Interlocked.Decrement если tokens > 0
                // 4. Вернуть true если token получен
            }

            public int GetRemainingTokens(string clientId)
            {
                // Вернуть оставшиеся tokens
            }
        }

Задание 2: Distributed Lock Simulator с Channel

// Симуляция distributed lock через messaging
        public class DistributedLockSimulator
        {
            private readonly Channel<LockMessage> _messageChannel;
            private readonly Dictionary<string, LockState> _locks = new();

            private record LockMessage(string LockName, string ClientId, LockAction Action);
            private enum LockAction { Acquire, Release }
            private record LockState(string Owner, DateTime AcquiredAt);

            public DistributedLockSimulator()
            {
                _messageChannel = Channel.CreateUnbounded<LockMessage>();
                _ = ProcessMessagesAsync();
            }

            public async Task<bool> AcquireAsync(string lockName, string clientId)
            {
                await _messageChannel.Writer.WriteAsync(
                    new LockMessage(lockName, clientId, LockAction.Acquire));
                // Ждать response
            }

            public async Task ReleaseAsync(string lockName, string clientId)
            {
                await _messageChannel.Writer.WriteAsync(
                    new LockMessage(lockName, clientId, LockAction.Release));
            }

            private async Task ProcessMessagesAsync()
            {
                await foreach (var msg in _messageChannel.Reader.ReadAllAsync())
                {
                    // Обработать lock/unlock
                }
            }
        }

Задание 3: Thread-Safe Event Log с Bounded Capacity

public class ThreadSafeEventLog
        {
            private readonly ConcurrentQueue<LogEntry> _entries = new();
            private readonly int _maxCapacity;
            private int _count;

            public record LogEntry(DateTime Timestamp, string Level, string Message);

            public ThreadSafeEventLog(int maxCapacity)
            {
                _maxCapacity = maxCapacity;
            }

            public void Write(string level, string message)
            {
                // Реализуйте:
                // 1. Interlocked.Increment для _count
                // 2. Если _count > _maxCapacity, удалить oldest (TryDequeue)
                // 3. Enqueue новую запись
            }

            public IReadOnlyList<LogEntry> GetEntries(int count = 100)
            {
                // Вернуть последние count entries
                // Snapshot — не блокирует writes
            }

            public int Count => _count;
        }

Шпаргалка: Выбор коллекции

СценарийКоллекция
High-concurrency key-valueConcurrentDictionary
Producer-Consumer (sync)BlockingCollection
Producer-Consumer (async)Channel<T>
FIFO очередь (multi-thread)ConcurrentQueue
Work-stealingConcurrentBag
Lock-free счётчикInterlocked
Lock-free CASInterlocked.CompareExchange

Async/Await Deep Dive

State Machine Generated by Compiler

Как работает async/await

// Исходный код
        public async Task<int> ComputeAsync()
        {
            var a = await GetValueAsync(1);
            var b = await GetValueAsync(2);
            return a + b;
        }

        // Компилятор генерирует state machine (упрощённо):
        [CompilerGenerated]
        private struct <ComputeAsync>d__0 : IAsyncStateMachine
        {
            public int <>1__state;
            public AsyncTaskMethodBuilder<int> <>t__builder;
            private TaskAwaiter<int> <>u__1;
            private int <a>5__1;
            private int <b>5__2;

            void IAsyncStateMachine.MoveNext()
            {
                int result;
                try
                {
                    switch (<>1__state)
                    {
                        case 0:
                            // await GetValueAsync(1)
                            var awaiter1 = GetValueAsync(1).GetAwaiter();
                            if (!awaiter1.IsCompleted)
                            {
                                <>1__state = 0;
                                <>u__1 = awaiter1;
                                <>t__builder.AwaitUnsafeOnCompleted(ref awaiter1, ref this);
                                return;
                            }
                            goto case 1;

                        case 1:
                            <a>5__1 = <>u__1.GetResult();
                            // await GetValueAsync(2)
                            var awaiter2 = GetValueAsync(2).GetAwaiter();
                            if (!awaiter2.IsCompleted)
                            {
                                <>1__state = 1;
                                <>u__1 = awaiter2;
                                <>t__builder.AwaitUnsafeOnCompleted(ref awaiter2, ref this);
                                return;
                            }
                            goto case 2;

                        case 2:
                            <b>5__2 = <>u__1.GetResult();
                            result = <a>5__1 + <b>5__2;
                            break;
                    }
                }
                catch (Exception ex)
                {
                    <>1__state = -2;
                    <>t__builder.SetException(ex);
                    return;
                }
                <>1__state = -2;
                <>t__builder.SetResult(result);
            }
        }

IL код (ключевые моменты)

// AsyncMethodBuilder создаётся
        call instance void [System.Runtime]System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start<...>(!!0&)

        // GetAwaiter вызывается
        call instance valuetype ... Task::GetAwaiter()

        // IsCompleted проверяется
        call instance bool TaskAwaiter::get_IsCompleted()

        // Если false — AwaitUnsafeOnCompleted
        call instance void AsyncTaskMethodBuilder::AwaitUnsafeOnCompleted<...>(!!0&, !!1&)

        // Если true — GetResult
        call instance !0 TaskAwaiter::GetResult()

Разбор состояний

State -1: Start (initial)
        State  0: После первого await
        State  1: После второго await
        State -2: Complete

        Каждый await — потенциальная точка возврата
        MoveNext() вызывается при каждом continuation

SynchronizationContext

Что такое SynchronizationContext

// SynchronizationContext — абстракция для "где продолжить"
        // Определяет на каком потоке продолжится код после await

        // Основные реализации:
        // 1. WindowsFormsSynchronizationContext — UI поток WinForms
        // 2. DispatcherSynchronizationContext — UI поток WPF
        // 3. AspNetSynchronizationContext — ASP.NET (legacy)
        // 4. null — Console/ThreadPool (продолжение на любом ThreadPool потоке)

Как работает захват контекста

// UI приложение (WPF/WinForms):
        public async Task<Button> LoadDataAsync()
        {
            // Здесь: SynchronizationContext = DispatcherSynchronizationContext
            var data = await httpClient.GetStringAsync(url);
            // После await: продолжение на UI потоке (захвачен контекст)
            button.Content = data;  // OK — UI поток
            return button;
        }

        // Console приложение:
        public async Task ProcessAsync()
        {
            // Здесь: SynchronizationContext = null
            var data = await httpClient.GetStringAsync(url);
            // После await: продолжение на любом ThreadPool потоке
            Console.WriteLine(data);
        }

Когда SynchronizationContext мешает

// DEADLOCK в UI приложении
        public string GetData()
        {
            return GetDataAsync().Result;  // Блокирует UI поток
        }

        public async Task<string> GetDataAsync()
        {
            var data = await httpClient.GetStringAsync(url);
            // Пытается вернуться на UI поток — но он заблокирован!
            return data;
        }

        // ИСПРАВЛЕНИЕ 1: ConfigureAwait(false)
        public async Task<string> GetDataAsync()
        {
            var data = await httpClient.GetStringAsync(url).ConfigureAwait(false);
            // Продолжение на ThreadPool, не на UI
            return data;
        }

        // ИСПРАВЛЕНИЕ 2: async all the way
        public async Task<string> GetDataAsync()
        {
            return await GetDataInternalAsync();
        }

ConfigureAwait(false)

Где обязательно

// ✅ В библиотечном коде — всегда ConfigureAwait(false)
        public async Task<string> GetDataAsync()
        {
            var response = await _httpClient.GetAsync(url).ConfigureAwait(false);
            var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
            return content;
        }

        // ✅ В middleware — обычно не нужно (нужен HttpContext)
        public async Task InvokeAsync(HttpContext context)
        {
            var data = await _service.GetDataAsync();
            // ConfigureAwait(false) МОЖЕТ сломать HttpContext
            await context.Response.WriteAsync(data);
        }

Где не нужно

// ❌ В UI коде — нужно обновлять UI
        public async Task LoadDataAsync()
        {
            var data = await _service.GetDataAsync();
            // ConfigureAwait(false) НЕЛЬЗЯ — нужно обновить UI
            textBox.Text = data;
        }

        // ❌ В ASP.NET Core — нет SynchronizationContext
        // ASP.NET Core НЕ использует SynchronizationContext
        // ConfigureAwait(false) не нужен, но и не вредит

ConfigureAwait(false) в .NET 8+

// .NET 8: Task.CompletedTask и другие "завершённые" задачи
        // игнорируют ConfigureAwait — продолжение синхронно

        // .NET 8: Linker может удалить ConfigureAwait(false) в publish
        // если приложение не использует SynchronizationContext

ValueTask vs Task

Когда использовать ValueTask

// ValueTask<T> — struct, может быть allocation-free
        // Используйте когда:
        // - Результат часто доступен синхронно
        // - Метод вызывается очень часто
        // - Важна минимизация аллокаций

        // ПЛОХО — ValueTask с async методом (бессмысленно)
        public async ValueTask<int> BadAsync()  // async создаёт Task, не ValueTask!
        {
            await Task.Delay(100);
            return 42;
        }

        // ХОРОШО — синхронный результат возможен
        public ValueTask<int> ReadAsync()
        {
            if (_buffer.HasData)
            {
                return new ValueTask<int>(_buffer.Read());  // Синхронно, без аллокации
            }
            return new ValueTask<int>(ReadFromDiskAsync());  // Асинхронно, аллокация
        }

        // ХОРОШО — I/O с кэшем
        public ValueTask<byte[]> GetAsync(string key)
        {
            if (_cache.TryGetValue(key, out var data))
            {
                return new ValueTask<byte[]>(data);  // Cache hit — без аллокации
            }
            return new ValueTask<byte[]>(FetchFromStorageAsync(key));
        }

ValueTask ограничения

// ⚠️ ValueTask НЕЛЬЗЯ await'ить несколько раз
        var vt = GetValueTask();
        await vt;
        await vt;  // ОШИБКА! ValueTask можно await только один раз

        // ⚠️ ValueTask НЕЛЬЗЯ хранить для later await
        var vt = GetValueTask();
        await Task.Delay(100);
        await vt;  // ОШИБКА! ValueTask может быть уже использован

        // ✅ Task можно await'ить много раз
        var task = GetTask();
        await task;
        await task;  // OK

        // ✅ Если нужно multiple await — используйте AsTask()
        var vt = GetValueTask();
        var task = vt.AsTask();
        await task;
        await task;  // OK

Benchmark: Task vs ValueTask

1,000,000 вызовов (sync result):
        Task<int>:         ~50ms, 32MB аллокаций
        ValueTask<int>:    ~5ms,  0MB аллокаций

        1,000,000 вызовов (async result):
        Task<int>:         ~100ms, 32MB аллокаций
        ValueTask<int>:    ~110ms, 32MB аллокаций (такой же overhead)

IAsyncDisposable

Паттерн async disposal

// IAsyncDisposable — для async cleanup
        public class AsyncConnection : IAsyncDisposable
        {
            private readonly HttpClient _client = new();

            public async Task<string> GetDataAsync()
            {
                return await _client.GetStringAsync("https://api.example.com");
            }

            public async ValueTask DisposeAsync()
            {
                // Async cleanup — например, graceful shutdown
                await _client.GetAsync("https://api.example.com/logout");
                _client.Dispose();
            }
        }

        // Использование с await using
        await using var conn = new AsyncConnection();
        var data = await conn.GetDataAsync();
        // DisposeAsync вызывается автоматически

        // В scope
        await using (var conn = new AsyncConnection())
        {
            var data = await conn.GetDataAsync();
        }

AsyncEnumerator с IAsyncDisposable

public class AsyncResourceReader : IAsyncEnumerable<string>, IAsyncDisposable
        {
            private readonly FileStream _stream;

            public AsyncResourceReader(string path)
            {
                _stream = new FileStream(path, FileMode.Open, FileAccess.Read);
            }

            public async IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken ct = default)
            {
                using var reader = new StreamReader(_stream);
                string? line;
                while ((line = await reader.ReadLineAsync()) != null)
                {
                    yield return line;
                }
            }

            public ValueTask DisposeAsync()
            {
                return _stream.DisposeAsync();
            }
        }

IAsyncEnumerable и IAsyncEnumerator

Базовое использование

// Async stream — async генератор значений
        public async IAsyncEnumerable<int> GetNumbersAsync(int count)
        {
            for (int i = 0; i < count; i++)
            {
                await Task.Delay(100);  // Имитация async операции
                yield return i;
            }
        }

        // Потребление
        await foreach (var number in GetNumbersAsync(10))
        {
            Console.WriteLine(number);
        }

        // С CancellationToken
        await foreach (var number in GetNumbersAsync(10).WithCancellation(ct))
        {
            Console.WriteLine(number);
        }

        // С ConfigureAwait
        await foreach (var number in GetNumbersAsync(10).ConfigureAwait(false))
        {
            Console.WriteLine(number);
        }

Реализация IAsyncEnumerable

public class AsyncFileReader : IAsyncEnumerable<string>
        {
            private readonly string _path;

            public AsyncFileReader(string path) => _path = path;

            public async IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken ct = default)
            {
                await using var stream = new FileStream(_path, FileMode.Open, FileAccess.Read);
                using var reader = new StreamReader(stream);

                string? line;
                while ((line = await reader.ReadLineAsync().WaitAsync(ct)) != null)
                {
                    ct.ThrowIfCancellationRequested();
                    yield return line;
                }
            }
        }

LINQ с IAsyncEnumerable

// System.Linq.Async (NuGet: System.Interactive.Async)
        var numbers = GetNumbersAsync(100);

        var result = await numbers
            .Where(n => n % 2 == 0)
            .Select(n => n * 10)
            .ToListAsync();

        // Или вручную
        public static async IAsyncEnumerable<T> WhereAsync<T>(
            this IAsyncEnumerable<T> source,
            Func<T, bool> predicate)
        {
            await foreach (var item in source)
            {
                if (predicate(item))
                    yield return item;
            }
        }

Практика

Задание 1: Async Pipeline с Channel и Backpressure

// Pipeline: Source → Transform → Filter → Sink
        public class AsyncPipeline<TInput, TOutput>
        {
            private readonly Channel<TInput> _inputChannel;
            private readonly Channel<TOutput> _outputChannel;
            private readonly Func<TInput, Task<TOutput>> _transform;
            private readonly Func<TOutput, bool> _filter;

            public AsyncPipeline(
                int capacity,
                Func<TInput, Task<TOutput>> transform,
                Func<TOutput, bool> filter)
            {
                _inputChannel = Channel.CreateBounded<TInput>(capacity);
                _outputChannel = Channel.CreateBounded<TOutput>(capacity);
                _transform = transform;
                _filter = filter;
            }

            public async Task StartAsync(int parallelism, CancellationToken ct)
            {
                // Запустите parallelism workers
                // Каждый: читает из _inputChannel, transform, filter, пишет в _outputChannel
            }

            public async Task EnqueueAsync(TInput item, CancellationToken ct)
            {
                await _inputChannel.Writer.WriteAsync(item, ct);
            }

            public IAsyncEnumerable<TOutput> Results()
            {
                return _outputChannel.Reader.ReadAllAsync();
            }

            public void Complete()
            {
                _inputChannel.Writer.Complete();
            }
        }

Задание 2: Custom Awaiter

// Custom awaiter для non-task async operations
        public struct DelayAwaiter : ICriticalNotifyCompletion
        {
            private readonly TimeSpan _delay;
            private readonly Timer _timer;
            private Action? _continuation;

            public DelayAwaiter(TimeSpan delay)
            {
                _delay = delay;
                _timer = new Timer(OnTimerCallback);
            }

            public bool IsCompleted => false;

            public void OnCompleted(Action continuation)
            {
                _continuation = continuation;
                _timer.Change(_delay, Timeout.InfiniteTimeSpan);
            }

            public void UnsafeOnCompleted(Action continuation)
            {
                OnCompleted(continuation);
            }

            public void GetResult() { }

            private void OnTimerCallback(object? state)
            {
                _continuation?.Invoke();
            }
        }

        // Extension method
        public static class DelayExtensions
        {
            public static DelayAwaiter GetAwaiter(this TimeSpan delay)
            {
                return new DelayAwaiter(delay);
            }
        }

        // Использование
        await TimeSpan.FromSeconds(1);  // Custom awaiter!

Задание 3: Streaming API Client

public class StreamingApiClient : IAsyncDisposable
        {
            private readonly HttpClient _client;

            public StreamingApiClient(HttpClient client)
            {
                _client = client;
            }

            public async IAsyncEnumerable<HttpResponseMessage> StreamAsync(
                IEnumerable<string> urls,
                [EnumeratorCancellation] CancellationToken ct = default)
            {
                foreach (var url in urls)
                {
                    ct.ThrowIfCancellationRequested();
                    var response = await _client.GetAsync(url, ct);
                    yield return response;
                }
            }

            public async IAsyncEnumerable<string> StreamLinesAsync(
                string url,
                [EnumeratorCancellation] CancellationToken ct = default)
            {
                using var response = await _client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, ct);
                using var stream = await response.Content.ReadAsStreamAsync(ct);
                using var reader = new StreamReader(stream);

                string? line;
                while ((line = await reader.ReadLineAsync()) != null)
                {
                    ct.ThrowIfCancellationRequested();
                    yield return line;
                }
            }

            public ValueTask DisposeAsync()
            {
                _client.Dispose();
                return default;
            }
        }

Шпаргалка: Async Best Practices

ПравилоОписание
Async all the wayНе блокируйте async код через .Result/.Wait()
ConfigureAwait(false)В библиотечном коде всегда
ValueTaskКогда результат часто синхронный
IAsyncEnumerableДля streaming данных
IAsyncDisposableДля async cleanup ресурсов
CancellationTokenВсегда передавайте в async методы
Task.WhenAllДля параллельного выполнения
Avoid async voidТолько для event handlers

Parallel Programming

Parallel.ForEach и Parallel.For

Базовое использование

// Parallel.For — параллельный цикл
        Parallel.For(0, 100, i =>
        {
            Console.WriteLine($"Processing {i} on thread {Thread.CurrentThread.ManagedThreadId}");
        });

        // Parallel.ForEach — параллельная обработка коллекции
        var items = Enumerable.Range(0, 1000);
        Parallel.ForEach(items, item =>
        {
            Process(item);
        });

        // С ParallelOptions
        Parallel.ForEach(items, new ParallelOptions
        {
            MaxDegreeOfParallelism = 4,
            CancellationToken = ct
        }, item =>
        {
            Process(item);
        });

Partitioning

// Partitioner — контроль над разбиением данных
        var items = Enumerable.Range(0, 1000000);

        // Range partitioning — для равномерных workload
        Parallel.ForEach(Partitioner.Create(0, items.Count()), range =>
        {
            for (int i = range.Item1; i < range.Item2; i++)
            {
                Process(items.ElementAt(i));
            }
        });

        // Chunk partitioning — для неравномерных workload
        Parallel.ForEach(Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering), item =>
        {
            Process(item);
        });

        // Custom partitioner
        var partitioner = Partitioner.Create(items, true);  // load balancing
        Parallel.ForEach(partitioner, item => Process(item));

Degree of Parallelism

// MaxDegreeOfParallelism — ограничение параллелизма
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount  // По умолчанию
            // MaxDegreeOfParallelism = 4  // Фиксированное
            // MaxDegreeOfParallelism = -1 // Без ограничений
        };

        // Когда ограничивать:
        // - I/O bound операции (не используйте все ядра)
        // - Shared resource (БД, external API)
        // - Memory pressure (больше потоков = больше памяти)

PLINQ (AsParallel)

Базовое использование

// AsParallel — параллельный LINQ
        var result = Enumerable.Range(0, 1000000)
            .AsParallel()
            .Where(n => IsPrime(n))
            .ToList();

        // С сохранением порядка
        var ordered = Enumerable.Range(0, 1000000)
            .AsParallel()
            .AsOrdered()  // Сохраняет порядок (медленнее)
            .Where(n => IsPrime(n))
            .ToList();

        // С ограничением параллелизма
        var limited = Enumerable.Range(0, 1000000)
            .AsParallel()
            .WithDegreeOfParallelism(4)
            .Where(n => IsPrime(n))
            .ToList();

Когда использовать PLINQ

Используйте PLINQ когда:
        ✅ CPU-bound операции
        ✅ Большие коллекции (> 10000 элементов)
        ✅ Простые трансформации (Where, Select, Aggregate)
        ✅ Нет side effects

        НЕ используйте PLINQ когда:
        ❌ I/O bound операции (используйте async/await)
        ❌ Маленькие коллекции (overhead > benefit)
        ❌ Нужен порядок и порядок важен (AsOrdered замедляет)
        ❌ Сложные операции с состоянием

PLINQ vs Parallel.ForEach

ХарактеристикаPLINQParallel.ForEach
APIDeclarative (LINQ)Imperative
ComposabilityВысокаяНизкая
Order preservationAsOrdered()Нет
Early exitTakeWhileStop()
PerformanceЧуть медленнееЧуть быстрее

Task.WhenAll, Task.WhenAny, Task.WhenEach

Task.WhenAll

// WhenAll — ждать все задачи
        var tasks = urls.Select(url => httpClient.GetStringAsync(url));
        string[] results = await Task.WhenAll(tasks);

        // С обработкой ошибок
        var tasks = new[]
        {
            Task1Async(),
            Task2Async(),
            Task3Async()
        };

        try
        {
            await Task.WhenAll(tasks);
        }
        catch (Exception ex)
        {
            // WhenAll бросает первую ошибку
            // Все ошибки доступны в tasks[].Exception
            foreach (var task in tasks.Where(t => t.IsFaulted))
            {
                Console.WriteLine(task.Exception?.InnerException);
            }
        }

Task.WhenAny

// WhenAny — ждать первую завершённую задачу
        var tasks = new[]
        {
            httpClient.GetStringAsync(url1),
            httpClient.GetStringAsync(url2),
            httpClient.GetStringAsync(url3)
        };

        Task<string> first = await Task.WhenAny(tasks);
        string result = await first;  // Получить результат

        // Timeout pattern
        var task = SlowOperationAsync();
        var timeout = Task.Delay(TimeSpan.FromSeconds(5));

        var completed = await Task.WhenAny(task, timeout);
        if (completed == timeout)
        {
            throw new TimeoutException();
        }

Task.WhenEach (.NET 6+)

// WhenEach — yield results as they complete
        var tasks = urls.Select(url => httpClient.GetStringAsync(url));

        await foreach (var result in Task.WhenEach(tasks))
        {
            Console.WriteLine($"Completed: {result.Result.Length} chars");
        }

        // Когда использовать WhenEach:
        // - Нужна обработка результатов по мере готовности
        // - Не нужно ждать все задачи
        // - Streaming processing

Cancellation Patterns

CancellationToken Basics

// Создание CancellationToken
        var cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;

        // Отмена
        cts.Cancel();           // Синхронная отмена
        cts.CancelAfter(5000);  // Отмена через 5 секунд
        await cts.CancelAsync(); // Асинхронная отмена (вызывает registered callbacks async)

        // Linked token
        var cts1 = new CancellationTokenSource();
        var cts2 = new CancellationTokenSource();
        var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token);
        // linkedCts.Cancel() если любой из tokens отменён

Propagation

// Передавайте CancellationToken через весь call chain
        public async Task ProcessAsync(CancellationToken ct = default)
        {
            await FetchDataAsync(ct);
            await TransformDataAsync(ct);
            await SaveDataAsync(ct);
        }

        private async Task<string> FetchDataAsync(CancellationToken ct)
        {
            // HttpClient поддерживает cancellation
            var response = await _httpClient.GetAsync(url, ct);
            return await response.Content.ReadAsStringAsync(ct);
        }

        // Проверка cancellation
        public async Task LongRunningAsync(CancellationToken ct)
        {
            for (int i = 0; i < 1000; i++)
            {
                ct.ThrowIfCancellationRequested();  // Проверка в цикле
                await ProcessItemAsync(i, ct);
            }
        }

Cleanup при отмене

// finally блок выполняется при отмене
        public async Task ProcessAsync(CancellationToken ct)
        {
            var resource = await AcquireResourceAsync(ct);
            try
            {
                await UseResourceAsync(resource, ct);
            }
            finally
            {
                // Cleanup всегда выполняется
                await resource.ReleaseAsync();
            }
        }

        // Register callback на отмену
        public async Task ProcessAsync(CancellationToken ct)
        {
            using var registration = ct.Register(() =>
            {
                Console.WriteLine("Отмена запрошена!");
                // Cleanup logic
            });

            await LongOperationAsync(ct);
        }

Cancellation с Task.WhenAll

// Когда одна задача отменена — отменить остальные
        public async Task ProcessAllAsync(CancellationToken ct)
        {
            var tasks = new[]
            {
                Task1Async(ct),
                Task2Async(ct),
                Task3Async(ct)
            };

            try
            {
                await Task.WhenAll(tasks);
            }
            catch (OperationCanceledException)
            {
                // Одна задача отменена — ждать остальные для cleanup
                await Task.WhenAll(tasks.Where(t => !t.IsCanceled));
                throw;
            }
        }

Практика

Задание 1: Parallel Data Processor с Dynamic Partitioning

public class ParallelDataProcessor<TInput, TOutput>
        {
            private readonly Func<TInput, TOutput> _process;
            private readonly int _batchSize;

            public ParallelDataProcessor(Func<TInput, TOutput> process, int batchSize = 100)
            {
                _process = process;
                _batchSize = batchSize;
            }

            public IReadOnlyList<TOutput> Process(IReadOnlyList<TInput> inputs, int maxDegreeOfParallelism = -1)
            {
                var outputs = new TOutput[inputs.Count];

                // Реализуйте:
                // 1. Partition inputs на batches
                // 2. Parallel.ForEach с dynamic partitioning
                // 3. Aggregate results
                // 4. Поддержка CancellationToken

                return outputs;
            }

            public async Task<IReadOnlyList<TOutput>> ProcessAsync(
                IReadOnlyList<TInput> inputs,
                int maxDegreeOfParallelism = -1,
                CancellationToken ct = default)
            {
                // Async версия с Parallel.ForEachAsync (.NET 6+)
            }
        }

Задание 2: Fault-Tolerant Batch Processor

public class FaultTolerantBatchProcessor<T>
        {
            private readonly int _maxRetries;
            private readonly TimeSpan _retryDelay;
            private readonly CircuitBreaker _circuitBreaker;

            public FaultTolerantBatchProcessor(int maxRetries, TimeSpan retryDelay)
            {
                _maxRetries = maxRetries;
                _retryDelay = retryDelay;
                _circuitBreaker = new CircuitBreaker(5, TimeSpan.FromSeconds(30));
            }

            public async Task<IReadOnlyList<Result<T>>> ProcessBatchAsync(
                IReadOnlyList<T> items,
                Func<T, Task<Result<T>>> processor,
                CancellationToken ct = default)
            {
                // Реализуйте:
                // 1. Разбейте на batches
                // 2. Parallel.ForEachAsync с ограничением параллелизма
                // 3. Retry logic с exponential backoff
                // 4. Circuit breaker — если много ошибок, stop processing
                // 5. Cancellation propagation
            }
        }

        public class CircuitBreaker
        {
            private readonly int _failureThreshold;
            private readonly TimeSpan _resetTimeout;
            private int _failures;
            private DateTime? _lastFailure;
            private CircuitState _state = CircuitState.Closed;

            public enum CircuitState { Closed, Open, HalfOpen }

            public CircuitBreaker(int failureThreshold, TimeSpan resetTimeout)
            {
                _failureThreshold = failureThreshold;
                _resetTimeout = resetTimeout;
            }

            public async Task<T> ExecuteAsync<T>(Func<Task<T>> action, CancellationToken ct)
            {
                // Реализуйте circuit breaker pattern
            }
        }

Задание 3: Concurrent HTTP Client

public class ConcurrentHttpClient : IAsyncDisposable
        {
            private readonly HttpClient _httpClient;
            private readonly SemaphoreSlim _semaphore;
            private readonly int _maxConcurrentRequests;

            public ConcurrentHttpClient(int maxConcurrentRequests)
            {
                _httpClient = new HttpClient();
                _maxConcurrentRequests = maxConcurrentRequests;
                _semaphore = new SemaphoreSlim(maxConcurrentRequests);
            }

            public async Task<HttpResponseMessage> GetAsync(
                string url,
                CancellationToken ct = default)
            {
                await _semaphore.WaitAsync(ct);
                try
                {
                    return await _httpClient.GetAsync(url, ct);
                }
                finally
                {
                    _semaphore.Release();
                }
            }

            public async Task<IReadOnlyList<HttpResponseMessage>> GetManyAsync(
                IEnumerable<string> urls,
                CancellationToken ct = default)
            {
                // Реализуйте:
                // 1. Task.WhenAll с ограничением через _semaphore
                // 2. Cancellation propagation
                // 3. Error handling — не fail fast, collect errors
            }

            public ValueTask DisposeAsync()
            {
                _httpClient.Dispose();
                _semaphore.Dispose();
                return default;
            }
        }

Шпаргалка: Parallel vs Async

СценарийПодход
CPU-bound, много данныхParallel.ForEach
CPU-bound, LINQ pipelinePLINQ
I/O-bound, много запросовasync/await + Task.WhenAll
I/O-bound, streamingIAsyncEnumerable
Mixed CPU + I/OParallel.ForEachAsync
Need results as they completeTask.WhenEach
TimeoutTask.WhenAny(task, Task.Delay(timeout))
CancellationCancellationToken через весь chain

Advanced Concurrency Patterns

Actor Model

Концепции Actor Model

Actor — фундаментальная единица вычислений:
        1. Каждый actor имеет уникальный адрес
        2. Actors общаются только через messages
        3. Actors обрабатывают messages последовательно
        4. Actors могут создавать других actors
        5. Actors могут менять своё состояние

        Преимущества:
        - Нет shared state — нет race conditions
        - Естественная распределённость
        - Fault isolation — crash одного actor не влияет на других

Orleans концепции

// Orleans — virtual actor model от Microsoft
        // Grain = actor, Silo = host

        // Grain interface
        public interface IUserGrain : IGrainWithIntegerKey
        {
            Task<string> GetNameAsync();
            Task SetNameAsync(string name);
            Task<int> GetLoginCountAsync();
        }

        // Grain implementation
        public class UserGrain : Grain, IUserGrain
        {
            private string _name = "";
            private int _loginCount;

            public Task<string> GetNameAsync() => Task.FromResult(_name);

            public Task SetNameAsync(string name)
            {
                _name = name;
                return Task.CompletedTask;
            }

            public Task<int> GetLoginCountAsync() => Task.FromResult(_loginCount);

            public override Task OnActivateAsync(CancellationToken ct)
            {
                // Вызывается при активации grain
                return base.OnActivateAsync(ct);
            }
        }

        // Использование
        var grain = grainFactory.GetGrain<IUserGrain>(userId);
        await grain.SetNameAsync("John");
        var name = await grain.GetNameAsync();

Akka.NET концепции

// Akka.NET — actor model для .NET

        // Actor definition
        public class WorkerActor : ReceiveActor
        {
            public WorkerActor()
            {
                Receive<ProcessMessage>(msg =>
                {
                    // Обработка сообщения
                    Sender.Tell(new Result(msg.Data));
                });

                Receive<StopMessage>(msg =>
                {
                    Context.Stop(Self);
                });
            }
        }

        // Actor system
        var system = ActorSystem.Create("MySystem");
        var worker = system.ActorOf(Props.Create<WorkerActor>(), "worker");

        // Send message
        worker.Tell(new ProcessMessage("data"));

        // Ask pattern (request-response)
        var result = await worker.Ask<Result>(new ProcessMessage("data"));

Simple Actor System

// Простая реализация actor system
        public class ActorSystem
        {
            private readonly Channel<ActorMessage> _mailbox = Channel.CreateUnbounded<ActorMessage>();
            private readonly Dictionary<string, Actor> _actors = new();
            private readonly CancellationTokenSource _cts = new();

            public ActorRef CreateActor(string name, Func<CancellationToken, Task> behavior)
            {
                var actor = new Actor(name, behavior, _mailbox.Writer);
                _actors[name] = actor;
                return actor.Ref;
            }

            public async Task StartAsync()
            {
                await foreach (var msg in _mailbox.Reader.ReadAllAsync(_cts.Token))
                {
                    await msg.Handler(_cts.Token);
                }
            }

            public async Task StopAsync()
            {
                _cts.Cancel();
                _mailbox.Writer.Complete();
            }
        }

        public record ActorMessage(string Target, Func<CancellationToken, Task> Handler);

        public class Actor
        {
            public ActorRef Ref { get; }

            public Actor(string name, Func<CancellationToken, Task> behavior, ChannelWriter<ActorMessage> writer)
            {
                Ref = new ActorRef(name, writer);
                _ = RunAsync(behavior);
            }

            private async Task RunAsync(Func<CancellationToken, Task> behavior)
            {
                while (true)
                {
                    try
                    {
                        await behavior(CancellationToken.None);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
            }
        }

        public class ActorRef
        {
            private readonly string _name;
            private readonly ChannelWriter<ActorMessage> _writer;

            public ActorRef(string name, ChannelWriter<ActorMessage> writer)
            {
                _name = name;
                _writer = writer;
            }

            public async Task TellAsync(Func<CancellationToken, Task> handler, CancellationToken ct = default)
            {
                await _writer.WriteAsync(new ActorMessage(_name, handler), ct);
            }
        }

Immutable Data + Structural Sharing

Immutable Collections

// System.Collections.Immutable (NuGet)

        // ImmutableList
        var list1 = ImmutableList<int>.Empty.Add(1).Add(2).Add(3);
        var list2 = list1.Add(4);  // Новый список, list1 не изменён

        // ImmutableDictionary
        var dict1 = ImmutableDictionary<string, int>.Empty
            .Add("a", 1)
            .Add("b", 2);
        var dict2 = dict1.SetItem("a", 10);  // Новый dict

        // ImmutableHashSet
        var set1 = ImmutableHashSet<int>.Empty.Add(1).Add(2);
        var set2 = set1.Add(3);

        // ImmutableSortedDictionary — ordered
        var sorted = ImmutableSortedDictionary<int, string>.Empty
            .Add(3, "three")
            .Add(1, "one")
            .Add(2, "two");

Structural Sharing

Immutable коллекции используют structural sharing:
        - Новые версии разделяют данные со старыми
        - Изменяется только путь от корня до изменённого узла
        - O(log N) для update вместо O(N)

        Пример для ImmutableList (AVL tree):
            Root          Root'
            /  \          /   \
           A    B   →    A'    B
          / \            / \
         C   D          C   D'

        Только путь до D' создан заново, A, B, C shared

Persistent Data Structures

// Пример persistent data structure
        public class PersistentList<T>
        {
            private readonly Node _root;
            private readonly int _count;

            private sealed record Node(T Value, Node? Left, Node? Right, int Height);

            private PersistentList(Node root, int count)
            {
                _root = root;
                _count = count;
            }

            public static PersistentList<T> Empty => new(null!, 0);

            public PersistentList<T> Add(T value)
            {
                var newNode = new Node(value, null, null, 1);
                var newRoot = Insert(_root, newNode, 0);
                return new PersistentList<T>(newRoot, _count + 1);
            }

            private Node Insert(Node? node, Node newNode, int index)
            {
                if (node == null) return newNode;

                var mid = Count(node.Left) + 1;
                if (index < mid)
                {
                    return node with { Left = Insert(node.Left, newNode, index) };
                }
                else
                {
                    return node with { Right = Insert(node.Right, newNode, index - mid) };
                }
            }

            private int Count(Node? node) => node == null ? 0 : 1 + Count(node.Left) + Count(node.Right);
        }

Thread Safety без Locks

// Immutable данные thread-safe по определению
        public class ImmutableCache<TKey, TValue>
        {
            private ImmutableDictionary<TKey, TValue> _data = ImmutableDictionary<TKey, TValue>.Empty;

            public void Add(TKey key, TValue value)
            {
                // CAS loop — нет lock
                ImmutableDictionary<TKey, TValue> original, updated;
                do
                {
                    original = _data;
                    updated = original.Add(key, value);
                }
                while (Interlocked.CompareExchange(ref _data, updated, original) != original);
            }

            public bool TryGetValue(TKey key, out TValue value)
            {
                return _data.TryGetValue(key, out value);
            }
        }

Lock-Free Algorithms

MPMC Queue

// Lock-free MPMC queue (упрощённая версия)
        public class LockFreeQueue<T> where T : class
        {
            private sealed class Node
            {
                public T? Value;
                public Node? Next;
            }

            private Node _head = new();
            private Node _tail = _head;

            public void Enqueue(T value)
            {
                var node = new Node { Value = value };
                Node? last;

                while (true)
                {
                    last = Volatile.Read(ref _tail);
                    var next = Volatile.Read(ref last.Next);

                    if (last == Volatile.Read(ref _tail))
                    {
                        if (next == null)
                        {
                            // CAS: если last.Next == null, установить node
                            if (Interlocked.CompareExchange(ref last.Next, node, null) == null)
                                break;
                        }
                        else
                        {
                            // Помочь другому потоку продвинуть tail
                            Interlocked.CompareExchange(ref _tail, next, last);
                        }
                    }
                }

                // Продвинуть tail
                Interlocked.CompareExchange(ref _tail, node, last);
            }

            public bool TryDequeue(out T? value)
            {
                Node? first;

                while (true)
                {
                    first = Volatile.Read(ref _head);
                    var last = Volatile.Read(ref _tail);
                    var next = Volatile.Read(ref first.Next);

                    if (first == Volatile.Read(ref _head))
                    {
                        if (first == last)
                        {
                            if (next == null)
                            {
                                value = default;
                                return false;  // Queue empty
                            }
                            Interlocked.CompareExchange(ref _tail, next, first);
                        }
                        else
                        {
                            if (next is { } n)
                            {
                                value = n.Value;
                                if (Interlocked.CompareExchange(ref _head, n, first) == first)
                                    return true;
                            }
                        }
                    }
                }
            }
        }

Lock-Free Stack

// Lock-free stack через Interlocked.CompareExchange
        public class LockFreeStack<T>
        {
            private sealed class Node
            {
                public T Value;
                public Node? Next;

                public Node(T value) => Value = value;
            }

            private Node? _head;

            public void Push(T value)
            {
                var newNode = new Node(value);
                Node? oldHead;

                do
                {
                    oldHead = _head;
                    newNode.Next = oldHead;
                }
                while (Interlocked.CompareExchange(ref _head, newNode, oldHead) != oldHead);
            }

            public bool TryPop(out T value)
            {
                Node? oldHead;
                Node? newHead;

                do
                {
                    oldHead = _head;
                    if (oldHead == null)
                    {
                        value = default!;
                        return false;
                    }
                    newHead = oldHead.Next;
                }
                while (Interlocked.CompareExchange(ref _head, newHead, oldHead) != oldHead);

                value = oldHead.Value;
                return true;
            }
        }

ABA Problem

ABA Problem в lock-free алгоритмах:
        1. Thread 1 читает head = A
        2. Thread 2 pop A, pop B, push A (head снова A!)
        3. Thread 1 CAS: head == A, заменить на newHead
        4. PROBLEM: head A — другой объект!

        Решения:
        1. Tagged pointers — добавить version к pointer
        2. Hazard pointers — mark nodes in use
        3. Epoch-based reclamation — defer deletion

Double-Checked Locking Singleton

Pattern в .NET

// Double-checked locking — thread-safe lazy initialization
        public class Singleton
        {
            private static Singleton? _instance;
            private static readonly object _lock = new();

            public static Singleton Instance
            {
                get
                {
                    if (_instance == null)  // First check (без lock)
                    {
                        lock (_lock)
                        {
                            if (_instance == null)  // Second check (в lock)
                            {
                                _instance = new Singleton();
                            }
                        }
                    }
                    return _instance;
                }
            }
        }

        // ВАЖНО: В .NET это работает без volatile благодаря memory model
        // Но для portability лучше использовать volatile:
        private static volatile Singleton? _instance;

Memory Barriers в DCL

// Почему DCL работает в .NET:
        // 1. .NET memory model сильнее чем Java/C++
        // 2. Запись в _instance происходит до выхода из lock
        // 3. Lock release — acquire memory barrier
        // 4. Следующий read видит записанное значение

        // Для других платформ:
        private static volatile Singleton? _instance;

        // Или через Interlocked:
        public static Singleton Instance =>
            Volatile.Read(ref _instance) ?? CreateInstance();

        private static Singleton CreateInstance()
        {
            var instance = new Singleton();
            var original = Interlocked.CompareExchange(ref _instance, instance, null);
            return original ?? instance;
        }

Lazy — встроенный singleton

// Lazy<T> — thread-safe lazy initialization из коробки
        public class Singleton
        {
            private static readonly Lazy<Singleton> _instance =
                new Lazy<Singleton>(() => new Singleton(), LazyThreadSafetyMode.ExecutionAndPublication);

            public static Singleton Instance => _instance.Value;
        }

        // LazyThreadSafetyMode:
        // ExecutionAndPublication — lock вокруг factory (по умолчанию)
        // PublicationOnly — multiple factory calls, first wins
        // None — no thread safety

Практика

Задание 1: Lock-Free Stack

// Реализуйте lock-free stack через Interlocked.CompareExchange
        // Тесты:
        // 1. Multiple pushers, multiple poppers
        // 2. Проверьте correctness (все pushed items popped)
        // 3. Benchmark vs ConcurrentStack

        public class LockFreeStack<T>
        {
            // См. реализацию выше
            // Добавьте:
            // - Count через Interlocked
            // - TryPop с timeout
            // - Clear через CAS loop
        }

Задание 2: Immutable Collection с Structural Sharing

// Persistent Vector (HAMT — Hash Array Mapped Trie)
        public class PersistentVector<T>
        {
            private const int BranchingFactor = 32;
            private readonly Node? _root;
            private readonly int _count;

            private sealed record Node(T[] Items, Node?[] Children);

            public static PersistentVector<T> Empty => new(null, 0);

            public PersistentVector<T> Add(T value)
            {
                // Реализуйте:
                // 1. Вычислите путь в trie
                // 2. Скопируйте путь от корня до листа
                // 3. Остальные узлы shared
            }

            public T this[int index]
            {
                get
                {
                    // O(log32 N) — очень быстро
                }
            }

            public PersistentVector<T> Set(int index, T value)
            {
                // Persistent update — возвращает новый vector
            }

            public int Count => _count;
        }

Задание 3: Simple Actor System

public class SimpleActorSystem
        {
            private readonly Dictionary<string, Actor> _actors = new();
            private readonly Channel<Message> _deadLetterQueue = Channel.CreateUnbounded<Message>();

            public ActorRef Spawn<T>(string name, Func<ActorContext, T> factory)
                where T : class, IActor
            {
                // Создайте actor и верните ref
            }

            public async Task ShutdownAsync()
            {
                // Graceful shutdown всех actors
            }
        }

        public interface IActor
        {
            Task ReceiveAsync(Message message, ActorContext context);
        }

        public record Message(object Payload, ActorRef Sender);

        public class ActorRef
        {
            public Task TellAsync(object payload, ActorRef? sender = null)
            {
                // Отправить message
            }

            public Task<T> AskAsync<T>(object payload, TimeSpan? timeout = null)
            {
                // Request-response pattern
            }
        }

        public class ActorContext
        {
            public ActorRef Self { get; }
            public ActorRef Sender { get; }

            public void Become(IActor newBehavior) { }
            public void Stop() { }
            public ActorRef Spawn<T>(string name, Func<ActorContext, T> factory) where T : class, IActor { }
        }

Шпаргалка: Concurrency Patterns

PatternКогда использовать
Actor ModelИзолированное состояние, message passing
Immutable DataRead-heavy, functional style
Lock-FreeUltra-low latency, high contention
Double-Checked LockingLazy singleton initialization
CAS LoopLock-free updates
Structural SharingPersistent data structures

Distributed Concurrency

Distributed Locks

Redis Distributed Lock

// Redis SET NX EX — атомарное получение lock
        public class RedisDistributedLock : IDisposable
        {
            private readonly IDatabase _redis;
            private readonly string _resource;
            private readonly string _lockValue;
            private readonly TimeSpan _expiry;

            public RedisDistributedLock(IDatabase redis, string resource, TimeSpan expiry)
            {
                _redis = redis;
                _resource = $"lock:{resource}";
                _lockValue = Guid.NewGuid().ToString();
                _expiry = expiry;
            }

            public async Task<bool> AcquireAsync(CancellationToken ct = default)
            {
                // SET key value NX EX timeout — атомарно
                var acquired = await _redis.StringSetAsync(
                    _resource,
                    _lockValue,
                    _expiry,
                    When.NotExists);

                return acquired;
            }

            public async Task<bool> ReleaseAsync()
            {
                // Lua script — атомарная проверка и удаление
                var script = @"
                    if redis.call('get', KEYS[1]) == ARGV[1] then
                        return redis.call('del', KEYS[1])
                    else
                        return 0
                    end";

                var result = await _redis.ScriptEvaluateAsync(script,
                    new RedisKey[] { _resource },
                    new RedisValue[] { _lockValue });

                return (long)result == 1;
            }

            public void Dispose()
            {
                ReleaseAsync().GetAwaiter().GetResult();
            }
        }

        // Использование
        using var scope = new RedisDistributedLock(redis, "my-resource", TimeSpan.FromSeconds(30));
        if (await scope.AcquireAsync())
        {
            // Критическая секция
            await DoWorkAsync();
        }

Redlock Algorithm

// Redlock — distributed lock с multiple Redis instances
        // Защита от split-brain

        public class Redlock
        {
            private readonly IDatabase[] _instances;
            private readonly TimeSpan _quorumTimeout;

            public Redlock(IDatabase[] instances)
            {
                _instances = instances;
                _quorumTimeout = TimeSpan.FromMilliseconds(50);
            }

            public async Task<RedlockResult> LockAsync(string resource, TimeSpan ttl)
            {
                var startTime = DateTime.UtcNow;
                var lockValue = Guid.NewGuid().ToString();
                int acquired = 0;

                // Попытка получить lock на всех instances
                foreach (var instance in _instances)
                {
                    var acquired_instance = await instance.StringSetAsync(
                        $"lock:{resource}", lockValue, ttl, When.NotExists);

                    if (acquired_instance)
                        acquired++;
                }

                var elapsedTime = DateTime.UtcNow - startTime;
                var validTime = ttl - elapsedTime;

                // Quorum: N/2 + 1
                var quorum = _instances.Length / 2 + 1;
                var isValid = acquired >= quorum && validTime > TimeSpan.Zero;

                return new RedlockResult(isValid, lockValue, validTime);
            }
        }

        public record RedlockResult(bool IsValid, string LockValue, TimeSpan ValidTime);

ZooKeeper Distributed Lock

// ZooKeeper — ephemeral sequential nodes
        public class ZooKeeperDistributedLock
        {
            private readonly IZooKeeper _zk;
            private readonly string _path;
            private string? _lockPath;

            public ZooKeeperDistributedLock(IZooKeeper zk, string resource)
            {
                _zk = zk;
                _path = $"/locks/{resource}";
            }

            public async Task<bool> AcquireAsync(CancellationToken ct)
            {
                // Создать ephemeral sequential node
                _lockPath = await _zk.CreateAsync(
                    $"{_path}/lock-",
                    Array.Empty<byte>(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EphemeralSequential);

                // Проверить, что мы первый
                var children = await _zk.GetChildrenAsync(_path);
                var sorted = children.OrderBy(x => x).ToList();

                if (sorted[0] == Path.GetFileName(_lockPath))
                {
                    return true;  // Мы первые — lock получен
                }

                // Ждать пока предыдущий node удалится
                var previous = sorted[sorted.IndexOf(Path.GetFileName(_lockPath)) - 1];
                await _zk.WaitForDeleteAsync($"{_path}/{previous}", ct);
                return true;
            }

            public async Task ReleaseAsync()
            {
                if (_lockPath != null)
                {
                    await _zk.DeleteAsync(_lockPath);
                }
            }
        }

Consensus Algorithms — Raft Basics

Raft Overview

Raft — consensus algorithm для replicated state machine:

        Роли:
        - Leader: принимает все запросы, replicates log
        - Follower: пассивный, отвечает на heartbeats
        - Candidate: временно, для elections

        Terms:
        - Время разделено на terms
        - Каждый term начинается с election
        - Один leader per term

        Log Replication:
        1. Client отправляет command Leader
        2. Leader appends to log
        3. Leader replicates to Followers
        4. Когда majority committed — apply to state machine

Raft Implementation (упрощённая)

public class RaftNode
        {
            private NodeState _state = NodeState.Follower;
            private int _currentTerm;
            private int _votedFor = -1;
            private int _commitIndex;
            private readonly List<LogEntry> _log = new();
            private readonly int _nodeId;
            private readonly int[] _peerIds;

            public enum NodeState { Follower, Candidate, Leader }
            public record LogEntry(int Term, int Index, string Command);

            public RaftNode(int nodeId, int[] peerIds)
            {
                _nodeId = nodeId;
                _peerIds = peerIds;
            }

            public async Task StartElectionAsync()
            {
                _state = NodeState.Candidate;
                _currentTerm++;
                _votedFor = _nodeId;

                // Request votes from peers
                var votes = 1;  // Vote for self
                foreach (var peerId in _peerIds)
                {
                    var response = await RequestVoteAsync(peerId, _currentTerm, _nodeId);
                    if (response.VoteGranted)
                        votes++;
                }

                // Majority wins
                if (votes > (_peerIds.Length + 1) / 2)
                {
                    _state = NodeState.Leader;
                    _ = SendHeartbeatsAsync();
                }
            }

            private async Task<VoteResponse> RequestVoteAsync(int peerId, int term, int candidateId)
            {
                // RPC к peer
                return new VoteResponse(term, true);
            }

            private async Task SendHeartbeatsAsync()
            {
                while (_state == NodeState.Leader)
                {
                    foreach (var peerId in _peerIds)
                    {
                        await AppendEntriesAsync(peerId);
                    }
                    await Task.Delay(150);  // Heartbeat interval
                }
            }

            private async Task AppendEntriesAsync(int peerId)
            {
                // Replicate log entries to follower
            }

            public async Task<string> SubmitCommandAsync(string command)
            {
                if (_state != NodeState.Leader)
                    throw new InvalidOperationException("Not leader");

                var entry = new LogEntry(_currentTerm, _log.Count + 1, command);
                _log.Add(entry);

                // Replicate to followers
                await ReplicateAsync(entry);

                // Когда majority replicated — commit
                _commitIndex = entry.Index;
                return Apply(entry);
            }

            private async Task ReplicateAsync(LogEntry entry)
            {
                foreach (var peerId in _peerIds)
                {
                    await AppendEntriesAsync(peerId, entry);
                }
            }

            private string Apply(LogEntry entry) => $"Applied: {entry.Command}";
        }

        public record VoteResponse(int Term, bool VoteGranted);

Eventual Consistency и CRDTs

Eventual Consistency

Eventual Consistency:
        - Updates propagate asynchronously
        - replicas eventually converge
        - Conflicts possible during propagation

        Conflict Resolution Strategies:
        1. Last Write Wins (LWW) — по timestamp
        2. Vector Clocks — causal ordering
        3. CRDTs — conflict-free replicated data types

CRDTs — G-Counter

// G-Counter (Grow-only Counter) — mergeable counter
        public class GCounter
        {
            private readonly Dictionary<string, int> _counts = new();

            public void Increment(string nodeId, int amount = 1)
            {
                _counts[nodeId] = _counts.GetValueOrDefault(nodeId) + amount;
            }

            public int Value => _counts.Values.Sum();

            // Merge — idempotent, commutative, associative
            public GCounter Merge(GCounter other)
            {
                var result = new GCounter();
                foreach (var key in _counts.Keys.Union(other._counts.Keys))
                {
                    result._counts[key] = Math.Max(
                        _counts.GetValueOrDefault(key, 0),
                        other._counts.GetValueOrDefault(key, 0));
                }
                return result;
            }
        }

        // Использование
        var counter1 = new GCounter();
        counter1.Increment("node-A", 5);

        var counter2 = new GCounter();
        counter2.Increment("node-B", 3);

        // Merge на обоих nodes
        var merged1 = counter1.Merge(counter2);  // 8
        var merged2 = counter2.Merge(counter1);  // 8 — same!

CRDTs — LWW-Register

// Last-Writer-Wins Register
        public class LWWRegister<T>
        {
            private T _value;
            private DateTime _timestamp;
            private readonly string _nodeId;

            public LWWRegister(string nodeId, T initialValue)
            {
                _nodeId = nodeId;
                _value = initialValue;
                _timestamp = DateTime.UtcNow;
            }

            public void Set(T value)
            {
                _value = value;
                _timestamp = DateTime.UtcNow;
            }

            public T Value => _value;

            public LWWRegister<T> Merge(LWWRegister<T> other)
            {
                if (other._timestamp > _timestamp)
                {
                    return new LWWRegister<T>(_nodeId, other._value)
                    {
                        _timestamp = other._timestamp
                    };
                }
                return this;
            }
        }

Saga Pattern

Saga для Distributed Transactions

Saga — последовательность локальных транзакций:
        1. Каждая транзакция имеет compensating action
        2. Если step fails — execute compensations in reverse
        3. Нет distributed lock — eventual consistency

        Types:
        - Choreography: events trigger next step
        - Orchestration: central coordinator

Saga Orchestrator

public class SagaOrchestrator
        {
            private readonly List<SagaStep> _steps = new();

            public SagaOrchestrator AddStep(
                Func<SagaContext, Task> action,
                Func<SagaContext, Task> compensate)
            {
                _steps.Add(new SagaStep(action, compensate));
                return this;
            }

            public async Task ExecuteAsync(SagaContext context, CancellationToken ct = default)
            {
                var completedSteps = new Stack<int>();

                try
                {
                    for (int i = 0; i < _steps.Count; i++)
                    {
                        await _steps[i].Action(context);
                        completedSteps.Push(i);
                    }
                }
                catch (Exception ex)
                {
                    context.Exception = ex;
                    // Compensate in reverse order
                    while (completedSteps.TryPop(out var stepIndex))
                    {
                        try
                        {
                            await _steps[stepIndex].Compensate(context);
                        }
                        catch (Exception compEx)
                        {
                            context.CompensationErrors.Add(compEx);
                        }
                    }
                    throw new SagaCompensationFailedException(ex, context.CompensationErrors);
                }
            }
        }

        public record SagaStep(Func<SagaContext, Task> Action, Func<SagaContext, Task> Compensate);

        public class SagaContext
        {
            public Dictionary<string, object> Data { get; } = new();
            public Exception? Exception { get; set; }
            public List<Exception> CompensationErrors { get; } = new();
        }

        public class SagaCompensationFailedException : Exception
        {
            public Exception OriginalException { get; }
            public IReadOnlyList<Exception> CompensationErrors { get; }

            public SagaCompensationFailedException(Exception original, IReadOnlyList<Exception> errors)
                : base("Saga failed and compensation had errors", original)
            {
                OriginalException = original;
                CompensationErrors = errors;
            }
        }

        // Использование
        var saga = new SagaOrchestrator()
            .AddStep(
                async ctx => { /* Create Order */ },
                async ctx => { /* Cancel Order */ })
            .AddStep(
                async ctx => { /* Reserve Inventory */ },
                async ctx => { /* Release Inventory */ })
            .AddStep(
                async ctx => { /* Process Payment */ },
                async ctx => { /* Refund Payment */ })
            .AddStep(
                async ctx => { /* Ship Order */ },
                async ctx => { /* Return Shipment */ });

        await saga.ExecuteAsync(new SagaContext());

Idempotency Patterns

Idempotency Key

// Idempotency — multiple same requests = same result
        public class IdempotentApi
        {
            private readonly IDistributedCache _cache;

            public IdempotentApi(IDistributedCache cache)
            {
                _cache = cache;
            }

            public async Task<ApiResponse> ProcessAsync(
                string idempotencyKey,
                Func<Task<ApiResponse>> action)
            {
                // Check if already processed
                var cached = await _cache.GetAsync(idempotencyKey);
                if (cached != null)
                {
                    return JsonSerializer.Deserialize<ApiResponse>(cached)!;
                }

                // Execute action
                var result = await action();

                // Cache result
                await _cache.SetAsync(
                    idempotencyKey,
                    JsonSerializer.SerializeToUtf8Bytes(result),
                    new DistributedCacheEntryOptions
                    {
                        AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24)
                    });

                return result;
            }
        }

Deduplication Layer

public class DeduplicationLayer
        {
            private readonly ConcurrentDictionary<string, DedupEntry> _entries = new();
            private readonly TimeSpan _retention;

            private record DedupEntry(object Result, DateTime CreatedAt);

            public DeduplicationLayer(TimeSpan retention)
            {
                _retention = retention;
            }

            public async Task<T> ExecuteAsync<T>(
                string key,
                Func<Task<T>> action)
            {
                // Cleanup expired entries
                CleanupExpired();

                // Check if already processed
                if (_entries.TryGetValue(key, out var entry))
                {
                    if (DateTime.UtcNow - entry.CreatedAt < _retention)
                    {
                        return (T)entry.Result;
                    }
                    _entries.TryRemove(key, out _);
                }

                // Execute and cache
                var result = await action();
                _entries[key] = new DedupEntry(result, DateTime.UtcNow);
                return result;
            }

            private void CleanupExpired()
            {
                var now = DateTime.UtcNow;
                foreach (var kvp in _entries)
                {
                    if (now - kvp.Value.CreatedAt > _retention)
                    {
                        _entries.TryRemove(kvp.Key, out _);
                    }
                }
            }
        }

Практика

Задание 1: Distributed Lock через Redis SET NX

// Реализуйте:
        // 1. Acquire с retry и timeout
        // 2. Release с Lua script (атомарно)
        // 3. Auto-renewal (extend lock before expiry)
        // 4. IDisposable pattern

        public class RedisLock : IAsyncDisposable
        {
            private readonly IDatabase _redis;
            private readonly string _key;
            private readonly string _value;
            private readonly TimeSpan _expiry;
            private Timer? _renewalTimer;

            public async Task<bool> AcquireAsync(TimeSpan timeout, CancellationToken ct)
            {
                // Retry loop с timeout
            }

            public async Task<bool> ReleaseAsync()
            {
                // Lua script
            }

            private async Task RenewAsync()
            {
                // Extend expiry
            }

            public async ValueTask DisposeAsync()
            {
                _renewalTimer?.Dispose();
                await ReleaseAsync();
            }
        }

Задание 2: Saga Orchestrator

// См. реализацию выше
        // Добавьте:
        // 1. Persistence — saga state в storage
        // 2. Retry для compensation
        // 3. Timeout для каждого step
        // 4. Saga history/audit log

Задание 3: Idempotent API

// Реализуйте:
        // 1. Idempotency key в header
        // 2. Deduplication storage (Redis или in-memory)
        // 3. Response caching
        // 4. Cleanup expired entries

        public class IdempotencyMiddleware
        {
            private readonly RequestDelegate _next;
            private readonly IDistributedCache _cache;

            public async Task InvokeAsync(HttpContext context)
            {
                var key = context.Request.Headers["Idempotency-Key"].ToString();
                if (string.IsNullOrEmpty(key))
                {
                    await _next(context);
                    return;
                }

                // Check cache
                // If exists — return cached response
                // Else — execute, cache, return
            }
        }

Шпаргалка: Distributed Patterns

PatternUse Case
Redis LockSingle resource coordination
RedlockHigh availability distributed lock
ZooKeeper LockOrdered queue, leader election
RaftConsensus, replicated state
SagaDistributed transactions
CRDTsConflict-free replication
Idempotency KeySafe retries, exactly-once semantics

Performance Tuning Concurrent Systems

Thread Pool Starvation

Диагностика

// ThreadPool starvation — все потоки заблокированы
        // Симптомы:
        // - Высокая latency
        // - Queue length растёт
        // - CPU usage низкий (потоки ждут)

        // Мониторинг ThreadPool
        public class ThreadPoolMonitor
        {
            public static void LogStats()
            {
                ThreadPool.GetMaxThreads(out int maxWorker, out int maxIO);
                ThreadPool.GetMinThreads(out int minWorker, out int minIO);
                ThreadPool.GetAvailableThreads(out int availWorker, out int availIO);

                var inUseWorker = maxWorker - availWorker;
                var inUseIO = maxIO - availIO;

                Console.WriteLine($"Worker threads: {inUseWorker}/{maxWorker} (min: {minWorker})");
                Console.WriteLine($"IO threads: {inUseIO}/{maxIO} (min: {minIO})");
                Console.WriteLine($"Queue length: {ThreadPool.PendingWorkItems}");
            }
        }

Причины Starvation

// 1. Sync over async — блокировка ThreadPool потоков
        public string GetData()
        {
            return GetDataAsync().Result;  // Блокирует поток
        }

        // 2. Nested Task.Run — рекурсивное использование ThreadPool
        public async Task BadAsync()
        {
            await Task.Run(async () =>
            {
                await Task.Run(async () =>
                {
                    await Task.Run(() => { });  // Nested!
                });
            });
        }

        // 3. Blocking I/O в ThreadPool
        public void ProcessRequests()
        {
            Parallel.For(0, 1000, i =>
            {
                Thread.Sleep(100);  // I/O simulation — блокирует поток
            });
        }

Лечение Starvation

// 1. Async all the way
        public async Task<string> GetDataAsync()
        {
            return await _httpClient.GetStringAsync(url);
        }

        // 2. ConfigureAwait(false) в библиотеках
        public async Task<string> GetDataAsync()
        {
            return await _httpClient.GetStringAsync(url).ConfigureAwait(false);
        }

        // 3. Увеличить MinThreads для burst
        ThreadPool.SetMinThreads(100, 100);

        // 4. Использовать SemaphoreSlim для ограничения
        private readonly SemaphoreSlim _semaphore = new(10);

        public async Task ProcessAsync()
        {
            await _semaphore.WaitAsync();
            try
            {
                await DoWorkAsync();
            }
            finally
            {
                _semaphore.Release();
            }
        }

Async Context Flow — ExecutionContext, AsyncLocal

ExecutionContext

// ExecutionContext — flow context между async continuations
        // Включает:
        // - Security context
        // - CallContext
        // - CultureInfo
        // - AsyncLocal<T> values

        // ExecutionContext не flow'ится через Task.Run по умолчанию
        // Но flow'ится через await

        var culture = CultureInfo.CurrentCulture;
        await Task.Run(() =>
        {
            // Culture тот же — ExecutionContext flowed
            Console.WriteLine(CultureInfo.CurrentCulture);
        });

        // Suppress flow
        ExecutionContext.SuppressFlow();
        await Task.Run(() =>
        {
            // Default culture — flow suppressed
        });
        ExecutionContext.RestoreFlow();

AsyncLocal

// AsyncLocal<T> — async-aware thread-local storage
        // Значение flow'ится через async continuations

        public static class RequestContext
        {
            private static readonly AsyncLocal<string?> _requestId = new();

            public static string? RequestId
            {
                get => _requestId.Value;
                set => _requestId.Value = value;
            }
        }

        // Использование в middleware
        public async Task InvokeAsync(HttpContext context)
        {
            RequestContext.RequestId = Guid.NewGuid().ToString();
            await _next(context);
            // RequestId доступен во всём async chain
        }

        // В downstream коде
        public async Task ProcessAsync()
        {
            var requestId = RequestContext.RequestId;
            _logger.LogInformation("[{RequestId}] Processing", requestId);
        }

AsyncLocal Caveats

// AsyncLocal — copy-on-write semantics
        // Изменение в nested scope НЕ влияет на parent

        private static AsyncLocal<int> _value = new();

        public async Task DemoAsync()
        {
            _value.Value = 1;
            Console.WriteLine(_value.Value);  // 1

            await InnerAsync();

            Console.WriteLine(_value.Value);  // 1 (не изменился!)
        }

        private async Task InnerAsync()
        {
            _value.Value = 2;
            Console.WriteLine(_value.Value);  // 2
            // После await — значение 2 только в этом контексте
        }

        // ВАЖНО: AsyncLocal использует immutable snapshot
        // При await — snapshot копируется
        // Изменения в nested scope не propagate up

Synchronization Overhead Measurement

Benchmarking Locks

// BenchmarkDotNet для измерения overhead
        [MemoryDiagnoser]
        public class LockBenchmark
        {
            private int _counter;
            private readonly object _lock = new();
            private readonly SemaphoreSlim _semaphore = new(1, 1);

            [Benchmark]
            public void NoLock()
            {
                _counter++;
            }

            [Benchmark]
            public void WithLock()
            {
                lock (_lock)
                {
                    _counter++;
                }
            }

            [Benchmark]
            public async Task WithSemaphoreAsync()
            {
                await _semaphore.WaitAsync();
                try
                {
                    _counter++;
                }
                finally
                {
                    _semaphore.Release();
                }
            }

            [Benchmark]
            public void WithInterlocked()
            {
                Interlocked.Increment(ref _counter);
            }
        }

        // Результаты (примерные):
        // Method            | Mean     | Allocated |
        // ----------------- |--------- |---------- |
        // NoLock            |   0.5 ns |         - |
        // WithInterlocked   |   2.0 ns |         - |
        // WithLock          |  25.0 ns |         - |
        // WithSemaphoreAsync| 500.0 ns |     100 B |

Contention Measurement

// Monitor.LockContentionCount (.NET Core 3.0+)
        // Показывает количество lock contention events

        var before = Monitor.LockContentionCount;
        // ... execute code ...
        var after = Monitor.LockContentionCount;
        var contention = after - before;

        // Высокая contention — рассмотрите:
        // 1. ReaderWriterLockSlim для read-heavy
        // 2. ConcurrentDictionary для key-value
        // 3. Lock striping (разделить на多个 lock'ов)
        // 4. Lock-free структуры

Lock Striping

// Lock striping — уменьшить contention разделив на多个 lock'ов
        public class StripedCache<TKey, TValue>
        {
            private readonly int _stripeCount;
            private readonly object[] _locks;
            private readonly Dictionary<TKey, TValue>[] _stripes;

            public StripedCache(int stripeCount)
            {
                _stripeCount = stripeCount;
                _locks = Enumerable.Range(0, stripeCount).Select(_ => new object()).ToArray();
                _stripes = Enumerable.Range(0, stripeCount).Select(_ => new Dictionary<TKey, TValue>()).ToArray();
            }

            private int GetStripe(TKey key) => Math.Abs(key.GetHashCode()) % _stripeCount;

            public void Add(TKey key, TValue value)
            {
                var stripe = GetStripe(key);
                lock (_locks[stripe])
                {
                    _stripes[stripe][key] = value;
                }
            }

            public bool TryGetValue(TKey key, out TValue value)
            {
                var stripe = GetStripe(key);
                lock (_locks[stripe])
                {
                    return _stripes[stripe].TryGetValue(key, out value);
                }
            }
        }

Scalability Testing

Load Patterns

// Load testing patterns
        public class LoadTestRunner
        {
            public async Task RunConstantLoadAsync(
                Func<Task> action,
                int concurrentUsers,
                TimeSpan duration)
            {
                var cts = new CancellationTokenSource(duration);
                var tasks = Enumerable.Range(0, concurrentUsers)
                    .Select(_ => SimulateUserAsync(action, cts.Token))
                    .ToArray();

                await Task.WhenAll(tasks);
            }

            private async Task SimulateUserAsync(Func<Task> action, CancellationToken ct)
            {
                while (!ct.IsCancellationRequested)
                {
                    await action();
                    await Task.Delay(Random.Shared.Next(100, 1000), ct);
                }
            }
        }

Soak Tests

// Soak test — длительная нагрузка для поиска memory leaks
        public async Task RunSoakTestAsync(
            Func<Task> action,
            int concurrentUsers,
            TimeSpan duration)
        {
            var startTime = DateTime.UtcNow;
            var cts = new CancellationTokenSource(duration);

            var tasks = Enumerable.Range(0, concurrentUsers)
                .Select(_ => Task.Run(async () =>
                {
                    while (!cts.IsCancellationRequested)
                    {
                        var sw = Stopwatch.StartNew();
                        await action();
                        sw.Stop();

                        // Log latency
                        RecordLatency(sw.Elapsed);
                    }
                }))
                .ToArray();

            // Periodic GC и memory stats
            var monitorTask = Task.Run(async () =>
            {
                while (!cts.IsCancellationRequested)
                {
                    GC.Collect();
                    GC.WaitForPendingFinalizers();
                    GC.Collect();

                    LogMemoryStats();
                    await Task.Delay(TimeSpan.FromMinutes(1), cts.Token);
                }
            });

            await Task.WhenAll(tasks);
            await monitorTask;
        }

Spike Tests

// Spike test — резкое увеличение нагрузки
        public async Task RunSpikeTestAsync(Func<Task> action)
        {
            // Baseline — 10 users
            await RunConstantLoadAsync(action, 10, TimeSpan.FromMinutes(1));

            // Spike — 100 users
            await RunConstantLoadAsync(action, 100, TimeSpan.FromMinutes(1));

            // Recovery — 10 users
            await RunConstantLoadAsync(action, 10, TimeSpan.FromMinutes(1));

            // Measure:
            // - Latency degradation
            // - Error rate
            // - Recovery time
        }

Metrics Collection

public class MetricsCollector
        {
            private readonly ConcurrentQueue<TimeSpan> _latencies = new();
            private long _successCount;
            private long _errorCount;

            public void RecordLatency(TimeSpan latency, bool success)
            {
                _latencies.Enqueue(latency);
                if (success)
                    Interlocked.Increment(ref _successCount);
                else
                    Interlocked.Increment(ref _errorCount);
            }

            public MetricsReport GetReport()
            {
                var sorted = _latencies.OrderBy(x => x).ToList();
                return new MetricsReport
                {
                    TotalRequests = sorted.Count,
                    SuccessCount = Interlocked.Read(ref _successCount),
                    ErrorCount = Interlocked.Read(ref _errorCount),
                    P50 = sorted[sorted.Count / 2],
                    P95 = sorted[(int)(sorted.Count * 0.95)],
                    P99 = sorted[(int)(sorted.Count * 0.99)],
                    Throughput = sorted.Count / (sorted.Last() - sorted.First()).TotalSeconds
                };
            }
        }

        public record MetricsReport
        {
            public int TotalRequests { get; init; }
            public long SuccessCount { get; init; }
            public long ErrorCount { get; init; }
            public TimeSpan P50 { get; init; }
            public TimeSpan P95 { get; init; }
            public TimeSpan P99 { get; init; }
            public double Throughput { get; init; }
        }

Практика

Задание 1: Найти и устранить Thread Pool Starvation

// Сценарий: приложение с sync-over-async
        // 1. Создайте приложение с .Result/.Wait()
        // 2. Запустите load test — observe starvation
        // 3. Замените на async/await
        // 4. Сравните latency и throughput

        public class StarvationDemo
        {
            private readonly HttpClient _httpClient = new();

            // BAD — sync over async
            public string GetDataBad(string url)
            {
                return _httpClient.GetStringAsync(url).Result;
            }

            // GOOD — async all the way
            public async Task<string> GetDataGoodAsync(string url)
            {
                return await _httpClient.GetStringAsync(url);
            }
        }

Задание 2: Custom Diagnostic для Async Chain Analysis

public class AsyncChainDiagnostic
        {
            private readonly AsyncLocal<AsyncContext> _context = new();

            public IDisposable BeginScope(string operationName)
            {
                var ctx = new AsyncContext
                {
                    OperationName = operationName,
                    StartTime = DateTime.UtcNow,
                    Parent = _context.Value
                };
                _context.Value = ctx;
                return new Scope(this, ctx);
            }

            public void EndScope(AsyncContext ctx)
            {
                ctx.EndTime = DateTime.UtcNow;
                _context.Value = ctx.Parent;
                LogChain(ctx);
            }

            private void LogChain(AsyncContext ctx)
            {
                var chain = new List<AsyncContext>();
                var current = ctx;
                while (current != null)
                {
                    chain.Add(current);
                    current = current.Parent;
                }

                Console.WriteLine(string.Join(" → ", chain.Select(c =>
                    $"{c.OperationName} ({(c.EndTime - c.StartTime).TotalMilliseconds}ms)")));
            }

            private sealed record AsyncContext
            {
                public string OperationName { get; init; } = "";
                public DateTime StartTime { get; init; }
                public DateTime? EndTime { get; set; }
                public AsyncContext? Parent { get; set; }
            }

            private sealed class Scope : IDisposable
            {
                private readonly AsyncChainDiagnostic _diagnostic;
                private readonly AsyncContext _context;

                public Scope(AsyncChainDiagnostic diagnostic, AsyncContext context)
                {
                    _diagnostic = diagnostic;
                    _context = context;
                }

                public void Dispose() => _diagnostic.EndScope(_context);
            }
        }

        // Использование
        using var diag = new AsyncChainDiagnostic();
        using (diag.BeginScope("ProcessRequest"))
        {
            using (diag.BeginScope("FetchData"))
            {
                await Task.Delay(100);
            }
            using (diag.BeginScope("TransformData"))
            {
                await Task.Delay(50);
            }
        }

Задание 3: Load Test Framework

public class LoadTestFramework
        {
            private readonly MetricsCollector _metrics = new();

            public async Task<MetricsReport> RunAsync(
                Func<CancellationToken, Task> action,
                LoadTestOptions options)
            {
                var cts = new CancellationTokenSource(options.Duration);
                var tasks = Enumerable.Range(0, options.Concurrency)
                    .Select(_ => RunWorkerAsync(action, cts.Token))
                    .ToArray();

                await Task.WhenAll(tasks);
                return _metrics.GetReport();
            }

            private async Task RunWorkerAsync(
                Func<CancellationToken, Task> action,
                CancellationToken ct)
            {
                while (!ct.IsCancellationRequested)
                {
                    var sw = Stopwatch.StartNew();
                    try
                    {
                        await action(ct);
                        _metrics.RecordLatency(sw.Elapsed, true);
                    }
                    catch (Exception)
                    {
                        _metrics.RecordLatency(sw.Elapsed, false);
                    }

                    if (options.ThinkTime > TimeSpan.Zero)
                    {
                        await Task.Delay(options.ThinkTime, ct);
                    }
                }
            }
        }

        public record LoadTestOptions
        {
            public int Concurrency { get; init; }
            public TimeSpan Duration { get; init; }
            public TimeSpan ThinkTime { get; init; }
        }

Шпаргалка: Performance Checklist

ПроверкаИнструмент
Thread pool starvationThreadPool.GetAvailableThreads
Lock contentionMonitor.LockContentionCount
Async context flowAsyncLocal<T>, ExecutionContext
Memory leaksSoak test + GC stats
Latency percentilesP50, P95, P99 metrics
ThroughputRequests/sec measurement
ScalabilityLoad test с increasing concurrency

Advanced Async Patterns

PeriodicTimer vs Timer

PeriodicTimer (.NET 6+)

// PeriodicTimer — современный async timer
        // Преимущества:
        // - Полностью async (WaitForNextTickAsync)
        // - No callback hell
        // - Easy cancellation
        // - Skips missed ticks

        public async Task RunPeriodicAsync(CancellationToken ct)
        {
            using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));

            while (await timer.WaitForNextTickAsync(ct))
            {
                await DoWorkAsync();
            }
        }

        // Обработка drift
        public async Task RunWithDriftAsync(CancellationToken ct)
        {
            using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));

            while (await timer.WaitForNextTickAsync(ct))
            {
                var now = DateTime.UtcNow;
                await DoWorkAsync();
                var elapsed = DateTime.UtcNow - now;

                if (elapsed > TimeSpan.FromSeconds(1.5))
                {
                    Console.WriteLine($"Drift detected: {elapsed.TotalMilliseconds}ms");
                }
            }
        }

System.Threading.Timer

// Legacy timer — callback-based
        public class LegacyTimerDemo
        {
            private Timer? _timer;

            public void Start()
            {
                _timer = new Timer(
                    callback: async state => await DoWorkAsync(),
                    state: null,
                    dueTime: TimeSpan.Zero,
                    period: TimeSpan.FromSeconds(1));
            }

            // PROBLEM: callback может overlapped
            private async Task DoWorkAsync()
            {
                // Если работа занимает > 1s, следующий callback запустится
                // Нужно manual overlap protection
            }
        }

Timer Comparison

ХарактеристикаPeriodicTimerSystem.Threading.Timer
APIAsync-firstCallback-based
Overlap protectionBuilt-in (await)Manual
CancellationCancellationTokenChange(Timeout, Timeout)
Missed ticksSkippedQueued
.NET Version6+All

Structured Concurrency

Task.Run и Cancellation Patterns

// Structured concurrency — clear parent-child relationships
        // Все дочерние задачи завершаются когда parent завершается

        // Pattern: using var cts = ...
        public async Task ProcessWithTimeoutAsync()
        {
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

            try
            {
                await DoWorkAsync(cts.Token);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Timed out");
            }
        }

        // Pattern: linked cancellation
        public async Task ProcessWithUserCancellationAsync(CancellationToken userCt)
        {
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(userCt);
            cts.CancelAfter(TimeSpan.FromSeconds(30));

            await DoWorkAsync(cts.Token);
            // Отменится либо по userCt, либо по timeout
        }

Task.WhenAll с Cleanup

// Structured cleanup — все задачи завершаются cleanly
        public async Task ProcessAllWithCleanupAsync(
            IEnumerable<Func<CancellationToken, Task>> tasks,
            CancellationToken ct)
        {
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            var runningTasks = new List<Task>();

            try
            {
                foreach (var taskFactory in tasks)
                {
                    runningTasks.Add(taskFactory(cts.Token));
                }

                await Task.WhenAll(runningTasks);
            }
            catch
            {
                // Отменить все running tasks
                cts.Cancel();

                // Ждать cleanup
                await Task.WhenAll(runningTasks.Where(t => !t.IsCompleted));
                throw;
            }
        }

Background Service Pattern

// IHostedService с graceful shutdown
        public class MyBackgroundService : BackgroundService
        {
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));

                while (await timer.WaitForNextTickAsync(stoppingToken))
                {
                    try
                    {
                        await DoWorkAsync(stoppingToken);
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        // Graceful shutdown
                        break;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Error in background service");
                    }
                }
            }

            public override async Task StopAsync(CancellationToken cancellationToken)
            {
                // Cleanup before base.StopAsync
                await CleanupAsync();
                await base.StopAsync(cancellationToken);
            }
        }

Parallel.ForEachAsync

Streaming Parallel Processing

// Parallel.ForEachAsync (.NET 6+) — async parallel processing
        public async Task ProcessUrlsAsync(IEnumerable<string> urls)
        {
            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = 10,
                CancellationToken = ct
            };

            await Parallel.ForEachAsync(urls, options, async (url, ct) =>
            {
                var content = await _httpClient.GetStringAsync(url, ct);
                await ProcessContentAsync(content, ct);
            });
        }

        // С throttling
        public async Task ThrottledProcessAsync(IEnumerable<string> urls)
        {
            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = 5  // Max 5 concurrent
            };

            await Parallel.ForEachAsync(urls, options, async (url, ct) =>
            {
                // Каждый url обрабатывается async, но не более 5 параллельно
                await ProcessAsync(url, ct);
            });
        }

Parallel.ForEachAsync vs Task.WhenAll

// Task.WhenAll — all at once (no throttling)
        var tasks = urls.Select(url => ProcessAsync(url));
        await Task.WhenAll(tasks);  // Все одновременно!

        // Parallel.ForEachAsync — throttled
        await Parallel.ForEachAsync(urls, new ParallelOptions
        {
            MaxDegreeOfParallelism = 10
        }, async (url, ct) =>
        {
            await ProcessAsync(url, ct);
        });  // Max 10 одновременно

        // Когда использовать:
        // Task.WhenAll — когда все задачи лёгкие и memory OK
        // Parallel.ForEachAsync — когда нужно throttling

Cooperative Cancellation Best Practices

Cancellation Propagation

// Всегда передавайте CancellationToken
        public async Task ProcessAsync(CancellationToken ct = default)
        {
            await Step1Async(ct);
            await Step2Async(ct);
            await Step3Async(ct);
        }

        // Проверка в циклах
        public async Task ProcessManyAsync(IEnumerable<Item> items, CancellationToken ct)
        {
            foreach (var item in items)
            {
                ct.ThrowIfCancellationRequested();  // Check before each item
                await ProcessItemAsync(item, ct);
            }
        }

Cancelable Operations

// HttpClient поддерживает cancellation
        var response = await _httpClient.GetAsync(url, ct);

        // Task.Delay поддерживает cancellation
        await Task.Delay(TimeSpan.FromSeconds(5), ct);

        // Channel поддерживает cancellation
        await channel.Writer.WriteAsync(item, ct);
        await foreach (var item in channel.Reader.ReadAllAsync(ct)) { }

        // Stream поддерживает cancellation
        await stream.ReadAsync(buffer, ct);

        // Custom cancellation
        public async Task<T> WithCancellation<T>(
            Task<T> task,
            CancellationToken ct)
        {
            var completed = await Task.WhenAny(task, CreateCancellationTask(ct));
            if (completed != task)
            {
                throw new OperationCanceledException(ct);
            }
            return await task;
        }

        private Task CreateCancellationTask(CancellationToken ct)
        {
            var tcs = new TaskCompletionSource();
            ct.Register(() => tcs.TrySetCanceled(ct));
            return tcs.Task;
        }

Cancellation with Timeout

// Timeout pattern
        public async Task<T> WithTimeoutAsync<T>(
            Func<CancellationToken, Task<T>> action,
            TimeSpan timeout)
        {
            using var cts = new CancellationTokenSource(timeout);
            return await action(cts.Token);
        }

        // Nested timeouts
        public async Task ProcessWithNestedTimeoutsAsync()
        {
            using var outerCts = new CancellationTokenSource(TimeSpan.FromMinutes(5));

            using var step1Cts = CancellationTokenSource.CreateLinkedTokenSource(outerCts.Token);
            step1Cts.CancelAfter(TimeSpan.FromSeconds(30));
            await Step1Async(step1Cts.Token);

            using var step2Cts = CancellationTokenSource.CreateLinkedTokenSource(outerCts.Token);
            step2Cts.CancelAfter(TimeSpan.FromSeconds(60));
            await Step2Async(step2Cts.Token);
        }

Практика

Задание 1: Resilient Background Service

// Retry + Circuit Breaker + Bulkhead
        public class ResilientBackgroundService : BackgroundService
        {
            private readonly RetryPolicy _retryPolicy;
            private readonly CircuitBreaker _circuitBreaker;
            private readonly SemaphoreSlim _bulkhead;

            public ResilientBackgroundService()
            {
                _retryPolicy = new RetryPolicy(3, TimeSpan.FromSeconds(1));
                _circuitBreaker = new CircuitBreaker(5, TimeSpan.FromSeconds(30));
                _bulkhead = new SemaphoreSlim(10);
            }

            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));

                while (await timer.WaitForNextTickAsync(stoppingToken))
                {
                    await _bulkhead.WaitAsync(stoppingToken);
                    try
                    {
                        await _circuitBreaker.ExecuteAsync(async ct =>
                        {
                            await _retryPolicy.ExecuteAsync(async retryCt =>
                            {
                                await DoWorkAsync(retryCt);
                            }, stoppingToken);
                        }, stoppingToken);
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Background service error");
                    }
                    finally
                    {
                        _bulkhead.Release();
                    }
                }
            }
        }

        public class RetryPolicy
        {
            private readonly int _maxRetries;
            private readonly TimeSpan _initialDelay;

            public RetryPolicy(int maxRetries, TimeSpan initialDelay)
            {
                _maxRetries = maxRetries;
                _initialDelay = initialDelay;
            }

            public async Task ExecuteAsync(Func<CancellationToken, Task> action, CancellationToken ct)
            {
                for (int i = 0; i <= _maxRetries; i++)
                {
                    try
                    {
                        await action(ct);
                        return;
                    }
                    catch when (i < _maxRetries)
                    {
                        var delay = _initialDelay * Math.Pow(2, i);  // Exponential backoff
                        await Task.Delay(delay, ct);
                    }
                }
            }
        }

Задание 2: Async Rate Limiter с Token Bucket

public class AsyncRateLimiter
        {
            private readonly int _maxTokens;
            private readonly int _refillRate;
            private readonly TimeSpan _refillInterval;
            private int _tokens;
            private DateTime _lastRefill;
            private readonly object _lock = new();

            public AsyncRateLimiter(int maxTokens, int refillRate, TimeSpan refillInterval)
            {
                _maxTokens = maxTokens;
                _refillRate = refillRate;
                _refillInterval = refillInterval;
                _tokens = maxTokens;
                _lastRefill = DateTime.UtcNow;
            }

            public async Task WaitAsync(CancellationToken ct = default)
            {
                while (true)
                {
                    ct.ThrowIfCancellationRequested();

                    lock (_lock)
                    {
                        RefillTokens();

                        if (_tokens > 0)
                        {
                            _tokens--;
                            return;
                        }
                    }

                    // Ждать пока tokens не появятся
                    await Task.Delay(_refillInterval, ct);
                }
            }

            private void RefillTokens()
            {
                var now = DateTime.UtcNow;
                var elapsed = now - _lastRefill;
                var tokensToAdd = (int)(elapsed.TotalSeconds / _refillInterval.TotalSeconds * _refillRate);

                if (tokensToAdd > 0)
                {
                    _tokens = Math.Min(_maxTokens, _tokens + tokensToAdd);
                    _lastRefill = now;
                }
            }
        }

Задание 3: Concurrent Job Scheduler

public class JobScheduler
        {
            private readonly PriorityQueue<Job, int> _queue = new();
            private readonly SemaphoreSlim _semaphore;
            private readonly object _lock = new();
            private CancellationTokenSource? _cts;
            private Task? _workerTask;

            public JobScheduler(int maxConcurrency)
            {
                _semaphore = new SemaphoreSlim(maxConcurrency);
            }

            public void Enqueue(Job job)
            {
                lock (_lock)
                {
                    _queue.Enqueue(job, job.Priority);
                }
            }

            public async Task StartAsync(CancellationToken ct)
            {
                _cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
                _workerTask = ProcessQueueAsync(_cts.Token);
            }

            public async Task StopAsync()
            {
                _cts?.Cancel();
                if (_workerTask != null)
                {
                    await _workerTask;
                }
            }

            private async Task ProcessQueueAsync(CancellationToken ct)
            {
                while (!ct.IsCancellationRequested)
                {
                    Job? job;
                    lock (_lock)
                    {
                        if (_queue.TryDequeue(out var j, out _))
                        {
                            job = j;
                        }
                        else
                        {
                            job = null;
                        }
                    }

                    if (job == null)
                    {
                        await Task.Delay(100, ct);
                        continue;
                    }

                    await _semaphore.WaitAsync(ct);
                    _ = Task.Run(async () =>
                    {
                        try
                        {
                            await job.ExecuteAsync(ct);
                        }
                        catch (Exception ex)
                        {
                            job.OnError(ex);
                        }
                        finally
                        {
                            _semaphore.Release();
                        }
                    }, ct);
                }
            }
        }

        public record Job(int Priority, Func<CancellationToken, Task> ExecuteAsync, Action<Exception> OnError);

Шпаргалка: Async Patterns

PatternКогда использовать
PeriodicTimerРегулярные async операции (.NET 6+)
Structured ConcurrencyClear task lifecycle
Parallel.ForEachAsyncThrottled async parallel
Token BucketRate limiting
Retry + Circuit BreakerResilient operations
Cooperative CancellationGraceful shutdown

Контрольная точка модуля 3

Проект: Высоконагруженный Async Message Broker

Описание

Создайте высокопроизводительный message broker с использованием современных async/concurrency паттернов .NET.


Требования

Channel-Based Message Routing

public interface IMessageBroker
        {
            Task PublishAsync<T>(string topic, T message, CancellationToken ct = default);
            IAsyncEnumerable<T> SubscribeAsync<T>(string topic, CancellationToken ct = default);
            Task UnsubscribeAsync<T>(string topic, IAsyncEnumerable<T> subscription);
        }

        public class ChannelMessageBroker : IMessageBroker
        {
            private readonly ConcurrentDictionary<string, Channel<object>> _topics = new();

            public async Task PublishAsync<T>(string topic, T message, CancellationToken ct = default)
            {
                var channel = _topics.GetOrAdd(topic, _ =>
                    Channel.CreateBounded<object>(new BoundedChannelOptions(10000)
                    {
                        FullMode = BoundedChannelFullMode.Wait,
                        SingleReader = false,
                        SingleWriter = false
                    }));

                await channel.Writer.WriteAsync(message, ct);
            }

            public IAsyncEnumerable<T> SubscribeAsync<T>(string topic, CancellationToken ct = default)
            {
                var channel = _topics.GetOrAdd(topic, _ =>
                    Channel.CreateBounded<object>(new BoundedChannelOptions(10000)
                    {
                        FullMode = BoundedChannelFullMode.Wait,
                        SingleReader = false,
                        SingleWriter = false
                    }));

                return channel.Reader.ReadAllAsync(ct).CastAsync<T>();
            }
        }

Backpressure через Bounded Channels

public class BackpressureConfig
        {
            public int ChannelCapacity { get; set; } = 10000;
            public BoundedChannelFullMode FullMode { get; set; } = BoundedChannelFullMode.Wait;
            public TimeSpan BackpressureTimeout { get; set; } = TimeSpan.FromSeconds(5);
        }

        public class BackpressureMessageBroker
        {
            private readonly BackpressureConfig _config;

            public async Task PublishWithBackpressureAsync<T>(
                string topic,
                T message,
                CancellationToken ct = default)
            {
                var channel = GetOrCreateChannel(topic);

                using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
                timeoutCts.CancelAfter(_config.BackpressureTimeout);

                try
                {
                    await channel.Writer.WriteAsync(message, timeoutCts.Token);
                }
                catch (OperationCanceledException) when (!ct.IsCancellationRequested)
                {
                    throw new BackpressureException($"Channel {topic} is full");
                }
            }

            public int GetQueueDepth(string topic)
            {
                if (_topics.TryGetValue(topic, out var channel))
                {
                    return channel.Reader.Count;
                }
                return 0;
            }
        }

        public class BackpressureException : Exception
        {
            public BackpressureException(string message) : base(message) { }
        }

Circuit Breaker для Downstream Calls

public class CircuitBreaker
        {
            private readonly int _failureThreshold;
            private readonly TimeSpan _resetTimeout;
            private int _failures;
            private DateTime? _lastFailure;
            private CircuitState _state = CircuitState.Closed;

            public enum CircuitState { Closed, Open, HalfOpen }

            public CircuitBreaker(int failureThreshold, TimeSpan resetTimeout)
            {
                _failureThreshold = failureThreshold;
                _resetTimeout = resetTimeout;
            }

            public async Task<T> ExecuteAsync<T>(Func<CancellationToken, Task<T>> action, CancellationToken ct)
            {
                var state = GetState();

                if (state == CircuitState.Open)
                {
                    throw new CircuitBreakerOpenException("Circuit breaker is open");
                }

                try
                {
                    var result = await action(ct);
                    OnSuccess();
                    return result;
                }
                catch
                {
                    OnFailure();
                    throw;
                }
            }

            private CircuitState GetState()
            {
                if (_state == CircuitState.Open)
                {
                    if (DateTime.UtcNow - _lastFailure > _resetTimeout)
                    {
                        _state = CircuitState.HalfOpen;
                    }
                }
                return _state;
            }

            private void OnSuccess()
            {
                Interlocked.Exchange(ref _failures, 0);
                _state = CircuitState.Closed;
            }

            private void OnFailure()
            {
                var failures = Interlocked.Increment(ref _failures);
                _lastFailure = DateTime.UtcNow;

                if (failures >= _failureThreshold)
                {
                    _state = CircuitState.Open;
                }
            }

            public CircuitState State => GetState();
        }

        public class CircuitBreakerOpenException : Exception
        {
            public CircuitBreakerOpenException(string message) : base(message) { }
        }

Graceful Shutdown с Cooperative Cancellation

public class GracefulMessageBroker : IHostedService, IMessageBroker
        {
            private readonly CancellationTokenSource _shutdownCts = new();
            private readonly List<Task> _activeSubscriptions = new();
            private readonly object _lock = new();

            public async Task StartAsync(CancellationToken cancellationToken)
            {
                // Инициализация
            }

            public async Task StopAsync(CancellationToken cancellationToken)
            {
                // 1. Stop accepting new messages
                _shutdownCts.Cancel();

                // 2. Wait for active subscriptions to complete
                lock (_lock)
                {
                    foreach (var subscription in _activeSubscriptions)
                    {
                        // Don't wait forever — respect cancellationToken
                    }
                }

                // 3. Complete all channels
                foreach (var kvp in _topics)
                {
                    kvp.Value.Writer.Complete();
                }

                // 4. Wait for readers to finish
                await Task.WhenAll(GetReaderTasks());
            }

            public async Task PublishAsync<T>(string topic, T message, CancellationToken ct = default)
            {
                if (_shutdownCts.IsCancellationRequested)
                {
                    throw new InvalidOperationException("Broker is shutting down");
                }

                // ... publish logic
            }
        }

Zero Thread Pool Starvation

// Гарантии:
        // 1. Все I/O operations — async
        // 2. Нет .Result/.Wait()
        // 3. ConfigureAwait(false) в библиотеках
        // 4. ThreadPool.SetMinThreads для burst

        public class StarvationFreeBroker
        {
            public StarvationFreeBroker()
            {
                // Установить min threads для burst нагрузки
                ThreadPool.GetMinThreads(out int minWorker, out _);
                if (minWorker < 100)
                {
                    ThreadPool.SetMinThreads(100, 100);
                }
            }

            // Все методы async
            public async Task ProcessAsync(Message message, CancellationToken ct)
            {
                await _handler.HandleAsync(message, ct).ConfigureAwait(false);
            }
        }

Comprehensive Metrics

public class BrokerMetrics
        {
            private readonly ConcurrentDictionary<string, TopicMetrics> _topicMetrics = new();

            public void RecordPublish(string topic, TimeSpan latency)
            {
                var metrics = _topicMetrics.GetOrAdd(topic, _ => new TopicMetrics());
                metrics.RecordPublish(latency);
            }

            public void RecordConsume(string topic, TimeSpan latency)
            {
                var metrics = _topicMetrics.GetOrAdd(topic, _ => new TopicMetrics());
                metrics.RecordConsume(latency);
            }

            public void RecordError(string topic)
            {
                var metrics = _topicMetrics.GetOrAdd(topic, _ => new TopicMetrics());
                metrics.RecordError();
            }

            public BrokerReport GetReport()
            {
                return new BrokerReport
                {
                    Topics = _topicMetrics.ToDictionary(
                        kvp => kvp.Key,
                        kvp => kvp.Value.GetReport())
                };
            }
        }

        public class TopicMetrics
        {
            private long _publishCount;
            private long _consumeCount;
            private long _errorCount;
            private readonly ConcurrentQueue<TimeSpan> _publishLatencies = new();
            private readonly ConcurrentQueue<TimeSpan> _consumeLatencies = new();

            public void RecordPublish(TimeSpan latency)
            {
                Interlocked.Increment(ref _publishCount);
                _publishLatencies.Enqueue(latency);
            }

            public void RecordConsume(TimeSpan latency)
            {
                Interlocked.Increment(ref _consumeCount);
                _consumeLatencies.Enqueue(latency);
            }

            public void RecordError()
            {
                Interlocked.Increment(ref _errorCount);
            }

            public TopicReport GetReport()
            {
                return new TopicReport
                {
                    PublishCount = Interlocked.Read(ref _publishCount),
                    ConsumeCount = Interlocked.Read(ref _consumeCount),
                    ErrorCount = Interlocked.Read(ref _errorCount),
                    PublishP99 = GetPercentile(_publishLatencies, 0.99),
                    ConsumeP99 = GetPercentile(_consumeLatencies, 0.99),
                    QueueDepth = _publishLatencies.Count - _consumeLatencies.Count
                };
            }

            private TimeSpan GetPercentile(ConcurrentQueue<TimeSpan> queue, double percentile)
            {
                var sorted = queue.OrderBy(x => x).ToList();
                if (sorted.Count == 0) return TimeSpan.Zero;
                return sorted[(int)(sorted.Count * percentile)];
            }
        }

        public record BrokerReport
        {
            public Dictionary<string, TopicReport> Topics { get; init; } = new();
        }

        public record TopicReport
        {
            public long PublishCount { get; init; }
            public long ConsumeCount { get; init; }
            public long ErrorCount { get; init; }
            public TimeSpan PublishP99 { get; init; }
            public TimeSpan ConsumeP99 { get; init; }
            public int QueueDepth { get; init; }
        }

Критерии прохождения

Throughput > 50k msg/sec

// Benchmark
        [Benchmark]
        public async Task HighThroughput()
        {
            var broker = new ChannelMessageBroker();
            var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            var producer = Task.Run(async () =>
            {
                int count = 0;
                while (!cts.IsCancellationRequested)
                {
                    await broker.PublishAsync("test", new Message { Data = count++ }, cts.Token);
                }
            });

            var consumer = Task.Run(async () =>
            {
                int count = 0;
                await foreach (var msg in broker.SubscribeAsync<Message>("test", cts.Token))
                {
                    count++;
                }
                return count;
            });

            await producer;
            broker.CompleteAll();
            var total = await consumer;

            var throughput = total / 10.0;  // msg/sec
            Assert.True(throughput > 50000, $"Throughput: {throughput} msg/sec");
        }

P99 Latency < 10ms

// Latency test
        var latencies = new ConcurrentQueue<TimeSpan>();

        for (int i = 0; i < 100000; i++)
        {
            var sw = Stopwatch.StartNew();
            await broker.PublishAsync("test", new Message { Data = i });
            sw.Stop();
            latencies.Enqueue(sw.Elapsed);
        }

        var sorted = latencies.OrderBy(x => x).ToList();
        var p99 = sorted[(int)(sorted.Count * 0.99)];

        Assert.True(p99 < TimeSpan.FromMilliseconds(10), $"P99: {p99.TotalMilliseconds}ms");

Graceful Degradation

// Backpressure test
        var broker = new BackpressureMessageBroker(new BackpressureConfig
        {
            ChannelCapacity = 100,
            FullMode = BoundedChannelFullMode.Wait,
            BackpressureTimeout = TimeSpan.FromMilliseconds(100)
        });

        // Fill channel
        for (int i = 0; i < 100; i++)
        {
            await broker.PublishWithBackpressureAsync("test", new Message { Data = i });
        }

        // Next publish should timeout
        var ex = await Assert.ThrowsAsync<BackpressureException>(async () =>
        {
            await broker.PublishWithBackpressureAsync("test", new Message { Data = 101 });
        });

Clean Shutdown

// Shutdown test
        var broker = new GracefulMessageBroker();
        await broker.StartAsync(CancellationToken.None);

        // Publish messages
        for (int i = 0; i < 1000; i++)
        {
            await broker.PublishAsync("test", new Message { Data = i });
        }

        // Graceful shutdown
        await broker.StopAsync(CancellationToken.None);

        // All messages should be processed
        Assert.True(processedCount == 1000);

No Deadlocks или Race Conditions

// Concurrent load test
        var broker = new ChannelMessageBroker();
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

        var producers = Enumerable.Range(0, 10)
            .Select(i => Task.Run(async () =>
            {
                int count = 0;
                while (!cts.IsCancellationRequested)
                {
                    await broker.PublishAsync($"topic-{i % 3}", new Message { Data = count++ }, cts.Token);
                }
            }))
            .ToArray();

        var consumers = Enumerable.Range(0, 10)
            .Select(i => Task.Run(async () =>
            {
                int count = 0;
                await foreach (var msg in broker.SubscribeAsync<Message>($"topic-{i % 3}", cts.Token))
                {
                    count++;
                }
                return count;
            }))
            .ToArray();

        // Should complete without deadlock
        await Task.WhenAll(producers);
        broker.CompleteAll();
        var totals = await Task.WhenAll(consumers);

Архитектура проекта

MessageBroker/
        ├── Core/
        │   ├── IMessageBroker.cs
        │   ├── ChannelMessageBroker.cs
        │   ├── BackpressureMessageBroker.cs
        │   └── GracefulMessageBroker.cs
        ├── Resilience/
        │   ├── CircuitBreaker.cs
        │   ├── RetryPolicy.cs
        │   └── Bulkhead.cs
        ├── Metrics/
        │   ├── BrokerMetrics.cs
        │   ├── TopicMetrics.cs
        │   └── BrokerReport.cs
        ├── Routing/
        │   ├── TopicRouter.cs
        │   └── MessageFilter.cs
        └── Tests/
            ├── ThroughputTests.cs
            ├── LatencyTests.cs
            ├── BackpressureTests.cs
            ├── ShutdownTests.cs
            └── ConcurrencyTests.cs

Checklist

  • [ ] Channel-based message routing реализован
  • [ ] Bounded channels с backpressure
  • [ ] Circuit breaker для downstream calls
  • [ ] Graceful shutdown с cooperative cancellation
  • [ ] ThreadPool starvation prevention
  • [ ] Metrics (queue depth, processing time, error rate)
  • [ ] Throughput > 50k msg/sec
  • [ ] P99 latency < 10ms
  • [ ] Graceful degradation при overload
  • [ ] Clean shutdown без lost messages
  • [ ] No deadlocks или race conditions