В многопоточной обработке самое сложное — гарантировать сериализованный доступ к ресурсам, потому что если это сделано неправильно, отладка становится кошмаром. Поскольку многопоточная программа по своей сути недетерминирована (так как потоки могут выполняться в различной очередности и с различными квантами времени при каждом новом выполнении программы), очень трудно точно обнаружить место и способ ошибочной модификации чего-либо. Здесь еще в большей степени, чем в однопоточном программировании, надежный проект позволяет минимизировать затраты на отладку и переработку.

<p>12.3. Уведомление одного потока другим</p>Проблема

Используется шаблон, в котором один поток (или группа потоков) выполняет какие-то действия, и требуется сделать так, чтобы об этом узнал другой поток (или группа потоков). Может использоваться главный поток, который передает работу подчиненным потокам, или может использоваться одна группа потоков для пополнения очереди и другая для удаления данных из очереди и выполнения чего-либо полезного.

Решение

Используйте объекты mutex и condition, которые объявлены в boost/thread/mutex.hpp и boost/thread/condition.hpp. Можно создать условие (condition) для каждой ожидаемой потоками ситуации и при возникновении такой ситуации уведомлять все ее ожидающие потоки. Пример 12.4 показывает, как можно обеспечить передачу уведомлений в модели потоков «главный/подчиненные».

Пример 12.4. Передача уведомлений между потоками

#include

#include

#include

#include

#include

#include

class Request { /*...*/ };

// Простой класс очереди заданий; в реальной программе вместо этого класса

// используйте std::queue

template

class JobQueue {

public:

 JobQueue() {}

 ~JobQueue() {}

 void submitJob(const T& x) {

  boost::mutex::scoped_lock lock(mutex_);

  list_.push_back(x);

  workToBeDone_.notify_one();

 }

 T getJob() {

  boost::mutex::scoped_lock lock(mutex_);

  workToBeDone_.wait(lock); // Ждать удовлетворения этого условия, затем

                            // блокировать мьютекс

  T tmp = list_.front();

  list_.pop_front();

  return(tmp);

 }

private:

 std::list list_;

 boost::mutex mutex_;

 boost::condition workToBeDone_;

};

JobQueue myJobQueue;

void boss() {

 for (;;) {

  // Получить откуда-то запрос

  Request req;

  myJobQueue.submitJob(req);

 }

}

void worker() {

 for (;;) {

  Request r(myJobQueue.getJob());

  // Выполнить какие-то действия с заданием...

 }

}

int main() {

 boost::thread thr1(boss);

 boost::thread thr2(worker);

 boost::thread thr3(worker);

 thr1.join();

 thr2.join();

 thr3.join();

}

Обсуждение

Объект условия использует мьютекс mutex и позволяет дождаться ситуации, когда он становится заблокированным. Рассмотрим пример 12.4, в котором представлена модифицированная версии класса Queue из примера 12.2. Я модифицировал очередь Queue, получая более специализированную очередь, а именно JobQueue, объекты которой являются заданиями, поступающими в очередь со стороны главного потока и обрабатываемыми подчиненными потоками.

Самое важное изменение класса JobQueue связано переменной-членом workToBeDone_ типа condition. Эта переменная показывает, имеется или нет задание в очереди. Когда потоку требуется получить элемент из очереди, он вызывает функцию getJob, которая пытается захватить мьютекс и затем дожидаться возникновения новой ситуации, что реализуют следующие строки.

boost::mutex::scoped_lock lock(mutex_);

workToBeDone_.wait(lock);

Первая строка блокирует мьютекс обычным образом. Вторая строка разблокирует мьютекс и переводит его в состояние ожидания или в неактивное состояние до тех пор, пока не будет удовлетворено условие. Разблокирование мьютекса позволяет другим потокам использовать этот мьютекс; один из них должен установить ожидаемое условие, в противном случае другие потоки не смогут блокировать мьютекс, пока один поток ожидает возникновения необходимого условия.

Перейти на страницу:

Похожие книги