priority_mux.cpp
Go to the documentation of this file.
1 // SPDX-License-Identifier: BSD-3-Clause
2 // SPDX-FileCopyrightText: Czech Technical University in Prague
3 
10 #include <list>
11 #include <memory>
12 #include <string>
13 #include <unordered_map>
14 #include <utility>
15 
17 #include <ros/ros.h>
18 #include <rosbag/bag.h>
19 #include <rosbag/view.h>
20 #include <std_msgs/Bool.h>
21 #include <std_msgs/Int32.h>
22 #include <std_msgs/String.h>
23 
29 
30 namespace cras
31 {
32 
33 constexpr auto DEFAULT_OUT_TOPIC = "mux_out";
34 constexpr auto DEFAULT_NONE_TOPIC = "__none";
35 
37 {
38  const auto params = this->privateParams();
39 
40  this->queueSize = params->getParam("queue_size", 10_sz, "messages");
41  this->tcpNoDelay = params->getParam("tcp_no_delay", false);
42 
43  const auto defaultOutTopic = params->getParam("default_out_topic", std::string(DEFAULT_OUT_TOPIC));
44  const auto noneTopic = params->getParam("none_topic", std::string(DEFAULT_NONE_TOPIC));
45  const auto nonePriority = params->getParam("none_priority", 0);
46  const auto defaultSubscriberConnectDelay = params->getParam("subscriber_connect_delay", ros::WallDuration(0.1));
47 
48  XmlRpc::XmlRpcValue topicParams;
49  try
50  {
51  topicParams = params->getParam<XmlRpc::XmlRpcValue>("topics", cras::nullopt);
52  }
53  catch (const cras::GetParamException& e)
54  {
55  CRAS_ERROR("Parameter ~topics is empty, priority mux will not do anything!");
56  return;
57  }
58 
59  std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> topicItems;
60  switch (topicParams.getType())
61  {
63  for (size_t i = 0; i < topicParams.size(); ++i)
64  topicItems.emplace_back("[" + cras::to_string(i) + "]", topicParams[i]);
65  break;
67  for (const auto& item : topicParams)
68  topicItems.emplace_back("/" + item.first, item.second);
69  break;
70  default:
71  CRAS_ERROR("Parameter ~topics has to be either a list or a dict. Priority mux will not do anything!");
72  return;
73  }
74 
75  std::unordered_map<std::string, std::string> disableTopics;
76 
77  for (const auto& item : topicItems)
78  {
79  const auto& key = item.first;
80  const auto& xmlConfig = item.second;
81  if (xmlConfig.getType() != XmlRpc::XmlRpcValue::TypeStruct)
82  {
83  CRAS_ERROR("Item %s of ~topics has to be a dict, but %s was given.",
84  key.c_str(), cras::to_string(xmlConfig.getType()).c_str());
85  continue;
86  }
87 
88  auto itemNamespace = this->getPrivateNodeHandle().resolveName("topics") + key;
89  auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
90  cras::BoundParamHelper xmlParams(this->log, xmlParamAdapter);
92 
93  try
94  {
95  config.inTopic = xmlParams.getParam<std::string>("topic", cras::nullopt);
96  }
97  catch (const cras::GetParamException& e)
98  {
99  CRAS_ERROR("Item %s of ~topics has to contain key 'topic'. Skipping the item.", key.c_str());
100  continue;
101  }
102 
103  config.name = xmlParams.getParam("name", config.inTopic);
104  config.outTopic = xmlParams.getParam("out_topic", defaultOutTopic);
105  config.queueSize = xmlParams.getParam("queue_size", this->queueSize);
106  try
107  {
108  config.priority = xmlParams.getParam<int>("priority", cras::nullopt);
109  }
110  catch (const cras::GetParamException& e)
111  {
112  CRAS_ERROR("Item %s of ~topics has to contain key 'priority'. Skipping the item.", key.c_str());
113  continue;
114  }
115  try
116  {
117  config.timeout = xmlParams.getParam<ros::Duration>("timeout", cras::nullopt, "s");
118  }
119  catch (const cras::GetParamException& e)
120  {
121  CRAS_ERROR("Item %s of ~topics has to contain key 'timeout'. Skipping the item.", key.c_str());
122  continue;
123  }
124 
125  const auto disableTopic = xmlParams.getParam("disable_topic", std::string());
126  if (!disableTopic.empty())
127  {
128  disableTopics[config.inTopic] = disableTopic;
129  // If there should be a message sent just before disabling this topic, read the message from a bag
130  const auto beforeDisableMessageBag = xmlParams.getParam("before_disable_message", std::string());
131  if (!beforeDisableMessageBag.empty())
132  {
133  try
134  {
135  rosbag::Bag bag(beforeDisableMessageBag);
136  rosbag::View view(bag);
137  if (view.size() == 0)
138  {
139  CRAS_ERROR("Bag file %s does not contain any messages.", beforeDisableMessageBag.c_str());
140  }
141  else
142  {
143  // Read the first message from the bag and copy it to a ShapeShifter
144  const auto msg = *view.begin();
145  auto shifter = boost::make_shared<cras::ShapeShifter>();
146  cras::resizeBuffer(*shifter, msg.size());
147  ros::serialization::OStream stream(cras::getBuffer(*shifter), msg.size());
148  msg.write(stream);
149  shifter->morph(msg.getMD5Sum(), msg.getDataType(), msg.getMessageDefinition(), msg.isLatching() ? "1" : "");
150  // If the message has Header, figure out if we need to overwrite frame_id
151  if (cras::hasHeader(*shifter))
152  {
153  // This set is read by disableCb().
154  this->beforeDisableMessagesWithHeader.insert(config.inTopic);
155  const auto frameId = xmlParams.getParam("before_disable_message_frame_id", std::string());
156  if (!frameId.empty())
157  {
158  auto header = cras::getHeader(*shifter);
159  if (header)
160  {
161  header->frame_id = frameId;
162  cras::setHeader(*shifter, *header);
163  }
164  }
165  }
166  // Create a fake message event. Time does not matter as the code in disableCb() ignores it.
168  event.init(shifter, msg.getConnectionHeader(), ros::Time(0), true,
170  this->beforeDisableMessages[config.inTopic] = event;
171  }
172  }
173  catch (const rosbag::BagException& e)
174  {
175  CRAS_ERROR("%s", e.what());
176  }
177  }
178  }
179 
180  this->topicConfigs[config.inTopic] = config;
181  }
182 
183  XmlRpc::XmlRpcValue lockParams;
184  lockParams.begin(); // convert to struct
185 
186  lockParams = params->getParam("locks", lockParams);
187  std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> lockItems;
188  switch (lockParams.getType())
189  {
191  for (size_t i = 0; i < lockParams.size(); ++i)
192  lockItems.emplace_back("[" + cras::to_string(i) + "]", lockParams[i]);
193  break;
195  for (const auto& item : lockParams)
196  lockItems.emplace_back("/" + item.first, item.second);
197  break;
198  default:
199  CRAS_ERROR("Parameter ~locks has to be either a list or a dict. No locks will be set.");
200  return;
201  }
202 
203  if (lockItems.empty())
204  CRAS_INFO("No locks were specified.");
205 
206  for (const auto& item : lockItems)
207  {
208  const auto& key = item.first;
209  const auto& xmlConfig = item.second;
210  if (xmlConfig.getType() != XmlRpc::XmlRpcValue::TypeStruct)
211  {
212  CRAS_ERROR("Item %s of ~locks has to be a dict, but %s was given.",
213  key.c_str(), cras::to_string(xmlConfig.getType()).c_str());
214  continue;
215  }
216 
217  auto itemNamespace = this->getPrivateNodeHandle().resolveName("locks") + key;
218  auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
219  cras::BoundParamHelper xmlParams(this->log, xmlParamAdapter);
221 
222  try
223  {
224  config.topic = xmlParams.getParam<std::string>("topic", cras::nullopt);
225  }
226  catch (const cras::GetParamException& e)
227  {
228  CRAS_ERROR("Item %s of ~locks has to contain key 'topic'. Skipping the item.", key.c_str());
229  continue;
230  }
231 
232  config.name = xmlParams.getParam("name", config.topic);
233  try
234  {
235  config.priority = xmlParams.getParam<int>("priority", cras::nullopt);
236  }
237  catch (const cras::GetParamException& e)
238  {
239  CRAS_ERROR("Item %s of ~locks has to contain key 'priority'. Skipping the item.", key.c_str());
240  continue;
241  }
242  try
243  {
244  config.timeout = xmlParams.getParam<ros::Duration>("timeout", cras::nullopt, "s");
245  }
246  catch (const cras::GetParamException& e)
247  {
248  CRAS_ERROR("Item %s of ~locks has to contain key 'timeout'. Skipping the item.", key.c_str());
249  continue;
250  }
251 
252  this->lockConfigs[config.topic] = config;
253  }
254 
255  ros::TransportHints transportHints;
256  transportHints.tcpNoDelay(this->tcpNoDelay);
257 
258  for (const auto& config : this->topicConfigs)
259  {
260  const auto& topicConfig = config.second;
261  this->outTopics.insert(topicConfig.outTopic);
262  this->outTopicConfigs[topicConfig.outTopic].queueSize = this->queueSize;
263  this->outTopicConfigs[topicConfig.outTopic].subscriberConnectDelay = defaultSubscriberConnectDelay;
264 
265  const auto cb = cras::bind_front(&PriorityMuxNodelet::cb, this, topicConfig.inTopic);
266  const auto sub = this->getNodeHandle().subscribe<topic_tools::ShapeShifter>(
267  topicConfig.inTopic, topicConfig.queueSize, cb, nullptr, transportHints);
268  this->subscribers.push_back(sub);
269  }
270 
271  std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> outTopicItems;
272  if (params->hasParam("out_topics"))
273  {
274  const auto outTopicParams = params->getParam<XmlRpc::XmlRpcValue>("out_topics", cras::nullopt);
275  switch (outTopicParams.getType())
276  {
278  for (size_t i = 0; i < outTopicParams.size(); ++i)
279  outTopicItems.emplace_back("[" + cras::to_string(i) + "]", outTopicParams[i]);
280  break;
282  for (const auto& item : outTopicParams)
283  outTopicItems.emplace_back("/" + item.first, item.second);
284  break;
285  default:
286  CRAS_ERROR("Parameter ~out_topics has to be either a list or a dict. It will be ignored!");
287  }
288  }
289 
290  for (const auto& item : outTopicItems)
291  {
292  const auto& key = item.first;
293  const auto& xmlConfig = item.second;
294  if (xmlConfig.getType() != XmlRpc::XmlRpcValue::TypeStruct)
295  {
296  CRAS_ERROR("Item %s of ~out_topics has to be a dict, but %s was given.",
297  key.c_str(), cras::to_string(xmlConfig.getType()).c_str());
298  continue;
299  }
300 
301  auto itemNamespace = this->getPrivateNodeHandle().resolveName("out_topics") + key;
302  auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
303  cras::BoundParamHelper xmlParams(this->log, xmlParamAdapter);
304 
305  std::string topic;
306  try
307  {
308  topic = xmlParams.getParam<std::string>("topic", cras::nullopt);
309  if (topic.empty())
310  {
311  CRAS_ERROR("Item %s of ~out_topics has to contain key 'topic'. Skipping the item.", key.c_str());
312  continue;
313  }
314  else
315  {
316  this->outTopicConfigs[topic].topic = topic;
317  }
318  }
319  catch (const cras::GetParamException& e)
320  {
321  CRAS_ERROR("Item %s of ~out_topics has to contain key 'topic'. Skipping the item.", key.c_str());
322  continue;
323  }
324 
325  auto& config = this->outTopicConfigs[topic];
326 
327  config.queueSize = xmlParams.getParam("queue_size", config.queueSize);
328  config.subscriberConnectDelay = xmlParams.getParam("subscriber_connect_delay", config.subscriberConnectDelay);
329  config.numSubscribersToWait = xmlParams.getParam("num_subscribers_to_wait", config.numSubscribersToWait);
330  if (xmlParams.hasParam("force_latch"))
331  config.forceLatch = xmlParams.getParam<bool>("force_latch", cras::nullopt);
332  }
333 
334  for (const auto& outTopic : this->outTopics)
335  {
336  const auto selectedTopic = "selected/" + cras::stripLeading(outTopic, '/');
337  this->selectedPublishers[outTopic] =
338  this->getPrivateNodeHandle().advertise<std_msgs::String>(selectedTopic, this->queueSize, true);
339  }
340 
341  this->mux = std::make_unique<PriorityMux>(this->topicConfigs, this->lockConfigs,
343  noneTopic, nonePriority);
344 
345  this->activePriorityPub = this->getPrivateNodeHandle().advertise<std_msgs::Int32>(
346  "active_priority", this->queueSize, true);
347 
348  this->resetSub = this->getPrivateNodeHandle().subscribe(
349  "reset", this->queueSize, &PriorityMuxNodelet::resetCb, this, transportHints);
350 
351  for (const auto& inTopicAndDisableTopic : disableTopics)
352  {
353  const auto& inTopic = inTopicAndDisableTopic.first;
354  const auto& disableTopic = inTopicAndDisableTopic.second;
355 
356  const auto cb = cras::bind_front(&PriorityMuxNodelet::disableCb, this, inTopic);
357  const auto sub = this->getNodeHandle().subscribe<topic_tools::ShapeShifter>(
358  disableTopic, this->queueSize, cb, nullptr, transportHints);
359  this->subscribers.push_back(sub);
360  }
361 
362  for (const auto& config : this->lockConfigs)
363  {
364  const auto& lockConfig = config.second;
365  const auto cb = cras::bind_front(&PriorityMuxNodelet::lockCb, this, lockConfig.topic);
366  const auto sub = this->getNodeHandle().subscribe<std_msgs::Bool>(
367  lockConfig.topic, this->queueSize, cb, nullptr, transportHints);
368  this->subscribers.push_back(sub);
369  }
370 
371  ros::WallDuration(0.1).sleep(); // Give publishers and subscribers time to wire up
372 
373  this->publishChanges();
374 }
375 
377  const std::string& inTopic, const ros::MessageEvent<const topic_tools::ShapeShifter>& event)
378 {
379  const auto& topicConfig = this->topicConfigs[inTopic];
380 
381  CRAS_DEBUG("Received message on topic %s with priority %i. Current priority %i.",
382  inTopic.c_str(), topicConfig.priority, this->mux->getActivePriority());
383 
384  const auto shouldPublish = this->mux->cb(inTopic, event.getReceiptTime(), ros::Time::now());
385 
386  if (shouldPublish)
387  {
388  auto& pub = this->publishers[topicConfig.outTopic];
389  if (!pub)
390  {
391  const auto& outTopicConfig = this->outTopicConfigs[topicConfig.outTopic];
392 
393  // Figure out if the publisher should be latched
394  auto latch = false;
395  if (event.getConnectionHeaderPtr() != nullptr)
396  latch = event.getConnectionHeader()["latching"] == "1";
397  if (outTopicConfig.forceLatch.has_value())
398  latch = *outTopicConfig.forceLatch;
399 
400  // Create the publisher
401  pub = event.getConstMessage()->advertise(
402  this->getNodeHandle(), topicConfig.outTopic, outTopicConfig.queueSize, latch);
403  CRAS_DEBUG("Created publisher %s with type %s.", topicConfig.outTopic.c_str(),
404  event.getConstMessage()->getDataType().c_str());
405 
406  // Give the publisher some time to wire up so that we don't lose the first message.
407  if (!latch)
408  {
409  const auto waitStart = ros::WallTime::now();
410  while (ros::ok() && pub.getNumSubscribers() < outTopicConfig.numSubscribersToWait)
411  {
412  ros::WallDuration(0.01).sleep();
413  if (ros::WallTime::now() - waitStart > ros::WallDuration(1, 0))
414  {
415  CRAS_WARN_THROTTLE(1.0, "Waiting for subscribers to topic %s (has %u/%zu).",
416  topicConfig.outTopic.c_str(), pub.getNumSubscribers(),
417  outTopicConfig.numSubscribersToWait);
418  }
419  }
420  if (ros::WallTime::now() < (waitStart + outTopicConfig.subscriberConnectDelay))
421  (outTopicConfig.subscriberConnectDelay - (ros::WallTime::now() - waitStart)).sleep();
422  CRAS_DEBUG("Waited %f seconds before publishing.", (ros::WallTime::now() - waitStart).toSec());
423  }
424  }
425 
426  CRAS_DEBUG("Publishing message on topic %s.", inTopic.c_str());
427  pub.publish(event.getConstMessage());
428  }
429  else
430  {
431  CRAS_DEBUG("Discarding message on topic %s.", inTopic.c_str());
432  }
433 
434  this->publishChanges();
435 }
436 
437 void PriorityMuxNodelet::lockCb(const std::string& topic, const ros::MessageEvent<std_msgs::Bool const>& event)
438 {
439  this->mux->lockCb(topic, event.getReceiptTime(), event.getConstMessage()->data, ros::Time::now());
440 
441  this->publishChanges();
442 }
443 
445  const std::string& inTopic, const ros::MessageEvent<const topic_tools::ShapeShifter>& event)
446 {
447  bool disable = true;
449  disable = event.getConstMessage()->instantiate<std_msgs::Bool>()->data;
450 
451  // Publish the "before disabled" message if there is any and the topic gets disabled right now
452  if (disable && !this->mux->isDisabled(inTopic, event.getReceiptTime()))
453  {
454  const auto& it = this->beforeDisableMessages.find(inTopic);
455  if (it != this->beforeDisableMessages.end())
456  {
457  const auto& itEvent = it->second;
458  auto message = itEvent.getConstMessage();
459  auto nonconstWillCopy = itEvent.nonConstWillCopy();
460  // If the message type has header, update stamp to current time
461  if (this->beforeDisableMessagesWithHeader.find(inTopic) != this->beforeDisableMessagesWithHeader.end())
462  {
463  auto header = cras::getHeader(*message);
464  if (header)
465  {
466  header->stamp = ros::Time::now();
467  auto nonconstMessage = itEvent.getMessage();
468  cras::setHeader(*nonconstMessage, *header);
469  message = nonconstMessage;
470  nonconstWillCopy = false; // We already made a copy and nobody else will use it
471  }
472  }
473 
474  // Create a fake message event
476  beforeEvent.init(message, itEvent.getConnectionHeaderPtr(), event.getReceiptTime(), nonconstWillCopy,
477  itEvent.getMessageFactory());
478 
479  // Inject the constructed message as if it were received on the inTopic.
480  this->cb(inTopic, beforeEvent);
481  }
482  }
483 
484  this->mux->disableCb(inTopic, event.getReceiptTime(), disable, ros::Time::now());
485 
486  this->publishChanges();
487 }
488 
490 {
491  const auto newPriority = this->mux->getActivePriority();
492  if (!this->lastActivePriority || newPriority != *this->lastActivePriority)
493  {
494  CRAS_INFO("Priority %i is now active.", newPriority);
495  std_msgs::Int32 msg;
496  msg.data = newPriority;
497  this->activePriorityPub.publish(msg);
498  }
499  this->lastActivePriority = newPriority;
500 
501  const auto& newTopics = this->mux->getLastSelectedTopics();
502  const auto& oldTopics = this->lastSelectedTopics;
503  for (const auto& outTopic : this->outTopics)
504  {
505  const auto newIt = newTopics.find(outTopic);
506  const auto oldIt = oldTopics.find(outTopic);
507  if (newIt == newTopics.end() || oldIt == oldTopics.end() || newIt->second != oldIt->second)
508  {
509  CRAS_INFO("Source topic '%s' is now selected for output topic '%s'.", newIt->second.c_str(), outTopic.c_str());
510  std_msgs::String msg;
511  msg.data = newIt->second;
512  this->selectedPublishers[outTopic].publish(msg);
513  }
514  }
515  this->lastSelectedTopics = newTopics;
516 }
517 
518 void PriorityMuxNodelet::onTimeout(const std::string&, const ros::TimerEvent& event)
519 {
520  this->mux->update(event.current_real);
521 
522  this->publishChanges();
523 }
524 
525 void PriorityMuxNodelet::setTimer(const std::string& name, const ros::Duration& timeout)
526 {
527  if (this->timers.find(name) == this->timers.end())
528  {
529  const auto cb = cras::bind_front(&PriorityMuxNodelet::onTimeout, this, name);
530  this->timers[name] = this->getNodeHandle().createTimer(timeout, cb, true, true);
531  }
532  else
533  {
534  this->timers[name].setPeriod(timeout);
535  }
536 }
537 
539 {
540  CRAS_WARN("Resetting mux.");
541 
542  for (auto& timer : this->timers)
543  timer.second.stop();
544  this->timers.clear();
545 
546  this->lastActivePriority.reset();
547  this->lastSelectedTopics.clear();
548  this->mux->reset(ros::Time::now());
549 
550  this->publishChanges();
551 }
552 
554 {
555  this->reset();
556 }
557 
558 }
559 
XmlRpc::XmlRpcValue::size
int size() const
cras::priority_mux::LockConfig::name
::std::string name
The human-readable name of the config.
Definition: priority_mux_base.h:47
ros::MessageEvent::getReceiptTime
ros::Time getReceiptTime() const
ros::WallDuration::sleep
bool sleep() const
ros::serialization::OStream
cras::priority_mux::LockConfig
Configuration of a lock topic for the mux.
Definition: priority_mux_base.h:45
cras::priority_mux::TopicConfig::priority
int priority
The priority for the topic. Usually is a positive number.
Definition: priority_mux_base.h:37
rosbag::Bag
msg
msg
nodelet::Nodelet::getNodeHandle
ros::NodeHandle & getNodeHandle() const
rosbag::View::size
uint32_t size()
cras
xmlrpc_value_utils.hpp
cras::hasHeader
bool hasHeader(const ::topic_tools::ShapeShifter &msg)
Tell whether the given message has header field.
shape_shifter.h
Tools for more convenient working with ShapeShifter objects.
NodeletParamHelper< ::nodelet::Nodelet >::privateParams
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
ros.h
cras::stripLeading
void stripLeading(::std::string &s, const char &c=' ')
cras::PriorityMuxNodelet::publishChanges
void publishChanges()
Check what changed from the last publishChanges() call and if there are changes, publish them (active...
Definition: priority_mux.cpp:489
cras::ParamHelper::getLogger
::cras::LogHelperPtr getLogger() const
cras::priority_mux::LockConfig::topic
::std::string topic
The topic associated with the lock.
Definition: priority_mux_base.h:48
ros::message_traits::DataType
ros::TransportHints
cras::PriorityMuxNodelet::onInit
void onInit() override
Definition: priority_mux.cpp:36
cras::priority_mux::TopicConfig::inTopic
::std::string inTopic
The input topic.
Definition: priority_mux_base.h:35
cras::PriorityMuxNodelet::activePriorityPub
::ros::Publisher activePriorityPub
Publisher for active priority.
Definition: priority_mux.h:183
CRAS_WARN
#define CRAS_WARN(...)
bag.h
ros::MessageEvent::getConstMessage
const boost::shared_ptr< ConstMessage > & getConstMessage() const
priority_mux.h
Priority-based muxer nodelet for topics.
cras::BoundParamHelper
CRAS_ERROR
#define CRAS_ERROR(...)
nodelet::Nodelet::getPrivateNodeHandle
ros::NodeHandle & getPrivateNodeHandle() const
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ros::NodeHandle::advertise
Publisher advertise(AdvertiseOptions &ops)
cras::PriorityMuxNodelet::queueSize
size_t queueSize
Queue size of all publishers and subscribers.
Definition: priority_mux.h:219
ros::ok
ROSCPP_DECL bool ok()
ros::TransportHints::tcpNoDelay
TransportHints & tcpNoDelay(bool nodelay=true)
functional.hpp
cras::PriorityMuxNodelet::cb
virtual void cb(const ::std::string &inTopic, const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a message is received on the input topic.
Definition: priority_mux.cpp:376
XmlRpc::XmlRpcValue::TypeStruct
TypeStruct
cras::PriorityMuxNodelet::lastActivePriority
::cras::optional< int > lastActivePriority
The active priority after the last publishChanges() call.
Definition: priority_mux.h:207
CRAS_INFO
#define CRAS_INFO(...)
cras::PriorityMuxNodelet
Priority-based muxer nodelet for topics.
Definition: priority_mux.h:113
rosbag::BagException
cras::PriorityMuxNodelet::lastSelectedTopics
::std::unordered_map<::std::string, ::std::string > lastSelectedTopics
The selected output topics after the last publishChanges() call.
Definition: priority_mux.h:210
cras::PriorityMuxNodelet::lockCb
virtual void lockCb(const ::std::string &topic, const ::ros::MessageEvent<::std_msgs::Bool const > &event)
Callback method triggered when a lock message is received.
Definition: priority_mux.cpp:437
ros::NodeHandle::resolveName
std::string resolveName(const std::string &name, bool remap=true) const
cras::priority_mux::LockConfig::priority
int priority
The priority for the lock. Usually is a positive number.
Definition: priority_mux_base.h:49
ros::DefaultMessageCreator
PLUGINLIB_EXPORT_CLASS
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
cras::PriorityMuxNodelet::beforeDisableMessagesWithHeader
::std::unordered_set<::std::string > beforeDisableMessagesWithHeader
Cached list of "before disable" messages that have a Header.
Definition: priority_mux.h:216
cras::DEFAULT_OUT_TOPIC
constexpr auto DEFAULT_OUT_TOPIC
Definition: priority_mux.cpp:33
priority_mux_base.h
Priority-based muxer for topics.
frameId
std::string const * frameId(const M &m)
ros::WallTime::now
static WallTime now()
NodeletWithDiagnostics< ::nodelet::Nodelet >::data
::std::unique_ptr<::cras::impl::NodeletWithDiagnosticsPrivate > data
cras::priority_mux::TopicConfig::name
::std::string name
The human-readable name of the config.
Definition: priority_mux_base.h:34
cras::PriorityMuxNodelet::resetSub
::ros::Subscriber resetSub
Subscriber for reset topic.
Definition: priority_mux.h:189
cras::PriorityMuxNodelet::lockConfigs
::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > lockConfigs
Configurations of lock topics.
Definition: priority_mux.h:198
cras::PriorityMuxNodelet::topicConfigs
::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > topicConfigs
Configurations of input topics.
Definition: priority_mux.h:195
cras::priority_mux::TopicConfig
Configuration of an input to the mux.
Definition: priority_mux_base.h:32
cras::PriorityMuxNodelet::mux
::std::unique_ptr<::cras::PriorityMux > mux
Mux instance.
Definition: priority_mux.h:174
cras::priority_mux::TopicConfig::timeout
::ros::Duration timeout
How long do messages on this topic keep the priority active.
Definition: priority_mux_base.h:38
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
ros::TimerEvent
ros::TimerEvent::current_real
Time current_real
cras::PriorityMuxNodelet::outTopicConfigs
std::unordered_map< std::string, ::cras::priority_mux::OutputTopicConfig > outTopicConfigs
Configurations of output topics.
Definition: priority_mux.h:201
XmlRpc::XmlRpcValue::getType
const Type & getType() const
XmlRpc::XmlRpcValue::TypeArray
TypeArray
view.h
cras::GetParamException
cras::PriorityMuxNodelet::resetCb
virtual void resetCb(const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a reset message is received.
Definition: priority_mux.cpp:553
rosbag::View
cras::bind_front
auto bind_front(F &&f, Args &&... args)
cras::PriorityMuxNodelet::publishers
::std::unordered_map<::std::string, ::ros::Publisher > publishers
Map of output topic and the associated publisher.
Definition: priority_mux.h:180
cras::PriorityMuxNodelet::beforeDisableMessages
::std::unordered_map<::std::string, ::ros::MessageEvent<::cras::ShapeShifter > > beforeDisableMessages
The messages to be injected into the mux right before a topic is disabled.
Definition: priority_mux.h:213
nodelet::Nodelet
CRAS_WARN_THROTTLE
#define CRAS_WARN_THROTTLE(period,...)
ros::Time
NodeletParamHelper< ::nodelet::Nodelet >::params
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
cras::PriorityMuxNodelet::tcpNoDelay
bool tcpNoDelay
Whether tcpNoDelay() flag should be set for all subscribers.
Definition: priority_mux.h:222
cras::PriorityMuxNodelet::selectedPublishers
::std::unordered_map<::std::string, ::ros::Publisher > selectedPublishers
Map of publishers of the topics announcing currently selected publishers.
Definition: priority_mux.h:186
cras::getBuffer
uint8_t * getBuffer(::topic_tools::ShapeShifter &msg)
Get the internal buffer with serialized message data.
CRAS_DEBUG
#define CRAS_DEBUG(...)
class_list_macros.hpp
cras::priority_mux::TopicConfig::queueSize
size_t queueSize
Queue size for the subscriber.
Definition: priority_mux_base.h:39
ros::MessageEvent::getConnectionHeaderPtr
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
cras::HasLogger::log
::cras::LogHelperPtr log
cras::PriorityMuxNodelet::reset
void reset()
Reset the mux to its initial state.
Definition: priority_mux.cpp:538
cras::PriorityMuxNodelet::outTopics
::std::unordered_set<::std::string > outTopics
All output topic names.
Definition: priority_mux.h:204
cras::BoundParamHelper::hasParam
bool hasParam(const ::std::string &name, const bool searchNested=true) const
cras::PriorityMuxNodelet::timers
::std::unordered_map<::std::string, ::ros::Timer > timers
Map of timer names and the timers.
Definition: priority_mux.h:192
cras::PriorityMuxNodelet::disableCb
virtual void disableCb(const ::std::string &inTopic, const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event)
Callback method triggered when a disable message is received.
Definition: priority_mux.cpp:444
XmlRpc::XmlRpcValue::begin
iterator begin()
ros::WallDuration
cras::PriorityMuxNodelet::onTimeout
void onTimeout(const ::std::string &name, const ::ros::TimerEvent &)
Method triggered on timer timeout. This should update the mux and check for changes.
Definition: priority_mux.cpp:518
topic_tools::ShapeShifter
header
const std::string header
ros::NodeHandle::createTimer
Timer createTimer(Duration period, const TimerCallback &callback, bool oneshot=false, bool autostart=true) const
rosbag::View::begin
iterator begin()
cras::DEFAULT_NONE_TOPIC
constexpr auto DEFAULT_NONE_TOPIC
Definition: priority_mux.cpp:34
ros::Duration
cras::priority_mux::LockConfig::timeout
::ros::Duration timeout
Definition: priority_mux_base.h:50
ros::MessageEvent::init
void init(const ConstMessagePtr &message, const boost::shared_ptr< M_string > &connection_header, ros::Time receipt_time, bool nonconst_need_copy, const CreateFunction &create)
cras::getHeader
::cras::optional<::std_msgs::Header > getHeader(const ::topic_tools::ShapeShifter &msg)
Get the header field of the given message, if it has any.
cras::priority_mux::TopicConfig::outTopic
::std::string outTopic
The output topic.
Definition: priority_mux_base.h:36
cras::PriorityMuxNodelet::subscribers
::std::list<::ros::Subscriber > subscribers
List of subscribers. Just for keeping them alive.
Definition: priority_mux.h:177
XmlRpc::XmlRpcValue
cras::PriorityMuxNodelet::setTimer
void setTimer(const ::std::string &name, const ::ros::Duration &timeout)
Set a timer that will call onTimeout() after the specified time.
Definition: priority_mux.cpp:525
ros::MessageEvent
cras::BoundParamHelper::getParam
inline ::std::string getParam(const ::std::string &name, const ::cras::optional< const char * > &defaultValue, const ::std::string &unit="", const ::cras::GetParamOptions<::std::string > &options={}) const
cras::to_string
inline ::std::string to_string(char *value)
cras::resizeBuffer
void resizeBuffer(::topic_tools::ShapeShifter &msg, size_t newLength)
Resize the internal buffer of the message.
ros::Time::now
static Time now()
cras::setHeader
bool setHeader(::topic_tools::ShapeShifter &msg, ::std_msgs::Header &header)
Change the header field of the given message, if it has any.


cras_topic_tools
Author(s): Martin Pecka
autogenerated on Sun Jan 14 2024 03:48:28