Мы немало потрудились, но наконец-то дошли до конца, и стек теперь стал куда лучше. За счет тщательно продуманного применения ослабленных операций нам удалось повысить производительность, не жертвуя корректностью. Как видите, реализация pop() теперь насчитывает 37 строк вместо 8 в эквивалентной реализации pop() для стека с блокировками (листинг 7.1) и 7 строк для простого свободного от блокировок стека без управления памятью (листинг 7.2). При рассмотрении свободной от блокировок очереди мы встретимся с аналогичной ситуацией: сложность кода в значительной степени обусловлена именно управлением памятью.
7.2.6. Потокобезопасная очередь без блокировок
Очередь отличается от стека прежде всего тем, что операции push() и pop() обращаются к разным частям структуры данных, тогда как в стеке та и другая работают с головным узлом списка. Следовательно, и проблемы синхронизации тоже другие. Требуется сделать так, чтобы изменения, произведенные на одном конце, были видны при доступе с другого конца. Однако структура функции try_pop() в листинге 6.6 не так уж сильно отличается от структуры pop() в простом свободном от блокировок стеке в листинге 7.2, поэтому можно с достаточными основаниями предположить, что и весь свободный от блокировок код будет схожим. Посмотрим, так ли это.
Если взять листинг 6.6 за основу, то нам понадобятся два указателя на node: один для головы списка (head), второй — для хвоста (tail). Поскольку мы собираемся обращаться к ним из нескольких потоков, то надо бы сделать эти указатели атомарными и расстаться с соответствующими мьютексами. Начнём с этого небольшого изменения и посмотрим, куда оно нас приведет. Результат показан в листинге ниже.
Листинг 7.13. Свободная от блокировок очередь с одним производителем и одним потребителем
template
class lock_free_queue {
private:
struct node {
std::shared_ptr
node* next;
node():
next(nullptr) {}
};
std::atomic
std::atomic
node* pop_head() {
node* const old_head = head.load();
if (old_head == tail.load()) {←(1)
return nullptr;
}
head.store(old_head->next);
return old_head;
}
public:
lock_free_queue():
head(new node), tail(head.load()) {}
lock_free_queue(const lock_free_queue& other) = delete;
lock_free_queue& operator=(
const lock_free_queue& other) = delete;
~lock_free_queue() {
while(node* const old_head = head.load()) {
head.store(old_head->next);
delete old_head;
}
}
std::shared_ptr
node* old_head = pop_head();
if (!old_head) {
return std::shared_ptr
}
std::shared_ptr(2)
delete old_head;
return res;
}
void push(T new_value) {
std::shared_ptr
node* p = new node; ←(3)
node* const old_tail = tail.load(); ←(4)
old_tail->data.swap(new_data); ←(5)
old_tail->next = p; ←(6)
tail.store(p); ←(7)
}
};
На первый взгляд, неплохо, и если в каждый момент времени существует только один поток, вызывающий push(), и только один поток, вызывающий pop(), то вообще всё прекрасно. Важно отметить, что в этом случае существует отношение происходит-раньше между push() и pop(), благодаря которому извлечение данных безопасно. Сохранение tail (7) синхронизируется-с загрузкой tail (1), сохранение указателя на data в предыдущем узле (5) расположено перед сохранением tail, а загрузка tail расположена перед загрузкой указателя на data (2), поэтому сохранение data происходит раньше его загрузки, и всё замечательно. Таким образом, мы получили корректно обслуживаемую очередь с