PublisherNew.cpp
Go to the documentation of this file.
1 // -*- C++ -*-
20 #include <iostream>
21 #include <assert.h>
22 
23 #include <coil/Properties.h>
24 #include <coil/stringutil.h>
25 
26 #include <rtm/RTC.h>
27 #include <rtm/PublisherNew.h>
28 #include <rtm/InPortConsumer.h>
30 #include <rtm/idl/DataPortSkel.h>
31 #include <rtm/ConnectorListener.h>
32 
33 namespace RTC
34 {
43  : rtclog("PublisherNew"),
44  m_consumer(0), m_buffer(0), m_task(0), m_listeners(0),
45  m_retcode(PORT_OK), m_pushPolicy(NEW),
46  m_skipn(0), m_active(false), m_leftskip(0)
47  {
48  }
49 
58  {
59  RTC_TRACE(("~PublisherNew()"));
60  if (m_task != 0)
61  {
62  m_task->resume();
63  m_task->finalize();
64 
66  RTC_PARANOID(("task deleted."));
67  }
68 
69  // "consumer" should be deleted in the Connector
70  m_consumer = 0;
71  // "buffer" should be deleted in the Connector
72  m_buffer = 0;
73  }
74 
83  {
84  RTC_TRACE(("init()"));
85  RTC_DEBUG_STR((prop));
86 
87  setPushPolicy(prop);
88  if (!createTask(prop))
89  {
90  return INVALID_ARGS;
91  }
92  return PORT_OK;
93  }
94 
103  {
104  RTC_TRACE(("setConsumer()"));
105 
106  if (consumer == 0)
107  {
108  RTC_ERROR(("setConsumer(consumer = 0): invalid argument."));
109  return INVALID_ARGS;
110  }
111  m_consumer = consumer;
112  return PORT_OK;
113  }
114 
123  {
124  RTC_TRACE(("setBuffer()"));
125 
126  if (buffer == 0)
127  {
128  RTC_ERROR(("setBuffer(buffer == 0): invalid argument"));
129  return INVALID_ARGS;
130  }
131  m_buffer = buffer;
132  return PORT_OK;
133  }
134 
144  ConnectorListeners* listeners)
145  {
146  RTC_TRACE(("setListeners()"));
147 
148  if (listeners == 0)
149  {
150  RTC_ERROR(("setListeners(listeners == 0): invalid argument"));
151  return INVALID_ARGS;
152  }
153  m_profile = info;
154  m_listeners = listeners;
155  return PORT_OK;
156  }
157 
166  unsigned long sec,
167  unsigned long usec)
168  {
169  RTC_PARANOID(("write()"));
170 
171  if (m_consumer == 0) { return PRECONDITION_NOT_MET; }
172  if (m_buffer == 0) { return PRECONDITION_NOT_MET; }
173  if (m_listeners == 0) { return PRECONDITION_NOT_MET; }
174 
175  if (m_retcode == CONNECTION_LOST)
176  {
177  RTC_DEBUG(("write(): connection lost."));
178  return m_retcode;
179  }
180 
181  if (m_retcode == SEND_FULL)
182  {
183  RTC_DEBUG(("write(): InPort buffer is full."));
184  m_buffer->write(data, sec, usec);
185  m_task->signal();
186  return BUFFER_FULL;
187  }
188 
189  assert(m_buffer != 0);
190 
191  onBufferWrite(data);
192  CdrBufferBase::ReturnCode ret(m_buffer->write(data, sec, usec));
193 
194  m_task->signal();
195  RTC_DEBUG(("%s = write()", CdrBufferBase::toString(ret)));
196 
197  return convertReturn(ret, data);
198  }
199 
208  {
209  return m_active;
210  }
211 
220  {
221  m_active = true;
222  return PORT_OK;
223  }
224 
233  {
234  m_active = false;
235  return PORT_OK;
236  }
237 
246  {
247 
248  Guard guard(m_retmutex);
249  switch (m_pushPolicy)
250  {
251  case ALL:
252  m_retcode = pushAll();
253  break;
254  case FIFO:
255  m_retcode = pushFifo();
256  break;
257  case SKIP:
258  m_retcode = pushSkip();
259  break;
260  case NEW:
261  m_retcode = pushNew();
262  break;
263  default:
264  m_retcode = pushNew();
265  break;
266  }
267  return 0;
268  }
269 
278  {
279  // push_policy default: NEW
280  std::string push_policy = prop.getProperty("publisher.push_policy", "new");
281  RTC_DEBUG(("push_policy: %s", push_policy.c_str()));
282 
283  coil::normalize(push_policy);
284  if (push_policy == "all") { m_pushPolicy = ALL; }
285  else if (push_policy == "fifo") { m_pushPolicy = FIFO; }
286  else if (push_policy == "skip") { m_pushPolicy = SKIP; }
287  else if (push_policy == "new") { m_pushPolicy = NEW; }
288  else
289  {
290  RTC_ERROR(("invalid push_policy value: %s", push_policy.c_str()));
291  m_pushPolicy = NEW; // default push policy
292  }
293 
294  // skip_count default: 0
295  std::string skip_count = prop.getProperty("publisher.skip_count", "0");
296  RTC_DEBUG(("skip_count: %s", skip_count.c_str()));
297 
298  if (!coil::stringTo(m_skipn, skip_count.c_str()))
299  {
300  RTC_ERROR(("invalid skip_count value: %s", skip_count.c_str()));
301  m_skipn = 0; // default skip count
302  }
303  if (m_skipn < 0)
304  {
305  RTC_ERROR(("invalid skip_count value: %d", m_skipn));
306  m_skipn = 0; // default skip count
307  }
308  }
309 
318  {
320  coil::vstring th = factory.getIdentifiers();
321  RTC_DEBUG(("available task types: %s", coil::flatten(th).c_str()));
322 
323  m_task = factory.createObject(prop.getProperty("thread_type", "default"));
324  if (m_task == 0)
325  {
326  RTC_ERROR(("Task creation failed: %s",
327  prop.getProperty("thread_type", "default").c_str()));
328  return false;
329  }
330  RTC_PARANOID(("Task creation succeeded."));
331 
332  // setting task function
334  m_task->setPeriod(0.0);
335  m_task->executionMeasure(coil::toBool(prop["measurement.exec_time"],
336  "enable", "disable", true));
337 
338  int ecount;
339  if (coil::stringTo(ecount, prop["measurement.exec_count"].c_str()))
340  {
341  m_task->executionMeasureCount(ecount);
342  }
343 
344  m_task->periodicMeasure(coil::toBool(prop["measurement.period_time"],
345  "enable", "disable", true));
346  int pcount;
347  if (coil::stringTo(pcount, prop["measurement.period_count"].c_str()))
348  {
349  m_task->periodicMeasureCount(pcount);
350  }
351 
352  // Start task in suspended mode
353  m_task->suspend();
354  m_task->activate();
355  m_task->suspend();
356 
357  return true;
358  }
359 
360 
365  {
366  RTC_TRACE(("pushAll()"));
367 
368  while (m_buffer->readable() > 0)
369  {
370  cdrMemoryStream& cdr(m_buffer->get());
371  onBufferRead(cdr);
372 
373  onSend(cdr);
374  ReturnCode ret(m_consumer->put(cdr));
375  if (ret != PORT_OK)
376  {
377  RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
378  return invokeListener(ret, cdr);
379  }
380  onReceived(cdr);
381 
383  }
384  return PORT_OK;
385  }
386 
391  {
392  RTC_TRACE(("pushFifo()"));
393 
394  cdrMemoryStream& cdr(m_buffer->get());
395  onBufferRead(cdr);
396 
397  onSend(cdr);
398  ReturnCode ret(m_consumer->put(cdr));
399  if (ret != PORT_OK)
400  {
401  RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
402  return invokeListener(ret, cdr);
403  }
404  onReceived(cdr);
405 
407 
408  return PORT_OK;
409  }
410 
415  {
416  RTC_TRACE(("pushSkip()"));
417 
419  int preskip(m_buffer->readable() + m_leftskip);
420  int loopcnt(preskip/(m_skipn +1));
421  int postskip(m_skipn - m_leftskip);
422  for (int i(0); i < loopcnt; ++i)
423  {
424  m_buffer->advanceRptr(postskip);
425 
426  const cdrMemoryStream& cdr(m_buffer->get());
427  onBufferRead(cdr);
428 
429  onSend(cdr);
430  ret = m_consumer->put(cdr);
431  if (ret != PORT_OK)
432  {
433  m_buffer->advanceRptr(-postskip);
434  RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
435  return invokeListener(ret, cdr);
436  }
437  onReceived(cdr);
438  postskip = m_skipn + 1;
439  }
441  if (loopcnt == 0)
442  { // Not put
443  m_leftskip = preskip % (m_skipn +1);
444  }
445  else
446  {
447  if ( m_retcode != PORT_OK )
448  { // put Error after
449  m_leftskip = 0;
450  }
451  else
452  { // put OK after
453  m_leftskip = preskip % (m_skipn +1);
454  }
455  }
456  return ret;
457  }
458 
463  {
464  RTC_TRACE(("pushNew()"));
465 
467 
468  cdrMemoryStream& cdr(m_buffer->get());
469  onBufferRead(cdr);
470 
471  onSend(cdr);
472  ReturnCode ret(m_consumer->put(cdr));
473  if (ret != PORT_OK)
474  {
475  RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
476  return invokeListener(ret, cdr);
477  }
478  onReceived(cdr);
479 
481 
482  return PORT_OK;
483  }
484 
494  const cdrMemoryStream& data)
495  {
496  /*
497  * BufferStatus -> DataPortStatus
498  *
499  * BUFFER_OK -> PORT_OK
500  * BUFFER_ERROR -> BUFFER_ERROR
501  * BUFFER_FULL -> BUFFER_FULL
502  * NOT_SUPPORTED -> PORT_ERROR
503  * TIMEOUT -> BUFFER_TIMEOUT
504  * PRECONDITION_NOT_MET -> PRECONDITION_NOT_MET
505  */
506  switch (status)
507  {
509  // no callback
512  // no callback
515  onBufferFull(data);
518  // no callback
521  onBufferWriteTimeout(data);
524  // no callback
526  default:
527  // no callback
529  }
531  }
532 
542  const cdrMemoryStream& data)
543  {
544  // ret:
545  // PORT_OK, PORT_ERROR, SEND_FULL, SEND_TIMEOUT, CONNECTION_LOST,
546  // UNKNOWN_ERROR
547  switch (status)
548  {
549  case PORT_ERROR:
550  onReceiverError(data);
551  return PORT_ERROR;
552 
553  case SEND_FULL:
554  onReceiverFull(data);
555  return SEND_FULL;
556 
557  case SEND_TIMEOUT:
558  onReceiverTimeout(data);
559  return SEND_TIMEOUT;
560 
561  case CONNECTION_LOST:
562  onReceiverError(data);
563  return CONNECTION_LOST;
564 
565  case UNKNOWN_ERROR:
566  onReceiverError(data);
567  return UNKNOWN_ERROR;
568 
569  default:
570  onReceiverError(data);
571  return PORT_ERROR;
572  }
573  }
574 
575 }; // namespace RTC
576 
577 extern "C"
578 {
580  {
582  instance().addFactory("new",
586  ::RTC::PublisherNew>);
587  }
588 };
ConnectorListeners class.
InPortConsumer abstract class.
virtual ReturnCode setListener(ConnectorInfo &info, ConnectorListeners *listeners)
Set the listener.
#define RTC_ERROR(fmt)
Error log output macro.
Definition: SystemLogger.h:422
virtual ~PublisherNew(void)
Destructor.
std::string normalize(std::string &str)
Trim leading and trailing blanks and convert upper-case characters to lower case.
Definition: stringutil.cpp:303
ReturnCode pushFifo()
push "fifo" policy
ReturnCode pushSkip()
push "skip" policy
ConnectorListeners * m_listeners
Definition: PublisherNew.h:725
virtual int resume(void)=0
Resuming the suspended task.
RT-Component.
virtual ReturnCode write(const cdrMemoryStream &data, unsigned long sec, unsigned long usec)
Write data.
bool stringTo(To &val, const char *str)
Convert the given string to an object of type To.
Definition: stringutil.h:597
virtual ReturnCode deactivate()
deactivation
void PublisherNewInit()
AbstractClass * Creator()
Creator template.
DATAPORTSTATUS_ENUM PublisherNew()
Constructor.
virtual ReturnCode setBuffer(CdrBufferBase *buffer)
Setting buffer pointer.
void onBufferWrite(const cdrMemoryStream &data)
Notify an ON_BUFFER_WRITE event to listeners.
Definition: PublisherNew.h:562
RTC::ReturnCode_t ret(RTC::Local::ReturnCode_t r)
static const char * toString(Enum status)
Convert BufferStatus into the string.
Definition: BufferStatus.h:118
virtual int suspend(void)=0
Suspending the task.
virtual ReturnCode init(coil::Properties &prop)
Initialization.
virtual bool isActive()
Return whether the publisher is in the active state.
void onBufferFull(const cdrMemoryStream &data)
Notify an ON_BUFFER_FULL event to listeners.
Definition: PublisherNew.h:577
virtual bool setTask(TaskFuncBase *func, bool delete_in_dtor=true)=0
Setting task execution function.
static GlobalFactory< AbstractClass, Identifier, Compare, Creator, Destructor > & instance()
Create instance.
Definition: Singleton.h:131
GlobalFactory template class.
Enum
DataPortStatus return codes.
Definition: BufferStatus.h:84
#define RTC_DEBUG_STR(str)
Definition: SystemLogger.h:489
virtual ReturnCode activate()
activation
#define RTC_PARANOID(fmt)
Paranoid level log output macro.
Definition: SystemLogger.h:555
std::vector< Identifier > getIdentifiers()
Get factory ID list.
virtual ReturnCode get(DataType &value)=0
Read data from the buffer.
std::vector< std::string > vstring
Definition: stringutil.h:37
void onBufferWriteTimeout(const cdrMemoryStream &data)
Notify an ON_BUFFER_WRITE_TIMEOUT event to listeners.
Definition: PublisherNew.h:592
virtual void executionMeasureCount(int n)=0
Task execute time measurement period.
void setPushPolicy(const coil::Properties &prop)
Setting PushPolicy.
virtual void signal()=0
Executing the suspended task one tick.
virtual void activate()=0
Starting the task.
#define RTC_DEBUG(fmt)
Debug level log output macro.
Definition: SystemLogger.h:488
PeriodicTaskFactory class.
void onBufferRead(const cdrMemoryStream &data)
Notify an ON_BUFFER_READ event to listeners.
Definition: PublisherNew.h:622
void onReceiverError(const cdrMemoryStream &data)
Notify an ON_RECEIVER_ERROR event to listeners.
Definition: PublisherNew.h:697
virtual void periodicMeasure(bool value)=0
Enable or disable task period time measurement.
std::string flatten(vstring sv)
Create a CSV (comma-separated) string from the given string list.
Definition: stringutil.cpp:549
#define RTC_TRACE(fmt)
virtual ReturnCode setConsumer(InPortConsumer *consumer)
Store InPort consumer.
virtual void finalize()=0
Finalizing the task.
bool createTask(const coil::Properties &prop)
Setting Task.
void onReceiverTimeout(const cdrMemoryStream &data)
Notify an ON_RECEIVER_TIMEOUT event to listeners.
Definition: PublisherNew.h:682
AbstractClass * createObject(const Identifier &id)
Create factory object.
::RTC::BufferStatus::Enum ReturnCode
prop
Organization::get_organization_property ();.
CdrBufferBase * m_buffer
Definition: PublisherNew.h:722
void Destructor(AbstractClass *&obj)
Destructor template.
Class represents a set of properties.
Definition: Properties.h:101
bool toBool(std::string str, std::string yes, std::string no, bool default_value)
Convert given string into bool value.
Definition: stringutil.cpp:410
virtual void setPeriod(double period)=0
Setting task execution period.
virtual void executionMeasure(bool value)=0
Enable or disable task execution time measurement.
coil::PeriodicTaskBase * m_task
Definition: PublisherNew.h:724
virtual ReturnCode advanceRptr(long int n=1)=0
Advance the read pointer by n positions.
ReturnCode convertReturn(BufferStatus::Enum status, const cdrMemoryStream &data)
Conversion from BufferStatus to DataPortStatus.
virtual void periodicMeasureCount(int n)=0
Task period time measurement count.
Base class of Publisher.
Definition: PublisherBase.h:63
virtual size_t readable() const =0
Get the number of readable data items in the buffer.
RTComponent header.
ReturnCode m_retcode
Definition: PublisherNew.h:726
PublisherNew class.
Definition: PublisherNew.h:69
void onReceiverFull(const cdrMemoryStream &data)
Notify an ON_RECEIVER_FULL event to listeners.
Definition: PublisherNew.h:667
virtual ReturnCode write(const DataType &value, long int sec=-1, long int nsec=-1)=0
Write data into the buffer.
virtual ReturnCode put(const cdrMemoryStream &data)=0
Send data to the destination port.
ReturnCode pushAll()
push "all" policy
BufferBase abstract class.
Definition: BufferBase.h:104
void onReceived(const cdrMemoryStream &data)
Notify an ON_RECEIVED event to listeners.
Definition: PublisherNew.h:652
virtual int svc(void)
Thread execution function.
ReturnCode pushNew()
push "new" policy
InPortConsumer class.
connector listener class
PublisherNew class.
void onSend(const cdrMemoryStream &data)
Notify an ON_SEND event to listeners.
Definition: PublisherNew.h:637
Enum
DataPortStatus return codes.
static const char * toString(DataPortStatus::Enum status)
Convert DataPortStatus into the string.
const std::string & getProperty(const std::string &key) const
Search for the property with the specified key in this property.
Definition: Properties.cpp:156
ConnectorInfo m_profile
Definition: PublisherNew.h:723
ReturnCode invokeListener(DataPortStatus::Enum status, const cdrMemoryStream &data)
Call listeners according to the DataPortStatus.
InPortConsumer * m_consumer
Definition: PublisherNew.h:721


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Thu Jun 6 2019 19:25:59