nanomsg_pipeline.cpp
Go to the documentation of this file.
00001 
00004 /*****************************************************************************
00005 ** Includes
00006 *****************************************************************************/
00007 
00008 #include <assert.h>
00009 #include <cstdio>
00010 #include <ctime>
00011 #include <cstring>
00012 #include <unistd.h>
00013 #include <nanomsg/nn.h>
00014 #include <nanomsg/pipeline.h>
00015 #include <iostream>
00016 
00017 #define SOCKET_ADDRESS "inproc://a"
00018 
00019 
00020 int main ()
00021 {
00022     int push1;
00023     int push2;
00024     int pull1;
00025     int pull2;
00026 
00027     /*  Test fan-out. */
00028 
00029     // these were originally test_xxx from https://github.com/nanomsg/nanomsg/blob/master/tests/testutil.h
00030     // only difference is they had error checking
00031     push1 = nn_socket(AF_SP, NN_PUSH);
00032     nn_bind(push1, SOCKET_ADDRESS);
00033     pull1 = nn_socket(AF_SP, NN_PULL);
00034     nn_connect (pull1, SOCKET_ADDRESS);
00035     pull2 = nn_socket (AF_SP, NN_PULL);
00036     nn_connect (pull2, SOCKET_ADDRESS);
00037 
00038     /*  Wait till both connections are established to get messages spread
00039         evenly between the two pull sockets. */
00040     //nn_sleep(10);
00041     sleep(1);
00042 
00043     int length = strlen("ABC");
00044     //test_send (push1, "ABC");
00045     nn_send(push1, "ABC", length, 0);
00046     //test_send (push1, "DEF");
00047     nn_send(push1, "DEF", length, 0);
00048 
00049     //test_recv (pull1, "ABC");
00050     //test_recv (pull2, "DEF");
00051     char *buf = NULL;
00052     int bytes;
00053     bytes = nn_recv(pull1, &buf, NN_MSG, 0);
00054     assert (bytes >= 0);
00055     printf ("PULL1: RECEIVED %s\n", buf);
00056     bytes = nn_recv(pull2, &buf, NN_MSG, 0);
00057     assert (bytes >= 0);
00058     printf ("PULL2: RECEIVED %s\n", buf);
00059 
00060     nn_close(push1);
00061     nn_close(pull1);
00062     nn_close(pull2);
00063 
00064     /*  Test fan-in. */
00065 
00066     pull1 = nn_socket (AF_SP, NN_PULL);
00067     nn_bind(pull1, SOCKET_ADDRESS);
00068     push1 = nn_socket (AF_SP, NN_PUSH);
00069     nn_connect(push1, SOCKET_ADDRESS);
00070     push2 = nn_socket(AF_SP, NN_PUSH);
00071     nn_connect(push2, SOCKET_ADDRESS);
00072 
00073     nn_send (push1, "ABC", length, 0);
00074     nn_send (push2, "DEF", length, 0);
00075 
00076     //test_recv (pull1, "ABC");
00077     //test_recv (pull1, "DEF");
00078     bytes = nn_recv (pull1, &buf, NN_MSG, 0);
00079     assert (bytes >= 0);
00080     printf ("PULL1: RECEIVED %s\n", buf);
00081     bytes = nn_recv (pull1, &buf, NN_MSG, 0);
00082     assert (bytes >= 0);
00083     printf ("PULL2: RECEIVED %s\n", buf);
00084 
00085     nn_close (pull1);
00086     nn_close (push1);
00087     nn_close (push2);
00088 
00089     return 0;
00090 }


mm_mux_demux
Author(s): Daniel Stonier
autogenerated on Thu Jun 6 2019 21:13:22