Мы почти закончили, осталось только рассмотреть функции, в которых используются операции fetch_add(), изменяющие счетчик ссылок. Поток, который добрался до возврата данных из узла, может продолжать в твердой уверенности, что никакой другой поток не сможет модифицировать хранящиеся в узле данные. Однако любой поток, который потерпел неудачу при извлечении данных, знает, что какой-то другой поток данные в узле модифицировал; он использовал функцию swap() для извлечения данных. Следовательно, чтобы предотвратить гонку за данными мы должны гарантировать, что swap() происходит-раньше delete. Чтобы добиться этого, проще всего задать семантику std::memory_order_release при вызове fetch_add() в ветви, где мы возвращаем данные, и семантику std::memory_order_acquire — в ветви, где мы возвращаемся в начало цикла. Однако даже это перебор — лишь один поток выполняет delete (тот, что сбросил счетчик в нуль), поэтому только этому потоку нужно выполнить операцию захвата. К счастью, поскольку fetch_add() — операция чтения-модификации-записи, то она составляет часть последовательности освобождений, поэтому для достижения цели нам достаточно дополнительной операции load(). Если в ветви, где происходит возврат в начало цикла, счетчик ссылок уменьшается до нуля, то здесь же можно перезагрузить счетчик ссылок с семантикой std::memory_order_acquire, чтобы обеспечить требуемое отношение синхронизируется-с, а в самой операции fetch_add() достаточно задать std::memory_order_relaxed. Окончательная реализация стека с новой версией pop() приведена ниже.

Листинг 7.12. Свободный от блокировок стек с подсчётом ссылок и ослабленными атомарными операциями

template

class lock_free_stack {

private:

 struct node;

 struct counted_node_ptr {

  int external_count;

  node* ptr;

 };

 struct node {

  std::shared_ptr data;

  std::atomic internal_count;

  counted_node_ptr next;

  node(T const& data_):

  data(std::make_shared(data_)), internal_count(0) {}

 };

 std::atomic head;

 void increase_head_count(counted_node_ptr& old_counter) {

  counted_node_ptr new_counter;

  do {

   new_counter = old_counter;

   ++new_counter.external_count;

  }

  while (!head.compare_exchange_strong(old_counter, new_counter,

         std::memory_order_acquire,

         std::memory_order_relaxed));

  old_counter.external_count = new_counter.external_count;

 }

public:

 ~lock_free_stack() {

  while(pop());

 }

 void push(T const& data) {

  counted_node_ptr new_node;

  new_node.ptr = new node(data);

  new_node.external_count = 1;

  new_node.ptr->next = head.load(std::memory_order_relaxed);

  while (!head.compare_exchange_weak(

           new_node.ptr->next, new_node,

           std::memory_order_release,

           std::memory_order_relaxed));

 }

 std::shared_ptr pop() {

  counted_node_ptr old_head =

   head.load(std::memory_order_relaxed);

  for (;;) {

   increase_head_count(old_head);

   node* const ptr = old_head.ptr;

   if (!ptr) {

    return std::shared_ptr();

   }

   if (head.compare_exchange_strong(old_head, ptr->next,

       std::memory_order_relaxed)) {

    std::shared_ptr res;

    res.swap(ptr->data);

    int const count_increase = old_head.external_count — 2;

    if (ptr->internal_count.fetch_add(count_increase,

        std::memory_order_release) == -count_increase) {

     delete ptr;

    }

    return res;

   }

   else if (ptr->internal_count.fetch_add(-1,

            std::memory_order_relaxed) == 1) {

    ptr->internal_count.load(std::memory_order_acquire);

    delete ptr;

   }

  }

 }

};

Перейти на страницу:

Похожие книги