topic_buffer_client.cpp
Go to the documentation of this file.
00001 #include <cstdio>
00002 #include <vector>
00003 #include <list>
00004 #include <boost/lambda/lambda.hpp>
00005 #include "ros/ros.h"
00006 #include "ros/header.h"
00007 #include "ros/console.h"
00008 #include "std_msgs/Header.h"
00009 #include "std_msgs/String.h"
00010 #include "jsk_topic_tools/List.h"
00011 #include "jsk_topic_tools/Update.h"
00012 #include "topic_tools/shape_shifter.h"
00013 #include "topic_tools/parse.h"
00014 
00015 using std::string;
00016 using std::vector;
00017 using std::list;
00018 using namespace topic_tools;
00019 static bool debug;
00020 
00021 class pub_info_t
00022 {
00023 public:
00024     std::string topic_name;
00025     ros::Publisher pub;
00026     ros::Subscriber *sub;
00027     bool advertised;
00028     boost::shared_ptr<ShapeShifter > msg;
00029     ros::Timer timer;
00030     ros::Duration rate;
00031     ros::Time last_time_received;
00032     bool topic_with_header;
00033     bool latched;
00034     uint32_t last_seq_received;
00035     uint32_t topic_received;
00036 
00037     void publish(const ros::TimerEvent &event)
00038     {
00039         if ( advertised == false ) return;
00040 
00041         topic_received++;
00042         if ( topic_with_header == true ) {
00043             std_msgs::Header header;
00044             uint8_t buf[msg->size()];
00045             ros::serialization::OStream ostream(buf, msg->size());
00046             ros::serialization::IStream istream(buf, msg->size());
00047             msg->write(ostream);
00048             ((uint32_t *)buf)[0] = last_seq_received + topic_received;
00049             ros::Time tmp(last_time_received.toSec() + topic_received * rate.toSec());
00050             ((uint32_t *)buf)[1] = tmp.sec;
00051             ((uint32_t *)buf)[2] = tmp.nsec;
00052             msg->read(istream);
00053         }
00054         if(debug){
00055           ROS_INFO_STREAM("publishing " << topic_name);
00056         }
00057         pub.publish(msg);
00058     }
00059 };
00060 
00061 typedef boost::shared_ptr<pub_info_t> pub_info_ref;
00062 
00063 static list<pub_info_ref> g_pubs;
00064 
00065 static ros::NodeHandle *g_node = NULL;
00066 
00067 static bool use_fixed_rate;
00068 static bool use_periodic_rate = false;
00069 
00070 
00071 
00072 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00073            boost::shared_ptr<pub_info_t> s)
00074 {
00075     using namespace boost::lambda;
00076 
00077     s->msg = boost::const_pointer_cast<ShapeShifter>(msg);
00078     s->topic_received = 0; // reset topic_received
00079     if ( s->advertised == false ) {
00080         s->pub = msg->advertise(*g_node, s->topic_name+string("_buffered"), 10, s->latched);
00081         s->advertised = true;
00082     }
00083     if(debug){
00084       ROS_INFO_STREAM("advertised as " << s->topic_name+string("_buffered") << " running at " << 1/(s->rate.toSec()) << "Hz");
00085     }
00086 
00087     // check if msg has header
00088     {
00089         std_msgs::Header header;
00090         uint8_t buf[msg->size()];
00091         ros::serialization::OStream stream(buf, msg->size());
00092         msg->write(stream);
00093         header.seq = ((uint32_t *)buf)[0];
00094         header.stamp.sec = ((uint32_t *)buf)[1];
00095         header.stamp.nsec = ((uint32_t *)buf)[2];
00096 
00097         if ( abs((header.stamp - ros::Time::now()).toSec()) < 5.0 ) {
00098           if(debug){
00099             ROS_INFO_STREAM(" this message contains headers.. seq =" <<  header.seq << " stamp = " << header.stamp);
00100           }
00101             s->topic_with_header = true;
00102             s->last_seq_received = header.seq;
00103             s->last_time_received = header.stamp;
00104         }
00105     }
00106     //g_node->createTimer(ros::Duration(0.1), [](const ros::TimerEvent event) { std::cerr << "hoge" << std::endl; });
00107     {// at first publish once
00108       ros::TimerEvent ev;
00109       s->publish (ev);
00110     }
00111     s->timer = g_node->createTimer(s->rate, &pub_info_t::publish, s);
00112 }
00113 
00114 
00115 int main(int argc, char **argv)
00116 {
00117     ros::init(argc, argv, "topic_buffer_client", ros::init_options::AnonymousName);
00118 
00119     ros::NodeHandle n;
00120     ros::NodeHandle nh("~");
00121 
00122     double fixed_rate = 0.1; // 10Hz
00123     if (nh.hasParam("fixed_rate")) {
00124       use_fixed_rate = true;
00125       nh.param ("fixed_rate", fixed_rate, 0.1);
00126       ROS_INFO("use fixed rate = %f", fixed_rate);
00127     }
00128 
00129     double update_rate = 10; // 0.1Hz
00130     if (nh.hasParam("update_rate")) {
00131       nh.param ("update_rate", update_rate, 10.0);
00132       ROS_INFO("use update rate = %f", update_rate);
00133     }
00134 
00135     double periodic_rate = 0.1; // 10Hz
00136     if (nh.hasParam("periodic_rate")) {
00137       use_periodic_rate = true;
00138       nh.param ("periodic_rate", periodic_rate, 0.1);
00139       ROS_INFO("use periodic rate = %f", periodic_rate);
00140     }
00141 
00142     bool latched;
00143     if (nh.hasParam("latched")) {
00144       nh.param ("latched", latched, false);
00145       if(latched) {
00146         ROS_INFO("use latched");
00147       }
00148     }
00149 
00150     g_node = &n;
00151     bool use_service_p = false;
00152     std::vector<std::string> target_topics;
00153     // use service or parameter
00154     // check the parameter first
00155     XmlRpc::XmlRpcValue topics;
00156     if (!nh.getParam ("topics", topics)) {
00157       ROS_WARN("no ~topics is available, use service interface");
00158       use_service_p = true;
00159     }
00160     else {
00161       switch (topics.getType ()) {
00162       case XmlRpc::XmlRpcValue::TypeArray: {
00163         for (int d = 0; d < topics.size(); ++d) {
00164           target_topics.push_back((std::string)(topics[d]));
00165         }
00166         break;
00167       }
00168       default: {
00169         ROS_WARN("~topics mismatches type, it should be a list, use service interface");
00170         use_service_p = true;
00171       }
00172       }
00173     }
00174     
00175     if (use_service_p) {
00176       target_topics.clear();
00177       // New service
00178       ros::service::waitForService(string("/list"), -1);
00179       ros::ServiceClient sc_list = n.serviceClient<jsk_topic_tools::List>(string("/list"), true);
00180       jsk_topic_tools::List::Request req;
00181       jsk_topic_tools::List::Response res;
00182       ROS_INFO_STREAM("calling "  << sc_list.getService());
00183       while ( sc_list.call(req, res) == false) {
00184         ROS_WARN_STREAM("calling " << sc_list.getService() << " fails, retry...");
00185         ros::Duration(1).sleep();
00186       }
00187       ROS_WARN_STREAM("calling /list success!!!");
00188       for(vector<std::string>::iterator it = res.topic_names.begin(); it != res.topic_names.end(); ++it) {
00189         target_topics.push_back(*it);
00190       }
00191       ROS_INFO_STREAM("calling /list has done.. found " << res.topic_names.size() << " topics to publish");
00192     }
00193     for(size_t i = 0; i < target_topics.size(); i++) {
00194         boost::shared_ptr<pub_info_t> pub_info(new pub_info_t);
00195         pub_info->topic_name = target_topics[i];
00196         if (use_fixed_rate) {
00197           pub_info->rate = ros::Duration(fixed_rate);
00198         }
00199         pub_info->latched = latched;
00200         pub_info->advertised = false;
00201         pub_info->topic_with_header = false;
00202         ROS_INFO_STREAM("subscribe " << pub_info->topic_name+string("_update"));
00203         pub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(pub_info->topic_name+string("_update"), 10, boost::bind(in_cb, _1, pub_info)));
00204 
00205         g_pubs.push_back(pub_info);
00206     }
00207 
00208     ros::Rate rate_loop(100);
00209     ros::Time last_updated;
00210 
00211     bool use_service = true;
00212     nh.param("use_service", use_service, true);
00213 
00214     nh.param("debug", debug, false);
00215     
00216     ros::ServiceClient sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
00217     ros::Publisher pub_update;
00218     if (!use_service) {
00219         pub_update = n.advertise<std_msgs::String>("/update", 1);
00220     }
00221 
00222     if (use_periodic_rate) {
00223       for (list<pub_info_ref>::iterator it = g_pubs.begin();
00224            it != g_pubs.end();
00225            ++it) {
00226         jsk_topic_tools::Update::Request req;
00227         jsk_topic_tools::Update::Response res;
00228         req.topic_name = (*it)->topic_name;
00229         req.periodic = true;
00230         req.periodic_rate = ros::Duration(periodic_rate);
00231         sc_update.call(req, res);
00232         ROS_INFO_STREAM("sending request for periodic rate publish  topic:" << req.topic_name << " rate:" << req.periodic_rate);
00233       }
00234     }
00235     while ( ros::ok() ) {
00236 
00237         if ( update_rate >= 0 && (ros::Time::now() - last_updated) > ros::Duration(update_rate) ) {
00238             for (list<pub_info_ref>::iterator it = g_pubs.begin();
00239                  it != g_pubs.end();
00240                  ++it) {
00241                 
00242                 if (use_service) {
00243                   jsk_topic_tools::Update::Request req;
00244                   jsk_topic_tools::Update::Response res;
00245                   req.topic_name = (*it)->topic_name;
00246                   if ( sc_update.call(req, res) == false ) {
00247                     ROS_ERROR_STREAM("calling /update (" << req.topic_name << ") fails, retry...");
00248                     sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
00249                     continue;
00250                   }
00251                   (*it)->rate = ros::Duration(res.rate);
00252                   if(debug){
00253                     ROS_INFO_STREAM("calling /update " << req.topic_name << " .. " << res.rate);
00254                   }
00255                 }
00256                 else {
00257                   std_msgs::String msg;
00258                   msg.data = (*it)->topic_name;
00259                   pub_update.publish(msg);
00260                   if(debug){
00261                     ROS_INFO_STREAM("publishing /update " << msg.data);
00262                   }
00263                 }
00264             }
00265             last_updated = ros::Time::now();
00266         }
00267 
00268         ros::spinOnce();
00269         rate_loop.sleep();
00270     }
00271 
00272     return 0;
00273 }


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Sun Jan 25 2015 12:38:34