4 #include "platform.hpp"
6 #if defined ZMQ_HAVE_NORM
9 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
17 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
19 struct norm_wrapper_thread_args_t
21 NormDescriptor norm_descriptor;
23 NormInstanceHandle norm_instance_handle;
26 DWORD WINAPI normWrapperThread (LPVOID lpParam);
29 zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_,
31 io_object_t (parent_),
34 norm_instance (NORM_INSTANCE_INVALID),
35 norm_session (NORM_SESSION_INVALID),
39 norm_tx_stream (NORM_OBJECT_INVALID),
42 zmq_output_ready (
false),
43 norm_tx_ready (
false),
46 zmq_input_ready (
false)
48 int rc = tx_msg.init ();
52 zmq::norm_engine_t::~norm_engine_t ()
64 NormNodeId localId = NORM_NODE_ANY;
65 const char *ifacePtr = strchr (network_,
',');
66 if (
NULL != ifacePtr) {
67 size_t idLen = ifacePtr - network_;
71 strncpy (idText, network_, idLen);
73 localId = (NormNodeId) atoi (idText);
81 const char *addrPtr = strchr (ifacePtr,
';');
82 if (
NULL != addrPtr) {
83 size_t ifaceLen = addrPtr - ifacePtr;
86 strncpy (ifaceName, ifacePtr, ifaceLen);
87 ifaceName[ifaceLen] =
'\0';
96 const char *portPtr = strrchr (addrPtr,
':');
97 if (
NULL == portPtr) {
103 size_t addrLen = portPtr - addrPtr;
106 strncpy (addr, addrPtr, addrLen);
107 addr[addrLen] =
'\0';
109 unsigned short portNumber = atoi (portPtr);
111 if (NORM_INSTANCE_INVALID == norm_instance) {
112 if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance ())) {
125 norm_session = NormCreateSession (norm_instance, addr, portNumber, localId);
126 if (NORM_SESSION_INVALID == norm_session) {
127 int savedErrno =
errno;
128 NormDestroyInstance (norm_instance);
129 norm_instance = NORM_INSTANCE_INVALID;
134 if (!NormIsUnicastAddress (addr)) {
136 NormSetTTL (norm_session,
options.multicast_hops);
140 NormSetLoopback (norm_session,
142 if (
NULL != ifacePtr) {
145 NormSetMulticastInterface (norm_session, ifacePtr);
148 if (NormIsUnicastAddress (addr) ||
options.norm_unicast_nacks) {
149 NormSetDefaultUnicastNack (norm_session,
true);
156 NormSetTOS (norm_session,
options.tos + 1);
161 NormSetTOS (norm_session,
options.tos - 1);
163 NormSetTOS (norm_session,
options.tos);
170 NormSetDefaultSyncPolicy (norm_session, NORM_SYNC_STREAM);
171 if (!NormStartReceiver (
172 norm_session, (
unsigned long)
options.norm_buffer_size * 1024)) {
174 int savedErrno =
errno;
175 NormDestroyInstance (norm_instance);
176 norm_session = NORM_SESSION_INVALID;
177 norm_instance = NORM_INSTANCE_INVALID;
186 unsigned char numparity =
189 :
options.norm_num_autoparity);
191 unsigned char blocksize =
195 NormSessionId instanceId = NormGetRandomSessionId ();
197 if (!NormStartSender (norm_session, instanceId,
198 (
unsigned long)
options.norm_buffer_size * 1024,
199 options.norm_segment_size, blocksize,
202 int savedErrno =
errno;
203 NormDestroyInstance (norm_instance);
204 norm_session = NORM_SESSION_INVALID;
205 norm_instance = NORM_INSTANCE_INVALID;
211 NormSetTxRate (norm_session, (
double)
options.rate * 1000);
213 NormSetCongestionControl (norm_session,
true);
223 if (
options.norm_num_autoparity > 0) {
224 NormSetAutoParity (norm_session,
options.norm_num_autoparity);
226 norm_tx_ready =
true;
228 if (NORM_OBJECT_INVALID
229 == (norm_tx_stream = NormStreamOpen (
231 (
unsigned long)
options.norm_buffer_size * 1024))) {
233 int savedErrno =
errno;
234 NormDestroyInstance (norm_instance);
235 norm_session = NORM_SESSION_INVALID;
236 norm_instance = NORM_INSTANCE_INVALID;
241 NormStreamSetPushEnable (norm_tx_stream,
options.norm_push_enable);
251 void zmq::norm_engine_t::shutdown ()
255 NormStopReceiver (norm_session);
258 rx_pending_list.Destroy ();
259 rx_ready_list.Destroy ();
260 msg_ready_list.Destroy ();
265 NormStopSender (norm_session);
268 if (NORM_SESSION_INVALID != norm_session) {
269 NormDestroySession (norm_session);
270 norm_session = NORM_SESSION_INVALID;
272 if (NORM_INSTANCE_INVALID != norm_instance) {
273 NormStopInstance (norm_instance);
274 NormDestroyInstance (norm_instance);
275 norm_instance = NORM_INSTANCE_INVALID;
279 void zmq::norm_engine_t::plug (io_thread_t *io_thread_,
280 session_base_t *session_)
282 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
283 norm_wrapper_thread_args_t *threadArgs =
new norm_wrapper_thread_args_t;
284 int rc =
make_fdpair (&wrapper_read_fd, &threadArgs->wrapper_write_fd);
287 threadArgs->norm_descriptor = NormGetDescriptor (norm_instance);
288 threadArgs->norm_instance_handle = norm_instance;
289 norm_descriptor_handle = add_fd (wrapper_read_fd);
291 fd_t normDescriptor = NormGetDescriptor (norm_instance);
292 norm_descriptor_handle = add_fd (normDescriptor);
295 set_pollin (norm_descriptor_handle);
297 zmq_session = session_;
299 zmq_output_ready =
true;
301 zmq_input_ready =
true;
307 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
308 wrapper_thread_handle = CreateThread (
NULL, 0, normWrapperThread,
309 threadArgs, 0, &wrapper_thread_id);
314 void zmq::norm_engine_t::unplug ()
316 rm_fd (norm_descriptor_handle);
317 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
318 PostThreadMessage (wrapper_thread_id, WM_QUIT, (WPARAM)
NULL,
320 WaitForSingleObject (wrapper_thread_handle, INFINITE);
322 GetExitCodeThread (wrapper_thread_handle, &exitCode);
330 void zmq::norm_engine_t::terminate ()
337 void zmq::norm_engine_t::restart_output ()
340 zmq_output_ready =
true;
346 void zmq::norm_engine_t::send_data ()
349 while (zmq_output_ready && norm_tx_ready) {
353 size_t space = BUFFER_SIZE;
354 unsigned char *bufPtr = (
unsigned char *) tx_buffer;
355 tx_len = zmq_encoder.encode (&bufPtr, space);
359 tx_first_msg =
false;
368 NormStreamFlush (norm_tx_stream,
true, NORM_FLUSH_ACTIVE);
371 if (-1 == zmq_session->pull_msg (&tx_msg)) {
373 zmq_output_ready =
false;
376 zmq_encoder.load_msg (&tx_msg);
389 tx_more_bit = (0 != (tx_msg.flags () & msg_t::more));
393 tx_len = 1 + zmq_encoder.encode (&bufPtr, space);
398 if (tx_index < tx_len) {
400 tx_index += NormStreamWrite (norm_tx_stream, tx_buffer + tx_index,
402 if (tx_index < tx_len) {
404 norm_tx_ready =
false;
412 void zmq::norm_engine_t::in_event ()
416 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
417 int rc = recv (wrapper_read_fd,
reinterpret_cast<char *
> (&
event),
421 if (!NormGetNextEvent (norm_instance, &
event)) {
428 switch (
event.type) {
429 case NORM_TX_QUEUE_VACANCY:
430 case NORM_TX_QUEUE_EMPTY:
431 if (!norm_tx_ready) {
432 norm_tx_ready =
true;
437 case NORM_RX_OBJECT_NEW:
439 case NORM_RX_OBJECT_UPDATED:
440 recv_data (
event.object);
443 case NORM_RX_OBJECT_ABORTED: {
444 NormRxStreamState *rxState =
445 (NormRxStreamState *) NormObjectGetUserData (
event.object);
446 if (
NULL != rxState) {
450 NormRxStreamState::List *list = rxState->AccessList ();
452 list->Remove (*rxState);
457 case NORM_REMOTE_SENDER_INACTIVE:
466 NormNodeDelete (
event.sender);
475 bool zmq::norm_engine_t::restart_input ()
478 zmq_input_ready =
true;
480 if (!msg_ready_list.IsEmpty ())
481 recv_data (NORM_OBJECT_INVALID);
486 void zmq::norm_engine_t::recv_data (NormObjectHandle
object)
488 if (NORM_OBJECT_INVALID !=
object) {
492 zmq_assert (NORM_OBJECT_STREAM == NormObjectGetType (
object));
495 NormRxStreamState *rxState =
496 (NormRxStreamState *) NormObjectGetUserData (
object);
497 if (
NULL == rxState) {
499 rxState =
new (std::nothrow)
504 if (!rxState->Init ()) {
509 NormObjectSetUserData (
object, rxState);
510 }
else if (!rxState->IsRxReady ()) {
513 rx_pending_list.Remove (*rxState);
515 if (!rxState->IsRxReady ()) {
517 rxState->SetRxReady (
true);
518 rx_ready_list.Append (*rxState);
523 while (!rx_ready_list.IsEmpty ()
524 || (zmq_input_ready && !msg_ready_list.IsEmpty ())) {
527 NormRxStreamState::List::Iterator iterator (rx_ready_list);
528 NormRxStreamState *rxState;
529 while (
NULL != (rxState = iterator.GetNextItem ())) {
530 switch (rxState->Decode ()) {
535 rx_ready_list.Remove (*rxState);
536 msg_ready_list.Append (*rxState);
541 rxState->SetSync (
false);
548 NormObjectHandle
stream = rxState->GetStreamHandle ();
550 while (!rxState->InSync ()) {
552 if (!NormStreamSeekMsgStart (
stream)) {
558 unsigned int numBytes = 1;
559 if (!NormStreamRead (
stream, &syncFlag, &numBytes)) {
569 rxState->SetSync (
true);
572 if (!rxState->InSync ()) {
575 rxState->SetRxReady (
false);
577 rx_ready_list.Remove (*rxState);
578 rx_pending_list.Append (*rxState);
583 unsigned int numBytes = rxState->GetBytesNeeded ();
584 if (!NormStreamRead (
stream, rxState->AccessBuffer (), &numBytes)) {
591 rxState->IncrementBufferCount (numBytes);
595 rxState->SetRxReady (
false);
597 rx_ready_list.Remove (*rxState);
598 rx_pending_list.Append (*rxState);
602 if (zmq_input_ready) {
607 NormRxStreamState::List::Iterator iterator (msg_ready_list);
608 NormRxStreamState *rxState;
609 while (
NULL != (rxState = iterator.GetNextItem ())) {
610 msg_t *msg = rxState->AccessMsg ();
611 int rc = zmq_session->push_msg (msg);
615 zmq_input_ready =
false;
624 msg_ready_list.Remove (*rxState);
628 rx_ready_list.Append (*rxState);
630 msg_ready_list.Append (*rxState);
636 zmq_session->flush ();
640 zmq::norm_engine_t::NormRxStreamState::NormRxStreamState (
641 NormObjectHandle normStream,
645 norm_stream (normStream),
646 max_msg_size (maxMsgSize),
647 zero_copy (zeroCopy),
648 in_batch_size (inBatchSize),
652 skip_norm_sync (
false),
662 zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState ()
664 if (
NULL != zmq_decoder) {
669 list->Remove (*
this);
677 skip_norm_sync =
false;
678 if (
NULL != zmq_decoder)
681 new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy);
683 if (
NULL != zmq_decoder) {
686 zmq_decoder->get_buffer (&buffer_ptr, &
buffer_size);
695 int zmq::norm_engine_t::NormRxStreamState::Decode ()
698 while (buffer_count > 0) {
700 size_t processed = 0;
705 if (skip_norm_sync) {
708 skip_norm_sync =
false;
711 int rc = zmq_decoder->decode (buffer_ptr, buffer_count, processed);
712 buffer_ptr += processed;
713 buffer_count -= processed;
717 if (0 == buffer_count) {
719 zmq_decoder->get_buffer (&buffer_ptr, &
buffer_size);
721 skip_norm_sync =
true;
726 skip_norm_sync =
false;
738 zmq_decoder->get_buffer (&buffer_ptr, &
buffer_size);
743 zmq::norm_engine_t::NormRxStreamState::List::List () : head (
NULL), tail (
NULL)
747 zmq::norm_engine_t::NormRxStreamState::List::~List ()
752 void zmq::norm_engine_t::NormRxStreamState::List::Destroy ()
754 NormRxStreamState *
item = head;
763 NormRxStreamState &
item)
776 NormRxStreamState &
item)
790 zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator (
792 next_item (list.head)
796 zmq::norm_engine_t::NormRxStreamState *
797 zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem ()
799 NormRxStreamState *nextItem = next_item;
800 if (
NULL != nextItem)
801 next_item = nextItem->next;
807 return _empty_endpoint;
811 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
813 DWORD WINAPI normWrapperThread (LPVOID lpParam)
815 norm_wrapper_thread_args_t *norm_wrapper_thread_args =
816 (norm_wrapper_thread_args_t *) lpParam;
824 waitRc = MsgWaitForMultipleObjectsEx (
825 1, &norm_wrapper_thread_args->norm_descriptor, INFINITE,
826 QS_ALLPOSTMESSAGE, 0);
829 if (waitRc == WAIT_OBJECT_0) {
831 if (!NormGetNextEvent (
832 norm_wrapper_thread_args->norm_instance_handle, &
message)) {
837 send (norm_wrapper_thread_args->wrapper_write_fd,
841 }
else if (waitRc == WAIT_OBJECT_0 + 1) {
844 GetMessage (&
message, 0, 0, 0);
845 if (
message.message == WM_QUIT) {
857 rc =
closesocket (norm_wrapper_thread_args->wrapper_write_fd);
858 free (norm_wrapper_thread_args);
866 #endif // ZMQ_HAVE_NORM