PublisherNew.cpp
Go to the documentation of this file.
00001 // -*- C++ -*-
00020 #include <iostream>
00021 #include <assert.h>
00022 
00023 #include <coil/Properties.h>
00024 #include <coil/stringutil.h>
00025 
00026 #include <rtm/RTC.h>
00027 #include <rtm/PublisherNew.h>
00028 #include <rtm/InPortConsumer.h>
00029 #include <rtm/PeriodicTaskFactory.h>
00030 #include <rtm/idl/DataPortSkel.h>
00031 #include <rtm/ConnectorListener.h>
00032 
00033 namespace RTC
00034 {
00042   PublisherNew::PublisherNew()
00043     : rtclog("PublisherNew"),
00044       m_consumer(0), m_buffer(0), m_task(0), m_listeners(0),
00045       m_retcode(PORT_OK), m_pushPolicy(NEW),
00046       m_skipn(0), m_active(false), m_leftskip(0)
00047   {
00048   }
00049 
00057   PublisherNew::~PublisherNew()
00058   {
00059     RTC_TRACE(("~PublisherNew()"));
00060     if (m_task != 0)
00061       {
00062         m_task->resume();
00063         m_task->finalize();
00064 
00065         RTC::PeriodicTaskFactory::instance().deleteObject(m_task);
00066         RTC_PARANOID(("task deleted."));
00067       }
00068 
00069     // "consumer" should be deleted in the Connector
00070     m_consumer = 0;
00071     // "buffer"   should be deleted in the Connector
00072     m_buffer = 0;
00073   }
00074 
00082   PublisherBase::ReturnCode PublisherNew::init(coil::Properties& prop)
00083   {
00084     RTC_TRACE(("init()"));
00085     RTC_DEBUG_STR((prop));
00086     
00087     setPushPolicy(prop);
00088     if (!createTask(prop))
00089       {
00090         return INVALID_ARGS;
00091       }
00092     return PORT_OK;
00093   }
00094 
00102   PublisherBase::ReturnCode PublisherNew::setConsumer(InPortConsumer* consumer)
00103   {
00104     RTC_TRACE(("setConsumer()"));
00105     
00106     if (consumer == 0)
00107       {
00108         RTC_ERROR(("setConsumer(consumer = 0): invalid argument."));
00109         return INVALID_ARGS;
00110       }
00111     m_consumer = consumer;
00112     return PORT_OK;
00113   }
00114 
00122   PublisherBase::ReturnCode PublisherNew::setBuffer(CdrBufferBase* buffer)
00123   {
00124     RTC_TRACE(("setBuffer()"));
00125 
00126     if (buffer == 0)
00127       {
00128         RTC_ERROR(("setBuffer(buffer == 0): invalid argument"));
00129         return INVALID_ARGS;
00130       }
00131     m_buffer = buffer;
00132     return PORT_OK;
00133   }
00134 
00142   PublisherBase::ReturnCode
00143   PublisherNew::setListener(ConnectorInfo& info,
00144                             ConnectorListeners* listeners)
00145   {
00146     RTC_TRACE(("setListeners()"));
00147 
00148     if (listeners == 0)
00149       {
00150         RTC_ERROR(("setListeners(listeners == 0): invalid argument"));
00151         return INVALID_ARGS;
00152       }
00153     m_profile = info;
00154     m_listeners = listeners;
00155     return PORT_OK;
00156   }
00157 
00165   PublisherBase::ReturnCode PublisherNew::write(const cdrMemoryStream& data,
00166                                                 unsigned long sec,
00167                                                 unsigned long usec)
00168   {
00169     RTC_PARANOID(("write()"));
00170 
00171     if (m_consumer == 0) { return PRECONDITION_NOT_MET; }
00172     if (m_buffer == 0) { return PRECONDITION_NOT_MET; }
00173     if (m_listeners == 0) { return PRECONDITION_NOT_MET; }
00174 
00175     if (m_retcode == CONNECTION_LOST)
00176       {
00177         RTC_DEBUG(("write(): connection lost."));
00178         return m_retcode;
00179       }
00180 
00181     if (m_retcode == SEND_FULL)
00182       {
00183         RTC_DEBUG(("write(): InPort buffer is full."));
00184         m_buffer->write(data, sec, usec);
00185         m_task->signal();
00186         return BUFFER_FULL;
00187       }
00188 
00189     assert(m_buffer != 0);
00190 
00191     onBufferWrite(data);
00192     CdrBufferBase::ReturnCode ret(m_buffer->write(data, sec, usec));
00193 
00194     m_task->signal();
00195     RTC_DEBUG(("%s = write()", CdrBufferBase::toString(ret)));
00196 
00197     return convertReturn(ret, data);
00198   }
00199 
00207   bool PublisherNew::isActive()
00208   {
00209     return m_active;
00210   }
00211 
00219   PublisherBase::ReturnCode PublisherNew::activate()
00220   {
00221     m_active = true;
00222     return PORT_OK;
00223   }
00224 
00232   PublisherBase::ReturnCode PublisherNew::deactivate()
00233   {
00234     m_active = false;
00235     return PORT_OK;
00236   }
00237   
00245   int PublisherNew::svc(void)
00246   {
00247     
00248     Guard guard(m_retmutex);
00249     switch (m_pushPolicy)
00250       {
00251       case ALL:
00252         m_retcode = pushAll();
00253         break;
00254       case FIFO:
00255         m_retcode = pushFifo();
00256         break;
00257       case SKIP:
00258         m_retcode = pushSkip();
00259         break;
00260       case NEW:
00261         m_retcode = pushNew();
00262         break;
00263       default:
00264         m_retcode = pushNew();
00265         break;
00266       }
00267     return 0;
00268   }
00269 
00277   void PublisherNew::setPushPolicy(const coil::Properties& prop)
00278   {
00279     // push_policy default: NEW
00280     std::string push_policy = prop.getProperty("publisher.push_policy", "new");
00281     RTC_DEBUG(("push_policy: %s", push_policy.c_str()));
00282 
00283     coil::normalize(push_policy);
00284     if      (push_policy == "all")  { m_pushPolicy = ALL;  }
00285     else if (push_policy == "fifo") { m_pushPolicy = FIFO; }
00286     else if (push_policy == "skip") { m_pushPolicy = SKIP; }
00287     else if (push_policy == "new")  { m_pushPolicy = NEW;  }
00288     else
00289       {
00290         RTC_ERROR(("invalid push_policy value: %s", push_policy.c_str()));
00291         m_pushPolicy = NEW;     // default push policy
00292       }
00293 
00294     // skip_count default: 0
00295     std::string skip_count = prop.getProperty("publisher.skip_count", "0");
00296     RTC_DEBUG(("skip_count: %s", skip_count.c_str()));
00297 
00298     if (!coil::stringTo(m_skipn, skip_count.c_str()))
00299       {
00300         RTC_ERROR(("invalid skip_count value: %s", skip_count.c_str()));
00301         m_skipn = 0;           // default skip count
00302       }
00303     if (m_skipn < 0)
00304       {
00305         RTC_ERROR(("invalid skip_count value: %d", m_skipn));
00306         m_skipn = 0;           // default skip count
00307       }
00308   }
00309 
00317   bool PublisherNew::createTask(const coil::Properties& prop)
00318   {
00319     RTC::PeriodicTaskFactory& factory(RTC::PeriodicTaskFactory::instance());
00320     coil::vstring th = factory.getIdentifiers();
00321     RTC_DEBUG(("available task types: %s", coil::flatten(th).c_str()));
00322 
00323     m_task = factory.createObject(prop.getProperty("thread_type", "default"));
00324     if (m_task == 0)
00325       {
00326         RTC_ERROR(("Task creation failed: %s",
00327                    prop.getProperty("thread_type", "default").c_str()));
00328         return false;
00329       }
00330     RTC_PARANOID(("Task creation succeeded."));
00331 
00332     // setting task function
00333     m_task->setTask(this, &PublisherNew::svc);
00334     m_task->setPeriod(0.0);
00335     m_task->executionMeasure(coil::toBool(prop["measurement.exec_time"],
00336                                     "enable", "disable", true));
00337     
00338     int ecount;
00339     if (coil::stringTo(ecount, prop["measurement.exec_count"].c_str()))
00340       {
00341         m_task->executionMeasureCount(ecount);
00342       }
00343 
00344     m_task->periodicMeasure(coil::toBool(prop["measurement.period_time"],
00345                                    "enable", "disable", true));
00346     int pcount;
00347     if (coil::stringTo(pcount, prop["measurement.period_count"].c_str()))
00348       {
00349         m_task->periodicMeasureCount(pcount);
00350       }
00351 
00352     // Start task in suspended mode
00353     m_task->suspend();
00354     m_task->activate();
00355     m_task->suspend();
00356 
00357     return true;
00358   }
00359 
00360 
00364   PublisherNew::ReturnCode PublisherNew::pushAll()
00365   {
00366     RTC_TRACE(("pushAll()"));
00367 
00368     while (m_buffer->readable() > 0)
00369       {
00370         cdrMemoryStream& cdr(m_buffer->get());
00371         onBufferRead(cdr);
00372         
00373         onSend(cdr);
00374         ReturnCode ret(m_consumer->put(cdr));
00375         if (ret != PORT_OK)
00376           {
00377             RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00378             return invokeListener(ret, cdr);
00379           }
00380         onReceived(cdr);
00381         
00382         m_buffer->advanceRptr();
00383       }
00384     return PORT_OK;
00385   }
00386 
00390   PublisherNew::ReturnCode PublisherNew::pushFifo()
00391   {
00392     RTC_TRACE(("pushFifo()"));
00393 
00394     cdrMemoryStream& cdr(m_buffer->get());
00395     onBufferRead(cdr);
00396     
00397     onSend(cdr);
00398     ReturnCode ret(m_consumer->put(cdr));
00399     if (ret != PORT_OK)
00400       {
00401         RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00402         return invokeListener(ret, cdr);
00403       }
00404     onReceived(cdr);
00405     
00406     m_buffer->advanceRptr();
00407     
00408     return PORT_OK;
00409   }
00410 
00414   PublisherNew::ReturnCode PublisherNew::pushSkip()
00415   {
00416     RTC_TRACE(("pushSkip()"));
00417 
00418     ReturnCode ret(PORT_OK);
00419     int preskip(m_buffer->readable() + m_leftskip);
00420     int loopcnt(preskip/(m_skipn +1));
00421     int postskip(m_skipn - m_leftskip);
00422     for (int i(0); i < loopcnt; ++i)
00423       {
00424         m_buffer->advanceRptr(postskip);
00425         
00426         const cdrMemoryStream& cdr(m_buffer->get());
00427         onBufferRead(cdr);
00428         
00429         onSend(cdr);
00430         ret = m_consumer->put(cdr);
00431         if (ret != PORT_OK)
00432           {
00433             m_buffer->advanceRptr(-postskip);
00434             RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00435             return invokeListener(ret, cdr);
00436           }
00437         onReceived(cdr);
00438         postskip = m_skipn + 1;
00439       }
00440     m_buffer->advanceRptr(m_buffer->readable());
00441     if (loopcnt == 0)
00442       {  // Not put
00443         m_leftskip = preskip % (m_skipn +1);
00444       }
00445     else
00446       {
00447         if ( m_retcode != PORT_OK )
00448           {  // put Error after 
00449             m_leftskip = 0;
00450           }
00451         else
00452           {  // put OK after
00453             m_leftskip = preskip % (m_skipn +1);
00454           }
00455       }
00456     return ret;
00457   }
00458 
00462   PublisherNew::ReturnCode PublisherNew::pushNew()
00463   {
00464     RTC_TRACE(("pushNew()"));
00465 
00466     m_buffer->advanceRptr(m_buffer->readable() - 1);
00467         
00468     cdrMemoryStream& cdr(m_buffer->get());
00469     onBufferRead(cdr);
00470 
00471     onSend(cdr);
00472     ReturnCode ret(m_consumer->put(cdr));
00473     if (ret != PORT_OK)
00474       {
00475         RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00476         return invokeListener(ret, cdr);
00477       }
00478     onReceived(cdr);
00479     
00480     m_buffer->advanceRptr();
00481     
00482     return PORT_OK;
00483   }
00484 
00492   PublisherBase::ReturnCode
00493   PublisherNew::convertReturn(BufferStatus::Enum status,
00494                               const cdrMemoryStream& data)
00495   {
00496     /*
00497      * BufferStatus -> DataPortStatus
00498      *
00499      * BUFFER_OK     -> PORT_OK
00500      * BUFFER_ERROR  -> BUFFER_ERROR
00501      * BUFFER_FULL   -> BUFFER_FULL
00502      * NOT_SUPPORTED -> PORT_ERROR
00503      * TIMEOUT       -> BUFFER_TIMEOUT
00504      * PRECONDITION_NOT_MET -> PRECONDITION_NOT_MET
00505      */
00506     switch (status)
00507       {
00508       case BufferStatus::BUFFER_OK:
00509         // no callback
00510         return DataPortStatus::PORT_OK;
00511       case BufferStatus::BUFFER_ERROR:
00512         // no callback
00513         return DataPortStatus::BUFFER_ERROR;
00514       case BufferStatus::BUFFER_FULL:
00515         onBufferFull(data);
00516         return DataPortStatus::BUFFER_FULL;
00517       case BufferStatus::NOT_SUPPORTED:
00518         // no callback
00519         return DataPortStatus::PORT_ERROR;
00520       case BufferStatus::TIMEOUT:
00521         onBufferWriteTimeout(data);
00522         return DataPortStatus::BUFFER_TIMEOUT;
00523       case BufferStatus::PRECONDITION_NOT_MET:
00524         // no callback
00525         return DataPortStatus::PRECONDITION_NOT_MET;
00526       default:
00527         // no callback
00528         return DataPortStatus::PORT_ERROR;
00529       }
00530     return DataPortStatus::PORT_ERROR;
00531   }
00532 
00540   PublisherNew::ReturnCode
00541   PublisherNew::invokeListener(DataPortStatus::Enum status,
00542                                const cdrMemoryStream& data)
00543   {
00544     // ret:
00545     // PORT_OK, PORT_ERROR, SEND_FULL, SEND_TIMEOUT, CONNECTION_LOST,
00546     // UNKNOWN_ERROR
00547     switch (status)
00548       {
00549       case PORT_ERROR:
00550         onReceiverError(data);
00551         return PORT_ERROR;
00552         
00553       case SEND_FULL:
00554         onReceiverFull(data);
00555         return SEND_FULL;
00556         
00557       case SEND_TIMEOUT:
00558         onReceiverTimeout(data);
00559         return SEND_TIMEOUT;
00560         
00561       case CONNECTION_LOST:
00562         onReceiverError(data);
00563         return CONNECTION_LOST;
00564         
00565       case UNKNOWN_ERROR:
00566         onReceiverError(data);
00567         return UNKNOWN_ERROR;
00568         
00569       default:
00570         onReceiverError(data);
00571         return PORT_ERROR;
00572       }
00573   }
00574 
00575 }; // namespace RTC
00576 
00577 extern "C"
00578 {
00579   void PublisherNewInit()
00580   {
00581     ::RTC::PublisherFactory::
00582       instance().addFactory("new",
00583                             ::coil::Creator< ::RTC::PublisherBase,
00584                                              ::RTC::PublisherNew>,
00585                             ::coil::Destructor< ::RTC::PublisherBase,
00586                                                 ::RTC::PublisherNew>);
00587   }
00588 };


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Sun Mar 26 2017 03:37:17