msg.hpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #ifndef __ZMQ_MSG_HPP_INCLUDE__
4 #define __ZMQ_MSG_HPP_INCLUDE__
5 
6 #include <stddef.h>
7 #include <stdio.h>
8 
9 #include "config.hpp"
10 #include "err.hpp"
11 #include "fd.hpp"
12 #include "atomic_counter.hpp"
13 #include "metadata.hpp"
14 
15 // bits 2-5
16 #define CMD_TYPE_MASK 0x1c
17 
18 // Signature for free function to deallocate the message content.
19 // Note that it has to be declared as "C" so that it is the same as
20 // zmq_free_fn defined in zmq.h.
21 extern "C" {
22 typedef void (msg_free_fn) (void *data_, void *hint_);
23 }
24 
25 namespace zmq
26 {
27 // Note that this structure needs to be explicitly constructed
28 // (init functions) and destructed (close function).
29 
30 static const char cancel_cmd_name[] = "\6CANCEL";
31 static const char sub_cmd_name[] = "\x9SUBSCRIBE";
32 
33 class msg_t
34 {
35  public:
36  // Shared message buffer. Message data are either allocated in one
37  // continuous block along with this structure - thus avoiding one
38  // malloc/free pair or they are stored in user-supplied memory.
39  // In the latter case, ffn member stores pointer to the function to be
40  // used to deallocate the data. If the buffer is actually shared (there
41  // are at least 2 references to it) refcount member contains number of
42  // references.
43  struct content_t
44  {
45  void *data;
46  size_t size;
48  void *hint;
50  };
51 
52  // Message flags.
53  enum
54  {
55  more = 1, // Followed by more parts
56  command = 2, // Command frame (see ZMTP spec)
57  // Command types, use only bits 2-5 and compare with ==, not bitwise,
58  // a command can never be of more that one type at the same time
59  ping = 4,
60  pong = 8,
61  subscribe = 12,
62  cancel = 16,
63  close_cmd = 20,
64  credential = 32,
65  routing_id = 64,
66  shared = 128
67  };
68 
69  bool check () const;
70  int init ();
71 
72  int init (void *data_,
73  size_t size_,
74  msg_free_fn *ffn_,
75  void *hint_,
76  content_t *content_ = NULL);
77 
78  int init_size (size_t size_);
79  int init_buffer (const void *buf_, size_t size_);
80  int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_);
81  int init_external_storage (content_t *content_,
82  void *data_,
83  size_t size_,
84  msg_free_fn *ffn_,
85  void *hint_);
86  int init_delimiter ();
87  int init_join ();
88  int init_leave ();
89  int init_subscribe (const size_t size_, const unsigned char *topic);
90  int init_cancel (const size_t size_, const unsigned char *topic);
91  int close ();
92  int move (msg_t &src_);
93  int copy (msg_t &src_);
94  void *data ();
95  size_t size () const;
96  unsigned char flags () const;
97  void set_flags (unsigned char flags_);
98  void reset_flags (unsigned char flags_);
99  metadata_t *metadata () const;
100  void set_metadata (metadata_t *metadata_);
101  void reset_metadata ();
102  bool is_routing_id () const;
103  bool is_credential () const;
104  bool is_delimiter () const;
105  bool is_join () const;
106  bool is_leave () const;
107  bool is_ping () const;
108  bool is_pong () const;
109  bool is_close_cmd () const;
110 
111  // These are called on each message received by the session_base class,
112  // so get them inlined to avoid the overhead of 2 function calls per msg
113  bool is_subscribe () const
114  {
115  return (_u.base.flags & CMD_TYPE_MASK) == subscribe;
116  }
117 
118  bool is_cancel () const
119  {
120  return (_u.base.flags & CMD_TYPE_MASK) == cancel;
121  }
122 
123  size_t command_body_size () const;
124  void *command_body ();
125  bool is_vsm () const;
126  bool is_cmsg () const;
127  bool is_lmsg () const;
128  bool is_zcmsg () const;
129  uint32_t get_routing_id () const;
130  int set_routing_id (uint32_t routing_id_);
131  int reset_routing_id ();
132  const char *group () const;
133  int set_group (const char *group_);
134  int set_group (const char *, size_t length_);
135 
136  // After calling this function you can copy the message in POD-style
137  // refs_ times. No need to call copy.
138  void add_refs (int refs_);
139 
140  // Removes references previously added by add_refs. If the number of
141  // references drops to 0, the message is closed and false is returned.
142  bool rm_refs (int refs_);
143 
144  void shrink (size_t new_size_);
145 
146  // Size in bytes of the largest message that is still copied around
147  // rather than being reference-counted.
148  enum
149  {
151  };
152  enum
153  {
155  msg_t_size - (sizeof (metadata_t *) + 3 + 16 + sizeof (uint32_t))
156  };
157  enum
158  {
159  ping_cmd_name_size = 5, // 4PING
160  cancel_cmd_name_size = 7, // 6CANCEL
161  sub_cmd_name_size = 10 // 9SUBSCRIBE
162  };
163 
164  private:
166 
167  // Different message types.
168  enum type_t
169  {
170  type_min = 101,
171  // VSM messages store the content in the message itself
172  type_vsm = 101,
173  // LMSG messages store the content in malloc-ed memory
174  type_lmsg = 102,
175  // Delimiter messages are used in envelopes
177  // CMSG messages point to constant data
178  type_cmsg = 104,
179 
180  // zero-copy LMSG message for v2_decoder
181  type_zclmsg = 105,
182 
183  // Join message for radio_dish
184  type_join = 106,
185 
186  // Leave message for radio_dish
187  type_leave = 107,
188 
189  type_max = 107
190  };
191 
193  {
196  };
197 
199  {
202  };
203 
204  union group_t
205  {
206  unsigned char type;
207  struct
208  {
209  unsigned char type;
210  char group[15];
211  } sgroup;
212  struct
213  {
214  unsigned char type;
216  } lgroup;
217  };
218 
219  // Note that fields shared between different message types are not
220  // moved to the parent class (msg_t). This way we get tighter packing
221  // of the data. Shared fields can be accessed via 'base' member of
222  // the union.
223  union
224  {
225  struct
226  {
228  unsigned char unused[msg_t_size
229  - (sizeof (metadata_t *) + 2
230  + sizeof (uint32_t) + sizeof (group_t))];
231  unsigned char type;
232  unsigned char flags;
233  uint32_t routing_id;
235  } base;
236  struct
237  {
239  unsigned char data[max_vsm_size];
240  unsigned char size;
241  unsigned char type;
242  unsigned char flags;
243  uint32_t routing_id;
244  group_t group;
245  } vsm;
246  struct
247  {
250  unsigned char
252  - (sizeof (metadata_t *) + sizeof (content_t *) + 2
253  + sizeof (uint32_t) + sizeof (group_t))];
254  unsigned char type;
255  unsigned char flags;
256  uint32_t routing_id;
257  group_t group;
258  } lmsg;
259  struct
260  {
262  content_t *content;
263  unsigned char
265  - (sizeof (metadata_t *) + sizeof (content_t *) + 2
266  + sizeof (uint32_t) + sizeof (group_t))];
267  unsigned char type;
268  unsigned char flags;
269  uint32_t routing_id;
270  group_t group;
271  } zclmsg;
272  struct
273  {
274  metadata_t *metadata;
275  void *data;
276  size_t size;
277  unsigned char unused[msg_t_size
278  - (sizeof (metadata_t *) + sizeof (void *)
279  + sizeof (size_t) + 2 + sizeof (uint32_t)
280  + sizeof (group_t))];
281  unsigned char type;
282  unsigned char flags;
283  uint32_t routing_id;
284  group_t group;
285  } cmsg;
286  struct
287  {
289  unsigned char unused[msg_t_size
290  - (sizeof (metadata_t *) + 2
291  + sizeof (uint32_t) + sizeof (group_t))];
292  unsigned char type;
293  unsigned char flags;
294  uint32_t routing_id;
295  group_t group;
296  } delimiter;
297  } _u;
298 };
299 
300 inline int close_and_return (zmq::msg_t *msg_, int echo_)
301 {
302  // Since we abort on close failure we preserve errno for success case.
303  const int err = errno;
304  const int rc = msg_->close ();
305  errno_assert (rc == 0);
306  errno = err;
307  return echo_;
308 }
309 
310 inline int close_and_return (zmq::msg_t msg_[], int count_, int echo_)
311 {
312  for (int i = 0; i < count_; i++)
313  close_and_return (&msg_[i], 0);
314  return echo_;
315 }
316 }
317 
318 #endif
zmq::msg_t::init_subscribe
int init_subscribe(const size_t size_, const unsigned char *topic)
Definition: msg.cpp:212
zmq::msg_t::long_group_t::refcnt
atomic_counter_t refcnt
Definition: msg.hpp:201
zmq::msg_t::content_t::size
size_t size
Definition: msg.hpp:46
zmq::msg_t::type_lmsg
@ type_lmsg
Definition: msg.hpp:174
zmq::msg_t::command
@ command
Definition: msg.hpp:56
zmq::msg_t::lmsg
struct zmq::msg_t::@52::@57 lmsg
zmq::msg_t::is_leave
bool is_leave() const
Definition: msg.cpp:506
zmq::msg_t::group_type_long
@ group_type_long
Definition: msg.hpp:195
zmq::msg_t::cmsg
struct zmq::msg_t::@52::@59 cmsg
data_
StringPiece data_
Definition: bytestream_unittest.cc:60
zmq::msg_t::data
void * data
Definition: msg.hpp:275
zmq::msg_t::type
unsigned char type
Definition: msg.hpp:231
zmq::msg_t::subscribe
@ subscribe
Definition: msg.hpp:61
zmq::msg_t::group_t::type
unsigned char type
Definition: msg.hpp:206
zmq::msg_t::init_buffer
int init_buffer(const void *buf_, size_t size_)
Definition: msg.cpp:97
zmq::msg_t::is_vsm
bool is_vsm() const
Definition: msg.cpp:481
zmq::msg_t::group_type_short
@ group_type_short
Definition: msg.hpp:194
atomic_counter.hpp
zmq::msg_t::reset_routing_id
int reset_routing_id()
Definition: msg.cpp:643
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::msg_t::type_cmsg
@ type_cmsg
Definition: msg.hpp:178
config.hpp
zmq::msg_t::type_vsm
@ type_vsm
Definition: msg.hpp:172
zmq::msg_t::group_type_t
group_type_t
Definition: msg.hpp:192
zmq::msg_t::group_t
Definition: msg.hpp:204
zmq::msg_t::cancel_cmd_name_size
@ cancel_cmd_name_size
Definition: msg.hpp:160
zmq::msg_t::init_data
int init_data(void *data_, size_t size_, msg_free_fn *ffn_, void *hint_)
Definition: msg.cpp:137
zmq::msg_t::copy
int copy(msg_t &src_)
Definition: msg.cpp:326
zmq::msg_t::sub_cmd_name_size
@ sub_cmd_name_size
Definition: msg.hpp:161
zmq::cancel_cmd_name
static const char cancel_cmd_name[]
Definition: msg.hpp:30
errno
int errno
zmq::msg_t::type_max
@ type_max
Definition: msg.hpp:189
zmq::msg_t::_u
union zmq::msg_t::@52 _u
ZMQ_GROUP_MAX_LENGTH
#define ZMQ_GROUP_MAX_LENGTH
Definition: zmq.h:368
zmq::msg_t::base
struct zmq::msg_t::@52::@55 base
zmq::msg_t::move
int move(msg_t &src_)
Definition: msg.cpp:305
zmq::msg_t::init_cancel
int init_cancel(const size_t size_, const unsigned char *topic)
Definition: msg.cpp:227
zmq::msg_t::init_size
int init_size(size_t size_)
Definition: msg.cpp:62
zmq::msg_t::group_t::content
long_group_t * content
Definition: msg.hpp:215
zmq
Definition: zmq.hpp:229
zmq::msg_t::is_credential
bool is_credential() const
Definition: msg.cpp:471
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
zmq::msg_t::cancel
@ cancel
Definition: msg.hpp:62
zmq::msg_t::is_routing_id
bool is_routing_id() const
Definition: msg.cpp:466
zmq::msg_t::get_routing_id
uint32_t get_routing_id() const
Definition: msg.cpp:628
zmq::msg_t::flags
unsigned char flags
Definition: msg.hpp:232
zmq::msg_t::type_leave
@ type_leave
Definition: msg.hpp:187
zmq::msg_t::type_t
type_t
Definition: msg.hpp:168
zmq::msg_t::close
int close()
Definition: msg.cpp:242
zmq::atomic_counter_t
Definition: atomic_counter.hpp:61
zmq::msg_t::shared
@ shared
Definition: msg.hpp:66
zmq::msg_t::is_lmsg
bool is_lmsg() const
Definition: msg.cpp:491
zmq::msg_t::add_refs
void add_refs(int refs_)
Definition: msg.cpp:561
zmq::msg_t::set_metadata
void set_metadata(metadata_t *metadata_)
Definition: msg.cpp:448
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
zmq::close_and_return
int close_and_return(zmq::msg_t *msg_, int echo_)
Definition: msg.hpp:300
zmq::msg_t::content_t
Definition: msg.hpp:43
zmq::msg_t::check
bool check() const
Definition: msg.cpp:24
fd.hpp
zmq::msg_t::credential
@ credential
Definition: msg.hpp:64
zmq::msg_t::content_t::data
void * data
Definition: msg.hpp:45
zmq::msg_t::is_zcmsg
bool is_zcmsg() const
Definition: msg.cpp:496
CMD_TYPE_MASK
#define CMD_TYPE_MASK
Definition: msg.hpp:16
zmq::msg_t::init_leave
int init_leave()
Definition: msg.cpp:201
zmq::msg_t::group
group_t group
Definition: msg.hpp:234
zmq::msg_t::init_delimiter
int init_delimiter()
Definition: msg.cpp:179
void
typedef void(APIENTRY *GLDEBUGPROCARB)(GLenum source
zmq::msg_t::content_t::ffn
msg_free_fn * ffn
Definition: msg.hpp:47
zmq::msg_t::init
int init()
Definition: msg.cpp:50
i
int i
Definition: gmock-matchers_test.cc:764
zmq::msg_t::shrink
void shrink(size_t new_size_)
Definition: msg.cpp:404
zmq::metadata_t
Definition: metadata.hpp:13
zmq::msg_t::is_join
bool is_join() const
Definition: msg.cpp:501
zmq::msg_t::reset_metadata
void reset_metadata()
Definition: msg.cpp:456
zmq::msg_t::command_body
void * command_body()
Definition: msg.cpp:541
zmq::msg_t::group_t::lgroup
struct zmq::msg_t::group_t::@54 lgroup
zmq::msg_t::delimiter
struct zmq::msg_t::@52::@60 delimiter
zmq::msg_t::more
@ more
Definition: msg.hpp:55
zmq::msg_t::routing_id
uint32_t routing_id
Definition: msg.hpp:233
metadata.hpp
zmq::msg_t::command_body_size
size_t command_body_size() const
Definition: msg.cpp:526
zmq::msg_t::reset_flags
void reset_flags(unsigned char flags_)
Definition: msg.cpp:438
zmq::msg_t::set_routing_id
int set_routing_id(uint32_t routing_id_)
Definition: msg.cpp:633
zmq::msg_t::unused
unsigned char unused[msg_t_size -(sizeof(metadata_t *)+2+sizeof(uint32_t)+sizeof(group_t))]
Definition: msg.hpp:230
zmq::msg_t::init_join
int init_join()
Definition: msg.cpp:190
zmq::msg_t::type_delimiter
@ type_delimiter
Definition: msg.hpp:176
zmq::msg_t::content
content_t * content
Definition: msg.hpp:249
zmq::msg_t::content_t::hint
void * hint
Definition: msg.hpp:48
zmq::msg_t::init_external_storage
int init_external_storage(content_t *content_, void *data_, size_t size_, msg_free_fn *ffn_, void *hint_)
Definition: msg.cpp:111
zmq::msg_t::metadata
metadata_t * metadata
Definition: msg.hpp:227
zmq::msg_t::long_group_t
Definition: msg.hpp:198
zmq::msg_t::zclmsg
struct zmq::msg_t::@52::@58 zclmsg
err.hpp
zmq::msg_t::type_min
@ type_min
Definition: msg.hpp:170
zmq::msg_t::max_vsm_size
@ max_vsm_size
Definition: msg.hpp:154
zmq::msg_t::msg_t_size
@ msg_t_size
Definition: msg.hpp:150
zmq::msg_t::rm_refs
bool rm_refs(int refs_)
Definition: msg.cpp:584
zmq::msg_t::ping_cmd_name_size
@ ping_cmd_name_size
Definition: msg.hpp:159
zmq::sub_cmd_name
static const char sub_cmd_name[]
Definition: msg.hpp:31
zmq::msg_t::is_ping
bool is_ping() const
Definition: msg.cpp:511
zmq::msg_t::long_group_t::group
char group[ZMQ_GROUP_MAX_LENGTH+1]
Definition: msg.hpp:200
zmq::msg_t::type_zclmsg
@ type_zclmsg
Definition: msg.hpp:181
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
zmq::msg_t::vsm
struct zmq::msg_t::@52::@56 vsm
zmq::msg_t::is_pong
bool is_pong() const
Definition: msg.cpp:516
zmq::msg_t::is_delimiter
bool is_delimiter() const
Definition: msg.cpp:476
zmq::msg_t::pong
@ pong
Definition: msg.hpp:60
zmq::msg_t::ping
@ ping
Definition: msg.hpp:59
zmq::msg_t::content_t::refcnt
zmq::atomic_counter_t refcnt
Definition: msg.hpp:49
zmq::msg_t::size
unsigned char size
Definition: msg.hpp:240
zmq::msg_t::group_t::sgroup
struct zmq::msg_t::group_t::@53 sgroup
zmq::msg_t::is_subscribe
bool is_subscribe() const
Definition: msg.hpp:113
zmq::msg_t::data
unsigned char data[max_vsm_size]
Definition: msg.hpp:239
zmq::msg_t::is_cancel
bool is_cancel() const
Definition: msg.hpp:118
zmq::msg_t::refcnt
zmq::atomic_counter_t * refcnt()
Definition: msg.cpp:687
zmq::msg_t::is_cmsg
bool is_cmsg() const
Definition: msg.cpp:486
zmq::msg_t::group_t::group
char group[15]
Definition: msg.hpp:210
msg_free_fn
void() msg_free_fn(void *data_, void *hint_)
Definition: msg.hpp:22
zmq::msg_t::set_flags
void set_flags(unsigned char flags_)
Definition: msg.cpp:433
zmq::msg_t::is_close_cmd
bool is_close_cmd() const
Definition: msg.cpp:521
zmq::msg_t::set_group
int set_group(const char *group_)
Definition: msg.cpp:656
zmq::msg_t::type_join
@ type_join
Definition: msg.hpp:184
zmq::msg_t
Definition: msg.hpp:33
zmq::msg_t::close_cmd
@ close_cmd
Definition: msg.hpp:63


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57