Мы немало потрудились, но наконец-то дошли до конца, и стек теперь стал куда лучше. За счет тщательно продуманного применения ослабленных операций нам удалось повысить производительность, не жертвуя корректностью. Как видите, реализация pop() теперь насчитывает 37 строк вместо 8 в эквивалентной реализации pop() для стека с блокировками (листинг 7.1) и 7 строк для простого свободного от блокировок стека без управления памятью (листинг 7.2). При рассмотрении свободной от блокировок очереди мы встретимся с аналогичной ситуацией: сложность кода в значительной степени обусловлена именно управлением памятью.

<p>7.2.6. Потокобезопасная очередь без блокировок</p>

Очередь отличается от стека прежде всего тем, что операции push() и pop() обращаются к разным частям структуры данных, тогда как в стеке та и другая работают с головным узлом списка. Следовательно, и проблемы синхронизации тоже другие. Требуется сделать так, чтобы изменения, произведенные на одном конце, были видны при доступе с другого конца. Однако структура функции try_pop() в листинге 6.6 не так уж сильно отличается от структуры pop() в простом свободном от блокировок стеке в листинге 7.2, поэтому можно с достаточными основаниями предположить, что и весь свободный от блокировок код будет схожим. Посмотрим, так ли это.

Если взять листинг 6.6 за основу, то нам понадобятся два указателя на node: один для головы списка (head), второй — для хвоста (tail). Поскольку мы собираемся обращаться к ним из нескольких потоков, то надо бы сделать эти указатели атомарными и расстаться с соответствующими мьютексами. Начнём с этого небольшого изменения и посмотрим, куда оно нас приведет. Результат показан в листинге ниже.

Листинг 7.13. Свободная от блокировок очередь с одним производителем и одним потребителем

template

class lock_free_queue {

private:

 struct node {

  std::shared_ptr data;

  node* next;

  node():

   next(nullptr) {}

 };

 std::atomic head;

 std::atomic tail;

 node* pop_head() {

  node* const old_head = head.load();

  if (old_head == tail.load()) {←(1)

   return nullptr;

  }

  head.store(old_head->next);

  return old_head;

 }

public:

 lock_free_queue():

  head(new node), tail(head.load()) {}

 lock_free_queue(const lock_free_queue& other) = delete;

 lock_free_queue& operator=(

  const lock_free_queue& other) = delete;

 ~lock_free_queue() {

  while(node* const old_head = head.load()) {

   head.store(old_head->next);

   delete old_head;

  }

 }

 std::shared_ptr pop() {

  node* old_head = pop_head();

  if (!old_head) {

   return std::shared_ptr();

  }

  std::shared_ptr const res(old_head->data);←(2)

  delete old_head;

  return res;

 }

 void push(T new_value) {

  std::shared_ptr new_data(std::make_shared(new_value));

  node* p = new node;                 ←(3)

  node* const old_tail = tail.load(); ←(4)

  old_tail->data.swap(new_data);      ←(5)

  old_tail->next = p;                 ←(6)

  tail.store(p);                      ←(7)

 }

};

На первый взгляд, неплохо, и если в каждый момент времени существует только один поток, вызывающий push(), и только один поток, вызывающий pop(), то вообще всё прекрасно. Важно отметить, что в этом случае существует отношение происходит-раньше между push() и pop(), благодаря которому извлечение данных безопасно. Сохранение tail (7) синхронизируется-с загрузкой tail (1), сохранение указателя на data в предыдущем узле (5) расположено перед сохранением tail, а загрузка tail расположена перед загрузкой указателя на data (2), поэтому сохранение data происходит раньше его загрузки, и всё замечательно. Таким образом, мы получили корректно обслуживаемую очередь с одним производителем и одним потребителем.

Перейти на страницу:

Похожие книги