std::partition(chunk_data.begin(), chunk_data.end(),

    [&](T const& val){ return val < partition_val; });

  std::list new_lower_chunk;

  new_lower_chunk.splice(new_lower_chunk.end(),

   chunk_data, chunk_data.begin(),

   divide_point);

  std::future > new_lower = ←(3)

  pool.submit(std::bind(&sorter::do_sort, this,

   std::move(new_lower_chunk)));

  std::list new_higher(do_sort(chunk_data));

  result.splice(result.end(), new_higher);

  while (!new_lower.wait_for(std::chrono::seconds(0)) ==

   std::future_status::timeout) {

   pool.run_pending_task(); ←(4)

  }

  result.splice(result.begin(), new_lower.get());

  return result;

 }

};

template

std::list parallel_quick_sort(std::list input) {

 if (input.empty()) {

  return input;

 }

 sorter s;

 return s.do_sort(input);

}

Как и в листинге 8.1, реальная работа делегируется функции-члену do_sort() шаблона класса sorter (1), хотя в данном случае этот шаблон нужен лишь для обертывания экземпляра thread_pool (2).

Управление потоками и задачами теперь свелось к отправке задачи пулу (3) и исполнению находящихся в очереди задач в цикле ожидания (4). Это гораздо проще, чем в листинге 8.1, где нужно было явно управлять потоками и стеком подлежащих сортировке блоков. При отправке задачи пулу мы используем функцию std::bind(), чтобы связать указатель this с do_sort() и передать подлежащий сортировке блок. В данном случае мы вызываем std::move(), чтобы данные new_lower_chunk перемещались, а не копировались.

Мы решили проблему взаимоблокировки, возникающую из- за того, что одни потоки ждут других, но этот пул все еще далек от идеала. Отметим хотя бы, что все вызовы submit() и run_pending_task() обращаются к одной и той же очереди. В главе 8 мы видели, что модификация одного набора данных из разных потоков может негативно сказаться на производительности, стало быть, с этим нужно что-то делать.

<p>9.1.4. Предотвращение конкуренции за очередь работ</p>

Всякий раз, как поток вызывает функцию submit() экземпляра пула потоков, он помещает новый элемент в единственную разделяемую очередь работ. А рабочие потоки постоянно извлекают элементы из той же очереди. Следовательно, по мере увеличения числа процессоров будет возрастать конкуренция за очередь работ. Это может ощутимо отразиться на производительности; даже при использовании свободной от блокировок очереди, в которой нет явного ожидания, драгоценное время может тратиться на перебрасывание кэша.

Чтобы избежать перебрасывания кэша, мы можем завести по одной очереди работ на каждый поток. Тогда каждый поток будет помещать новые элементы в свою собственную очередь и брать работы из глобальной очереди работ только тогда, когда в его очереди работ нет. В следующем листинге приведена реализация с использованием переменной типа thread_local, благодаря которой каждый поток обладает собственной очередью работ в дополнение к глобальной.

Листинг 9.6. Пул с очередями в поточно-локальной памяти

class thread_pool {

 thread_safe_queue pool_work_queue;

 typedef std::queue local_queue_type;←(1)

 static thread_local std::unique_ptr

  local_work_queue; ←(2)

 void worker_thread() {

  local_work_queue.reset(new local_queue_type);←(3)

  while (!done) {

   run_pending_task();

  }

 }

public:

 template

 std::future::type>

 submit(FunctionType f) {

  typedef typename std::result_of::type

   result_type;

  std::packaged_task task(f);

  std::future res(task.get_future());

  if (local_work_queue) { ←(4)

   local_work_queue->push(std::move(task));

  } else {

   pool_work_queue.push(std::move(task)); ←(5)

  }

  return res;

 }

 void run_pending_task() {

  function_wrapper task;

Перейти на страницу:

Похожие книги