00001 /*************************************************************************** 00002 00003 datasender.cpp - description 00004 ------------------- 00005 begin : Wed August 9 2006 00006 copyright : (C) 2006 Bas Kemper 00007 (C) 2007 Ruben Smits //Changed subscription structure 00008 email : kst@baskemper.be 00009 first dot last at mech dot kuleuven dot be 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU Lesser General Public * 00013 * License as published by the Free Software Foundation; either * 00014 * version 2.1 of the License, or (at your option) any later version. * 00015 * * 00016 * This library is distributed in the hope that it will be useful, * 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00019 * Lesser General Public License for more details. * 00020 * * 00021 * You should have received a copy of the GNU Lesser General Public * 00022 * License along with this library; if not, write to the Free Software * 00023 * Foundation, Inc., 59 Temple Place, * 00024 * Suite 330, Boston, MA 02111-1307 USA * 00025 * * 00026 ***************************************************************************/ 00027 00028 #include <vector> 00029 #include <rtt/Logger.hpp> 00030 #include <rtt/os/Mutex.hpp> 00031 #include <rtt/Property.hpp> 00032 #include <rtt/base/PropertyIntrospection.hpp> 00033 #include "socket.hpp" 00034 #include "socketmarshaller.hpp" 00035 #include "datasender.hpp" 00036 #include "command.hpp" 00037 #include "TcpReporting.hpp" 00038 #include <rtt/types/TemplateTypeInfo.hpp> 00039 00040 namespace OCL 00041 { 00042 namespace TCP 00043 { 00044 Datasender::Datasender(RTT::SocketMarshaller* _marshaller, Orocos::TCP::Socket* _os): 00045 Activity(10), os( _os ), marshaller(_marshaller) 00046 { 00047 limit = 0; 00048 curframe = 0; 00049 reporter = marshaller->getReporter(); 00050 silenced = true; 00051 interpreter = new TcpReportingInterpreter(this); 00052 } 00053 00054 Datasender::~Datasender() 00055 { 00056 subscriptions.clear(); 00057 delete interpreter; 00058 delete os; 00059 } 00060 00061 void Datasender::loop() 00062 { 00063 *os << "100 Orocos 1.0 TcpReporting Server 1.0" << std::endl; 00064 while( os->isValid() ) 00065 { 00066 interpreter->process(); 00067 } 00068 Logger::log() << Logger::Info << "Connection closed!" << Logger::endl; 00069 } 00070 00071 bool Datasender::breakloop() 00072 { 00073 os->close(); 00074 return true; 00075 } 00076 00077 RTT::SocketMarshaller* Datasender::getMarshaller() const 00078 { 00079 return marshaller; 00080 } 00081 00082 Socket& Datasender::getSocket() const 00083 { 00084 return *os; 00085 } 00086 00087 bool Datasender::isValid() const 00088 { 00089 return os && os->isValid(); 00090 } 00091 00092 bool Datasender::addSubscription(const std::string name ) 00093 { 00094 lock.lock(); 00095 log(Debug)<<"Datasender::addSubscription: "<<name<<endlog(); 00096 //Check if a property is available with that name? 00097 if(reporter->getReport()->find(name)!=NULL){ 00098 //check if subscription already exists 00099 std::vector<std::string>::const_iterator pos = 00100 find(subscriptions.begin(),subscriptions.end(),name); 00101 if(pos!=subscriptions.end()){ 00102 Logger::In("DataSender"); 00103 log(Info)<<"Already subscribed to "<<name<<endlog(); 00104 lock.unlock(); 00105 return false; 00106 }else{ 00107 Logger::In("DataSender"); 00108 log(Info)<<"Adding subscription for "<<name<<endlog(); 00109 subscriptions.push_back(name); 00110 lock.unlock(); 00111 return true; 00112 } 00113 }else{ 00114 Logger::In("DataSender"); 00115 log(Error)<<name<<" is not available for reporting"<<endlog(); 00116 lock.unlock(); 00117 return false; 00118 } 00119 } 00120 00121 void Datasender::remove() 00122 { 00123 getMarshaller()->removeConnection( this ); 00124 } 00125 00126 bool Datasender::removeSubscription( const std::string& name ) 00127 { 00128 lock.lock(); 00129 //check if subscription exists 00130 std::vector<std::string>::iterator pos = 00131 find(subscriptions.begin(),subscriptions.end(),name); 00132 if(pos!=subscriptions.end()){ 00133 Logger::In("DataSender"); 00134 log(Info)<<"Removing subscription for "<<name<<endlog(); 00135 subscriptions.erase(pos); 00136 lock.unlock(); 00137 return true; 00138 }else{ 00139 Logger::In("DataSenser"); 00140 log(Error)<<"No subscription found for "<<name<<endlog(); 00141 lock.unlock(); 00142 return false; 00143 } 00144 } 00145 00146 void Datasender::listSubscriptions() 00147 { 00148 for(std::vector<std::string>::const_iterator elem=subscriptions.begin(); 00149 elem!=subscriptions.end();elem++) 00150 *os<<"305 "<< *elem<<std::endl; 00151 *os << "306 End of list" << std::endl; 00152 } 00153 00154 void Datasender::writeOut(base::PropertyBase* v) 00155 { 00156 *os<<"202 "<<v->getName()<<"\n"; 00157 Property<PropertyBag>* bag = dynamic_cast< Property<PropertyBag>* >( v ); 00158 if ( bag ) 00159 this->writeOut( bag->value() ); 00160 else { 00161 *os<<"205 " <<v->getDataSource()<<"\n"; 00162 } 00163 00164 } 00165 00166 void Datasender::writeOut(const PropertyBag &v) 00167 { 00168 for ( 00169 PropertyBag::const_iterator i = v.getProperties().begin(); 00170 i != v.getProperties().end(); 00171 i++ ) 00172 { 00173 this->writeOut( *i ); 00174 } 00175 00176 } 00177 00178 00179 void Datasender::checkbag(const PropertyBag &v) 00180 { 00181 log(Debug)<<"Let's check the subscriptions"<<endlog(); 00182 for(std::vector<std::string>::iterator elem = subscriptions.begin(); 00183 elem!=subscriptions.end();elem++){ 00184 base::PropertyBase* prop = reporter->getReport()->find(*elem); 00185 if(prop!=NULL){ 00186 writeOut(prop); 00187 }else{ 00188 Logger::In("DataSender"); 00189 log(Error)<<*elem<<" not longer available for reporting,"<< 00190 ", removing the subscription."<<endlog(); 00191 subscriptions.erase(elem); 00192 elem--; 00193 } 00194 } 00195 } 00196 00197 void Datasender::silence(bool newstate) 00198 { 00199 silenced = newstate; 00200 } 00201 00202 void Datasender::setLimit(unsigned long long newlimit) 00203 { 00204 limit = newlimit; 00205 } 00206 00207 void Datasender::serialize(const PropertyBag &v) 00208 { 00209 if( silenced ) { 00210 return; 00211 } 00212 00213 lock.lock(); 00214 if( !subscriptions.empty() && ( limit == 0 || curframe <= limit ) ){ 00215 *os << "201 " <<curframe << " -- begin of frame\n"; 00216 checkbag(v); 00217 *os << "203 " << curframe << " -- end of frame" << std::endl; 00218 curframe++; 00219 if( curframe > limit && limit != 0 ) 00220 { 00221 *os << "204 Limit reached" << std::endl; 00222 } 00223 } 00224 lock.unlock(); 00225 } 00226 00227 } 00228 }