Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049 #ifndef __RTT_ROSCOMM_ROS_MSG_TRANSPORTER_HPP_
00050 #define __RTT_ROSCOMM_ROS_MSG_TRANSPORTER_HPP_
00051
00052 #include <rtt/types/TypeTransporter.hpp>
00053 #include <rtt/Port.hpp>
00054 #include <rtt/TaskContext.hpp>
00055 #include <rtt/internal/ConnFactory.hpp>
00056 #include <ros/ros.h>
00057
00058 #include <rtt_roscomm/rtt_rostopic_ros_publish_activity.hpp>
00059
00060 namespace rtt_roscomm {
00061
00062 using namespace RTT;
00066 template<typename T>
00067 class RosPubChannelElement: public base::ChannelElement<T>,public RosPublisher
00068 {
00069 char hostname[1024];
00070 std::string topicname;
00071 ros::NodeHandle ros_node;
00072 ros::NodeHandle ros_node_private;
00073 ros::Publisher ros_pub;
00075 RosPublishActivity::shared_ptr act;
00076
00077 typename base::ChannelElement<T>::value_t sample;
00078
00079 public:
00080
00091 RosPubChannelElement(base::PortInterface* port,const ConnPolicy& policy):
00092 ros_node(),
00093 ros_node_private("~")
00094 {
00095 if ( policy.name_id.empty() ){
00096 std::stringstream namestr;
00097 gethostname(hostname, sizeof(hostname));
00098
00099 if (port->getInterface() && port->getInterface()->getOwner()) {
00100 namestr << hostname<<'/' << port->getInterface()->getOwner()->getName()
00101 << '/' << port->getName() << '/'<<this << '/' << getpid();
00102 } else {
00103 namestr << hostname<<'/' << port->getName() << '/'<<this << '/' << getpid();
00104 }
00105 policy.name_id = namestr.str();
00106 }
00107 topicname=policy.name_id;
00108 Logger::In in(topicname);
00109
00110 if (port->getInterface() && port->getInterface()->getOwner()) {
00111 log(Debug)<<"Creating ROS publisher for port "<<port->getInterface()->getOwner()->getName()<<"."<<port->getName()<<" on topic "<<policy.name_id<<endlog();
00112 } else {
00113 log(Debug)<<"Creating ROS publisher for port "<<port->getName()<<" on topic "<<policy.name_id<<endlog();
00114 }
00115
00116
00117 if(topicname.length() > 1 && topicname.at(0) == '~') {
00118 ros_pub = ros_node_private.advertise<T>(policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, policy.init);
00119 } else {
00120 ros_pub = ros_node.advertise<T>(policy.name_id, policy.size > 0 ? policy.size : 1, policy.init);
00121 }
00122 act = RosPublishActivity::Instance();
00123 act->addPublisher( this );
00124 }
00125
00126 ~RosPubChannelElement() {
00127 Logger::In in(topicname);
00128 log(Debug)<<"Destroying RosPubChannelElement"<<endlog();
00129 act->removePublisher( this );
00130 }
00131
00137 virtual bool inputReady() {
00138 return true;
00139 }
00140
00148 virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
00149 {
00150 this->sample = sample;
00151 return true;
00152 }
00153
00159 bool signal(){
00160
00161
00162 return act->trigger();
00163 }
00164
00165 void publish(){
00166
00167 typename base::ChannelElement<T>::shared_ptr input = this->getInput();
00168 while( input && (input->read(sample,false) == NewData) )
00169 write(sample);
00170 }
00171
00172 bool write(typename base::ChannelElement<T>::param_t sample)
00173 {
00174 ros_pub.publish(sample);
00175 return true;
00176 }
00177
00178 };
00179
00183 template<typename T>
00184 class RosSubChannelElement: public base::ChannelElement<T>
00185 {
00186 std::string topicname;
00187 ros::NodeHandle ros_node;
00188 ros::NodeHandle ros_node_private;
00189 ros::Subscriber ros_sub;
00190
00191 public:
00201 RosSubChannelElement(base::PortInterface* port, const ConnPolicy& policy) :
00202 ros_node(),
00203 ros_node_private("~")
00204 {
00205 topicname=policy.name_id;
00206 Logger::In in(topicname);
00207 if (port->getInterface() && port->getInterface()->getOwner()) {
00208 log(Debug)<<"Creating ROS subscriber for port "<<port->getInterface()->getOwner()->getName()<<"."<<port->getName()<<" on topic "<<policy.name_id<<endlog();
00209 } else {
00210 log(Debug)<<"Creating ROS subscriber for port "<<port->getName()<<" on topic "<<policy.name_id<<endlog();
00211 }
00212 if(topicname.length() > 1 && topicname.at(0) == '~') {
00213 ros_sub = ros_node_private.subscribe(policy.name_id.substr(1),policy.size,&RosSubChannelElement::newData,this);
00214 } else {
00215 ros_sub = ros_node.subscribe(policy.name_id,policy.size,&RosSubChannelElement::newData,this);
00216 }
00217 this->ref();
00218 }
00219
00220 ~RosSubChannelElement() {
00221 Logger::In in(topicname);
00222 log(Debug)<<"Destroying RosSubChannelElement"<<endlog();
00223 }
00224
00225 virtual bool inputReady() {
00226 return true;
00227 }
00233 void newData(const T& msg){
00234 typename base::ChannelElement<T>::shared_ptr output = this->getOutput();
00235 if (output)
00236 output->write(msg);
00237 }
00238 };
00239
00240 template <class T>
00241 class RosMsgTransporter : public RTT::types::TypeTransporter
00242 {
00243 virtual base::ChannelElementBase::shared_ptr createStream (base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const{
00244 base::ChannelElementBase::shared_ptr buf = internal::ConnFactory::buildDataStorage<T>(policy);
00245 base::ChannelElementBase::shared_ptr tmp;
00246 if(is_sender){
00247 tmp = base::ChannelElementBase::shared_ptr(new RosPubChannelElement<T>(port,policy));
00248 if (policy.type == RTT::ConnPolicy::UNBUFFERED){
00249 log(Debug) << "Creating unbuffered publisher connection for port " << port->getName() << ". This may not be real-time safe!" << endlog();
00250 return tmp;
00251 }
00252 if (!buf) return base::ChannelElementBase::shared_ptr();
00253 buf->setOutput(tmp);
00254 return buf;
00255 }
00256 else {
00257 if (!buf) return base::ChannelElementBase::shared_ptr();
00258 tmp = new RosSubChannelElement<T>(port,policy);
00259 tmp->setOutput(buf);
00260 return tmp;
00261 }
00262 }
00263 };
00264 }
00265 #endif