00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ReportingComponent.hpp"
00029 #include <rtt/Logger.hpp>
00030
00031
00032 #include "EmptyMarshaller.hpp"
00033 #include <rtt/marsh/PropertyDemarshaller.hpp>
00034 #include <rtt/marsh/PropertyMarshaller.hpp>
00035 #include <iostream>
00036 #include <fstream>
00037 #include <exception>
00038 #include <boost/algorithm/string.hpp>
00039
00040 #include "ocl/Component.hpp"
00041 #include <rtt/types/PropertyDecomposition.hpp>
00042
00043 ORO_CREATE_COMPONENT_TYPE()
00044
00045
00046 namespace OCL
00047 {
00048 using namespace std;
00049 using namespace RTT;
00050 using namespace RTT::detail;
00051
00052 ReportingComponent::ReportingComponent( std::string name )
00053 : TaskContext( name ),
00054 report("Report"),
00055 snapshotOnly("SnapshotOnly","Set to true to only log data if a snapshot() was done.", false),
00056 writeHeader("WriteHeader","Set to true to start each report with a header.", true),
00057 decompose("Decompose","Set to false in order to create multidimensional array in netcdf", true),
00058 synchronize_with_logging("Synchronize","Set to true if the timestamp should be synchronized with the logging",false),
00059 report_data("ReportData","A PropertyBag which defines which ports or components to report."),
00060 null("NullSample","The characters written to the log to indicate that no new data was available for that port during a snapshot(). As a special value, the string 'last' is interpreted as repeating the last value.","last"),
00061 starttime(0),
00062 timestamp("TimeStamp","The time at which the data was read.",0.0)
00063 {
00064 this->properties()->addProperty( snapshotOnly );
00065 this->properties()->addProperty( writeHeader );
00066 this->properties()->addProperty( decompose );
00067 this->properties()->addProperty( synchronize_with_logging);
00068 this->properties()->addProperty( report_data);
00069 this->properties()->addProperty( null);
00070
00071
00072
00073
00074 this->addOperation("snapshot", &ReportingComponent::snapshot , this, RTT::ClientThread).doc("Take a new shapshot of all data and cause them to be written out.");
00075 this->addOperation("screenComponent", &ReportingComponent::screenComponent , this, RTT::ClientThread).doc("Display the variables and ports of a Component.").arg("Component", "Name of the Component");
00076 this->addOperation("reportComponent", &ReportingComponent::reportComponent , this, RTT::ClientThread).doc("Add a peer Component and report all its data ports").arg("Component", "Name of the Component");
00077 this->addOperation("unreportComponent", &ReportingComponent::unreportComponent , this, RTT::ClientThread).doc("Remove all Component's data ports from reporting.").arg("Component", "Name of the Component");
00078 this->addOperation("reportData", &ReportingComponent::reportData , this, RTT::ClientThread).doc("Add a Component's Property or attribute for reporting.").arg("Component", "Name of the Component").arg("Data", "Name of the Data to report. A property's or attribute's name.");
00079 this->addOperation("unreportData", &ReportingComponent::unreportData , this, RTT::ClientThread).doc("Remove a Data object from reporting.").arg("Component", "Name of the Component").arg("Data", "Name of the property or attribute.");
00080 this->addOperation("reportPort", &ReportingComponent::reportPort , this, RTT::ClientThread).doc("Add a Component's OutputPort for reporting.").arg("Component", "Name of the Component").arg("Port", "Name of the Port.");
00081 this->addOperation("unreportPort", &ReportingComponent::unreportPort , this, RTT::ClientThread).doc("Remove a Port from reporting.").arg("Component", "Name of the Component").arg("Port", "Name of the Port.");
00082
00083 }
00084
00085 ReportingComponent::~ReportingComponent() {}
00086
00087
00088 bool ReportingComponent::addMarshaller( marsh::MarshallInterface* headerM, marsh::MarshallInterface* bodyM)
00089 {
00090 boost::shared_ptr<marsh::MarshallInterface> header(headerM);
00091 boost::shared_ptr<marsh::MarshallInterface> body(bodyM);
00092 if ( !header && !body)
00093 return false;
00094 if ( !header )
00095 header.reset( new EmptyMarshaller() );
00096 if ( !body)
00097 body.reset( new EmptyMarshaller());
00098
00099 marshallers.push_back( std::make_pair( header, body ) );
00100 return true;
00101 }
00102
00103 bool ReportingComponent::removeMarshallers()
00104 {
00105 marshallers.clear();
00106 return true;
00107 }
00108
00109 void ReportingComponent::cleanupHook()
00110 {
00111 root.clear();
00112 deletePropertyBag( report );
00113 }
00114
00115 bool ReportingComponent::configureHook()
00116 {
00117 Logger::In in("ReportingComponent");
00118
00119
00120 PropertyBag bag = report_data.value();
00121
00122 if ( bag.empty() ) {
00123 log(Error) <<"No port or component configuration loaded."<<endlog();
00124 log(Error) <<"Please use marshalling.loadProperties(), reportComponent() (scripting) or LoadProperties (XML) in order to fill in ReportData." <<endlog();
00125 return false;
00126 }
00127
00128 bool ok = true;
00129 PropertyBag::const_iterator it = bag.getProperties().begin();
00130 while ( it != bag.getProperties().end() )
00131 {
00132 Property<std::string>* compName = dynamic_cast<Property<std::string>* >( *it );
00133 if ( !compName )
00134 log(Error) << "Expected Property \""
00135 << (*it)->getName() <<"\" to be of type string."<< endlog();
00136 else if ( compName->getName() == "Component" ) {
00137 std::string name = compName->value();
00138 this->unreportComponent( name );
00139 ok &= this->reportComponent( name );
00140 }
00141 else if ( compName->getName() == "Port" ) {
00142 string cname = compName->value().substr(0, compName->value().find("."));
00143 string pname = compName->value().substr( compName->value().find(".")+1, string::npos);
00144 if (cname.empty() || pname.empty() ) {
00145 log(Error) << "The Port value '"<<compName->getName()<< "' must at least consist of a component name followed by a dot and the port name." <<endlog();
00146 ok = false;
00147 continue;
00148 }
00149 this->unreportPort(cname,pname);
00150 ok &= this->reportPort(cname, pname);
00151 }
00152 else if ( compName->getName() == "Data" ) {
00153 string cname = compName->value().substr(0, compName->value().find("."));
00154 string pname = compName->value().substr( compName->value().find(".")+1, string::npos);
00155 if (cname.empty() || pname.empty() ) {
00156 log(Error) << "The Data value '"<<compName->getName()<< "' must at least consist of a component name followed by a dot and the property/attribute name." <<endlog();
00157 ok = false;
00158 continue;
00159 }
00160 this->unreportData(cname,pname);
00161 ok &= this->reportData(cname, pname);
00162 }
00163 else {
00164 log(Error) << "Expected \"Component\", \"Port\" or \"Data\", got "
00165 << compName->getName() << endlog();
00166 ok = false;
00167 }
00168 ++it;
00169 }
00170 return ok;
00171 }
00172
00173 bool ReportingComponent::screenComponent( const std::string& comp )
00174 {
00175 Logger::In in("ReportingComponent::screenComponent");
00176 log(Error) << "not implemented." <<comp<<endlog();
00177 return false;
00178 }
00179
00180 bool ReportingComponent::screenImpl( const std::string& comp, std::ostream& output)
00181 {
00182 Logger::In in("ReportingComponent");
00183 TaskContext* c = this->getPeer(comp);
00184 if ( c == 0) {
00185 log(Error) << "Unknown Component: " <<comp<<endlog();
00186 return false;
00187 }
00188 output << "Screening Component '"<< comp << "' : "<< endl << endl;
00189 PropertyBag* bag = c->properties();
00190 if (bag) {
00191 output << "Properties :" << endl;
00192 for (PropertyBag::iterator it= bag->begin(); it != bag->end(); ++it)
00193 output << " " << (*it)->getName() << " : " << (*it)->getDataSource() << endl;
00194 }
00195 ConfigurationInterface::AttributeNames atts = c->provides()->getAttributeNames();
00196 if ( !atts.empty() ) {
00197 output << "Attributes :" << endl;
00198 for (ConfigurationInterface::AttributeNames::iterator it= atts.begin(); it != atts.end(); ++it)
00199 output << " " << *it << " : " << c->provides()->getValue(*it)->getDataSource() << endl;
00200 }
00201
00202 vector<string> ports = c->ports()->getPortNames();
00203 if ( !ports.empty() ) {
00204 output << "Ports :" << endl;
00205 for (vector<string>::iterator it= ports.begin(); it != ports.end(); ++it) {
00206 output << " " << *it << " : ";
00207 if (c->ports()->getPort(*it)->connected() )
00208 output << "(connected)" << endl;
00209 else
00210 output << "(not connected)" << endl;
00211 }
00212 }
00213 return true;
00214 }
00215
00216 bool ReportingComponent::reportComponent( const std::string& component ) {
00217 Logger::In in("ReportingComponent");
00218
00219
00220 TaskContext* comp = this->getPeer(component);
00221 if ( !comp ) {
00222 log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog();
00223 return false;
00224 }
00225 if ( !report_data.value().findValue<string>(component) )
00226 report_data.value().ownProperty( new Property<string>("Component","",component) );
00227 Ports ports = comp->ports()->getPorts();
00228 for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) {
00229 log(Debug) << "Checking port " << (*it)->getName()<<"."<<endlog();
00230 this->reportPort( component, (*it)->getName() );
00231 }
00232 return true;
00233 }
00234
00235
00236 bool ReportingComponent::unreportComponent( const std::string& component ) {
00237 TaskContext* comp = this->getPeer(component);
00238 if ( !comp ) {
00239 log(Error) << "Could not unreport Component " << component <<" : no such peer."<<endlog();
00240 return false;
00241 }
00242 Ports ports = comp->ports()->getPorts();
00243 for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) {
00244 this->unreportDataSource( component + "." + (*it)->getName() );
00245 unreportPort(component, (*it)->getName() );
00246 }
00247 base::PropertyBase* pb = report_data.value().findValue<string>(component);
00248 if (pb)
00249 report_data.value().removeProperty( pb );
00250 return true;
00251 }
00252
00253
00254 bool ReportingComponent::reportPort(const std::string& component, const std::string& port ) {
00255 Logger::In in("ReportingComponent");
00256 TaskContext* comp = this->getPeer(component);
00257 if ( this->ports()->getPort(component +"_"+port) ) {
00258 log(Warning) <<"Already reporting "<<component<<"."<<port<<": removing old port first."<<endlog();
00259 this->unreportPort(component,port);
00260 }
00261 if ( !comp ) {
00262 log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog();
00263 return false;
00264 }
00265 std::vector<std::string> strs;
00266 boost::split(strs, port, boost::is_any_of("."));
00267 Service::shared_ptr service=comp->provides();
00268 while ( strs.size() != 1 && service) {
00269 service = service->getService( strs.front() );
00270 if (service)
00271 strs.erase( strs.begin() );
00272 }
00273 if (!service) {
00274 log(Error) <<"No such service: '"<< strs.front() <<"' while looking for port '"<< port<<"'"<<endlog();
00275 return 0;
00276 }
00277 base::PortInterface* porti = 0;
00278 porti = service->getPort(strs.front());
00279 if ( !porti ) {
00280 log(Error) << "Could not report Port " << port
00281 <<" : no such port on Component "<<component<<"."<<endlog();
00282 return false;
00283 }
00284
00285 base::InputPortInterface* ipi = dynamic_cast<base::InputPortInterface*>(porti);
00286 if (ipi) {
00287 log(Error) << "Can not report InputPort "<< porti->getName() <<" of Component " << component <<endlog();
00288 return false;
00289 }
00290
00291
00292
00293 base::PortInterface* ourport = porti->antiClone();
00294 assert(ourport);
00295 ourport->setName(component + "_" + porti->getName());
00296 ipi = dynamic_cast<base::InputPortInterface*> (ourport);
00297 assert(ipi);
00298
00299 ConnPolicy pol;
00300 if (snapshotOnly.get() ) {
00301 log(Info) << "Disabling buffering of data flow connections in SnapshotOnly mode." <<endlog();
00302 pol = ConnPolicy::data(ConnPolicy::LOCK_FREE,true,false);
00303 } else {
00304 log(Info) << "Buffering of data flow connections is set to 10 samples." <<endlog();
00305 pol = ConnPolicy::buffer(10,ConnPolicy::LOCK_FREE,true,false);
00306 }
00307
00308 if (porti->connectTo(ourport, ConnPolicy::buffer(10,ConnPolicy::LOCK_FREE,true,false) ) == false)
00309 {
00310 log(Error) << "Could not connect to OutputPort " << porti->getName() << endlog();
00311 delete ourport;
00312 return false;
00313 }
00314
00315 if (this->reportDataSource(component + "." + porti->getName(), "Port",
00316 ipi->getDataSource(), true) == false)
00317 {
00318 log(Error) << "Failed reporting port " << port << endlog();
00319 delete ourport;
00320 return false;
00321 }
00322 this->ports()->addEventPort( *ipi );
00323 log(Info) << "Monitoring OutputPort " << porti->getName() << " : ok." << endlog();
00324
00325 if ( !report_data.value().findValue<string>(component) && !report_data.value().findValue<string>( component+"."+port) )
00326 report_data.value().ownProperty(new Property<string>("Port","",component+"."+port));
00327 return true;
00328 }
00329
00330 bool ReportingComponent::unreportPort(const std::string& component, const std::string& port ) {
00331 base::PortInterface* ourport = this->ports()->getPort(component + "_" + port);
00332 if ( this->unreportDataSource( component + "." + port ) && report_data.value().removeProperty( report_data.value().findValue<string>(component+"."+port))) {
00333 this->ports()->removePort(ourport->getName());
00334 delete ourport;
00335 return true;
00336 }
00337 return false;
00338 }
00339
00340
00341 bool ReportingComponent::reportData(const std::string& component,const std::string& dataname)
00342 {
00343 Logger::In in("ReportingComponent");
00344 TaskContext* comp = this->getPeer(component);
00345 if ( !comp ) {
00346 log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog();
00347 return false;
00348 }
00349
00350 if ( comp->provides()->getValue( dataname ) ) {
00351 if (this->reportDataSource( component + "." + dataname, "Data",
00352 comp->provides()->getValue( dataname )->getDataSource(), false ) == false) {
00353 log(Error) << "Failed reporting data " << dataname <<endlog();
00354 return false;
00355 }
00356 }
00357
00358
00359 if ( comp->properties() && comp->properties()->find( dataname ) ) {
00360 if (this->reportDataSource( component + "." + dataname, "Data",
00361 comp->properties()->find( dataname )->getDataSource(), false ) == false) {
00362 log(Error) << "Failed reporting data " << dataname <<endlog();
00363 return false;
00364 }
00365 }
00366
00367
00368 if ( !report_data.value().findValue<string>( component+"."+dataname) )
00369 report_data.value().ownProperty(new Property<string>("Data","",component+"."+dataname));
00370 return true;
00371 }
00372
00373 bool ReportingComponent::unreportData(const std::string& component,const std::string& datasource) {
00374 return this->unreportDataSource( component +"." + datasource) && report_data.value().removeProperty( report_data.value().findValue<string>(component+"."+datasource));
00375 }
00376
00377 bool ReportingComponent::reportDataSource(std::string tag, std::string type, base::DataSourceBase::shared_ptr orig, bool track)
00378 {
00379
00380 for (Reports::iterator it = root.begin();
00381 it != root.end(); ++it)
00382 if ( it->get<0>() == tag ) {
00383 return true;
00384 }
00385
00386
00387
00388 base::DataSourceBase::shared_ptr clone = orig->getTypeInfo()->buildValue();
00389 if ( !clone ) {
00390 log(Error) << "Could not report '"<< tag <<"' : unknown type." << endlog();
00391 return false;
00392 }
00393 try {
00394 boost::shared_ptr<base::ActionInterface> comm( clone->updateAction( orig.get() ) );
00395 assert( comm );
00396 root.push_back( boost::make_tuple( tag, orig, comm, clone, type, false, track ) );
00397 } catch ( internal::bad_assignment& ba ) {
00398 log(Error) << "Could not report '"<< tag <<"' : failed to create Command." << endlog();
00399 return false;
00400 }
00401 return true;
00402 }
00403
00404 bool ReportingComponent::unreportDataSource(std::string tag)
00405 {
00406 for (Reports::iterator it = root.begin();
00407 it != root.end(); ++it)
00408 if ( it->get<0>() == tag ) {
00409 root.erase(it);
00410 return true;
00411 }
00412 return false;
00413 }
00414
00415 bool ReportingComponent::startHook() {
00416 Logger::In in("ReportingComponent");
00417 if (marshallers.begin() == marshallers.end()) {
00418 log(Error) << "Need at least one marshaller to write reports." <<endlog();
00419 return false;
00420 }
00421
00422
00423 this->copydata();
00424
00425 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) {
00426 it->get<5>() = true;
00427 }
00428 this->makeReport();
00429
00430
00431 if (writeHeader.get()) {
00432
00433 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
00434 it->first->serialize( report );
00435 it->first->flush();
00436 }
00437 }
00438
00439
00440 if (snapshotOnly.get() == false) {
00441 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
00442 it->second->serialize( report );
00443 it->second->flush();
00444 }
00445 }
00446
00447 this->cleanReport();
00448
00449 if(synchronize_with_logging.get())
00450 starttime = Logger::Instance()->getReferenceTime();
00451 else
00452 starttime = os::TimeService::Instance()->getTicks();
00453
00454 return true;
00455 }
00456
00457 void ReportingComponent::snapshot() {
00458
00459 copydata();
00460
00461 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) {
00462 it->get<5>() = true;
00463 }
00464
00465 if( this->engine()->getActivity() )
00466 this->engine()->getActivity()->trigger();
00467 }
00468
00469 bool ReportingComponent::copydata() {
00470 timestamp = os::TimeService::Instance()->secondsSince( starttime );
00471
00472 bool result = false;
00473
00474 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) {
00475 bool newdata = false;
00476 do {
00477
00478
00479 (it->get<2>())->readArguments();
00480 newdata = (it->get<2>())->execute();
00481 (it->get<5>()) |= newdata;
00482
00483 result |= newdata && (it->get<6>());
00484
00485 } while ( newdata && (it->get<6>()) && this->getActivity()->isPeriodic() );
00486 }
00487 return result;
00488 }
00489
00490 void ReportingComponent::makeReport()
00491 {
00492
00493
00494
00495
00496
00497
00498 report.add( timestamp.clone() );
00499 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) {
00500 if ( it->get<5>() || null.rvalue() == "last" ) {
00501 base::DataSourceBase::shared_ptr clone = it->get<3>();
00502 Property<PropertyBag>* subbag = new Property<PropertyBag>( it->get<0>(), "");
00503 if ( decompose.get() && typeDecomposition( clone, subbag->value() ) )
00504 report.add( subbag );
00505 else {
00506 base::DataSourceBase::shared_ptr converted = clone->getTypeInfo()->decomposeType(clone);
00507 if ( converted && converted != clone ) {
00508
00509 report.add( converted->getTypeInfo()->buildProperty(it->get<0>(), "", converted) );
00510 } else
00511
00512 report.add( clone->getTypeInfo()->buildProperty(it->get<0>(), "", clone) );
00513 delete subbag;
00514 }
00515 it->get<5>() = false;
00516 } else {
00517
00518 report.add( null.clone() );
00519 }
00520 }
00521 timestamp = 0.0;
00522 }
00523
00524 void ReportingComponent::cleanReport()
00525 {
00526
00527 deletePropertyBag( report );
00528 }
00529
00530 void ReportingComponent::updateHook() {
00531
00532 if (snapshotOnly.get() && timestamp == 0.0)
00533 return;
00534
00535
00536 if ( !snapshotOnly.get() )
00537 this->copydata();
00538
00539 do {
00540
00541 this->makeReport();
00542
00543
00544
00545 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
00546 it->second->serialize( report );
00547 it->second->flush();
00548 }
00549
00550 this->cleanReport();
00551 } while( copydata() && !snapshotOnly.get() );
00552 }
00553
00554 void ReportingComponent::stopHook() {
00555
00556 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) {
00557 it->second->flush();
00558 }
00559 }
00560
00561 }