00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #ifndef NODELET_NODELET_MUX_H_
00038 #define NODELET_NODELET_MUX_H_
00039
00040 #include <ros/ros.h>
00041 #include <nodelet/nodelet.h>
00042 #include <message_filters/time_synchronizer.h>
00043 #include <message_filters/pass_through.h>
00044
00045 namespace nodelet
00046 {
00051 template <typename T, typename Filter>
00052 class NodeletMUX: public Nodelet
00053 {
00054 typedef typename boost::shared_ptr<T> TPtr;
00055 typedef typename boost::shared_ptr<const T> TConstPtr;
00056
00057 public:
00058
00059 NodeletMUX () : maximum_queue_size_ (3) {}
00060
00062
00063 virtual void
00064 onInit ()
00065 {
00066 private_nh_ = getMTPrivateNodeHandle ();
00067 pub_output_ = private_nh_.template advertise<T> ("output", 1);
00068
00069 XmlRpc::XmlRpcValue input_topics;
00070 if (!private_nh_.getParam ("input_topics", input_topics))
00071 {
00072 ROS_ERROR ("[nodelet::NodeletMUX::init] Need a 'input_topics' parameter to be set before continuing!");
00073 return;
00074 }
00075
00076 private_nh_.getParam ("max_queue_size", maximum_queue_size_);
00077
00078
00079 switch (input_topics.getType ())
00080 {
00081 case XmlRpc::XmlRpcValue::TypeArray:
00082 {
00083 if (input_topics.size () == 1)
00084 {
00085 ROS_ERROR ("[nodelet::NodeletMUX::init] Only one topic given. Does it make sense to passthrough?");
00086 return;
00087 }
00088
00089 if (input_topics.size () > 8)
00090 {
00091 ROS_ERROR ("[nodelet::NodeletMUX::init] More than 8 topics passed!");
00092 return;
00093 }
00094
00095 ROS_INFO_STREAM ("[nodelet::NodeletMUX::init] Subscribing to " << input_topics.size () << " user given topics as inputs:");
00096 for (int d = 0; d < input_topics.size (); ++d)
00097 ROS_INFO_STREAM (" - " << (std::string)(input_topics[d]));
00098
00099
00100 filters_.resize (input_topics.size ());
00101 for (int d = 0; d < input_topics.size (); ++d)
00102 {
00103 filters_[d].reset (new Filter ());
00104 filters_[d]->subscribe (private_nh_, (std::string)(input_topics[d]), 1);
00105 }
00106
00107
00108 filters_[0]->registerCallback (boost::bind (&NodeletMUX<T,Filter>::filter_cb, this, _1));
00109
00110 ts_.reset (new message_filters::TimeSynchronizer<T,T,T,T,T,T,T,T> (maximum_queue_size_));
00111
00112 switch (input_topics.size ())
00113 {
00114 case 2:
00115 {
00116 ts_->connectInput (*filters_[0], *filters_[1], nf_, nf_, nf_, nf_, nf_, nf_);
00117 break;
00118 }
00119 case 3:
00120 {
00121 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], nf_, nf_, nf_, nf_, nf_);
00122 break;
00123 }
00124 case 4:
00125 {
00126 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], nf_, nf_, nf_, nf_);
00127 break;
00128 }
00129 case 5:
00130 {
00131 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], nf_, nf_, nf_);
00132 break;
00133 }
00134 case 6:
00135 {
00136 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], *filters_[5], nf_, nf_);
00137 break;
00138 }
00139 case 7:
00140 {
00141 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], *filters_[5], *filters_[6], nf_);
00142 break;
00143 }
00144 case 8:
00145 {
00146 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], *filters_[5], *filters_[6], *filters_[7]);
00147 break;
00148 }
00149 default:
00150 {
00151 ROS_ERROR ("[nodelet::NodeletMUX::init] Invalid 'input_topics' parameter given!");
00152 return;
00153 }
00154 }
00155 break;
00156 }
00157 default:
00158 {
00159 ROS_ERROR ("[nodelet::NodeletMUX::init] Invalid 'input_topics' parameter given!");
00160 return;
00161 }
00162 }
00163
00164 ts_->registerCallback (boost::bind (&NodeletMUX<T,Filter>::input, this, _1, _2, _3, _4, _5, _6, _7, _8));
00165 }
00166
00167 private:
00168
00169 void
00170 filter_cb (const TConstPtr &input)
00171 {
00172 nf_.add (input);
00173 }
00174
00175 void
00176 input (const TConstPtr &in1, const TConstPtr &in2, const TConstPtr &in3, const TConstPtr &in4,
00177 const TConstPtr &in5, const TConstPtr &in6, const TConstPtr &in7, const TConstPtr &in8)
00178 {
00179 pub_output_.publish (in1); pub_output_.publish (in2); pub_output_.publish (in3); pub_output_.publish (in4);
00180 pub_output_.publish (in5); pub_output_.publish (in6); pub_output_.publish (in7); pub_output_.publish (in8);
00181 }
00182
00184 ros::NodeHandle private_nh_;
00186 ros::Publisher pub_output_;
00187
00189 message_filters::PassThrough<T> nf_;
00190
00192 int maximum_queue_size_;
00193
00195 std::vector<boost::shared_ptr<Filter> > filters_;
00196
00198 boost::shared_ptr<message_filters::TimeSynchronizer<T,T,T,T,T,T,T,T> > ts_;
00199 };
00200
00201 }
00202 #endif