00001
00020 #include <iostream>
00021 #include <assert.h>
00022
00023 #include <coil/Properties.h>
00024 #include <coil/stringutil.h>
00025
00026 #include <rtm/RTC.h>
00027 #include <rtm/PublisherNew.h>
00028 #include <rtm/InPortConsumer.h>
00029 #include <rtm/PeriodicTaskFactory.h>
00030 #include <rtm/idl/DataPortSkel.h>
00031 #include <rtm/ConnectorListener.h>
00032
00033 namespace RTC
00034 {
00042 PublisherNew::PublisherNew()
00043 : rtclog("PublisherNew"),
00044 m_consumer(0), m_buffer(0), m_task(0), m_listeners(0),
00045 m_retcode(PORT_OK), m_pushPolicy(NEW),
00046 m_skipn(0), m_active(false), m_leftskip(0)
00047 {
00048 }
00049
00057 PublisherNew::~PublisherNew()
00058 {
00059 RTC_TRACE(("~PublisherNew()"));
00060 if (m_task != 0)
00061 {
00062 m_task->resume();
00063 m_task->finalize();
00064
00065 RTC::PeriodicTaskFactory::instance().deleteObject(m_task);
00066 RTC_PARANOID(("task deleted."));
00067 }
00068
00069
00070 m_consumer = 0;
00071
00072 m_buffer = 0;
00073 }
00074
00082 PublisherBase::ReturnCode PublisherNew::init(coil::Properties& prop)
00083 {
00084 RTC_TRACE(("init()"));
00085 RTC_DEBUG_STR((prop));
00086
00087 setPushPolicy(prop);
00088 if (!createTask(prop))
00089 {
00090 return INVALID_ARGS;
00091 }
00092 return PORT_OK;
00093 }
00094
00102 PublisherBase::ReturnCode PublisherNew::setConsumer(InPortConsumer* consumer)
00103 {
00104 RTC_TRACE(("setConsumer()"));
00105
00106 if (consumer == 0)
00107 {
00108 RTC_ERROR(("setConsumer(consumer = 0): invalid argument."));
00109 return INVALID_ARGS;
00110 }
00111 m_consumer = consumer;
00112 return PORT_OK;
00113 }
00114
00122 PublisherBase::ReturnCode PublisherNew::setBuffer(CdrBufferBase* buffer)
00123 {
00124 RTC_TRACE(("setBuffer()"));
00125
00126 if (buffer == 0)
00127 {
00128 RTC_ERROR(("setBuffer(buffer == 0): invalid argument"));
00129 return INVALID_ARGS;
00130 }
00131 m_buffer = buffer;
00132 return PORT_OK;
00133 }
00134
00142 PublisherBase::ReturnCode
00143 PublisherNew::setListener(ConnectorInfo& info,
00144 ConnectorListeners* listeners)
00145 {
00146 RTC_TRACE(("setListeners()"));
00147
00148 if (listeners == 0)
00149 {
00150 RTC_ERROR(("setListeners(listeners == 0): invalid argument"));
00151 return INVALID_ARGS;
00152 }
00153 m_profile = info;
00154 m_listeners = listeners;
00155 return PORT_OK;
00156 }
00157
00165 PublisherBase::ReturnCode PublisherNew::write(const cdrMemoryStream& data,
00166 unsigned long sec,
00167 unsigned long usec)
00168 {
00169 RTC_PARANOID(("write()"));
00170
00171 if (m_consumer == 0) { return PRECONDITION_NOT_MET; }
00172 if (m_buffer == 0) { return PRECONDITION_NOT_MET; }
00173 if (m_listeners == 0) { return PRECONDITION_NOT_MET; }
00174
00175 if (m_retcode == CONNECTION_LOST)
00176 {
00177 RTC_DEBUG(("write(): connection lost."));
00178 return m_retcode;
00179 }
00180
00181 if (m_retcode == SEND_FULL)
00182 {
00183 RTC_DEBUG(("write(): InPort buffer is full."));
00184 m_buffer->write(data, sec, usec);
00185 m_task->signal();
00186 return BUFFER_FULL;
00187 }
00188
00189 assert(m_buffer != 0);
00190
00191 onBufferWrite(data);
00192 CdrBufferBase::ReturnCode ret(m_buffer->write(data, sec, usec));
00193
00194 m_task->signal();
00195 RTC_DEBUG(("%s = write()", CdrBufferBase::toString(ret)));
00196
00197 return convertReturn(ret, data);
00198 }
00199
00207 bool PublisherNew::isActive()
00208 {
00209 return m_active;
00210 }
00211
00219 PublisherBase::ReturnCode PublisherNew::activate()
00220 {
00221 m_active = true;
00222 return PORT_OK;
00223 }
00224
00232 PublisherBase::ReturnCode PublisherNew::deactivate()
00233 {
00234 m_active = false;
00235 return PORT_OK;
00236 }
00237
00245 int PublisherNew::svc(void)
00246 {
00247
00248 Guard guard(m_retmutex);
00249 switch (m_pushPolicy)
00250 {
00251 case ALL:
00252 m_retcode = pushAll();
00253 break;
00254 case FIFO:
00255 m_retcode = pushFifo();
00256 break;
00257 case SKIP:
00258 m_retcode = pushSkip();
00259 break;
00260 case NEW:
00261 m_retcode = pushNew();
00262 break;
00263 default:
00264 m_retcode = pushNew();
00265 break;
00266 }
00267 return 0;
00268 }
00269
00277 void PublisherNew::setPushPolicy(const coil::Properties& prop)
00278 {
00279
00280 std::string push_policy = prop.getProperty("publisher.push_policy", "new");
00281 RTC_DEBUG(("push_policy: %s", push_policy.c_str()));
00282
00283 coil::normalize(push_policy);
00284 if (push_policy == "all") { m_pushPolicy = ALL; }
00285 else if (push_policy == "fifo") { m_pushPolicy = FIFO; }
00286 else if (push_policy == "skip") { m_pushPolicy = SKIP; }
00287 else if (push_policy == "new") { m_pushPolicy = NEW; }
00288 else
00289 {
00290 RTC_ERROR(("invalid push_policy value: %s", push_policy.c_str()));
00291 m_pushPolicy = NEW;
00292 }
00293
00294
00295 std::string skip_count = prop.getProperty("publisher.skip_count", "0");
00296 RTC_DEBUG(("skip_count: %s", skip_count.c_str()));
00297
00298 if (!coil::stringTo(m_skipn, skip_count.c_str()))
00299 {
00300 RTC_ERROR(("invalid skip_count value: %s", skip_count.c_str()));
00301 m_skipn = 0;
00302 }
00303 if (m_skipn < 0)
00304 {
00305 RTC_ERROR(("invalid skip_count value: %d", m_skipn));
00306 m_skipn = 0;
00307 }
00308 }
00309
00317 bool PublisherNew::createTask(const coil::Properties& prop)
00318 {
00319 RTC::PeriodicTaskFactory& factory(RTC::PeriodicTaskFactory::instance());
00320 coil::vstring th = factory.getIdentifiers();
00321 RTC_DEBUG(("available task types: %s", coil::flatten(th).c_str()));
00322
00323 m_task = factory.createObject(prop.getProperty("thread_type", "default"));
00324 if (m_task == 0)
00325 {
00326 RTC_ERROR(("Task creation failed: %s",
00327 prop.getProperty("thread_type", "default").c_str()));
00328 return false;
00329 }
00330 RTC_PARANOID(("Task creation succeeded."));
00331
00332
00333 m_task->setTask(this, &PublisherNew::svc);
00334 m_task->setPeriod(0.0);
00335 m_task->executionMeasure(coil::toBool(prop["measurement.exec_time"],
00336 "enable", "disable", true));
00337
00338 int ecount;
00339 if (coil::stringTo(ecount, prop["measurement.exec_count"].c_str()))
00340 {
00341 m_task->executionMeasureCount(ecount);
00342 }
00343
00344 m_task->periodicMeasure(coil::toBool(prop["measurement.period_time"],
00345 "enable", "disable", true));
00346 int pcount;
00347 if (coil::stringTo(pcount, prop["measurement.period_count"].c_str()))
00348 {
00349 m_task->periodicMeasureCount(pcount);
00350 }
00351
00352
00353 m_task->suspend();
00354 m_task->activate();
00355 m_task->suspend();
00356
00357 return true;
00358 }
00359
00360
00364 PublisherNew::ReturnCode PublisherNew::pushAll()
00365 {
00366 RTC_TRACE(("pushAll()"));
00367
00368 while (m_buffer->readable() > 0)
00369 {
00370 cdrMemoryStream& cdr(m_buffer->get());
00371 onBufferRead(cdr);
00372
00373 onSend(cdr);
00374 ReturnCode ret(m_consumer->put(cdr));
00375 if (ret != PORT_OK)
00376 {
00377 RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00378 return invokeListener(ret, cdr);
00379 }
00380 onReceived(cdr);
00381
00382 m_buffer->advanceRptr();
00383 }
00384 return PORT_OK;
00385 }
00386
00390 PublisherNew::ReturnCode PublisherNew::pushFifo()
00391 {
00392 RTC_TRACE(("pushFifo()"));
00393
00394 cdrMemoryStream& cdr(m_buffer->get());
00395 onBufferRead(cdr);
00396
00397 onSend(cdr);
00398 ReturnCode ret(m_consumer->put(cdr));
00399 if (ret != PORT_OK)
00400 {
00401 RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00402 return invokeListener(ret, cdr);
00403 }
00404 onReceived(cdr);
00405
00406 m_buffer->advanceRptr();
00407
00408 return PORT_OK;
00409 }
00410
00414 PublisherNew::ReturnCode PublisherNew::pushSkip()
00415 {
00416 RTC_TRACE(("pushSkip()"));
00417
00418 ReturnCode ret(PORT_OK);
00419 int preskip(m_buffer->readable() + m_leftskip);
00420 int loopcnt(preskip/(m_skipn +1));
00421 int postskip(m_skipn - m_leftskip);
00422 for (int i(0); i < loopcnt; ++i)
00423 {
00424 m_buffer->advanceRptr(postskip);
00425
00426 const cdrMemoryStream& cdr(m_buffer->get());
00427 onBufferRead(cdr);
00428
00429 onSend(cdr);
00430 ret = m_consumer->put(cdr);
00431 if (ret != PORT_OK)
00432 {
00433 m_buffer->advanceRptr(-postskip);
00434 RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00435 return invokeListener(ret, cdr);
00436 }
00437 onReceived(cdr);
00438 postskip = m_skipn + 1;
00439 }
00440 m_buffer->advanceRptr(m_buffer->readable());
00441 if (loopcnt == 0)
00442 {
00443 m_leftskip = preskip % (m_skipn +1);
00444 }
00445 else
00446 {
00447 if ( m_retcode != PORT_OK )
00448 {
00449 m_leftskip = 0;
00450 }
00451 else
00452 {
00453 m_leftskip = preskip % (m_skipn +1);
00454 }
00455 }
00456 return ret;
00457 }
00458
00462 PublisherNew::ReturnCode PublisherNew::pushNew()
00463 {
00464 RTC_TRACE(("pushNew()"));
00465
00466 m_buffer->advanceRptr(m_buffer->readable() - 1);
00467
00468 cdrMemoryStream& cdr(m_buffer->get());
00469 onBufferRead(cdr);
00470
00471 onSend(cdr);
00472 ReturnCode ret(m_consumer->put(cdr));
00473 if (ret != PORT_OK)
00474 {
00475 RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00476 return invokeListener(ret, cdr);
00477 }
00478 onReceived(cdr);
00479
00480 m_buffer->advanceRptr();
00481
00482 return PORT_OK;
00483 }
00484
00492 PublisherBase::ReturnCode
00493 PublisherNew::convertReturn(BufferStatus::Enum status,
00494 const cdrMemoryStream& data)
00495 {
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506 switch (status)
00507 {
00508 case BufferStatus::BUFFER_OK:
00509
00510 return DataPortStatus::PORT_OK;
00511 case BufferStatus::BUFFER_ERROR:
00512
00513 return DataPortStatus::BUFFER_ERROR;
00514 case BufferStatus::BUFFER_FULL:
00515 onBufferFull(data);
00516 return DataPortStatus::BUFFER_FULL;
00517 case BufferStatus::NOT_SUPPORTED:
00518
00519 return DataPortStatus::PORT_ERROR;
00520 case BufferStatus::TIMEOUT:
00521 onBufferWriteTimeout(data);
00522 return DataPortStatus::BUFFER_TIMEOUT;
00523 case BufferStatus::PRECONDITION_NOT_MET:
00524
00525 return DataPortStatus::PRECONDITION_NOT_MET;
00526 default:
00527
00528 return DataPortStatus::PORT_ERROR;
00529 }
00530 return DataPortStatus::PORT_ERROR;
00531 }
00532
00540 PublisherNew::ReturnCode
00541 PublisherNew::invokeListener(DataPortStatus::Enum status,
00542 const cdrMemoryStream& data)
00543 {
00544
00545
00546
00547 switch (status)
00548 {
00549 case PORT_ERROR:
00550 onReceiverError(data);
00551 return PORT_ERROR;
00552
00553 case SEND_FULL:
00554 onReceiverFull(data);
00555 return SEND_FULL;
00556
00557 case SEND_TIMEOUT:
00558 onReceiverTimeout(data);
00559 return SEND_TIMEOUT;
00560
00561 case CONNECTION_LOST:
00562 onReceiverError(data);
00563 return CONNECTION_LOST;
00564
00565 case UNKNOWN_ERROR:
00566 onReceiverError(data);
00567 return UNKNOWN_ERROR;
00568
00569 default:
00570 onReceiverError(data);
00571 return PORT_ERROR;
00572 }
00573 }
00574
00575 };
00576
00577 extern "C"
00578 {
00579 void PublisherNewInit()
00580 {
00581 ::RTC::PublisherFactory::
00582 instance().addFactory("new",
00583 ::coil::Creator< ::RTC::PublisherBase,
00584 ::RTC::PublisherNew>,
00585 ::coil::Destructor< ::RTC::PublisherBase,
00586 ::RTC::PublisherNew>);
00587 }
00588 };