В главе 4 мы уже познакомились с подходящим для решения проблемы средством. Для чего нам вообще нужны новые потоки? Для того чтобы вычислить результат и при этом учесть возможность возникновения исключений. Но это std::packaged_task и std::future. В листинге 8.3 показан код, переписанный с использованием std::packaged_task.
Листинг 8.3. Параллельная версия std::accumulate с применением std::packaged_task
template
struct accumulate_block {
T operator()(Iterator first, Iterator last) {←(1)
return std::accumulate(first, last, T()); ←(2)
}
};
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread — 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads i = 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector(3)
std::vector
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task(4)
accumulate_block
futures[i] = task.get_future(); ←(5)
threads[i] =
std::thread(std::move(task), block_start, block_end);←(6)
block_start = block_end;
}
T last_result = accumulate_block()(block_start, last); ←(7)
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
T result = init; ←(8)
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
result += futures[i].get(); ←(9)
}
result += last_result; ←(10)
return result;
}
Первое изменение заключается в том, что оператор вызова в accumulate_block теперь возвращает результат по значению, а не принимает ссылку на место, где его требуется сохранить (1). Для обеспечения безопасности относительно исключений мы используем std::packaged_task и std::future, поэтому можем воспользоваться этим и для передачи результата. Правда, для этого требуется при вызове std::accumulate (2) явно передавать сконструированный по умолчанию экземпляр T, а не использовать повторно предоставленное значение result, но это не слишком существенное изменение.
Далее, вместо того заводить вектор результатов, мы создаем вектор futures (3), в котором храним объекты std::future для каждого запущенного потока. В цикле запуска потоков мы сначала создаем задачу для accumulate_block (4). В классе std::packaged_task объявлена задача, которая принимает два объекта Iterator и возвращает T, а это именно то, что делает наша функция. Затем мы получаем будущий результат для этой задачи (5) и исполняем ее в новом потоке, передавая начало и конец обрабатываемого блока (6). Результат работы задачи, равно как и исключение, если оно возникнет, запоминается в объекте future.