unsigned long const block_size = length / num_threads;

 typedef typename Iterator::value_type value_type;

 std::vector threads(num_threads - 1);←(13)

 std::vector >

  end_values(num_threads - 1);                     ←(14)

 std::vector >

  previous_end_values;                             ←(15)

 previous_end_values.reserve(num_threads - 1);     ←(16)

 join_threads joiner(threads);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_last = block_start;

  std::advance(block_last, block_size – 1); ←(17)

  threads[i] = std::thread(process_chunk(), ←(18)

   block_start, block_last,

   (i !=0 ) ? &previous_end_values[i - 1] : 0, &end_values[i]);

  block_start = block_last;

  ++block_start; ←(19)

  previous_end_values.push_back(end_values[i].get_future());←(20)

 }

 Iterator final_element = block_start;

 std::advance(

  final_element, std::distance(block_start, last) - 1);←(21)

  process_chunk()(block_start, final_element, ←(22)

  (num_threads > 1) ? &previous_end_values.back() : 0, 0);

}

Общая структура этого кода не отличается от рассмотренных ранее алгоритмов: разбиение задачи на блоки с указанием минимального размера блока, обрабатываемого одним потоком (12). В данном случае, помимо вектора потоков (13), мы завели вектор обещаний (14), в котором будут храниться значения последних элементов в каждом блоке, и вектор будущих результатов (15), используемый для получения последнего значения из предыдущего блока. Так как мы знаем, сколько всего понадобится будущих результатов, то можем заранее зарезервировать для них место в векторе (16), чтобы избежать перераспределения памяти при запуске потоков.

Главный цикл выглядит так же, как раньше, только на этот раз нам нужен итератор, который указывает на последний элемент в блоке (17), а не на элемент за последним, чтобы можно было распространить последние элементы поддиапазонов. Собственно обработка производится в объекте-функции process_chunk, который мы рассмотрим ниже; в качестве аргументов передаются итераторы, указывающие на начало и конец блока, а также будущий результат, в котором будет храниться последнее значение из предыдущего диапазона (если таковой существует), и объект-обещание для хранения последнего значения в текущем диапазоне (18).

Запустив поток, мы можем обновить итератор, указывающий на начало блока, не забыв продвинуть его на одну позицию вперед (за последний элемент предыдущего блока) (19), и поместить будущий результат, в котором будет храниться последнее значение в текущем блоке, в вектор будущих результатов, откуда он будет извлечён на следующей итерации цикла (20).

Перед тем как обрабатывать последний блок, мы должны получить итератор, указывающий на последний элемент (21), который передается в process_chunk (22). Алгоритм std::partial_sum не возвращает значения, поэтому по завершении обработки последнего блока больше ничего делать не надо. Можно считать, что операция закончилась, когда все потоки завершили выполнение.

Теперь настало время поближе познакомиться с объектом-функцией process_chunk, который собственно и делает всю содержательную работу (1). Сначала вызывается std::partial_sum для всего блока, включая последний элемент (2), но затем нам нужно узнать, первый это блок или нет (3). Если это не первый блок, то должно быть вычислено значение previous_end_value для предыдущего блока, поэтому нам придется его подождать (4). Чтобы добиться максимального распараллеливания, мы затем сразу же обновляем последний элемент (5), так чтобы это значение можно было передать дальше, в следующий блок (если таковой имеется) (6). Сделав это, мы можем с помощью std::for_each и простой лямбда-функции (7) обновить остальные элементы диапазона.

Если previous_end_value не существует, то это первый блок, поэтому достаточно обновить end_value для следующего блока (опять же, если таковой существует, — может случиться, что блок всего один) (8).

Перейти на страницу:

Похожие книги