
How to use pipes and message queues under Linux

2025-01-30 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article explains in detail how to use pipes and message queues under Linux. It is quite practical, so it is shared here as a reference, in the hope that you will get something out of it.

One of the core goals of POSIX is thread safety.

Consider the man page for the mq_open function, which belongs to the POSIX message-queue API. The ATTRIBUTES section of this man page includes a small table:

Interface    Attribute        Value
mq_open()    Thread safety    MT-Safe

The value MT-Safe (MT stands for multithreaded) means that the mq_open function is thread-safe, which in turn implies that it is process-safe: a process executes in precisely the sense that one of its threads executes, so if a race condition cannot arise among threads in the same process, it cannot arise among threads in different processes. The MT-Safe attribute thus assures that no race condition arises in calls to mq_open. In general, channel-based IPC is concurrency-safe, although a cautionary note is raised in the examples that follow.

Unnamed pipes

First, a contrived command-line example shows how unnamed pipes work. On all modern systems, the vertical bar | represents an unnamed pipe at the command line. Assume % is the command-line prompt, and consider this command:

% sleep 5 | echo "Hello, world!" ## writer to the left of |, reader to the right

The sleep and echo utilities execute as separate processes, and the unnamed pipe allows them to communicate. However, the example is contrived in that no communication occurs. The greeting Hello, world! appears on the screen; then, after about five seconds, the command-line prompt returns, indicating that both the sleep and echo processes have exited. What is going on?

In the command-line syntax, the process to the left of the vertical bar (sleep) is the writer, and the process to the right (echo) is the reader. By default, the reader blocks until there are bytes to read from the channel, and the writer, after writing its bytes, finishes up by sending an end-of-stream marker. (Even if the writer terminates prematurely, an end-of-stream marker is sent to the reader.) The unnamed pipe persists until both the writer and the reader terminate.

In the contrived example, the sleep process does not write any bytes to the channel but does terminate after about five seconds, which sends an end-of-stream marker to the channel. In the meantime, the echo process immediately writes the greeting to the standard output (the screen): it does not read any bytes from the channel, so it does no waiting. Once the sleep and echo processes terminate, the unnamed pipe, never used for communication at all, goes away and the command-line prompt returns.
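The end-of-stream behavior just described can be observed directly from C. The sketch below is an illustration under simple assumptions, not part of the original example: drain_after_close is a hypothetical helper in which a single process plays both roles, writing a few bytes (or none) into a new pipe, closing the write end, and then reading until read returns 0, which is how the end-of-stream marker shows up at the reader.

```c
#include <unistd.h>

/* Hypothetical helper: writes len bytes into a fresh pipe, closes the write
   end, then reads until read() returns 0 (end of stream). Returns the number
   of bytes the reader saw, or -1 on error. len must stay small (well under
   the pipe's capacity) because one process is both writer and reader. */
ssize_t drain_after_close(const char* data, size_t len) {
  int fds[2];                              /* fds[0] = read end, fds[1] = write end */
  if (pipe(fds) < 0) return -1;

  if (len > 0 && write(fds[1], data, len) != (ssize_t) len) {
    close(fds[0]);
    close(fds[1]);
    return -1;
  }
  close(fds[1]);                           /* last write end closed: stream ends */

  char buf[64];
  ssize_t n, total = 0;
  while ((n = read(fds[0], buf, sizeof buf)) > 0)  /* 0 means end of stream */
    total += n;
  close(fds[0]);
  return n < 0 ? -1 : total;
}
```

Called with an empty string and a length of 0, it mirrors the sleep side of the example: nothing is written, and the reader's loop ends at once with a total of 0 instead of blocking.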

The following more practical example uses two unnamed pipes. Let's assume that the file test.dat is as follows:

this
is
the
way
the
world
ends

The following command:

% cat test.dat | sort | uniq

The output from the cat process (cat is short for concatenate) is piped into the sort process to produce sorted output, and the sorted output is then piped into the uniq process to eliminate duplicate records (in this case, the two occurrences of "the" are reduced to one):

ends
is
the
this
way
world
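To see how a shell might wire up the | symbol, here is a minimal sketch of a two-stage pipeline such as sort | uniq. It is an illustration under stated assumptions, not how any particular shell is implemented: run_two_stage is a hypothetical name, the commands are looked up on PATH by execvp, and error handling is kept short. Each child uses dup2 to put one end of the pipe onto its standard output or standard input before exec'ing its command.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: runs `left | right` roughly the way a shell wires up
   the | symbol. left and right are NULL-terminated argv arrays for execvp.
   Returns 0 if both commands exit with status 0, and -1 otherwise. */
int run_two_stage(char* const left[], char* const right[]) {
  int fds[2];                          /* fds[0] = read end, fds[1] = write end */
  if (pipe(fds) < 0) return -1;

  pid_t writer = fork();
  if (writer < 0) { close(fds[0]); close(fds[1]); return -1; }
  if (writer == 0) {                   /* left-hand command: the writer */
    dup2(fds[1], STDOUT_FILENO);       /* its stdout now feeds the pipe */
    close(fds[0]);
    close(fds[1]);
    execvp(left[0], left);
    _exit(127);                        /* reached only if exec fails */
  }

  pid_t reader = fork();
  if (reader < 0) {
    close(fds[0]);
    close(fds[1]);
    waitpid(writer, NULL, 0);
    return -1;
  }
  if (reader == 0) {                   /* right-hand command: the reader */
    dup2(fds[0], STDIN_FILENO);        /* its stdin now drains the pipe */
    close(fds[0]);
    close(fds[1]);
    execvp(right[0], right);
    _exit(127);
  }

  /* The parent closes both ends too; an open write end here would keep
     the reader from ever seeing end of stream. */
  close(fds[0]);
  close(fds[1]);

  int ws, rs;
  if (waitpid(writer, &ws, 0) < 0 || waitpid(reader, &rs, 0) < 0) return -1;
  return (WIFEXITED(ws) && WEXITSTATUS(ws) == 0 &&
          WIFEXITED(rs) && WEXITSTATUS(rs) == 0) ? 0 : -1;
}
```

For instance, calling it with {"echo", "hello", NULL} and {"wc", "-c", NULL} behaves like echo hello | wc -c. The three-stage cat test.dat | sort | uniq follows the same pattern, with a second pipe between the second and third commands.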

The pipeUN program below shows two processes communicating through an unnamed pipe.

Example 1. Two processes communicating through an unnamed pipe

#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h>   /* exit functions */
#include <unistd.h>   /* read, write, pipe, _exit */
#include <string.h>

#define ReadEnd  0
#define WriteEnd 1

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);    /** failure **/
}

int main() {
  int pipeFDs[2]; /* two file descriptors */
  char buf;       /* 1-byte buffer */
  const char* msg = "Nature's first green is gold\n"; /* bytes to write */

  if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
  pid_t cpid = fork();                                /* fork a child process */
  if (cpid < 0) report_and_exit("fork");              /* check for failure */

  if (0 == cpid) {    /*** child ***/
    close(pipeFDs[WriteEnd]);                         /* child reads, doesn't write */

    while (read(pipeFDs[ReadEnd], &buf, 1) > 0)       /* read until end of byte stream */
      write(STDOUT_FILENO, &buf, sizeof(buf));        /* echo to the standard output */

    close(pipeFDs[ReadEnd]);                          /* close the ReadEnd: all done */
    _exit(0);                                         /* exit and notify parent at once */
  }
  else {              /*** parent ***/
    close(pipeFDs[ReadEnd]);                          /* parent writes, doesn't read */

    write(pipeFDs[WriteEnd], msg, strlen(msg));       /* write the bytes to the pipe */
    close(pipeFDs[WriteEnd]);                         /* done writing: generate eof */

    wait(NULL);                                       /* wait for child to exit */
    exit(0);                                          /* exit normally */
  }
  return 0;
}

The pipeUN program above uses the system function fork to create a child process. Although the program has only a single source file, multiprocessing occurs during (successful) execution.

The following is a brief review of how the library function fork works:

The fork function, called in the parent process, returns -1 to the parent in case of failure. In the pipeUN example, the call is:

pid_t cpid = fork(); /* called in parent */

The returned value is stored, in this example, in the variable cpid of integer type pid_t. (Every process has its own process ID, a non-negative integer that identifies the process.) Forking a new process can fail for several reasons, including a full process table, the structure that the system maintains to track processes. Zombie processes, clarified shortly, can cause the process table to fill up if they are not harvested.

If the fork call succeeds, it spawns a new child process, returning one value to the parent but a different value to the child. Both the parent and the child then execute the same code that follows the call to fork. (The child inherits copies of all the variables declared so far in the parent.) In particular, a successful call to fork:

Returns 0 to the child process

Returns the process ID of the child process to the parent process
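To make the two return values concrete, here is a small sketch; child_exit_status is a hypothetical helper, not part of pipeUN. The child, recognizing itself by the 0 return, exits at once with a chosen code, while the parent, which received the child's PID, passes that PID to waitpid (a more targeted relative of the wait call that pipeUN uses) and extracts the code with the WIFEXITED and WEXITSTATUS macros.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: forks a child that exits with `code`, then has the
   parent wait for that specific child and return its exit status
   (or -1 if the fork or the wait fails). */
int child_exit_status(int code) {
  pid_t cpid = fork();
  if (cpid < 0) return -1;         /* failure: only the parent sees -1 */

  if (0 == cpid) {                 /* fork returned 0: this is the child */
    _exit(code);
  }

  /* fork returned the child's PID: this is the parent */
  int status;
  if (waitpid(cpid, &status, 0) < 0) return -1;
  return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Only the low 8 bits of the exit code reach the parent, so values from 0 to 255 round-trip intact.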

After a successful fork, an if/else or equivalent construct is used to segregate the code for the child from the code for the parent. In this example, the construct is:

if (0 == cpid) {    /*** child ***/
  ...
}
else {              /*** parent ***/
  ...
}

If forking a child succeeds, the pipeUN program proceeds as follows. There is an integer array:

int pipeFDs[2]; /* two file descriptors */

to hold two file descriptors, one for writing to the pipe and the other for reading from the pipe. (The array element pipeFDs[0] is the file descriptor for the read end, and the element pipeFDs[1] is the file descriptor for the write end.) A successful call to the system pipe function, made immediately before the call to fork, populates the array with the two file descriptors:

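if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");

The parent and the child now have copies of both file descriptors, but the separation-of-concerns pattern means that each process needs exactly one of them. In this example, the parent does the writing and the child does the reading, although the roles could be reversed. The first statement in the child's if-clause therefore closes the pipe's write end:

close(pipeFDs[WriteEnd]); /* called in child code */

and the first statement in the parent's else-clause closes the pipe's read end:

close(pipeFDs[ReadEnd]);  /* called in parent code */

The parent then writes some bytes (ASCII codes) to the unnamed pipe, and the child reads these and echoes them to the standard output.

One more aspect of the program needs clarification: the call to the wait function in the parent code. Once spawned, a child process is largely independent of its parent, as even the short pipeUN program illustrates. The child can execute arbitrary code that may have nothing to do with the parent. However, the system does notify the parent, through a signal, if and when the child terminates.

What if the child terminates but the parent never collects its exit status? In that case, unless precautions are taken, the child remains a zombie process: an entry lingering in the process table. (If the parent terminates first, the orphaned child is typically adopted by the init process, which reaps it.) The precautions are of two broad types. One is to have the parent notify the system that it has no interest in the child's termination:

signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */

The second is to have the parent execute a wait on the child's termination, thereby ensuring that the parent outlives the child. The pipeUN program uses this second approach, with this call in the parent code:

wait(NULL); /* called in parent */

This call to wait means "wait until any child terminates"; in the pipeUN program there is only one child process. (The NULL argument could be replaced with the address of an integer variable to hold the child's exit status.) For finer-grained control there is the more flexible waitpid function, which can, for example, specify one particular child among several.

The pipeUN program takes one more precaution. When the parent is done waiting, it terminates with a call to the regular exit function. By contrast, the child terminates with a call to the _exit variant, which ends the process at once without running atexit handlers or flushing stdio buffers (the child's buffers are copies inherited from the parent). The system then notifies the parent that the child has terminated.

If two processes write to the same unnamed pipe, can the bytes be interleaved? For example, if process P1 writes:

foo bar

to a pipe and process P2 concurrently writes:

baz baz

to the same pipe, it seems that the pipe contents might end up arbitrarily mixed, for example:

baz foo baz bar

The POSIX standard ensures that writes are not interleaved as long as no single write exceeds PIPE_BUF bytes. On Linux systems, PIPE_BUF is 4,096 bytes. My preference with pipes is to have a single writer and a single reader, which sidesteps the issue.

Named pipes

An unnamed pipe has no backing file: the system maintains an in-memory buffer to transfer bytes from the writer to the reader. Once the writer and the reader terminate, the buffer is reclaimed and the unnamed pipe goes away. By contrast, a named pipe has a backing file and a distinct API.

Let's look at another command-line example to get the gist of named pipes. Here are the steps:

Open two terminals. The working directory should be the same for both.

In one of the terminals, enter these two commands (the prompt again is %, and my comments start with ##):

% mkfifo tester  ## creates a backing file named tester
% cat tester     ## type the pipe's contents to stdout

At the beginning, nothing should appear in the terminal, because nothing has been written to the named pipe yet.

In the second terminal, enter the command:

% cat > tester   ## redirect keyboard input to the pipe
Hello, world!    ## then hit Return key
Bye, bye         ## ditto
<Control-C>      ## terminate session with a Control-C

Whatever is typed into this terminal is echoed in the other. Once Ctrl+C is entered, the regular command-line prompt returns, because the pipe has been closed.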

Clean up by removing the file that implements the named pipe:

% unlink tester
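The same terminal session can be approximated inside a single C program. In this sketch, which is an illustration rather than part of the original article, fifo_round_trip is a hypothetical helper: mkfifo stands in for the mkfifo command, a forked child plays the cat > tester terminal by writing ints, the parent plays the cat tester terminal by reading until end of stream, and unlink does the cleanup. The path argument is an assumption; any writable location where the FIFO does not already exist will do.

```c
#include <fcntl.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: creates a FIFO at path, forks a writer child that
   sends count ints through it, and reads them back in the parent.
   Returns the number of ints received, or -1 on error. */
long fifo_round_trip(const char* path, int count) {
  if (mkfifo(path, 0600) < 0) return -1;    /* like: % mkfifo tester */

  pid_t cpid = fork();
  if (cpid < 0) { unlink(path); return -1; }

  if (0 == cpid) {                          /* child, like: % cat > tester */
    int fd = open(path, O_WRONLY);          /* blocks until a reader opens */
    if (fd < 0) _exit(1);
    for (int i = 0; i < count; i++)
      if (write(fd, &i, sizeof i) != (ssize_t) sizeof i) _exit(1);
    close(fd);                              /* reader now sees end of stream */
    _exit(0);
  }

  long bytes = -1;
  int fd = open(path, O_RDONLY);            /* parent, like: % cat tester */
  if (fd >= 0) {
    int value;
    ssize_t n;
    bytes = 0;
    while ((n = read(fd, &value, sizeof value)) > 0)
      bytes += n;
    close(fd);
  } else {
    kill(cpid, SIGKILL);                    /* child would otherwise block in open */
  }
  waitpid(cpid, NULL, 0);
  unlink(path);                             /* like: % unlink tester */
  return bytes < 0 ? -1 : bytes / (long) sizeof(int);
}
```

Each open blocks until the other side has opened the FIFO, so once the FIFO exists, the reading and writing sides can be started in either order.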

As the name of the mkfifo program implies, a named pipe is also called a FIFO, because the first byte in is the first byte out, and so on. There is a library function named mkfifo that creates a named pipe from within a program. It is used in the next example, which consists of two processes: one writes to the named pipe and the other reads from it.

Example 2. The fifoWriter program

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>

#define MaxLoops      12000 /* outer loop */
#define ChunkSize        16 /* how many written at a time */
#define IntsPerChunk      4 /* four 4-byte ints per chunk */
#define MaxZs           250 /* max microseconds to sleep */

int main() {
  const char* pipeName = "./fifoChannel";
  mkfifo(pipeName, 0666);                      /* read/write for user/group/others */
  int fd = open(pipeName, O_WRONLY);           /* open as write-only */
  if (fd < 0) return -1;                       /** error **/

  int i;
  for (i = 0; i < MaxLoops; i++) {             /* write MaxLoops times */
    int j;
    for (j = 0; j < ChunkSize; j++) {          /* each time, do ChunkSize writes */
      int k;
      int chunk[IntsPerChunk];
      for (k = 0; k < IntsPerChunk; k++)
        chunk[k] = rand();
      write(fd, chunk, sizeof(chunk));
    }
    usleep((rand() % MaxZs) + 1);              /* pause a bit for realism */
  }

  close(fd);           /* close pipe: generates an end-of-file */
  unlink(pipeName);    /* unlink from the implementing file */
  printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);

  return 0;
}

The fifoWriter program above can be summarized as follows.

The program creates a named pipe for writing:

mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
int fd = open(pipeName, O_WRONLY);

where pipeName is the name of the backing file passed to mkfifo as its first argument. The named pipe is then opened with the by-now familiar call to the open function, which returns a file descriptor.

For a touch of realism, the fifoWriter does not write all the data at once; instead it writes a chunk, sleeps a random number of microseconds, and so on. In total, 768,000 4-byte integer values (12,000 × 16 × 4) are written to the named pipe.

After closing the named pipe, the fifoWriter also unlinks the file:

close(fd);        /* close pipe: generates end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */

unlink removes the file's name at once; the system reclaims the backing file itself once no process still has the pipe open. In this example, both the fifoWriter and the fifoReader call unlink, so whichever does so second finds the name already gone.

The two programs should be executed in different terminals with the same working directory. However, the fifoWriter should be started before the fifoReader, because the former creates the pipe; the fifoReader then accesses the already-created named pipe.

Example 3. The fifoReader program

#include ...

unsigned is_prime(unsigned n) { /* not pretty, but gets the job done efficiently */
  if (n ... 1;
  if (0 == (n % 2) || 0 == (n % 3)) return 0;

  unsigned i;
  for (i = 5; (i * i) ...

[...]

Message queues

...--->|3|--->|2|--->|2|--->|1|--->receiver

Of the four messages shown above, the one marked 1 is at the front, that is, closest to the receiver; next come two messages marked 2, and finally a message marked 3 at the back. If strict FIFO behavior were in play, the messages would be received in the order 1-2-2-3. However, a message queue allows other retrieval orders. For example, the receiver could retrieve the messages in the order 3-2-1-2.
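The 3-2-1-2 order can be reproduced with System V calls by asking msgrcv for a specific type each time. This is an illustrative sketch with several assumptions: retrieve_3_2_1_2 is a hypothetical helper, demoMessage is an assumed layout mirroring the article's queuedMessage, and a private queue (IPC_PRIVATE) replaces the ftok-generated key so the demo cannot collide with another queue.

```c
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>

typedef struct {
  long type;          /* assumed layout: type first, then the payload */
  char payload[5];
} demoMessage;

/* Hypothetical helper: queues "m1".."m4" with types 1, 2, 2, 3 (m1 at the
   front, as in the picture above), then retrieves them by type in the order
   3, 2, 1, 2, appending each payload to out (room for at least 9 bytes).
   Returns 0 on success, -1 on error. */
int retrieve_3_2_1_2(char* out) {
  out[0] = '\0';
  int qid = msgget(IPC_PRIVATE, 0600);     /* fresh queue, no key needed */
  if (qid < 0) return -1;

  const long sendTypes[] = {1, 2, 2, 3};
  const char* payloads[] = {"m1", "m2", "m3", "m4"};
  const long recvTypes[] = {3, 2, 1, 2};
  int rc = 0;

  for (int i = 0; i < 4 && rc == 0; i++) {
    demoMessage msg;
    memset(&msg, 0, sizeof msg);
    msg.type = sendTypes[i];
    strcpy(msg.payload, payloads[i]);
    if (msgsnd(qid, &msg, sizeof msg.payload, IPC_NOWAIT) < 0) rc = -1;
  }
  for (int i = 0; i < 4 && rc == 0; i++) {
    demoMessage msg;
    /* a positive type argument takes the first message of exactly that type */
    if (msgrcv(qid, &msg, sizeof msg.payload, recvTypes[i], IPC_NOWAIT) < 0)
      rc = -1;
    else
      strcat(out, msg.payload);
  }

  msgctl(qid, IPC_RMID, NULL);             /* queues outlive processes: remove it */
  return rc;
}
```

Asking for type 3 first skips past the three messages ahead of it, and the two type-2 messages still come out in their own FIFO order (m2 before m3), giving "m4m2m1m3".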

The mqueue example consists of two programs: the sender, which writes to the message queue, and the receiver, which reads from this queue. Both programs include the header file queue.h shown below:

Example 4. The header file queue.h

#define ProjectId 123
#define PathName  "queue.h" /* any existing, accessible file would do */
#define MsgLen    4
#define MsgCount  6

typedef struct {
  long type;                 /* must be of type long */
  char payload[MsgLen + 1];  /* bytes in the message */
} queuedMessage;

The header file above defines a structure type named queuedMessage with two fields: payload (a byte array) and type (an integer). The file also defines symbolic constants with #define statements. The first two constants are used to generate a key that, in turn, is used to get a message queue ID. ProjectId can be any positive integer whose low-order 8 bits are nonzero (ftok uses only those bits), and PathName must be an existing, accessible file, in this case the file queue.h. The sender and the receiver programs both contain these statements:

key_t key = ftok(PathName, ProjectId);   /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */

The ID qid is, in effect, the message-queue counterpart of a file descriptor.
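The two statements above can be wrapped in a small function to check two points made in the text: ftok needs an existing, accessible file, and the same key always leads back to the same queue. queue_id_for is a hypothetical name, and the paths used with it are assumptions for illustration.

```c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical helper following the statements above: derive a key from an
   existing file plus a project id, then get (or create) the queue.
   Returns the queue id, or -1 if either step fails. */
int queue_id_for(const char* path, int projectId) {
  key_t key = ftok(path, projectId);   /* -1 if path cannot be stat'ed */
  if (key == (key_t) -1) return -1;
  return msgget(key, 0666 | IPC_CREAT); /* create if needed, else access */
}
```

A call with a path that does not exist returns -1 because ftok fails, while two calls with an existing path and the same project id return the same qid. A queue obtained this way persists until it is removed with msgctl(qid, IPC_RMID, NULL).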

Example 5. The sender program

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key = ftok(PathName, ProjectId);
  if (key < 0) report_and_exit("couldn't get key...");

  int qid = msgget(key, 0666 | IPC_CREAT);
  if (qid < 0) report_and_exit("couldn't get queue id...");

  char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
  int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
  int i;
  for (i = 0; i < MsgCount; i++) {
    /* build the message */
    queuedMessage msg;
    msg.type = types[i];
    strcpy(msg.payload, payloads[i]);

    /* send the message: the size counts only the payload bytes */
    msgsnd(qid, &msg, sizeof(msg.payload), IPC_NOWAIT); /* don't block */
    printf("%s sent as type %i\n", msg.payload, (int) msg.type);
  }
  return 0;
}

The sender program above sends out six messages, two of each type: the first two are of type 1, the next two of type 2, and the last two of type 3. The sending statement:

msgsnd(qid, &msg, sizeof(msg.payload), IPC_NOWAIT);

is configured to be non-blocking (the IPC_NOWAIT flag) because the messages are so small. The only danger is that a full queue, unlikely in this example, would cause a send to fail. The size argument counts only the payload bytes, not the long type field, because msgsnd copies that many bytes starting at the payload. The receiver program below also uses the IPC_NOWAIT flag to receive messages.

Example 6. The receiver program

#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key = ftok(PathName, ProjectId); /* key to identify the queue */
  if (key < 0) report_and_exit("key not gotten...");

  int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
  if (qid < 0) report_and_exit("no access to queue...");

  int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
  int i;
  for (i = 0; i < MsgCount; i++) {
    queuedMessage msg; /* defined in queue.h */
    if (msgrcv(qid, &msg, sizeof(msg.payload), types[i], MSG_NOERROR | IPC_NOWAIT) < 0) {
      puts("msgrcv trouble...");
      continue;                     /* nothing received: skip the printf */
    }
    printf("%s received as type %i\n", msg.payload, (int) msg.type);
  }

  /** remove the queue **/
  if (msgctl(qid, IPC_RMID, NULL) < 0)  /* NULL = 'no flags' */
    report_and_exit("trouble removing queue...");

  return 0;
}

The receiver program does not create the message queue, although the API suggests as much. In the receiver, the call

int qid = msgget(key, 0666 | IPC_CREAT);

is misleading because of the IPC_CREAT flag, but this flag really means "create if needed, otherwise access." The sender program calls msgsnd to send messages, whereas the receiver calls msgrcv to retrieve them. In this example, the sender sends the messages in the order 1-1-2-2-3-3, but the receiver retrieves them in the order 3-1-2-1-3-2, showing that message queues are not bound to strict FIFO behavior:

% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3
% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2

The output above shows that the sender and the receiver can be launched from the same terminal. It also shows that the message queue persists even after the sender process has created the queue, written to it, and exited. The queue goes away only when the receiver process explicitly removes it with the call to msgctl:

if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */
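This persistence can be checked with msgctl's IPC_STAT command, which fills in a struct msqid_ds whose msg_qnum field counts the messages currently queued. In this sketch, messages_left_after_sender_exits is a hypothetical helper and persistMessage an assumed layout mirroring queuedMessage; a private queue (IPC_PRIVATE) stands in for the ftok-generated key. A forked child sends n messages and exits, and the parent then finds all n still waiting before it removes the queue with IPC_RMID.

```c
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <unistd.h>

typedef struct {
  long type;          /* assumed layout, mirroring queuedMessage */
  char payload[5];
} persistMessage;

/* Hypothetical helper: a short-lived child sends n messages and exits; the
   parent then asks the kernel how many are still queued (IPC_STAT) and
   removes the queue (IPC_RMID). Returns that count, or -1 on error. */
long messages_left_after_sender_exits(int n) {
  int qid = msgget(IPC_PRIVATE, 0600);
  if (qid < 0) return -1;

  pid_t cpid = fork();
  if (cpid < 0) { msgctl(qid, IPC_RMID, NULL); return -1; }
  if (0 == cpid) {                       /* child: the sender */
    for (int i = 0; i < n; i++) {
      persistMessage msg;
      memset(&msg, 0, sizeof msg);
      msg.type = 1;
      strcpy(msg.payload, "msg");
      if (msgsnd(qid, &msg, sizeof msg.payload, IPC_NOWAIT) < 0) _exit(1);
    }
    _exit(0);
  }

  waitpid(cpid, NULL, 0);                /* the sender process is gone now... */
  struct msqid_ds info;
  long left = -1;
  if (msgctl(qid, IPC_STAT, &info) == 0)
    left = (long) info.msg_qnum;         /* ...but its messages remain queued */

  msgctl(qid, IPC_RMID, NULL);           /* only an explicit removal ends the queue */
  return left;
}
```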


© 2024 shulou.com SLNews company. All rights reserved.
