Go to the documentation of this file.00001 #include <cstdio>
00002 #include <vector>
00003 #include <list>
00004 #include "ros/console.h"
00005 #include "std_msgs/String.h"
00006 #include "jsk_topic_tools/List.h"
00007 #include "jsk_topic_tools/Update.h"
00008 #include "topic_tools/shape_shifter.h"
00009 #include "topic_tools/parse.h"
00010 #include <boost/thread.hpp>
00011 #include <boost/date_time/posix_time/posix_time.hpp>
00012 #include <boost/date_time.hpp>
00013
00014 using std::string;
00015 using std::vector;
00016 using std::list;
00017 using namespace topic_tools;
00018
00019 static bool use_periodic_rate = false;
00020
00021 class sub_info_t
00022 {
00023 public:
00024 boost::mutex mutex;
00025 std::string topic_name;
00026 ros::Subscriber *sub;
00027 ros::Publisher pub;
00028 bool advertised;
00029 boost::shared_ptr<ShapeShifter const> msg;
00030 ros::Time last_time_received;
00031 ros::Duration rate;
00032 bool periodic;
00033 ros::Duration periodic_rate;
00034 boost::thread periodic_thread;
00035 void periodic_update_topic(){
00036 ROS_INFO_STREAM("topic " << this->topic_name << " is now published at " << 1.0/(this->periodic_rate).toSec() << " Hz periodically.");
00037 while(ros::ok() && this->periodic){
00038 double sleep_sec;
00039 {
00040 sleep_sec = this->periodic_rate.toNSec();
00041 boost::mutex::scoped_lock lock(mutex);
00042 this->pub.publish(this->msg);
00043 }
00044
00045 boost::this_thread::sleep(boost::posix_time::milliseconds(sleep_sec * 1e-6));
00046 }
00047 ROS_INFO_STREAM("topic " << this->topic_name << " is now NOT published.");
00048 }
00049 };
00050
00051 typedef boost::shared_ptr<sub_info_t> sub_info_ref;
00052
00053 static list<sub_info_ref> g_subs;
00054
00055 static ros::NodeHandle *g_node = NULL;
00056
00057 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00058 boost::shared_ptr<sub_info_t> s)
00059 {
00060 boost::mutex::scoped_lock lock(s->mutex);
00061 if ( s->rate.isZero() && s->last_time_received.isZero() ) {
00062 } else if ( s->rate.isZero() && ! s->last_time_received.isZero() ) {
00063 s->rate = ros::Time::now() - s->last_time_received;
00064 } else {
00065 double alpha = 0.1;
00066 s->rate = (ros::Duration(alpha * (ros::Time::now() - s->last_time_received).toSec() + (1 - alpha) * s->rate.toSec()));
00067 }
00068
00069 if ( s->advertised == false ) {
00070 s->pub = msg->advertise(*g_node, s->topic_name+string("_update"), 10);
00071 s->advertised = true;
00072 ROS_INFO_STREAM("advertised as " << s->topic_name+string("_update"));
00073 }
00074
00075 s->msg = msg;
00076
00077
00078 s->last_time_received = ros::Time::now();
00079 }
00080
00081
00082 bool list_topic_cb(jsk_topic_tools::List::Request& req,
00083 jsk_topic_tools::List::Response& res)
00084 {
00085 ROS_INFO_STREAM("service (list) is called, returns " << g_subs.size() << " topics");
00086 for (list<sub_info_ref>::iterator it = g_subs.begin();
00087 it != g_subs.end();
00088 ++it)
00089 {
00090 while( ! (*it)->advertised ) {
00091 ROS_WARN_STREAM("service (list) waiting.. " << (*it)->topic_name << " is not running yet...");
00092 return false;
00093 }
00094 res.topic_names.push_back((*it)->topic_name);
00095 ROS_INFO_STREAM("service (list) returns res.topic_name:" << (*it)->topic_name );
00096 }
00097
00098 return true;
00099 }
00100
00101 void update_topic_cb(const std_msgs::String::ConstPtr &msg) {
00102 ROS_INFO_STREAM("topic (update) is called, searching from " << g_subs.size() << " topics");
00103 for (list<sub_info_ref>::iterator it = g_subs.begin();
00104 it != g_subs.end();
00105 ++it)
00106 {
00107 if ( (*it)->topic_name == msg->data && (*it)->advertised == true ) {
00108 if (! (*it)->advertised ) {
00109 ROS_WARN_STREAM("topic (update) " << (*it)->topic_name << " is not running yet...");
00110 continue;
00111 }
00112 ROS_INFO_STREAM("topic (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
00113 (*it)->pub.publish((*it)->msg);
00114 ROS_INFO_STREAM("topic (update) is called, req.topic:" << msg->data);
00115 return;
00116 }
00117 }
00118 ROS_ERROR_STREAM("could not find topic named " << msg->data );
00119 }
00120
00121 bool update_topic_cb(jsk_topic_tools::Update::Request& req,
00122 jsk_topic_tools::Update::Response& res)
00123 {
00124
00125 ROS_INFO_STREAM("service (update) is called, searching from " << g_subs.size() << " topics");
00126 for (list<sub_info_ref>::iterator it = g_subs.begin();
00127 it != g_subs.end();
00128 ++it)
00129 {
00130 if ( (*it)->topic_name == req.topic_name && (*it)->advertised == true ) {
00131 if (! (*it)->advertised ) {
00132 ROS_WARN_STREAM("service (update) " << (*it)->topic_name << " is not running yet...");
00133 continue;
00134 }
00135 if (req.periodic && ! (*it)->periodic) {
00136
00137 if (req.periodic_rate > (*it)->rate) {
00138 (*it)->periodic = true;
00139 (*it)->periodic_rate = ros::Duration(req.periodic_rate);
00140 (*it)->periodic_thread = boost::thread(boost::bind(&sub_info_t::periodic_update_topic, &(**it)));
00141 res.rate = (*it)->rate.toSec();
00142 ROS_INFO_STREAM("periodic publish of topic is started in the new thread. topic:" << (*it)->topic_name << " rate:" << (*it)->periodic_rate);
00143 return true;
00144 } else {
00145 ROS_ERROR_STREAM("Invalid periodic rate is set at " << (*it)->topic_name << ". rate must be 0.0 < " << 1.0/req.periodic_rate.toSec() << " < " << 1.0/((*it)->rate).toSec());
00146 return false;
00147 }
00148 } else if (! req.periodic && (*it)->periodic) {
00149
00150 (*it)->periodic = false;
00151 ((*it)->periodic_thread).join();
00152 res.rate = 0.0;
00153 ROS_INFO_STREAM("periodic publish of topic " << (*it)->topic_name << " is now stopped.");
00154 return true;
00155 } else if (req.periodic) {
00156 if (req.periodic_rate > (*it)->rate) {
00157 (*it)->periodic = true;
00158 (*it)->periodic_rate = ros::Duration(req.periodic_rate);
00159 ROS_INFO_STREAM("periodic publish of topic is started in the existing thread. topic:" << (*it)->topic_name << " rate:" << (*it)->periodic_rate);
00160 return true;
00161 } else {
00162 ROS_ERROR_STREAM("Invalid periodic rate is set at " << (*it)->topic_name << ". rate must be 0.0 < " << 1.0/req.periodic_rate.toSec() << " < " << 1.0/((*it)->rate).toSec());
00163 return false;
00164 }
00165 } else {
00166
00167 ROS_INFO_STREAM("service (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
00168 (*it)->pub.publish((*it)->msg);
00169 res.rate = (*it)->rate.toSec();
00170 ROS_INFO_STREAM("service (update) is called, req.topic:" << req.topic_name << ", res.rate " << res.rate);
00171 return true;
00172 }
00173 }
00174 }
00175 ROS_ERROR_STREAM("could not find topic named " << req.topic_name );
00176 return false;
00177 }
00178
00179
00180 int main(int argc, char **argv)
00181 {
00182 vector<string> args;
00183 ros::removeROSArgs(argc, (const char**)argv, args);
00184
00185 if (args.size() < 2)
00186 {
00187 printf("\nusage: topic_buffer_Server IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
00188 return 1;
00189 }
00190
00191 ros::init(argc, argv, "topic_buffer_server");
00192 vector<string> topics;
00193 for (unsigned int i = 1; i < args.size(); i++)
00194 topics.push_back(args[i]);
00195
00196 ros::NodeHandle n;
00197 ros::NodeHandle nh("~");
00198
00199 g_node = &n;
00200
00201 double periodic_rate = 0.1;
00202 if (nh.hasParam("periodic_rate")) {
00203 use_periodic_rate = true;
00204 nh.param ("periodic_rate", periodic_rate, 0.1);
00205 ROS_INFO("use periodic rate = %f", periodic_rate);
00206 }
00207
00208 for (size_t i = 0; i < topics.size(); i++)
00209 {
00210
00211 boost::shared_ptr<sub_info_t> sub_info(new sub_info_t);
00212 sub_info->topic_name = ros::names::resolve(topics[i]);
00213 sub_info->last_time_received = ros::Time(0);
00214 sub_info->rate = ros::Duration(0);
00215 sub_info->advertised = false;
00216 sub_info->periodic = false;
00217 ROS_INFO_STREAM("subscribe " << sub_info->topic_name);
00218 sub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info->topic_name, 10, boost::bind(in_cb, _1, sub_info)));
00219
00220
00221 while (sub_info->sub->getNumPublishers() == 0 ) {
00222 ros::Duration(1.0).sleep();
00223 ROS_WARN_STREAM("wait for " << sub_info->topic_name << " ... " << i << "/" << topics.size());
00224 }
00225
00226 g_subs.push_back(sub_info);
00227 }
00228 ROS_INFO_STREAM("setup done, subscribed " << topics.size() << " topics");
00229
00230 if(use_periodic_rate) {
00231 for (list<sub_info_ref>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
00232 (*it)->periodic = true;
00233 (*it)->periodic_rate = ros::Duration(periodic_rate);
00234 (*it)->periodic_thread = boost::thread(boost::bind(&sub_info_t::periodic_update_topic, &(**it)));
00235 ROS_INFO_STREAM("periodic publish of topic is started in the new thread. topic:" << (*it)->topic_name << " rate:" << (*it)->periodic_rate);
00236 }
00237 }
00238
00239
00240 ros::ServiceServer ss_list = nh.advertiseService(string("list"), list_topic_cb);
00241
00242 ros::ServiceServer ss_update = nh.advertiseService(string("update"), update_topic_cb);
00243 ros::Subscriber ss_update_sub = nh.subscribe("update", 1, update_topic_cb);
00244 ros::spin();
00245
00246 return 0;
00247 }