В простейшем случае пул состоит из фиксированного числа std::thread::hardware_concurrency()). Когда у программы появляется какая-то работа, она вызывает функцию, которая помещает эту работу в очередь. Рабочий поток забирает работу из очереди, выполняет указанную в ней задачу, после чего проверяет, есть ли в очереди другие работы. В этой реализации никакого механизма ожидания завершения задачи не предусмотрело. Если это необходимо, то вы должны будете управлять синхронизацией самостоятельно.
В следующем листинге приведена реализация такого пула потоков.
Листинг 9.1. Простой пул потоков
class thread_pool {
std::atomic_bool done;
thread_safe_queue(1)
std::vector(2)
join_threads joiner; ←(3)
void worker_thread() {
while (!done) { ←(4)
std::function
if (work_queue.try_pop(task)) { ←(5)
task(); ←(6)
} else {
std::this_thread::yield(); ←(7)
}
}
}
public:
thread_pool():
done(false), joiner(threads) {
unsigned const thread_count =
std::thread::hardware_concurrency();←(8)
try {
for (unsigned i = 0; i < thread_count; ++i) {
threads.push_back(
std::thread(&thread_pool::worker_thread, this)); ←(9)
}
} catch (...) {
done = true; ←(10)
throw;
}
}
~thread_pool() {
done = true; ←(11)
}
template
void submit(FunctionType f) {
work_queue.push(std::function(12)
}
};
Здесь мы определили вектор рабочих потоков (2) и используем одну из потокобезопасных очередей из главы 6 (1) для хранения очереди работ. В данном случае пользователь не может ждать завершения задачи, а задача не может возвращать значения, поэтому для инкапсуляции задач можно использовать тип std::function. Функция submit() обертывает переданную функцию или допускающий вызов объект в объект std::function и помещает его в очередь (12).
Потоки запускаются в конструкторе; их количество равно значению, возвращаемому функцией std::thread::hardware_concurrency(), то есть мы создаем столько потоков, сколько может поддержать оборудование (8). Все эти потоки исполняют функцию-член нашего класса worker_thread() (9).
Запуск потока может завершиться исключением, поэтому необходимо позаботиться о том, чтобы уже запущенные к этому моменту потоки корректно завершались. Для этого мы включили блок try-catch, который в случае исключения поднимает флаг done (10). Кроме того, мы воспользовались классом join_threads из главы 8 (3), чтобы обеспечить присоединение всех потоков. То же самое происходит в деструкторе: мы просто поднимаем флаг done (11), а объект join_threads гарантирует, что потоки завершатся до уничтожения пула. Отметим, что порядок объявления членов важен: и флаг done и объект worker_queue должны быть объявлены раньше вектора threads, который, в свою очередь, должен быть объявлен раньше joiner. Только тогда деструкторы членов класса будут вызываться в правильном порядке; в частности, нельзя уничтожать очередь раньше, чем остановлены все потоки.
Сама функция worker_thread проста до чрезвычайности: в цикле, который продолжается, пока не поднят флаг done (4), она извлекает задачи из очереди (5) и выполняет их (6). Если в очереди нет задач, функция вызывает std::this_thread::yield() (7), чтобы немного отдохнуть и дать возможность поработать другим потокам.
Часто даже такого простого пула потоков достаточно, особенно если задачи независимы, не возвращают значений и не выполняют блокирующих операций. Но бывает и по-другому: во-первых, у программы могут быть более жесткие требования, а, во-вторых, в таком пуле возможны проблемы, в частности, из-за взаимоблокировок. Кроме того, в простых случаях иногда лучше прибегнуть к функции std::async, как неоднократно демонстрировалось в главе 8. В этой главе мы рассмотрим и более изощренные реализации пула потоков с дополнительными возможностями, которые призваны либо удовлетворить особые потребности пользователя, либо уменьшить количество потенциальных ошибок. Для начала разрешим ожидать завершения переданной пулу задачи.
9.1.2. Ожидание задачи, переданной пулу потоков