Iterator parallel_find(Iterator first, Iterator last,
MatchType match) {
struct find_element { ←(1)
void operator()(Iterator begin, Iterator end,
MatchType match,
std::promise
std::atomic
try {
for(; (begin != end) && !done_flag->load(); ++begin) {←(2)
if (*begin == match) {
result->set_value(begin);←(3)
done_flag->store(true); ←(4)
return;
}
}
} catch (...) { ←(5)
try {
result->set_exception(std::current_exception());←(6)
done_flag->store(true);
} catch (...) ←(7)
{}
}
}
};
unsigned long const length = std::distance(first, last);
if (!length)
return last;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread — 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::promise(8)
std::atomic(9)
std::vector(10)
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(find_element(), ←(11)
block_start, block_end, match,
&result, &done_flag);
block_start = block_end;
}
find_element()(
block_start, last, match, &result, &done_flag);←(12)
if (!done_flag.load()) { ←(13)
return last;
}
return result.get_future().get() ←(14)
}
В основе своей код в листинге 8.9 похож на предыдущие примеры. На этот раз вся работа производится в операторе вызова, определенном в локальном классе find_element (1). Здесь мы в цикле обходим элементы из назначенного потоку блока, проверяя флаг на каждой итерации (2). Если искомый элемент найден, то мы записываем окончательный результат в объект-обещание (3) и перед возвратом устанавливаем флаг done_flag (4).
Если было возбуждено исключение, то его перехватит универсальный обработчик (5) и попытается сохранить исключение в обещании (6) перед установкой done_flag. Но установка значения объекта-обещания может возбудить исключение, если значение уже установлено, поэтому мы перехватываем и игнорируем любые возникающие здесь исключения (7).
Это означает, что если поток, вызвавший find_element, найдет искомый элемент или возбудит исключение, то все остальные потоки увидят поднятый флаг done_flag и прекратят работу. Если несколько потоков одновременно найдут искомое или возбудят исключение, то возникнет гонка за установку результата в обещании. Но это безобидная гонка: победивший поток считается «первым», и установленный им результат приемлем.
В самой функции parallel_find мы определяем обещание (8) и флаг прекращения поиска (9), которые передаем новым потокам вместе с диапазоном для просмотра (11). Кроме того, главный поток пользуется классом find_element для поиска среди оставшихся элементов (12). Как уже отмечалось, мы должны дождаться завершения всех потоков перед тем, как проверять результат, потому что искомого элемента может вообще не оказаться. Для этого мы заключаем код запуска и присоединения потоков в блок (10), так что к моменту проверки флага (13) все потоки гарантировано присоединены. Если элемент был найден, то, обратившись к функции get() объекта std::future, мы либо получим результат из обещания, либо возбудим сохраненное исключение.