zmq_addon.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2016-2017 ZeroMQ community
3  Copyright (c) 2016 VOCA AS / Harald Nøkland
4 
5  Permission is hereby granted, free of charge, to any person obtaining a copy
6  of this software and associated documentation files (the "Software"), to
7  deal in the Software without restriction, including without limitation the
8  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9  sell copies of the Software, and to permit persons to whom the Software is
10  furnished to do so, subject to the following conditions:
11 
12  The above copyright notice and this permission notice shall be included in
13  all copies or substantial portions of the Software.
14 
15  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21  IN THE SOFTWARE.
22 */
23 
24 #ifndef __ZMQ_ADDON_HPP_INCLUDED__
25 #define __ZMQ_ADDON_HPP_INCLUDED__
26 
27 #include "zmq.hpp"
28 
29 #include <deque>
30 #include <iomanip>
31 #include <sstream>
32 #include <stdexcept>
33 #ifdef ZMQ_CPP11
34 #include <limits>
35 #include <functional>
36 #include <unordered_map>
37 
38 namespace zmq
39 {
40  // socket ref or native file descriptor for poller
41  class poller_ref_t
42  {
43  public:
44  enum RefType
45  {
46  RT_SOCKET,
47  RT_FD
48  };
49 
50  poller_ref_t() : poller_ref_t(socket_ref{})
51  {}
52 
53  poller_ref_t(const zmq::socket_ref& socket) : data{RT_SOCKET, socket, {}}
54  {}
55 
56  poller_ref_t(zmq::fd_t fd) : data{RT_FD, {}, fd}
57  {}
58 
59  size_t hash() const ZMQ_NOTHROW
60  {
61  std::size_t h = 0;
62  hash_combine(h, std::get<0>(data));
63  hash_combine(h, std::get<1>(data));
64  hash_combine(h, std::get<2>(data));
65  return h;
66  }
67 
68  bool operator == (const poller_ref_t& o) const ZMQ_NOTHROW
69  {
70  return data == o.data;
71  }
72 
73  private:
74  template <class T>
75  static void hash_combine(std::size_t& seed, const T& v) ZMQ_NOTHROW
76  {
77  std::hash<T> hasher;
78  seed ^= hasher(v) + 0x9e3779b9 + (seed<<6) + (seed>>2);
79  }
80 
81  std::tuple<int, zmq::socket_ref, zmq::fd_t> data;
82 
83  }; // class poller_ref_t
84 
85 } // namespace zmq
86 
87 // std::hash<> specialization for std::unordered_map
88 template <> struct std::hash<zmq::poller_ref_t>
89 {
90  size_t operator()(const zmq::poller_ref_t& ref) const ZMQ_NOTHROW
91  {
92  return ref.hash();
93  }
94 };
95 #endif // ZMQ_CPP11
96 
97 namespace zmq
98 {
99 #ifdef ZMQ_CPP11
100 
101 namespace detail
102 {
103 template<bool CheckN, class OutputIt>
104 recv_result_t
105 recv_multipart_n(socket_ref s, OutputIt out, size_t n, recv_flags flags)
106 {
107  size_t msg_count = 0;
108  message_t msg;
109  while (true) {
110  if ZMQ_CONSTEXPR_IF (CheckN) {
111  if (msg_count >= n)
112  throw std::runtime_error(
113  "Too many message parts in recv_multipart_n");
114  }
115  if (!s.recv(msg, flags)) {
116  // zmq ensures atomic delivery of messages
117  assert(msg_count == 0);
118  return {};
119  }
120  ++msg_count;
121  const bool more = msg.more();
122  *out++ = std::move(msg);
123  if (!more)
124  break;
125  }
126  return msg_count;
127 }
128 
129 inline bool is_little_endian()
130 {
131  const uint16_t i = 0x01;
132  return *reinterpret_cast<const uint8_t *>(&i) == 0x01;
133 }
134 
135 inline void write_network_order(unsigned char *buf, const uint32_t value)
136 {
137  if (is_little_endian()) {
138  ZMQ_CONSTEXPR_VAR uint32_t mask = (std::numeric_limits<std::uint8_t>::max)();
139  *buf++ = static_cast<unsigned char>((value >> 24) & mask);
140  *buf++ = static_cast<unsigned char>((value >> 16) & mask);
141  *buf++ = static_cast<unsigned char>((value >> 8) & mask);
142  *buf++ = static_cast<unsigned char>(value & mask);
143  } else {
144  std::memcpy(buf, &value, sizeof(value));
145  }
146 }
147 
148 inline uint32_t read_u32_network_order(const unsigned char *buf)
149 {
150  if (is_little_endian()) {
151  return (static_cast<uint32_t>(buf[0]) << 24)
152  + (static_cast<uint32_t>(buf[1]) << 16)
153  + (static_cast<uint32_t>(buf[2]) << 8)
154  + static_cast<uint32_t>(buf[3]);
155  } else {
156  uint32_t value;
157  std::memcpy(&value, buf, sizeof(value));
158  return value;
159  }
160 }
161 } // namespace detail
162 
163 /* Receive a multipart message.
164 
165  Writes the zmq::message_t objects to OutputIterator out.
166  The out iterator must handle an unspecified number of writes,
167  e.g. by using std::back_inserter.
168 
169  Returns: the number of messages received or nullopt (on EAGAIN).
170  Throws: if recv throws. Any exceptions thrown
171  by the out iterator will be propagated and the message
172  may have been only partially received with pending
173  message parts. It is adviced to close this socket in that event.
174 */
175 template<class OutputIt>
176 ZMQ_NODISCARD recv_result_t recv_multipart(socket_ref s,
177  OutputIt out,
178  recv_flags flags = recv_flags::none)
179 {
180  return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
181 }
182 
183 /* Receive a multipart message.
184 
185  Writes at most n zmq::message_t objects to OutputIterator out.
186  If the number of message parts of the incoming message exceeds n
187  then an exception will be thrown.
188 
189  Returns: the number of messages received or nullopt (on EAGAIN).
190  Throws: if recv throws. Throws std::runtime_error if the number
191  of message parts exceeds n (exactly n messages will have been written
192  to out). Any exceptions thrown
193  by the out iterator will be propagated and the message
194  may have been only partially received with pending
195  message parts. It is adviced to close this socket in that event.
196 */
197 template<class OutputIt>
198 ZMQ_NODISCARD recv_result_t recv_multipart_n(socket_ref s,
199  OutputIt out,
200  size_t n,
201  recv_flags flags = recv_flags::none)
202 {
203  return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
204 }
205 
206 /* Send a multipart message.
207 
208  The range must be a ForwardRange of zmq::message_t,
209  zmq::const_buffer or zmq::mutable_buffer.
210  The flags may be zmq::send_flags::sndmore if there are
211  more message parts to be sent after the call to this function.
212 
213  Returns: the number of messages sent (exactly msgs.size()) or nullopt (on EAGAIN).
214  Throws: if send throws. Any exceptions thrown
215  by the msgs range will be propagated and the message
216  may have been only partially sent. It is adviced to close this socket in that event.
217 */
218 template<class Range
219 #ifndef ZMQ_CPP11_PARTIAL
220  ,
221  typename = typename std::enable_if<
223  && (std::is_same<detail::range_value_t<Range>, message_t>::value
224  || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
225 #endif
226  >
227 send_result_t
228 send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
229 {
230  using std::begin;
231  using std::end;
232  auto it = begin(msgs);
233  const auto end_it = end(msgs);
234  size_t msg_count = 0;
235  while (it != end_it) {
236  const auto next = std::next(it);
237  const auto msg_flags =
238  flags | (next == end_it ? send_flags::none : send_flags::sndmore);
239  if (!s.send(*it, msg_flags)) {
240  // zmq ensures atomic delivery of messages
241  assert(it == begin(msgs));
242  return {};
243  }
244  ++msg_count;
245  it = next;
246  }
247  return msg_count;
248 }
249 
250 /* Encode a multipart message.
251 
252  The range must be a ForwardRange of zmq::message_t. A
253  zmq::multipart_t or STL container may be passed for encoding.
254 
255  Returns: a zmq::message_t holding the encoded multipart data.
256 
257  Throws: std::range_error is thrown if the size of any single part
258  can not fit in an unsigned 32 bit integer.
259 
260  The encoding is compatible with that used by the CZMQ function
261  zmsg_encode(), see https://rfc.zeromq.org/spec/50/.
262  Each part consists of a size followed by the data.
263  These are placed contiguously into the output message. A part of
264  size less than 255 bytes will have a single byte size value.
265  Larger parts will have a five byte size value with the first byte
266  set to 0xFF and the remaining four bytes holding the size of the
267  part's data.
268 */
269 template<class Range
270 #ifndef ZMQ_CPP11_PARTIAL
271  ,
272  typename = typename std::enable_if<
274  && (std::is_same<detail::range_value_t<Range>, message_t>::value
275  || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
276 #endif
277  >
278 message_t encode(const Range &parts)
279 {
280  size_t mmsg_size = 0;
281 
282  // First pass check sizes
283  for (const auto &part : parts) {
284  const size_t part_size = part.size();
285  if (part_size > (std::numeric_limits<std::uint32_t>::max)()) {
286  // Size value must fit into uint32_t.
287  throw std::range_error("Invalid size, message part too large");
288  }
289  const size_t count_size =
290  part_size < (std::numeric_limits<std::uint8_t>::max)() ? 1 : 5;
291  mmsg_size += part_size + count_size;
292  }
293 
294  message_t encoded(mmsg_size);
295  unsigned char *buf = encoded.data<unsigned char>();
296  for (const auto &part : parts) {
297  const uint32_t part_size = static_cast<uint32_t>(part.size());
298  const unsigned char *part_data =
299  static_cast<const unsigned char *>(part.data());
300 
301  if (part_size < (std::numeric_limits<std::uint8_t>::max)()) {
302  // small part
303  *buf++ = (unsigned char) part_size;
304  } else {
305  // big part
306  *buf++ = (std::numeric_limits<uint8_t>::max)();
307  detail::write_network_order(buf, part_size);
308  buf += sizeof(part_size);
309  }
310  std::memcpy(buf, part_data, part_size);
311  buf += part_size;
312  }
313 
314  assert(static_cast<size_t>(buf - encoded.data<unsigned char>()) == mmsg_size);
315  return encoded;
316 }
317 
318 /* Decode an encoded message to multiple parts.
319 
320  The given output iterator must be a ForwardIterator to a container
321  holding zmq::message_t such as a zmq::multipart_t or various STL
322  containers.
323 
324  Returns the ForwardIterator advanced once past the last decoded
325  part.
326 
327  Throws: a std::out_of_range is thrown if the encoded part sizes
328  lead to exceeding the message data bounds.
329 
330  The decoding assumes the message is encoded in the manner
331  performed by zmq::encode(), see https://rfc.zeromq.org/spec/50/.
332  */
333 template<class OutputIt> OutputIt decode(const message_t &encoded, OutputIt out)
334 {
335  const unsigned char *source = encoded.data<unsigned char>();
336  const unsigned char *const limit = source + encoded.size();
337 
338  while (source < limit) {
339  size_t part_size = *source++;
340  if (part_size == (std::numeric_limits<std::uint8_t>::max)()) {
341  if (static_cast<size_t>(limit - source) < sizeof(uint32_t)) {
342  throw std::out_of_range(
343  "Malformed encoding, overflow in reading size");
344  }
345  part_size = detail::read_u32_network_order(source);
346  // the part size is allowed to be less than 0xFF
347  source += sizeof(uint32_t);
348  }
349 
350  if (static_cast<size_t>(limit - source) < part_size) {
351  throw std::out_of_range("Malformed encoding, overflow in reading part");
352  }
353  *out = message_t(source, part_size);
354  ++out;
355  source += part_size;
356  }
357 
358  assert(source == limit);
359  return out;
360 }
361 
362 #endif
363 
364 
365 #ifdef ZMQ_HAS_RVALUE_REFS
366 
367 /*
368  This class handles multipart messaging. It is the C++ equivalent of zmsg.h,
369  which is part of CZMQ (the high-level C binding). Furthermore, it is a major
370  improvement compared to zmsg.hpp, which is part of the examples in the ØMQ
371  Guide. Unnecessary copying is avoided by using move semantics to efficiently
372  add/remove parts.
373 */
374 class multipart_t
375 {
376  private:
377  std::deque<message_t> m_parts;
378 
379  public:
381 
382  typedef std::deque<message_t>::iterator iterator;
383  typedef std::deque<message_t>::const_iterator const_iterator;
384 
385  typedef std::deque<message_t>::reverse_iterator reverse_iterator;
386  typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
387 
388  // Default constructor
389  multipart_t() {}
390 
391  // Construct from socket receive
392  multipart_t(socket_ref socket) { recv(socket); }
393 
394  // Construct from memory block
395  multipart_t(const void *src, size_t size) { addmem(src, size); }
396 
397  // Construct from string
398  multipart_t(const std::string &string) { addstr(string); }
399 
400  // Construct from message part
401  multipart_t(message_t &&message) { add(std::move(message)); }
402 
403  // Move constructor
404  multipart_t(multipart_t &&other) ZMQ_NOTHROW { m_parts = std::move(other.m_parts); }
405 
406  // Move assignment operator
407  multipart_t &operator=(multipart_t &&other) ZMQ_NOTHROW
408  {
409  m_parts = std::move(other.m_parts);
410  return *this;
411  }
412 
413  // Destructor
414  virtual ~multipart_t() { clear(); }
415 
416  message_t &operator[](size_t n) { return m_parts[n]; }
417 
418  const message_t &operator[](size_t n) const { return m_parts[n]; }
419 
420  message_t &at(size_t n) { return m_parts.at(n); }
421 
422  const message_t &at(size_t n) const { return m_parts.at(n); }
423 
424  iterator begin() { return m_parts.begin(); }
425 
426  const_iterator begin() const { return m_parts.begin(); }
427 
428  const_iterator cbegin() const { return m_parts.cbegin(); }
429 
430  reverse_iterator rbegin() { return m_parts.rbegin(); }
431 
432  const_reverse_iterator rbegin() const { return m_parts.rbegin(); }
433 
434  iterator end() { return m_parts.end(); }
435 
436  const_iterator end() const { return m_parts.end(); }
437 
438  const_iterator cend() const { return m_parts.cend(); }
439 
440  reverse_iterator rend() { return m_parts.rend(); }
441 
442  const_reverse_iterator rend() const { return m_parts.rend(); }
443 
444  // Delete all parts
445  void clear() { m_parts.clear(); }
446 
447  // Get number of parts
448  size_t size() const { return m_parts.size(); }
449 
450  // Check if number of parts is zero
451  bool empty() const { return m_parts.empty(); }
452 
453  // Receive multipart message from socket
454  bool recv(socket_ref socket, int flags = 0)
455  {
456  clear();
457  bool more = true;
458  while (more) {
459  message_t message;
460 #ifdef ZMQ_CPP11
461  if (!socket.recv(message, static_cast<recv_flags>(flags)))
462  return false;
463 #else
464  if (!socket.recv(&message, flags))
465  return false;
466 #endif
467  more = message.more();
468  add(std::move(message));
469  }
470  return true;
471  }
472 
473  // Send multipart message to socket
474  bool send(socket_ref socket, int flags = 0)
475  {
476  flags &= ~(ZMQ_SNDMORE);
477  bool more = size() > 0;
478  while (more) {
479  message_t message = pop();
480  more = size() > 0;
481 #ifdef ZMQ_CPP11
482  if (!socket.send(message, static_cast<send_flags>(
483  (more ? ZMQ_SNDMORE : 0) | flags)))
484  return false;
485 #else
486  if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
487  return false;
488 #endif
489  }
490  clear();
491  return true;
492  }
493 
494  // Concatenate other multipart to front
495  void prepend(multipart_t &&other)
496  {
497  while (!other.empty())
498  push(other.remove());
499  }
500 
501  // Concatenate other multipart to back
502  void append(multipart_t &&other)
503  {
504  while (!other.empty())
505  add(other.pop());
506  }
507 
508  // Push memory block to front
509  void pushmem(const void *src, size_t size)
510  {
511  m_parts.push_front(message_t(src, size));
512  }
513 
514  // Push memory block to back
515  void addmem(const void *src, size_t size)
516  {
517  m_parts.push_back(message_t(src, size));
518  }
519 
520  // Push string to front
521  void pushstr(const std::string &string)
522  {
523  m_parts.push_front(message_t(string.data(), string.size()));
524  }
525 
526  // Push string to back
527  void addstr(const std::string &string)
528  {
529  m_parts.push_back(message_t(string.data(), string.size()));
530  }
531 
532  // Push type (fixed-size) to front
533  template<typename T> void pushtyp(const T &type)
534  {
536  "Use pushstr() instead of pushtyp<std::string>()");
537  m_parts.push_front(message_t(&type, sizeof(type)));
538  }
539 
540  // Push type (fixed-size) to back
541  template<typename T> void addtyp(const T &type)
542  {
544  "Use addstr() instead of addtyp<std::string>()");
545  m_parts.push_back(message_t(&type, sizeof(type)));
546  }
547 
548  // Push message part to front
549  void push(message_t &&message) { m_parts.push_front(std::move(message)); }
550 
551  // Push message part to back
552  void add(message_t &&message) { m_parts.push_back(std::move(message)); }
553 
554  // Alias to allow std::back_inserter()
555  void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
556 
557  // Pop string from front
558  std::string popstr()
559  {
560  std::string string(m_parts.front().data<char>(), m_parts.front().size());
561  m_parts.pop_front();
562  return string;
563  }
564 
565  // Pop type (fixed-size) from front
566  template<typename T> T poptyp()
567  {
569  "Use popstr() instead of poptyp<std::string>()");
570  if (sizeof(T) != m_parts.front().size())
571  throw std::runtime_error(
572  "Invalid type, size does not match the message size");
573  T type = *m_parts.front().data<T>();
574  m_parts.pop_front();
575  return type;
576  }
577 
578  // Pop message part from front
579  message_t pop()
580  {
581  message_t message = std::move(m_parts.front());
582  m_parts.pop_front();
583  return message;
584  }
585 
586  // Pop message part from back
587  message_t remove()
588  {
589  message_t message = std::move(m_parts.back());
590  m_parts.pop_back();
591  return message;
592  }
593 
594  // get message part from front
595  const message_t &front() { return m_parts.front(); }
596 
597  // get message part from back
598  const message_t &back() { return m_parts.back(); }
599 
600  // Get pointer to a specific message part
601  const message_t *peek(size_t index) const { return &m_parts[index]; }
602 
603  // Get a string copy of a specific message part
604  std::string peekstr(size_t index) const
605  {
606  std::string string(m_parts[index].data<char>(), m_parts[index].size());
607  return string;
608  }
609 
610  // Peek type (fixed-size) from front
611  template<typename T> T peektyp(size_t index) const
612  {
614  "Use peekstr() instead of peektyp<std::string>()");
615  if (sizeof(T) != m_parts[index].size())
616  throw std::runtime_error(
617  "Invalid type, size does not match the message size");
618  T type = *m_parts[index].data<T>();
619  return type;
620  }
621 
622  // Create multipart from type (fixed-size)
623  template<typename T> static multipart_t create(const T &type)
624  {
625  multipart_t multipart;
626  multipart.addtyp(type);
627  return multipart;
628  }
629 
630  // Copy multipart
631  multipart_t clone() const
632  {
633  multipart_t multipart;
634  for (size_t i = 0; i < size(); i++)
635  multipart.addmem(m_parts[i].data(), m_parts[i].size());
636  return multipart;
637  }
638 
639  // Dump content to string
640  std::string str() const
641  {
642  std::stringstream ss;
643  for (size_t i = 0; i < m_parts.size(); i++) {
644  const unsigned char *data = m_parts[i].data<unsigned char>();
645  size_t size = m_parts[i].size();
646 
647  // Dump the message as text or binary
648  bool isText = true;
649  for (size_t j = 0; j < size; j++) {
650  if (data[j] < 32 || data[j] > 127) {
651  isText = false;
652  break;
653  }
654  }
655  ss << "\n[" << std::dec << std::setw(3) << std::setfill('0') << size
656  << "] ";
657  if (size >= 1000) {
658  ss << "... (too big to print)";
659  continue;
660  }
661  for (size_t j = 0; j < size; j++) {
662  if (isText)
663  ss << static_cast<char>(data[j]);
664  else
665  ss << std::hex << std::setw(2) << std::setfill('0')
666  << static_cast<short>(data[j]);
667  }
668  }
669  return ss.str();
670  }
671 
672  // Check if equal to other multipart
673  bool equal(const multipart_t *other) const ZMQ_NOTHROW
674  {
675  return *this == *other;
676  }
677 
678  bool operator==(const multipart_t &other) const ZMQ_NOTHROW
679  {
680  if (size() != other.size())
681  return false;
682  for (size_t i = 0; i < size(); i++)
683  if (at(i) != other.at(i))
684  return false;
685  return true;
686  }
687 
688  bool operator!=(const multipart_t &other) const ZMQ_NOTHROW
689  {
690  return !(*this == other);
691  }
692 
693 #ifdef ZMQ_CPP11
694 
695  // Return single part message_t encoded from this multipart_t.
696  message_t encode() const { return zmq::encode(*this); }
697 
698  // Decode encoded message into multiple parts and append to self.
699  void decode_append(const message_t &encoded)
700  {
701  zmq::decode(encoded, std::back_inserter(*this));
702  }
703 
704  // Return a new multipart_t containing the decoded message_t.
705  static multipart_t decode(const message_t &encoded)
706  {
707  multipart_t tmp;
708  zmq::decode(encoded, std::back_inserter(tmp));
709  return tmp;
710  }
711 
712 #endif
713 
714  private:
715  // Disable implicit copying (moving is more efficient)
716  multipart_t(const multipart_t &other) ZMQ_DELETED_FUNCTION;
717  void operator=(const multipart_t &other) ZMQ_DELETED_FUNCTION;
718 }; // class multipart_t
719 
720 inline std::ostream &operator<<(std::ostream &os, const multipart_t &msg)
721 {
722  return os << msg.str();
723 }
724 
725 #endif // ZMQ_HAS_RVALUE_REFS
726 
727 #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
728 class active_poller_t
729 {
730  public:
731  active_poller_t() = default;
732  ~active_poller_t() = default;
733 
734  active_poller_t(const active_poller_t &) = delete;
735  active_poller_t &operator=(const active_poller_t &) = delete;
736 
737  active_poller_t(active_poller_t &&src) = default;
738  active_poller_t &operator=(active_poller_t &&src) = default;
739 
740  using handler_type = std::function<void(event_flags)>;
741 
742  void add(zmq::socket_ref socket, event_flags events, handler_type handler)
743  {
744  const poller_ref_t ref{socket};
745 
746  if (!handler)
747  throw std::invalid_argument("null handler in active_poller_t::add (socket)");
748  auto ret = handlers.emplace(
749  ref, std::make_shared<handler_type>(std::move(handler)));
750  if (!ret.second)
751  throw error_t(EINVAL); // already added
752  try {
753  base_poller.add(socket, events, ret.first->second.get());
754  need_rebuild = true;
755  }
756  catch (...) {
757  // rollback
758  handlers.erase(ref);
759  throw;
760  }
761  }
762 
763  void add(fd_t fd, event_flags events, handler_type handler)
764  {
765  const poller_ref_t ref{fd};
766 
767  if (!handler)
768  throw std::invalid_argument("null handler in active_poller_t::add (fd)");
769  auto ret = handlers.emplace(
770  ref, std::make_shared<handler_type>(std::move(handler)));
771  if (!ret.second)
772  throw error_t(EINVAL); // already added
773  try {
774  base_poller.add(fd, events, ret.first->second.get());
775  need_rebuild = true;
776  }
777  catch (...) {
778  // rollback
779  handlers.erase(ref);
780  throw;
781  }
782  }
783 
784  void remove(zmq::socket_ref socket)
785  {
786  base_poller.remove(socket);
787  handlers.erase(socket);
788  need_rebuild = true;
789  }
790 
791  void remove(fd_t fd)
792  {
793  base_poller.remove(fd);
794  handlers.erase(fd);
795  need_rebuild = true;
796  }
797 
798  void modify(zmq::socket_ref socket, event_flags events)
799  {
800  base_poller.modify(socket, events);
801  }
802 
803  void modify(fd_t fd, event_flags events)
804  {
805  base_poller.modify(fd, events);
806  }
807 
808  size_t wait(std::chrono::milliseconds timeout)
809  {
810  if (need_rebuild) {
811  poller_events.resize(handlers.size());
812  poller_handlers.clear();
813  poller_handlers.reserve(handlers.size());
814  for (const auto &handler : handlers) {
815  poller_handlers.push_back(handler.second);
816  }
817  need_rebuild = false;
818  }
819  const auto count = base_poller.wait_all(poller_events, timeout);
820  std::for_each(poller_events.begin(),
821  poller_events.begin() + static_cast<ptrdiff_t>(count),
822  [](decltype(base_poller)::event_type &event) {
823  assert(event.user_data != nullptr);
824  (*event.user_data)(event.events);
825  });
826  return count;
827  }
828 
829  ZMQ_NODISCARD bool empty() const noexcept { return handlers.empty(); }
830 
831  size_t size() const noexcept { return handlers.size(); }
832 
833  private:
834  bool need_rebuild{false};
835 
836  poller_t<handler_type> base_poller{};
837 
838  std::unordered_map<zmq::poller_ref_t, std::shared_ptr<handler_type>> handlers{};
839 
840  std::vector<decltype(base_poller)::event_type> poller_events{};
841  std::vector<std::shared_ptr<handler_type>> poller_handlers{};
842 }; // class active_poller_t
843 #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
844 
845 
846 } // namespace zmq
847 
848 #endif // __ZMQ_ADDON_HPP_INCLUDED__
ZMQ_NODISCARD
#define ZMQ_NODISCARD
Definition: zmq.hpp:77
benchmarks.python.py_benchmark.const
const
Definition: py_benchmark.py:14
ZMQ_CONSTEXPR_IF
#define ZMQ_CONSTEXPR_IF
Definition: zmq.hpp:105
end
GLuint GLuint end
Definition: glcorearb.h:2858
src
GLenum src
Definition: glcorearb.h:3364
EINVAL
#define EINVAL
Definition: errno.hpp:25
ZMQ_DELETED_FUNCTION
#define ZMQ_DELETED_FUNCTION
Definition: zmq.hpp:180
s
XmlRpcServer s
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
ZMQ_CONSTEXPR_VAR
#define ZMQ_CONSTEXPR_VAR
Definition: zmq.hpp:94
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
T
#define T(upbtypeconst, upbtype, ctype, default_value)
zmq::operator==
bool operator==(const detail::socket_base &a, const detail::socket_base &b) ZMQ_NOTHROW
Definition: zmq.hpp:2146
zmq::socket_ref
Definition: zmq.hpp:2114
flags
GLbitfield flags
Definition: glcorearb.h:3585
zmq::operator<<
std::ostream & operator<<(std::ostream &os, const message_t &msg)
Definition: zmq.hpp:2792
zmq::fd_t
int fd_t
Definition: zmq.hpp:287
begin
static size_t begin(const upb_table *t)
Definition: php/ext/google/protobuf/upb.c:4898
zmq.hpp
zmq
Definition: zmq.hpp:229
mask
GLint GLuint mask
Definition: glcorearb.h:2789
event
struct _cl_event * event
Definition: glcorearb.h:4163
update_failure_list.str
str
Definition: update_failure_list.py:41
size
#define size
Definition: glcorearb.h:2944
timeout
GLbitfield GLuint64 timeout
Definition: glcorearb.h:3588
source
GLsizei GLsizei GLchar * source
Definition: glcorearb.h:3072
msgs
static const upb_msgdef msgs[22]
Definition: ruby/ext/google/protobuf_c/upb.c:7670
buf
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition: glcorearb.h:4175
ZMQ_NOTHROW
#define ZMQ_NOTHROW
Definition: zmq.hpp:89
void
typedef void(APIENTRY *GLDEBUGPROCARB)(GLenum source
n
GLdouble n
Definition: glcorearb.h:4153
i
int i
Definition: gmock-matchers_test.cc:764
type
GLenum type
Definition: glcorearb.h:2695
v
const GLdouble * v
Definition: glcorearb.h:3106
append
ROSCPP_DECL std::string append(const std::string &left, const std::string &right)
value_type
zend_class_entry * value_type
Definition: php/ext/google/protobuf/message.c:2546
size
GLsizeiptr size
Definition: glcorearb.h:2943
zmq::operator!=
bool operator!=(const detail::socket_base &a, const detail::socket_base &b) ZMQ_NOTHROW
Definition: zmq.hpp:2150
pop
static upb_refcounted * pop(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5904
ZMQ_SNDMORE
#define ZMQ_SNDMORE
Definition: zmq.h:359
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
push
static void push(tarjan *t, const upb_refcounted *r)
Definition: ruby/ext/google/protobuf_c/upb.c:5890
handler
void * handler
Definition: test_security_curve.cpp:27
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
Range
Range(1<< 0, 1<< 10)
next
static size_t next(const upb_table *t, size_t i)
Definition: php/ext/google/protobuf/upb.c:4889
value
GLsizei const GLfloat * value
Definition: glcorearb.h:3093
count
GLint GLsizei count
Definition: glcorearb.h:2830
ref
GLint ref
Definition: glcorearb.h:2789
index
GLuint index
Definition: glcorearb.h:3055
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
message
GLenum GLuint GLenum GLsizei const GLchar * message
Definition: glcorearb.h:2695
h
GLfloat GLfloat GLfloat GLfloat h
Definition: glcorearb.h:4147


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:02