5 #include <topic_proxy/RequestMessage.h> 6 #include <topic_proxy/AddPublisher.h> 55 Client(
const std::string& host, uint32_t port,
const std::string& ns = std::string())
85 for(
int i = 0; i < subscriptions.
size(); ++i) {
89 topic =
static_cast<std::string
>(p);
91 if (p.
hasMember(
"topic")) topic = static_cast<std::string>(p[
"topic"]);
93 if (topic.empty())
continue;
96 subscription->local_topic = topic;
97 subscription->request.topic = topic;
100 if (p.
hasMember(
"remote_topic")) subscription->request.topic = static_cast<std::string>(p[
"remote_topic"]);
101 if (p.
hasMember(
"timeout")) subscription->request.timeout =
ros::Duration(static_cast<double>(p[
"timeout"]));
102 if (p.
hasMember(
"compressed")) subscription->request.compressed = static_cast<bool>(p[
"compressed"]);
104 if (p.
hasMember(
"latch")) subscription->latch = static_cast<bool>(p[
"latch"]);
116 for(
int i = 0; i < publications.
size(); ++i) {
120 topic =
static_cast<std::string
>(p);
122 if (p.
hasMember(
"topic")) topic = static_cast<std::string>(p[
"topic"]);
124 if (topic.empty())
continue;
127 AddPublisher::Response response;
128 request.topic = topic;
130 if (p.
hasMember(
"remote_topic")) request.remote_topic = static_cast<std::string>(p[
"remote_topic"]);
131 if (p.
hasMember(
"compressed")) request.compressed = static_cast<bool>(p[
"compressed"]);
132 if (p.
hasMember(
"latch")) request.latch = static_cast<bool>(p[
"latch"]);
155 MessageInstanceConstPtr instance =
send(request);
157 ROS_ERROR(
"GetMessage request for topic %s failed", request.topic.c_str());
163 if (!subscription->publisher
164 && !instance->type.empty() && !instance->md5sum.empty()) {
165 if (subscription->local_topic.empty()) subscription->local_topic = topic;
166 std::string advertised_topic = nh_.
resolveName(topic_prefix_ + subscription->local_topic);
168 ROS_INFO(
"Advertising topic %s from host %s as %s", request.topic.c_str(),
getHost().c_str(), advertised_topic.c_str());
170 ROS_INFO(
"Advertising topic %s as %s", request.topic.c_str(), advertised_topic.c_str());
176 subscription->publisher = nh_.
advertise(ops);
179 if (instance->blob.empty()) {
180 ROS_DEBUG(
"No message received on topic %s", request.topic.c_str());
184 subscription->publisher.publish(
185 instance->blob.asMessage().morph(instance->md5sum, instance->type, instance->message_definition)
193 if (subscriptions_.count(topic))
return subscriptions_.at(topic);
195 return subscriptions_.insert(std::pair<std::string, SubscriptionInfoPtr>(topic, subscription)).first->second;
200 if (publications_.count(topic))
return publications_.at(topic);
202 return publications_.insert(std::pair<std::string, PublicationInfoPtr>(topic, publication)).first->second;
207 PublishMessage::Request
request;
208 request.message.topic = publication->remote_topic;
209 request.message.type = message->getDataType();
210 request.message.md5sum = message->getMD5Sum();
211 request.message.message_definition = message->getMessageDefinition();
212 request.message.blob = message->blob();
213 request.message.blob.setCompressed(publication->compressed);
214 request.latch = publication->latch;
216 if (!
send(request)) {
217 ROS_ERROR(
"PublishMessage request for topic %s failed", request.message.topic.c_str());
223 republish(subscription->request, subscription->local_topic, subscription->latch);
229 subscription->request.topic = request.remote_topic;
230 subscription->request.compressed = request.compressed;
231 subscription->request.timeout = request.timeout;
232 subscription->latch = request.latch;
233 subscription->local_topic = request.topic;
240 subscription->timer.stop();
244 if (request.interval.isZero()) {
245 return republish(subscription->request, request.topic, request.latch);
255 if (!publication->subscriber) {
256 std::string subscribed_topic = nh_.
resolveName(topic_prefix_ + request.topic);
258 ROS_INFO(
"Subscribing to topic %s for host %s as %s", request.topic.c_str(),
getHost().c_str(), subscribed_topic.c_str());
260 ROS_INFO(
"Subscribing to topic %s as %s", request.topic.c_str(), subscribed_topic.c_str());
264 publication->remote_topic = request.topic;
265 if (!request.remote_topic.empty()) publication->remote_topic = request.remote_topic;
266 publication->latch = request.latch;
267 publication->compressed = request.compressed;
275 for (std::map<std::string, SubscriptionInfoPtr>::iterator it = subscriptions_.begin(); it != subscriptions_.end(); ++it) {
276 it->second->publisher.shutdown();
278 subscriptions_.clear();
283 for (std::map<std::string, PublicationInfoPtr>::iterator it = publications_.begin(); it != publications_.end(); ++it) {
284 it->second->subscriber.shutdown();
286 publications_.clear();
301 int main(
int argc,
char **argv)
303 ros::init(argc, argv,
"topic_proxy_client");
313 if (argc > 1) host = argv[1];
314 if (argc > 2) port = boost::lexical_cast<uint32_t>(std::string(argv[2]));
GetMessage::Request request
ROSCPP_DECL const std::string & getURI()
boost::shared_ptr< SubscriptionInfo > SubscriptionInfoPtr
void connectCallback(const SubscriptionInfoPtr &, const ros::SingleSubscriberPublisher &)
std::string topic_prefix_
void publishCallback(const PublicationInfoPtr &publication, const blob::ShapeShifterConstPtr &message)
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
std::string resolveName(const std::string &name, bool remap=true) const
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
void clearSubscriptions()
int main(int argc, char **argv)
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
Type const & getType() const
std::map< std::string, PublicationInfoPtr > publications_
ROSCPP_DECL void spin(Spinner &spinner)
std::map< std::string, SubscriptionInfoPtr > subscriptions_
void disconnectCallback(const SubscriptionInfoPtr &, const ros::SingleSubscriberPublisher &)
Client(const std::string &ns)
ROSCPP_DECL bool get(const std::string &key, std::string &s)
Client(const std::string &host, uint32_t port, const std::string &ns=std::string())
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
uint16_t getTCPPort() const
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
bool handleRequestMessage(RequestMessage::Request &request, RequestMessage::Response &response)
bool handleAddPublisher(AddPublisher::Request &request, AddPublisher::Response &response)
const SubscriptionInfoPtr & getSubscription(const std::string &topic)
bool republish(GetMessage::Request &request, bool latch=false)
const std::string & getHost() const
ros::ServiceServer request_message_service_
bool hasMember(const std::string &name) const
MessageInstanceConstPtr send(GetMessage::Request &)
ros::Subscriber subscriber
bool republish(GetMessage::Request &request, const std::string &topic, bool latch=false)
std::string message_definition
const PublicationInfoPtr & getPublication(const std::string &topic)
ros::ServiceServer add_publisher_service_
void timerCallback(const SubscriptionInfoPtr &subscription, const ros::TimerEvent &event)
boost::shared_ptr< PublicationInfo > PublicationInfoPtr