Go to the documentation of this file.00001 #include <cstdio>
00002 #include <vector>
00003 #include <list>
00004 #include "ros/console.h"
00005 #include "std_msgs/String.h"
00006 #include "jsk_topic_tools/List.h"
00007 #include "jsk_topic_tools/Update.h"
00008 #include "topic_tools/shape_shifter.h"
00009 #include "topic_tools/parse.h"
00010
00011 using std::string;
00012 using std::vector;
00013 using std::list;
00014 using namespace topic_tools;
00015
00016 class sub_info_t
00017 {
00018 public:
00019 std::string topic_name;
00020 ros::Subscriber *sub;
00021 ros::Publisher pub;
00022 bool advertised;
00023 boost::shared_ptr<ShapeShifter const> msg;
00024 ros::Time last_time_received;
00025 ros::Duration rate;
00026 };
00027
00028 typedef boost::shared_ptr<sub_info_t> sub_info_ref;
00029
00030 static list<sub_info_ref> g_subs;
00031
00032 static ros::NodeHandle *g_node = NULL;
00033
00034 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00035 boost::shared_ptr<sub_info_t> s)
00036 {
00037 if ( s->rate.isZero() && s->last_time_received.isZero() ) {
00038 } else if ( s->rate.isZero() && ! s->last_time_received.isZero() ) {
00039 s->rate = ros::Time::now() - s->last_time_received;
00040 } else {
00041 double alpha = 0.1;
00042 s->rate = (ros::Duration(alpha * (ros::Time::now() - s->last_time_received).toSec() + (1 - alpha) * s->rate.toSec()));
00043 }
00044
00045 if ( s->advertised == false ) {
00046 s->pub = msg->advertise(*g_node, s->topic_name+string("_update"), 10);
00047 s->advertised = true;
00048 ROS_INFO_STREAM("advertised as " << s->topic_name+string("_update"));
00049 }
00050
00051 s->msg = msg;
00052
00053
00054 s->last_time_received = ros::Time::now();
00055 }
00056
00057
00058 bool list_topic_cb(jsk_topic_tools::List::Request& req,
00059 jsk_topic_tools::List::Response& res)
00060 {
00061 ROS_INFO_STREAM("service (list) is called, returns " << g_subs.size() << " topics");
00062 for (list<sub_info_ref>::iterator it = g_subs.begin();
00063 it != g_subs.end();
00064 ++it)
00065 {
00066 while( ! (*it)->advertised ) {
00067 ROS_WARN_STREAM("service (list) waiting.. " << (*it)->topic_name << " is not running yet...");
00068 return false;
00069 }
00070 res.topic_names.push_back((*it)->topic_name);
00071 ROS_INFO_STREAM("service (list) returns res.topic_name:" << (*it)->topic_name );
00072 }
00073
00074 return true;
00075 }
00076
00077 void update_topic_cb(const std_msgs::String::ConstPtr &msg) {
00078 ROS_INFO_STREAM("topic (update) is called, searching from " << g_subs.size() << " topics");
00079 for (list<sub_info_ref>::iterator it = g_subs.begin();
00080 it != g_subs.end();
00081 ++it)
00082 {
00083 if ( (*it)->topic_name == msg->data && (*it)->advertised == true ) {
00084 if (! (*it)->advertised ) {
00085 ROS_WARN_STREAM("topic (update) " << (*it)->topic_name << " is not running yet...");
00086 continue;
00087 }
00088 ROS_INFO_STREAM("topic (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
00089 (*it)->pub.publish((*it)->msg);
00090 ROS_INFO_STREAM("topic (update) is called, req.topic:" << msg->data);
00091 return;
00092 }
00093 }
00094 ROS_ERROR_STREAM("could not find topic named " << msg->data );
00095 }
00096
00097 bool update_topic_cb(jsk_topic_tools::Update::Request& req,
00098 jsk_topic_tools::Update::Response& res)
00099 {
00100
00101 ROS_INFO_STREAM("service (update) is called, searching from " << g_subs.size() << " topics");
00102 for (list<sub_info_ref>::iterator it = g_subs.begin();
00103 it != g_subs.end();
00104 ++it)
00105 {
00106 if ( (*it)->topic_name == req.topic_name && (*it)->advertised == true ) {
00107 if (! (*it)->advertised ) {
00108 ROS_WARN_STREAM("service (update) " << (*it)->topic_name << " is not running yet...");
00109 continue;
00110 }
00111 ROS_INFO_STREAM("service (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
00112 (*it)->pub.publish((*it)->msg);
00113 res.rate = (*it)->rate.toSec();
00114 ROS_INFO_STREAM("service (update) is called, req.topic:" << req.topic_name << ", res.rate " << res.rate);
00115 return true;
00116 }
00117 }
00118 ROS_ERROR_STREAM("could not find topic named " << req.topic_name );
00119 return false;
00120 }
00121
00122
00123 int main(int argc, char **argv)
00124 {
00125 vector<string> args;
00126 ros::removeROSArgs(argc, (const char**)argv, args);
00127
00128 if (args.size() < 2)
00129 {
00130 printf("\nusage: topic_buffer_Server IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
00131 return 1;
00132 }
00133
00134 ros::init(argc, argv, "topic_buffer_server");
00135 vector<string> topics;
00136 for (unsigned int i = 1; i < args.size(); i++)
00137 topics.push_back(args[i]);
00138
00139 ros::NodeHandle n;
00140 ros::NodeHandle nh("~");
00141
00142 g_node = &n;
00143
00144 for (size_t i = 0; i < topics.size(); i++)
00145 {
00146
00147 boost::shared_ptr<sub_info_t> sub_info(new sub_info_t);
00148 sub_info->topic_name = ros::names::resolve(topics[i]);
00149 sub_info->last_time_received = ros::Time(0);
00150 sub_info->rate = ros::Duration(0);
00151 sub_info->advertised = false;
00152 ROS_INFO_STREAM("subscribe " << sub_info->topic_name);
00153 sub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info->topic_name, 10, boost::bind(in_cb, _1, sub_info)));
00154
00155
00156 while (sub_info->sub->getNumPublishers() == 0 ) {
00157 ros::Duration(1.0).sleep();
00158 ROS_WARN_STREAM("wait for " << sub_info->topic_name << " ... " << i << "/" << topics.size());
00159 }
00160
00161 g_subs.push_back(sub_info);
00162 }
00163 ROS_INFO_STREAM("setup done, subscribed " << topics.size() << " topics");
00164
00165
00166 ros::ServiceServer ss_list = nh.advertiseService(string("list"), list_topic_cb);
00167
00168 ros::ServiceServer ss_update = nh.advertiseService(string("update"), update_topic_cb);
00169 ros::Subscriber ss_update_sub = nh.subscribe("update", 1, update_topic_cb);
00170 ros::spin();
00171
00172 return 0;
00173 }