PublisherPeriodic.cpp
Go to the documentation of this file.
00001 // -*- C++ -*-
00019 #include <rtm/RTC.h>
00020 #include <coil/Properties.h>
00021 #include <coil/Time.h>
00022 #include <coil/stringutil.h>
00023 #include <rtm/PublisherPeriodic.h>
00024 #include <rtm/InPortConsumer.h>
00025 #include <rtm/RTC.h>
00026 #include <stdlib.h>
00027 #include <rtm/idl/DataPortSkel.h>
00028 #include <rtm/PeriodicTaskFactory.h>
00029 #include <rtm/SystemLogger.h>
00030 
00031 namespace RTC
00032 {
00040   PublisherPeriodic::PublisherPeriodic()
00041     : rtclog("PublisherPeriodic"),
00042       m_consumer(0), m_buffer(0), m_task(0), m_listeners(0),
00043       m_retcode(PORT_OK), m_pushPolicy(NEW),
00044       m_skipn(0), m_active(false), m_readback(false), m_leftskip(0)
00045   {
00046   }
00047   
00048 
00056   PublisherPeriodic::~PublisherPeriodic()
00057   {
00058     RTC_TRACE(("~PublisherPeriodic()"));
00059     if (m_task != 0)
00060       {
00061         m_task->resume();
00062         m_task->finalize();
00063         RTC_PARANOID(("task finalized."));
00064 
00065         RTC::PeriodicTaskFactory::instance().deleteObject(m_task);
00066         RTC_PARANOID(("task deleted."));
00067       }
00068 
00069     // "consumer" should be deleted in the Connector
00070     m_consumer = 0;
00071     // "buffer"   should be deleted in the Connector
00072     m_buffer = 0;
00073   }
00074 
00082   PublisherBase::ReturnCode PublisherPeriodic::init(coil::Properties& prop)
00083   {
00084     RTC_TRACE(("init()"));
00085     RTC_DEBUG_STR((prop));
00086 
00087     setPushPolicy(prop);
00088     if (!createTask(prop))
00089       {
00090         return INVALID_ARGS;
00091       }
00092     return PORT_OK;
00093 
00094 
00095 
00096 
00097 
00098     return PORT_OK;
00099   }
00100   
00108   PublisherBase::ReturnCode
00109   PublisherPeriodic::setConsumer(InPortConsumer* consumer)
00110   {
00111     RTC_TRACE(("setConsumer()"));
00112 
00113     if (consumer == 0)
00114       {
00115         RTC_ERROR(("setConsumer(consumer = 0): invalid argument."));
00116         return INVALID_ARGS;
00117       }
00118     m_consumer = consumer;
00119     return PORT_OK;
00120   }
00121 
00129   PublisherBase::ReturnCode PublisherPeriodic::setBuffer(CdrBufferBase* buffer)
00130   {
00131     RTC_TRACE(("setBuffer()"));
00132 
00133     if (buffer == 0)
00134       {
00135         RTC_ERROR(("setBuffer(buffer == 0): invalid argument"));
00136         return INVALID_ARGS;
00137       }
00138     m_buffer = buffer;
00139     return PORT_OK;
00140   }
00141 
00149   PublisherBase::ReturnCode
00150   PublisherPeriodic::setListener(ConnectorInfo& info,
00151                                  ConnectorListeners* listeners)
00152   {
00153     RTC_TRACE(("setListeners()"));
00154 
00155     if (listeners == 0)
00156       {
00157         RTC_ERROR(("setListeners(listeners == 0): invalid argument"));
00158         return INVALID_ARGS;
00159       }
00160     m_profile = info;
00161     m_listeners = listeners;
00162     return PORT_OK;
00163   }
00164 
00172   PublisherBase::ReturnCode
00173   PublisherPeriodic::write(const cdrMemoryStream& data,
00174                            unsigned long sec,
00175                            unsigned long usec)
00176   {
00177     RTC_PARANOID(("write()"));
00178 
00179     if (m_consumer == 0) { return PRECONDITION_NOT_MET; }
00180     if (m_buffer == 0) { return PRECONDITION_NOT_MET; }
00181     if (m_listeners == 0) { return PRECONDITION_NOT_MET; }
00182 
00183     if (m_retcode == CONNECTION_LOST)
00184       {
00185         RTC_DEBUG(("write(): connection lost."));
00186         return m_retcode;
00187       }
00188 
00189     if (m_retcode == SEND_FULL)
00190       {
00191         RTC_DEBUG(("write(): InPort buffer is full."));
00192         m_buffer->write(data, sec, usec);
00193         return BUFFER_FULL;
00194       }
00195 
00196     onBufferWrite(data);
00197     CdrBufferBase::ReturnCode ret(m_buffer->write(data, sec, usec));
00198     RTC_DEBUG(("%s = write()", CdrBufferBase::toString(ret)));
00199     m_task->resume();
00200     return convertReturn(ret, data);
00201   }
00202 
00210   bool PublisherPeriodic::isActive()
00211   {
00212     return m_active;
00213   }
00214 
00222   PublisherBase::ReturnCode PublisherPeriodic::activate()
00223   {
00224     if (m_task == 0) { return PRECONDITION_NOT_MET; }
00225     if (m_buffer == 0) { return PRECONDITION_NOT_MET; }
00226     m_active = true;
00227     m_task->resume();
00228     return PORT_OK;
00229   }
00230 
00238   PublisherBase::ReturnCode PublisherPeriodic::deactivate()
00239   {
00240     if (m_task == 0) { return PRECONDITION_NOT_MET; }
00241     m_active = false;
00242     m_task->suspend();
00243     return PORT_OK;
00244   }
00245 
00253   int PublisherPeriodic::svc(void)
00254   {
00255     Guard guard(m_retmutex);
00256     switch (m_pushPolicy)
00257       {
00258       case ALL:
00259         m_retcode = pushAll();
00260         break;
00261       case FIFO:
00262         m_retcode = pushFifo();
00263         break;
00264       case SKIP:
00265         m_retcode = pushSkip();
00266         break;
00267       case NEW:
00268         m_retcode = pushNew();
00269         break;
00270       default:
00271         m_retcode = pushNew();
00272         break;
00273       }
00274     return 0;
00275   }
00276   
00280   PublisherBase::ReturnCode PublisherPeriodic::pushAll()
00281   {
00282     RTC_TRACE(("pushAll()"));
00283     if (bufferIsEmpty()) { return BUFFER_EMPTY; }
00284 
00285     while (m_buffer->readable() > 0)
00286       {
00287         const cdrMemoryStream& cdr(m_buffer->get());
00288         onBufferRead(cdr);
00289 
00290         onSend(cdr);
00291         ReturnCode ret(m_consumer->put(cdr));
00292         if (ret != PORT_OK)
00293           {
00294             RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00295             return invokeListener(ret, cdr);
00296           }
00297         onReceived(cdr);
00298 
00299         m_buffer->advanceRptr();
00300 
00301       }
00302     return PORT_OK;
00303    }
00304 
00308   PublisherBase::ReturnCode PublisherPeriodic::pushFifo()
00309   {
00310     RTC_TRACE(("pushFifo()"));
00311     if (bufferIsEmpty()) { return BUFFER_EMPTY; }
00312 
00313     const cdrMemoryStream& cdr(m_buffer->get());
00314     onBufferRead(cdr);
00315 
00316     onSend(cdr);
00317     ReturnCode ret(m_consumer->put(cdr));
00318     
00319     if (ret != PORT_OK)
00320       {
00321         RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00322         return invokeListener(ret, cdr);
00323       }
00324     onReceived(cdr);
00325 
00326     m_buffer->advanceRptr();
00327     
00328     return PORT_OK;
00329   }
00330 
00334   PublisherBase::ReturnCode PublisherPeriodic::pushSkip()
00335   {
00336     RTC_TRACE(("pushSkip()"));
00337     if (bufferIsEmpty()) { return BUFFER_EMPTY; }
00338 
00339     ReturnCode ret(PORT_OK);
00340     int readable = m_buffer->readable();
00341     int preskip(readable + m_leftskip);
00342     int loopcnt(preskip/(m_skipn +1));
00343     int postskip(m_skipn - m_leftskip);
00344     for (int i(0); i < loopcnt; ++i)
00345       {
00346         m_buffer->advanceRptr(postskip);
00347         readable -= postskip;
00348 
00349         const cdrMemoryStream& cdr(m_buffer->get());
00350         onBufferRead(cdr);
00351 
00352         onSend(cdr);
00353         ret = m_consumer->put(cdr);
00354         if (ret != PORT_OK)
00355           {
00356             m_buffer->advanceRptr(-postskip);
00357             RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00358             return invokeListener(ret, cdr);
00359           }
00360         onReceived(cdr);
00361         postskip = m_skipn + 1;
00362       }
00363 
00364     m_buffer->advanceRptr(readable);
00365     m_leftskip = preskip % (m_skipn +1);
00366 
00367     return ret;
00368   }
00369 
00373   PublisherBase::ReturnCode PublisherPeriodic::pushNew()
00374   {
00375     RTC_TRACE(("pushNew()"));
00376     if (bufferIsEmpty()) { return BUFFER_EMPTY; }
00377     
00378     // In case of the periodic/push_new policy, the buffer should
00379     // allow readback. But, readback flag should be set as "true"
00380     // after written at least one datum into the buffer.
00381     m_readback = true;
00382     m_buffer->advanceRptr(m_buffer->readable() - 1);
00383     
00384     const cdrMemoryStream& cdr(m_buffer->get());
00385     onBufferRead(cdr);
00386 
00387     onSend(cdr);
00388     ReturnCode ret(m_consumer->put(cdr));
00389     if (ret != PORT_OK)
00390       {
00391         RTC_DEBUG(("%s = consumer.put()", DataPortStatus::toString(ret)));
00392         return invokeListener(ret, cdr);
00393       }
00394     onReceived(cdr);
00395 
00396     m_buffer->advanceRptr();
00397 
00398     return PORT_OK;
00399   }
00400 
00408   void PublisherPeriodic::setPushPolicy(const coil::Properties& prop)
00409   {
00410     // push_policy default: NEW
00411     std::string push_policy = prop.getProperty("publisher.push_policy", "new");
00412     RTC_DEBUG(("push_policy: %s", push_policy.c_str()));
00413 
00414     coil::normalize(push_policy);
00415     if      (push_policy == "all")  { m_pushPolicy = ALL;  }
00416     else if (push_policy == "fifo") { m_pushPolicy = FIFO; }
00417     else if (push_policy == "skip") { m_pushPolicy = SKIP; }
00418     else if (push_policy == "new")  { m_pushPolicy = NEW;  }
00419     else
00420       {
00421         RTC_ERROR(("invalid push_policy value: %s", push_policy.c_str()));
00422         m_pushPolicy = NEW;     // default push policy
00423       }
00424 
00425     // skip_count default: 0
00426     std::string skip_count = prop.getProperty("publisher.skip_count", "0");
00427     RTC_DEBUG(("skip_count: %s", skip_count.c_str()));
00428 
00429     if (!coil::stringTo(m_skipn, skip_count.c_str()))
00430       {
00431         RTC_ERROR(("invalid skip_count value: %s", skip_count.c_str()));
00432         m_skipn = 0;           // desfault skip count
00433       }
00434     if (m_skipn < 0)
00435       {
00436         RTC_ERROR(("invalid skip_count value: %d", m_skipn));
00437         m_skipn = 0;           // default skip count
00438       }
00439   }
00440 
00448   bool PublisherPeriodic::createTask(const coil::Properties& prop)
00449   {
00450     RTC::PeriodicTaskFactory& factory(RTC::PeriodicTaskFactory::instance());
00451 
00452     // Creating and setting task object
00453     coil::vstring th = factory.getIdentifiers();
00454     RTC_DEBUG(("available task types: %s", coil::flatten(th).c_str()));
00455 
00456     m_task = factory.createObject(prop.getProperty("thread_type", "default"));
00457     if (m_task == 0)
00458       {
00459         RTC_ERROR(("Task creation failed: %s",
00460                    prop.getProperty("thread_type", "default").c_str()));
00461         return INVALID_ARGS;
00462       }
00463     m_task->setTask(this, &PublisherPeriodic::svc);
00464     RTC_PARANOID(("Task creation succeeded."));
00465 
00466     // Extracting publisher's period time
00467     double hz;
00468     if (!coil::stringTo(hz, prop["publisher.push_rate"].c_str()) &&
00469         !coil::stringTo(hz, prop["push_rate"].c_str())) // for 0.4 compatibility
00470       {
00471         RTC_ERROR(("publisher.push_rate/push_rate were not found."));
00472         return false;
00473       }
00474 
00475     if (hz <= 0)
00476       {
00477         RTC_ERROR(("invalid period: %f [s]", hz));
00478         return false;
00479       }
00480     m_task->setPeriod(1.0/hz);
00481     
00482     // Setting task measurement function
00483     m_task->executionMeasure(coil::toBool(prop["measurement.exec_time"],
00484                                           "enable", "disable", true));
00485     
00486     int ecount;
00487     if (coil::stringTo(ecount, prop["measurement.exec_count"].c_str()))
00488       {
00489         m_task->executionMeasureCount(ecount);
00490       }
00491 
00492     m_task->periodicMeasure(coil::toBool(prop["measurement.period_time"],
00493                                    "enable", "disable", true));
00494     int pcount;
00495     if (coil::stringTo(pcount, prop["measurement.period_count"].c_str()))
00496       {
00497         m_task->periodicMeasureCount(pcount);
00498       }
00499 
00500     // Start task in suspended mode
00501     m_task->suspend();
00502     m_task->activate();
00503     m_task->suspend();
00504 
00505     return true;
00506   }
00507 
00515   PublisherBase::ReturnCode
00516   PublisherPeriodic::convertReturn(BufferStatus::Enum status,
00517                                    const cdrMemoryStream& data)
00518   {
00519     /*
00520      * BufferStatus -> DataPortStatus
00521      *
00522      * BUFFER_OK     -> PORT_OK
00523      * BUFFER_ERROR  -> BUFFER_ERROR
00524      * BUFFER_FULL   -> BUFFER_FULL
00525      * NOT_SUPPORTED -> PORT_ERROR
00526      * TIMEOUT       -> BUFFER_TIMEOUT
00527      * PRECONDITION_NOT_MET -> PRECONDITION_NOT_MET
00528      */
00529     switch (status)
00530       {
00531       case BufferStatus::BUFFER_OK:
00532         // no callback
00533         return DataPortStatus::PORT_OK;
00534       case BufferStatus::BUFFER_ERROR:
00535         // no callback
00536         return DataPortStatus::BUFFER_ERROR;
00537       case BufferStatus::BUFFER_FULL:
00538         onBufferFull(data);
00539         return DataPortStatus::BUFFER_FULL;
00540       case BufferStatus::NOT_SUPPORTED:
00541         // no callback
00542         return DataPortStatus::PORT_ERROR;
00543       case BufferStatus::TIMEOUT:
00544         onBufferWriteTimeout(data);
00545         return DataPortStatus::BUFFER_TIMEOUT;
00546       case BufferStatus::PRECONDITION_NOT_MET:
00547         // no callback
00548         return DataPortStatus::PRECONDITION_NOT_MET;
00549       default:
00550         // no callback
00551         return DataPortStatus::PORT_ERROR;
00552       }
00553     return DataPortStatus::PORT_ERROR;
00554   }
00555 
00563   PublisherPeriodic::ReturnCode
00564   PublisherPeriodic::invokeListener(DataPortStatus::Enum status,
00565                                     const cdrMemoryStream& data)
00566   {
00567     // ret:
00568     // PORT_OK, PORT_ERROR, SEND_FULL, SEND_TIMEOUT, CONNECTION_LOST,
00569     // UNKNOWN_ERROR
00570     switch (status)
00571       {
00572       case PORT_ERROR:
00573         onReceiverError(data);
00574         return PORT_ERROR;
00575         
00576       case SEND_FULL:
00577         onReceiverFull(data);
00578         return SEND_FULL;
00579         
00580       case SEND_TIMEOUT:
00581         onReceiverTimeout(data);
00582         return SEND_TIMEOUT;
00583         
00584       case CONNECTION_LOST:
00585         onReceiverError(data);
00586         return CONNECTION_LOST;
00587         
00588       case UNKNOWN_ERROR:
00589         onReceiverError(data);
00590         return UNKNOWN_ERROR;
00591         
00592       default:
00593         onReceiverError(data);
00594         return PORT_ERROR;
00595       }
00596   }
00597 
00598 }; // namespace RTC
00599 
00600 extern "C"
00601 {
00602   void PublisherPeriodicInit()
00603   {
00604     ::RTC::PublisherFactory::
00605       instance().addFactory("periodic",
00606                             ::coil::Creator< ::RTC::PublisherBase,
00607                                              ::RTC::PublisherPeriodic>,
00608                             ::coil::Destructor< ::RTC::PublisherBase,
00609                                                 ::RTC::PublisherPeriodic>);
00610   }
00611 };


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Thu Aug 27 2015 14:16:38