15 #include <std_msgs/Bool.h> 16 #include <std_msgs/Int32.h> 17 #include <std_msgs/String.h> 42 CRAS_ERROR(
"Parameter ~topics is empty, priority mux will not do anything!");
46 std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> topicItems;
50 for (
size_t i = 0; i < topicParams.
size(); ++i)
51 topicItems.emplace_back(
"[" +
cras::to_string(i) +
"]", topicParams[i]);
54 for (
const auto& item : topicParams)
55 topicItems.emplace_back(
"/" + item.first, item.second);
58 CRAS_ERROR(
"Parameter ~topics has to be either a list or a dict. Priority mux will not do anything!");
62 for (
const auto& item : topicItems)
64 const auto& key = item.first;
65 const auto& xmlConfig = item.second;
68 CRAS_ERROR(
"Item %s of ~topics has to be a dict, but %s was given.",
74 auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
80 config.
inTopic = xmlParams.getParam<std::string>(
"topic", cras::nullopt);
84 CRAS_ERROR(
"Item %s of ~topics has to contain key 'topic'. Skipping the item.", key.c_str());
88 config.
name = xmlParams.getParam(
"name", config.
inTopic);
89 config.
outTopic = xmlParams.getParam(
"out_topic", DEFAULT_OUT_TOPIC);
90 config.
priority = xmlParams.getParam(
"priority", 100,
"", {
true,
true});
92 config.
queueSize = xmlParams.getParam(
"queue_size", 10_sz,
"messages");
100 lockParams =
params->getParam(
"locks", lockParams);
101 std::list<std::pair<std::string, XmlRpc::XmlRpcValue>> lockItems;
105 for (
size_t i = 0; i < lockParams.
size(); ++i)
109 for (
const auto& item : lockParams)
110 lockItems.emplace_back(
"/" + item.first, item.second);
113 CRAS_ERROR(
"Parameter ~locks has to be either a list or a dict. No locks will be set.");
117 if (lockItems.empty())
120 for (
const auto& item : lockItems)
122 const auto& key = item.first;
123 const auto& xmlConfig = item.second;
126 CRAS_ERROR(
"Item %s of ~locks has to be a dict, but %s was given.",
132 auto xmlParamAdapter = std::make_shared<cras::XmlRpcValueGetParamAdapter>(xmlConfig, itemNamespace);
138 config.
topic = xmlParams.getParam<std::string>(
"topic", cras::nullopt);
142 CRAS_ERROR(
"Item %s of ~locks has to contain key 'topic'. Skipping the item.", key.c_str());
146 config.
name = xmlParams.getParam(
"name", config.
topic);
147 config.
priority = xmlParams.getParam(
"priority", 100,
"", {
true,
true});
149 config.
queueSize = xmlParams.getParam(
"queue_size", 10_sz,
"messages");
165 const auto& topicConfig = config.second;
169 const auto selectedTopic =
"selected/" +
cras::stripLeading(topicConfig.outTopic,
'/');
174 std_msgs::String selectedMsg;
177 CRAS_INFO(
"No topic is now selected for output topic %s.", topicConfig.outTopic.c_str());
182 topicConfig.inTopic, topicConfig.queueSize,
cb);
188 const auto& lockConfig = config.second;
198 std_msgs::Bool lockedMsg;
199 lockedMsg.data =
false;
201 CRAS_INFO(
"Lock %s is not locked.", lockConfig.name.c_str());
213 const auto& priority = topicConfig.priority;
214 const auto& outTopic = topicConfig.outTopic;
217 auto highestLockedPriority = std::numeric_limits<int>::min();
220 const auto& itPriority = it->first.first;
221 const auto& itTopic = it->first.second;
222 const auto& itStamp = it->second;
224 const auto& itTimeout = itConfig.timeout;
225 if (itPriority > highestLockedPriority && (itStamp + itTimeout <
ros::Time::now()))
226 highestLockedPriority = itPriority;
231 const auto& itPriority = it->first.first;
233 if (itPriority <= priority)
239 if (itPriority < highestLockedPriority)
241 CRAS_DEBUG(
"Priority %i not active, as a lock with priority %i is locked.",
242 priority, highestLockedPriority);
246 const auto& itTopic = it->first.second;
248 const auto& itStamp = it->second;
251 CRAS_DEBUG(
"Priority %i not active. Found higher active priority %i.", priority, itPriority);
256 auto& pub = this->
publishers[topicConfig.outTopic];
262 pub =
event.getConstMessage()->advertise(this->
getNodeHandle(), topicConfig.outTopic, this->outQueueSize, latch);
276 const auto& priority = lockConfig.priority;
277 const auto& timeout = lockConfig.timeout;
279 auto& stamp = this->
lastLockStamps[std::make_pair(priority, topic)];
284 std_msgs::Bool lockedMsg;
285 lockedMsg.data =
false;
287 CRAS_INFO(
"Lock %s is unlocked now.", lockConfig.name.c_str());
305 timer.setPeriod({0, 0},
true);
307 timer.setPeriod(timeout,
true);
322 CRAS_INFO(
"Priority %i is now active.", newPriority);
329 CRAS_INFO(
"Source topic '%s' is now selected for output topic '%s'.",
330 newTopic.c_str(), config.outTopic.c_str());
337 const auto& itPriority = topicConfig.second.priority;
338 if (itPriority >= newPriority)
341 const auto& itTopic = topicConfig.second.inTopic;
342 const auto outTopic = topicConfig.second.outTopic;
351 msg.data = newPriority;
363 std_msgs::String
msg;
384 std_msgs::String
msg;
388 CRAS_INFO(
"No topic is now selected for output topic %s.", outTopic.c_str());
const boost::shared_ptr< ConstMessage > & getConstMessage() const
::std::unordered_map<::std::string, ::ros::Publisher > lockedPublishers
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
::ros::Timer priorityBackToNoneTimer
virtual void cb(const ::ros::MessageEvent<::topic_tools::ShapeShifter const > &event, const ::std::string &inTopic)
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
::std::unordered_map<::std::string, ::ros::Publisher > publishers
::std::map<::std::pair< int, ::std::string >, ::ros::Time > lastLockStamps
ros::NodeHandle & getNodeHandle() const
void setPeriod(const Duration &period, bool reset=true)
void publishPriorityChange(int newPriority, const ::ros::Duration &timeout)
inline ::std::string to_string(const ::XmlRpc::XmlRpcValue &value)
void onPriorityTimeout(const ::ros::TimerEvent &)
ros::NodeHandle & getPrivateNodeHandle() const
::ros::Publisher activePriorityPub
constexpr auto DEFAULT_OUT_TOPIC
virtual void lockCb(const ::std::string &topic, const ::std_msgs::BoolConstPtr &msg)
::std::map<::std::pair< int, ::std::string >, ::ros::Time > lastReceiveStamps
void stripLeading(::std::string &s, const char &c=' ')
Type const & getType() const
void publish(const boost::shared_ptr< M > &message) const
::std::list<::ros::Subscriber > subscribers
virtual void updatePriorities(int newPriority, const ::std::string &newTopic)
::std::unordered_map<::std::string, ::cras::priority_mux::TopicConfig > topicConfigs
void onLockTimeout(const ::std::string &topic, const ::ros::TimerEvent &)
auto bind_front(F &&f, Args &&... args)
M_string & getConnectionHeader() const
std::string resolveName(const std::string &name, bool remap=true) const
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
::std::unordered_map<::std::string, ::std::string > lastSelectedTopics
::std::unordered_map<::std::string, ::ros::Timer > selectedBackToNoneTimers
::std::unordered_map<::std::string, ::ros::Publisher > selectedPublishers
::std::unordered_map<::std::string, ::ros::Timer > lockTimeoutTimers
::std::unordered_map<::std::string, ::cras::priority_mux::LockConfig > lockConfigs
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
void onSelectedTopicTimeout(const ::std::string &outTopic, const ::ros::TimerEvent &)
static const ::std::string NONE_TOPIC
void publishSelectedTopicChange(const ::std::string &outTopic, const ::std::string &newTopic, const ::ros::Duration &timeout)