futures[i] = task.get_future();

  threads[i] =

   std::thread(std:move(task), block_start, block_end);

  block_start = block_end;

 }

 T last_result = accumulate_block()(block_start, last);

 T result = init;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  result + = futures[i].get(); ←(2)

 }

 result += last_result;

 return result;

}

Создав контейнер потоков, мы создаем объект написанного выше класса (1), который присоединяет все завершившиеся потоки. Теперь явный цикл присоединения можно удалить, точно зная, что при выходе из функции потоки будут присоединены. Отметим, что вызовы futures[i].get() (2) приостанавливают выполнение программы до готовности результатов, поэтому в этой точке явно присоединять потоки не нужно. Этим данный код отличается от первоначальной версии в листинге 8.2, где присоединять потоки было необходимо для нрав ильного заполнения вектора results. Мало того что мы получили безопасный относительно исключений код, так еще и функция стала короче, потому что код присоединения вынесен в новый класс (который, кстати, можно использовать и в других программах).

Обеспечение безопасности относительно исключений при работе с std::async()

Теперь, когда мы знаем, что необходимо для обеспечения безопасности относительно исключений в программе, которая явно управляет потоками, посмотрим, как добиться того же результата при использовании std::async(). Мы уже видели, что в этом случае управление потоками берет на себя библиотека, а запущенный ей поток завершается, когда будущий результат готов. В плане безопасности относительно исключений важно то, что если уничтожить будущий результат, не дождавшись его, то деструктор будет ждать завершения потока. Тем самым мы элегантно решаем проблему утечки потоков, которые еще исполняются и хранят ссылки на данные. В следующем листинге показана безопасная относительно исключений реализация с применением std::async().

Листинг 8.5. Безопасная относительно исключений версия std::accumulate с применением std::async

template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);←(1)

 unsigned long const max_chunk_size = 25;

 if (length <= max_chunk_size) {

  return std::accumulate(first, last, init); ←(2)

 } else {

  Iterator mid_point = first;

  std::advance(mid_point, length / 2); ←(3)

  std::future first_half_result =

   std::async(parallel_accumulate, ←(4)

   first, mid_point, init);

  T second_half_result =

   parallel_accumulate(mid_point, last, T());         ←(5)

  return first_half_result.get() + second_half_result;←(6)

 }

}

В этой версии мы распределяем данные рекурсивно, а не вычисляем распределение по блокам заранее, но в целом код намного проще предыдущей версии и при этом безопасен относительно исключений. Как и раньше, сначала мы вычисляем длину последовательности (1), и, если она меньше максимального размера блока, то вызываем std::accumulate напрямую (2). В противном случае находим среднюю точку последовательности (3) и запускаем асинхронную задачу, которая будет обрабатывать левую половину (4). Для обработки правой половины мы вызываем себя рекурсивно (5), а затем складываем результаты обработки обеих половин (6). Библиотека гарантирует, что std::async задействует имеющийся аппаратный параллелизм, не создавая слишком большого количества потоков. Некоторые «асинхронные» вызовы на самом деле будут исполняться синхронно при обращении к get() (6).

Перейти на страницу:

Похожие книги