歡迎光臨
每天分享高質量文章

Linux 下的行程間通訊:使用管道和訊息佇列 | Linux 中國

學習在 Linux 中行程是如何與其他行程進行同步的。

— Marty Kalin

本篇是 Linux 下行程間通訊(IPC)系列的第二篇文章。第一篇文章 聚焦於透過共享檔案和共享記憶體段這樣的共享儲存來進行 IPC。這篇檔案的重點將轉向管道,它是連線需要通訊的行程之間的通道。管道擁有一個寫端用於寫入位元組資料,還有一個讀端用於按照先入先出的順序讀入這些位元組資料。而這些位元組資料可能代表任何東西:數字、員工記錄、數字電影等等。

管道有兩種型別,命名管道和無名管道,都可以互動式的在命令列或程式中使用它們;相關的例子在下麵展示。這篇文章也將介紹記憶體佇列,儘管它們有些過時了,但它們不應該受這樣的待遇。

在本系列的第一篇文章中的示例程式碼承認了在 IPC 中可能受到競爭條件(不管是基於檔案的還是基於記憶體的)的威脅。自然地我們也會考慮基於管道的 IPC 的安全併發問題,這個也將在本文中提及。針對管道和記憶體佇列的例子將會使用 POSIX 推薦使用的 API,POSIX 的一個核心標的就是執行緒安全。

請檢視一些 mq_open 函式的 man 頁,這個函式屬於記憶體佇列的 API。這個 man 頁中有關 特性的章節帶有一個小表格:

< 如顯示不全,請左右滑動 >
介面 特性
mq_open() 執行緒安全 MT-Safe

上面的 MT-Safe(MT 指的是多執行緒multi-threaded)意味著 mq_open 函式是執行緒安全的,進而暗示是行程安全的:一個行程的執行和它的一個執行緒執行的過程類似,假如競爭條件不會發生在處於相同行程的執行緒中,那麼這樣的條件也不會發生在處於不同行程的執行緒中。MT-Safe 特性保證了呼叫 mq_open 時不會出現競爭條件。一般來說,基於通道的 IPC 是併發安全的,儘管在下麵例子中會出現一個有關警告的註意事項。

無名管道

首先讓我們透過一個特意構造的命令列例子來展示無名管道是如何工作的。在所有的現代系統中,符號 | 在命令列中都代表一個無名管道。假設我們的命令列提示符為 %,接下來考慮下麵的命令:

  1. ## 寫入方在 | 左邊,讀取方在右邊
  2. % sleep 5 | echo "Hello, world!"

sleep 和 echo 程式以不同的行程執行,無名管道允許它們進行通訊。但是上面的例子被特意設計為沒有通訊發生。問候語 “Hello, world!” 出現在螢幕中,然後過了 5 秒後,命令列傳回,暗示 sleep 和 echo 行程都已經結束了。這期間發生了什麼呢?

在命令列中的豎線 | 的語法中,左邊的行程(sleep)是寫入方,右邊的行程(echo)為讀取方。預設情況下,讀取方將會阻塞,直到從通道中能夠讀取到位元組資料,而寫入方在寫完它的位元組資料後,將傳送 流已終止end-of-stream的標誌。(即便寫入方過早終止了,一個流已終止的標誌還是會發給讀取方。)無名管道將保持到寫入方和讀取方都停止的那個時刻。

在上面的例子中,sleep 行程並沒有向通道寫入任何的位元組資料,但在 5 秒後就終止了,這時將向通道傳送一個流已終止的標誌。與此同時,echo 行程立即向標準輸出(螢幕)寫入問候語,因為這個行程並不從通道中讀入任何位元組,所以它並沒有等待。一旦 sleep 和 echo 行程都終止了,不會再用作通訊的無名管道將會消失然後傳回命令列提示符。

下麵這個更加實用的示例將使用兩個無名管道。我們假定檔案 test.dat 的內容如下:

  1. this
  2. is
  3. the
  4. way
  5. the
  6. world
  7. ends

下麵的命令:

  1. % cat test.dat | sort | uniq

會將 cat連線concatenate的縮寫)行程的輸出透過管道傳給 sort 行程以生成排序後的輸出,然後將排序後的輸出透過管道傳給 uniq 行程以消除重覆的記錄(在本例中,會將兩次出現的 “the” 縮減為一個):

  1. ends
  2. is
  3. the
  4. this
  5. way
  6. world

