39 #include <boost/bind.hpp> 45 : writing_message_(false)
46 , header_written_(false)
68 if (!header.
getValue(
"topic", topic))
70 std::string msg(
"Header from subscriber did not have the required element: topic");
79 std::string client_callerid;
80 header.
getValue(
"callerid", client_callerid);
84 std::string msg = std::string(
"received a connection for a nonexistent topic [") +
85 topic + std::string(
"] from [" +
connection_->getTransport()->getTransportInfo() +
"] [" + client_callerid +
"].");
93 std::string error_msg;
94 if (!pt->validateHeader(header, error_msg))
109 m[
"type"] = pt->getDataType();
110 m[
"md5sum"] = pt->getMD5Sum();
111 m[
"message_definition"] = pt->getMessageDefinition();
113 m[
"latching"] = pt->isLatching() ?
"1" :
"0";
117 pt->addSubscriberLink(shared_from_this());
133 parent->removeSubscriberLink(shared_from_this());
191 max_queue = parent->getMaxQueue();
196 if (max_queue > 0 && (
int)
outbox_.size() >= max_queue)
200 ROS_DEBUG(
"Outgoing queue full for topic [%s]. " 201 "Discarding oldest message",
230 return connection_->getTransport()->getTransportInfo();
ConnectionPtr connection_
void startMessageWrite(bool immediate_write)
TransportSubscriberLink()
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
unsigned int connection_id_
bool handleHeader(const Header &header)
std::string destination_caller_id_
virtual std::string getTransportType()
boost::signals2::connection dropped_conn_
uint64_t message_data_sent_
std::map< std::string, std::string > M_string
#define ROS_DEBUG_NAMED(name,...)
virtual void enqueueMessage(const SerializedMessage &m, bool ser, bool nocopy)
Queue up a message for publication. Throws out old messages if we've reached our Publication's max qu...
#define ROSCPP_LOG_DEBUG(...)
bool initialize(const ConnectionPtr &connection)
void onHeaderWritten(const ConnectionPtr &conn)
static const ConnectionManagerPtr & instance()
virtual ~TransportSubscriberLink()
virtual std::string getTransportInfo()
boost::shared_array< uint8_t > buf
#define ROSCPP_CONN_LOG_DEBUG(...)
boost::mutex outbox_mutex_
boost::weak_ptr< Publication > PublicationWPtr
void onConnectionDropped(const ConnectionPtr &conn)
std::queue< SerializedMessage > outbox_
static const TopicManagerPtr & instance()
void onMessageWritten(const ConnectionPtr &conn)