proxy.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 
5 #include <stddef.h>
6 #include "poller.hpp"
7 #include "proxy.hpp"
8 #include "likely.hpp"
9 #include "msg.hpp"
10 
11 #if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS \
12  && !defined ZMQ_HAVE_AIX
13 #include <poll.h>
14 #endif
15 
16 // These headers end up pulling in zmq.h somewhere in their include
17 // dependency chain
18 #include "socket_base.hpp"
19 #include "err.hpp"
20 
21 int zmq::proxy (class socket_base_t *frontend_,
22  class socket_base_t *backend_,
23  class socket_base_t *capture_)
24 {
25  return zmq::proxy_steerable (frontend_, backend_, capture_, NULL);
26 }
27 
28 #ifdef ZMQ_HAVE_POLLER
29 
30 #include "socket_poller.hpp"
31 
32 // Macros for repetitive code.
33 
34 // PROXY_CLEANUP() must not be used before these variables are initialized.
// Deletes all seven socket_poller_t pointers that the poller-based
// proxy_steerable declares. The names are resolved at the expansion site, so
// the macro only works inside a scope where all seven variables exist.
// Pollers that were left as NULL are harmless here: deleting a null pointer
// does nothing.
35 #define PROXY_CLEANUP() \
36  do { \
37  delete poller_all; \
38  delete poller_in; \
39  delete poller_receive_blocked; \
40  delete poller_send_blocked; \
41  delete poller_both_blocked; \
42  delete poller_frontend_only; \
43  delete poller_backend_only; \
44  } while (false)
45 
46 
// Bail-out helper for the poller-based proxy_steerable: when the local 'rc'
// is negative it releases the pollers via PROXY_CLEANUP(), closes the local
// 'msg' through close_and_return() and makes the ENCLOSING function return -1.
// Note that the expansion contains a 'return' and refers to 'rc' and 'msg' by
// name, so it must only be used where both are in scope.
47 #define CHECK_RC_EXIT_ON_FAILURE() \
48  do { \
49  if (rc < 0) { \
50  PROXY_CLEANUP (); \
51  return close_and_return (&msg, -1); \
52  } \
53  } while (false)
54 
55 #endif // ZMQ_HAVE_POLLER
56 
57 static int
58 capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0)
59 {
60  // Copy message to capture socket if any
61  if (capture_) {
62  zmq::msg_t ctrl;
63  int rc = ctrl.init ();
64  if (unlikely (rc < 0))
65  return -1;
66  rc = ctrl.copy (*msg_);
67  if (unlikely (rc < 0))
68  return -1;
69  rc = capture_->send (&ctrl, more_ ? ZMQ_SNDMORE : 0);
70  if (unlikely (rc < 0))
71  return -1;
72  }
73  return 0;
74 }
75 
// Message and byte counters for one direction of one socket.
struct stats_socket
{
    uint64_t count, bytes;
};

// Counters for both directions (sent / received) of one proxy endpoint.
struct stats_endpoint
{
    stats_socket send, recv;
};

