22  nproducers = min(atoi(argv[2]), MAXNTHREADS);

23  nconsumers = min(atoi(argv[3]), MAXNTHREADS);

24  /* инициализация трех семафоров */

25  Sem_init(shared.mutex, 0, 1);

26  Sem_init(shared.nempty, 0, NBUFF);

27  Sem_init(shared.nstored, 0, 0);

28  /* создание производителей и потребителей */

29  Set_concurrency(nproducers + nconsumers);

30  for (i = 0; i nproducers; i++) {

31   prodcount[i] = 0;

32   Pthread_create(tid_produce[i], NULL, produce, prodcount[i]);

33  }

34  for (i = 0; i nconsumers; i++) {

35   conscount[i] = 0;

36   Pthread_create(tid_consume[i], NULL, consume, conscount[i]);

37  }

38  /* ожидание завершения всех производителей и потребителей */

39  for (i = 0; i nproducers: i++) {

40   Pthread_join(tid_produce[i], NULL);

41   printf("producer count[%d] = %d\n", i, prodcount[i]);

42  }

43  for (i = 0; i nconsumers; i++) {

44   Pthread_join(tid_consume[i], NULL);

45   printf("consumer count[%d] = %d\n", i, conscount[i]);

46  }

47  Sem_destroy(shared.mutex);

48  Sem_destroy(shared.nempty);

49  Sem_destroy(shared.nstored);

50  exit(0);

51 }

Функция produce содержит одну новую строку по сравнению с листингом 10.13. В части кода, относящейся к завершению потока-производителя, появляется строка, отмеченная знаком +:

 if (shared.nput = nitems) {

+ Sem_post(shared.nstored); /* даем возможность потребителям завершить работу */

  Sem_post(shared.nempty);

  Sem_post(shared.mutex);

  return(NULL); /* готово */

 }

Снова нам нужно быть аккуратными при обработке завершения процессов-производителей и потребителей. После обработки всех объектов в буфере все потребители блокируются в вызове

Sem_wait(shared.nstored); /* Ожидание помещения объекта в буфер */

Производителям приходится увеличивать семафор nstored для разблокирования потрeбитeлeй, чтобы они узнали, что работа завершена. Функция consume приведена в листинге 10.17. 

Листинг 10.17. Функция, выполняемая всеми потоками-потребителями

//pxsem/prodcons4.c

72 void *

73 consume(void *arg)

74 {

75  int i;

76  for (;;) {

77   Sem_wait(shared.nstored); /* ожидание помещения объекта в буфер */

78   Sem_wait(shared.mutex);

79   if (shared.nget = nitems) {

80    Sem_post(shared.nstored);

81    Sem_post(shared.mutex);

82    return(NULL); /* готово */

83   }

84   i = shared.nget % NBUFF;

85   if (shared.buff[i] != shared.ngetval)

86    printf("error: buff[%d] = %d\n", i, shared.buff[i]);

87   shared.nget++;

88   shared.ngetval++;

89   Sem_post(shared.mutex);

90   Sem_post(shared.nempty); /* освобождается место для элемента */

91   *((int *) arg) += 1;

92  }

93 }

Завершение потоков-потребителей

79-83 Функция consume сравнивает nget и nitems, чтобы узнать, когда следует остановиться (аналогично функции produce). Обработав последний объект в буфере, потоки-потребители блокируются, ожидая изменения семафора nstored. Когда завершается очередной поток-потребитель, он увеличивает семафор nstored, давая возможность завершить работу другому потоку-потребителю.

<p>10.11. Несколько буферов</p>

Во многих программах, обрабатывающих какие-либо данные, можно встретить цикл вида

while ((n = read(fdin, buff, BUFFSIZE)) 0) {

Перейти на страницу:
Нет соединения с сервером, попробуйте зайти чуть позже