В примерах из главы 8, где потоки запускались явно, главный поток после распределения работы между потоками всегда ждал завершения запущенных потоков. Тем самым гарантировалось, что вызывающая программа получит управление только после полного завершения задачи. При использовании пула потоков ждать нужно завершения задачи, переданной пулу, а не самих рабочих потоков. Это похоже на то, как мы ждали будущих результатов при работе с std::async в главе 8. В случае простого пула потоков, показанного в листинге 9.1, организовывать ожидание придется вручную, применяя механизмы, описанные в главе 4: условные переменные и будущие результаты. Это усложняет код; намного удобнее было бы ждать задачу напрямую.

За счет переноса сложности в сам пул потоков мы сумеем добиться желаемого. Функция submit() могла бы возвращать некий описатель задачи, по которому затем можно было бы ждать ее завершения. Сам описатель должен был бы инкапсулировать условную переменную или будущий результат. Это упростило бы код, пользующийся пулом потоков.

Частный случай ожидания завершения запущенной задачи возникает, когда главный поток нуждается в вычисленном ей результате. Мы уже встречались с такой ситуацией выше, например, в функции parallel_accumulate() из главы 2. В таком случае путем использования будущих результатов мы можем объединить ожидание с передачей результата. В листинге 9.2 приведен код модифицированного пула потоков, который разрешает ожидать завершения задачи и передает возвращенный ей результат ожидающему потоку. Поскольку экземпляры класса std::packaged_task<> допускают только перемещение, но не копирование, мы больше не можем воспользоваться классом std::function<> для обертывания элементов очереди, потому что std::function<> требует, чтобы в обернутых объектах-функциях был определён копирующий конструктор. Вместо этого мы напишем специальный класс-обертку, умеющий работать с объектами, обладающими только перемещающим конструктором. Это простой маскирующий тип класс (type-erasure class), в котором определён оператор вызова. Нам нужно поддержать функции, которые не принимают параметров и возвращают void, поэтому оператор всего лишь вызывает виртуальный метод call(), который в свою очередь вызывает обернутую функцию.

Листинг 9.2. Пул потоков, ожидающий завершения задачи

class function_wrapper {

 struct impl_base {

  virtual void call() = 0;

  virtual ~impl_base() {}

 };

 std::unique_ptr impl;

 template

 struct impl_type: impl_base {

  F f;

  impl_type(F&& f_): f(std::move(f_)) {}

  void call() { f(); }

 };

public:

 template function_wrapper(F&& f):

  impl(new impl_type(std::move(f))) {}

 void operator()() { impl->call(); }

 function_wrapper() = default;

 function_wrapper(function_wrapper&& other):

  impl(std::move(other.impl)) {}

 function_wrapper& operator=(function_wrapper&& other) {

  impl = std::move(other.impl);

  return *this;

 }

 function_wrapper(const function_wrapper&) = delete;

 function_wrapper(function_wrapper&) = delete;

 function_wrapper& operator=(const function_wrapper&) = delete;

};

class thread_pool {

 thread_safe_queue work_queue;←┐

                                                 │Используем

 void worker_thread()                            │function_

 {                                               │wrapper

  while (!done)                                  │вместо std::

  {                                              │function

   function_wrapper task;                       ←┘

   if (work_queue.try_pop(task))

    task();

   else

    std::this_thread::yield();

  }

 }

public:

 template

 std::future::type>←(1)

 submit(FunctionType f) {

  typedef typename std::result_of::type

   result_type;                                        ←(2)

  std::packaged_task task(std::move(f));←(3)

Перейти на страницу:

Похожие книги