priority_mux_base.cpp
Go to the documentation of this file.
1 // SPDX-License-Identifier: BSD-3-Clause
2 // SPDX-FileCopyrightText: Czech Technical University in Prague
3 
10 #include <string>
11 #include <unordered_map>
12 #include <utility>
13 
15 
17 
18 namespace cras
19 {
20 
21 PriorityMux::PriorityMux(const std::unordered_map<std::string, cras::priority_mux::TopicConfig>& topicConfigs,
22  const std::unordered_map<std::string, cras::priority_mux::LockConfig>& lockConfigs,
23  const cras::PriorityMux::SetTimerFn& setTimerFn, const ::ros::Time& now,
24  const cras::LogHelperPtr& log, const std::string& noneTopic, int nonePriority) :
25  cras::HasLogger(log), topicConfigs(topicConfigs), lockConfigs(lockConfigs), setTimer(setTimerFn),
26  noneTopic(noneTopic), nonePriority(nonePriority)
27 {
28  this->resetImpl(now);
29 
30  CRAS_INFO("Starting with priority %i.", nonePriority);
31  for (const auto& outAndSelectedTopic : this->lastSelectedTopics)
32  CRAS_DEBUG("Starting with no selected topic for output topic %s.", outAndSelectedTopic.first.c_str());
33 
34  for (const auto& nameAndLockConfig : this->lockConfigs)
35  {
36  const auto& name = nameAndLockConfig.first;
37  const auto& config = nameAndLockConfig.second;
38  if (config.timeout != ros::Duration(0, 0))
39  this->setTimer(name, config.timeout);
40  }
41 }
42 
43 bool PriorityMux::cb(const std::string& inTopic, const ros::Time& stamp, const ros::Time& now)
44 {
45  const auto it = this->topicConfigs.find(inTopic);
46  if (it == this->topicConfigs.end())
47  {
48  CRAS_ERROR("Priority mux called with topic %s which is not configured.", inTopic.c_str());
49  return false;
50  }
51 
52  const auto& topicConfig = it->second;
53  const auto& priority = topicConfig.priority;
54  const auto key = std::make_pair(priority, inTopic);
55  this->lastReceiveStamps[key] = stamp;
56 
57  this->update(now);
58 
59  // If stamp is older than now, wait only for as much time as actually remains
60  const auto remainingTimeout = topicConfig.timeout + (stamp - now);
61  // If the callback came late and the topic is already timed out, do not plan a callback as everything is done in the
62  // preceding update() call.
63  if (remainingTimeout > ros::Duration(0, 0))
64  this->setTimer(inTopic, remainingTimeout);
65 
66  // If the topic is disabled, do not publish it
67  const auto disableIt = this->disabledStamps.find(key);
68  if (disableIt != this->disabledStamps.end())
69  {
70  const auto& disabledStamp = disableIt->second;
71  if (now < disabledStamp + topicConfig.timeout)
72  return false;
73  }
74 
75  return this->lastActivePriority <= priority;
76 }
77 
78 void PriorityMux::disableCb(const std::string& inTopic, const ros::Time& stamp, const bool disable,
79  const ros::Time& now)
80 {
81  const auto it = this->topicConfigs.find(inTopic);
82  if (it == this->topicConfigs.end())
83  {
84  CRAS_ERROR("Priority mux called with topic %s which is not configured.", inTopic.c_str());
85  return;
86  }
87 
88  const auto& topicConfig = it->second;
89  const auto& priority = topicConfig.priority;
90  const auto key = std::make_pair(priority, inTopic);
91 
92  if (disable)
93  this->disabledStamps[key] = stamp;
94  else
95  this->disabledStamps.erase(key);
96 
97  this->update(now);
98 
99  if (disable)
100  {
101  // If stamp is older than now, wait only for as much time as actually remains
102  const auto remainingTimeout = topicConfig.timeout + (stamp - now);
103  // If the callback came late and the topic is already timed out, do not plan a callback as everything is done in the
104  // preceding update() call.
105  if (remainingTimeout > ros::Duration(0, 0))
106  this->setTimer("__disable_" + inTopic, remainingTimeout);
107  }
108 }
109 
110 void PriorityMux::lockCb(const std::string& topic, const ros::Time& stamp, const bool locked, const ros::Time& now)
111 {
112  const auto it = this->lockConfigs.find(topic);
113  if (it == this->lockConfigs.end())
114  {
115  CRAS_ERROR("Priority mux called with lock topic %s which is not configured.", topic.c_str());
116  return;
117  }
118 
119  const auto& lockConfig = it->second;
120  const auto& priority = lockConfig.priority;
121  const auto& timeout = lockConfig.timeout;
122 
123  const auto key = std::make_pair(priority, topic);
124  this->lastLockStamps[key] = stamp;
125  this->lockStates[key] = locked;
126 
127  this->update(now);
128 
129  // If stamp is older than now, wait only for as much time as actually remains
130  const auto remainingTimeout = timeout + (stamp - now);
131 
132  if (!locked && timeout != ros::Duration(0, 0) && remainingTimeout > ros::Duration(0, 0))
133  this->setTimer(topic, timeout);
134 }
135 
137 {
138  const auto lockedPriority = this->getHighestLockedPriority(now);
139 
140  // Iterate from the highest priority downwards and find the first non-timeouted non-disabled topic
141  int topicsPriority {this->nonePriority};
142  for (auto it = this->lastReceiveStamps.crbegin(); it != this->lastReceiveStamps.crend(); ++it)
143  {
144  const auto& itPriority = it->first.first;
145  const auto& itTopic = it->first.second;
146  const auto& itConfig = this->topicConfigs[itTopic];
147  const auto& itStamp = it->second;
148  if (now < itStamp + itConfig.timeout)
149  {
150  // Check if the topic isn't currently disabled
151  const auto disableIt = this->disabledStamps.find(it->first);
152  if (disableIt != this->disabledStamps.end())
153  {
154  const auto& disabledStamp = disableIt->second;
155  if (now < disabledStamp + itConfig.timeout) // The topic is disabled, skip it
156  continue;
157  // If the disable is timed out, delete its record.
158  this->disabledStamps.erase(it->first);
159  }
160  topicsPriority = itPriority;
161  break;
162  }
163  }
164 
165  const auto lastPriority = this->lastActivePriority;
166  this->lastActivePriority = (std::max)(topicsPriority, lockedPriority);
167 
168  if (lastPriority != this->lastActivePriority)
169  CRAS_INFO("Changed priority from %i to %i.", lastPriority, this->lastActivePriority);
170 
171  auto lastTopics = this->lastSelectedTopics;
172 
173  for (auto& selected : this->lastSelectedTopics)
174  selected.second = this->noneTopic;
175 
176  if (topicsPriority >= lockedPriority)
177  {
178  for (const auto& topicAndConfig : this->topicConfigs)
179  {
180  const auto& config = topicAndConfig.second;
181  if (config.priority != topicsPriority)
182  continue;
183  this->lastSelectedTopics[config.outTopic] = topicAndConfig.first;
184  }
185  }
186 
187  for (const auto& outAndSelectedTopic : this->lastSelectedTopics)
188  {
189  if (lastTopics[outAndSelectedTopic.first] != outAndSelectedTopic.second)
190  CRAS_DEBUG("Choosing topic %s instead of %s for output topic %s.", outAndSelectedTopic.second.c_str(),
191  lastTopics[outAndSelectedTopic.first].c_str(), outAndSelectedTopic.first.c_str());
192  }
193 }
194 
196 {
197  auto highestLockedPriority = this->nonePriority;
198  for (auto it = this->lastLockStamps.crbegin(); it != this->lastLockStamps.crend(); ++it)
199  {
200  const auto& itPriority = it->first.first;
201  const auto& itTopic = it->first.second;
202  const auto& itStamp = it->second;
203  const auto& itConfig = this->lockConfigs[itTopic];
204  const auto& itTimeout = itConfig.timeout;
205  const auto key = std::make_pair(itPriority, itTopic);
206  if ((itTimeout != ros::Duration(0, 0) && itStamp + itTimeout < now) || this->lockStates[key])
207  {
208  highestLockedPriority = itPriority;
209  break;
210  }
211  }
212  return highestLockedPriority;
213 }
214 
215 const std::unordered_map<std::string, std::string>& PriorityMux::getLastSelectedTopics() const
216 {
217  return this->lastSelectedTopics;
218 }
219 
221 {
222  return this->lastActivePriority;
223 }
224 
225 void PriorityMux::reset(const ::ros::Time& now)
226 {
227  this->resetImpl(now);
228 }
229 
230 void PriorityMux::resetImpl(const ::ros::Time& now)
231 {
232  this->lastActivePriority = this->nonePriority;
233  this->lastReceiveStamps.clear();
234 
235  for (const auto& topicAndConfig : this->topicConfigs)
236  this->lastSelectedTopics[topicAndConfig.second.outTopic] = this->noneTopic;
237 
238  for (const auto& topicAndLockConfig : this->lockConfigs)
239  {
240  const auto key = std::make_pair(topicAndLockConfig.second.priority, topicAndLockConfig.first);
241  this->lastLockStamps[key] = now;
242  this->lockStates[key] = false;
243  }
244 
245  this->disabledStamps.clear();
246 
247  this->update(now);
248 }
249 
250 bool PriorityMux::isDisabled(const std::string& inTopic, const ros::Time& now)
251 {
252  const auto it = this->topicConfigs.find(inTopic);
253  if (it == this->topicConfigs.end())
254  {
255  CRAS_ERROR("Priority mux called with topic %s which is not configured.", inTopic.c_str());
256  return false;
257  }
258 
259  const auto& config = it->second;
260 
261  const auto disableIt = this->disabledStamps.find(std::make_pair(config.priority, it->first));
262  if (disableIt != this->disabledStamps.end())
263  {
264  const auto& disabledStamp = disableIt->second;
265  return now >= disabledStamp + config.timeout;
266  }
267  return false;
268 }
269 
270 }
cras::PriorityMux::isDisabled
bool isDisabled(const ::std::string &inTopic, const ::ros::Time &now)
Definition: priority_mux_base.cpp:250
cras::PriorityMux::cb
virtual bool cb(const ::std::string &inTopic, const ::ros::Time &stamp, const ::ros::Time &now)
Callback function run when input topic message is received.
Definition: priority_mux_base.cpp:43
cras
cras::PriorityMux::resetImpl
void resetImpl(const ::ros::Time &now)
Reset the mux to its original state.
Definition: priority_mux_base.cpp:230
cras::PriorityMux::lockConfigs
::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > lockConfigs
Configurations of lock topics.
Definition: priority_mux_base.h:189
cras::PriorityMux::noneTopic
::std::string noneTopic
Virtual name of a topic reported as selected when no priority is active.
Definition: priority_mux_base.h:213
cras::PriorityMux::topicConfigs
::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > topicConfigs
Configurations of input topics.
Definition: priority_mux_base.h:186
cras::PriorityMux::disableCb
virtual void disableCb(const ::std::string &inTopic, const ::ros::Time &stamp, bool disable, const ::ros::Time &now)
Callback function to disable/enable an input topic.
Definition: priority_mux_base.cpp:78
cras::HasLogger
cras::PriorityMux::getHighestLockedPriority
int getHighestLockedPriority(const ::ros::Time &now)
Gets the highest locked priority.
Definition: priority_mux_base.cpp:195
cras::PriorityMux::lastLockStamps
::std::map<::std::pair< int, ::std::string >, ::ros::Time > lastLockStamps
The timestamps of the last received lock message for each lock topic.
Definition: priority_mux_base.h:207
cras::PriorityMux::lastReceiveStamps
::std::map<::std::pair< int, ::std::string >, ::ros::Time > lastReceiveStamps
The timestamps of the last received message for each input topic.
Definition: priority_mux_base.h:201
CRAS_ERROR
#define CRAS_ERROR(...)
CRAS_INFO
#define CRAS_INFO(...)
log_utils.h
cras::PriorityMux::setTimer
SetTimerFn setTimer
The function to call when the mux needs to schedule a call to update() after some time.
Definition: priority_mux_base.h:192
cras::PriorityMux::PriorityMux
PriorityMux(const ::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > &topicConfigs, const ::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > &lockConfigs, const SetTimerFn &setTimerFn, const ::ros::Time &now, const ::cras::LogHelperPtr &log, const ::std::string &noneTopic="__none", int nonePriority=0)
Constructs a PriorityMux object.
Definition: priority_mux_base.cpp:21
cras::PriorityMux::lastActivePriority
int lastActivePriority
The currently active priority level.
Definition: priority_mux_base.h:219
priority_mux_base.h
Priority-based muxer for topics.
cras::PriorityMux::getActivePriority
int getActivePriority() const
Return the active priority level.
Definition: priority_mux_base.cpp:220
cras::PriorityMux::SetTimerFn
::std::function< void(const ::std::string &name, const ::ros::Duration &timeout)> SetTimerFn
The function to call when the mux needs to schedule a call to update() after some time.
Definition: priority_mux_base.h:92
cras::PriorityMux::lockStates
::std::map<::std::pair< int, ::std::string >, bool > lockStates
Current lock status of each lock topic (whether the lock is explicitly locked or not).
Definition: priority_mux_base.h:210
cras::PriorityMux::lockCb
virtual void lockCb(const ::std::string &topic, const ::ros::Time &stamp, bool locked, const ::ros::Time &now)
Callback function run when a lock message is received.
Definition: priority_mux_base.cpp:110
cras::PriorityMux::lastSelectedTopics
::std::unordered_map<::std::string, ::std::string > lastSelectedTopics
The currently selected topic for each output topic.
Definition: priority_mux_base.h:222
cras::LogHelperPtr
::cras::LogHelper::Ptr LogHelperPtr
ros::Time
CRAS_DEBUG
#define CRAS_DEBUG(...)
cras::PriorityMux::reset
virtual void reset(const ::ros::Time &now)
Resets the mux to its initial state.
Definition: priority_mux_base.cpp:225
cras::PriorityMux::disabledStamps
::std::map<::std::pair< int, ::std::string >, ::ros::Time > disabledStamps
The timestamps of the last received disable message for each input topic.
Definition: priority_mux_base.h:204
cras::PriorityMux::getLastSelectedTopics
const ::std::unordered_map<::std::string, ::std::string > & getLastSelectedTopics() const
Return the last selected topic for each output topic.
Definition: priority_mux_base.cpp:215
cras::PriorityMux::update
virtual void update(const ::ros::Time &now)
Updates the mux after changes have been made to some of its internal state.
Definition: priority_mux_base.cpp:136
ros::Duration
cras::PriorityMux::nonePriority
int nonePriority
Priority level signalling that no priority is active.
Definition: priority_mux_base.h:216


cras_topic_tools
Author(s): Martin Pecka
autogenerated on Mon Jun 17 2024 02:49:11