v2_decoder.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <stdlib.h>
5 #include <string.h>
6 #include <cmath>
7 
8 #include "v2_protocol.hpp"
9 #include "v2_decoder.hpp"
10 #include "likely.hpp"
11 #include "wire.hpp"
12 #include "err.hpp"
13 
14 zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_,
15  int64_t maxmsgsize_,
16  bool zero_copy_) :
17  decoder_base_t<v2_decoder_t, shared_message_memory_allocator> (bufsize_),
18  _msg_flags (0),
19  _zero_copy (zero_copy_),
20  _max_msg_size (maxmsgsize_)
21 {
22  int rc = _in_progress.init ();
23  errno_assert (rc == 0);
24 
25  // At the beginning, read one byte and go to flags_ready state.
26  next_step (_tmpbuf, 1, &v2_decoder_t::flags_ready);
27 }
28 
29 zmq::v2_decoder_t::~v2_decoder_t ()
30 {
31  const int rc = _in_progress.close ();
32  errno_assert (rc == 0);
33 }
34 
35 int zmq::v2_decoder_t::flags_ready (unsigned char const *)
36 {
37  _msg_flags = 0;
38  if (_tmpbuf[0] & v2_protocol_t::more_flag)
39  _msg_flags |= msg_t::more;
40  if (_tmpbuf[0] & v2_protocol_t::command_flag)
41  _msg_flags |= msg_t::command;
42 
43  // The payload length is either one or eight bytes,
44  // depending on whether the 'large' bit is set.
45  if (_tmpbuf[0] & v2_protocol_t::large_flag)
46  next_step (_tmpbuf, 8, &v2_decoder_t::eight_byte_size_ready);
47  else
48  next_step (_tmpbuf, 1, &v2_decoder_t::one_byte_size_ready);
49 
50  return 0;
51 }
52 
53 int zmq::v2_decoder_t::one_byte_size_ready (unsigned char const *read_from_)
54 {
55  return size_ready (_tmpbuf[0], read_from_);
56 }
57 
58 int zmq::v2_decoder_t::eight_byte_size_ready (unsigned char const *read_from_)
59 {
60  // The payload size is encoded as 64-bit unsigned integer.
61  // The most significant byte comes first.
62  const uint64_t msg_size = get_uint64 (_tmpbuf);
63 
64  return size_ready (msg_size, read_from_);
65 }
66 
67 int zmq::v2_decoder_t::size_ready (uint64_t msg_size_,
68  unsigned char const *read_pos_)
69 {
70  // Message size must not exceed the maximum allowed size.
71  if (_max_msg_size >= 0)
72  if (unlikely (msg_size_ > static_cast<uint64_t> (_max_msg_size))) {
73  errno = EMSGSIZE;
74  return -1;
75  }
76 
77  // Message size must fit into size_t data type.
78  if (unlikely (msg_size_ != static_cast<size_t> (msg_size_))) {
79  errno = EMSGSIZE;
80  return -1;
81  }
82 
83  int rc = _in_progress.close ();
84  assert (rc == 0);
85 
86  // the current message can exceed the current buffer. We have to copy the buffer
87  // data into a new message and complete it in the next receive.
88 
89  shared_message_memory_allocator &allocator = get_allocator ();
90  if (unlikely (!_zero_copy
91  || msg_size_ > static_cast<size_t> (
92  allocator.data () + allocator.size () - read_pos_))) {
93  // a new message has started, but the size would exceed the pre-allocated arena
94  // this happens every time when a message does not fit completely into the buffer
95  rc = _in_progress.init_size (static_cast<size_t> (msg_size_));
96  } else {
97  // construct message using n bytes from the buffer as storage
98  // increase buffer ref count
99  // if the message will be a large message, pass a valid refcnt memory location as well
100  rc =
101  _in_progress.init (const_cast<unsigned char *> (read_pos_),
102  static_cast<size_t> (msg_size_),
104  allocator.buffer (), allocator.provide_content ());
105 
106  // For small messages, data has been copied and refcount does not have to be increased
107  if (_in_progress.is_zcmsg ()) {
108  allocator.advance_content ();
109  allocator.inc_ref ();
110  }
111  }
112 
113  if (unlikely (rc)) {
114  errno_assert (errno == ENOMEM);
115  rc = _in_progress.init ();
116  errno_assert (rc == 0);
117  errno = ENOMEM;
118  return -1;
119  }
120 
121  _in_progress.set_flags (_msg_flags);
122  // this sets read_pos to
123  // the message data address if the data needs to be copied
124  // for small message / messages exceeding the current buffer
125  // or
126  // to the current start address in the buffer because the message
127  // was constructed to use n bytes from the address passed as argument
128  next_step (_in_progress.data (), _in_progress.size (),
129  &v2_decoder_t::message_ready);
130 
131  return 0;
132 }
133 
134 int zmq::v2_decoder_t::message_ready (unsigned char const *)
135 {
136  // Message is completely read. Signal this to the caller
137  // and prepare to decode next message.
138  next_step (_tmpbuf, 1, &v2_decoder_t::flags_ready);
139  return 1;
140 }
zmq::msg_t::command
@ command
Definition: msg.hpp:56
zmq::shared_message_memory_allocator::call_dec_ref
static void call_dec_ref(void *, void *hint_)
Definition: decoder_allocators.cpp:103
precompiled.hpp
errno
int errno
zmq::ZMQ_FINAL::_tmpbuf
unsigned char _tmpbuf[8]
Definition: v1_decoder.hpp:26
wire.hpp
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::v2_protocol_t::command_flag
@ command_flag
Definition: v2_protocol.hpp:17
v2_decoder.hpp
zmq::v2_protocol_t::large_flag
@ large_flag
Definition: v2_protocol.hpp:16
zmq::ZMQ_FINAL::_in_progress
msg_t _in_progress
Definition: raw_decoder.hpp:32
zmq::msg_t::init
int init()
Definition: msg.cpp:50
EMSGSIZE
#define EMSGSIZE
Definition: zmq.h:131
zmq::msg_t::more
@ more
Definition: msg.hpp:55
zmq::get_uint64
uint64_t get_uint64(const unsigned char *buffer_)
Definition: wire.hpp:63
err.hpp
likely.hpp
v2_protocol.hpp
zmq::v2_protocol_t::more_flag
@ more_flag
Definition: v2_protocol.hpp:15
unlikely
#define unlikely(x)
Definition: likely.hpp:11


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:07:01