9 #ifndef ZMQ_HAVE_WINDOWS
39 zmq::zmtp_engine_t::zmtp_engine_t (
42 const endpoint_uri_pair_t &endpoint_uri_pair_) :
43 stream_engine_base_t (fd_,
options_, endpoint_uri_pair_,
true),
44 _greeting_size (v2_greeting_size),
45 _greeting_bytes_read (0),
46 _subscription_required (
false),
47 _heartbeat_timeout (0)
49 _next_msg =
static_cast<int (stream_engine_base_t::*) (msg_t *)
> (
50 &zmtp_engine_t::routing_id_msg);
51 _process_msg =
static_cast<int (stream_engine_base_t::*) (msg_t *)
> (
52 &zmtp_engine_t::process_routing_id_msg);
54 int rc = _pong_msg.init ();
57 rc = _routing_id_msg.init ();
60 if (_options.heartbeat_interval > 0) {
61 _heartbeat_timeout = _options.heartbeat_timeout;
62 if (_heartbeat_timeout == -1)
63 _heartbeat_timeout = _options.heartbeat_interval;
67 zmq::zmtp_engine_t::~zmtp_engine_t ()
69 const int rc = _routing_id_msg.close ();
73 void zmq::zmtp_engine_t::plug_internal ()
76 set_handshake_timer ();
80 _outpos = _greeting_send;
81 _outpos[_outsize++] = UCHAR_MAX;
82 put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1);
84 _outpos[_outsize++] = 0x7f;
96 bool zmq::zmtp_engine_t::handshake ()
98 zmq_assert (_greeting_bytes_read < _greeting_size);
100 const int rc = receive_greeting ();
103 const bool unversioned = rc != 0;
106 ->*select_handshake_fun (unversioned, _greeting_recv[
revision_pos],
117 int zmq::zmtp_engine_t::receive_greeting ()
119 bool unversioned =
false;
120 while (_greeting_bytes_read < _greeting_size) {
121 const int n = read (_greeting_recv + _greeting_bytes_read,
122 _greeting_size - _greeting_bytes_read);
125 error (connection_error);
129 _greeting_bytes_read +=
n;
134 if (_greeting_recv[0] != 0xff) {
139 if (_greeting_bytes_read < signature_size)
146 if (!(_greeting_recv[9] & 0x01)) {
152 receive_greeting_versioned ();
154 return unversioned ? 1 : 0;
157 void zmq::zmtp_engine_t::receive_greeting_versioned ()
160 if (_outpos + _outsize == _greeting_send + signature_size) {
163 _outpos[_outsize++] = 3;
166 if (_greeting_bytes_read > signature_size) {
167 if (_outpos + _outsize == _greeting_send + signature_size + 1) {
174 _outpos[_outsize++] = _options.type;
176 _outpos[_outsize++] = 1;
177 memset (_outpos + _outsize, 0, 20);
185 memcpy (_outpos + _outsize,
"NULL", 4);
186 else if (_options.mechanism ==
ZMQ_PLAIN)
187 memcpy (_outpos + _outsize,
"PLAIN", 5);
189 memcpy (_outpos + _outsize,
"GSSAPI", 6);
190 else if (_options.mechanism ==
ZMQ_CURVE)
191 memcpy (_outpos + _outsize,
"CURVE", 5);
193 memset (_outpos + _outsize, 0, 32);
195 _greeting_size = v3_greeting_size;
201 zmq::zmtp_engine_t::handshake_fun_t zmq::zmtp_engine_t::select_handshake_fun (
202 bool unversioned_,
unsigned char revision_,
unsigned char minor_)
206 return &zmtp_engine_t::handshake_v1_0_unversioned;
210 return &zmtp_engine_t::handshake_v1_0;
212 return &zmtp_engine_t::handshake_v2_0;
216 return &zmtp_engine_t::handshake_v3_0;
218 return &zmtp_engine_t::handshake_v3_1;
221 return &zmtp_engine_t::handshake_v3_1;
225 bool zmq::zmtp_engine_t::handshake_v1_0_unversioned ()
228 if (session ()->zap_enabled ()) {
230 error (protocol_error);
234 _encoder =
new (std::nothrow) v1_encoder_t (_options.out_batch_size);
237 _decoder =
new (std::nothrow)
238 v1_decoder_t (_options.in_batch_size, _options.maxmsgsize);
245 const size_t header_size =
246 _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2;
247 unsigned char tmp[10], *bufferp = tmp;
251 int rc = _routing_id_msg.close ();
253 rc = _routing_id_msg.init_size (_options.routing_id_size);
255 memcpy (_routing_id_msg.data (), _options.routing_id,
256 _options.routing_id_size);
257 _encoder->load_msg (&_routing_id_msg);
258 const size_t buffer_size = _encoder->encode (&bufferp, header_size);
262 _inpos = _greeting_recv;
263 _insize = _greeting_bytes_read;
269 _subscription_required =
true;
273 _next_msg = &zmtp_engine_t::pull_msg_from_session;
276 _process_msg =
static_cast<int (stream_engine_base_t::*) (msg_t *)
> (
277 &zmtp_engine_t::process_routing_id_msg);
282 bool zmq::zmtp_engine_t::handshake_v1_0 ()
284 if (session ()->zap_enabled ()) {
286 error (protocol_error);
290 _encoder =
new (std::nothrow) v1_encoder_t (_options.out_batch_size);
293 _decoder =
new (std::nothrow)
294 v1_decoder_t (_options.in_batch_size, _options.maxmsgsize);
300 bool zmq::zmtp_engine_t::handshake_v2_0 ()
302 if (session ()->zap_enabled ()) {
304 error (protocol_error);
308 _encoder =
new (std::nothrow) v2_encoder_t (_options.out_batch_size);
311 _decoder =
new (std::nothrow) v2_decoder_t (
312 _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
318 bool zmq::zmtp_engine_t::handshake_v3_x (
const bool downgrade_sub_)
321 && memcmp (_greeting_recv + 12,
"NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
324 _mechanism =
new (std::nothrow)
325 null_mechanism_t (session (), _peer_address, _options);
327 }
else if (_options.mechanism ==
ZMQ_PLAIN
328 && memcmp (_greeting_recv + 12,
329 "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
331 if (_options.as_server)
332 _mechanism =
new (std::nothrow)
333 plain_server_t (session (), _peer_address, _options);
336 new (std::nothrow) plain_client_t (session (), _options);
339 #ifdef ZMQ_HAVE_CURVE
341 && memcmp (_greeting_recv + 12,
342 "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
344 if (_options.as_server)
345 _mechanism =
new (std::nothrow) curve_server_t (
346 session (), _peer_address, _options, downgrade_sub_);
348 _mechanism =
new (std::nothrow)
349 curve_client_t (session (), _options, downgrade_sub_);
353 #ifdef HAVE_LIBGSSAPI_KRB5
355 && memcmp (_greeting_recv + 12,
356 "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
358 if (_options.as_server)
359 _mechanism =
new (std::nothrow)
360 gssapi_server_t (session (), _peer_address, _options);
363 new (std::nothrow) gssapi_client_t (session (), _options);
368 socket ()->event_handshake_failed_protocol (
369 session ()->get_endpoint (),
371 error (protocol_error);
374 #ifndef ZMQ_HAVE_CURVE
377 _next_msg = &zmtp_engine_t::next_handshake_command;
378 _process_msg = &zmtp_engine_t::process_handshake_command;
383 bool zmq::zmtp_engine_t::handshake_v3_0 ()
385 _encoder =
new (std::nothrow) v2_encoder_t (_options.out_batch_size);
388 _decoder =
new (std::nothrow) v2_decoder_t (
389 _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
392 return zmq::zmtp_engine_t::handshake_v3_x (
true);
395 bool zmq::zmtp_engine_t::handshake_v3_1 ()
397 _encoder =
new (std::nothrow) v3_1_encoder_t (_options.out_batch_size);
400 _decoder =
new (std::nothrow) v2_decoder_t (
401 _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
404 return zmq::zmtp_engine_t::handshake_v3_x (
false);
407 int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_)
409 const int rc = msg_->init_size (_options.routing_id_size);
411 if (_options.routing_id_size > 0)
412 memcpy (msg_->data (), _options.routing_id, _options.routing_id_size);
413 _next_msg = &zmtp_engine_t::pull_msg_from_session;
417 int zmq::zmtp_engine_t::process_routing_id_msg (msg_t *msg_)
419 if (_options.recv_routing_id) {
420 msg_->set_flags (msg_t::routing_id);
421 const int rc = session ()->push_msg (msg_);
424 int rc = msg_->close ();
430 if (_subscription_required) {
435 int rc = subscription.init_size (1);
437 *
static_cast<unsigned char *
> (subscription.data ()) = 1;
438 rc = session ()->push_msg (&subscription);
442 _process_msg = &zmtp_engine_t::push_msg_to_session;
447 int zmq::zmtp_engine_t::produce_ping_message (msg_t *msg_)
450 const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
453 int rc = msg_->init_size (ping_ttl_len);
455 msg_->set_flags (msg_t::command);
457 memcpy (msg_->data (),
"\4PING", msg_t::ping_cmd_name_size);
459 uint16_t ttl_val = htons (_options.heartbeat_ttl);
460 memcpy (
static_cast<uint8_t *
> (msg_->data ()) + msg_t::ping_cmd_name_size,
461 &ttl_val, sizeof (ttl_val));
463 rc = _mechanism->encode (msg_);
464 _next_msg = &zmtp_engine_t::pull_and_encode;
465 if (!_has_timeout_timer && _heartbeat_timeout > 0) {
466 add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id);
467 _has_timeout_timer =
true;
472 int zmq::zmtp_engine_t::produce_pong_message (msg_t *msg_)
476 int rc = msg_->move (_pong_msg);
479 rc = _mechanism->encode (msg_);
480 _next_msg = &zmtp_engine_t::pull_and_encode;
484 int zmq::zmtp_engine_t::process_heartbeat_message (msg_t *msg_)
486 if (msg_->is_ping ()) {
488 const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
489 const size_t ping_max_ctx_len = 16;
490 uint16_t remote_heartbeat_ttl;
493 memcpy (&remote_heartbeat_ttl,
494 static_cast<uint8_t *
> (msg_->data ())
495 + msg_t::ping_cmd_name_size,
496 ping_ttl_len - msg_t::ping_cmd_name_size);
497 remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl);
500 remote_heartbeat_ttl *= 100;
502 if (!_has_ttl_timer && remote_heartbeat_ttl > 0) {
503 add_timer (remote_heartbeat_ttl, heartbeat_ttl_timer_id);
504 _has_ttl_timer =
true;
512 const size_t context_len =
513 std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len);
515 _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len);
517 _pong_msg.set_flags (msg_t::command);
518 memcpy (_pong_msg.data (),
"\4PONG", msg_t::ping_cmd_name_size);
520 memcpy (
static_cast<uint8_t *
> (_pong_msg.data ())
521 + msg_t::ping_cmd_name_size,
522 static_cast<uint8_t *
> (msg_->data ()) + ping_ttl_len,
525 _next_msg =
static_cast<int (stream_engine_base_t::*) (msg_t *)
> (
526 &zmtp_engine_t::produce_pong_message);
533 int zmq::zmtp_engine_t::process_command_message (msg_t *msg_)
535 const uint8_t cmd_name_size =
536 *(
static_cast<const uint8_t *
> (msg_->data ()));
537 const size_t ping_name_size = msg_t::ping_cmd_name_size - 1;
538 const size_t sub_name_size = msg_t::sub_cmd_name_size - 1;
539 const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1;
541 if (
unlikely (msg_->size () < cmd_name_size +
sizeof (cmd_name_size)))
544 const uint8_t *
const cmd_name =
545 static_cast<const uint8_t *
> (msg_->data ()) + 1;
546 if (cmd_name_size == ping_name_size
547 && memcmp (cmd_name,
"PING", cmd_name_size) == 0)
549 if (cmd_name_size == ping_name_size
550 && memcmp (cmd_name,
"PONG", cmd_name_size) == 0)
552 if (cmd_name_size == sub_name_size
553 && memcmp (cmd_name,
"SUBSCRIBE", cmd_name_size) == 0)
555 if (cmd_name_size == cancel_name_size
556 && memcmp (cmd_name,
"CANCEL", cmd_name_size) == 0)
559 if (msg_->is_ping () || msg_->is_pong ())
560 return process_heartbeat_message (msg_);