filter_utils/filter_chain_nodelet.hpp
Go to the documentation of this file.
1 #pragma once
2 
12 #include <memory>
13 #include <mutex>
14 #include <string>
15 #include <type_traits>
16 #include <unordered_map>
17 #include <unordered_set>
18 
19 #include <boost/thread/recursive_mutex.hpp>
20 
23 #include <dynamic_reconfigure/server.h>
24 #include <ros/duration.h>
25 #include <ros/message_traits.h>
26 #include <ros/node_handle.h>
27 #include <ros/publisher.h>
29 #include <ros/subscriber.h>
30 #include <ros/time.h>
31 
32 #include <cras_cpp_common/FilterChainConfig.h>
33 
39 
40 namespace cras
41 {
42 
43 template <typename F>
45 
98 template<class F>
100 {
101 public:
103  typedef F DataType;
104 
106  typedef ::ros::message_traits::DataType<F> MsgDataType;
107 
116  FilterChainNodelet(const ::std::string& dataType, const ::std::string& topicIn, const ::std::string& topicFiltered,
117  const ::std::string& configNamespace);
118 
126  FilterChainNodelet(const ::std::string& topicIn, const ::std::string& topicFiltered,
127  const ::std::string& configNamespace);
128 
134  explicit FilterChainNodelet(const ::std::string& configNamespace);
135 
136  ~FilterChainNodelet() override;
137 
141  void enableFilterPublishers();
142 
147 
152  void setDisabledFilters(::std::unordered_set<::std::string> filters);
153 
158  void setMaxAge(const ::ros::Duration& maxAge);
159 
160 protected:
165  void dynreconfCallback(::cras_cpp_common::FilterChainConfig& config, uint32_t);
166 
170  void updateDynamicParams() const;
171 
172  void onInit() override;
173 
179  void subscribe();
180 
184  virtual void connectCb(const ::ros::SingleSubscriberPublisher&);
185 
189  virtual void disconnectCb(const ::ros::SingleSubscriberPublisher&);
190 
197  template <typename T, ::std::enable_if_t<::ros::message_traits::HasHeader<T>::value, bool> = true>
198  ::ros::Time getStamp(const typename T::ConstPtr& data);
199 
206  template <typename T, ::std::enable_if_t<!::ros::message_traits::HasHeader<T>::value, bool> = true>
207  ::ros::Time getStamp(const typename T::ConstPtr& data);
208 
215  virtual void dataCallback(const typename F::ConstPtr& data);
216 
224  virtual void filterStartCallback(const F& data, size_t filterNum, const ::std::string& name,
225  const ::std::string& type);
226 
235  virtual void filterFinishedCallback(const F& data, size_t filterNum, const ::std::string& name,
236  const ::std::string& type, bool success);
237 
240 
243 
246 
248  ::std::unique_ptr<::cras::DiagnosedSubscriber<F>> subscriberDiag;
249 
252 
255 
257  ::std::unique_ptr<::cras::DiagnosedPublisher<F>> publisherDiag;
258 
260  ::std::string topicIn;
261 
263  ::std::string topicFiltered;
264 
266  ::std::string configNamespace;
267 
269  bool publishFilters {false};
270 
272  size_t publisherQueueSize {15_sz};
273 
275  ::std::unordered_map<::std::string, ::ros::Publisher> filterPublishers;
276 
278  ::std::mutex filterPublishersMutex;
279 
281  ::std::mutex connectMutex;
282 
284  bool lazySubscription {true};
285 
287  size_t subscriberQueueSize {15_sz};
288 
291 
294 
297 
300 
302  mutable ::boost::recursive_mutex configMutex;
303 
305  ::std::unique_ptr<::dynamic_reconfigure::Server<::cras_cpp_common::FilterChainConfig>> dynreconfServer;
306 
308  ::std::unique_ptr<::cras::DurationStatus> callbackDurationDiag;
309 
311  ::std::unordered_map<::std::string, ::std::unique_ptr<::cras::DurationStatus>> filterCallbackDurationDiags;
312 
314  ::std::unique_ptr<::cras::FilterChainDiagnostics<F>> chainDiag;
315 };
316 
321 template <typename F>
323 {
324 public:
329  FilterChainDiagnostics(const ::std::string& name, const ::cras::FilterChain<F>& chain);
330 
336  void addReport(const ::std::string& filterName, bool success);
337 
338  void run(::diagnostic_updater::DiagnosticStatusWrapper& stat) override;
339 
340 protected:
342  const ::cras::FilterChain<F>& chain;
343 
345  ::std::mutex mutex;
346 
348  ::std::unordered_map<::std::string, size_t> numCallbacks;
349 
351  ::std::unordered_map<::std::string, size_t> numSuccesses;
352 
354  ::std::unordered_map<::std::string, size_t> numFailures;
355 };
356 
357 }
358 
359 #include "impl/filter_chain_nodelet.hpp"
::cras::FilterChain< F > filterChain
The chain of filters to apply to the incoming messages.
::std::unordered_map<::std::string, size_t > numSuccesses
The number of successful filter runs since last update.
::std::unique_ptr<::cras::DiagnosedPublisher< F > > publisherDiag
Publisher diagnostics.
void disableFilterPublishers()
Disable publishers of single filter results.
::ros::Publisher filteredPublisher
Publisher for the filtered data.
void run(class_loader::ClassLoader *loader)
bool publishFilters
Whether to create a publisher for each filter. Can be changed during runtime via enableFilterPublishe...
::std::unique_ptr<::cras::impl::NodeletWithDiagnosticsPrivate > data
PIMPL.
virtual void filterStartCallback(const F &data, size_t filterNum, const ::std::string &name, const ::std::string &type)
Callback called before each filter starts working on a message.
void updateDynamicParams() const
Update dynamic parameters on the server to correspond to current settings of this nodelet...
size_t subscriberQueueSize
Queue size for subscriber.
::std::unique_ptr<::dynamic_reconfigure::Server<::cras_cpp_common::FilterChainConfig > > dynreconfServer
Dynamic reconfigure server.
::std::mutex mutex
Mutex protecting numCallbacks, numSuccesses and numFailures.
::std::unique_ptr<::cras::DiagnosedSubscriber< F > > subscriberDiag
Subscriber diagnostics.
::ros::NodeHandle nodeHandle
Public NodeHandle.
::ros::message_traits::DataType< F > MsgDataType
Accessor to DataType field of the filtered message type.
::std::unordered_map<::std::string, ::std::unique_ptr<::cras::DurationStatus > > filterCallbackDurationDiags
Diagnostic tasks for duration of each filter callback.
bool lazySubscription
Whether to use the lazy subscription model (subscribe to input only when someone subscribes to output...
virtual void dataCallback(const typename F::ConstPtr &data)
Callback called when a new message arrives on the input topic.
ROS message publisher and subscriber with automatic rate and delay diagnostics.
::std::mutex filterPublishersMutex
Mutex that protects filterPublishers.
::std::string topicFiltered
Topic for outgoing (filtered) messages.
::std::unique_ptr<::cras::FilterChainDiagnostics< F > > chainDiag
Diagnostic task for overall chain performance.
::ros::NodeHandle privateNodeHandle
Private NodeHandle.
void subscribe()
Subscribe to the input topic.
::ros::Subscriber subscriber
Subscriber to data.
Utilities for working with diagnostics-related tools.
::std::string configNamespace
ROS parameter namespace from which filter configuration will be read.
::ros::Duration maxAge
Maximum age of a message for it to be considered. Older messages are thrown away when received...
virtual void filterFinishedCallback(const F &data, size_t filterNum, const ::std::string &name, const ::std::string &type, bool success)
Callback called after each filter finishes working on a message.
::std::unordered_map<::std::string, ::ros::Publisher > filterPublishers
Publishers for individual filter results.
::std::unordered_map<::std::string, size_t > numCallbacks
The overall number of callbacks since last update.
void onInit() override
void setMaxAge(const ::ros::Duration &maxAge)
Set the maximum age of messages that are accepted and filtered.
size_t publisherQueueSize
Queue size for publishers.
~FilterChainNodelet() override
virtual void connectCb(const ::ros::SingleSubscriberPublisher &)
Callback called when a new subscriber of the output topic appears.
::std::string topicIn
Topic for incoming messages.
::ros::Time getStamp(const typename T::ConstPtr &data)
Get timestamp from a message with header.
mutable ::boost::recursive_mutex configMutex
Mutex protecting the dynamic reconfigure options, maxAge and publishFilters.
Diagnostic task for duration of some task.
bool publishDurationDiagnostics
Whether callback duration statistics should be published to diagnostics.
This file contains a set of classes that make work with nodelets easier.
Definition: any.hpp:15
void setDisabledFilters(::std::unordered_set<::std::string > filters)
Set which filters are temporarily disabled.
bool publishChainDiagnostics
Whether overall chain diagnostics should be published to diagnostics.
::std::unordered_map<::std::string, size_t > numFailures
The number of failed filter runs since last update.
bool publishTopicDiagnostics
Whether topic frequency/delay statistics should be published to diagnostics.
Diagnostics of performance of a filter chain.
void enableFilterPublishers()
Enable publishers of single filter results.
Helpers for working with filter chains based on filters::FilterChain.
const ::cras::FilterChain< F > & chain
The diagnosed chain.
A versatile nodelet that can load and run a filter chain.
::std::unique_ptr<::cras::DurationStatus > callbackDurationDiag
Diagnostic task for overall callback duration.
virtual void disconnectCb(const ::ros::SingleSubscriberPublisher &)
Callback called when a subscriber of the output topic disappears.
void dynreconfCallback(::cras_cpp_common::FilterChainConfig &config, uint32_t)
Dynamic reconfiguration of parameters.
FilterChainNodelet(const ::std::string &dataType, const ::std::string &topicIn, const ::std::string &topicFiltered, const ::std::string &configNamespace)
Read ROS parameters, initialize publishers/subscribers, initialize other class members. ROS::init() is assumed to have been called before.
::std::mutex connectMutex
Mutex that protects subscriber, subscriberDiag.


cras_cpp_common
Author(s): Martin Pecka
autogenerated on Sat Jun 17 2023 02:32:53