Понимание того, что последний шаг является редукцией, важно; в наивной реализации, представленной в листинге 2.8, финальная редукция выполняется как последовательная операция. Однако часто и ее можно распараллелить; операция accumulate по сути дела
Хотя это действенная техника, применима она не в любой ситуации. Иногда данные не удается заранее распределить равномерно, а как это сделать, становится понятно только в процессе обработки. Особенно наглядно это проявляется в таких рекурсивных алгоритмах, как Quicksort; здесь нужен другой подход.
8.1.2. Рекурсивное распределение данных
Алгоритм Quicksort состоит из двух шагов: разбиение данных на две части — до и после опорного элемента в смысле требуемого упорядочения, и рекурсивная сортировка обеих «половин». Невозможно распараллелить этот алгоритм, разбив данные заранее, потому что состав каждой «половины» становится известен только в процессе обработки элементов. Поэтому распараллеливание по необходимости должно быть рекурсивным. На каждом уровне рекурсии производится quick_sort, потому что предстоит отсортировать элементы, меньшие опорного, и большие опорного. Эти рекурсивные вызовы не зависят друг от друга, так как обращаются к разным элементам, поэтому являются идеальными кандидатами для параллельного выполнения. На рис. 8.2 изображено такое рекурсивное разбиение.
Рис. 8.2. Рекурсивное разбиение данных
В главе 4 была приведена подобная реализация. Вместо того чтобы просто вызывать функцию рекурсивно для большей и меньшей «половины», мы с помощью std::async() на каждом шаге запускали асинхронную задачу для меньшей половины. Вызывая std::async(), мы просим стандартную библиотеку С++ самостоятельно решить, имеет ли смысл действительно выполнять задачу в новом потоке или лучше сделать это синхронно.
Это существенно: при сортировке большого набора данных запуск нового потока для каждого рекурсивного вызова быстро приводит к образованию чрезмерного количества потоков. Как мы увидим ниже, когда потоков слишком много, производительность может не возрасти, а наоборот std::async(), но есть и другие варианты.
Один из них — воспользоваться функцией std::thread::hardware_concurrency() для выбора нужного числа потоков, как мы делали в параллельной версии accumulate() из листинга 2.8. Тогда вместо того чтобы запускать новый поток для каждого рекурсивного вызова, мы можем просто поместить подлежащий сортировке блок данных в потокобезопасный стек типа того, что был описан в главах 6 и 7. Если потоку больше нечего делать — потому что он закончил обработку всех своих блоков или потому что ждет, когда необходимый ему блок будет отсортирован, то он может взять блок из стека и заняться его сортировкой.
В следующем листинге показана реализация этой идеи.
Листинг 8.1. Параллельный алгоритм Quicksort с применением стека блоков, ожидающих сортировки
template
struct sorter { ←(1)
struct chunk_to_sort {
std::list
std::promise
};
thread_safe_stack(2)
std::vector(3)
unsigned const max_thread_count;
std::atomic
sorter():
max_thread_count(std::thread::hardware_concurrency() - 1),
end_of_data(false) {}
~sorter() { ←(4)
end_of_data = true; ←(5)
for (unsigned i = 0; i < threads.size(); ++i) {
threads[i].join(); ←(6)
}
}
void try_sort_chunk() {
boost::shared_ptr(7)
if (chunk) {
sort_chunk(chunk); ←(8)
}
}
std::list(9)
if (chunk_data.empty()) {
return chunk_data;
}
std::list
result.splice(result.begin(), chunk_data, chunk_data.begin());