下麵展示的情景展示的是一個帶有兩個行程的程式透過一個無名管道通訊來進行通訊。

示例 1. 兩個行程透過一個無名管道來進行通訊

  1. #include <sys/wait.h> /* wait */
  2. #include <stdio.h>
  3. #include <stdlib.h> /* exit functions */
  4. #include <unistd.h> /* read, write, pipe, _exit */
  5. #include <string.h>
  6. #define ReadEnd 0
  7. #define WriteEnd 1
  8. void report_and_exit(const char* msg) {
  9. [perror][6](msg);
  10. [exit][7](-1); /** failure **/
  11. }
  12. int main() {
  13. int pipeFDs[2]; /* two file descriptors */
  14. char buf; /* 1-byte buffer */
  15. const char* msg = "Nature's first green is gold\n"; /* bytes to write */
  16. if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
  17. pid_t cpid = fork(); /* fork a child process */
  18. if (cpid < 0) report_and_exit("fork"); /* check for failure */
  19. if (0 == cpid) { /*** child ***/ /* child process */
  20. close(pipeFDs[WriteEnd]); /* child reads, doesn't write */
  21. while (read(pipeFDs[ReadEnd], &buf, 1) > 0) /* read until end of byte stream */
  22. write(STDOUT_FILENO, &buf, sizeof(buf)); /* echo to the standard output */
  23. close(pipeFDs[ReadEnd]); /* close the ReadEnd: all done */
  24. _exit(0); /* exit and notify parent at once */
  25. }
  26. else { /*** parent ***/
  27. close(pipeFDs[ReadEnd]); /* parent writes, doesn't read */
  28. write(pipeFDs[WriteEnd], msg, [strlen][8](msg)); /* write the bytes to the pipe */
  29. close(pipeFDs[WriteEnd]); /* done writing: generate eof */
  30. wait(NULL); /* wait for child to exit */
  31. [exit][7](0); /* exit normally */
  32. }
  33. return 0;
  34. }

上面名為 pipeUN 的程式使用系統函式 fork 來建立一個行程。儘管這個程式只有一個單一的源檔案,在它正確執行的情況下將會發生多行程的情況。

下麵的內容是對庫函式 fork 如何工作的一個簡要回顧:

◈ fork 函式由行程呼叫,在失敗時傳回 -1 給父行程。在 pipeUN 這個例子中,相應的呼叫是:

  1. pid_t cpid = fork(); /* called in parent */

函式呼叫後的傳回值也被儲存下來了。在這個例子中,儲存在整數型別 pid_t的變數 cpid 中。(每個行程有它自己的行程 ID,這是一個非負的整數,用來標記行程)。復刻一個新的行程可能會因為多種原因而失敗,包括行程表滿了的原因,這個結構由系統維持,以此來追蹤行程狀態。明確地說,僵屍行程假如沒有被處理掉,將可能引起行程表被填滿的錯誤。

◈ 假如 fork 呼叫成功,則它將建立一個新的子行程,向父行程傳回一個值,向子行程傳回另外的一個值。在呼叫 fork 後父行程和子行程都將執行相同的程式碼。(子行程繼承了到此為止父行程中宣告的所有變數的複製),特別地,一次成功的 fork 呼叫將傳回如下的東西:

◈ 向子行程傳回 0
◈ 向父行程傳回子行程的行程 ID
◈ 在一次成功的 fork 呼叫後,一個 if/else 或等價的結構將會被用來隔離針對父行程和子行程的程式碼。在這個例子中,相應的宣告為:

  1. if (0 == cpid) { /*** child ***/
  2. ...
  3. }
  4. else { /*** parent ***/
  5. ...
  6. }

假如成功地復刻出了一個子行程,pipeUN 程式將像下麵這樣去執行。在一個整數的數列裡:

  1. int pipeFDs[2]; /* two file descriptors */

來儲存兩個檔案描述符,一個用來向管道中寫入,另一個從管道中寫入。(陣列元素 pipeFDs[0] 是讀端的檔案描述符,元素 pipeFDs[1] 是寫端的檔案描述符。)在呼叫 fork 之前,對系統 pipe 函式的成功呼叫,將立刻使得這個陣列獲得兩個檔案描述符:

  1. if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");

父行程和子行程現在都有了檔案描述符的副本。但分離關註點樣式意味著每個行程恰好只需要一個描述符。在這個例子中,父行程負責寫入,而子行程負責讀取,儘管這樣的角色分配可以反過來。在 if 子句中的第一個陳述句將用於關閉管道的讀端:

  1. close(pipeFDs[WriteEnd]); /* called in child code */

