7.2.1. Потокобезопасный стек без блокировок
Основное свойство стека понятно: элементы извлекаются в порядке, обратном тому, в котором помещались — последним пришёл, первым ушел (LIFO). Поэтому важно убедиться, что после добавления значения в стек оно может быть сразу же безопасно извлечено другим потоком и что только один поток получает данное значение. Простейшая реализация стека основана на связанном списке; указатель head направлен на первый узел (который будет извлечен следующим), и каждый узел указывает на следующий в списке. При такой схеме добавление узла реализуется просто.
1. Создать новый узел.
2. Записать в его указатель next текущее значение head.
3. Записать в head указатель на новый узел.
Все это прекрасно работает в однопоточной программе, но, когда стек могут модифицировать сразу несколько потоков, этого недостаточно. Существенно, что если узлы добавляют два потока, то между шагами 2 и 3 возможна гонка: второй поток может модифицировать значение head после того, как первый прочитает его на шаге 2, но до изменения на шаге 3. В таком случае изменения, произведенные вторым потоком, будут отброшены или случится еще что-нибудь похуже. Прежде чем решать эту проблему, следует отметить, что после того, как указатель head будет изменен и станет указывать на новый узел, этот узел может быть прочитан другим потоком. Поэтому крайне важно, чтобы новый узел был аккуратно подготовлен head; потом изменять узел уже нельзя.
Ну хорошо, а как все-таки быть с этим неприятным состоянием гонки? Ответ таков — использовать атомарную операцию сравнить-и-обменять на шаге 3, гарантирующую, что head не был модифицирован с момента чтения на шаге 2. Если был, то следует вернуться в начало цикла и повторить. В листинге ниже показано, как можно реализовать потокобезопасную функцию push() без блокировок.
Листинг 7.2. Реализация функции push() без блокировок
template
class lock_free_stack {
private:
struct node {
T data;
node* next;
node(T const& data_) : ←(1)
data(data_) {}
};
std::atomic
public:
void push(T const& data) {
node* const new_node = new node(data);←(2)
new_node->next = head.load(); ←(3)
while (!head.compare_exchange_weak(
new_node->next, new_node)); ←(4)
}
};
В этом коде дотошно реализованы все три пункта изложенного выше плана: создать новый узел (2), записать в его поле next текущее значение head (3) и записать в head указатель на новый узел (4). Заполнив данные самой структуры node в конструкторе (1), мы гарантируем, что узел готов к использованию сразу после конструирования, так что легкая проблема решена. Затем мы вызываем функцию compare_exchange_weak(), которая проверяет, что указатель head по-прежнему содержит то значение, которое было сохранено в new_node->next (3), и, если это так, то записывает его в new_node. В этой части программы используется также полезное свойство сравнения с обменом: если функция возвращает false, означающее, что сравнение не прошло (например, потому что значение head было изменено другим потоком), то в переменную, которая передана в первом параметре (new_node->next) записывается текущее значение head. Поэтому нам не нужно перезагружать head на каждой итерации цикла — это сделает за нас компилятор. Кроме того, поскольку мы сразу переходим в начало цикла в случае неудачного сравнения, можно использовать функцию compare_exchange_weak, которая в некоторых архитектурах дает более оптимальный код, чем compare_exchange_strong (см. главу 5).
Итак, операции pop() у нас пока еще нет, но уже можно сверить реализацию push() с рекомендациями. Единственное место, где возможны исключения, — конструирование нового узла (1), но здесь все будет подчищено автоматически, и, поскольку список еще не модифицирован, то опасности нет. Поскольку мы сами строим данные, сохраняемые в узле node, и используем compare_exchange_weak() для обновления указателя head, то проблематичных состояний гонки здесь нет. Если операция сравнения с обменом завершилась успешно, то узел находится в списке, и его можно извлекать. Так как нет никаких блокировок, то нет и возможности взаимоблокировки, и, стало быть, функция push() успешно сдала экзамен.
Теперь, когда у нас есть средства для добавления данных в стек, надо научиться их извлекать обратно. На первый взгляд, тут всё просто.
1. Прочитать текущее значение head.
2. Прочитать head->next.
3. Записать в head значение head->next.