41 #include "ros/common.h"
62 #include <boost/make_shared.hpp>
73 , nonconst_callbacks_(0)
75 , shutting_down_(false)
76 , transport_hints_(transport_hints)
110 conn_data[cidx][0] = (*c)->getConnectionID();
111 conn_data[cidx][1] = (int)
s.bytes_received_;
112 conn_data[cidx][2] = (
int)
s.messages_received_;
113 conn_data[cidx][3] = (int)
s.drops_;
114 conn_data[cidx][4] = 0;
117 stats[1] = conn_data;
131 curr_info[0] = (int)(*c)->getConnectionID();
132 curr_info[1] = (*c)->getPublisherXMLRPCURI();
134 curr_info[3] = (*c)->getTransportType();
135 curr_info[4] =
name_;
137 curr_info[6] = (*c)->getTransportInfo();
138 info[info.
size()] = curr_info;
145 uint32_t num_connected_publishers = 0;
151 if ((*c)->getCallerID().size() > 0)
153 num_connected_publishers++;
156 return num_connected_publishers;
181 V_PublisherLink::iterator it = localsubscribers.begin();
182 V_PublisherLink::iterator end = localsubscribers.end();
183 for (;it != end; ++it)
201 pub_link->setPublisher(sub_link);
202 sub_link->setSubscriber(pub_link);
205 pub->addSubscriberLink(sub_link);
208 bool urisEqual(
const std::string& uri1,
const std::string& uri2)
210 std::string host1, host2;
211 uint32_t port1 = 0, port2 = 0;
214 return port1 == port2 && host1 == host2;
229 std::stringstream ss;
231 for (V_string::const_iterator up_i = new_pubs.begin();
232 up_i != new_pubs.end(); ++up_i)
237 ss <<
" already have these connections: ";
243 ss << (*spc)->getPublisherXMLRPCURI() <<
", ";
250 for (; it != end; ++it)
252 ss << (*it)->getRemoteURI() <<
", ";
270 for (V_string::const_iterator up_i = new_pubs.begin();
271 !found && up_i != new_pubs.end(); ++up_i)
273 if (
urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
282 subtractions.push_back(*spc);
286 for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
292 if (
urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
304 for (; it != end; ++it)
306 if (
urisEqual(*up_i, (*it)->getRemoteURI()))
316 additions.push_back(*up_i);
321 for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
327 link->getCallerID().c_str(),
name_.c_str(), link->getPublisherXMLRPCURI().c_str());
336 for (V_string::iterator i = additions.begin();
337 i != additions.end(); ++i)
360 if (transports.empty())
365 for (V_string::const_iterator it = transports.begin();
366 it != transports.end();
373 if (!max_datagram_size)
374 max_datagram_size = udp_transport->getMaxDatagramSize();
375 udp_transport->createIncoming(0,
false);
376 udpros_array[0] =
"UDPROS";
388 udpros_array[3] = udp_transport->getServerPort();
389 udpros_array[4] = max_datagram_size;
391 protos_array[protos++] = udpros_array;
393 else if (*it ==
"TCP")
395 tcpros_array[0] = std::string(
"TCPROS");
396 protos_array[protos++] = tcpros_array;
400 ROS_WARN(
"Unsupported transport type hinted: %s, skipping", it->c_str());
405 params[2] = protos_array;
406 std::string peer_host;
410 ROS_ERROR(
"Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
422 peer_host.c_str(), peer_port,
name_.c_str());
426 udp_transport->close();
432 ROSCPP_LOG_DEBUG(
"Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
436 PendingConnectionPtr conn(boost::make_shared<PendingConnection>(c, udp_transport, shared_from_this(), xmlrpc_uri));
471 std::string peer_host = conn->getClient()->getHost();
472 uint32_t peer_port = conn->getClient()->getPort();
473 std::stringstream ss;
474 ss <<
"http://" << peer_host <<
":" << peer_port <<
"/";
475 std::string xmlrpc_uri = ss.str();
476 udp_transport = conn->getUDPTransport();
482 peer_host.c_str(), peer_port,
name_.c_str());
487 if (proto.
size() == 0)
489 ROSCPP_LOG_DEBUG(
"Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(),
name_.c_str());
494 if (proto.
getType() != XmlRpcValue::TypeArray)
496 ROSCPP_LOG_DEBUG(
"Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
500 if (proto[0].getType() != XmlRpcValue::TypeString)
502 ROSCPP_LOG_DEBUG(
"Available protocol info list doesn't have a string as its first element.");
507 std::string proto_name = proto[0];
508 if (proto_name ==
"TCPROS")
510 if (proto.
size() != 3 ||
511 proto[1].
getType() != XmlRpcValue::TypeString ||
512 proto[2].
getType() != XmlRpcValue::TypeInt)
515 "parameters aren't string,int");
518 std::string pub_host = proto[1];
519 int pub_port = proto[2];
523 if (transport->connect(pub_host, pub_port))
529 pub_link->initialize(connection);
543 else if (proto_name ==
"UDPROS")
545 if (proto.
size() != 6 ||
546 proto[1].
getType() != XmlRpcValue::TypeString ||
547 proto[2].
getType() != XmlRpcValue::TypeInt ||
548 proto[3].
getType() != XmlRpcValue::TypeInt ||
549 proto[4].
getType() != XmlRpcValue::TypeInt ||
550 proto[5].
getType() != XmlRpcValue::TypeBase64)
553 "parameters aren't string,int,int,int,base64");
557 std::string pub_host = proto[1];
558 int pub_port = proto[2];
559 int conn_id = proto[3];
560 int max_datagram_size = proto[4];
561 std::vector<char> header_bytes = proto[5];
563 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
566 if (!h.
parse(buffer, header_bytes.size(), err))
568 ROSCPP_LOG_DEBUG(
"Unable to parse UDPROS connection header: %s", err.c_str());
572 ROSCPP_LOG_DEBUG(
"Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]",
name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
574 std::string error_msg;
577 ROSCPP_LOG_DEBUG(
"Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
583 if (pub_link->setHeader(h))
586 connection->initialize(udp_transport,
false, NULL);
587 connection->setHeader(h);
588 pub_link->initialize(connection);
595 ROSCPP_LOG_DEBUG(
"Connected to publisher of topic [%s] at [%s:%d]",
name_.c_str(), pub_host.c_str(), pub_port);
599 ROSCPP_LOG_DEBUG(
"Failed to connect to publisher of topic [%s] at [%s:%d]",
name_.c_str(), pub_host.c_str(), pub_port);
606 ROSCPP_LOG_DEBUG(
"Publisher offered unsupported transport [%s]", proto_name.c_str());
623 for (V_CallbackInfo::iterator cb =
callbacks_.begin();
630 const std::type_info* ti = &info->helper_->getTypeInfo();
638 for (; des_it != des_end; ++des_it)
640 if (*des_it->first == *ti)
642 deserializer = des_it->second;
649 deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
653 bool was_full =
false;
654 bool nonconst_need_copy =
false;
657 nonconst_need_copy =
true;
660 info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
668 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
674 statistics_.
callback(connection_header,
name_, link->getCallerID(), m, link->getStats().bytes_received_, receipt_time, drops > 0, link->getConnectionID());
677 if (link->isLatched())
718 info->helper_ = helper;
719 info->callback_queue_ = queue;
720 info->subscription_queue_ = boost::make_shared<SubscriptionQueue>(
name_, queue_size, allow_concurrent_callbacks);
721 info->tracked_object_ = tracked_object;
722 info->has_tracked_object_ =
false;
725 info->has_tracked_object_ =
true;
728 if (!helper->isConst())
743 for (; it != end;++it)
746 if (link->isLatched())
751 const LatchInfo& latch_info = des_it->second;
754 bool was_full =
false;
755 info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_,
true, latch_info.
receipt_time, &was_full);
758 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
774 for (V_CallbackInfo::iterator it =
callbacks_.begin();
777 if ((*it)->helper_ == helper)
782 if (!helper->isConst())
794 info->subscription_queue_->clear();
795 info->callback_queue_->removeByID((uint64_t)info.get());
824 if (pub_link->isLatched())
833 for (V_CallbackInfo::iterator cb =
callbacks_.begin();
837 if (info->helper_->getTypeInfo() == ti)