Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00029
00030
00031
00032
00033
00034
00035 #include <cstdio>
00036 #include <cstdlib>
00037 #include <deque>
00038 #include "topic_tools/shape_shifter.h"
00039 #include "topic_tools/parse.h"
00040
00041 using std::string;
00042 using std::vector;
00043 using std::deque;
00044 using namespace topic_tools;
00045
00046
00047 ros::NodeHandle *g_node = NULL;
00048 uint32_t g_bps = 0;
00049 ros::Duration g_period;
00050 double g_window = 1.0;
00051 bool g_advertised = false;
00052 string g_output_topic;
00053 string g_input_topic;
00054 ros::Publisher g_pub;
00055 ros::Subscriber* g_sub;
00056 bool g_use_messages;
00057 ros::Time g_last_time;
00058 bool g_use_wallclock;
00059 bool g_lazy;
00060 ros::TransportHints g_th;
00061
00062 class Sent
00063 {
00064 public:
00065 double t;
00066 uint32_t len;
00067 Sent(double _t, uint32_t _len) : t(_t), len(_len) { }
00068 };
00069 deque<Sent> g_sent;
00070
00071 void conn_cb(const ros::SingleSubscriberPublisher&);
00072 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg);
00073
00074 void conn_cb(const ros::SingleSubscriberPublisher&)
00075 {
00076
00077
00078 if(g_lazy && !g_sub)
00079 {
00080 ROS_DEBUG("lazy mode; resubscribing");
00081 g_sub = new ros::Subscriber(g_node->subscribe<ShapeShifter>(g_input_topic, 10, &in_cb, g_th));
00082 }
00083 }
00084
00085 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg)
00086 {
00087 if (!g_advertised)
00088 {
00089
00090 bool latch = false;
00091 ros::M_string::iterator it = msg->__connection_header->find("latching");
00092 if((it != msg->__connection_header->end()) && (it->second == "1"))
00093 {
00094 ROS_DEBUG("input topic is latched; latching output topic to match");
00095 latch = true;
00096 }
00097 g_pub = msg->advertise(*g_node, g_output_topic, 10, latch, conn_cb);
00098 g_advertised = true;
00099 printf("advertised as %s\n", g_output_topic.c_str());
00100 }
00101
00102
00103 if(g_lazy && !g_pub.getNumSubscribers())
00104 {
00105 ROS_DEBUG("lazy mode; unsubscribing");
00106 delete g_sub;
00107 g_sub = NULL;
00108 }
00109 else
00110 {
00111 if(g_use_messages)
00112 {
00113 ros::Time now;
00114 if(g_use_wallclock)
00115 now.fromSec(ros::WallTime::now().toSec());
00116 else
00117 now = ros::Time::now();
00118 if((now - g_last_time) > g_period)
00119 {
00120 g_pub.publish(msg);
00121 g_last_time = now;
00122 }
00123 }
00124 else
00125 {
00126
00127 ros::Time now;
00128 if(g_use_wallclock)
00129 now.fromSec(ros::WallTime::now().toSec());
00130 else
00131 now = ros::Time::now();
00132 const double t = now.toSec();
00133 while (!g_sent.empty() && g_sent.front().t < t - g_window)
00134 g_sent.pop_front();
00135
00136 uint32_t bytes = 0;
00137 for (deque<Sent>::iterator i = g_sent.begin(); i != g_sent.end(); ++i)
00138 bytes += i->len;
00139 if (bytes < g_bps)
00140 {
00141 g_pub.publish(msg);
00142 g_sent.push_back(Sent(t, msg->size()));
00143 }
00144 }
00145 }
00146 }
00147
00148 #define USAGE "\nusage: \n"\
00149 " throttle messages IN_TOPIC MSGS_PER_SEC [OUT_TOPIC]]\n"\
00150 "OR\n"\
00151 " throttle bytes IN_TOPIC BYTES_PER_SEC WINDOW [OUT_TOPIC]]\n\n"\
00152 " This program will drop messages from IN_TOPIC so that either: the \n"\
00153 " average bytes per second on OUT_TOPIC, averaged over WINDOW \n"\
00154 " seconds, remains below BYTES_PER_SEC, or: the minimum inter-message\n"\
00155 " period is 1/MSGS_PER_SEC. The messages are output \n"\
00156 " to OUT_TOPIC, or (if not supplied), to IN_TOPIC_throttle.\n\n"
00157
00158 int main(int argc, char **argv)
00159 {
00160 if(argc < 3)
00161 {
00162 puts(USAGE);
00163 return 1;
00164 }
00165
00166 g_input_topic = string(argv[2]);
00167
00168 std::string topic_name;
00169 if(!getBaseName(string(argv[2]), topic_name))
00170 return 1;
00171
00172 ros::init(argc, argv, topic_name + string("_throttle"),
00173 ros::init_options::AnonymousName);
00174 bool unreliable = false;
00175 ros::NodeHandle pnh("~");
00176 pnh.getParam("wall_clock", g_use_wallclock);
00177 pnh.getParam("unreliable", unreliable);
00178 pnh.getParam("lazy", g_lazy);
00179
00180 if (unreliable)
00181 g_th.unreliable().reliable();
00182
00183 if(!strcmp(argv[1], "messages"))
00184 g_use_messages = true;
00185 else if(!strcmp(argv[1], "bytes"))
00186 g_use_messages = false;
00187 else
00188 {
00189 puts(USAGE);
00190 return 1;
00191 }
00192
00193 if(g_use_messages && argc == 5)
00194 g_output_topic = string(argv[4]);
00195 else if(!g_use_messages && argc == 6)
00196 g_output_topic = string(argv[5]);
00197 else
00198 g_output_topic = g_input_topic + "_throttle";
00199
00200 if(g_use_messages)
00201 {
00202 if(argc < 4)
00203 {
00204 puts(USAGE);
00205 return 1;
00206 }
00207 g_period = ros::Duration(1.0/atof(argv[3]));
00208 }
00209 else
00210 {
00211 if(argc < 5)
00212 {
00213 puts(USAGE);
00214 return 1;
00215 }
00216 g_bps = atoi(argv[3]);
00217 g_window = atof(argv[4]);
00218 }
00219
00220 ros::NodeHandle n;
00221 g_node = &n;
00222 g_sub = new ros::Subscriber(n.subscribe<ShapeShifter>(g_input_topic, 10, &in_cb, g_th));
00223 ros::spin();
00224 return 0;
00225 }
00226