priority_mux.cpp
Go to the documentation of this file.
1 
9 #include <limits>
10 #include <list>
11 #include <memory>
12 #include <string>
13 #include <utility>
14 
15 #include <std_msgs/Bool.h>
16 #include <std_msgs/Int32.h>
17 #include <std_msgs/String.h>
18 
22 
23 namespace cras
24 {
25 
26 constexpr auto DEFAULT_OUT_TOPIC = "mux_out";
27 
29 {
30  const auto params = this->privateParams();
31 
32  this->outQueueSize = params->getParam("out_queue_size", 10_sz, "messages");
33  this->lastActivePriority = this->priorityNone = params->getParam("priority_none", 0);
34 
35  XmlRpc::XmlRpcValue topicParams;
36  try
37  {
38  topicParams = params->getParam<XmlRpc::XmlRpcValue>("topics", cras::nullopt);
39  }
40  catch (const cras::GetParamException& e)
41  {
42  CRAS_ERROR("Parameter ~topics is empty, priority mux will not do anything!");
43  return;
44  }
45 
46  std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> topicItems;
47  switch (topicParams.getType())
48  {
50  for (size_t i = 0; i < topicParams.size(); ++i)
51  topicItems.emplace_back("[" + cras::to_string(i) + "]", topicParams[i]);
52  break;
54  for (const auto& item : topicParams)
55  topicItems.emplace_back("/" + item.first, item.second);
56  break;
57  default:
58  CRAS_ERROR("Parameter ~topics has to be either a list or a dict. Priority mux will not do anything!");
59  return;
60  }
61 
62  for (const auto& item : topicItems)
63  {
64  const auto& key = item.first;
65  const auto& xmlConfig = item.second;
66  if (xmlConfig.getType() != XmlRpc::XmlRpcValue::TypeStruct)
67  {
68  CRAS_ERROR("Item %s of ~topics has to be a dict, but %s was given.",
69  key.c_str(), cras::to_string(xmlConfig.getType()).c_str());
70  continue;
71  }
72 
73  auto itemNamespace = this->getPrivateNodeHandle().resolveName("topics") + key;
74  auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
75  cras::BoundParamHelper xmlParams(this->log, xmlParamAdapter);
77 
78  try
79  {
80  config.inTopic = xmlParams.getParam<std::string>("topic", cras::nullopt);
81  }
82  catch (const cras::GetParamException& e)
83  {
84  CRAS_ERROR("Item %s of ~topics has to contain key 'topic'. Skipping the item.", key.c_str());
85  continue;
86  }
87 
88  config.name = xmlParams.getParam("name", config.inTopic);
89  config.outTopic = xmlParams.getParam("out_topic", DEFAULT_OUT_TOPIC);
90  config.priority = xmlParams.getParam("priority", 100, "", {true, true});
91  config.timeout = xmlParams.getParam("timeout", ros::Duration{1, 0}, "s", {true, true});
92  config.queueSize = xmlParams.getParam("queue_size", 10_sz, "messages");
93 
94  this->topicConfigs[config.inTopic] = config;
95  }
96 
97  XmlRpc::XmlRpcValue lockParams;
98  lockParams.begin(); // convert to struct
99 
100  lockParams = params->getParam("locks", lockParams);
101  std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> lockItems;
102  switch (lockParams.getType())
103  {
105  for (size_t i = 0; i < lockParams.size(); ++i)
106  lockItems.emplace_back("[" + cras::to_string(i) + "]", lockParams[i]);
107  break;
109  for (const auto& item : lockParams)
110  lockItems.emplace_back("/" + item.first, item.second);
111  break;
112  default:
113  CRAS_ERROR("Parameter ~locks has to be either a list or a dict. No locks will be set.");
114  return;
115  }
116 
117  if (lockItems.empty())
118  CRAS_INFO("No locks were specified.");
119 
120  for (const auto& item : lockItems)
121  {
122  const auto& key = item.first;
123  const auto& xmlConfig = item.second;
124  if (xmlConfig.getType() != XmlRpc::XmlRpcValue::TypeStruct)
125  {
126  CRAS_ERROR("Item %s of ~locks has to be a dict, but %s was given.",
127  key.c_str(), cras::to_string(xmlConfig.getType()).c_str());
128  continue;
129  }
130 
131  auto itemNamespace = this->getPrivateNodeHandle().resolveName("locks") + key;
132  auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
133  cras::BoundParamHelper xmlParams(this->log, xmlParamAdapter);
135 
136  try
137  {
138  config.topic = xmlParams.getParam<std::string>("topic", cras::nullopt);
139  }
140  catch (const cras::GetParamException& e)
141  {
142  CRAS_ERROR("Item %s of ~locks has to contain key 'topic'. Skipping the item.", key.c_str());
143  continue;
144  }
145 
146  config.name = xmlParams.getParam("name", config.topic);
147  config.priority = xmlParams.getParam("priority", 100, "", {true, true});
148  config.timeout = xmlParams.getParam("timeout", ros::Duration{1, 0}, "s", {true, true});
149  config.queueSize = xmlParams.getParam("queue_size", 10_sz, "messages");
150 
151  this->lockConfigs[config.topic] = config;
152  }
153 
154  this->activePriorityPub = this->getPrivateNodeHandle().advertise<std_msgs::Int32>(
155  "active_priority", this->outQueueSize, true);
156  ros::WallDuration(0.1).sleep();
157 
158  std_msgs::Int32 msg;
159  msg.data = this->priorityNone;
160  this->activePriorityPub.publish(msg);
161  CRAS_INFO("No priority active now.");
162 
163  for (const auto& config : this->topicConfigs)
164  {
165  const auto& topicConfig = config.second;
166 
167  if (!this->selectedPublishers[topicConfig.outTopic])
168  {
169  const auto selectedTopic = "selected/" + cras::stripLeading(topicConfig.outTopic, '/');
170  this->selectedPublishers[topicConfig.outTopic] =
171  this->getPrivateNodeHandle().advertise<std_msgs::String>(selectedTopic, this->outQueueSize, true);
172  ros::WallDuration(0.1).sleep();
173 
174  std_msgs::String selectedMsg;
175  selectedMsg.data = priority_mux::NONE_TOPIC;
176  this->selectedPublishers[topicConfig.outTopic].publish(selectedMsg);
177  CRAS_INFO("No topic is now selected for output topic %s.", topicConfig.outTopic.c_str());
178  }
179 
180  const auto cb = boost::bind(&PriorityMuxNodelet::cb, this, boost::placeholders::_1, topicConfig.inTopic);
181  const auto sub = this->getNodeHandle().subscribe<topic_tools::ShapeShifter>(
182  topicConfig.inTopic, topicConfig.queueSize, cb);
183  this->subscribers.push_back(sub);
184  }
185 
186  for (const auto& config : this->lockConfigs)
187  {
188  const auto& lockConfig = config.second;
189 
190  const auto lockedTopic = "locked/" + cras::stripLeading(lockConfig.topic, '/');
191  this->lockedPublishers[lockConfig.topic] =
192  this->getPrivateNodeHandle().advertise<std_msgs::Bool>(lockedTopic, this->outQueueSize, true);
193  ros::WallDuration(0.1).sleep();
194 
195  // Pretend that each lock has received a message when starting the mux
196  this->lastLockStamps[std::make_pair(lockConfig.priority, lockConfig.topic)] = ros::Time::now();
197 
198  std_msgs::Bool lockedMsg;
199  lockedMsg.data = false;
200  this->lockedPublishers[lockConfig.topic].publish(lockedMsg);
201  CRAS_INFO("Lock %s is not locked.", lockConfig.name.c_str());
202 
203  const auto cb = cras::bind_front(&PriorityMuxNodelet::lockCb, this, lockConfig.topic);
204  const auto sub = this->getNodeHandle().subscribe<std_msgs::Bool>(lockConfig.topic, lockConfig.queueSize, cb);
205  this->subscribers.push_back(sub);
206  }
207 }
208 
210  const ros::MessageEvent<const topic_tools::ShapeShifter>& event, const std::string& inTopic)
211 {
212  const auto& topicConfig = this->topicConfigs[inTopic];
213  const auto& priority = topicConfig.priority;
214  const auto& outTopic = topicConfig.outTopic;
215  this->lastReceiveStamps[std::make_pair(priority, inTopic)] = ros::Time::now();
216 
217  auto highestLockedPriority = std::numeric_limits<int>::min();
218  for (auto it = this->lastLockStamps.crbegin(); it != this->lastLockStamps.crend(); ++it)
219  {
220  const auto& itPriority = it->first.first;
221  const auto& itTopic = it->first.second;
222  const auto& itStamp = it->second;
223  const auto& itConfig = this->lockConfigs[itTopic];
224  const auto& itTimeout = itConfig.timeout;
225  if (itPriority > highestLockedPriority && (itStamp + itTimeout < ros::Time::now()))
226  highestLockedPriority = itPriority;
227  }
228 
229  for (auto it = this->lastReceiveStamps.crbegin(); it != this->lastReceiveStamps.crend(); ++it)
230  {
231  const auto& itPriority = it->first.first;
232 
233  if (itPriority <= priority)
234  {
235  this->updatePriorities(priority, inTopic);
236  break;
237  }
238 
239  if (itPriority < highestLockedPriority)
240  {
241  CRAS_DEBUG("Priority %i not active, as a lock with priority %i is locked.",
242  priority, highestLockedPriority);
243  return;
244  }
245 
246  const auto& itTopic = it->first.second;
247  const auto& itConfig = this->topicConfigs[itTopic];
248  const auto& itStamp = it->second;
249  if ((ros::Time::now() - itStamp) < itConfig.timeout)
250  {
251  CRAS_DEBUG("Priority %i not active. Found higher active priority %i.", priority, itPriority);
252  return;
253  }
254  }
255 
256  auto& pub = this->publishers[topicConfig.outTopic];
257  if (!pub)
258  {
259  auto latch = false;
260  if (event.getConnectionHeaderPtr() != nullptr)
261  latch = event.getConnectionHeader()["latching"] == "1";
262  pub = event.getConstMessage()->advertise(this->getNodeHandle(), topicConfig.outTopic, this->outQueueSize, latch);
263  ros::WallDuration(0.1).sleep();
264  }
265 
266  if (this->priorityBackToNoneTimer.isValid())
267  this->priorityBackToNoneTimer.setPeriod(topicConfig.timeout, true);
268  if (this->selectedBackToNoneTimers[topicConfig.outTopic].isValid())
269  this->selectedBackToNoneTimers[topicConfig.outTopic].setPeriod(topicConfig.timeout, true);
270  pub.publish(event.getConstMessage());
271 }
272 
273 void PriorityMuxNodelet::lockCb(const std::string& topic, const std_msgs::BoolConstPtr& msg)
274 {
275  const auto& lockConfig = this->lockConfigs[topic];
276  const auto& priority = lockConfig.priority;
277  const auto& timeout = lockConfig.timeout;
278 
279  auto& stamp = this->lastLockStamps[std::make_pair(priority, topic)];
280 
281  // Check if the lock was locked before; if it was, announce that it is being unlocked.
282  if (stamp + timeout < ros::Time::now())
283  {
284  std_msgs::Bool lockedMsg;
285  lockedMsg.data = false;
286  this->lockedPublishers[topic].publish(lockedMsg);
287  CRAS_INFO("Lock %s is unlocked now.", lockConfig.name.c_str());
288  }
289 
290  auto time = ros::Time::now();
291  // It the message contains "true", set the timeout so that it is already expired
292  if (msg->data)
293  {
294  if (ros::Time(0, 1) + timeout > time)
295  time -= timeout + ros::Duration(0, 1);
296  else
297  time = ros::Time(0, 0); // TODO: this will prevent timeouting a lock at the start of a simulation
298  }
299  stamp = time;
300 
301  auto& timer = this->lockTimeoutTimers[topic];
302  if (timer.isValid())
303  {
304  if (msg->data)
305  timer.setPeriod({0, 0}, true);
306  else
307  timer.setPeriod(timeout, true);
308  }
309  else
310  {
312  timer = this->getNodeHandle().createTimer(timeout, cb, true);
313  }
314 }
315 
316 void PriorityMuxNodelet::updatePriorities(const int newPriority, const ::std::string& newTopic)
317 {
318  const auto& config = this->topicConfigs[newTopic];
319 
320  if (newPriority != this->lastActivePriority)
321  {
322  CRAS_INFO("Priority %i is now active.", newPriority);
323  this->lastActivePriority = newPriority;
324  this->publishPriorityChange(newPriority, config.timeout);
325  }
326 
327  if (newTopic != this->lastSelectedTopics[config.outTopic])
328  {
329  CRAS_INFO("Source topic '%s' is now selected for output topic '%s'.",
330  newTopic.c_str(), config.outTopic.c_str());
331  this->lastSelectedTopics[config.outTopic] = newTopic;
332  this->publishSelectedTopicChange(config.outTopic, newTopic, config.timeout);
333  }
334 
335  for (const auto& topicConfig : this->topicConfigs)
336  {
337  const auto& itPriority = topicConfig.second.priority;
338  if (itPriority >= newPriority)
339  break;
340 
341  const auto& itTopic = topicConfig.second.inTopic;
342  const auto outTopic = topicConfig.second.outTopic;
343  if (this->lastSelectedTopics[outTopic] == itTopic && this->selectedBackToNoneTimers[outTopic].isValid())
344  this->selectedBackToNoneTimers[outTopic].setPeriod(ros::Duration(0, 0), true);
345  }
346 }
347 
348 void PriorityMuxNodelet::publishPriorityChange(const int newPriority, const ::ros::Duration& timeout)
349 {
350  std_msgs::Int32 msg;
351  msg.data = newPriority;
352  this->activePriorityPub.publish(msg);
353 
354  if (this->priorityBackToNoneTimer.isValid())
357  timeout, &PriorityMuxNodelet::onPriorityTimeout, this, true);
358 }
359 
360 void PriorityMuxNodelet::publishSelectedTopicChange(const ::std::string& outTopic, const std::string& newTopic,
361  const ros::Duration& timeout)
362 {
363  std_msgs::String msg;
364  msg.data = newTopic;
365  this->selectedPublishers[outTopic].publish(msg);
366 
367  if (this->selectedBackToNoneTimers[outTopic].isValid())
368  this->selectedBackToNoneTimers[outTopic].stop();
369  auto cb = ::cras::bind_front(&PriorityMuxNodelet::onSelectedTopicTimeout, this, outTopic);
370  this->selectedBackToNoneTimers[outTopic] = this->getNodeHandle().createTimer(timeout, cb, true);
371 }
372 
374 {
375  std_msgs::Int32 msg;
376  msg.data = this->priorityNone;
377  this->activePriorityPub.publish(msg);
378  this->lastActivePriority = this->priorityNone;
379  CRAS_INFO("No priority is now active.");
380 }
381 
382 void PriorityMuxNodelet::onSelectedTopicTimeout(const ::std::string& outTopic, const ros::TimerEvent&)
383 {
384  std_msgs::String msg;
385  msg.data = priority_mux::NONE_TOPIC;
386  this->selectedPublishers[outTopic].publish(msg);
388  CRAS_INFO("No topic is now selected for output topic %s.", outTopic.c_str());
389 }
390 
391 void PriorityMuxNodelet::onLockTimeout(const std::string& topic, const ros::TimerEvent&)
392 {
393  std_msgs::Bool msg;
394  msg.data = true;
395  this->lockedPublishers[topic].publish(msg);
396  CRAS_INFO("Lock %s is locked now.", this->lockConfigs[topic].name.c_str());
397 }
398 
399 }
const boost::shared_ptr< ConstMessage > & getConstMessage() const
msg
::std::unordered_map<::std::string, ::ros::Publisher > lockedPublishers
Definition: priority_mux.h:71
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
::ros::Timer priorityBackToNoneTimer
Definition: priority_mux.h:73
virtual void cb(const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event, const ::std::string &inTopic)
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
::std::unordered_map<::std::string, ::ros::Publisher > publishers
Definition: priority_mux.h:68
::std::map<::std::pair< int, ::std::string >, ::ros::Time > lastLockStamps
Definition: priority_mux.h:81
ros::NodeHandle & getNodeHandle() const
#define CRAS_INFO(...)
void stop()
void setPeriod(const Duration &period, bool reset=true)
void publishPriorityChange(int newPriority, const ::ros::Duration &timeout)
inline ::std::string to_string(const ::XmlRpc::XmlRpcValue &value)
void onPriorityTimeout(const ::ros::TimerEvent &)
ros::NodeHandle & getPrivateNodeHandle() const
::ros::Publisher activePriorityPub
Definition: priority_mux.h:69
constexpr auto DEFAULT_OUT_TOPIC
#define CRAS_DEBUG(...)
::cras::LogHelperPtr log
virtual void lockCb(const ::std::string &topic, const ::std_msgs::BoolConstPtr &msg)
::std::map<::std::pair< int, ::std::string >, ::ros::Time > lastReceiveStamps
Definition: priority_mux.h:80
void stripLeading(::std::string &s, const char &c=' ')
Type const & getType() const
void publish(const boost::shared_ptr< M > &message) const
::std::list<::ros::Subscriber > subscribers
Definition: priority_mux.h:67
virtual void updatePriorities(int newPriority, const ::std::string &newTopic)
::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > topicConfigs
Definition: priority_mux.h:77
void onLockTimeout(const ::std::string &topic, const ::ros::TimerEvent &)
auto bind_front(F &&f, Args &&... args)
M_string & getConnectionHeader() const
std::string resolveName(const std::string &name, bool remap=true) const
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
bool sleep() const
::std::unordered_map<::std::string, ::std::string > lastSelectedTopics
Definition: priority_mux.h:86
::std::unordered_map<::std::string, ::ros::Timer > selectedBackToNoneTimers
Definition: priority_mux.h:74
::std::unordered_map<::std::string, ::ros::Publisher > selectedPublishers
Definition: priority_mux.h:70
bool isValid()
::std::unordered_map<::std::string, ::ros::Timer > lockTimeoutTimers
Definition: priority_mux.h:75
::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > lockConfigs
Definition: priority_mux.h:78
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
static Time now()
void onInit() override
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
void onSelectedTopicTimeout(const ::std::string &outTopic, const ::ros::TimerEvent &)
static const ::std::string NONE_TOPIC
Definition: priority_mux.h:28
void publishSelectedTopicChange(const ::std::string &outTopic, const ::std::string &newTopic, const ::ros::Duration &timeout)
#define CRAS_ERROR(...)


cras_topic_tools
Author(s): Martin Pecka
autogenerated on Sat Jun 17 2023 02:33:13