topic_buffer_client.cpp
Go to the documentation of this file.
1 #include <cstdio>
2 #include <vector>
3 #include <list>
4 #include <boost/lambda/lambda.hpp>
5 #include "ros/ros.h"
6 #include "ros/header.h"
7 #include "ros/console.h"
8 #include "std_msgs/Header.h"
9 #include "std_msgs/String.h"
10 #include "jsk_topic_tools/List.h"
11 #include "jsk_topic_tools/Update.h"
13 #include "topic_tools/parse.h"
14 
15 using std::string;
16 using std::vector;
17 using std::list;
18 using namespace topic_tools;
19 static bool debug;
20 
21 class pub_info_t
22 {
23 public:
24  std::string topic_name;
27  bool advertised;
31  ros::Time last_time_received;
32  bool topic_with_header;
33  bool latched;
34  uint32_t last_seq_received;
35  uint32_t topic_received;
36 
37  void publish(const ros::TimerEvent &event)
38  {
39  if ( advertised == false ) return;
40 
41  topic_received++;
42  if ( topic_with_header == true ) {
43  std_msgs::Header header;
44  uint8_t buf[msg->size()];
45  ros::serialization::OStream ostream(buf, msg->size());
46  ros::serialization::IStream istream(buf, msg->size());
47  msg->write(ostream);
48  ((uint32_t *)buf)[0] = last_seq_received + topic_received;
49  ros::Time tmp(last_time_received.toSec() + topic_received * rate.toSec());
50  ((uint32_t *)buf)[1] = tmp.sec;
51  ((uint32_t *)buf)[2] = tmp.nsec;
52  msg->read(istream);
53  }
54  if(debug){
55  ROS_INFO_STREAM("publishing " << topic_name);
56  }
57  pub.publish(msg);
58  }
59 };
60 
62 
63 static list<pub_info_ref> g_pubs;
64 
65 static ros::NodeHandle *g_node = NULL;
66 
67 static bool use_fixed_rate;
68 static bool use_periodic_rate = false;
69 
70 
71 
74 {
75  using namespace boost::lambda;
76 
77  s->msg = boost::const_pointer_cast<ShapeShifter>(msg);
78  s->topic_received = 0; // reset topic_received
79  if ( s->advertised == false ) {
80  s->pub = msg->advertise(*g_node, s->topic_name+string("_buffered"), 10, s->latched);
81  s->advertised = true;
82  }
83  if(debug){
84  ROS_INFO_STREAM("advertised as " << s->topic_name+string("_buffered") << " running at " << 1/(s->rate.toSec()) << "Hz");
85  }
86 
87  // check if msg has header
88  {
89  std_msgs::Header header;
90  uint8_t buf[msg->size()];
91  ros::serialization::OStream stream(buf, msg->size());
92  msg->write(stream);
93  header.seq = ((uint32_t *)buf)[0];
94  header.stamp.sec = ((uint32_t *)buf)[1];
95  header.stamp.nsec = ((uint32_t *)buf)[2];
96 
97  if ( abs((header.stamp - ros::Time::now()).toSec()) < 5.0 ) {
98  if(debug){
99  ROS_INFO_STREAM(" this message contains headers.. seq =" << header.seq << " stamp = " << header.stamp);
100  }
101  s->topic_with_header = true;
102  s->last_seq_received = header.seq;
103  s->last_time_received = header.stamp;
104  }
105  }
106  //g_node->createTimer(ros::Duration(0.1), [](const ros::TimerEvent event) { std::cerr << "hoge" << std::endl; });
107  {// at first publish once
108  ros::TimerEvent ev;
109  s->publish (ev);
110  }
111  s->timer = g_node->createTimer(s->rate, &pub_info_t::publish, s);
112 }
113 
114 
115 int main(int argc, char **argv)
116 {
117  ros::init(argc, argv, "topic_buffer_client", ros::init_options::AnonymousName);
118 
119  ros::NodeHandle n;
120  ros::NodeHandle nh("~");
121 
122  double fixed_rate = 0.1; // 10Hz
123  if (nh.hasParam("fixed_rate")) {
124  use_fixed_rate = true;
125  nh.param ("fixed_rate", fixed_rate, 0.1);
126  ROS_INFO("use fixed rate = %f", fixed_rate);
127  }
128 
129  double update_rate = 10; // 0.1Hz
130  if (nh.hasParam("update_rate")) {
131  nh.param ("update_rate", update_rate, 10.0);
132  ROS_INFO("use update rate = %f", update_rate);
133  }
134 
135  double periodic_rate = 0.1; // 10Hz
136  if (nh.hasParam("periodic_rate")) {
137  use_periodic_rate = true;
138  nh.param ("periodic_rate", periodic_rate, 0.1);
139  ROS_INFO("use periodic rate = %f", periodic_rate);
140  }
141 
142  bool latched;
143  if (nh.hasParam("latched")) {
144  nh.param ("latched", latched, false);
145  if(latched) {
146  ROS_INFO("use latched");
147  }
148  }
149 
150  g_node = &n;
151  bool use_service_p = false;
152  std::vector<std::string> target_topics;
153  // use service or parameter
154  // check the parameter first
156  if (!nh.getParam ("topics", topics)) {
157  ROS_WARN("no ~topics is available, use service interface");
158  use_service_p = true;
159  }
160  else {
161  switch (topics.getType ()) {
163  for (int d = 0; d < topics.size(); ++d) {
164  target_topics.push_back((std::string)(topics[d]));
165  }
166  break;
167  }
168  default: {
169  ROS_WARN("~topics mismatches type, it should be a list, use service interface");
170  use_service_p = true;
171  }
172  }
173  }
174 
175  if (use_service_p) {
176  target_topics.clear();
177  // New service
178  ros::service::waitForService(string("/list"), -1);
179  ros::ServiceClient sc_list = n.serviceClient<jsk_topic_tools::List>(string("/list"), true);
180  jsk_topic_tools::List::Request req;
181  jsk_topic_tools::List::Response res;
182  ROS_INFO_STREAM("calling " << sc_list.getService());
183  while ( sc_list.call(req, res) == false) {
184  ROS_WARN_STREAM("calling " << sc_list.getService() << " fails, retry...");
185  ros::Duration(1).sleep();
186  }
187  ROS_WARN_STREAM("calling /list success!!!");
188  for(vector<std::string>::iterator it = res.topic_names.begin(); it != res.topic_names.end(); ++it) {
189  target_topics.push_back(*it);
190  }
191  ROS_INFO_STREAM("calling /list has done.. found " << res.topic_names.size() << " topics to publish");
192  }
193  for(size_t i = 0; i < target_topics.size(); i++) {
195  pub_info->topic_name = target_topics[i];
196  if (use_fixed_rate) {
197  pub_info->rate = ros::Duration(fixed_rate);
198  }
199  pub_info->latched = latched;
200  pub_info->advertised = false;
201  pub_info->topic_with_header = false;
202  ROS_INFO_STREAM("subscribe " << pub_info->topic_name+string("_update"));
203  pub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(pub_info->topic_name+string("_update"), 10, boost::bind(in_cb, _1, pub_info)));
204 
205  g_pubs.push_back(pub_info);
206  }
207 
208  ros::Rate rate_loop(100);
209  ros::Time last_updated;
210 
211  bool use_service = true;
212  nh.param("use_service", use_service, true);
213 
214  nh.param("debug", debug, false);
215 
216  ros::ServiceClient sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
217  ros::Publisher pub_update;
218  if (!use_service) {
219  pub_update = n.advertise<std_msgs::String>("/update", 1);
220  }
221 
222  if (use_periodic_rate) {
223  for (list<pub_info_ref>::iterator it = g_pubs.begin();
224  it != g_pubs.end();
225  ++it) {
226  jsk_topic_tools::Update::Request req;
227  jsk_topic_tools::Update::Response res;
228  req.topic_name = (*it)->topic_name;
229  req.periodic = true;
230  req.periodic_rate = ros::Duration(periodic_rate);
231  sc_update.call(req, res);
232  ROS_INFO_STREAM("sending request for periodic rate publish topic:" << req.topic_name << " rate:" << req.periodic_rate);
233  }
234  }
235  while ( ros::ok() ) {
236 
237  if ( update_rate >= 0 && (ros::Time::now() - last_updated) > ros::Duration(update_rate) ) {
238  for (list<pub_info_ref>::iterator it = g_pubs.begin();
239  it != g_pubs.end();
240  ++it) {
241 
242  if (use_service) {
243  jsk_topic_tools::Update::Request req;
244  jsk_topic_tools::Update::Response res;
245  req.topic_name = (*it)->topic_name;
246  if ( sc_update.call(req, res) == false ) {
247  ROS_ERROR_STREAM("calling /update (" << req.topic_name << ") fails, retry...");
248  sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
249  continue;
250  }
251  (*it)->rate = ros::Duration(res.rate);
252  if(debug){
253  ROS_INFO_STREAM("calling /update " << req.topic_name << " .. " << res.rate);
254  }
255  }
256  else {
257  std_msgs::String msg;
258  msg.data = (*it)->topic_name;
259  pub_update.publish(msg);
260  if(debug){
261  ROS_INFO_STREAM("publishing /update " << msg.data);
262  }
263  }
264  }
265  last_updated = ros::Time::now();
266  }
267 
268  ros::spinOnce();
269  rate_loop.sleep();
270  }
271 
272  return 0;
273 }
d
msg
ServiceClient serviceClient(const std::string &service_name, bool persistent=false, const M_string &header_values=M_string())
pub
boost::shared_ptr< pub_info_t > pub_info_ref
void publish(const boost::shared_ptr< M > &message) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
int size() const
bool sleep() const
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
bool call(MReq &req, MRes &res)
Type const & getType() const
int main(int argc, char **argv)
#define ROS_WARN(...)
static list< pub_info_ref > g_pubs
void in_cb(const boost::shared_ptr< ShapeShifter const > &msg, boost::shared_ptr< pub_info_t > s)
static bool use_periodic_rate
#define ROS_INFO(...)
bool param(const std::string &param_name, T &param_val, const T &default_val) const
static ros::NodeHandle * g_node
ROSCPP_DECL bool ok()
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
#define ROS_WARN_STREAM(args)
bool sleep()
#define ROS_INFO_STREAM(args)
INLINE Rall1d< T, V, S > abs(const Rall1d< T, V, S > &x)
bool getParam(const std::string &key, std::string &s) const
static Time now()
std::string getService()
bool hasParam(const std::string &key) const
#define ROS_ERROR_STREAM(args)
ROSCPP_DECL void spinOnce()
static bool debug
static bool use_fixed_rate
ROSCPP_DECL bool waitForService(const std::string &service_name, int32_t timeout)


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:19