00001 00002 #include <ros/ros.h> 00003 #include <pluginlib/class_list_macros.h> 00004 #include <nodelet/nodelet.h> 00005 00006 #include <pcl/point_cloud.h> 00007 #include <pcl/point_types.h> 00008 #include <pcl_conversions/pcl_conversions.h> 00009 00010 #include <pcl_ros/transforms.h> 00011 00012 #include <tf/transform_listener.h> 00013 00014 #include <sensor_msgs/PointCloud2.h> 00015 00016 #include <image_transport/image_transport.h> 00017 #include <image_transport/subscriber_filter.h> 00018 00019 #include <message_filters/sync_policies/approximate_time.h> 00020 #include <message_filters/subscriber.h> 00021 #include <message_filters/sync_policies/exact_time.h> 00022 00023 #include <rtabmap_ros/MsgConversion.h> 00024 #include <rtabmap/utilite/UConversion.h> 00025 00026 namespace rtabmap_ros 00027 { 00028 00029 class PointCloudAggregator : public nodelet::Nodelet 00030 { 00031 public: 00032 PointCloudAggregator() : 00033 warningThread_(0), 00034 callbackCalled_(false), 00035 exactSync4_(0), 00036 approxSync4_(0), 00037 exactSync3_(0), 00038 approxSync3_(0), 00039 exactSync2_(0), 00040 approxSync2_(0) 00041 {} 00042 00043 virtual ~PointCloudAggregator() 00044 { 00045 if (exactSync4_!=0) delete exactSync4_; 00046 if (approxSync4_!=0) delete approxSync4_; 00047 if (exactSync3_!=0) delete exactSync3_; 00048 if (approxSync3_!=0) delete approxSync3_; 00049 if (exactSync2_!=0) delete exactSync2_; 00050 if (approxSync2_!=0) delete approxSync2_; 00051 00052 if(warningThread_) 00053 { 00054 callbackCalled_=true; 00055 warningThread_->join(); 00056 delete warningThread_; 00057 } 00058 } 00059 00060 private: 00061 virtual void onInit() 00062 { 00063 ros::NodeHandle & nh = getNodeHandle(); 00064 ros::NodeHandle & pnh = getPrivateNodeHandle(); 00065 00066 int queueSize = 5; 00067 int count = 2; 00068 bool approx=true; 00069 pnh.param("queue_size", queueSize, queueSize); 00070 pnh.param("frame_id", frameId_, frameId_); 00071 pnh.param("approx_sync", approx, approx); 00072 pnh.param("count", count, count); 00073 00074 cloudSub_1_.subscribe(nh, "cloud1", 1); 00075 cloudSub_2_.subscribe(nh, "cloud2", 1); 00076 00077 std::string subscribedTopicsMsg; 00078 if(count == 4) 00079 { 00080 cloudSub_3_.subscribe(nh, "cloud3", 1); 00081 cloudSub_4_.subscribe(nh, "cloud4", 1); 00082 if(approx) 00083 { 00084 approxSync4_ = new message_filters::Synchronizer<ApproxSync4Policy>(ApproxSync4Policy(queueSize), cloudSub_1_, cloudSub_2_, cloudSub_3_, cloudSub_4_); 00085 approxSync4_->registerCallback(boost::bind(&rtabmap_ros::PointCloudAggregator::clouds4_callback, this, _1, _2, _3, _4)); 00086 } 00087 else 00088 { 00089 exactSync4_ = new message_filters::Synchronizer<ExactSync4Policy>(ExactSync4Policy(queueSize), cloudSub_1_, cloudSub_2_, cloudSub_3_, cloudSub_4_); 00090 exactSync4_->registerCallback(boost::bind(&rtabmap_ros::PointCloudAggregator::clouds4_callback, this, _1, _2, _3, _4)); 00091 } 00092 subscribedTopicsMsg = uFormat("\n%s subscribed to (%s sync):\n %s,\n %s,\n %s,\n %s", 00093 getName().c_str(), 00094 approx?"approx":"exact", 00095 cloudSub_1_.getTopic().c_str(), 00096 cloudSub_2_.getTopic().c_str(), 00097 cloudSub_3_.getTopic().c_str(), 00098 cloudSub_4_.getTopic().c_str()); 00099 } 00100 else if(count == 3) 00101 { 00102 cloudSub_3_.subscribe(nh, "cloud3", 1); 00103 if(approx) 00104 { 00105 approxSync3_ = new message_filters::Synchronizer<ApproxSync3Policy>(ApproxSync3Policy(queueSize), cloudSub_1_, cloudSub_2_, cloudSub_3_); 00106 approxSync3_->registerCallback(boost::bind(&rtabmap_ros::PointCloudAggregator::clouds3_callback, this, _1, _2, _3)); 00107 } 00108 else 00109 { 00110 exactSync3_ = new message_filters::Synchronizer<ExactSync3Policy>(ExactSync3Policy(queueSize), cloudSub_1_, cloudSub_2_, cloudSub_3_); 00111 exactSync3_->registerCallback(boost::bind(&rtabmap_ros::PointCloudAggregator::clouds3_callback, this, _1, _2, _3)); 00112 } 00113 subscribedTopicsMsg = uFormat("\n%s subscribed to (%s sync):\n %s,\n %s,\n %s", 00114 getName().c_str(), 00115 approx?"approx":"exact", 00116 cloudSub_1_.getTopic().c_str(), 00117 cloudSub_2_.getTopic().c_str(), 00118 cloudSub_3_.getTopic().c_str()); 00119 } 00120 else 00121 { 00122 if(approx) 00123 { 00124 approxSync2_ = new message_filters::Synchronizer<ApproxSync2Policy>(ApproxSync2Policy(queueSize), cloudSub_1_, cloudSub_2_); 00125 approxSync2_->registerCallback(boost::bind(&rtabmap_ros::PointCloudAggregator::clouds2_callback, this, _1, _2)); 00126 } 00127 else 00128 { 00129 exactSync2_ = new message_filters::Synchronizer<ExactSync2Policy>(ExactSync2Policy(queueSize), cloudSub_1_, cloudSub_2_); 00130 exactSync2_->registerCallback(boost::bind(&rtabmap_ros::PointCloudAggregator::clouds2_callback, this, _1, _2)); 00131 } 00132 subscribedTopicsMsg = uFormat("\n%s subscribed to (%s sync):\n %s,\n %s", 00133 getName().c_str(), 00134 approx?"approx":"exact", 00135 cloudSub_1_.getTopic().c_str(), 00136 cloudSub_2_.getTopic().c_str()); 00137 } 00138 00139 cloudPub_ = nh.advertise<sensor_msgs::PointCloud2>("combined_cloud", 1); 00140 00141 warningThread_ = new boost::thread(boost::bind(&PointCloudAggregator::warningLoop, this, subscribedTopicsMsg, approx)); 00142 NODELET_INFO("%s", subscribedTopicsMsg.c_str()); 00143 } 00144 00145 void clouds4_callback(const sensor_msgs::PointCloud2ConstPtr & cloudMsg_1, 00146 const sensor_msgs::PointCloud2ConstPtr & cloudMsg_2, 00147 const sensor_msgs::PointCloud2ConstPtr & cloudMsg_3, 00148 const sensor_msgs::PointCloud2ConstPtr & cloudMsg_4) 00149 { 00150 std::vector<sensor_msgs::PointCloud2ConstPtr> clouds; 00151 clouds.push_back(cloudMsg_1); 00152 clouds.push_back(cloudMsg_2); 00153 clouds.push_back(cloudMsg_3); 00154 clouds.push_back(cloudMsg_4); 00155 00156 combineClouds(clouds); 00157 } 00158 void clouds3_callback(const sensor_msgs::PointCloud2ConstPtr & cloudMsg_1, 00159 const sensor_msgs::PointCloud2ConstPtr & cloudMsg_2, 00160 const sensor_msgs::PointCloud2ConstPtr & cloudMsg_3) 00161 { 00162 std::vector<sensor_msgs::PointCloud2ConstPtr> clouds; 00163 clouds.push_back(cloudMsg_1); 00164 clouds.push_back(cloudMsg_2); 00165 clouds.push_back(cloudMsg_3); 00166 00167 combineClouds(clouds); 00168 } 00169 void clouds2_callback(const sensor_msgs::PointCloud2ConstPtr & cloudMsg_1, 00170 const sensor_msgs::PointCloud2ConstPtr & cloudMsg_2) 00171 { 00172 std::vector<sensor_msgs::PointCloud2ConstPtr> clouds; 00173 clouds.push_back(cloudMsg_1); 00174 clouds.push_back(cloudMsg_2); 00175 00176 combineClouds(clouds); 00177 } 00178 void combineClouds(const std::vector<sensor_msgs::PointCloud2ConstPtr> & cloudMsgs) 00179 { 00180 callbackCalled_ = true; 00181 ROS_ASSERT(cloudMsgs.size() > 1); 00182 if(cloudPub_.getNumSubscribers()) 00183 { 00184 pcl::PCLPointCloud2 output; 00185 00186 std::string frameId = frameId_; 00187 if(!frameId.empty() && frameId.compare(cloudMsgs[0]->header.frame_id) != 0) 00188 { 00189 sensor_msgs::PointCloud2 tmp; 00190 pcl_ros::transformPointCloud(frameId, *cloudMsgs[0], tmp, tfListener_); 00191 pcl_conversions::toPCL(tmp, output); 00192 } 00193 else 00194 { 00195 pcl_conversions::toPCL(*cloudMsgs[0], output); 00196 frameId = cloudMsgs[0]->header.frame_id; 00197 } 00198 00199 for(unsigned int i=1; i<cloudMsgs.size(); ++i) 00200 { 00201 pcl::PCLPointCloud2 cloud2; 00202 if(frameId.compare(cloudMsgs[i]->header.frame_id) != 0) 00203 { 00204 sensor_msgs::PointCloud2 tmp; 00205 pcl_ros::transformPointCloud(frameId, *cloudMsgs[i], tmp, tfListener_); 00206 pcl_conversions::toPCL(tmp, cloud2); 00207 } 00208 else 00209 { 00210 pcl_conversions::toPCL(*cloudMsgs[i], cloud2); 00211 frameId = cloudMsgs[i]->header.frame_id; 00212 } 00213 00214 pcl::PCLPointCloud2 tmp_output; 00215 pcl::concatenatePointCloud(output, cloud2, tmp_output); 00216 output = tmp_output; 00217 } 00218 00219 sensor_msgs::PointCloud2 rosCloud; 00220 pcl_conversions::moveFromPCL(output, rosCloud); 00221 rosCloud.header.stamp = cloudMsgs[0]->header.stamp; 00222 rosCloud.header.frame_id = frameId; 00223 cloudPub_.publish(rosCloud); 00224 } 00225 } 00226 00227 void warningLoop(const std::string & subscribedTopicsMsg, bool approxSync) 00228 { 00229 ros::Duration r(5.0); 00230 while(!callbackCalled_) 00231 { 00232 r.sleep(); 00233 if(!callbackCalled_) 00234 { 00235 ROS_WARN("%s: Did not receive data since 5 seconds! Make sure the input topics are " 00236 "published (\"$ rostopic hz my_topic\") and the timestamps in their " 00237 "header are set. %s%s", 00238 getName().c_str(), 00239 approxSync?"":"Parameter \"approx_sync\" is false, which means that input " 00240 "topics should have all the exact timestamp for the callback to be called.", 00241 subscribedTopicsMsg.c_str()); 00242 } 00243 } 00244 } 00245 00246 boost::thread * warningThread_; 00247 bool callbackCalled_; 00248 00249 typedef message_filters::sync_policies::ExactTime<sensor_msgs::PointCloud2, sensor_msgs::PointCloud2, sensor_msgs::PointCloud2, sensor_msgs::PointCloud2> ExactSync4Policy; 00250 typedef message_filters::sync_policies::ApproximateTime<sensor_msgs::PointCloud2, sensor_msgs::PointCloud2, sensor_msgs::PointCloud2, sensor_msgs::PointCloud2> ApproxSync4Policy; 00251 typedef message_filters::sync_policies::ExactTime<sensor_msgs::PointCloud2, sensor_msgs::PointCloud2, sensor_msgs::PointCloud2> ExactSync3Policy; 00252 typedef message_filters::sync_policies::ApproximateTime<sensor_msgs::PointCloud2, sensor_msgs::PointCloud2, sensor_msgs::PointCloud2> ApproxSync3Policy; 00253 typedef message_filters::sync_policies::ExactTime<sensor_msgs::PointCloud2, sensor_msgs::PointCloud2> ExactSync2Policy; 00254 typedef message_filters::sync_policies::ApproximateTime<sensor_msgs::PointCloud2, sensor_msgs::PointCloud2> ApproxSync2Policy; 00255 message_filters::Synchronizer<ExactSync4Policy>* exactSync4_; 00256 message_filters::Synchronizer<ApproxSync4Policy>* approxSync4_; 00257 message_filters::Synchronizer<ExactSync3Policy>* exactSync3_; 00258 message_filters::Synchronizer<ApproxSync3Policy>* approxSync3_; 00259 message_filters::Synchronizer<ExactSync2Policy>* exactSync2_; 00260 message_filters::Synchronizer<ApproxSync2Policy>* approxSync2_; 00261 message_filters::Subscriber<sensor_msgs::PointCloud2> cloudSub_1_; 00262 message_filters::Subscriber<sensor_msgs::PointCloud2> cloudSub_2_; 00263 message_filters::Subscriber<sensor_msgs::PointCloud2> cloudSub_3_; 00264 message_filters::Subscriber<sensor_msgs::PointCloud2> cloudSub_4_; 00265 00266 ros::Publisher cloudPub_; 00267 00268 std::string frameId_; 00269 tf::TransformListener tfListener_; 00270 }; 00271 00272 PLUGINLIB_EXPORT_CLASS(rtabmap_ros::PointCloudAggregator, nodelet::Nodelet); 00273 } 00274