nanomsg_pipeline.cpp
Go to the documentation of this file.
1 
4 /*****************************************************************************
5 ** Includes
6 *****************************************************************************/
7 
8 #include <assert.h>
9 #include <cstdio>
10 #include <ctime>
11 #include <cstring>
12 #include <unistd.h>
13 #include <nanomsg/nn.h>
14 #include <nanomsg/pipeline.h>
15 #include <iostream>
16 
17 #define SOCKET_ADDRESS "inproc://a"
18 
19 
20 int main ()
21 {
22  int push1;
23  int push2;
24  int pull1;
25  int pull2;
26 
27  /* Test fan-out. */
28 
29  // these were originally test_xxx from https://github.com/nanomsg/nanomsg/blob/master/tests/testutil.h
30  // only difference is they had error checking
31  push1 = nn_socket(AF_SP, NN_PUSH);
32  nn_bind(push1, SOCKET_ADDRESS);
33  pull1 = nn_socket(AF_SP, NN_PULL);
34  nn_connect (pull1, SOCKET_ADDRESS);
35  pull2 = nn_socket (AF_SP, NN_PULL);
36  nn_connect (pull2, SOCKET_ADDRESS);
37 
38  /* Wait till both connections are established to get messages spread
39  evenly between the two pull sockets. */
40  //nn_sleep(10);
41  sleep(1);
42 
43  int length = strlen("ABC");
44  //test_send (push1, "ABC");
45  nn_send(push1, "ABC", length, 0);
46  //test_send (push1, "DEF");
47  nn_send(push1, "DEF", length, 0);
48 
49  //test_recv (pull1, "ABC");
50  //test_recv (pull2, "DEF");
51  char *buf = NULL;
52  int bytes;
53  bytes = nn_recv(pull1, &buf, NN_MSG, 0);
54  assert (bytes >= 0);
55  printf ("PULL1: RECEIVED %s\n", buf);
56  bytes = nn_recv(pull2, &buf, NN_MSG, 0);
57  assert (bytes >= 0);
58  printf ("PULL2: RECEIVED %s\n", buf);
59 
60  nn_close(push1);
61  nn_close(pull1);
62  nn_close(pull2);
63 
64  /* Test fan-in. */
65 
66  pull1 = nn_socket (AF_SP, NN_PULL);
67  nn_bind(pull1, SOCKET_ADDRESS);
68  push1 = nn_socket (AF_SP, NN_PUSH);
69  nn_connect(push1, SOCKET_ADDRESS);
70  push2 = nn_socket(AF_SP, NN_PUSH);
71  nn_connect(push2, SOCKET_ADDRESS);
72 
73  nn_send (push1, "ABC", length, 0);
74  nn_send (push2, "DEF", length, 0);
75 
76  //test_recv (pull1, "ABC");
77  //test_recv (pull1, "DEF");
78  bytes = nn_recv (pull1, &buf, NN_MSG, 0);
79  assert (bytes >= 0);
80  printf ("PULL1: RECEIVED %s\n", buf);
81  bytes = nn_recv (pull1, &buf, NN_MSG, 0);
82  assert (bytes >= 0);
83  printf ("PULL2: RECEIVED %s\n", buf);
84 
85  nn_close (pull1);
86  nn_close (push1);
87  nn_close (push2);
88 
89  return 0;
90 }
#define SOCKET_ADDRESS
int main()


mm_mux_demux
Author(s): Daniel Stonier
autogenerated on Mon Jun 10 2019 13:52:14