18 template <
class It,
class T,
class Pred>
21 for (;
b_ != e_; ++
b_) {
32 #
if defined ZMQ_POLL_BASED_ON_POLL
35 #elif defined ZMQ_POLL_BASED_ON_SELECT
48 for (items_t::iterator
it = _items.begin (),
end = _items.end ();
it !=
end;
51 if (
it->socket &&
it->socket->check_tag ()
53 it->socket->remove_signaler (_signaler);
57 if (_signaler !=
NULL) {
61 #if defined ZMQ_POLL_BASED_ON_POLL
71 return _tag == 0xCAFEBABE;
77 *fd_ = _signaler->get_fd ();
89 if (
find_if2 (_items.begin (), _items.end (), socket_, &is_socket)
96 if (_signaler ==
NULL) {
102 if (!_signaler->valid ()) {
118 #if defined ZMQ_POLL_BASED_ON_POLL
124 _items.push_back (
item);
126 catch (
const std::bad_alloc &) {
130 _need_rebuild =
true;
137 if (
find_if2 (_items.begin (), _items.end (), fd_, &is_fd)
148 #if defined ZMQ_POLL_BASED_ON_POLL
154 _items.push_back (
item);
156 catch (
const std::bad_alloc &) {
160 _need_rebuild =
true;
167 const items_t::iterator
it =
168 find_if2 (_items.begin (), _items.end (), socket_, &is_socket);
170 if (
it == _items.end ()) {
175 it->events = events_;
176 _need_rebuild =
true;
184 const items_t::iterator
it =
185 find_if2 (_items.begin (), _items.end (), fd_, &is_fd);
187 if (
it == _items.end ()) {
192 it->events = events_;
193 _need_rebuild =
true;
201 const items_t::iterator
it =
202 find_if2 (_items.begin (), _items.end (), socket_, &is_socket);
204 if (
it == _items.end ()) {
210 _need_rebuild =
true;
221 const items_t::iterator
it =
222 find_if2 (_items.begin (), _items.end (), fd_, &is_fd);
224 if (
it == _items.end ()) {
230 _need_rebuild =
true;
237 _use_signaler =
false;
239 _need_rebuild =
false;
241 #if defined ZMQ_POLL_BASED_ON_POLL
248 for (items_t::iterator
it = _items.begin (),
end = _items.end ();
it !=
end;
252 if (!_use_signaler) {
253 _use_signaler =
true;
261 if (_pollset_size == 0)
264 _pollfds =
static_cast<pollfd *
> (malloc (_pollset_size *
sizeof (pollfd)));
268 _need_rebuild =
true;
276 _pollfds[0].fd = _signaler->get_fd ();
277 _pollfds[0].events = POLLIN;
280 for (items_t::iterator
it = _items.begin (),
end = _items.end ();
it !=
end;
286 const int rc =
it->socket->getsockopt (
287 ZMQ_FD, &_pollfds[item_nbr].fd, &fd_size);
290 _pollfds[item_nbr].events = POLLIN;
294 _pollfds[item_nbr].fd =
it->fd;
295 _pollfds[item_nbr].events =
299 it->pollfd_index = item_nbr;
305 #elif defined ZMQ_POLL_BASED_ON_SELECT
311 _pollset_in.resize (_items.size ());
312 _pollset_out.resize (_items.size ());
313 _pollset_err.resize (_items.size ());
315 FD_ZERO (_pollset_in.get ());
316 FD_ZERO (_pollset_out.get ());
317 FD_ZERO (_pollset_err.get ());
319 for (items_t::iterator
it = _items.begin (),
end = _items.end ();
it !=
end;
322 _use_signaler =
true;
323 FD_SET (_signaler->get_fd (), _pollset_in.get ());
332 for (items_t::iterator
it = _items.begin (),
end = _items.end ();
it !=
end;
342 it->socket->getsockopt (
ZMQ_FD, ¬ify_fd, &fd_size);
345 FD_SET (notify_fd, _pollset_in.get ());
346 if (_max_fd < notify_fd)
356 FD_SET (
it->fd, _pollset_in.get ());
358 FD_SET (
it->fd, _pollset_out.get ());
360 FD_SET (
it->fd, _pollset_err.get ());
361 if (_max_fd < it->fd)
377 for (
int i = found_;
i < n_events_; ++
i) {
385 #if defined ZMQ_POLL_BASED_ON_POLL
388 #elif defined ZMQ_POLL_BASED_ON_SELECT
397 for (items_t::iterator
it = _items.begin (),
end = _items.end ();
402 size_t events_size =
sizeof (uint32_t);
404 if (
it->socket->getsockopt (
ZMQ_EVENTS, &events, &events_size)
409 if (
it->events & events) {
419 else if (
it->events) {
420 #if defined ZMQ_POLL_BASED_ON_POLL
422 const short revents = _pollfds[
it->pollfd_index].revents;
425 if (revents & POLLIN)
427 if (revents & POLLOUT)
429 if (revents & POLLPRI)
431 if (revents & ~(POLLIN | POLLOUT | POLLPRI))
434 #elif defined ZMQ_POLL_BASED_ON_SELECT
438 if (FD_ISSET (
it->fd, &inset_))
440 if (FD_ISSET (
it->fd, &outset_))
442 if (FD_ISSET (
it->fd, &errset_))
485 end_ = now_ + timeout_;
501 if (_items.empty () && timeout_ < 0) {
507 const int rc = rebuild ();
512 if (
unlikely (_pollset_size == 0)) {
525 #if defined ZMQ_HAVE_WINDOWS
526 Sleep (timeout_ > 0 ? timeout_ : INFINITE);
528 #elif defined ZMQ_HAVE_ANDROID
529 usleep (timeout_ * 1000);
531 #elif defined ZMQ_HAVE_OSX
532 usleep (timeout_ * 1000);
535 #elif defined ZMQ_HAVE_VXWORKS
537 ns_.tv_sec = timeout_ / 1000;
538 ns_.tv_nsec = timeout_ % 1000 * 1000000;
542 usleep (timeout_ * 1000);
547 #if defined ZMQ_POLL_BASED_ON_POLL
552 bool first_pass =
true;
559 else if (timeout_ < 0)
563 static_cast<int> (std::min<uint64_t> (
end - now, INT_MAX));
566 const int rc =
poll (_pollfds, _pollset_size,
timeout);
573 if (_use_signaler && _pollfds[0].revents & POLLIN)
580 zero_trail_events (events_, n_events_,
found);
585 if (adjust_timeout (clock, timeout_, now,
end, first_pass) == 0)
591 #elif defined ZMQ_POLL_BASED_ON_SELECT
597 bool first_pass =
true;
599 optimized_fd_set_t inset (_pollset_size);
600 optimized_fd_set_t outset (_pollset_size);
601 optimized_fd_set_t errset (_pollset_size);
611 }
else if (timeout_ < 0)
614 timeout.tv_sec =
static_cast<long> ((
end - now) / 1000);
615 timeout.tv_usec =
static_cast<long> ((
end - now) % 1000 * 1000);
620 memcpy (inset.get (), _pollset_in.get (),
621 valid_pollset_bytes (*_pollset_in.get ()));
622 memcpy (outset.get (), _pollset_out.get (),
623 valid_pollset_bytes (*_pollset_out.get ()));
624 memcpy (errset.get (), _pollset_err.get (),
625 valid_pollset_bytes (*_pollset_err.get ()));
626 const int rc = select (
static_cast<int> (_max_fd + 1), inset.get (),
627 outset.get (), errset.get (), ptimeout);
628 #if defined ZMQ_HAVE_WINDOWS
629 if (
unlikely (rc == SOCKET_ERROR)) {
630 errno = wsa_error_to_errno (WSAGetLastError ());
641 if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ()))
646 *outset.get (), *errset.get ());
649 zero_trail_events (events_, n_events_,
found);
654 if (adjust_timeout (clock, timeout_, now,
end, first_pass) == 0)