nanomsg实验——pubsub
发布订阅模式是很多消息中间件提供的常见功能。通过消息机制,能够将消息发布者和消息接收(消费)者
进行解耦。pubsub模式也是nanomsg直接支持的一直消息模型之一,因此通过pubsub模式实验,
同时也大致了解了下nanomsg的基础用法。
服务端
代码如下 | 复制代码 |
#include <stdio.h> #include <stdlib.h> #include <time.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pubsub.h> void usage(const char *name) { fprintf(stderr, "%s [ bind url]n", name); } int main(int argc, char **argv) { if(argc != 2) { usage(argv[0]); exit(-1); } const char *url = argv[1]; int sock = nn_socket(AF_SP, NN_PUB); if(sock < 0) { fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno)); exit(-1); } if(nn_bind(sock, url) < 0) { fprintf(stderr, "nn_bind failed: %sn", nn_strerror(errno)); exit(-1); } while(1) { time_t rawtime; struct tm * timeinfo; time (&rawtime); timeinfo = localtime (&rawtime); char *text = asctime (timeinfo); int textLen = strlen(text); text[textLen - 1] = ''; printf ("SERVER: PUBLISHING DATE %sn", text); nn_send(sock, text, textLen, 0); sleep(1); } return 0; } |
nanomsg使用非常简单,只要直接include nanomsg/nn.h,即可使用基本API。使用内置的通信模式,
需要引入对应的头文件,如pubsub模式,引入nonomsg/pubsub.h即可。
pubsub server,需要首先通过nn_socket调用创建socket,这里模仿了POSIX接口,
函数返回一个文件描述符。因此直接通过判断返回值是否大于0,判断是否创建成功。注意第二个参数为协议,
在协议相关头文件中会定义对应的宏。然后所有操作都将基于这个文件描述符。
和berkeley sockets一样,server需要bind一个端口,nanomsg需要bind一个url。目前nanomsg支持的格式有:
* 进程内通信(inproc):url格式为inproc://test
* 进程间同in想(ipc):url格式为ipc:///tmp/test.ipc
* tcp通信:url格式为tcp://*:5555
github上源码貌似已经支持websocket了。
nanomsg的错误和UNIX相同,失败之后会设置errno,可以通过nn_strerror获取对应的错误文本。
bind完了之后,就可以通过nn_send函数向socket发送消息了。这个函数参数和berkeley sockets api接口类似。
这里直接获取当前时间,然后发出给所有订阅者。
客户端
代码如下 | 复制代码 |
#include <stdio.h> #include <stdlib.h> #include <nanomsg/nn.h> #include <nanomsg/pubsub.h> int main(int argc, char **argv) { if(argc != 3) { fprintf(stderr, "usage: %s NAME BIND_URLn", argv[0]); exit(-1); } const char *name = argv[1]; const char *url = argv[2]; int sock = nn_socket (AF_SP, NN_SUB); if(sock < 0) { fprintf(stderr, "fail to create socket: %sn", nn_strerror(errno)); exit(-1); } if(nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) { fprintf(stderr, "fail to set sorket opts: %sn", nn_strerror(errno)); exit(-1); } if (nn_connect(sock, url) < 0) { fprintf(stderr, "fail to connect to %s : %sn", url, nn_strerror(errno)); exit(-1); } while ( 1 ) { char *buf = NULL; int bytes = nn_recv (sock, &buf, NN_MSG, 0); printf ("CLIENT (%s): RECEIVED %sn", name, buf); nn_freemsg (buf); } nn_shutdown(sock, 0); return 0; } |
客户端初始化和服务端差不多,在连接服务端之前,需要通过nn_setsockopt将当前socket设置成消息订阅者。
然后通过nn_connect连接发布者,参数和服务端bind的差不多,也是一个socket、一个url。
这里的url要和服务端bind的url相同。之后就是一个死循环不停的接收发布者的消息。
测试
首先是编译,和普通c程序相同,只是增加链接nanomsg。
gcc -o pubserver pubserver.c -lnanomsg
gcc -o pubclient pubclient.c -lnanomsg
为了方便测试,写了一个简单的shell脚本:
代码如下 | 复制代码 |
#!/bin/bash BASE="$( cd "$( dirname "$0" )" && pwd )" PUB=$BASE/pubserver SUB=$BASE/pubclient URL="tcp://127.0.0.1:1234" echo "start pubserver to bind tcp: $URL" $PUB tcp://127.0.0.1:1234 & echo "start to start pubclient" for((i = 0; i < 10; i++)) do echo "start client$i" $SUB client$i $URL & sleep 1 done sleep 20 echo "kill all process and exit" for pid in `jobs -p` do echo "kill $pid" kill $pid done wait |
脚本很简单,首先启动一个消息发布者,然后每秒启动一个消息接受者。等待20s之后,kill掉所有子进程。
脚本的输出:
代码如下 | 复制代码 |
start pubserver to bind tcp: tcp://127.0.0.1:1234 start to start pubclient start client0 SERVER: PUBLISHING DATE Tue Feb 17 15:12:11 2015 start client1 SERVER: PUBLISHING DATE Tue Feb 17 15:12:12 2015 CLIENT (client0): RECEIVED Tue Feb 17 15:12:12 2015 CLIENT (client1): RECEIVED Tue Feb 17 15:12:12 2015 start client2 SERVER: PUBLISHING DATE Tue Feb 17 15:12:13 2015 CLIENT (client0): RECEIVED Tue Feb 17 15:12:13 2015 CLIENT (client1): RECEIVED Tue Feb 17 15:12:13 2015 CLIENT (client2): RECEIVED Tue Feb 17 15:12:13 2015 start client3 SERVER: PUBLISHING DATE Tue Feb 17 15:12:14 2015 CLIENT (client0): RECEIVED Tue Feb 17 15:12:14 2015 CLIENT (client1): RECEIVED Tue Feb 17 15:12:14 2015 CLIENT (client2): RECEIVED Tue Feb 17 15:12:14 2015 ... SERVER: PUBLISHING DATE Tue Feb 17 15:12:41 2015 CLIENT (client0): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client1): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client2): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client3): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client4): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client5): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client6): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client7): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client8): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client9): RECEIVED Tue Feb 17 15:12:41 2015 kill all process and exit |
可以看见每次启动一个新的订阅者,每个订阅者都能够收到发布者发布的当前时间。
nanomsg实验——survey
survey模式是由server发出询问,client针对请求回复响应的一种模式。这种模式在分布式系统中非常有用,
可以用来做服务发现、分布式事物等分布式询问。
客户端
客户端实现比较方便,除了基础调用(创建socket、连接url)之外,就是先接收服务端询问
(例子中比较简单,服务端询问是固定的,所以没有对内容进行检查)针对询问发送响应
(例子中是发送服务端当前时间)
代码如下 | 复制代码 |
#include <cstdio> #include <cstdlib> #include <cstring> #include <ctime> #include <nanomsg/nn.h> #include <nanomsg/survey.h> using namespace std; int main(int argc, const char **argv) { if(argc != 3) { fprintf(stderr, "usage: %s NAME URLn", argv[0]); exit(-1); } const char *name = argv[1]; const char *url = argv[2]; int sock = nn_socket(AF_SP, NN_RESPONDENT); if(sock < 0){ fprintf(stderr, "nn_socket fail: %sn", nn_strerror(errno)); exit(-1); } if(nn_connect(sock, url) < 0) { fprintf(stderr, "nn_connect fail: %sn", nn_strerror(errno)); exit(-1); } while(1){ char *buf = NULL; int bytes = nn_recv (sock, &buf, NN_MSG, 0); if(bytes > 0) { printf ("CLIENT (%s): RECEIVED "%s" SURVEY REQUESTn", name, buf); nn_freemsg (buf); char sendBuffer[128]; time_t rawtime; struct tm * timeinfo; time (&rawtime); timeinfo = localtime (&rawtime); char *timeText = asctime (timeinfo); int textLen = strlen(timeText); timeText[textLen - 1] = ''; sprintf(sendBuffer, "[ %s ] %s", name, timeText); int sendSize = strlen(sendBuffer) + 1; int actualSendSize = nn_send(sock, sendBuffer, sendSize, 0); if(actualSendSize != sendSize) { fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize); continue; } } } nn_shutdown(sock, 0); return 0; } |
这里收到消息后,就简单的打印,然后将响应数据写会给服务端。
服务端
服务端有个问题,之前搜索了几个例子都不太正常。经过尝试和简单查看代码之后发现,通过nanomsg基础api,
无法获取当前有多少客户端。但是,如果当前所有连接的客户端的响应都已经收到,再次调用nn_recv之后,
会直接返回-1,表示读取失败,同时errno(通过errno函数获取)被设置为EFSM,表示当前状态机状态不正确。
代码如下 | 复制代码 |
#include <cstdio> #include <cstdlib> #include <cstring> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/survey.h> using namespace std; const char *SURVEY_TYPE = "DATE"; int main(int argc, char** argv) { if ( argc != 2 ) { fprintf(stderr, "usage: %s URLn", argv[0]); exit(-1); } const char *url = argv[1]; int sock = nn_socket(AF_SP, NN_SURVEYOR); if(sock < 0) { fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno)); exit(-1); } if(nn_bind(sock, url) < 0) { fprintf(stderr, "nn_bind fail: %sn", nn_strerror(errno)); exit(-1); } while(1) { int sendSize = strlen(SURVEY_TYPE) + 1; int actualSendSize; printf ("SERVER: SENDING DATE SURVEY REQUESTn"); if ((actualSendSize = nn_send(sock, SURVEY_TYPE, sendSize, 0)) != sendSize) { fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize); continue; } int count = 0; while(1) { char *buf = NULL; int bytes = nn_recv (sock, &buf, NN_MSG, 0); if (bytes < 0 && nn_errno() == ETIMEDOUT) break; if (bytes >= 0) { printf ("SERVER: RECEIVED "%s" SURVEY RESPONSEn", buf); ++count; nn_freemsg (buf); } else { fprintf(stderr, "nn_recv fail: %sn", nn_strerror(errno)); break; } } printf("SERVER: current receive %d survey response.n", count); sleep(1); } nn_shutdown(sock, 0); return 0; } |
这里用了两个死循环,外层循环不停尝试向客户端发起询问。完成询问后,通过另外一个死循环读取所有的客户端响应,
当读取失败时退出循环。
之前找到的源码是直接判断错误是否ETIMEDOUT,经过打印会发现每次都没有超时,而是状态机错误:
代码如下 | 复制代码 |
/* If no survey is going on return EFSM error. */ if (nn_slow (!nn_surveyor_inprogress (surveyor))) return -EFSM; |
测试
测试和前文差不多,先启动一个server,然后再一个个启动client:
代码如下 | 复制代码 |
#!/bin/bash BASE="$( cd "$( dirname "$0" )" && pwd )" SERVER=$BASE/surveyserver CLIENT=$BASE/surveyclient URL="tcp://127.0.0.1:1234" echo "start surveyserver to bind tcp: $URL" $SERVER tcp://127.0.0.1:1234 & echo "start to start surveyclient" for((i = 0; i < 10; i++)) do echo "start client$i" $CLIENT client$i $URL & sleep 1 done sleep 20 echo "kill all process and exit" for pid in `jobs -p` do echo "kill $pid" kill $pid done wait |
输出为:
代码如下 | 复制代码 |
start surveyserver to bind tcp: tcp://127.0.0.1:1234 start to start surveyclient start client0 SERVER: SENDING DATE SURVEY REQUEST start client1 nn_recv fail: Operation cannot be performed in this state SERVER: current receive 0 survey response. start client2 SERVER: SENDING DATE SURVEY REQUEST CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE nn_recv fail: Operation cannot be performed in this state SERVER: current receive 2 survey response. start client3 SERVER: SENDING DATE SURVEY REQUEST CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST ... SERVER: SENDING DATE SURVEY REQUEST CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST CLIENT (client3): RECEIVED "DATE" SURVEY REQUEST CLIENT (client4): RECEIVED "DATE" SURVEY REQUEST CLIENT (client5): RECEIVED "DATE" SURVEY REQUEST CLIENT (client6): RECEIVED "DATE" SURVEY REQUEST CLIENT (client7): RECEIVED "DATE" SURVEY REQUEST CLIENT (client9): RECEIVED "DATE" SURVEY REQUEST CLIENT (client8): RECEIVED "DATE" SURVEY REQUEST SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client2 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client3 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client4 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client5 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client6 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client7 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client9 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client8 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE nn_recv fail: Operation cannot be performed in this state SERVER: current receive 10 survey response. |
从输出可以看见,每次最后一个接收完成之后,都会有一个“Operation cannot be performed in this state”
错误,也就是EFSM错误。
茶杯头甜蜜终章dlc 官方手机版v1.0.0.3
下载火柴人传说暗影格斗内置菜单 最新版v3.0.1
下载荒野乱斗测试服 安卓版v61.10.3
下载荒野乱斗彩虹服 安卓版v61.10.3
下载寒霜启示录 安卓版v1.25.10
寒霜启示录是一款生存模拟游戏,不少玩家可能对于末日都有着自己
末日城堡免广告版 安卓最新版v0.7.1
末日城堡免广告版是一款非常好玩的模拟经营类游戏,内部可以不看
甜蜜人生模拟器 最新版v1.4.5
甜蜜人生模拟器是一款非常好玩的模拟恋爱手游,玩家在这里能够对
武器锻造师内置功能菜单 v10.4
武器锻造师内置菜单版是游戏的破解版本,在该版本中为玩家提供了
开放空间overfield 安卓版v1.0.5
开放空间Overfield是一款箱庭养成经营手游,让你在广阔