24 #ifndef __ZMQ_ADDON_HPP_INCLUDED__
25 #define __ZMQ_ADDON_HPP_INCLUDED__
36 #include <unordered_map>
45 template<
bool CheckN,
class OutputIt>
47 recv_multipart_n(socket_ref s, OutputIt out,
size_t n, recv_flags flags)
54 throw std::runtime_error(
55 "Too many message parts in recv_multipart_n");
57 if (!s.recv(msg, flags)) {
59 assert(msg_count == 0);
63 const bool more = msg.more();
64 *out++ = std::move(msg);
// Reports whether the first (lowest-addressed) byte of a 16-bit value 0x01
// holds 0x01, i.e. whether the host stores the low byte first.
inline bool is_little_endian()
{
    const uint16_t probe = 0x01;
    const uint8_t *first_byte = reinterpret_cast<const uint8_t *>(&probe);
    return first_byte[0] == 0x01;
}
77 inline void write_network_order(
unsigned char *buf,
const uint32_t value)
79 if (is_little_endian()) {
81 *buf++ =
static_cast<unsigned char>((
value >> 24) & mask);
82 *buf++ =
static_cast<unsigned char>((
value >> 16) & mask);
83 *buf++ =
static_cast<unsigned char>((
value >> 8) & mask);
84 *buf++ =
static_cast<unsigned char>(
value & mask);
86 std::memcpy(buf, &value,
sizeof(value));
90 inline uint32_t read_u32_network_order(
const unsigned char *buf)
92 if (is_little_endian()) {
93 return (
static_cast<uint32_t
>(buf[0]) << 24)
94 + (
static_cast<uint32_t
>(buf[1]) << 16)
95 + (
static_cast<uint32_t
>(buf[2]) << 8)
96 +
static_cast<uint32_t
>(buf[3]);
99 std::memcpy(&value, buf,
sizeof(value));
117 template<
class OutputIt>
120 recv_flags flags = recv_flags::none)
122 return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
139 template<
class OutputIt>
143 recv_flags flags = recv_flags::none)
145 return detail::recv_multipart_n<true>(s, std::move(out),
n, flags);
161 #ifndef ZMQ_CPP11_PARTIAL
165 && (std::is_same<detail::range_value_t<Range>, message_t>::value
166 || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
170 send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
174 auto it =
begin(msgs);
175 const auto end_it =
end(msgs);
176 size_t msg_count = 0;
177 while (it != end_it) {
178 const auto next = std::next(it);
179 const auto msg_flags =
180 flags | (
next == end_it ? send_flags::none : send_flags::sndmore);
181 if (!s.send(*it, msg_flags)) {
183 assert(it ==
begin(msgs));
212 #ifndef ZMQ_CPP11_PARTIAL
216 && (std::is_same<detail::range_value_t<Range>, message_t>::value
217 || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
220 message_t encode(
const Range &parts)
222 size_t mmsg_size = 0;
225 for (
const auto &part : parts) {
226 const size_t part_size = part.size();
227 if (part_size > (std::numeric_limits<std::uint32_t>::max)()) {
229 throw std::range_error(
"Invalid size, message part too large");
231 const size_t count_size =
232 part_size < (std::numeric_limits<std::uint8_t>::max)() ? 1 : 5;
233 mmsg_size += part_size + count_size;
236 message_t encoded(mmsg_size);
237 unsigned char *buf = encoded.data<
unsigned char>();
238 for (
const auto &part : parts) {
239 const uint32_t part_size =
static_cast<uint32_t
>(part.size());
240 const unsigned char *part_data =
241 static_cast<const unsigned char *
>(part.data());
243 if (part_size < (std::numeric_limits<std::uint8_t>::max)()) {
245 *buf++ = (
unsigned char) part_size;
248 *buf++ = (std::numeric_limits<uint8_t>::max)();
249 detail::write_network_order(buf, part_size);
250 buf +=
sizeof(part_size);
252 std::memcpy(buf, part_data, part_size);
256 assert(
static_cast<size_t>(buf - encoded.data<
unsigned char>()) == mmsg_size);
275 template<
class OutputIt> OutputIt decode(
const message_t &encoded, OutputIt out)
277 const unsigned char *source = encoded.data<
unsigned char>();
278 const unsigned char *
const limit = source + encoded.size();
280 while (source < limit) {
281 size_t part_size = *source++;
282 if (part_size == (std::numeric_limits<std::uint8_t>::max)()) {
283 if (
static_cast<size_t>(limit - source) <
sizeof(uint32_t)) {
284 throw std::out_of_range(
285 "Malformed encoding, overflow in reading size");
287 part_size = detail::read_u32_network_order(source);
289 source +=
sizeof(uint32_t);
292 if (
static_cast<size_t>(limit - source) < part_size) {
293 throw std::out_of_range(
"Malformed encoding, overflow in reading part");
295 *out = message_t(source, part_size);
300 assert(source == limit);
307 #ifdef ZMQ_HAS_RVALUE_REFS
319 std::deque<message_t> m_parts;
322 typedef std::deque<message_t>::value_type value_type;
324 typedef std::deque<message_t>::iterator iterator;
325 typedef std::deque<message_t>::const_iterator const_iterator;
327 typedef std::deque<message_t>::reverse_iterator reverse_iterator;
328 typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
334 multipart_t(socket_ref socket) { recv(socket); }
337 multipart_t(
const void *src,
size_t size) { addmem(src,
size); }
340 multipart_t(
const std::string &
string) { addstr(
string); }
343 multipart_t(message_t &&message) { add(std::move(message)); }
346 multipart_t(multipart_t &&other)
ZMQ_NOTHROW { m_parts = std::move(other.m_parts); }
349 multipart_t &operator=(multipart_t &&other)
ZMQ_NOTHROW
351 m_parts = std::move(other.m_parts);
356 virtual ~multipart_t() { clear(); }
358 message_t &operator[](
size_t n) {
return m_parts[
n]; }
360 const message_t &operator[](
size_t n)
const {
return m_parts[
n]; }
362 message_t &at(
size_t n) {
return m_parts.at(
n); }
364 const message_t &at(
size_t n)
const {
return m_parts.at(
n); }
366 iterator
begin() {
return m_parts.begin(); }
368 const_iterator
begin()
const {
return m_parts.begin(); }
370 const_iterator
cbegin()
const {
return m_parts.cbegin(); }
372 reverse_iterator rbegin() {
return m_parts.rbegin(); }
374 const_reverse_iterator rbegin()
const {
return m_parts.rbegin(); }
376 iterator
end() {
return m_parts.end(); }
378 const_iterator
end()
const {
return m_parts.end(); }
380 const_iterator
cend()
const {
return m_parts.cend(); }
382 reverse_iterator rend() {
return m_parts.rend(); }
384 const_reverse_iterator rend()
const {
return m_parts.rend(); }
387 void clear() { m_parts.clear(); }
390 size_t size()
const {
return m_parts.size(); }
393 bool empty()
const {
return m_parts.empty(); }
396 bool recv(socket_ref socket,
int flags = 0)
403 if (!socket.recv(message,
static_cast<recv_flags
>(flags)))
406 if (!socket.recv(&message, flags))
409 more = message.more();
410 add(std::move(message));
416 bool send(socket_ref socket,
int flags = 0)
418 flags &= ~(ZMQ_SNDMORE);
419 bool more =
size() > 0;
421 message_t message = pop();
424 if (!socket.send(message,
static_cast<send_flags
>(
425 (more ? ZMQ_SNDMORE : 0) | flags)))
428 if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
437 void prepend(multipart_t &&other)
439 while (!other.empty())
440 push(other.remove());
444 void append(multipart_t &&other)
446 while (!other.empty())
451 void pushmem(
const void *src,
size_t size)
453 m_parts.push_front(message_t(src,
size));
457 void addmem(
const void *src,
size_t size)
459 m_parts.push_back(message_t(src,
size));
463 void pushstr(
const std::string &
string)
465 m_parts.push_front(message_t(
string.data(),
string.
size()));
469 void addstr(
const std::string &
string)
471 m_parts.push_back(message_t(
string.data(),
string.
size()));
475 template<
typename T>
void pushtyp(
const T &type)
478 "Use pushstr() instead of pushtyp<std::string>()");
479 m_parts.push_front(message_t(&type,
sizeof(type)));
483 template<
typename T>
void addtyp(
const T &type)
486 "Use addstr() instead of addtyp<std::string>()");
487 m_parts.push_back(message_t(&type,
sizeof(type)));
491 void push(message_t &&message) { m_parts.push_front(std::move(message)); }
494 void add(message_t &&message) { m_parts.push_back(std::move(message)); }
497 void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
502 std::string
string(m_parts.front().data<
char>(), m_parts.front().size());
508 template<
typename T> T poptyp()
511 "Use popstr() instead of poptyp<std::string>()");
512 if (
sizeof(T) != m_parts.front().size())
513 throw std::runtime_error(
514 "Invalid type, size does not match the message size");
515 T type = *m_parts.front().data<T>();
523 message_t message = std::move(m_parts.front());
531 message_t message = std::move(m_parts.back());
537 const message_t &front() {
return m_parts.front(); }
540 const message_t &back() {
return m_parts.back(); }
543 const message_t *
peek(
size_t index)
const {
return &m_parts[index]; }
546 std::string peekstr(
size_t index)
const
548 std::string
string(m_parts[index].data<char>(), m_parts[index].
size());
553 template<
typename T> T peektyp(
size_t index)
const
556 "Use peekstr() instead of peektyp<std::string>()");
557 if (
sizeof(T) != m_parts[index].
size())
558 throw std::runtime_error(
559 "Invalid type, size does not match the message size");
560 T type = *m_parts[index].data<T>();
565 template<
typename T>
static multipart_t create(
const T &type)
567 multipart_t multipart;
568 multipart.addtyp(type);
573 multipart_t clone()
const
575 multipart_t multipart;
576 for (
size_t i = 0; i <
size(); i++)
577 multipart.addmem(m_parts[i].data(), m_parts[i].size());
582 std::string str()
const
584 std::stringstream ss;
585 for (
size_t i = 0; i < m_parts.size(); i++) {
586 const unsigned char *data = m_parts[i].data<
unsigned char>();
587 size_t size = m_parts[i].size();
591 for (
size_t j = 0; j <
size; j++) {
592 if (data[j] < 32 || data[j] > 127) {
597 ss <<
"\n[" << std::dec << std::setw(3) << std::setfill(
'0') <<
size
600 ss <<
"... (too big to print)";
603 for (
size_t j = 0; j <
size; j++) {
605 ss << static_cast<char>(data[j]);
607 ss <<
std::hex << std::setw(2) << std::setfill(
'0')
608 <<
static_cast<short>(data[j]);
617 return *
this == *other;
622 if (
size() != other.size())
624 for (
size_t i = 0; i <
size(); i++)
625 if (at(i) != other.at(i))
632 return !(*
this == other);
638 message_t encode()
const {
return zmq::encode(*
this); }
641 void decode_append(
const message_t &encoded)
643 zmq::decode(encoded, std::back_inserter(*
this));
647 static multipart_t decode(
const message_t &encoded)
650 zmq::decode(encoded, std::back_inserter(tmp));
662 inline std::ostream &
operator<<(std::ostream &os,
const multipart_t &msg)
664 return os << msg.str();
667 #endif // ZMQ_HAS_RVALUE_REFS
669 #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
670 class active_poller_t
// Default construction and destruction use the compiler-generated versions.
673 active_poller_t() =
default;
674 ~active_poller_t() =
default;
// Copy construction and copy assignment are deleted.
676 active_poller_t(
const active_poller_t &) =
delete;
677 active_poller_t &operator=(
const active_poller_t &) =
delete;
// Move construction and move assignment use the compiler-generated versions.
679 active_poller_t(active_poller_t &&src) =
default;
680 active_poller_t &operator=(active_poller_t &&src) =
default;
682 using handler_type = std::function<
void(event_flags)>;
684 void add(
zmq::socket_ref socket, event_flags events, handler_type handler)
687 throw std::invalid_argument(
"null handler in active_poller_t::add");
688 auto ret = handlers.emplace(
689 socket, std::make_shared<handler_type>(std::move(handler)));
691 throw error_t(EINVAL);
693 base_poller.add(socket, events, ret.first->second.get());
698 handlers.erase(socket);
705 base_poller.remove(socket);
706 handlers.erase(socket);
712 base_poller.modify(socket, events);
715 size_t wait(std::chrono::milliseconds timeout)
718 poller_events.resize(handlers.size());
719 poller_handlers.clear();
720 poller_handlers.reserve(handlers.size());
721 for (
const auto &handler : handlers) {
722 poller_handlers.push_back(handler.second);
724 need_rebuild =
false;
726 const auto count = base_poller.wait_all(poller_events, timeout);
727 std::for_each(poller_events.begin(),
728 poller_events.begin() +
static_cast<ptrdiff_t
>(
count),
729 [](decltype(base_poller)::event_type &event) {
730 assert(event.user_data !=
nullptr);
731 (*
event.user_data)(event.events);
738 size_t size() const noexcept {
return handlers.size(); }
741 bool need_rebuild{
false};
743 poller_t<handler_type> base_poller{};
744 std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};
745 std::vector<decltype(base_poller)::event_type> poller_events{};
746 std::vector<std::shared_ptr<handler_type>> poller_handlers{};
748 #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
753 #endif // __ZMQ_ADDON_HPP_INCLUDED__