$search
00001 00002 // throttle will transform a topic to have a limited number of bytes per second 00003 // 00004 // Copyright (C) 2009, Morgan Quigley 00005 // 00006 // Redistribution and use in source and binary forms, with or without 00007 // modification, are permitted provided that the following conditions are met: 00008 // * Redistributions of source code must retain the above copyright notice, 00009 // this list of conditions and the following disclaimer. 00010 // * Redistributions in binary form must reproduce the above copyright 00011 // notice, this list of conditions and the following disclaimer in the 00012 // documentation and/or other materials provided with the distribution. 00013 // * Neither the name of Stanford University nor the names of its 00014 // contributors may be used to endorse or promote products derived from 00015 // this software without specific prior written permission. 00016 // 00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00018 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00019 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00020 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00021 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00022 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00023 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00024 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00025 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00026 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00027 // POSSIBILITY OF SUCH DAMAGE. 00029 00030 00031 // this could be made a lot smarter by trying to analyze and predict the 00032 // message stream density, etc., rather than just being greedy and stuffing 00033 // the output as fast as it can. 00034 00035 #include <cstdio> 00036 #include <cstdlib> 00037 #include <deque> 00038 #include "topic_tools/shape_shifter.h" 00039 #include "topic_tools/parse.h" 00040 00041 using std::string; 00042 using std::vector; 00043 using std::deque; 00044 using namespace topic_tools; 00045 00046 // TODO: move all these globals into a reasonable local scope 00047 ros::NodeHandle *g_node = NULL; 00048 uint32_t g_bps = 0; // bytes per second, not bits! 00049 ros::Duration g_period; // minimum inter-message period 00050 double g_window = 1.0; // 1 second window for starters 00051 bool g_advertised = false; 00052 string g_output_topic; 00053 string g_input_topic; 00054 ros::Publisher g_pub; 00055 ros::Subscriber* g_sub; 00056 bool g_use_messages; 00057 ros::Time g_last_time; 00058 bool g_use_wallclock; 00059 bool g_lazy; 00060 ros::TransportHints g_th; 00061 00062 class Sent 00063 { 00064 public: 00065 double t; 00066 uint32_t len; 00067 Sent(double _t, uint32_t _len) : t(_t), len(_len) { } 00068 }; 00069 deque<Sent> g_sent; 00070 00071 void conn_cb(const ros::SingleSubscriberPublisher&); 00072 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg); 00073 00074 void conn_cb(const ros::SingleSubscriberPublisher&) 00075 { 00076 // If we're in lazy subscribe mode, and the first subscriber just 00077 // connected, then subscribe, #3546 00078 if(g_lazy && !g_sub) 00079 { 00080 ROS_DEBUG("lazy mode; resubscribing"); 00081 g_sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_input_topic, 10, &in_cb, g_th)); 00082 } 00083 } 00084 00085 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg) 00086 { 00087 if (!g_advertised) 00088 { 00089 // If the input topic is latched, make the output topic latched 00090 bool latch = false; 00091 ros::M_string::iterator it = msg->__connection_header->find("latching"); 00092 if((it != msg->__connection_header->end()) && (it->second == "1")) 00093 { 00094 ROS_DEBUG("input topic is latched; latching output topic to match"); 00095 latch = true; 00096 } 00097 g_pub = msg->advertise(*g_node, g_output_topic, 10, latch, conn_cb); 00098 g_advertised = true; 00099 printf("advertised as %s\n", g_output_topic.c_str()); 00100 } 00101 // If we're in lazy subscribe mode, and nobody's listening, 00102 // then unsubscribe, #3546. 00103 if(g_lazy && !g_pub.getNumSubscribers()) 00104 { 00105 ROS_DEBUG("lazy mode; unsubscribing"); 00106 delete g_sub; 00107 g_sub = NULL; 00108 } 00109 else 00110 { 00111 if(g_use_messages) 00112 { 00113 ros::Time now; 00114 if(g_use_wallclock) 00115 now.fromSec(ros::WallTime::now().toSec()); 00116 else 00117 now = ros::Time::now(); 00118 if((now - g_last_time) > g_period) 00119 { 00120 g_pub.publish(msg); 00121 g_last_time = now; 00122 } 00123 } 00124 else 00125 { 00126 // pop the front of the queue until it's within the window 00127 ros::Time now; 00128 if(g_use_wallclock) 00129 now.fromSec(ros::WallTime::now().toSec()); 00130 else 00131 now = ros::Time::now(); 00132 const double t = now.toSec(); 00133 while (!g_sent.empty() && g_sent.front().t < t - g_window) 00134 g_sent.pop_front(); 00135 // sum up how many bytes are in the window 00136 uint32_t bytes = 0; 00137 for (deque<Sent>::iterator i = g_sent.begin(); i != g_sent.end(); ++i) 00138 bytes += i->len; 00139 if (bytes < g_bps) 00140 { 00141 g_pub.publish(msg); 00142 g_sent.push_back(Sent(t, msg->size())); 00143 } 00144 } 00145 } 00146 } 00147 00148 #define USAGE "\nusage: \n"\ 00149 " throttle messages IN_TOPIC MSGS_PER_SEC [OUT_TOPIC]]\n"\ 00150 "OR\n"\ 00151 " throttle bytes IN_TOPIC BYTES_PER_SEC WINDOW [OUT_TOPIC]]\n\n"\ 00152 " This program will drop messages from IN_TOPIC so that either: the \n"\ 00153 " average bytes per second on OUT_TOPIC, averaged over WINDOW \n"\ 00154 " seconds, remains below BYTES_PER_SEC, or: the minimum inter-message\n"\ 00155 " period is 1/MSGS_PER_SEC. The messages are output \n"\ 00156 " to OUT_TOPIC, or (if not supplied), to IN_TOPIC_throttle.\n\n" 00157 00158 int main(int argc, char **argv) 00159 { 00160 if(argc < 3) 00161 { 00162 puts(USAGE); 00163 return 1; 00164 } 00165 00166 g_input_topic = string(argv[2]); 00167 00168 std::string topic_name; 00169 if(!getBaseName(string(argv[2]), topic_name)) 00170 return 1; 00171 00172 ros::init(argc, argv, topic_name + string("_throttle"), 00173 ros::init_options::AnonymousName); 00174 bool unreliable = false; 00175 ros::NodeHandle pnh("~"); 00176 pnh.getParam("wall_clock", g_use_wallclock); 00177 pnh.getParam("unreliable", unreliable); 00178 pnh.getParam("lazy", g_lazy); 00179 00180 if (unreliable) 00181 g_th.unreliable().reliable(); // Prefers unreliable, but will accept reliable. 00182 00183 if(!strcmp(argv[1], "messages")) 00184 g_use_messages = true; 00185 else if(!strcmp(argv[1], "bytes")) 00186 g_use_messages = false; 00187 else 00188 { 00189 puts(USAGE); 00190 return 1; 00191 } 00192 00193 if(g_use_messages && argc == 5) 00194 g_output_topic = string(argv[4]); 00195 else if(!g_use_messages && argc == 6) 00196 g_output_topic = string(argv[5]); 00197 else 00198 g_output_topic = g_input_topic + "_throttle"; 00199 00200 if(g_use_messages) 00201 { 00202 if(argc < 4) 00203 { 00204 puts(USAGE); 00205 return 1; 00206 } 00207 g_period = ros::Duration(1.0/atof(argv[3])); 00208 } 00209 else 00210 { 00211 if(argc < 5) 00212 { 00213 puts(USAGE); 00214 return 1; 00215 } 00216 g_bps = atoi(argv[3]); 00217 g_window = atof(argv[4]); 00218 } 00219 00220 ros::NodeHandle n; 00221 g_node = &n; 00222 g_sub = new ros::Subscriber(n.subscribe<ShapeShifter>(g_input_topic, 10, &in_cb, g_th)); 00223 ros::spin(); 00224 return 0; 00225 } 00226