std::future res(task.get_future());     ←(4)

  work_queue.push(std::move(task));                    ←(5)

  return res;                                          ←(6)

 }

 // остальное, как и раньше

};

Прежде всего отметим, что модифицированная функция submit() (1) возвращает объект std::future<>, который будет содержать возвращенное задачей значение и позволит вызывающей программе ждать ее завершения. Для этого нам необходимо знать тип значения, возвращаемого переданной функцией f, и здесь на помощь приходит шаблон std::result_of<>: std::result_of::type — это тип результата, возвращенного вызовом объекта типа FunctionType (например, f) без аргументов. Выражение std::result_of<> мы используем также в определении псевдонима типа result_type (2) внутри функции.

Затем f обертывается объектом std::packaged_task (3), потому что f — функция или допускающий вызов объект, который не принимает параметров и возвращает результат типа result_type. Теперь мы можем получить будущий результат из std::packaged_task<> (4), перед тем как помещать задачу в очередь (5) и возвращать будущий результат (6). Отметим, что при помещении задачи в очередь мы должны использовать функцию std::move(), потому что класс std::packaged_task<> не допускает копирования. Именно поэтому в очереди хранятся объекты function_wrapper, а не объекты типа.

Этот пул позволяет ожидать завершения задач и получать возвращаемые ими результаты. В листинге ниже показано, как выглядит функция parallel_accumulate, работающая с таким пулом потоков.

Листинг 9.3. Функция parallel_accumulate, реализованная с помощью пула потоков, допускающего ожидание задач

template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);

 if (!length)

  return init;

 unsigned long const block_size = 25;

 unsigned long const num_blocks =

  (length + block_size - 1) / block_size; ←(1)

 std::vector > futures(num_blocks-1);

 thread_pool pool;

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_blocks - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  futures[i] = pool.submit(accumulate_block());←(2)

  block_start = block_end;

 }

 T last_result =

  accumulate_block()(block_start, last);

 T result = init;

 for (unsigned long i = 0; i < (num_blocks - 1); ++i) {

  result += futures[i].get();

 }

 result += last_result;

 return result;

}

Сравнивая этот код с листингом 8.4, следует обратить внимание на две вещи. Во-первых, мы работаем с количеством блоков (num_blocks (1)), а не потоков. Чтобы в полной мере воспользоваться масштабируемостью пула потоков, мы должны разбить работу на максимально мелкие блоки, с которыми имеет смысл работать параллельно. Если потоков в пуле немного, то каждый поток будет обрабатывать много блоков, но по мере роста числа потоков, поддерживаемых оборудованием, будет расти и количество блоков, обрабатываемых параллельно.

Но, выбирая «максимально мелкие блоки, с которыми имеет смысл работать параллельно», будьте осторожны. Отправка задачи пулу потоков, выбор ее рабочим потоком из очереди и передача возвращенного значения с помощью std::future<> — всё это операции не бесплатные, и для совсем мелких задач они не окупятся. Если размер задачи слишком мал, то программа, в которой используется пул потоков, может работать медленнее, чем однопоточная.

В предположении, что размер блока выбран разумно, вам не надо заботиться об упаковке задач, получении будущих результатов и хранении объектов std::thread, чтобы впоследствии их можно было присоединить; все это пул берет на себя. Вам остается лишь вызвать функцию submit(), передав ей свою задачу (2).

Перейти на страницу:

Похожие книги