nodelet_mux.h
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2010, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNERff OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  *
00034  * $Id$
00035  *
00036  */
00037 #ifndef NODELET_NODELET_MUX_H_
00038 #define NODELET_NODELET_MUX_H_
00039 
00040 #include <ros/ros.h>
00041 #include <nodelet/nodelet.h>
00042 #include <message_filters/time_synchronizer.h>
00043 #include <message_filters/pass_through.h>
00044 
00045 namespace nodelet
00046 {
00051   template <typename T, typename Filter>
00052   class NodeletMUX: public Nodelet
00053   {
00054     typedef typename boost::shared_ptr<T> TPtr;
00055     typedef typename boost::shared_ptr<const T> TConstPtr;
00056 
00057     public:
00058       NodeletMUX () : maximum_queue_size_ (3) {}
00059 
00061 
00062       virtual void
00063         onInit ()
00064       {
00065         private_nh_ = getMTPrivateNodeHandle ();
00066         pub_output_ = private_nh_.template advertise<T> ("output", 1);
00067 
00068         XmlRpc::XmlRpcValue input_topics;
00069         if (!private_nh_.getParam ("input_topics", input_topics))
00070         {
00071           ROS_ERROR ("[nodelet::NodeletMUX::init] Need a 'input_topics' parameter to be set before continuing!");
00072           return;
00073         }
00074 
00075         private_nh_.getParam ("max_queue_size", maximum_queue_size_);
00076 
00077         // Check the type
00078         switch (input_topics.getType ())
00079         {
00080           case XmlRpc::XmlRpcValue::TypeArray:
00081           {
00082             if (input_topics.size () == 1)
00083             {
00084               ROS_ERROR ("[nodelet::NodeletMUX::init] Only one topic given. Does it make sense to passthrough?");
00085               return;
00086             }
00087 
00088             if (input_topics.size () > 8)
00089             {
00090               ROS_ERROR ("[nodelet::NodeletMUX::init] More than 8 topics passed!");
00091               return;
00092              }
00093 
00094             ROS_INFO_STREAM ("[nodelet::NodeletMUX::init] Subscribing to " << input_topics.size () << " user given topics as inputs:");
00095             for (int d = 0; d < input_topics.size (); ++d)
00096               ROS_INFO_STREAM (" - " << (std::string)(input_topics[d]));
00097 
00098             // Subscribe to the filters
00099             filters_.resize (input_topics.size ());
00100             for (int d = 0; d < input_topics.size (); ++d)
00101             {
00102               filters_[d].reset (new Filter ());
00103               filters_[d]->subscribe (private_nh_, (std::string)(input_topics[d]), 1);
00104             }
00105 
00106             // Subscribe to 1 callback to fill in the passthrough
00107             filters_[0]->registerCallback (boost::bind (&NodeletMUX<T,Filter>::filter_cb, this, _1));
00108 
00109             ts_.reset (new message_filters::TimeSynchronizer<T,T,T,T,T,T,T,T> (maximum_queue_size_));
00110 
00111             switch (input_topics.size ())
00112             {
00113               case 2:
00114               {
00115                 ts_->connectInput (*filters_[0], *filters_[1], nf_, nf_, nf_, nf_, nf_, nf_);
00116                 break;
00117               }
00118               case 3:
00119               {
00120                 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], nf_, nf_, nf_, nf_, nf_);
00121                 break;
00122               }
00123               case 4:
00124               {
00125                 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], nf_, nf_, nf_, nf_);
00126                 break;
00127               }
00128               case 5:
00129               {
00130                 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], nf_, nf_, nf_);
00131                 break;
00132               }
00133               case 6:
00134               {
00135                 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], *filters_[5], nf_, nf_);
00136                 break;
00137               }
00138               case 7:
00139               {
00140                 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], *filters_[5], *filters_[6], nf_);
00141                 break;
00142               }
00143               case 8:
00144               {
00145                 ts_->connectInput (*filters_[0], *filters_[1], *filters_[2], *filters_[3], *filters_[4], *filters_[5], *filters_[6], *filters_[7]);
00146                 break;
00147               }
00148               default:
00149               {
00150                 ROS_ERROR ("[nodelet::NodeletMUX::init] Invalid 'input_topics' parameter given!");
00151                 return;
00152               }
00153              }
00154             break;
00155           }
00156           default:
00157           {
00158             ROS_ERROR ("[nodelet::NodeletMUX::init] Invalid 'input_topics' parameter given!");
00159             return;
00160           }
00161         }
00162 
00163         ts_->registerCallback (boost::bind (&NodeletMUX<T,Filter>::input, this, _1, _2, _3, _4, _5, _6, _7, _8));
00164       }
00165 
00166     private:
00167 
00168       void
00169       filter_cb (const TConstPtr &input)
00170       {
00171         nf_.add (input);
00172       }
00173 
00174       void
00175       input (const TConstPtr &in1, const TConstPtr &in2, const TConstPtr &in3, const TConstPtr &in4,
00176              const TConstPtr &in5, const TConstPtr &in6, const TConstPtr &in7, const TConstPtr &in8)
00177       {
00178         pub_output_.publish (in1); pub_output_.publish (in2); pub_output_.publish (in3); pub_output_.publish (in4);
00179         pub_output_.publish (in5); pub_output_.publish (in6); pub_output_.publish (in7); pub_output_.publish (in8);
00180       }
00181 
00183       ros::NodeHandle private_nh_;
00185       ros::Publisher pub_output_;
00186 
00188       message_filters::PassThrough<T> nf_;
00189 
00191       int maximum_queue_size_;
00192 
00194       std::vector<boost::shared_ptr<Filter> > filters_;
00195 
00197       boost::shared_ptr<message_filters::TimeSynchronizer<T,T,T,T,T,T,T,T> > ts_;
00198   };
00199 
00200 }
00201 #endif


nodelet_topic_tools
Author(s): Radu Bogdan Rusu, Tully Foote
autogenerated on Sun Feb 17 2019 03:43:54