typedef typename Iterator::value_type value_type;

 struct process_element { ←(1)

  void operator()(Iterator first, Iterator last,

   std::vector& buffer,

   unsigned i, barrier& b) {

   value_type& ith_element = *(first + i);

   bool update_source = false;

   for (unsigned step = 0, stride = 1;

        stride <= i; ++step, stride *= 2) {

    value_type const& source = (step % 2) ? ←(2)

     buffer[i] : ith_element;

    value_type& dest = (step % 2) ?

     ith_element:buffer[i];

    value_type const& addend = (step % 2) ? ←(3)

     buffer[i - stride] : *(first + i – stride);

    dest = source + addend; ←(4)

    update_source = !(step % 2);

    b.wait(); ←(5)

   }

   if (update_source) { ←(6)

    ith_element = buffer[i];

   }

   b.done_waiting(); ←(7)

  }

 };

 unsigned long const length = std::distance(first, last);

 if (length <= 1)

  return;

 std::vector buffer(length);

 barrier b(length);

 std::vector threads(length - 1); ←(8)

 join_threads joiner(threads);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (length - 1); ++i) {

  threads[i] = std::thread(process_element(), first, last,←(9)

   std::ref(buffer), i, std::ref(b));

 }

 process_element()(first, last, buffer, length - 1, b);   ←(10)

}

Общая структура кода вам, наверное, уже понятна. Имеется класс с оператором вызова (process_element), который выполняет содержательную работу (1) и вызывается из нескольких потоков (9), хранящихся в векторе (8), а также из главного потока (10). Важное отличие заключается в том, что теперь число потоков зависит от числа элементов в списке, а не от результата, возвращаемого функцией std::thread::hardware_concurrency. Я уже говорил, что эта идея не слишком удачна, если только вы не работаете на машине с массивно параллельной архитектурой, где потоки обходятся дешево. Но это неотъемлемая часть самой идеи решения. Можно было бы обойтись и меньшим числом потоков, поручив каждому обработку нескольких значений из исходного диапазона, но тогда при относительно небольшом количестве потоков этот алгоритм оказался бы менее эффективен, чем алгоритм с прямым распространением.

Так или иначе, основная работа выполняется в операторе вызове из класса process_element. На каждом шаге берется либо i-ый элемент из исходного диапазона, либо i-ый элемент из буфера (2) и складывается с предшествующим элементом, отстоящим от него на расстояние stride (3); результат сохраняется в буфере, если мы читали из исходного диапазона, и в исходном диапазоне — если мы читали из буфера (4). Перед тем как переходить к следующему шагу, мы ждем у барьера (5). Работа заканчивается, когда элемент, отстоящий на расстояние stride, оказывается слева от начала диапазона. В этом случае мы должно обновить элемент в исходном диапазоне, если сохраняли окончательный результат в буфере (6). Наконец, мы вызываем функцию done_waiting() (7), сообщая барьеру, что больше ждать не будем.

Отметим, что это решение не безопасно относительно исключений. Если один из рабочих потоков возбудит исключение в process_element, то приложение аварийно завершится. Решить эту проблему можно было бы, воспользовавшись std::promise для запоминания исключения, как в реализации parallel_find из листинга 8.9, или просто с помощью объекта std::exception_ptr, защищенного мьютексом.

Вот и подошли к концу обещанные три примера. Надеюсь, они помогли уложить в сознании соображения, высказанные в разделе 8.1, 8.2, 8.3 и 8.4, и показали, как описанные приемы воплощаются в реальном коде.

<p>8.6. Резюме</p>
Перейти на страницу:

Похожие книги