stream.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #include "stream.hpp"
6 #include "pipe.hpp"
7 #include "wire.hpp"
8 #include "random.hpp"
9 #include "likely.hpp"
10 #include "err.hpp"
11 
12 zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
13  routing_socket_base_t (parent_, tid_, sid_),
14  _prefetched (false),
15  _routing_id_sent (false),
16  _current_out (NULL),
17  _more_out (false),
18  _next_integral_routing_id (generate_random ())
19 {
20  options.type = ZMQ_STREAM;
21  options.raw_socket = true;
22 
23  _prefetched_routing_id.init ();
24  _prefetched_msg.init ();
25 }
26 
27 zmq::stream_t::~stream_t ()
28 {
29  _prefetched_routing_id.close ();
30  _prefetched_msg.close ();
31 }
32 
33 void zmq::stream_t::xattach_pipe (pipe_t *pipe_,
34  bool subscribe_to_all_,
35  bool locally_initiated_)
36 {
37  LIBZMQ_UNUSED (subscribe_to_all_);
38 
39  zmq_assert (pipe_);
40 
41  identify_peer (pipe_, locally_initiated_);
42  _fq.attach (pipe_);
43 }
44 
45 void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
46 {
47  erase_out_pipe (pipe_);
48  _fq.pipe_terminated (pipe_);
49  // TODO router_t calls pipe_->rollback() here; should this be done here as
50  // well? then xpipe_terminated could be pulled up to routing_socket_base_t
51  if (pipe_ == _current_out)
52  _current_out = NULL;
53 }
54 
55 void zmq::stream_t::xread_activated (pipe_t *pipe_)
56 {
57  _fq.activated (pipe_);
58 }
59 
60 int zmq::stream_t::xsend (msg_t *msg_)
61 {
62  // If this is the first part of the message it's the ID of the
63  // peer to send the message to.
64  if (!_more_out) {
65  zmq_assert (!_current_out);
66 
67  // If we have malformed message (prefix with no subsequent message)
68  // then just silently ignore it.
69  // TODO: The connections should be killed instead.
70  if (msg_->flags () & msg_t::more) {
71  // Find the pipe associated with the routing id stored in the prefix.
72  // If there's no such pipe return an error
73 
74  out_pipe_t *out_pipe = lookup_out_pipe (
75  blob_t (static_cast<unsigned char *> (msg_->data ()),
76  msg_->size (), reference_tag_t ()));
77 
78  if (out_pipe) {
79  _current_out = out_pipe->pipe;
80  if (!_current_out->check_write ()) {
81  out_pipe->active = false;
82  _current_out = NULL;
83  errno = EAGAIN;
84  return -1;
85  }
86  } else {
88  return -1;
89  }
90  }
91 
92  // Expect one more message frame.
93  _more_out = true;
94 
95  int rc = msg_->close ();
96  errno_assert (rc == 0);
97  rc = msg_->init ();
98  errno_assert (rc == 0);
99  return 0;
100  }
101 
102  // Ignore the MORE flag
103  msg_->reset_flags (msg_t::more);
104 
105  // This is the last part of the message.
106  _more_out = false;
107 
108  // Push the message into the pipe. If there's no out pipe, just drop it.
109  if (_current_out) {
110  // Close the remote connection if user has asked to do so
111  // by sending zero length message.
112  // Pending messages in the pipe will be dropped (on receiving term- ack)
113  if (msg_->size () == 0) {
114  _current_out->terminate (false);
115  int rc = msg_->close ();
116  errno_assert (rc == 0);
117  rc = msg_->init ();
118  errno_assert (rc == 0);
119  _current_out = NULL;
120  return 0;
121  }
122  const bool ok = _current_out->write (msg_);
123  if (likely (ok))
124  _current_out->flush ();
125  _current_out = NULL;
126  } else {
127  const int rc = msg_->close ();
128  errno_assert (rc == 0);
129  }
130 
131  // Detach the message from the data buffer.
132  const int rc = msg_->init ();
133  errno_assert (rc == 0);
134 
135  return 0;
136 }
137 
138 int zmq::stream_t::xsetsockopt (int option_,
139  const void *optval_,
140  size_t optvallen_)
141 {
142  switch (option_) {
143  case ZMQ_STREAM_NOTIFY:
144  return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
145  &options.raw_notify);
146 
147  default:
148  return routing_socket_base_t::xsetsockopt (option_, optval_,
149  optvallen_);
150  }
151 }
152 
153 int zmq::stream_t::xrecv (msg_t *msg_)
154 {
155  if (_prefetched) {
156  if (!_routing_id_sent) {
157  const int rc = msg_->move (_prefetched_routing_id);
158  errno_assert (rc == 0);
159  _routing_id_sent = true;
160  } else {
161  const int rc = msg_->move (_prefetched_msg);
162  errno_assert (rc == 0);
163  _prefetched = false;
164  }
165  return 0;
166  }
167 
168  pipe_t *pipe = NULL;
169  int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
170  if (rc != 0)
171  return -1;
172 
173  zmq_assert (pipe != NULL);
174  zmq_assert ((_prefetched_msg.flags () & msg_t::more) == 0);
175 
176  // We have received a frame with TCP data.
177  // Rather than sending this frame, we keep it in prefetched
178  // buffer and send a frame with peer's ID.
179  const blob_t &routing_id = pipe->get_routing_id ();
180  rc = msg_->close ();
181  errno_assert (rc == 0);
182  rc = msg_->init_size (routing_id.size ());
183  errno_assert (rc == 0);
184 
185  // forward metadata (if any)
186  metadata_t *metadata = _prefetched_msg.metadata ();
187  if (metadata)
188  msg_->set_metadata (metadata);
189 
190  memcpy (msg_->data (), routing_id.data (), routing_id.size ());
191  msg_->set_flags (msg_t::more);
192 
193  _prefetched = true;
194  _routing_id_sent = true;
195 
196  return 0;
197 }
198 
199 bool zmq::stream_t::xhas_in ()
200 {
201  // We may already have a message pre-fetched.
202  if (_prefetched)
203  return true;
204 
205  // Try to read the next message.
206  // The message, if read, is kept in the pre-fetch buffer.
207  pipe_t *pipe = NULL;
208  int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
209  if (rc != 0)
210  return false;
211 
212  zmq_assert (pipe != NULL);
213  zmq_assert ((_prefetched_msg.flags () & msg_t::more) == 0);
214 
215  const blob_t &routing_id = pipe->get_routing_id ();
216  rc = _prefetched_routing_id.init_size (routing_id.size ());
217  errno_assert (rc == 0);
218 
219  // forward metadata (if any)
220  metadata_t *metadata = _prefetched_msg.metadata ();
221  if (metadata)
222  _prefetched_routing_id.set_metadata (metadata);
223 
224  memcpy (_prefetched_routing_id.data (), routing_id.data (),
225  routing_id.size ());
226  _prefetched_routing_id.set_flags (msg_t::more);
227 
228  _prefetched = true;
229  _routing_id_sent = false;
230 
231  return true;
232 }
233 
234 bool zmq::stream_t::xhas_out ()
235 {
236  // In theory, STREAM socket is always ready for writing. Whether actual
237  // attempt to write succeeds depends on which pipe the message is going
238  // to be routed to.
239  return true;
240 }
241 
242 void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
243 {
244  // Always assign routing id for raw-socket
245  unsigned char buffer[5];
246  buffer[0] = 0;
247  blob_t routing_id;
248  if (locally_initiated_ && connect_routing_id_is_set ()) {
249  const std::string connect_routing_id = extract_connect_routing_id ();
250  routing_id.set (
251  reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
252  connect_routing_id.length ());
253  // Not allowed to duplicate an existing rid
254  zmq_assert (!has_out_pipe (routing_id));
255  } else {
256  put_uint32 (buffer + 1, _next_integral_routing_id++);
257  routing_id.set (buffer, sizeof buffer);
258  memcpy (options.routing_id, routing_id.data (), routing_id.size ());
259  options.routing_id_size =
260  static_cast<unsigned char> (routing_id.size ());
261  }
262  pipe_->set_router_socket_routing_id (routing_id);
263  add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
264 }
NULL
NULL
Definition: test_security_zap.cpp:405
zmq::put_uint32
void put_uint32(unsigned char *buffer_, uint32_t value_)
Definition: wire.hpp:35
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
ZMQ_STREAM
#define ZMQ_STREAM
Definition: zmq.h:269
zmq::do_setsockopt_int_as_bool_strict
int do_setsockopt_int_as_bool_strict(const void *optval_, size_t optvallen_, bool *out_value_)
Definition: options.cpp:89
benchmarks.util.result_uploader.metadata
def metadata
Definition: result_uploader.py:97
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
ZMQ_STREAM_NOTIFY
#define ZMQ_STREAM_NOTIFY
Definition: zmq.h:334
random.hpp
string
GLsizei const GLchar *const * string
Definition: glcorearb.h:3083
errno
int errno
ok
ROSCPP_DECL bool ok()
wire.hpp
likely
#define likely(x)
Definition: likely.hpp:10
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
LIBZMQ_UNUSED
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
pipe.hpp
buffer
Definition: buffer_processor.h:43
zmq::generate_random
uint32_t generate_random()
Definition: random.cpp:30
err.hpp
likely.hpp
ZMQ_MOVE
#define ZMQ_MOVE(x)
Definition: blob.hpp:33
stream.hpp
EHOSTUNREACH
#define EHOSTUNREACH
Definition: zmq.h:152
false
#define false
Definition: cJSON.c:70


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:59