5 #include "std_msgs/String.h" 6 #include "jsk_topic_tools/List.h" 7 #include "jsk_topic_tools/Update.h" 10 #include <boost/thread.hpp> 11 #include <boost/date_time/posix_time/posix_time.hpp> 12 #include <boost/date_time.hpp> 25 std::string topic_name;
34 boost::thread periodic_thread;
35 void periodic_update_topic(){
36 ROS_INFO_STREAM(
"topic " << this->topic_name <<
" is now published at " << 1.0/(this->periodic_rate).toSec() <<
" Hz periodically.");
37 while(
ros::ok() && this->periodic){
40 sleep_sec = this->periodic_rate.
toNSec();
41 boost::mutex::scoped_lock lock(mutex);
45 boost::this_thread::sleep(boost::posix_time::milliseconds(sleep_sec * 1e-6));
47 ROS_INFO_STREAM(
"topic " << this->topic_name <<
" is now NOT published.");
60 boost::mutex::scoped_lock lock(s->mutex);
61 if ( s->rate.isZero() && s->last_time_received.isZero() ) {
62 }
else if ( s->rate.isZero() && ! s->last_time_received.isZero() ) {
69 if ( s->advertised ==
false ) {
70 s->pub = msg->advertise(*g_node, s->topic_name+
string(
"_update"), 10);
83 jsk_topic_tools::List::Response& res)
86 for (list<sub_info_ref>::iterator it =
g_subs.begin();
90 while( ! (*it)->advertised ) {
91 ROS_WARN_STREAM(
"service (list) waiting.. " << (*it)->topic_name <<
" is not running yet...");
94 res.topic_names.push_back((*it)->topic_name);
95 ROS_INFO_STREAM(
"service (list) returns res.topic_name:" << (*it)->topic_name );
103 for (list<sub_info_ref>::iterator it =
g_subs.begin();
107 if ( (*it)->topic_name == msg->data && (*it)->advertised ==
true ) {
108 if (! (*it)->advertised ) {
109 ROS_WARN_STREAM(
"topic (update) " << (*it)->topic_name <<
" is not running yet...");
112 ROS_INFO_STREAM(
"topic (update) " << (*it)->topic_name <<
" running at " << 1.0/((*it)->rate).toSec() <<
" Hz");
113 (*it)->pub.publish((*it)->msg);
122 jsk_topic_tools::Update::Response& res)
126 for (list<sub_info_ref>::iterator it =
g_subs.begin();
130 if ( (*it)->topic_name == req.topic_name && (*it)->advertised ==
true ) {
131 if (! (*it)->advertised ) {
132 ROS_WARN_STREAM(
"service (update) " << (*it)->topic_name <<
" is not running yet...");
135 if (req.periodic && ! (*it)->periodic) {
137 if (req.periodic_rate > (*it)->rate) {
138 (*it)->periodic =
true;
140 (*it)->periodic_thread = boost::thread(boost::bind(&sub_info_t::periodic_update_topic, &(**it)));
141 res.rate = (*it)->rate.toSec();
142 ROS_INFO_STREAM(
"periodic publish of topic is started in the new thread. topic:" << (*it)->topic_name <<
" rate:" << (*it)->periodic_rate);
145 ROS_ERROR_STREAM(
"Invalid periodic rate is set at " << (*it)->topic_name <<
". rate must be 0.0 < " << 1.0/req.periodic_rate.toSec() <<
" < " << 1.0/((*it)->rate).toSec());
148 }
else if (! req.periodic && (*it)->periodic) {
150 (*it)->periodic =
false;
151 ((*it)->periodic_thread).join();
153 ROS_INFO_STREAM(
"periodic publish of topic " << (*it)->topic_name <<
" is now stopped.");
155 }
else if (req.periodic) {
156 if (req.periodic_rate > (*it)->rate) {
157 (*it)->periodic =
true;
159 ROS_INFO_STREAM(
"periodic publish of topic is started in the existing thread. topic:" << (*it)->topic_name <<
" rate:" << (*it)->periodic_rate);
162 ROS_ERROR_STREAM(
"Invalid periodic rate is set at " << (*it)->topic_name <<
". rate must be 0.0 < " << 1.0/req.periodic_rate.toSec() <<
" < " << 1.0/((*it)->rate).toSec());
167 ROS_INFO_STREAM(
"service (update) " << (*it)->topic_name <<
" running at " << 1.0/((*it)->rate).toSec() <<
" Hz");
168 (*it)->pub.publish((*it)->msg);
169 res.rate = (*it)->rate.toSec();
170 ROS_INFO_STREAM(
"service (update) is called, req.topic:" << req.topic_name <<
", res.rate " << res.rate);
187 printf(
"\nusage: topic_buffer_Server IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
191 ros::init(argc, argv,
"topic_buffer_server");
193 for (
unsigned int i = 1; i < args.size(); i++)
194 topics.push_back(args[i]);
201 double periodic_rate = 0.1;
204 nh.
param (
"periodic_rate", periodic_rate, 0.1);
205 ROS_INFO(
"use periodic rate = %f", periodic_rate);
208 for (
size_t i = 0; i < topics.size(); i++)
213 sub_info->last_time_received =
ros::Time(0);
215 sub_info->advertised =
false;
216 sub_info->periodic =
false;
221 while (sub_info->sub->getNumPublishers() == 0 ) {
223 ROS_WARN_STREAM(
"wait for " << sub_info->topic_name <<
" ... " << i <<
"/" << topics.size());
226 g_subs.push_back(sub_info);
228 ROS_INFO_STREAM(
"setup done, subscribed " << topics.size() <<
" topics");
231 for (list<sub_info_ref>::iterator it =
g_subs.begin(); it !=
g_subs.end(); ++it) {
232 (*it)->periodic =
true;
234 (*it)->periodic_thread = boost::thread(boost::bind(&sub_info_t::periodic_update_topic, &(**it)));
235 ROS_INFO_STREAM(
"periodic publish of topic is started in the new thread. topic:" << (*it)->topic_name <<
" rate:" << (*it)->periodic_rate);
void publish(const boost::shared_ptr< M > &message) const
void update_topic_cb(const std_msgs::String::ConstPtr &msg)
boost::shared_ptr< sub_info_t > sub_info_ref
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
bool list_topic_cb(jsk_topic_tools::List::Request &req, jsk_topic_tools::List::Response &res)
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
static bool use_periodic_rate
void in_cb(const boost::shared_ptr< ShapeShifter const > &msg, boost::shared_ptr< sub_info_t > s)
ROSCPP_DECL void spin(Spinner &spinner)
bool param(const std::string ¶m_name, T ¶m_val, const T &default_val) const
#define ROS_WARN_STREAM(args)
static list< sub_info_ref > g_subs
#define ROS_INFO_STREAM(args)
int main(int argc, char **argv)
ROSCPP_DECL void removeROSArgs(int argc, const char *const *argv, V_string &args_out)
bool hasParam(const std::string &key) const
#define ROS_ERROR_STREAM(args)
static ros::NodeHandle * g_node