throttle.cpp
Go to the documentation of this file.
00001 
00002 // throttle will transform a topic to have a limited number of bytes per second
00003 //
00004 // Copyright (C) 2009, Morgan Quigley
00005 //
00006 // Redistribution and use in source and binary forms, with or without
00007 // modification, are permitted provided that the following conditions are met:
00008 //   * Redistributions of source code must retain the above copyright notice,
00009 //     this list of conditions and the following disclaimer.
00010 //   * Redistributions in binary form must reproduce the above copyright
00011 //     notice, this list of conditions and the following disclaimer in the
00012 //     documentation and/or other materials provided with the distribution.
00013 //   * Neither the name of Stanford University nor the names of its
00014 //     contributors may be used to endorse or promote products derived from
00015 //     this software without specific prior written permission.
00016 //
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027 // POSSIBILITY OF SUCH DAMAGE.
00029 
00030 
00031 // this could be made a lot smarter by trying to analyze and predict the
00032 // message stream density, etc., rather than just being greedy and stuffing
00033 // the output as fast as it can. 
00034 
00035 #include <cstdio>
00036 #include <cstdlib>
00037 #include <deque>
00038 #include "topic_tools/shape_shifter.h"
00039 #include "topic_tools/parse.h"
00040 
00041 using std::string;
00042 using std::vector;
00043 using std::deque;
00044 using namespace topic_tools;
00045 
00046 // TODO: move all these globals into a reasonable local scope
00047 ros::NodeHandle *g_node = NULL;
00048 uint32_t g_bps = 0; // bytes per second, not bits!
00049 ros::Duration g_period; // minimum inter-message period
00050 double g_window = 1.0; // 1 second window for starters
00051 bool g_advertised = false;
00052 string g_output_topic;
00053 string g_input_topic;
00054 ros::Publisher g_pub;
00055 ros::Subscriber* g_sub;
00056 bool g_use_messages;
00057 ros::Time g_last_time;
00058 bool g_use_wallclock;
00059 bool g_lazy;
00060 ros::TransportHints g_th;
00061 
00062 class Sent
00063 {
00064 public:
00065   double t;
00066   uint32_t len;
00067   Sent(double _t, uint32_t _len) : t(_t), len(_len) { }
00068 };
00069 deque<Sent> g_sent;
00070 
00071 void conn_cb(const ros::SingleSubscriberPublisher&);
00072 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg);
00073 
00074 void conn_cb(const ros::SingleSubscriberPublisher&)
00075 {
00076   // If we're in lazy subscribe mode, and the first subscriber just
00077   // connected, then subscribe, #3546
00078   if(g_lazy && !g_sub)
00079   {
00080     ROS_DEBUG("lazy mode; resubscribing");
00081     g_sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_input_topic, 10, &in_cb, g_th));
00082   }
00083 }
00084 
00085 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg)
00086 {
00087   if (!g_advertised)
00088   {
00089     // If the input topic is latched, make the output topic latched
00090     bool latch = false;
00091     ros::M_string::iterator it = msg->__connection_header->find("latching");
00092     if((it != msg->__connection_header->end()) && (it->second == "1"))
00093     {
00094       ROS_DEBUG("input topic is latched; latching output topic to match");
00095       latch = true;
00096     }
00097     g_pub = msg->advertise(*g_node, g_output_topic, 10, latch, conn_cb);
00098     g_advertised = true;
00099     printf("advertised as %s\n", g_output_topic.c_str());
00100   }
00101   // If we're in lazy subscribe mode, and nobody's listening, 
00102   // then unsubscribe, #3546.
00103   if(g_lazy && !g_pub.getNumSubscribers())
00104   {
00105     ROS_DEBUG("lazy mode; unsubscribing");
00106     delete g_sub;
00107     g_sub = NULL;
00108   }
00109   else
00110   {
00111     if(g_use_messages)
00112     {
00113       ros::Time now;
00114       if(g_use_wallclock)
00115         now.fromSec(ros::WallTime::now().toSec());
00116       else
00117         now = ros::Time::now();
00118       if((now - g_last_time) > g_period)
00119       {
00120         g_pub.publish(msg);
00121         g_last_time = now;
00122       }
00123     }
00124     else
00125     {
00126       // pop the front of the queue until it's within the window
00127       ros::Time now;
00128       if(g_use_wallclock)
00129         now.fromSec(ros::WallTime::now().toSec());
00130       else
00131         now = ros::Time::now();
00132       const double t = now.toSec();
00133       while (!g_sent.empty() && g_sent.front().t < t - g_window)
00134         g_sent.pop_front();
00135       // sum up how many bytes are in the window
00136       uint32_t bytes = 0;
00137       for (deque<Sent>::iterator i = g_sent.begin(); i != g_sent.end(); ++i)
00138         bytes += i->len;
00139       if (bytes < g_bps)
00140       {
00141         g_pub.publish(msg);
00142         g_sent.push_back(Sent(t, msg->size()));
00143       }
00144     }
00145   }
00146 }
00147 
00148 #define USAGE "\nusage: \n"\
00149            "  throttle messages IN_TOPIC MSGS_PER_SEC [OUT_TOPIC]]\n"\
00150            "OR\n"\
00151            "  throttle bytes IN_TOPIC BYTES_PER_SEC WINDOW [OUT_TOPIC]]\n\n"\
00152            "  This program will drop messages from IN_TOPIC so that either: the \n"\
00153            "  average bytes per second on OUT_TOPIC, averaged over WINDOW \n"\
00154            "  seconds, remains below BYTES_PER_SEC, or: the minimum inter-message\n"\
00155            "  period is 1/MSGS_PER_SEC. The messages are output \n"\
00156            "  to OUT_TOPIC, or (if not supplied), to IN_TOPIC_throttle.\n\n"
00157 
00158 int main(int argc, char **argv)
00159 {
00160   if(argc < 3)
00161   {
00162     puts(USAGE);
00163     return 1;
00164   }
00165 
00166   g_input_topic = string(argv[2]);
00167 
00168   std::string topic_name;
00169   if(!getBaseName(string(argv[2]), topic_name))
00170     return 1;
00171 
00172   ros::init(argc, argv, topic_name + string("_throttle"),
00173             ros::init_options::AnonymousName);
00174   bool unreliable = false;
00175   ros::NodeHandle pnh("~");
00176   pnh.getParam("wall_clock", g_use_wallclock);
00177   pnh.getParam("unreliable", unreliable);
00178   pnh.getParam("lazy", g_lazy);
00179 
00180   if (unreliable)
00181     g_th.unreliable().reliable(); // Prefers unreliable, but will accept reliable.
00182 
00183   if(!strcmp(argv[1], "messages"))
00184     g_use_messages = true;
00185   else if(!strcmp(argv[1], "bytes"))
00186     g_use_messages = false;
00187   else
00188   {
00189     puts(USAGE);
00190     return 1;
00191   }
00192 
00193   if(g_use_messages && argc == 5)
00194     g_output_topic = string(argv[4]);
00195   else if(!g_use_messages && argc == 6)
00196     g_output_topic = string(argv[5]);
00197   else
00198     g_output_topic = g_input_topic + "_throttle";
00199 
00200   if(g_use_messages)
00201   {
00202     if(argc < 4)
00203     {
00204       puts(USAGE);
00205       return 1;
00206     }
00207     g_period = ros::Duration(1.0/atof(argv[3]));
00208   }
00209   else
00210   {
00211     if(argc < 5)
00212     {
00213       puts(USAGE);
00214       return 1;
00215     }
00216     g_bps = atoi(argv[3]);
00217     g_window = atof(argv[4]);
00218   }
00219 
00220   ros::NodeHandle n;
00221   g_node = &n;
00222   g_sub = new ros::Subscriber(n.subscribe<ShapeShifter>(g_input_topic, 10, &in_cb, g_th));
00223   ros::spin();
00224   return 0;
00225 }
00226 


topic_tools
Author(s): Morgan Quigley, Brian Gerkey
autogenerated on Sat Dec 28 2013 17:36:13