client.cpp
Go to the documentation of this file.
00001 #include <ros/ros.h>
00002 #include <topic_proxy/topic_proxy.h>
00003 #include <blob/shape_shifter.h>
00004 
00005 #include <topic_proxy/RequestMessage.h>
00006 #include <topic_proxy/AddPublisher.h>
00007 
00008 namespace topic_proxy
00009 {
00010 
00011 using blob::ShapeShifter;
00012 
00013 class Client : public TopicProxy
00014 {
00015 private:
00016   ros::NodeHandle nh_;
00017   ros::ServiceServer request_message_service_;
00018   ros::ServiceServer add_publisher_service_;
00019 
00020   struct SubscriptionInfo
00021   {
00022     std::string local_topic;
00023     std::string datatype;
00024     std::string md5sum;
00025     std::string message_definition;
00026 
00027     ros::Publisher publisher;
00028     bool latch;
00029     ros::Timer timer;
00030     GetMessage::Request request;
00031   };
00032   typedef boost::shared_ptr<SubscriptionInfo> SubscriptionInfoPtr;
00033   std::map<std::string, SubscriptionInfoPtr> subscriptions_;
00034 
00035   struct PublicationInfo
00036   {
00037     std::string remote_topic;
00038     ros::Subscriber subscriber;
00039     bool latch;
00040     bool compressed;
00041   };
00042   typedef boost::shared_ptr<PublicationInfo> PublicationInfoPtr;
00043   std::map<std::string, PublicationInfoPtr> publications_;
00044 
00045   std::string topic_prefix_;
00046 
00047 public:
00048   Client(const std::string& ns)
00049     : TopicProxy()
00050     , nh_(ns)
00051   {
00052     init();
00053   }
00054 
00055   Client(const std::string& host, uint32_t port, const std::string& ns = std::string())
00056     : TopicProxy(host, port)
00057     , nh_(ns)
00058   {
00059     init();
00060   }
00061 
00062   bool init() {
00063     if (connect()) {
00064       if (!getHost().empty()) {
00065         ROS_INFO("Connected to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
00066       } else {
00067         ROS_INFO("Connected to topic_proxy server at local master %s", ros::master::getURI().c_str());
00068       }
00069     } else {
00070       if (!getHost().empty()) {
00071         ROS_WARN("Could not connect to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
00072       } else {
00073         ROS_WARN("Could not connect to topic_proxy server at local master %s", ros::master::getURI().c_str());
00074       }
00075       return false;
00076     }
00077 
00078     ros::param::get("~topic_prefix", topic_prefix_);
00079     request_message_service_ = nh_.advertiseService("request_message", &Client::handleRequestMessage, this);
00080     add_publisher_service_ = nh_.advertiseService("add_publisher", &Client::handleAddPublisher, this);
00081 
00082     XmlRpc::XmlRpcValue subscriptions;
00083     ros::param::get("~subscriptions", subscriptions);
00084     if (subscriptions.getType() == XmlRpc::XmlRpcValue::TypeArray) {
00085       for(int i = 0; i < subscriptions.size(); ++i) {
00086         XmlRpc::XmlRpcValue p = subscriptions[i];
00087         std::string topic;
00088         if (p.getType() == XmlRpc::XmlRpcValue::TypeString) {
00089           topic = static_cast<std::string>(p);
00090         } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00091           if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
00092         }
00093         if (topic.empty()) continue;
00094 
00095         SubscriptionInfoPtr subscription = getSubscription(topic);
00096         subscription->local_topic = topic;
00097         subscription->request.topic = topic;
00098         ros::Duration interval;
00099         if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00100           if (p.hasMember("remote_topic")) subscription->request.topic = static_cast<std::string>(p["remote_topic"]);
00101           if (p.hasMember("timeout"))      subscription->request.timeout = ros::Duration(static_cast<double>(p["timeout"]));
00102           if (p.hasMember("compressed"))   subscription->request.compressed = static_cast<bool>(p["compressed"]);
00103           if (p.hasMember("interval"))     interval = ros::Duration(static_cast<double>(p["interval"]));
00104           if (p.hasMember("latch"))        subscription->latch = static_cast<bool>(p["latch"]);
00105         }
00106 
00107         if (!interval.isZero()) {
00108           subscription->timer = nh_.createTimer(interval, boost::bind(&Client::timerCallback, this, subscription, _1));
00109         }
00110       }
00111     }
00112 
00113     XmlRpc::XmlRpcValue publications;
00114     ros::param::get("~publications", publications);
00115     if (publications.getType() == XmlRpc::XmlRpcValue::TypeArray) {
00116       for(int i = 0; i < publications.size(); ++i) {
00117         XmlRpc::XmlRpcValue p = publications[i];
00118         std::string topic;
00119         if (p.getType() == XmlRpc::XmlRpcValue::TypeString) {
00120           topic = static_cast<std::string>(p);
00121         } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00122           if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
00123         }
00124         if (topic.empty()) continue;
00125 
00126         AddPublisher::Request request;
00127         AddPublisher::Response response;
00128         request.topic = topic;
00129         if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00130           if (p.hasMember("remote_topic")) request.remote_topic = static_cast<std::string>(p["remote_topic"]);
00131           if (p.hasMember("compressed"))   request.compressed = static_cast<bool>(p["compressed"]);
00132           if (p.hasMember("latch"))        request.latch = static_cast<bool>(p["latch"]);
00133         }
00134 
00135         handleAddPublisher(request, response);
00136       }
00137     }
00138 
00139     return true;
00140   }
00141 
00142   ~Client()
00143   {
00144     clearSubscriptions();
00145     clearPublications();
00146   }
00147 
00148   bool republish(GetMessage::Request& request, bool latch = false)
00149   {
00150     republish(request, request.topic, latch);
00151   }
00152 
00153   bool republish(GetMessage::Request& request, const std::string& topic, bool latch = false)
00154   {
00155     MessageInstanceConstPtr instance = send(request);
00156     if (!instance) {
00157       ROS_ERROR("GetMessage request for topic %s failed", request.topic.c_str());
00158       return false;
00159     }
00160 
00161     // advertise locally
00162     SubscriptionInfoPtr subscription = getSubscription(topic);
00163     if (!subscription->publisher
00164         && !instance->type.empty() && !instance->md5sum.empty()) {
00165       if (subscription->local_topic.empty()) subscription->local_topic = topic;
00166       std::string advertised_topic = nh_.resolveName(topic_prefix_ + subscription->local_topic);
00167       if (!getHost().empty()) {
00168         ROS_INFO("Advertising topic %s from host %s as %s", request.topic.c_str(), getHost().c_str(), advertised_topic.c_str());
00169       } else {
00170         ROS_INFO("Advertising topic %s as %s", request.topic.c_str(), advertised_topic.c_str());
00171       }
00172 
00173       ros::AdvertiseOptions ops = ros::AdvertiseOptions(advertised_topic, 10, instance->md5sum, instance->type, instance->message_definition,
00174                                                         boost::bind(&Client::connectCallback, this, subscription, _1), boost::bind(&Client::disconnectCallback, this, subscription, _1));
00175       ops.latch = latch;
00176       subscription->publisher = nh_.advertise(ops);
00177     }
00178 
00179     if (instance->blob.empty()) {
00180       ROS_DEBUG("No message received on topic %s", request.topic.c_str());
00181       return false;
00182     }
00183 
00184     subscription->publisher.publish(
00185       instance->blob.asMessage().morph(instance->md5sum, instance->type, instance->message_definition)
00186     );
00187     return true;
00188   }
00189 
00190 protected:
00191   const SubscriptionInfoPtr& getSubscription(const std::string& topic)
00192   {
00193     if (subscriptions_.count(topic)) return subscriptions_.at(topic);
00194     SubscriptionInfoPtr subscription(new SubscriptionInfo());
00195     return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
00196   }
00197 
00198   const PublicationInfoPtr& getPublication(const std::string& topic)
00199   {
00200     if (publications_.count(topic)) return publications_.at(topic);
00201     PublicationInfoPtr publication(new PublicationInfo());
00202     return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
00203   }
00204 
00205   void publishCallback(const PublicationInfoPtr& publication, const blob::ShapeShifterConstPtr& message)
00206   {
00207     PublishMessage::Request request;
00208     request.message.topic = publication->remote_topic;
00209     request.message.type = message->getDataType();
00210     request.message.md5sum = message->getMD5Sum();
00211     request.message.message_definition = message->getMessageDefinition();
00212     request.message.blob = message->blob();
00213     request.message.blob.setCompressed(publication->compressed);
00214     request.latch = publication->latch;
00215 
00216     if (!send(request)) {
00217       ROS_ERROR("PublishMessage request for topic %s failed", request.message.topic.c_str());
00218     }
00219   }
00220 
00221   void timerCallback(const SubscriptionInfoPtr& subscription, const ros::TimerEvent& event)
00222   {
00223     republish(subscription->request, subscription->local_topic, subscription->latch);
00224   }
00225 
00226   bool handleRequestMessage(RequestMessage::Request& request, RequestMessage::Response& response)
00227   {
00228     SubscriptionInfoPtr subscription = getSubscription(request.topic);
00229     subscription->request.topic = request.remote_topic;
00230     subscription->request.compressed = request.compressed;
00231     subscription->request.timeout = request.timeout;
00232     subscription->latch = request.latch;
00233     subscription->local_topic = request.topic;
00234 
00235     if (request.interval > ros::Duration()) {
00236       // add new timer
00237       subscription->timer = nh_.createTimer(request.interval, boost::bind(&Client::timerCallback, this, subscription, _1));
00238     } else {
00239       // stop otimer
00240       subscription->timer.stop();
00241     }
00242 
00243     // request once
00244     if (request.interval.isZero()) {
00245       return republish(subscription->request, request.topic, request.latch);
00246     }
00247 
00248     return true;
00249   }
00250 
00251   bool handleAddPublisher(AddPublisher::Request& request, AddPublisher::Response& response)
00252   {
00253     PublicationInfoPtr publication = getPublication(request.topic);
00254 
00255     if (!publication->subscriber) {
00256       std::string subscribed_topic = nh_.resolveName(topic_prefix_ + request.topic);
00257       if (!getHost().empty()) {
00258         ROS_INFO("Subscribing to topic %s for host %s as %s", request.topic.c_str(), getHost().c_str(), subscribed_topic.c_str());
00259       } else {
00260         ROS_INFO("Subscribing to topic %s as %s", request.topic.c_str(), subscribed_topic.c_str());
00261       }
00262 
00263       publication->subscriber = nh_.subscribe<blob::ShapeShifter>(subscribed_topic, 10, boost::bind(&Client::publishCallback, this, publication, _1));
00264       publication->remote_topic = request.topic;
00265       if (!request.remote_topic.empty()) publication->remote_topic = request.remote_topic;
00266       publication->latch = request.latch;
00267       publication->compressed = request.compressed;
00268     }
00269 
00270     return true;
00271   }
00272 
00273   void clearSubscriptions()
00274   {
00275     for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
00276       it->second->publisher.shutdown();
00277     }
00278     subscriptions_.clear();
00279   }
00280 
00281   void clearPublications()
00282   {
00283     for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
00284       it->second->subscriber.shutdown();
00285     }
00286     publications_.clear();
00287   }
00288 
00289 protected:
00290   void connectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
00291   {
00292   }
00293 
00294   void disconnectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
00295   {
00296   }
00297 };
00298 
00299 }
00300 
00301 int main(int argc, char **argv)
00302 {
00303   ros::init(argc, argv, "topic_proxy_client");
00304 
00305   std::string host;
00306   int port = 0;
00307 
00308   // get host/port from parameter server
00309   ros::param::get("~host", host);
00310   ros::param::get("~port", port);
00311 
00312   // get host/port from command line
00313   if (argc > 1) host = argv[1];
00314   if (argc > 2) port = boost::lexical_cast<uint32_t>(std::string(argv[2]));
00315 
00316   topic_proxy::Client client(host, port);
00317   ros::spin();
00318   return 0;
00319 }


topic_proxy
Author(s): Johannes Meyer
autogenerated on Thu Jun 6 2019 17:45:27