29   Pthread_create(tid_produce[i], NULL, produce, count[i]);

30  }

31  Pthread_create(tid_consume, NULL, consume, NULL);

32  /* ожидание завершения всех производителей и потребителя */

33  for (i = 0; i nproducers; i++) {

34   Pthread_join(tid_produce[i], NULL);

35   printf("count[%d] = %d\n", i, count[i]);

36  }

37  Pthread_join(tid_consume, NULL);

38  Sem_destroy(shared.mutex);

39  Sem_destroy(shared.nempty);

40  Sem_destroy(shared.nstored);

41  exit(0);

42 }

Глобальные переменные

4 Глобальная переменная nitems хранит число элементов, которые должны быть совместно произведены. Переменная nproducers хранит число потоков-производителей. Оба эти значения устанавливаются с помощью аргументов командной строки.

Общая структура

5-10 В структуру shared добавляются два новых элемента: nput, обозначающий индекс следующего элемента, куда должен быть помещен объект (по модулю BUFF), и nputval следующее значение, которое будет помещено в буфер. Эти две переменные взяты из нашего решения в листингах 7.1 и 7.2. Они нужны для синхронизации нескольких потоков-производителей.

Новые аргументы командной строки

17-20 Два новых аргумента командной строки указывают полное количество элементов, которые должны быть помещены в буфер, и количество потоков-производителей. 

Запуск всех потоков

21-41 Инициализируем семафоры и запускаем потоки-производители и поток-потребитель. Затем ожидается завершение работы потоков. Эта часть кода практически идентична листингу 7.1.

В листинге 10.13 приведен текст функции produce, которая выполняется каждым потоком-производителем.

Листинг 10.13. Функция, выполняемая всеми потоками-производителями

//pxsem/prodcons3.c

43 void *

44 produce(void *arg)

45 {

46  for (;;) {

47   Sem_wait(shared.nempty); /* ожидание освобождения поля */

48   Sem_wait(shared.mutex);

49   if (shared.nput = nitems) {

50    Sem_post(shared.nempty);

51    Sem_post(shared.mutex);

52    return(NULL); /* готово */

53   }

54   shared.buff[shared.nput % NBUFF] = shared.nputval;

55   shared.nput++;

56   shared.nputval++;

57   Sem_post(shared.mutex);

58   Sem_post(shared.nstored); /* еще один элемент */

59   *((int *) arg) += 1;

60  }

61 }

Взаимное исключение между потоками-производителями

49-53 Отличие от листинга 10.8 в том, что цикл завершается, когда nitems объектов будет помещено в буфер всеми потоками. Обратите внимание, что потоки-производители могут получить семафор nempty в любой момент, но только один производитель может иметь семафор mutex. Это защищает переменные nput и nval от одновременного изменения несколькими производителями.

Завершение производителей

50-51 Нам нужно аккуратно обработать завершение потоков-производителей. После того как последний объект помещен в буфер, каждый поток выполняет

Sem_wait(shared.nempty); /* ожидание пустого поля */

в начале цикла, что уменьшает значение семафора nempty. Но прежде, чем поток будет завершен, он должен увеличить значение этого семафора, потому что он не помещает объект в буфер в последнем проходе цикла. Завершающий работу поток должен также освободить семафор mutex, чтобы другие производители смогли продолжить функционирование. Если мы не увеличим семафор nempty по завершении процесса и если производителей будет больше, чем мест в буфере, лишние потоки будут заблокированы навсегда, ожидая освобождения семафора nempty, и никогда не завершат свою работу.

Функция consume в листинге 10.14 проверяет правильность всех записей в буфере, выводя сообщение при обнаружении ошибки.

Листинг 10.14. Функция, выполняемая потоком-потребителем

//pxsem/prodcons3.с

62 void *

63 consume(void *arg)

64 {

65  int i;

66  for (i = 0; i nitems; i++) {

67   Sem_wait(shared.nstored); /* ожидание помещения по крайней мере одного элемента в буфер */

68   Sem_wait(shared.mutex);

69   if (shared.buff[i % NBUFF] != i)

Перейти на страницу:
Нет соединения с сервером, попробуйте зайти чуть позже