PublisherPeriodic.cpp
Go to the documentation of this file.
1 // -*- C++ -*-
19 #include <rtm/RTC.h>
20 #include <coil/Properties.h>
21 #include <coil/Time.h>
22 #include <coil/stringutil.h>
23 #include <rtm/PublisherPeriodic.h>
24 #include <rtm/InPortConsumer.h>
25 #include <rtm/RTC.h>
26 #include <stdlib.h>
27 #include <rtm/idl/DataPortSkel.h>
29 #include <rtm/SystemLogger.h>
30 
31 namespace RTC
32 {
41  : rtclog("PublisherPeriodic"),
42  m_consumer(0), m_buffer(0), m_task(0), m_listeners(0),
43  m_retcode(PORT_OK), m_pushPolicy(NEW),
44  m_skipn(0), m_active(false), m_readback(false), m_leftskip(0)
45  {
46  }
47 
48 
57  {
58  RTC_TRACE(("~PublisherPeriodic()"));
59  if (m_task != 0)
60  {
61  m_task->resume();
62  m_task->finalize();
63  RTC_PARANOID(("task finalized."));
64 
66  RTC_PARANOID(("task deleted."));
67  }
68 
69  // "consumer" should be deleted in the Connector
70  m_consumer = 0;
71  // "buffer" should be deleted in the Connector
72  m_buffer = 0;
73  }
74 
83  {
84  RTC_TRACE(("init()"));
85  RTC_DEBUG_STR((prop));
86 
87  setPushPolicy(prop);
88  if (!createTask(prop))
89  {
90  return INVALID_ARGS;
91  }
92  return PORT_OK;
93 
94 
95 
96 
97 
98  return PORT_OK;
99  }
100 
110  {
111  RTC_TRACE(("setConsumer()"));
112 
113  if (consumer == 0)
114  {
115  RTC_ERROR(("setConsumer(consumer = 0): invalid argument."));
116  return INVALID_ARGS;
117  }
118  m_consumer = consumer;
119  return PORT_OK;
120  }
121 
130  {
131  RTC_TRACE(("setBuffer()"));
132 
133  if (buffer == 0)
134  {
135  RTC_ERROR(("setBuffer(buffer == 0): invalid argument"));
136  return INVALID_ARGS;
137  }
138  m_buffer = buffer;
139  return PORT_OK;
140  }
141 
151  ConnectorListeners* listeners)
152  {
153  RTC_TRACE(("setListeners()"));
154 
155  if (listeners == 0)
156  {
157  RTC_ERROR(("setListeners(listeners == 0): invalid argument"));
158  return INVALID_ARGS;
159  }
160  m_profile = info;
161  m_listeners = listeners;
162  return PORT_OK;
163  }
164 
173  PublisherPeriodic::write(const cdrMemoryStream& data,
174  unsigned long sec,
175  unsigned long usec)
176  {
177  RTC_PARANOID(("write()"));
178 
179  if (m_consumer == 0) { return PRECONDITION_NOT_MET; }
180  if (m_buffer == 0) { return PRECONDITION_NOT_MET; }
181  if (m_listeners == 0) { return PRECONDITION_NOT_MET; }
182 
183  if (m_retcode == CONNECTION_LOST)
184  {
185  RTC_DEBUG(("write(): connection lost."));
186  return m_retcode;
187  }
188 
189  if (m_retcode == SEND_FULL)
190  {
191  RTC_DEBUG(("write(): InPort buffer is full."));
192  m_buffer->write(data, sec, usec);
193  return BUFFER_FULL;
194  }
195 
196  onBufferWrite(data);
197  CdrBufferBase::ReturnCode ret(m_buffer->write(data, sec, usec));
198  RTC_DEBUG(("%s = write()", CdrBufferBase::toString(ret)));
199  m_task->resume();
200  return convertReturn(ret, data);
201  }
202 
211  {
212  return m_active;
213  }
214 
223  {
224  if (m_task == 0) { return PRECONDITION_NOT_MET; }
225  if (m_buffer == 0) { return PRECONDITION_NOT_MET; }
226  m_active = true;
227  m_task->resume();
228  return PORT_OK;
229  }
230 
239  {
240  if (m_task == 0) { return PRECONDITION_NOT_MET; }
241  m_active = false;
242  m_task->suspend();
243  return PORT_OK;
244  }
245 
254  {
255  Guard guard(m_retmutex);
256  switch (m_pushPolicy)
257  {
258  case ALL:
259  m_retcode = pushAll();
260  break;
261  case FIFO:
262  m_retcode = pushFifo();
263  break;
264  case SKIP:
265  m_retcode = pushSkip();
266  break;
267  case NEW:
268  m_retcode = pushNew();
269  break;
270  default:
271  m_retcode = pushNew();
272  break;
273  }
274  return 0;
275  }
276 
281  {
282  RTC_TRACE(("pushAll()"));
283  if (bufferIsEmpty()) { return BUFFER_EMPTY; }
284 
285  while (m_buffer->readable() > 0)
286  {
287  const cdrMemoryStream& cdr(m_buffer->get());
288  onBufferRead(cdr);
289 
290  onSend(cdr);
291  ReturnCode ret(m_consumer->put(cdr));
292  if (ret != PORT_OK)
293  {
294  RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
295  return invokeListener(ret, cdr);
296  }
297  onReceived(cdr);
298 
300 
301  }
302  return PORT_OK;
303  }
304 
309  {
310  RTC_TRACE(("pushFifo()"));
311  if (bufferIsEmpty()) { return BUFFER_EMPTY; }
312 
313  const cdrMemoryStream& cdr(m_buffer->get());
314  onBufferRead(cdr);
315 
316  onSend(cdr);
317  ReturnCode ret(m_consumer->put(cdr));
318 
319  if (ret != PORT_OK)
320  {
321  RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
322  return invokeListener(ret, cdr);
323  }
324  onReceived(cdr);
325 
327 
328  return PORT_OK;
329  }
330 
335  {
336  RTC_TRACE(("pushSkip()"));
337  if (bufferIsEmpty()) { return BUFFER_EMPTY; }
338 
340  int readable = m_buffer->readable();
341  int preskip(readable + m_leftskip);
342  int loopcnt(preskip/(m_skipn +1));
343  int postskip(m_skipn - m_leftskip);
344  for (int i(0); i < loopcnt; ++i)
345  {
346  m_buffer->advanceRptr(postskip);
347  readable -= postskip;
348 
349  const cdrMemoryStream& cdr(m_buffer->get());
350  onBufferRead(cdr);
351 
352  onSend(cdr);
353  ret = m_consumer->put(cdr);
354  if (ret != PORT_OK)
355  {
356  m_buffer->advanceRptr(-postskip);
357  RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
358  return invokeListener(ret, cdr);
359  }
360  onReceived(cdr);
361  postskip = m_skipn + 1;
362  }
363 
364  m_buffer->advanceRptr(readable);
365  m_leftskip = preskip % (m_skipn +1);
366 
367  return ret;
368  }
369 
374  {
375  RTC_TRACE(("pushNew()"));
376  if (bufferIsEmpty()) { return BUFFER_EMPTY; }
377 
378  // In case of the periodic/push_new policy, the buffer should
379  // allow readback. But, readback flag should be set as "true"
380  // after written at least one datum into the buffer.
381  m_readback = true;
383 
384  const cdrMemoryStream& cdr(m_buffer->get());
385  onBufferRead(cdr);
386 
387  onSend(cdr);
388  ReturnCode ret(m_consumer->put(cdr));
389  if (ret != PORT_OK)
390  {
391  RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
392  return invokeListener(ret, cdr);
393  }
394  onReceived(cdr);
395 
397 
398  return PORT_OK;
399  }
400 
409  {
410  // push_policy default: NEW
411  std::string push_policy = prop.getProperty("publisher.push_policy", "new");
412  RTC_DEBUG(("push_policy: %s", push_policy.c_str()));
413 
414  coil::normalize(push_policy);
415  if (push_policy == "all") { m_pushPolicy = ALL; }
416  else if (push_policy == "fifo") { m_pushPolicy = FIFO; }
417  else if (push_policy == "skip") { m_pushPolicy = SKIP; }
418  else if (push_policy == "new") { m_pushPolicy = NEW; }
419  else
420  {
421  RTC_ERROR(("invalid push_policy value: %s", push_policy.c_str()));
422  m_pushPolicy = NEW; // default push policy
423  }
424 
425  // skip_count default: 0
426  std::string skip_count = prop.getProperty("publisher.skip_count", "0");
427  RTC_DEBUG(("skip_count: %s", skip_count.c_str()));
428 
429  if (!coil::stringTo(m_skipn, skip_count.c_str()))
430  {
431  RTC_ERROR(("invalid skip_count value: %s", skip_count.c_str()));
432  m_skipn = 0; // desfault skip count
433  }
434  if (m_skipn < 0)
435  {
436  RTC_ERROR(("invalid skip_count value: %d", m_skipn));
437  m_skipn = 0; // default skip count
438  }
439  }
440 
449  {
451 
452  // Creating and setting task object
453  coil::vstring th = factory.getIdentifiers();
454  RTC_DEBUG(("available task types: %s", coil::flatten(th).c_str()));
455 
456  m_task = factory.createObject(prop.getProperty("thread_type", "default"));
457  if (m_task == 0)
458  {
459  RTC_ERROR(("Task creation failed: %s",
460  prop.getProperty("thread_type", "default").c_str()));
461  return INVALID_ARGS;
462  }
464  RTC_PARANOID(("Task creation succeeded."));
465 
466  // Extracting publisher's period time
467  double hz;
468  if (!coil::stringTo(hz, prop["publisher.push_rate"].c_str()) &&
469  !coil::stringTo(hz, prop["push_rate"].c_str())) // for 0.4 compatibility
470  {
471  RTC_ERROR(("publisher.push_rate/push_rate were not found."));
472  return false;
473  }
474 
475  if (hz <= 0)
476  {
477  RTC_ERROR(("invalid period: %f [s]", hz));
478  return false;
479  }
480  m_task->setPeriod(1.0/hz);
481 
482  // Setting task measurement function
483  m_task->executionMeasure(coil::toBool(prop["measurement.exec_time"],
484  "enable", "disable", true));
485 
486  int ecount;
487  if (coil::stringTo(ecount, prop["measurement.exec_count"].c_str()))
488  {
489  m_task->executionMeasureCount(ecount);
490  }
491 
492  m_task->periodicMeasure(coil::toBool(prop["measurement.period_time"],
493  "enable", "disable", true));
494  int pcount;
495  if (coil::stringTo(pcount, prop["measurement.period_count"].c_str()))
496  {
497  m_task->periodicMeasureCount(pcount);
498  }
499 
500  // Start task in suspended mode
501  m_task->suspend();
502  m_task->activate();
503  m_task->suspend();
504 
505  return true;
506  }
507 
517  const cdrMemoryStream& data)
518  {
519  /*
520  * BufferStatus -> DataPortStatus
521  *
522  * BUFFER_OK -> PORT_OK
523  * BUFFER_ERROR -> BUFFER_ERROR
524  * BUFFER_FULL -> BUFFER_FULL
525  * NOT_SUPPORTED -> PORT_ERROR
526  * TIMEOUT -> BUFFER_TIMEOUT
527  * PRECONDITION_NOT_MET -> PRECONDITION_NOT_MET
528  */
529  switch (status)
530  {
532  // no callback
535  // no callback
538  onBufferFull(data);
541  // no callback
544  onBufferWriteTimeout(data);
547  // no callback
549  default:
550  // no callback
552  }
554  }
555 
565  const cdrMemoryStream& data)
566  {
567  // ret:
568  // PORT_OK, PORT_ERROR, SEND_FULL, SEND_TIMEOUT, CONNECTION_LOST,
569  // UNKNOWN_ERROR
570  switch (status)
571  {
572  case PORT_ERROR:
573  onReceiverError(data);
574  return PORT_ERROR;
575 
576  case SEND_FULL:
577  onReceiverFull(data);
578  return SEND_FULL;
579 
580  case SEND_TIMEOUT:
581  onReceiverTimeout(data);
582  return SEND_TIMEOUT;
583 
584  case CONNECTION_LOST:
585  onReceiverError(data);
586  return CONNECTION_LOST;
587 
588  case UNKNOWN_ERROR:
589  onReceiverError(data);
590  return UNKNOWN_ERROR;
591 
592  default:
593  onReceiverError(data);
594  return PORT_ERROR;
595  }
596  }
597 
598 }; // namespace RTC
599 
600 extern "C"
601 {
603  {
605  instance().addFactory("periodic",
610  }
611 };
DATAPORTSTATUS_ENUM PublisherPeriodic(void)
Constructor.
ConnectorListeners class.
InPortConsumer abstract class.
#define RTC_ERROR(fmt)
Error log output macro.
Definition: SystemLogger.h:422
std::string normalize(std::string &str)
Erase the head/tail blank and replace upper case to lower case.
Definition: stringutil.cpp:303
void onBufferWriteTimeout(const cdrMemoryStream &data)
Notify an ON_BUFFER_WRITE_TIMEOUT event to listeners.
virtual int resume(void)=0
Resuming the suspended task.
RT-Component.
bool stringTo(To &val, const char *str)
Convert the given std::string to object.
Definition: stringutil.h:597
RT component logger class.
AbstractClass * Creator()
Creator template.
RTC::ReturnCode_t ret(RTC::Local::ReturnCode_t r)
virtual ~PublisherPeriodic(void)
Destructor.
static const char * toString(Enum status)
Convert BufferStatus into the string.
Definition: BufferStatus.h:118
virtual int suspend(void)=0
Suspending the task.
void onReceiverTimeout(const cdrMemoryStream &data)
Notify an ON_RECEIVER_TIMEOUT event to listeners.
void onReceiverError(const cdrMemoryStream &data)
Notify an ON_RECEIVER_ERROR event to listeners.
virtual ReturnCode activate()
activation
ReturnCode pushAll()
push "all" policy
void onBufferWrite(const cdrMemoryStream &data)
Notify an ON_BUFFER_WRITE event to listeners.
virtual bool setTask(TaskFuncBase *func, bool delete_in_dtor=true)=0
Setting task execution function.
static GlobalFactory< AbstractClass, Identifier, Compare, Creator, Destructor > & instance()
Create instance.
Definition: Singleton.h:131
GlobalFactory template class.
Enum
DataPortStatus return codes.
Definition: BufferStatus.h:84
virtual ReturnCode init(coil::Properties &prop)
Initialization.
void onBufferRead(const cdrMemoryStream &data)
Notify an ON_BUFFER_READ event to listeners.
#define RTC_DEBUG_STR(str)
Definition: SystemLogger.h:489
coil::PeriodicTaskBase * m_task
#define RTC_PARANOID(fmt)
Paranoid level log output macro.
Definition: SystemLogger.h:555
std::vector< Identifier > getIdentifiers()
Get factory ID list.
virtual ReturnCode get(DataType &value)=0
Read data from the buffer.
virtual ReturnCode setConsumer(InPortConsumer *consumer)
Store InPort consumer.
void setPushPolicy(const coil::Properties &prop)
Setting PushPolicy.
std::vector< std::string > vstring
Definition: stringutil.h:37
virtual ReturnCode deactivate()
deactivation
void onReceiverFull(const cdrMemoryStream &data)
Notify an ON_RECEIVER_FULL event to listeners.
virtual void executionMeasureCount(int n)=0
Task execute time measurement period.
ReturnCode invokeListener(DataPortStatus::Enum status, const cdrMemoryStream &data)
Call listeners according to the DataPortStatus.
virtual void activate()=0
Starting the task.
#define RTC_DEBUG(fmt)
Debug level log output macro.
Definition: SystemLogger.h:488
PeiodicTaskFactory class.
virtual void periodicMeasure(bool value)=0
Validate a Task period time measurement.
std::string flatten(vstring sv)
Create CSV file from the given string list.
Definition: stringutil.cpp:549
PublisherPeriodic class.
#define RTC_TRACE(fmt)
virtual void finalize()=0
Finalizing the task.
virtual ReturnCode setBuffer(CdrBufferBase *buffer)
Setting buffer pointer.
virtual bool isActive()
If publisher is active state.
ReturnCode pushFifo()
push "fifo" policy
PublisherPeriodic class.
virtual int svc(void)
Thread execution function.
AbstractClass * createObject(const Identifier &id)
Create factory object.
ReturnCode pushNew()
push "new" policy
ConnectorListeners * m_listeners
::RTC::BufferStatus::Enum ReturnCode
prop
Organization::get_organization_property ();.
void onReceived(const cdrMemoryStream &data)
Notify an ON_RECEIVED event to listeners.
InPortConsumer * m_consumer
void onBufferFull(const cdrMemoryStream &data)
Notify an ON_BUFFER_FULL event to listeners.
void Destructor(AbstractClass *&obj)
Destructor template.
Class represents a set of properties.
Definition: Properties.h:101
bool toBool(std::string str, std::string yes, std::string no, bool default_value)
Convert given string into bool value.
Definition: stringutil.cpp:410
virtual void setPeriod(double period)=0
Setting task execution period.
virtual void executionMeasure(bool value)=0
Validate a Task execute time measurement.
virtual ReturnCode advanceRptr(long int n=1)=0
Forward n reading pointers.
virtual void periodicMeasureCount(int n)=0
Task period time measurement count.
Base class of Publisher.
Definition: PublisherBase.h:63
ReturnCode convertReturn(BufferStatus::Enum status, const cdrMemoryStream &data)
Convertion from BufferStatus to DataPortStatus.
bool createTask(const coil::Properties &prop)
Setting Task.
ReturnCode pushSkip()
push "skip" policy
virtual size_t readable() const =0
Write data into the buffer.
RTComponent header.
virtual ReturnCode write(const DataType &value, long int sec=-1, long int nsec=-1)=0
Write data into the buffer.
virtual ReturnCode setListener(ConnectorInfo &info, ConnectorListeners *listeners)
Set the listener.
virtual ReturnCode put(const cdrMemoryStream &data)=0
Send data to the destination port.
virtual ReturnCode write(const cdrMemoryStream &data, unsigned long sec, unsigned long usec)
Write data.
BufferBase abstract class.
Definition: BufferBase.h:104
void PublisherPeriodicInit()
InPortConsumer class.
Enum
DataPortStatus return codes.
static const char * toString(DataPortStatus::Enum status)
Convert DataPortStatus into the string.
const std::string & getProperty(const std::string &key) const
Search for the property with the specified key in this property.
Definition: Properties.cpp:156
void onSend(const cdrMemoryStream &data)
Notify an ON_SEND event to listners.


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Mon Jun 10 2019 14:07:54