在父行程中的 else 子句將會關閉管道的讀端:

  1. close(pipeFDs[ReadEnd]); /* called in parent code */

然後父行程將向無名管道中寫入某些位元組資料(ASCII 程式碼),子行程讀取這些資料,然後向標準輸出中回放它們。

在這個程式中還需要澄清的一點是在父行程程式碼中的 wait 函式。一旦被建立後,子行程很大程度上獨立於它的父行程,正如簡短的 pipeUN 程式所展示的那樣。子行程可以執行任意的程式碼,而它們可能與父行程完全沒有關係。但是,假如當子行程終止時,系統將會透過一個訊號來通知父行程。

要是父行程在子行程之前終止又該如何呢?在這種情形下,除非採取了預防措施,子行程將會變成在行程表中的一個僵屍行程。預防措施有兩大型別:第一種是讓父行程去通知系統,告訴系統它對子行程的終止沒有任何興趣:

  1. signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */

第二種方法是在子行程終止時,讓父行程執行一個 wait。這樣就確保了父行程可以獨立於子行程而存在。在 pipeUN 程式中使用了第二種方法,其中父行程的程式碼使用的是下麵的呼叫:

  1. wait(NULL); /* called in parent */

這個對 wait 的呼叫意味著一直等待直到任意一個子行程的終止發生,因此在 pipeUN 程式中,只有一個子行程。(其中的 NULL 引數可以被替換為一個儲存有子程式退出狀態的整數變數的地址。)對於更細粒度的控制,還可以使用更靈活的 waitpid 函式,例如特別指定多個子行程中的某一個。

pipeUN 將會採取另一個預防措施。當父行程結束了等待,父行程將會呼叫常規的 exit 函式去退出。對應的,子行程將會呼叫 _exit 變種來退出,這類變種將快速跟蹤終止相關的通知。在效果上,子行程會告訴系統立刻去通知父行程它的這個子行程已經終止了。

假如兩個行程向相同的無名管道中寫入內容,位元組資料會交錯嗎?例如,假如行程 P1 向管道寫入內容:

  1. foo bar

同時行程 P2 併發地寫入:

  1. baz baz

到相同的管道,最後的結果似乎是管道中的內容將會是任意錯亂的,例如像這樣:

  1. baz foo baz bar

只要沒有寫入超過 PIPE_BUF 位元組,POSIX 標準就能確保寫入不會交錯。在 Linux 系統中, PIPE_BUF 的大小是 4096 位元組。對於管道我更喜歡只有一個寫入方和一個讀取方,從而繞過這個問題。

命名管道

無名管道沒有備份檔案:系統將維持一個記憶體快取來將位元組資料從寫方傳給讀方。一旦寫方和讀方終止,這個快取將會被回收,進而無名管道消失。相反的,命名管道有備份檔案和一個不同的 API。

下麵讓我們透過另一個命令列示例來瞭解命名管道的要點。下麵是具體的步驟:

