24 #ifndef __ZMQ_ADDON_HPP_INCLUDED__
25 #define __ZMQ_ADDON_HPP_INCLUDED__
36 #include <unordered_map>
50 poller_ref_t() : poller_ref_t(socket_ref{})
62 hash_combine(
h, std::get<0>(
data));
63 hash_combine(
h, std::get<1>(
data));
64 hash_combine(
h, std::get<2>(
data));
70 return data == o.data;
// Mixes the hash of `v` into `seed` in place:
//   seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2)
// NOTE(review): `hasher` and the template header introducing T are declared on
// lines not shown in this excerpt - presumably a std::hash<T>; confirm.
75 static void hash_combine(std::size_t& seed,
const T&
v)
ZMQ_NOTHROW
78 seed ^= hasher(
v) + 0x9e3779b9 + (seed<<6) + (seed>>2);
81 std::tuple<int, zmq::socket_ref, zmq::fd_t>
data;
88 template <>
struct std::hash<
zmq::poller_ref_t>
// Receives message parts from socket `s` using `flags`, moving each received
// part into the output iterator `out` and tracking progress in msg_count.
// Throws std::runtime_error("Too many message parts in recv_multipart_n").
// NOTE(review): the condition guarding that throw (presumably CheckN together
// with `n`), the loop itself and the return statement are on lines not shown.
103 template<
bool CheckN,
class OutputIt>
105 recv_multipart_n(socket_ref s, OutputIt out,
size_t n, recv_flags
flags)
107 size_t msg_count = 0;
112 throw std::runtime_error(
113 "Too many message parts in recv_multipart_n");
115 if (!
s.recv(msg,
flags)) {
// A failed recv is only expected before any part has been received.
117 assert(msg_count == 0);
// Read the "more" flag before msg is moved away on the next line.
121 const bool more = msg.more();
122 *out++ = std::move(msg);
// Returns true when the first byte in memory of the 16-bit value 0x01 is 0x01,
// i.e. the host stores the least significant byte first.
129 inline bool is_little_endian()
131 const uint16_t
i = 0x01;
132 return *
reinterpret_cast<const uint8_t *
>(&
i) == 0x01;
// Writes the 32-bit `value` into `buf` one byte at a time. On a little-endian
// host the bytes are emitted most significant first (shifts by 24, 16, 8 are
// visible here), advancing `buf` after every byte.
// NOTE(review): the definition of `mask`, the write of the lowest byte and the
// non-little-endian path are on lines not shown in this excerpt.
135 inline void write_network_order(
unsigned char *
buf,
const uint32_t
value)
137 if (is_little_endian()) {
139 *
buf++ =
static_cast<unsigned char>((
value >> 24) &
mask);
140 *
buf++ =
static_cast<unsigned char>((
value >> 16) &
mask);
141 *
buf++ =
static_cast<unsigned char>((
value >> 8) &
mask);
// Reassembles a uint32_t from the four bytes buf[0..3]. On a little-endian
// host buf[0] is treated as the most significant byte and buf[3] as the least.
// NOTE(review): the non-little-endian path is on lines not shown here.
148 inline uint32_t read_u32_network_order(
const unsigned char *
buf)
150 if (is_little_endian()) {
151 return (
static_cast<uint32_t
>(
buf[0]) << 24)
152 + (
static_cast<uint32_t
>(
buf[1]) << 16)
153 + (
static_cast<uint32_t
>(
buf[2]) << 8)
154 +
static_cast<uint32_t
>(
buf[3]);
175 template<
class OutputIt>
178 recv_flags
flags = recv_flags::none)
180 return detail::recv_multipart_n<false>(s, std::move(out), 0,
flags);
197 template<
class OutputIt>
201 recv_flags
flags = recv_flags::none)
203 return detail::recv_multipart_n<true>(s, std::move(out),
n,
flags);
// send_multipart: sends the elements of the range `msgs` over socket `s`.
// Unless ZMQ_CPP11_PARTIAL is defined, the template is constrained so that the
// range's value type is message_t or satisfies detail::is_buffer.
// Every element except the last is sent with send_flags::sndmore OR-ed into
// `flags`; the last one is sent with `flags` alone.
// NOTE(review): the declarations of it / next / end_it, the handling of a
// failed send and the return value are on lines not shown in this excerpt.
219 #ifndef ZMQ_CPP11_PARTIAL
221 typename =
typename std::enable_if<
223 && (std::is_same<detail::range_value_t<Range>, message_t>
::value
224 || detail::is_buffer<detail::range_value_t<Range>>
::value)>::
type
228 send_multipart(socket_ref s,
Range &&
msgs, send_flags
flags = send_flags::none)
234 size_t msg_count = 0;
235 while (
it != end_it) {
237 const auto msg_flags =
238 flags | (
next == end_it ? send_flags::none : send_flags::sndmore);
239 if (!
s.send(*
it, msg_flags)) {
// encode: packs every part of `parts` into one message_t.
// Unless ZMQ_CPP11_PARTIAL is defined, the template is constrained so that the
// range's value type is message_t or satisfies detail::is_buffer.
// Layout per part: a size prefix followed by the part's bytes. The prefix is a
// single byte when the size is below 255; otherwise it is the byte 255
// followed by the 32-bit size written with detail::write_network_order.
// Throws std::range_error when a part is larger than the uint32_t maximum.
// NOTE(review): the return statement is on a line not shown in this excerpt.
270 #ifndef ZMQ_CPP11_PARTIAL
272 typename =
typename std::enable_if<
274 && (std::is_same<detail::range_value_t<Range>, message_t>
::value
275 || detail::is_buffer<detail::range_value_t<Range>>
::value)>::
type
278 message_t encode(
const Range &parts)
// First pass: validate part sizes and compute the total encoded size.
280 size_t mmsg_size = 0;
283 for (
const auto &part : parts) {
284 const size_t part_size = part.size();
285 if (part_size > (std::numeric_limits<std::uint32_t>::max)()) {
287 throw std::range_error(
"Invalid size, message part too large");
// A size of exactly 255 needs the long form because 255 is the marker byte.
289 const size_t count_size =
290 part_size < (std::numeric_limits<std::uint8_t>::max)() ? 1 : 5;
291 mmsg_size += part_size + count_size;
// Second pass: write each prefix and payload into the preallocated message.
294 message_t encoded(mmsg_size);
295 unsigned char *
buf = encoded.data<
unsigned char>();
296 for (
const auto &part : parts) {
297 const uint32_t part_size =
static_cast<uint32_t
>(part.size());
298 const unsigned char *part_data =
299 static_cast<const unsigned char *
>(part.data());
// Short form: the size fits in the single prefix byte.
301 if (part_size < (std::numeric_limits<std::uint8_t>::max)()) {
303 *
buf++ = (
unsigned char) part_size;
// Long form: marker byte 255, then the 4-byte size.
306 *
buf++ = (std::numeric_limits<uint8_t>::max)();
307 detail::write_network_order(
buf, part_size);
308 buf +=
sizeof(part_size);
310 std::memcpy(
buf, part_data, part_size);
// Sanity check: bytes written must equal the size computed in the first pass.
314 assert(
static_cast<size_t>(
buf - encoded.data<
unsigned char>()) == mmsg_size);
// Splits `encoded` back into its parts, writing each part through `out` as a
// message_t built from (source, part_size).
// A leading size byte of 255 means the real size follows as a 32-bit value
// read with detail::read_u32_network_order.
// Throws std::out_of_range when fewer bytes remain than the size field or the
// part itself requires.
// NOTE(review): the enclosing loop, the advance of `source` past each part and
// the return statement are on lines not shown in this excerpt.
333 template<
class OutputIt> OutputIt decode(
const message_t &encoded, OutputIt out)
335 const unsigned char *
source = encoded.data<
unsigned char>();
336 const unsigned char *
const limit =
source + encoded.size();
339 size_t part_size = *
source++;
340 if (part_size == (std::numeric_limits<std::uint8_t>::max)()) {
341 if (
static_cast<size_t>(limit -
source) <
sizeof(uint32_t)) {
342 throw std::out_of_range(
343 "Malformed encoding, overflow in reading size");
345 part_size = detail::read_u32_network_order(
source);
347 source +=
sizeof(uint32_t);
// Bounds check before constructing the part from the remaining bytes.
350 if (
static_cast<size_t>(limit -
source) < part_size) {
351 throw std::out_of_range(
"Malformed encoding, overflow in reading part");
353 *out = message_t(
source, part_size);
365 #ifdef ZMQ_HAS_RVALUE_REFS
377 std::deque<message_t> m_parts;
382 typedef std::deque<message_t>::iterator iterator;
383 typedef std::deque<message_t>::const_iterator const_iterator;
385 typedef std::deque<message_t>::reverse_iterator reverse_iterator;
386 typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
// Constructs by calling recv(socket); the bool result of recv is not checked.
392 multipart_t(socket_ref socket) { recv(socket); }
// Constructs from a string by passing it to addstr().
398 multipart_t(
const std::string &
string) { addstr(
string); }
// Move constructor: move-assigns other.m_parts into this object's m_parts.
404 multipart_t(multipart_t &&other)
ZMQ_NOTHROW { m_parts = std::move(other.m_parts); }
// Move assignment: takes over other.m_parts.
// NOTE(review): the return statement is on a line not shown in this excerpt.
407 multipart_t &operator=(multipart_t &&other)
ZMQ_NOTHROW
409 m_parts = std::move(other.m_parts);
// Virtual destructor; calls clear() to drop all parts.
414 virtual ~multipart_t() { clear(); }
// Returns part `n` via m_parts[n] (mutable and const overloads).
416 message_t &operator[](
size_t n) {
return m_parts[
n]; }
418 const message_t &operator[](
size_t n)
const {
return m_parts[
n]; }
// Returns part `n` via m_parts.at(n) (mutable and const overloads).
420 message_t &at(
size_t n) {
return m_parts.at(
n); }
422 const message_t &at(
size_t n)
const {
return m_parts.at(
n); }
// Iterator to the first part (mutable and const overloads).
424 iterator
begin() {
return m_parts.begin(); }
426 const_iterator
begin()
const {
return m_parts.begin(); }
// Const iterator to the first part.
428 const_iterator cbegin()
const {
return m_parts.cbegin(); }
// Reverse iterator starting at the last part (mutable and const overloads).
430 reverse_iterator rbegin() {
return m_parts.rbegin(); }
432 const_reverse_iterator rbegin()
const {
return m_parts.rbegin(); }
// Past-the-end iterator over the parts (mutable and const overloads).
434 iterator
end() {
return m_parts.end(); }
436 const_iterator
end()
const {
return m_parts.end(); }
// Const past-the-end iterator over the parts.
438 const_iterator cend()
const {
return m_parts.cend(); }
// Reverse past-the-end iterator over the parts (mutable and const overloads).
440 reverse_iterator rend() {
return m_parts.rend(); }
442 const_reverse_iterator rend()
const {
return m_parts.rend(); }
// Removes all parts.
445 void clear() { m_parts.clear(); }
// Number of parts currently held.
448 size_t size()
const {
return m_parts.size(); }
// True when no parts are held.
451 bool empty()
const {
return m_parts.empty(); }
// Receives from `socket`; the integer `flags` (default 0) is cast to
// recv_flags for each socket.recv call.
// NOTE(review): the loop, the handling of a failed recv and the returned value
// are on lines not shown in this excerpt.
454 bool recv(socket_ref socket,
int flags = 0)
461 if (!socket.recv(
message,
static_cast<recv_flags
>(
flags)))
// Sends over `socket`; `more` starts out true only when there is at least one
// part, and the flags passed to socket.send are cast to send_flags.
// NOTE(review): the loop, the flag expression and the returned value are on
// lines not shown in this excerpt.
474 bool send(socket_ref socket,
int flags = 0)
477 bool more =
size() > 0;
482 if (!socket.send(
message,
static_cast<send_flags
>(
// Drains `other`: while it is non-empty, passes other.remove() to push().
// NOTE(review): push() and remove() are defined on lines not shown here.
495 void prepend(multipart_t &&other)
497 while (!other.empty())
498 push(other.remove());
// Loops until `other` is empty.
// NOTE(review): the statement executed per iteration is on a line not shown.
502 void append(multipart_t &&other)
504 while (!other.empty())
// Builds a message_t from (src, size) and inserts it at the front of the parts.
509 void pushmem(
const void *
src,
size_t size)
511 m_parts.push_front(message_t(
src,
size));
// Builds a message_t from (src, size) and appends it to the back of the parts.
515 void addmem(
const void *
src,
size_t size)
517 m_parts.push_back(message_t(
src,
size));
523 m_parts.push_front(message_t(
string.
data(),
string.
size()));
529 m_parts.push_back(message_t(
string.
data(),
string.
size()));
// Inserts at the front a message_t built from the bytes of `type`
// (&type, sizeof(type)).
// The string literal is a diagnostic steering std::string users to pushstr().
// NOTE(review): the check that emits it is on a line not shown in this excerpt.
533 template<
typename T>
void pushtyp(
const T &
type)
536 "Use pushstr() instead of pushtyp<std::string>()");
537 m_parts.push_front(message_t(&
type,
sizeof(
type)));
// Appends at the back a message_t built from the bytes of `type`
// (&type, sizeof(type)).
// The string literal is a diagnostic steering std::string users to addstr().
// NOTE(review): the check that emits it is on a line not shown in this excerpt.
541 template<
typename T>
void addtyp(
const T &
type)
544 "Use addstr() instead of addtyp<std::string>()");
545 m_parts.push_back(message_t(&
type,
sizeof(
type)));
// Moves `message` onto the back of the parts.
552 void add(message_t &&
message) { m_parts.push_back(std::move(
message)); }
// Moves `message` onto the back of the parts; same body as add().
555 void push_back(message_t &&
message) { m_parts.push_back(std::move(
message)); }
// Reads the front part as a value of type T.
// Throws std::runtime_error when sizeof(T) differs from the front part's size;
// otherwise copies the value out of the front part's data.
// The first string literal is a diagnostic steering std::string users to popstr().
// NOTE(review): the check emitting that diagnostic, the removal of the front
// part and the return statement are on lines not shown in this excerpt.
566 template<
typename T>
T poptyp()
569 "Use popstr() instead of poptyp<std::string>()");
570 if (
sizeof(
T) != m_parts.front().size())
571 throw std::runtime_error(
572 "Invalid type, size does not match the message size");
573 T type = *m_parts.front().data<
T>();
581 message_t
message = std::move(m_parts.front());
589 message_t
message = std::move(m_parts.back());
// Const reference to the first part. Note the member itself is not
// const-qualified, so it cannot be called on a const multipart_t.
595 const message_t &front() {
return m_parts.front(); }
// Const reference to the last part. Note the member itself is not
// const-qualified, so it cannot be called on a const multipart_t.
598 const message_t &back() {
return m_parts.back(); }
// Pointer to the part at `index`, obtained via m_parts[index].
601 const message_t *peek(
size_t index)
const {
return &m_parts[
index]; }
// Reads the part at `index` as a value of type T without removing it.
// Throws std::runtime_error("Invalid type, size does not match the message size").
// The first string literal is a diagnostic steering std::string users to peekstr().
// NOTE(review): the conditions guarding both diagnostics and the return
// statement are on lines not shown in this excerpt.
611 template<
typename T>
T peektyp(
size_t index)
const
614 "Use peekstr() instead of peektyp<std::string>()");
616 throw std::runtime_error(
617 "Invalid type, size does not match the message size");
// Static factory: builds a multipart_t and adds `type` to it via addtyp().
// NOTE(review): the return statement is on a line not shown in this excerpt.
623 template<
typename T>
static multipart_t create(
const T &
type)
625 multipart_t multipart;
626 multipart.addtyp(
type);
// Builds a new multipart_t and, for every part in order, calls addmem() with
// that part's data pointer and size.
// NOTE(review): the return statement is on a line not shown in this excerpt.
631 multipart_t clone()
const
633 multipart_t multipart;
634 for (
size_t i = 0;
i <
size();
i++)
635 multipart.addmem(m_parts[
i].data(), m_parts[
i].size());
642 std::stringstream ss;
643 for (
size_t i = 0;
i < m_parts.size();
i++) {
644 const unsigned char *
data = m_parts[
i].data<
unsigned char>();
645 size_t size = m_parts[
i].size();
649 for (
size_t j = 0; j <
size; j++) {
650 if (
data[j] < 32 ||
data[j] > 127) {
655 ss <<
"\n[" << std::dec << std::setw(3) << std::setfill(
'0') <<
size
658 ss <<
"... (too big to print)";
661 for (
size_t j = 0; j <
size; j++) {
663 ss << static_cast<char>(
data[j]);
665 ss << std::hex << std::setw(2) << std::setfill(
'0')
666 <<
static_cast<short>(
data[j]);
// Compares *this with *other using operator==.
// `other` is dereferenced without a null check, so it must not be null.
673 bool equal(
const multipart_t *other)
const ZMQ_NOTHROW
675 return *
this == *other;
680 if (
size() != other.size())
682 for (
size_t i = 0;
i <
size();
i++)
683 if (at(
i) != other.at(
i))
690 return !(*
this == other);
// Returns the result of the free function zmq::encode applied to this object.
696 message_t encode()
const {
return zmq::encode(*
this); }
// Decodes `encoded` with zmq::decode and appends the resulting parts to this
// object through std::back_inserter (which uses push_back).
699 void decode_append(
const message_t &encoded)
701 zmq::decode(encoded, std::back_inserter(*
this));
// Static helper: decodes `encoded` with zmq::decode into `tmp` through
// std::back_inserter.
// NOTE(review): the declaration of `tmp` and the return statement are on lines
// not shown in this excerpt.
705 static multipart_t decode(
const message_t &encoded)
708 zmq::decode(encoded, std::back_inserter(tmp));
// Streams the textual form of `msg` (msg.str()) into `os` and returns `os`.
720 inline std::ostream &
operator<<(std::ostream &os,
const multipart_t &msg)
722 return os << msg.str();
725 #endif // ZMQ_HAS_RVALUE_REFS
727 #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
// Poller that stores one handler_type callback per registered socket or file
// descriptor (keyed by poller_ref_t) and, from wait(), invokes the callback of
// every event reported by the underlying poller_t.
// NOTE(review): several member signatures and guard conditions are on lines
// not shown in this excerpt; comments below describe only the visible code.
728 class active_poller_t
731 active_poller_t() =
default;
732 ~active_poller_t() =
default;
// Not copyable.
734 active_poller_t(
const active_poller_t &) =
delete;
735 active_poller_t &operator=(
const active_poller_t &) =
delete;
// Movable.
737 active_poller_t(active_poller_t &&
src) =
default;
738 active_poller_t &operator=(active_poller_t &&
src) =
default;
// Callback type: receives the event flags reported for the entry.
740 using handler_type = std::function<
void(event_flags)>;
// Socket registration (signature line not shown): wrap the socket in a key.
744 const poller_ref_t
ref{socket};
747 throw std::invalid_argument(
"null handler in active_poller_t::add (socket)");
// Store the handler in a shared_ptr owned by the `handlers` map.
748 auto ret = handlers.emplace(
749 ref, std::make_shared<handler_type>(std::move(
handler)));
// The base poller gets a raw pointer to the stored handler as its user data.
753 base_poller.add(socket, events, ret.first->second.get());
// File-descriptor registration: same steps as the socket case above.
763 void add(
fd_t fd, event_flags events, handler_type
handler)
765 const poller_ref_t
ref{fd};
768 throw std::invalid_argument(
"null handler in active_poller_t::add (fd)");
769 auto ret = handlers.emplace(
770 ref, std::make_shared<handler_type>(std::move(
handler)));
774 base_poller.add(fd, events, ret.first->second.get());
// Socket removal (signature line not shown): unregister, then drop the handler.
786 base_poller.remove(socket);
787 handlers.erase(socket);
// File-descriptor removal (signature line not shown).
793 base_poller.remove(fd);
// Socket modify (signature line not shown): forwards to the base poller.
800 base_poller.modify(socket, events);
// File-descriptor modify: forwards to the base poller.
803 void modify(
fd_t fd, event_flags events)
805 base_poller.modify(fd, events);
// Waits up to `timeout` on the base poller and invokes the handler of each
// reported event.
// NOTE(review): the condition guarding the rebuild below (presumably
// need_rebuild) and the return statement are on lines not shown.
808 size_t wait(std::chrono::milliseconds
timeout)
// Rebuild: size the event buffer to the handler count and refresh the
// shared_ptr copies held in poller_handlers.
811 poller_events.resize(handlers.size());
812 poller_handlers.clear();
813 poller_handlers.reserve(handlers.size());
814 for (
const auto &
handler : handlers) {
815 poller_handlers.push_back(
handler.second);
817 need_rebuild =
false;
819 const auto count = base_poller.wait_all(poller_events,
timeout);
// Dispatch only the first `count` entries, i.e. those filled in by wait_all.
820 std::for_each(poller_events.begin(),
821 poller_events.begin() +
static_cast<ptrdiff_t
>(
count),
822 [](decltype(base_poller)::event_type &
event) {
823 assert(
event.user_data !=
nullptr);
824 (*
event.user_data)(
event.events);
// Number of registered handlers.
831 size_t size()
const noexcept {
return handlers.size(); }
// Flag cleared after the rebuild in wait().
834 bool need_rebuild{
false};
// Underlying poller whose user data type is handler_type.
836 poller_t<handler_type> base_poller{};
// Owning map from registered socket/fd key to its handler.
838 std::unordered_map<zmq::poller_ref_t, std::shared_ptr<handler_type>> handlers{};
// Event buffer handed to base_poller.wait_all().
840 std::vector<decltype(base_poller)::event_type> poller_events{};
// shared_ptr copies of the handlers, refreshed in wait().
// NOTE(review): presumably keeps handlers alive while callbacks run - confirm.
841 std::vector<std::shared_ptr<handler_type>> poller_handlers{};
843 #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
848 #endif // __ZMQ_ADDON_HPP_INCLUDED__