00001 #include <cstdio>
00002 #include <vector>
00003 #include <list>
00004 #include <boost/lambda/lambda.hpp>
00005 #include "ros/ros.h"
00006 #include "ros/header.h"
00007 #include "ros/console.h"
00008 #include "std_msgs/Header.h"
00009 #include "std_msgs/String.h"
00010 #include "jsk_topic_tools/List.h"
00011 #include "jsk_topic_tools/Update.h"
00012 #include "topic_tools/shape_shifter.h"
00013 #include "topic_tools/parse.h"
00014
00015 using std::string;
00016 using std::vector;
00017 using std::list;
00018 using namespace topic_tools;
00019 static bool debug;
00020
00021 class pub_info_t
00022 {
00023 public:
00024 std::string topic_name;
00025 ros::Publisher pub;
00026 ros::Subscriber *sub;
00027 bool advertised;
00028 boost::shared_ptr<ShapeShifter > msg;
00029 ros::Timer timer;
00030 ros::Duration rate;
00031 ros::Time last_time_received;
00032 bool topic_with_header;
00033 bool latched;
00034 uint32_t last_seq_received;
00035 uint32_t topic_received;
00036
00037 void publish(const ros::TimerEvent &event)
00038 {
00039 if ( advertised == false ) return;
00040
00041 topic_received++;
00042 if ( topic_with_header == true ) {
00043 std_msgs::Header header;
00044 uint8_t buf[msg->size()];
00045 ros::serialization::OStream ostream(buf, msg->size());
00046 ros::serialization::IStream istream(buf, msg->size());
00047 msg->write(ostream);
00048 ((uint32_t *)buf)[0] = last_seq_received + topic_received;
00049 ros::Time tmp(last_time_received.toSec() + topic_received * rate.toSec());
00050 ((uint32_t *)buf)[1] = tmp.sec;
00051 ((uint32_t *)buf)[2] = tmp.nsec;
00052 msg->read(istream);
00053 }
00054 if(debug){
00055 ROS_INFO_STREAM("publishing " << topic_name);
00056 }
00057 pub.publish(msg);
00058 }
00059 };
00060
00061 typedef boost::shared_ptr<pub_info_t> pub_info_ref;
00062
00063 static list<pub_info_ref> g_pubs;
00064
00065 static ros::NodeHandle *g_node = NULL;
00066
00067 static bool use_fixed_rate;
00068 static bool use_periodic_rate = false;
00069
00070
00071
00072 void in_cb(const boost::shared_ptr<ShapeShifter const>& msg,
00073 boost::shared_ptr<pub_info_t> s)
00074 {
00075 using namespace boost::lambda;
00076
00077 s->msg = boost::const_pointer_cast<ShapeShifter>(msg);
00078 s->topic_received = 0;
00079 if ( s->advertised == false ) {
00080 s->pub = msg->advertise(*g_node, s->topic_name+string("_buffered"), 10, s->latched);
00081 s->advertised = true;
00082 }
00083 if(debug){
00084 ROS_INFO_STREAM("advertised as " << s->topic_name+string("_buffered") << " running at " << 1/(s->rate.toSec()) << "Hz");
00085 }
00086
00087
00088 {
00089 std_msgs::Header header;
00090 uint8_t buf[msg->size()];
00091 ros::serialization::OStream stream(buf, msg->size());
00092 msg->write(stream);
00093 header.seq = ((uint32_t *)buf)[0];
00094 header.stamp.sec = ((uint32_t *)buf)[1];
00095 header.stamp.nsec = ((uint32_t *)buf)[2];
00096
00097 if ( abs((header.stamp - ros::Time::now()).toSec()) < 5.0 ) {
00098 if(debug){
00099 ROS_INFO_STREAM(" this message contains headers.. seq =" << header.seq << " stamp = " << header.stamp);
00100 }
00101 s->topic_with_header = true;
00102 s->last_seq_received = header.seq;
00103 s->last_time_received = header.stamp;
00104 }
00105 }
00106
00107 {
00108 ros::TimerEvent ev;
00109 s->publish (ev);
00110 }
00111 s->timer = g_node->createTimer(s->rate, &pub_info_t::publish, s);
00112 }
00113
00114
00115 int main(int argc, char **argv)
00116 {
00117 ros::init(argc, argv, "topic_buffer_client", ros::init_options::AnonymousName);
00118
00119 ros::NodeHandle n;
00120 ros::NodeHandle nh("~");
00121
00122 double fixed_rate = 0.1;
00123 if (nh.hasParam("fixed_rate")) {
00124 use_fixed_rate = true;
00125 nh.param ("fixed_rate", fixed_rate, 0.1);
00126 ROS_INFO("use fixed rate = %f", fixed_rate);
00127 }
00128
00129 double update_rate = 10;
00130 if (nh.hasParam("update_rate")) {
00131 nh.param ("update_rate", update_rate, 10.0);
00132 ROS_INFO("use update rate = %f", update_rate);
00133 }
00134
00135 double periodic_rate = 0.1;
00136 if (nh.hasParam("periodic_rate")) {
00137 use_periodic_rate = true;
00138 nh.param ("periodic_rate", periodic_rate, 0.1);
00139 ROS_INFO("use periodic rate = %f", periodic_rate);
00140 }
00141
00142 bool latched;
00143 if (nh.hasParam("latched")) {
00144 nh.param ("latched", latched, false);
00145 if(latched) {
00146 ROS_INFO("use latched");
00147 }
00148 }
00149
00150 g_node = &n;
00151 bool use_service_p = false;
00152 std::vector<std::string> target_topics;
00153
00154
00155 XmlRpc::XmlRpcValue topics;
00156 if (!nh.getParam ("topics", topics)) {
00157 ROS_WARN("no ~topics is available, use service interface");
00158 use_service_p = true;
00159 }
00160 else {
00161 switch (topics.getType ()) {
00162 case XmlRpc::XmlRpcValue::TypeArray: {
00163 for (int d = 0; d < topics.size(); ++d) {
00164 target_topics.push_back((std::string)(topics[d]));
00165 }
00166 break;
00167 }
00168 default: {
00169 ROS_WARN("~topics mismatches type, it should be a list, use service interface");
00170 use_service_p = true;
00171 }
00172 }
00173 }
00174
00175 if (use_service_p) {
00176 target_topics.clear();
00177
00178 ros::service::waitForService(string("/list"), -1);
00179 ros::ServiceClient sc_list = n.serviceClient<jsk_topic_tools::List>(string("/list"), true);
00180 jsk_topic_tools::List::Request req;
00181 jsk_topic_tools::List::Response res;
00182 ROS_INFO_STREAM("calling " << sc_list.getService());
00183 while ( sc_list.call(req, res) == false) {
00184 ROS_WARN_STREAM("calling " << sc_list.getService() << " fails, retry...");
00185 ros::Duration(1).sleep();
00186 }
00187 ROS_WARN_STREAM("calling /list success!!!");
00188 for(vector<std::string>::iterator it = res.topic_names.begin(); it != res.topic_names.end(); ++it) {
00189 target_topics.push_back(*it);
00190 }
00191 ROS_INFO_STREAM("calling /list has done.. found " << res.topic_names.size() << " topics to publish");
00192 }
00193 for(size_t i = 0; i < target_topics.size(); i++) {
00194 boost::shared_ptr<pub_info_t> pub_info(new pub_info_t);
00195 pub_info->topic_name = target_topics[i];
00196 if (use_fixed_rate) {
00197 pub_info->rate = ros::Duration(fixed_rate);
00198 }
00199 pub_info->latched = latched;
00200 pub_info->advertised = false;
00201 pub_info->topic_with_header = false;
00202 ROS_INFO_STREAM("subscribe " << pub_info->topic_name+string("_update"));
00203 pub_info->sub = new ros::Subscriber(n.subscribe<ShapeShifter>(pub_info->topic_name+string("_update"), 10, boost::bind(in_cb, _1, pub_info)));
00204
00205 g_pubs.push_back(pub_info);
00206 }
00207
00208 ros::Rate rate_loop(100);
00209 ros::Time last_updated;
00210
00211 bool use_service = true;
00212 nh.param("use_service", use_service, true);
00213
00214 nh.param("debug", debug, false);
00215
00216 ros::ServiceClient sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
00217 ros::Publisher pub_update;
00218 if (!use_service) {
00219 pub_update = n.advertise<std_msgs::String>("/update", 1);
00220 }
00221
00222 if (use_periodic_rate) {
00223 for (list<pub_info_ref>::iterator it = g_pubs.begin();
00224 it != g_pubs.end();
00225 ++it) {
00226 jsk_topic_tools::Update::Request req;
00227 jsk_topic_tools::Update::Response res;
00228 req.topic_name = (*it)->topic_name;
00229 req.periodic = true;
00230 req.periodic_rate = ros::Duration(periodic_rate);
00231 sc_update.call(req, res);
00232 ROS_INFO_STREAM("sending request for periodic rate publish topic:" << req.topic_name << " rate:" << req.periodic_rate);
00233 }
00234 }
00235 while ( ros::ok() ) {
00236
00237 if ( update_rate >= 0 && (ros::Time::now() - last_updated) > ros::Duration(update_rate) ) {
00238 for (list<pub_info_ref>::iterator it = g_pubs.begin();
00239 it != g_pubs.end();
00240 ++it) {
00241
00242 if (use_service) {
00243 jsk_topic_tools::Update::Request req;
00244 jsk_topic_tools::Update::Response res;
00245 req.topic_name = (*it)->topic_name;
00246 if ( sc_update.call(req, res) == false ) {
00247 ROS_ERROR_STREAM("calling /update (" << req.topic_name << ") fails, retry...");
00248 sc_update = n.serviceClient<jsk_topic_tools::Update>(string("/update"), true);
00249 continue;
00250 }
00251 (*it)->rate = ros::Duration(res.rate);
00252 if(debug){
00253 ROS_INFO_STREAM("calling /update " << req.topic_name << " .. " << res.rate);
00254 }
00255 }
00256 else {
00257 std_msgs::String msg;
00258 msg.data = (*it)->topic_name;
00259 pub_update.publish(msg);
00260 if(debug){
00261 ROS_INFO_STREAM("publishing /update " << msg.data);
00262 }
00263 }
00264 }
00265 last_updated = ros::Time::now();
00266 }
00267
00268 ros::spinOnce();
00269 rate_loop.sleep();
00270 }
00271
00272 return 0;
00273 }