throttle.cpp
Go to the documentation of this file.
1 // throttle will transform a topic to have a limited number of bytes per second
3 //
4 // Copyright (C) 2009, Morgan Quigley
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // * Neither the name of Stanford University nor the names of its
14 // contributors may be used to endorse or promote products derived from
15 // this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 // POSSIBILITY OF SUCH DAMAGE.
29 
30 
31 // this could be made a lot smarter by trying to analyze and predict the
32 // message stream density, etc., rather than just being greedy and stuffing
33 // the output as fast as it can.
34 
35 #include <cstdio>
36 #include <cstdlib>
37 #include <deque>
39 #include "topic_tools/parse.h"
40 
41 using std::string;
42 using std::vector;
43 using std::deque;
44 using namespace topic_tools;
45 
46 // TODO: move all these globals into a reasonable local scope
48 uint32_t g_bps = 0; // bytes per second, not bits!
49 ros::Duration g_period; // minimum inter-message period
50 double g_window = 1.0; // 1 second window for starters
51 bool g_advertised = false;
59 bool g_lazy;
60 bool g_force_latch = false;
61 bool g_force_latch_value = true;
63 
64 class Sent
65 {
66 public:
67  double t;
68  uint32_t len;
69  Sent(double _t, uint32_t _len) : t(_t), len(_len) { }
70 };
71 deque<Sent> g_sent;
72 
74 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event);
75 
76 void subscribe()
77 {
78  g_sub = new ros::Subscriber(g_node->subscribe(g_input_topic, 10, &in_cb, g_th));
79 }
80 
82 {
83  // If we're in lazy subscribe mode, and the first subscriber just
84  // connected, then subscribe, #3546
85  if(g_lazy && !g_sub)
86  {
87  ROS_DEBUG("lazy mode; resubscribing");
88  subscribe();
89  }
90 }
91 
93 {
94  if (connection_header)
95  {
96  ros::M_string::const_iterator it = connection_header->find("latching");
97  if ((it != connection_header->end()) && (it->second == "1"))
98  {
99  ROS_DEBUG("input topic is latched; latching output topic to match");
100  return true;
101  }
102  }
103 
104  return false;
105 }
106 
108 {
110  boost::shared_ptr<const ros::M_string> const& connection_header = msg_event.getConnectionHeaderPtr();
111 
112  if (!g_advertised)
113  {
114  const bool latch = g_force_latch ? g_force_latch_value : is_latching(connection_header);
115  g_pub = msg->advertise(*g_node, g_output_topic, 10, latch, conn_cb);
116  g_advertised = true;
117  printf("advertised as %s\n", g_output_topic.c_str());
118  }
119  // If we're in lazy subscribe mode, and nobody's listening,
120  // then unsubscribe, #3546.
121  if(g_lazy && !g_pub.getNumSubscribers())
122  {
123  ROS_DEBUG("lazy mode; unsubscribing");
124  delete g_sub;
125  g_sub = NULL;
126  }
127  else
128  {
129  if(g_use_messages)
130  {
131  ros::Time now;
132  if(g_use_wallclock)
133  now.fromSec(ros::WallTime::now().toSec());
134  else
135  now = ros::Time::now();
136  if (g_last_time > now)
137  {
138  ROS_WARN("Detected jump back in time, resetting throttle period to now for.");
139  g_last_time = now;
140  }
141  if((now - g_last_time) > g_period)
142  {
143  g_pub.publish(msg);
144  g_last_time = now;
145  }
146  }
147  else
148  {
149  // pop the front of the queue until it's within the window
150  ros::Time now;
151  if(g_use_wallclock)
152  now.fromSec(ros::WallTime::now().toSec());
153  else
154  now = ros::Time::now();
155  const double t = now.toSec();
156  while (!g_sent.empty() && g_sent.front().t < t - g_window)
157  g_sent.pop_front();
158  // sum up how many bytes are in the window
159  uint32_t bytes = 0;
160  for (deque<Sent>::iterator i = g_sent.begin(); i != g_sent.end(); ++i)
161  bytes += i->len;
162  if (bytes < g_bps)
163  {
164  g_pub.publish(msg);
165  g_sent.push_back(Sent(t, msg->size()));
166  }
167  }
168  }
169 }
170 
171 #define USAGE "\nusage: \n"\
172  " throttle messages IN_TOPIC MSGS_PER_SEC [OUT_TOPIC]]\n"\
173  "OR\n"\
174  " throttle bytes IN_TOPIC BYTES_PER_SEC WINDOW [OUT_TOPIC]]\n\n"\
175  " This program will drop messages from IN_TOPIC so that either: the \n"\
176  " average bytes per second on OUT_TOPIC, averaged over WINDOW \n"\
177  " seconds, remains below BYTES_PER_SEC, or: the minimum inter-message\n"\
178  " period is 1/MSGS_PER_SEC. The messages are output \n"\
179  " to OUT_TOPIC, or (if not supplied), to IN_TOPIC_throttle.\n\n"
180 
181 int main(int argc, char **argv)
182 {
183  if(argc < 3)
184  {
185  puts(USAGE);
186  return 1;
187  }
188 
189  g_input_topic = string(argv[2]);
190 
191  std::string topic_name;
192  if(!getBaseName(string(argv[2]), topic_name))
193  return 1;
194 
195  ros::init(argc, argv, topic_name + string("_throttle"),
197  bool unreliable = false;
198  ros::NodeHandle pnh("~");
199  pnh.getParam("wall_clock", g_use_wallclock);
200  pnh.getParam("unreliable", unreliable);
201  pnh.getParam("lazy", g_lazy);
202  g_force_latch = pnh.getParam("force_latch", g_force_latch_value);
203 
204  if (unreliable)
205  g_th.unreliable().reliable(); // Prefers unreliable, but will accept reliable.
206 
207  if(!strcmp(argv[1], "messages"))
208  g_use_messages = true;
209  else if(!strcmp(argv[1], "bytes"))
210  g_use_messages = false;
211  else
212  {
213  puts(USAGE);
214  return 1;
215  }
216 
217  if(g_use_messages && argc == 5)
218  g_output_topic = string(argv[4]);
219  else if(!g_use_messages && argc == 6)
220  g_output_topic = string(argv[5]);
221  else
222  g_output_topic = g_input_topic + "_throttle";
223 
224  if(g_use_messages)
225  {
226  if(argc < 4)
227  {
228  puts(USAGE);
229  return 1;
230  }
231  g_period = ros::Duration(1.0/atof(argv[3]));
232  }
233  else
234  {
235  if(argc < 5)
236  {
237  puts(USAGE);
238  return 1;
239  }
240  g_bps = atoi(argv[3]);
241  g_window = atof(argv[4]);
242  }
243 
244  ros::NodeHandle n;
245  g_node = &n;
246  subscribe();
247  ros::spin();
248  return 0;
249 }
250 
#define USAGE
Definition: throttle.cpp:171
const boost::shared_ptr< ConstMessage > & getConstMessage() const
deque< Sent > g_sent
Definition: throttle.cpp:71
ros::Duration g_period
Definition: throttle.cpp:49
TransportHints & reliable()
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
uint32_t len
Definition: throttle.cpp:68
int main(int argc, char **argv)
Definition: throttle.cpp:181
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
bool g_use_wallclock
Definition: throttle.cpp:58
bool g_lazy
Definition: throttle.cpp:59
#define ROS_WARN(...)
Time & fromSec(double t)
ros::Subscriber * g_sub
Definition: throttle.cpp:55
bool is_latching(const boost::shared_ptr< const ros::M_string > &connection_header)
Definition: throttle.cpp:92
uint32_t g_bps
Definition: throttle.cpp:48
void publish(const boost::shared_ptr< M > &message) const
TransportHints & unreliable()
bool g_use_messages
Definition: throttle.cpp:56
TOPIC_TOOLS_DECL bool getBaseName(const std::string &full_name, std::string &base_name)
Definition: parse.cpp:40
bool getParam(const std::string &key, std::string &s) const
bool g_force_latch_value
Definition: throttle.cpp:61
void subscribe()
Definition: throttle.cpp:76
ROSCPP_DECL void spin()
double t
Definition: throttle.cpp:67
ros::TransportHints g_th
Definition: throttle.cpp:62
bool g_force_latch
Definition: throttle.cpp:60
ros::Time g_last_time
Definition: throttle.cpp:57
string g_output_topic
Definition: throttle.cpp:52
bool g_advertised
Definition: throttle.cpp:51
double g_window
Definition: throttle.cpp:50
void conn_cb(const ros::SingleSubscriberPublisher &)
Definition: throttle.cpp:81
static WallTime now()
static Time now()
Sent(double _t, uint32_t _len)
Definition: throttle.cpp:69
string g_input_topic
Definition: throttle.cpp:53
ros::NodeHandle * g_node
Definition: throttle.cpp:47
ros::Publisher g_pub
Definition: throttle.cpp:54
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
void in_cb(const ros::MessageEvent< ShapeShifter > &msg_event)
Definition: throttle.cpp:107
uint32_t getNumSubscribers() const
#define ROS_DEBUG(...)


topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:34:17