00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00029
00030
00031
00032
00033
00034
00035 #include <cstdio>
00036 #include <cstdlib>
00037 #include <deque>
00038 #include "topic_tools/shape_shifter.h"
00039 #include "topic_tools/parse.h"
00040
00041 using std::string;
00042 using std::vector;
00043 using std::deque;
00044 using namespace topic_tools;
00045
00046
00047 static ros::NodeHandle *g_node = NULL;
00048 static uint32_t g_bps = 0;
00049 static ros::Duration g_period;
00050 static double g_window = 1.0;
00051 static bool g_advertised = false;
00052 static string g_output_topic;
00053 static ros::Publisher g_pub;
00054 static bool g_use_messages;
00055 static ros::Time g_last_time;
00056
00057
00058 class Sent
00059 {
00060 public:
00061 double t;
00062 uint32_t len;
00063 Sent(double _t, uint32_t _len) : t(_t), len(_len) { }
00064 };
00065 deque<Sent> g_sent;
00066
00067 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg)
00068 {
00069 if (!g_advertised)
00070 {
00071 g_pub = msg->advertise(*g_node, g_output_topic, 10);
00072 g_advertised = true;
00073 printf("advertised as %s\n", g_output_topic.c_str());
00074 }
00075 if(g_use_messages)
00076 {
00077 ros::Time now = ros::Time::now();
00078 if((now - g_last_time) > g_period)
00079 {
00080 g_pub.publish(msg);
00081 g_last_time = now;
00082 }
00083 }
00084 else
00085 {
00086
00087 const double t = ros::Time::now().toSec();
00088 while (!g_sent.empty() && g_sent.front().t < t - g_window)
00089 g_sent.pop_front();
00090
00091 uint32_t bytes = 0;
00092 for (deque<Sent>::iterator i = g_sent.begin(); i != g_sent.end(); ++i)
00093 bytes += i->len;
00094 if (bytes < g_bps)
00095 {
00096 g_pub.publish(msg);
00097 g_sent.push_back(Sent(t, msg->size()));
00098 }
00099 }
00100 }
00101
00102 #define USAGE "\nusage: \n"\
00103 " throttle messages IN_TOPIC MSGS_PER_SEC [OUT_TOPIC]]\n"\
00104 "OR\n"\
00105 " throttle bytes IN_TOPIC BYTES_PER_SEC WINDOW [OUT_TOPIC]]\n\n"\
00106 " This program will drop messages from IN_TOPIC so that either: the \n"\
00107 " average bytes per second on OUT_TOPIC, averaged over WINDOW \n"\
00108 " seconds, remains below BYTES_PER_SEC, or: the minimum inter-message\n"\
00109 " period is 1/MSGS_PER_SEC. The messages are output \n"\
00110 " to OUT_TOPIC, or (if not supplied), to IN_TOPIC_throttle.\n\n"
00111
00112 int main(int argc, char **argv)
00113 {
00114 if(argc < 3)
00115 {
00116 puts(USAGE);
00117 return 1;
00118 }
00119
00120 string intopic = string(argv[2]);
00121
00122 std::string topic_name;
00123 if(!getBaseName(string(argv[2]), topic_name))
00124 return 1;
00125
00126 ros::init(argc, argv, topic_name + string("_throttle"),
00127 ros::init_options::AnonymousName);
00128
00129 if(!strcmp(argv[1], "messages"))
00130 g_use_messages = true;
00131 else if(!strcmp(argv[1], "bytes"))
00132 g_use_messages = false;
00133 else
00134 {
00135 puts(USAGE);
00136 return 1;
00137 }
00138
00139 if(g_use_messages && argc == 5)
00140 g_output_topic = string(argv[4]);
00141 else if(!g_use_messages && argc == 6)
00142 g_output_topic = string(argv[5]);
00143 else
00144 g_output_topic = intopic + "_throttle";
00145
00146 if(g_use_messages)
00147 {
00148 if(argc < 4)
00149 {
00150 puts(USAGE);
00151 return 1;
00152 }
00153 g_period = ros::Duration(1.0/atof(argv[3]));
00154 }
00155 else
00156 {
00157 if(argc < 5)
00158 {
00159 puts(USAGE);
00160 return 1;
00161 }
00162 g_bps = atoi(argv[3]);
00163 g_window = atof(argv[4]);
00164 }
00165
00166 ros::NodeHandle n;
00167 g_node = &n;
00168 ros::Subscriber sub = n.subscribe<ShapeShifter>(intopic, 10, &in_cb);
00169 ros::spin();
00170 return 0;
00171 }
00172