Чтобы вновь сделать код свободным от блокировок, нам нужно придумать, как ожидающий поток может продвигаться вперед, даже если поток, находящийся в push(), застрял. Один из способов — помочь застрявшему потоку, выполнив за него часть работы.
В данном случае мы точно знаем, что нужно сделать: указатель next в хвостовом узле требуется установить на новый фиктивный узел, и тогда сам указатель tail можно будет обновить. Все фиктивные узлы эквивалентны, поэтому не имеет значения, какой из них использовать — созданный потоком, который успешно поместил в очередь данные, или потоком, ожидающим входа в push(). Если сделать указатель next в узле атомарным, то для его установки можно будет применить compare_exchange_strong(). После того как указатель next установлен, в цикле по compare_exchange_weak() можно будет установить tail, проверяя при этом, указывает ли он по-прежнему на тот же самый исходный узел. Если это не так, значит, узел обновил какой-то другой поток, так что можно прекратить попытки и перейти в начало цикла. Реализация этой идеи потребует также небольшого изменения pop(), где нужно будет загрузить указатель next; эта модификация показана в листинге ниже.
Листинг 7.20. Модификация pop() с целью помочь при выполнении push()
template
class lock_free_queue {
private:
struct node {
std::atomic
std::atomic
std::atomic(1)
};
public:
std::unique_ptr
counted_node_ptr old_head =
head.load(std::memory_order_relaxed);
for (;;) {
increase_external_count(head, old_head);
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr) {
return std::unique_ptr
}
counted_node_ptr next = ptr->next.load();←(2)
if (head.compare_exchange_strong(old_head, next)) {
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr
}
ptr->release_ref();
}
}
};
Как я и говорил, изменения тривиальны: указатель next теперь атомарный (1), поэтому операция load в точке (2) атомарна. В данном примере мы используем упорядочение по умолчанию memory_order_seq_cst, поэтому явное обращение к load() можно было бы опустить, полагаясь на операцию загрузки в операторе неявного преобразования к типу counted_node_ptr, но явное обращение будет напоминать нам, куда впоследствии добавить явное задание порядка обращения к памяти.
Листинг 7.21. Пример реализации функции push(), освобождаемой от блокировок благодаря помощи извне
template
class lock_free_queue {
private:
void set_new_tail(counted_node_ptr &old_tail, ←(1)
counted_node_ptr const &new_tail) {
node* const current_tail_ptr = old_tail.ptr;
while (!tail.compare_exchange_weak(old_tail, new_tail) &&←(2)
old_tail.ptr == current_tail_ptr);
if (old_tail.ptr == current_tail_ptr)←(3)
free_external_counter(old_tail); ←(4)
else
current_tail_ptr->release_ref(); ←(5)
}
public:
void push(T new_value) {
std::unique_ptr
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;) {
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong( ←(6)
old_data, new_data.get())) {
counted_node_ptr old_next = {0};
if (!old_tail.ptr->next.compare_exchange_strong(←(7)
old_next, new_next)) {
delete new_next.ptr;←(8)
new_next = old_next;←(9)
}
set_new_tail(old_tail, new_next);
new_data.release();
break;