В главе 4 мы уже познакомились с подходящим для решения проблемы средством. Для чего нам вообще нужны новые потоки? Для того чтобы вычислить результат и при этом учесть возможность возникновения исключений. Но это именно то, для чего предназначено сочетание std::packaged_task и std::future. В листинге 8.3 показан код, переписанный с использованием std::packaged_task.

Листинг 8.3. Параллельная версия std::accumulate с применением std::packaged_task

template

struct accumulate_block {

 T operator()(Iterator first, Iterator last) {←(1)

  return std::accumulate(first, last, T());   ←(2)

 }

};

template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);

 if (!length)

  return init;

 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread — 1) / min_per_thread;

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads =

  std::min(

   hardware_threads i = 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads;

 std::vector > futures(num_threads-1);←(3)

 std::vector threads(num_threads — 1);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  std::packaged_task task( ←(4)

   accumulate_block());

  futures[i] = task.get_future(); ←(5)

  threads[i] =

   std::thread(std::move(task), block_start, block_end);←(6)

  block_start = block_end;

 }

 T last_result = accumulate_block()(block_start, last); ←(7)

 std::for_each(threads.begin(), threads.end(),

  std::mem_fn(&std::thread::join));

 T result = init; ←(8)

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  result += futures[i].get(); ←(9)

 }

 result += last_result; ←(10)

 return result;

}

Первое изменение заключается в том, что оператор вызова в accumulate_block теперь возвращает результат по значению, а не принимает ссылку на место, где его требуется сохранить (1). Для обеспечения безопасности относительно исключений мы используем std::packaged_task и std::future, поэтому можем воспользоваться этим и для передачи результата. Правда, для этого требуется при вызове std::accumulate (2) явно передавать сконструированный по умолчанию экземпляр T, а не использовать повторно предоставленное значение result, но это не слишком существенное изменение.

Далее, вместо того заводить вектор результатов, мы создаем вектор futures (3), в котором храним объекты std::future для каждого запущенного потока. В цикле запуска потоков мы сначала создаем задачу для accumulate_block (4). В классе std::packaged_task объявлена задача, которая принимает два объекта Iterator и возвращает T, а это именно то, что делает наша функция. Затем мы получаем будущий результат для этой задачи (5) и исполняем ее в новом потоке, передавая начало и конец обрабатываемого блока (6). Результат работы задачи, равно как и исключение, если оно возникнет, запоминается в объекте future.

Перейти на страницу:

Похожие книги