00001 #include <topic_proxy/topic_proxy.h>
00002
00003 #include <ros/ros.h>
00004 #include <ros/callback_queue.h>
00005 #include <ros/serialization.h>
00006
00007 #include <ros/network.h>
00008 #include <ros/connection_manager.h>
00009 #include <ros/transport/transport_tcp.h>
00010
00011 #include <blob/shape_shifter.h>
00012
00013 namespace topic_proxy
00014 {
00015
00016 using blob::ShapeShifter;
00017
00018 class Server
00019 {
00020 private:
00021 ros::NodeHandle nh_;
00022 ros::ServiceServer get_message_server_;
00023 ros::ServiceServer publish_message_server_;
00024
00025 typedef ros::MessageEvent<const ShapeShifter> MessageEvent;
00026
00027 struct SubscriptionInfo {
00028 std::string topic;
00029 ros::Subscriber subscriber;
00030 ros::CallbackQueue callback_queue;
00031 MessageEvent event;
00032 ShapeShifter::ConstPtr last_message;
00033 };
00034 typedef boost::shared_ptr<SubscriptionInfo> SubscriptionInfoPtr;
00035 std::map<std::string, SubscriptionInfoPtr> subscriptions_;
00036
00037 struct PublicationInfo {
00038 std::string topic;
00039 ros::Publisher publisher;
00040 };
00041 typedef boost::shared_ptr<PublicationInfo> PublicationInfoPtr;
00042 std::map<std::string, PublicationInfoPtr> publications_;
00043
00044 public:
00045 Server()
00046 {
00047 get_message_server_ = nh_.advertiseService(g_get_message_service, &Server::handleGetMessage, this);
00048 publish_message_server_ = nh_.advertiseService(g_publish_message_service, &Server::handlePublishMessage, this);
00049 }
00050
00051 ~Server()
00052 {
00053 clearSubscriptions();
00054 clearPublications();
00055 }
00056
00057 const std::string& getHost() const
00058 {
00059 return ros::network::getHost();
00060 }
00061
00062 uint16_t getTCPPort() const
00063 {
00064 return ros::ConnectionManager::instance()->getTCPPort();
00065 }
00066
00067 protected:
00068 const SubscriptionInfoPtr& getSubscription(const std::string& topic)
00069 {
00070 if (subscriptions_.count(topic)) return subscriptions_.at(topic);
00071 SubscriptionInfoPtr subscription(new SubscriptionInfo());
00072 return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
00073 }
00074
00075 const PublicationInfoPtr& getPublication(const std::string& topic)
00076 {
00077 if (publications_.count(topic)) return publications_.at(topic);
00078 PublicationInfoPtr publication(new PublicationInfo());
00079 return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
00080 }
00081
00082 bool handleGetMessage(GetMessage::Request& request, GetMessage::Response& response)
00083 {
00084 SubscriptionInfoPtr subscription = getSubscription(request.topic);
00085
00086 if (!subscription->subscriber) {
00087 ROS_INFO("Subscribing to topic %s", request.topic.c_str());
00088 ros::SubscribeOptions ops = ros::SubscribeOptions::create<ShapeShifter>(request.topic, 1, boost::bind(&Server::subscriberCallback, this, subscription, _1), ros::VoidConstPtr(), &(subscription->callback_queue));
00089 subscription->subscriber = nh_.subscribe(ops);
00090 subscription->topic = subscription->subscriber.getTopic();
00091 }
00092
00093
00094 ros::WallDuration timeout(request.timeout.sec, request.timeout.nsec);
00095 if (timeout > ros::WallDuration()) {
00096
00097 subscription->callback_queue.clear();
00098 }
00099 subscription->event = MessageEvent();
00100 subscription->callback_queue.callOne(timeout);
00101
00102 ShapeShifter::ConstPtr instance;
00103 try {
00104 instance = subscription->event.getConstMessage();
00105
00106 } catch(ros::Exception& e) {
00107 ROS_ERROR("Catched exception while handling a request for topic %s: %s", request.topic.c_str(), e.what());
00108 return false;
00109 }
00110
00111
00112 if (instance) {
00113
00114 response.message.topic = subscription->topic;
00115 response.message.md5sum = instance->getMD5Sum();
00116 response.message.type = instance->getDataType();
00117 response.message.message_definition = instance->getMessageDefinition();
00118
00119 response.message.blob = instance->blob();
00120 response.message.blob.setCompressed(request.compressed);
00121
00122 subscription->last_message = instance;
00123
00124 } else {
00125
00126 response.message.topic = subscription->topic;
00127 if (subscription->last_message) {
00128 response.message.md5sum = subscription->last_message->getMD5Sum();
00129 response.message.type = subscription->last_message->getDataType();
00130 response.message.message_definition = subscription->last_message->getMessageDefinition();
00131 }
00132 }
00133
00134 return true;
00135 }
00136
00137 void subscriberCallback(const SubscriptionInfoPtr& subscription, const MessageEvent& event)
00138 {
00139 subscription->event = event;
00140 }
00141
00142 bool handlePublishMessage(PublishMessage::Request& request, PublishMessage::Response& response)
00143 {
00144 PublicationInfoPtr publication = getPublication(request.message.topic);
00145
00146 if (!publication->publisher) {
00147 ROS_INFO("Publishing topic %s (%s)", request.message.topic.c_str(), request.message.type.c_str());
00148 ros::AdvertiseOptions ops(request.message.topic, 10, request.message.md5sum, request.message.type, request.message.message_definition,
00149 boost::bind(&Server::connectCallback, this, publication, _1), boost::bind(&Server::disconnectCallback, this, publication, _1));
00150 ops.latch = request.latch;
00151 publication->publisher = nh_.advertise(ops);
00152 publication->topic = publication->publisher.getTopic();
00153 }
00154
00155 publication->publisher.publish(
00156 request.message.blob.asMessage().morph(request.message.md5sum, request.message.type, request.message.message_definition)
00157 );
00158 return true;
00159 }
00160
00161 void clearSubscriptions() {
00162 for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
00163 it->second->subscriber.shutdown();
00164 }
00165 subscriptions_.clear();
00166 }
00167
00168 void clearPublications() {
00169 for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
00170 it->second->publisher.shutdown();
00171 }
00172 publications_.clear();
00173 }
00174
00175 protected:
00176 void connectCallback(const PublicationInfoPtr&, const ros::SingleSubscriberPublisher&)
00177 {}
00178
00179 void disconnectCallback(const PublicationInfoPtr&, const ros::SingleSubscriberPublisher&)
00180 {}
00181 };
00182
00183 }
00184
00185 int main(int argc, char **argv)
00186 {
00187
00188 std::string tcpros_server_port_argv = "__tcpros_server_port:=" + boost::lexical_cast<std::string>(topic_proxy::g_default_port);
00189 std::vector<char *> new_argv;
00190 new_argv.reserve(argc + 1);
00191 new_argv.assign(&argv[0], &argv[argc]);
00192 new_argv.push_back(const_cast<char *>(tcpros_server_port_argv.c_str()));
00193 argc = new_argv.size();
00194 argv = new_argv.data();
00195
00196 ros::init(argc, argv, "topic_proxy_server");
00197 topic_proxy::Server server;
00198 ROS_INFO("Created topic_proxy server listening on %s:%u", server.getHost().c_str(), server.getTCPPort());
00199 ros::spin();
00200 return 0;
00201 }