Ранее отмечалось, что функция pthread_join() может быть задействована только для присоединения конкретных потоков. Она не позволяет присоединить любой завершающийся поток. Теперь будет показано, как обойти это ограничение с помощью условной переменной.

Программа из листинга 30.4 создает по одному потоку для каждого аргумента командной строки. Каждый поток ждет определенное количество секунд, указанное с помощью соответствующего аргумента, и затем завершает работу. Задавая временные интервалы, мы можем имитировать поток, который работает на протяжении какого-то времени.

Программа содержит набор глобальных переменных, в которые записывается информация обо всех создаваемых потоках. Каждому из них соответствует элемент в глобальном массиве thread, содержащем идентификатор потока (поле tid) и его текущее состояние (поле state). Поле state может принимать одно из следующих значений: TS_ALIVE (означает, что поток выполняется), TS_TERMINATED (означает, что поток завершил работу, но еще не был присоединен) и TS_JOINED (означает, что поток завершился и был присоединен).

Во время завершения каждый поток присваивает полю state в соответствующем элементе массива thread значение TS_TERMINATED, инкрементирует глобальный счетчик завершенных, но еще не присоединенных потоков (numUnjoined) и уведомляет условную переменную threadDied.

Главный поток содержит цикл, который постоянно ждет изменения условной переменной threadDied. Каждый раз, когда эта переменная получает уведомление, главный поток сканирует массив thread в поисках элементов с полем state, равным TS_TERMINATED. Для каждого потока с этим состоянием (который определяется с помощью поля tid) вызывается функция pthread_join(), после чего полю state присваивается значение TS_JOINED. Главный цикл завершается, когда все созданные потоки завершили работу — то есть когда глобальная переменная numLive равна 0.

Сессия выполнения программы из листинга 30.4 показана ниже:

$ ./thread_multijoin 1 1 2 3 3 Создаем пять потоков

Thread 0 terminating

Thread 1 terminating

Reaped thread 0 (numLive=4)

Reaped thread 1 (numLive=3)

Thread 2 terminating

Reaped thread 2 (numLive=2)

Thread 3 terminating

Thread 4 terminating

Reaped thread 3 (numLive=1)

Reaped thread 4 (numLive=0)

Наконец, стоит отметить, что, хоть в данном примере все потоки создаются присоединяемыми и при завершении работы сразу же утилизируются функцией pthread_join(), нам не обязательно использовать этот подход для отслеживания завершающихся потоков. Мы могли бы сделать потоки отсоединенными, избавиться от функции pthread_join() и просто записывать сведения о завершении потоков в массив thread (и соответствующие глобальные переменные).

Листинг 30.4. Главный поток, способный присоединить любой другой завершающийся поток

threads/thread_multijoin.c

#include

#include "tlpi_hdr.h"

static pthread_cond_t threadDied = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t threadMutex = PTHREAD_MUTEX_INITIALIZER;

/* Защищает все глобальные переменные, указанные ниже */

static int totThreads = 0; /* Общее количество созданных потоков */

static int numLive = 0; /* Общее количество созданных выполняемых

или уже завершенных потоков, которые еще

не были присоединены */

static int numUnjoined = 0; /* Количество завершенных потоков, которые

еще не были присоединены */

enum tstate { /* Состояния потоков */

TS_ALIVE, /* Поток выполняется */

TS_TERMINATED, /* Поток завершен, но еще не присоединен */

TS_JOINED /* Поток завершен и присоединен */

};

static struct { /* Данные о каждом потоке */

pthread_t tid; /* Идентификатор этого потока */

enum tstate state; /* Состояние потока (константы TS_*, указанные выше */

int sleepTime; /* Количество секунд, оставшихся до завершения */

} *thread;

static void * /* Начальная функция потока */

threadFunc(void *arg)

{

int idx = (int) arg;

int s;

sleep(thread[idx].sleepTime); /* Имитируем некую работу */

printf("Thread %d terminating\n", idx);

s = pthread_mutex_lock(&threadMutex);

if (s!= 0)

errExitEN(s, "pthread_mutex_lock");

numUnjoined++;

thread[idx].state = TS_TERMINATED;

s = pthread_mutex_unlock(&threadMutex);

if (s!= 0)

errExitEN(s, "pthread_mutex_unlock");

s = pthread_cond_signal(&threadDied);

if (s!= 0)

errExitEN(s, "pthread_cond_signal");

return NULL;

}

int

main(int argc, char *argv[])

{

int s, idx;

if (argc < 2 || strcmp(argv[1], "-help") == 0)

usageErr("%s num-secs…\n", argv[0]);

thread = calloc(argc — 1, sizeof(*thread));

if (thread == NULL)

errExit("calloc");

/* Создаем все потоки */

for (idx = 0; idx < argc — 1; idx++) {

thread[idx].sleepTime = getInt(argv[idx + 1], GN_NONNEG, NULL);

thread[idx].state = TS_ALIVE;

s = pthread_create(&thread[idx].tid, NULL,

threadFunc, (void *) idx);

if (s!= 0)

errExitEN(s, "pthread_create");

}

totThreads = argc — 1;

numLive = totThreads;

/* Присоединяем завершенные потоки */

while (numLive > 0) {

s = pthread_mutex_lock(&threadMutex);

if (s!= 0)

errExitEN(s, "pthread_mutex_lock");

Перейти на страницу:

Похожие книги