filter_utils/filter_chain_nodelet.hpp
Go to the documentation of this file.
1 #pragma once
2 
12 #include <memory>
13 #include <mutex>
14 #include <string>
15 #include <type_traits>
16 #include <unordered_map>
17 #include <unordered_set>
18 
19 #include <boost/thread/recursive_mutex.hpp>
20 
23 #include <dynamic_reconfigure/server.h>
24 #include <ros/duration.h>
25 #include <ros/message_traits.h>
26 #include <ros/node_handle.h>
27 #include <ros/publisher.h>
29 #include <ros/subscriber.h>
30 #include <ros/time.h>
31 
32 #include <cras_cpp_common/FilterChainConfig.h>
33 
39 
40 namespace cras
41 {
42 
43 template <typename F>
45 
98 template<class F>
100 {
101 public:
103  typedef F DataType;
104 
106  typedef ::ros::message_traits::DataType<F> MsgDataType;
107 
116  FilterChainNodelet(const ::std::string& dataType, const ::std::string& topicIn, const ::std::string& topicFiltered,
117  const ::std::string& configNamespace);
118 
126  FilterChainNodelet(const ::std::string& topicIn, const ::std::string& topicFiltered,
127  const ::std::string& configNamespace);
128 
134  explicit FilterChainNodelet(const ::std::string& configNamespace);
135 
136  ~FilterChainNodelet() override;
137 
141  void enableFilterPublishers();
142 
147 
152  void setDisabledFilters(::std::unordered_set<::std::string> filters);
153 
158  void setMaxAge(const ::ros::Duration& maxAge);
159 
160 protected:
165  void dynreconfCallback(::cras_cpp_common::FilterChainConfig& config, uint32_t);
166 
170  void updateDynamicParams() const;
171 
172  void onInit() override;
173 
179  void subscribe();
180 
184  virtual void connectCb(const ::ros::SingleSubscriberPublisher&);
185 
189  virtual void disconnectCb(const ::ros::SingleSubscriberPublisher&);
190 
197  template <typename T, ::std::enable_if_t<::ros::message_traits::HasHeader<T>::value, bool> = true>
198  ::ros::Time getStamp(const typename T::ConstPtr& data);
199 
206  template <typename T, ::std::enable_if_t<!::ros::message_traits::HasHeader<T>::value, bool> = true>
207  ::ros::Time getStamp(const typename T::ConstPtr& data);
208 
215  virtual void dataCallback(const typename F::ConstPtr& data);
216 
224  virtual void filterStartCallback(const F& data, size_t filterNum, const ::std::string& name,
225  const ::std::string& type);
226 
235  virtual void filterFinishedCallback(const F& data, size_t filterNum, const ::std::string& name,
236  const ::std::string& type, bool success);
237 
240 
243 
246 
248  ::std::unique_ptr<::cras::DiagnosedSubscriber<F>> subscriberDiag;
249 
252 
255 
257  ::std::unique_ptr<::cras::DiagnosedPublisher<F>> publisherDiag;
258 
260  ::std::string topicIn;
261 
263  ::std::string topicFiltered;
264 
266  ::std::string configNamespace;
267 
269  bool publishFilters {false};
270 
272  size_t publisherQueueSize {15_sz};
273 
275  ::std::unordered_map<::std::string, ::ros::Publisher> filterPublishers;
276 
278  ::std::mutex filterPublishersMutex;
279 
281  ::std::mutex connectMutex;
282 
284  bool lazySubscription {true};
285 
287  size_t subscriberQueueSize {15_sz};
288 
291 
294 
297 
300 
302  mutable ::boost::recursive_mutex configMutex;
303 
305  ::std::unique_ptr<::dynamic_reconfigure::Server<::cras_cpp_common::FilterChainConfig>> dynreconfServer;
306 
308  ::std::unique_ptr<::cras::DurationStatus> callbackDurationDiag;
309 
311  ::std::unordered_map<::std::string, ::std::unique_ptr<::cras::DurationStatus>> filterCallbackDurationDiags;
312 
314  ::std::unique_ptr<::cras::FilterChainDiagnostics<F>> chainDiag;
315 };
316 
321 template <typename F>
323 {
324 public:
329  FilterChainDiagnostics(const ::std::string& name, const ::cras::FilterChain<F>& chain);
330 
336  void addReport(const ::std::string& filterName, bool success);
337 
338  void run(::diagnostic_updater::DiagnosticStatusWrapper& stat) override;
339 
340 protected:
342  const ::cras::FilterChain<F>& chain;
343 
345  ::std::mutex mutex;
346 
348  ::std::unordered_map<::std::string, size_t> numCallbacks;
349 
351  ::std::unordered_map<::std::string, size_t> numSuccesses;
352 
354  ::std::unordered_map<::std::string, size_t> numFailures;
355 };
356 
357 }
358 
359 #include "impl/filter_chain_nodelet.hpp"
cras::FilterChainNodelet::configMutex
mutable ::boost::recursive_mutex configMutex
Mutex protecting the dynamic reconfigure options, maxAge and publishFilters.
Definition: filter_utils/filter_chain_nodelet.hpp:302
filter_chain.hpp
Helpers for working with filter chains based on filters::FilterChain.
cras::FilterChainNodelet::connectCb
virtual void connectCb(const ::ros::SingleSubscriberPublisher &)
Callback called when a new subscriber of the output topic appears.
cras::FilterChainNodelet::setMaxAge
void setMaxAge(const ::ros::Duration &maxAge)
Set the maximum age of messages that are accepted and filtered.
diag_utils.hpp
Utilities for working with diagnostics-related tools.
node_handle.h
cras::FilterChainNodelet::enableFilterPublishers
void enableFilterPublishers()
Enable publishers of single filter results.
cras::FilterChainNodelet::chainDiag
::std::unique_ptr<::cras::FilterChainDiagnostics< F > > chainDiag
Diagnostic task for overall chain performance.
Definition: filter_utils/filter_chain_nodelet.hpp:314
cras::FilterChainNodelet::getStamp
::ros::Time getStamp(const typename T::ConstPtr &data)
Get timestamp from a message with header.
cras::FilterChainNodelet::dataCallback
virtual void dataCallback(const typename F::ConstPtr &data)
Callback called when a new message arrives on the input topic.
filters
cras
Definition: any.hpp:15
duration_status.h
Diagnostic task for duration of some task.
cras::FilterChainNodelet::filterPublishersMutex
::std::mutex filterPublishersMutex
Mutex that protects filterPublishers.
Definition: filter_utils/filter_chain_nodelet.hpp:278
cras::FilterChainNodelet::onInit
void onInit() override
time.h
cras::FilterChainNodelet::dynreconfServer
::std::unique_ptr<::dynamic_reconfigure::Server<::cras_cpp_common::FilterChainConfig > > dynreconfServer
Dynamic reconfigure server.
Definition: filter_utils/filter_chain_nodelet.hpp:305
cras::FilterChainNodelet::publisherDiag
::std::unique_ptr<::cras::DiagnosedPublisher< F > > publisherDiag
Publisher diagnostics.
Definition: filter_utils/filter_chain_nodelet.hpp:257
cras::FilterChainNodelet::publisherQueueSize
size_t publisherQueueSize
Queue size for publishers.
Definition: filter_utils/filter_chain_nodelet.hpp:272
cras::FilterChainDiagnostics::numSuccesses
::std::unordered_map<::std::string, size_t > numSuccesses
The number of successful filter runs since last update.
Definition: filter_utils/filter_chain_nodelet.hpp:351
cras::FilterChainNodelet::configNamespace
::std::string configNamespace
ROS parameter namespace from which filter configuration will be read.
Definition: filter_utils/filter_chain_nodelet.hpp:266
cras::FilterChainNodelet::publishTopicDiagnostics
bool publishTopicDiagnostics
Whether topic frequency/delay statistics should be published to diagnostics.
Definition: filter_utils/filter_chain_nodelet.hpp:290
cras::FilterChainNodelet::filterPublishers
::std::unordered_map<::std::string, ::ros::Publisher > filterPublishers
Publishers for individual filter results.
Definition: filter_utils/filter_chain_nodelet.hpp:275
cras::FilterChain
Definition: filter_chain.hpp:42
publisher.h
diagnostic_updater.h
cras::FilterChainNodelet::nodeHandle
::ros::NodeHandle nodeHandle
Public NodeHandle.
Definition: filter_utils/filter_chain_nodelet.hpp:239
cras::FilterChainNodelet::dynreconfCallback
void dynreconfCallback(::cras_cpp_common::FilterChainConfig &config, uint32_t)
Dynamic reconfiguration of parameters.
cras::FilterChainNodelet::FilterChainNodelet
FilterChainNodelet(const ::std::string &dataType, const ::std::string &topicIn, const ::std::string &topicFiltered, const ::std::string &configNamespace)
Read ROS parameters, initialize publishers/subscribers, initialize other class members....
cras::FilterChainNodelet::filterFinishedCallback
virtual void filterFinishedCallback(const F &data, size_t filterNum, const ::std::string &name, const ::std::string &type, bool success)
Callback called after each filter finishes working on a message.
cras::FilterChainDiagnostics::addReport
void addReport(const ::std::string &filterName, bool success)
Call this function every time a filter finished callback is called.
cras::FilterChainDiagnostics::numCallbacks
::std::unordered_map<::std::string, size_t > numCallbacks
The overall number of callbacks since last update.
Definition: filter_utils/filter_chain_nodelet.hpp:348
diagnostic_updater::DiagnosticTask
duration.h
cras::FilterChainDiagnostics::mutex
::std::mutex mutex
Mutex protecting numCallbacks, numSuccesses and numFailures.
Definition: filter_utils/filter_chain_nodelet.hpp:345
cras::FilterChainNodelet
A versatile nodelet that can load and run a filter chain.
Definition: filter_utils/filter_chain_nodelet.hpp:99
diagnosed_pub_sub.hpp
ROS message publisher and subscriber with automatic rate and delay diagnostics.
message_traits.h
cras::FilterChainNodelet::publishDurationDiagnostics
bool publishDurationDiagnostics
Whether callback duration statistics should be published to diagnostics.
Definition: filter_utils/filter_chain_nodelet.hpp:293
subscriber.h
cras::NodeletWithDiagnostics< ::nodelet::Nodelet >::data
::std::unique_ptr<::cras::impl::NodeletWithDiagnosticsPrivate > data
PIMPL.
Definition: nodelet_with_diagnostics.hpp:774
cras::FilterChainNodelet::disableFilterPublishers
void disableFilterPublishers()
Disable publishers of single filter results.
cras::FilterChainNodelet::lazySubscription
bool lazySubscription
Whether to use the lazy subscription model (subscribe to input only when someone subscribes to output...
Definition: filter_utils/filter_chain_nodelet.hpp:284
cras::FilterChainNodelet::subscriberQueueSize
size_t subscriberQueueSize
Queue size for subscriber.
Definition: filter_utils/filter_chain_nodelet.hpp:287
cras::FilterChainNodelet::disconnectCb
virtual void disconnectCb(const ::ros::SingleSubscriberPublisher &)
Callback called when a subscriber of the output topic disappears.
cras::FilterChainNodelet::filterChain
::cras::FilterChain< F > filterChain
The chain of filters to apply to the incoming messages.
Definition: filter_utils/filter_chain_nodelet.hpp:251
DiagnosticStatusWrapper.h
cras::FilterChainNodelet::filteredPublisher
::ros::Publisher filteredPublisher
Publisher for the filtered data.
Definition: filter_utils/filter_chain_nodelet.hpp:254
cras::FilterChainNodelet::subscriberDiag
::std::unique_ptr<::cras::DiagnosedSubscriber< F > > subscriberDiag
Subscriber diagnostics.
Definition: filter_utils/filter_chain_nodelet.hpp:248
cras::FilterChainDiagnostics::FilterChainDiagnostics
FilterChainDiagnostics(const ::std::string &name, const ::cras::FilterChain< F > &chain)
ros::NodeHandle
cras::FilterChainNodelet::privateNodeHandle
::ros::NodeHandle privateNodeHandle
Private NodeHandle.
Definition: filter_utils/filter_chain_nodelet.hpp:242
cras::FilterChainNodelet::subscribe
void subscribe()
Subscribe to the input topic.
ros::Publisher
cras::FilterChainDiagnostics::numFailures
::std::unordered_map<::std::string, size_t > numFailures
The number of failed filter runs since last update.
Definition: filter_utils/filter_chain_nodelet.hpp:354
nodelet_utils.hpp
This file contains a set of classes that make work with nodelets easier.
cras::FilterChainNodelet::connectMutex
::std::mutex connectMutex
Mutex that protects subscriber, subscriberDiag.
Definition: filter_utils/filter_chain_nodelet.hpp:281
cras::FilterChainNodelet::publishFilters
bool publishFilters
Whether to create a publisher for each filter. Can be changed during runtime via enableFilterPublishe...
Definition: filter_utils/filter_chain_nodelet.hpp:269
single_subscriber_publisher.h
cras::FilterChainNodelet::DataType
F DataType
Type of the filtered data.
Definition: filter_utils/filter_chain_nodelet.hpp:103
ros::Time
cras::FilterChainNodelet::~FilterChainNodelet
~FilterChainNodelet() override
cras::FilterChainDiagnostics::chain
const ::cras::FilterChain< F > & chain
The diagnosed chain.
Definition: filter_utils/filter_chain_nodelet.hpp:342
cras::FilterChainNodelet::topicFiltered
::std::string topicFiltered
Topic for outgoing (filtered) messages.
Definition: filter_utils/filter_chain_nodelet.hpp:263
cras::FilterChainDiagnostics::run
void run(::diagnostic_updater::DiagnosticStatusWrapper &stat) override
cras::FilterChainNodelet::callbackDurationDiag
::std::unique_ptr<::cras::DurationStatus > callbackDurationDiag
Diagnostic task for overall callback duration.
Definition: filter_utils/filter_chain_nodelet.hpp:308
cras::Nodelet
Definition: nodelet_utils.hpp:111
cras::FilterChainNodelet::filterCallbackDurationDiags
::std::unordered_map<::std::string, ::std::unique_ptr<::cras::DurationStatus > > filterCallbackDurationDiags
Diagnostic tasks for duration of each filter callback.
Definition: filter_utils/filter_chain_nodelet.hpp:311
cras::FilterChainNodelet::filterStartCallback
virtual void filterStartCallback(const F &data, size_t filterNum, const ::std::string &name, const ::std::string &type)
Callback called before each filter starts working on a message.
diagnostic_updater::DiagnosticStatusWrapper
cras::FilterChainDiagnostics
Diagnostics of performance of a filter chain.
Definition: filter_utils/filter_chain_nodelet.hpp:44
ros::Subscriber
cras::FilterChainNodelet::subscriber
::ros::Subscriber subscriber
Subscriber to data.
Definition: filter_utils/filter_chain_nodelet.hpp:245
cras::FilterChainNodelet::MsgDataType
::ros::message_traits::DataType< F > MsgDataType
Accessor to DataType field of the filtered message type.
Definition: filter_utils/filter_chain_nodelet.hpp:106
ros::Duration
cras::FilterChainNodelet::publishChainDiagnostics
bool publishChainDiagnostics
Whether overall chain diagnostics should be published to diagnostics.
Definition: filter_utils/filter_chain_nodelet.hpp:296
cras::FilterChainNodelet::setDisabledFilters
void setDisabledFilters(::std::unordered_set<::std::string > filters)
Set which filters are temporarily disabled.
cras::FilterChainNodelet::updateDynamicParams
void updateDynamicParams() const
Update dynamic parameters on the server to correspond to current settings of this nodelet.
cras::FilterChainNodelet::maxAge
::ros::Duration maxAge
Maximum age of a message for it to be considered. Older messages are thrown away when received.
Definition: filter_utils/filter_chain_nodelet.hpp:299
cras::FilterChainNodelet::topicIn
::std::string topicIn
Topic for incoming messages.
Definition: filter_utils/filter_chain_nodelet.hpp:260


cras_cpp_common
Author(s): Martin Pecka
autogenerated on Wed Apr 23 2025 02:48:42