5 #if defined ZMQ_IOTHREAD_POLLER_USE_SELECT
7 #if defined ZMQ_HAVE_WINDOWS
8 #elif defined ZMQ_HAVE_HPUX
10 #include <sys/types.h>
12 #elif defined ZMQ_HAVE_OPENVMS
13 #include <sys/types.h>
15 #elif defined ZMQ_HAVE_VXWORKS
16 #include <sys/types.h>
20 #include <sys/select.h>
//  Fragment of a constructor: part of its member-initializer list followed
//  by a Windows-only loop. The signature line is not included in this view.
//  worker_poller_base_t is initialised from ctx_.
32 worker_poller_base_t (ctx_),
//  _current_family_entry_it starts out equal to _family_entries.end ().
35 _current_family_entry_it (_family_entries.
end ())
40 #if defined ZMQ_HAVE_WINDOWS
//  Windows only: set each of the fd_family_cache_size slots of
//  _fd_family_cache to the pair (retired_fd, 0).
41 for (
size_t i = 0;
i < fd_family_cache_size; ++
i)
42 _fd_family_cache[
i] = std::make_pair (
retired_fd, 0);
//  Destructor of select_t. Only the signature is present in this view; the
//  body is not shown.
46 zmq::select_t::~select_t ()
//  Adds fd_ to the poller and associates it with the events_ sink.
//  The visible lines store events_ in a fd_entry, append that entry to the
//  fd_entries of a family entry and add fd_ to that entry's error fd_set.
//  The declaration of fd_entry and the returned handle are not shown here.
51 zmq::select_t::handle_t zmq::select_t::add_fd (
fd_t fd_, i_poll_events *events_)
58 fd_entry.events = events_;
60 #if defined ZMQ_HAVE_WINDOWS
//  Windows: the family entry is looked up in _family_entries by the socket's
//  address family; wsa_assert checks that the family is not AF_UNSPEC.
61 u_short family = get_fd_family (fd_);
62 wsa_assert (family != AF_UNSPEC);
63 family_entry_t &family_entry = _family_entries[family];
//  Presumably the non-Windows alternative (the #else line is not part of
//  this view): the single _family_entry member is used.
65 family_entry_t &family_entry = _family_entry;
67 family_entry.fd_entries.push_back (fd_entry);
68 FD_SET (fd_, &family_entry.fds_set.error);
//  Start of a non-Windows-only section whose contents are not shown.
70 #if !defined ZMQ_HAVE_WINDOWS
//  Scans fd_entries_ from begin () towards end () looking for the entry
//  whose fd equals handle_. The remaining parameter(s) and the return
//  statement are not part of this view; callers in this file compare the
//  result against fd_entries.end () to detect "not found".
80 zmq::select_t::fd_entries_t::iterator
81 zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries_,
84 fd_entries_t::iterator fd_entry_it;
85 for (fd_entry_it = fd_entries_.begin (); fd_entry_it != fd_entries_.end ();
87 if (fd_entry_it->fd == handle_)
//  Dispatches events for the entries in fd_entries_ according to the fd sets
//  in local_fds_set_. For each entry the read, write and error sets are
//  tested in that order; read and error membership call in_event (), write
//  membership calls out_event (). The statements that adjust event_count_
//  and those guarded by the is_retired_fd checks are not part of this view.
93 void zmq::select_t::trigger_events (
const fd_entries_t &fd_entries_,
94 const fds_set_t &local_fds_set_,
//  The size is captured once before iterating; the loop also stops as soon
//  as event_count_ is no longer positive.
98 for (fd_entries_t::size_type
i = 0,
size = fd_entries_.size ();
99 i < size && event_count_ > 0; ++
i) {
//  Retired entries are checked for before any set is examined.
103 if (is_retired_fd (fd_entries_[
i]))
//  Readable: notify the sink via in_event ().
106 if (FD_ISSET (fd_entries_[
i].fd, &local_fds_set_.read)) {
107 fd_entries_[
i].events->in_event ();
//  Re-check after the callback: the entry may have been retired, or no
//  events may be left to deliver.
115 if (is_retired_fd (fd_entries_[
i]) || event_count_ == 0)
//  Writable: notify the sink via out_event ().
118 if (FD_ISSET (fd_entries_[
i].fd, &local_fds_set_.write)) {
119 fd_entries_[
i].events->out_event ();
//  Same re-check before looking at the error set.
124 if (is_retired_fd (fd_entries_[
i]) || event_count_ == 0)
//  Error condition: reported through in_event () as well.
127 if (FD_ISSET (fd_entries_[
i].fd, &local_fds_set_.error)) {
128 fd_entries_[
i].events->in_event ();
//  Windows only. Tries to retire the entry for handle_ inside the family
//  entry referenced by family_entry_it_. The return statements are not part
//  of this view; rm_fd adds the int result to its 'retired' counter.
134 #if defined ZMQ_HAVE_WINDOWS
135 int zmq::select_t::try_retire_fd_entry (
136 family_entries_t::iterator family_entry_it_,
zmq::fd_t &handle_)
138 family_entry_t &family_entry = family_entry_it_->second;
140 fd_entries_t::iterator fd_entry_it =
141 find_fd_entry_by_handle (family_entry.fd_entries, handle_);
//  Handle not present in this family entry (the guarded statement is not
//  shown).
143 if (fd_entry_it == family_entry.fd_entries.end ())
146 fd_entry_t &fd_entry = *fd_entry_it;
//  If this family entry is not the one referenced by
//  _current_family_entry_it, the fd entry is erased from the vector directly.
149 if (family_entry_it_ != _current_family_entry_it) {
153 family_entry.fd_entries.erase (fd_entry_it);
//  Presumably the alternative branch (its opening line is not shown): only
//  flag the family entry so that cleanup_retired removes the entry later.
158 family_entry.has_retired =
true;
//  Drop the handle from the fd sets of this family entry.
160 family_entry.fds_set.remove_fd (handle_);
//  Removes handle_ from the poller. Several lines of this function
//  (including the declaration of 'retired' and the #else/#endif lines) are
//  not part of this view.
165 void zmq::select_t::rm_fd (handle_t handle_)
169 #if defined ZMQ_HAVE_WINDOWS
//  Windows: when the address family of the handle can be determined, look up
//  the family entry for it directly.
170 u_short family = get_fd_family (handle_);
171 if (family != AF_UNSPEC) {
172 family_entries_t::iterator family_entry_it =
173 _family_entries.find (family);
175 retired += try_retire_fd_entry (family_entry_it, handle_);
//  Walk over every family entry and try to retire the handle in each one.
//  NOTE(review): presumably the fallback for family == AF_UNSPEC; the
//  branching lines are not shown.
180 family_entries_t::iterator
end = _family_entries.end ();
181 for (family_entries_t::iterator family_entry_it =
182 _family_entries.begin ();
183 family_entry_it !=
end; ++family_entry_it) {
//  The condition tests the value of 'retired' after the addition, not only
//  the result of this particular call.
184 if (retired += try_retire_fd_entry (family_entry_it, handle_)) {
//  Presumably the non-Windows path: the handle must be present in the single
//  _family_entry, which is enforced with assert.
190 fd_entries_t::iterator fd_entry_it =
191 find_fd_entry_by_handle (_family_entry.fd_entries, handle_);
192 assert (fd_entry_it != _family_entry.fd_entries.end ());
196 _family_entry.fds_set.remove_fd (handle_);
//  The removed handle was the highest descriptor: the visible loop raises
//  _max_fd to the fd of any entry that exceeds it (lines between the 'if'
//  and the loop are not shown).
200 if (handle_ == _max_fd) {
202 for (fd_entry_it = _family_entry.fd_entries.begin ();
203 fd_entry_it != _family_entry.fd_entries.end (); ++fd_entry_it)
204 if (fd_entry_it->fd > _max_fd)
205 _max_fd = fd_entry_it->fd;
//  Flag the family entry so that cleanup_retired erases retired entries.
208 _family_entry.has_retired =
true;
//  Adds handle_ to the read fd_set of its family entry.
214 void zmq::select_t::set_pollin (handle_t handle_)
217 #if defined ZMQ_HAVE_WINDOWS
//  Windows: the family entry is selected by the socket's address family,
//  which wsa_assert requires to differ from AF_UNSPEC.
218 u_short family = get_fd_family (handle_);
219 wsa_assert (family != AF_UNSPEC);
220 family_entry_t &family_entry = _family_entries[family];
//  Presumably the non-Windows alternative (the #else line is not shown).
222 family_entry_t &family_entry = _family_entry;
224 FD_SET (handle_, &family_entry.fds_set.read);
//  Removes handle_ from the read fd_set of its family entry.
227 void zmq::select_t::reset_pollin (handle_t handle_)
230 #if defined ZMQ_HAVE_WINDOWS
//  Windows: the family entry is selected by the socket's address family,
//  which wsa_assert requires to differ from AF_UNSPEC.
231 u_short family = get_fd_family (handle_);
232 wsa_assert (family != AF_UNSPEC);
233 family_entry_t &family_entry = _family_entries[family];
//  Presumably the non-Windows alternative (the #else line is not shown).
235 family_entry_t &family_entry = _family_entry;
237 FD_CLR (handle_, &family_entry.fds_set.read);
//  Adds handle_ to the write fd_set of its family entry.
240 void zmq::select_t::set_pollout (handle_t handle_)
243 #if defined ZMQ_HAVE_WINDOWS
//  Windows: the family entry is selected by the socket's address family,
//  which wsa_assert requires to differ from AF_UNSPEC.
244 u_short family = get_fd_family (handle_);
245 wsa_assert (family != AF_UNSPEC);
246 family_entry_t &family_entry = _family_entries[family];
//  Presumably the non-Windows alternative (the #else line is not shown).
248 family_entry_t &family_entry = _family_entry;
250 FD_SET (handle_, &family_entry.fds_set.write);
//  Removes handle_ from the write fd_set of its family entry.
253 void zmq::select_t::reset_pollout (handle_t handle_)
256 #if defined ZMQ_HAVE_WINDOWS
//  Windows: the family entry is selected by the socket's address family,
//  which wsa_assert requires to differ from AF_UNSPEC.
257 u_short family = get_fd_family (handle_);
258 wsa_assert (family != AF_UNSPEC);
259 family_entry_t &family_entry = _family_entries[family];
//  Presumably the non-Windows alternative (the #else line is not shown).
261 family_entry_t &family_entry = _family_entry;
263 FD_CLR (handle_, &family_entry.fds_set.write);
//  select_t::stop: only the signature is present in this view; the body is
//  not shown.
266 void zmq::select_t::stop ()
//  select_t::max_fds: returns an int; only the signature is present in this
//  view, so the returned value is not shown.
272 int zmq::select_t::max_fds ()
//  Poller loop body: runs the timers, builds a select timeout from the
//  result and calls select_family_entry for the registered descriptors.
//  Many lines (the enclosing loop, #else/#endif lines, early exits) are not
//  part of this view.
277 void zmq::select_t::loop ()
//  execute_timers () yields the timeout used for the wait below.
281 int timeout =
static_cast<int> (execute_timers ());
//  Checks for "nothing registered": _family_entries (presumably Windows) or
//  _family_entry.fd_entries (presumably elsewhere). The guarded statements
//  are not shown.
286 if (_family_entries.empty ()) {
288 if (_family_entry.fd_entries.empty ()) {
299 #if defined ZMQ_HAVE_OSX
//  timeout is split into whole seconds (timeout / 1000) and microseconds
//  (timeout % 1000 * 1000), i.e. it is treated as milliseconds.
302 struct timeval tv = {
static_cast<long> (
timeout / 1000),
303 static_cast<long> (
timeout % 1000 * 1000)};
306 #if defined ZMQ_HAVE_WINDOWS
//  With more than one family entry the wait is performed on WSA events
//  instead of in a single select call.
324 const bool use_wsa_events = _family_entries.size () > 1;
325 if (use_wsa_events) {
333 wsa_events_t wsa_events;
//  Associate every fd of every family entry with one of the event objects.
335 for (family_entries_t::iterator family_entry_it =
336 _family_entries.begin ();
337 family_entry_it != _family_entries.end (); ++family_entry_it) {
338 family_entry_t &family_entry = family_entry_it->second;
340 for (fd_entries_t::iterator fd_entry_it =
341 family_entry.fd_entries.begin ();
342 fd_entry_it != family_entry.fd_entries.end ();
344 fd_t fd = fd_entry_it->fd;
//  Event object used depends on the sets the fd belongs to:
//  events[3] for read and write, events[0] for read only, events[1] for
//  write only. Any further case is not part of this view.
347 if (FD_ISSET (fd, &family_entry.fds_set.read)
348 && FD_ISSET (fd, &family_entry.fds_set.write))
349 rc = WSAEventSelect (fd, wsa_events.events[3],
350 FD_READ | FD_ACCEPT | FD_CLOSE
351 | FD_WRITE | FD_CONNECT);
352 else if (FD_ISSET (fd, &family_entry.fds_set.read))
353 rc = WSAEventSelect (fd, wsa_events.events[0],
354 FD_READ | FD_ACCEPT | FD_CLOSE);
355 else if (FD_ISSET (fd, &family_entry.fds_set.write))
356 rc = WSAEventSelect (fd, wsa_events.events[1],
357 FD_WRITE | FD_CONNECT);
361 wsa_assert (rc != SOCKET_ERROR);
//  Wait on the four event objects; FALSE is passed as the wait-all argument.
365 rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE,
367 wsa_assert (rc != (
int) WSA_WAIT_FAILED);
//  Timed out (the guarded statement is not shown).
370 if (rc == WSA_WAIT_TIMEOUT)
//  The iteration uses the member _current_family_entry_it, which
//  try_retire_fd_entry compares against to decide between immediate erase
//  and deferred retirement.
374 for (_current_family_entry_it = _family_entries.begin ();
375 _current_family_entry_it != _family_entries.end ();
376 ++_current_family_entry_it) {
377 family_entry_t &family_entry = _current_family_entry_it->second;
380 if (use_wsa_events) {
//  The wait already happened on the WSA events, so select is given a zero
//  timeval here.
383 struct timeval tv_nodelay = {0, 0};
384 select_family_entry (family_entry, 0,
true, tv_nodelay);
//  Otherwise select itself waits, using tv only when timeout > 0.
386 select_family_entry (family_entry, 0,
timeout > 0, tv);
//  Presumably the non-Windows path: one select over the single family entry,
//  with _max_fd + 1 as the first argument of select.
390 select_family_entry (_family_entry, _max_fd + 1,
timeout > 0, tv);
//  Runs select () over the fd sets of family_entry_, dispatches the
//  resulting events and then removes retired entries. Two parameter lines
//  (named max_fd_ and tv_ in the body) are not part of this view.
395 void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
397 const bool use_timeout_,
//  Nothing to poll (the guarded statement is not shown).
401 fd_entries_t &fd_entries = family_entry_.fd_entries;
402 if (fd_entries.empty ())
//  select () is given a local copy, so the sets stored in family_entry_ are
//  not the ones passed to it.
405 fds_set_t local_fds_set = family_entry_.fds_set;
//  A NULL timeout pointer is passed when use_timeout_ is false.
406 int rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write,
407 &local_fds_set.error, use_timeout_ ? &tv_ :
NULL);
409 #if defined ZMQ_HAVE_WINDOWS
410 wsa_assert (rc != SOCKET_ERROR);
//  rc is handed to trigger_events as the event count.
418 trigger_events (fd_entries, local_fds_set, rc);
420 cleanup_retired (family_entry_);
//  Default constructor of fds_set_t. Only the signature is present in this
//  view; the body is not shown.
423 zmq::select_t::fds_set_t::fds_set_t ()
//  Copy constructor: copies the read, write and error fd_sets of other_
//  with memcpy.
430 zmq::select_t::fds_set_t::fds_set_t (
const fds_set_t &other_)
432 #if defined ZMQ_HAVE_WINDOWS
//  Windows: only the used part of each fd_set is copied. The byte count is
//  the distance from the start of the set to fd_array + fd_count, i.e. the
//  bytes preceding fd_array plus the first fd_count array elements.
437 memcpy (&read, &other_.read,
438 (
char *) (other_.read.fd_array + other_.read.fd_count)
439 - (
char *) &other_.read);
440 memcpy (&write, &other_.write,
441 (
char *) (other_.write.fd_array + other_.write.fd_count)
442 - (
char *) &other_.write);
443 memcpy (&
error, &other_.error,
444 (
char *) (other_.error.fd_array + other_.error.fd_count)
445 - (
char *) &other_.error);
//  Presumably the non-Windows alternative (the #else line is not shown):
//  each fd_set is copied in full.
447 memcpy (&read, &other_.read,
sizeof other_.read);
448 memcpy (&write, &other_.write,
sizeof other_.write);
449 memcpy (&
error, &other_.error,
sizeof other_.error);
//  Copy assignment: copies the read, write and error fd_sets of other_ with
//  memcpy, using the same byte counts as the copy constructor. The return
//  statement is not part of this view.
453 zmq::select_t::fds_set_t &
454 zmq::select_t::fds_set_t::operator= (
const fds_set_t &other_)
456 #if defined ZMQ_HAVE_WINDOWS
//  Windows: only the used part of each fd_set is copied (bytes from the
//  start of the set up to fd_array + fd_count).
461 memcpy (&read, &other_.read,
462 (
char *) (other_.read.fd_array + other_.read.fd_count)
463 - (
char *) &other_.read);
464 memcpy (&write, &other_.write,
465 (
char *) (other_.write.fd_array + other_.write.fd_count)
466 - (
char *) &other_.write);
467 memcpy (&
error, &other_.error,
468 (
char *) (other_.error.fd_array + other_.error.fd_count)
469 - (
char *) &other_.error);
//  Presumably the non-Windows alternative (the #else line is not shown):
//  each fd_set is copied in full.
471 memcpy (&read, &other_.read,
sizeof other_.read);
472 memcpy (&write, &other_.write,
sizeof other_.write);
473 memcpy (&
error, &other_.error,
sizeof other_.error);
//  Removes fd_ from the fd sets held by this object. The visible lines clear
//  it from the write and error sets; at least one line of the body is not
//  part of this view.
478 void zmq::select_t::fds_set_t::remove_fd (
const fd_t &fd_)
481 FD_CLR (fd_, &write);
482 FD_CLR (fd_, &
error);
//  Erases the retired entries of family_entry_, if any were flagged.
//  Returns true when the family entry has no fd entries left afterwards.
485 bool zmq::select_t::cleanup_retired (family_entry_t &family_entry_)
487 if (family_entry_.has_retired) {
//  Reset the flag, then drop every entry for which is_retired_fd holds
//  using std::remove_if followed by erase.
488 family_entry_.has_retired =
false;
489 family_entry_.fd_entries.erase (
490 std::remove_if (family_entry_.fd_entries.begin (),
491 family_entry_.fd_entries.end (), is_retired_fd),
492 family_entry_.fd_entries.end ());
494 return family_entry_.fd_entries.empty ();
//  Runs the per-family cleanup for everything this poller holds. The
//  preprocessor lines separating the two variants are not part of this view.
497 void zmq::select_t::cleanup_retired ()
//  Variant iterating _family_entries: a family entry that becomes empty is
//  erased, and the iterator returned by erase is used to continue. The
//  increment for the non-erased case is not shown.
500 for (family_entries_t::iterator
it = _family_entries.begin ();
501 it != _family_entries.end ();) {
502 if (cleanup_retired (
it->second))
503 it = _family_entries.erase (
it);
//  Variant for the single _family_entry member.
508 cleanup_retired (_family_entry);
//  Predicate over a fd entry, used by trigger_events and as the
//  std::remove_if predicate in cleanup_retired. Only the signature is
//  present in this view; the tested condition is not shown.
512 bool zmq::select_t::is_retired_fd (
const fd_entry_t &entry_)
//  Constructs a family entry with has_retired initialised to false. The
//  constructor body is not part of this view.
517 zmq::select_t::family_entry_t::family_entry_t () : has_retired (
false)
//  Windows only. Returns the address family of fd_, consulting
//  _fd_family_cache before falling back to determine_fd_family. The
//  declaration of i, the return statements and any early exit from the loop
//  are not part of this view.
522 #if defined ZMQ_HAVE_WINDOWS
523 u_short zmq::select_t::get_fd_family (
fd_t fd_)
//  Look for a cache slot whose first member equals fd_.
528 for (
i = 0;
i < fd_family_cache_size; ++
i) {
529 const std::pair<fd_t, u_short> &entry = _fd_family_cache[
i];
530 if (entry.first == fd_) {
//  Not served from the cache: determine the family and remember the result.
537 std::pair<fd_t, u_short> res =
538 std::make_pair (fd_, determine_fd_family (fd_));
//  If the loop ended with i still inside the cache, slot i receives the
//  result.
539 if (
i < fd_family_cache_size) {
540 _fd_family_cache[
i] = res;
//  Otherwise a slot chosen with rand () is overwritten.
544 _fd_family_cache[rand () % fd_family_cache_size] = res;
//  Determines the address family of fd_ by querying the socket. The
//  declaration of 'type', several branches and the fallback return value are
//  not part of this view.
550 u_short zmq::select_t::determine_fd_family (
fd_t fd_)
553 sockaddr_storage addr = {0};
554 int addr_size =
sizeof addr;
557 int type_length =
sizeof (int);
//  Query the socket type (SO_TYPE) into 'type'.
559 int rc = getsockopt (fd_, SOL_SOCKET, SO_TYPE,
560 reinterpret_cast<char *
> (&
type), &type_length);
//  Datagram sockets are treated specially (the guarded statement is not
//  shown).
563 if (
type == SOCK_DGRAM)
//  Fetch the local address of the socket into addr.
//  NOTE(review): the result is presumably assigned to rc on a line that is
//  not part of this view - confirm.
567 getsockname (fd_,
reinterpret_cast<sockaddr *
> (&addr), &addr_size);
//  On success AF_INET6 is reported as AF_INET; any other family is returned
//  as is.
571 if (rc != SOCKET_ERROR)
572 return addr.ss_family == AF_INET6 ? AF_INET : addr.ss_family;
//  Creates the four event objects events[0..3] with WSACreateEvent ().
//  Each creation is checked with wsa_assert against WSA_INVALID_EVENT.
578 zmq::select_t::wsa_events_t::wsa_events_t ()
580 events[0] = WSACreateEvent ();
581 wsa_assert (events[0] != WSA_INVALID_EVENT);
582 events[1] = WSACreateEvent ();
583 wsa_assert (events[1] != WSA_INVALID_EVENT);
584 events[2] = WSACreateEvent ();
585 wsa_assert (events[2] != WSA_INVALID_EVENT);
586 events[3] = WSACreateEvent ();
587 wsa_assert (events[3] != WSA_INVALID_EVENT);
//  Closes the four event objects events[0..3] with WSACloseEvent ().
//  The close calls are written inside wsa_assert, which checks their result.
590 zmq::select_t::wsa_events_t::~wsa_events_t ()
592 wsa_assert (WSACloseEvent (events[0]));
593 wsa_assert (WSACloseEvent (events[1]));
594 wsa_assert (WSACloseEvent (events[2]));
595 wsa_assert (WSACloseEvent (events[3]));