client.cpp
Go to the documentation of this file.
00001 #include <ros/ros.h>
00002 #include <topic_proxy/topic_proxy.h>
00003 #include <blob/shape_shifter.h>
00004 
00005 #include <topic_proxy/RequestMessage.h>
00006 #include <topic_proxy/AddPublisher.h>
00007 
00008 namespace topic_proxy
00009 {
00010 
00011 using blob::ShapeShifter;
00012 
00013 class Client : public TopicProxy
00014 {
00015 private:
00016   ros::NodeHandle nh_;
00017   ros::ServiceServer request_message_service_;
00018   ros::ServiceServer add_publisher_service_;
00019 
00020   struct SubscriptionInfo
00021   {
00022     std::string local_topic;
00023     std::string datatype;
00024     std::string md5sum;
00025     std::string message_definition;
00026 
00027     ros::Publisher publisher;
00028     bool latch;
00029     ros::Timer timer;
00030     GetMessage::Request request;
00031   };
00032   typedef boost::shared_ptr<SubscriptionInfo> SubscriptionInfoPtr;
00033   std::map<std::string, SubscriptionInfoPtr> subscriptions_;
00034 
00035   struct PublicationInfo
00036   {
00037     std::string remote_topic;
00038     ros::Subscriber subscriber;
00039     bool latch;
00040     bool compressed;
00041   };
00042   typedef boost::shared_ptr<PublicationInfo> PublicationInfoPtr;
00043   std::map<std::string, PublicationInfoPtr> publications_;
00044 
00045   std::string topic_prefix_;
00046 
00047 public:
00048   Client(const std::string& ns)
00049     : TopicProxy()
00050     , nh_(ns)
00051   {
00052     init();
00053   }
00054 
00055   Client(const std::string& host, uint32_t port, const std::string& ns = std::string())
00056     : TopicProxy(host, port)
00057     , nh_(ns)
00058   {
00059     init();
00060   }
00061 
00062   bool init() {
00063     if (connect()) {
00064       if (!getHost().empty()) {
00065         ROS_INFO("Connected to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
00066       } else {
00067         ROS_INFO("Connected to topic_proxy server at local master %s", ros::master::getURI().c_str());
00068       }
00069     } else {
00070       if (!getHost().empty()) {
00071         ROS_WARN("Could not connect to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
00072       } else {
00073         ROS_WARN("Could not connect to topic_proxy server at local master %s", ros::master::getURI().c_str());
00074       }
00075       return false;
00076     }
00077 
00078     ros::param::get("~topic_prefix", topic_prefix_);
00079     request_message_service_ = nh_.advertiseService("request_message", &Client::handleRequestMessage, this);
00080     add_publisher_service_ = nh_.advertiseService("add_publisher", &Client::handleAddPublisher, this);
00081 
00082     XmlRpc::XmlRpcValue subscriptions;
00083     ros::param::get("~subscriptions", subscriptions);
00084     if (subscriptions.getType() == XmlRpc::XmlRpcValue::TypeArray) {
00085       for(int i = 0; i < subscriptions.size(); ++i) {
00086         XmlRpc::XmlRpcValue p = subscriptions[i];
00087         std::string topic;
00088         if (p.getType() == XmlRpc::XmlRpcValue::TypeString) {
00089           topic = static_cast<std::string>(p);
00090         } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00091           if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
00092         }
00093         if (topic.empty()) continue;
00094 
00095         SubscriptionInfoPtr subscription = getSubscription(topic);
00096         subscription->local_topic = topic;
00097         subscription->request.topic = topic;
00098         ros::Duration interval;
00099         if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00100           if (p.hasMember("remote_topic")) subscription->request.topic = static_cast<std::string>(p["remote_topic"]);
00101           if (p.hasMember("timeout"))      subscription->request.timeout = ros::Duration(static_cast<double>(p["timeout"]));
00102           if (p.hasMember("compressed"))   subscription->request.compressed = static_cast<bool>(p["compressed"]);
00103           if (p.hasMember("interval"))     interval = ros::Duration(static_cast<double>(p["interval"]));
00104           if (p.hasMember("latch"))        subscription->latch = static_cast<bool>(p["latch"]);
00105         }
00106 
00107         if (!interval.isZero()) {
00108           subscription->timer = nh_.createTimer(interval, boost::bind(&Client::timerCallback, this, subscription, _1));
00109         }
00110       }
00111     }
00112 
00113     XmlRpc::XmlRpcValue publications;
00114     ros::param::get("~publications", publications);
00115     if (publications.getType() == XmlRpc::XmlRpcValue::TypeArray) {
00116       for(int i = 0; i < publications.size(); ++i) {
00117         XmlRpc::XmlRpcValue p = publications[i];
00118         std::string topic;
00119         if (p.getType() == XmlRpc::XmlRpcValue::TypeString) {
00120           topic = static_cast<std::string>(p);
00121         } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00122           if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
00123         }
00124         if (topic.empty()) continue;
00125 
00126         AddPublisher::Request request;
00127         AddPublisher::Response response;
00128         request.topic = topic;
00129         if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00130           if (p.hasMember("remote_topic")) request.remote_topic = static_cast<std::string>(p["remote_topic"]);
00131           if (p.hasMember("compressed"))   request.compressed = static_cast<bool>(p["compressed"]);
00132           if (p.hasMember("latch"))        request.latch = static_cast<bool>(p["latch"]);
00133         }
00134 
00135         handleAddPublisher(request, response);
00136       }
00137     }
00138 
00139     return true;
00140   }
00141 
00142   ~Client()
00143   {
00144     clearSubscriptions();
00145     clearPublications();
00146   }
00147 
00148   bool republish(GetMessage::Request& request, bool latch = false)
00149   {
00150     MessageInstanceConstPtr instance = send(request);
00151     if (!instance) {
00152       ROS_ERROR("GetMessage request for topic %s failed", request.topic.c_str());
00153       return false;
00154     }
00155 
00156     // advertise locally
00157     SubscriptionInfoPtr subscription = getSubscription(request.topic);
00158     if (!subscription->publisher
00159         && !instance->type.empty() && !instance->md5sum.empty()) {
00160       if (subscription->local_topic.empty()) subscription->local_topic = request.topic;
00161       std::string advertised_topic = nh_.resolveName(topic_prefix_ + subscription->local_topic);
00162       if (!getHost().empty()) {
00163         ROS_INFO("Advertising topic %s from host %s as %s", request.topic.c_str(), getHost().c_str(), advertised_topic.c_str());
00164       } else {
00165         ROS_INFO("Advertising topic %s as %s", request.topic.c_str(), advertised_topic.c_str());
00166       }
00167 
00168       ros::AdvertiseOptions ops = ros::AdvertiseOptions(advertised_topic, 10, instance->md5sum, instance->type, instance->message_definition,
00169                                                         boost::bind(&Client::connectCallback, this, subscription, _1), boost::bind(&Client::disconnectCallback, this, subscription, _1));
00170       ops.latch = latch;
00171       subscription->publisher = nh_.advertise(ops);
00172     }
00173 
00174     if (instance->blob.empty()) {
00175       ROS_DEBUG("No message received on topic %s", request.topic.c_str());
00176       return false;
00177     }
00178 
00179     subscription->publisher.publish(
00180       instance->blob.asMessage().morph(instance->md5sum, instance->type, instance->message_definition)
00181     );
00182     return true;
00183   }
00184 
00185 protected:
00186   const SubscriptionInfoPtr& getSubscription(const std::string& topic)
00187   {
00188     if (subscriptions_.count(topic)) return subscriptions_.at(topic);
00189     SubscriptionInfoPtr subscription(new SubscriptionInfo());
00190     return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
00191   }
00192 
00193   const PublicationInfoPtr& getPublication(const std::string& topic)
00194   {
00195     if (publications_.count(topic)) return publications_.at(topic);
00196     PublicationInfoPtr publication(new PublicationInfo());
00197     return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
00198   }
00199 
00200   void publishCallback(const PublicationInfoPtr& publication, const blob::ShapeShifterConstPtr& message)
00201   {
00202     PublishMessage::Request request;
00203     request.message.topic = publication->remote_topic;
00204     request.message.type = message->getDataType();
00205     request.message.md5sum = message->getMD5Sum();
00206     request.message.message_definition = message->getMessageDefinition();
00207     request.message.blob = message->blob();
00208     request.message.blob.setCompressed(publication->compressed);
00209     request.latch = publication->latch;
00210 
00211     if (!send(request)) {
00212       ROS_ERROR("PublishMessage request for topic %s failed", request.message.topic.c_str());
00213     }
00214   }
00215 
00216   void timerCallback(const SubscriptionInfoPtr& subscription, const ros::TimerEvent& event)
00217   {
00218     republish(subscription->request, subscription->latch);
00219   }
00220 
00221   bool handleRequestMessage(RequestMessage::Request& request, RequestMessage::Response& response)
00222   {
00223     SubscriptionInfoPtr subscription = getSubscription(request.topic);
00224     subscription->request.topic = request.topic;
00225     subscription->request.compressed = request.compressed;
00226     subscription->request.timeout = request.timeout;
00227     subscription->latch = request.latch;
00228 
00229     if (request.interval > ros::Duration()) {
00230       // add new timer
00231       subscription->timer = nh_.createTimer(request.interval, boost::bind(&Client::timerCallback, this, subscription, _1));
00232     } else {
00233       // stop otimer
00234       subscription->timer.stop();
00235     }
00236 
00237     // request once
00238     if (request.interval.isZero()) {
00239       return republish(subscription->request);
00240     }
00241 
00242     return true;
00243   }
00244 
00245   bool handleAddPublisher(AddPublisher::Request& request, AddPublisher::Response& response)
00246   {
00247     PublicationInfoPtr publication = getPublication(request.topic);
00248 
00249     if (!publication->subscriber) {
00250       std::string subscribed_topic = nh_.resolveName(topic_prefix_ + request.topic);
00251       if (!getHost().empty()) {
00252         ROS_INFO("Subscribing to topic %s for host %s as %s", request.topic.c_str(), getHost().c_str(), subscribed_topic.c_str());
00253       } else {
00254         ROS_INFO("Subscribing to topic %s as %s", request.topic.c_str(), subscribed_topic.c_str());
00255       }
00256 
00257       publication->subscriber = nh_.subscribe<blob::ShapeShifter>(subscribed_topic, 10, boost::bind(&Client::publishCallback, this, publication, _1));
00258       publication->remote_topic = request.topic;
00259       if (!request.remote_topic.empty()) publication->remote_topic = request.remote_topic;
00260       publication->latch = request.latch;
00261       publication->compressed = request.compressed;
00262     }
00263 
00264     return true;
00265   }
00266 
00267   void clearSubscriptions()
00268   {
00269     for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
00270       it->second->publisher.shutdown();
00271     }
00272     subscriptions_.clear();
00273   }
00274 
00275   void clearPublications()
00276   {
00277     for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
00278       it->second->subscriber.shutdown();
00279     }
00280     publications_.clear();
00281   }
00282 
00283 protected:
00284   void connectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
00285   {
00286   }
00287 
00288   void disconnectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
00289   {
00290   }
00291 };
00292 
00293 }
00294 
00295 int main(int argc, char **argv)
00296 {
00297   ros::init(argc, argv, "topic_proxy_client");
00298 
00299   std::string host;
00300   int port = 0;
00301 
00302   // get host/port from parameter server
00303   ros::param::get("~host", host);
00304   ros::param::get("~port", port);
00305 
00306   // get host/port from command line
00307   if (argc > 1) host = argv[1];
00308   if (argc > 2) port = boost::lexical_cast<uint32_t>(std::string(argv[2]));
00309 
00310   topic_proxy::Client client(host, port);
00311   ros::spin();
00312   return 0;
00313 }


topic_proxy
Author(s): Johannes Meyer
autogenerated on Fri Aug 28 2015 13:23:37