// Counters for both endpoints of the proxy.
struct stats_proxy
{
    stats_endpoint frontend, backend;
};
88 
89 static int forward (class zmq::socket_base_t *from_,
90  class zmq::socket_base_t *to_,
91  class zmq::socket_base_t *capture_,
92  zmq::msg_t *msg_,
93  stats_socket &recving,
94  stats_socket &sending)
95 {
96  // Forward a burst of messages
97  for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) {
98  int more;
99  size_t moresz;
100 
101  // Forward all the parts of one message
102  while (true) {
103  int rc = from_->recv (msg_, ZMQ_DONTWAIT);
104  if (rc < 0) {
105  if (likely (errno == EAGAIN && i > 0))
106  return 0; // End of burst
107 
108  return -1;
109  }
110 
111  size_t nbytes = msg_->size ();
112  recving.count += 1;
113  recving.bytes += nbytes;
114 
115  moresz = sizeof more;
116  rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
117  if (unlikely (rc < 0))
118  return -1;
119 
120  // Copy message to capture socket if any
121  rc = capture (capture_, msg_, more);
122  if (unlikely (rc < 0))
123  return -1;
124 
125  rc = to_->send (msg_, more ? ZMQ_SNDMORE : 0);
126  if (unlikely (rc < 0))
127  return -1;
128  sending.count += 1;
129  sending.bytes += nbytes;
130 
131  if (more == 0)
132  break;
133  }
134  }
135 
136  return 0;
137 }
138 
// Run state of the steerable proxy, driven by control commands.
enum proxy_state_t
{
    active,
    paused,
    terminated
};
145 
146 // Handle control request [5]PAUSE, [6]RESUME, [9]TERMINATE,
147 // [10]STATISTICS. Only STATISTICS results in a send.
148 static int handle_control (class zmq::socket_base_t *control_,
149  proxy_state_t &state,
150  const stats_proxy &stats)
151 {
152  zmq::msg_t cmsg;
153  int rc = cmsg.init ();
154  if (rc != 0) {
155  return -1;
156  }
157  rc = control_->recv (&cmsg, ZMQ_DONTWAIT);
158  if (rc < 0) {
159  return -1;
160  }
161  uint8_t *const command = static_cast<uint8_t *> (cmsg.data ());
162  const size_t msiz = cmsg.size ();
163 
164  if (msiz == 10 && 0 == memcmp (command, "STATISTICS", 10)) {
165  // The stats are a cross product:
166  //
167  // (Front,Back) X (Recv,Sent) X (Number,Bytes).
168  //
169  // that is flattened into sequence of 8 message parts according to the
170  // zmq_proxy_steerable(3) documentation as:
171  //
172  // (frn, frb, fsn, fsb, brn, brb, bsn, bsb)
173  //
174  // f=front/b=back, r=recv/s=send, n=number/b=bytes.
175  const uint64_t stat_vals[8] = {
176  stats.frontend.recv.count, stats.frontend.recv.bytes,
177  stats.frontend.send.count, stats.frontend.send.bytes,
178  stats.backend.recv.count, stats.backend.recv.bytes,
179  stats.backend.send.count, stats.backend.send.bytes};
180 
181  for (size_t ind = 0; ind < 8; ++ind) {
182  cmsg.init_size (sizeof (uint64_t));
183  memcpy (cmsg.data (), stat_vals + ind, sizeof (uint64_t));
184  rc = control_->send (&cmsg, ind < 7 ? ZMQ_SNDMORE : 0);
185  if (unlikely (rc < 0)) {
186  return -1;
187  }
188  }
189  return 0;
190  }
191 
192  if (msiz == 5 && 0 == memcmp (command, "PAUSE", 5)) {
193  state = paused;
194  } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) {
195  state = active;
196  } else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) {
197  state = terminated;
198  }
199 
200  int type;
201  size_t sz = sizeof (type);
202  zmq_getsockopt (control_, ZMQ_TYPE, &type, &sz);
203  if (type == ZMQ_REP) {
204  // satisfy REP duty and reply no matter what.
205  cmsg.init_size (0);
206  rc = control_->send (&cmsg, 0);
207  if (unlikely (rc < 0)) {
208  return -1;
209  }
210  }
211  return 0;
212 }
213 
214 #ifdef ZMQ_HAVE_POLLER
215 int zmq::proxy_steerable (class socket_base_t *frontend_,
216  class socket_base_t *backend_,
217  class socket_base_t *capture_,
218  class socket_base_t *control_)
219 {
220  msg_t msg;
221  int rc = msg.init ();
222  if (rc != 0)
223  return -1;
224 
225  // The algorithm below assumes ratio of requests and replies processed
226  // under full load to be 1:1.
227 
228  // Proxy can be in these three states
229  proxy_state_t state = active;
230 
231  bool frontend_equal_to_backend;
232  bool frontend_in = false;
233  bool frontend_out = false;
234  bool backend_in = false;
235  bool backend_out = false;
237  int nevents = 3; // increase to 4 if we have control_
238 
239  stats_proxy stats = {{{0, 0}, {0, 0}}, {{0, 0}, {0, 0}}};
240 
241  // Don't allocate these pollers from stack because they will take more than 900 kB of stack!
242  // On Windows this blows up default stack of 1 MB and aborts the program.
243  // I wanted to use std::shared_ptr here as the best solution but that requires C++11...
244  zmq::socket_poller_t *poller_all =
245  new (std::nothrow) zmq::socket_poller_t; // Poll for everything.
246  zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::
247  socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop.
248  zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow)
249  zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'.
250 
251  // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored.
252  // In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'.
253  // We also don't need 'poller_both_blocked', 'poller_backend_only' nor 'poller_frontend_only' no need to initialize it.
254  // We save some RAM and time for initialization.
255  zmq::socket_poller_t *poller_send_blocked =
256  NULL; // All except 'ZMQ_POLLIN' on 'backend_'.
257  zmq::socket_poller_t *poller_both_blocked =
258  NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
259  zmq::socket_poller_t *poller_frontend_only =
260  NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
261  zmq::socket_poller_t *poller_backend_only =
262  NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
263 
264  if (frontend_ != backend_) {
265  poller_send_blocked = new (std::nothrow)
266  zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'.
267  poller_both_blocked = new (std::nothrow) zmq::
268  socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
269  poller_frontend_only = new (std::nothrow) zmq::
270  socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
271  poller_backend_only = new (std::nothrow) zmq::
272  socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
273  frontend_equal_to_backend = false;
274  } else
275  frontend_equal_to_backend = true;
276 
277  if (poller_all == NULL || poller_in == NULL
278  || poller_receive_blocked == NULL
279  || ((poller_send_blocked == NULL || poller_both_blocked == NULL)
280  && !frontend_equal_to_backend)) {
281  PROXY_CLEANUP ();
282  return close_and_return (&msg, -1);
283  }
284 
285  zmq::socket_poller_t *poller_wait =
286  poller_in; // Poller for blocking wait, initially all 'ZMQ_POLLIN'.
287 
288  // Register 'frontend_' and 'backend_' with pollers.
289  rc = poller_all->add (frontend_, NULL,
290  ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
292  rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
294 
295  if (frontend_equal_to_backend) {
296  // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same,
297  // so we don't need 'poller_send_blocked'. We need only 'poller_receive_blocked'.
298  // We also don't need 'poller_both_blocked', no need to initialize it.
299  rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT);
301  } else {
302  rc = poller_all->add (backend_, NULL,
303  ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
305  rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
307  rc = poller_both_blocked->add (
308  frontend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
310  rc = poller_both_blocked->add (
311  backend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
313  rc = poller_send_blocked->add (
314  backend_, NULL,
315  ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
317  rc = poller_send_blocked->add (
318  frontend_, NULL,
319  ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
321  rc = poller_receive_blocked->add (
322  frontend_, NULL,
323  ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
325  rc = poller_receive_blocked->add (
326  backend_, NULL,
327  ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
329  rc =
330  poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
332  rc =
333  poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
335  }
336 
337  if (control_) {
338  ++nevents;
339 
340  // wherever you go, there you are.
341 
342  rc = poller_all->add (control_, NULL, ZMQ_POLLIN);
344 
345  rc = poller_in->add (control_, NULL, ZMQ_POLLIN);
347 
348  rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN);
350 
351  rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN);
353 
354  rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN);
356 
357  rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN);
359 
360  rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN);
362  }
363 
364  bool request_processed = false, reply_processed = false;
365 
366  while (state != terminated) {
367  // Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
368  // If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
369  // 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'.
370  rc = poller_wait->wait (events, nevents, -1);
371  if (rc < 0 && errno == EAGAIN)
372  rc = 0;
374 
375  // Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking.
376  rc = poller_all->wait (events, nevents, 0);
377  if (rc < 0 && errno == EAGAIN)
378  rc = 0;
380 
381  // Process events.
382  for (int i = 0; i < rc; i++) {
383  if (control_ && events[i].socket == control_) {
384  rc = handle_control (control_, state, stats);
386  continue;
387  }
388 
389  if (events[i].socket == frontend_) {
390  frontend_in = (events[i].events & ZMQ_POLLIN) != 0;
391  frontend_out = (events[i].events & ZMQ_POLLOUT) != 0;
392  } else
393  // This 'if' needs to be after check for 'frontend_' in order never
394  // to be reached in case frontend_==backend_, so we ensure backend_in=false in that case.
395  if (events[i].socket == backend_) {
396  backend_in = (events[i].events & ZMQ_POLLIN) != 0;
397  backend_out = (events[i].events & ZMQ_POLLOUT) != 0;
398  }
399  }
400 
401  if (state == active) {
402  // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
403  // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
404  if (frontend_in && (backend_out || frontend_equal_to_backend)) {
405  rc = forward (frontend_, backend_, capture_, &msg,
406  stats.frontend.recv, stats.backend.send);
408  request_processed = true;
409  frontend_in = backend_out = false;
410  } else
411  request_processed = false;
412 
413  // Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'.
414  // If 'frontend_' and 'backend_' are the same this is not needed because previous processing
415  // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
416  // design in 'for' event processing loop.
417  if (backend_in && frontend_out) {
418  rc = forward (backend_, frontend_, capture_, &msg,
419  stats.backend.recv, stats.frontend.send);
421  reply_processed = true;
422  backend_in = frontend_out = false;
423  } else
424  reply_processed = false;
425 
426  if (request_processed || reply_processed) {
427  // If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event.
428  // Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled.
429  if (poller_wait != poller_in) {
430  if (request_processed) { // 'frontend_' -> 'backend_'
431  if (poller_wait == poller_both_blocked)
432  poller_wait = poller_send_blocked;
433  else if (poller_wait == poller_receive_blocked
434  || poller_wait == poller_frontend_only)
435  poller_wait = poller_in;
436  }
437  if (reply_processed) { // 'backend_' -> 'frontend_'
438  if (poller_wait == poller_both_blocked)
439  poller_wait = poller_receive_blocked;
440  else if (poller_wait == poller_send_blocked
441  || poller_wait == poller_backend_only)
442  poller_wait = poller_in;
443  }
444  }
445  } else {
446  // No requests have been processed, there were no 'ZMQ_POLLIN' with corresponding 'ZMQ_POLLOUT' events.
447  // That means that out queue(s) is/are full or one out queue is full and second one has no messages to process.
448  // Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT',
449  // or wait only on both 'backend_''s or 'frontend_''s 'ZMQ_POLLIN' and 'ZMQ_POLLOUT'.
450  if (frontend_in) {
451  if (frontend_out)
452  // If frontend_in and frontend_out are true, obviously backend_in and backend_out are both false.
453  // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'backend_'.
454  // We'll never get here in case of frontend_==backend_ because then frontend_out will always be false.
455  poller_wait = poller_backend_only;
456  else {
457  if (poller_wait == poller_send_blocked)
458  poller_wait = poller_both_blocked;
459  else if (poller_wait == poller_in)
460  poller_wait = poller_receive_blocked;
461  }
462  }
463  if (backend_in) {
464  // Will never be reached if frontend_==backend_, 'backend_in' will
465  // always be false due to design in 'for' event processing loop.
466  if (backend_out)
467  // If backend_in and backend_out are true, obviously frontend_in and frontend_out are both false.
468  // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'frontend_'.
469  poller_wait = poller_frontend_only;
470  else {
471  if (poller_wait == poller_receive_blocked)
472  poller_wait = poller_both_blocked;
473  else if (poller_wait == poller_in)
474  poller_wait = poller_send_blocked;
475  }
476  }
477  }
478  }
479  }
480  PROXY_CLEANUP ();
481  return close_and_return (&msg, 0);
482 }
483 
484 #else // ZMQ_HAVE_POLLER
485 
486 int zmq::proxy_steerable (class socket_base_t *frontend_,
487  class socket_base_t *backend_,
488  class socket_base_t *capture_,
489  class socket_base_t *control_)
490 {
491  msg_t msg;
492  int rc = msg.init ();
493  if (rc != 0)
494  return -1;
495 
496  // The algorithm below assumes ratio of requests and replies processed
497  // under full load to be 1:1.
498 
499  zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0},
500  {backend_, 0, ZMQ_POLLIN, 0},
501  {control_, 0, ZMQ_POLLIN, 0}};
502  const int qt_poll_items = control_ ? 3 : 2;
503 
504  zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0},
505  {backend_, 0, ZMQ_POLLOUT, 0}};
506 
507  stats_proxy stats = {0};
508 
509  // Proxy can be in these three states
510  proxy_state_t state = active;
511 
512  while (state != terminated) {
513  // Wait while there are either requests or replies to process.
514  rc = zmq_poll (&items[0], qt_poll_items, -1);
515  if (unlikely (rc < 0))
516  return close_and_return (&msg, -1);
517 
518  if (control_ && items[2].revents & ZMQ_POLLIN) {
519  rc = handle_control (control_, state, stats);
520  if (unlikely (rc < 0))
521  return close_and_return (&msg, -1);
522  }
523 
524  // Get the pollout separately because when combining this with pollin it maxes the CPU
525  // because pollout shall most of the time return directly.
526  // POLLOUT is only checked when frontend and backend sockets are not the same.
527  if (frontend_ != backend_) {
528  rc = zmq_poll (&itemsout[0], 2, 0);
529  if (unlikely (rc < 0)) {
530  return close_and_return (&msg, -1);
531  }
532  }
533 
534  if (state == active && items[0].revents & ZMQ_POLLIN
535  && (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) {
536  rc = forward (frontend_, backend_, capture_, &msg,
537  stats.frontend.recv, stats.backend.send);
538  if (unlikely (rc < 0))
539  return close_and_return (&msg, -1);
540  }
541  // Process a reply
542  if (state == active && frontend_ != backend_
543  && items[1].revents & ZMQ_POLLIN
544  && itemsout[0].revents & ZMQ_POLLOUT) {
545  rc = forward (backend_, frontend_, capture_, &msg,
546  stats.backend.recv, stats.frontend.send);
547  if (unlikely (rc < 0))
548  return close_and_return (&msg, -1);
549  }
550  }
551 
552  return close_and_return (&msg, 0);
553 }
554 
555 #endif // ZMQ_HAVE_POLLER
CHECK_RC_EXIT_ON_FAILURE
#define CHECK_RC_EXIT_ON_FAILURE()
Definition: proxy.cpp:47
forward
static int forward(class zmq::socket_base_t *from_, class zmq::socket_base_t *to_, class zmq::socket_base_t *capture_, zmq::msg_t *msg_, stats_socket &recving, stats_socket &sending)
Definition: proxy.cpp:89
zmq::proxy
void proxy(void *frontend, void *backend, void *capture)
Definition: zmq.hpp:2274
zmq::socket_poller_t
Definition: socket_poller.hpp:30
NULL
NULL
Definition: test_security_zap.cpp:405
zmq_poll
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
Definition: zmq.cpp:827
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
command
ROSLIB_DECL std::string command(const std::string &cmd)
ZMQ_TYPE
#define ZMQ_TYPE
Definition: zmq.h:287
active
@ active
Definition: proxy.cpp:141
zmq::msg_t::copy
int copy(msg_t &src_)
Definition: msg.cpp:326
precompiled.hpp
stats_endpoint::send
stats_socket send
Definition: proxy.cpp:82
zmq::socket_poller_t::add
int add(socket_base_t *socket_, void *user_data_, short events_)
Definition: socket_poller.cpp:85
zmq::socket_base_t::getsockopt
int getsockopt(int option_, void *optval_, size_t *optvallen_)
Definition: socket_base.cpp:426
errno
int errno
zmq_pollitem_t
Definition: zmq.h:487
zmq::socket_base_t
Definition: socket_base.hpp:31
stats_endpoint
Definition: proxy.cpp:80
terminated
@ terminated
Definition: proxy.cpp:143
zmq::msg_t::init_size
int init_size(size_t size_)
Definition: msg.cpp:62
stats_proxy
Definition: proxy.cpp:84
ZMQ_POLLIN
#define ZMQ_POLLIN
Definition: zmq.h:482
zmq_poller_event_t::events
short events
Definition: zmq_draft.h:119
paused
@ paused
Definition: proxy.cpp:142
zmq
Definition: zmq.hpp:229
ZMQ_POLLOUT
#define ZMQ_POLLOUT
Definition: zmq.h:483
socket_poller.hpp
stats_proxy::backend
stats_endpoint backend
Definition: proxy.cpp:86
likely
#define likely(x)
Definition: likely.hpp:10
zmq::socket_poller_t::wait
int wait(event_t *events_, int n_events_, long timeout_)
Definition: socket_poller.cpp:497
poller.hpp
zmq_poller_event_t
Definition: zmq_draft.h:114
ZMQ_REP
#define ZMQ_REP
Definition: zmq.h:262
stats_socket::bytes
uint64_t bytes
Definition: proxy.cpp:78
stats_socket::count
uint64_t count
Definition: proxy.cpp:78
zmq::socket_base_t::send
int send(zmq::msg_t *msg_, int flags_)
Definition: socket_base.cpp:1205
zmq::close_and_return
int close_and_return(zmq::msg_t *msg_, int echo_)
Definition: msg.hpp:300
ZMQ_DONTWAIT
#define ZMQ_DONTWAIT
Definition: zmq.h:358
zmq::proxy_steerable
void proxy_steerable(void *frontend, void *backend, void *capture, void *control)
Definition: zmq.hpp:2292
zmq::msg_t::init
int init()
Definition: msg.cpp:50
i
int i
Definition: gmock-matchers_test.cc:764
msg.hpp
stats_proxy::frontend
stats_endpoint frontend
Definition: proxy.cpp:86
type
GLenum type
Definition: glcorearb.h:2695
proxy_state_t
proxy_state_t
Definition: proxy.cpp:139
socket_base.hpp
err.hpp
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
likely.hpp
capture
static int capture(class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_=0)
Definition: proxy.cpp:58
handle_control
static int handle_control(class zmq::socket_base_t *control_, proxy_state_t &state, const stats_proxy &stats)
Definition: proxy.cpp:148
stats_socket
Definition: proxy.cpp:76
zmq::socket_base_t::recv
int recv(zmq::msg_t *msg_, int flags_)
Definition: socket_base.cpp:1293
stats_endpoint::recv
stats_socket recv
Definition: proxy.cpp:82
zmq::msg_t::size
unsigned char size
Definition: msg.hpp:240
zmq::msg_t::data
unsigned char data[max_vsm_size]
Definition: msg.hpp:239
proxy.hpp
ZMQ_RCVMORE
#define ZMQ_RCVMORE
Definition: zmq.h:284
zmq::proxy_burst_size
@ proxy_burst_size
Definition: config.hpp:37
PROXY_CLEANUP
#define PROXY_CLEANUP()
Definition: proxy.cpp:35
zmq_getsockopt
ZMQ_EXPORT int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:261
zmq::msg_t
Definition: msg.hpp:33
unlikely
#define unlikely(x)
Definition: likely.hpp:11


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58