throttle.cpp
Go to the documentation of this file.
00001 
00002 // throttle will transform a topic to have a limited number of bytes per second
00003 //
00004 // Copyright (C) 2009, Morgan Quigley
00005 //
00006 // Redistribution and use in source and binary forms, with or without
00007 // modification, are permitted provided that the following conditions are met:
00008 //   * Redistributions of source code must retain the above copyright notice,
00009 //     this list of conditions and the following disclaimer.
00010 //   * Redistributions in binary form must reproduce the above copyright
00011 //     notice, this list of conditions and the following disclaimer in the
00012 //     documentation and/or other materials provided with the distribution.
00013 //   * Neither the name of Stanford University nor the names of its
00014 //     contributors may be used to endorse or promote products derived from
00015 //     this software without specific prior written permission.
00016 //
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027 // POSSIBILITY OF SUCH DAMAGE.
00029 
00030 
00031 // this could be made a lot smarter by trying to analyze and predict the
00032 // message stream density, etc., rather than just being greedy and stuffing
00033 // the output as fast as it can. 
00034 
00035 #include <cstdio>
00036 #include <cstdlib>
00037 #include <deque>
00038 #include "topic_tools/shape_shifter.h"
00039 #include "topic_tools/parse.h"
00040 
00041 using std::string;
00042 using std::vector;
00043 using std::deque;
00044 using namespace topic_tools;
00045 
00046 // TODO: move all these globals into a reasonable local scope
00047 ros::NodeHandle *g_node = NULL;
00048 uint32_t g_bps = 0; // bytes per second, not bits!
00049 ros::Duration g_period; // minimum inter-message period
00050 double g_window = 1.0; // 1 second window for starters
00051 bool g_advertised = false;
00052 string g_output_topic;
00053 string g_input_topic;
00054 ros::Publisher g_pub;
00055 ros::Subscriber* g_sub;
00056 bool g_use_messages;
00057 ros::Time g_last_time;
00058 bool g_use_wallclock;
00059 bool g_lazy;
00060 ros::TransportHints g_th;
00061 
00062 class Sent
00063 {
00064 public:
00065   double t;
00066   uint32_t len;
00067   Sent(double _t, uint32_t _len) : t(_t), len(_len) { }
00068 };
00069 deque<Sent> g_sent;
00070 
00071 void conn_cb(const ros::SingleSubscriberPublisher&);
00072 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event);
00073 
00074 void subscribe()
00075 {
00076   g_sub = new ros::Subscriber(g_node->subscribe(g_input_topic, 10, &in_cb, g_th));
00077 }
00078 
00079 void conn_cb(const ros::SingleSubscriberPublisher&)
00080 {
00081   // If we're in lazy subscribe mode, and the first subscriber just
00082   // connected, then subscribe, #3546
00083   if(g_lazy && !g_sub)
00084   {
00085     ROS_DEBUG("lazy mode; resubscribing");
00086     subscribe();
00087   }
00088 }
00089 
00090 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event)
00091 {
00092   boost::shared_ptr<ShapeShifter const> const &msg = msg_event.getConstMessage();
00093   boost::shared_ptr<const ros::M_string> const& connection_header = msg_event.getConnectionHeaderPtr();
00094 
00095   if (!g_advertised)
00096   {
00097     // If the input topic is latched, make the output topic latched
00098     bool latch = false;
00099     if (connection_header)
00100     {
00101       ros::M_string::const_iterator it = connection_header->find("latching");
00102       if((it != connection_header->end()) && (it->second == "1"))
00103       {
00104         ROS_DEBUG("input topic is latched; latching output topic to match");
00105         latch = true;
00106       }
00107     }
00108     g_pub = msg->advertise(*g_node, g_output_topic, 10, latch, conn_cb);
00109     g_advertised = true;
00110     printf("advertised as %s\n", g_output_topic.c_str());
00111   }
00112   // If we're in lazy subscribe mode, and nobody's listening, 
00113   // then unsubscribe, #3546.
00114   if(g_lazy && !g_pub.getNumSubscribers())
00115   {
00116     ROS_DEBUG("lazy mode; unsubscribing");
00117     delete g_sub;
00118     g_sub = NULL;
00119   }
00120   else
00121   {
00122     if(g_use_messages)
00123     {
00124       ros::Time now;
00125       if(g_use_wallclock)
00126         now.fromSec(ros::WallTime::now().toSec());
00127       else
00128         now = ros::Time::now();
00129       if((now - g_last_time) > g_period)
00130       {
00131         g_pub.publish(msg);
00132         g_last_time = now;
00133       }
00134     }
00135     else
00136     {
00137       // pop the front of the queue until it's within the window
00138       ros::Time now;
00139       if(g_use_wallclock)
00140         now.fromSec(ros::WallTime::now().toSec());
00141       else
00142         now = ros::Time::now();
00143       const double t = now.toSec();
00144       while (!g_sent.empty() && g_sent.front().t < t - g_window)
00145         g_sent.pop_front();
00146       // sum up how many bytes are in the window
00147       uint32_t bytes = 0;
00148       for (deque<Sent>::iterator i = g_sent.begin(); i != g_sent.end(); ++i)
00149         bytes += i->len;
00150       if (bytes < g_bps)
00151       {
00152         g_pub.publish(msg);
00153         g_sent.push_back(Sent(t, msg->size()));
00154       }
00155     }
00156   }
00157 }
00158 
00159 #define USAGE "\nusage: \n"\
00160            "  throttle messages IN_TOPIC MSGS_PER_SEC [OUT_TOPIC]]\n"\
00161            "OR\n"\
00162            "  throttle bytes IN_TOPIC BYTES_PER_SEC WINDOW [OUT_TOPIC]]\n\n"\
00163            "  This program will drop messages from IN_TOPIC so that either: the \n"\
00164            "  average bytes per second on OUT_TOPIC, averaged over WINDOW \n"\
00165            "  seconds, remains below BYTES_PER_SEC, or: the minimum inter-message\n"\
00166            "  period is 1/MSGS_PER_SEC. The messages are output \n"\
00167            "  to OUT_TOPIC, or (if not supplied), to IN_TOPIC_throttle.\n\n"
00168 
00169 int main(int argc, char **argv)
00170 {
00171   if(argc < 3)
00172   {
00173     puts(USAGE);
00174     return 1;
00175   }
00176 
00177   g_input_topic = string(argv[2]);
00178 
00179   std::string topic_name;
00180   if(!getBaseName(string(argv[2]), topic_name))
00181     return 1;
00182 
00183   ros::init(argc, argv, topic_name + string("_throttle"),
00184             ros::init_options::AnonymousName);
00185   bool unreliable = false;
00186   ros::NodeHandle pnh("~");
00187   pnh.getParam("wall_clock", g_use_wallclock);
00188   pnh.getParam("unreliable", unreliable);
00189   pnh.getParam("lazy", g_lazy);
00190 
00191   if (unreliable)
00192     g_th.unreliable().reliable(); // Prefers unreliable, but will accept reliable.
00193 
00194   if(!strcmp(argv[1], "messages"))
00195     g_use_messages = true;
00196   else if(!strcmp(argv[1], "bytes"))
00197     g_use_messages = false;
00198   else
00199   {
00200     puts(USAGE);
00201     return 1;
00202   }
00203 
00204   if(g_use_messages && argc == 5)
00205     g_output_topic = string(argv[4]);
00206   else if(!g_use_messages && argc == 6)
00207     g_output_topic = string(argv[5]);
00208   else
00209     g_output_topic = g_input_topic + "_throttle";
00210 
00211   if(g_use_messages)
00212   {
00213     if(argc < 4)
00214     {
00215       puts(USAGE);
00216       return 1;
00217     }
00218     g_period = ros::Duration(1.0/atof(argv[3]));
00219   }
00220   else
00221   {
00222     if(argc < 5)
00223     {
00224       puts(USAGE);
00225       return 1;
00226     }
00227     g_bps = atoi(argv[3]);
00228     g_window = atof(argv[4]);
00229   }
00230 
00231   ros::NodeHandle n;
00232   g_node = &n;
00233   subscribe();
00234   ros::spin();
00235   return 0;
00236 }
00237 


topic_tools
Author(s): Morgan Quigley, Brian Gerkey
autogenerated on Thu Jun 6 2019 21:10:59