34 #include <std_msgs/Header.h>
77 const std::string &datatype,
78 const std::string &_md5sum,
79 const std::string& message_definition,
86 message_definition_(message_definition),
87 max_queue_(max_queue),
91 has_header_(has_header),
92 intraprocess_subscriber_count_(0)
108 if (callbacks->connect_ && callbacks->callback_queue_)
113 for (; it != end; ++it)
116 CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_));
117 callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get());
122 if (callbacks->push_latched_message_)
136 if (cb->callback_queue_)
138 cb->callback_queue_->removeByID((uint64_t)cb.get());
145 if (callbacks->push_latched_message_)
149 for (; it != end; ++it)
151 if ((*it)->push_latched_message_)
197 ser::deserialize(istream,
header);
200 ser::serialize(ostream,
header);
207 sub_link->enqueueMessage(m,
true,
false);
225 if (sub_link->isIntraprocess())
248 if (sub_link->isIntraprocess())
281 conn_data[cidx][0] = (*c)->getConnectionID();
287 conn_data[cidx][1] = (int)
s.bytes_sent_;
288 conn_data[cidx][2] = (
int)
s.message_data_sent_;
289 conn_data[cidx][3] = (int)
s.messages_sent_;
290 conn_data[cidx][4] = 0;
293 stats[1] = conn_data;
307 curr_info[0] = (int)(*c)->getConnectionID();
308 curr_info[1] = (*c)->getDestinationCallerID();
310 curr_info[3] = (*c)->getTransportType();
311 curr_info[4] =
name_;
313 curr_info[6] = (*c)->getTransportInfo();
314 info[info.
size()] = curr_info;
330 for (V_SubscriberLink::iterator i = local_publishers.begin();
331 i != local_publishers.end(); ++i)
343 for (; it != end; ++it)
346 if (cbs->push_latched_message_)
348 cbs->push_latched_message_(sub_link);
350 if (cbs->connect_ && cbs->callback_queue_)
352 CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->connect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
353 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
364 for (; it != end; ++it)
367 if (cbs->disconnect_ && cbs->callback_queue_)
369 CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->disconnect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
370 cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
384 uint32_t old_seq =
seq_;
407 for (; it != end; ++it)
412 sub->getPublishTypes(
s, n, ti);
414 nocopy = nocopy || n;
436 for (; it != end; ++it)
439 if (sub->isIntraprocess())
441 sub->enqueueMessage(m,
false,
true);
475 V_SerializedMessage::iterator it = queue.begin();
476 V_SerializedMessage::iterator end = queue.end();
477 for (; it != end; ++it)
485 std::string
md5sum, topic, client_callerid;
487 || !
header.getValue(
"topic", topic)
488 || !
header.getValue(
"callerid", client_callerid))
490 std::string msg(
"Header from subscriber did not have the required elements: md5sum, topic, callerid");
504 std::string msg = std::string(
"received a tcpros connection for a nonexistent topic [") +
505 topic + std::string(
"] from [" + client_callerid +
"].");
519 std::string msg = std::string(
"Client [") + client_callerid + std::string(
"] wants topic ") + topic +
520 std::string(
" to have datatype/md5sum [") +
datatype +
"/" +
md5sum +
522 std::string(
"]. Dropping connection.");