◈ 開啟兩個終端。這兩個終端的工作目錄應該相同。
◈ 在其中一個終端中,鍵入下麵的兩個命令(命令列提示符仍然是 %,我的註釋以 ## 打頭。):

  1. % mkfifo tester ## 建立一個備份檔案,名為 tester
  2. % cat tester ## 將管道的內容輸出到 stdout

在最開始,沒有任何東西會出現在終端中,因為到現在為止沒有在命名管道中寫入任何東西。

◈ 在第二個終端中輸入下麵的命令:

  1. % cat > tester ## redirect keyboard input to the pipe
  2. hello, world! ## then hit Return key
  3. bye, bye ## ditto
  4. <Control-C> ## terminate session with a Control-C

無論在這個終端中輸入什麼,它都會在另一個終端中顯示出來。一旦鍵入 Ctrl+C,就會回到正常的命令列提示符,因為管道已經被關閉了。

◈ 透過移除實現命名管道的檔案來進行清理:

  1. % unlink tester

正如 mkfifo 程式的名字所暗示的那樣,命名管道也被叫做 FIFO,因為第一個進入的位元組,就會第一個出,其他的類似。有一個名為 mkfifo 的庫函式,用它可以在程式中建立一個命名管道,它將在下一個示例中被用到,該示例由兩個行程組成:一個向命名管道寫入,而另一個從該管道讀取。

示例 2. fifoWriter 程式

  1. #include <sys/types.h>
  2. #include <sys/stat.h>
  3. #include <fcntl.h>
  4. #include <unistd.h>
  5. #include <time.h>
  6. #include <stdlib.h>
  7. #include <stdio.h>
  8. #define MaxLoops 12000 /* outer loop */
  9. #define ChunkSize 16 /* how many written at a time */
  10. #define IntsPerChunk 4 /* four 4-byte ints per chunk */
  11. #define MaxZs 250 /* max microseconds to sleep */
  12. int main() {
  13. const char* pipeName = "./fifoChannel";
  14. mkfifo(pipeName, 0666); /* read/write for user/group/others */
  15. int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
  16. if (fd < 0) return -1; /** error **/
  17. int i;
  18. for (i = 0; i < MaxLoops; i++) { /* write MaxWrites times */
  19. int j;
  20. for (j = 0; j < ChunkSize; j++) { /* each time, write ChunkSize bytes */
  21. int k;
  22. int chunk[IntsPerChunk];
  23. for (k = 0; k < IntsPerChunk; k++)
  24. chunk[k] = [rand][9]();
  25. write(fd, chunk, sizeof(chunk));
  26. }
  27. usleep(([rand][9]() % MaxZs) + 1); /* pause a bit for realism */
  28. }
  29. close(fd); /* close pipe: generates an end-of-file */
  30. unlink(pipeName); /* unlink from the implementing file */
  31. [printf][10]("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);
  32. return 0;
  33. }

上面的 fifoWriter 程式可以被總結為如下:

◈ 首先程式建立了一個命名管道用來寫入資料:

  1. mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
  2. int fd = open(pipeName, O_CREAT | O_WRONLY);

其中的 pipeName 是備份檔案的名字,傳遞給 mkfifo 作為它的第一個引數。接著命名管道透過我們熟悉的 open 函式呼叫被開啟,而這個函式將會傳回一個檔案描述符。

◈ 在實現層面上,fifoWriter 不會一次性將所有的資料都寫入,而是寫入一個塊,然後休息隨機數目的微秒時間,接著再迴圈往複。總的來說,有 768000 個 4 位元組整數值被寫入到命名管道中。
◈ 在關閉命名管道後,fifoWriter 也將使用 unlink 取消對該檔案的連線。

  1. close(fd); /* close pipe: generates end-of-stream marker */
  2. unlink(pipeName); /* unlink from the implementing file */

一旦連線到管道的每個行程都執行了 unlink 操作後,系統將回收這些備份檔案。在這個例子中,只有兩個這樣的行程 fifoWriter 和 fifoReader,它們都做了 unlink 操作。

這個兩個程式應該在不同終端的相同工作目錄中執行。但是 fifoWriter 應該在 fifoReader 之前被啟動,因為需要 fifoWriter 去建立管道。然後 fifoReader 才能夠獲取到剛被建立的命名管道。

示例 3. fifoReader 程式

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <fcntl.h>
  5. #include <unistd.h>
  6. unsigned is_prime(unsigned n) { /* not pretty, but gets the job done efficiently */
  7. if (n <= 3) return n > 1;
  8. if (0 == (n % 2) || 0 == (n % 3)) return 0;
  9. unsigned i;
  10. for (i = 5; (i * i) <= n; i += 6)
  11. if (0 == (n % i) || 0 == (n % (i + 2))) return 0;
  12. return 1; /* found a prime! */
  13. }
  14. int main() {
  15. const char* file = "./fifoChannel";
  16. int fd = open(file, O_RDONLY);
  17. if (fd < 0) return -1; /* no point in continuing */
  18. unsigned count = 0, total = 0, primes_count = 0;
  19. while (1) {
  20. int next;
  21. int i;
  22. ssize_t count = read(fd, &next, sizeof(int));
  23. if (0 == count) break; /* end of stream */
  24. else if (count == sizeof(int)) { /* read a 4-byte int value */
  25. total++;
  26. if (is_prime(next)) primes_count++;
  27. }
  28. }
  29. close(fd); /* close pipe from read end */
  30. unlink(file); /* unlink from the underlying file */
  31. [printf][10]("Received ints: %u, primes: %u\n", total, primes_count);
  32. return 0;
  33. }

上面的 fifoReader 的內容可以總結為如下:

◈ 因為 fifoWriter 已經建立了命名管道,所以 fifoReader 只需要利用標準的 open 呼叫來透過備份檔案來獲取到管道中的內容:

  1. const char* file = "./fifoChannel";
  2. int fd = open(file, O_RDONLY);

這個檔案的是以只讀開啟的。

◈ 然後這個程式進入一個潛在的無限迴圈,在每次迴圈時,嘗試讀取 4 位元組的塊。read 呼叫:

  1. ssize_t count = read(fd, &next, sizeof(int));

傳回 0 來暗示該流的結束。在這種情況下,fifoReader 跳出迴圈,關閉命名管道,併在終止前 unlink 備份檔案。

◈ 在讀入 4 位元組整數後,fifoReader 檢查這個數是否為質數。這個操作代表了一個生產級別的讀取器可能在接收到的位元組資料上執行的邏輯操作。在示例執行中,在接收到的 768000 個整數中有 37682 個質數。

重覆執行示例, fifoReader 將成功地讀取 fifoWriter 寫入的所有位元組。這不是很讓人驚訝的。這兩個行程在相同的機器上執行,從而可以不用考慮網路相關的問題。命名管道是一個可信且高效的 IPC 機制,因而被廣泛使用。

下麵是這兩個程式的輸出,它們在不同的終端中啟動,但處於相同的工作目錄:

  1. % ./fifoWriter
  2. 768000 ints sent to the pipe.
  3. ###
  4. % ./fifoReader
  5. Received ints: 768000, primes: 37682

訊息佇列

管道有著嚴格的先入先出行為:第一個被寫入的位元組將會第一個被讀,第二個寫入的位元組將第二個被讀,以此類推。訊息佇列可以做出相同的表現,但它又足夠靈活,可以使得位元組塊可以不以先入先出的次序來接收。

正如它的名字所提示的那樣,訊息佇列是一系列的訊息,每個訊息包含兩部分:

◈ 荷載,一個位元組序列(在 C 中是 char)
◈ 型別,以一個正整數值的形式給定,型別用來分類訊息,為了更靈活的回收

看一下下麵對一個訊息佇列的描述,每個訊息由一個整數型別標記:

  1. +-+ +-+ +-+ +-+
  2. sender--->|3|--->|2|--->|2|--->|1|--->receiver
  3. +-+ +-+ +-+ +-+

在上面展示的 4 個訊息中,標記為 1 的是開頭,即最接近接收端,然後另個標記為 2 的訊息,最後接著一個標記為 3 的訊息。假如按照嚴格的 FIFO 行為執行,訊息將會以 1-2-2-3 這樣的次序被接收。但是訊息佇列允許其他收取次序。例如,訊息可以被接收方以 3-2-1-2 的次序接收。

mqueue 示例包含兩個程式,sender 將向訊息佇列中寫入資料,而 receiver 將從這個佇列中讀取資料。這兩個程式都包含的頭檔案 queue.h 如下所示:

示例 4. 頭檔案 queue.h

  1. #define ProjectId 123
  2. #define PathName "queue.h" /* any existing, accessible file would do */
  3. #define MsgLen 4
  4. #define MsgCount 6
  5. typedef struct {
  6. long type; /* must be of type long */
  7. char payload[MsgLen + 1]; /* bytes in the message */
  8. } queuedMessage;

上面的頭檔案定義了一個名為 queuedMessage 的結構型別,它帶有 payload(位元組陣列)和 type(整數)這兩個域。該檔案也定義了一些符號常數(使用 #define 陳述句),前兩個常數被用來生成一個 key,而這個 key 反過來被用來獲取一個訊息佇列的 ID。ProjectId 可以是任何正整數值,而 PathName 必須是一個存在的、可訪問的檔案,在這個示例中,指的是檔案 queue.h。在 sender 和 receiver 中,它們都有的設定陳述句為:

  1. key_t key = ftok(PathName, ProjectId); /* generate key */
  2. int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */

ID qid 在效果上是訊息佇列檔案描述符的對應物。

示例 5. sender 程式

  1. #include <stdio.h>
  2. #include <sys/ipc.h>
  3. #include <sys/msg.h>
  4. #include <stdlib.h>
  5. #include <string.h>
  6. #include "queue.h"
  7. void report_and_exit(const char* msg) {
  8. [perror][6](msg);
  9. [exit][7](-1); /* EXIT_FAILURE */
  10. }
  11. int main() {
  12. key_t key = ftok(PathName, ProjectId);
  13. if (key < 0) report_and_exit("couldn't get key...");
  14. int qid = msgget(key, 0666 | IPC_CREAT);
  15. if (qid < 0) report_and_exit("couldn't get queue id...");
  16. char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
  17. int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
  18. int i;
  19. for (i = 0; i < MsgCount; i++) {
  20. /* build the message */
  21. queuedMessage msg;
  22. msg.type = types[i];
  23. [strcpy][11](msg.payload, payloads[i]);
  24. /* send the message */
  25. msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */
  26. [printf][10]("%s sent as type %i\n", msg.payload, (int) msg.type);
  27. }
  28. return 0;
  29. }

上面的 sender 程式將發送出 6 個訊息,每兩個為一個型別:前兩個是型別 1,接著的連個是型別 2,最後的兩個為型別 3。傳送的陳述句:

  1. msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);

被配置為非阻塞的(IPC_NOWAIT 標誌),是因為這裡的訊息體量上都很小。唯一的危險在於一個完整的序列將可能導致傳送失敗,而這個例子不會。下麵的 receiver 程式也將使用 IPC_NOWAIT 標誌來接收訊息。

示例 6. receiver 程式

  1. #include <stdio.h>
  2. #include <sys/ipc.h>
  3. #include <sys/msg.h>
  4. #include <stdlib.h>
  5. #include "queue.h"
  6. void report_and_exit(const char* msg) {
  7. [perror][6](msg);
  8. [exit][7](-1); /* EXIT_FAILURE */
  9. }
  10. int main() {
  11. key_t key= ftok(PathName, ProjectId); /* key to identify the queue */
  12. if (key < 0) report_and_exit("key not gotten...");
  13. int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
  14. if (qid < 0) report_and_exit("no access to queue...");
  15. int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
  16. int i;
  17. for (i = 0; i < MsgCount; i++) {
  18. queuedMessage msg; /* defined in queue.h */
  19. if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
  20. [puts][12]("msgrcv trouble...");
  21. [printf][10]("%s received as type %i\n", msg.payload, (int) msg.type);
  22. }
  23. /** remove the queue **/
  24. if (msgctl(qid, IPC_RMID, NULL) < 0) /* NULL = 'no flags' */
  25. report_and_exit("trouble removing queue...");
  26. return 0;
  27. }

這個 receiver 程式不會建立訊息佇列,儘管 API 儘管建議那樣。在 receiver 中,對

  1. int qid = msgget(key, 0666 | IPC_CREAT);

的呼叫可能因為帶有 IPC_CREAT 標誌而具有誤導性,但是這個標誌的真實意義是如果需要就建立,否則直接獲取sender 程式呼叫 msgsnd 來傳送訊息,而 receiver 呼叫 msgrcv 來接收它們。在這個例子中,sender 以 1-1-2-2-3-3 的次序傳送訊息,但 receiver 接收它們的次序為 3-1-2-1-3-2,這顯示訊息佇列沒有被嚴格的 FIFO 行為所拘泥:

  1. % ./sender
  2. msg1 sent as type 1
  3. msg2 sent as type 1
  4. msg3 sent as type 2
  5. msg4 sent as type 2
  6. msg5 sent as type 3
  7. msg6 sent as type 3
  8. % ./receiver
  9. msg5 received as type 3
  10. msg1 received as type 1
  11. msg3 received as type 2
  12. msg2 received as type 1
  13. msg6 received as type 3
  14. msg4 received as type 2

上面的輸出顯示 sender 和 receiver 可以在同一個終端中啟動。輸出也顯示訊息佇列是持久的,即便 sender 行程在完成建立佇列、向佇列寫資料、然後退出的整個過程後,該佇列仍然存在。只有在 receiver 行程顯式地呼叫 msgctl 來移除該佇列,這個佇列才會消失:

  1. if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */

總結

管道和訊息佇列的 API 在根本上來說都是單向的:一個行程寫,然後另一個行程讀。當然還存在雙向命名管道的實現,但我認為這個 IPC 機制在它最為簡單的時候反而是最佳的。正如前面提到的那樣,訊息佇列已經不大受歡迎了,儘管沒有找到什麼特別好的原因來解釋這個現象;而佇列仍然是 IPC 工具箱中的一個工具。這個快速的 IPC 工具箱之旅將以第 3 部分(透過套接字和訊號來示例 IPC)來終結。


已同步到看一看
贊(0)

分享創造快樂