Одна из классических задач на синхронизацию называется задачей производителя и потребителя. Она также известна как задача ограниченного буфера. Один или несколько производителей (потоков или процессов) создают данные, которые обрабатываются одним или несколькими потребителями. Эти данные передаются между производителями и потребителями с помощью одной из форм IPC.

С этой задачей мы регулярно сталкиваемся при использовании каналов Unix. Команда интерпретатора, использующая канал

grep pattern chapters.* | wc -l

является примером такой задачи. Программа grep выступает как производитель (единственный), a wc — как потребитель (тоже единственный). Канал используется как форма IPC. Требуемая синхронизация между производителем и потребителем обеспечивается ядром, обрабатывающим команды write производителя и read покупателя. Если производитель опережает потребителя (канал переполняется), ядро приостанавливает производителя при вызове write, пока в канале не появится место. Если потребитель опережает производителя (канал опустошается), ядро приостанавливает потребителя при вызове read, пока в канале не появятся данные.

Такой тип синхронизации называется неявным; производитель и потребитель не знают о том, что синхронизация вообще осуществляется. Если бы мы использовали очередь сообщений Posix или System V в качестве средства IPC между производителем и потребителем, ядро снова взяло бы на себя обеспечение синхронизации.

При использовании разделяемой памяти как средства IPC производителя и потребителя, однако, требуется использование какого-либо вида явной синхронизации. Мы продемонстрируем это на использовании взаимного исключения. Схема рассматриваемого примера изображена на рис. 7.1.

В одном процессе у нас имеется несколько потоков-производителей и один поток-потребитель. Целочисленный массив buff содержит производимые и потребляемые данные (данные совместного пользования). Для простоты производители просто устанавливают значение buff[0] в 0, buff [1] в 1 и т.д. Потребитель перебирает элементы массива, проверяя правильность записей.

В этом первом примере мы концентрируем внимание на синхронизации между отдельными потоками-производителями. Поток-потребитель не будет запущен, пока все производители не завершат свою работу. В листинге 7.1 приведена функция main нашего примера.

Рис. 7.1. Производители и потребитель

Листинг 7.1.[1] Функция main

//mutex/prodcons2.с

1  #include "unpipc.h"

2  #define MAXNITEMS 1000000

3  #define MAXNTHREADS 100

4  int nitems; /* только для чтения потребителем и производителем */

5  struct {

6   pthread_mutex_t mutex;

7   int buff[MAXNITEMS];

8   int nput;

9   int nval;

10 } shared = {

11  PTHREAD_MUTEX_INITIALIZER

12 };

13 void *produce(void *), *consume(void *);

14 int

15 main(int argc, char **argv)

16 {

17  int i, nthreads, count[MAXNTHREADS];

18  pthread_t tid_produce[MAXNTHREADS], tid_consume;

19  if (argc != 3)

20   err_quit("usage: prodcons2 #items #threads");

21  nitems = min(atoi(argv[1]), MAXNITEMS);

22  nthreads = min(atoi(argv[2]), MAXNTHREADS);

23  Set_concurrency(nthreads);

24  /* запуск всех потоков-производителей */

25  for (i = 0; i nthreads; i++) {

26   count[i] = 0;

27   Pthread_create(tid_produce[i], NULL, produce, count[i]);

28  }

29  /* ожидание завершения всех производителей */

30  for (i = 0; i nthreads; i++) {

31   Pthread_join(tid_produce[i], NULL);

32   printf("count[%d] = %d\n", i, count[i]);

33  }

34  /* запуск и ожидание завершения потока-потребителя */

35  Pthread_create(tid_consume, NULL, consume, NULL);

36  Pthread_join(tid_consume, NULL);

37  exit(0);

38 }

Совместное использование глобальных переменных потоками
Перейти на страницу:
Нет соединения с сервером, попробуйте зайти чуть позже