topic_buffer_server.cpp
Go to the documentation of this file.
00001 #include <cstdio>
00002 #include <vector>
00003 #include <list>
00004 #include "ros/console.h"
00005 #include "std_msgs/String.h"
00006 #include "jsk_topic_tools/List.h"
00007 #include "jsk_topic_tools/Update.h"
00008 #include "topic_tools/shape_shifter.h"
00009 #include "topic_tools/parse.h"
00010 #include <boost/thread.hpp>
00011 #include <boost/date_time/posix_time/posix_time.hpp>
00012 #include <boost/date_time.hpp>
00013 
00014 using std::string;
00015 using std::vector;
00016 using std::list;
00017 using namespace topic_tools;
00018 
00019 static bool use_periodic_rate = false;
00020 
00021 class sub_info_t
00022 {
00023 public:
00024     boost::mutex mutex;
00025     std::string topic_name;
00026     ros::Subscriber *sub;
00027     ros::Publisher pub;
00028     bool advertised;
00029     boost::shared_ptr<ShapeShifter const> msg;
00030     ros::Time last_time_received;
00031     ros::Duration rate;
00032     bool periodic;
00033     ros::Duration periodic_rate;
00034     boost::thread periodic_thread;
00035     void periodic_update_topic(){
00036         ROS_INFO_STREAM("topic " << this->topic_name << " is now published at " << 1.0/(this->periodic_rate).toSec() << " Hz periodically.");
00037         while(ros::ok() && this->periodic){
00038           double sleep_sec;
00039           {
00040             sleep_sec = this->periodic_rate.toNSec();
00041             boost::mutex::scoped_lock lock(mutex);
00042             this->pub.publish(this->msg);
00043           }
00044           //ROS_DEBUG_STREAM("sleep " << this->periodic_rate.toNSec() * 1e-6 << " msec.");
00045           boost::this_thread::sleep(boost::posix_time::milliseconds(sleep_sec * 1e-6));
00046         }
00047         ROS_INFO_STREAM("topic " << this->topic_name << " is now NOT published.");
00048     }
00049 };
00050 
00051 typedef boost::shared_ptr<sub_info_t> sub_info_ref;
00052 
00053 static list<sub_info_ref> g_subs;
00054 
00055 static ros::NodeHandle *g_node = NULL;
00056 
00057 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00058            boost::shared_ptr<sub_info_t> s)
00059 {
00060     boost::mutex::scoped_lock lock(s->mutex);
00061     if ( s->rate.isZero() && s->last_time_received.isZero() ) { // skip first time
00062     } else if ( s->rate.isZero() && ! s->last_time_received.isZero() ) { // just for second time
00063         s->rate = ros::Time::now() - s->last_time_received;
00064     } else {
00065         double alpha = 0.1; //N = 19 alpha =  2 / ( N + 1 )
00066         s->rate = (ros::Duration(alpha * (ros::Time::now() - s->last_time_received).toSec() + (1 - alpha) * s->rate.toSec()));
00067     }
00068 
00069     if ( s->advertised == false ) {
00070         s->pub = msg->advertise(*g_node, s->topic_name+string("_update"), 10);
00071         s->advertised = true;
00072         ROS_INFO_STREAM("advertised as " << s->topic_name+string("_update"));
00073     }
00074 
00075     s->msg = msg;
00076 
00077     // update last_time_received
00078     s->last_time_received = ros::Time::now();
00079 }
00080 
00081 
00082 bool list_topic_cb(jsk_topic_tools::List::Request& req,
00083                    jsk_topic_tools::List::Response& res)
00084 {
00085     ROS_INFO_STREAM("service (list) is called, returns " << g_subs.size() << " topics");
00086     for (list<sub_info_ref>::iterator it = g_subs.begin();
00087          it != g_subs.end();
00088          ++it)
00089         {
00090             while( ! (*it)->advertised ) {
00091                 ROS_WARN_STREAM("service (list) waiting.. " << (*it)->topic_name << " is not running yet...");
00092                 return false;
00093             }
00094             res.topic_names.push_back((*it)->topic_name);
00095             ROS_INFO_STREAM("service (list) returns res.topic_name:" << (*it)->topic_name );
00096         }
00097 
00098     return true;
00099 }
00100 
00101 void update_topic_cb(const std_msgs::String::ConstPtr &msg) {
00102   ROS_INFO_STREAM("topic (update) is called, searching from " << g_subs.size() << " topics");
00103   for (list<sub_info_ref>::iterator it = g_subs.begin();
00104          it != g_subs.end();
00105          ++it)
00106   {
00107     if ( (*it)->topic_name == msg->data && (*it)->advertised == true ) {
00108       if (! (*it)->advertised ) {
00109         ROS_WARN_STREAM("topic (update) " << (*it)->topic_name << " is not running yet...");
00110         continue;
00111       }
00112       ROS_INFO_STREAM("topic (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
00113       (*it)->pub.publish((*it)->msg);
00114       ROS_INFO_STREAM("topic (update) is called, req.topic:" << msg->data);
00115       return;
00116     }
00117   }
00118   ROS_ERROR_STREAM("could not find topic named " << msg->data );
00119 }
00120 
00121 bool update_topic_cb(jsk_topic_tools::Update::Request& req,
00122                      jsk_topic_tools::Update::Response& res)
00123 {
00124 
00125     ROS_INFO_STREAM("service (update) is called, searching from " << g_subs.size() << " topics");
00126     for (list<sub_info_ref>::iterator it = g_subs.begin();
00127          it != g_subs.end();
00128          ++it)
00129         {
00130           if ( (*it)->topic_name == req.topic_name && (*it)->advertised == true ) {
00131                 if (! (*it)->advertised ) {
00132                     ROS_WARN_STREAM("service (update) " << (*it)->topic_name << " is not running yet...");
00133                     continue;
00134                 }
00135                 if (req.periodic && ! (*it)->periodic) {
00136                     // start periodic publish thread
00137                     if (req.periodic_rate > (*it)->rate) {
00138                         (*it)->periodic = true;
00139                         (*it)->periodic_rate = ros::Duration(req.periodic_rate);
00140                         (*it)->periodic_thread = boost::thread(boost::bind(&sub_info_t::periodic_update_topic, &(**it)));
00141                         res.rate = (*it)->rate.toSec();
00142                         ROS_INFO_STREAM("periodic publish of topic is started in the new thread. topic:" << (*it)->topic_name << " rate:" << (*it)->periodic_rate);
00143                         return true;
00144                     } else {
00145                         ROS_ERROR_STREAM("Invalid periodic rate is set at " << (*it)->topic_name << ". rate must be 0.0 < " << 1.0/req.periodic_rate.toSec() << " < " << 1.0/((*it)->rate).toSec());
00146                         return false;
00147                     }
00148                 } else if (! req.periodic && (*it)->periodic) {
00149                     // stop periodic publish thread
00150                     (*it)->periodic = false;
00151                     ((*it)->periodic_thread).join();
00152                     res.rate = 0.0;
00153                     ROS_INFO_STREAM("periodic publish of topic " << (*it)->topic_name << " is now stopped.");
00154                     return true;
00155                 } else if (req.periodic) {
00156                   if (req.periodic_rate > (*it)->rate) {
00157                     (*it)->periodic = true;
00158                     (*it)->periodic_rate = ros::Duration(req.periodic_rate);
00159                     ROS_INFO_STREAM("periodic publish of topic is started in the existing thread. topic:" << (*it)->topic_name << " rate:" << (*it)->periodic_rate);
00160                     return true;
00161                   } else {
00162                     ROS_ERROR_STREAM("Invalid periodic rate is set at " << (*it)->topic_name << ". rate must be 0.0 < " << 1.0/req.periodic_rate.toSec() << " < " << 1.0/((*it)->rate).toSec());
00163                     return false;
00164                     }
00165                 } else {
00166                   // single publish
00167                   ROS_INFO_STREAM("service (update) " << (*it)->topic_name << " running at " << 1.0/((*it)->rate).toSec() << " Hz");
00168                   (*it)->pub.publish((*it)->msg);
00169                   res.rate = (*it)->rate.toSec();
00170                   ROS_INFO_STREAM("service (update) is called, req.topic:" << req.topic_name << ", res.rate " << res.rate);
00171                   return true;
00172                 }
00173             }
00174         }
00175     ROS_ERROR_STREAM("could not find topic named " << req.topic_name );
00176     return false;
00177 }
00178 
00179 
00180 int main(int argc, char **argv)
00181 {
00182     vector<string> args;
00183     ros::removeROSArgs(argc, (const char**)argv, args);
00184 
00185     if (args.size() < 2)
00186         {
00187             printf("\nusage: topic_buffer_Server IN_TOPIC1 [IN_TOPIC2 [...]]\n\n");
00188             return 1;
00189         }
00190 
00191     ros::init(argc, argv, "topic_buffer_server");
00192     vector<string> topics;
00193     for (unsigned int i = 1; i < args.size(); i++)
00194         topics.push_back(args[i]);
00195 
00196     ros::NodeHandle n;
00197     ros::NodeHandle nh("~");
00198 
00199     g_node = &n;
00200 
00201     double periodic_rate = 0.1; // 10Hz
00202     if (nh.hasParam("periodic_rate")) {
00203       use_periodic_rate = true;
00204       nh.param ("periodic_rate", periodic_rate, 0.1);
00205       ROS_INFO("use periodic rate = %f", periodic_rate);
00206     }
00207 
00208     for (size_t i = 0; i < topics.size(); i++)
00209         {
00210             //sub_info_t sub_info;
00211             boost::shared_ptr<sub_info_t> sub_info(new sub_info_t);
00212             sub_info->topic_name = ros::names::resolve(topics[i]);
00213             sub_info->last_time_received = ros::Time(0);
00214             sub_info->rate = ros::Duration(0);
00215             sub_info->advertised = false;
00216             sub_info->periodic = false;
00217             ROS_INFO_STREAM("subscribe " << sub_info->topic_name);
00218             sub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(sub_info->topic_name, 10, boost::bind(in_cb, _1, sub_info)));
00219 
00220             // waiting for all topics are publisherd
00221             while (sub_info->sub->getNumPublishers() == 0 ) {
00222                 ros::Duration(1.0).sleep();
00223                 ROS_WARN_STREAM("wait for " << sub_info->topic_name << " ... " << i << "/" << topics.size());
00224             }
00225 
00226             g_subs.push_back(sub_info);
00227         }
00228     ROS_INFO_STREAM("setup done, subscribed " << topics.size() << " topics");
00229 
00230     if(use_periodic_rate) {
00231       for (list<sub_info_ref>::iterator it = g_subs.begin(); it != g_subs.end(); ++it) {
00232         (*it)->periodic = true;
00233         (*it)->periodic_rate = ros::Duration(periodic_rate);
00234         (*it)->periodic_thread = boost::thread(boost::bind(&sub_info_t::periodic_update_topic, &(**it)));
00235         ROS_INFO_STREAM("periodic publish of topic is started in the new thread. topic:" << (*it)->topic_name << " rate:" << (*it)->periodic_rate);
00236       }
00237     }
00238 
00239     // New service
00240     ros::ServiceServer ss_list = nh.advertiseService(string("list"), list_topic_cb);
00241 
00242     ros::ServiceServer ss_update = nh.advertiseService(string("update"), update_topic_cb);
00243     ros::Subscriber ss_update_sub = nh.subscribe("update", 1, update_topic_cb);
00244     ros::spin();
00245 
00246     return 0;
00247 }


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Sun Jan 25 2015 12:38:34