ReportingComponent.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002   tag: Peter Soetens  Mon May 10 19:10:38 CEST 2004  ReportingComponent.cxx
00003 
00004                         ReportingComponent.cxx -  description
00005                            -------------------
00006     begin                : Mon May 10 2004
00007     copyright            : (C) 2004 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.ac.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU Lesser General Public            *
00013  *   License as published by the Free Software Foundation; either          *
00014  *   version 2.1 of the License, or (at your option) any later version.    *
00015  *                                                                         *
00016  *   This library is distributed in the hope that it will be useful,       *
00017  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00018  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00019  *   Lesser General Public License for more details.                       *
00020  *                                                                         *
00021  *   You should have received a copy of the GNU Lesser General Public      *
00022  *   License along with this library; if not, write to the Free Software   *
00023  *   Foundation, Inc., 59 Temple Place,                                    *
00024  *   Suite 330, Boston, MA  02111-1307  USA                                *
00025  *                                                                         *
00026  ***************************************************************************/
00027 
00028 #include "ReportingComponent.hpp"
00029 #include <rtt/Logger.hpp>
00030 
00031 // Impl.
00032 #include "EmptyMarshaller.hpp"
00033 #include <rtt/marsh/PropertyDemarshaller.hpp>
00034 #include <rtt/marsh/PropertyMarshaller.hpp>
00035 #include <iostream>
00036 #include <fstream>
00037 #include <exception>
00038 #include <boost/algorithm/string.hpp>
00039 
00040 #include "ocl/Component.hpp"
00041 #include <rtt/types/PropertyDecomposition.hpp>
00042 #include <boost/lexical_cast.hpp>
00043 
00044 ORO_CREATE_COMPONENT_TYPE()
00045 
00046 
00047 namespace OCL
00048 {
00049     using namespace std;
00050     using namespace RTT;
00051     using namespace RTT::detail;
00052 
00055     class CheckSizeDataSource : public ValueDataSource<bool>
00056     {
00057         mutable int msize;
00058         DataSource<int>::shared_ptr mds;
00059         DataSource<bool>::shared_ptr mupstream;
00060     public:
00061         CheckSizeDataSource(int size, DataSource<int>::shared_ptr ds, DataSource<bool>::shared_ptr upstream)
00062             : msize(size), mds(ds), mupstream(upstream)
00063         {}
00067         bool get() const{
00068             // it's very important to first check upstream, because
00069             // if upstream changed size, downstream might already be corrupt !
00070             // (downstream will be corrupt upon capacity changes upstream)
00071             bool result = true;
00072             if (mupstream)
00073                 result = (mupstream->get() && msize == mds->get());
00074             else
00075                 result = (msize == mds->get());
00076             msize = mds->get();
00077             return result;
00078         }
00079     };
00080 
00087     bool memberDecomposition( base::DataSourceBase::shared_ptr dsb, PropertyBag& targetbag, DataSource<bool>::shared_ptr& resized)
00088     {
00089         assert(dsb);
00090 
00091         vector<string> parts = dsb->getMemberNames();
00092         if ( parts.empty() ) {
00093             return false;
00094         }
00095 
00096         targetbag.setType( dsb->getTypeName() );
00097 
00098         // needed for recursion.
00099         auto_ptr< Property<PropertyBag> > recurse_bag( new Property<PropertyBag>("recurse_bag","") );
00100         // First at the explicitly listed parts:
00101         for(vector<string>::iterator it = parts.begin(); it != parts.end(); ++it ) {
00102             // we first force getMember to get to the type, then we do it again but with a reference set.
00103             DataSourceBase::shared_ptr part = dsb->getMember( *it );
00104             if (!part) {
00105                 log(Error) <<"memberDecomposition: Inconsistent type info for "<< dsb->getTypeName() << ": reported to have part '"<<*it<<"' but failed to return it."<<endlog();
00106                 continue;
00107             }
00108             if ( !part->isAssignable() ) {
00109                 // For example: the case for size() and capacity() in SequenceTypeInfo
00110                 log(Debug)<<"memberDecomposition: Part "<< *it << ":"<< part->getTypeName() << " is not changeable."<<endlog();
00111                 continue;
00112             }
00113             // now the reference magic:
00114             DataSourceBase::shared_ptr ref = part->getTypeInfo()->buildReference( 0 );
00115             dsb->getTypeInfo()->getMember( dynamic_cast<Reference*>(ref.get() ), dsb, *it); // fills in ref
00116             // newpb will contain a reference to the port's datasource data !
00117             PropertyBase* newpb = part->getTypeInfo()->buildProperty(*it,"Part", ref);
00118             if ( !newpb ) {
00119                 log(Error)<< "Decomposition failed because Part '"<<*it<<"' is not known to type system."<<endlog();
00120                 continue;
00121             }
00122             // finally recurse or add it to the target bag:
00123             if ( !memberDecomposition( ref, recurse_bag->value(), resized) ) {
00124                 assert( recurse_bag->value().empty() );
00125                 // finally: check for conversions (enums use this):
00126                 base::DataSourceBase::shared_ptr converted = newpb->getTypeInfo()->convertType( dsb );
00127                 if ( converted && converted != dsb ) {
00128                     // converted contains another type.
00129                     targetbag.add( converted->getTypeInfo()->buildProperty(*it, "", converted) );
00130                     delete newpb;
00131                 } else
00132                     targetbag.ownProperty( newpb ); // leaf
00133             } else {
00134                 recurse_bag->setName(*it);
00135                 // setType() is done by recursive of self.
00136                 targetbag.ownProperty( recurse_bag.release() ); //recursed.
00137                 recurse_bag.reset( new Property<PropertyBag>("recurse_bag","") );
00138                 delete newpb; // since we recursed, the recurse_bag now 'embodies' newpb.
00139             }
00140         }
00141 
00142         // Next get the numbered parts. This is much more involved since sequences may be resizable.
00143         // We keep track of the size, and if that changes, we will have to force a re-decomposition
00144         // of the sequence's internals.
00145         DataSource<int>::shared_ptr size = DataSource<int>::narrow( dsb->getMember("size").get() );
00146         if (size) {
00147             int msize = size->get();
00148             for (int i=0; i < msize; ++i) {
00149                 string indx = boost::lexical_cast<string>( i );
00150                 DataSourceBase::shared_ptr item = dsb->getMember(indx);
00151                 resized = new CheckSizeDataSource( msize, size, resized );
00152                 if (item) {
00153                     if ( !item->isAssignable() ) {
00154                         // For example: the case for size() and capacity() in SequenceTypeInfo
00155                         log(Warning)<<"memberDecomposition: Item '"<< indx << "' of type "<< dsb->getTypeName() << " is not changeable."<<endlog();
00156                         continue;
00157                     }
00158                     // finally recurse or add it to the target bag:
00159                     PropertyBase* newpb = item->getTypeInfo()->buildProperty( indx,"",item);
00160                     if ( !memberDecomposition( item, recurse_bag->value(), resized) ) {
00161                         targetbag.ownProperty( newpb ); // leaf
00162                     } else {
00163                         delete newpb;
00164                         recurse_bag->setName( indx );
00165                         // setType() is done by recursive of self.
00166                         targetbag.ownProperty( recurse_bag.release() ); //recursed.
00167                         recurse_bag.reset( new Property<PropertyBag>("recurse_bag","") );
00168                     }
00169                 }
00170             }
00171         }
00172         if (targetbag.empty() )
00173             log(Debug) << "memberDecomposition: "<<  dsb->getTypeName() << " returns an empty property bag." << endlog();
00174         return true;
00175     }
00176 
00177   ReportingComponent::ReportingComponent( std::string name /*= "Reporting" */ )
00178         : TaskContext( name ),
00179           report("Report"), snapshotted(false),
00180           writeHeader("WriteHeader","Set to true to start each report with a header.", true),
00181           decompose("Decompose","Set to false in order to not decompose the port data. The marshaller must be able to handle this itself for this to work.", true),
00182           insnapshot("Snapshot","Set to true to enable snapshot mode. This will cause a non-periodic reporter to only report data upon the snapshot() operation.",false),
00183           synchronize_with_logging("Synchronize","Set to true if the timestamp should be synchronized with the logging",false),
00184           report_data("ReportData","A PropertyBag which defines which ports or components to report."),
00185           report_policy( ConnPolicy::data(ConnPolicy::LOCK_FREE,true,false) ),
00186           onlyNewData(false),
00187           starttime(0),
00188           timestamp("TimeStamp","The time at which the data was read.",0.0)
00189     {
00190         this->provides()->doc("Captures data on data ports. A periodic reporter will sample each added port according to its period, a non-periodic reporter will write out data as it comes in, or only during a snapshot() if the Snapshot property is true.");
00191 
00192         this->properties()->addProperty( writeHeader );
00193         this->properties()->addProperty( decompose );
00194         this->properties()->addProperty( insnapshot );
00195         this->properties()->addProperty( synchronize_with_logging);
00196         this->properties()->addProperty( report_data);
00197         this->properties()->addProperty( "ReportPolicy", report_policy).doc("The ConnPolicy for the reporter's port connections.");
00198         this->properties()->addProperty( "ReportOnlyNewData", onlyNewData).doc("Turn on in order to only write out NewData on ports and omit unchanged ports. Turn off in order to sample and write out all ports (even old data).");
00199         // Add the methods, methods make sure that they are
00200         // executed in the context of the (non realtime) caller.
00201 
00202         this->addOperation("snapshot", &ReportingComponent::snapshot , this, RTT::OwnThread).doc("Take a new shapshot of all data and cause them to be written out.");
00203         this->addOperation("screenComponent", &ReportingComponent::screenComponent , this, RTT::ClientThread).doc("Display the variables and ports of a Component.").arg("Component", "Name of the Component");
00204         this->addOperation("reportComponent", &ReportingComponent::reportComponent , this, RTT::ClientThread).doc("Add a peer Component and report all its data ports").arg("Component", "Name of the Component");
00205         this->addOperation("unreportComponent", &ReportingComponent::unreportComponent , this, RTT::ClientThread).doc("Remove all Component's data ports from reporting.").arg("Component", "Name of the Component");
00206         this->addOperation("reportData", &ReportingComponent::reportData , this, RTT::ClientThread).doc("Add a Component's Property or attribute for reporting.").arg("Component", "Name of the Component").arg("Data", "Name of the Data to report. A property's or attribute's name.");
00207         this->addOperation("unreportData", &ReportingComponent::unreportData , this, RTT::ClientThread).doc("Remove a Data object from reporting.").arg("Component", "Name of the Component").arg("Data", "Name of the property or attribute.");
00208         this->addOperation("reportPort", &ReportingComponent::reportPort , this, RTT::ClientThread).doc("Add a Component's OutputPort for reporting.").arg("Component", "Name of the Component").arg("Port", "Name of the Port.");
00209         this->addOperation("unreportPort", &ReportingComponent::unreportPort , this, RTT::ClientThread).doc("Remove a Port from reporting.").arg("Component", "Name of the Component").arg("Port", "Name of the Port.");
00210 
00211     }
00212 
00213     ReportingComponent::~ReportingComponent() {}
00214 
00215 
00216     bool ReportingComponent::addMarshaller( marsh::MarshallInterface* headerM, marsh::MarshallInterface* bodyM)
00217     {
00218         boost::shared_ptr<marsh::MarshallInterface> header(headerM);
00219         boost::shared_ptr<marsh::MarshallInterface> body(bodyM);
00220         if ( !header && !body)
00221             return false;
00222         if ( !header )
00223             header.reset( new EmptyMarshaller() );
00224         if ( !body)
00225             body.reset( new EmptyMarshaller());
00226 
00227         marshallers.push_back( std::make_pair( header, body ) );
00228         return true;
00229     }
00230 
00231     bool ReportingComponent::removeMarshallers()
00232     {
00233         marshallers.clear();
00234         return true;
00235     }
00236 
00237     void ReportingComponent::cleanupHook()
00238     {
00239         root.clear(); // uses shared_ptr.
00240         deletePropertyBag( report );
00241     }
00242 
00243     bool ReportingComponent::configureHook()
00244     {
00245         Logger::In in("ReportingComponent");
00246 
00247         // we make a copy to be allowed to iterate over and exted report_data:
00248         PropertyBag bag = report_data.value();
00249 
00250         if ( bag.empty() ) {
00251             log(Error) <<"No port or component configuration loaded."<<endlog();
00252             log(Error) <<"Please use marshalling.loadProperties(), reportComponent() (scripting) or LoadProperties (XML) in order to fill in ReportData." <<endlog();
00253             return false;
00254         }
00255 
00256         bool ok = true;
00257         PropertyBag::const_iterator it = bag.getProperties().begin();
00258         while ( it != bag.getProperties().end() )
00259             {
00260                 Property<std::string>* compName = dynamic_cast<Property<std::string>* >( *it );
00261                 if ( !compName )
00262                     log(Error) << "Expected Property \""
00263                                   << (*it)->getName() <<"\" to be of type string."<< endlog();
00264                 else if ( compName->getName() == "Component" ) {
00265                     std::string name = compName->value(); // we will delete this property !
00266                     this->unreportComponent( name );
00267                     ok &= this->reportComponent( name );
00268                 }
00269                 else if ( compName->getName() == "Port" ) {
00270                     string cname = compName->value().substr(0, compName->value().find("."));
00271                     string pname = compName->value().substr( compName->value().find(".")+1, string::npos);
00272                     if (cname.empty() || pname.empty() ) {
00273                         log(Error) << "The Port value '"<<compName->getName()<< "' must at least consist of a component name followed by a dot and the port name." <<endlog();
00274                         ok = false;
00275                         continue;
00276                     }
00277                     this->unreportPort(cname,pname);
00278                     ok &= this->reportPort(cname, pname);
00279                 }
00280                 else if ( compName->getName() == "Data" ) {
00281                     string cname = compName->value().substr(0, compName->value().find("."));
00282                     string pname = compName->value().substr( compName->value().find(".")+1, string::npos);
00283                     if (cname.empty() || pname.empty() ) {
00284                         log(Error) << "The Data value '"<<compName->getName()<< "' must at least consist of a component name followed by a dot and the property/attribute name." <<endlog();
00285                         ok = false;
00286                         continue;
00287                     }
00288                     this->unreportData(cname,pname);
00289                     ok &= this->reportData(cname, pname);
00290                 }
00291                 else {
00292                     log(Error) << "Expected \"Component\", \"Port\" or \"Data\", got "
00293                                   << compName->getName() << endlog();
00294                     ok = false;
00295                 }
00296                 ++it;
00297             }
00298         return ok;
00299     }
00300 
00301     bool ReportingComponent::screenComponent( const std::string& comp )
00302     {
00303         Logger::In in("ReportingComponent::screenComponent");
00304         log(Error) << "not implemented." <<comp<<endlog();
00305         return false;
00306     }
00307 
00308     bool ReportingComponent::screenImpl( const std::string& comp, std::ostream& output)
00309     {
00310         Logger::In in("ReportingComponent");
00311         TaskContext* c = this->getPeer(comp);
00312         if ( c == 0) {
00313             log(Error) << "Unknown Component: " <<comp<<endlog();
00314             return false;
00315         }
00316         output << "Screening Component '"<< comp << "' : "<< endl << endl;
00317         PropertyBag* bag = c->properties();
00318         if (bag) {
00319             output << "Properties :" << endl;
00320             for (PropertyBag::iterator it= bag->begin(); it != bag->end(); ++it)
00321                 output << "  " << (*it)->getName() << " : " << (*it)->getDataSource() << endl;
00322         }
00323         ConfigurationInterface::AttributeNames atts = c->provides()->getAttributeNames();
00324         if ( !atts.empty() ) {
00325             output << "Attributes :" << endl;
00326             for (ConfigurationInterface::AttributeNames::iterator it= atts.begin(); it != atts.end(); ++it)
00327                 output << "  " << *it << " : " << c->provides()->getValue(*it)->getDataSource() << endl;
00328         }
00329 
00330         vector<string> ports = c->ports()->getPortNames();
00331         if ( !ports.empty() ) {
00332             output << "Ports :" << endl;
00333             for (vector<string>::iterator it= ports.begin(); it != ports.end(); ++it) {
00334                 output << "  " << *it << " : ";
00335                 if (c->ports()->getPort(*it)->connected() )
00336                     output << "(connected)" << endl;
00337                 else
00338                     output << "(not connected)" << endl;
00339             }
00340         }
00341         return true;
00342     }
00343 
00344     bool ReportingComponent::reportComponent( const std::string& component ) {
00345         Logger::In in("ReportingComponent");
00346         // Users may add own data sources, so avoid duplicates
00347         //std::vector<std::string> sources                = comp->data()->getNames();
00348         TaskContext* comp = this->getPeer(component);
00349         if ( !comp ) {
00350             log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog();
00351             return false;
00352         }
00353         if ( !report_data.value().findValue<string>(component) )
00354             report_data.value().ownProperty( new Property<string>("Component","",component) );
00355         Ports ports   = comp->ports()->getPorts();
00356         for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) {
00357             log(Debug) << "Checking port " << (*it)->getName()<<"."<<endlog();
00358             this->reportPort( component, (*it)->getName() );
00359         }
00360         return true;
00361     }
00362 
00363 
00364     bool ReportingComponent::unreportComponent( const std::string& component ) {
00365         TaskContext* comp = this->getPeer(component);
00366         if ( !comp ) {
00367             log(Error) << "Could not unreport Component " << component <<" : no such peer."<<endlog();
00368             return false;
00369         }
00370         Ports ports   = comp->ports()->getPorts();
00371         for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) {
00372             this->unreportDataSource( component + "." + (*it)->getName() );
00373             unreportPort(component, (*it)->getName() );
00374         }
00375         base::PropertyBase* pb = report_data.value().findValue<string>(component);
00376         if (pb)
00377             report_data.value().removeProperty( pb );// pb is deleted by bag
00378         return true;
00379     }
00380 
00381     // report a specific connection.
00382     bool ReportingComponent::reportPort(const std::string& component, const std::string& port ) {
00383         Logger::In in("ReportingComponent");
00384         TaskContext* comp = this->getPeer(component);
00385         if ( this->ports()->getPort(component +"_"+port) ) {
00386             log(Warning) <<"Already reporting "<<component<<"."<<port<<": removing old port first."<<endlog();
00387             this->unreportPort(component,port);
00388         }
00389         if ( !comp ) {
00390             log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog();
00391             return false;
00392         }
00393         std::vector<std::string> strs;
00394         boost::split(strs, port, boost::is_any_of("."));
00395 
00396         // strs could be empty because of a bug in Boost 1.44 (see https://svn.boost.org/trac/boost/ticket/4751)
00397         if (strs.empty()) return false;
00398 
00399         Service::shared_ptr service=comp->provides();
00400         while ( strs.size() != 1 && service) {
00401             service = service->getService( strs.front() );
00402             if (service)
00403                 strs.erase( strs.begin() );
00404         }
00405         if (!service) {
00406             log(Error) <<"No such service: '"<< strs.front() <<"' while looking for port '"<< port<<"'"<<endlog();
00407             return 0;
00408         }
00409         base::PortInterface* porti = 0;
00410         porti = service->getPort(strs.front());
00411         if ( !porti ) {
00412             log(Error) << "Could not report Port " << port
00413                        <<" : no such port on Component "<<component<<"."<<endlog();
00414             return false;
00415         }
00416 
00417         base::InputPortInterface* ipi =  dynamic_cast<base::InputPortInterface*>(porti);
00418         if (ipi) {
00419             log(Error) << "Can not report InputPort "<< porti->getName() <<" of Component " << component <<endlog();
00420             return false;
00421         }
00422             // create new port temporarily
00423         // this port is only created with the purpose of
00424         // creating a connection object.
00425         base::PortInterface* ourport = porti->antiClone();
00426         assert(ourport);
00427         ourport->setName(component + "_" + port);
00428         ipi = dynamic_cast<base::InputPortInterface*> (ourport);
00429         assert(ipi);
00430 
00431         if (report_policy.type == ConnPolicy::DATA ) {
00432             log(Info) << "Not buffering of data flow connections. You may miss samples." <<endlog();
00433         } else {
00434             log(Info) << "Buffering ports with size "<< report_policy.size << ", as set in ReportPolicy property." <<endlog();
00435         }
00436 
00437         this->ports()->addEventPort( *ipi );
00438         if (porti->connectTo(ourport, report_policy ) == false)
00439         {
00440             log(Error) << "Could not connect to OutputPort " << porti->getName() << endlog();
00441             this->ports()->removePort(ourport->getName());
00442             delete ourport; // XXX/TODO We're leaking ourport !
00443             return false;
00444         }
00445 
00446         if (this->reportDataSource(component + "." + port, "Port",
00447                                    ipi->getDataSource(),ipi, true) == false)
00448         {
00449             log(Error) << "Failed reporting port " << port << endlog();
00450             this->ports()->removePort(ourport->getName());
00451             delete ourport;
00452             return false;
00453         }
00454 
00455         log(Info) << "Monitoring OutputPort " << port << " : ok." << endlog();
00456         // Add port to ReportData properties if component nor port are listed yet.
00457         if ( !report_data.value().findValue<string>(component) && !report_data.value().findValue<string>( component+"."+port) )
00458             report_data.value().ownProperty(new Property<string>("Port","",component+"."+port));
00459         return true;
00460     }
00461 
00462     bool ReportingComponent::unreportPort(const std::string& component, const std::string& port ) {
00463         base::PortInterface* ourport = this->ports()->getPort(component + "_" + port);
00464         if ( this->unreportDataSource( component + "." + port ) && report_data.value().removeProperty( report_data.value().findValue<string>(component+"."+port))) {
00465             this->ports()->removePort(ourport->getName());
00466             delete ourport; // also deletes datasource.
00467             return true;
00468         }
00469         return false;
00470     }
00471 
00472     // report a specific datasource, property,...
00473     bool ReportingComponent::reportData(const std::string& component,const std::string& dataname)
00474     {
00475         Logger::In in("ReportingComponent");
00476         TaskContext* comp = this->getPeer(component);
00477         if ( !comp ) {
00478             log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog();
00479             return false;
00480         }
00481         // Is it an attribute ?
00482         if ( comp->provides()->getValue( dataname ) ) {
00483             if (this->reportDataSource( component + "." + dataname, "Data",
00484                                         comp->provides()->getValue( dataname )->getDataSource(), 0,  false ) == false) {
00485                 log(Error) << "Failed reporting data " << dataname <<endlog();
00486                 return false;
00487             }
00488         }
00489 
00490         // Is it a property ?
00491         if ( comp->properties() && comp->properties()->find( dataname ) ) {
00492             if (this->reportDataSource( component + "." + dataname, "Data",
00493                                         comp->properties()->find( dataname )->getDataSource(), 0, false ) == false) {
00494                 log(Error) << "Failed reporting data " << dataname <<endlog();
00495                 return false;
00496             }
00497         }
00498         // Ok. we passed.
00499         // Add port to ReportData properties if data not listed yet.
00500         if ( !report_data.value().findValue<string>( component+"."+dataname) )
00501             report_data.value().ownProperty(new Property<string>("Data","",component+"."+dataname));
00502         return true;
00503     }
00504 
00505     bool ReportingComponent::unreportData(const std::string& component,const std::string& datasource) {
00506         return this->unreportDataSource( component +"." + datasource) && report_data.value().removeProperty( report_data.value().findValue<string>(component+"."+datasource));
00507     }
00508 
00509     bool ReportingComponent::reportDataSource(std::string tag, std::string type, base::DataSourceBase::shared_ptr orig, base::InputPortInterface* ipi, bool track)
00510     {
00511         // check for duplicates:
00512         for (Reports::iterator it = root.begin();
00513              it != root.end(); ++it)
00514             if ( it->get<T_QualName>() == tag ) {
00515                 return true;
00516             }
00517 
00518         // creates a copy of the data and an update command to
00519         // update the copy from the original.
00520         base::DataSourceBase::shared_ptr clone = orig->getTypeInfo()->buildValue();
00521         if ( !clone ) {
00522             log(Error) << "Could not report '"<< tag <<"' : unknown type." << endlog();
00523             return false;
00524         }
00525         PropertyBase* prop = 0;
00526         root.push_back( boost::make_tuple( tag, orig, type, prop, ipi, false, track ) );
00527         return true;
00528     }
00529 
00530     bool ReportingComponent::unreportDataSource(std::string tag)
00531     {
00532         for (Reports::iterator it = root.begin();
00533              it != root.end(); ++it)
00534             if ( it->get<T_QualName>() == tag ) {
00535                 root.erase(it);
00536                 return true;
00537             }
00538         return false;
00539     }
00540 
00541     bool ReportingComponent::startHook() {
00542         Logger::In in("ReportingComponent");
00543         if (marshallers.begin() == marshallers.end()) {
00544             log(Error) << "Need at least one marshaller to write reports." <<endlog();
00545             return false;
00546         }
00547 
00548         if(synchronize_with_logging.get())
00549             starttime = Logger::Instance()->getReferenceTime();
00550         else
00551             starttime = os::TimeService::Instance()->getTicks();
00552 
00553         // Get initial data samples
00554         this->copydata();
00555         this->makeReport2();
00556 
00557         // write headers
00558         if (writeHeader.get()) {
00559             // call all header marshallers.
00560             for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
00561                 it->first->serialize( report );
00562                 it->first->flush();
00563             }
00564         }
00565 
00566         // write initial values with all value marshallers (uses the forcing above)
00567         if ( getActivity()->isPeriodic() ) {
00568             for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
00569                 it->second->serialize( report );
00570                 it->second->flush();
00571             }
00572         }
00573 
00574         // Turn off port triggering in snapshot mode, and vice versa.
00575         // Also clears any old data in the buffers
00576         for(Reports::iterator it = root.begin(); it != root.end(); ++it )
00577             if ( it->get<T_Port>() ) {
00578 #ifndef ORO_SIGNALLING_PORTS
00579                 it->get<T_Port>()->signalInterface( !insnapshot.get() );
00580 #endif
00581                 it->get<T_Port>()->clear();
00582             }
00583 
00584 
00585         snapshotted = false;
00586         return true;
00587     }
00588 
00589     void ReportingComponent::snapshot() {
00590         // this function always copies and reports all data It's run in ownthread, so updateHook will be run later.
00591         if ( getActivity()->isPeriodic() )
00592             return;
00593         snapshotted = true;
00594         updateHook();
00595     }
00596 
00597     bool ReportingComponent::copydata() {
00598         timestamp = os::TimeService::Instance()->secondsSince( starttime );
00599 
00600         // result will become true if more data is to be read.
00601         bool result = false;
00602         // This evaluates the InputPortDataSource evaluate() returns true upon new data.
00603         for(Reports::iterator it = root.begin(); it != root.end(); ++it ) {
00604             it->get<T_NewData>() = (it->get<T_PortDS>())->evaluate(); // stores 'NewData' flag.
00605             // if its a property/attr, get<T_NewData> will always be true, so we override (clear) with get<T_Tracked>.
00606             result = result || ( it->get<T_NewData>() && it->get<T_Tracked>() );
00607         }
00608         return result;
00609     }
00610 
00611     void ReportingComponent::makeReport2()
00612     {
00613         // Uses the port DS itself to make the report.
00614         assert( report.empty() );
00615         // For the timestamp, we need to add a new property object:
00616         report.add( timestamp.getTypeInfo()->buildProperty( timestamp.getName(), "", timestamp.getDataSource() ) );
00617         DataSource<bool>::shared_ptr checker;
00618         for(Reports::iterator it = root.begin(); it != root.end(); ++it ) {
00619             Property<PropertyBag>* subbag = new Property<PropertyBag>( it->get<T_QualName>(), "");
00620             if ( decompose.get() && memberDecomposition( it->get<T_PortDS>(), subbag->value(), checker ) ) {
00621                 report.add( subbag );
00622                 it->get<T_Property>() = subbag;
00623             } else {
00624                 // property or simple value port...
00625                 base::DataSourceBase::shared_ptr converted = it->get<T_PortDS>()->getTypeInfo()->convertType( it->get<T_PortDS>() );
00626                 if ( converted && converted != it->get<T_PortDS>() ) {
00627                     // converted contains another type.
00628                     PropertyBase* convProp = converted->getTypeInfo()->buildProperty(it->get<T_QualName>(), "", converted);
00629                     it->get<T_Property>() = convProp;
00630                     report.add(convProp);
00631                 } else {
00632                     PropertyBase* origProp = it->get<T_PortDS>()->getTypeInfo()->buildProperty(it->get<T_QualName>(), "", it->get<T_PortDS>());
00633                     it->get<T_Property>() = origProp;
00634                     report.add(origProp);
00635                 }
00636                 delete subbag;
00637             }
00638 
00639         }
00640         mchecker = checker;
00641     }
00642         
00643     void ReportingComponent::cleanReport()
00644     {
00645         // Only clones were added to result, so delete them.
00646         deletePropertyBag( report );
00647     }
00648 
00649     void ReportingComponent::updateHook() {
00650         //If not periodic and insnapshot is true, only continue if snapshot is called.
00651         if( !getActivity()->isPeriodic() && insnapshot.get() && !snapshotted)
00652             return;
00653         else
00654             snapshotted = false;
00655 
00656         // if any data sequence got resized, we rebuild the whole bunch.
00657         // otherwise, we need to track every individual array (not impossible though, but still needs an upstream concept).
00658         if ( mchecker && mchecker->get() == false ) {
00659             cleanReport();
00660             makeReport2();
00661         } else
00662             copydata();
00663 
00664         do {
00665             // Step 3: print out the result
00666             // write out to all marshallers
00667             for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
00668                 if ( onlyNewData ) {
00669                     // Serialize only changed ports:
00670                     it->second->serialize( *report.begin() ); // TimeStamp.
00671                     for (Reports::const_iterator i = root.begin();
00672                          i != root.end();
00673                          i++ )
00674                         {
00675                             if ( i->get<T_NewData>() )
00676                                 it->second->serialize( i->get<T_Property>() );
00677                         }
00678                 } else {
00679                     // pass on all ports to the marshaller
00680                     it->second->serialize( report );
00681                 }
00682                 it->second->flush();
00683             }
00684         } while( !getActivity()->isPeriodic() && !insnapshot.get() && copydata() ); // repeat if necessary. In periodic mode we always only sample once.
00685     }
00686 
00687     void ReportingComponent::stopHook() {
00688         // tell body marshallers that serialization is done.
00689         for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
00690             it->second->flush();
00691         }
00692         cleanReport();
00693     }
00694 
00695 }


ocl
Author(s): OCL Development Team
autogenerated on Mon Sep 14 2015 14:21:46