Пул потоков обеспечивает также безопасность относительно исключений. Любое возбужденное задачей исключение передается через будущий результат, возвращенный submit(), а, если выход из функции происходит в результате исключения, то деструктор пула потоков снимет еще работающие задачи и дождется завершения потоков, входящих в пул.

Эта схема работает для простых случаев, когда задачи независимы. Но не годится, когда одни задачи зависят от других, также переданных пулу.

<p>9.1.3. Задачи, ожидающие других задач</p>

В этой книге я уже неоднократно приводил пример алгоритма Quicksort. Его идея проста — подлежащие сортировке данные разбиваются на две части: до и после опорного элемента (в смысле заданной операции сравнения). Затем обе части рекурсивно сортируются и объединяются для получения полностью отсортированной последовательности. При распараллеливании алгоритма надо позаботиться о том, чтобы рекурсивные вызовы задействовали имеющийся аппаратный параллелизм.

В главе 4, где этот пример впервые был представлен, мы использовали std::async для выполнения одного из рекурсивных вызовов на каждом шаге и оставляли библиотеке решение о том, запускать ли новый поток или сортировать синхронно при обращении к get(). Этот подход неплохо работает — каждая задача либо выполняется в отдельном потоке, либо в тот момент, когда нужны ее результаты.

В главе 8 мы переработали эту реализацию, продемонстрировав альтернативный подход, когда количество потоков фиксировано и определяется уровнем аппаратного параллелизма. В данном случае мы воспользовались стеком ожидающих сортировки блоков. Разбивая на части предложенные для сортировки данные, каждый поток помещал один блок в стек, а второй сортировал непосредственно. Бесхитростное ожидание завершения сортировки второго блока могло бы закончиться взаимоблокировкой, потому что число потоков ограничено, и некоторые из них ждут. Очень легко оказаться в ситуации, когда все потоки ждут завершения сортировки блоков, и ни один ничего не делает. Тогда мы решили эту проблему, заставив поток извлекать блоки из стека и сортировать их, пока тот конкретный блок, которого он ждет, еще не отсортирован.

Точно такая же проблема возникает при использовании одного из рассмотренных выше простых пулов потоков вместо std::async, как в примере из главы 4. Число потоков тоже ограничено, и может случиться так, что все они будут ждать задач, которые еще запланированы, так как нет свободных потоков. И решение должно быть таким же,

как в главе 8: обрабатывать стоящие в очереди блоки во время ожидания завершения сортировки своего блока. Но если мы применяем пул потоков для управления списком задач и их ассоциациями с потоками — а именно в этом и состоит смысл пула потоков, — то доступа к списку задач у нас нет. Поэтому необходимо модифицировать сам пул, чтобы он делал это автоматически.

Проще всего будет добавить в класс thread_pool новую функцию, чтобы исполнять задачу из очереди и управлять циклом самостоятельно. Так мы и поступим. Более развитые реализации пула могли бы включить дополнительную логику в функцию ожидания или добавить другие функции ожидания для обработки этого случая, быть может, даже назначая приоритеты ожидаемым задачам. В листинге ниже приведена новая функция run_pending_task(), а модифицированный алгоритм Quicksort, в котором она используется, показан в листинге 9.5.

Листинг 9.4. Реализация функции run_pending_task()

void thread_pool::run_pending_task() {

 function_wrapper task;

 if (work_queue.try_pop(task)) {

  task();

 } else {

  std::this_thread::yield();

 }

}

Код run_pending_task() вынесен из главного цикла внутри функции worker_thread(), которую теперь можно будет изменить, так чтобы она вызывала run_pending_task(). Функция run_pending_task() пытается получить задачу из очереди и в случае успеха выполняет ее; если очередь пуста, то функция уступает управление ОС, чтобы та могла запланировать другой поток. Показанная ниже реализация Quicksort гораздо проще, чем версия в листинге 8.1, потому что вся логика управления потоками перенесена в пул.

Листинг 9.5. Реализация Quicksort на основе пула потоков

template

struct sorter {    ←(1)

 thread_pool pool; ←(2)

 std::list do_sort(std::list& chunk_data) {

  if (chunk_data.empty()) {

   return chunk_data;

  }

  std::list result;

  result.splice(result.begin(), chunk_data, chunk_data.begin());

  T const& partition_val = *result.begin();

  typename std::list::iterator divide_point =

Перейти на страницу:

Похожие книги