std::future(4)
work_queue.push(std::move(task)); ←(5)
return res; ←(6)
}
// остальное, как и раньше
};
Прежде всего отметим, что модифицированная функция submit() (1) возвращает объект std::future<>, который будет содержать возвращенное задачей значение и позволит вызывающей программе ждать ее завершения. Для этого нам необходимо знать тип значения, возвращаемого переданной функцией f, и здесь на помощь приходит шаблон std::result_of<>: std::result_of — это тип результата, возвращенного вызовом объекта типа FunctionType (например, f) без аргументов. Выражение std::result_of<> мы используем также в определении псевдонима типа result_type (2) внутри функции.
Затем f обертывается объектом std::packaged_task (3), потому что f — функция или допускающий вызов объект, который не принимает параметров и возвращает результат типа result_type. Теперь мы можем получить будущий результат из std::packaged_task<> (4), перед тем как помещать задачу в очередь (5) и возвращать будущий результат (6). Отметим, что при помещении задачи в очередь мы должны использовать функцию std::move(), потому что класс std::packaged_task<> не допускает копирования. Именно поэтому в очереди хранятся объекты function_wrapper, а не объекты типа.
Этот пул позволяет ожидать завершения задач и получать возвращаемые ими результаты. В листинге ниже показано, как выглядит функция parallel_accumulate, работающая с таким пулом потоков.
Листинг 9.3. Функция parallel_accumulate, реализованная с помощью пула потоков, допускающего ожидание задач
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const block_size = 25;
unsigned long const num_blocks =
(length + block_size - 1) / block_size; ←(1)
std::vector
thread_pool pool;
Iterator block_start = first;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
futures[i] = pool.submit(accumulate_block(2)
block_start = block_end;
}
T last_result =
accumulate_block
T result = init;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
result += futures[i].get();
}
result += last_result;
return result;
}
Сравнивая этот код с листингом 8.4, следует обратить внимание на две вещи. Во-первых, мы работаем с количеством блоков (num_blocks (1)), а не потоков. Чтобы в полной мере воспользоваться масштабируемостью пула потоков, мы должны разбить работу на максимально мелкие блоки, с которыми имеет смысл работать параллельно. Если потоков в пуле немного, то каждый поток будет обрабатывать много блоков, но по мере роста числа потоков, поддерживаемых оборудованием, будет расти и количество блоков, обрабатываемых параллельно.
Но, выбирая «максимально мелкие блоки, с которыми имеет смысл работать параллельно», будьте осторожны. Отправка задачи пулу потоков, выбор ее рабочим потоком из очереди и передача возвращенного значения с помощью std::future<> — всё это операции не бесплатные, и для совсем мелких задач они не окупятся.
В предположении, что размер блока выбран разумно, вам не надо заботиться об упаковке задач, получении будущих результатов и хранении объектов std::thread, чтобы впоследствии их можно было присоединить; все это пул берет на себя. Вам остается лишь вызвать функцию submit(), передав ей свою задачу (2).