req.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: MPL-2.0 */
2 
3 #include "precompiled.hpp"
4 #include "macros.hpp"
5 #include "req.hpp"
6 #include "err.hpp"
7 #include "msg.hpp"
8 #include "wire.hpp"
9 #include "random.hpp"
10 #include "likely.hpp"
11 
12 zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
13  dealer_t (parent_, tid_, sid_),
14  _receiving_reply (false),
15  _message_begins (true),
16  _reply_pipe (NULL),
17  _request_id_frames_enabled (false),
18  _request_id (generate_random ()),
19  _strict (true)
20 {
21  options.type = ZMQ_REQ;
22 }
23 
24 zmq::req_t::~req_t ()
25 {
26 }
27 
28 int zmq::req_t::xsend (msg_t *msg_)
29 {
30  // If we've sent a request and we still haven't got the reply,
31  // we can't send another request unless the strict option is disabled.
32  if (_receiving_reply) {
33  if (_strict) {
34  errno = EFSM;
35  return -1;
36  }
37 
38  _receiving_reply = false;
39  _message_begins = true;
40  }
41 
42  // First part of the request is the request routing id.
43  if (_message_begins) {
44  _reply_pipe = NULL;
45 
46  if (_request_id_frames_enabled) {
47  _request_id++;
48 
49  msg_t id;
50  int rc = id.init_size (sizeof (uint32_t));
51  memcpy (id.data (), &_request_id, sizeof (uint32_t));
52  errno_assert (rc == 0);
53  id.set_flags (msg_t::more);
54 
55  rc = dealer_t::sendpipe (&id, &_reply_pipe);
56  if (rc != 0) {
57  return -1;
58  }
59  }
60 
61  msg_t bottom;
62  int rc = bottom.init ();
63  errno_assert (rc == 0);
64  bottom.set_flags (msg_t::more);
65 
66  rc = dealer_t::sendpipe (&bottom, &_reply_pipe);
67  if (rc != 0)
68  return -1;
69  zmq_assert (_reply_pipe);
70 
71  _message_begins = false;
72 
73  // Eat all currently available messages before the request is fully
74  // sent. This is done to avoid:
75  // REQ sends request to A, A replies, B replies too.
76  // A's reply was first and matches, that is used.
77  // An hour later REQ sends a request to B. B's old reply is used.
78  msg_t drop;
79  while (true) {
80  rc = drop.init ();
81  errno_assert (rc == 0);
82  rc = dealer_t::xrecv (&drop);
83  if (rc != 0)
84  break;
85  drop.close ();
86  }
87  }
88 
89  bool more = (msg_->flags () & msg_t::more) != 0;
90 
91  int rc = dealer_t::xsend (msg_);
92  if (rc != 0)
93  return rc;
94 
95  // If the request was fully sent, flip the FSM into reply-receiving state.
96  if (!more) {
97  _receiving_reply = true;
98  _message_begins = true;
99  }
100 
101  return 0;
102 }
103 
104 int zmq::req_t::xrecv (msg_t *msg_)
105 {
106  // If request wasn't send, we can't wait for reply.
107  if (!_receiving_reply) {
108  errno = EFSM;
109  return -1;
110  }
111 
112  // Skip messages until one with the right first frames is found.
113  while (_message_begins) {
114  // If enabled, the first frame must have the correct request_id.
115  if (_request_id_frames_enabled) {
116  int rc = recv_reply_pipe (msg_);
117  if (rc != 0)
118  return rc;
119 
120  if (unlikely (!(msg_->flags () & msg_t::more)
121  || msg_->size () != sizeof (_request_id)
122  || *static_cast<uint32_t *> (msg_->data ())
123  != _request_id)) {
124  // Skip the remaining frames and try the next message
125  while (msg_->flags () & msg_t::more) {
126  rc = recv_reply_pipe (msg_);
127  errno_assert (rc == 0);
128  }
129  continue;
130  }
131  }
132 
133  // The next frame must be 0.
134  // TODO: Failing this check should also close the connection with the peer!
135  int rc = recv_reply_pipe (msg_);
136  if (rc != 0)
137  return rc;
138 
139  if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
140  // Skip the remaining frames and try the next message
141  while (msg_->flags () & msg_t::more) {
142  rc = recv_reply_pipe (msg_);
143  errno_assert (rc == 0);
144  }
145  continue;
146  }
147 
148  _message_begins = false;
149  }
150 
151  const int rc = recv_reply_pipe (msg_);
152  if (rc != 0)
153  return rc;
154 
155  // If the reply is fully received, flip the FSM into request-sending state.
156  if (!(msg_->flags () & msg_t::more)) {
157  _receiving_reply = false;
158  _message_begins = true;
159  }
160 
161  return 0;
162 }
163 
164 bool zmq::req_t::xhas_in ()
165 {
166  // TODO: Duplicates should be removed here.
167 
168  if (!_receiving_reply)
169  return false;
170 
171  return dealer_t::xhas_in ();
172 }
173 
174 bool zmq::req_t::xhas_out ()
175 {
176  if (_receiving_reply && _strict)
177  return false;
178 
179  return dealer_t::xhas_out ();
180 }
181 
182 int zmq::req_t::xsetsockopt (int option_,
183  const void *optval_,
184  size_t optvallen_)
185 {
186  const bool is_int = (optvallen_ == sizeof (int));
187  int value = 0;
188  if (is_int)
189  memcpy (&value, optval_, sizeof (int));
190 
191  switch (option_) {
192  case ZMQ_REQ_CORRELATE:
193  if (is_int && value >= 0) {
194  _request_id_frames_enabled = (value != 0);
195  return 0;
196  }
197  break;
198 
199  case ZMQ_REQ_RELAXED:
200  if (is_int && value >= 0) {
201  _strict = (value == 0);
202  return 0;
203  }
204  break;
205 
206  default:
207  break;
208  }
209 
210  return dealer_t::xsetsockopt (option_, optval_, optvallen_);
211 }
212 
213 void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
214 {
215  if (_reply_pipe == pipe_)
216  _reply_pipe = NULL;
217  dealer_t::xpipe_terminated (pipe_);
218 }
219 
220 int zmq::req_t::recv_reply_pipe (msg_t *msg_)
221 {
222  while (true) {
223  pipe_t *pipe = NULL;
224  const int rc = dealer_t::recvpipe (msg_, &pipe);
225  if (rc != 0)
226  return rc;
227  if (!_reply_pipe || pipe == _reply_pipe)
228  return 0;
229  }
230 }
231 
232 zmq::req_session_t::req_session_t (io_thread_t *io_thread_,
233  bool connect_,
234  socket_base_t *socket_,
235  const options_t &options_,
236  address_t *addr_) :
237  session_base_t (io_thread_, connect_, socket_, options_, addr_),
238  _state (bottom)
239 {
240 }
241 
242 zmq::req_session_t::~req_session_t ()
243 {
244 }
245 
246 int zmq::req_session_t::push_msg (msg_t *msg_)
247 {
248  // Ignore commands, they are processed by the engine and should not
249  // affect the state machine.
250  if (unlikely (msg_->flags () & msg_t::command))
251  return 0;
252 
253  switch (_state) {
254  case bottom:
255  if (msg_->flags () == msg_t::more) {
256  // In case option ZMQ_CORRELATE is on, allow request_id to be
257  // transferred as first frame (would be too cumbersome to check
258  // whether the option is actually on or not).
259  if (msg_->size () == sizeof (uint32_t)) {
260  _state = request_id;
261  return session_base_t::push_msg (msg_);
262  }
263  if (msg_->size () == 0) {
264  _state = body;
265  return session_base_t::push_msg (msg_);
266  }
267  }
268  break;
269  case request_id:
270  if (msg_->flags () == msg_t::more && msg_->size () == 0) {
271  _state = body;
272  return session_base_t::push_msg (msg_);
273  }
274  break;
275  case body:
276  if (msg_->flags () == msg_t::more)
277  return session_base_t::push_msg (msg_);
278  if (msg_->flags () == 0) {
279  _state = bottom;
280  return session_base_t::push_msg (msg_);
281  }
282  break;
283  }
284  errno = EFAULT;
285  return -1;
286 }
287 
288 void zmq::req_session_t::reset ()
289 {
290  session_base_t::reset ();
291  _state = bottom;
292 }
NULL
NULL
Definition: test_security_zap.cpp:405
EFSM
#define EFSM
Definition: zmq.h:159
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
random.hpp
errno
int errno
ZMQ_REQ
#define ZMQ_REQ
Definition: zmq.h:261
ZMQ_REQ_RELAXED
#define ZMQ_REQ_RELAXED
Definition: zmq.h:318
wire.hpp
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
macros.hpp
id
GLenum GLuint id
Definition: glcorearb.h:2695
req.hpp
msg.hpp
zmq::generate_random
uint32_t generate_random()
Definition: random.cpp:30
err.hpp
likely.hpp
ZMQ_REQ_CORRELATE
#define ZMQ_REQ_CORRELATE
Definition: zmq.h:317
data
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
Definition: glcorearb.h:2879
true
#define true
Definition: cJSON.c:65
value
GLsizei const GLfloat * value
Definition: glcorearb.h:3093
EFAULT
#define EFAULT
Definition: errno.hpp:17
false
#define false
Definition: cJSON.c:70
bottom
GLint GLint bottom
Definition: glcorearb.h:4150
unlikely
#define unlikely(x)
Definition: likely.hpp:11
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:58