topic_buffer_server.cpp
Go to the documentation of this file.
00001 #include <cstdio>
00002 #include <vector>
00003 #include <list>
00004 #include "ros/console.h"
00005 #include "std_msgs/String.h"
00006 #include "jsk_topic_tools/List.h"
00007 #include "jsk_topic_tools/Update.h"
00008 #include "topic_tools/shape_shifter.h"
00009 #include "topic_tools/parse.h"
00010 
00011 using std::string;
00012 using std::vector;
00013 using std::list;
00014 using namespace topic_tools;
00015 
00016 class sub_info_t
00017 {
00018 public:
00019     std::string topic_name;
00020     ros::Subscriber *sub;
00021     ros::Publisher pub;
00022     bool advertised;
00023     boost::shared_ptr<ShapeShifter const> msg;
00024     ros::Time last_time_received;
00025     ros::Duration rate;
00026 };
00027 
00028 typedef boost::shared_ptr<sub_info_t> sub_info_ref;
00029 
00030 static list<sub_info_ref> g_subs;
00031 
00032 static ros::NodeHandle *g_node = NULL;
00033 
00034 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00035            boost::shared_ptr<sub_info_t> s)
00036 {
00037     if ( s->rate.isZero() && s->last_time_received.isZero() ) { // skip first time
00038     } else if ( s->rate.isZero() && ! s->last_time_received.isZero() ) { // just for second time
00039         s->rate = ros::Time::now() - s->last_time_received;
00040     } else {
00041         double alpha = 0.1; //N = 19 alpha =  2 / ( N + 1 )
00042         s->rate = (ros::Duration(alpha * (ros::Time::now() - s->last_time_received).toSec() + (1 - alpha) * s->rate.toSec()));
00043     }
00044 
00045     if ( s->advertised == false ) {
00046         s->pub = msg->advertise(*g_node, s->topic_name+string("_update"), 10);
00047         s->advertised = true;
00048         ROS_INFO_STREAM("advertised as " << s->topic_name+string("_update"));
00049     }
00050 
00051     s->msg = msg;
00052 
00053     // update last_time_received
00054     s->last_time_received = ros::Time::now();
00055 }
00056 
00057 
00058 bool list_topic_cb(jsk_topic_tools::List::Request& req,
00059                    jsk_topic_tools::List::Response& res)
00060 {
00061     ROS_INFO_STREAM("service (list) is called, returns " << g_subs.size() << " topics");
00062     for (list<sub_info_ref>::iterator it = g_subs.begin();
00063          it != g_subs.end();
00064          ++it)
00065         {
00066             while( ! (*it)->advertised ) {
00067                 ROS_WARN_STREAM("service (list) waiting.. " << (*it)->topic_name << " is not running yet...");
00068                 return false;
00069             }
00070             res.topic_names.push_back((*it)->topic_name);
00071             ROS_INFO_STREAM("service (list) returns res.topic_name:" << (*it)->topic_name );
00072         }
00073 
00074     return true;
00075 }
00076 
00077 void update_topic_cb(const std_msgs::String::ConstPtr &msg) {
00078   ROS_INFO_STREAM("topic (update) is called, searching from " << g_subs.size() << " topics");
00079   for (list<sub_info_ref>::iterator it = g_subs.begin();
00080          it != g_subs.end();
00081          ++it)
00082   {
00083     if ( (*it)->topic_name == msg->data && (*it)->advertised == true ) {
00084       if (! (*it)->advertised ) {
00085         ROS_WARN_STREAM("topic (update) " << (*it)->topic_name << " is not running yet...");
00086         continue;
00087       }
00088       ROS_INFO_STREAM("topic (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
00089       (*it)->pub.publish((*it)->msg);
00090       ROS_INFO_STREAM("topic (update) is called, req.topic:" << msg->data);
00091       return;
00092     }
00093   }
00094   ROS_ERROR_STREAM("could not find topic named " << msg->data );
00095 }
00096 
00097 bool update_topic_cb(jsk_topic_tools::Update::Request& req,
00098                      jsk_topic_tools::Update::Response& res)
00099 {
00100 
00101     ROS_INFO_STREAM("service (update) is called, searching from " << g_subs.size() << " topics");
00102     for (list<sub_info_ref>::iterator it = g_subs.begin();
00103          it != g_subs.end();
00104          ++it)
00105         {
00106             if ( (*it)->topic_name == req.topic_name && (*it)->advertised == true ) {
00107                 if (! (*it)->advertised ) {
00108                     ROS_WARN_STREAM("service (update) " << (*it)->topic_name << " is not running yet...");
00109                     continue;
00110                 }
00111                 ROS_INFO_STREAM("service (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
00112                 (*it)->pub.publish((*it)->msg);
00113                 res.rate = (*it)->rate.toSec();
00114                 ROS_INFO_STREAM("service (update) is called, req.topic:" << req.topic_name << ", res.rate " << res.rate);
00115                 return true;
00116             }
00117         }
00118     ROS_ERROR_STREAM("could not find topic named " << req.topic_name );
00119     return false;
00120 }
00121 
00122 
00123 int main(int argc, char **argv)
00124 {
00125     vector<string> args;
00126     ros::removeROSArgs(argc, (const char**)argv, args);
00127 
00128     if (args.size() < 2)
00129         {
00130             printf("\nusage: topic_buffer_Server IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
00131             return 1;
00132         }
00133 
00134     ros::init(argc, argv, "topic_buffer_server");
00135     vector<string> topics;
00136     for (unsigned int i = 1; i < args.size(); i++)
00137         topics.push_back(args[i]);
00138 
00139     ros::NodeHandle n;
00140     ros::NodeHandle nh("~");
00141 
00142     g_node = &n;
00143 
00144     for (size_t i = 0; i < topics.size(); i++)
00145         {
00146             //sub_info_t sub_info;
00147             boost::shared_ptr<sub_info_t> sub_info(new sub_info_t);
00148             sub_info->topic_name = ros::names::resolve(topics[i]);
00149             sub_info->last_time_received = ros::Time(0);
00150             sub_info->rate = ros::Duration(0);
00151             sub_info->advertised = false;
00152             ROS_INFO_STREAM("subscribe " << sub_info->topic_name);
00153             sub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info->topic_name, 10, boost::bind(in_cb, _1, sub_info)));
00154 
00155             // waiting for all topics are publisherd
00156             while (sub_info->sub->getNumPublishers() == 0 ) {
00157                 ros::Duration(1.0).sleep();
00158                 ROS_WARN_STREAM("wait for " << sub_info->topic_name << " ... " << i << "/" << topics.size());
00159             }
00160 
00161             g_subs.push_back(sub_info);
00162         }
00163     ROS_INFO_STREAM("setup done, subscribed " << topics.size() << " topics");
00164 
00165     // New service
00166     ros::ServiceServer ss_list = nh.advertiseService(string("list"), list_topic_cb);
00167 
00168     ros::ServiceServer ss_update = nh.advertiseService(string("update"), update_topic_cb);
00169     ros::Subscriber ss_update_sub = nh.subscribe("update", 1, update_topic_cb);
00170     ros::spin();
00171 
00172     return 0;
00173 }


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Mon Oct 6 2014 10:56:17