throttle.cpp
Go to the documentation of this file.
1 // throttle will transform a topic to have a limited number of bytes per second
3 //
4 // Copyright (C) 2009, Morgan Quigley
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // * Neither the name of Stanford University nor the names of its
14 // contributors may be used to endorse or promote products derived from
15 // this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 // POSSIBILITY OF SUCH DAMAGE.
29 
30 
31 // this could be made a lot smarter by trying to analyze and predict the
32 // message stream density, etc., rather than just being greedy and stuffing
33 // the output as fast as it can.
34 
35 #include <cstdio>
36 #include <cstdlib>
37 #include <deque>
39 #include "topic_tools/parse.h"
40 
41 using std::string;
42 using std::vector;
43 using std::deque;
44 using namespace topic_tools;
45 
46 // TODO: move all these globals into a reasonable local scope
48 uint32_t g_bps = 0; // bytes per second, not bits!
49 ros::Duration g_period; // minimum inter-message period
50 double g_window = 1.0; // 1 second window for starters
51 bool g_advertised = false;
59 bool g_lazy;
61 
62 class Sent
63 {
64 public:
65  double t;
66  uint32_t len;
67  Sent(double _t, uint32_t _len) : t(_t), len(_len) { }
68 };
69 deque<Sent> g_sent;
70 
72 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event);
73 
74 void subscribe()
75 {
76  g_sub = new ros::Subscriber(g_node->subscribe(g_input_topic, 10, &in_cb, g_th));
77 }
78 
80 {
81  // If we're in lazy subscribe mode, and the first subscriber just
82  // connected, then subscribe, #3546
83  if(g_lazy && !g_sub)
84  {
85  ROS_DEBUG("lazy mode; resubscribing");
86  subscribe();
87  }
88 }
89 
90 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event)
91 {
93  boost::shared_ptr<const ros::M_string> const& connection_header = msg_event.getConnectionHeaderPtr();
94 
95  if (!g_advertised)
96  {
97  // If the input topic is latched, make the output topic latched
98  bool latch = false;
99  if (connection_header)
100  {
101  ros::M_string::const_iterator it = connection_header->find("latching");
102  if((it != connection_header->end()) && (it->second == "1"))
103  {
104  ROS_DEBUG("input topic is latched; latching output topic to match");
105  latch = true;
106  }
107  }
108  g_pub = msg->advertise(*g_node, g_output_topic, 10, latch, conn_cb);
109  g_advertised = true;
110  printf("advertised as %s\n", g_output_topic.c_str());
111  }
112  // If we're in lazy subscribe mode, and nobody's listening,
113  // then unsubscribe, #3546.
114  if(g_lazy && !g_pub.getNumSubscribers())
115  {
116  ROS_DEBUG("lazy mode; unsubscribing");
117  delete g_sub;
118  g_sub = NULL;
119  }
120  else
121  {
122  if(g_use_messages)
123  {
124  ros::Time now;
125  if(g_use_wallclock)
126  now.fromSec(ros::WallTime::now().toSec());
127  else
128  now = ros::Time::now();
129  if((now - g_last_time) > g_period)
130  {
131  g_pub.publish(msg);
132  g_last_time = now;
133  }
134  }
135  else
136  {
137  // pop the front of the queue until it's within the window
138  ros::Time now;
139  if(g_use_wallclock)
140  now.fromSec(ros::WallTime::now().toSec());
141  else
142  now = ros::Time::now();
143  const double t = now.toSec();
144  while (!g_sent.empty() && g_sent.front().t < t - g_window)
145  g_sent.pop_front();
146  // sum up how many bytes are in the window
147  uint32_t bytes = 0;
148  for (deque<Sent>::iterator i = g_sent.begin(); i != g_sent.end(); ++i)
149  bytes += i->len;
150  if (bytes < g_bps)
151  {
152  g_pub.publish(msg);
153  g_sent.push_back(Sent(t, msg->size()));
154  }
155  }
156  }
157 }
158 
159 #define USAGE "\nusage: \n"\
160  " throttle messages IN_TOPIC MSGS_PER_SEC [OUT_TOPIC]]\n"\
161  "OR\n"\
162  " throttle bytes IN_TOPIC BYTES_PER_SEC WINDOW [OUT_TOPIC]]\n\n"\
163  " This program will drop messages from IN_TOPIC so that either: the \n"\
164  " average bytes per second on OUT_TOPIC, averaged over WINDOW \n"\
165  " seconds, remains below BYTES_PER_SEC, or: the minimum inter-message\n"\
166  " period is 1/MSGS_PER_SEC. The messages are output \n"\
167  " to OUT_TOPIC, or (if not supplied), to IN_TOPIC_throttle.\n\n"
168 
169 int main(int argc, char **argv)
170 {
171  if(argc < 3)
172  {
173  puts(USAGE);
174  return 1;
175  }
176 
177  g_input_topic = string(argv[2]);
178 
179  std::string topic_name;
180  if(!getBaseName(string(argv[2]), topic_name))
181  return 1;
182 
183  ros::init(argc, argv, topic_name + string("_throttle"),
185  bool unreliable = false;
186  ros::NodeHandle pnh("~");
187  pnh.getParam("wall_clock", g_use_wallclock);
188  pnh.getParam("unreliable", unreliable);
189  pnh.getParam("lazy", g_lazy);
190 
191  if (unreliable)
192  g_th.unreliable().reliable(); // Prefers unreliable, but will accept reliable.
193 
194  if(!strcmp(argv[1], "messages"))
195  g_use_messages = true;
196  else if(!strcmp(argv[1], "bytes"))
197  g_use_messages = false;
198  else
199  {
200  puts(USAGE);
201  return 1;
202  }
203 
204  if(g_use_messages && argc == 5)
205  g_output_topic = string(argv[4]);
206  else if(!g_use_messages && argc == 6)
207  g_output_topic = string(argv[5]);
208  else
209  g_output_topic = g_input_topic + "_throttle";
210 
211  if(g_use_messages)
212  {
213  if(argc < 4)
214  {
215  puts(USAGE);
216  return 1;
217  }
218  g_period = ros::Duration(1.0/atof(argv[3]));
219  }
220  else
221  {
222  if(argc < 5)
223  {
224  puts(USAGE);
225  return 1;
226  }
227  g_bps = atoi(argv[3]);
228  g_window = atof(argv[4]);
229  }
230 
231  ros::NodeHandle n;
232  g_node = &n;
233  subscribe();
234  ros::spin();
235  return 0;
236 }
237 
#define USAGE
Definition: throttle.cpp:159
deque< Sent > g_sent
Definition: throttle.cpp:69
ros::Duration g_period
Definition: throttle.cpp:49
TransportHints & reliable()
void publish(const boost::shared_ptr< M > &message) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
uint32_t len
Definition: throttle.cpp:66
int main(int argc, char **argv)
Definition: throttle.cpp:169
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
bool g_use_wallclock
Definition: throttle.cpp:58
bool g_lazy
Definition: throttle.cpp:59
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
ros::Subscriber * g_sub
Definition: throttle.cpp:55
uint32_t g_bps
Definition: throttle.cpp:48
const boost::shared_ptr< ConstMessage > & getConstMessage() const
TransportHints & unreliable()
bool g_use_messages
Definition: throttle.cpp:56
TOPIC_TOOLS_DECL bool getBaseName(const std::string &full_name, std::string &base_name)
Definition: parse.cpp:40
void subscribe()
Definition: throttle.cpp:74
ROSCPP_DECL void spin()
double t
Definition: throttle.cpp:65
ros::TransportHints g_th
Definition: throttle.cpp:60
ros::Time g_last_time
Definition: throttle.cpp:57
string g_output_topic
Definition: throttle.cpp:52
bool g_advertised
Definition: throttle.cpp:51
double g_window
Definition: throttle.cpp:50
void conn_cb(const ros::SingleSubscriberPublisher &)
Definition: throttle.cpp:79
uint32_t getNumSubscribers() const
static WallTime now()
bool getParam(const std::string &key, std::string &s) const
Time & fromSec(double t)
static Time now()
Sent(double _t, uint32_t _len)
Definition: throttle.cpp:67
string g_input_topic
Definition: throttle.cpp:53
ros::NodeHandle * g_node
Definition: throttle.cpp:47
ros::Publisher g_pub
Definition: throttle.cpp:54
void in_cb(const ros::MessageEvent< ShapeShifter > &msg_event)
Definition: throttle.cpp:90
#define ROS_DEBUG(...)


topic_tools
Author(s): Morgan Quigley, Brian Gerkey
autogenerated on Sun Feb 3 2019 03:30:24