Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049 #ifndef __RTT_ROSCOMM_ROS_MSG_TRANSPORTER_HPP_
00050 #define __RTT_ROSCOMM_ROS_MSG_TRANSPORTER_HPP_
00051
00052 #include <rtt/rtt-config.h>
00053 #include <rtt/types/TypeTransporter.hpp>
00054 #include <rtt/Port.hpp>
00055 #include <rtt/TaskContext.hpp>
00056 #include <rtt/internal/ConnFactory.hpp>
00057 #include <ros/ros.h>
00058
00059 #include <rtt_roscomm/rtt_rostopic_ros_publish_activity.hpp>
00060
00061 #ifndef RTT_VERSION_GTE
00062 #define RTT_VERSION_GTE(major,minor,patch) \
00063 ((RTT_VERSION_MAJOR > major) || (RTT_VERSION_MAJOR == major && \
00064 (RTT_VERSION_MINOR > minor) || (RTT_VERSION_MINOR == minor && \
00065 (RTT_VERSION_PATCH >= patch))))
00066 #endif
00067
00068 namespace rtt_roscomm {
00069
00074 template <typename T>
00075 struct RosMessageAdapter
00076 {
00077 typedef T OrocosType;
00078 typedef T RosType;
00079 static const RosType &toRos(const OrocosType &t) { return t; }
00080 static const OrocosType &fromRos(const RosType &t) { return t; }
00081 };
00082
00086 template<typename T>
00087 class RosPubChannelElement: public RTT::base::ChannelElement<T>, public RosPublisher
00088 {
00089 typedef RosMessageAdapter<T> adapter;
00090 typedef typename adapter::RosType RosType;
00091
00092 char hostname[1024];
00093 std::string topicname;
00094 ros::NodeHandle ros_node;
00095 ros::NodeHandle ros_node_private;
00096 ros::Publisher ros_pub;
00098 RosPublishActivity::shared_ptr act;
00099
00100 typename RTT::base::ChannelElement<T>::value_t sample;
00101
00102 public:
00103
00114 RosPubChannelElement(RTT::base::PortInterface* port, const RTT::ConnPolicy& policy) :
00115 ros_node(),
00116 ros_node_private("~")
00117 {
00118 if ( policy.name_id.empty() ){
00119 std::stringstream namestr;
00120 gethostname(hostname, sizeof(hostname));
00121
00122 if (port->getInterface() && port->getInterface()->getOwner()) {
00123 namestr << hostname<<'/' << port->getInterface()->getOwner()->getName()
00124 << '/' << port->getName() << '/'<<this << '/' << getpid();
00125 } else {
00126 namestr << hostname<<'/' << port->getName() << '/'<<this << '/' << getpid();
00127 }
00128 policy.name_id = namestr.str();
00129 }
00130 topicname=policy.name_id;
00131 RTT::Logger::In in(topicname);
00132
00133 if (port->getInterface() && port->getInterface()->getOwner()) {
00134 RTT::log(RTT::Debug)<<"Creating ROS publisher for port "<<port->getInterface()->getOwner()->getName()<<"."<<port->getName()<<" on topic "<<policy.name_id<<RTT::endlog();
00135 } else {
00136 RTT::log(RTT::Debug)<<"Creating ROS publisher for port "<<port->getName()<<" on topic "<<policy.name_id<<RTT::endlog();
00137 }
00138
00139
00140 if(topicname.length() > 1 && topicname.at(0) == '~') {
00141 ros_pub = ros_node_private.advertise<RosType>(policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, policy.init);
00142 } else {
00143 ros_pub = ros_node.advertise<RosType>(policy.name_id, policy.size > 0 ? policy.size : 1, policy.init);
00144 }
00145 act = RosPublishActivity::Instance();
00146 act->addPublisher( this );
00147 }
00148
00149 ~RosPubChannelElement() {
00150 RTT::Logger::In in(topicname);
00151
00152 act->removePublisher( this );
00153 }
00154
00160 virtual bool inputReady() {
00161 return true;
00162 }
00163
00171 #if RTT_VERSION_GTE(2,8,99)
00172 virtual RTT::WriteStatus data_sample(typename RTT::base::ChannelElement<T>::param_t sample)
00173 {
00174 this->sample = sample;
00175 return RTT::WriteSuccess;
00176 }
00177 #else
00178 virtual bool data_sample(typename RTT::base::ChannelElement<T>::param_t sample)
00179 {
00180 this->sample = sample;
00181 return true;
00182 }
00183 #endif
00184
00190 bool signal(){
00191
00192
00193 return act->trigger();
00194 }
00195
00196 void publish(){
00197
00198 typename RTT::base::ChannelElement<T>::shared_ptr input = this->getInput();
00199 while( input && (input->read(sample,false) == RTT::NewData) )
00200 write(sample);
00201 }
00202
00203 #if RTT_VERSION_GTE(2,8,99)
00204 RTT::WriteStatus write(typename RTT::base::ChannelElement<T>::param_t sample)
00205 #else
00206 bool write(typename RTT::base::ChannelElement<T>::param_t sample)
00207 #endif
00208 {
00209 ros_pub.publish(adapter::toRos(sample));
00210 #if RTT_VERSION_GTE(2,8,99)
00211 return RTT::WriteSuccess;
00212 #else
00213 return true;
00214 #endif
00215 }
00216
00217 };
00218
00222 template<typename T>
00223 class RosSubChannelElement: public RTT::base::ChannelElement<T>
00224 {
00225 typedef RosMessageAdapter<T> adapter;
00226 typedef typename adapter::RosType RosType;
00227
00228 std::string topicname;
00229 ros::NodeHandle ros_node;
00230 ros::NodeHandle ros_node_private;
00231 ros::Subscriber ros_sub;
00232
00233 public:
00243 RosSubChannelElement(RTT::base::PortInterface* port, const RTT::ConnPolicy& policy) :
00244 ros_node(),
00245 ros_node_private("~")
00246 {
00247 topicname=policy.name_id;
00248 RTT::Logger::In in(topicname);
00249 if (port->getInterface() && port->getInterface()->getOwner()) {
00250 RTT::log(RTT::Debug)<<"Creating ROS subscriber for port "<<port->getInterface()->getOwner()->getName()<<"."<<port->getName()<<" on topic "<<policy.name_id<<RTT::endlog();
00251 } else {
00252 RTT::log(RTT::Debug)<<"Creating ROS subscriber for port "<<port->getName()<<" on topic "<<policy.name_id<<RTT::endlog();
00253 }
00254 if(topicname.length() > 1 && topicname.at(0) == '~') {
00255 ros_sub = ros_node_private.subscribe(policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this);
00256 } else {
00257 ros_sub = ros_node.subscribe(policy.name_id, policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this);
00258 }
00259 }
00260
00261 ~RosSubChannelElement() {
00262 RTT::Logger::In in(topicname);
00263
00264 }
00265
00266 virtual bool inputReady() {
00267 return true;
00268 }
00269
00275 void newData(const RosType& msg){
00276 typename RTT::base::ChannelElement<T>::shared_ptr output = this->getOutput();
00277 if (output)
00278 output->write(adapter::fromRos(msg));
00279 }
00280 };
00281
00282 template <class T>
00283 class RosMsgTransporter : public RTT::types::TypeTransporter
00284 {
00285 virtual RTT::base::ChannelElementBase::shared_ptr createStream (RTT::base::PortInterface *port, const RTT::ConnPolicy &policy, bool is_sender) const{
00286 RTT::base::ChannelElementBase::shared_ptr channel;
00287
00288
00289 if (policy.pull) {
00290 RTT::log(RTT::Error) << "Pull connections are not supported by the ROS message transport." << RTT::endlog();
00291 return RTT::base::ChannelElementBase::shared_ptr();
00292 }
00293
00294
00295 if (!ros::ok()) {
00296 RTT::log(RTT::Error) << "Cannot create ROS message transport because the node is not initialized or already shutting down. Did you import package rtt_rosnode before?" << RTT::endlog();
00297 return RTT::base::ChannelElementBase::shared_ptr();
00298 }
00299
00300 if (is_sender){
00301 channel = new RosPubChannelElement<T>(port, policy);
00302
00303 if (policy.type == RTT::ConnPolicy::UNBUFFERED){
00304 RTT::log(RTT::Debug) << "Creating unbuffered publisher connection for port " << port->getName() << ". This may not be real-time safe!" << RTT::endlog();
00305 return channel;
00306 }
00307
00308 RTT::base::ChannelElementBase::shared_ptr buf = RTT::internal::ConnFactory::buildDataStorage<T>(policy);
00309 if (!buf) return RTT::base::ChannelElementBase::shared_ptr();
00310 #if RTT_VERSION_GTE(2,8,99)
00311 buf->connectTo(channel);
00312 #else
00313 buf->setOutput(channel);
00314 #endif
00315 return buf;
00316
00317 } else {
00318 channel = new RosSubChannelElement<T>(port, policy);
00319
00320 #if !RTT_VERSION_GTE(2,8,99)
00321 RTT::base::ChannelElementBase::shared_ptr buf = RTT::internal::ConnFactory::buildDataStorage<T>(policy);
00322 if (!buf) return RTT::base::ChannelElementBase::shared_ptr();
00323 channel->setOutput(buf);
00324 #endif
00325 }
00326
00327 return channel;
00328 }
00329 };
00330 }
00331 #endif