00001
00019 #include <rtm/RTC.h>
00020 #include <coil/Properties.h>
00021 #include <coil/Time.h>
00022 #include <coil/stringutil.h>
00023 #include <rtm/PublisherPeriodic.h>
00024 #include <rtm/InPortConsumer.h>
00025 #include <rtm/RTC.h>
00026 #include <stdlib.h>
00027 #include <rtm/idl/DataPortSkel.h>
00028 #include <rtm/PeriodicTaskFactory.h>
00029 #include <rtm/SystemLogger.h>
00030
00031 namespace RTC
00032 {
00040 PublisherPeriodic::PublisherPeriodic()
00041 : rtclog("PublisherPeriodic"),
00042 m_consumer(0), m_buffer(0), m_task(0), m_listeners(0),
00043 m_retcode(PORT_OK), m_pushPolicy(NEW),
00044 m_skipn(0), m_active(false), m_readback(false), m_leftskip(0)
00045 {
00046 }
00047
00048
00056 PublisherPeriodic::~PublisherPeriodic()
00057 {
00058 RTC_TRACE(("~PublisherPeriodic()"));
00059 if (m_task != 0)
00060 {
00061 m_task->resume();
00062 m_task->finalize();
00063 RTC_PARANOID(("task finalized."));
00064
00065 RTC::PeriodicTaskFactory::instance().deleteObject(m_task);
00066 RTC_PARANOID(("task deleted."));
00067 }
00068
00069
00070 m_consumer = 0;
00071
00072 m_buffer = 0;
00073 }
00074
00082 PublisherBase::ReturnCode PublisherPeriodic::init(coil::Properties& prop)
00083 {
00084 RTC_TRACE(("init()"));
00085 RTC_DEBUG_STR((prop));
00086
00087 setPushPolicy(prop);
00088 if (!createTask(prop))
00089 {
00090 return INVALID_ARGS;
00091 }
00092 return PORT_OK;
00093
00094
00095
00096
00097
00098 return PORT_OK;
00099 }
00100
00108 PublisherBase::ReturnCode
00109 PublisherPeriodic::setConsumer(InPortConsumer* consumer)
00110 {
00111 RTC_TRACE(("setConsumer()"));
00112
00113 if (consumer == 0)
00114 {
00115 RTC_ERROR(("setConsumer(consumer = 0): invalid argument."));
00116 return INVALID_ARGS;
00117 }
00118 m_consumer = consumer;
00119 return PORT_OK;
00120 }
00121
00129 PublisherBase::ReturnCode PublisherPeriodic::setBuffer(CdrBufferBase* buffer)
00130 {
00131 RTC_TRACE(("setBuffer()"));
00132
00133 if (buffer == 0)
00134 {
00135 RTC_ERROR(("setBuffer(buffer == 0): invalid argument"));
00136 return INVALID_ARGS;
00137 }
00138 m_buffer = buffer;
00139 return PORT_OK;
00140 }
00141
00149 PublisherBase::ReturnCode
00150 PublisherPeriodic::setListener(ConnectorInfo& info,
00151 ConnectorListeners* listeners)
00152 {
00153 RTC_TRACE(("setListeners()"));
00154
00155 if (listeners == 0)
00156 {
00157 RTC_ERROR(("setListeners(listeners == 0): invalid argument"));
00158 return INVALID_ARGS;
00159 }
00160 m_profile = info;
00161 m_listeners = listeners;
00162 return PORT_OK;
00163 }
00164
00172 PublisherBase::ReturnCode
00173 PublisherPeriodic::write(const cdrMemoryStream& data,
00174 unsigned long sec,
00175 unsigned long usec)
00176 {
00177 RTC_PARANOID(("write()"));
00178
00179 if (m_consumer == 0) { return PRECONDITION_NOT_MET; }
00180 if (m_buffer == 0) { return PRECONDITION_NOT_MET; }
00181 if (m_listeners == 0) { return PRECONDITION_NOT_MET; }
00182
00183 if (m_retcode == CONNECTION_LOST)
00184 {
00185 RTC_DEBUG(("write(): connection lost."));
00186 return m_retcode;
00187 }
00188
00189 if (m_retcode == SEND_FULL)
00190 {
00191 RTC_DEBUG(("write(): InPort buffer is full."));
00192 m_buffer->write(data, sec, usec);
00193 return BUFFER_FULL;
00194 }
00195
00196 onBufferWrite(data);
00197 CdrBufferBase::ReturnCode ret(m_buffer->write(data, sec, usec));
00198 RTC_DEBUG(("%s = write()", CdrBufferBase::toString(ret)));
00199 m_task->resume();
00200 return convertReturn(ret, data);
00201 }
00202
00210 bool PublisherPeriodic::isActive()
00211 {
00212 return m_active;
00213 }
00214
00222 PublisherBase::ReturnCode PublisherPeriodic::activate()
00223 {
00224 if (m_task == 0) { return PRECONDITION_NOT_MET; }
00225 if (m_buffer == 0) { return PRECONDITION_NOT_MET; }
00226 m_active = true;
00227 m_task->resume();
00228 return PORT_OK;
00229 }
00230
00238 PublisherBase::ReturnCode PublisherPeriodic::deactivate()
00239 {
00240 if (m_task == 0) { return PRECONDITION_NOT_MET; }
00241 m_active = false;
00242 m_task->suspend();
00243 return PORT_OK;
00244 }
00245
00253 int PublisherPeriodic::svc(void)
00254 {
00255 Guard guard(m_retmutex);
00256 switch (m_pushPolicy)
00257 {
00258 case ALL:
00259 m_retcode = pushAll();
00260 break;
00261 case FIFO:
00262 m_retcode = pushFifo();
00263 break;
00264 case SKIP:
00265 m_retcode = pushSkip();
00266 break;
00267 case NEW:
00268 m_retcode = pushNew();
00269 break;
00270 default:
00271 m_retcode = pushNew();
00272 break;
00273 }
00274 return 0;
00275 }
00276
00280 PublisherBase::ReturnCode PublisherPeriodic::pushAll()
00281 {
00282 RTC_TRACE(("pushAll()"));
00283 if (bufferIsEmpty()) { return BUFFER_EMPTY; }
00284
00285 while (m_buffer->readable() > 0)
00286 {
00287 const cdrMemoryStream& cdr(m_buffer->get());
00288 onBufferRead(cdr);
00289
00290 onSend(cdr);
00291 ReturnCode ret(m_consumer->put(cdr));
00292 if (ret != PORT_OK)
00293 {
00294 RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00295 return invokeListener(ret, cdr);
00296 }
00297 onReceived(cdr);
00298
00299 m_buffer->advanceRptr();
00300
00301 }
00302 return PORT_OK;
00303 }
00304
00308 PublisherBase::ReturnCode PublisherPeriodic::pushFifo()
00309 {
00310 RTC_TRACE(("pushFifo()"));
00311 if (bufferIsEmpty()) { return BUFFER_EMPTY; }
00312
00313 const cdrMemoryStream& cdr(m_buffer->get());
00314 onBufferRead(cdr);
00315
00316 onSend(cdr);
00317 ReturnCode ret(m_consumer->put(cdr));
00318
00319 if (ret != PORT_OK)
00320 {
00321 RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00322 return invokeListener(ret, cdr);
00323 }
00324 onReceived(cdr);
00325
00326 m_buffer->advanceRptr();
00327
00328 return PORT_OK;
00329 }
00330
00334 PublisherBase::ReturnCode PublisherPeriodic::pushSkip()
00335 {
00336 RTC_TRACE(("pushSkip()"));
00337 if (bufferIsEmpty()) { return BUFFER_EMPTY; }
00338
00339 ReturnCode ret(PORT_OK);
00340 int readable = m_buffer->readable();
00341 int preskip(readable + m_leftskip);
00342 int loopcnt(preskip/(m_skipn +1));
00343 int postskip(m_skipn - m_leftskip);
00344 for (int i(0); i < loopcnt; ++i)
00345 {
00346 m_buffer->advanceRptr(postskip);
00347 readable -= postskip;
00348
00349 const cdrMemoryStream& cdr(m_buffer->get());
00350 onBufferRead(cdr);
00351
00352 onSend(cdr);
00353 ret = m_consumer->put(cdr);
00354 if (ret != PORT_OK)
00355 {
00356 m_buffer->advanceRptr(-postskip);
00357 RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00358 return invokeListener(ret, cdr);
00359 }
00360 onReceived(cdr);
00361 postskip = m_skipn + 1;
00362 }
00363
00364 m_buffer->advanceRptr(readable);
00365 m_leftskip = preskip % (m_skipn +1);
00366
00367 return ret;
00368 }
00369
00373 PublisherBase::ReturnCode PublisherPeriodic::pushNew()
00374 {
00375 RTC_TRACE(("pushNew()"));
00376 if (bufferIsEmpty()) { return BUFFER_EMPTY; }
00377
00378
00379
00380
00381 m_readback = true;
00382 m_buffer->advanceRptr(m_buffer->readable() - 1);
00383
00384 const cdrMemoryStream& cdr(m_buffer->get());
00385 onBufferRead(cdr);
00386
00387 onSend(cdr);
00388 ReturnCode ret(m_consumer->put(cdr));
00389 if (ret != PORT_OK)
00390 {
00391 RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00392 return invokeListener(ret, cdr);
00393 }
00394 onReceived(cdr);
00395
00396 m_buffer->advanceRptr();
00397
00398 return PORT_OK;
00399 }
00400
00408 void PublisherPeriodic::setPushPolicy(const coil::Properties& prop)
00409 {
00410
00411 std::string push_policy = prop.getProperty("publisher.push_policy", "new");
00412 RTC_DEBUG(("push_policy: %s", push_policy.c_str()));
00413
00414 coil::normalize(push_policy);
00415 if (push_policy == "all") { m_pushPolicy = ALL; }
00416 else if (push_policy == "fifo") { m_pushPolicy = FIFO; }
00417 else if (push_policy == "skip") { m_pushPolicy = SKIP; }
00418 else if (push_policy == "new") { m_pushPolicy = NEW; }
00419 else
00420 {
00421 RTC_ERROR(("invalid push_policy value: %s", push_policy.c_str()));
00422 m_pushPolicy = NEW;
00423 }
00424
00425
00426 std::string skip_count = prop.getProperty("publisher.skip_count", "0");
00427 RTC_DEBUG(("skip_count: %s", skip_count.c_str()));
00428
00429 if (!coil::stringTo(m_skipn, skip_count.c_str()))
00430 {
00431 RTC_ERROR(("invalid skip_count value: %s", skip_count.c_str()));
00432 m_skipn = 0;
00433 }
00434 if (m_skipn < 0)
00435 {
00436 RTC_ERROR(("invalid skip_count value: %d", m_skipn));
00437 m_skipn = 0;
00438 }
00439 }
00440
00448 bool PublisherPeriodic::createTask(const coil::Properties& prop)
00449 {
00450 RTC::PeriodicTaskFactory& factory(RTC::PeriodicTaskFactory::instance());
00451
00452
00453 coil::vstring th = factory.getIdentifiers();
00454 RTC_DEBUG(("available task types: %s", coil::flatten(th).c_str()));
00455
00456 m_task = factory.createObject(prop.getProperty("thread_type", "default"));
00457 if (m_task == 0)
00458 {
00459 RTC_ERROR(("Task creation failed: %s",
00460 prop.getProperty("thread_type", "default").c_str()));
00461 return INVALID_ARGS;
00462 }
00463 m_task->setTask(this, &PublisherPeriodic::svc);
00464 RTC_PARANOID(("Task creation succeeded."));
00465
00466
00467 double hz;
00468 if (!coil::stringTo(hz, prop["publisher.push_rate"].c_str()) &&
00469 !coil::stringTo(hz, prop["push_rate"].c_str()))
00470 {
00471 RTC_ERROR(("publisher.push_rate/push_rate were not found."));
00472 return false;
00473 }
00474
00475 if (hz <= 0)
00476 {
00477 RTC_ERROR(("invalid period: %f [s]", hz));
00478 return false;
00479 }
00480 m_task->setPeriod(1.0/hz);
00481
00482
00483 m_task->executionMeasure(coil::toBool(prop["measurement.exec_time"],
00484 "enable", "disable", true));
00485
00486 int ecount;
00487 if (coil::stringTo(ecount, prop["measurement.exec_count"].c_str()))
00488 {
00489 m_task->executionMeasureCount(ecount);
00490 }
00491
00492 m_task->periodicMeasure(coil::toBool(prop["measurement.period_time"],
00493 "enable", "disable", true));
00494 int pcount;
00495 if (coil::stringTo(pcount, prop["measurement.period_count"].c_str()))
00496 {
00497 m_task->periodicMeasureCount(pcount);
00498 }
00499
00500
00501 m_task->suspend();
00502 m_task->activate();
00503 m_task->suspend();
00504
00505 return true;
00506 }
00507
00515 PublisherBase::ReturnCode
00516 PublisherPeriodic::convertReturn(BufferStatus::Enum status,
00517 const cdrMemoryStream& data)
00518 {
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529 switch (status)
00530 {
00531 case BufferStatus::BUFFER_OK:
00532
00533 return DataPortStatus::PORT_OK;
00534 case BufferStatus::BUFFER_ERROR:
00535
00536 return DataPortStatus::BUFFER_ERROR;
00537 case BufferStatus::BUFFER_FULL:
00538 onBufferFull(data);
00539 return DataPortStatus::BUFFER_FULL;
00540 case BufferStatus::NOT_SUPPORTED:
00541
00542 return DataPortStatus::PORT_ERROR;
00543 case BufferStatus::TIMEOUT:
00544 onBufferWriteTimeout(data);
00545 return DataPortStatus::BUFFER_TIMEOUT;
00546 case BufferStatus::PRECONDITION_NOT_MET:
00547
00548 return DataPortStatus::PRECONDITION_NOT_MET;
00549 default:
00550
00551 return DataPortStatus::PORT_ERROR;
00552 }
00553 return DataPortStatus::PORT_ERROR;
00554 }
00555
00563 PublisherPeriodic::ReturnCode
00564 PublisherPeriodic::invokeListener(DataPortStatus::Enum status,
00565 const cdrMemoryStream& data)
00566 {
00567
00568
00569
00570 switch (status)
00571 {
00572 case PORT_ERROR:
00573 onReceiverError(data);
00574 return PORT_ERROR;
00575
00576 case SEND_FULL:
00577 onReceiverFull(data);
00578 return SEND_FULL;
00579
00580 case SEND_TIMEOUT:
00581 onReceiverTimeout(data);
00582 return SEND_TIMEOUT;
00583
00584 case CONNECTION_LOST:
00585 onReceiverError(data);
00586 return CONNECTION_LOST;
00587
00588 case UNKNOWN_ERROR:
00589 onReceiverError(data);
00590 return UNKNOWN_ERROR;
00591
00592 default:
00593 onReceiverError(data);
00594 return PORT_ERROR;
00595 }
00596 }
00597
00598 };
00599
00600 extern "C"
00601 {
00602 void PublisherPeriodicInit()
00603 {
00604 ::RTC::PublisherFactory::
00605 instance().addFactory("periodic",
00606 ::coil::Creator< ::RTC::PublisherBase,
00607 ::RTC::PublisherPeriodic>,
00608 ::coil::Destructor< ::RTC::PublisherBase,
00609 ::RTC::PublisherPeriodic>);
00610 }
00611 };