Как и раньше, в этой реализации предполагается, что мы собираемся использовать все доступные аппаратные потоки или располагаем каким-то механизмом, который позволит заранее определить количество потоков для предварительного разделения между ними работы. И снова мы можем упростить код, воспользовавшись функцией std::async и рекурсивным разбиением данных, если готовы принять автоматический механизм масштабирования, скрытый в стандартной библиотеке С++. Реализация parallel_find с применением std::async приведена в листинге ниже.

Листинг 8.10. Параллельная реализация алгоритма find() с применением std::async

template(1)

Iterator parallel_find_impl(Iterator first, Iterator last,

 MatchType match,

 std::atomic& done) {

 try {

  unsigned long const length = std::distance(first, last);

  unsigned long const min_per_thread = 25;          ←(2)

  if (length < (2 * min_per_thread)) {              ←(3)

   for(; (first != last) && !done.load(); ++first) {←(4)

    if (*first == match) {

     done = true;                                   ←(5)

     return first;

    }

   }

   return last; ←(6)

  } else {

   Iterator const mid_point = first + (length / 2);    ←(7)

   std::future async_result =

    std::async(∥_find_impl,←(8)

   mid_point, last, match, std::ref(done));

   Iterator const direct_result =

    parallel_find_impl(first, mid_point, match, done);  ←(9)

   return (direct_result == mid_point) ?

    async_result.get() : direct_result; ←(10)

  }

 } catch (...) {

  done = true; ←(11)

  throw;

 }

}

template

Iterator parallel_find(

 Iterator first, Iterator last, MatchType match) {

 std::atomic done(false);

 return parallel_find_impl(first, last, match, done); ←(12)

}

Желание закончить поиск досрочно при обнаружении совпадения заставило нас ввести флаг, разделяемый между всеми потоками. Этот флаг, следовательно, нужно передавать во все рекурсивные вызовы. Проще всего сделать это, делегировав работу отдельной функции (1), которая принимает дополнительный параметр — ссылку на флаг done, передаваемый из главной точки входа (12).

Основная же ветвь кода не таит никаких неожиданностей. Как и во многих предыдущих реализациях, мы задаем минимальное количество элементов, обрабатываемых в одном потоке (2); если размер обеих половин диапазона меньше этой величины, то весь диапазон обрабатывается в текущем потоке (3). Собственно алгоритм сводится к простому циклу — он продолжается, пока не будет достигнут конец заданного диапазона или не установлен флаг done (4). При обнаружении совпадения мы устанавливаем флаг done и выходим из функции (5). Если мы дошли до конца списка или вышли из цикла, потому что другой поток установил флаг done, то возвращаем значение last, означающее, что совпадение не найдено (6).

Если диапазон можно разбивать, то мы сначала находим среднюю точку (7), а потом через std::async запускаем поиск во второй половине диапазона (8), не забыв передать ссылку на флаг done с помощью std::ref. Одновременно мы просматриваем первую половину диапазона, рекурсивно вызывая себя же (9). И асинхронный, и рекурсивный вызов могут разбивать диапазон и дальше, если он достаточно велик.

Если прямой рекурсивный вызов вернул mid_point, значит, он не нашел совпадения, поэтому нужно получить результат асинхронного поиска. Если и в той половине ничего не было найдено, то мы получим last (10). Если «асинхронный» вызов на самом деле был не асинхронным, а отложенным, то выполняться он начнет именно при обращении к get(); в таком случае поиск во второй половине списке вообще не будет производиться, если поиск в первой оказался успешным. Если же асинхронный поиск действительно выполнялся в другом потоке, то деструктор переменной async_result будет ждать завершения этого потока, поэтому утечки потоков не произойдет.

Перейти на страницу:

Похожие книги