31  for (i = 0; i nthreads; i++) {

32   Pthread_join(tid_produce[i], NULL);

33   printf("count[%d] = %d\n", i, count[i]);

34  }

35  Pthread_join(tid_consume, NULL);

36  exit(0);

37 }

24 Мы увеличиваем уровень параллельного выполнения на единицу, чтобы учесть поток-потребитель, выполняемый параллельно с производителями.

25-29 Поток-потребитель создается сразу же после создания потоков-производителей.

Функция produce по сравнению с листингом 7.2 не изменяется. В листинге 7.4 приведен текст функции consume, вызывающей новую функцию consume_wait. 

Листинг 7.4. Функции consume и consume_wait

//mutex/prodcons3.с

54 void

55 consume wait(int i)

56 {

57  for (;;) {

58   Pthread_mutex_lock(shared.mutex);

59   if (i shared.nput) {

60    Pthread_mutex_unlock(shared.mutex);

61    return; /* элемент готов */

62   }

63   Pthread_mutex_unlock(shared.mutex);

64  }

65 }

66 void *

67 consume(void *arg)

68 {

69  int i;

70  for (i = 0; i nitems; i++) {

71   consume_wait(i);

72   if (shared.buff[i] != i)

73    printf("buff[%d] = %d\n", i, shared.buff[i]);

74  }

75  return(NULL);

76 }

Потребитель должен ждать

71 Единственное изменение в функции consume заключается в добавлении вызова consume_wait перед обработкой следующего элемента массива.

Ожидание производителей

57-64 Наша функция consume_wait должна ждать, пока производители не создадут i-й элемент. Для проверки этого условия производится блокировка взаимного исключения и значение i сравнивается с индексом производителя nput. Блокировка необходима, поскольку значение nput может быть изменено одним из производителей в момент его проверки.

Главная проблема — что делать, если нужный элемент еще не готов. Все, что нам остается и что мы делаем в листинге 7.4, — это повторять операции в цикле, устанавливая и снимая блокировку и проверяя значение индекса. Это называется опросом (spinning или polling) и является лишней тратой времени процессора.

Мы могли бы приостановить выполнение процесса на некоторое время, но мы не знаем, на какое. Что нам действительно нужно — это использовать какое-то другое средство синхронизации, позволяющее потоку или процессу приостанавливать работу, пока не произойдет какое-либо событие.

<p>7.5. Условные переменные: ожидание и сигнализация</p>

Взаимное исключение используется для блокирования, а условная переменная — для ожидания. Это два различных средства синхронизации, и оба они нужны. Условная переменная представляет собой переменную типа pthread_cond_t. Для работы с такими переменными предназначены две функции:

#include pthread.h

int pthread_cond_wait(pthread_cond_t *cptr, pthread_m_tex_t *mptr);

int pthread_cond_signal(pthread_cond_t *cptr);

/* Обе функции возвращают 0 в случае успешного завершения, положительное значение Еххх – в случае ошибки */

Слово signal в имени второй функции не имеет никакого отношения к сигналам Unix SIGxxx.

Мы определяем условие, уведомления о выполнении которого будем ожидать.

Взаимное исключение всегда связывается с условной переменной. При вызове pthread_cond_wait для ожидания выполнения какого-либо условия мы указываем адрес условной переменной и адрес связанного с ней взаимного исключения.

Мы проиллюстрируем использование условных переменных, переписав пример из предыдущего раздела. В листинге 7.5 объявляются глобальные переменные.

Переменные производителя и взаимное исключение объединяются в структуру

7-13 Две переменные nput и rival ассоциируются с mutex, и мы объединяем их в структуру с именем put. Эта структура используется производителями.

14-20 Другая структура, nready, содержит счетчик, условную переменную и взаимное исключение. Мы инициализируем условную переменную с помощью PTHREAD_ COND_INITIALIZER.

Функция main по сравнению с листингом 7.3 не изменяется.

Листинг 7.5. Глобальные переменные: использование условной переменной

//mutex/prodcons6.c

1  #include "unpipc.h"

2  #define MAXNITEMS 1000000

3  #define MAXNTHREADS 100

4  /* глобальные переменные для всех потоков */

Перейти на страницу:
Нет соединения с сервером, попробуйте зайти чуть позже