priority_mux_base.cpp
Go to the documentation of this file.
1 // SPDX-License-Identifier: BSD-3-Clause
2 // SPDX-FileCopyrightText: Czech Technical University in Prague
3 
10 #include <string>
11 #include <unordered_map>
12 #include <utility>
13 
15 
17 
18 namespace cras
19 {
20 
21 PriorityMux::PriorityMux(const std::unordered_map<std::string, cras::priority_mux::TopicConfig>& topicConfigs,
22  const std::unordered_map<std::string, cras::priority_mux::LockConfig>& lockConfigs,
23  const cras::PriorityMux::SetTimerFn& setTimerFn, const ::ros::Time& now,
24  const cras::LogHelperPtr& log, const std::string& noneTopic, int nonePriority) :
25  cras::HasLogger(log), topicConfigs(topicConfigs), lockConfigs(lockConfigs), setTimer(setTimerFn),
26  noneTopic(noneTopic), nonePriority(nonePriority), lastActivePriority(nonePriority)
27 {
28  this->resetImpl(now);
29 
30  CRAS_INFO("Starting with priority %i.", nonePriority);
31  for (const auto& outAndSelectedTopic : this->lastSelectedTopics)
32  CRAS_DEBUG("Starting with no selected topic for output topic %s.", outAndSelectedTopic.first.c_str());
33 
34  for (const auto& nameAndLockConfig : this->lockConfigs)
35  {
36  const auto& name = nameAndLockConfig.first;
37  const auto& config = nameAndLockConfig.second;
38  if (config.timeout != ros::Duration(0, 0))
39  this->setTimer(name, config.timeout);
40  }
41 }
42 
43 PriorityMux::~PriorityMux() = default;
44 
45 bool PriorityMux::cb(const std::string& inTopic, const ros::Time& stamp, const ros::Time& now)
46 {
47  const auto it = this->topicConfigs.find(inTopic);
48  if (it == this->topicConfigs.end())
49  {
50  CRAS_ERROR("Priority mux called with topic %s which is not configured.", inTopic.c_str());
51  return false;
52  }
53 
54  const auto& topicConfig = it->second;
55  const auto& priority = topicConfig.priority;
56  const auto key = std::make_pair(priority, inTopic);
57  this->lastReceiveStamps[key] = stamp;
58 
59  this->update(now);
60 
61  // If stamp is older than now, wait only for as much time as actually remains
62  const auto remainingTimeout = topicConfig.timeout + (stamp - now);
63  // If the callback came late and the topic is already timed out, do not plan a callback as everything is done in the
64  // preceding update() call.
65  if (remainingTimeout > ros::Duration(0, 0))
66  this->setTimer(inTopic, remainingTimeout);
67 
68  // If the topic is disabled, do not publish it
69  const auto disableIt = this->disabledStamps.find(key);
70  if (disableIt != this->disabledStamps.end())
71  {
72  const auto& disabledStamp = disableIt->second;
73  if (now < disabledStamp + topicConfig.timeout)
74  return false;
75  }
76 
77  return this->lastActivePriority <= priority;
78 }
79 
80 void PriorityMux::disableCb(const std::string& inTopic, const ros::Time& stamp, const bool disable,
81  const ros::Time& now)
82 {
83  const auto it = this->topicConfigs.find(inTopic);
84  if (it == this->topicConfigs.end())
85  {
86  CRAS_ERROR("Priority mux called with topic %s which is not configured.", inTopic.c_str());
87  return;
88  }
89 
90  const auto& topicConfig = it->second;
91  const auto& priority = topicConfig.priority;
92  const auto key = std::make_pair(priority, inTopic);
93 
94  if (disable)
95  this->disabledStamps[key] = stamp;
96  else
97  this->disabledStamps.erase(key);
98 
99  this->update(now);
100 
101  if (disable)
102  {
103  // If stamp is older than now, wait only for as much time as actually remains
104  const auto remainingTimeout = topicConfig.timeout + (stamp - now);
105  // If the callback came late and the topic is already timed out, do not plan a callback as everything is done in the
106  // preceding update() call.
107  if (remainingTimeout > ros::Duration(0, 0))
108  this->setTimer("__disable_" + inTopic, remainingTimeout);
109  }
110 }
111 
112 void PriorityMux::lockCb(const std::string& topic, const ros::Time& stamp, const bool locked, const ros::Time& now)
113 {
114  const auto it = this->lockConfigs.find(topic);
115  if (it == this->lockConfigs.end())
116  {
117  CRAS_ERROR("Priority mux called with lock topic %s which is not configured.", topic.c_str());
118  return;
119  }
120 
121  const auto& lockConfig = it->second;
122  const auto& priority = lockConfig.priority;
123  const auto& timeout = lockConfig.timeout;
124 
125  const auto key = std::make_pair(priority, topic);
126  this->lastLockStamps[key] = stamp;
127  this->lockStates[key] = locked;
128 
129  this->update(now);
130 
131  // If stamp is older than now, wait only for as much time as actually remains
132  const auto remainingTimeout = timeout + (stamp - now);
133 
134  if (!locked && timeout != ros::Duration(0, 0) && remainingTimeout > ros::Duration(0, 0))
135  this->setTimer(topic, timeout);
136 }
137 
139 {
140  const auto lockedPriority = this->getHighestLockedPriority(now);
141 
142  // Iterate from the highest priority downwards and find the first non-timeouted non-disabled topic
143  int topicsPriority {this->nonePriority};
144  for (auto it = this->lastReceiveStamps.crbegin(); it != this->lastReceiveStamps.crend(); ++it)
145  {
146  const auto& itPriority = it->first.first;
147  const auto& itTopic = it->first.second;
148  const auto& itConfig = this->topicConfigs[itTopic];
149  const auto& itStamp = it->second;
150  if (now < itStamp + itConfig.timeout)
151  {
152  // Check if the topic isn't currently disabled
153  const auto disableIt = this->disabledStamps.find(it->first);
154  if (disableIt != this->disabledStamps.end())
155  {
156  const auto& disabledStamp = disableIt->second;
157  if (now < disabledStamp + itConfig.timeout) // The topic is disabled, skip it
158  continue;
159  // If the disable is timed out, delete its record.
160  this->disabledStamps.erase(it->first);
161  }
162  topicsPriority = itPriority;
163  break;
164  }
165  }
166 
167  const auto lastPriority = this->lastActivePriority;
168  this->lastActivePriority = (std::max)(topicsPriority, lockedPriority);
169 
170  if (lastPriority != this->lastActivePriority)
171  CRAS_INFO("Changed priority from %i to %i.", lastPriority, this->lastActivePriority);
172 
173  auto lastTopics = this->lastSelectedTopics;
174 
175  for (auto& selected : this->lastSelectedTopics)
176  selected.second = this->noneTopic;
177 
178  if (topicsPriority >= lockedPriority)
179  {
180  for (const auto& topicAndConfig : this->topicConfigs)
181  {
182  const auto& config = topicAndConfig.second;
183  if (config.priority != topicsPriority)
184  continue;
185  this->lastSelectedTopics[config.outTopic] = topicAndConfig.first;
186  }
187  }
188 
189  for (const auto& outAndSelectedTopic : this->lastSelectedTopics)
190  {
191  if (lastTopics[outAndSelectedTopic.first] != outAndSelectedTopic.second)
192  CRAS_DEBUG("Choosing topic %s instead of %s for output topic %s.", outAndSelectedTopic.second.c_str(),
193  lastTopics[outAndSelectedTopic.first].c_str(), outAndSelectedTopic.first.c_str());
194  }
195 }
196 
198 {
199  auto highestLockedPriority = this->nonePriority;
200  for (auto it = this->lastLockStamps.crbegin(); it != this->lastLockStamps.crend(); ++it)
201  {
202  const auto& itPriority = it->first.first;
203  const auto& itTopic = it->first.second;
204  const auto& itStamp = it->second;
205  const auto& itConfig = this->lockConfigs[itTopic];
206  const auto& itTimeout = itConfig.timeout;
207  const auto key = std::make_pair(itPriority, itTopic);
208  if ((itTimeout != ros::Duration(0, 0) && itStamp + itTimeout < now) || this->lockStates[key])
209  {
210  highestLockedPriority = itPriority;
211  break;
212  }
213  }
214  return highestLockedPriority;
215 }
216 
217 const std::unordered_map<std::string, std::string>& PriorityMux::getLastSelectedTopics() const
218 {
219  return this->lastSelectedTopics;
220 }
221 
223 {
224  return this->lastActivePriority;
225 }
226 
227 void PriorityMux::reset(const ::ros::Time& now)
228 {
229  this->resetImpl(now);
230 }
231 
232 void PriorityMux::resetImpl(const ::ros::Time& now)
233 {
234  this->lastActivePriority = this->nonePriority;
235  this->lastReceiveStamps.clear();
236 
237  for (const auto& topicAndConfig : this->topicConfigs)
238  this->lastSelectedTopics[topicAndConfig.second.outTopic] = this->noneTopic;
239 
240  for (const auto& topicAndLockConfig : this->lockConfigs)
241  {
242  const auto key = std::make_pair(topicAndLockConfig.second.priority, topicAndLockConfig.first);
243  this->lastLockStamps[key] = now;
244  this->lockStates[key] = false;
245  }
246 
247  this->disabledStamps.clear();
248 
249  this->update(now);
250 }
251 
252 bool PriorityMux::isDisabled(const std::string& inTopic, const ros::Time& now)
253 {
254  const auto it = this->topicConfigs.find(inTopic);
255  if (it == this->topicConfigs.end())
256  {
257  CRAS_ERROR("Priority mux called with topic %s which is not configured.", inTopic.c_str());
258  return false;
259  }
260 
261  const auto& config = it->second;
262 
263  const auto disableIt = this->disabledStamps.find(std::make_pair(config.priority, it->first));
264  if (disableIt != this->disabledStamps.end())
265  {
266  const auto& disabledStamp = disableIt->second;
267  return now >= disabledStamp + config.timeout;
268  }
269  return false;
270 }
271 
272 }
cras::PriorityMux::isDisabled
bool isDisabled(const ::std::string &inTopic, const ::ros::Time &now)
Definition: priority_mux_base.cpp:252
cras::PriorityMux::cb
virtual bool cb(const ::std::string &inTopic, const ::ros::Time &stamp, const ::ros::Time &now)
Callback function run when input topic message is received.
Definition: priority_mux_base.cpp:45
cras
cras::PriorityMux::resetImpl
void resetImpl(const ::ros::Time &now)
Reset the mux to its original state.
Definition: priority_mux_base.cpp:232
cras::PriorityMux::lockConfigs
::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > lockConfigs
Configurations of lock topics.
Definition: priority_mux_base.h:191
cras::PriorityMux::noneTopic
::std::string noneTopic
Virtual name of a topic reported as selected when no priority is active.
Definition: priority_mux_base.h:215
cras::PriorityMux::topicConfigs
::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > topicConfigs
Configurations of input topics.
Definition: priority_mux_base.h:188
cras::PriorityMux::disableCb
virtual void disableCb(const ::std::string &inTopic, const ::ros::Time &stamp, bool disable, const ::ros::Time &now)
Callback function to disable/enable an input topic.
Definition: priority_mux_base.cpp:80
cras::HasLogger
cras::PriorityMux::getHighestLockedPriority
int getHighestLockedPriority(const ::ros::Time &now)
Gets the highest locked priority.
Definition: priority_mux_base.cpp:197
cras::PriorityMux::lastLockStamps
::std::map<::std::pair< int, ::std::string >, ::ros::Time > lastLockStamps
The timestamps of the last received lock message for each lock topic.
Definition: priority_mux_base.h:209
cras::PriorityMux::lastReceiveStamps
::std::map<::std::pair< int, ::std::string >, ::ros::Time > lastReceiveStamps
The timestamps of the last received message for each input topic.
Definition: priority_mux_base.h:203
CRAS_ERROR
#define CRAS_ERROR(...)
CRAS_INFO
#define CRAS_INFO(...)
log_utils.h
cras::PriorityMux::setTimer
SetTimerFn setTimer
The function to call when the mux needs to schedule a call to update() after some time.
Definition: priority_mux_base.h:194
cras::PriorityMux::PriorityMux
PriorityMux(const ::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > &topicConfigs, const ::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > &lockConfigs, const SetTimerFn &setTimerFn, const ::ros::Time &now, const ::cras::LogHelperPtr &log, const ::std::string &noneTopic="__none", int nonePriority=0)
Constructs a PriorityMux object.
Definition: priority_mux_base.cpp:21
cras::PriorityMux::lastActivePriority
int lastActivePriority
The currently active priority level.
Definition: priority_mux_base.h:221
priority_mux_base.h
Priority-based muxer for topics.
cras::PriorityMux::getActivePriority
int getActivePriority() const
Return the active priority level.
Definition: priority_mux_base.cpp:222
cras::PriorityMux::SetTimerFn
::std::function< void(const ::std::string &name, const ::ros::Duration &timeout)> SetTimerFn
The function to call when the mux needs to schedule a call to update() after some time.
Definition: priority_mux_base.h:92
cras::PriorityMux::lockStates
::std::map<::std::pair< int, ::std::string >, bool > lockStates
Current lock status of each lock topic (whether the lock is explicitly locked or not).
Definition: priority_mux_base.h:212
cras::PriorityMux::lockCb
virtual void lockCb(const ::std::string &topic, const ::ros::Time &stamp, bool locked, const ::ros::Time &now)
Callback function run when a lock message is received.
Definition: priority_mux_base.cpp:112
cras::PriorityMux::lastSelectedTopics
::std::unordered_map<::std::string, ::std::string > lastSelectedTopics
The currently selected topic for each output topic.
Definition: priority_mux_base.h:224
cras::LogHelperPtr
::cras::LogHelper::Ptr LogHelperPtr
ros::Time
CRAS_DEBUG
#define CRAS_DEBUG(...)
cras::PriorityMux::~PriorityMux
virtual ~PriorityMux()
cras::PriorityMux::reset
virtual void reset(const ::ros::Time &now)
Resets the mux to its initial state.
Definition: priority_mux_base.cpp:227
cras::PriorityMux::disabledStamps
::std::map<::std::pair< int, ::std::string >, ::ros::Time > disabledStamps
The timestamps of the last received disable message for each input topic.
Definition: priority_mux_base.h:206
cras::PriorityMux::getLastSelectedTopics
const ::std::unordered_map<::std::string, ::std::string > & getLastSelectedTopics() const
Return the last selected topic for each output topic.
Definition: priority_mux_base.cpp:217
cras::PriorityMux::update
virtual void update(const ::ros::Time &now)
Updates the mux after changes have been made to some of its internal state.
Definition: priority_mux_base.cpp:138
ros::Duration
cras::PriorityMux::nonePriority
int nonePriority
Priority level signalling that no priority is active.
Definition: priority_mux_base.h:218


cras_topic_tools
Author(s): Martin Pecka
autogenerated on Sun Jan 5 2025 03:50:49