00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #ifndef NODELET_NODELET_MUX_H_
00038 #define NODELET_NODELET_MUX_H_
00039
00040 #include <ros/ros.h>
00041 #include <nodelet/nodelet.h>
00042 #include <message_filters/time_synchronizer.h>
00043 #include <message_filters/pass_through.h>
00044
00045 namespace nodelet
00046 {
00051 template <typename T, typename Filter>
00052 class NodeletMUX: public Nodelet
00053 {
00054 typedef typename boost::shared_ptr<T> TPtr;
00055 typedef typename boost::shared_ptr<const T> TConstPtr;
00056
00057 public:
00058 NodeletMUX () : maximum_queue_size_ (3) {}
00059
00061
00062 virtual void
00063 onInit ()
00064 {
00065 private_nh_ = getMTPrivateNodeHandle ();
00066 pub_output_ = private_nh_.template advertise<T> ("output", 1);
00067
00068 XmlRpc::XmlRpcValue input_topics;
00069 if (!private_nh_.getParam ("input_topics", input_topics))
00070 {
00071 ROS_ERROR ("[nodelet::NodeletMUX::init] Need a 'input_topics' parameter to be set before continuing!");
00072 return;
00073 }
00074
00075 private_nh_.getParam ("max_queue_size", maximum_queue_size_);
00076
00077
00078 switch (input_topics.getType ())
00079 {
00080 case XmlRpc::XmlRpcValue::TypeArray:
00081 {
00082 if (input_topics.size () == 1)
00083 {
00084 ROS_ERROR ("[nodelet::NodeletMUX::init] Only one topic given. Does it make sense to passthrough?");
00085 return;
00086 }
00087
00088 if (input_topics.size () > 8)
00089 {
00090 ROS_ERROR ("[nodelet::NodeletMUX::init] More than 8 topics passed!");
00091 return;
00092 }
00093
00094 ROS_INFO_STREAM ("[nodelet::NodeletMUX::init] Subscribing to " << input_topics.size () << " user given topics as inputs:");
00095 for (int d = 0; d < input_topics.size (); ++d)
00096 ROS_INFO_STREAM (" - " << (std::string)(input_topics[d]));
00097
00098
00099 filters_.resize (input_topics.size ());
00100 for (int d = 0; d < input_topics.size (); ++d)
00101 {
00102 filters_[d].reset (new Filter ());
00103 filters_[d]->subscribe (private_nh_, (std::string)(input_topics[d]), 1);
00104 }
00105
00106
00107 filters_[0]->registerCallback (boost::bind (&NodeletMUX<T,Filter>::filter_cb, this, _1));
00108
00109 ts_.reset (new message_filters::TimeSynchronizer<T,T,T,T,T,T,T,T> (maximum_queue_size_));
00110
00111 switch (input_topics.size ())
00112 {
00113 case 2:
00114 {
00115 ts_->connectInput (*filters_[0], *filters_[1], nf_, nf_, nf_, nf_, nf_, nf_);
00116 break;
00117 }
00118 case 3:
00119 {
00120 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], nf_, nf_, nf_, nf_, nf_);
00121 break;
00122 }
00123 case 4:
00124 {
00125 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], nf_, nf_, nf_, nf_);
00126 break;
00127 }
00128 case 5:
00129 {
00130 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], nf_, nf_, nf_);
00131 break;
00132 }
00133 case 6:
00134 {
00135 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], *filters_[5], nf_, nf_);
00136 break;
00137 }
00138 case 7:
00139 {
00140 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], *filters_[5], *filters_[6], nf_);
00141 break;
00142 }
00143 case 8:
00144 {
00145 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], *filters_[5], *filters_[6], *filters_[7]);
00146 break;
00147 }
00148 default:
00149 {
00150 ROS_ERROR ("[nodelet::NodeletMUX::init] Invalid 'input_topics' parameter given!");
00151 return;
00152 }
00153 }
00154 break;
00155 }
00156 default:
00157 {
00158 ROS_ERROR ("[nodelet::NodeletMUX::init] Invalid 'input_topics' parameter given!");
00159 return;
00160 }
00161 }
00162
00163 ts_->registerCallback (boost::bind (&NodeletMUX<T,Filter>::input, this, _1, _2, _3, _4, _5, _6, _7, _8));
00164 }
00165
00166 private:
00167
00168 void
00169 filter_cb (const TConstPtr &input)
00170 {
00171 nf_.add (input);
00172 }
00173
00174 void
00175 input (const TConstPtr &in1, const TConstPtr &in2, const TConstPtr &in3, const TConstPtr &in4,
00176 const TConstPtr &in5, const TConstPtr &in6, const TConstPtr &in7, const TConstPtr &in8)
00177 {
00178 pub_output_.publish (in1); pub_output_.publish (in2); pub_output_.publish (in3); pub_output_.publish (in4);
00179 pub_output_.publish (in5); pub_output_.publish (in6); pub_output_.publish (in7); pub_output_.publish (in8);
00180 }
00181
00183 ros::NodeHandle private_nh_;
00185 ros::Publisher pub_output_;
00186
00188 message_filters::PassThrough<T> nf_;
00189
00191 int maximum_queue_size_;
00192
00194 std::vector<boost::shared_ptr<Filter> > filters_;
00195
00197 boost::shared_ptr<message_filters::TimeSynchronizer<T,T,T,T,T,T,T,T> > ts_;
00198 };
00199
00200 }
00201 #endif