Поскольку используются будущие результаты, массива results больше нет, поэтому мы должны сохранить результат обработки последнего блока в переменной (7), а не в элементе массива. Кроме того, поскольку мы получаем значения из будущих результатов, проще не вызывать std::accumulate, а написать простой цикл for, в котором к переданному начальному значению (8) будут прибавляться значения, полученные из каждого будущего результата (9). Если какая-то задача возбудит исключение, то оно будет запомнено в будущем результате и повторно возбуждено при обращении к get(). Наконец, перед тем как возвращать окончательный результат вызывающей программе, мы прибавляем результат обработки последнего блока (10).

Таким образом, мы устранили одну из потенциальных проблем: исключения, возбужденные в рабочих потоках, повторно возбуждаются в главном. Если исключения возникнут в нескольких рабочих потоках, то вверх распространится только одно, но это не очень страшно. Если вы считаете, что это все-таки важно, то можете воспользоваться чем-то вроде класса std::nested_exception, чтобы собрать все такие исключения и передать их главному потоку.

Осталось решить проблему утечки потоков в случае, когда исключение возникает между моментом запуска первого потока и присоединением всех запущенных. Для этого проще всего перехватить любое исключение, дождаться присоединения потоков, которые все еще находятся в состоянии joinable(), а потом возбудить исключение повторно:

try {

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  // ... как и раньше

 }

 T last_result = accumulate_block()(block_start, last);

 std::for_each(threads.begin(), threads.end(),

 std::mem_fn(&std::thread::join));

} catch (...) {

 for (unsigned long i = 0; i < (num_thread - 1); ++i) {

 if (threads[i].joinable())

  thread[i].join();

 }

 throw;

}

Теперь все работает. Все потоки будут присоединены вне зависимости от того, как завершилась обработка блока. Однако блоки try-catch выглядят некрасиво, и часть кода дублируется. Мы присоединяем потоки как в «нормальной» ветке, так и в блоке catch. Дублирование кода — вещь почти всегда нежелательная, потому что изменения придётся вносить в несколько мест. Давайте лучше перенесём этот код в деструктор — ведь именно такова идиома очистки ресурсов в С++. Вот как выглядит этот класс:

class join_threads {

 std::vector& threads;

public:

 explicit join_threads(std::vector& threads_):

  threads(threads_) {}

 ~join_threads() {

  for (unsigned long i = 0; i < threads.size(); ++i) {

   if (threads[i].joinable())

    threads[i].join();

  }

 }

};

Это похоже на класс thread_guard из листинга 2.3, только на этот раз мы «охраняем» целый вектор потоков. Теперь наш код упрощается.

Листинг 8.4. Безопасная относительно исключений версия std::accumulate

template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);

 if (!length)

  return init;

 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads =

  std::min(

   hardware_threads i = 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads;

 std::vector > futures(num_threads — 1);

 std::vector threads(num_threads - 1);

 join_threads joiner(threads); ←(1)

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  std::packaged_task task(

   accumulate_block());

Перейти на страницу:

Похожие книги