Уровень 1: Foundation
Основы многопоточности
Thread Lifecycle
Состояния потока
Unstarted → Running → WaitSleepJoin → Stopped
↓
AbortRequested (.NET Framework only)
| Состояние | Описание |
Unstarted | Поток создан, но Start() не вызван |
Running | Поток выполняет код |
WaitSleepJoin | Поток заблокирован (Wait, Sleep, Join) |
Stopped | Поток завершил выполнение |
Создание потока
// Базовое создание
var thread = new Thread(Work);
thread.Start("arg");
// С параметром через lambda
var thread2 = new Thread(() => WorkWithParam(42));
thread2.Start();
// С настройками
var thread3 = new Thread(Work)
{
Name = "Worker-1",
IsBackground = true,
Priority = ThreadPriority.AboveNormal
};
thread3.Start();
void Work(object? state) => Console.WriteLine($"Working: {state}");
void WorkWithParam(int n) => Console.WriteLine($"Working: {n}");
Управление потоком
// Join — ожидание завершения
thread.Join(); // ждать бесконечно
thread.Join(1000); // ждать 1 секунду, вернуть false если не завершился
// Interrupt — прерывание WaitSleepJoin
thread.Interrupt(); // бросает ThreadInterruptedException
// Thread.Sleep — блокировка текущего потока
Thread.Sleep(100); // НЕ используйте в async коде!
// Thread.Yield — уступить квант времени
Thread.Yield(); // вернуть true если ОС переключила контекст
Thread.Sleep() — почему антипаттерн в async коде
// ПЛОХО — блокирует поток из ThreadPool
public void BadSyncMethod()
{
Thread.Sleep(1000); // Поток простаивает, не может выполнять другую работу
}
// ХОРОШО — асинхронное ожидание
public async Task GoodAsyncMethod()
{
await Task.Delay(1000); // Поток возвращается в ThreadPool
}
Почему Thread.Sleep() плох:
- Блокирует поток ThreadPool на всё время сна
- Уменьшает доступные потоки для других задач
- Может привести к thread pool starvation
- Не освобождает ресурсы во время ожидания
Thread vs Task vs ThreadPool
Сравнение
| Характеристика | Thread | Task | ThreadPool |
| overhead | Высокий (~1MB стек) | Низкий | Управляемый |
| Создание | Дорогое | Дешёвое | Уже создан |
| Управление | Ручное | TPL управляет | Автоматическое |
| Composability | Нет | WhenAll, ContinueWith | Нет |
| Cancellation | Ручное | CancellationToken | Нет |
| Return value | Нет | Task<T> | Нет |
ThreadPool
// QueueUserWorkItem — базовый способ
ThreadPool.QueueUserWorkItem(state =>
{
Console.WriteLine($"Working on thread: {Thread.CurrentThread.ManagedThreadId}");
}, null);
// RegisterWaitForSingleObject — ожидание с таймаутом
var handle = new ManualResetEvent(false);
ThreadPool.RegisterWaitForSingleObject(
handle,
(state, timedOut) => Console.WriteLine("Signaled!"),
null,
TimeSpan.FromSeconds(5),
executeOnlyOnce: true
);
Task — современный подход
// Task.Run — выполнение в ThreadPool
Task.Run(() => ComputeHeavy());
// Task.FromResult — уже завершённая задача
Task<int> completed = Task.FromResult(42);
// Task.FromException — задача с ошибкой
Task<int> faulted = Task.FromException<int>(new InvalidOperationException());
// Task.FromCanceled — отменённая задача
var cts = new CancellationTokenSource();
cts.Cancel();
Task<int> canceled = Task.FromCanceled<int>(cts.Token);
ThreadPool Internals
Базовые параметры
// Получение текущих лимитов
ThreadPool.GetMaxThreads(out int maxWorker, out int maxIO);
ThreadPool.GetMinThreads(out int minWorker, out int minIO);
Console.WriteLine($"Max Worker: {maxWorker}, Max IO: {maxIO}");
Console.WriteLine($"Min Worker: {minWorker}, Min IO: {minIO}");
// Установка минимального количества потоков
ThreadPool.SetMinThreads(workerThreads: 100, completionPortThreads: 100);
Динамическое управление (.NET 5+)
ThreadPool использует hill-climbing алгоритм:
1. Мониторит throughput и latency
2. Увеличивает потоки если очередь растёт
3. Уменьшает если потоки простаивают
4. Hill-climbing period: ~500ms
Внутренняя структура
ThreadPool:
├── Global Queue (work items от Task.Run)
├── Local Queues (per-thread, work-stealing)
│ ├── Thread 1 Queue
│ ├── Thread 2 Queue
│ └── Thread 3 Queue
└── IO Completion Port (async I/O callbacks)
Work Stealing
// Каждый поток имеет локальную очередь (LIFO)
// Если очередь пуста — ворует из другой очереди (FIFO)
// Это обеспечивает балансировку нагрузки
// Пример демонстрации
var tasks = Enumerable.Range(0, 1000)
.Select(i => Task.Run(() =>
{
Thread.Sleep(1); // имитация работы
return Interlocked.Increment(ref _counter);
}))
.ToArray();
await Task.WhenAll(tasks);
Влияние SetMinThreads на latency
// Сценарий: burst нагрузки
// Без SetMinThreads: ThreadPool создаёт потоки постепенно (1 поток/500ms)
// С SetMinThreads: все потоки доступны сразу
// Benchmark результат (1000 задач по 10ms):
// Без SetMinThreads: ~15 секунд (потоки создаются постепенно)
// С SetMinThreads(1000): ~1 секунда (все потоки готовы)
// Но: установка слишком большого minThreads ведёт к:
// - Избыточному потреблению памяти (1MB на поток)
// - Context switching overhead
// - Degraded performance при низкой нагрузке
Race Conditions
Что такое Race Condition
// Классический race condition
public class Counter
{
private int _count;
public void Increment()
{
// НЕАТОМАРНАЯ операция:
// 1. Прочитать _count
// 2. Увеличить на 1
// 3. Записать _count
_count++;
}
public int Count => _count;
}
// Демонстрация проблемы
var counter = new Counter();
var tasks = Enumerable.Range(0, 1000)
.Select(_ => Task.Run(() => counter.Increment()))
.ToArray();
await Task.WhenAll(tasks);
Console.WriteLine(counter.Count); // Скорее всего < 1000!
Воспроизведение и исправление
// ПЛОХО — race condition
public class BadCache
{
private Dictionary<string, string> _data = new();
public string GetOrAdd(string key, Func<string> factory)
{
if (!_data.TryGetValue(key, out var value))
{
value = factory();
_data[key] = value; // Race: другой поток мог уже добавить
}
return value;
}
}
// ХОРОШО — с lock
public class GoodCache
{
private readonly Dictionary<string, string> _data = new();
private readonly object _lock = new();
public string GetOrAdd(string key, Func<string> factory)
{
lock (_lock)
{
if (!_data.TryGetValue(key, out var value))
{
value = factory();
_data[key] = value;
}
return value;
}
}
}
// ЛУЧШЕ — ConcurrentDictionary
public class BetterCache
{
private readonly ConcurrentDictionary<string, string> _data = new();
public string GetOrAdd(string key, Func<string> factory)
{
return _data.GetOrAdd(key, _ => factory());
}
}
Deadlocks
Что такое Deadlock
Deadlock возникает когда:
1. Mutual Exclusion — ресурс не может быть общим
2. Hold and Wait — поток держит ресурс и ждёт другой
3. No Preemption — ресурс нельзя отнять
4. Circular Wait — цикл ожидания
Классический deadlock
// DEADLOCK — два потока, два lock'а в разном порядке
var lockA = new object();
var lockB = new object();
var t1 = Task.Run(() =>
{
lock (lockA)
{
Thread.Sleep(100);
lock (lockB) // Ждёт lockB, который держит t2
{
Console.WriteLine("T1 done");
}
}
});
var t2 = Task.Run(() =>
{
lock (lockB)
{
Thread.Sleep(100);
lock (lockA) // Ждёт lockA, который держит t1
{
Console.WriteLine("T2 done");
}
}
});
await Task.WhenAll(t1, t2); // Никогда не завершится!
Async Deadlock
// DEADLOCK — sync over async
public string GetData()
{
return GetDataAsync().Result; // Блокирует поток
// или .Wait()
}
public async Task<string> GetDataAsync()
{
await Task.Delay(100);
return "data";
}
// Почему deadlock:
// 1. GetData() блокирует UI/SynchronizationContext поток
// 2. GetDataAsync() пытается вернуться на тот же поток после await
// 3. Поток заблокирован → await никогда не завершится
// ИСПРАВЛЕНИЕ:
public async Task<string> GetDataFixed()
{
return await GetDataAsync(); // async all the way
}
Исправление deadlock
// 1. Фиксированный порядок lock'ов
public void SafeMethod()
{
// Всегда lockA, потом lockB
lock (lockA)
lock (lockB)
{
// работа
}
}
// 2. Timeout с Monitor.TryEnter
if (Monitor.TryEnter(lockA, TimeSpan.FromSeconds(1)))
{
try
{
if (Monitor.TryEnter(lockB, TimeSpan.FromSeconds(1)))
{
try { /* работа */ }
finally { Monitor.Exit(lockB); }
}
}
finally { Monitor.Exit(lockA); }
}
// 3. Async all the way
public async Task<string> GetDataAsync()
{
await using var resource = await AcquireResourceAsync();
return await resource.ProcessAsync();
}
Livelocks и Starvation
Livelock
// Livelock — потоки активны, но не прогрессируют
// Пример: два потока постоянно уступают друг другу
public class LivelockDemo
{
private static bool _t1Active = true;
public static void Run()
{
var t1 = Task.Run(() =>
{
while (true)
{
if (_t1Active)
{
// Пытаемся работать, но уступаем
Thread.Sleep(1);
_t1Active = false;
}
}
});
var t2 = Task.Run(() =>
{
while (true)
{
if (!_t1Active)
{
Thread.Sleep(1);
_t1Active = true;
}
}
});
}
}
Starvation
// Starvation — поток никогда не получает ресурс
// Пример: приоритетные потоки постоянно抢占 низкоприоритетные
var lowPriority = Task.Factory.StartNew(
() => Console.WriteLine("Low priority"),
CancellationToken.None,
TaskCreationOptions.LongRunning,
TaskScheduler.Default
);
// Высокоприоритетные задачи могут не дать выполниться низкоприоритетной
Memory Visibility и Happens-Before
Проблема видимости
// БЕЗ синхронизации — другой поток может не увидеть изменение
private bool _flag = false;
private int _data = 0;
// Поток 1
_data = 42;
_flag = true;
// Поток 2
if (_flag)
{
Console.WriteLine(_data); // Может быть 0! (без синхронизации)
}
Happens-Before Relationship
Happens-Before гарантирует:
1. lock release → следующий lock acquire
2. volatile write → следующий volatile read
3. Thread.Start() → код в запущенном потоке
4. Код в потоке → Thread.Join() завершение
5. Monitor.Exit → Monitor.Enter на том же объекте
volatile
// volatile гарантирует видимость между потоками
private volatile bool _shouldStop = false;
public void Stop() => _shouldStop = true;
public void Run()
{
while (!_shouldStop) // Всегда видит последнее значение
{
// работа
}
}
Memory Barriers
// Thread.MemoryBarrier() — полный memory barrier
// Запрещает reorder инструкций CPU и компилятора
int x = 0, y = 0;
bool ready = false;
// Поток 1
x = 1;
Thread.MemoryBarrier(); // Запрещает reorder
ready = true;
// Поток 2
if (ready)
{
Thread.MemoryBarrier();
Console.WriteLine(x); // Всегда 1
}
Практика
Задание 1: Воспроизвести deadlock и race condition
// 1. Создайте проект и воспроизведите race condition с counter
// 2. Создайте deadlock с двумя lock'ами
// 3. Исправьте оба с помощью lock и ConcurrentDictionary
Задание 2: Benchmark Thread vs ThreadPool vs Task.Run
// Используйте BenchmarkDotNet для сравнения:
// - new Thread() + Start()
// - ThreadPool.QueueUserWorkItem()
// - Task.Run()
// Измерьте overhead создания и выполнения
Задание 3: Измерить влияние SetMinThreads
// 1. Запустите 1000 задач по 10ms без SetMinThreads
// 2. Запустите с ThreadPool.SetMinThreads(1000, 1000)
// 3. Сравните total execution time
Контрольные вопросы
- Почему
Thread.Sleep() антипаттерн в async коде?
- Что такое thread pool starvation и как его избежать?
- В чём разница между
Task.Run и ConfigureAwait(false)?
Синхронизация
lock / Monitor
lock — синтаксический сахар
// Это:
lock (obj)
{
// критическая секция
}
// Эквивалентно:
bool lockTaken = false;
try
{
Monitor.Enter(obj, ref lockTaken);
// критическая секция
}
finally
{
if (lockTaken) Monitor.Exit(obj);
}
Monitor — расширенные возможности
private readonly object _lock = new();
public void WithTimeout()
{
// TryEnter с таймаутом — не блокирует бесконечно
if (Monitor.TryEnter(_lock, TimeSpan.FromSeconds(1)))
{
try
{
// критическая секция
}
finally
{
Monitor.Exit(_lock);
}
}
else
{
Console.WriteLine("Не удалось получить lock");
}
}
public void WithWaitPulse()
{
lock (_lock)
{
while (!condition)
{
Monitor.Wait(_lock); // Освобождает lock и ждёт
}
// работа
Monitor.Pulse(_lock); // Уведомляет один ждущий поток
Monitor.PulseAll(_lock); // Уведомляет все ждущие потоки
}
}
Spin Lock Behavior
// Monitor использует spin wait перед блокировкой:
// 1. Кратковременно "крутится" (spin) — не переключает контекст
// 2. Если lock не доступен — переходит в реальную блокировку
// 3. Spin count зависит от версии .NET и нагрузки
// SpinLock — явный spin lock (для очень коротких критических секций)
var spinLock = new SpinLock();
bool lockTaken = false;
spinLock.Enter(ref lockTaken);
try
{
// ОЧЕНЬ короткая операция (< 100ns)
}
finally
{
if (lockTaken) spinLock.Exit();
}
Когда использовать lock
Используйте lock когда:
✅ Критическая секция короткая (< 1ms)
✅ Нет риска deadlock
✅ Нужна простота
НЕ используйте lock когда:
❌ Долгие операции в критической секции
❌ Нужен timeout
❌ Read-heavy workload (используйте ReaderWriterLockSlim)
❌ Нужна async совместимость (используйте SemaphoreSlim)
Mutex
Mutex — межпроцессная синхронизация
// Mutex — kernel object, работает между процессами
// Медленнее lock (~100x), но跨процессный
// Именованный Mutex — для межпроцессной синхронизации
using var mutex = new Mutex(false, "Global\\MyAppSingleton");
if (mutex.WaitOne(TimeSpan.Zero))
{
try
{
// Только один экземпляр приложения
Console.WriteLine("Первый экземпляр");
Console.ReadLine();
}
finally
{
mutex.ReleaseMutex();
}
}
else
{
Console.WriteLine("Приложение уже запущено");
}
Mutex vs lock
| Характеристика | lock (Monitor) | Mutex |
| Скорость | Быстрый (~25ns) | Медленный (~2500ns) |
| Межпроцессный | Нет | Да |
| Рекурсивный | Да | Да (по умолчанию) |
| Может быть abandoned | Нет | Да |
Semaphore / SemaphoreSlim
SemaphoreSlim — асинхронный семафор
// SemaphoreSlim — ограничивает количество одновременных доступов
// Поддерживает async WaitAsync()
private readonly SemaphoreSlim _semaphore = new(3, 3); // max 3 параллельно
public async Task ProcessAsync(int id)
{
await _semaphore.WaitAsync();
try
{
Console.WriteLine($"[{id}] Начал обработку");
await Task.Delay(1000);
Console.WriteLine($"[{id}] Завершил обработку");
}
finally
{
_semaphore.Release();
}
}
// Использование
var tasks = Enumerable.Range(1, 10)
.Select(i => ProcessAsync(i));
await Task.WhenAll(tasks);
Semaphore — kernel object
// Semaphore — kernel object, межпроцессный
using var semaphore = new Semaphore(3, 3, "MySemaphore");
semaphore.WaitOne();
try
{
// работа
}
finally
{
semaphore.Release();
}
Когда использовать
| Примитив | Когда использовать |
SemaphoreSlim | Ограничение параллелизма, async код |
Semaphore | Межпроцессное ограничение |
lock | Mutual exclusion (1 поток) |
ReaderWriterLockSlim
Read-heavy оптимизация
// ReaderWriterLockSlim — multiple readers OR single writer
// Оптимален для read-heavy workloads
public class ThreadSafeCache<TKey, TValue>
{
private readonly Dictionary<TKey, TValue> _data = new();
private readonly ReaderWriterLockSlim _rwLock = new();
public TValue Get(TKey key)
{
_rwLock.EnterReadLock();
try
{
return _data[key];
}
finally
{
_rwLock.ExitReadLock();
}
}
public void Set(TKey key, TValue value)
{
_rwLock.EnterWriteLock();
try
{
_data[key] = value;
}
finally
{
_rwLock.ExitWriteLock();
}
}
public TValue GetOrAdd(TKey key, Func<TKey, TValue> factory)
{
// Сначала пробуем read lock
_rwLock.EnterUpgradeableReadLock();
try
{
if (_data.TryGetValue(key, out var value))
return value;
// Апгрейд до write lock
_rwLock.EnterWriteLock();
try
{
value = factory(key);
_data[key] = value;
return value;
}
finally
{
_rwLock.ExitWriteLock();
}
}
finally
{
_rwLock.ExitUpgradeableReadLock();
}
}
}
Режимы блокировки
EnterReadLock() — множественные читатели
EnterWriteLock() — единственный писатель
EnterUpgradeableReadLock() — читатель с возможностью апгрейда
Performance сравнение
1000 readers, 10 writers:
- lock: ~500ms (сериализует всё)
- ReaderWriterLockSlim: ~50ms (readers параллельны)
- ConcurrentDictionary: ~30ms (lock-free internals)
Event Wait Handles
ManualResetEvent
// ManualResetEvent — остаётся сигнальным до Reset()
// Как переключатель: ON/OFF
var mre = new ManualResetEvent(false);
// Поток-ожидатель
Task.Run(() =>
{
Console.WriteLine("Жду сигнал...");
mre.WaitOne(); // Блокируется до сигнала
Console.WriteLine("Получил сигнал!");
});
// Поток-сигнализатор
Thread.Sleep(2000);
mre.Set(); // Сигнал — все ждущие продолжат
// mre.Reset(); // Сброс — следующие WaitOne будут блокироваться
AutoResetEvent
// AutoResetEvent — автоматически сбрасывается после одного WaitOne
// Как турникет: один прошёл — закрылся
var are = new AutoResetEvent(false);
// Только ОДИН поток получит сигнал
Task.Run(() => { are.WaitOne(); Console.WriteLine("T1"); });
Task.Run(() => { are.WaitOne(); Console.WriteLine("T2"); });
are.Set(); // Только T1 или T2 продолжит, другой останется ждать
CountdownEvent
// CountdownEvent — ждёт N сигналов
// Идеально для "ждём завершения N задач"
var countdown = new CountdownEvent(3);
Task.Run(() => { Thread.Sleep(1000); countdown.Signal(); });
Task.Run(() => { Thread.Sleep(2000); countdown.Signal(); });
Task.Run(() => { Thread.Sleep(1500); countdown.Signal(); });
countdown.Wait(); // Ждёт пока Count станет 0
Console.WriteLine("Все задачи завершены!");
// С CancellationToken
countdown.Wait(cancellationToken);
Сравнение Event Wait Handles
| Примитив | Поведение | Use case |
ManualResetEvent | Остаётся сигнальным | "Готово, работайте все" |
AutoResetEvent | Сбрасывается после одного | "Один за раз" |
CountdownEvent | Ждёт N сигналов | "Ждём N задач" |
ManualResetEventSlim | Быстрая user-mode версия | Короткие ожидания |
Barrier
Coordinated Iteration
// Barrier — синхронизирует N потоков на определённой фазе
// Все потоки должны достичь Barrier прежде чем продолжить
var barrier = new Barrier(3, b =>
{
// Post-phase action — выполняется когда все достигли barrier
Console.WriteLine($"--- Фаза {b.CurrentPhaseNumber} завершена ---");
});
var tasks = new[]
{
Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine($"T1: фаза {i}");
barrier.SignalAndWait();
}
}),
Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine($"T2: фаза {i}");
barrier.SignalAndWait();
}
}),
Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine($"T3: фаза {i}");
barrier.SignalAndWait();
}
})
};
await Task.WhenAll(tasks);
Barrier с добавлением/удалением участников
var barrier = new Barrier(2);
// Динамическое изменение участников
barrier.AddParticipant(); // Теперь 3 участника
barrier.RemoveParticipant(); // Снова 2
Практика
Задание 1: Thread-safe cache с TTL и max size
// Требования:
// - GetOrAdd с TTL
// - Max size с eviction (LRU или oldest first)
// - ReaderWriterLockSlim для оптимизации reads
// - Background cleanup expired entries
public class TtlCache<TKey, TValue>
{
private readonly int _maxSize;
private readonly TimeSpan _ttl;
private readonly Dictionary<TKey, (TValue Value, DateTime Expires)> _data = new();
private readonly ReaderWriterLockSlim _lock = new();
private readonly Timer _cleanupTimer;
public TtlCache(int maxSize, TimeSpan ttl)
{
_maxSize = maxSize;
_ttl = ttl;
_cleanupTimer = new Timer(Cleanup, null, ttl, ttl);
}
public TValue GetOrAdd(TKey key, Func<TKey, TValue> factory)
{
// Реализуйте с UpgradeableReadLock
// Проверьте TTL
// Evict если maxSize reached
}
private void Cleanup(object? state)
{
// Удалите expired entries
// Используйте WriteLock
}
}
Задание 2: Producer-Consumer queue с backpressure
// Требования:
// - Bounded capacity
// - Backpressure — producer ждёт если queue full
// - Graceful shutdown
// - Используйте SemaphoreSlim
public class BoundedQueue<T>
{
private readonly Queue<T> _queue = new();
private readonly SemaphoreSlim _spaceAvailable;
private readonly SemaphoreSlim _itemAvailable;
private readonly object _lock = new();
private bool _completed;
public BoundedQueue(int capacity)
{
_spaceAvailable = new SemaphoreSlim(capacity, capacity);
_itemAvailable = new SemaphoreSlim(0, capacity);
}
public async Task EnqueueAsync(T item, CancellationToken ct = default)
{
await _spaceAvailable.WaitAsync(ct);
lock (_lock)
{
_queue.Enqueue(item);
}
_itemAvailable.Release();
}
public async Task<T> DequeueAsync(CancellationToken ct = default)
{
await _itemAvailable.WaitAsync(ct);
lock (_lock)
{
return _queue.Dequeue();
}
}
public void Complete()
{
_completed = true;
// Разбудите всех ждущих consumers
}
}
Задание 3: Connection pool с SemaphoreSlim
// Требования:
// - Ограничение количества подключений
// - Reuse connections
// - Timeout на получение connection
// - Health check
public class ConnectionPool : IAsyncDisposable
{
private readonly SemaphoreSlim _semaphore;
private readonly Queue<IDbConnection> _available = new();
private readonly string _connectionString;
private readonly int _maxSize;
public ConnectionPool(string connectionString, int maxSize)
{
_connectionString = connectionString;
_maxSize = maxSize;
_semaphore = new SemaphoreSlim(maxSize, maxSize);
}
public async Task<PooledConnection> GetAsync(CancellationToken ct = default)
{
await _semaphore.WaitAsync(ct);
// Возьмите connection из queue или создайте новое
}
public async ValueTask DisposeAsync()
{
// Закройте все connections
}
}
public class PooledConnection : IAsyncDisposable
{
private readonly ConnectionPool _pool;
private readonly IDbConnection _connection;
public async ValueTask DisposeAsync()
{
// Верните connection в pool
}
}
Шпаргалка: Выбор примитива синхронизации
| Сценарий | Примитив |
| Короткая критическая секция | lock |
| Async код | SemaphoreSlim |
| Read-heavy данные | ReaderWriterLockSlim |
| Ограничение параллелизма | SemaphoreSlim |
| Producer-Consumer | Channel<T> или BlockingCollection<T> |
| Межпроцессная синхронизация | Mutex или Semaphore |
| Signal одному потоку | AutoResetEvent |
| Signal всем потокам | ManualResetEvent |
| Ждать N потоков | CountdownEvent |
| Синхронизация фаз | Barrier |
| Lock-free счётчик | Interlocked |
Thread-Safe Коллекции
ConcurrentDictionary
Internals
ConcurrentDictionary использует:
- Lock striping (сегментация lock'ов) — не один lock на всю коллекцию
- Каждый bucket имеет свой lock
- CAS операции для lock-free reads
- Dynamic resizing с lock migration
Основные операции
var dict = new ConcurrentDictionary<string, int>();
// Thread-safe добавление/получение
dict.TryAdd("key", 42);
dict.TryGetValue("key", out var value);
dict.TryUpdate("key", 100, 42); // Обновить только если current == 42
dict.TryRemove("key", out var removed);
// GetOrAdd — атомарно
// ВАЖНО: factory вызывается ВНУТРИ lock, но значение может быть выброшено
// если другой поток уже добавил значение
int result = dict.GetOrAdd("key", k => ComputeExpensive(k));
// AddOrUpdate — атомарно
int updated = dict.AddOrUpdate(
"key",
k => 1, // add factory
(k, v) => v + 1 // update factory
);
GetOrAdd caveat
// ВАЖНО: factory в GetOrAdd может вызваться несколько раз!
// Только одно значение будет сохранено, но factory вызывается для каждого конкурентного потока
var dict = new ConcurrentDictionary<string, Lazy<ExpensiveObject>>();
// ПЛОХО — factory может вызваться N раз
var obj = dict.GetOrAdd("key", _ => new ExpensiveObject());
// ХОРОШО — Lazy гарантирует однократное создание
var lazy = dict.GetOrAdd("key", _ => new Lazy<ExpensiveObject>(() => new ExpensiveObject()));
var obj = lazy.Value; // Создается только один раз
Когда использовать ConcurrentDictionary
Используйте ConcurrentDictionary когда:
✅ Высокая конкурентность (много потоков)
✅ Частые reads, умеренные writes
✅ Нужна thread safety без ручных lock'ов
НЕ используйте когда:
❌ Один поток (обычный Dictionary быстрее)
❌ Нужна атомарность нескольких операций (используйте lock)
❌ Нужен порядок элементов (ConcurrentDictionary не гарантирует порядок)
ConcurrentQueue
Lock-free очередь
// ConcurrentQueue — lock-free MPMC (multi-producer, multi-consumer)
// Использует CAS и segmented array
var queue = new ConcurrentQueue<int>();
// Enqueue — lock-free
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
// TryDequeue — lock-free
while (queue.TryDequeue(out var item))
{
Console.WriteLine(item);
}
// TryPeek — может быть stale (другой поток уже dequeued)
if (queue.TryPeek(out var peeked))
{
Console.WriteLine($"Следующий: {peeked}");
}
// Count — approximate (может измениться сразу после чтения)
Console.WriteLine($"Размер: {queue.Count}");
Producer-Consumer с ConcurrentQueue
var queue = new ConcurrentQueue<int>();
var signal = new AutoResetEvent(false);
bool completed = false;
// Producer
Task.Run(() =>
{
for (int i = 0; i < 1000; i++)
{
queue.Enqueue(i);
signal.Set(); // Сигнал consumer
}
completed = true;
signal.Set();
});
// Consumer
Task.Run(() =>
{
while (true)
{
while (queue.TryDequeue(out var item))
{
Process(item);
}
if (completed && queue.IsEmpty)
break;
signal.WaitOne(100); // Ждём с таймаутом
}
});
ConcurrentBag
Thread-local оптимизация
// ConcurrentBag оптимизирован для сценария:
// - Один поток добавляет и забирает (thread-local)
// - Другие потоки occasionally "steal" элементы
var bag = new ConcurrentBag<int>();
// Добавление — thread-local, очень быстрое
bag.Add(1);
bag.Add(2);
bag.Add(3);
// Извлечение — сначала из thread-local, потом steal
bool success = bag.TryTake(out var item);
bool peek = bag.TryPeek(out var peeked);
// Когда использовать:
// ✅ Producer и consumer — один поток
// ✅ Work-stealing scenarios
// ❌ Когда нужен порядок (bag не гарантирует FIFO)
BlockingCollection
Producer-Consumer из коробки
// BlockingCollection — обёртка над IProducerConsumerCollection
// Блокирует когда пусто (для Take) или полно (для Add с bounded)
// Unbounded
var collection = new BlockingCollection<int>();
// Bounded — с backpressure
var bounded = new BlockingCollection<int>(boundedCapacity: 100);
// Producer
Task.Run(() =>
{
for (int i = 0; i < 1000; i++)
{
bounded.Add(i); // Блокирует если capacity достигнут
}
bounded.CompleteAdding(); // Сигнал consumers
});
// Consumer
Task.Run(() =>
{
foreach (var item in bounded.GetConsumingEnumerable())
{
Process(item);
}
// Выход после CompleteAdding + очередь пуста
});
// Multiple consumers
var consumers = Enumerable.Range(0, 4)
.Select(_ => Task.Run(() =>
{
foreach (var item in bounded.GetConsumingEnumerable())
{
Process(item);
}
}))
.ToArray();
await Task.WhenAll(consumers);
TryAdd/TryTake с таймаутом
var collection = new BlockingCollection<int>(10);
// TryAdd — не блокирует бесконечно
if (collection.TryAdd(42, TimeSpan.FromSeconds(1)))
{
Console.WriteLine("Добавлено");
}
else
{
Console.WriteLine("Timeout — collection full");
}
// TryTake
if (collection.TryTake(out var item, TimeSpan.FromSeconds(1)))
{
Console.WriteLine($"Взято: {item}");
}
else
{
Console.WriteLine("Timeout — collection empty");
}
Channel
Modern Producer-Consumer
// Channel<T> — современный подход (.NET Core 3.0+)
// Более производительный чем BlockingCollection
// Поддерживает async I/O pattern
using System.Threading.Channels;
// Unbounded channel
var unbounded = Channel.CreateUnbounded<int>();
// Bounded channel — с backpressure
var bounded = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // Ждать (по умолчанию)
// FullMode = BoundedChannelFullMode.DropOldest, // Удалить oldest
// FullMode = BoundedChannelFullMode.DropNewest, // Удалить newest
// FullMode = BoundedChannelFullMode.DropWrite, // Пропустить write
SingleReader = false,
SingleWriter = false
});
// Single reader/writer оптимизация
var singleReader = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
SingleReader = true, // Оптимизация если один consumer
SingleWriter = true // Оптимизация если один producer
});
Producer-Consumer с Channel
var channel = Channel.CreateBounded<int>(10);
// Producer
var producer = Task.Run(async () =>
{
for (int i = 0; i < 1000; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"Written: {i}");
}
channel.Writer.Complete(); // Сигнал consumers
});
// Consumer
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Read: {item}");
await ProcessAsync(item);
}
});
await Task.WhenAll(producer, consumer);
Multiple Producers/Consumers
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1000)
{
SingleReader = false,
SingleWriter = false
});
// Multiple producers
var producers = Enumerable.Range(0, 4)
.Select(i => Task.Run(async () =>
{
for (int j = 0; j < 250; j++)
{
await channel.Writer.WriteAsync(i * 250 + j);
}
}))
.ToArray();
// Multiple consumers
var consumers = Enumerable.Range(0, 4)
.Select(_ => Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
await ProcessAsync(item);
}
}))
.ToArray();
await Task.WhenAll(producers);
channel.Writer.Complete();
await Task.WhenAll(consumers);
Channel vs BlockingCollection
| Характеристика | Channel | BlockingCollection |
| Async support | Полное (ValueTask) | Нет (только sync) |
| Performance | Выше | Ниже |
| Backpressure | BoundedChannelFullMode | Bounded capacity |
| API | Modern, async-first | Legacy, sync |
| Completion | Writer.Complete() | CompleteAdding() |
Compare-and-Swap (CAS) и Interlocked
CAS Operations
CAS (Compare-And-Swap) — атомарная операция:
1. Сравнить текущее значение с ожидаемым
2. Если равно — заменить на новое
3. Вернуть старое значение
Реализуется через CPU инструкцию CMPXCHG
Interlocked методы
private int _counter = 0;
private long _total = 0;
// Increment/Decrement
Interlocked.Increment(ref _counter);
Interlocked.Decrement(ref _counter);
// Add
Interlocked.Add(ref _total, 100);
// Exchange (атомарная замена)
int old = Interlocked.Exchange(ref _counter, 0);
// CompareExchange (CAS)
// Если _counter == expected, заменить на newValue
// Возвращает original value
int original = Interlocked.CompareExchange(ref _counter, newValue, expected);
// Pattern: retry loop
do
{
original = _counter;
newValue = original + 1;
} while (Interlocked.CompareExchange(ref _counter, newValue, original) != original);
Interlocked для объектов
private object? _cache = null;
public object GetOrCreate()
{
var current = Volatile.Read(ref _cache);
if (current != null)
return current;
var created = new object();
var original = Interlocked.CompareExchange(ref _cache, created, null);
// Если original != null, другой поток уже установил значение
return original ?? created;
}
Interlocked.Read для 64-битных значений
// На 32-битных системах чтение long не атомарно
// Interlocked.Read гарантирует атомарность
private long _value;
public long GetValue()
{
return Interlocked.Read(ref _value);
}
public void SetValue(long newValue)
{
Interlocked.Exchange(ref _value, newValue);
}
Практика
Задание 1: Rate Limiter через ConcurrentDictionary + Interlocked
// Token bucket algorithm
public class RateLimiter
{
private readonly ConcurrentDictionary<string, Bucket> _buckets = new();
private readonly int _maxTokens;
private readonly TimeSpan _refillInterval;
private record Bucket(int Tokens, DateTime LastRefill);
public RateLimiter(int maxTokens, TimeSpan refillInterval)
{
_maxTokens = maxTokens;
_refillInterval = refillInterval;
}
public bool TryConsume(string clientId)
{
// Реализуйте:
// 1. GetOrAdd bucket для клиента
// 2. Refill tokens если прошло достаточно времени
// 3. Interlocked.Decrement если tokens > 0
// 4. Вернуть true если token получен
}
public int GetRemainingTokens(string clientId)
{
// Вернуть оставшиеся tokens
}
}
Задание 2: Distributed Lock Simulator с Channel
// Симуляция distributed lock через messaging
public class DistributedLockSimulator
{
private readonly Channel<LockMessage> _messageChannel;
private readonly Dictionary<string, LockState> _locks = new();
private record LockMessage(string LockName, string ClientId, LockAction Action);
private enum LockAction { Acquire, Release }
private record LockState(string Owner, DateTime AcquiredAt);
public DistributedLockSimulator()
{
_messageChannel = Channel.CreateUnbounded<LockMessage>();
_ = ProcessMessagesAsync();
}
public async Task<bool> AcquireAsync(string lockName, string clientId)
{
await _messageChannel.Writer.WriteAsync(
new LockMessage(lockName, clientId, LockAction.Acquire));
// Ждать response
}
public async Task ReleaseAsync(string lockName, string clientId)
{
await _messageChannel.Writer.WriteAsync(
new LockMessage(lockName, clientId, LockAction.Release));
}
private async Task ProcessMessagesAsync()
{
await foreach (var msg in _messageChannel.Reader.ReadAllAsync())
{
// Обработать lock/unlock
}
}
}
Задание 3: Thread-Safe Event Log с Bounded Capacity
public class ThreadSafeEventLog
{
private readonly ConcurrentQueue<LogEntry> _entries = new();
private readonly int _maxCapacity;
private int _count;
public record LogEntry(DateTime Timestamp, string Level, string Message);
public ThreadSafeEventLog(int maxCapacity)
{
_maxCapacity = maxCapacity;
}
public void Write(string level, string message)
{
// Реализуйте:
// 1. Interlocked.Increment для _count
// 2. Если _count > _maxCapacity, удалить oldest (TryDequeue)
// 3. Enqueue новую запись
}
public IReadOnlyList<LogEntry> GetEntries(int count = 100)
{
// Вернуть последние count entries
// Snapshot — не блокирует writes
}
public int Count => _count;
}
Шпаргалка: Выбор коллекции
| Сценарий | Коллекция |
| High-concurrency key-value | ConcurrentDictionary |
| Producer-Consumer (sync) | BlockingCollection |
| Producer-Consumer (async) | Channel<T> |
| FIFO очередь (multi-thread) | ConcurrentQueue |
| Work-stealing | ConcurrentBag |
| Lock-free счётчик | Interlocked |
| Lock-free CAS | Interlocked.CompareExchange |
Async/Await Deep Dive
State Machine Generated by Compiler
Как работает async/await
// Исходный код
public async Task<int> ComputeAsync()
{
var a = await GetValueAsync(1);
var b = await GetValueAsync(2);
return a + b;
}
// Компилятор генерирует state machine (упрощённо):
[CompilerGenerated]
private struct <ComputeAsync>d__0 : IAsyncStateMachine
{
public int <>1__state;
public AsyncTaskMethodBuilder<int> <>t__builder;
private TaskAwaiter<int> <>u__1;
private int <a>5__1;
private int <b>5__2;
void IAsyncStateMachine.MoveNext()
{
int result;
try
{
switch (<>1__state)
{
case 0:
// await GetValueAsync(1)
var awaiter1 = GetValueAsync(1).GetAwaiter();
if (!awaiter1.IsCompleted)
{
<>1__state = 0;
<>u__1 = awaiter1;
<>t__builder.AwaitUnsafeOnCompleted(ref awaiter1, ref this);
return;
}
goto case 1;
case 1:
<a>5__1 = <>u__1.GetResult();
// await GetValueAsync(2)
var awaiter2 = GetValueAsync(2).GetAwaiter();
if (!awaiter2.IsCompleted)
{
<>1__state = 1;
<>u__1 = awaiter2;
<>t__builder.AwaitUnsafeOnCompleted(ref awaiter2, ref this);
return;
}
goto case 2;
case 2:
<b>5__2 = <>u__1.GetResult();
result = <a>5__1 + <b>5__2;
break;
}
}
catch (Exception ex)
{
<>1__state = -2;
<>t__builder.SetException(ex);
return;
}
<>1__state = -2;
<>t__builder.SetResult(result);
}
}
IL код (ключевые моменты)
// AsyncMethodBuilder создаётся
call instance void [System.Runtime]System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Start<...>(!!0&)
// GetAwaiter вызывается
call instance valuetype ... Task::GetAwaiter()
// IsCompleted проверяется
call instance bool TaskAwaiter::get_IsCompleted()
// Если false — AwaitUnsafeOnCompleted
call instance void AsyncTaskMethodBuilder::AwaitUnsafeOnCompleted<...>(!!0&, !!1&)
// Если true — GetResult
call instance !0 TaskAwaiter::GetResult()
Разбор состояний
State -1: Start (initial)
State 0: После первого await
State 1: После второго await
State -2: Complete
Каждый await — потенциальная точка возврата
MoveNext() вызывается при каждом continuation
SynchronizationContext
Что такое SynchronizationContext
// SynchronizationContext — абстракция для "где продолжить"
// Определяет на каком потоке продолжится код после await
// Основные реализации:
// 1. WindowsFormsSynchronizationContext — UI поток WinForms
// 2. DispatcherSynchronizationContext — UI поток WPF
// 3. AspNetSynchronizationContext — ASP.NET (legacy)
// 4. null — Console/ThreadPool (продолжение на любом ThreadPool потоке)
Как работает захват контекста
// UI приложение (WPF/WinForms):
public async Task<Button> LoadDataAsync()
{
// Здесь: SynchronizationContext = DispatcherSynchronizationContext
var data = await httpClient.GetStringAsync(url);
// После await: продолжение на UI потоке (захвачен контекст)
button.Content = data; // OK — UI поток
return button;
}
// Console приложение:
public async Task ProcessAsync()
{
// Здесь: SynchronizationContext = null
var data = await httpClient.GetStringAsync(url);
// После await: продолжение на любом ThreadPool потоке
Console.WriteLine(data);
}
Когда SynchronizationContext мешает
// DEADLOCK в UI приложении
public string GetData()
{
return GetDataAsync().Result; // Блокирует UI поток
}
public async Task<string> GetDataAsync()
{
var data = await httpClient.GetStringAsync(url);
// Пытается вернуться на UI поток — но он заблокирован!
return data;
}
// ИСПРАВЛЕНИЕ 1: ConfigureAwait(false)
public async Task<string> GetDataAsync()
{
var data = await httpClient.GetStringAsync(url).ConfigureAwait(false);
// Продолжение на ThreadPool, не на UI
return data;
}
// ИСПРАВЛЕНИЕ 2: async all the way
public async Task<string> GetDataAsync()
{
return await GetDataInternalAsync();
}
ConfigureAwait(false)
Где обязательно
// ✅ В библиотечном коде — всегда ConfigureAwait(false)
public async Task<string> GetDataAsync()
{
var response = await _httpClient.GetAsync(url).ConfigureAwait(false);
var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
return content;
}
// ✅ В middleware — обычно не нужно (нужен HttpContext)
public async Task InvokeAsync(HttpContext context)
{
var data = await _service.GetDataAsync();
// ConfigureAwait(false) МОЖЕТ сломать HttpContext
await context.Response.WriteAsync(data);
}
Где не нужно
// ❌ В UI коде — нужно обновлять UI
public async Task LoadDataAsync()
{
var data = await _service.GetDataAsync();
// ConfigureAwait(false) НЕЛЬЗЯ — нужно обновить UI
textBox.Text = data;
}
// ❌ В ASP.NET Core — нет SynchronizationContext
// ASP.NET Core НЕ использует SynchronizationContext
// ConfigureAwait(false) не нужен, но и не вредит
ConfigureAwait(false) в .NET 8+
// .NET 8: Task.CompletedTask и другие "завершённые" задачи
// игнорируют ConfigureAwait — продолжение синхронно
// .NET 8: Linker может удалить ConfigureAwait(false) в publish
// если приложение не использует SynchronizationContext
ValueTask vs Task
Когда использовать ValueTask
// ValueTask<T> — struct, может быть allocation-free
// Используйте когда:
// - Результат часто доступен синхронно
// - Метод вызывается очень часто
// - Важна минимизация аллокаций
// ПЛОХО — ValueTask с async методом (бессмысленно)
public async ValueTask<int> BadAsync() // async создаёт Task, не ValueTask!
{
await Task.Delay(100);
return 42;
}
// ХОРОШО — синхронный результат возможен
public ValueTask<int> ReadAsync()
{
if (_buffer.HasData)
{
return new ValueTask<int>(_buffer.Read()); // Синхронно, без аллокации
}
return new ValueTask<int>(ReadFromDiskAsync()); // Асинхронно, аллокация
}
// ХОРОШО — I/O с кэшем
public ValueTask<byte[]> GetAsync(string key)
{
if (_cache.TryGetValue(key, out var data))
{
return new ValueTask<byte[]>(data); // Cache hit — без аллокации
}
return new ValueTask<byte[]>(FetchFromStorageAsync(key));
}
ValueTask ограничения
// ⚠️ ValueTask НЕЛЬЗЯ await'ить несколько раз
var vt = GetValueTask();
await vt;
await vt; // ОШИБКА! ValueTask можно await только один раз
// ⚠️ ValueTask НЕЛЬЗЯ хранить для later await
var vt = GetValueTask();
await Task.Delay(100);
await vt; // ОШИБКА! ValueTask может быть уже использован
// ✅ Task можно await'ить много раз
var task = GetTask();
await task;
await task; // OK
// ✅ Если нужно multiple await — используйте AsTask()
var vt = GetValueTask();
var task = vt.AsTask();
await task;
await task; // OK
Benchmark: Task vs ValueTask
1,000,000 вызовов (sync result):
Task<int>: ~50ms, 32MB аллокаций
ValueTask<int>: ~5ms, 0MB аллокаций
1,000,000 вызовов (async result):
Task<int>: ~100ms, 32MB аллокаций
ValueTask<int>: ~110ms, 32MB аллокаций (такой же overhead)
IAsyncDisposable
Паттерн async disposal
// IAsyncDisposable — для async cleanup
public class AsyncConnection : IAsyncDisposable
{
private readonly HttpClient _client = new();
public async Task<string> GetDataAsync()
{
return await _client.GetStringAsync("https://api.example.com");
}
public async ValueTask DisposeAsync()
{
// Async cleanup — например, graceful shutdown
await _client.GetAsync("https://api.example.com/logout");
_client.Dispose();
}
}
// Использование с await using
await using var conn = new AsyncConnection();
var data = await conn.GetDataAsync();
// DisposeAsync вызывается автоматически
// В scope
await using (var conn = new AsyncConnection())
{
var data = await conn.GetDataAsync();
}
AsyncEnumerator с IAsyncDisposable
public class AsyncResourceReader : IAsyncEnumerable<string>, IAsyncDisposable
{
private readonly FileStream _stream;
public AsyncResourceReader(string path)
{
_stream = new FileStream(path, FileMode.Open, FileAccess.Read);
}
public async IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken ct = default)
{
using var reader = new StreamReader(_stream);
string? line;
while ((line = await reader.ReadLineAsync()) != null)
{
yield return line;
}
}
public ValueTask DisposeAsync()
{
return _stream.DisposeAsync();
}
}
IAsyncEnumerable и IAsyncEnumerator
Базовое использование
// Async stream — async генератор значений
public async IAsyncEnumerable<int> GetNumbersAsync(int count)
{
for (int i = 0; i < count; i++)
{
await Task.Delay(100); // Имитация async операции
yield return i;
}
}
// Потребление
await foreach (var number in GetNumbersAsync(10))
{
Console.WriteLine(number);
}
// С CancellationToken
await foreach (var number in GetNumbersAsync(10).WithCancellation(ct))
{
Console.WriteLine(number);
}
// С ConfigureAwait
await foreach (var number in GetNumbersAsync(10).ConfigureAwait(false))
{
Console.WriteLine(number);
}
Реализация IAsyncEnumerable
public class AsyncFileReader : IAsyncEnumerable<string>
{
private readonly string _path;
public AsyncFileReader(string path) => _path = path;
public async IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken ct = default)
{
await using var stream = new FileStream(_path, FileMode.Open, FileAccess.Read);
using var reader = new StreamReader(stream);
string? line;
while ((line = await reader.ReadLineAsync().WaitAsync(ct)) != null)
{
ct.ThrowIfCancellationRequested();
yield return line;
}
}
}
LINQ с IAsyncEnumerable
// System.Linq.Async (NuGet: System.Interactive.Async)
var numbers = GetNumbersAsync(100);
var result = await numbers
.Where(n => n % 2 == 0)
.Select(n => n * 10)
.ToListAsync();
// Или вручную
public static async IAsyncEnumerable<T> WhereAsync<T>(
this IAsyncEnumerable<T> source,
Func<T, bool> predicate)
{
await foreach (var item in source)
{
if (predicate(item))
yield return item;
}
}
Практика
Задание 1: Async Pipeline с Channel и Backpressure
// Pipeline: Source → Transform → Filter → Sink
public class AsyncPipeline<TInput, TOutput>
{
private readonly Channel<TInput> _inputChannel;
private readonly Channel<TOutput> _outputChannel;
private readonly Func<TInput, Task<TOutput>> _transform;
private readonly Func<TOutput, bool> _filter;
public AsyncPipeline(
int capacity,
Func<TInput, Task<TOutput>> transform,
Func<TOutput, bool> filter)
{
_inputChannel = Channel.CreateBounded<TInput>(capacity);
_outputChannel = Channel.CreateBounded<TOutput>(capacity);
_transform = transform;
_filter = filter;
}
public async Task StartAsync(int parallelism, CancellationToken ct)
{
// Запустите parallelism workers
// Каждый: читает из _inputChannel, transform, filter, пишет в _outputChannel
}
public async Task EnqueueAsync(TInput item, CancellationToken ct)
{
await _inputChannel.Writer.WriteAsync(item, ct);
}
public IAsyncEnumerable<TOutput> Results()
{
return _outputChannel.Reader.ReadAllAsync();
}
public void Complete()
{
_inputChannel.Writer.Complete();
}
}
Задание 2: Custom Awaiter
// Custom awaiter для non-task async operations
public struct DelayAwaiter : ICriticalNotifyCompletion
{
private readonly TimeSpan _delay;
private readonly Timer _timer;
private Action? _continuation;
public DelayAwaiter(TimeSpan delay)
{
_delay = delay;
_timer = new Timer(OnTimerCallback);
}
public bool IsCompleted => false;
public void OnCompleted(Action continuation)
{
_continuation = continuation;
_timer.Change(_delay, Timeout.InfiniteTimeSpan);
}
public void UnsafeOnCompleted(Action continuation)
{
OnCompleted(continuation);
}
public void GetResult() { }
private void OnTimerCallback(object? state)
{
_continuation?.Invoke();
}
}
// Extension method
public static class DelayExtensions
{
public static DelayAwaiter GetAwaiter(this TimeSpan delay)
{
return new DelayAwaiter(delay);
}
}
// Использование
await TimeSpan.FromSeconds(1); // Custom awaiter!
Задание 3: Streaming API Client
public class StreamingApiClient : IAsyncDisposable
{
private readonly HttpClient _client;
public StreamingApiClient(HttpClient client)
{
_client = client;
}
public async IAsyncEnumerable<HttpResponseMessage> StreamAsync(
IEnumerable<string> urls,
[EnumeratorCancellation] CancellationToken ct = default)
{
foreach (var url in urls)
{
ct.ThrowIfCancellationRequested();
var response = await _client.GetAsync(url, ct);
yield return response;
}
}
public async IAsyncEnumerable<string> StreamLinesAsync(
string url,
[EnumeratorCancellation] CancellationToken ct = default)
{
using var response = await _client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, ct);
using var stream = await response.Content.ReadAsStreamAsync(ct);
using var reader = new StreamReader(stream);
string? line;
while ((line = await reader.ReadLineAsync()) != null)
{
ct.ThrowIfCancellationRequested();
yield return line;
}
}
public ValueTask DisposeAsync()
{
_client.Dispose();
return default;
}
}
Шпаргалка: Async Best Practices
| Правило | Описание |
| Async all the way | Не блокируйте async код через .Result/.Wait() |
| ConfigureAwait(false) | В библиотечном коде всегда |
| ValueTask | Когда результат часто синхронный |
| IAsyncEnumerable | Для streaming данных |
| IAsyncDisposable | Для async cleanup ресурсов |
| CancellationToken | Всегда передавайте в async методы |
| Task.WhenAll | Для параллельного выполнения |
| Avoid async void | Только для event handlers |
Parallel Programming
Parallel.ForEach и Parallel.For
Базовое использование
// Parallel.For — параллельный цикл
Parallel.For(0, 100, i =>
{
Console.WriteLine($"Processing {i} on thread {Thread.CurrentThread.ManagedThreadId}");
});
// Parallel.ForEach — параллельная обработка коллекции
var items = Enumerable.Range(0, 1000);
Parallel.ForEach(items, item =>
{
Process(item);
});
// С ParallelOptions
Parallel.ForEach(items, new ParallelOptions
{
MaxDegreeOfParallelism = 4,
CancellationToken = ct
}, item =>
{
Process(item);
});
Partitioning
// Partitioner — контроль над разбиением данных
var items = Enumerable.Range(0, 1000000);
// Range partitioning — для равномерных workload
Parallel.ForEach(Partitioner.Create(0, items.Count()), range =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
Process(items.ElementAt(i));
}
});
// Chunk partitioning — для неравномерных workload
Parallel.ForEach(Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering), item =>
{
Process(item);
});
// Custom partitioner
var partitioner = Partitioner.Create(items, true); // load balancing
Parallel.ForEach(partitioner, item => Process(item));
Degree of Parallelism
// MaxDegreeOfParallelism — ограничение параллелизма
var options = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount // По умолчанию
// MaxDegreeOfParallelism = 4 // Фиксированное
// MaxDegreeOfParallelism = -1 // Без ограничений
};
// Когда ограничивать:
// - I/O bound операции (не используйте все ядра)
// - Shared resource (БД, external API)
// - Memory pressure (больше потоков = больше памяти)
PLINQ (AsParallel)
Базовое использование
// AsParallel — параллельный LINQ
var result = Enumerable.Range(0, 1000000)
.AsParallel()
.Where(n => IsPrime(n))
.ToList();
// С сохранением порядка
var ordered = Enumerable.Range(0, 1000000)
.AsParallel()
.AsOrdered() // Сохраняет порядок (медленнее)
.Where(n => IsPrime(n))
.ToList();
// С ограничением параллелизма
var limited = Enumerable.Range(0, 1000000)
.AsParallel()
.WithDegreeOfParallelism(4)
.Where(n => IsPrime(n))
.ToList();
Когда использовать PLINQ
Используйте PLINQ когда:
✅ CPU-bound операции
✅ Большие коллекции (> 10000 элементов)
✅ Простые трансформации (Where, Select, Aggregate)
✅ Нет side effects
НЕ используйте PLINQ когда:
❌ I/O bound операции (используйте async/await)
❌ Маленькие коллекции (overhead > benefit)
❌ Нужен порядок и порядок важен (AsOrdered замедляет)
❌ Сложные операции с состоянием
PLINQ vs Parallel.ForEach
| Характеристика | PLINQ | Parallel.ForEach |
| API | Declarative (LINQ) | Imperative |
| Composability | Высокая | Низкая |
| Order preservation | AsOrdered() | Нет |
| Early exit | TakeWhile | Stop() |
| Performance | Чуть медленнее | Чуть быстрее |
Task.WhenAll, Task.WhenAny, Task.WhenEach
Task.WhenAll
// WhenAll — ждать все задачи
var tasks = urls.Select(url => httpClient.GetStringAsync(url));
string[] results = await Task.WhenAll(tasks);
// С обработкой ошибок
var tasks = new[]
{
Task1Async(),
Task2Async(),
Task3Async()
};
try
{
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
// WhenAll бросает первую ошибку
// Все ошибки доступны в tasks[].Exception
foreach (var task in tasks.Where(t => t.IsFaulted))
{
Console.WriteLine(task.Exception?.InnerException);
}
}
Task.WhenAny
// WhenAny — ждать первую завершённую задачу
var tasks = new[]
{
httpClient.GetStringAsync(url1),
httpClient.GetStringAsync(url2),
httpClient.GetStringAsync(url3)
};
Task<string> first = await Task.WhenAny(tasks);
string result = await first; // Получить результат
// Timeout pattern
var task = SlowOperationAsync();
var timeout = Task.Delay(TimeSpan.FromSeconds(5));
var completed = await Task.WhenAny(task, timeout);
if (completed == timeout)
{
throw new TimeoutException();
}
Task.WhenEach (.NET 6+)
// WhenEach — yield results as they complete
var tasks = urls.Select(url => httpClient.GetStringAsync(url));
await foreach (var result in Task.WhenEach(tasks))
{
Console.WriteLine($"Completed: {result.Result.Length} chars");
}
// Когда использовать WhenEach:
// - Нужна обработка результатов по мере готовности
// - Не нужно ждать все задачи
// - Streaming processing
Cancellation Patterns
CancellationToken Basics
// Создание CancellationToken
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
// Отмена
cts.Cancel(); // Синхронная отмена
cts.CancelAfter(5000); // Отмена через 5 секунд
await cts.CancelAsync(); // Асинхронная отмена (вызывает registered callbacks async)
// Linked token
var cts1 = new CancellationTokenSource();
var cts2 = new CancellationTokenSource();
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token);
// linkedCts.Cancel() если любой из tokens отменён
Propagation
// Передавайте CancellationToken через весь call chain
public async Task ProcessAsync(CancellationToken ct = default)
{
await FetchDataAsync(ct);
await TransformDataAsync(ct);
await SaveDataAsync(ct);
}
private async Task<string> FetchDataAsync(CancellationToken ct)
{
// HttpClient поддерживает cancellation
var response = await _httpClient.GetAsync(url, ct);
return await response.Content.ReadAsStringAsync(ct);
}
// Проверка cancellation
public async Task LongRunningAsync(CancellationToken ct)
{
for (int i = 0; i < 1000; i++)
{
ct.ThrowIfCancellationRequested(); // Проверка в цикле
await ProcessItemAsync(i, ct);
}
}
Cleanup при отмене
// finally блок выполняется при отмене
public async Task ProcessAsync(CancellationToken ct)
{
var resource = await AcquireResourceAsync(ct);
try
{
await UseResourceAsync(resource, ct);
}
finally
{
// Cleanup всегда выполняется
await resource.ReleaseAsync();
}
}
// Register callback на отмену
public async Task ProcessAsync(CancellationToken ct)
{
using var registration = ct.Register(() =>
{
Console.WriteLine("Отмена запрошена!");
// Cleanup logic
});
await LongOperationAsync(ct);
}
Cancellation с Task.WhenAll
// Когда одна задача отменена — отменить остальные
public async Task ProcessAllAsync(CancellationToken ct)
{
var tasks = new[]
{
Task1Async(ct),
Task2Async(ct),
Task3Async(ct)
};
try
{
await Task.WhenAll(tasks);
}
catch (OperationCanceledException)
{
// Одна задача отменена — ждать остальные для cleanup
await Task.WhenAll(tasks.Where(t => !t.IsCanceled));
throw;
}
}
Практика
Задание 1: Parallel Data Processor с Dynamic Partitioning
public class ParallelDataProcessor<TInput, TOutput>
{
private readonly Func<TInput, TOutput> _process;
private readonly int _batchSize;
public ParallelDataProcessor(Func<TInput, TOutput> process, int batchSize = 100)
{
_process = process;
_batchSize = batchSize;
}
public IReadOnlyList<TOutput> Process(IReadOnlyList<TInput> inputs, int maxDegreeOfParallelism = -1)
{
var outputs = new TOutput[inputs.Count];
// Реализуйте:
// 1. Partition inputs на batches
// 2. Parallel.ForEach с dynamic partitioning
// 3. Aggregate results
// 4. Поддержка CancellationToken
return outputs;
}
public async Task<IReadOnlyList<TOutput>> ProcessAsync(
IReadOnlyList<TInput> inputs,
int maxDegreeOfParallelism = -1,
CancellationToken ct = default)
{
// Async версия с Parallel.ForEachAsync (.NET 6+)
}
}
Задание 2: Fault-Tolerant Batch Processor
public class FaultTolerantBatchProcessor<T>
{
private readonly int _maxRetries;
private readonly TimeSpan _retryDelay;
private readonly CircuitBreaker _circuitBreaker;
public FaultTolerantBatchProcessor(int maxRetries, TimeSpan retryDelay)
{
_maxRetries = maxRetries;
_retryDelay = retryDelay;
_circuitBreaker = new CircuitBreaker(5, TimeSpan.FromSeconds(30));
}
public async Task<IReadOnlyList<Result<T>>> ProcessBatchAsync(
IReadOnlyList<T> items,
Func<T, Task<Result<T>>> processor,
CancellationToken ct = default)
{
// Реализуйте:
// 1. Разбейте на batches
// 2. Parallel.ForEachAsync с ограничением параллелизма
// 3. Retry logic с exponential backoff
// 4. Circuit breaker — если много ошибок, stop processing
// 5. Cancellation propagation
}
}
public class CircuitBreaker
{
private readonly int _failureThreshold;
private readonly TimeSpan _resetTimeout;
private int _failures;
private DateTime? _lastFailure;
private CircuitState _state = CircuitState.Closed;
public enum CircuitState { Closed, Open, HalfOpen }
public CircuitBreaker(int failureThreshold, TimeSpan resetTimeout)
{
_failureThreshold = failureThreshold;
_resetTimeout = resetTimeout;
}
public async Task<T> ExecuteAsync<T>(Func<Task<T>> action, CancellationToken ct)
{
// Реализуйте circuit breaker pattern
}
}
Задание 3: Concurrent HTTP Client
public class ConcurrentHttpClient : IAsyncDisposable
{
private readonly HttpClient _httpClient;
private readonly SemaphoreSlim _semaphore;
private readonly int _maxConcurrentRequests;
public ConcurrentHttpClient(int maxConcurrentRequests)
{
_httpClient = new HttpClient();
_maxConcurrentRequests = maxConcurrentRequests;
_semaphore = new SemaphoreSlim(maxConcurrentRequests);
}
public async Task<HttpResponseMessage> GetAsync(
string url,
CancellationToken ct = default)
{
await _semaphore.WaitAsync(ct);
try
{
return await _httpClient.GetAsync(url, ct);
}
finally
{
_semaphore.Release();
}
}
public async Task<IReadOnlyList<HttpResponseMessage>> GetManyAsync(
IEnumerable<string> urls,
CancellationToken ct = default)
{
// Реализуйте:
// 1. Task.WhenAll с ограничением через _semaphore
// 2. Cancellation propagation
// 3. Error handling — не fail fast, collect errors
}
public ValueTask DisposeAsync()
{
_httpClient.Dispose();
_semaphore.Dispose();
return default;
}
}
Шпаргалка: Parallel vs Async
| Сценарий | Подход |
| CPU-bound, много данных | Parallel.ForEach |
| CPU-bound, LINQ pipeline | PLINQ |
| I/O-bound, много запросов | async/await + Task.WhenAll |
| I/O-bound, streaming | IAsyncEnumerable |
| Mixed CPU + I/O | Parallel.ForEachAsync |
| Need results as they complete | Task.WhenEach |
| Timeout | Task.WhenAny(task, Task.Delay(timeout)) |
| Cancellation | CancellationToken через весь chain |
Advanced Concurrency Patterns
Actor Model
Концепции Actor Model
Actor — фундаментальная единица вычислений:
1. Каждый actor имеет уникальный адрес
2. Actors общаются только через messages
3. Actors обрабатывают messages последовательно
4. Actors могут создавать других actors
5. Actors могут менять своё состояние
Преимущества:
- Нет shared state — нет race conditions
- Естественная распределённость
- Fault isolation — crash одного actor не влияет на других
Orleans концепции
// Orleans — virtual actor model от Microsoft
// Grain = actor, Silo = host
// Grain interface
public interface IUserGrain : IGrainWithIntegerKey
{
Task<string> GetNameAsync();
Task SetNameAsync(string name);
Task<int> GetLoginCountAsync();
}
// Grain implementation
public class UserGrain : Grain, IUserGrain
{
private string _name = "";
private int _loginCount;
public Task<string> GetNameAsync() => Task.FromResult(_name);
public Task SetNameAsync(string name)
{
_name = name;
return Task.CompletedTask;
}
public Task<int> GetLoginCountAsync() => Task.FromResult(_loginCount);
public override Task OnActivateAsync(CancellationToken ct)
{
// Вызывается при активации grain
return base.OnActivateAsync(ct);
}
}
// Использование
var grain = grainFactory.GetGrain<IUserGrain>(userId);
await grain.SetNameAsync("John");
var name = await grain.GetNameAsync();
Akka.NET концепции
// Akka.NET — actor model для .NET
// Actor definition
public class WorkerActor : ReceiveActor
{
public WorkerActor()
{
Receive<ProcessMessage>(msg =>
{
// Обработка сообщения
Sender.Tell(new Result(msg.Data));
});
Receive<StopMessage>(msg =>
{
Context.Stop(Self);
});
}
}
// Actor system
var system = ActorSystem.Create("MySystem");
var worker = system.ActorOf(Props.Create<WorkerActor>(), "worker");
// Send message
worker.Tell(new ProcessMessage("data"));
// Ask pattern (request-response)
var result = await worker.Ask<Result>(new ProcessMessage("data"));
Simple Actor System
// Простая реализация actor system
public class ActorSystem
{
private readonly Channel<ActorMessage> _mailbox = Channel.CreateUnbounded<ActorMessage>();
private readonly Dictionary<string, Actor> _actors = new();
private readonly CancellationTokenSource _cts = new();
public ActorRef CreateActor(string name, Func<CancellationToken, Task> behavior)
{
var actor = new Actor(name, behavior, _mailbox.Writer);
_actors[name] = actor;
return actor.Ref;
}
public async Task StartAsync()
{
await foreach (var msg in _mailbox.Reader.ReadAllAsync(_cts.Token))
{
await msg.Handler(_cts.Token);
}
}
public async Task StopAsync()
{
_cts.Cancel();
_mailbox.Writer.Complete();
}
}
public record ActorMessage(string Target, Func<CancellationToken, Task> Handler);
public class Actor
{
public ActorRef Ref { get; }
public Actor(string name, Func<CancellationToken, Task> behavior, ChannelWriter<ActorMessage> writer)
{
Ref = new ActorRef(name, writer);
_ = RunAsync(behavior);
}
private async Task RunAsync(Func<CancellationToken, Task> behavior)
{
while (true)
{
try
{
await behavior(CancellationToken.None);
}
catch (OperationCanceledException)
{
break;
}
}
}
}
public class ActorRef
{
private readonly string _name;
private readonly ChannelWriter<ActorMessage> _writer;
public ActorRef(string name, ChannelWriter<ActorMessage> writer)
{
_name = name;
_writer = writer;
}
public async Task TellAsync(Func<CancellationToken, Task> handler, CancellationToken ct = default)
{
await _writer.WriteAsync(new ActorMessage(_name, handler), ct);
}
}
Immutable Data + Structural Sharing
Immutable Collections
// System.Collections.Immutable (NuGet)
// ImmutableList
var list1 = ImmutableList<int>.Empty.Add(1).Add(2).Add(3);
var list2 = list1.Add(4); // Новый список, list1 не изменён
// ImmutableDictionary
var dict1 = ImmutableDictionary<string, int>.Empty
.Add("a", 1)
.Add("b", 2);
var dict2 = dict1.SetItem("a", 10); // Новый dict
// ImmutableHashSet
var set1 = ImmutableHashSet<int>.Empty.Add(1).Add(2);
var set2 = set1.Add(3);
// ImmutableSortedDictionary — ordered
var sorted = ImmutableSortedDictionary<int, string>.Empty
.Add(3, "three")
.Add(1, "one")
.Add(2, "two");
Structural Sharing
Immutable коллекции используют structural sharing:
- Новые версии разделяют данные со старыми
- Изменяется только путь от корня до изменённого узла
- O(log N) для update вместо O(N)
Пример для ImmutableList (AVL tree):
Root Root'
/ \ / \
A B → A' B
/ \ / \
C D C D'
Только путь до D' создан заново, A, B, C shared
Persistent Data Structures
// Пример persistent data structure
public class PersistentList<T>
{
private readonly Node _root;
private readonly int _count;
private sealed record Node(T Value, Node? Left, Node? Right, int Height);
private PersistentList(Node root, int count)
{
_root = root;
_count = count;
}
public static PersistentList<T> Empty => new(null!, 0);
public PersistentList<T> Add(T value)
{
var newNode = new Node(value, null, null, 1);
var newRoot = Insert(_root, newNode, 0);
return new PersistentList<T>(newRoot, _count + 1);
}
private Node Insert(Node? node, Node newNode, int index)
{
if (node == null) return newNode;
var mid = Count(node.Left) + 1;
if (index < mid)
{
return node with { Left = Insert(node.Left, newNode, index) };
}
else
{
return node with { Right = Insert(node.Right, newNode, index - mid) };
}
}
private int Count(Node? node) => node == null ? 0 : 1 + Count(node.Left) + Count(node.Right);
}
Thread Safety без Locks
// Immutable данные thread-safe по определению
public class ImmutableCache<TKey, TValue>
{
private ImmutableDictionary<TKey, TValue> _data = ImmutableDictionary<TKey, TValue>.Empty;
public void Add(TKey key, TValue value)
{
// CAS loop — нет lock
ImmutableDictionary<TKey, TValue> original, updated;
do
{
original = _data;
updated = original.Add(key, value);
}
while (Interlocked.CompareExchange(ref _data, updated, original) != original);
}
public bool TryGetValue(TKey key, out TValue value)
{
return _data.TryGetValue(key, out value);
}
}
Lock-Free Algorithms
MPMC Queue
// Lock-free MPMC queue (упрощённая версия)
public class LockFreeQueue<T> where T : class
{
private sealed class Node
{
public T? Value;
public Node? Next;
}
private Node _head = new();
private Node _tail = _head;
public void Enqueue(T value)
{
var node = new Node { Value = value };
Node? last;
while (true)
{
last = Volatile.Read(ref _tail);
var next = Volatile.Read(ref last.Next);
if (last == Volatile.Read(ref _tail))
{
if (next == null)
{
// CAS: если last.Next == null, установить node
if (Interlocked.CompareExchange(ref last.Next, node, null) == null)
break;
}
else
{
// Помочь другому потоку продвинуть tail
Interlocked.CompareExchange(ref _tail, next, last);
}
}
}
// Продвинуть tail
Interlocked.CompareExchange(ref _tail, node, last);
}
public bool TryDequeue(out T? value)
{
Node? first;
while (true)
{
first = Volatile.Read(ref _head);
var last = Volatile.Read(ref _tail);
var next = Volatile.Read(ref first.Next);
if (first == Volatile.Read(ref _head))
{
if (first == last)
{
if (next == null)
{
value = default;
return false; // Queue empty
}
Interlocked.CompareExchange(ref _tail, next, first);
}
else
{
if (next is { } n)
{
value = n.Value;
if (Interlocked.CompareExchange(ref _head, n, first) == first)
return true;
}
}
}
}
}
}
Lock-Free Stack
// Lock-free stack через Interlocked.CompareExchange
public class LockFreeStack<T>
{
private sealed class Node
{
public T Value;
public Node? Next;
public Node(T value) => Value = value;
}
private Node? _head;
public void Push(T value)
{
var newNode = new Node(value);
Node? oldHead;
do
{
oldHead = _head;
newNode.Next = oldHead;
}
while (Interlocked.CompareExchange(ref _head, newNode, oldHead) != oldHead);
}
public bool TryPop(out T value)
{
Node? oldHead;
Node? newHead;
do
{
oldHead = _head;
if (oldHead == null)
{
value = default!;
return false;
}
newHead = oldHead.Next;
}
while (Interlocked.CompareExchange(ref _head, newHead, oldHead) != oldHead);
value = oldHead.Value;
return true;
}
}
ABA Problem
ABA Problem в lock-free алгоритмах:
1. Thread 1 читает head = A
2. Thread 2 pop A, pop B, push A (head снова A!)
3. Thread 1 CAS: head == A, заменить на newHead
4. PROBLEM: head A — другой объект!
Решения:
1. Tagged pointers — добавить version к pointer
2. Hazard pointers — mark nodes in use
3. Epoch-based reclamation — defer deletion
Double-Checked Locking Singleton
Pattern в .NET
// Double-checked locking — thread-safe lazy initialization
public class Singleton
{
private static Singleton? _instance;
private static readonly object _lock = new();
public static Singleton Instance
{
get
{
if (_instance == null) // First check (без lock)
{
lock (_lock)
{
if (_instance == null) // Second check (в lock)
{
_instance = new Singleton();
}
}
}
return _instance;
}
}
}
// ВАЖНО: В .NET это работает без volatile благодаря memory model
// Но для portability лучше использовать volatile:
private static volatile Singleton? _instance;
Memory Barriers в DCL
// Почему DCL работает в .NET:
// 1. .NET memory model сильнее чем Java/C++
// 2. Запись в _instance происходит до выхода из lock
// 3. Lock release — acquire memory barrier
// 4. Следующий read видит записанное значение
// Для других платформ:
private static volatile Singleton? _instance;
// Или через Interlocked:
public static Singleton Instance =>
Volatile.Read(ref _instance) ?? CreateInstance();
private static Singleton CreateInstance()
{
var instance = new Singleton();
var original = Interlocked.CompareExchange(ref _instance, instance, null);
return original ?? instance;
}
Lazy — встроенный singleton
// Lazy<T> — thread-safe lazy initialization из коробки
public class Singleton
{
private static readonly Lazy<Singleton> _instance =
new Lazy<Singleton>(() => new Singleton(), LazyThreadSafetyMode.ExecutionAndPublication);
public static Singleton Instance => _instance.Value;
}
// LazyThreadSafetyMode:
// ExecutionAndPublication — lock вокруг factory (по умолчанию)
// PublicationOnly — multiple factory calls, first wins
// None — no thread safety
Практика
Задание 1: Lock-Free Stack
// Реализуйте lock-free stack через Interlocked.CompareExchange
// Тесты:
// 1. Multiple pushers, multiple poppers
// 2. Проверьте correctness (все pushed items popped)
// 3. Benchmark vs ConcurrentStack
public class LockFreeStack<T>
{
// См. реализацию выше
// Добавьте:
// - Count через Interlocked
// - TryPop с timeout
// - Clear через CAS loop
}
Задание 2: Immutable Collection с Structural Sharing
// Persistent Vector (HAMT — Hash Array Mapped Trie)
public class PersistentVector<T>
{
private const int BranchingFactor = 32;
private readonly Node? _root;
private readonly int _count;
private sealed record Node(T[] Items, Node?[] Children);
public static PersistentVector<T> Empty => new(null, 0);
public PersistentVector<T> Add(T value)
{
// Реализуйте:
// 1. Вычислите путь в trie
// 2. Скопируйте путь от корня до листа
// 3. Остальные узлы shared
}
public T this[int index]
{
get
{
// O(log32 N) — очень быстро
}
}
public PersistentVector<T> Set(int index, T value)
{
// Persistent update — возвращает новый vector
}
public int Count => _count;
}
Задание 3: Simple Actor System
public class SimpleActorSystem
{
private readonly Dictionary<string, Actor> _actors = new();
private readonly Channel<Message> _deadLetterQueue = Channel.CreateUnbounded<Message>();
public ActorRef Spawn<T>(string name, Func<ActorContext, T> factory)
where T : class, IActor
{
// Создайте actor и верните ref
}
public async Task ShutdownAsync()
{
// Graceful shutdown всех actors
}
}
public interface IActor
{
Task ReceiveAsync(Message message, ActorContext context);
}
public record Message(object Payload, ActorRef Sender);
public class ActorRef
{
public Task TellAsync(object payload, ActorRef? sender = null)
{
// Отправить message
}
public Task<T> AskAsync<T>(object payload, TimeSpan? timeout = null)
{
// Request-response pattern
}
}
public class ActorContext
{
public ActorRef Self { get; }
public ActorRef Sender { get; }
public void Become(IActor newBehavior) { }
public void Stop() { }
public ActorRef Spawn<T>(string name, Func<ActorContext, T> factory) where T : class, IActor { }
}
Шпаргалка: Concurrency Patterns
| Pattern | Когда использовать |
| Actor Model | Изолированное состояние, message passing |
| Immutable Data | Read-heavy, functional style |
| Lock-Free | Ultra-low latency, high contention |
| Double-Checked Locking | Lazy singleton initialization |
| CAS Loop | Lock-free updates |
| Structural Sharing | Persistent data structures |
Distributed Concurrency
Distributed Locks
Redis Distributed Lock
// Redis SET NX EX — атомарное получение lock
public class RedisDistributedLock : IDisposable
{
private readonly IDatabase _redis;
private readonly string _resource;
private readonly string _lockValue;
private readonly TimeSpan _expiry;
public RedisDistributedLock(IDatabase redis, string resource, TimeSpan expiry)
{
_redis = redis;
_resource = $"lock:{resource}";
_lockValue = Guid.NewGuid().ToString();
_expiry = expiry;
}
public async Task<bool> AcquireAsync(CancellationToken ct = default)
{
// SET key value NX EX timeout — атомарно
var acquired = await _redis.StringSetAsync(
_resource,
_lockValue,
_expiry,
When.NotExists);
return acquired;
}
public async Task<bool> ReleaseAsync()
{
// Lua script — атомарная проверка и удаление
var script = @"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end";
var result = await _redis.ScriptEvaluateAsync(script,
new RedisKey[] { _resource },
new RedisValue[] { _lockValue });
return (long)result == 1;
}
public void Dispose()
{
ReleaseAsync().GetAwaiter().GetResult();
}
}
// Использование
using var scope = new RedisDistributedLock(redis, "my-resource", TimeSpan.FromSeconds(30));
if (await scope.AcquireAsync())
{
// Критическая секция
await DoWorkAsync();
}
Redlock Algorithm
// Redlock — distributed lock с multiple Redis instances
// Защита от split-brain
public class Redlock
{
private readonly IDatabase[] _instances;
private readonly TimeSpan _quorumTimeout;
public Redlock(IDatabase[] instances)
{
_instances = instances;
_quorumTimeout = TimeSpan.FromMilliseconds(50);
}
public async Task<RedlockResult> LockAsync(string resource, TimeSpan ttl)
{
var startTime = DateTime.UtcNow;
var lockValue = Guid.NewGuid().ToString();
int acquired = 0;
// Попытка получить lock на всех instances
foreach (var instance in _instances)
{
var acquired_instance = await instance.StringSetAsync(
$"lock:{resource}", lockValue, ttl, When.NotExists);
if (acquired_instance)
acquired++;
}
var elapsedTime = DateTime.UtcNow - startTime;
var validTime = ttl - elapsedTime;
// Quorum: N/2 + 1
var quorum = _instances.Length / 2 + 1;
var isValid = acquired >= quorum && validTime > TimeSpan.Zero;
return new RedlockResult(isValid, lockValue, validTime);
}
}
public record RedlockResult(bool IsValid, string LockValue, TimeSpan ValidTime);
ZooKeeper Distributed Lock
// ZooKeeper — ephemeral sequential nodes
public class ZooKeeperDistributedLock
{
private readonly IZooKeeper _zk;
private readonly string _path;
private string? _lockPath;
public ZooKeeperDistributedLock(IZooKeeper zk, string resource)
{
_zk = zk;
_path = $"/locks/{resource}";
}
public async Task<bool> AcquireAsync(CancellationToken ct)
{
// Создать ephemeral sequential node
_lockPath = await _zk.CreateAsync(
$"{_path}/lock-",
Array.Empty<byte>(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EphemeralSequential);
// Проверить, что мы первый
var children = await _zk.GetChildrenAsync(_path);
var sorted = children.OrderBy(x => x).ToList();
if (sorted[0] == Path.GetFileName(_lockPath))
{
return true; // Мы первые — lock получен
}
// Ждать пока предыдущий node удалится
var previous = sorted[sorted.IndexOf(Path.GetFileName(_lockPath)) - 1];
await _zk.WaitForDeleteAsync($"{_path}/{previous}", ct);
return true;
}
public async Task ReleaseAsync()
{
if (_lockPath != null)
{
await _zk.DeleteAsync(_lockPath);
}
}
}
Consensus Algorithms — Raft Basics
Raft Overview
Raft — consensus algorithm для replicated state machine:
Роли:
- Leader: принимает все запросы, replicates log
- Follower: пассивный, отвечает на heartbeats
- Candidate: временно, для elections
Terms:
- Время разделено на terms
- Каждый term начинается с election
- Один leader per term
Log Replication:
1. Client отправляет command Leader
2. Leader appends to log
3. Leader replicates to Followers
4. Когда majority committed — apply to state machine
Raft Implementation (упрощённая)
public class RaftNode
{
private NodeState _state = NodeState.Follower;
private int _currentTerm;
private int _votedFor = -1;
private int _commitIndex;
private readonly List<LogEntry> _log = new();
private readonly int _nodeId;
private readonly int[] _peerIds;
public enum NodeState { Follower, Candidate, Leader }
public record LogEntry(int Term, int Index, string Command);
public RaftNode(int nodeId, int[] peerIds)
{
_nodeId = nodeId;
_peerIds = peerIds;
}
public async Task StartElectionAsync()
{
_state = NodeState.Candidate;
_currentTerm++;
_votedFor = _nodeId;
// Request votes from peers
var votes = 1; // Vote for self
foreach (var peerId in _peerIds)
{
var response = await RequestVoteAsync(peerId, _currentTerm, _nodeId);
if (response.VoteGranted)
votes++;
}
// Majority wins
if (votes > (_peerIds.Length + 1) / 2)
{
_state = NodeState.Leader;
_ = SendHeartbeatsAsync();
}
}
private async Task<VoteResponse> RequestVoteAsync(int peerId, int term, int candidateId)
{
// RPC к peer
return new VoteResponse(term, true);
}
private async Task SendHeartbeatsAsync()
{
while (_state == NodeState.Leader)
{
foreach (var peerId in _peerIds)
{
await AppendEntriesAsync(peerId);
}
await Task.Delay(150); // Heartbeat interval
}
}
private async Task AppendEntriesAsync(int peerId)
{
// Replicate log entries to follower
}
public async Task<string> SubmitCommandAsync(string command)
{
if (_state != NodeState.Leader)
throw new InvalidOperationException("Not leader");
var entry = new LogEntry(_currentTerm, _log.Count + 1, command);
_log.Add(entry);
// Replicate to followers
await ReplicateAsync(entry);
// Когда majority replicated — commit
_commitIndex = entry.Index;
return Apply(entry);
}
private async Task ReplicateAsync(LogEntry entry)
{
foreach (var peerId in _peerIds)
{
await AppendEntriesAsync(peerId, entry);
}
}
private string Apply(LogEntry entry) => $"Applied: {entry.Command}";
}
public record VoteResponse(int Term, bool VoteGranted);
Eventual Consistency и CRDTs
Eventual Consistency
Eventual Consistency:
- Updates propagate asynchronously
- replicas eventually converge
- Conflicts possible during propagation
Conflict Resolution Strategies:
1. Last Write Wins (LWW) — по timestamp
2. Vector Clocks — causal ordering
3. CRDTs — conflict-free replicated data types
CRDTs — G-Counter
// G-Counter (Grow-only Counter) — mergeable counter
public class GCounter
{
private readonly Dictionary<string, int> _counts = new();
public void Increment(string nodeId, int amount = 1)
{
_counts[nodeId] = _counts.GetValueOrDefault(nodeId) + amount;
}
public int Value => _counts.Values.Sum();
// Merge — idempotent, commutative, associative
public GCounter Merge(GCounter other)
{
var result = new GCounter();
foreach (var key in _counts.Keys.Union(other._counts.Keys))
{
result._counts[key] = Math.Max(
_counts.GetValueOrDefault(key, 0),
other._counts.GetValueOrDefault(key, 0));
}
return result;
}
}
// Использование
var counter1 = new GCounter();
counter1.Increment("node-A", 5);
var counter2 = new GCounter();
counter2.Increment("node-B", 3);
// Merge на обоих nodes
var merged1 = counter1.Merge(counter2); // 8
var merged2 = counter2.Merge(counter1); // 8 — same!
CRDTs — LWW-Register
// Last-Writer-Wins Register
public class LWWRegister<T>
{
private T _value;
private DateTime _timestamp;
private readonly string _nodeId;
public LWWRegister(string nodeId, T initialValue)
{
_nodeId = nodeId;
_value = initialValue;
_timestamp = DateTime.UtcNow;
}
public void Set(T value)
{
_value = value;
_timestamp = DateTime.UtcNow;
}
public T Value => _value;
public LWWRegister<T> Merge(LWWRegister<T> other)
{
if (other._timestamp > _timestamp)
{
return new LWWRegister<T>(_nodeId, other._value)
{
_timestamp = other._timestamp
};
}
return this;
}
}
Saga Pattern
Saga для Distributed Transactions
Saga — последовательность локальных транзакций:
1. Каждая транзакция имеет compensating action
2. Если step fails — execute compensations in reverse
3. Нет distributed lock — eventual consistency
Types:
- Choreography: events trigger next step
- Orchestration: central coordinator
Saga Orchestrator
public class SagaOrchestrator
{
private readonly List<SagaStep> _steps = new();
public SagaOrchestrator AddStep(
Func<SagaContext, Task> action,
Func<SagaContext, Task> compensate)
{
_steps.Add(new SagaStep(action, compensate));
return this;
}
public async Task ExecuteAsync(SagaContext context, CancellationToken ct = default)
{
var completedSteps = new Stack<int>();
try
{
for (int i = 0; i < _steps.Count; i++)
{
await _steps[i].Action(context);
completedSteps.Push(i);
}
}
catch (Exception ex)
{
context.Exception = ex;
// Compensate in reverse order
while (completedSteps.TryPop(out var stepIndex))
{
try
{
await _steps[stepIndex].Compensate(context);
}
catch (Exception compEx)
{
context.CompensationErrors.Add(compEx);
}
}
throw new SagaCompensationFailedException(ex, context.CompensationErrors);
}
}
}
public record SagaStep(Func<SagaContext, Task> Action, Func<SagaContext, Task> Compensate);
public class SagaContext
{
public Dictionary<string, object> Data { get; } = new();
public Exception? Exception { get; set; }
public List<Exception> CompensationErrors { get; } = new();
}
public class SagaCompensationFailedException : Exception
{
public Exception OriginalException { get; }
public IReadOnlyList<Exception> CompensationErrors { get; }
public SagaCompensationFailedException(Exception original, IReadOnlyList<Exception> errors)
: base("Saga failed and compensation had errors", original)
{
OriginalException = original;
CompensationErrors = errors;
}
}
// Использование
var saga = new SagaOrchestrator()
.AddStep(
async ctx => { /* Create Order */ },
async ctx => { /* Cancel Order */ })
.AddStep(
async ctx => { /* Reserve Inventory */ },
async ctx => { /* Release Inventory */ })
.AddStep(
async ctx => { /* Process Payment */ },
async ctx => { /* Refund Payment */ })
.AddStep(
async ctx => { /* Ship Order */ },
async ctx => { /* Return Shipment */ });
await saga.ExecuteAsync(new SagaContext());
Idempotency Patterns
Idempotency Key
// Idempotency — multiple same requests = same result
public class IdempotentApi
{
private readonly IDistributedCache _cache;
public IdempotentApi(IDistributedCache cache)
{
_cache = cache;
}
public async Task<ApiResponse> ProcessAsync(
string idempotencyKey,
Func<Task<ApiResponse>> action)
{
// Check if already processed
var cached = await _cache.GetAsync(idempotencyKey);
if (cached != null)
{
return JsonSerializer.Deserialize<ApiResponse>(cached)!;
}
// Execute action
var result = await action();
// Cache result
await _cache.SetAsync(
idempotencyKey,
JsonSerializer.SerializeToUtf8Bytes(result),
new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24)
});
return result;
}
}
Deduplication Layer
public class DeduplicationLayer
{
private readonly ConcurrentDictionary<string, DedupEntry> _entries = new();
private readonly TimeSpan _retention;
private record DedupEntry(object Result, DateTime CreatedAt);
public DeduplicationLayer(TimeSpan retention)
{
_retention = retention;
}
public async Task<T> ExecuteAsync<T>(
string key,
Func<Task<T>> action)
{
// Cleanup expired entries
CleanupExpired();
// Check if already processed
if (_entries.TryGetValue(key, out var entry))
{
if (DateTime.UtcNow - entry.CreatedAt < _retention)
{
return (T)entry.Result;
}
_entries.TryRemove(key, out _);
}
// Execute and cache
var result = await action();
_entries[key] = new DedupEntry(result, DateTime.UtcNow);
return result;
}
private void CleanupExpired()
{
var now = DateTime.UtcNow;
foreach (var kvp in _entries)
{
if (now - kvp.Value.CreatedAt > _retention)
{
_entries.TryRemove(kvp.Key, out _);
}
}
}
}
Практика
Задание 1: Distributed Lock через Redis SET NX
// Реализуйте:
// 1. Acquire с retry и timeout
// 2. Release с Lua script (атомарно)
// 3. Auto-renewal (extend lock before expiry)
// 4. IDisposable pattern
public class RedisLock : IAsyncDisposable
{
private readonly IDatabase _redis;
private readonly string _key;
private readonly string _value;
private readonly TimeSpan _expiry;
private Timer? _renewalTimer;
public async Task<bool> AcquireAsync(TimeSpan timeout, CancellationToken ct)
{
// Retry loop с timeout
}
public async Task<bool> ReleaseAsync()
{
// Lua script
}
private async Task RenewAsync()
{
// Extend expiry
}
public async ValueTask DisposeAsync()
{
_renewalTimer?.Dispose();
await ReleaseAsync();
}
}
Задание 2: Saga Orchestrator
// См. реализацию выше
// Добавьте:
// 1. Persistence — saga state в storage
// 2. Retry для compensation
// 3. Timeout для каждого step
// 4. Saga history/audit log
Задание 3: Idempotent API
// Реализуйте:
// 1. Idempotency key в header
// 2. Deduplication storage (Redis или in-memory)
// 3. Response caching
// 4. Cleanup expired entries
public class IdempotencyMiddleware
{
private readonly RequestDelegate _next;
private readonly IDistributedCache _cache;
public async Task InvokeAsync(HttpContext context)
{
var key = context.Request.Headers["Idempotency-Key"].ToString();
if (string.IsNullOrEmpty(key))
{
await _next(context);
return;
}
// Check cache
// If exists — return cached response
// Else — execute, cache, return
}
}
Шпаргалка: Distributed Patterns
| Pattern | Use Case |
| Redis Lock | Single resource coordination |
| Redlock | High availability distributed lock |
| ZooKeeper Lock | Ordered queue, leader election |
| Raft | Consensus, replicated state |
| Saga | Distributed transactions |
| CRDTs | Conflict-free replication |
| Idempotency Key | Safe retries, exactly-once semantics |
Performance Tuning Concurrent Systems
Thread Pool Starvation
Диагностика
// ThreadPool starvation — все потоки заблокированы
// Симптомы:
// - Высокая latency
// - Queue length растёт
// - CPU usage низкий (потоки ждут)
// Мониторинг ThreadPool
public class ThreadPoolMonitor
{
public static void LogStats()
{
ThreadPool.GetMaxThreads(out int maxWorker, out int maxIO);
ThreadPool.GetMinThreads(out int minWorker, out int minIO);
ThreadPool.GetAvailableThreads(out int availWorker, out int availIO);
var inUseWorker = maxWorker - availWorker;
var inUseIO = maxIO - availIO;
Console.WriteLine($"Worker threads: {inUseWorker}/{maxWorker} (min: {minWorker})");
Console.WriteLine($"IO threads: {inUseIO}/{maxIO} (min: {minIO})");
Console.WriteLine($"Queue length: {ThreadPool.PendingWorkItems}");
}
}
Причины Starvation
// 1. Sync over async — блокировка ThreadPool потоков
public string GetData()
{
return GetDataAsync().Result; // Блокирует поток
}
// 2. Nested Task.Run — рекурсивное использование ThreadPool
public async Task BadAsync()
{
await Task.Run(async () =>
{
await Task.Run(async () =>
{
await Task.Run(() => { }); // Nested!
});
});
}
// 3. Blocking I/O в ThreadPool
public void ProcessRequests()
{
Parallel.For(0, 1000, i =>
{
Thread.Sleep(100); // I/O simulation — блокирует поток
});
}
Лечение Starvation
// 1. Async all the way
public async Task<string> GetDataAsync()
{
return await _httpClient.GetStringAsync(url);
}
// 2. ConfigureAwait(false) в библиотеках
public async Task<string> GetDataAsync()
{
return await _httpClient.GetStringAsync(url).ConfigureAwait(false);
}
// 3. Увеличить MinThreads для burst
ThreadPool.SetMinThreads(100, 100);
// 4. Использовать SemaphoreSlim для ограничения
private readonly SemaphoreSlim _semaphore = new(10);
public async Task ProcessAsync()
{
await _semaphore.WaitAsync();
try
{
await DoWorkAsync();
}
finally
{
_semaphore.Release();
}
}
Async Context Flow — ExecutionContext, AsyncLocal
ExecutionContext
// ExecutionContext — flow context между async continuations
// Включает:
// - Security context
// - CallContext
// - CultureInfo
// - AsyncLocal<T> values
// ExecutionContext не flow'ится через Task.Run по умолчанию
// Но flow'ится через await
var culture = CultureInfo.CurrentCulture;
await Task.Run(() =>
{
// Culture тот же — ExecutionContext flowed
Console.WriteLine(CultureInfo.CurrentCulture);
});
// Suppress flow
ExecutionContext.SuppressFlow();
await Task.Run(() =>
{
// Default culture — flow suppressed
});
ExecutionContext.RestoreFlow();
AsyncLocal
// AsyncLocal<T> — async-aware thread-local storage
// Значение flow'ится через async continuations
public static class RequestContext
{
private static readonly AsyncLocal<string?> _requestId = new();
public static string? RequestId
{
get => _requestId.Value;
set => _requestId.Value = value;
}
}
// Использование в middleware
public async Task InvokeAsync(HttpContext context)
{
RequestContext.RequestId = Guid.NewGuid().ToString();
await _next(context);
// RequestId доступен во всём async chain
}
// В downstream коде
public async Task ProcessAsync()
{
var requestId = RequestContext.RequestId;
_logger.LogInformation("[{RequestId}] Processing", requestId);
}
AsyncLocal Caveats
// AsyncLocal — copy-on-write semantics
// Изменение в nested scope НЕ влияет на parent
private static AsyncLocal<int> _value = new();
public async Task DemoAsync()
{
_value.Value = 1;
Console.WriteLine(_value.Value); // 1
await InnerAsync();
Console.WriteLine(_value.Value); // 1 (не изменился!)
}
private async Task InnerAsync()
{
_value.Value = 2;
Console.WriteLine(_value.Value); // 2
// После await — значение 2 только в этом контексте
}
// ВАЖНО: AsyncLocal использует immutable snapshot
// При await — snapshot копируется
// Изменения в nested scope не propagate up
Synchronization Overhead Measurement
Benchmarking Locks
// BenchmarkDotNet для измерения overhead
[MemoryDiagnoser]
public class LockBenchmark
{
private int _counter;
private readonly object _lock = new();
private readonly SemaphoreSlim _semaphore = new(1, 1);
[Benchmark]
public void NoLock()
{
_counter++;
}
[Benchmark]
public void WithLock()
{
lock (_lock)
{
_counter++;
}
}
[Benchmark]
public async Task WithSemaphoreAsync()
{
await _semaphore.WaitAsync();
try
{
_counter++;
}
finally
{
_semaphore.Release();
}
}
[Benchmark]
public void WithInterlocked()
{
Interlocked.Increment(ref _counter);
}
}
// Результаты (примерные):
// Method | Mean | Allocated |
// ----------------- |--------- |---------- |
// NoLock | 0.5 ns | - |
// WithInterlocked | 2.0 ns | - |
// WithLock | 25.0 ns | - |
// WithSemaphoreAsync| 500.0 ns | 100 B |
Contention Measurement
// Monitor.LockContentionCount (.NET Core 3.0+)
// Показывает количество lock contention events
var before = Monitor.LockContentionCount;
// ... execute code ...
var after = Monitor.LockContentionCount;
var contention = after - before;
// Высокая contention — рассмотрите:
// 1. ReaderWriterLockSlim для read-heavy
// 2. ConcurrentDictionary для key-value
// 3. Lock striping (разделить на多个 lock'ов)
// 4. Lock-free структуры
Lock Striping
// Lock striping — уменьшить contention разделив на多个 lock'ов
public class StripedCache<TKey, TValue>
{
private readonly int _stripeCount;
private readonly object[] _locks;
private readonly Dictionary<TKey, TValue>[] _stripes;
public StripedCache(int stripeCount)
{
_stripeCount = stripeCount;
_locks = Enumerable.Range(0, stripeCount).Select(_ => new object()).ToArray();
_stripes = Enumerable.Range(0, stripeCount).Select(_ => new Dictionary<TKey, TValue>()).ToArray();
}
private int GetStripe(TKey key) => Math.Abs(key.GetHashCode()) % _stripeCount;
public void Add(TKey key, TValue value)
{
var stripe = GetStripe(key);
lock (_locks[stripe])
{
_stripes[stripe][key] = value;
}
}
public bool TryGetValue(TKey key, out TValue value)
{
var stripe = GetStripe(key);
lock (_locks[stripe])
{
return _stripes[stripe].TryGetValue(key, out value);
}
}
}
Scalability Testing
Load Patterns
// Load testing patterns
public class LoadTestRunner
{
public async Task RunConstantLoadAsync(
Func<Task> action,
int concurrentUsers,
TimeSpan duration)
{
var cts = new CancellationTokenSource(duration);
var tasks = Enumerable.Range(0, concurrentUsers)
.Select(_ => SimulateUserAsync(action, cts.Token))
.ToArray();
await Task.WhenAll(tasks);
}
private async Task SimulateUserAsync(Func<Task> action, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
await action();
await Task.Delay(Random.Shared.Next(100, 1000), ct);
}
}
}
Soak Tests
// Soak test — длительная нагрузка для поиска memory leaks
public async Task RunSoakTestAsync(
Func<Task> action,
int concurrentUsers,
TimeSpan duration)
{
var startTime = DateTime.UtcNow;
var cts = new CancellationTokenSource(duration);
var tasks = Enumerable.Range(0, concurrentUsers)
.Select(_ => Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
var sw = Stopwatch.StartNew();
await action();
sw.Stop();
// Log latency
RecordLatency(sw.Elapsed);
}
}))
.ToArray();
// Periodic GC и memory stats
var monitorTask = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
LogMemoryStats();
await Task.Delay(TimeSpan.FromMinutes(1), cts.Token);
}
});
await Task.WhenAll(tasks);
await monitorTask;
}
Spike Tests
// Spike test — резкое увеличение нагрузки
public async Task RunSpikeTestAsync(Func<Task> action)
{
// Baseline — 10 users
await RunConstantLoadAsync(action, 10, TimeSpan.FromMinutes(1));
// Spike — 100 users
await RunConstantLoadAsync(action, 100, TimeSpan.FromMinutes(1));
// Recovery — 10 users
await RunConstantLoadAsync(action, 10, TimeSpan.FromMinutes(1));
// Measure:
// - Latency degradation
// - Error rate
// - Recovery time
}
Metrics Collection
public class MetricsCollector
{
private readonly ConcurrentQueue<TimeSpan> _latencies = new();
private long _successCount;
private long _errorCount;
public void RecordLatency(TimeSpan latency, bool success)
{
_latencies.Enqueue(latency);
if (success)
Interlocked.Increment(ref _successCount);
else
Interlocked.Increment(ref _errorCount);
}
public MetricsReport GetReport()
{
var sorted = _latencies.OrderBy(x => x).ToList();
return new MetricsReport
{
TotalRequests = sorted.Count,
SuccessCount = Interlocked.Read(ref _successCount),
ErrorCount = Interlocked.Read(ref _errorCount),
P50 = sorted[sorted.Count / 2],
P95 = sorted[(int)(sorted.Count * 0.95)],
P99 = sorted[(int)(sorted.Count * 0.99)],
Throughput = sorted.Count / (sorted.Last() - sorted.First()).TotalSeconds
};
}
}
public record MetricsReport
{
public int TotalRequests { get; init; }
public long SuccessCount { get; init; }
public long ErrorCount { get; init; }
public TimeSpan P50 { get; init; }
public TimeSpan P95 { get; init; }
public TimeSpan P99 { get; init; }
public double Throughput { get; init; }
}
Практика
Задание 1: Найти и устранить Thread Pool Starvation
// Сценарий: приложение с sync-over-async
// 1. Создайте приложение с .Result/.Wait()
// 2. Запустите load test — observe starvation
// 3. Замените на async/await
// 4. Сравните latency и throughput
public class StarvationDemo
{
private readonly HttpClient _httpClient = new();
// BAD — sync over async
public string GetDataBad(string url)
{
return _httpClient.GetStringAsync(url).Result;
}
// GOOD — async all the way
public async Task<string> GetDataGoodAsync(string url)
{
return await _httpClient.GetStringAsync(url);
}
}
Задание 2: Custom Diagnostic для Async Chain Analysis
public class AsyncChainDiagnostic
{
private readonly AsyncLocal<AsyncContext> _context = new();
public IDisposable BeginScope(string operationName)
{
var ctx = new AsyncContext
{
OperationName = operationName,
StartTime = DateTime.UtcNow,
Parent = _context.Value
};
_context.Value = ctx;
return new Scope(this, ctx);
}
public void EndScope(AsyncContext ctx)
{
ctx.EndTime = DateTime.UtcNow;
_context.Value = ctx.Parent;
LogChain(ctx);
}
private void LogChain(AsyncContext ctx)
{
var chain = new List<AsyncContext>();
var current = ctx;
while (current != null)
{
chain.Add(current);
current = current.Parent;
}
Console.WriteLine(string.Join(" → ", chain.Select(c =>
$"{c.OperationName} ({(c.EndTime - c.StartTime).TotalMilliseconds}ms)")));
}
private sealed record AsyncContext
{
public string OperationName { get; init; } = "";
public DateTime StartTime { get; init; }
public DateTime? EndTime { get; set; }
public AsyncContext? Parent { get; set; }
}
private sealed class Scope : IDisposable
{
private readonly AsyncChainDiagnostic _diagnostic;
private readonly AsyncContext _context;
public Scope(AsyncChainDiagnostic diagnostic, AsyncContext context)
{
_diagnostic = diagnostic;
_context = context;
}
public void Dispose() => _diagnostic.EndScope(_context);
}
}
// Использование
using var diag = new AsyncChainDiagnostic();
using (diag.BeginScope("ProcessRequest"))
{
using (diag.BeginScope("FetchData"))
{
await Task.Delay(100);
}
using (diag.BeginScope("TransformData"))
{
await Task.Delay(50);
}
}
Задание 3: Load Test Framework
public class LoadTestFramework
{
private readonly MetricsCollector _metrics = new();
public async Task<MetricsReport> RunAsync(
Func<CancellationToken, Task> action,
LoadTestOptions options)
{
var cts = new CancellationTokenSource(options.Duration);
var tasks = Enumerable.Range(0, options.Concurrency)
.Select(_ => RunWorkerAsync(action, cts.Token))
.ToArray();
await Task.WhenAll(tasks);
return _metrics.GetReport();
}
private async Task RunWorkerAsync(
Func<CancellationToken, Task> action,
CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var sw = Stopwatch.StartNew();
try
{
await action(ct);
_metrics.RecordLatency(sw.Elapsed, true);
}
catch (Exception)
{
_metrics.RecordLatency(sw.Elapsed, false);
}
if (options.ThinkTime > TimeSpan.Zero)
{
await Task.Delay(options.ThinkTime, ct);
}
}
}
}
public record LoadTestOptions
{
public int Concurrency { get; init; }
public TimeSpan Duration { get; init; }
public TimeSpan ThinkTime { get; init; }
}
Шпаргалка: Performance Checklist
| Проверка | Инструмент |
| Thread pool starvation | ThreadPool.GetAvailableThreads |
| Lock contention | Monitor.LockContentionCount |
| Async context flow | AsyncLocal<T>, ExecutionContext |
| Memory leaks | Soak test + GC stats |
| Latency percentiles | P50, P95, P99 metrics |
| Throughput | Requests/sec measurement |
| Scalability | Load test с increasing concurrency |
Advanced Async Patterns
PeriodicTimer vs Timer
PeriodicTimer (.NET 6+)
// PeriodicTimer — современный async timer
// Преимущества:
// - Полностью async (WaitForNextTickAsync)
// - No callback hell
// - Easy cancellation
// - Skips missed ticks
public async Task RunPeriodicAsync(CancellationToken ct)
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
while (await timer.WaitForNextTickAsync(ct))
{
await DoWorkAsync();
}
}
// Обработка drift
public async Task RunWithDriftAsync(CancellationToken ct)
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
while (await timer.WaitForNextTickAsync(ct))
{
var now = DateTime.UtcNow;
await DoWorkAsync();
var elapsed = DateTime.UtcNow - now;
if (elapsed > TimeSpan.FromSeconds(1.5))
{
Console.WriteLine($"Drift detected: {elapsed.TotalMilliseconds}ms");
}
}
}
System.Threading.Timer
// Legacy timer — callback-based
public class LegacyTimerDemo
{
private Timer? _timer;
public void Start()
{
_timer = new Timer(
callback: async state => await DoWorkAsync(),
state: null,
dueTime: TimeSpan.Zero,
period: TimeSpan.FromSeconds(1));
}
// PROBLEM: callback может overlapped
private async Task DoWorkAsync()
{
// Если работа занимает > 1s, следующий callback запустится
// Нужно manual overlap protection
}
}
Timer Comparison
| Характеристика | PeriodicTimer | System.Threading.Timer |
| API | Async-first | Callback-based |
| Overlap protection | Built-in (await) | Manual |
| Cancellation | CancellationToken | Change(Timeout, Timeout) |
| Missed ticks | Skipped | Queued |
| .NET Version | 6+ | All |
Structured Concurrency
Task.Run и Cancellation Patterns
// Structured concurrency — clear parent-child relationships
// Все дочерние задачи завершаются когда parent завершается
// Pattern: using var cts = ...
public async Task ProcessWithTimeoutAsync()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
await DoWorkAsync(cts.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Timed out");
}
}
// Pattern: linked cancellation
public async Task ProcessWithUserCancellationAsync(CancellationToken userCt)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(userCt);
cts.CancelAfter(TimeSpan.FromSeconds(30));
await DoWorkAsync(cts.Token);
// Отменится либо по userCt, либо по timeout
}
Task.WhenAll с Cleanup
// Structured cleanup — все задачи завершаются cleanly
public async Task ProcessAllWithCleanupAsync(
IEnumerable<Func<CancellationToken, Task>> tasks,
CancellationToken ct)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var runningTasks = new List<Task>();
try
{
foreach (var taskFactory in tasks)
{
runningTasks.Add(taskFactory(cts.Token));
}
await Task.WhenAll(runningTasks);
}
catch
{
// Отменить все running tasks
cts.Cancel();
// Ждать cleanup
await Task.WhenAll(runningTasks.Where(t => !t.IsCompleted));
throw;
}
}
Background Service Pattern
// IHostedService с graceful shutdown
public class MyBackgroundService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
while (await timer.WaitForNextTickAsync(stoppingToken))
{
try
{
await DoWorkAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Graceful shutdown
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in background service");
}
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
// Cleanup before base.StopAsync
await CleanupAsync();
await base.StopAsync(cancellationToken);
}
}
Parallel.ForEachAsync
Streaming Parallel Processing
// Parallel.ForEachAsync (.NET 6+) — async parallel processing
public async Task ProcessUrlsAsync(IEnumerable<string> urls)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 10,
CancellationToken = ct
};
await Parallel.ForEachAsync(urls, options, async (url, ct) =>
{
var content = await _httpClient.GetStringAsync(url, ct);
await ProcessContentAsync(content, ct);
});
}
// С throttling
public async Task ThrottledProcessAsync(IEnumerable<string> urls)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 5 // Max 5 concurrent
};
await Parallel.ForEachAsync(urls, options, async (url, ct) =>
{
// Каждый url обрабатывается async, но не более 5 параллельно
await ProcessAsync(url, ct);
});
}
Parallel.ForEachAsync vs Task.WhenAll
// Task.WhenAll — all at once (no throttling)
var tasks = urls.Select(url => ProcessAsync(url));
await Task.WhenAll(tasks); // Все одновременно!
// Parallel.ForEachAsync — throttled
await Parallel.ForEachAsync(urls, new ParallelOptions
{
MaxDegreeOfParallelism = 10
}, async (url, ct) =>
{
await ProcessAsync(url, ct);
}); // Max 10 одновременно
// Когда использовать:
// Task.WhenAll — когда все задачи лёгкие и memory OK
// Parallel.ForEachAsync — когда нужно throttling
Cooperative Cancellation Best Practices
Cancellation Propagation
// Всегда передавайте CancellationToken
public async Task ProcessAsync(CancellationToken ct = default)
{
await Step1Async(ct);
await Step2Async(ct);
await Step3Async(ct);
}
// Проверка в циклах
public async Task ProcessManyAsync(IEnumerable<Item> items, CancellationToken ct)
{
foreach (var item in items)
{
ct.ThrowIfCancellationRequested(); // Check before each item
await ProcessItemAsync(item, ct);
}
}
Cancelable Operations
// HttpClient поддерживает cancellation
var response = await _httpClient.GetAsync(url, ct);
// Task.Delay поддерживает cancellation
await Task.Delay(TimeSpan.FromSeconds(5), ct);
// Channel поддерживает cancellation
await channel.Writer.WriteAsync(item, ct);
await foreach (var item in channel.Reader.ReadAllAsync(ct)) { }
// Stream поддерживает cancellation
await stream.ReadAsync(buffer, ct);
// Custom cancellation
public async Task<T> WithCancellation<T>(
Task<T> task,
CancellationToken ct)
{
var completed = await Task.WhenAny(task, CreateCancellationTask(ct));
if (completed != task)
{
throw new OperationCanceledException(ct);
}
return await task;
}
private Task CreateCancellationTask(CancellationToken ct)
{
var tcs = new TaskCompletionSource();
ct.Register(() => tcs.TrySetCanceled(ct));
return tcs.Task;
}
Cancellation with Timeout
// Timeout pattern
public async Task<T> WithTimeoutAsync<T>(
Func<CancellationToken, Task<T>> action,
TimeSpan timeout)
{
using var cts = new CancellationTokenSource(timeout);
return await action(cts.Token);
}
// Nested timeouts
public async Task ProcessWithNestedTimeoutsAsync()
{
using var outerCts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
using var step1Cts = CancellationTokenSource.CreateLinkedTokenSource(outerCts.Token);
step1Cts.CancelAfter(TimeSpan.FromSeconds(30));
await Step1Async(step1Cts.Token);
using var step2Cts = CancellationTokenSource.CreateLinkedTokenSource(outerCts.Token);
step2Cts.CancelAfter(TimeSpan.FromSeconds(60));
await Step2Async(step2Cts.Token);
}
Практика
Задание 1: Resilient Background Service
// Retry + Circuit Breaker + Bulkhead
public class ResilientBackgroundService : BackgroundService
{
private readonly RetryPolicy _retryPolicy;
private readonly CircuitBreaker _circuitBreaker;
private readonly SemaphoreSlim _bulkhead;
public ResilientBackgroundService()
{
_retryPolicy = new RetryPolicy(3, TimeSpan.FromSeconds(1));
_circuitBreaker = new CircuitBreaker(5, TimeSpan.FromSeconds(30));
_bulkhead = new SemaphoreSlim(10);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
while (await timer.WaitForNextTickAsync(stoppingToken))
{
await _bulkhead.WaitAsync(stoppingToken);
try
{
await _circuitBreaker.ExecuteAsync(async ct =>
{
await _retryPolicy.ExecuteAsync(async retryCt =>
{
await DoWorkAsync(retryCt);
}, stoppingToken);
}, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Background service error");
}
finally
{
_bulkhead.Release();
}
}
}
}
public class RetryPolicy
{
private readonly int _maxRetries;
private readonly TimeSpan _initialDelay;
public RetryPolicy(int maxRetries, TimeSpan initialDelay)
{
_maxRetries = maxRetries;
_initialDelay = initialDelay;
}
public async Task ExecuteAsync(Func<CancellationToken, Task> action, CancellationToken ct)
{
for (int i = 0; i <= _maxRetries; i++)
{
try
{
await action(ct);
return;
}
catch when (i < _maxRetries)
{
var delay = _initialDelay * Math.Pow(2, i); // Exponential backoff
await Task.Delay(delay, ct);
}
}
}
}
Задание 2: Async Rate Limiter с Token Bucket
public class AsyncRateLimiter
{
private readonly int _maxTokens;
private readonly int _refillRate;
private readonly TimeSpan _refillInterval;
private int _tokens;
private DateTime _lastRefill;
private readonly object _lock = new();
public AsyncRateLimiter(int maxTokens, int refillRate, TimeSpan refillInterval)
{
_maxTokens = maxTokens;
_refillRate = refillRate;
_refillInterval = refillInterval;
_tokens = maxTokens;
_lastRefill = DateTime.UtcNow;
}
public async Task WaitAsync(CancellationToken ct = default)
{
while (true)
{
ct.ThrowIfCancellationRequested();
lock (_lock)
{
RefillTokens();
if (_tokens > 0)
{
_tokens--;
return;
}
}
// Ждать пока tokens не появятся
await Task.Delay(_refillInterval, ct);
}
}
private void RefillTokens()
{
var now = DateTime.UtcNow;
var elapsed = now - _lastRefill;
var tokensToAdd = (int)(elapsed.TotalSeconds / _refillInterval.TotalSeconds * _refillRate);
if (tokensToAdd > 0)
{
_tokens = Math.Min(_maxTokens, _tokens + tokensToAdd);
_lastRefill = now;
}
}
}
Задание 3: Concurrent Job Scheduler
public class JobScheduler
{
private readonly PriorityQueue<Job, int> _queue = new();
private readonly SemaphoreSlim _semaphore;
private readonly object _lock = new();
private CancellationTokenSource? _cts;
private Task? _workerTask;
public JobScheduler(int maxConcurrency)
{
_semaphore = new SemaphoreSlim(maxConcurrency);
}
public void Enqueue(Job job)
{
lock (_lock)
{
_queue.Enqueue(job, job.Priority);
}
}
public async Task StartAsync(CancellationToken ct)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
_workerTask = ProcessQueueAsync(_cts.Token);
}
public async Task StopAsync()
{
_cts?.Cancel();
if (_workerTask != null)
{
await _workerTask;
}
}
private async Task ProcessQueueAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
Job? job;
lock (_lock)
{
if (_queue.TryDequeue(out var j, out _))
{
job = j;
}
else
{
job = null;
}
}
if (job == null)
{
await Task.Delay(100, ct);
continue;
}
await _semaphore.WaitAsync(ct);
_ = Task.Run(async () =>
{
try
{
await job.ExecuteAsync(ct);
}
catch (Exception ex)
{
job.OnError(ex);
}
finally
{
_semaphore.Release();
}
}, ct);
}
}
}
public record Job(int Priority, Func<CancellationToken, Task> ExecuteAsync, Action<Exception> OnError);
Шпаргалка: Async Patterns
| Pattern | Когда использовать |
| PeriodicTimer | Регулярные async операции (.NET 6+) |
| Structured Concurrency | Clear task lifecycle |
| Parallel.ForEachAsync | Throttled async parallel |
| Token Bucket | Rate limiting |
| Retry + Circuit Breaker | Resilient operations |
| Cooperative Cancellation | Graceful shutdown |
Контрольная точка модуля 3
Проект: Высоконагруженный Async Message Broker
Описание
Создайте высокопроизводительный message broker с использованием современных async/concurrency паттернов .NET.
Требования
Channel-Based Message Routing
public interface IMessageBroker
{
Task PublishAsync<T>(string topic, T message, CancellationToken ct = default);
IAsyncEnumerable<T> SubscribeAsync<T>(string topic, CancellationToken ct = default);
Task UnsubscribeAsync<T>(string topic, IAsyncEnumerable<T> subscription);
}
public class ChannelMessageBroker : IMessageBroker
{
private readonly ConcurrentDictionary<string, Channel<object>> _topics = new();
public async Task PublishAsync<T>(string topic, T message, CancellationToken ct = default)
{
var channel = _topics.GetOrAdd(topic, _ =>
Channel.CreateBounded<object>(new BoundedChannelOptions(10000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
}));
await channel.Writer.WriteAsync(message, ct);
}
public IAsyncEnumerable<T> SubscribeAsync<T>(string topic, CancellationToken ct = default)
{
var channel = _topics.GetOrAdd(topic, _ =>
Channel.CreateBounded<object>(new BoundedChannelOptions(10000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
}));
return channel.Reader.ReadAllAsync(ct).CastAsync<T>();
}
}
Backpressure через Bounded Channels
public class BackpressureConfig
{
public int ChannelCapacity { get; set; } = 10000;
public BoundedChannelFullMode FullMode { get; set; } = BoundedChannelFullMode.Wait;
public TimeSpan BackpressureTimeout { get; set; } = TimeSpan.FromSeconds(5);
}
public class BackpressureMessageBroker
{
private readonly BackpressureConfig _config;
public async Task PublishWithBackpressureAsync<T>(
string topic,
T message,
CancellationToken ct = default)
{
var channel = GetOrCreateChannel(topic);
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
timeoutCts.CancelAfter(_config.BackpressureTimeout);
try
{
await channel.Writer.WriteAsync(message, timeoutCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
throw new BackpressureException($"Channel {topic} is full");
}
}
public int GetQueueDepth(string topic)
{
if (_topics.TryGetValue(topic, out var channel))
{
return channel.Reader.Count;
}
return 0;
}
}
public class BackpressureException : Exception
{
public BackpressureException(string message) : base(message) { }
}
Circuit Breaker для Downstream Calls
public class CircuitBreaker
{
private readonly int _failureThreshold;
private readonly TimeSpan _resetTimeout;
private int _failures;
private DateTime? _lastFailure;
private CircuitState _state = CircuitState.Closed;
public enum CircuitState { Closed, Open, HalfOpen }
public CircuitBreaker(int failureThreshold, TimeSpan resetTimeout)
{
_failureThreshold = failureThreshold;
_resetTimeout = resetTimeout;
}
public async Task<T> ExecuteAsync<T>(Func<CancellationToken, Task<T>> action, CancellationToken ct)
{
var state = GetState();
if (state == CircuitState.Open)
{
throw new CircuitBreakerOpenException("Circuit breaker is open");
}
try
{
var result = await action(ct);
OnSuccess();
return result;
}
catch
{
OnFailure();
throw;
}
}
private CircuitState GetState()
{
if (_state == CircuitState.Open)
{
if (DateTime.UtcNow - _lastFailure > _resetTimeout)
{
_state = CircuitState.HalfOpen;
}
}
return _state;
}
private void OnSuccess()
{
Interlocked.Exchange(ref _failures, 0);
_state = CircuitState.Closed;
}
private void OnFailure()
{
var failures = Interlocked.Increment(ref _failures);
_lastFailure = DateTime.UtcNow;
if (failures >= _failureThreshold)
{
_state = CircuitState.Open;
}
}
public CircuitState State => GetState();
}
public class CircuitBreakerOpenException : Exception
{
public CircuitBreakerOpenException(string message) : base(message) { }
}
Graceful Shutdown с Cooperative Cancellation
public class GracefulMessageBroker : IHostedService, IMessageBroker
{
private readonly CancellationTokenSource _shutdownCts = new();
private readonly List<Task> _activeSubscriptions = new();
private readonly object _lock = new();
public async Task StartAsync(CancellationToken cancellationToken)
{
// Инициализация
}
public async Task StopAsync(CancellationToken cancellationToken)
{
// 1. Stop accepting new messages
_shutdownCts.Cancel();
// 2. Wait for active subscriptions to complete
lock (_lock)
{
foreach (var subscription in _activeSubscriptions)
{
// Don't wait forever — respect cancellationToken
}
}
// 3. Complete all channels
foreach (var kvp in _topics)
{
kvp.Value.Writer.Complete();
}
// 4. Wait for readers to finish
await Task.WhenAll(GetReaderTasks());
}
public async Task PublishAsync<T>(string topic, T message, CancellationToken ct = default)
{
if (_shutdownCts.IsCancellationRequested)
{
throw new InvalidOperationException("Broker is shutting down");
}
// ... publish logic
}
}
Zero Thread Pool Starvation
// Гарантии:
// 1. Все I/O operations — async
// 2. Нет .Result/.Wait()
// 3. ConfigureAwait(false) в библиотеках
// 4. ThreadPool.SetMinThreads для burst
public class StarvationFreeBroker
{
public StarvationFreeBroker()
{
// Установить min threads для burst нагрузки
ThreadPool.GetMinThreads(out int minWorker, out _);
if (minWorker < 100)
{
ThreadPool.SetMinThreads(100, 100);
}
}
// Все методы async
public async Task ProcessAsync(Message message, CancellationToken ct)
{
await _handler.HandleAsync(message, ct).ConfigureAwait(false);
}
}
Comprehensive Metrics
public class BrokerMetrics
{
private readonly ConcurrentDictionary<string, TopicMetrics> _topicMetrics = new();
public void RecordPublish(string topic, TimeSpan latency)
{
var metrics = _topicMetrics.GetOrAdd(topic, _ => new TopicMetrics());
metrics.RecordPublish(latency);
}
public void RecordConsume(string topic, TimeSpan latency)
{
var metrics = _topicMetrics.GetOrAdd(topic, _ => new TopicMetrics());
metrics.RecordConsume(latency);
}
public void RecordError(string topic)
{
var metrics = _topicMetrics.GetOrAdd(topic, _ => new TopicMetrics());
metrics.RecordError();
}
public BrokerReport GetReport()
{
return new BrokerReport
{
Topics = _topicMetrics.ToDictionary(
kvp => kvp.Key,
kvp => kvp.Value.GetReport())
};
}
}
public class TopicMetrics
{
private long _publishCount;
private long _consumeCount;
private long _errorCount;
private readonly ConcurrentQueue<TimeSpan> _publishLatencies = new();
private readonly ConcurrentQueue<TimeSpan> _consumeLatencies = new();
public void RecordPublish(TimeSpan latency)
{
Interlocked.Increment(ref _publishCount);
_publishLatencies.Enqueue(latency);
}
public void RecordConsume(TimeSpan latency)
{
Interlocked.Increment(ref _consumeCount);
_consumeLatencies.Enqueue(latency);
}
public void RecordError()
{
Interlocked.Increment(ref _errorCount);
}
public TopicReport GetReport()
{
return new TopicReport
{
PublishCount = Interlocked.Read(ref _publishCount),
ConsumeCount = Interlocked.Read(ref _consumeCount),
ErrorCount = Interlocked.Read(ref _errorCount),
PublishP99 = GetPercentile(_publishLatencies, 0.99),
ConsumeP99 = GetPercentile(_consumeLatencies, 0.99),
QueueDepth = _publishLatencies.Count - _consumeLatencies.Count
};
}
private TimeSpan GetPercentile(ConcurrentQueue<TimeSpan> queue, double percentile)
{
var sorted = queue.OrderBy(x => x).ToList();
if (sorted.Count == 0) return TimeSpan.Zero;
return sorted[(int)(sorted.Count * percentile)];
}
}
public record BrokerReport
{
public Dictionary<string, TopicReport> Topics { get; init; } = new();
}
public record TopicReport
{
public long PublishCount { get; init; }
public long ConsumeCount { get; init; }
public long ErrorCount { get; init; }
public TimeSpan PublishP99 { get; init; }
public TimeSpan ConsumeP99 { get; init; }
public int QueueDepth { get; init; }
}
Критерии прохождения
Throughput > 50k msg/sec
// Benchmark
[Benchmark]
public async Task HighThroughput()
{
var broker = new ChannelMessageBroker();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var producer = Task.Run(async () =>
{
int count = 0;
while (!cts.IsCancellationRequested)
{
await broker.PublishAsync("test", new Message { Data = count++ }, cts.Token);
}
});
var consumer = Task.Run(async () =>
{
int count = 0;
await foreach (var msg in broker.SubscribeAsync<Message>("test", cts.Token))
{
count++;
}
return count;
});
await producer;
broker.CompleteAll();
var total = await consumer;
var throughput = total / 10.0; // msg/sec
Assert.True(throughput > 50000, $"Throughput: {throughput} msg/sec");
}
P99 Latency < 10ms
// Latency test
var latencies = new ConcurrentQueue<TimeSpan>();
for (int i = 0; i < 100000; i++)
{
var sw = Stopwatch.StartNew();
await broker.PublishAsync("test", new Message { Data = i });
sw.Stop();
latencies.Enqueue(sw.Elapsed);
}
var sorted = latencies.OrderBy(x => x).ToList();
var p99 = sorted[(int)(sorted.Count * 0.99)];
Assert.True(p99 < TimeSpan.FromMilliseconds(10), $"P99: {p99.TotalMilliseconds}ms");
Graceful Degradation
// Backpressure test
var broker = new BackpressureMessageBroker(new BackpressureConfig
{
ChannelCapacity = 100,
FullMode = BoundedChannelFullMode.Wait,
BackpressureTimeout = TimeSpan.FromMilliseconds(100)
});
// Fill channel
for (int i = 0; i < 100; i++)
{
await broker.PublishWithBackpressureAsync("test", new Message { Data = i });
}
// Next publish should timeout
var ex = await Assert.ThrowsAsync<BackpressureException>(async () =>
{
await broker.PublishWithBackpressureAsync("test", new Message { Data = 101 });
});
Clean Shutdown
// Shutdown test
var broker = new GracefulMessageBroker();
await broker.StartAsync(CancellationToken.None);
// Publish messages
for (int i = 0; i < 1000; i++)
{
await broker.PublishAsync("test", new Message { Data = i });
}
// Graceful shutdown
await broker.StopAsync(CancellationToken.None);
// All messages should be processed
Assert.True(processedCount == 1000);
No Deadlocks или Race Conditions
// Concurrent load test
var broker = new ChannelMessageBroker();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var producers = Enumerable.Range(0, 10)
.Select(i => Task.Run(async () =>
{
int count = 0;
while (!cts.IsCancellationRequested)
{
await broker.PublishAsync($"topic-{i % 3}", new Message { Data = count++ }, cts.Token);
}
}))
.ToArray();
var consumers = Enumerable.Range(0, 10)
.Select(i => Task.Run(async () =>
{
int count = 0;
await foreach (var msg in broker.SubscribeAsync<Message>($"topic-{i % 3}", cts.Token))
{
count++;
}
return count;
}))
.ToArray();
// Should complete without deadlock
await Task.WhenAll(producers);
broker.CompleteAll();
var totals = await Task.WhenAll(consumers);
Архитектура проекта
MessageBroker/
├── Core/
│ ├── IMessageBroker.cs
│ ├── ChannelMessageBroker.cs
│ ├── BackpressureMessageBroker.cs
│ └── GracefulMessageBroker.cs
├── Resilience/
│ ├── CircuitBreaker.cs
│ ├── RetryPolicy.cs
│ └── Bulkhead.cs
├── Metrics/
│ ├── BrokerMetrics.cs
│ ├── TopicMetrics.cs
│ └── BrokerReport.cs
├── Routing/
│ ├── TopicRouter.cs
│ └── MessageFilter.cs
└── Tests/
├── ThroughputTests.cs
├── LatencyTests.cs
├── BackpressureTests.cs
├── ShutdownTests.cs
└── ConcurrencyTests.cs
Checklist
- [ ] Channel-based message routing реализован
- [ ] Bounded channels с backpressure
- [ ] Circuit breaker для downstream calls
- [ ] Graceful shutdown с cooperative cancellation
- [ ] ThreadPool starvation prevention
- [ ] Metrics (queue depth, processing time, error rate)
- [ ] Throughput > 50k msg/sec
- [ ] P99 latency < 10ms
- [ ] Graceful degradation при overload
- [ ] Clean shutdown без lost messages
- [ ] No deadlocks или race conditions