34 #include <std_msgs/Header.h> 77 const std::string &datatype,
78 const std::string &_md5sum,
79 const std::string& message_definition,
86 message_definition_(message_definition),
87 max_queue_(max_queue),
91 has_header_(has_header),
92 intraprocess_subscriber_count_(0)
108 if (callbacks->connect_ && callbacks->callback_queue_)
113 for (; it != end; ++it)
116 CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_));
117 callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get());
130 if (cb->callback_queue_)
132 cb->callback_queue_->removeByID((uint64_t)cb.get());
175 ser::deserialize(istream, header);
178 ser::serialize(ostream, header);
185 sub_link->enqueueMessage(m,
true,
false);
208 if (sub_link->isIntraprocess())
236 if (sub_link->isIntraprocess())
269 conn_data[cidx][0] = (*c)->getConnectionID();
278 conn_data[cidx][4] = 0;
281 stats[1] = conn_data;
295 curr_info[0] = (int)(*c)->getConnectionID();
296 curr_info[1] = (*c)->getDestinationCallerID();
298 curr_info[3] = (*c)->getTransportType();
299 curr_info[4] =
name_;
301 curr_info[6] = (*c)->getTransportInfo();
302 info[info.
size()] = curr_info;
318 for (V_SubscriberLink::iterator i = local_publishers.begin();
319 i != local_publishers.end(); ++i)
331 for (; it != end; ++it)
334 if (cbs->connect_ && cbs->callback_queue_)
336 CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->connect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
337 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
348 for (; it != end; ++it)
351 if (cbs->disconnect_ && cbs->callback_queue_)
353 CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->disconnect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
354 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
368 uint32_t old_seq =
seq_;
385 for (; it != end; ++it)
390 sub->getPublishTypes(s, n, ti);
391 serialize = serialize || s;
392 nocopy = nocopy || n;
394 if (serialize && nocopy)
414 for (; it != end; ++it)
417 if (sub->isIntraprocess())
419 sub->enqueueMessage(m,
false,
true);
453 V_SerializedMessage::iterator it = queue.begin();
454 V_SerializedMessage::iterator end = queue.end();
455 for (; it != end; ++it)
463 std::string
md5sum, topic, client_callerid;
464 if (!header.
getValue(
"md5sum", md5sum)
466 || !header.
getValue(
"callerid", client_callerid))
468 std::string msg(
"Header from subscriber did not have the required elements: md5sum, topic, callerid");
482 std::string msg = std::string(
"received a tcpros connection for a nonexistent topic [") +
483 topic + std::string(
"] from [" + client_callerid +
"].");
492 (md5sum != std::string(
"*") &&
getMD5Sum() != std::string(
"*")))
497 std::string msg = std::string(
"Client [") + client_callerid + std::string(
"] wants topic ") + topic +
498 std::string(
" to have datatype/md5sum [") + datatype +
"/" + md5sum +
500 std::string(
"]. Dropping connection.");
Publication(const std::string &name, const std::string &datatype, const std::string &_md5sum, const std::string &message_definition, size_t max_queue, bool latch, bool has_header)
CallResult
Possible results for the call() method.
uint32_t incrementSequence()
void processPublishQueue()
boost::function< void(const SingleSubscriberPublisher &)> SubscriberStatusCallback
SerializedMessage last_message_
XmlRpc::XmlRpcValue getStats()
Get the accumulated stats for this publication.
void dropAllConnections()
uint32_t getNumSubscribers()
Returns the number of subscribers this publication has.
bool enqueueMessage(const SerializedMessage &m)
queues an outgoing message into each of the publishers, so that it gets sent to every subscriber ...
const std::string & getDataType() const
Returns the data type of the message published by this publication.
boost::mutex callbacks_mutex_
PeerConnDisconnCallback(const SubscriberStatusCallback &callback, const SubscriberLinkPtr &sub_link, bool use_tracked_object, const VoidConstWPtr &tracked_object)
Abstract interface for items which can be added to a CallbackQueueInterface.
void addSubscriberLink(const SubscriberLinkPtr &sub_link)
Adds a publisher to our list.
std_msgs::Header * header(M &m)
void getPublishTypes(bool &serialize, bool &nocopy, const std::type_info &ti)
V_SerializedMessage publish_queue_
Allows publication of a message to a single subscriber. Only available inside subscriber connection c...
void publish(SerializedMessage &m)
V_SubscriberLink subscriber_links_
void removeSubscriberLink(const SubscriberLinkPtr &sub_link)
Removes a publisher from our list (deleting it if it's the last reference)
SubscriberLinkPtr sub_link_
bool hasSubscribers()
Returns whether or not this publication has any subscribers.
uint64_t message_data_sent_
void addCallbacks(const SubscriberCallbacksPtr &callbacks)
uint32_t intraprocess_subscriber_count_
SubscriberStatusCallback callback_
bool isDropped()
Returns if this publication is valid or not.
boost::weak_ptr< void const > VoidConstWPtr
boost::mutex publish_queue_mutex_
bool validateHeader(const Header &h, std::string &error_msg)
void removeCallbacks(const SubscriberCallbacksPtr &callbacks)
void peerConnect(const SubscriberLinkPtr &sub_link)
Called when a new peer has connected. Calls the connection callback.
VoidConstWPtr tracked_object_
std::vector< SerializedMessage > V_SerializedMessage
boost::mutex subscriber_links_mutex_
boost::shared_array< uint8_t > buf
std::vector< SubscriberLinkPtr > V_SubscriberLink
void getInfo(XmlRpc::XmlRpcValue &info)
Get the accumulated info for this publication.
const std::string & getMD5Sum() const
Returns the md5sum of the message published by this publication.
void peerDisconnect(const SubscriberLinkPtr &sub_link)
Called when a peer has disconnected. Calls the disconnection callback.
boost::shared_ptr< void const > message
virtual CallResult call()
Call this callback.
void drop()
Drop this publication. Disconnects all publishers.