Go to the documentation of this file.
13 #include <unordered_map>
20 #include <std_msgs/Bool.h>
21 #include <std_msgs/Int32.h>
22 #include <std_msgs/String.h>
45 const auto nonePriority =
params->getParam(
"none_priority", 0);
46 const auto defaultSubscriberConnectDelay =
params->getParam(
"subscriber_connect_delay",
ros::WallDuration(0.1));
55 CRAS_ERROR(
"Parameter ~topics is empty, priority mux will not do anything!");
59 std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> topicItems;
63 for (
size_t i = 0; i < topicParams.
size(); ++i)
64 topicItems.emplace_back(
"[" +
cras::to_string(i) +
"]", topicParams[i]);
67 for (
const auto& item : topicParams)
68 topicItems.emplace_back(
"/" + item.first, item.second);
71 CRAS_ERROR(
"Parameter ~topics has to be either a list or a dict. Priority mux will not do anything!");
75 std::unordered_map<std::string, std::string> disableTopics;
77 for (
const auto& item : topicItems)
79 const auto& key = item.first;
80 const auto& xmlConfig = item.second;
83 CRAS_ERROR(
"Item %s of ~topics has to be a dict, but %s was given.",
89 auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
95 config.
inTopic = xmlParams.
getParam<std::string>(
"topic", cras::nullopt);
99 CRAS_ERROR(
"Item %s of ~topics has to contain key 'topic'. Skipping the item.", key.c_str());
112 CRAS_ERROR(
"Item %s of ~topics has to contain key 'priority'. Skipping the item.", key.c_str());
121 CRAS_ERROR(
"Item %s of ~topics has to contain key 'timeout'. Skipping the item.", key.c_str());
125 const auto disableTopic = xmlParams.
getParam(
"disable_topic", std::string());
126 if (!disableTopic.empty())
128 disableTopics[config.
inTopic] = disableTopic;
130 const auto beforeDisableMessageBag = xmlParams.
getParam(
"before_disable_message", std::string());
131 if (!beforeDisableMessageBag.empty())
137 if (view.
size() == 0)
139 CRAS_ERROR(
"Bag file %s does not contain any messages.", beforeDisableMessageBag.c_str());
145 auto shifter = boost::make_shared<cras::ShapeShifter>();
149 shifter->morph(
msg.getMD5Sum(),
msg.getDataType(),
msg.getMessageDefinition(),
msg.isLatching() ?
"1" :
"");
155 const auto frameId = xmlParams.
getParam(
"before_disable_message_frame_id", std::string());
186 lockParams =
params->getParam(
"locks", lockParams);
187 std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> lockItems;
191 for (
size_t i = 0; i < lockParams.
size(); ++i)
195 for (
const auto& item : lockParams)
196 lockItems.emplace_back(
"/" + item.first, item.second);
199 CRAS_ERROR(
"Parameter ~locks has to be either a list or a dict. No locks will be set.");
203 if (lockItems.empty())
206 for (
const auto& item : lockItems)
208 const auto& key = item.first;
209 const auto& xmlConfig = item.second;
212 CRAS_ERROR(
"Item %s of ~locks has to be a dict, but %s was given.",
218 auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
224 config.
topic = xmlParams.
getParam<std::string>(
"topic", cras::nullopt);
228 CRAS_ERROR(
"Item %s of ~locks has to contain key 'topic'. Skipping the item.", key.c_str());
239 CRAS_ERROR(
"Item %s of ~locks has to contain key 'priority'. Skipping the item.", key.c_str());
248 CRAS_ERROR(
"Item %s of ~locks has to contain key 'timeout'. Skipping the item.", key.c_str());
260 const auto& topicConfig = config.second;
261 this->
outTopics.insert(topicConfig.outTopic);
263 this->
outTopicConfigs[topicConfig.outTopic].subscriberConnectDelay = defaultSubscriberConnectDelay;
267 topicConfig.inTopic, topicConfig.queueSize,
cb,
nullptr, transportHints);
271 std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> outTopicItems;
272 if (
params->hasParam(
"out_topics"))
275 switch (outTopicParams.getType())
278 for (
size_t i = 0; i < outTopicParams.size(); ++i)
279 outTopicItems.emplace_back(
"[" +
cras::to_string(i) +
"]", outTopicParams[i]);
282 for (
const auto& item : outTopicParams)
283 outTopicItems.emplace_back(
"/" + item.first, item.second);
286 CRAS_ERROR(
"Parameter ~out_topics has to be either a list or a dict. It will be ignored!");
290 for (
const auto& item : outTopicItems)
292 const auto& key = item.first;
293 const auto& xmlConfig = item.second;
296 CRAS_ERROR(
"Item %s of ~out_topics has to be a dict, but %s was given.",
302 auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
308 topic = xmlParams.
getParam<std::string>(
"topic", cras::nullopt);
311 CRAS_ERROR(
"Item %s of ~out_topics has to contain key 'topic'. Skipping the item.", key.c_str());
321 CRAS_ERROR(
"Item %s of ~out_topics has to contain key 'topic'. Skipping the item.", key.c_str());
327 config.queueSize = xmlParams.
getParam(
"queue_size", config.queueSize);
328 config.subscriberConnectDelay = xmlParams.
getParam(
"subscriber_connect_delay", config.subscriberConnectDelay);
329 config.numSubscribersToWait = xmlParams.
getParam(
"num_subscribers_to_wait", config.numSubscribersToWait);
330 if (xmlParams.
hasParam(
"force_latch"))
331 config.forceLatch = xmlParams.
getParam<
bool>(
"force_latch", cras::nullopt);
334 for (
const auto& outTopic : this->
outTopics)
341 this->
mux = std::make_unique<PriorityMux>(this->topicConfigs, this->
lockConfigs,
343 noneTopic, nonePriority);
346 "active_priority", this->
queueSize,
true);
351 for (
const auto& inTopicAndDisableTopic : disableTopics)
353 const auto& inTopic = inTopicAndDisableTopic.first;
354 const auto& disableTopic = inTopicAndDisableTopic.second;
358 disableTopic, this->
queueSize,
cb,
nullptr, transportHints);
364 const auto& lockConfig = config.second;
367 lockConfig.topic, this->
queueSize,
cb,
nullptr, transportHints);
381 CRAS_DEBUG(
"Received message on topic %s with priority %i. Current priority %i.",
382 inTopic.c_str(), topicConfig.priority, this->mux->getActivePriority());
388 auto& pub = this->
publishers[topicConfig.outTopic];
391 const auto& outTopicConfig = this->
outTopicConfigs[topicConfig.outTopic];
396 latch =
event.getConnectionHeader()[
"latching"] ==
"1";
397 if (outTopicConfig.forceLatch.has_value())
398 latch = *outTopicConfig.forceLatch;
401 pub =
event.getConstMessage()->advertise(
402 this->
getNodeHandle(), topicConfig.outTopic, outTopicConfig.queueSize, latch);
403 CRAS_DEBUG(
"Created publisher %s with type %s.", topicConfig.outTopic.c_str(),
410 while (
ros::ok() && pub.getNumSubscribers() < outTopicConfig.numSubscribersToWait)
416 topicConfig.outTopic.c_str(), pub.getNumSubscribers(),
417 outTopicConfig.numSubscribersToWait);
426 CRAS_DEBUG(
"Publishing message on topic %s.", inTopic.c_str());
431 CRAS_DEBUG(
"Discarding message on topic %s.", inTopic.c_str());
449 disable =
event.getConstMessage()->instantiate<std_msgs::Bool>()->
data;
457 const auto& itEvent = it->second;
458 auto message = itEvent.getConstMessage();
459 auto nonconstWillCopy = itEvent.nonConstWillCopy();
467 auto nonconstMessage = itEvent.getMessage();
469 message = nonconstMessage;
470 nonconstWillCopy =
false;
476 beforeEvent.
init(message, itEvent.getConnectionHeaderPtr(), event.
getReceiptTime(), nonconstWillCopy,
477 itEvent.getMessageFactory());
480 this->
cb(inTopic, beforeEvent);
491 const auto newPriority = this->
mux->getActivePriority();
494 CRAS_INFO(
"Priority %i is now active.", newPriority);
496 msg.data = newPriority;
501 const auto& newTopics = this->
mux->getLastSelectedTopics();
503 for (
const auto& outTopic : this->
outTopics)
505 const auto newIt = newTopics.find(outTopic);
506 const auto oldIt = oldTopics.find(outTopic);
507 if (newIt == newTopics.end() || oldIt == oldTopics.end() || newIt->second != oldIt->second)
509 CRAS_INFO(
"Source topic '%s' is now selected for output topic '%s'.", newIt->second.c_str(), outTopic.c_str());
510 std_msgs::String
msg;
511 msg.data = newIt->second;
527 if (this->
timers.find(name) == this->timers.end())
534 this->
timers[name].setPeriod(timeout);
542 for (
auto& timer : this->
timers)
544 this->timers.clear();
::std::string name
The human-readable name of the config.
ros::Time getReceiptTime() const
Configuration of a lock topic for the mux.
int priority
The priority for the topic. Usually is a positive number.
ros::NodeHandle & getNodeHandle() const
bool hasHeader(const ::topic_tools::ShapeShifter &msg)
Tell whether the given message has header field.
Tools for more convenient working with ShapeShifter objects.
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
void stripLeading(::std::string &s, const char &c=' ')
void publishChanges()
Check what changed from the last publishChanges() call and if there are changes, publish them (active...
::cras::LogHelperPtr getLogger() const
::std::string topic
The topic associated with the lock.
::std::string inTopic
The input topic.
::ros::Publisher activePriorityPub
Publisher for active priority.
const boost::shared_ptr< ConstMessage > & getConstMessage() const
Priority-based muxer nodelet for topics.
ros::NodeHandle & getPrivateNodeHandle() const
void publish(const boost::shared_ptr< M > &message) const
Publisher advertise(AdvertiseOptions &ops)
size_t queueSize
Queue size of all publishers and subscribers.
TransportHints & tcpNoDelay(bool nodelay=true)
virtual void cb(const ::std::string &inTopic, const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a message is received on the input topic.
::cras::optional< int > lastActivePriority
The active priority after the last publishChanges() call.
Priority-based muxer nodelet for topics.
::std::unordered_map<::std::string, ::std::string > lastSelectedTopics
The selected output topics after the last publishChanges() call.
virtual void lockCb(const ::std::string &topic, const ::ros::MessageEvent<::std_msgs::Bool const > &event)
Callback method triggered when a lock message is received.
std::string resolveName(const std::string &name, bool remap=true) const
int priority
The priority for the lock. Usually is a positive number.
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
::std::unordered_set<::std::string > beforeDisableMessagesWithHeader
Cached list of "before disable" messages that have a Header.
constexpr auto DEFAULT_OUT_TOPIC
Priority-based muxer for topics.
std::string const * frameId(const M &m)
::std::unique_ptr<::cras::impl::NodeletWithDiagnosticsPrivate > data
::std::string name
The human-readable name of the config.
::ros::Subscriber resetSub
Subscriber for reset topic.
::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > lockConfigs
Configurations of lock topics.
::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > topicConfigs
Configurations of input topics.
Configuration of an input to the mux.
::std::unique_ptr<::cras::PriorityMux > mux
Mux instance.
::ros::Duration timeout
How long do messages on this topic keep the priority active.
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
std::unordered_map< std::string, ::cras::priority_mux::OutputTopicConfig > outTopicConfigs
Configurations of output topics.
const Type & getType() const
virtual void resetCb(const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a reset message is received.
auto bind_front(F &&f, Args &&... args)
::std::unordered_map<::std::string, ::ros::Publisher > publishers
Map of output topic and the associated publisher.
::std::unordered_map<::std::string, ::ros::MessageEvent<::cras::ShapeShifter > > beforeDisableMessages
The messages to be injected into the mux right before a topic is disabled.
#define CRAS_WARN_THROTTLE(period,...)
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
bool tcpNoDelay
Whether tcpNoDelay() flag should be set for all subscribers.
::std::unordered_map<::std::string, ::ros::Publisher > selectedPublishers
Map of publishers of the topics announcing currently selected publishers.
uint8_t * getBuffer(::topic_tools::ShapeShifter &msg)
Get the internal buffer with serialized message data.
size_t queueSize
Queue size for the subscriber.
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
void reset()
Reset the mux to its initial state.
::std::unordered_set<::std::string > outTopics
All output topic names.
bool hasParam(const ::std::string &name, const bool searchNested=true) const
::std::unordered_map<::std::string, ::ros::Timer > timers
Map of timer names and the timers.
virtual void disableCb(const ::std::string &inTopic, const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a disable message is received.
void onTimeout(const ::std::string &name, const ::ros::TimerEvent &)
Method triggered on timer timeout. This should update the mux and check for changes.
Timer createTimer(Duration period, const TimerCallback &callback, bool oneshot=false, bool autostart=true) const
constexpr auto DEFAULT_NONE_TOPIC
void init(const ConstMessagePtr &message, const boost::shared_ptr< M_string > &connection_header, ros::Time receipt_time, bool nonconst_need_copy, const CreateFunction &create)
::cras::optional<::std_msgs::Header > getHeader(const ::topic_tools::ShapeShifter &msg)
Get the header field of the given message, if it has any.
::std::string outTopic
The output topic.
::std::list<::ros::Subscriber > subscribers
List of subscribers. Just for keeping them alive.
void setTimer(const ::std::string &name, const ::ros::Duration &timeout)
Set a timer that will call onTimeout() after the specified time.
inline ::std::string getParam(const ::std::string &name, const ::cras::optional< const char * > &defaultValue, const ::std::string &unit="", const ::cras::GetParamOptions<::std::string > &options={}) const
inline ::std::string to_string(char *value)
void resizeBuffer(::topic_tools::ShapeShifter &msg, size_t newLength)
Resize the internal buffer of the message.
bool setHeader(::topic_tools::ShapeShifter &msg, ::std_msgs::Header &header)
Change the header field of the given message, if it has any.
cras_topic_tools
Author(s): Martin Pecka
autogenerated on Sun Jan 14 2024 03:48:28