futures[i] = task.get_future();
threads[i] =
std::thread(std:move(task), block_start, block_end);
block_start = block_end;
}
T last_result = accumulate_block()(block_start, last);
T result = init;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
result + = futures[i].get(); ←(2)
}
result += last_result;
return result;
}
Создав контейнер потоков, мы создаем объект написанного выше класса (1), который присоединяет все завершившиеся потоки. Теперь явный цикл присоединения можно удалить, точно зная, что при выходе из функции потоки будут присоединены. Отметим, что вызовы futures[i].get() (2) приостанавливают выполнение программы до готовности результатов, поэтому в этой точке явно присоединять потоки не нужно. Этим данный код отличается от первоначальной версии в листинге 8.2, где присоединять потоки было необходимо для нрав ильного заполнения вектора results. Мало того что мы получили безопасный относительно исключений код, так еще и функция стала короче, потому что код присоединения вынесен в новый класс (который, кстати, можно использовать и в других программах).
std::async()Теперь, когда мы знаем, что необходимо для обеспечения безопасности относительно исключений в программе, которая явно управляет потоками, посмотрим, как добиться того же результата при использовании std::async(). Мы уже видели, что в этом случае управление потоками берет на себя библиотека, а запущенный ей поток завершается, когда будущий результат std::async().
Листинг 8.5. Безопасная относительно исключений версия std::accumulate с применением std::async
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);←(1)
unsigned long const max_chunk_size = 25;
if (length <= max_chunk_size) {
return std::accumulate(first, last, init); ←(2)
} else {
Iterator mid_point = first;
std::advance(mid_point, length / 2); ←(3)
std::future
std::async(parallel_accumulate(4)
first, mid_point, init);
T second_half_result =
parallel_accumulate(mid_point, last, T()); ←(5)
return first_half_result.get() + second_half_result;←(6)
}
}
В этой версии мы распределяем данные рекурсивно, а не вычисляем распределение по блокам заранее, но в целом код намного проще предыдущей версии и при этом std::accumulate напрямую (2). В противном случае находим среднюю точку последовательности (3) и запускаем асинхронную задачу, которая будет обрабатывать левую половину (4). Для обработки правой половины мы вызываем себя рекурсивно (5), а затем складываем результаты обработки обеих половин (6). Библиотека гарантирует, что std::async задействует имеющийся аппаратный параллелизм, не создавая слишком большого количества потоков. Некоторые «асинхронные» вызовы на самом деле будут исполняться синхронно при обращении к get() (6).