priority_mux.cpp
Go to the documentation of this file.
1 // SPDX-License-Identifier: BSD-3-Clause
2 // SPDX-FileCopyrightText: Czech Technical University in Prague
3 
10 #include <list>
11 #include <memory>
12 #include <string>
13 #include <unordered_map>
14 #include <unordered_set>
15 #include <utility>
16 
18 #include <ros/ros.h>
19 #include <rosbag/bag.h>
20 #include <rosbag/view.h>
21 #include <std_msgs/Bool.h>
22 #include <std_msgs/Int32.h>
23 #include <std_msgs/String.h>
24 
30 
31 namespace cras
32 {
33 
34 constexpr auto DEFAULT_OUT_TOPIC = "mux_out";
35 constexpr auto DEFAULT_NONE_TOPIC = "__none";
36 
38 {
39  const auto params = this->privateParams();
40 
41  this->queueSize = params->getParam("queue_size", 10_sz, "messages");
42  this->tcpNoDelay = params->getParam("tcp_no_delay", false);
43 
44  const auto defaultOutTopic = params->getParam("default_out_topic", std::string(DEFAULT_OUT_TOPIC));
45  const auto noneTopic = params->getParam("none_topic", std::string(DEFAULT_NONE_TOPIC));
46  const auto nonePriority = params->getParam("none_priority", 0);
47  const auto defaultSubscriberConnectDelay = params->getParam("subscriber_connect_delay", ros::WallDuration(0.1));
48 
49  XmlRpc::XmlRpcValue topicParams;
50  try
51  {
52  topicParams = params->getParam<XmlRpc::XmlRpcValue>("topics", cras::nullopt);
53  }
54  catch (const cras::GetParamException& e)
55  {
56  CRAS_ERROR("Parameter ~topics is empty, priority mux will not do anything!");
57  return;
58  }
59 
60  std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> topicItems;
61  switch (topicParams.getType())
62  {
64  for (size_t i = 0; i < topicParams.size(); ++i)
65  topicItems.emplace_back("[" + cras::to_string(i) + "]", topicParams[i]);
66  break;
68  for (const auto& item : topicParams)
69  topicItems.emplace_back("/" + item.first, item.second);
70  break;
71  default:
72  CRAS_ERROR("Parameter ~topics has to be either a list or a dict. Priority mux will not do anything!");
73  return;
74  }
75 
76  std::unordered_map<std::string, std::string> disableTopics;
77  std::unordered_set<std::string> invertedDisableTopics;
78 
79  for (const auto& item : topicItems)
80  {
81  const auto& key = item.first;
82  const auto& xmlConfig = item.second;
83  if (xmlConfig.getType() != XmlRpc::XmlRpcValue::TypeStruct)
84  {
85  CRAS_ERROR("Item %s of ~topics has to be a dict, but %s was given.",
86  key.c_str(), cras::to_string(xmlConfig.getType()).c_str());
87  continue;
88  }
89 
90  auto itemNamespace = this->getPrivateNodeHandle().resolveName("topics") + key;
91  auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
92  cras::BoundParamHelper xmlParams(this->log, xmlParamAdapter);
94 
95  try
96  {
97  config.inTopic = xmlParams.getParam<std::string>("topic", cras::nullopt);
98  }
99  catch (const cras::GetParamException& e)
100  {
101  CRAS_ERROR("Item %s of ~topics has to contain key 'topic'. Skipping the item.", key.c_str());
102  continue;
103  }
104 
105  config.name = xmlParams.getParam("name", config.inTopic);
106  config.outTopic = xmlParams.getParam("out_topic", defaultOutTopic);
107  config.queueSize = xmlParams.getParam("queue_size", this->queueSize);
108  try
109  {
110  config.priority = xmlParams.getParam<int>("priority", cras::nullopt);
111  }
112  catch (const cras::GetParamException& e)
113  {
114  CRAS_ERROR("Item %s of ~topics has to contain key 'priority'. Skipping the item.", key.c_str());
115  continue;
116  }
117  try
118  {
119  config.timeout = xmlParams.getParam<ros::Duration>("timeout", cras::nullopt, "s");
120  }
121  catch (const cras::GetParamException& e)
122  {
123  CRAS_ERROR("Item %s of ~topics has to contain key 'timeout'. Skipping the item.", key.c_str());
124  continue;
125  }
126 
127  const auto disableTopic = xmlParams.getParam("disable_topic", std::string());
128  if (!disableTopic.empty())
129  {
130  disableTopics[config.inTopic] = disableTopic;
131 
132  const auto disableTopicInverted = xmlParams.getParam("disable_topic_inverted", false);
133  if (disableTopicInverted)
134  invertedDisableTopics.insert(disableTopic);
135 
136  // If there should be a message sent just before disabling this topic, read the message from a bag
137  const auto beforeDisableMessageBag = xmlParams.getParam("before_disable_message", std::string());
138  if (!beforeDisableMessageBag.empty())
139  {
140  try
141  {
142  rosbag::Bag bag(beforeDisableMessageBag);
143  rosbag::View view(bag);
144  if (view.size() == 0)
145  {
146  CRAS_ERROR("Bag file %s does not contain any messages.", beforeDisableMessageBag.c_str());
147  }
148  else
149  {
150  // Read the first message from the bag and copy it to a ShapeShifter
151  const auto msg = *view.begin();
152  auto shifter = boost::make_shared<cras::ShapeShifter>();
153  cras::resizeBuffer(*shifter, msg.size());
154  ros::serialization::OStream stream(cras::getBuffer(*shifter), msg.size());
155  msg.write(stream);
156  shifter->morph(msg.getMD5Sum(), msg.getDataType(), msg.getMessageDefinition(), msg.isLatching() ? "1" : "");
157  // If the message has Header, figure out if we need to overwrite frame_id
158  if (cras::hasHeader(*shifter))
159  {
160  // This set is read by disableCb().
161  this->beforeDisableMessagesWithHeader.insert(config.inTopic);
162  const auto frameId = xmlParams.getParam("before_disable_message_frame_id", std::string());
163  if (!frameId.empty())
164  {
165  auto header = cras::getHeader(*shifter);
166  if (header)
167  {
168  header->frame_id = frameId;
169  cras::setHeader(*shifter, *header);
170  }
171  }
172  }
173  // Create a fake message event. Time does not matter as the code in disableCb() ignores it.
175  event.init(shifter, msg.getConnectionHeader(), ros::Time(0), true,
177  this->beforeDisableMessages[config.inTopic] = event;
178  }
179  }
180  catch (const rosbag::BagException& e)
181  {
182  CRAS_ERROR("%s", e.what());
183  }
184  }
185  }
186 
187  this->topicConfigs[config.inTopic] = config;
188  }
189 
190  XmlRpc::XmlRpcValue lockParams;
191  lockParams.begin(); // convert to struct
192 
193  lockParams = params->getParam("locks", lockParams);
194  std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> lockItems;
195  switch (lockParams.getType())
196  {
198  for (size_t i = 0; i < lockParams.size(); ++i)
199  lockItems.emplace_back("[" + cras::to_string(i) + "]", lockParams[i]);
200  break;
202  for (const auto& item : lockParams)
203  lockItems.emplace_back("/" + item.first, item.second);
204  break;
205  default:
206  CRAS_ERROR("Parameter ~locks has to be either a list or a dict. No locks will be set.");
207  return;
208  }
209 
210  if (lockItems.empty())
211  CRAS_INFO("No locks were specified.");
212 
213  for (const auto& item : lockItems)
214  {
215  const auto& key = item.first;
216  const auto& xmlConfig = item.second;
217  if (xmlConfig.getType() != XmlRpc::XmlRpcValue::TypeStruct)
218  {
219  CRAS_ERROR("Item %s of ~locks has to be a dict, but %s was given.",
220  key.c_str(), cras::to_string(xmlConfig.getType()).c_str());
221  continue;
222  }
223 
224  auto itemNamespace = this->getPrivateNodeHandle().resolveName("locks") + key;
225  auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
226  cras::BoundParamHelper xmlParams(this->log, xmlParamAdapter);
228 
229  try
230  {
231  config.topic = xmlParams.getParam<std::string>("topic", cras::nullopt);
232  }
233  catch (const cras::GetParamException& e)
234  {
235  CRAS_ERROR("Item %s of ~locks has to contain key 'topic'. Skipping the item.", key.c_str());
236  continue;
237  }
238 
239  config.name = xmlParams.getParam("name", config.topic);
240  try
241  {
242  config.priority = xmlParams.getParam<int>("priority", cras::nullopt);
243  }
244  catch (const cras::GetParamException& e)
245  {
246  CRAS_ERROR("Item %s of ~locks has to contain key 'priority'. Skipping the item.", key.c_str());
247  continue;
248  }
249  try
250  {
251  config.timeout = xmlParams.getParam<ros::Duration>("timeout", cras::nullopt, "s");
252  }
253  catch (const cras::GetParamException& e)
254  {
255  CRAS_ERROR("Item %s of ~locks has to contain key 'timeout'. Skipping the item.", key.c_str());
256  continue;
257  }
258 
259  this->lockConfigs[config.topic] = config;
260  }
261 
262  ros::TransportHints transportHints;
263  transportHints.tcpNoDelay(this->tcpNoDelay);
264 
265  for (const auto& config : this->topicConfigs)
266  {
267  const auto& topicConfig = config.second;
268  this->outTopics.insert(topicConfig.outTopic);
269  this->outTopicConfigs[topicConfig.outTopic].queueSize = this->queueSize;
270  this->outTopicConfigs[topicConfig.outTopic].subscriberConnectDelay = defaultSubscriberConnectDelay;
271 
272  const auto cb = cras::bind_front(&PriorityMuxNodelet::cb, this, topicConfig.inTopic);
273  const auto sub = this->getNodeHandle().subscribe<topic_tools::ShapeShifter>(
274  topicConfig.inTopic, topicConfig.queueSize, cb, nullptr, transportHints);
275  this->subscribers.push_back(sub);
276  }
277 
278  std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> outTopicItems;
279  if (params->hasParam("out_topics"))
280  {
281  const auto outTopicParams = params->getParam<XmlRpc::XmlRpcValue>("out_topics", cras::nullopt);
282  switch (outTopicParams.getType())
283  {
285  for (size_t i = 0; i < outTopicParams.size(); ++i)
286  outTopicItems.emplace_back("[" + cras::to_string(i) + "]", outTopicParams[i]);
287  break;
289  for (const auto& item : outTopicParams)
290  outTopicItems.emplace_back("/" + item.first, item.second);
291  break;
292  default:
293  CRAS_ERROR("Parameter ~out_topics has to be either a list or a dict. It will be ignored!");
294  }
295  }
296 
297  for (const auto& item : outTopicItems)
298  {
299  const auto& key = item.first;
300  const auto& xmlConfig = item.second;
301  if (xmlConfig.getType() != XmlRpc::XmlRpcValue::TypeStruct)
302  {
303  CRAS_ERROR("Item %s of ~out_topics has to be a dict, but %s was given.",
304  key.c_str(), cras::to_string(xmlConfig.getType()).c_str());
305  continue;
306  }
307 
308  auto itemNamespace = this->getPrivateNodeHandle().resolveName("out_topics") + key;
309  auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
310  cras::BoundParamHelper xmlParams(this->log, xmlParamAdapter);
311 
312  std::string topic;
313  try
314  {
315  topic = xmlParams.getParam<std::string>("topic", cras::nullopt);
316  if (topic.empty())
317  {
318  CRAS_ERROR("Item %s of ~out_topics has to contain key 'topic'. Skipping the item.", key.c_str());
319  continue;
320  }
321  else
322  {
323  this->outTopicConfigs[topic].topic = topic;
324  }
325  }
326  catch (const cras::GetParamException& e)
327  {
328  CRAS_ERROR("Item %s of ~out_topics has to contain key 'topic'. Skipping the item.", key.c_str());
329  continue;
330  }
331 
332  auto& config = this->outTopicConfigs[topic];
333 
334  config.queueSize = xmlParams.getParam("queue_size", config.queueSize);
335  config.subscriberConnectDelay = xmlParams.getParam("subscriber_connect_delay", config.subscriberConnectDelay);
336  config.numSubscribersToWait = xmlParams.getParam("num_subscribers_to_wait", config.numSubscribersToWait);
337  if (xmlParams.hasParam("force_latch"))
338  config.forceLatch = xmlParams.getParam<bool>("force_latch", cras::nullopt);
339  }
340 
341  for (const auto& outTopic : this->outTopics)
342  {
343  const auto selectedTopic = "selected/" + cras::stripLeading(outTopic, '/');
344  this->selectedPublishers[outTopic] =
345  this->getPrivateNodeHandle().advertise<std_msgs::String>(selectedTopic, this->queueSize, true);
346  }
347 
348  this->mux = std::make_unique<PriorityMux>(this->topicConfigs, this->lockConfigs,
350  noneTopic, nonePriority);
351 
352  this->activePriorityPub = this->getPrivateNodeHandle().advertise<std_msgs::Int32>(
353  "active_priority", this->queueSize, true);
354 
355  this->resetSub = this->getPrivateNodeHandle().subscribe(
356  "reset", this->queueSize, &PriorityMuxNodelet::resetCb, this, transportHints);
357 
358  for (const auto& inTopicAndDisableTopic : disableTopics)
359  {
360  const auto& inTopic = inTopicAndDisableTopic.first;
361  const auto& disableTopic = inTopicAndDisableTopic.second;
362  const bool invert = invertedDisableTopics.find(disableTopic) != invertedDisableTopics.end();
363 
364  const auto cb = cras::bind_front(&PriorityMuxNodelet::disableCb, this, inTopic, invert);
365  const auto sub = this->getNodeHandle().subscribe<topic_tools::ShapeShifter>(
366  disableTopic, this->queueSize, cb, nullptr, transportHints);
367  this->subscribers.push_back(sub);
368  }
369 
370  for (const auto& config : this->lockConfigs)
371  {
372  const auto& lockConfig = config.second;
373  const auto cb = cras::bind_front(&PriorityMuxNodelet::lockCb, this, lockConfig.topic);
374  const auto sub = this->getNodeHandle().subscribe<std_msgs::Bool>(
375  lockConfig.topic, this->queueSize, cb, nullptr, transportHints);
376  this->subscribers.push_back(sub);
377  }
378 
379  ros::WallDuration(0.1).sleep(); // Give publishers and subscribers time to wire up
380 
381  this->publishChanges();
382 }
383 
385  const std::string& inTopic, const ros::MessageEvent<const topic_tools::ShapeShifter>& event)
386 {
387  const auto& topicConfig = this->topicConfigs[inTopic];
388 
389  CRAS_DEBUG("Received message on topic %s with priority %i. Current priority %i.",
390  inTopic.c_str(), topicConfig.priority, this->mux->getActivePriority());
391 
392  const auto shouldPublish = this->mux->cb(inTopic, event.getReceiptTime(), ros::Time::now());
393 
394  if (shouldPublish)
395  {
396  auto& pub = this->publishers[topicConfig.outTopic];
397  if (!pub)
398  {
399  const auto& outTopicConfig = this->outTopicConfigs[topicConfig.outTopic];
400 
401  // Figure out if the publisher should be latched
402  auto latch = false;
403  if (event.getConnectionHeaderPtr() != nullptr)
404  latch = event.getConnectionHeader()["latching"] == "1";
405  if (outTopicConfig.forceLatch.has_value())
406  latch = *outTopicConfig.forceLatch;
407 
408  // Create the publisher
409  pub = event.getConstMessage()->advertise(
410  this->getNodeHandle(), topicConfig.outTopic, outTopicConfig.queueSize, latch);
411  CRAS_DEBUG("Created publisher %s with type %s.", topicConfig.outTopic.c_str(),
412  event.getConstMessage()->getDataType().c_str());
413 
414  // Give the publisher some time to wire up so that we don't lose the first message.
415  if (!latch)
416  {
417  const auto waitStart = ros::WallTime::now();
418  while (ros::ok() && pub.getNumSubscribers() < outTopicConfig.numSubscribersToWait)
419  {
420  ros::WallDuration(0.01).sleep();
421  if (ros::WallTime::now() - waitStart > ros::WallDuration(1, 0))
422  {
423  CRAS_WARN_THROTTLE(1.0, "Waiting for subscribers to topic %s (has %u/%zu).",
424  topicConfig.outTopic.c_str(), pub.getNumSubscribers(),
425  outTopicConfig.numSubscribersToWait);
426  }
427  }
428  if (ros::WallTime::now() < (waitStart + outTopicConfig.subscriberConnectDelay))
429  (outTopicConfig.subscriberConnectDelay - (ros::WallTime::now() - waitStart)).sleep();
430  CRAS_DEBUG("Waited %f seconds before publishing.", (ros::WallTime::now() - waitStart).toSec());
431  }
432  }
433 
434  CRAS_DEBUG("Publishing message on topic %s.", inTopic.c_str());
435  pub.publish(event.getConstMessage());
436  }
437  else
438  {
439  CRAS_DEBUG("Discarding message on topic %s.", inTopic.c_str());
440  }
441 
442  this->publishChanges();
443 }
444 
445 void PriorityMuxNodelet::lockCb(const std::string& topic, const ros::MessageEvent<std_msgs::Bool const>& event)
446 {
447  this->mux->lockCb(topic, event.getReceiptTime(), event.getConstMessage()->data, ros::Time::now());
448 
449  this->publishChanges();
450 }
451 
453  const std::string& inTopic, const bool invert, const ros::MessageEvent<const topic_tools::ShapeShifter>& event)
454 {
455  bool disable = true;
457  {
458  disable = event.getConstMessage()->instantiate<std_msgs::Bool>()->data;
459  if (invert)
460  disable = !disable;
461  }
462 
463  // Publish the "before disabled" message if there is any and the topic gets disabled right now
464  if (disable && !this->mux->isDisabled(inTopic, event.getReceiptTime()))
465  {
466  const auto& it = this->beforeDisableMessages.find(inTopic);
467  if (it != this->beforeDisableMessages.end())
468  {
469  const auto& itEvent = it->second;
470  auto message = itEvent.getConstMessage();
471  auto nonconstWillCopy = itEvent.nonConstWillCopy();
472  // If the message type has header, update stamp to current time
473  if (this->beforeDisableMessagesWithHeader.find(inTopic) != this->beforeDisableMessagesWithHeader.end())
474  {
475  auto header = cras::getHeader(*message);
476  if (header)
477  {
478  header->stamp = ros::Time::now();
479  auto nonconstMessage = itEvent.getMessage();
480  cras::setHeader(*nonconstMessage, *header);
481  message = nonconstMessage;
482  nonconstWillCopy = false; // We already made a copy and nobody else will use it
483  }
484  }
485 
486  // Create a fake message event
488  beforeEvent.init(message, itEvent.getConnectionHeaderPtr(), event.getReceiptTime(), nonconstWillCopy,
489  itEvent.getMessageFactory());
490 
491  // Inject the constructed message as if it were received on the inTopic.
492  this->cb(inTopic, beforeEvent);
493  }
494  }
495 
496  this->mux->disableCb(inTopic, event.getReceiptTime(), disable, ros::Time::now());
497 
498  this->publishChanges();
499 }
500 
502 {
503  const auto newPriority = this->mux->getActivePriority();
504  if (!this->lastActivePriority || newPriority != *this->lastActivePriority)
505  {
506  CRAS_INFO("Priority %i is now active.", newPriority);
507  std_msgs::Int32 msg;
508  msg.data = newPriority;
509  this->activePriorityPub.publish(msg);
510  }
511  this->lastActivePriority = newPriority;
512 
513  const auto& newTopics = this->mux->getLastSelectedTopics();
514  const auto& oldTopics = this->lastSelectedTopics;
515  for (const auto& outTopic : this->outTopics)
516  {
517  const auto newIt = newTopics.find(outTopic);
518  const auto oldIt = oldTopics.find(outTopic);
519  if (newIt == newTopics.end() || oldIt == oldTopics.end() || newIt->second != oldIt->second)
520  {
521  CRAS_INFO("Source topic '%s' is now selected for output topic '%s'.", newIt->second.c_str(), outTopic.c_str());
522  std_msgs::String msg;
523  msg.data = newIt->second;
524  this->selectedPublishers[outTopic].publish(msg);
525  }
526  }
527  this->lastSelectedTopics = newTopics;
528 }
529 
530 void PriorityMuxNodelet::onTimeout(const std::string&, const ros::TimerEvent& event)
531 {
532  this->mux->update(event.current_real);
533 
534  this->publishChanges();
535 }
536 
537 void PriorityMuxNodelet::setTimer(const std::string& name, const ros::Duration& timeout)
538 {
539  if (this->timers.find(name) == this->timers.end())
540  {
541  const auto cb = cras::bind_front(&PriorityMuxNodelet::onTimeout, this, name);
542  this->timers[name] = this->getNodeHandle().createTimer(timeout, cb, true, true);
543  }
544  else
545  {
546  this->timers[name].setPeriod(timeout);
547  }
548 }
549 
551 {
552  CRAS_WARN("Resetting mux.");
553 
554  for (auto& timer : this->timers)
555  timer.second.stop();
556  this->timers.clear();
557 
558  this->lastActivePriority.reset();
559  this->lastSelectedTopics.clear();
560  this->mux->reset(ros::Time::now());
561 
562  this->publishChanges();
563 }
564 
566 {
567  this->reset();
568 }
569 
570 }
571 
XmlRpc::XmlRpcValue::size
int size() const
cras::priority_mux::LockConfig::name
::std::string name
The human-readable name of the config.
Definition: priority_mux_base.h:47
ros::MessageEvent::getReceiptTime
ros::Time getReceiptTime() const
ros::WallDuration::sleep
bool sleep() const
ros::serialization::OStream
cras::priority_mux::LockConfig
Configuration of a lock topic for the mux.
Definition: priority_mux_base.h:45
cras::priority_mux::TopicConfig::priority
int priority
The priority for the topic. Usually is a positive number.
Definition: priority_mux_base.h:37
rosbag::Bag
msg
msg
nodelet::Nodelet::getNodeHandle
ros::NodeHandle & getNodeHandle() const
rosbag::View::size
uint32_t size()
cras
xmlrpc_value_utils.hpp
cras::hasHeader
bool hasHeader(const ::topic_tools::ShapeShifter &msg)
Tell whether the given message has header field.
shape_shifter.h
Tools for more convenient working with ShapeShifter objects.
NodeletParamHelper< ::nodelet::Nodelet >::privateParams
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
ros.h
cras::stripLeading
void stripLeading(::std::string &s, const char &c=' ')
cras::PriorityMuxNodelet::publishChanges
void publishChanges()
Check what changed from the last publishChanges() call and if there are changes, publish them (active...
Definition: priority_mux.cpp:501
cras::ParamHelper::getLogger
::cras::LogHelperPtr getLogger() const
cras::priority_mux::LockConfig::topic
::std::string topic
The topic associated with the lock.
Definition: priority_mux_base.h:48
ros::message_traits::DataType
ros::TransportHints
cras::PriorityMuxNodelet::onInit
void onInit() override
Definition: priority_mux.cpp:37
cras::priority_mux::TopicConfig::inTopic
::std::string inTopic
The input topic.
Definition: priority_mux_base.h:35
cras::PriorityMuxNodelet::activePriorityPub
::ros::Publisher activePriorityPub
Publisher for active priority.
Definition: priority_mux.h:191
cras::PriorityMuxNodelet::disableCb
virtual void disableCb(const ::std::string &inTopic, bool invert, const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a disable message is received.
Definition: priority_mux.cpp:452
CRAS_WARN
#define CRAS_WARN(...)
bag.h
ros::MessageEvent::getConstMessage
const boost::shared_ptr< ConstMessage > & getConstMessage() const
priority_mux.h
Priority-based muxer nodelet for topics.
cras::BoundParamHelper
CRAS_ERROR
#define CRAS_ERROR(...)
nodelet::Nodelet::getPrivateNodeHandle
ros::NodeHandle & getPrivateNodeHandle() const
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ros::NodeHandle::advertise
Publisher advertise(AdvertiseOptions &ops)
cras::PriorityMuxNodelet::queueSize
size_t queueSize
Queue size of all publishers and subscribers.
Definition: priority_mux.h:227
ros::ok
ROSCPP_DECL bool ok()
ros::TransportHints::tcpNoDelay
TransportHints & tcpNoDelay(bool nodelay=true)
functional.hpp
cras::PriorityMuxNodelet::cb
virtual void cb(const ::std::string &inTopic, const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a message is received on the input topic.
Definition: priority_mux.cpp:384
XmlRpc::XmlRpcValue::TypeStruct
TypeStruct
cras::PriorityMuxNodelet::lastActivePriority
::cras::optional< int > lastActivePriority
The active priority after the last publishChanges() call.
Definition: priority_mux.h:215
CRAS_INFO
#define CRAS_INFO(...)
cras::PriorityMuxNodelet
Priority-based muxer nodelet for topics.
Definition: priority_mux.h:117
rosbag::BagException
cras::PriorityMuxNodelet::lastSelectedTopics
::std::unordered_map<::std::string, ::std::string > lastSelectedTopics
The selected output topics after the last publishChanges() call.
Definition: priority_mux.h:218
cras::PriorityMuxNodelet::lockCb
virtual void lockCb(const ::std::string &topic, const ::ros::MessageEvent<::std_msgs::Bool const > &event)
Callback method triggered when a lock message is received.
Definition: priority_mux.cpp:445
ros::NodeHandle::resolveName
std::string resolveName(const std::string &name, bool remap=true) const
cras::priority_mux::LockConfig::priority
int priority
The priority for the lock. Usually is a positive number.
Definition: priority_mux_base.h:49
ros::DefaultMessageCreator
PLUGINLIB_EXPORT_CLASS
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
cras::PriorityMuxNodelet::beforeDisableMessagesWithHeader
::std::unordered_set<::std::string > beforeDisableMessagesWithHeader
Cached list of "before disable" messages that have a Header.
Definition: priority_mux.h:224
cras::DEFAULT_OUT_TOPIC
constexpr auto DEFAULT_OUT_TOPIC
Definition: priority_mux.cpp:34
priority_mux_base.h
Priority-based muxer for topics.
frameId
std::string const * frameId(const M &m)
ros::WallTime::now
static WallTime now()
NodeletWithDiagnostics< ::nodelet::Nodelet >::data
::std::unique_ptr<::cras::impl::NodeletWithDiagnosticsPrivate > data
cras::priority_mux::TopicConfig::name
::std::string name
The human-readable name of the config.
Definition: priority_mux_base.h:34
cras::PriorityMuxNodelet::resetSub
::ros::Subscriber resetSub
Subscriber for reset topic.
Definition: priority_mux.h:197
cras::PriorityMuxNodelet::lockConfigs
::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > lockConfigs
Configurations of lock topics.
Definition: priority_mux.h:206
cras::PriorityMuxNodelet::topicConfigs
::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > topicConfigs
Configurations of input topics.
Definition: priority_mux.h:203
cras::priority_mux::TopicConfig
Configuration of an input to the mux.
Definition: priority_mux_base.h:32
cras::PriorityMuxNodelet::mux
::std::unique_ptr<::cras::PriorityMux > mux
Mux instance.
Definition: priority_mux.h:182
cras::priority_mux::TopicConfig::timeout
::ros::Duration timeout
How long do messages on this topic keep the priority active.
Definition: priority_mux_base.h:38
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
ros::TimerEvent
ros::TimerEvent::current_real
Time current_real
cras::PriorityMuxNodelet::outTopicConfigs
std::unordered_map< std::string, ::cras::priority_mux::OutputTopicConfig > outTopicConfigs
Configurations of output topics.
Definition: priority_mux.h:209
XmlRpc::XmlRpcValue::getType
const Type & getType() const
XmlRpc::XmlRpcValue::TypeArray
TypeArray
view.h
cras::GetParamException
cras::PriorityMuxNodelet::resetCb
virtual void resetCb(const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a reset message is received.
Definition: priority_mux.cpp:565
rosbag::View
cras::bind_front
auto bind_front(F &&f, Args &&... args)
cras::PriorityMuxNodelet::publishers
::std::unordered_map<::std::string, ::ros::Publisher > publishers
Map of output topic and the associated publisher.
Definition: priority_mux.h:188
cras::PriorityMuxNodelet::beforeDisableMessages
::std::unordered_map<::std::string, ::ros::MessageEvent<::cras::ShapeShifter > > beforeDisableMessages
The messages to be injected into the mux right before a topic is disabled.
Definition: priority_mux.h:221
nodelet::Nodelet
CRAS_WARN_THROTTLE
#define CRAS_WARN_THROTTLE(period,...)
ros::Time
NodeletParamHelper< ::nodelet::Nodelet >::params
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
cras::PriorityMuxNodelet::tcpNoDelay
bool tcpNoDelay
Whether tcpNoDelay() flag should be set for all subscribers.
Definition: priority_mux.h:230
cras::PriorityMuxNodelet::selectedPublishers
::std::unordered_map<::std::string, ::ros::Publisher > selectedPublishers
Map of publishers of the topics announcing currently selected publishers.
Definition: priority_mux.h:194
cras::getBuffer
uint8_t * getBuffer(::topic_tools::ShapeShifter &msg)
Get the internal buffer with serialized message data.
CRAS_DEBUG
#define CRAS_DEBUG(...)
class_list_macros.hpp
cras::priority_mux::TopicConfig::queueSize
size_t queueSize
Queue size for the subscriber.
Definition: priority_mux_base.h:39
ros::MessageEvent::getConnectionHeaderPtr
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
cras::HasLogger::log
::cras::LogHelperPtr log
cras::PriorityMuxNodelet::reset
void reset()
Reset the mux to its initial state.
Definition: priority_mux.cpp:550
cras::PriorityMuxNodelet::outTopics
::std::unordered_set<::std::string > outTopics
All output topic names.
Definition: priority_mux.h:212
cras::BoundParamHelper::hasParam
bool hasParam(const ::std::string &name, const bool searchNested=true) const
cras::PriorityMuxNodelet::timers
::std::unordered_map<::std::string, ::ros::Timer > timers
Map of timer names and the timers.
Definition: priority_mux.h:200
XmlRpc::XmlRpcValue::begin
iterator begin()
ros::WallDuration
cras::PriorityMuxNodelet::onTimeout
void onTimeout(const ::std::string &name, const ::ros::TimerEvent &)
Method triggered on timer timeout. This should update the mux and check for changes.
Definition: priority_mux.cpp:530
topic_tools::ShapeShifter
header
const std::string header
ros::NodeHandle::createTimer
Timer createTimer(Duration period, const TimerCallback &callback, bool oneshot=false, bool autostart=true) const
rosbag::View::begin
iterator begin()
cras::DEFAULT_NONE_TOPIC
constexpr auto DEFAULT_NONE_TOPIC
Definition: priority_mux.cpp:35
ros::Duration
cras::priority_mux::LockConfig::timeout
::ros::Duration timeout
Definition: priority_mux_base.h:50
ros::MessageEvent::init
void init(const ConstMessagePtr &message, const boost::shared_ptr< M_string > &connection_header, ros::Time receipt_time, bool nonconst_need_copy, const CreateFunction &create)
cras::getHeader
::cras::optional<::std_msgs::Header > getHeader(const ::topic_tools::ShapeShifter &msg)
Get the header field of the given message, if it has any.
cras::priority_mux::TopicConfig::outTopic
::std::string outTopic
The output topic.
Definition: priority_mux_base.h:36
cras::PriorityMuxNodelet::subscribers
::std::list<::ros::Subscriber > subscribers
List of subscribers. Just for keeping them alive.
Definition: priority_mux.h:185
XmlRpc::XmlRpcValue
cras::PriorityMuxNodelet::setTimer
void setTimer(const ::std::string &name, const ::ros::Duration &timeout)
Set a timer that will call onTimeout() after the specified time.
Definition: priority_mux.cpp:537
ros::MessageEvent
cras::BoundParamHelper::getParam
inline ::std::string getParam(const ::std::string &name, const ::cras::optional< const char * > &defaultValue, const ::std::string &unit="", const ::cras::GetParamOptions<::std::string > &options={}) const
cras::to_string
inline ::std::string to_string(char *value)
cras::resizeBuffer
void resizeBuffer(::topic_tools::ShapeShifter &msg, size_t newLength)
Resize the internal buffer of the message.
ros::Time::now
static Time now()
cras::setHeader
bool setHeader(::topic_tools::ShapeShifter &msg, ::std_msgs::Header &header)
Change the header field of the given message, if it has any.


cras_topic_tools
Author(s): Martin Pecka
autogenerated on Sun Jan 5 2025 03:50:49