無名管道的C++封裝
xpipe-無名管道的C++封裝類
無名管道的C++封裝類,用於父子進程進行通信
基礎介紹
unix下一切皆文件,管道也不例外。無名管道pipe定義在<unistd.h>中。
#include <unistd.h> int pipe(int fd[2]);其中fd[0]是讀端,fd[1]是寫端,fd[1]的輸出是fd[0]的輸入,因此管道是一個有向的半雙工通信方式。使用`write(fd[1],...)`和`read(fd[0],...)`對管道中的信息進行讀寫。無名管道通常運用於父子進程間通信。關閉讀端或者寫端是使用`close`函數,同文件句柄一樣,關閉後不能重新打開。如果關閉後使用該端,係統會發送一個`SIGPIPE`的信號。作為一個文件,管道有一個緩存大小限製,這是一個運行時限製,使用`fpathconf`函數可以查看其大小,類型名為`_PC_PIPE_BUF`.
如:
cout<<fpathconf(fd[0],_PC_PIPE_BUF)<<endl;在我的 Ubuntu10.10 下為4096字節,剛好一頁大小。而在AIX服務器上,管道大小的限製則為32768字節。
讀寫管道使用係統函數read和write,如:
write(m_fd[1],content.c_str(),content.length());這能體現管道作為文件的本質,但不能體現通信的意圖,因此我將管道的讀寫封裝為與socket中發送和接收。
ssize_t xpipe::send(void *buf, size_t n) { return write(m_fd[1], buf, n); } ssize_t xpipe::recv(void *buf, size_t nbytes) { return read(m_fd[0], buf, nbytes); }
使用中,通信的內容常常為字符串,上述兩個函數不僅能滿足這個要求,還能傳遞一些簡單結構體消息(稍後在討論),但是每次都要輸入長度。為簡化開發,我將send和recv重載,作為特化方法,方便字符串的傳遞。使用方法非常簡單,如:
xpipe x; x.send("Whose your daddy?"); string rs; x.recv(rs);
關於簡單結構體,需要作個說明,這裏指的是由C++基本類型組合而成的結構體,如:
class child { public: long id; char name[20]; };
注意: string不是基本類型 。傳遞結構體消息示例如下:
xpipe x; child cc; cc.id=10; strcpy(cc.name,"PAYBY"); x.send((child *)&cc,sizeof(child)); /*-------------------------*/ child dd; x.recv((child *)&dd,sizeof(child));
通信設計
文件是常見的通信媒介。但對文件的讀寫必須要加上讀寫的角色信息才能體現通信的過程。一個簡單的單向通信包含消息發送方和消息接收方。對管道讀寫時常常有可以看到接收進程(讀進程)關閉寫端口,發送進程(寫進程)關閉讀端口,這樣做是為了確保信息流向的單一,以免信息從接收進程流向發送進程。對通信而言,這依然不夠直觀。單向的信息流動中,一方僅僅是發送者(senderonly),另一方僅僅是接收者(receiveronly)。因此,在父子進程通信過程中,給他們指定角色就可以了。
示例代碼如下:
xpipe x; pid_t pid=fork(); string item="whose your daddy"; if (pid==0) {//child process x.receiveronly(); string rs; x.recv(rs); //check point assert(rs==item); exit(0); } else if (pid>0) {//parent process int ret; x.senderonly(); x.send(item); wait(&ret); }
在示例代碼中父進程被指定為發送者(x.senderonly();),不能通過`x`管道進行接收信息,子進程被指定為接收者(x.receiveronly();),不能通過x管道進行發送信息。要實現父子進程的互相通信。可以在指定另一個管道,將子進程指定為發送者,父進程指定為接收者。(見使用示例)
使用示例
父子進程互相通信
xpipe x; xpipe y; pid_t pid=fork(); string x_item="whose your daddy?"; string y_item="my father is Ligang!"; if (pid==0) {//child process x.receiveronly(); y.senderonly(); string rs; x.recv(rs); //check point assert(rs==x_item); y.send(y_item); cout<<"child process:"<<y_item<<endl; exit(0); } else if (pid>0) {//parent process int ret; x.senderonly(); y.receiveronly(); x.send(x_item); cout<<"parent process:"<<x_item<<endl; string ts; y.recv(ts); assert(ts==y_item); wait(&ret); }
預期結果為:
parent process:whose your daddy? child process:my father is Ligang!
代碼一覽
頭文件xpipe.h
#ifndef __XPIPEH__ #define __XPIPEH__ #include <unistd.h> #include <string> #include <string.h> #include <stdio.h> using namespace std; /* 無名管道的C++封裝類,用於父子進程進行通信 時間 :2013年7月15日 20:30:58 郵 箱:chen_xueyou@163.com */ class xpipe { public: xpipe(); ~xpipe(); ///核心方法 ssize_t send(void *buf, size_t n); ssize_t recv(void *buf, size_t nbytes); ///常用方法特化 void send(const string &content); void recv(string &content); //確定通信角色 void senderonly(){DisReadable();} void receiveronly(){DisWriteable();} //屬性操作 string role() const; long Bufsize(long newbufsize=0); private: //讀寫關閉操作 void DisReadable(); void DisWriteable(); /* data */ private: int m_fd[2]; bool m_readable; bool m_writeable; long m_bufsize; char * m_buf; }; #endif
xpipe.cpp
#ifndef __XPIPECPP__ #define __XPIPECPP__ #include "xpipe.h" xpipe::xpipe() :m_readable(true),m_writeable(true),m_buf(NULL) { int success=pipe(m_fd); if(success<0) { throw puts("create pipe failed!"); } //檢測係統設置的管道限製大小 m_bufsize=fpathconf(m_fd[0],_PC_PIPE_BUF); } xpipe::~xpipe() { if(m_readable) close(m_fd[0]); if(m_writeable) close(m_fd[1]); if(m_buf!=NULL) delete m_buf; } ssize_t xpipe::send(void *buf, size_t n) { return write(m_fd[1], buf, n); } ssize_t xpipe::recv(void *buf, size_t nbytes) { return read(m_fd[0], buf, nbytes); } void xpipe::send(const string &content) { write(m_fd[1],content.c_str(),content.length()); } void xpipe::recv(string &content) { if (m_buf==NULL) {//lazy run m_buf=new char[m_bufsize]; if (m_buf==NULL) { throw puts("memory not enough!"); } } memset(m_buf,0,m_bufsize); read(m_fd[0],m_buf,m_bufsize); content=string(m_buf); } //返回當前管道所扮演到角色 string xpipe::role() const { if (m_writeable&&m_readable) { return "sender and receiver"; } if (m_writeable) { return "sender"; } if (m_readable) { return "receiver"; } return "none"; } /*關閉讀端口*/ void xpipe::DisReadable() { if(m_readable) { close(m_fd[0]); m_readable=false; } } /*關閉寫端口*/ void xpipe::DisWriteable() { if (m_writeable) { close(m_fd[1]); m_writeable=false; } } /*如果輸入大於0:調整緩存區大小,並返回調整後緩存區大小 如果輸入小於等於0,則不設置,隻返回緩存區大小 緩存區大小構造時默認設置為係統對管道的限製大小 默認參數為0 */ long xpipe::Bufsize(long newbufsize) { //大於0才設置 if (newbufsize>0) { m_bufsize=newbufsize; delete m_buf; //重新申請緩存區 m_buf=new char[m_bufsize]; if (m_buf==NULL) { throw puts("memory not enough!"); } } return m_bufsize; } #endif
測試文件test.cpp
#include <iostream> #include <assert.h> #include <sys/types.h> #include <sys/wait.h> #include <stdlib.h> #include "xpipe.h" using namespace std; /*test Bufszie*/ void test1() { xpipe x; int fd[2]; pipe(fd); //check point assert(x.Bufsize()==fpathconf(fd[0],_PC_PIPE_BUF)); x.Bufsize(20); //check point assert(x.Bufsize()==20); } /*test read/recv*/ ///////////////////////////////////// class childreq { public: long recid; char billtype[20]; }; void test2() { xpipe x; pid_t pid=fork(); if (pid==0) { x.receiveronly(); childreq dd; x.recv((childreq *)&dd,sizeof(childreq)); //check point assert(dd.recid==10); assert(!strcmp(dd.billtype,"PAYBY")); exit(0); } else if (pid>0) { x.senderonly(); childreq cc; cc.recid=10; strcpy(cc.billtype,"PAYBY"); x.send((childreq *)&cc,sizeof(childreq)); int ret; wait(&ret); } } /*test read/recv*/ void test3() { xpipe x; pid_t pid=fork(); string item="whose your daddy"; if (pid==0) {//child process x.receiveronly(); string rs; x.recv(rs); //check point assert(rs==item); exit(0); } else if (pid>0) {//parent process int ret; x.senderonly(); x.send(item); wait(&ret); } } /*test role*/ void test4() { xpipe x; assert(x.role()=="sender and receiver"); x.senderonly(); assert(x.role()=="sender"); x.receiveronly(); assert(x.role()=="none"); xpipe y; y.receiveronly(); assert(y.role()=="receiver"); } /*test read/recv*/ void test5() { xpipe x; xpipe y; pid_t pid=fork(); string x_item="whose your daddy?"; string y_item="my father is Ligang!"; if (pid==0) {//child process x.receiveronly(); y.senderonly(); string rs; x.recv(rs); //check point assert(rs==x_item); y.send(y_item); cout<<"child process:"<<y_item<<endl; exit(0); } else if (pid>0) {//parent process int ret; x.senderonly(); y.receiveronly(); x.send(x_item); cout<<"parent process:"<<x_item<<endl; string ts; y.recv(ts); assert(ts==y_item); wait(&ret); } } int main(int argc, char const *argv[]) { test1(); test2(); test3(); test4(); test5(); cout<<"pass all the tests"<<endl; }
makefile文件
CXX=g++ all: $(CXX) -c xpipe.cpp $(CXX) test.cpp -o test xpipe.o clean: rm xpipe.o test
有問題,請給我留言,我的0CSDN博客 | 新浪微博 | 個人網站
源碼在Github上,點擊下載
最後更新:2017-04-03 16:48:36