По своей структуре этот алгоритм похож на параллельную версию std::accumulate, описанную в разделе 8.4.1, только вместо вычисления суммы элементов он применяет к ним заданную функцию. На первый взгляд, это должно бы существенно упростить код, потому что не нужно возвращать никакой результат. Но если мы собираемся передавать исключения вызывающей программе, то все равно придется воспользоваться механизмами std::packaged_task и std::future, чтобы передавать исключения из одного потока в другой. Ниже приведен пример реализации.
Листинг 8.7. Параллельная реализация std::for_each
template
void parallel_for_each(Iterator first, Iterator last, Func f) {
unsigned long const length = std::distance(first, last);
if (!length)
return;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector(1)
std::vector
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task(2)
[=]() {
std::for_each(block_start, block_end, f);
});
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task)); ←(3)
block_start = block_end;
}
std::for_each(block_start, last, f);
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
futures[i].get(); ←(4)
}
}
Структурно код ничем не отличается от приведенного в листинге 8.4, что и неудивительно. Основное различие состоит в том, что в векторе futures хранятся объекты std::future (1), потому что рабочие потоки не возвращают значение, а в качестве задачи мы используем простую лямбда-функцию, которая вызывает функцию f для элементов из диапазона от block_start до block_end (2). Это позволяет не передавать конструктору потока (3) диапазон. Поскольку рабочие потоки ничего не возвращают, обращения к futures[i].get() (4) служат только для получения исключений, возникших в рабочих потоках; если мы не хотим передавать исключения, то эти обращения можно вообще опустить.
Реализацию parallel_for_each можно упростить, воспользовавшись std::async, — точно так же, как мы делали при распараллеливании std::accumulate.
Листинг 8.8. Параллельная реализация std::for_each с применением std::async
template
void parallel_for_each(Iterator first, Iterator last, Func f) {
unsigned long const length = std::distance(first, last);
if (!length)
return;
unsigned long const min_per_thread = 25;
if (length < (2 * min_per_thread)) {
std::for_each(first, last, f); ←(1)
} else {
Iterator const mid_point = first + length / 2;
std::future(2)
std::async(∥_for_each
first, mid_point, f);
parallel_for_each(mid_point, last, f); ←(3)
first_half.get(); ←(4)
}
}