70 if (subscriptions_.count(topic))
return subscriptions_.at(topic);
72 return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
77 if (publications_.count(topic))
return publications_.at(topic);
79 return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
86 if (!subscription->subscriber) {
87 ROS_INFO(
"Subscribing to topic %s", request.topic.c_str());
89 subscription->subscriber = nh_.
subscribe(ops);
90 subscription->topic = subscription->subscriber.getTopic();
97 subscription->callback_queue.clear();
100 subscription->callback_queue.callOne(timeout);
104 instance = subscription->event.getConstMessage();
107 ROS_ERROR(
"Catched exception while handling a request for topic %s: %s", request.topic.c_str(), e.what());
114 response.message.topic = subscription->topic;
115 response.message.md5sum = instance->getMD5Sum();
116 response.message.type = instance->getDataType();
117 response.message.message_definition = instance->getMessageDefinition();
119 response.message.blob = instance->blob();
120 response.message.blob.setCompressed(request.compressed);
122 subscription->last_message = instance;
126 response.message.topic = subscription->topic;
127 if (subscription->last_message) {
128 response.message.md5sum = subscription->last_message->getMD5Sum();
129 response.message.type = subscription->last_message->getDataType();
130 response.message.message_definition = subscription->last_message->getMessageDefinition();
139 subscription->event =
event;
144 PublicationInfoPtr publication =
getPublication(request.message.topic);
146 if (!publication->publisher) {
147 ROS_INFO(
"Publishing topic %s (%s)", request.message.topic.c_str(), request.message.type.c_str());
148 ros::AdvertiseOptions ops(request.message.topic, 10, request.message.md5sum, request.message.type, request.message.message_definition,
150 ops.latch = request.latch;
151 publication->publisher = nh_.
advertise(ops);
152 publication->topic = publication->publisher.getTopic();
155 publication->publisher.publish(
156 request.message.blob.asMessage().morph(request.message.md5sum, request.message.type, request.message.message_definition)
162 for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
163 it->second->subscriber.shutdown();
165 subscriptions_.clear();
169 for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
170 it->second->publisher.shutdown();
172 publications_.clear();
185 int main(
int argc,
char **argv)
189 std::vector<char *> new_argv;
190 new_argv.reserve(argc + 1);
191 new_argv.assign(&argv[0], &argv[argc]);
192 new_argv.push_back(const_cast<char *>(tcpros_server_port_argv.c_str()));
193 argc = new_argv.size();
194 argv = new_argv.data();
196 ros::init(argc, argv,
"topic_proxy_server");
ros::CallbackQueue callback_queue
ros::ServiceServer get_message_server_
boost::shared_ptr< void const > VoidConstPtr
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
int main(int argc, char **argv)
ShapeShifter::ConstPtr last_message
void disconnectCallback(const PublicationInfoPtr &, const ros::SingleSubscriberPublisher &)
ros::MessageEvent< const ShapeShifter > MessageEvent
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
boost::shared_ptr< SubscriptionInfo > SubscriptionInfoPtr
void clearSubscriptions()
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
bool handlePublishMessage(PublishMessage::Request &request, PublishMessage::Response &response)
uint16_t getTCPPort() const
ROSCPP_DECL void spin(Spinner &spinner)
const uint32_t g_default_port
void subscriberCallback(const SubscriptionInfoPtr &subscription, const MessageEvent &event)
ros::Subscriber subscriber
std::map< std::string, PublicationInfoPtr > publications_
ROSCPP_DECL const std::string & getHost()
static const ConnectionManagerPtr & instance()
const std::string g_get_message_service
const std::string g_publish_message_service
ros::ServiceServer publish_message_server_
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
const std::string & getHost() const
const PublicationInfoPtr & getPublication(const std::string &topic)
boost::shared_ptr< PublicationInfo > PublicationInfoPtr
std::map< std::string, SubscriptionInfoPtr > subscriptions_
bool handleGetMessage(GetMessage::Request &request, GetMessage::Response &response)
void connectCallback(const PublicationInfoPtr &, const ros::SingleSubscriberPublisher &)
const SubscriptionInfoPtr & getSubscription(const std::string &topic)