topic_buffer_client.cpp
Go to the documentation of this file.
00001 #include <cstdio>
00002 #include <vector>
00003 #include <list>
00004 #include <boost/lambda/lambda.hpp>
00005 #include "ros/ros.h"
00006 #include "ros/header.h"
00007 #include "ros/console.h"
00008 #include "std_msgs/Header.h"
00009 #include "std_msgs/String.h"
00010 #include "jsk_topic_tools/List.h"
00011 #include "jsk_topic_tools/Update.h"
00012 #include "topic_tools/shape_shifter.h"
00013 #include "topic_tools/parse.h"
00014 
00015 using std::string;
00016 using std::vector;
00017 using std::list;
00018 using namespace topic_tools;
00019 static bool debug;
00020 
00021 class pub_info_t
00022 {
00023 public:
00024     std::string topic_name;
00025     ros::Publisher pub;
00026     ros::Subscriber *sub;
00027     bool advertised;
00028     boost::shared_ptr<ShapeShifter > msg;
00029     ros::Timer timer;
00030     ros::Duration rate;
00031     ros::Time last_time_received;
00032     bool topic_with_header;
00033     bool latched;
00034     uint32_t last_seq_received;
00035     uint32_t topic_received;
00036 
00037     void publish(const ros::TimerEvent &event)
00038     {
00039         if ( advertised == false ) return;
00040 
00041         topic_received++;
00042         if ( topic_with_header == true ) {
00043             std_msgs::Header header;
00044             uint8_t buf[msg->size()];
00045             ros::serialization::OStream ostream(buf, msg->size());
00046             ros::serialization::IStream istream(buf, msg->size());
00047             msg->write(ostream);
00048             ((uint32_t *)buf)[0] = last_seq_received + topic_received;
00049             ros::Time tmp(last_time_received.toSec() + topic_received * rate.toSec());
00050             ((uint32_t *)buf)[1] = tmp.sec;
00051             ((uint32_t *)buf)[2] = tmp.nsec;
00052             msg->read(istream);
00053         }
00054         if(debug){
00055           ROS_INFO_STREAM("publishing " << topic_name);
00056         }
00057         pub.publish(msg);
00058     }
00059 };
00060 
00061 typedef boost::shared_ptr<pub_info_t> pub_info_ref;
00062 
00063 static list<pub_info_ref> g_pubs;
00064 
00065 static ros::NodeHandle *g_node = NULL;
00066 
00067 static bool use_fixed_rate;
00068 
00069 
00070 
00071 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00072            boost::shared_ptr<pub_info_t> s)
00073 {
00074     using namespace boost::lambda;
00075 
00076     s->msg = boost::const_pointer_cast<ShapeShifter>(msg);
00077     s->topic_received = 0; // reset topic_received
00078     if ( s->advertised == false ) {
00079         s->pub = msg->advertise(*g_node, s->topic_name+string("_buffered"), 10, s->latched);
00080         s->advertised = true;
00081     }
00082     if(debug){
00083       ROS_INFO_STREAM("advertised as " << s->topic_name+string("_buffered") << " running at " << 1/(s->rate.toSec()) << "Hz");
00084     }
00085 
00086     // check if msg has header
00087     {
00088         std_msgs::Header header;
00089         uint8_t buf[msg->size()];
00090         ros::serialization::OStream stream(buf, msg->size());
00091         msg->write(stream);
00092         header.seq = ((uint32_t *)buf)[0];
00093         header.stamp.sec = ((uint32_t *)buf)[1];
00094         header.stamp.nsec = ((uint32_t *)buf)[2];
00095 
00096         if ( abs((header.stamp - ros::Time::now()).toSec()) < 5.0 ) {
00097           if(debug){
00098             ROS_INFO_STREAM(" this message contains headers.. seq =" <<  header.seq << " stamp = " << header.stamp);
00099           }
00100             s->topic_with_header = true;
00101             s->last_seq_received = header.seq;
00102             s->last_time_received = header.stamp;
00103         }
00104     }
00105     //g_node->createTimer(ros::Duration(0.1), [](const ros::TimerEvent event) { std::cerr << "hoge" << std::endl; });
00106     {// at first publish once
00107       ros::TimerEvent ev;
00108       s->publish (ev);
00109     }
00110     s->timer = g_node->createTimer(s->rate, &pub_info_t::publish, s);
00111 }
00112 
00113 
00114 int main(int argc, char **argv)
00115 {
00116     ros::init(argc, argv, "topic_buffer_client", ros::init_options::AnonymousName);
00117 
00118     ros::NodeHandle n;
00119     ros::NodeHandle nh("~");
00120 
00121     double fixed_rate = 0.1; // 10Hz
00122     if (nh.hasParam("fixed_rate")) {
00123       use_fixed_rate = true;
00124       nh.param ("fixed_rate", fixed_rate, 0.1);
00125       ROS_INFO("use fixed rate = %f", fixed_rate);
00126     }
00127 
00128     double update_rate = 10; // 0.1Hz
00129     if (nh.hasParam("update_rate")) {
00130       nh.param ("update_rate", update_rate, 10.0);
00131       ROS_INFO("use update rate = %f", update_rate);
00132     }
00133 
00134     bool latched;
00135     if (nh.hasParam("latched")) {
00136       nh.param ("latched", latched, false);
00137       if(latched) {
00138         ROS_INFO("use latched");
00139       }
00140     }
00141 
00142     g_node = &n;
00143     bool use_service_p = false;
00144     std::vector<std::string> target_topics;
00145     // use service or parameter
00146     // check the parameter first
00147     XmlRpc::XmlRpcValue topics;
00148     if (!nh.getParam ("topics", topics)) {
00149       ROS_WARN("no ~topics is available, use service interface");
00150       use_service_p = true;
00151     }
00152     else {
00153       switch (topics.getType ()) {
00154       case XmlRpc::XmlRpcValue::TypeArray: {
00155         for (int d = 0; d < topics.size(); ++d) {
00156           target_topics.push_back((std::string)(topics[d]));
00157         }
00158         break;
00159       }
00160       default: {
00161         ROS_WARN("~topics mismatches type, it should be a list, use service interface");
00162         use_service_p = true;
00163       }
00164       }
00165     }
00166     
00167     if (use_service_p) {
00168       target_topics.clear();
00169       // New service
00170       ros::service::waitForService(string("/list"), -1);
00171       ros::ServiceClient sc_list = n.serviceClient<jsk_topic_tools::List>(string("/list"), true);
00172       jsk_topic_tools::List::Request req;
00173       jsk_topic_tools::List::Response res;
00174       ROS_INFO_STREAM("calling /list");
00175       while ( sc_list.call(req, res) == false) {
00176         ROS_WARN_STREAM("calling /list fails, retry...");
00177         ros::Duration(1).sleep();
00178       }
00179       ROS_WARN_STREAM("calling /list success!!!");
00180       for(vector<std::string>::iterator it = res.topic_names.begin(); it != res.topic_names.end(); ++it) {
00181         target_topics.push_back(*it);
00182       }
00183       ROS_INFO_STREAM("calling /list has done.. found " << res.topic_names.size() << " topics to publish");
00184     }
00185     for(size_t i = 0; i < target_topics.size(); i++) {
00186         boost::shared_ptr<pub_info_t> pub_info(new pub_info_t);
00187         pub_info->topic_name = target_topics[i];
00188         if (use_fixed_rate) {
00189           pub_info->rate = ros::Duration(fixed_rate);
00190         }
00191         pub_info->latched = latched;
00192         pub_info->advertised = false;
00193         pub_info->topic_with_header = false;
00194         ROS_INFO_STREAM("subscribe " << pub_info->topic_name+string("_update"));
00195         pub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(pub_info->topic_name+string("_update"), 10, boost::bind(in_cb, _1, pub_info)));
00196 
00197         g_pubs.push_back(pub_info);
00198     }
00199 
00200     ros::Rate rate_loop(100);
00201     ros::Time last_updated;
00202 
00203     bool use_service = true;
00204     nh.param("use_service", use_service, true);
00205 
00206     nh.param("debug", debug, false);
00207     
00208     ros::ServiceClient sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
00209     ros::Publisher pub_update;
00210     if (!use_service) {
00211         pub_update = n.advertise<std_msgs::String>("/update", 1);
00212     }
00213     while ( ros::ok() ) {
00214 
00215         if ( ((ros::Time::now() - last_updated) > ros::Duration(update_rate)) ) {
00216             for (list<pub_info_ref>::iterator it = g_pubs.begin();
00217                  it != g_pubs.end();
00218                  ++it) {
00219                 
00220                 if (use_service) {
00221                   jsk_topic_tools::Update::Request req;
00222                   jsk_topic_tools::Update::Response res;
00223                   req.topic_name = (*it)->topic_name;
00224                   if ( sc_update.call(req, res) == false ) {
00225                     ROS_ERROR_STREAM("calling /update (" << req.topic_name << ") fails, retry...");
00226                     sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
00227                     continue;
00228                   }
00229                   (*it)->rate = ros::Duration(res.rate);
00230                   if(debug){
00231                     ROS_INFO_STREAM("calling /update " << req.topic_name << " .. " << res.rate);
00232                   }
00233                 }
00234                 else {
00235                   std_msgs::String msg;
00236                   msg.data = (*it)->topic_name;
00237                   pub_update.publish(msg);
00238                   if(debug){
00239                     ROS_INFO_STREAM("publishing /update " << msg.data);
00240                   }
00241                 }
00242             }
00243             last_updated = ros::Time::now();
00244         }
00245 
00246         ros::spinOnce();
00247         rate_loop.sleep();
00248     }
00249 
00250     return 0;
00251 }


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Mon Oct 6 2014 10:56:17