$search
00001 /*************************************************************************** 00002 tag: Peter Soetens Mon May 10 19:10:38 CEST 2004 ReportingComponent.cxx 00003 00004 ReportingComponent.cxx - description 00005 ------------------- 00006 begin : Mon May 10 2004 00007 copyright : (C) 2004 Peter Soetens 00008 email : peter.soetens@mech.kuleuven.ac.be 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU Lesser General Public * 00013 * License as published by the Free Software Foundation; either * 00014 * version 2.1 of the License, or (at your option) any later version. * 00015 * * 00016 * This library is distributed in the hope that it will be useful, * 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00019 * Lesser General Public License for more details. * 00020 * * 00021 * You should have received a copy of the GNU Lesser General Public * 00022 * License along with this library; if not, write to the Free Software * 00023 * Foundation, Inc., 59 Temple Place, * 00024 * Suite 330, Boston, MA 02111-1307 USA * 00025 * * 00026 ***************************************************************************/ 00027 00028 #include "ReportingComponent.hpp" 00029 #include <rtt/Logger.hpp> 00030 00031 // Impl. 00032 #include "EmptyMarshaller.hpp" 00033 #include <rtt/marsh/PropertyDemarshaller.hpp> 00034 #include <rtt/marsh/PropertyMarshaller.hpp> 00035 #include <iostream> 00036 #include <fstream> 00037 #include <exception> 00038 #include <boost/algorithm/string.hpp> 00039 00040 #include "ocl/Component.hpp" 00041 #include <rtt/types/PropertyDecomposition.hpp> 00042 #include <boost/lexical_cast.hpp> 00043 00044 ORO_CREATE_COMPONENT_TYPE() 00045 00046 00047 namespace OCL 00048 { 00049 using namespace std; 00050 using namespace RTT; 00051 using namespace RTT::detail; 00052 00055 class CheckSizeDataSource : public ValueDataSource<bool> 00056 { 00057 mutable int msize; 00058 DataSource<int>::shared_ptr mds; 00059 DataSource<bool>::shared_ptr mupstream; 00060 public: 00061 CheckSizeDataSource(int size, DataSource<int>::shared_ptr ds, DataSource<bool>::shared_ptr upstream) 00062 : msize(size), mds(ds), mupstream(upstream) 00063 {} 00067 bool get() const{ 00068 // it's very important to first check upstream, because 00069 // if upstream changed size, downstream might already be corrupt ! 00070 // (downstream will be corrupt upon capacity changes upstream) 00071 bool result = true; 00072 if (mupstream) 00073 result = (mupstream->get() && msize == mds->get()); 00074 else 00075 result = (msize == mds->get()); 00076 msize = mds->get(); 00077 return result; 00078 } 00079 }; 00080 00087 bool memberDecomposition( base::DataSourceBase::shared_ptr dsb, PropertyBag& targetbag, DataSource<bool>::shared_ptr& resized) 00088 { 00089 assert(dsb); 00090 00091 vector<string> parts = dsb->getMemberNames(); 00092 if ( parts.empty() ) { 00093 return false; 00094 } 00095 00096 targetbag.setType( dsb->getTypeName() ); 00097 00098 // needed for recursion. 00099 auto_ptr< Property<PropertyBag> > recurse_bag( new Property<PropertyBag>("recurse_bag","") ); 00100 // First at the explicitly listed parts: 00101 for(vector<string>::iterator it = parts.begin(); it != parts.end(); ++it ) { 00102 // we first force getMember to get to the type, then we do it again but with a reference set. 00103 DataSourceBase::shared_ptr part = dsb->getMember( *it ); 00104 if (!part) { 00105 log(Error) <<"memberDecomposition: Inconsistent type info for "<< dsb->getTypeName() << ": reported to have part '"<<*it<<"' but failed to return it."<<endlog(); 00106 continue; 00107 } 00108 if ( !part->isAssignable() ) { 00109 // For example: the case for size() and capacity() in SequenceTypeInfo 00110 log(Debug)<<"memberDecomposition: Part "<< *it << ":"<< part->getTypeName() << " is not changeable."<<endlog(); 00111 continue; 00112 } 00113 // now the reference magic: 00114 DataSourceBase::shared_ptr ref = part->getTypeInfo()->buildReference( 0 ); 00115 dsb->getTypeInfo()->getMember( dynamic_cast<Reference*>(ref.get() ), dsb, *it); // fills in ref 00116 // newpb will contain a reference to the port's datasource data ! 00117 PropertyBase* newpb = part->getTypeInfo()->buildProperty(*it,"Part", ref); 00118 if ( !newpb ) { 00119 log(Error)<< "Decomposition failed because Part '"<<*it<<"' is not known to type system."<<endlog(); 00120 continue; 00121 } 00122 // finally recurse or add it to the target bag: 00123 if ( !memberDecomposition( ref, recurse_bag->value(), resized) ) { 00124 assert( recurse_bag->value().empty() ); 00125 // finally: check for conversions (enums use this): 00126 base::DataSourceBase::shared_ptr converted = newpb->getTypeInfo()->convertType( dsb ); 00127 if ( converted && converted != dsb ) { 00128 // converted contains another type. 00129 targetbag.add( converted->getTypeInfo()->buildProperty(*it, "", converted) ); 00130 delete newpb; 00131 } else 00132 targetbag.ownProperty( newpb ); // leaf 00133 } else { 00134 recurse_bag->setName(*it); 00135 // setType() is done by recursive of self. 00136 targetbag.ownProperty( recurse_bag.release() ); //recursed. 00137 recurse_bag.reset( new Property<PropertyBag>("recurse_bag","") ); 00138 delete newpb; // since we recursed, the recurse_bag now 'embodies' newpb. 00139 } 00140 } 00141 00142 // Next get the numbered parts. This is much more involved since sequences may be resizable. 00143 // We keep track of the size, and if that changes, we will have to force a re-decomposition 00144 // of the sequence's internals. 00145 DataSource<int>::shared_ptr size = DataSource<int>::narrow( dsb->getMember("size").get() ); 00146 if (size) { 00147 int msize = size->get(); 00148 for (int i=0; i < msize; ++i) { 00149 string indx = boost::lexical_cast<string>( i ); 00150 DataSourceBase::shared_ptr item = dsb->getMember(indx); 00151 resized = new CheckSizeDataSource( msize, size, resized ); 00152 if (item) { 00153 if ( !item->isAssignable() ) { 00154 // For example: the case for size() and capacity() in SequenceTypeInfo 00155 log(Warning)<<"memberDecomposition: Item '"<< indx << "' of type "<< dsb->getTypeName() << " is not changeable."<<endlog(); 00156 continue; 00157 } 00158 // finally recurse or add it to the target bag: 00159 PropertyBase* newpb = item->getTypeInfo()->buildProperty( indx,"",item); 00160 if ( !memberDecomposition( item, recurse_bag->value(), resized) ) { 00161 targetbag.ownProperty( newpb ); // leaf 00162 } else { 00163 delete newpb; 00164 recurse_bag->setName( indx ); 00165 // setType() is done by recursive of self. 00166 targetbag.ownProperty( recurse_bag.release() ); //recursed. 00167 recurse_bag.reset( new Property<PropertyBag>("recurse_bag","") ); 00168 } 00169 } 00170 } 00171 } 00172 if (targetbag.empty() ) 00173 log(Debug) << "memberDecomposition: "<< dsb->getTypeName() << " returns an empty property bag." << endlog(); 00174 return true; 00175 } 00176 00177 ReportingComponent::ReportingComponent( std::string name /*= "Reporting" */ ) 00178 : TaskContext( name ), 00179 report("Report"), snapshotted(false), 00180 writeHeader("WriteHeader","Set to true to start each report with a header.", true), 00181 decompose("Decompose","Set to false in order to create multidimensional array in netcdf", true), 00182 insnapshot("Snapshot","Set to true to enable snapshot mode. This will cause a non-periodic reporter to only report data upon the snapshot() operation.",false), 00183 synchronize_with_logging("Synchronize","Set to true if the timestamp should be synchronized with the logging",false), 00184 report_data("ReportData","A PropertyBag which defines which ports or components to report."), 00185 starttime(0), 00186 timestamp("TimeStamp","The time at which the data was read.",0.0) 00187 { 00188 this->provides()->doc("Captures data on data ports. A periodic reporter will sample each added port according to its period, a non-periodic reporter will write out data as it comes in, or only during a snapshot() if the Snapshot property is true."); 00189 00190 this->properties()->addProperty( writeHeader ); 00191 this->properties()->addProperty( decompose ); 00192 this->properties()->addProperty( insnapshot ); 00193 this->properties()->addProperty( synchronize_with_logging); 00194 this->properties()->addProperty( report_data); 00195 00196 // Add the methods, methods make sure that they are 00197 // executed in the context of the (non realtime) caller. 00198 00199 this->addOperation("snapshot", &ReportingComponent::snapshot , this, RTT::OwnThread).doc("Take a new shapshot of all data and cause them to be written out."); 00200 this->addOperation("screenComponent", &ReportingComponent::screenComponent , this, RTT::ClientThread).doc("Display the variables and ports of a Component.").arg("Component", "Name of the Component"); 00201 this->addOperation("reportComponent", &ReportingComponent::reportComponent , this, RTT::ClientThread).doc("Add a peer Component and report all its data ports").arg("Component", "Name of the Component"); 00202 this->addOperation("unreportComponent", &ReportingComponent::unreportComponent , this, RTT::ClientThread).doc("Remove all Component's data ports from reporting.").arg("Component", "Name of the Component"); 00203 this->addOperation("reportData", &ReportingComponent::reportData , this, RTT::ClientThread).doc("Add a Component's Property or attribute for reporting.").arg("Component", "Name of the Component").arg("Data", "Name of the Data to report. A property's or attribute's name."); 00204 this->addOperation("unreportData", &ReportingComponent::unreportData , this, RTT::ClientThread).doc("Remove a Data object from reporting.").arg("Component", "Name of the Component").arg("Data", "Name of the property or attribute."); 00205 this->addOperation("reportPort", &ReportingComponent::reportPort , this, RTT::ClientThread).doc("Add a Component's OutputPort for reporting.").arg("Component", "Name of the Component").arg("Port", "Name of the Port."); 00206 this->addOperation("unreportPort", &ReportingComponent::unreportPort , this, RTT::ClientThread).doc("Remove a Port from reporting.").arg("Component", "Name of the Component").arg("Port", "Name of the Port."); 00207 00208 } 00209 00210 ReportingComponent::~ReportingComponent() {} 00211 00212 00213 bool ReportingComponent::addMarshaller( marsh::MarshallInterface* headerM, marsh::MarshallInterface* bodyM) 00214 { 00215 boost::shared_ptr<marsh::MarshallInterface> header(headerM); 00216 boost::shared_ptr<marsh::MarshallInterface> body(bodyM); 00217 if ( !header && !body) 00218 return false; 00219 if ( !header ) 00220 header.reset( new EmptyMarshaller() ); 00221 if ( !body) 00222 body.reset( new EmptyMarshaller()); 00223 00224 marshallers.push_back( std::make_pair( header, body ) ); 00225 return true; 00226 } 00227 00228 bool ReportingComponent::removeMarshallers() 00229 { 00230 marshallers.clear(); 00231 return true; 00232 } 00233 00234 void ReportingComponent::cleanupHook() 00235 { 00236 root.clear(); // uses shared_ptr. 00237 deletePropertyBag( report ); 00238 } 00239 00240 bool ReportingComponent::configureHook() 00241 { 00242 Logger::In in("ReportingComponent"); 00243 00244 // we make a copy to be allowed to iterate over and exted report_data: 00245 PropertyBag bag = report_data.value(); 00246 00247 if ( bag.empty() ) { 00248 log(Error) <<"No port or component configuration loaded."<<endlog(); 00249 log(Error) <<"Please use marshalling.loadProperties(), reportComponent() (scripting) or LoadProperties (XML) in order to fill in ReportData." <<endlog(); 00250 return false; 00251 } 00252 00253 bool ok = true; 00254 PropertyBag::const_iterator it = bag.getProperties().begin(); 00255 while ( it != bag.getProperties().end() ) 00256 { 00257 Property<std::string>* compName = dynamic_cast<Property<std::string>* >( *it ); 00258 if ( !compName ) 00259 log(Error) << "Expected Property \"" 00260 << (*it)->getName() <<"\" to be of type string."<< endlog(); 00261 else if ( compName->getName() == "Component" ) { 00262 std::string name = compName->value(); // we will delete this property ! 00263 this->unreportComponent( name ); 00264 ok &= this->reportComponent( name ); 00265 } 00266 else if ( compName->getName() == "Port" ) { 00267 string cname = compName->value().substr(0, compName->value().find(".")); 00268 string pname = compName->value().substr( compName->value().find(".")+1, string::npos); 00269 if (cname.empty() || pname.empty() ) { 00270 log(Error) << "The Port value '"<<compName->getName()<< "' must at least consist of a component name followed by a dot and the port name." <<endlog(); 00271 ok = false; 00272 continue; 00273 } 00274 this->unreportPort(cname,pname); 00275 ok &= this->reportPort(cname, pname); 00276 } 00277 else if ( compName->getName() == "Data" ) { 00278 string cname = compName->value().substr(0, compName->value().find(".")); 00279 string pname = compName->value().substr( compName->value().find(".")+1, string::npos); 00280 if (cname.empty() || pname.empty() ) { 00281 log(Error) << "The Data value '"<<compName->getName()<< "' must at least consist of a component name followed by a dot and the property/attribute name." <<endlog(); 00282 ok = false; 00283 continue; 00284 } 00285 this->unreportData(cname,pname); 00286 ok &= this->reportData(cname, pname); 00287 } 00288 else { 00289 log(Error) << "Expected \"Component\", \"Port\" or \"Data\", got " 00290 << compName->getName() << endlog(); 00291 ok = false; 00292 } 00293 ++it; 00294 } 00295 return ok; 00296 } 00297 00298 bool ReportingComponent::screenComponent( const std::string& comp ) 00299 { 00300 Logger::In in("ReportingComponent::screenComponent"); 00301 log(Error) << "not implemented." <<comp<<endlog(); 00302 return false; 00303 } 00304 00305 bool ReportingComponent::screenImpl( const std::string& comp, std::ostream& output) 00306 { 00307 Logger::In in("ReportingComponent"); 00308 TaskContext* c = this->getPeer(comp); 00309 if ( c == 0) { 00310 log(Error) << "Unknown Component: " <<comp<<endlog(); 00311 return false; 00312 } 00313 output << "Screening Component '"<< comp << "' : "<< endl << endl; 00314 PropertyBag* bag = c->properties(); 00315 if (bag) { 00316 output << "Properties :" << endl; 00317 for (PropertyBag::iterator it= bag->begin(); it != bag->end(); ++it) 00318 output << " " << (*it)->getName() << " : " << (*it)->getDataSource() << endl; 00319 } 00320 ConfigurationInterface::AttributeNames atts = c->provides()->getAttributeNames(); 00321 if ( !atts.empty() ) { 00322 output << "Attributes :" << endl; 00323 for (ConfigurationInterface::AttributeNames::iterator it= atts.begin(); it != atts.end(); ++it) 00324 output << " " << *it << " : " << c->provides()->getValue(*it)->getDataSource() << endl; 00325 } 00326 00327 vector<string> ports = c->ports()->getPortNames(); 00328 if ( !ports.empty() ) { 00329 output << "Ports :" << endl; 00330 for (vector<string>::iterator it= ports.begin(); it != ports.end(); ++it) { 00331 output << " " << *it << " : "; 00332 if (c->ports()->getPort(*it)->connected() ) 00333 output << "(connected)" << endl; 00334 else 00335 output << "(not connected)" << endl; 00336 } 00337 } 00338 return true; 00339 } 00340 00341 bool ReportingComponent::reportComponent( const std::string& component ) { 00342 Logger::In in("ReportingComponent"); 00343 // Users may add own data sources, so avoid duplicates 00344 //std::vector<std::string> sources = comp->data()->getNames(); 00345 TaskContext* comp = this->getPeer(component); 00346 if ( !comp ) { 00347 log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog(); 00348 return false; 00349 } 00350 if ( !report_data.value().findValue<string>(component) ) 00351 report_data.value().ownProperty( new Property<string>("Component","",component) ); 00352 Ports ports = comp->ports()->getPorts(); 00353 for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) { 00354 log(Debug) << "Checking port " << (*it)->getName()<<"."<<endlog(); 00355 this->reportPort( component, (*it)->getName() ); 00356 } 00357 return true; 00358 } 00359 00360 00361 bool ReportingComponent::unreportComponent( const std::string& component ) { 00362 TaskContext* comp = this->getPeer(component); 00363 if ( !comp ) { 00364 log(Error) << "Could not unreport Component " << component <<" : no such peer."<<endlog(); 00365 return false; 00366 } 00367 Ports ports = comp->ports()->getPorts(); 00368 for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) { 00369 this->unreportDataSource( component + "." + (*it)->getName() ); 00370 unreportPort(component, (*it)->getName() ); 00371 } 00372 base::PropertyBase* pb = report_data.value().findValue<string>(component); 00373 if (pb) 00374 report_data.value().removeProperty( pb );// pb is deleted by bag 00375 return true; 00376 } 00377 00378 // report a specific connection. 00379 bool ReportingComponent::reportPort(const std::string& component, const std::string& port ) { 00380 Logger::In in("ReportingComponent"); 00381 TaskContext* comp = this->getPeer(component); 00382 if ( this->ports()->getPort(component +"_"+port) ) { 00383 log(Warning) <<"Already reporting "<<component<<"."<<port<<": removing old port first."<<endlog(); 00384 this->unreportPort(component,port); 00385 } 00386 if ( !comp ) { 00387 log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog(); 00388 return false; 00389 } 00390 std::vector<std::string> strs; 00391 boost::split(strs, port, boost::is_any_of(".")); 00392 00393 // strs could be empty because of a bug in Boost 1.44 (see https://svn.boost.org/trac/boost/ticket/4751) 00394 if (strs.empty()) return false; 00395 00396 Service::shared_ptr service=comp->provides(); 00397 while ( strs.size() != 1 && service) { 00398 service = service->getService( strs.front() ); 00399 if (service) 00400 strs.erase( strs.begin() ); 00401 } 00402 if (!service) { 00403 log(Error) <<"No such service: '"<< strs.front() <<"' while looking for port '"<< port<<"'"<<endlog(); 00404 return 0; 00405 } 00406 base::PortInterface* porti = 0; 00407 porti = service->getPort(strs.front()); 00408 if ( !porti ) { 00409 log(Error) << "Could not report Port " << port 00410 <<" : no such port on Component "<<component<<"."<<endlog(); 00411 return false; 00412 } 00413 00414 base::InputPortInterface* ipi = dynamic_cast<base::InputPortInterface*>(porti); 00415 if (ipi) { 00416 log(Error) << "Can not report InputPort "<< porti->getName() <<" of Component " << component <<endlog(); 00417 return false; 00418 } 00419 // create new port temporarily 00420 // this port is only created with the purpose of 00421 // creating a connection object. 00422 base::PortInterface* ourport = porti->antiClone(); 00423 assert(ourport); 00424 ourport->setName(component + "_" + porti->getName()); 00425 ipi = dynamic_cast<base::InputPortInterface*> (ourport); 00426 assert(ipi); 00427 00428 ConnPolicy pol; 00429 log(Info) << "Not buffering of data flow connections. You may miss samples." <<endlog(); 00430 pol = ConnPolicy::data(ConnPolicy::LOCK_FREE,true,false); 00431 00432 if (porti->connectTo(ourport, pol ) == false) 00433 { 00434 log(Error) << "Could not connect to OutputPort " << porti->getName() << endlog(); 00435 delete ourport; // XXX/TODO We're leaking ourport ! 00436 return false; 00437 } 00438 00439 if (this->reportDataSource(component + "." + porti->getName(), "Port", 00440 ipi->getDataSource(), true) == false) 00441 { 00442 log(Error) << "Failed reporting port " << port << endlog(); 00443 delete ourport; 00444 return false; 00445 } 00446 this->ports()->addEventPort( *ipi ); 00447 log(Info) << "Monitoring OutputPort " << porti->getName() << " : ok." << endlog(); 00448 // Add port to ReportData properties if component nor port are listed yet. 00449 if ( !report_data.value().findValue<string>(component) && !report_data.value().findValue<string>( component+"."+port) ) 00450 report_data.value().ownProperty(new Property<string>("Port","",component+"."+port)); 00451 return true; 00452 } 00453 00454 bool ReportingComponent::unreportPort(const std::string& component, const std::string& port ) { 00455 base::PortInterface* ourport = this->ports()->getPort(component + "_" + port); 00456 if ( this->unreportDataSource( component + "." + port ) && report_data.value().removeProperty( report_data.value().findValue<string>(component+"."+port))) { 00457 this->ports()->removePort(ourport->getName()); 00458 delete ourport; // also deletes datasource. 00459 return true; 00460 } 00461 return false; 00462 } 00463 00464 // report a specific datasource, property,... 00465 bool ReportingComponent::reportData(const std::string& component,const std::string& dataname) 00466 { 00467 Logger::In in("ReportingComponent"); 00468 TaskContext* comp = this->getPeer(component); 00469 if ( !comp ) { 00470 log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog(); 00471 return false; 00472 } 00473 // Is it an attribute ? 00474 if ( comp->provides()->getValue( dataname ) ) { 00475 if (this->reportDataSource( component + "." + dataname, "Data", 00476 comp->provides()->getValue( dataname )->getDataSource(), false ) == false) { 00477 log(Error) << "Failed reporting data " << dataname <<endlog(); 00478 return false; 00479 } 00480 } 00481 00482 // Is it a property ? 00483 if ( comp->properties() && comp->properties()->find( dataname ) ) { 00484 if (this->reportDataSource( component + "." + dataname, "Data", 00485 comp->properties()->find( dataname )->getDataSource(), false ) == false) { 00486 log(Error) << "Failed reporting data " << dataname <<endlog(); 00487 return false; 00488 } 00489 } 00490 // Ok. we passed. 00491 // Add port to ReportData properties if data not listed yet. 00492 if ( !report_data.value().findValue<string>( component+"."+dataname) ) 00493 report_data.value().ownProperty(new Property<string>("Data","",component+"."+dataname)); 00494 return true; 00495 } 00496 00497 bool ReportingComponent::unreportData(const std::string& component,const std::string& datasource) { 00498 return this->unreportDataSource( component +"." + datasource) && report_data.value().removeProperty( report_data.value().findValue<string>(component+"."+datasource)); 00499 } 00500 00501 bool ReportingComponent::reportDataSource(std::string tag, std::string type, base::DataSourceBase::shared_ptr orig, bool track) 00502 { 00503 // check for duplicates: 00504 for (Reports::iterator it = root.begin(); 00505 it != root.end(); ++it) 00506 if ( it->get<T_QualName>() == tag ) { 00507 return true; 00508 } 00509 00510 // creates a copy of the data and an update command to 00511 // update the copy from the original. 00512 base::DataSourceBase::shared_ptr clone = orig->getTypeInfo()->buildValue(); 00513 if ( !clone ) { 00514 log(Error) << "Could not report '"<< tag <<"' : unknown type." << endlog(); 00515 return false; 00516 } 00517 root.push_back( boost::make_tuple( tag, orig, type, false, track ) ); 00518 return true; 00519 } 00520 00521 bool ReportingComponent::unreportDataSource(std::string tag) 00522 { 00523 for (Reports::iterator it = root.begin(); 00524 it != root.end(); ++it) 00525 if ( it->get<T_QualName>() == tag ) { 00526 root.erase(it); 00527 return true; 00528 } 00529 return false; 00530 } 00531 00532 bool ReportingComponent::startHook() { 00533 Logger::In in("ReportingComponent"); 00534 if (marshallers.begin() == marshallers.end()) { 00535 log(Error) << "Need at least one marshaller to write reports." <<endlog(); 00536 return false; 00537 } 00538 00539 if(synchronize_with_logging.get()) 00540 starttime = Logger::Instance()->getReferenceTime(); 00541 else 00542 starttime = os::TimeService::Instance()->getTicks(); 00543 00544 // Get initial data samples 00545 this->copydata(); 00546 this->makeReport2(); 00547 00548 // write headers 00549 if (writeHeader.get()) { 00550 // call all header marshallers. 00551 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) { 00552 it->first->serialize( report ); 00553 it->first->flush(); 00554 } 00555 } 00556 00557 // write initial values with all value marshallers (uses the forcing above) 00558 if ( getActivity()->isPeriodic() ) { 00559 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) { 00560 it->second->serialize( report ); 00561 it->second->flush(); 00562 } 00563 } 00564 00565 snapshotted = false; 00566 return true; 00567 } 00568 00569 void ReportingComponent::snapshot() { 00570 // this function always copies and reports all data It's run in ownthread, so updateHook will be run later. 00571 if ( getActivity()->isPeriodic() ) 00572 return; 00573 copydata(); 00574 snapshotted = true; 00575 updateHook(); 00576 } 00577 00578 bool ReportingComponent::copydata() { 00579 timestamp = os::TimeService::Instance()->secondsSince( starttime ); 00580 00581 // result will become true if more data is to be read. 00582 bool result = false; 00583 // This evaluates the InputPortDataSource evaluate() returns true upon new data. 00584 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) { 00585 it->get<T_NewData>() = (it->get<T_PortDS>())->evaluate(); // stores 'NewData' flag. 00586 // if its a property/attr, get<T_NewData> will always be true, so we override (clear) with get<T_Tracked>. 00587 result = result || ( it->get<T_NewData>() && it->get<T_Tracked>() ); 00588 } 00589 return result; 00590 } 00591 00592 void ReportingComponent::makeReport2() 00593 { 00594 // Uses the port DS itself to make the report. 00595 assert( report.empty() ); 00596 // For the timestamp, we need to add a new property object: 00597 report.add( timestamp.getTypeInfo()->buildProperty( timestamp.getName(), "", timestamp.getDataSource() ) ); 00598 DataSource<bool>::shared_ptr checker; 00599 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) { 00600 Property<PropertyBag>* subbag = new Property<PropertyBag>( it->get<T_QualName>(), ""); 00601 if ( memberDecomposition( it->get<T_PortDS>(), subbag->value(), checker ) ) 00602 report.add( subbag ); 00603 else { 00604 // property or simple value port... 00605 base::DataSourceBase::shared_ptr converted = it->get<T_PortDS>()->getTypeInfo()->convertType( it->get<T_PortDS>() ); 00606 if ( converted && converted != it->get<T_PortDS>() ) { 00607 // converted contains another type. 00608 report.add( converted->getTypeInfo()->buildProperty(it->get<T_QualName>(), "", converted) ); 00609 } else 00610 report.add( it->get<T_PortDS>()->getTypeInfo()->buildProperty(it->get<T_QualName>(), "", it->get<T_PortDS>()) ); 00611 delete subbag; 00612 } 00613 00614 } 00615 mchecker = checker; 00616 timestamp = 0.0; // reset. 00617 } 00618 00619 void ReportingComponent::cleanReport() 00620 { 00621 // Only clones were added to result, so delete them. 00622 deletePropertyBag( report ); 00623 } 00624 00625 void ReportingComponent::updateHook() { 00626 //If not periodic and insnapshot is true, only continue if snapshot is called. 00627 if( !getActivity()->isPeriodic() && insnapshot.get() && !snapshotted) 00628 return; 00629 else 00630 snapshotted = false; 00631 00632 // if any data sequence got resized, we rebuild the whole bunch. 00633 // otherwise, we need to track every individual array (not impossible though, but still needs an upstream concept). 00634 if ( mchecker && mchecker->get() == false ) { 00635 cleanReport(); 00636 makeReport2(); 00637 } else 00638 copydata(); 00639 00640 do { 00641 // Step 3: print out the result 00642 // write out to all marshallers 00643 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) { 00644 it->second->serialize( report ); 00645 it->second->flush(); 00646 } 00647 } while( !getActivity()->isPeriodic() && !insnapshot.get() && copydata() ); // repeat if necessary. In periodic mode we always only sample once. 00648 } 00649 00650 void ReportingComponent::stopHook() { 00651 // tell body marshallers that serialization is done. 00652 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) { 00653 it->second->flush(); 00654 } 00655 cleanReport(); 00656 } 00657 00658 }