00001
00002 #include <nanomsg/nn.h>
00003 #include <nanomsg/pair.h>
00004 #include <nanomsg/pubsub.h>
00005 #include <nanomsg/inproc.h>
00006 #include <cstring>
00007 #include <iostream>
00008 #include <cstdlib>
00009 #include <unistd.h>
00010 #define SOCKET_ADDRESS "inproc://test"
00011
00012 static int test_socket (int family, int protocol)
00013 {
00014 int sock;
00015
00016 sock = nn_socket (family, protocol);
00017 if (sock == -1) {
00018 std::cout << "Error opening socket" << std::endl;
00019 }
00020 return sock;
00021 }
00022
00023 static int test_connect (int sock, const char *address)
00024 {
00025 int rc;
00026 rc = nn_connect (sock, address);
00027 if(rc < 0) {
00028 std::cout << "Error on connect"<< std::endl;
00029 }
00030 return rc;
00031 }
00032 static int test_bind(int sock, const char *address)
00033 {
00034 int rc;
00035
00036 rc = nn_bind (sock, address);
00037 if(rc < 0) {
00038 std::cout << "Error on bind"<< std::endl;
00039 }
00040 return rc;
00041 }
00042 static void test_send (int sock, const char *data)
00043 {
00044 size_t data_len;
00045 int rc;
00046
00047 data_len = strlen(data);
00048
00049 std::cout << "Send: " << data << std::endl;
00050 rc = nn_send (sock, data, data_len, 0);
00051 if (rc < 0) {
00052 std::cout << "Error on send"<< std::endl;
00053 }
00054 if (rc != (int)data_len) {
00055 std::cout << "error on data to send"<< std::endl;
00056 }
00057 }
00058 static void test_recv (int sock, const char *data)
00059 {
00060 size_t data_len;
00061 int rc;
00062 char *buf;
00063
00064 data_len = strlen(data);
00065
00066
00067 buf = (char*)malloc(data_len+1);
00068
00069
00070 rc = nn_recv (sock, buf, data_len+1, 0);
00071 if (rc < 0) {
00072 std::cout << "Error on receive"<< std::endl;
00073 }
00074 if (rc != (int)data_len) {
00075 std::cout << "Error on data wrong length"<< std::endl;
00076 }
00077 if (memcmp (data, buf, data_len) != 0) {
00078 std::cout << "Error on received data is wrong"<< std::endl;
00079 }
00080 std::cout << "Receive: " << buf<< std::endl;
00081
00082 free (buf);
00083 }
00084
00085 static void test_close (int sock)
00086 {
00087 int rc;
00088
00089 rc = nn_close (sock);
00090 if (rc != 0) {
00091 std::cout << "Error on close"<< std::endl;
00092 }
00093 }
00094
00095 void test_pair() {
00096 int rc;
00097 int sb;
00098 int sc;
00099 int i;
00100 char buf [256];
00101 int val;
00102
00103
00104 sc = test_socket (AF_SP, NN_PAIR);
00105 test_connect (sc, SOCKET_ADDRESS);
00106 sb = test_socket (AF_SP, NN_PAIR);
00107 test_bind (sb, SOCKET_ADDRESS);
00108
00109
00110 rc = nn_bind (sc, SOCKET_ADDRESS);
00111
00112
00113
00114 for (i = 0; i != 100; ++i) {
00115
00116 test_send (sc, "ABC");
00117 test_recv (sb, "ABC");
00118 test_send (sb, "DEFG");
00119 test_recv (sc, "DEFG");
00120 }
00121
00122
00123 for (i = 0; i != 100; ++i) {
00124 test_send (sc, "XYZ");
00125 }
00126 for (i = 0; i != 100; ++i) {
00127 test_recv (sb, "XYZ");
00128 }
00129
00130 test_close (sc);
00131 test_close (sb);
00132
00133
00134 sb = test_socket (AF_SP, NN_PAIR);
00135 val = 200;
00136 rc = nn_setsockopt (sb, NN_SOL_SOCKET, NN_RCVBUF, &val, sizeof (val));
00137
00138 test_bind (sb, SOCKET_ADDRESS);
00139 sc = test_socket (AF_SP, NN_PAIR);
00140 test_connect (sc, SOCKET_ADDRESS);
00141
00142 val = 200;
00143 rc = nn_setsockopt (sc, NN_SOL_SOCKET, NN_SNDTIMEO, &val, sizeof (val));
00144
00145 i = 0;
00146 while (1) {
00147 rc = nn_send (sc, "0123456789", 10, 0);
00148 if (rc < 0 && nn_errno () == EAGAIN)
00149 break;
00150
00151
00152 ++i;
00153 }
00154
00155 test_recv (sb, "0123456789");
00156 test_send (sc, "0123456789");
00157 rc = nn_send (sc, "0123456789", 10, 0);
00158
00159 for (i = 0; i != 20; ++i) {
00160 test_recv (sb, "0123456789");
00161 }
00162
00163
00164
00165 for (i = 0; i != sizeof (buf); ++i)
00166 buf [i] = 'A';
00167 rc = nn_send (sc, buf, 256, 0);
00168
00169
00170 rc = nn_recv (sb, buf, sizeof (buf), 0);
00171
00172
00173
00174 test_close (sc);
00175 test_close (sb);
00176 }
00177
00178 void test_pubsub() {
00179 int rc;
00180 int pub1;
00181 int pub2;
00182 int sub1;
00183 int sub2;
00184 char buf [8];
00185 size_t sz;
00186
00187 pub1 = test_socket (AF_SP, NN_PUB);
00188 test_bind (pub1, SOCKET_ADDRESS);
00189 sub1 = test_socket (AF_SP, NN_SUB);
00190 rc = nn_setsockopt (sub1, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
00191
00192 sz = sizeof (buf);
00193 rc = nn_getsockopt (sub1, NN_SUB, NN_SUB_SUBSCRIBE, buf, &sz);
00194
00195 test_connect (sub1, SOCKET_ADDRESS);
00196 sub2 = test_socket (AF_SP, NN_SUB);
00197 rc = nn_setsockopt (sub2, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
00198
00199 test_connect (sub2, SOCKET_ADDRESS);
00200
00201
00202 sleep(1);
00203
00204 test_send (pub1, "0123456789012345678901234567890123456789");
00205 test_recv (sub1, "0123456789012345678901234567890123456789");
00206 test_recv (sub2, "0123456789012345678901234567890123456789");
00207
00208 test_close (pub1);
00209 test_close (sub1);
00210 test_close (sub2);
00211
00212
00213
00214 sub1 = test_socket (AF_SP, NN_SUB);
00215 rc = nn_setsockopt (sub1, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
00216
00217 test_bind (sub1, SOCKET_ADDRESS);
00218 pub1 = test_socket (AF_SP, NN_PUB);
00219 test_connect (pub1, SOCKET_ADDRESS);
00220 pub2 = test_socket (AF_SP, NN_PUB);
00221 test_connect (pub2, SOCKET_ADDRESS);
00222 sleep(1);
00223
00224 test_send (pub1, "0123456789012345678901234567890123456789");
00225 test_send (pub2, "0123456789012345678901234567890123456789");
00226 test_recv (sub1, "0123456789012345678901234567890123456789");
00227 test_recv (sub1, "0123456789012345678901234567890123456789");
00228
00229 test_close (pub2);
00230 test_close (pub1);
00231 test_close (sub1);
00232 }
00233
00234 int main ()
00235 {
00236 test_pubsub();
00237 return 0;
00238 }