// Constructor. Forwards bufsize_ to the decoder_base_t base (instantiated with
// shared_message_memory_allocator), stores the zero-copy setting and the
// maximum message size, then schedules the first decoding step.
// NOTE(review): zero_copy_ and maxmsgsize_ are used below but their
// declarations are not visible in this listing - presumably constructor
// parameters on omitted lines; confirm against the full file.
14 zmq::v2_decoder_t::v2_decoder_t (
size_t bufsize_,
17 decoder_base_t<v2_decoder_t, shared_message_memory_allocator> (bufsize_),
19 _zero_copy (zero_copy_),
20 _max_msg_size (maxmsgsize_)
// Register flags_ready as the handler for the next step, with _tmpbuf and a
// length of 1 (presumably: read the single flags byte into _tmpbuf - the
// semantics of next_step are defined outside this listing).
26 next_step (
_tmpbuf, 1, &v2_decoder_t::flags_ready);
// Destructor. Closes the message currently held in _in_progress.
// NOTE(review): the return code is captured in rc, but what is done with it
// is not visible in this listing - presumably it is asserted; confirm.
29 zmq::v2_decoder_t::~v2_decoder_t ()
31 const int rc = _in_progress.close ();
// Step handler invoked once the step scheduled with &v2_decoder_t::flags_ready
// completes. The pointer argument is unnamed and therefore unused here.
// It schedules the size-reading step: either 8 bytes (handled by
// eight_byte_size_ready) or 1 byte (handled by one_byte_size_ready).
// NOTE(review): the condition that selects between the two next_step calls
// is not visible in this listing - presumably a "large message" bit in the
// flags byte held in _tmpbuf; confirm against the full file.
35 int zmq::v2_decoder_t::flags_ready (
unsigned char const *)
// Variant 1: the size field is 8 bytes long.
46 next_step (_tmpbuf, 8, &v2_decoder_t::eight_byte_size_ready);
// Variant 2: the size field is a single byte.
48 next_step (_tmpbuf, 1, &v2_decoder_t::one_byte_size_ready);
// Step handler for the 1-byte size variant.
// Takes the first byte of _tmpbuf as the message size and hands it, together
// with read_from_, to size_ready; returns whatever size_ready returns.
53 int zmq::v2_decoder_t::one_byte_size_ready (
unsigned char const *read_from_)
55 return size_ready (_tmpbuf[0], read_from_);
// Step handler for the 8-byte size variant.
// Decodes a 64-bit message size from _tmpbuf via get_uint64 and hands it,
// together with read_from_, to size_ready; returns whatever size_ready returns.
58 int zmq::v2_decoder_t::eight_byte_size_ready (
unsigned char const *read_from_)
// NOTE(review): get_uint64 is defined elsewhere - presumably it reads the
// value in network byte order; confirm.
62 const uint64_t msg_size =
get_uint64 (_tmpbuf);
64 return size_ready (msg_size, read_from_);
// Common continuation once the message size is known.
// Validates msg_size_, (re)initialises _in_progress for a message of that
// size, applies _msg_flags to it and schedules message_ready as the handler
// for the step that covers the message body.
//   msg_size_  - message size as decoded by one of the *_size_ready handlers.
//   read_pos_  - position pointer passed through from the step handler; used
//                below to measure what is left in the allocator's buffer and
//                as the data pointer of the in-place message.
// NOTE(review): this listing omits lines (error paths, branch structure and
// the return statements are not visible), so the comments below describe only
// the statements that are shown.
67 int zmq::v2_decoder_t::size_ready (uint64_t msg_size_,
68 unsigned char const *read_pos_)
// Size limit check: only performed when _max_msg_size is non-negative
// (a negative value presumably means "no limit" - confirm).
71 if (_max_msg_size >= 0)
72 if (
unlikely (msg_size_ >
static_cast<uint64_t
> (_max_msg_size))) {
// Detect a size that does not survive the round trip through size_t, i.e.
// one that cannot be represented as size_t on this platform.
78 if (
unlikely (msg_size_ !=
static_cast<size_t> (msg_size_))) {
// Release whatever _in_progress currently holds before re-initialising it.
83 int rc = _in_progress.close ();
89 shared_message_memory_allocator &allocator = get_allocator ();
// Visible half of the condition: the message is larger than the number of
// bytes between read_pos_ and the end of the allocator's buffer
// (data () + size ()). The first half of the condition is on an omitted line.
91 || msg_size_ >
static_cast<size_t> (
92 allocator.data () + allocator.size () - read_pos_))) {
// In that case the message is initialised with its own storage of msg_size_
// bytes via init_size.
95 rc = _in_progress.init_size (
static_cast<size_t> (msg_size_));
// Presumably the alternative (else) branch - confirm: initialise the message
// to refer directly to the bytes at read_pos_, passing the allocator's
// buffer () and provide_content () along to init. const_cast is needed because
// read_pos_ is a pointer to const.
101 _in_progress.init (
const_cast<unsigned char *
> (read_pos_),
102 static_cast<size_t> (msg_size_),
104 allocator.buffer (), allocator.provide_content ());
// When the message reports is_zcmsg (), move the allocator on to its next
// content slot and take an additional reference on the allocator.
107 if (_in_progress.is_zcmsg ()) {
108 allocator.advance_content ();
109 allocator.inc_ref ();
// NOTE(review): plain init () - presumably part of an error-recovery path
// whose surrounding condition is omitted from this listing; confirm.
115 rc = _in_progress.init ();
// Apply the flags stored in _msg_flags to the message.
121 _in_progress.set_flags (_msg_flags);
// Schedule the next step over the message's own data ()/size (), with
// message_ready as its handler.
128 next_step (_in_progress.data (), _in_progress.size (),
129 &v2_decoder_t::message_ready);
// Step handler registered by size_ready for the message-body step.
// The pointer argument is unnamed and therefore unused here.
// Schedules the same step the constructor starts with (_tmpbuf, length 1,
// flags_ready), so decoding continues with the next frame's flags.
// NOTE(review): the return statement is not visible in this listing.
134 int zmq::v2_decoder_t::message_ready (
unsigned char const *)
138 next_step (_tmpbuf, 1, &v2_decoder_t::flags_ready);