Чтобы вновь сделать код свободным от блокировок, нам нужно придумать, как ожидающий поток может продвигаться вперед, даже если поток, находящийся в push(), застрял. Один из способов — помочь застрявшему потоку, выполнив за него часть работы.

В данном случае мы точно знаем, что нужно сделать: указатель next в хвостовом узле требуется установить на новый фиктивный узел, и тогда сам указатель tail можно будет обновить. Все фиктивные узлы эквивалентны, поэтому не имеет значения, какой из них использовать — созданный потоком, который успешно поместил в очередь данные, или потоком, ожидающим входа в push(). Если сделать указатель next в узле атомарным, то для его установки можно будет применить compare_exchange_strong(). После того как указатель next установлен, в цикле по compare_exchange_weak() можно будет установить tail, проверяя при этом, указывает ли он по-прежнему на тот же самый исходный узел. Если это не так, значит, узел обновил какой-то другой поток, так что можно прекратить попытки и перейти в начало цикла. Реализация этой идеи потребует также небольшого изменения pop(), где нужно будет загрузить указатель next; эта модификация показана в листинге ниже.

Листинг 7.20. Модификация pop() с целью помочь при выполнении push()

template

class lock_free_queue {

private:

 struct node {

  std::atomic data;

  std::atomic count;

  std::atomic next;←(1)

 };

public:

 std::unique_ptr pop() {

  counted_node_ptr old_head =

   head.load(std::memory_order_relaxed);

  for (;;) {

   increase_external_count(head, old_head);

   node* const ptr = old_head.ptr;

   if (ptr == tail.load().ptr) {

    return std::unique_ptr();

   }

   counted_node_ptr next = ptr->next.load();←(2)

   if (head.compare_exchange_strong(old_head, next)) {

    T* const res = ptr->data.exchange(nullptr);

    free_external_counter(old_head);

    return std::unique_ptr(res);

   }

   ptr->release_ref();

  }

 }

};

Как я и говорил, изменения тривиальны: указатель next теперь атомарный (1), поэтому операция load в точке (2) атомарна. В данном примере мы используем упорядочение по умолчанию memory_order_seq_cst, поэтому явное обращение к load() можно было бы опустить, полагаясь на операцию загрузки в операторе неявного преобразования к типу counted_node_ptr, но явное обращение будет напоминать нам, куда впоследствии добавить явное задание порядка обращения к памяти.

Листинг 7.21. Пример реализации функции push(), освобождаемой от блокировок благодаря помощи извне

template

class lock_free_queue {

private:

 void set_new_tail(counted_node_ptr &old_tail, ←(1)

                   counted_node_ptr const &new_tail) {

  node* const current_tail_ptr = old_tail.ptr;

  while (!tail.compare_exchange_weak(old_tail, new_tail) &&←(2)

         old_tail.ptr == current_tail_ptr);

  if (old_tail.ptr == current_tail_ptr)←(3)

   free_external_counter(old_tail);    ←(4)

  else

   current_tail_ptr->release_ref();    ←(5)

 }

public:

 void push(T new_value) {

  std::unique_ptr new_data(new T(new_value));

  counted_node_ptr new_next;

  new_next.ptr = new node;

  new_next.external_count = 1;

  counted_node_ptr old_tail = tail.load();

  for (;;) {

   increase_external_count(tail, old_tail);

   T* old_data = nullptr;

   if (old_tail.ptr->data.compare_exchange_strong(  ←(6)

    old_data, new_data.get())) {

    counted_node_ptr old_next = {0};

    if (!old_tail.ptr->next.compare_exchange_strong(←(7)

        old_next, new_next)) {

     delete new_next.ptr;←(8)

     new_next = old_next;←(9)

    }

    set_new_tail(old_tail, new_next);

    new_data.release();

    break;

Перейти на страницу:

Похожие книги