00001 #include <ros/ros.h>
00002 #include <topic_proxy/topic_proxy.h>
00003 #include <blob/shape_shifter.h>
00004
00005 #include <topic_proxy/RequestMessage.h>
00006 #include <topic_proxy/AddPublisher.h>
00007
00008 namespace topic_proxy
00009 {
00010
00011 using blob::ShapeShifter;
00012
00013 class Client : public TopicProxy
00014 {
00015 private:
00016 ros::NodeHandle nh_;
00017 ros::ServiceServer request_message_service_;
00018 ros::ServiceServer add_publisher_service_;
00019
00020 struct SubscriptionInfo
00021 {
00022 std::string local_topic;
00023 std::string datatype;
00024 std::string md5sum;
00025 std::string message_definition;
00026
00027 ros::Publisher publisher;
00028 bool latch;
00029 ros::Timer timer;
00030 GetMessage::Request request;
00031 };
00032 typedef boost::shared_ptr<SubscriptionInfo> SubscriptionInfoPtr;
00033 std::map<std::string, SubscriptionInfoPtr> subscriptions_;
00034
00035 struct PublicationInfo
00036 {
00037 std::string remote_topic;
00038 ros::Subscriber subscriber;
00039 bool latch;
00040 bool compressed;
00041 };
00042 typedef boost::shared_ptr<PublicationInfo> PublicationInfoPtr;
00043 std::map<std::string, PublicationInfoPtr> publications_;
00044
00045 std::string topic_prefix_;
00046
00047 public:
00048 Client(const std::string& ns)
00049 : TopicProxy()
00050 , nh_(ns)
00051 {
00052 init();
00053 }
00054
00055 Client(const std::string& host, uint32_t port, const std::string& ns = std::string())
00056 : TopicProxy(host, port)
00057 , nh_(ns)
00058 {
00059 init();
00060 }
00061
00062 bool init() {
00063 if (connect()) {
00064 if (!getHost().empty()) {
00065 ROS_INFO("Connected to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
00066 } else {
00067 ROS_INFO("Connected to topic_proxy server at local master %s", ros::master::getURI().c_str());
00068 }
00069 } else {
00070 if (!getHost().empty()) {
00071 ROS_WARN("Could not connect to topic_proxy server at %s:%u", getHost().c_str(), getTCPPort());
00072 } else {
00073 ROS_WARN("Could not connect to topic_proxy server at local master %s", ros::master::getURI().c_str());
00074 }
00075 return false;
00076 }
00077
00078 ros::param::get("~topic_prefix", topic_prefix_);
00079 request_message_service_ = nh_.advertiseService("request_message", &Client::handleRequestMessage, this);
00080 add_publisher_service_ = nh_.advertiseService("add_publisher", &Client::handleAddPublisher, this);
00081
00082 XmlRpc::XmlRpcValue subscriptions;
00083 ros::param::get("~subscriptions", subscriptions);
00084 if (subscriptions.getType() == XmlRpc::XmlRpcValue::TypeArray) {
00085 for(int i = 0; i < subscriptions.size(); ++i) {
00086 XmlRpc::XmlRpcValue p = subscriptions[i];
00087 std::string topic;
00088 if (p.getType() == XmlRpc::XmlRpcValue::TypeString) {
00089 topic = static_cast<std::string>(p);
00090 } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00091 if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
00092 }
00093 if (topic.empty()) continue;
00094
00095 SubscriptionInfoPtr subscription = getSubscription(topic);
00096 subscription->local_topic = topic;
00097 subscription->request.topic = topic;
00098 ros::Duration interval;
00099 if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00100 if (p.hasMember("remote_topic")) subscription->request.topic = static_cast<std::string>(p["remote_topic"]);
00101 if (p.hasMember("timeout")) subscription->request.timeout = ros::Duration(static_cast<double>(p["timeout"]));
00102 if (p.hasMember("compressed")) subscription->request.compressed = static_cast<bool>(p["compressed"]);
00103 if (p.hasMember("interval")) interval = ros::Duration(static_cast<double>(p["interval"]));
00104 if (p.hasMember("latch")) subscription->latch = static_cast<bool>(p["latch"]);
00105 }
00106
00107 if (!interval.isZero()) {
00108 subscription->timer = nh_.createTimer(interval, boost::bind(&Client::timerCallback, this, subscription, _1));
00109 }
00110 }
00111 }
00112
00113 XmlRpc::XmlRpcValue publications;
00114 ros::param::get("~publications", publications);
00115 if (publications.getType() == XmlRpc::XmlRpcValue::TypeArray) {
00116 for(int i = 0; i < publications.size(); ++i) {
00117 XmlRpc::XmlRpcValue p = publications[i];
00118 std::string topic;
00119 if (p.getType() == XmlRpc::XmlRpcValue::TypeString) {
00120 topic = static_cast<std::string>(p);
00121 } else if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00122 if (p.hasMember("topic")) topic = static_cast<std::string>(p["topic"]);
00123 }
00124 if (topic.empty()) continue;
00125
00126 AddPublisher::Request request;
00127 AddPublisher::Response response;
00128 request.topic = topic;
00129 if (p.getType() == XmlRpc::XmlRpcValue::TypeStruct) {
00130 if (p.hasMember("remote_topic")) request.remote_topic = static_cast<std::string>(p["remote_topic"]);
00131 if (p.hasMember("compressed")) request.compressed = static_cast<bool>(p["compressed"]);
00132 if (p.hasMember("latch")) request.latch = static_cast<bool>(p["latch"]);
00133 }
00134
00135 handleAddPublisher(request, response);
00136 }
00137 }
00138
00139 return true;
00140 }
00141
00142 ~Client()
00143 {
00144 clearSubscriptions();
00145 clearPublications();
00146 }
00147
00148 bool republish(GetMessage::Request& request, bool latch = false)
00149 {
00150 MessageInstanceConstPtr instance = send(request);
00151 if (!instance) {
00152 ROS_ERROR("GetMessage request for topic %s failed", request.topic.c_str());
00153 return false;
00154 }
00155
00156
00157 SubscriptionInfoPtr subscription = getSubscription(request.topic);
00158 if (!subscription->publisher
00159 && !instance->type.empty() && !instance->md5sum.empty()) {
00160 if (subscription->local_topic.empty()) subscription->local_topic = request.topic;
00161 std::string advertised_topic = nh_.resolveName(topic_prefix_ + subscription->local_topic);
00162 if (!getHost().empty()) {
00163 ROS_INFO("Advertising topic %s from host %s as %s", request.topic.c_str(), getHost().c_str(), advertised_topic.c_str());
00164 } else {
00165 ROS_INFO("Advertising topic %s as %s", request.topic.c_str(), advertised_topic.c_str());
00166 }
00167
00168 ros::AdvertiseOptions ops = ros::AdvertiseOptions(advertised_topic, 10, instance->md5sum, instance->type, instance->message_definition,
00169 boost::bind(&Client::connectCallback, this, subscription, _1), boost::bind(&Client::disconnectCallback, this, subscription, _1));
00170 ops.latch = latch;
00171 subscription->publisher = nh_.advertise(ops);
00172 }
00173
00174 if (instance->blob.empty()) {
00175 ROS_DEBUG("No message received on topic %s", request.topic.c_str());
00176 return false;
00177 }
00178
00179 subscription->publisher.publish(
00180 instance->blob.asMessage().morph(instance->md5sum, instance->type, instance->message_definition)
00181 );
00182 return true;
00183 }
00184
00185 protected:
00186 const SubscriptionInfoPtr& getSubscription(const std::string& topic)
00187 {
00188 if (subscriptions_.count(topic)) return subscriptions_.at(topic);
00189 SubscriptionInfoPtr subscription(new SubscriptionInfo());
00190 return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
00191 }
00192
00193 const PublicationInfoPtr& getPublication(const std::string& topic)
00194 {
00195 if (publications_.count(topic)) return publications_.at(topic);
00196 PublicationInfoPtr publication(new PublicationInfo());
00197 return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
00198 }
00199
00200 void publishCallback(const PublicationInfoPtr& publication, const blob::ShapeShifterConstPtr& message)
00201 {
00202 PublishMessage::Request request;
00203 request.message.topic = publication->remote_topic;
00204 request.message.type = message->getDataType();
00205 request.message.md5sum = message->getMD5Sum();
00206 request.message.message_definition = message->getMessageDefinition();
00207 request.message.blob = message->blob();
00208 request.message.blob.setCompressed(publication->compressed);
00209 request.latch = publication->latch;
00210
00211 if (!send(request)) {
00212 ROS_ERROR("PublishMessage request for topic %s failed", request.message.topic.c_str());
00213 }
00214 }
00215
00216 void timerCallback(const SubscriptionInfoPtr& subscription, const ros::TimerEvent& event)
00217 {
00218 republish(subscription->request, subscription->latch);
00219 }
00220
00221 bool handleRequestMessage(RequestMessage::Request& request, RequestMessage::Response& response)
00222 {
00223 SubscriptionInfoPtr subscription = getSubscription(request.topic);
00224 subscription->request.topic = request.topic;
00225 subscription->request.compressed = request.compressed;
00226 subscription->request.timeout = request.timeout;
00227 subscription->latch = request.latch;
00228
00229 if (request.interval > ros::Duration()) {
00230
00231 subscription->timer = nh_.createTimer(request.interval, boost::bind(&Client::timerCallback, this, subscription, _1));
00232 } else {
00233
00234 subscription->timer.stop();
00235 }
00236
00237
00238 if (request.interval.isZero()) {
00239 return republish(subscription->request);
00240 }
00241
00242 return true;
00243 }
00244
00245 bool handleAddPublisher(AddPublisher::Request& request, AddPublisher::Response& response)
00246 {
00247 PublicationInfoPtr publication = getPublication(request.topic);
00248
00249 if (!publication->subscriber) {
00250 std::string subscribed_topic = nh_.resolveName(topic_prefix_ + request.topic);
00251 if (!getHost().empty()) {
00252 ROS_INFO("Subscribing to topic %s for host %s as %s", request.topic.c_str(), getHost().c_str(), subscribed_topic.c_str());
00253 } else {
00254 ROS_INFO("Subscribing to topic %s as %s", request.topic.c_str(), subscribed_topic.c_str());
00255 }
00256
00257 publication->subscriber = nh_.subscribe<blob::ShapeShifter>(subscribed_topic, 10, boost::bind(&Client::publishCallback, this, publication, _1));
00258 publication->remote_topic = request.topic;
00259 if (!request.remote_topic.empty()) publication->remote_topic = request.remote_topic;
00260 publication->latch = request.latch;
00261 publication->compressed = request.compressed;
00262 }
00263
00264 return true;
00265 }
00266
00267 void clearSubscriptions()
00268 {
00269 for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
00270 it->second->publisher.shutdown();
00271 }
00272 subscriptions_.clear();
00273 }
00274
00275 void clearPublications()
00276 {
00277 for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
00278 it->second->subscriber.shutdown();
00279 }
00280 publications_.clear();
00281 }
00282
00283 protected:
00284 void connectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
00285 {
00286 }
00287
00288 void disconnectCallback(const SubscriptionInfoPtr&, const ros::SingleSubscriberPublisher&)
00289 {
00290 }
00291 };
00292
00293 }
00294
00295 int main(int argc, char **argv)
00296 {
00297 ros::init(argc, argv, "topic_proxy_client");
00298
00299 std::string host;
00300 int port = 0;
00301
00302
00303 ros::param::get("~host", host);
00304 ros::param::get("~port", port);
00305
00306
00307 if (argc > 1) host = argv[1];
00308 if (argc > 2) port = boost::lexical_cast<uint32_t>(std::string(argv[2]));
00309
00310 topic_proxy::Client client(host, port);
00311 ros::spin();
00312 return 0;
00313 }