#include  /* Для определения O_NONBLOCK */

#include "tlpi_hdr.h"

#define NOTIFY_SIG SIGUSR1

static void

handler(int sig)

{

/* Просто прерываем вызов sigsuspend() */

}

int

main(int argc, char *argv[])

{

struct sigevent sev;

mqd_t mqd;

struct mq_attr attr;

void *buffer;

ssize_t numRead;

sigset_t blockMask, emptyMask;

struct sigaction sa;

if (argc!= 2 || strcmp(argv[1], "-help") == 0)

usageErr("%s mq-name\n", argv[0]);

mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);

if (mqd == (mqd_t) -1)

errExit("mq_open");

if (mq_getattr(mqd, &attr) == -1)

errExit("mq_getattr");

buffer = malloc(attr.mq_msgsize);

if (buffer == NULL)

errExit("malloc");

sigemptyset(&blockMask);

sigaddset(&blockMask, NOTIFY_SIG);

if (sigprocmask(SIG_BLOCK, &blockMask, NULL) == -1)

errExit("sigprocmask");

sigemptyset(&sa.sa_mask);

sa.sa_flags = 0;

sa.sa_handler = handler;

if (sigaction(NOTIFY_SIG, &sa, NULL) == -1)

errExit("sigaction");

sev.sigev_notify = SIGEV_SIGNAL;

sev.sigev_signo = NOTIFY_SIG;

if (mq_notify(mqd, &sev) == -1)

errExit("mq_notify");

sigemptyset(&emptyMask);

for (;;) {

 sigsuspend(&emptyMask); /* Ждем сигнала-оповещения */

 if (mq_notify(mqd, &sev) == -1)

errExit("mq_notify");

 while ((numRead = mq_receive(mqd, buffer, attr.mq_msgsize, NULL)) >= 0)

printf("Read %ld bytes\n", (long) numRead);

if (errno!= EAGAIN) /* Непредвиденная ошибка */

errExit("mq_receive");

}

}

pmsg/mq_notify_sig.c

Ряд аспектов программы из листинга 48.6 заслуживает дополнительного внимания.

• Вместо использования вызова pause() мы блокируем сигнал и ждем его с помощью вызова sigsuspend(); так можно исключить возможность потери сигнала, который доставляется, пока программа занимается другими делами (то есть когда она не заблокирована в ожидании сигнала) в цикле for. Если бы мы задействовали для этого функцию pause(), то следующий ее вызов был бы заблокирован, несмотря на уже доставленный сигнал.

• Очередь открывается в неблокирующем режиме; при поступлении оповещения мы применяем цикл while для прочтения сообщения из очереди. Опустошение очереди таким способом гарантирует, что процесс будет оповещен при появлении нового сообщения. Использование неблокирующего режима означает, что при опустошении очереди цикл while завершится неудачей (вызов mq_receive() вернет ошибку EAGAIN). Такой подход аналогичен применению неблокирующего ввода/вывода в сочетании с оповещениями, срабатывающими при готовности (о чем мы поговорим в подразделе 59.1.1), и применяется по схожим причинам.

• Важно, чтобы в цикле for подписка на оповещение произошла до того, как из очереди будут прочитаны все сообщения. В противном случае может произойти следующая цепочка событий: все сообщения в очереди прочитаны и цикл while завершается; новое сообщение появляется в очереди; вызывается метод mq_notify(), который создает подписку на соответствующее оповещение. В результате не будет сгенерировано никакого дополнительного сигнала, поскольку очередь ни разу не была пустой. Следовательно, при следующем вызове sigsuspend() программа продолжит оставаться заблокированной.

48.6.2. Получение уведомлений в отдельном потоке

В листинге 48.7 показан пример оповещения с помощью потоков. В этой программе использован целый ряд архитектурных решений, которые применяются в листинге 48.6:

• при появлении оповещения программа, прежде чем опустошить очередь, возобновляет свою подписку ;

• применяется неблокирующий режим; это позволяет полностью опустошить очередь после получения оповещения, не блокируя программу .

Листинг 48.7. Получение оповещений с помощью потока

pmsg/mq_notify_thread.c

#include

#include

#include  /* Для определения O_NONBLOCK */

#include "tlpi_hdr.h"

static void notifySetup(mqd_t *mqdp);

static void /* Функция оповещения потока */

threadFunc(union sigval sv)

{

ssize_t numRead;

mqd_t *mqdp;

void *buffer;

struct mq_attr attr;

mqdp = sv.sival_ptr;

if (mq_getattr(*mqdp, &attr) == -1)

errExit("mq_getattr");

buffer = malloc(attr.mq_msgsize);

if (buffer == NULL)

errExit("malloc");

notifySetup(mqdp);

while ((numRead = mq_receive(*mqdp, buffer, attr.mq_msgsize,

NULL)) >= 0)

printf("Read %ld bytes\n", (long) numRead);

if (errno!= EAGAIN) /* Непредвиденная ошибка */

errExit("mq_receive");

free(buffer);

pthread_exit(NULL);

}

static void

notifySetup(mqd_t *mqdp)

{

struct sigevent sev;

sev.sigev_notify = SIGEV_THREAD; /* Оповещаем через поток */

sev.sigev_notify_function = threadFunc;

sev.sigev_notify_attributes = NULL;

/* Может быть указателем на структуру pthread_attr_t */

sev.sigev_value.sival_ptr = mqdp; /* Аргумент функции threadFunc() */

if (mq_notify(*mqdp, &sev) == -1)

errExit("mq_notify");

}

int

main(int argc, char *argv[])

{

mqd_t mqd;

if (argc!= 2 || strcmp(argv[1], "-help") == 0)

usageErr("%s mq-name\n", argv[0]);

mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);

if (mqd == (mqd_t) -1)

errExit("mq_open");

notifySetup(&mqd);

pause(); /* Ждем оповещений через функцию потока */

}

Перейти на страницу:

Похожие книги