00001 #include <ros/ros.h>
00002 #include <topic_proxy/topic_proxy.h>
00003 #include <blob/shape_shifter.h>
00004
00005 #include <topic_proxy/RequestMessage.h>
00006 #include <topic_proxy/AddPublisher.h>
00007
00008 namespace topic_proxy
00009 {
00010
00011 using blob::ShapeShifter;
00012
00013 class Client : public TopicProxy
00014 {
00015 private:
00016 ros::NodeHandle nh_;
00017 ros::ServiceServer request_message_service_;
00018 ros::ServiceServer add_publisher_service_;
00019
00020 struct SubscriptionInfo
00021 {
00022 std::string local_topic;
00023 std::string datatype;
00024 std::string md5sum;
00025 std::string message_definition;
00026
00027 ros::Publisher publisher;
00028 bool latch;
00029 ros::Timer timer;
00030 GetMessage::Request request;
00031 };
00032 typedef boost::shared_ptr<SubscriptionInfo> SubscriptionInfoPtr;
00033 std::map<std::string, SubscriptionInfoPtr> subscriptions_;
00034
00035 struct PublicationInfo
00036 {
00037 std::string remote_topic;
00038 ros::Subscriber subscriber;
00039 bool latch;
00040 bool compressed;
00041 };
00042 typedef boost::shared_ptr<PublicationInfo> PublicationInfoPtr;
00043 std::map<std::string, PublicationInfoPtr> publications_;
00044
00045 std::string topic_prefix_;
00046
00047 public:
00048 Client(const std::string& ns)
00049 : TopicProxy()
00050 , nh_(ns)
00051 {
00052 init();
00053 }
00054
00055 Client(const std::string& host, uint32_t port, const std::string& ns = std::string())
00056 : TopicProxy(host, port)
00057 , nh_(ns)
00058 {
00059 init();
00060 }
00061
00062 bool init() {
00063 if (connect()) {
00064 if (!getHost().empty()) {
00065 ROS_INFO("Connected to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
00066 } else {
00067 ROS_INFO("Connected to topic_proxy server at local master %s", ros::master::getURI().c_str());
00068 }
00069 } else {
00070 if (!getHost().empty()) {
00071 ROS_WARN("Could not connect to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
00072 } else {
00073 ROS_WARN("Could not connect to topic_proxy server at local master %s", ros::master::getURI().c_str());
00074 }
00075 return false;
00076 }
00077
00078 ros::param::get("~topic_prefix", topic_prefix_);
00079 request_message_service_ = nh_.advertiseService("request_message", &Client::handleRequestMessage, this);
00080 add_publisher_service_ = nh_.advertiseService("add_publisher", &Client::handleAddPublisher, this);
00081
00082 XmlRpc::XmlRpcValue subscriptions;
00083 ros::param::get("~subscriptions", subscriptions);
00084 if (subscriptions.getType() == XmlRpc::XmlRpcValue::TypeArray) {
00085 for(int i = 0; i < subscriptions.size(); ++i) {
00086 XmlRpc::XmlRpcValue p = subscriptions[i];
00087 std::string topic;
00088 if (p.getType() == XmlRpc::XmlRpcValue::TypeString) {
00089 topic = static_cast<std::string>(p);
00090 } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00091 if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
00092 }
00093 if (topic.empty()) continue;
00094
00095 SubscriptionInfoPtr subscription = getSubscription(topic);
00096 subscription->local_topic = topic;
00097 subscription->request.topic = topic;
00098 ros::Duration interval;
00099 if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00100 if (p.hasMember("remote_topic")) subscription->request.topic = static_cast<std::string>(p["remote_topic"]);
00101 if (p.hasMember("timeout")) subscription->request.timeout = ros::Duration(static_cast<double>(p["timeout"]));
00102 if (p.hasMember("compressed")) subscription->request.compressed = static_cast<bool>(p["compressed"]);
00103 if (p.hasMember("interval")) interval = ros::Duration(static_cast<double>(p["interval"]));
00104 if (p.hasMember("latch")) subscription->latch = static_cast<bool>(p["latch"]);
00105 }
00106
00107 if (!interval.isZero()) {
00108 subscription->timer = nh_.createTimer(interval, boost::bind(&Client::timerCallback, this, subscription, _1));
00109 }
00110 }
00111 }
00112
00113 XmlRpc::XmlRpcValue publications;
00114 ros::param::get("~publications", publications);
00115 if (publications.getType() == XmlRpc::XmlRpcValue::TypeArray) {
00116 for(int i = 0; i < publications.size(); ++i) {
00117 XmlRpc::XmlRpcValue p = publications[i];
00118 std::string topic;
00119 if (p.getType() == XmlRpc::XmlRpcValue::TypeString) {
00120 topic = static_cast<std::string>(p);
00121 } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00122 if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
00123 }
00124 if (topic.empty()) continue;
00125
00126 AddPublisher::Request request;
00127 AddPublisher::Response response;
00128 request.topic = topic;
00129 if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00130 if (p.hasMember("remote_topic")) request.remote_topic = static_cast<std::string>(p["remote_topic"]);
00131 if (p.hasMember("compressed")) request.compressed = static_cast<bool>(p["compressed"]);
00132 if (p.hasMember("latch")) request.latch = static_cast<bool>(p["latch"]);
00133 }
00134
00135 handleAddPublisher(request, response);
00136 }
00137 }
00138
00139 return true;
00140 }
00141
00142 ~Client()
00143 {
00144 clearSubscriptions();
00145 clearPublications();
00146 }
00147
00148 bool republish(GetMessage::Request& request, bool latch = false)
00149 {
00150 republish(request, request.topic, latch);
00151 }
00152
00153 bool republish(GetMessage::Request& request, const std::string& topic, bool latch = false)
00154 {
00155 MessageInstanceConstPtr instance = send(request);
00156 if (!instance) {
00157 ROS_ERROR("GetMessage request for topic %s failed", request.topic.c_str());
00158 return false;
00159 }
00160
00161
00162 SubscriptionInfoPtr subscription = getSubscription(topic);
00163 if (!subscription->publisher
00164 && !instance->type.empty() && !instance->md5sum.empty()) {
00165 if (subscription->local_topic.empty()) subscription->local_topic = topic;
00166 std::string advertised_topic = nh_.resolveName(topic_prefix_ + subscription->local_topic);
00167 if (!getHost().empty()) {
00168 ROS_INFO("Advertising topic %s from host %s as %s", request.topic.c_str(), getHost().c_str(), advertised_topic.c_str());
00169 } else {
00170 ROS_INFO("Advertising topic %s as %s", request.topic.c_str(), advertised_topic.c_str());
00171 }
00172
00173 ros::AdvertiseOptions ops = ros::AdvertiseOptions(advertised_topic, 10, instance->md5sum, instance->type, instance->message_definition,
00174 boost::bind(&Client::connectCallback, this, subscription, _1), boost::bind(&Client::disconnectCallback, this, subscription, _1));
00175 ops.latch = latch;
00176 subscription->publisher = nh_.advertise(ops);
00177 }
00178
00179 if (instance->blob.empty()) {
00180 ROS_DEBUG("No message received on topic %s", request.topic.c_str());
00181 return false;
00182 }
00183
00184 subscription->publisher.publish(
00185 instance->blob.asMessage().morph(instance->md5sum, instance->type, instance->message_definition)
00186 );
00187 return true;
00188 }
00189
00190 protected:
00191 const SubscriptionInfoPtr& getSubscription(const std::string& topic)
00192 {
00193 if (subscriptions_.count(topic)) return subscriptions_.at(topic);
00194 SubscriptionInfoPtr subscription(new SubscriptionInfo());
00195 return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
00196 }
00197
00198 const PublicationInfoPtr& getPublication(const std::string& topic)
00199 {
00200 if (publications_.count(topic)) return publications_.at(topic);
00201 PublicationInfoPtr publication(new PublicationInfo());
00202 return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
00203 }
00204
00205 void publishCallback(const PublicationInfoPtr& publication, const blob::ShapeShifterConstPtr& message)
00206 {
00207 PublishMessage::Request request;
00208 request.message.topic = publication->remote_topic;
00209 request.message.type = message->getDataType();
00210 request.message.md5sum = message->getMD5Sum();
00211 request.message.message_definition = message->getMessageDefinition();
00212 request.message.blob = message->blob();
00213 request.message.blob.setCompressed(publication->compressed);
00214 request.latch = publication->latch;
00215
00216 if (!send(request)) {
00217 ROS_ERROR("PublishMessage request for topic %s failed", request.message.topic.c_str());
00218 }
00219 }
00220
00221 void timerCallback(const SubscriptionInfoPtr& subscription, const ros::TimerEvent& event)
00222 {
00223 republish(subscription->request, subscription->local_topic, subscription->latch);
00224 }
00225
00226 bool handleRequestMessage(RequestMessage::Request& request, RequestMessage::Response& response)
00227 {
00228 SubscriptionInfoPtr subscription = getSubscription(request.topic);
00229 subscription->request.topic = request.remote_topic;
00230 subscription->request.compressed = request.compressed;
00231 subscription->request.timeout = request.timeout;
00232 subscription->latch = request.latch;
00233 subscription->local_topic = request.topic;
00234
00235 if (request.interval > ros::Duration()) {
00236
00237 subscription->timer = nh_.createTimer(request.interval, boost::bind(&Client::timerCallback, this, subscription, _1));
00238 } else {
00239
00240 subscription->timer.stop();
00241 }
00242
00243
00244 if (request.interval.isZero()) {
00245 return republish(subscription->request, request.topic, request.latch);
00246 }
00247
00248 return true;
00249 }
00250
00251 bool handleAddPublisher(AddPublisher::Request& request, AddPublisher::Response& response)
00252 {
00253 PublicationInfoPtr publication = getPublication(request.topic);
00254
00255 if (!publication->subscriber) {
00256 std::string subscribed_topic = nh_.resolveName(topic_prefix_ + request.topic);
00257 if (!getHost().empty()) {
00258 ROS_INFO("Subscribing to topic %s for host %s as %s", request.topic.c_str(), getHost().c_str(), subscribed_topic.c_str());
00259 } else {
00260 ROS_INFO("Subscribing to topic %s as %s", request.topic.c_str(), subscribed_topic.c_str());
00261 }
00262
00263 publication->subscriber = nh_.subscribe<blob::ShapeShifter>(subscribed_topic, 10, boost::bind(&Client::publishCallback, this, publication, _1));
00264 publication->remote_topic = request.topic;
00265 if (!request.remote_topic.empty()) publication->remote_topic = request.remote_topic;
00266 publication->latch = request.latch;
00267 publication->compressed = request.compressed;
00268 }
00269
00270 return true;
00271 }
00272
00273 void clearSubscriptions()
00274 {
00275 for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
00276 it->second->publisher.shutdown();
00277 }
00278 subscriptions_.clear();
00279 }
00280
00281 void clearPublications()
00282 {
00283 for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
00284 it->second->subscriber.shutdown();
00285 }
00286 publications_.clear();
00287 }
00288
00289 protected:
00290 void connectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
00291 {
00292 }
00293
00294 void disconnectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
00295 {
00296 }
00297 };
00298
00299 }
00300
00301 int main(int argc, char **argv)
00302 {
00303 ros::init(argc, argv, "topic_proxy_client");
00304
00305 std::string host;
00306 int port = 0;
00307
00308
00309 ros::param::get("~host", host);
00310 ros::param::get("~port", port);
00311
00312
00313 if (argc > 1) host = argv[1];
00314 if (argc > 2) port = boost::lexical_cast<uint32_t>(std::string(argv[2]));
00315
00316 topic_proxy::Client client(host, port);
00317 ros::spin();
00318 return 0;
00319 }