norm_engine.cpp
Go to the documentation of this file.
1 
2 #include "precompiled.hpp"
3 
4 #include "platform.hpp"
5 
6 #if defined ZMQ_HAVE_NORM
7 
8 #include "norm_engine.hpp"
9 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
10 #include "ip.hpp"
11 #endif
12 
13 #include "session_base.hpp"
14 #include "v2_protocol.hpp"
15 
16 
17 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
18 
19 struct norm_wrapper_thread_args_t
20 {
21  NormDescriptor norm_descriptor;
22  SOCKET wrapper_write_fd;
23  NormInstanceHandle norm_instance_handle;
24 };
25 
26 DWORD WINAPI normWrapperThread (LPVOID lpParam);
27 #endif
28 
29 zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_,
30  const options_t &options_) :
31  io_object_t (parent_),
32  zmq_session (NULL),
33  options (options_),
34  norm_instance (NORM_INSTANCE_INVALID),
35  norm_session (NORM_SESSION_INVALID),
36  is_sender (false),
37  is_receiver (false),
38  zmq_encoder (0),
39  norm_tx_stream (NORM_OBJECT_INVALID),
40  tx_first_msg (true),
41  tx_more_bit (false),
42  zmq_output_ready (false),
43  norm_tx_ready (false),
44  tx_index (0),
45  tx_len (0),
46  zmq_input_ready (false)
47 {
48  int rc = tx_msg.init ();
49  errno_assert (0 == rc);
50 }
51 
52 zmq::norm_engine_t::~norm_engine_t ()
53 {
54  shutdown (); // in case it was not already called
55 }
56 
57 
58 int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
59 {
60  // Parse the "network_" address int "iface", "addr", and "port"
61  // norm endpoint format: [id,][<iface>;]<addr>:<port>
62  // First, look for optional local NormNodeId
63  // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
64  NormNodeId localId = NORM_NODE_ANY;
65  const char *ifacePtr = strchr (network_, ',');
66  if (NULL != ifacePtr) {
67  size_t idLen = ifacePtr - network_;
68  if (idLen > 31)
69  idLen = 31;
70  char idText[32];
71  strncpy (idText, network_, idLen);
72  idText[idLen] = '\0';
73  localId = (NormNodeId) atoi (idText);
74  ifacePtr++;
75  } else {
76  ifacePtr = network_;
77  }
78 
79  // Second, look for optional multicast ifaceName
80  char ifaceName[256];
81  const char *addrPtr = strchr (ifacePtr, ';');
82  if (NULL != addrPtr) {
83  size_t ifaceLen = addrPtr - ifacePtr;
84  if (ifaceLen > 255)
85  ifaceLen = 255; // return error instead?
86  strncpy (ifaceName, ifacePtr, ifaceLen);
87  ifaceName[ifaceLen] = '\0';
88  ifacePtr = ifaceName;
89  addrPtr++;
90  } else {
91  addrPtr = ifacePtr;
92  ifacePtr = NULL;
93  }
94 
95  // Finally, parse IP address and port number
96  const char *portPtr = strrchr (addrPtr, ':');
97  if (NULL == portPtr) {
98  errno = EINVAL;
99  return -1;
100  }
101 
102  char addr[256];
103  size_t addrLen = portPtr - addrPtr;
104  if (addrLen > 255)
105  addrLen = 255;
106  strncpy (addr, addrPtr, addrLen);
107  addr[addrLen] = '\0';
108  portPtr++;
109  unsigned short portNumber = atoi (portPtr);
110 
111  if (NORM_INSTANCE_INVALID == norm_instance) {
112  if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance ())) {
113  // errno set by whatever caused NormCreateInstance() to fail
114  return -1;
115  }
116  }
117 
118  // TBD - What do we use for our local NormNodeId?
119  // (for now we use automatic, IP addr based assignment or passed in 'id')
120  // a) Use ZMQ Identity somehow?
121  // b) Add function to use iface addr
122  // c) Randomize and implement a NORM session layer
123  // conflict detection/resolution protocol
124 
125  norm_session = NormCreateSession (norm_instance, addr, portNumber, localId);
126  if (NORM_SESSION_INVALID == norm_session) {
127  int savedErrno = errno;
128  NormDestroyInstance (norm_instance);
129  norm_instance = NORM_INSTANCE_INVALID;
130  errno = savedErrno;
131  return -1;
132  }
133  // There's many other useful NORM options that could be applied here
134  if (!NormIsUnicastAddress (addr)) {
135  // These only apply for multicast sessions
136  NormSetTTL (norm_session, options.multicast_hops);
137  NormSetRxPortReuse (
138  norm_session,
139  true); // port reuse doesn't work for non-connected unicast
140  NormSetLoopback (norm_session,
141  true); // needed when multicast users on same machine
142  if (NULL != ifacePtr) {
143  // Note a bad interface may not be caught until sender or receiver start
144  // (Since sender/receiver is not yet started, this always succeeds here)
145  NormSetMulticastInterface (norm_session, ifacePtr);
146  }
147  }
148  if (NormIsUnicastAddress (addr) || options.norm_unicast_nacks) {
149  NormSetDefaultUnicastNack (norm_session, true);
150  }
151  // Set TOS but check TOS ECN bit for CCE modes
152  if ((options.norm_mode == ZMQ_NORM_CCE
153  || options.norm_mode == ZMQ_NORM_CCE_ECNONLY)
154  && (options.tos % 4 == 0)) {
155  // ECN Capable Transport not set, so set it
156  NormSetTOS (norm_session, options.tos + 1);
157  } else if ((options.norm_mode == ZMQ_NORM_CCE
158  || options.norm_mode == ZMQ_NORM_CCE_ECNONLY)
159  && (options.tos % 4 == 3)) {
160  // Congestion Experienced is an invalid setting, remove one of the bits
161  NormSetTOS (norm_session, options.tos - 1);
162  } else {
163  NormSetTOS (norm_session, options.tos);
164  }
165 
166  if (recv) {
167  // The alternative NORM_SYNC_CURRENT here would provide "instant"
168  // receiver sync to the sender's _current_ message transmission.
169  // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
170  NormSetDefaultSyncPolicy (norm_session, NORM_SYNC_STREAM);
171  if (!NormStartReceiver (
172  norm_session, (unsigned long) options.norm_buffer_size * 1024)) {
173  // errno set by whatever failed
174  int savedErrno = errno;
175  NormDestroyInstance (norm_instance); // session gets closed, too
176  norm_session = NORM_SESSION_INVALID;
177  norm_instance = NORM_INSTANCE_INVALID;
178  errno = savedErrno;
179  return -1;
180  }
181  is_receiver = true;
182  }
183 
184  if (send) {
185  // Handle invalid settings -- num_parity must be >= num_autoparity (which has a default of 0)
186  unsigned char numparity =
187  (options.norm_num_parity >= options.norm_num_autoparity
188  ? options.norm_num_parity
189  : options.norm_num_autoparity);
190  // Handle invalid settings -- block size must be > effective num_parity (which is <255)
191  unsigned char blocksize =
192  (options.norm_block_size > numparity ? options.norm_block_size
193  : numparity + 1);
194  // Pick a random sender instance id (aka norm sender session id)
195  NormSessionId instanceId = NormGetRandomSessionId ();
196  // TBD - provide "options" for some NORM sender parameters
197  if (!NormStartSender (norm_session, instanceId,
198  (unsigned long) options.norm_buffer_size * 1024,
199  options.norm_segment_size, blocksize,
200  numparity)) {
201  // errno set by whatever failed
202  int savedErrno = errno;
203  NormDestroyInstance (norm_instance); // session gets closed, too
204  norm_session = NORM_SESSION_INVALID;
205  norm_instance = NORM_INSTANCE_INVALID;
206  errno = savedErrno;
207  return -1;
208  }
209  // Handle NORM mode
210  if (options.norm_mode == ZMQ_NORM_FIXED) {
211  NormSetTxRate (norm_session, (double) options.rate * 1000);
212  } else {
213  NormSetCongestionControl (norm_session, true);
214  if (options.norm_mode != ZMQ_NORM_CC) {
215  NormSetEcnSupport (
216  norm_session,
217  ((options.norm_mode == ZMQ_NORM_CCE)
218  || (options.norm_mode == ZMQ_NORM_CCE_ECNONLY)),
219  options.norm_mode == ZMQ_NORM_CCE_ECNONLY,
220  options.norm_mode == ZMQ_NORM_CCL);
221  }
222  }
223  if (options.norm_num_autoparity > 0) {
224  NormSetAutoParity (norm_session, options.norm_num_autoparity);
225  }
226  norm_tx_ready = true;
227  is_sender = true;
228  if (NORM_OBJECT_INVALID
229  == (norm_tx_stream = NormStreamOpen (
230  norm_session,
231  (unsigned long) options.norm_buffer_size * 1024))) {
232  // errno set by whatever failed
233  int savedErrno = errno;
234  NormDestroyInstance (norm_instance); // session gets closed, too
235  norm_session = NORM_SESSION_INVALID;
236  norm_instance = NORM_INSTANCE_INVALID;
237  errno = savedErrno;
238  return -1;
239  }
240  // NORM Stream options
241  NormStreamSetPushEnable (norm_tx_stream, options.norm_push_enable);
242  }
243 
244  //NormSetMessageTrace(norm_session, true);
245  //NormSetDebugLevel(3);
246  //NormOpenDebugLog(norm_instance, "normLog.txt");
247 
248  return 0; // no error
249 } // end zmq::norm_engine_t::init()
250 
251 void zmq::norm_engine_t::shutdown ()
252 {
253  // TBD - implement a more graceful shutdown option
254  if (is_receiver) {
255  NormStopReceiver (norm_session);
256 
257  // delete any active NormRxStreamState
258  rx_pending_list.Destroy ();
259  rx_ready_list.Destroy ();
260  msg_ready_list.Destroy ();
261 
262  is_receiver = false;
263  }
264  if (is_sender) {
265  NormStopSender (norm_session);
266  is_sender = false;
267  }
268  if (NORM_SESSION_INVALID != norm_session) {
269  NormDestroySession (norm_session);
270  norm_session = NORM_SESSION_INVALID;
271  }
272  if (NORM_INSTANCE_INVALID != norm_instance) {
273  NormStopInstance (norm_instance);
274  NormDestroyInstance (norm_instance);
275  norm_instance = NORM_INSTANCE_INVALID;
276  }
277 } // end zmq::norm_engine_t::shutdown()
278 
279 void zmq::norm_engine_t::plug (io_thread_t *io_thread_,
280  session_base_t *session_)
281 {
282 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
283  norm_wrapper_thread_args_t *threadArgs = new norm_wrapper_thread_args_t;
284  int rc = make_fdpair (&wrapper_read_fd, &threadArgs->wrapper_write_fd);
285  errno_assert (rc != -1);
286 
287  threadArgs->norm_descriptor = NormGetDescriptor (norm_instance);
288  threadArgs->norm_instance_handle = norm_instance;
289  norm_descriptor_handle = add_fd (wrapper_read_fd);
290 #else
291  fd_t normDescriptor = NormGetDescriptor (norm_instance);
292  norm_descriptor_handle = add_fd (normDescriptor);
293 #endif
294  // Set POLLIN for notification of pending NormEvents
295  set_pollin (norm_descriptor_handle);
296  // TBD - we may assign the NORM engine to an io_thread in the future???
297  zmq_session = session_;
298  if (is_sender)
299  zmq_output_ready = true;
300  if (is_receiver)
301  zmq_input_ready = true;
302 
303 
304  if (is_sender)
305  send_data ();
306 
307 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
308  wrapper_thread_handle = CreateThread (NULL, 0, normWrapperThread,
309  threadArgs, 0, &wrapper_thread_id);
310 #endif
311 
312 } // end zmq::norm_engine_t::init()
313 
314 void zmq::norm_engine_t::unplug ()
315 {
316  rm_fd (norm_descriptor_handle);
317 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
318  PostThreadMessage (wrapper_thread_id, WM_QUIT, (WPARAM) NULL,
319  (LPARAM) NULL);
320  WaitForSingleObject (wrapper_thread_handle, INFINITE);
321  DWORD exitCode;
322  GetExitCodeThread (wrapper_thread_handle, &exitCode);
323  zmq_assert (exitCode != -1);
324  int rc = closesocket (wrapper_read_fd);
325  errno_assert (rc != -1);
326 #endif
327  zmq_session = NULL;
328 } // end zmq::norm_engine_t::unplug()
329 
330 void zmq::norm_engine_t::terminate ()
331 {
332  unplug ();
333  shutdown ();
334  delete this;
335 }
336 
337 void zmq::norm_engine_t::restart_output ()
338 {
339  // There's new message data available from the session
340  zmq_output_ready = true;
341  if (norm_tx_ready)
342  send_data ();
343 
344 } // end zmq::norm_engine_t::restart_output()
345 
346 void zmq::norm_engine_t::send_data ()
347 {
348  // Here we write as much as is available or we can
349  while (zmq_output_ready && norm_tx_ready) {
350  if (0 == tx_len) {
351  // Our tx_buffer needs data to send
352  // Get more data from encoder
353  size_t space = BUFFER_SIZE;
354  unsigned char *bufPtr = (unsigned char *) tx_buffer;
355  tx_len = zmq_encoder.encode (&bufPtr, space);
356  if (0 == tx_len) {
357  if (tx_first_msg) {
358  // We don't need to mark eom/flush until a message is sent
359  tx_first_msg = false;
360  } else {
361  // A prior message was completely written to stream, so
362  // mark end-of-message and possibly flush (to force packet transmission,
363  // even if it's not a full segment so message gets delivered quickly)
364  // NormStreamMarkEom(norm_tx_stream); // the flush below marks eom
365  // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
366  // but makes sure content is delivered quickly. Positive acknowledgements
367  // with flush override would make NORM more succinct here
368  NormStreamFlush (norm_tx_stream, true, NORM_FLUSH_ACTIVE);
369  }
370  // Need to pull and load a new message to send
371  if (-1 == zmq_session->pull_msg (&tx_msg)) {
372  // We need to wait for "restart_output()" to be called by ZMQ
373  zmq_output_ready = false;
374  break;
375  }
376  zmq_encoder.load_msg (&tx_msg);
377  // Should we write message size header for NORM to use? Or expect NORM
378  // receiver to decode ZMQ message framing format(s)?
379  // OK - we need to use a byte to denote when the ZMQ frame is the _first_
380  // frame of a message so it can be decoded properly when a receiver
381  // 'syncs' mid-stream. We key off the the state of the 'more_flag'
382  // I.e.,If more_flag _was_ false previously, this is the first
383  // frame of a ZMQ message.
384  if (tx_more_bit)
385  tx_buffer[0] =
386  (char) 0xff; // this is not first frame of message
387  else
388  tx_buffer[0] = 0x00; // this is first frame of message
389  tx_more_bit = (0 != (tx_msg.flags () & msg_t::more));
390  // Go ahead an get a first chunk of the message
391  bufPtr++;
392  space--;
393  tx_len = 1 + zmq_encoder.encode (&bufPtr, space);
394  tx_index = 0;
395  }
396  }
397  // Do we have data in our tx_buffer pending
398  if (tx_index < tx_len) {
399  // We have data in our tx_buffer to send, so write it to the stream
400  tx_index += NormStreamWrite (norm_tx_stream, tx_buffer + tx_index,
401  tx_len - tx_index);
402  if (tx_index < tx_len) {
403  // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
404  norm_tx_ready = false;
405  break;
406  }
407  tx_len = 0; // all buffered data was written
408  }
409  } // end while (zmq_output_ready && norm_tx_ready)
410 } // end zmq::norm_engine_t::send_data()
411 
412 void zmq::norm_engine_t::in_event ()
413 {
414  // This means a NormEvent is pending, so call NormGetNextEvent() and handle
415  NormEvent event;
416 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
417  int rc = recv (wrapper_read_fd, reinterpret_cast<char *> (&event),
418  sizeof (event), 0);
419  errno_assert (rc == sizeof (event));
420 #else
421  if (!NormGetNextEvent (norm_instance, &event)) {
422  // NORM has died before we unplugged?!
423  zmq_assert (false);
424  return;
425  }
426 #endif
427 
428  switch (event.type) {
429  case NORM_TX_QUEUE_VACANCY:
430  case NORM_TX_QUEUE_EMPTY:
431  if (!norm_tx_ready) {
432  norm_tx_ready = true;
433  send_data ();
434  }
435  break;
436 
437  case NORM_RX_OBJECT_NEW:
438  //break;
439  case NORM_RX_OBJECT_UPDATED:
440  recv_data (event.object);
441  break;
442 
443  case NORM_RX_OBJECT_ABORTED: {
444  NormRxStreamState *rxState =
445  (NormRxStreamState *) NormObjectGetUserData (event.object);
446  if (NULL != rxState) {
447  // Remove the state from the list it's in
448  // This is now unnecessary since deletion takes care of list removal
449  // but in the interest of being clear ...
450  NormRxStreamState::List *list = rxState->AccessList ();
451  if (NULL != list)
452  list->Remove (*rxState);
453  }
454  delete rxState;
455  break;
456  }
457  case NORM_REMOTE_SENDER_INACTIVE:
458  // Here we free resources used for this formerly active sender.
459  // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
460  // get some messages delivered twice. NORM_SYNC_CURRENT would
461  // mitigate that but might miss data at startup. Always tradeoffs.
462  // Instead of immediately deleting, we could instead initiate a
463  // user configurable timeout here to wait some amount of time
464  // after this event to declare the remote sender truly dead
465  // and delete its state???
466  NormNodeDelete (event.sender);
467  break;
468 
469  default:
470  // We ignore some NORM events
471  break;
472  }
473 } // zmq::norm_engine_t::in_event()
474 
475 bool zmq::norm_engine_t::restart_input ()
476 {
477  // TBD - should we check/assert that zmq_input_ready was false???
478  zmq_input_ready = true;
479  // Process any pending received messages
480  if (!msg_ready_list.IsEmpty ())
481  recv_data (NORM_OBJECT_INVALID);
482 
483  return true;
484 } // end zmq::norm_engine_t::restart_input()
485 
486 void zmq::norm_engine_t::recv_data (NormObjectHandle object)
487 {
488  if (NORM_OBJECT_INVALID != object) {
489  // Call result of NORM_RX_OBJECT_UPDATED notification
490  // This is a rx_ready indication for a new or existing rx stream
491  // First, determine if this is a stream we already know
492  zmq_assert (NORM_OBJECT_STREAM == NormObjectGetType (object));
493  // Since there can be multiple senders (publishers), we keep
494  // state for each separate rx stream.
495  NormRxStreamState *rxState =
496  (NormRxStreamState *) NormObjectGetUserData (object);
497  if (NULL == rxState) {
498  // This is a new stream, so create rxState with zmq decoder, etc
499  rxState = new (std::nothrow)
500  NormRxStreamState (object, options.maxmsgsize, options.zero_copy,
501  options.in_batch_size);
502  errno_assert (rxState);
503 
504  if (!rxState->Init ()) {
505  errno_assert (false);
506  delete rxState;
507  return;
508  }
509  NormObjectSetUserData (object, rxState);
510  } else if (!rxState->IsRxReady ()) {
511  // Existing non-ready stream, so remove from pending
512  // list to be promoted to rx_ready_list ...
513  rx_pending_list.Remove (*rxState);
514  }
515  if (!rxState->IsRxReady ()) {
516  // TBD - prepend up front for immediate service?
517  rxState->SetRxReady (true);
518  rx_ready_list.Append (*rxState);
519  }
520  }
521  // This loop repeats until we've read all data available from "rx ready" inbound streams
522  // and pushed any accumulated messages we can up to the zmq session.
523  while (!rx_ready_list.IsEmpty ()
524  || (zmq_input_ready && !msg_ready_list.IsEmpty ())) {
525  // Iterate through our rx_ready streams, reading data into the decoder
526  // (This services incoming "rx ready" streams in a round-robin fashion)
527  NormRxStreamState::List::Iterator iterator (rx_ready_list);
528  NormRxStreamState *rxState;
529  while (NULL != (rxState = iterator.GetNextItem ())) {
530  switch (rxState->Decode ()) {
531  case 1: // msg completed
532  // Complete message decoded, move this stream to msg_ready_list
533  // to push the message up to the session below. Note the stream
534  // will be returned to the "rx_ready_list" after that's done
535  rx_ready_list.Remove (*rxState);
536  msg_ready_list.Append (*rxState);
537  continue;
538 
539  case -1: // decoding error (shouldn't happen w/ NORM, but ...)
540  // We need to re-sync this stream (decoder buffer was reset)
541  rxState->SetSync (false);
542  break;
543 
544  default: // 0 - need more data
545  break;
546  }
547  // Get more data from this stream
548  NormObjectHandle stream = rxState->GetStreamHandle ();
549  // First, make sure we're in sync ...
550  while (!rxState->InSync ()) {
551  // seek NORM message start
552  if (!NormStreamSeekMsgStart (stream)) {
553  // Need to wait for more data
554  break;
555  }
556  // read message 'flag' byte to see if this it's a 'final' frame
557  char syncFlag;
558  unsigned int numBytes = 1;
559  if (!NormStreamRead (stream, &syncFlag, &numBytes)) {
560  // broken stream (can happen on late-joining subscriber)
561  continue;
562  }
563  if (0 == numBytes) {
564  // This probably shouldn't happen either since we found msg start
565  // Need to wait for more data
566  break;
567  }
568  if (0 == syncFlag)
569  rxState->SetSync (true);
570  // else keep seeking ...
571  } // end while(!rxState->InSync())
572  if (!rxState->InSync ()) {
573  // Need more data for this stream, so remove from "rx ready"
574  // list and iterate to next "rx ready" stream
575  rxState->SetRxReady (false);
576  // Move from rx_ready_list to rx_pending_list
577  rx_ready_list.Remove (*rxState);
578  rx_pending_list.Append (*rxState);
579  continue;
580  }
581  // Now we're actually ready to read data from the NORM stream to the zmq_decoder
582  // the underlying zmq_decoder->get_buffer() call sets how much is needed.
583  unsigned int numBytes = rxState->GetBytesNeeded ();
584  if (!NormStreamRead (stream, rxState->AccessBuffer (), &numBytes)) {
585  // broken NORM stream, so re-sync
586  rxState->Init (); // TBD - check result
587  // This will retry syncing, and getting data from this stream
588  // since we don't increment the "it" iterator
589  continue;
590  }
591  rxState->IncrementBufferCount (numBytes);
592  if (0 == numBytes) {
593  // All the data available has been read
594  // Need to wait for NORM_RX_OBJECT_UPDATED for this stream
595  rxState->SetRxReady (false);
596  // Move from rx_ready_list to rx_pending_list
597  rx_ready_list.Remove (*rxState);
598  rx_pending_list.Append (*rxState);
599  }
600  } // end while(NULL != (rxState = iterator.GetNextItem()))
601 
602  if (zmq_input_ready) {
603  // At this point, we've made a pass through the "rx_ready" stream list
604  // Now make a pass through the "msg_pending" list (if the zmq session
605  // ready for more input). This may possibly return streams back to
606  // the "rx ready" stream list after their pending message is handled
607  NormRxStreamState::List::Iterator iterator (msg_ready_list);
608  NormRxStreamState *rxState;
609  while (NULL != (rxState = iterator.GetNextItem ())) {
610  msg_t *msg = rxState->AccessMsg ();
611  int rc = zmq_session->push_msg (msg);
612  if (-1 == rc) {
613  if (EAGAIN == errno) {
614  // need to wait until session calls "restart_input()"
615  zmq_input_ready = false;
616  break;
617  } else {
618  // session rejected message?
619  // TBD - handle this better
620  zmq_assert (false);
621  }
622  }
623  // else message was accepted.
624  msg_ready_list.Remove (*rxState);
625  if (
626  rxState
627  ->IsRxReady ()) // Move back to "rx_ready" list to read more data
628  rx_ready_list.Append (*rxState);
629  else // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED
630  msg_ready_list.Append (*rxState);
631  } // end while(NULL != (rxState = iterator.GetNextItem()))
632  } // end if (zmq_input_ready)
633  } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
634 
635  // Alert zmq of the messages we have pushed up
636  zmq_session->flush ();
637 
638 } // end zmq::norm_engine_t::recv_data()
639 
640 zmq::norm_engine_t::NormRxStreamState::NormRxStreamState (
641  NormObjectHandle normStream,
642  int64_t maxMsgSize,
643  bool zeroCopy,
644  int inBatchSize) :
645  norm_stream (normStream),
646  max_msg_size (maxMsgSize),
647  zero_copy (zeroCopy),
648  in_batch_size (inBatchSize),
649  in_sync (false),
650  rx_ready (false),
651  zmq_decoder (NULL),
652  skip_norm_sync (false),
653  buffer_ptr (NULL),
654  buffer_size (0),
655  buffer_count (0),
656  prev (NULL),
657  next (NULL),
658  list (NULL)
659 {
660 }
661 
662 zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState ()
663 {
664  if (NULL != zmq_decoder) {
665  delete zmq_decoder;
666  zmq_decoder = NULL;
667  }
668  if (NULL != list) {
669  list->Remove (*this);
670  list = NULL;
671  }
672 }
673 
675 {
676  in_sync = false;
677  skip_norm_sync = false;
678  if (NULL != zmq_decoder)
679  delete zmq_decoder;
680  zmq_decoder =
681  new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy);
682  alloc_assert (zmq_decoder);
683  if (NULL != zmq_decoder) {
684  buffer_count = 0;
685  buffer_size = 0;
686  zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
687  return true;
688  } else {
689  return false;
690  }
691 } // end zmq::norm_engine_t::NormRxStreamState::Init()
692 
693 // This decodes any pending data sitting in our stream decoder buffer
694 // It returns 1 upon message completion, -1 on error, 1 on msg completion
695 int zmq::norm_engine_t::NormRxStreamState::Decode ()
696 {
697  // If we have pending bytes to decode, process those first
698  while (buffer_count > 0) {
699  // There's pending data for the decoder to decode
700  size_t processed = 0;
701 
702  // This a bit of a kludgy approach used to weed
703  // out the NORM ZMQ message transport "syncFlag" byte
704  // from the ZMQ message stream being decoded (but it works!)
705  if (skip_norm_sync) {
706  buffer_ptr++;
707  buffer_count--;
708  skip_norm_sync = false;
709  }
710 
711  int rc = zmq_decoder->decode (buffer_ptr, buffer_count, processed);
712  buffer_ptr += processed;
713  buffer_count -= processed;
714  switch (rc) {
715  case 1:
716  // msg completed
717  if (0 == buffer_count) {
718  buffer_size = 0;
719  zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
720  }
721  skip_norm_sync = true;
722  return 1;
723  case -1:
724  // decoder error (reset decoder and state variables)
725  in_sync = false;
726  skip_norm_sync = false; // will get consumed by norm sync check
727  Init ();
728  break;
729 
730  case 0:
731  // need more data, keep decoding until buffer exhausted
732  break;
733  }
734  }
735  // Reset buffer pointer/count for next read
736  buffer_count = 0;
737  buffer_size = 0;
738  zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
739  return 0; // need more data
740 
741 } // end zmq::norm_engine_t::NormRxStreamState::Decode()
742 
743 zmq::norm_engine_t::NormRxStreamState::List::List () : head (NULL), tail (NULL)
744 {
745 }
746 
747 zmq::norm_engine_t::NormRxStreamState::List::~List ()
748 {
749  Destroy ();
750 }
751 
752 void zmq::norm_engine_t::NormRxStreamState::List::Destroy ()
753 {
754  NormRxStreamState *item = head;
755  while (NULL != item) {
756  Remove (*item);
757  delete item;
758  item = head;
759  }
760 } // end zmq::norm_engine_t::NormRxStreamState::List::Destroy()
761 
763  NormRxStreamState &item)
764 {
765  item.prev = tail;
766  if (NULL != tail)
767  tail->next = &item;
768  else
769  head = &item;
770  item.next = NULL;
771  tail = &item;
772  item.list = this;
773 } // end zmq::norm_engine_t::NormRxStreamState::List::Append()
774 
776  NormRxStreamState &item)
777 {
778  if (NULL != item.prev)
779  item.prev->next = item.next;
780  else
781  head = item.next;
782  if (NULL != item.next)
783  item.next->prev = item.prev;
784  else
785  tail = item.prev;
786  item.prev = item.next = NULL;
787  item.list = NULL;
788 } // end zmq::norm_engine_t::NormRxStreamState::List::Remove()
789 
790 zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator (
791  const List &list) :
792  next_item (list.head)
793 {
794 }
795 
796 zmq::norm_engine_t::NormRxStreamState *
797 zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem ()
798 {
799  NormRxStreamState *nextItem = next_item;
800  if (NULL != nextItem)
801  next_item = nextItem->next;
802  return nextItem;
803 } // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
804 
805 const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const
806 {
807  return _empty_endpoint;
808 }
809 
810 
811 #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
812 #include <iostream>
813 DWORD WINAPI normWrapperThread (LPVOID lpParam)
814 {
815  norm_wrapper_thread_args_t *norm_wrapper_thread_args =
816  (norm_wrapper_thread_args_t *) lpParam;
817  NormEvent message;
818  DWORD waitRc;
819  DWORD exitCode = 0;
820  int rc;
821 
822  for (;;) {
823  // wait for norm event or message
824  waitRc = MsgWaitForMultipleObjectsEx (
825  1, &norm_wrapper_thread_args->norm_descriptor, INFINITE,
826  QS_ALLPOSTMESSAGE, 0);
827 
828  // Check if norm event
829  if (waitRc == WAIT_OBJECT_0) {
830  // Process norm event
831  if (!NormGetNextEvent (
832  norm_wrapper_thread_args->norm_instance_handle, &message)) {
833  exitCode = -1;
834  break;
835  }
836  rc =
837  send (norm_wrapper_thread_args->wrapper_write_fd,
838  reinterpret_cast<char *> (&message), sizeof (message), 0);
839  errno_assert (rc != -1);
840  // Check if message
841  } else if (waitRc == WAIT_OBJECT_0 + 1) {
842  // Exit if WM_QUIT is received otherwise do nothing
843  MSG message;
844  GetMessage (&message, 0, 0, 0);
845  if (message.message == WM_QUIT) {
846  break;
847  } else {
848  // do nothing
849  }
850  // Otherwise an error occurred
851  } else {
852  exitCode = -1;
853  break;
854  }
855  }
856  // Free resources
857  rc = closesocket (norm_wrapper_thread_args->wrapper_write_fd);
858  free (norm_wrapper_thread_args);
859  errno_assert (rc != -1);
860 
861  return exitCode;
862 }
863 
864 #endif
865 
866 #endif // ZMQ_HAVE_NORM
init
WEPOLL_INTERNAL int init(void)
Definition: wepoll.c:858
closesocket
#define closesocket
Definition: unittest_poller.cpp:13
ip.hpp
stream
GLuint GLuint stream
Definition: glcorearb.h:3946
NULL
NULL
Definition: test_security_zap.cpp:405
google::protobuf::python::repeated_composite_container::Remove
static PyObject * Remove(PyObject *pself, PyObject *value)
Definition: repeated_composite_container.cc:301
item
cJSON * item
Definition: cJSON.h:236
options
Message * options
Definition: src/google/protobuf/descriptor.cc:3119
EINVAL
#define EINVAL
Definition: errno.hpp:25
ZMQ_NORM_FIXED
#define ZMQ_NORM_FIXED
Definition: zmq_draft.h:59
EAGAIN
#define EAGAIN
Definition: errno.hpp:14
ZMQ_NORM_CCE
#define ZMQ_NORM_CCE
Definition: zmq_draft.h:62
precompiled.hpp
zmq_assert
#define zmq_assert(x)
Definition: err.hpp:102
Append
static void Append(State *state, const char *const str, const int length)
Definition: demangle.cc:272
google::protobuf::python::cmessage::Init
static int Init(CMessage *self, PyObject *args, PyObject *kwargs)
Definition: python/google/protobuf/pyext/message.cc:1286
errno
int errno
send
void send(fd_t fd_, const char(&data_)[N])
Definition: test_security_curve.cpp:209
alloc_assert
#define alloc_assert(x)
Definition: err.hpp:146
errno_assert
#define errno_assert(x)
Definition: err.hpp:113
cJSON::next
struct cJSON * next
Definition: cJSON.h:107
zmq::endpoint_uri_pair_t
Definition: endpoint.hpp:17
shutdown
ROSCONSOLE_DECL void shutdown()
event
struct _cl_event * event
Definition: glcorearb.h:4163
zmq::make_fdpair
int make_fdpair(fd_t *r_, fd_t *w_)
Definition: ip.cpp:532
cJSON::prev
struct cJSON * prev
Definition: cJSON.h:108
versiongenerate.buffer_size
int buffer_size
Definition: versiongenerate.py:65
ZMQ_NORM_CC
#define ZMQ_NORM_CC
Definition: zmq_draft.h:60
fd_t
zmq_fd_t fd_t
Definition: libzmq/tests/testutil.hpp:98
ZMQ_NORM_CCE_ECNONLY
#define ZMQ_NORM_CCE_ECNONLY
Definition: zmq_draft.h:63
ZMQ_NORM_CCL
#define ZMQ_NORM_CCL
Definition: zmq_draft.h:61
true
#define true
Definition: cJSON.c:65
v2_protocol.hpp
next
static size_t next(const upb_table *t, size_t i)
Definition: php/ext/google/protobuf/upb.c:4889
session_base.hpp
SOCKET
uintptr_t SOCKET
Definition: wepoll.c:71
false
#define false
Definition: cJSON.c:70
message
GLenum GLuint GLenum GLsizei const GLchar * message
Definition: glcorearb.h:2695
norm_engine.hpp
options_
DebugStringOptions options_
Definition: src/google/protobuf/descriptor.cc:2410


libaditof
Author(s):
autogenerated on Wed May 21 2025 02:06:57