radio.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <string.h>
5 
6 #include "radio.hpp"
7 #include "macros.hpp"
8 #include "pipe.hpp"
9 #include "err.hpp"
10 #include "msg.hpp"
11 
12 zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
13  socket_base_t (parent_, tid_, sid_, true), _lossy (true)
14 {
15  options.type = ZMQ_RADIO;
16 }
17 
18 zmq::radio_t::~radio_t ()
19 {
20 }
21 
22 void zmq::radio_t::xattach_pipe (pipe_t *pipe_,
23  bool subscribe_to_all_,
24  bool locally_initiated_)
25 {
26  LIBZMQ_UNUSED (subscribe_to_all_);
27  LIBZMQ_UNUSED (locally_initiated_);
28 
29  zmq_assert (pipe_);
30 
31  // Don't delay pipe termination as there is no one
32  // to receive the delimiter.
33  pipe_->set_nodelay ();
34 
35  _dist.attach (pipe_);
36 
37  if (subscribe_to_all_)
38  _udp_pipes.push_back (pipe_);
39  // The pipe is active when attached. Let's read the subscriptions from
40  // it, if any.
41  else
42  xread_activated (pipe_);
43 }
44 
45 void zmq::radio_t::xread_activated (pipe_t *pipe_)
46 {
47  // There are some subscriptions waiting. Let's process them.
48  msg_t msg;
49  while (pipe_->read (&msg)) {
50  // Apply the subscription to the trie
51  if (msg.is_join () || msg.is_leave ()) {
52  std::string group = std::string (msg.group ());
53 
54  if (msg.is_join ())
55  _subscriptions.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (group),
56  pipe_);
57  else {
58  std::pair<subscriptions_t::iterator, subscriptions_t::iterator>
59  range = _subscriptions.equal_range (group);
60 
61  for (subscriptions_t::iterator it = range.first;
62  it != range.second; ++it) {
63  if (it->second == pipe_) {
64  _subscriptions.erase (it);
65  break;
66  }
67  }
68  }
69  }
70  msg.close ();
71  }
72 }
73 
74 void zmq::radio_t::xwrite_activated (pipe_t *pipe_)
75 {
76  _dist.activated (pipe_);
77 }
78 int zmq::radio_t::xsetsockopt (int option_,
79  const void *optval_,
80  size_t optvallen_)
81 {
82  if (optvallen_ != sizeof (int) || *static_cast<const int *> (optval_) < 0) {
83  errno = EINVAL;
84  return -1;
85  }
86  if (option_ == ZMQ_XPUB_NODROP)
87  _lossy = (*static_cast<const int *> (optval_) == 0);
88  else {
89  errno = EINVAL;
90  return -1;
91  }
92  return 0;
93 }
94 
95 void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
96 {
97  for (subscriptions_t::iterator it = _subscriptions.begin (),
98  end = _subscriptions.end ();
99  it != end;) {
100  if (it->second == pipe_) {
101 #if __cplusplus >= 201103L || (defined _MSC_VER && _MSC_VER >= 1700)
102  it = _subscriptions.erase (it);
103 #else
104  _subscriptions.erase (it++);
105 #endif
106  } else {
107  ++it;
108  }
109  }
110 
111  {
112  const udp_pipes_t::iterator end = _udp_pipes.end ();
113  const udp_pipes_t::iterator it =
114  std::find (_udp_pipes.begin (), end, pipe_);
115  if (it != end)
116  _udp_pipes.erase (it);
117  }
118 
119  _dist.pipe_terminated (pipe_);
120 }
121 
122 int zmq::radio_t::xsend (msg_t *msg_)
123 {
124  // Radio sockets do not allow multipart data (ZMQ_SNDMORE)
125  if (msg_->flags () & msg_t::more) {
126  errno = EINVAL;
127  return -1;
128  }
129 
130  _dist.unmatch ();
131 
132  const std::pair<subscriptions_t::iterator, subscriptions_t::iterator>
133  range = _subscriptions.equal_range (std::string (msg_->group ()));
134 
135  for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
136  _dist.match (it->second);
137 
138  for (udp_pipes_t::iterator it = _udp_pipes.begin (),
139  end = _udp_pipes.end ();
140  it != end; ++it)
141  _dist.match (*it);
142 
143  int rc = -1;
144  if (_lossy || _dist.check_hwm ()) {
145  if (_dist.send_to_matching (msg_) == 0) {
146  rc = 0; // Yay, sent successfully
147  }
148  } else
149  errno = EAGAIN;
150 
151  return rc;
152 }
153 
154 bool zmq::radio_t::xhas_out ()
155 {
156  return _dist.has_out ();
157 }
158 
159 int zmq::radio_t::xrecv (msg_t *msg_)
160 {
161  // Messages cannot be received from PUB socket.
162  LIBZMQ_UNUSED (msg_);
163  errno = ENOTSUP;
164  return -1;
165 }
166 
167 bool zmq::radio_t::xhas_in ()
168 {
169  return false;
170 }
171 
172 zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_,
173  bool connect_,
174  socket_base_t *socket_,
175  const options_t &options_,
176  address_t *addr_) :
177  session_base_t (io_thread_, connect_, socket_, options_, addr_),
178  _state (group)
179 {
180 }
181 
182 zmq::radio_session_t::~radio_session_t ()
183 {
184 }
185 
186 int zmq::radio_session_t::push_msg (msg_t *msg_)
187 {
188  if (msg_->flags () & msg_t::command) {
189  char *command_data = static_cast<char *> (msg_->data ());
190  const size_t data_size = msg_->size ();
191 
192  int group_length;
193  const char *group;
194 
195  msg_t join_leave_msg;
196  int rc;
197 
198  // Set the msg type to either JOIN or LEAVE
199  if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) {
200  group_length = static_cast<int> (data_size) - 5;
201  group = command_data + 5;
202  rc = join_leave_msg.init_join ();
203  } else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) {
204  group_length = static_cast<int> (data_size) - 6;
205  group = command_data + 6;
206  rc = join_leave_msg.init_leave ();
207  }
208  // If it is not a JOIN or LEAVE just push the message
209  else
210  return session_base_t::push_msg (msg_);
211 
212  errno_assert (rc == 0);
213 
214  // Set the group
215  rc = join_leave_msg.set_group (group, group_length);
216  errno_assert (rc == 0);
217 
218  // Close the current command
219  rc = msg_->close ();
220  errno_assert (rc == 0);
221 
222  // Push the join or leave command
223  *msg_ = join_leave_msg;
224  return session_base_t::push_msg (msg_);
225  }
226  return session_base_t::push_msg (msg_);
227 }
228 
229 int zmq::radio_session_t::pull_msg (msg_t *msg_)
230 {
231  if (_state == group) {
232  int rc = session_base_t::pull_msg (&_pending_msg);
233  if (rc != 0)
234  return rc;
235 
236  const char *group = _pending_msg.group ();
237  const int length = static_cast<int> (strlen (group));
238 
239  // First frame is the group
240  rc = msg_->init_size (length);
241  errno_assert (rc == 0);
242  msg_->set_flags (msg_t::more);
243  memcpy (msg_->data (), group, length);
244 
245  // Next status is the body
246  _state = body;
247  return 0;
248  }
249  *msg_ = _pending_msg;
250  _state = group;
251  return 0;
252 }
253 
254 void zmq::radio_session_t::reset ()
255 {
256  session_base_t::reset ();
257  _state = group;
258 }
end
GLuint GLuint end
Definition: glcorearb.h:2858
ENOTSUP
#define ENOTSUP
Definition: zmq.h:104
length
GLenum GLuint GLenum GLsizei length
Definition: glcorearb.h:2695
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
EINVAL
#define EINVAL
Definition: errno.hpp:25
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
ZMQ_RADIO
#define ZMQ_RADIO
Definition: zmq_draft.h:16
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
range
GLenum GLint * range
Definition: glcorearb.h:3963
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
pipe.hpp
msg.hpp
err.hpp
ZMQ_MOVE
#define ZMQ_MOVE(x)
Definition: blob.hpp:33
true
#define true
Definition: cJSON.c:65
group
static uint32_t * group(tarjan *t, upb_refcounted *r)
Definition: ruby/ext/google/protobuf_c/upb.c:5943
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
radio.hpp
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410
ZMQ_XPUB_NODROP
#define ZMQ_XPUB_NODROP
Definition: zmq.h:330


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58