12 zmq::req_t::req_t (
class ctx_t *parent_, uint32_t tid_,
int sid_) :
13 dealer_t (parent_, tid_, sid_),
14 _receiving_reply (
false),
15 _message_begins (
true),
17 _request_id_frames_enabled (
false),
28 int zmq::req_t::xsend (msg_t *msg_)
32 if (_receiving_reply) {
38 _receiving_reply =
false;
39 _message_begins =
true;
43 if (_message_begins) {
46 if (_request_id_frames_enabled) {
50 int rc =
id.init_size (
sizeof (uint32_t));
51 memcpy (
id.
data (), &_request_id,
sizeof (uint32_t));
53 id.set_flags (msg_t::more);
55 rc = dealer_t::sendpipe (&
id, &_reply_pipe);
64 bottom.set_flags (msg_t::more);
66 rc = dealer_t::sendpipe (&
bottom, &_reply_pipe);
71 _message_begins =
false;
82 rc = dealer_t::xrecv (&drop);
89 bool more = (msg_->flags () & msg_t::more) != 0;
91 int rc = dealer_t::xsend (msg_);
97 _receiving_reply =
true;
98 _message_begins =
true;
104 int zmq::req_t::xrecv (msg_t *msg_)
107 if (!_receiving_reply) {
113 while (_message_begins) {
115 if (_request_id_frames_enabled) {
116 int rc = recv_reply_pipe (msg_);
120 if (
unlikely (!(msg_->flags () & msg_t::more)
121 || msg_->size () != sizeof (_request_id)
122 || *
static_cast<uint32_t *
> (msg_->data ())
125 while (msg_->flags () & msg_t::more) {
126 rc = recv_reply_pipe (msg_);
135 int rc = recv_reply_pipe (msg_);
139 if (
unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
141 while (msg_->flags () & msg_t::more) {
142 rc = recv_reply_pipe (msg_);
148 _message_begins =
false;
151 const int rc = recv_reply_pipe (msg_);
156 if (!(msg_->flags () & msg_t::more)) {
157 _receiving_reply =
false;
158 _message_begins =
true;
164 bool zmq::req_t::xhas_in ()
168 if (!_receiving_reply)
171 return dealer_t::xhas_in ();
174 bool zmq::req_t::xhas_out ()
176 if (_receiving_reply && _strict)
179 return dealer_t::xhas_out ();
182 int zmq::req_t::xsetsockopt (
int option_,
186 const bool is_int = (optvallen_ ==
sizeof (int));
189 memcpy (&
value, optval_,
sizeof (
int));
193 if (is_int &&
value >= 0) {
194 _request_id_frames_enabled = (
value != 0);
200 if (is_int &&
value >= 0) {
201 _strict = (
value == 0);
210 return dealer_t::xsetsockopt (option_, optval_, optvallen_);
213 void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
215 if (_reply_pipe == pipe_)
217 dealer_t::xpipe_terminated (pipe_);
220 int zmq::req_t::recv_reply_pipe (msg_t *msg_)
224 const int rc = dealer_t::recvpipe (msg_, &pipe);
227 if (!_reply_pipe || pipe == _reply_pipe)
232 zmq::req_session_t::req_session_t (io_thread_t *io_thread_,
234 socket_base_t *socket_,
237 session_base_t (io_thread_, connect_, socket_,
options_, addr_),
242 zmq::req_session_t::~req_session_t ()
246 int zmq::req_session_t::push_msg (msg_t *msg_)
250 if (
unlikely (msg_->flags () & msg_t::command))
255 if (msg_->flags () == msg_t::more) {
259 if (msg_->size () == sizeof (uint32_t)) {
261 return session_base_t::push_msg (msg_);
263 if (msg_->size () == 0) {
265 return session_base_t::push_msg (msg_);
270 if (msg_->flags () == msg_t::more && msg_->size () == 0) {
272 return session_base_t::push_msg (msg_);
276 if (msg_->flags () == msg_t::more)
277 return session_base_t::push_msg (msg_);
278 if (msg_->flags () == 0) {
280 return session_base_t::push_msg (msg_);
288 void zmq::req_session_t::reset ()
290 session_base_t::reset ();