Go to the documentation of this file.
13 #include <unordered_map>
14 #include <unordered_set>
21 #include <std_msgs/Bool.h>
22 #include <std_msgs/Int32.h>
23 #include <std_msgs/String.h>
46 const auto nonePriority =
params->getParam(
"none_priority", 0);
47 const auto defaultSubscriberConnectDelay =
params->getParam(
"subscriber_connect_delay",
ros::WallDuration(0.1));
56 CRAS_ERROR(
"Parameter ~topics is empty, priority mux will not do anything!");
60 std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> topicItems;
64 for (
size_t i = 0; i < topicParams.
size(); ++i)
65 topicItems.emplace_back(
"[" +
cras::to_string(i) +
"]", topicParams[i]);
68 for (
const auto& item : topicParams)
69 topicItems.emplace_back(
"/" + item.first, item.second);
72 CRAS_ERROR(
"Parameter ~topics has to be either a list or a dict. Priority mux will not do anything!");
76 std::unordered_map<std::string, std::string> disableTopics;
77 std::unordered_set<std::string> invertedDisableTopics;
79 for (
const auto& item : topicItems)
81 const auto& key = item.first;
82 const auto& xmlConfig = item.second;
85 CRAS_ERROR(
"Item %s of ~topics has to be a dict, but %s was given.",
91 auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
97 config.
inTopic = xmlParams.
getParam<std::string>(
"topic", cras::nullopt);
101 CRAS_ERROR(
"Item %s of ~topics has to contain key 'topic'. Skipping the item.", key.c_str());
114 CRAS_ERROR(
"Item %s of ~topics has to contain key 'priority'. Skipping the item.", key.c_str());
123 CRAS_ERROR(
"Item %s of ~topics has to contain key 'timeout'. Skipping the item.", key.c_str());
127 const auto disableTopic = xmlParams.
getParam(
"disable_topic", std::string());
128 if (!disableTopic.empty())
130 disableTopics[config.
inTopic] = disableTopic;
132 const auto disableTopicInverted = xmlParams.
getParam(
"disable_topic_inverted",
false);
133 if (disableTopicInverted)
134 invertedDisableTopics.insert(disableTopic);
137 const auto beforeDisableMessageBag = xmlParams.
getParam(
"before_disable_message", std::string());
138 if (!beforeDisableMessageBag.empty())
144 if (view.
size() == 0)
146 CRAS_ERROR(
"Bag file %s does not contain any messages.", beforeDisableMessageBag.c_str());
152 auto shifter = boost::make_shared<cras::ShapeShifter>();
156 shifter->morph(
msg.getMD5Sum(),
msg.getDataType(),
msg.getMessageDefinition(),
msg.isLatching() ?
"1" :
"");
162 const auto frameId = xmlParams.
getParam(
"before_disable_message_frame_id", std::string());
193 lockParams =
params->getParam(
"locks", lockParams);
194 std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> lockItems;
198 for (
size_t i = 0; i < lockParams.
size(); ++i)
202 for (
const auto& item : lockParams)
203 lockItems.emplace_back(
"/" + item.first, item.second);
206 CRAS_ERROR(
"Parameter ~locks has to be either a list or a dict. No locks will be set.");
210 if (lockItems.empty())
213 for (
const auto& item : lockItems)
215 const auto& key = item.first;
216 const auto& xmlConfig = item.second;
219 CRAS_ERROR(
"Item %s of ~locks has to be a dict, but %s was given.",
225 auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
231 config.
topic = xmlParams.
getParam<std::string>(
"topic", cras::nullopt);
235 CRAS_ERROR(
"Item %s of ~locks has to contain key 'topic'. Skipping the item.", key.c_str());
246 CRAS_ERROR(
"Item %s of ~locks has to contain key 'priority'. Skipping the item.", key.c_str());
255 CRAS_ERROR(
"Item %s of ~locks has to contain key 'timeout'. Skipping the item.", key.c_str());
267 const auto& topicConfig = config.second;
268 this->
outTopics.insert(topicConfig.outTopic);
270 this->
outTopicConfigs[topicConfig.outTopic].subscriberConnectDelay = defaultSubscriberConnectDelay;
274 topicConfig.inTopic, topicConfig.queueSize,
cb,
nullptr, transportHints);
278 std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> outTopicItems;
279 if (
params->hasParam(
"out_topics"))
282 switch (outTopicParams.getType())
285 for (
size_t i = 0; i < outTopicParams.size(); ++i)
286 outTopicItems.emplace_back(
"[" +
cras::to_string(i) +
"]", outTopicParams[i]);
289 for (
const auto& item : outTopicParams)
290 outTopicItems.emplace_back(
"/" + item.first, item.second);
293 CRAS_ERROR(
"Parameter ~out_topics has to be either a list or a dict. It will be ignored!");
297 for (
const auto& item : outTopicItems)
299 const auto& key = item.first;
300 const auto& xmlConfig = item.second;
303 CRAS_ERROR(
"Item %s of ~out_topics has to be a dict, but %s was given.",
309 auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
315 topic = xmlParams.
getParam<std::string>(
"topic", cras::nullopt);
318 CRAS_ERROR(
"Item %s of ~out_topics has to contain key 'topic'. Skipping the item.", key.c_str());
328 CRAS_ERROR(
"Item %s of ~out_topics has to contain key 'topic'. Skipping the item.", key.c_str());
334 config.queueSize = xmlParams.
getParam(
"queue_size", config.queueSize);
335 config.subscriberConnectDelay = xmlParams.
getParam(
"subscriber_connect_delay", config.subscriberConnectDelay);
336 config.numSubscribersToWait = xmlParams.
getParam(
"num_subscribers_to_wait", config.numSubscribersToWait);
337 if (xmlParams.
hasParam(
"force_latch"))
338 config.forceLatch = xmlParams.
getParam<
bool>(
"force_latch", cras::nullopt);
341 for (
const auto& outTopic : this->
outTopics)
348 this->
mux = std::make_unique<PriorityMux>(this->topicConfigs, this->
lockConfigs,
350 noneTopic, nonePriority);
353 "active_priority", this->
queueSize,
true);
358 for (
const auto& inTopicAndDisableTopic : disableTopics)
360 const auto& inTopic = inTopicAndDisableTopic.first;
361 const auto& disableTopic = inTopicAndDisableTopic.second;
362 const bool invert = invertedDisableTopics.find(disableTopic) != invertedDisableTopics.end();
366 disableTopic, this->
queueSize,
cb,
nullptr, transportHints);
372 const auto& lockConfig = config.second;
375 lockConfig.topic, this->
queueSize,
cb,
nullptr, transportHints);
389 CRAS_DEBUG(
"Received message on topic %s with priority %i. Current priority %i.",
390 inTopic.c_str(), topicConfig.priority, this->mux->getActivePriority());
396 auto& pub = this->
publishers[topicConfig.outTopic];
399 const auto& outTopicConfig = this->
outTopicConfigs[topicConfig.outTopic];
404 latch =
event.getConnectionHeader()[
"latching"] ==
"1";
405 if (outTopicConfig.forceLatch.has_value())
406 latch = *outTopicConfig.forceLatch;
409 pub =
event.getConstMessage()->advertise(
410 this->
getNodeHandle(), topicConfig.outTopic, outTopicConfig.queueSize, latch);
411 CRAS_DEBUG(
"Created publisher %s with type %s.", topicConfig.outTopic.c_str(),
418 while (
ros::ok() && pub.getNumSubscribers() < outTopicConfig.numSubscribersToWait)
424 topicConfig.outTopic.c_str(), pub.getNumSubscribers(),
425 outTopicConfig.numSubscribersToWait);
434 CRAS_DEBUG(
"Publishing message on topic %s.", inTopic.c_str());
439 CRAS_DEBUG(
"Discarding message on topic %s.", inTopic.c_str());
458 disable =
event.getConstMessage()->instantiate<std_msgs::Bool>()->
data;
469 const auto& itEvent = it->second;
470 auto message = itEvent.getConstMessage();
471 auto nonconstWillCopy = itEvent.nonConstWillCopy();
479 auto nonconstMessage = itEvent.getMessage();
481 message = nonconstMessage;
482 nonconstWillCopy =
false;
488 beforeEvent.
init(message, itEvent.getConnectionHeaderPtr(), event.
getReceiptTime(), nonconstWillCopy,
489 itEvent.getMessageFactory());
492 this->
cb(inTopic, beforeEvent);
503 const auto newPriority = this->
mux->getActivePriority();
506 CRAS_INFO(
"Priority %i is now active.", newPriority);
508 msg.data = newPriority;
513 const auto& newTopics = this->
mux->getLastSelectedTopics();
515 for (
const auto& outTopic : this->
outTopics)
517 const auto newIt = newTopics.find(outTopic);
518 const auto oldIt = oldTopics.find(outTopic);
519 if (newIt == newTopics.end() || oldIt == oldTopics.end() || newIt->second != oldIt->second)
521 CRAS_INFO(
"Source topic '%s' is now selected for output topic '%s'.", newIt->second.c_str(), outTopic.c_str());
522 std_msgs::String
msg;
523 msg.data = newIt->second;
539 if (this->
timers.find(name) == this->timers.end())
546 this->
timers[name].setPeriod(timeout);
554 for (
auto& timer : this->
timers)
556 this->timers.clear();
::std::string name
The human-readable name of the config.
ros::Time getReceiptTime() const
Configuration of a lock topic for the mux.
int priority
The priority for the topic. Usually is a positive number.
ros::NodeHandle & getNodeHandle() const
bool hasHeader(const ::topic_tools::ShapeShifter &msg)
Tell whether the given message has header field.
Tools for more convenient working with ShapeShifter objects.
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
void stripLeading(::std::string &s, const char &c=' ')
void publishChanges()
Check what changed from the last publishChanges() call and if there are changes, publish them (active...
::cras::LogHelperPtr getLogger() const
::std::string topic
The topic associated with the lock.
::std::string inTopic
The input topic.
::ros::Publisher activePriorityPub
Publisher for active priority.
virtual void disableCb(const ::std::string &inTopic, bool invert, const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a disable message is received.
const boost::shared_ptr< ConstMessage > & getConstMessage() const
Priority-based muxer nodelet for topics.
ros::NodeHandle & getPrivateNodeHandle() const
void publish(const boost::shared_ptr< M > &message) const
Publisher advertise(AdvertiseOptions &ops)
size_t queueSize
Queue size of all publishers and subscribers.
TransportHints & tcpNoDelay(bool nodelay=true)
virtual void cb(const ::std::string &inTopic, const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a message is received on the input topic.
::cras::optional< int > lastActivePriority
The active priority after the last publishChanges() call.
Priority-based muxer nodelet for topics.
::std::unordered_map<::std::string, ::std::string > lastSelectedTopics
The selected output topics after the last publishChanges() call.
virtual void lockCb(const ::std::string &topic, const ::ros::MessageEvent<::std_msgs::Bool const > &event)
Callback method triggered when a lock message is received.
std::string resolveName(const std::string &name, bool remap=true) const
int priority
The priority for the lock. Usually is a positive number.
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
::std::unordered_set<::std::string > beforeDisableMessagesWithHeader
Cached list of "before disable" messages that have a Header.
constexpr auto DEFAULT_OUT_TOPIC
Priority-based muxer for topics.
std::string const * frameId(const M &m)
::std::unique_ptr<::cras::impl::NodeletWithDiagnosticsPrivate > data
::std::string name
The human-readable name of the config.
::ros::Subscriber resetSub
Subscriber for reset topic.
::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > lockConfigs
Configurations of lock topics.
::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > topicConfigs
Configurations of input topics.
Configuration of an input to the mux.
::std::unique_ptr<::cras::PriorityMux > mux
Mux instance.
::ros::Duration timeout
How long do messages on this topic keep the priority active.
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
std::unordered_map< std::string, ::cras::priority_mux::OutputTopicConfig > outTopicConfigs
Configurations of output topics.
const Type & getType() const
virtual void resetCb(const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a reset message is received.
auto bind_front(F &&f, Args &&... args)
::std::unordered_map<::std::string, ::ros::Publisher > publishers
Map of output topic and the associated publisher.
::std::unordered_map<::std::string, ::ros::MessageEvent<::cras::ShapeShifter > > beforeDisableMessages
The messages to be injected into the mux right before a topic is disabled.
#define CRAS_WARN_THROTTLE(period,...)
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
bool tcpNoDelay
Whether tcpNoDelay() flag should be set for all subscribers.
::std::unordered_map<::std::string, ::ros::Publisher > selectedPublishers
Map of publishers of the topics announcing currently selected publishers.
uint8_t * getBuffer(::topic_tools::ShapeShifter &msg)
Get the internal buffer with serialized message data.
size_t queueSize
Queue size for the subscriber.
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
void reset()
Reset the mux to its initial state.
::std::unordered_set<::std::string > outTopics
All output topic names.
bool hasParam(const ::std::string &name, const bool searchNested=true) const
::std::unordered_map<::std::string, ::ros::Timer > timers
Map of timer names and the timers.
void onTimeout(const ::std::string &name, const ::ros::TimerEvent &)
Method triggered on timer timeout. This should update the mux and check for changes.
Timer createTimer(Duration period, const TimerCallback &callback, bool oneshot=false, bool autostart=true) const
constexpr auto DEFAULT_NONE_TOPIC
void init(const ConstMessagePtr &message, const boost::shared_ptr< M_string > &connection_header, ros::Time receipt_time, bool nonconst_need_copy, const CreateFunction &create)
::cras::optional<::std_msgs::Header > getHeader(const ::topic_tools::ShapeShifter &msg)
Get the header field of the given message, if it has any.
::std::string outTopic
The output topic.
::std::list<::ros::Subscriber > subscribers
List of subscribers. Just for keeping them alive.
void setTimer(const ::std::string &name, const ::ros::Duration &timeout)
Set a timer that will call onTimeout() after the specified time.
inline ::std::string getParam(const ::std::string &name, const ::cras::optional< const char * > &defaultValue, const ::std::string &unit="", const ::cras::GetParamOptions<::std::string > &options={}) const
inline ::std::string to_string(char *value)
void resizeBuffer(::topic_tools::ShapeShifter &msg, size_t newLength)
Resize the internal buffer of the message.
bool setHeader(::topic_tools::ShapeShifter &msg, ::std_msgs::Header &header)
Change the header field of the given message, if it has any.
cras_topic_tools
Author(s): Martin Pecka
autogenerated on Sun Jan 5 2025 03:50:49