41 #include "ros/common.h" 62 #include <boost/make_shared.hpp> 73 , nonconst_callbacks_(0)
75 , shutting_down_(false)
76 , transport_hints_(transport_hints)
110 conn_data[cidx][0] = (*c)->getConnectionID();
113 conn_data[cidx][3] = (int)s.
drops_;
114 conn_data[cidx][4] = 0;
117 stats[1] = conn_data;
131 curr_info[0] = (int)(*c)->getConnectionID();
132 curr_info[1] = (*c)->getPublisherXMLRPCURI();
134 curr_info[3] = (*c)->getTransportType();
135 curr_info[4] =
name_;
137 curr_info[6] = (*c)->getTransportInfo();
138 info[info.
size()] = curr_info;
170 V_PublisherLink::iterator it = localsubscribers.begin();
171 V_PublisherLink::iterator end = localsubscribers.end();
172 for (;it != end; ++it)
190 pub_link->setPublisher(sub_link);
191 sub_link->setSubscriber(pub_link);
194 pub->addSubscriberLink(sub_link);
197 bool urisEqual(
const std::string& uri1,
const std::string& uri2)
199 std::string host1, host2;
200 uint32_t port1 = 0, port2 = 0;
203 return port1 == port2 && host1 == host2;
218 std::stringstream ss;
220 for (V_string::const_iterator up_i = new_pubs.begin();
221 up_i != new_pubs.end(); ++up_i)
226 ss <<
" already have these connections: ";
232 ss << (*spc)->getPublisherXMLRPCURI() <<
", ";
239 for (; it != end; ++it)
241 ss << (*it)->getRemoteURI() <<
", ";
259 for (V_string::const_iterator up_i = new_pubs.begin();
260 !found && up_i != new_pubs.end(); ++up_i)
262 if (
urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
271 subtractions.push_back(*spc);
275 for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
281 if (
urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
293 for (; it != end; ++it)
295 if (
urisEqual(*up_i, (*it)->getRemoteURI()))
305 additions.push_back(*up_i);
310 for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
316 link->getCallerID().c_str(),
name_.c_str(), link->getPublisherXMLRPCURI().c_str());
325 for (V_string::iterator i = additions.begin();
326 i != additions.end(); ++i)
349 if (transports.empty())
354 for (V_string::const_iterator it = transports.begin();
355 it != transports.end();
362 if (!max_datagram_size)
363 max_datagram_size = udp_transport->getMaxDatagramSize();
364 udp_transport->createIncoming(0,
false);
365 udpros_array[0] =
"UDPROS";
377 udpros_array[3] = udp_transport->getServerPort();
378 udpros_array[4] = max_datagram_size;
380 protos_array[protos++] = udpros_array;
382 else if (*it ==
"TCP")
384 tcpros_array[0] = std::string(
"TCPROS");
385 protos_array[protos++] = tcpros_array;
389 ROS_WARN(
"Unsupported transport type hinted: %s, skipping", it->c_str());
394 params[2] = protos_array;
395 std::string peer_host;
399 ROS_ERROR(
"Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
411 peer_host.c_str(), peer_port,
name_.c_str());
415 udp_transport->close();
421 ROSCPP_LOG_DEBUG(
"Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
425 PendingConnectionPtr conn(boost::make_shared<PendingConnection>(c, udp_transport, shared_from_this(), xmlrpc_uri));
460 std::string peer_host = conn->getClient()->getHost();
461 uint32_t peer_port = conn->getClient()->getPort();
462 std::stringstream ss;
463 ss <<
"http://" << peer_host <<
":" << peer_port <<
"/";
464 std::string xmlrpc_uri = ss.str();
465 udp_transport = conn->getUDPTransport();
471 peer_host.c_str(), peer_port,
name_.c_str());
476 if (proto.
size() == 0)
478 ROSCPP_LOG_DEBUG(
"Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(),
name_.c_str());
483 if (proto.
getType() != XmlRpcValue::TypeArray)
485 ROSCPP_LOG_DEBUG(
"Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
489 if (proto[0].getType() != XmlRpcValue::TypeString)
491 ROSCPP_LOG_DEBUG(
"Available protocol info list doesn't have a string as its first element.");
496 std::string proto_name = proto[0];
497 if (proto_name ==
"TCPROS")
499 if (proto.
size() != 3 ||
500 proto[1].
getType() != XmlRpcValue::TypeString ||
501 proto[2].
getType() != XmlRpcValue::TypeInt)
504 "parameters aren't string,int");
507 std::string pub_host = proto[1];
508 int pub_port = proto[2];
512 if (transport->connect(pub_host, pub_port))
518 pub_link->initialize(connection);
532 else if (proto_name ==
"UDPROS")
534 if (proto.
size() != 6 ||
535 proto[1].
getType() != XmlRpcValue::TypeString ||
536 proto[2].
getType() != XmlRpcValue::TypeInt ||
537 proto[3].
getType() != XmlRpcValue::TypeInt ||
538 proto[4].
getType() != XmlRpcValue::TypeInt ||
539 proto[5].
getType() != XmlRpcValue::TypeBase64)
542 "parameters aren't string,int,int,int,base64");
546 std::string pub_host = proto[1];
547 int pub_port = proto[2];
548 int conn_id = proto[3];
549 int max_datagram_size = proto[4];
550 std::vector<char> header_bytes = proto[5];
552 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
555 if (!h.parse(buffer, header_bytes.size(), err))
557 ROSCPP_LOG_DEBUG(
"Unable to parse UDPROS connection header: %s", err.c_str());
561 ROSCPP_LOG_DEBUG(
"Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]",
name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
563 std::string error_msg;
564 if (h.getValue(
"error", error_msg))
566 ROSCPP_LOG_DEBUG(
"Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
572 if (pub_link->setHeader(h))
575 connection->initialize(udp_transport,
false, NULL);
576 connection->setHeader(h);
577 pub_link->initialize(connection);
584 ROSCPP_LOG_DEBUG(
"Connected to publisher of topic [%s] at [%s:%d]",
name_.c_str(), pub_host.c_str(), pub_port);
588 ROSCPP_LOG_DEBUG(
"Failed to connect to publisher of topic [%s] at [%s:%d]",
name_.c_str(), pub_host.c_str(), pub_port);
595 ROSCPP_LOG_DEBUG(
"Publisher offered unsupported transport [%s]", proto_name.c_str());
612 for (V_CallbackInfo::iterator cb =
callbacks_.begin();
619 const std::type_info* ti = &info->helper_->getTypeInfo();
627 for (; des_it != des_end; ++des_it)
629 if (*des_it->first == *ti)
631 deserializer = des_it->second;
638 deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
642 bool was_full =
false;
643 bool nonconst_need_copy =
false;
646 nonconst_need_copy =
true;
649 info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
657 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
663 statistics_.
callback(connection_header,
name_, link->getCallerID(), m, link->getStats().bytes_received_, receipt_time, drops > 0);
666 if (link->isLatched())
691 if (
md5sum_ ==
"*" && md5sum !=
"*")
698 if (md5sum !=
"*" && md5sum != this->
md5sum())
707 info->helper_ = helper;
708 info->callback_queue_ = queue;
709 info->subscription_queue_ = boost::make_shared<SubscriptionQueue>(
name_, queue_size, allow_concurrent_callbacks);
710 info->tracked_object_ = tracked_object;
711 info->has_tracked_object_ =
false;
714 info->has_tracked_object_ =
true;
717 if (!helper->isConst())
732 for (; it != end;++it)
735 if (link->isLatched())
740 const LatchInfo& latch_info = des_it->second;
743 bool was_full =
false;
744 info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_,
true, latch_info.
receipt_time, &was_full);
747 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
763 for (V_CallbackInfo::iterator it =
callbacks_.begin();
766 if ((*it)->helper_ == helper)
771 if (!helper->isConst())
783 info->subscription_queue_->clear();
784 info->callback_queue_->removeByID((uint64_t)info.get());
813 if (pub_link->isLatched())
822 for (V_CallbackInfo::iterator cb =
callbacks_.begin();
826 if (info->helper_->getTypeInfo() == ti)
void callback(const boost::shared_ptr< M_string > &connection_header, const std::string &topic, const std::string &callerid, const SerializedMessage &m, const uint64_t &bytes_sent, const ros::Time &received_time, bool dropped)
S_PendingConnection pending_connections_
boost::mutex publisher_links_mutex_
void init(const SubscriptionCallbackHelperPtr &helper)
TransportHints & reliable()
Specifies a reliable transport. Currently this means TCP.
TransportHints transport_hints_
std::vector< PublisherLinkPtr > V_PublisherLink
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
void pendingConnectionDone(const PendingConnectionPtr &pending_conn, XmlRpc::XmlRpcValue &result)
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
M_PublisherLinkToLatchInfo latched_messages_
bool addCallback(const SubscriptionCallbackHelperPtr &helper, const std::string &md5sum, CallbackQueueInterface *queue, int32_t queue_size, const VoidConstPtr &tracked_object, bool allow_concurrent_callbacks)
const std::string & getName() const
XmlRpc::XmlRpcValue getStats()
Abstract interface for a queue used to handle all callbacks within roscpp.
V_PublisherLink publisher_links_
boost::mutex pending_connections_mutex_
Type const & getType() const
uint32_t nonconst_callbacks_
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
void getInfo(XmlRpc::XmlRpcValue &info)
Provides a way of specifying network transport hints to ros::NodeHandle::subscribe() and someday ros:...
void headerReceived(const PublisherLinkPtr &link, const Header &h)
V_TypeAndDeserializer cached_deserializers_
void removeCallback(const SubscriptionCallbackHelperPtr &helper)
bool negotiateConnection(const std::string &xmlrpc_uri)
Negotiates a connection with a publisher.
std::map< std::string, std::string > M_string
void dropAllConnections()
#define ROSCPP_LOG_DEBUG(...)
void getPublishTypes(bool &ser, bool &nocopy, const std::type_info &ti)
std::vector< std::string > V_string
ROSCPP_DECL const std::string & getHost()
static const ConnectionManagerPtr & instance()
SerializedMessage message
uint32_t handleMessage(const SerializedMessage &m, bool ser, bool nocopy, const boost::shared_ptr< M_string > &connection_header, const PublisherLinkPtr &link)
Called to notify that a new message has arrived from a publisher. Schedules the callback for invokati...
const std::type_info * type_info
StatisticsLogger statistics_
boost::mutex callbacks_mutex_
int getMaxDatagramSize()
Returns the maximum datagram size specified on this TransportHints, or 0 if no size was specified...
const std::string md5sum()
void removePublisherLink(const PublisherLinkPtr &pub_link)
Removes a subscriber from our list.
void addPublisherLink(const PublisherLinkPtr &link)
boost::mutex md5sum_mutex_
Subscription(const std::string &name, const std::string &md5sum, const std::string &datatype, const TransportHints &transport_hints)
void shutdown()
Terminate all our PublisherLinks and join our callback thread if it exists.
static const PollManagerPtr & instance()
bool pubUpdate(const std::vector< std::string > &pubs)
Handle a publisher update list received from the master. Creates/drops PublisherLinks based on the li...
const V_string & getTransports()
Returns a vector of transports, ordered by preference.
#define ROSCPP_CONN_LOG_DEBUG(...)
static const XMLRPCManagerPtr & instance()
void addLocalConnection(const PublicationPtr &pub)
bool urisEqual(const std::string &uri1, const std::string &uri2)
bool executeNonBlock(const char *method, XmlRpcValue const ¶ms)
const std::string datatype()
boost::mutex shutdown_mutex_
void closeTransport(const TransportUDPPtr &trans)
V_CallbackInfo callbacks_
uint32_t getNumPublishers()
void drop()
Terminate all our PublisherLinks.
boost::shared_ptr< std::map< std::string, std::string > > connection_header
uint64_t messages_received_