11 #if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS \
12 && !defined ZMQ_HAVE_AIX
28 #ifdef ZMQ_HAVE_POLLER
35 #define PROXY_CLEANUP() \
39 delete poller_receive_blocked; \
40 delete poller_send_blocked; \
41 delete poller_both_blocked; \
42 delete poller_frontend_only; \
43 delete poller_backend_only; \
47 #define CHECK_RC_EXIT_ON_FAILURE() \
51 return close_and_return (&msg, -1); \
55 #endif // ZMQ_HAVE_POLLER
63 int rc = ctrl.
init ();
66 rc = ctrl.
copy (*msg_);
111 size_t nbytes = msg_->
size ();
113 recving.
bytes += nbytes;
115 moresz =
sizeof more;
121 rc =
capture (capture_, msg_, more);
129 sending.
bytes += nbytes;
153 int rc = cmsg.
init ();
161 uint8_t *
const command =
static_cast<uint8_t *
> (cmsg.
data ());
162 const size_t msiz = cmsg.
size ();
164 if (msiz == 10 && 0 == memcmp (
command,
"STATISTICS", 10)) {
175 const uint64_t stat_vals[8] = {
181 for (
size_t ind = 0; ind < 8; ++ind) {
183 memcpy (cmsg.
data (), stat_vals + ind, sizeof (uint64_t));
192 if (msiz == 5 && 0 == memcmp (
command,
"PAUSE", 5)) {
194 }
else if (msiz == 6 && 0 == memcmp (
command,
"RESUME", 6)) {
196 }
else if (msiz == 9 && 0 == memcmp (
command,
"TERMINATE", 9)) {
201 size_t sz =
sizeof (
type);
206 rc = control_->
send (&cmsg, 0);
214 #ifdef ZMQ_HAVE_POLLER
221 int rc = msg.
init ();
231 bool frontend_equal_to_backend;
232 bool frontend_in =
false;
233 bool frontend_out =
false;
234 bool backend_in =
false;
235 bool backend_out =
false;
239 stats_proxy stats = {{{0, 0}, {0, 0}}, {{0, 0}, {0, 0}}};
264 if (frontend_ != backend_) {
265 poller_send_blocked =
new (std::nothrow)
267 poller_both_blocked =
new (std::nothrow)
zmq::
269 poller_frontend_only =
new (std::nothrow)
zmq::
271 poller_backend_only =
new (std::nothrow)
zmq::
273 frontend_equal_to_backend =
false;
275 frontend_equal_to_backend =
true;
277 if (poller_all ==
NULL || poller_in ==
NULL
278 || poller_receive_blocked ==
NULL
279 || ((poller_send_blocked ==
NULL || poller_both_blocked ==
NULL)
280 && !frontend_equal_to_backend)) {
289 rc = poller_all->
add (frontend_,
NULL,
295 if (frontend_equal_to_backend) {
302 rc = poller_all->
add (backend_,
NULL,
307 rc = poller_both_blocked->
add (
310 rc = poller_both_blocked->
add (
313 rc = poller_send_blocked->
add (
317 rc = poller_send_blocked->
add (
321 rc = poller_receive_blocked->
add (
325 rc = poller_receive_blocked->
add (
364 bool request_processed =
false, reply_processed =
false;
370 rc = poller_wait->
wait (events, nevents, -1);
376 rc = poller_all->
wait (events, nevents, 0);
382 for (
int i = 0;
i < rc;
i++) {
383 if (control_ && events[
i].socket == control_) {
389 if (events[
i].socket == frontend_) {
395 if (events[
i].socket == backend_) {
404 if (frontend_in && (backend_out || frontend_equal_to_backend)) {
405 rc =
forward (frontend_, backend_, capture_, &msg,
408 request_processed =
true;
409 frontend_in = backend_out =
false;
411 request_processed =
false;
417 if (backend_in && frontend_out) {
418 rc =
forward (backend_, frontend_, capture_, &msg,
421 reply_processed =
true;
422 backend_in = frontend_out =
false;
424 reply_processed =
false;
426 if (request_processed || reply_processed) {
429 if (poller_wait != poller_in) {
430 if (request_processed) {
431 if (poller_wait == poller_both_blocked)
432 poller_wait = poller_send_blocked;
433 else if (poller_wait == poller_receive_blocked
434 || poller_wait == poller_frontend_only)
435 poller_wait = poller_in;
437 if (reply_processed) {
438 if (poller_wait == poller_both_blocked)
439 poller_wait = poller_receive_blocked;
440 else if (poller_wait == poller_send_blocked
441 || poller_wait == poller_backend_only)
442 poller_wait = poller_in;
455 poller_wait = poller_backend_only;
457 if (poller_wait == poller_send_blocked)
458 poller_wait = poller_both_blocked;
459 else if (poller_wait == poller_in)
460 poller_wait = poller_receive_blocked;
469 poller_wait = poller_frontend_only;
471 if (poller_wait == poller_receive_blocked)
472 poller_wait = poller_both_blocked;
473 else if (poller_wait == poller_in)
474 poller_wait = poller_send_blocked;
484 #else // ZMQ_HAVE_POLLER
487 class socket_base_t *backend_,
488 class socket_base_t *capture_,
489 class socket_base_t *control_)
492 int rc = msg.init ();
502 const int qt_poll_items = control_ ? 3 : 2;
514 rc =
zmq_poll (&items[0], qt_poll_items, -1);
518 if (control_ && items[2].revents &
ZMQ_POLLIN) {
527 if (frontend_ != backend_) {
535 && (frontend_ == backend_ || itemsout[1].revents &
ZMQ_POLLOUT)) {
536 rc =
forward (frontend_, backend_, capture_, &msg,
542 if (state ==
active && frontend_ != backend_
545 rc =
forward (backend_, frontend_, capture_, &msg,
555 #endif // ZMQ_HAVE_POLLER