server.cpp
Go to the documentation of this file.
00001 #include <topic_proxy/topic_proxy.h>
00002 
00003 #include <ros/ros.h>
00004 #include <ros/callback_queue.h>
00005 #include <ros/serialization.h>
00006 
00007 #include <ros/network.h>
00008 #include <ros/connection_manager.h>
00009 #include <ros/transport/transport_tcp.h>
00010 
00011 #include <blob/shape_shifter.h>
00012 
00013 namespace topic_proxy
00014 {
00015 
00016 using blob::ShapeShifter;
00017 
00018 class Server
00019 {
00020 private:
00021   ros::NodeHandle nh_;
00022   ros::ServiceServer get_message_server_;
00023   ros::ServiceServer publish_message_server_;
00024 
00025   typedef ros::MessageEvent<const ShapeShifter> MessageEvent;
00026 
00027   struct SubscriptionInfo {
00028     std::string topic;
00029     ros::Subscriber subscriber;
00030     ros::CallbackQueue callback_queue;
00031     MessageEvent event;
00032     ShapeShifter::ConstPtr last_message;
00033   };
00034   typedef boost::shared_ptr<SubscriptionInfo> SubscriptionInfoPtr;
00035   std::map<std::string, SubscriptionInfoPtr> subscriptions_;
00036 
00037   struct PublicationInfo {
00038     std::string topic;
00039     ros::Publisher publisher;
00040   };
00041   typedef boost::shared_ptr<PublicationInfo> PublicationInfoPtr;
00042   std::map<std::string, PublicationInfoPtr> publications_;
00043 
00044 public:
00045   Server()
00046   {
00047     get_message_server_     = nh_.advertiseService(g_get_message_service, &Server::handleGetMessage, this);
00048     publish_message_server_ = nh_.advertiseService(g_publish_message_service, &Server::handlePublishMessage, this);
00049   }
00050 
00051   ~Server()
00052   {
00053     clearSubscriptions();
00054     clearPublications();
00055   }
00056 
00057   const std::string& getHost() const
00058   {
00059     return ros::network::getHost();
00060   }
00061 
00062   uint16_t getTCPPort() const
00063   {
00064     return ros::ConnectionManager::instance()->getTCPPort();
00065   }
00066 
00067 protected:  
00068   const SubscriptionInfoPtr& getSubscription(const std::string& topic)
00069   {
00070     if (subscriptions_.count(topic)) return subscriptions_.at(topic);
00071     SubscriptionInfoPtr subscription(new SubscriptionInfo());
00072     return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
00073   }
00074 
00075   const PublicationInfoPtr& getPublication(const std::string& topic)
00076   {
00077     if (publications_.count(topic)) return publications_.at(topic);
00078     PublicationInfoPtr publication(new PublicationInfo());
00079     return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
00080   }
00081 
00082   bool handleGetMessage(GetMessage::Request& request, GetMessage::Response& response)
00083   {
00084     SubscriptionInfoPtr subscription = getSubscription(request.topic);
00085 
00086     if (!subscription->subscriber) {
00087       ROS_INFO("Subscribing to topic %s", request.topic.c_str());
00088       ros::SubscribeOptions ops = ros::SubscribeOptions::create<ShapeShifter>(request.topic, 1, boost::bind(&Server::subscriberCallback, this, subscription, _1), ros::VoidConstPtr(), &(subscription->callback_queue));
00089       subscription->subscriber = nh_.subscribe(ops);
00090       subscription->topic = subscription->subscriber.getTopic();
00091     }
00092 
00093     // wait for exactly one callback and reset event pointer
00094     ros::WallDuration timeout(request.timeout.sec, request.timeout.nsec);
00095     if (timeout > ros::WallDuration()) {
00096       // clear callback queue and ignore all messages received if a timeout was specified
00097       subscription->callback_queue.clear();
00098     }
00099     subscription->event = MessageEvent();
00100     subscription->callback_queue.callOne(timeout);
00101 
00102     ShapeShifter::ConstPtr instance;
00103     try {
00104       instance = subscription->event.getConstMessage();
00105 
00106     } catch(ros::Exception& e) {
00107       ROS_ERROR("Catched exception while handling a request for topic %s: %s", request.topic.c_str(), e.what());
00108       return false;
00109     }
00110 
00111     // any message message has been received?
00112     if (instance) {
00113       // fill response
00114       response.message.topic = subscription->topic;
00115       response.message.md5sum = instance->getMD5Sum();
00116       response.message.type = instance->getDataType();
00117       response.message.message_definition = instance->getMessageDefinition();
00118       // response.message.latching = subscription->event.getConnectionHeader()["latching"];
00119       response.message.blob = instance->blob();
00120       response.message.blob.setCompressed(request.compressed);
00121 
00122       subscription->last_message = instance;
00123 
00124     } else {
00125       // fill response from last message (but without data)
00126       response.message.topic = subscription->topic;
00127       if (subscription->last_message) {
00128         response.message.md5sum = subscription->last_message->getMD5Sum();
00129         response.message.type = subscription->last_message->getDataType();
00130         response.message.message_definition = subscription->last_message->getMessageDefinition();
00131       }
00132     }
00133 
00134     return true;
00135   }
00136 
00137   void subscriberCallback(const SubscriptionInfoPtr& subscription, const MessageEvent& event)
00138   {
00139     subscription->event = event;
00140   }
00141 
00142   bool handlePublishMessage(PublishMessage::Request& request, PublishMessage::Response& response)
00143   {
00144     PublicationInfoPtr publication = getPublication(request.message.topic);
00145 
00146     if (!publication->publisher) {
00147       ROS_INFO("Publishing topic %s (%s)", request.message.topic.c_str(), request.message.type.c_str());
00148       ros::AdvertiseOptions ops(request.message.topic, 10, request.message.md5sum, request.message.type, request.message.message_definition,
00149                                 boost::bind(&Server::connectCallback, this, publication, _1), boost::bind(&Server::disconnectCallback, this, publication, _1));
00150       ops.latch = request.latch;
00151       publication->publisher = nh_.advertise(ops);
00152       publication->topic = publication->publisher.getTopic();
00153     }
00154 
00155     publication->publisher.publish(
00156       request.message.blob.asMessage().morph(request.message.md5sum, request.message.type, request.message.message_definition)
00157     );
00158     return true;
00159   }
00160 
00161   void clearSubscriptions() {
00162     for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
00163       it->second->subscriber.shutdown();
00164     }
00165     subscriptions_.clear();
00166   }
00167 
00168   void clearPublications() {
00169     for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
00170       it->second->publisher.shutdown();
00171     }
00172     publications_.clear();
00173   }
00174 
00175 protected:
00176   void connectCallback(const PublicationInfoPtr&, const ros::SingleSubscriberPublisher&)
00177   {}
00178 
00179   void disconnectCallback(const PublicationInfoPtr&, const ros::SingleSubscriberPublisher&)
00180   {}
00181 };
00182 
00183 } // namespace topic_proxy
00184 
00185 int main(int argc, char **argv)
00186 {
00187   // add __tcpros_server_port to remappings
00188   std::string tcpros_server_port_argv = "__tcpros_server_port:=" + boost::lexical_cast<std::string>(topic_proxy::g_default_port);
00189   std::vector<char *> new_argv;
00190   new_argv.reserve(argc + 1);
00191   new_argv.assign(&argv[0], &argv[argc]);
00192   new_argv.push_back(const_cast<char *>(tcpros_server_port_argv.c_str()));
00193   argc = new_argv.size();
00194   argv = new_argv.data();
00195 
00196   ros::init(argc, argv, "topic_proxy_server");
00197   topic_proxy::Server server;
00198   ROS_INFO("Created topic_proxy server listening on %s:%u", server.getHost().c_str(), server.getTCPPort());
00199   ros::spin();
00200   return 0;
00201 }


topic_proxy
Author(s): Johannes Meyer
autogenerated on Fri Aug 28 2015 13:23:37