dish.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include <string.h>
5 
6 #include "macros.hpp"
7 #include "dish.hpp"
8 #include "err.hpp"
9 
10 zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
11  socket_base_t (parent_, tid_, sid_, true), _has_message (false)
12 {
13  options.type = ZMQ_DISH;
14 
15  // When socket is being closed down we don't want to wait till pending
16  // subscription commands are sent to the wire.
17  options.linger.store (0);
18 
19  const int rc = _message.init ();
20  errno_assert (rc == 0);
21 }
22 
23 zmq::dish_t::~dish_t ()
24 {
25  const int rc = _message.close ();
26  errno_assert (rc == 0);
27 }
28 
29 void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
30  bool subscribe_to_all_,
31  bool locally_initiated_)
32 {
33  LIBZMQ_UNUSED (subscribe_to_all_);
34  LIBZMQ_UNUSED (locally_initiated_);
35 
36  zmq_assert (pipe_);
37  _fq.attach (pipe_);
38  _dist.attach (pipe_);
39 
40  // Send all the cached subscriptions to the new upstream peer.
41  send_subscriptions (pipe_);
42 }
43 
44 void zmq::dish_t::xread_activated (pipe_t *pipe_)
45 {
46  _fq.activated (pipe_);
47 }
48 
49 void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
50 {
51  _dist.activated (pipe_);
52 }
53 
54 void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
55 {
56  _fq.pipe_terminated (pipe_);
57  _dist.pipe_terminated (pipe_);
58 }
59 
60 void zmq::dish_t::xhiccuped (pipe_t *pipe_)
61 {
62  // Send all the cached subscriptions to the hiccuped pipe.
63  send_subscriptions (pipe_);
64 }
65 
66 int zmq::dish_t::xjoin (const char *group_)
67 {
68  const std::string group = std::string (group_);
69 
70  if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
71  errno = EINVAL;
72  return -1;
73  }
74 
75  // User cannot join same group twice
76  if (!_subscriptions.insert (group).second) {
77  errno = EINVAL;
78  return -1;
79  }
80 
81  msg_t msg;
82  int rc = msg.init_join ();
83  errno_assert (rc == 0);
84 
85  rc = msg.set_group (group_);
86  errno_assert (rc == 0);
87 
88  int err = 0;
89  rc = _dist.send_to_all (&msg);
90  if (rc != 0)
91  err = errno;
92  const int rc2 = msg.close ();
93  errno_assert (rc2 == 0);
94  if (rc != 0)
95  errno = err;
96  return rc;
97 }
98 
99 int zmq::dish_t::xleave (const char *group_)
100 {
101  const std::string group = std::string (group_);
102 
103  if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
104  errno = EINVAL;
105  return -1;
106  }
107 
108  if (0 == _subscriptions.erase (group)) {
109  errno = EINVAL;
110  return -1;
111  }
112 
113  msg_t msg;
114  int rc = msg.init_leave ();
115  errno_assert (rc == 0);
116 
117  rc = msg.set_group (group_);
118  errno_assert (rc == 0);
119 
120  int err = 0;
121  rc = _dist.send_to_all (&msg);
122  if (rc != 0)
123  err = errno;
124  const int rc2 = msg.close ();
125  errno_assert (rc2 == 0);
126  if (rc != 0)
127  errno = err;
128  return rc;
129 }
130 
131 int zmq::dish_t::xsend (msg_t *msg_)
132 {
133  LIBZMQ_UNUSED (msg_);
134  errno = ENOTSUP;
135  return -1;
136 }
137 
138 bool zmq::dish_t::xhas_out ()
139 {
140  // Subscription can be added/removed anytime.
141  return true;
142 }
143 
144 int zmq::dish_t::xrecv (msg_t *msg_)
145 {
146  // If there's already a message prepared by a previous call to zmq_poll,
147  // return it straight ahead.
148  if (_has_message) {
149  const int rc = msg_->move (_message);
150  errno_assert (rc == 0);
151  _has_message = false;
152  return 0;
153  }
154 
155  return xxrecv (msg_);
156 }
157 
158 int zmq::dish_t::xxrecv (msg_t *msg_)
159 {
160  do {
161  // Get a message using fair queueing algorithm.
162  const int rc = _fq.recv (msg_);
163 
164  // If there's no message available, return immediately.
165  // The same when error occurs.
166  if (rc != 0)
167  return -1;
168 
169  // Skip non matching messages
170  } while (0 == _subscriptions.count (std::string (msg_->group ())));
171 
172  // Found a matching message
173  return 0;
174 }
175 
176 bool zmq::dish_t::xhas_in ()
177 {
178  // If there's already a message prepared by a previous call to zmq_poll,
179  // return straight ahead.
180  if (_has_message)
181  return true;
182 
183  const int rc = xxrecv (&_message);
184  if (rc != 0) {
186  return false;
187  }
188 
189  // Matching message found
190  _has_message = true;
191  return true;
192 }
193 
194 void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
195 {
196  for (subscriptions_t::iterator it = _subscriptions.begin (),
197  end = _subscriptions.end ();
198  it != end; ++it) {
199  msg_t msg;
200  int rc = msg.init_join ();
201  errno_assert (rc == 0);
202 
203  rc = msg.set_group (it->c_str ());
204  errno_assert (rc == 0);
205 
206  // Send it to the pipe.
207  pipe_->write (&msg);
208  }
209 
210  pipe_->flush ();
211 }
212 
213 zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_,
214  bool connect_,
215  socket_base_t *socket_,
216  const options_t &options_,
217  address_t *addr_) :
218  session_base_t (io_thread_, connect_, socket_, options_, addr_),
219  _state (group)
220 {
221 }
222 
223 zmq::dish_session_t::~dish_session_t ()
224 {
225 }
226 
227 int zmq::dish_session_t::push_msg (msg_t *msg_)
228 {
229  if (_state == group) {
230  if ((msg_->flags () & msg_t::more) != msg_t::more) {
231  errno = EFAULT;
232  return -1;
233  }
234 
235  if (msg_->size () > ZMQ_GROUP_MAX_LENGTH) {
236  errno = EFAULT;
237  return -1;
238  }
239 
240  _group_msg = *msg_;
241  _state = body;
242 
243  const int rc = msg_->init ();
244  errno_assert (rc == 0);
245  return 0;
246  }
247  const char *group_setting = msg_->group ();
248  int rc;
249  if (group_setting[0] != 0)
250  goto has_group;
251 
252  // Set the message group
253  rc = msg_->set_group (static_cast<char *> (_group_msg.data ()),
254  _group_msg.size ());
255  errno_assert (rc == 0);
256 
257  // We set the group, so we don't need the group_msg anymore
258  rc = _group_msg.close ();
259  errno_assert (rc == 0);
260 has_group:
261  // Thread safe socket doesn't support multipart messages
262  if ((msg_->flags () & msg_t::more) == msg_t::more) {
263  errno = EFAULT;
264  return -1;
265  }
266 
267  // Push message to dish socket
268  rc = session_base_t::push_msg (msg_);
269 
270  if (rc == 0)
271  _state = group;
272 
273  return rc;
274 }
275 
276 int zmq::dish_session_t::pull_msg (msg_t *msg_)
277 {
278  int rc = session_base_t::pull_msg (msg_);
279 
280  if (rc != 0)
281  return rc;
282 
283  if (!msg_->is_join () && !msg_->is_leave ())
284  return rc;
285 
286  const int group_length = static_cast<int> (strlen (msg_->group ()));
287 
288  msg_t command;
289  int offset;
290 
291  if (msg_->is_join ()) {
292  rc = command.init_size (group_length + 5);
293  errno_assert (rc == 0);
294  offset = 5;
295  memcpy (command.data (), "\4JOIN", 5);
296  } else {
297  rc = command.init_size (group_length + 6);
298  errno_assert (rc == 0);
299  offset = 6;
300  memcpy (command.data (), "\5LEAVE", 6);
301  }
302 
303  command.set_flags (msg_t::command);
304  char *command_data = static_cast<char *> (command.data ());
305 
306  // Copy the group
307  memcpy (command_data + offset, msg_->group (), group_length);
308 
309  // Close the join message
310  rc = msg_->close ();
311  errno_assert (rc == 0);
312 
313  *msg_ = command;
314 
315  return 0;
316 }
317 
318 void zmq::dish_session_t::reset ()
319 {
320  session_base_t::reset ();
321  _state = group;
322 }
end
GLuint GLuint end
Definition: glcorearb.h:2858
ENOTSUP
#define ENOTSUP
Definition: zmq.h:104
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
EINVAL
#define EINVAL
Definition: errno.hpp:25
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
command
ROSLIB_DECL std::string command(const std::string &cmd)
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
ZMQ_GROUP_MAX_LENGTH
#define ZMQ_GROUP_MAX_LENGTH
Definition: zmq.h:368
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
offset
GLintptr offset
Definition: glcorearb.h:2944
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
err
static UPB_NORETURN void err(tarjan *t)
Definition: ruby/ext/google/protobuf_c/upb.c:5856
dish.hpp
ZMQ_DISH
#define ZMQ_DISH
Definition: zmq_draft.h:17
err.hpp
true
#define true
Definition: cJSON.c:65
EFAULT
#define EFAULT
Definition: errno.hpp:17
false
#define false
Definition: cJSON.c:70
group
static uint32_t * group(tarjan *t, upb_refcounted *r)
Definition: ruby/ext/google/protobuf_c/upb.c:5943
it
MapIter it
Definition: php/ext/google/protobuf/map.c:205
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:50