41 #include "ros/common.h" 62 #include <boost/make_shared.hpp> 73 , nonconst_callbacks_(0)
75 , shutting_down_(false)
76 , transport_hints_(transport_hints)
110 conn_data[cidx][0] = (*c)->getConnectionID();
113 conn_data[cidx][3] = (int)s.
drops_;
114 conn_data[cidx][4] = 0;
117 stats[1] = conn_data;
131 curr_info[0] = (int)(*c)->getConnectionID();
132 curr_info[1] = (*c)->getPublisherXMLRPCURI();
134 curr_info[3] = (*c)->getTransportType();
135 curr_info[4] =
name_;
137 curr_info[6] = (*c)->getTransportInfo();
138 info[info.
size()] = curr_info;
145 uint32_t num_connected_publishers = 0;
151 if ((*c)->getCallerID().size() > 0)
153 num_connected_publishers++;
156 return num_connected_publishers;
181 V_PublisherLink::iterator it = localsubscribers.begin();
182 V_PublisherLink::iterator end = localsubscribers.end();
183 for (;it != end; ++it)
201 pub_link->setPublisher(sub_link);
202 sub_link->setSubscriber(pub_link);
205 pub->addSubscriberLink(sub_link);
208 bool urisEqual(
const std::string& uri1,
const std::string& uri2)
210 std::string host1, host2;
211 uint32_t port1 = 0, port2 = 0;
214 return port1 == port2 && host1 == host2;
229 std::stringstream ss;
231 for (V_string::const_iterator up_i = new_pubs.begin();
232 up_i != new_pubs.end(); ++up_i)
237 ss <<
" already have these connections: ";
243 ss << (*spc)->getPublisherXMLRPCURI() <<
", ";
250 for (; it != end; ++it)
252 ss << (*it)->getRemoteURI() <<
", ";
270 for (V_string::const_iterator up_i = new_pubs.begin();
271 !found && up_i != new_pubs.end(); ++up_i)
273 if (
urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
282 subtractions.push_back(*spc);
286 for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
292 if (
urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
304 for (; it != end; ++it)
306 if (
urisEqual(*up_i, (*it)->getRemoteURI()))
316 additions.push_back(*up_i);
321 for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
327 link->getCallerID().c_str(),
name_.c_str(), link->getPublisherXMLRPCURI().c_str());
336 for (V_string::iterator i = additions.begin();
337 i != additions.end(); ++i)
360 if (transports.empty())
365 for (V_string::const_iterator it = transports.begin();
366 it != transports.end();
373 if (!max_datagram_size)
374 max_datagram_size = udp_transport->getMaxDatagramSize();
375 udp_transport->createIncoming(0,
false);
376 udpros_array[0] =
"UDPROS";
388 udpros_array[3] = udp_transport->getServerPort();
389 udpros_array[4] = max_datagram_size;
391 protos_array[protos++] = udpros_array;
393 else if (*it ==
"TCP")
395 tcpros_array[0] = std::string(
"TCPROS");
396 protos_array[protos++] = tcpros_array;
400 ROS_WARN(
"Unsupported transport type hinted: %s, skipping", it->c_str());
405 params[2] = protos_array;
406 std::string peer_host;
410 ROS_ERROR(
"Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
422 peer_host.c_str(), peer_port,
name_.c_str());
426 udp_transport->close();
432 ROSCPP_LOG_DEBUG(
"Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
436 PendingConnectionPtr conn(boost::make_shared<PendingConnection>(c, udp_transport, shared_from_this(), xmlrpc_uri));
471 std::string peer_host = conn->getClient()->getHost();
472 uint32_t peer_port = conn->getClient()->getPort();
473 std::stringstream ss;
474 ss <<
"http://" << peer_host <<
":" << peer_port <<
"/";
475 std::string xmlrpc_uri = ss.str();
476 udp_transport = conn->getUDPTransport();
482 peer_host.c_str(), peer_port,
name_.c_str());
487 if (proto.
size() == 0)
489 ROSCPP_LOG_DEBUG(
"Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(),
name_.c_str());
494 if (proto.
getType() != XmlRpcValue::TypeArray)
496 ROSCPP_LOG_DEBUG(
"Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
500 if (proto[0].getType() != XmlRpcValue::TypeString)
502 ROSCPP_LOG_DEBUG(
"Available protocol info list doesn't have a string as its first element.");
507 std::string proto_name = proto[0];
508 if (proto_name ==
"TCPROS")
510 if (proto.
size() != 3 ||
511 proto[1].
getType() != XmlRpcValue::TypeString ||
512 proto[2].
getType() != XmlRpcValue::TypeInt)
515 "parameters aren't string,int");
518 std::string pub_host = proto[1];
519 int pub_port = proto[2];
523 if (transport->connect(pub_host, pub_port))
529 pub_link->initialize(connection);
543 else if (proto_name ==
"UDPROS")
545 if (proto.
size() != 6 ||
546 proto[1].
getType() != XmlRpcValue::TypeString ||
547 proto[2].
getType() != XmlRpcValue::TypeInt ||
548 proto[3].
getType() != XmlRpcValue::TypeInt ||
549 proto[4].
getType() != XmlRpcValue::TypeInt ||
550 proto[5].
getType() != XmlRpcValue::TypeBase64)
553 "parameters aren't string,int,int,int,base64");
557 std::string pub_host = proto[1];
558 int pub_port = proto[2];
559 int conn_id = proto[3];
560 int max_datagram_size = proto[4];
561 std::vector<char> header_bytes = proto[5];
563 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
566 if (!h.parse(buffer, header_bytes.size(), err))
568 ROSCPP_LOG_DEBUG(
"Unable to parse UDPROS connection header: %s", err.c_str());
572 ROSCPP_LOG_DEBUG(
"Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]",
name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
574 std::string error_msg;
575 if (h.getValue(
"error", error_msg))
577 ROSCPP_LOG_DEBUG(
"Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
583 if (pub_link->setHeader(h))
586 connection->initialize(udp_transport,
false, NULL);
587 connection->setHeader(h);
588 pub_link->initialize(connection);
595 ROSCPP_LOG_DEBUG(
"Connected to publisher of topic [%s] at [%s:%d]",
name_.c_str(), pub_host.c_str(), pub_port);
599 ROSCPP_LOG_DEBUG(
"Failed to connect to publisher of topic [%s] at [%s:%d]",
name_.c_str(), pub_host.c_str(), pub_port);
606 ROSCPP_LOG_DEBUG(
"Publisher offered unsupported transport [%s]", proto_name.c_str());
623 for (V_CallbackInfo::iterator cb =
callbacks_.begin();
630 const std::type_info* ti = &info->helper_->getTypeInfo();
638 for (; des_it != des_end; ++des_it)
640 if (*des_it->first == *ti)
642 deserializer = des_it->second;
649 deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
653 bool was_full =
false;
654 bool nonconst_need_copy =
false;
657 nonconst_need_copy =
true;
660 info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
668 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
674 statistics_.
callback(connection_header,
name_, link->getCallerID(), m, link->getStats().bytes_received_, receipt_time, drops > 0);
677 if (link->isLatched())
702 if (
md5sum_ ==
"*" && md5sum !=
"*")
709 if (md5sum !=
"*" && md5sum != this->
md5sum())
718 info->helper_ = helper;
719 info->callback_queue_ = queue;
720 info->subscription_queue_ = boost::make_shared<SubscriptionQueue>(
name_, queue_size, allow_concurrent_callbacks);
721 info->tracked_object_ = tracked_object;
722 info->has_tracked_object_ =
false;
725 info->has_tracked_object_ =
true;
728 if (!helper->isConst())
743 for (; it != end;++it)
746 if (link->isLatched())
751 const LatchInfo& latch_info = des_it->second;
754 bool was_full =
false;
755 info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_,
true, latch_info.
receipt_time, &was_full);
758 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
774 for (V_CallbackInfo::iterator it =
callbacks_.begin();
777 if ((*it)->helper_ == helper)
782 if (!helper->isConst())
794 info->subscription_queue_->clear();
795 info->callback_queue_->removeByID((uint64_t)info.get());
824 if (pub_link->isLatched())
833 for (V_CallbackInfo::iterator cb =
callbacks_.begin();
837 if (info->helper_->getTypeInfo() == ti)
void callback(const boost::shared_ptr< M_string > &connection_header, const std::string &topic, const std::string &callerid, const SerializedMessage &m, const uint64_t &bytes_sent, const ros::Time &received_time, bool dropped)
S_PendingConnection pending_connections_
boost::mutex publisher_links_mutex_
void init(const SubscriptionCallbackHelperPtr &helper)
TransportHints & reliable()
Specifies a reliable transport. Currently this means TCP.
TransportHints transport_hints_
std::vector< PublisherLinkPtr > V_PublisherLink
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
void pendingConnectionDone(const PendingConnectionPtr &pending_conn, XmlRpc::XmlRpcValue &result)
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
M_PublisherLinkToLatchInfo latched_messages_
bool addCallback(const SubscriptionCallbackHelperPtr &helper, const std::string &md5sum, CallbackQueueInterface *queue, int32_t queue_size, const VoidConstPtr &tracked_object, bool allow_concurrent_callbacks)
XmlRpc::XmlRpcValue getStats()
Abstract interface for a queue used to handle all callbacks within roscpp.
V_PublisherLink publisher_links_
boost::mutex pending_connections_mutex_
uint32_t nonconst_callbacks_
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
void getInfo(XmlRpc::XmlRpcValue &info)
Provides a way of specifying network transport hints to ros::NodeHandle::subscribe() and someday ros:...
void headerReceived(const PublisherLinkPtr &link, const Header &h)
V_TypeAndDeserializer cached_deserializers_
void removeCallback(const SubscriptionCallbackHelperPtr &helper)
bool negotiateConnection(const std::string &xmlrpc_uri)
Negotiates a connection with a publisher.
std::map< std::string, std::string > M_string
void dropAllConnections()
#define ROSCPP_LOG_DEBUG(...)
Type const & getType() const
void getPublishTypes(bool &ser, bool &nocopy, const std::type_info &ti)
std::vector< std::string > V_string
ROSCPP_DECL const std::string & getHost()
static const ConnectionManagerPtr & instance()
SerializedMessage message
uint32_t handleMessage(const SerializedMessage &m, bool ser, bool nocopy, const boost::shared_ptr< M_string > &connection_header, const PublisherLinkPtr &link)
Called to notify that a new message has arrived from a publisher. Schedules the callback for invokati...
const std::type_info * type_info
StatisticsLogger statistics_
boost::mutex callbacks_mutex_
int getMaxDatagramSize()
Returns the maximum datagram size specified on this TransportHints, or 0 if no size was specified...
const std::string md5sum()
void removePublisherLink(const PublisherLinkPtr &pub_link)
Removes a subscriber from our list.
void addPublisherLink(const PublisherLinkPtr &link)
boost::mutex md5sum_mutex_
Subscription(const std::string &name, const std::string &md5sum, const std::string &datatype, const TransportHints &transport_hints)
void shutdown()
Terminate all our PublisherLinks and join our callback thread if it exists.
static const PollManagerPtr & instance()
bool pubUpdate(const std::vector< std::string > &pubs)
Handle a publisher update list received from the master. Creates/drops PublisherLinks based on the li...
const V_string & getTransports()
Returns a vector of transports, ordered by preference.
#define ROSCPP_CONN_LOG_DEBUG(...)
static const XMLRPCManagerPtr & instance()
void addLocalConnection(const PublicationPtr &pub)
bool urisEqual(const std::string &uri1, const std::string &uri2)
bool executeNonBlock(const char *method, XmlRpcValue const ¶ms)
const std::string datatype()
boost::mutex shutdown_mutex_
void closeTransport(const TransportUDPPtr &trans)
const std::string & getName() const
V_CallbackInfo callbacks_
uint32_t getNumPublishers()
void drop()
Terminate all our PublisherLinks.
boost::shared_ptr< std::map< std::string, std::string > > connection_header
uint64_t messages_received_