Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00029
00030
00031
00032
00033
00034
00035 #include <cstdio>
00036 #include <cstdlib>
00037 #include <deque>
00038 #include "topic_tools/shape_shifter.h"
00039 #include "topic_tools/parse.h"
00040
00041 using std::string;
00042 using std::vector;
00043 using std::deque;
00044 using namespace topic_tools;
00045
00046
00047 ros::NodeHandle *g_node = NULL;
00048 uint32_t g_bps = 0;
00049 ros::Duration g_period;
00050 double g_window = 1.0;
00051 bool g_advertised = false;
00052 string g_output_topic;
00053 string g_input_topic;
00054 ros::Publisher g_pub;
00055 ros::Subscriber* g_sub;
00056 bool g_use_messages;
00057 ros::Time g_last_time;
00058 bool g_use_wallclock;
00059 bool g_lazy;
00060 ros::TransportHints g_th;
00061
00062 class Sent
00063 {
00064 public:
00065 double t;
00066 uint32_t len;
00067 Sent(double _t, uint32_t _len) : t(_t), len(_len) { }
00068 };
00069 deque<Sent> g_sent;
00070
00071 void conn_cb(const ros::SingleSubscriberPublisher&);
00072 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event);
00073
00074 void subscribe()
00075 {
00076 g_sub = new ros::Subscriber(g_node->subscribe(g_input_topic, 10, &in_cb, g_th));
00077 }
00078
00079 void conn_cb(const ros::SingleSubscriberPublisher&)
00080 {
00081
00082
00083 if(g_lazy && !g_sub)
00084 {
00085 ROS_DEBUG("lazy mode; resubscribing");
00086 subscribe();
00087 }
00088 }
00089
00090 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event)
00091 {
00092 boost::shared_ptr<ShapeShifter const> const &msg = msg_event.getConstMessage();
00093 boost::shared_ptr<const ros::M_string> const& connection_header = msg_event.getConnectionHeaderPtr();
00094
00095 if (!g_advertised)
00096 {
00097
00098 bool latch = false;
00099 if (connection_header)
00100 {
00101 ros::M_string::const_iterator it = connection_header->find("latching");
00102 if((it != connection_header->end()) && (it->second == "1"))
00103 {
00104 ROS_DEBUG("input topic is latched; latching output topic to match");
00105 latch = true;
00106 }
00107 }
00108 g_pub = msg->advertise(*g_node, g_output_topic, 10, latch, conn_cb);
00109 g_advertised = true;
00110 printf("advertised as %s\n", g_output_topic.c_str());
00111 }
00112
00113
00114 if(g_lazy && !g_pub.getNumSubscribers())
00115 {
00116 ROS_DEBUG("lazy mode; unsubscribing");
00117 delete g_sub;
00118 g_sub = NULL;
00119 }
00120 else
00121 {
00122 if(g_use_messages)
00123 {
00124 ros::Time now;
00125 if(g_use_wallclock)
00126 now.fromSec(ros::WallTime::now().toSec());
00127 else
00128 now = ros::Time::now();
00129 if((now - g_last_time) > g_period)
00130 {
00131 g_pub.publish(msg);
00132 g_last_time = now;
00133 }
00134 }
00135 else
00136 {
00137
00138 ros::Time now;
00139 if(g_use_wallclock)
00140 now.fromSec(ros::WallTime::now().toSec());
00141 else
00142 now = ros::Time::now();
00143 const double t = now.toSec();
00144 while (!g_sent.empty() && g_sent.front().t < t - g_window)
00145 g_sent.pop_front();
00146
00147 uint32_t bytes = 0;
00148 for (deque<Sent>::iterator i = g_sent.begin(); i != g_sent.end(); ++i)
00149 bytes += i->len;
00150 if (bytes < g_bps)
00151 {
00152 g_pub.publish(msg);
00153 g_sent.push_back(Sent(t, msg->size()));
00154 }
00155 }
00156 }
00157 }
00158
00159 #define USAGE "\nusage: \n"\
00160 " throttle messages IN_TOPIC MSGS_PER_SEC [OUT_TOPIC]]\n"\
00161 "OR\n"\
00162 " throttle bytes IN_TOPIC BYTES_PER_SEC WINDOW [OUT_TOPIC]]\n\n"\
00163 " This program will drop messages from IN_TOPIC so that either: the \n"\
00164 " average bytes per second on OUT_TOPIC, averaged over WINDOW \n"\
00165 " seconds, remains below BYTES_PER_SEC, or: the minimum inter-message\n"\
00166 " period is 1/MSGS_PER_SEC. The messages are output \n"\
00167 " to OUT_TOPIC, or (if not supplied), to IN_TOPIC_throttle.\n\n"
00168
00169 int main(int argc, char **argv)
00170 {
00171 if(argc < 3)
00172 {
00173 puts(USAGE);
00174 return 1;
00175 }
00176
00177 g_input_topic = string(argv[2]);
00178
00179 std::string topic_name;
00180 if(!getBaseName(string(argv[2]), topic_name))
00181 return 1;
00182
00183 ros::init(argc, argv, topic_name + string("_throttle"),
00184 ros::init_options::AnonymousName);
00185 bool unreliable = false;
00186 ros::NodeHandle pnh("~");
00187 pnh.getParam("wall_clock", g_use_wallclock);
00188 pnh.getParam("unreliable", unreliable);
00189 pnh.getParam("lazy", g_lazy);
00190
00191 if (unreliable)
00192 g_th.unreliable().reliable();
00193
00194 if(!strcmp(argv[1], "messages"))
00195 g_use_messages = true;
00196 else if(!strcmp(argv[1], "bytes"))
00197 g_use_messages = false;
00198 else
00199 {
00200 puts(USAGE);
00201 return 1;
00202 }
00203
00204 if(g_use_messages && argc == 5)
00205 g_output_topic = string(argv[4]);
00206 else if(!g_use_messages && argc == 6)
00207 g_output_topic = string(argv[5]);
00208 else
00209 g_output_topic = g_input_topic + "_throttle";
00210
00211 if(g_use_messages)
00212 {
00213 if(argc < 4)
00214 {
00215 puts(USAGE);
00216 return 1;
00217 }
00218 g_period = ros::Duration(1.0/atof(argv[3]));
00219 }
00220 else
00221 {
00222 if(argc < 5)
00223 {
00224 puts(USAGE);
00225 return 1;
00226 }
00227 g_bps = atoi(argv[3]);
00228 g_window = atof(argv[4]);
00229 }
00230
00231 ros::NodeHandle n;
00232 g_node = &n;
00233 subscribe();
00234 ros::spin();
00235 return 0;
00236 }
00237