iproc.cpp
Go to the documentation of this file.
1 
2 #include <nanomsg/nn.h>
3 #include <nanomsg/pair.h>
4 #include <nanomsg/pubsub.h>
5 #include <nanomsg/inproc.h>
6 #include <cstring>
7 #include <iostream>
8 #include <cstdlib>
9 #include <unistd.h>
10 #define SOCKET_ADDRESS "inproc://test"
11 
12 static int test_socket (int family, int protocol)
13 {
14  int sock;
15 
16  sock = nn_socket (family, protocol);
17  if (sock == -1) {
18  std::cout << "Error opening socket" << std::endl;
19  }
20  return sock;
21 }
22 
23 static int test_connect (int sock, const char *address)
24 {
25  int rc;
26  rc = nn_connect (sock, address);
27  if(rc < 0) {
28  std::cout << "Error on connect"<< std::endl;
29  }
30  return rc;
31 }
32 static int test_bind(int sock, const char *address)
33 {
34  int rc;
35 
36  rc = nn_bind (sock, address);
37  if(rc < 0) {
38  std::cout << "Error on bind"<< std::endl;
39  }
40  return rc;
41 }
42 static void test_send (int sock, const char *data)
43 {
44  size_t data_len;
45  int rc;
46 
47  data_len = strlen(data);
48 
49  std::cout << "Send: " << data << std::endl;
50  rc = nn_send (sock, data, data_len, 0);
51  if (rc < 0) {
52  std::cout << "Error on send"<< std::endl;
53  }
54  if (rc != (int)data_len) {
55  std::cout << "error on data to send"<< std::endl;
56  }
57 }
58 static void test_recv (int sock, const char *data)
59 {
60  size_t data_len;
61  int rc;
62  char *buf;
63 
64  data_len = strlen(data);
65  /* We allocate plus one byte so that we are sure that message received
66  has corrent length and not truncated */
67  buf = (char*)malloc(data_len+1);
68  //alloc_assert (buf);
69 
70  rc = nn_recv (sock, buf, data_len+1, 0);
71  if (rc < 0) {
72  std::cout << "Error on receive"<< std::endl;
73  }
74  if (rc != (int)data_len) {
75  std::cout << "Error on data wrong length"<< std::endl;
76  }
77  if (memcmp (data, buf, data_len) != 0) {
78  std::cout << "Error on received data is wrong"<< std::endl;
79  }
80  std::cout << "Receive: " << buf<< std::endl;
81 
82  free (buf);
83 }
84 
85 static void test_close (int sock)
86 {
87  int rc;
88 
89  rc = nn_close (sock);
90  if (rc != 0) {
91  std::cout << "Error on close"<< std::endl;
92  }
93 }
94 
95 void test_pair() {
96  int rc;
97  int sb;
98  int sc;
99  int i;
100  char buf [256];
101  int val;
102 
103  /* Create a simple topology. */
104  sc = test_socket (AF_SP, NN_PAIR);
106  sb = test_socket (AF_SP, NN_PAIR);
108 
109  /* Try a duplicate bind. It should fail. */
110  rc = nn_bind (sc, SOCKET_ADDRESS);
111  //nn_assert (rc < 0 && errno == EADDRINUSE);
112 
113  /* Ping-pong test. */
114  for (i = 0; i != 100; ++i) {
115 
116  test_send (sc, "ABC");
117  test_recv (sb, "ABC");
118  test_send (sb, "DEFG");
119  test_recv (sc, "DEFG");
120  }
121 
122  /* Batch transfer test. */
123  for (i = 0; i != 100; ++i) {
124  test_send (sc, "XYZ");
125  }
126  for (i = 0; i != 100; ++i) {
127  test_recv (sb, "XYZ");
128  }
129 
130  test_close (sc);
131  test_close (sb);
132 
133  /* Test whether queue limits are observed. */
134  sb = test_socket (AF_SP, NN_PAIR);
135  val = 200;
136  rc = nn_setsockopt (sb, NN_SOL_SOCKET, NN_RCVBUF, &val, sizeof (val));
137  //errno_assert (rc == 0);
139  sc = test_socket (AF_SP, NN_PAIR);
141 
142  val = 200;
143  rc = nn_setsockopt (sc, NN_SOL_SOCKET, NN_SNDTIMEO, &val, sizeof (val));
144  //errno_assert (rc == 0);
145  i = 0;
146  while (1) {
147  rc = nn_send (sc, "0123456789", 10, 0);
148  if (rc < 0 && nn_errno () == EAGAIN)
149  break;
150  //errno_assert (rc >= 0);
151  //nn_assert (rc == 10);
152  ++i;
153  }
154  //nn_assert (i == 20);
155  test_recv (sb, "0123456789");
156  test_send (sc, "0123456789");
157  rc = nn_send (sc, "0123456789", 10, 0);
158  //nn_assert (rc < 0 && nn_errno () == EAGAIN);
159  for (i = 0; i != 20; ++i) {
160  test_recv (sb, "0123456789");
161  }
162 
163  /* Make sure that even a message that doesn't fit into the buffers
164  gets across. */
165  for (i = 0; i != sizeof (buf); ++i)
166  buf [i] = 'A';
167  rc = nn_send (sc, buf, 256, 0);
168  //errno_assert (rc >= 0);
169  //nn_assert (rc == 256);
170  rc = nn_recv (sb, buf, sizeof (buf), 0);
171  //errno_assert (rc >= 0);
172  //nn_assert (rc == 256);
173 
174  test_close (sc);
175  test_close (sb);
176 }
177 
178 void test_pubsub() {
179  int rc;
180  int pub1;
181  int pub2;
182  int sub1;
183  int sub2;
184  char buf [8];
185  size_t sz;
186 
187  pub1 = test_socket (AF_SP, NN_PUB);
188  test_bind (pub1, SOCKET_ADDRESS);
189  sub1 = test_socket (AF_SP, NN_SUB);
190  rc = nn_setsockopt (sub1, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
191  //errno_assert (rc == 0);
192  sz = sizeof (buf);
193  rc = nn_getsockopt (sub1, NN_SUB, NN_SUB_SUBSCRIBE, buf, &sz);
194  //nn_assert (rc == -1 && nn_errno () == ENOPROTOOPT);
196  sub2 = test_socket (AF_SP, NN_SUB);
197  rc = nn_setsockopt (sub2, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
198  //errno_assert (rc == 0);
200 
201  /* Wait till connections are established to prevent message loss. */
202  sleep(1); //nn_sleep (10);
203 
204  test_send (pub1, "0123456789012345678901234567890123456789");
205  test_recv (sub1, "0123456789012345678901234567890123456789");
206  test_recv (sub2, "0123456789012345678901234567890123456789");
207 
208  test_close (pub1);
209  test_close (sub1);
210  test_close (sub2);
211 
212  /* Check receiving messages from two publishers. */
213 
214  sub1 = test_socket (AF_SP, NN_SUB);
215  rc = nn_setsockopt (sub1, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
216  //rrno_assert (rc == 0);
217  test_bind (sub1, SOCKET_ADDRESS);
218  pub1 = test_socket (AF_SP, NN_PUB);
220  pub2 = test_socket (AF_SP, NN_PUB);
222  sleep(1); //nn_sleep (100);
223 
224  test_send (pub1, "0123456789012345678901234567890123456789");
225  test_send (pub2, "0123456789012345678901234567890123456789");
226  test_recv (sub1, "0123456789012345678901234567890123456789");
227  test_recv (sub1, "0123456789012345678901234567890123456789");
228 
229  test_close (pub2);
230  test_close (pub1);
231  test_close (sub1);
232 }
233 
234 int main ()
235 {
236  test_pubsub();
237  return 0;
238 }
void test_pubsub()
Definition: iproc.cpp:178
static void test_send(int sock, const char *data)
Definition: iproc.cpp:42
static int test_bind(int sock, const char *address)
Definition: iproc.cpp:32
int main()
Definition: iproc.cpp:234
static void test_recv(int sock, const char *data)
Definition: iproc.cpp:58
static int test_connect(int sock, const char *address)
Definition: iproc.cpp:23
void test_pair()
Definition: iproc.cpp:95
#define SOCKET_ADDRESS
Definition: iproc.cpp:10
static int test_socket(int family, int protocol)
Definition: iproc.cpp:12
static void test_close(int sock)
Definition: iproc.cpp:85


mm_mux_demux
Author(s): Daniel Stonier
autogenerated on Mon Jun 10 2019 13:52:14