Go to the documentation of this file.00001 #include <cstdio>
00002 #include <vector>
00003 #include <list>
00004 #include <boost/lambda/lambda.hpp>
00005 #include "ros/ros.h"
00006 #include "ros/header.h"
00007 #include "ros/console.h"
00008 #include "std_msgs/Header.h"
00009 #include "std_msgs/String.h"
00010 #include "jsk_topic_tools/List.h"
00011 #include "jsk_topic_tools/Update.h"
00012 #include "topic_tools/shape_shifter.h"
00013 #include "topic_tools/parse.h"
00014
00015 using std::string;
00016 using std::vector;
00017 using std::list;
00018 using namespace topic_tools;
00019 static bool debug;
00020
00021 class pub_info_t
00022 {
00023 public:
00024 std::string topic_name;
00025 ros::Publisher pub;
00026 ros::Subscriber *sub;
00027 bool advertised;
00028 boost::shared_ptr<ShapeShifter > msg;
00029 ros::Timer timer;
00030 ros::Duration rate;
00031 ros::Time last_time_received;
00032 bool topic_with_header;
00033 bool latched;
00034 uint32_t last_seq_received;
00035 uint32_t topic_received;
00036
00037 void publish(const ros::TimerEvent &event)
00038 {
00039 if ( advertised == false ) return;
00040
00041 topic_received++;
00042 if ( topic_with_header == true ) {
00043 std_msgs::Header header;
00044 uint8_t buf[msg->size()];
00045 ros::serialization::OStream ostream(buf, msg->size());
00046 ros::serialization::IStream istream(buf, msg->size());
00047 msg->write(ostream);
00048 ((uint32_t *)buf)[0] = last_seq_received + topic_received;
00049 ros::Time tmp(last_time_received.toSec() + topic_received * rate.toSec());
00050 ((uint32_t *)buf)[1] = tmp.sec;
00051 ((uint32_t *)buf)[2] = tmp.nsec;
00052 msg->read(istream);
00053 }
00054 if(debug){
00055 ROS_INFO_STREAM("publishing " << topic_name);
00056 }
00057 pub.publish(msg);
00058 }
00059 };
00060
00061 typedef boost::shared_ptr<pub_info_t> pub_info_ref;
00062
00063 static list<pub_info_ref> g_pubs;
00064
00065 static ros::NodeHandle *g_node = NULL;
00066
00067 static bool use_fixed_rate;
00068
00069
00070
00071 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00072 boost::shared_ptr<pub_info_t> s)
00073 {
00074 using namespace boost::lambda;
00075
00076 s->msg = boost::const_pointer_cast<ShapeShifter>(msg);
00077 s->topic_received = 0;
00078 if ( s->advertised == false ) {
00079 s->pub = msg->advertise(*g_node, s->topic_name+string("_buffered"), 10, s->latched);
00080 s->advertised = true;
00081 }
00082 if(debug){
00083 ROS_INFO_STREAM("advertised as " << s->topic_name+string("_buffered") << " running at " << 1/(s->rate.toSec()) << "Hz");
00084 }
00085
00086
00087 {
00088 std_msgs::Header header;
00089 uint8_t buf[msg->size()];
00090 ros::serialization::OStream stream(buf, msg->size());
00091 msg->write(stream);
00092 header.seq = ((uint32_t *)buf)[0];
00093 header.stamp.sec = ((uint32_t *)buf)[1];
00094 header.stamp.nsec = ((uint32_t *)buf)[2];
00095
00096 if ( abs((header.stamp - ros::Time::now()).toSec()) < 5.0 ) {
00097 if(debug){
00098 ROS_INFO_STREAM(" this message contains headers.. seq =" << header.seq << " stamp = " << header.stamp);
00099 }
00100 s->topic_with_header = true;
00101 s->last_seq_received = header.seq;
00102 s->last_time_received = header.stamp;
00103 }
00104 }
00105
00106 {
00107 ros::TimerEvent ev;
00108 s->publish (ev);
00109 }
00110 s->timer = g_node->createTimer(s->rate, &pub_info_t::publish, s);
00111 }
00112
00113
00114 int main(int argc, char **argv)
00115 {
00116 ros::init(argc, argv, "topic_buffer_client", ros::init_options::AnonymousName);
00117
00118 ros::NodeHandle n;
00119 ros::NodeHandle nh("~");
00120
00121 double fixed_rate = 0.1;
00122 if (nh.hasParam("fixed_rate")) {
00123 use_fixed_rate = true;
00124 nh.param ("fixed_rate", fixed_rate, 0.1);
00125 ROS_INFO("use fixed rate = %f", fixed_rate);
00126 }
00127
00128 double update_rate = 10;
00129 if (nh.hasParam("update_rate")) {
00130 nh.param ("update_rate", update_rate, 10.0);
00131 ROS_INFO("use update rate = %f", update_rate);
00132 }
00133
00134 bool latched;
00135 if (nh.hasParam("latched")) {
00136 nh.param ("latched", latched, false);
00137 if(latched) {
00138 ROS_INFO("use latched");
00139 }
00140 }
00141
00142 g_node = &n;
00143 bool use_service_p = false;
00144 std::vector<std::string> target_topics;
00145
00146
00147 XmlRpc::XmlRpcValue topics;
00148 if (!nh.getParam ("topics", topics)) {
00149 ROS_WARN("no ~topics is available, use service interface");
00150 use_service_p = true;
00151 }
00152 else {
00153 switch (topics.getType ()) {
00154 case XmlRpc::XmlRpcValue::TypeArray: {
00155 for (int d = 0; d < topics.size(); ++d) {
00156 target_topics.push_back((std::string)(topics[d]));
00157 }
00158 break;
00159 }
00160 default: {
00161 ROS_WARN("~topics mismatches type, it should be a list, use service interface");
00162 use_service_p = true;
00163 }
00164 }
00165 }
00166
00167 if (use_service_p) {
00168 target_topics.clear();
00169
00170 ros::service::waitForService(string("/list"), -1);
00171 ros::ServiceClient sc_list = n.serviceClient<jsk_topic_tools::List>(string("/list"), true);
00172 jsk_topic_tools::List::Request req;
00173 jsk_topic_tools::List::Response res;
00174 ROS_INFO_STREAM("calling /list");
00175 while ( sc_list.call(req, res) == false) {
00176 ROS_WARN_STREAM("calling /list fails, retry...");
00177 ros::Duration(1).sleep();
00178 }
00179 ROS_WARN_STREAM("calling /list success!!!");
00180 for(vector<std::string>::iterator it = res.topic_names.begin(); it != res.topic_names.end(); ++it) {
00181 target_topics.push_back(*it);
00182 }
00183 ROS_INFO_STREAM("calling /list has done.. found " << res.topic_names.size() << " topics to publish");
00184 }
00185 for(size_t i = 0; i < target_topics.size(); i++) {
00186 boost::shared_ptr<pub_info_t> pub_info(new pub_info_t);
00187 pub_info->topic_name = target_topics[i];
00188 if (use_fixed_rate) {
00189 pub_info->rate = ros::Duration(fixed_rate);
00190 }
00191 pub_info->latched = latched;
00192 pub_info->advertised = false;
00193 pub_info->topic_with_header = false;
00194 ROS_INFO_STREAM("subscribe " << pub_info->topic_name+string("_update"));
00195 pub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(pub_info->topic_name+string("_update"), 10, boost::bind(in_cb, _1, pub_info)));
00196
00197 g_pubs.push_back(pub_info);
00198 }
00199
00200 ros::Rate rate_loop(100);
00201 ros::Time last_updated;
00202
00203 bool use_service = true;
00204 nh.param("use_service", use_service, true);
00205
00206 nh.param("debug", debug, false);
00207
00208 ros::ServiceClient sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
00209 ros::Publisher pub_update;
00210 if (!use_service) {
00211 pub_update = n.advertise<std_msgs::String>("/update", 1);
00212 }
00213 while ( ros::ok() ) {
00214
00215 if ( ((ros::Time::now() - last_updated) > ros::Duration(update_rate)) ) {
00216 for (list<pub_info_ref>::iterator it = g_pubs.begin();
00217 it != g_pubs.end();
00218 ++it) {
00219
00220 if (use_service) {
00221 jsk_topic_tools::Update::Request req;
00222 jsk_topic_tools::Update::Response res;
00223 req.topic_name = (*it)->topic_name;
00224 if ( sc_update.call(req, res) == false ) {
00225 ROS_ERROR_STREAM("calling /update (" << req.topic_name << ") fails, retry...");
00226 sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
00227 continue;
00228 }
00229 (*it)->rate = ros::Duration(res.rate);
00230 if(debug){
00231 ROS_INFO_STREAM("calling /update " << req.topic_name << " .. " << res.rate);
00232 }
00233 }
00234 else {
00235 std_msgs::String msg;
00236 msg.data = (*it)->topic_name;
00237 pub_update.publish(msg);
00238 if(debug){
00239 ROS_INFO_STREAM("publishing /update " << msg.data);
00240 }
00241 }
00242 }
00243 last_updated = ros::Time::now();
00244 }
00245
00246 ros::spinOnce();
00247 rate_loop.sleep();
00248 }
00249
00250 return 0;
00251 }