15 _routing_id_sent (
false),
17 _terminate_current_in (
false),
25 _probe_router (
false),
41 _prefetched_id.close ();
42 _prefetched_msg.close ();
46 bool subscribe_to_all_,
47 bool locally_initiated_)
55 int rc = probe_msg.
init ();
58 rc = pipe_->write (&probe_msg);
64 rc = probe_msg.
close ();
68 const bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
72 _anonymous_pipes.insert (pipe_);
79 const bool is_int = (optvallen_ ==
sizeof (int));
82 memcpy (&
value, optval_,
sizeof (
int));
86 if (is_int &&
value >= 0) {
87 _raw_socket = (
value != 0);
89 options.recv_routing_id =
false;
97 if (is_int &&
value >= 0) {
98 _mandatory = (
value != 0);
104 if (is_int &&
value >= 0) {
105 _probe_router = (
value != 0);
111 if (is_int &&
value >= 0) {
112 _handover = (
value != 0);
117 #ifdef ZMQ_BUILD_DRAFT_API
119 if (is_int &&
value >= 0
138 if (0 == _anonymous_pipes.erase (pipe_)) {
139 erase_out_pipe (pipe_);
140 _fq.pipe_terminated (pipe_);
142 if (pipe_ == _current_out)
149 const std::set<pipe_t *>::iterator
it = _anonymous_pipes.find (pipe_);
150 if (
it == _anonymous_pipes.end ())
151 _fq.activated (pipe_);
153 const bool routing_id_ok = identify_peer (pipe_,
false);
155 _anonymous_pipes.erase (
it);
178 blob_t (
static_cast<unsigned char *
> (msg_->
data ()),
182 _current_out = out_pipe->
pipe;
185 if (!_current_out->check_write ()) {
187 const bool pipe_full = !_current_out->check_hwm ();
200 }
else if (_mandatory) {
207 int rc = msg_->
close ();
226 if (_raw_socket && msg_->
size () == 0) {
227 _current_out->terminate (
false);
228 int rc = msg_->
close ();
236 const bool ok = _current_out->write (msg_);
239 const int rc = msg_->
close ();
243 _current_out->rollback ();
247 _current_out->flush ();
252 const int rc = msg_->
close ();
257 const int rc = msg_->
init ();
266 if (!_routing_id_sent) {
267 const int rc = msg_->
move (_prefetched_id);
269 _routing_id_sent =
true;
271 const int rc = msg_->
move (_prefetched_msg);
278 if (_terminate_current_in) {
279 _current_in->terminate (
true);
280 _terminate_current_in =
false;
288 int rc = _fq.recvpipe (msg_, &pipe);
294 rc = _fq.recvpipe (msg_, &pipe);
306 if (_terminate_current_in) {
307 _current_in->terminate (
true);
308 _terminate_current_in =
false;
316 rc = _prefetched_msg.move (*msg_);
321 const blob_t &routing_id = pipe->get_routing_id ();
324 memcpy (msg_->
data (), routing_id.
data (), routing_id.
size ());
326 if (_prefetched_msg.metadata ())
328 _routing_id_sent =
true;
337 _current_out->rollback ();
358 int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
364 while (rc == 0 && _prefetched_msg.is_routing_id ())
365 rc = _fq.recvpipe (&_prefetched_msg, &pipe);
372 const blob_t &routing_id = pipe->get_routing_id ();
373 rc = _prefetched_id.init_size (routing_id.
size ());
375 memcpy (_prefetched_id.data (), routing_id.
data (), routing_id.
size ());
377 if (_prefetched_msg.metadata ())
378 _prefetched_id.set_metadata (_prefetched_msg.metadata ());
381 _routing_id_sent =
false;
389 return pipe_.check_hwm ();
405 size_t routing_id_size_)
const
410 const blob_t routing_id_blob (
411 static_cast<unsigned char *
> (
const_cast<void *
> (routing_id_)),
413 const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob);
419 if (out_pipe->
pipe->check_hwm ())
432 if (locally_initiated_ && connect_routing_id_is_set ()) {
433 const std::string connect_routing_id = extract_connect_routing_id ();
435 reinterpret_cast<const unsigned char *
> (connect_routing_id.c_str ()),
436 connect_routing_id.length ());
442 unsigned char buf[5];
446 }
else if (!
options.raw_socket) {
449 const bool ok = pipe_->read (&msg);
453 if (msg.
size () == 0) {
455 unsigned char buf[5];
461 routing_id.
set (
static_cast<unsigned char *
> (msg.
data ()),
468 lookup_out_pipe (routing_id);
470 if (existing_outpipe) {
478 unsigned char buf[5];
483 pipe_t *
const old_pipe = existing_outpipe->
pipe;
485 erase_out_pipe (old_pipe);
486 old_pipe->set_router_socket_routing_id (new_routing_id);
487 add_out_pipe (
ZMQ_MOVE (new_routing_id), old_pipe);
489 if (old_pipe == _current_in)
490 _terminate_current_in =
true;
492 old_pipe->terminate (
true);
497 pipe_->set_router_socket_routing_id (routing_id);
498 add_out_pipe (
ZMQ_MOVE (routing_id), pipe_);