iproc.cpp
Go to the documentation of this file.
00001 
00002 #include <nanomsg/nn.h>
00003 #include <nanomsg/pair.h>
00004 #include <nanomsg/pubsub.h>
00005 #include <nanomsg/inproc.h>
00006 #include <cstring>
00007 #include <iostream>
00008 #include <cstdlib>
00009 #include <unistd.h>
00010 #define SOCKET_ADDRESS "inproc://test"
00011 
00012 static int test_socket (int family, int protocol)
00013 {
00014     int sock;
00015 
00016     sock = nn_socket (family, protocol);
00017     if (sock == -1) {
00018       std::cout << "Error opening socket" << std::endl;
00019     }
00020     return sock;
00021 }
00022 
00023 static int test_connect (int sock, const char *address)
00024 {
00025     int rc;
00026     rc = nn_connect (sock, address);
00027     if(rc < 0) {
00028       std::cout << "Error on connect"<< std::endl;
00029     }
00030     return rc;
00031 }
00032 static int test_bind(int sock, const char *address)
00033 {
00034     int rc;
00035 
00036     rc = nn_bind (sock, address);
00037     if(rc < 0) {
00038       std::cout << "Error on bind"<< std::endl;
00039     }
00040     return rc;
00041 }
00042 static void test_send (int sock, const char *data)
00043 {
00044     size_t data_len;
00045     int rc;
00046 
00047     data_len = strlen(data);
00048 
00049     std::cout << "Send: " << data << std::endl;
00050     rc = nn_send (sock, data, data_len, 0);
00051     if (rc < 0) {
00052       std::cout << "Error on send"<< std::endl;
00053     }
00054     if (rc != (int)data_len) {
00055       std::cout << "error on data to send"<< std::endl;
00056     }
00057 }
00058 static void test_recv (int sock, const char *data)
00059 {
00060     size_t data_len;
00061     int rc;
00062     char *buf;
00063 
00064     data_len = strlen(data);
00065     /*  We allocate plus one byte so that we are sure that message received
00066         has corrent length and not truncated  */
00067     buf = (char*)malloc(data_len+1);
00068     //alloc_assert (buf);
00069 
00070     rc = nn_recv (sock, buf, data_len+1, 0);
00071     if (rc < 0) {
00072       std::cout << "Error on receive"<< std::endl;
00073     }
00074     if (rc != (int)data_len) {
00075       std::cout << "Error on data wrong length"<< std::endl;
00076     }
00077     if (memcmp (data, buf, data_len) != 0) {
00078       std::cout << "Error on received data is wrong"<< std::endl;
00079     }
00080     std::cout << "Receive: " << buf<< std::endl;
00081 
00082     free (buf);
00083 }
00084 
00085 static void test_close (int sock)
00086 {
00087     int rc;
00088 
00089     rc = nn_close (sock);
00090     if (rc != 0) {
00091       std::cout << "Error on close"<< std::endl;
00092     }
00093 }
00094 
00095 void test_pair() {
00096   int rc;
00097   int sb;
00098   int sc;
00099   int i;
00100   char buf [256];
00101   int val;
00102 
00103   /*  Create a simple topology. */
00104   sc = test_socket (AF_SP, NN_PAIR);
00105   test_connect (sc, SOCKET_ADDRESS);
00106   sb = test_socket (AF_SP, NN_PAIR);
00107   test_bind (sb, SOCKET_ADDRESS);
00108 
00109   /*  Try a duplicate bind. It should fail. */
00110   rc = nn_bind (sc, SOCKET_ADDRESS);
00111   //nn_assert (rc < 0 && errno == EADDRINUSE);
00112 
00113   /*  Ping-pong test. */
00114   for (i = 0; i != 100; ++i) {
00115 
00116       test_send (sc, "ABC");
00117       test_recv (sb, "ABC");
00118       test_send (sb, "DEFG");
00119       test_recv (sc, "DEFG");
00120   }
00121 
00122   /*  Batch transfer test. */
00123   for (i = 0; i != 100; ++i) {
00124       test_send (sc, "XYZ");
00125   }
00126   for (i = 0; i != 100; ++i) {
00127       test_recv (sb, "XYZ");
00128   }
00129 
00130   test_close (sc);
00131   test_close (sb);
00132 
00133   /*  Test whether queue limits are observed. */
00134   sb = test_socket (AF_SP, NN_PAIR);
00135   val = 200;
00136   rc = nn_setsockopt (sb, NN_SOL_SOCKET, NN_RCVBUF, &val, sizeof (val));
00137   //errno_assert (rc == 0);
00138   test_bind (sb, SOCKET_ADDRESS);
00139   sc = test_socket (AF_SP, NN_PAIR);
00140   test_connect (sc, SOCKET_ADDRESS);
00141 
00142   val = 200;
00143   rc = nn_setsockopt (sc, NN_SOL_SOCKET, NN_SNDTIMEO, &val, sizeof (val));
00144   //errno_assert (rc == 0);
00145   i = 0;
00146   while (1) {
00147       rc = nn_send (sc, "0123456789", 10, 0);
00148       if (rc < 0 && nn_errno () == EAGAIN)
00149           break;
00150       //errno_assert (rc >= 0);
00151       //nn_assert (rc == 10);
00152       ++i;
00153   }
00154   //nn_assert (i == 20);
00155   test_recv (sb, "0123456789");
00156   test_send (sc, "0123456789");
00157   rc = nn_send (sc, "0123456789", 10, 0);
00158   //nn_assert (rc < 0 && nn_errno () == EAGAIN);
00159   for (i = 0; i != 20; ++i) {
00160       test_recv (sb, "0123456789");
00161   }
00162 
00163   /*  Make sure that even a message that doesn't fit into the buffers
00164       gets across. */
00165   for (i = 0; i != sizeof (buf); ++i)
00166       buf [i] = 'A';
00167   rc = nn_send (sc, buf, 256, 0);
00168   //errno_assert (rc >= 0);
00169   //nn_assert (rc == 256);
00170   rc = nn_recv (sb, buf, sizeof (buf), 0);
00171   //errno_assert (rc >= 0);
00172   //nn_assert (rc == 256);
00173 
00174   test_close (sc);
00175   test_close (sb);
00176 }
00177 
00178 void test_pubsub() {
00179   int rc;
00180       int pub1;
00181       int pub2;
00182       int sub1;
00183       int sub2;
00184       char buf [8];
00185       size_t sz;
00186 
00187       pub1 = test_socket (AF_SP, NN_PUB);
00188       test_bind (pub1, SOCKET_ADDRESS);
00189       sub1 = test_socket (AF_SP, NN_SUB);
00190       rc = nn_setsockopt (sub1, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
00191       //errno_assert (rc == 0);
00192       sz = sizeof (buf);
00193       rc = nn_getsockopt (sub1, NN_SUB, NN_SUB_SUBSCRIBE, buf, &sz);
00194       //nn_assert (rc == -1 && nn_errno () == ENOPROTOOPT);
00195       test_connect (sub1, SOCKET_ADDRESS);
00196       sub2 = test_socket (AF_SP, NN_SUB);
00197       rc = nn_setsockopt (sub2, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
00198       //errno_assert (rc == 0);
00199       test_connect (sub2, SOCKET_ADDRESS);
00200 
00201       /*  Wait till connections are established to prevent message loss. */
00202       sleep(1); //nn_sleep (10);
00203 
00204       test_send (pub1, "0123456789012345678901234567890123456789");
00205       test_recv (sub1, "0123456789012345678901234567890123456789");
00206       test_recv (sub2, "0123456789012345678901234567890123456789");
00207 
00208       test_close (pub1);
00209       test_close (sub1);
00210       test_close (sub2);
00211 
00212       /*  Check receiving messages from two publishers. */
00213 
00214       sub1 = test_socket (AF_SP, NN_SUB);
00215       rc = nn_setsockopt (sub1, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
00216       //rrno_assert (rc == 0);
00217       test_bind (sub1, SOCKET_ADDRESS);
00218       pub1 = test_socket (AF_SP, NN_PUB);
00219       test_connect (pub1, SOCKET_ADDRESS);
00220       pub2 = test_socket (AF_SP, NN_PUB);
00221       test_connect (pub2, SOCKET_ADDRESS);
00222       sleep(1); //nn_sleep (100);
00223 
00224       test_send (pub1, "0123456789012345678901234567890123456789");
00225       test_send (pub2, "0123456789012345678901234567890123456789");
00226       test_recv (sub1, "0123456789012345678901234567890123456789");
00227       test_recv (sub1, "0123456789012345678901234567890123456789");
00228 
00229       test_close (pub2);
00230       test_close (pub1);
00231       test_close (sub1);
00232 }
00233 
00234 int main ()
00235 {
00236   test_pubsub();
00237     return 0;
00238 }


mm_mux_demux
Author(s): Daniel Stonier
autogenerated on Thu Jun 6 2019 21:13:22