12 zmq::stream_t::stream_t (
class ctx_t *parent_, uint32_t tid_,
int sid_) :
13 routing_socket_base_t (parent_, tid_, sid_),
15 _routing_id_sent (
false),
23 _prefetched_routing_id.init ();
24 _prefetched_msg.init ();
27 zmq::stream_t::~stream_t ()
29 _prefetched_routing_id.close ();
30 _prefetched_msg.close ();
33 void zmq::stream_t::xattach_pipe (pipe_t *pipe_,
34 bool subscribe_to_all_,
35 bool locally_initiated_)
41 identify_peer (pipe_, locally_initiated_);
45 void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
47 erase_out_pipe (pipe_);
48 _fq.pipe_terminated (pipe_);
51 if (pipe_ == _current_out)
55 void zmq::stream_t::xread_activated (pipe_t *pipe_)
57 _fq.activated (pipe_);
60 int zmq::stream_t::xsend (msg_t *msg_)
70 if (msg_->flags () & msg_t::more) {
74 out_pipe_t *out_pipe = lookup_out_pipe (
75 blob_t (
static_cast<unsigned char *
> (msg_->data ()),
76 msg_->size (), reference_tag_t ()));
79 _current_out = out_pipe->pipe;
80 if (!_current_out->check_write ()) {
81 out_pipe->active =
false;
95 int rc = msg_->close ();
103 msg_->reset_flags (msg_t::more);
113 if (msg_->size () == 0) {
114 _current_out->terminate (
false);
115 int rc = msg_->close ();
122 const bool ok = _current_out->write (msg_);
124 _current_out->flush ();
127 const int rc = msg_->close ();
132 const int rc = msg_->init ();
138 int zmq::stream_t::xsetsockopt (
int option_,
148 return routing_socket_base_t::xsetsockopt (option_, optval_,
153 int zmq::stream_t::xrecv (msg_t *msg_)
156 if (!_routing_id_sent) {
157 const int rc = msg_->move (_prefetched_routing_id);
159 _routing_id_sent =
true;
161 const int rc = msg_->move (_prefetched_msg);
169 int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
174 zmq_assert ((_prefetched_msg.flags () & msg_t::more) == 0);
179 const blob_t &routing_id = pipe->get_routing_id ();
182 rc = msg_->init_size (routing_id.size ());
186 metadata_t *
metadata = _prefetched_msg.metadata ();
190 memcpy (msg_->data (), routing_id.data (), routing_id.size ());
191 msg_->set_flags (msg_t::more);
194 _routing_id_sent =
true;
199 bool zmq::stream_t::xhas_in ()
208 int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
213 zmq_assert ((_prefetched_msg.flags () & msg_t::more) == 0);
215 const blob_t &routing_id = pipe->get_routing_id ();
216 rc = _prefetched_routing_id.init_size (routing_id.size ());
220 metadata_t *
metadata = _prefetched_msg.metadata ();
222 _prefetched_routing_id.set_metadata (
metadata);
224 memcpy (_prefetched_routing_id.data (), routing_id.data (),
226 _prefetched_routing_id.set_flags (msg_t::more);
229 _routing_id_sent =
false;
234 bool zmq::stream_t::xhas_out ()
242 void zmq::stream_t::identify_peer (pipe_t *pipe_,
bool locally_initiated_)
248 if (locally_initiated_ && connect_routing_id_is_set ()) {
249 const std::string connect_routing_id = extract_connect_routing_id ();
251 reinterpret_cast<const unsigned char *
> (connect_routing_id.c_str ()),
252 connect_routing_id.length ());
258 memcpy (
options.routing_id, routing_id.data (), routing_id.size ());
260 static_cast<unsigned char> (routing_id.size ());
262 pipe_->set_router_socket_routing_id (routing_id);
263 add_out_pipe (
ZMQ_MOVE (routing_id), pipe_);