topic_buffer_server.cpp
Go to the documentation of this file.
1 #include <cstdio>
2 #include <vector>
3 #include <list>
4 #include "ros/console.h"
5 #include "std_msgs/String.h"
6 #include "jsk_topic_tools/List.h"
7 #include "jsk_topic_tools/Update.h"
9 #include "topic_tools/parse.h"
10 #include <boost/thread.hpp>
11 #include <boost/date_time/posix_time/posix_time.hpp>
12 #include <boost/date_time.hpp>
13 
14 using std::string;
15 using std::vector;
16 using std::list;
17 using namespace topic_tools;
18 
19 static bool use_periodic_rate = false;
20 
21 class sub_info_t
22 {
23 public:
24  boost::mutex mutex;
25  std::string topic_name;
28  bool advertised;
30  ros::Time last_time_received;
32  bool periodic;
33  ros::Duration periodic_rate;
34  boost::thread periodic_thread;
35  void periodic_update_topic(){
36  ROS_INFO_STREAM("topic " << this->topic_name << " is now published at " << 1.0/(this->periodic_rate).toSec() << " Hz periodically.");
37  while(ros::ok() && this->periodic){
38  double sleep_sec;
39  {
40  sleep_sec = this->periodic_rate.toNSec();
41  boost::mutex::scoped_lock lock(mutex);
42  this->pub.publish(this->msg);
43  }
44  //ROS_DEBUG_STREAM("sleep " << this->periodic_rate.toNSec() * 1e-6 << " msec.");
45  boost::this_thread::sleep(boost::posix_time::milliseconds(sleep_sec * 1e-6));
46  }
47  ROS_INFO_STREAM("topic " << this->topic_name << " is now NOT published.");
48  }
49 };
50 
52 
53 static list<sub_info_ref> g_subs;
54 
55 static ros::NodeHandle *g_node = NULL;
56 
59 {
60  boost::mutex::scoped_lock lock(s->mutex);
61  if ( s->rate.isZero() && s->last_time_received.isZero() ) { // skip first time
62  } else if ( s->rate.isZero() && ! s->last_time_received.isZero() ) { // just for second time
63  s->rate = ros::Time::now() - s->last_time_received;
64  } else {
65  double alpha = 0.1; //N = 19 alpha = 2 / ( N + 1 )
66  s->rate = (ros::Duration(alpha * (ros::Time::now() - s->last_time_received).toSec() + (1 - alpha) * s->rate.toSec()));
67  }
68 
69  if ( s->advertised == false ) {
70  s->pub = msg->advertise(*g_node, s->topic_name+string("_update"), 10);
71  s->advertised = true;
72  ROS_INFO_STREAM("advertised as " << s->topic_name+string("_update"));
73  }
74 
75  s->msg = msg;
76 
77  // update last_time_received
78  s->last_time_received = ros::Time::now();
79 }
80 
81 
82 bool list_topic_cb(jsk_topic_tools::List::Request& req,
83  jsk_topic_tools::List::Response& res)
84 {
85  ROS_INFO_STREAM("service (list) is called, returns " << g_subs.size() << " topics");
86  for (list<sub_info_ref>::iterator it = g_subs.begin();
87  it != g_subs.end();
88  ++it)
89  {
90  while( ! (*it)->advertised ) {
91  ROS_WARN_STREAM("service (list) waiting.. " << (*it)->topic_name << " is not running yet...");
92  return false;
93  }
94  res.topic_names.push_back((*it)->topic_name);
95  ROS_INFO_STREAM("service (list) returns res.topic_name:" << (*it)->topic_name );
96  }
97 
98  return true;
99 }
100 
101 void update_topic_cb(const std_msgs::String::ConstPtr &msg) {
102  ROS_INFO_STREAM("topic (update) is called, searching from " << g_subs.size() << " topics");
103  for (list<sub_info_ref>::iterator it = g_subs.begin();
104  it != g_subs.end();
105  ++it)
106  {
107  if ( (*it)->topic_name == msg->data && (*it)->advertised == true ) {
108  if (! (*it)->advertised ) {
109  ROS_WARN_STREAM("topic (update) " << (*it)->topic_name << " is not running yet...");
110  continue;
111  }
112  ROS_INFO_STREAM("topic (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
113  (*it)->pub.publish((*it)->msg);
114  ROS_INFO_STREAM("topic (update) is called, req.topic:" << msg->data);
115  return;
116  }
117  }
118  ROS_ERROR_STREAM("could not find topic named " << msg->data );
119 }
120 
121 bool update_topic_cb(jsk_topic_tools::Update::Request& req,
122  jsk_topic_tools::Update::Response& res)
123 {
124 
125  ROS_INFO_STREAM("service (update) is called, searching from " << g_subs.size() << " topics");
126  for (list<sub_info_ref>::iterator it = g_subs.begin();
127  it != g_subs.end();
128  ++it)
129  {
130  if ( (*it)->topic_name == req.topic_name && (*it)->advertised == true ) {
131  if (! (*it)->advertised ) {
132  ROS_WARN_STREAM("service (update) " << (*it)->topic_name << " is not running yet...");
133  continue;
134  }
135  if (req.periodic && ! (*it)->periodic) {
136  // start periodic publish thread
137  if (req.periodic_rate > (*it)->rate) {
138  (*it)->periodic = true;
139  (*it)->periodic_rate = ros::Duration(req.periodic_rate);
140  (*it)->periodic_thread = boost::thread(boost::bind(&sub_info_t::periodic_update_topic, &(**it)));
141  res.rate = (*it)->rate.toSec();
142  ROS_INFO_STREAM("periodic publish of topic is started in the new thread. topic:" << (*it)->topic_name << " rate:" << (*it)->periodic_rate);
143  return true;
144  } else {
145  ROS_ERROR_STREAM("Invalid periodic rate is set at " << (*it)->topic_name << ". rate must be 0.0 < " << 1.0/req.periodic_rate.toSec() << " < " << 1.0/((*it)->rate).toSec());
146  return false;
147  }
148  } else if (! req.periodic && (*it)->periodic) {
149  // stop periodic publish thread
150  (*it)->periodic = false;
151  ((*it)->periodic_thread).join();
152  res.rate = 0.0;
153  ROS_INFO_STREAM("periodic publish of topic " << (*it)->topic_name << " is now stopped.");
154  return true;
155  } else if (req.periodic) {
156  if (req.periodic_rate > (*it)->rate) {
157  (*it)->periodic = true;
158  (*it)->periodic_rate = ros::Duration(req.periodic_rate);
159  ROS_INFO_STREAM("periodic publish of topic is started in the existing thread. topic:" << (*it)->topic_name << " rate:" << (*it)->periodic_rate);
160  return true;
161  } else {
162  ROS_ERROR_STREAM("Invalid periodic rate is set at " << (*it)->topic_name << ". rate must be 0.0 < " << 1.0/req.periodic_rate.toSec() << " < " << 1.0/((*it)->rate).toSec());
163  return false;
164  }
165  } else {
166  // single publish
167  ROS_INFO_STREAM("service (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
168  (*it)->pub.publish((*it)->msg);
169  res.rate = (*it)->rate.toSec();
170  ROS_INFO_STREAM("service (update) is called, req.topic:" << req.topic_name << ", res.rate " << res.rate);
171  return true;
172  }
173  }
174  }
175  ROS_ERROR_STREAM("could not find topic named " << req.topic_name );
176  return false;
177 }
178 
179 
180 int main(int argc, char **argv)
181 {
182  vector<string> args;
183  ros::removeROSArgs(argc, (const char**)argv, args);
184 
185  if (args.size() < 2)
186  {
187  printf("\nusage: topic_buffer_Server IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
188  return 1;
189  }
190 
191  ros::init(argc, argv, "topic_buffer_server");
192  vector<string> topics;
193  for (unsigned int i = 1; i < args.size(); i++)
194  topics.push_back(args[i]);
195 
196  ros::NodeHandle n;
197  ros::NodeHandle nh("~");
198 
199  g_node = &n;
200 
201  double periodic_rate = 0.1; // 10Hz
202  if (nh.hasParam("periodic_rate")) {
203  use_periodic_rate = true;
204  nh.param ("periodic_rate", periodic_rate, 0.1);
205  ROS_INFO("use periodic rate = %f", periodic_rate);
206  }
207 
208  for (size_t i = 0; i < topics.size(); i++)
209  {
210  //sub_info_t sub_info;
212  sub_info->topic_name = ros::names::resolve(topics[i]);
213  sub_info->last_time_received = ros::Time(0);
214  sub_info->rate = ros::Duration(0);
215  sub_info->advertised = false;
216  sub_info->periodic = false;
217  ROS_INFO_STREAM("subscribe " << sub_info->topic_name);
218  sub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info->topic_name, 10, boost::bind(in_cb, _1, sub_info)));
219 
220  // waiting for all topics are publisherd
221  while (sub_info->sub->getNumPublishers() == 0 ) {
222  ros::Duration(1.0).sleep();
223  ROS_WARN_STREAM("wait for " << sub_info->topic_name << " ... " << i << "/" << topics.size());
224  }
225 
226  g_subs.push_back(sub_info);
227  }
228  ROS_INFO_STREAM("setup done, subscribed " << topics.size() << " topics");
229 
230  if(use_periodic_rate) {
231  for (list<sub_info_ref>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
232  (*it)->periodic = true;
233  (*it)->periodic_rate = ros::Duration(periodic_rate);
234  (*it)->periodic_thread = boost::thread(boost::bind(&sub_info_t::periodic_update_topic, &(**it)));
235  ROS_INFO_STREAM("periodic publish of topic is started in the new thread. topic:" << (*it)->topic_name << " rate:" << (*it)->periodic_rate);
236  }
237  }
238 
239  // New service
240  ros::ServiceServer ss_list = nh.advertiseService(string("list"), list_topic_cb);
241 
242  ros::ServiceServer ss_update = nh.advertiseService(string("update"), update_topic_cb);
243  ros::Subscriber ss_update_sub = nh.subscribe("update", 1, update_topic_cb);
244  ros::spin();
245 
246  return 0;
247 }
msg
pub
void publish(const boost::shared_ptr< M > &message) const
void update_topic_cb(const std_msgs::String::ConstPtr &msg)
boost::shared_ptr< sub_info_t > sub_info_ref
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
bool list_topic_cb(jsk_topic_tools::List::Request &req, jsk_topic_tools::List::Response &res)
bool sleep() const
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
static bool use_periodic_rate
void in_cb(const boost::shared_ptr< ShapeShifter const > &msg, boost::shared_ptr< sub_info_t > s)
ROSCPP_DECL void spin(Spinner &spinner)
#define ROS_INFO(...)
int64_t toNSec() const
bool param(const std::string &param_name, T &param_val, const T &default_val) const
ROSCPP_DECL bool ok()
#define ROS_WARN_STREAM(args)
static list< sub_info_ref > g_subs
#define ROS_INFO_STREAM(args)
int main(int argc, char **argv)
static Time now()
ROSCPP_DECL void removeROSArgs(int argc, const char *const *argv, V_string &args_out)
bool hasParam(const std::string &key) const
#define ROS_ERROR_STREAM(args)
static ros::NodeHandle * g_node


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:19