tf_filter_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * fkie_message_filters
4  * Copyright © 2018-2020 Fraunhofer FKIE
5  * Author: Timo Röhling
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ****************************************************************************/
20 
21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_TF_FILTER_IMPL_H_
22 #define INCLUDE_FKIE_MESSAGE_FILTERS_TF_FILTER_IMPL_H_
23 
24 #include "tf_filter.h"
26 #include "helpers/tuple.h"
27 #include <mutex>
28 
29 namespace fkie_message_filters
30 {
31 
32 template<class... Inputs>
33 struct TfFilter<Inputs...>::Impl
34 {
35  static constexpr tf2::TransformableRequestHandle NeverTransformable = 0xffffffffffffffffull;
37 
38  struct MessageInfo
39  {
40  MessageInfo(const MessageTuple& m) : message(m) {}
41 
43  std::set<tf2::TransformableRequestHandle> handles;
44  };
45 
46  Impl(TfFilter<Inputs...>* parent, tf2::BufferCore& bc) noexcept
47  : bc_(bc), parent_(parent), cur_queue_size_(0), max_queue_size_(0), tf_handle_(0), cbq_(nullptr) {}
48 
50  {
51  std::lock_guard<std::mutex> lock{mutex_};
52  if (tf_handle_ != 0)
53  bc_.removeTransformableCallback(tf_handle_);
54  }
55 
56  tf2::TransformableRequestHandle make_transformable_request(std::shared_ptr<MessageInfo>& info, const std::string& target_frame, const std::string& source_frame, const ros::Time& time) noexcept
57  {
58  tf2::TransformableRequestHandle h = bc_.addTransformableRequest(tf_handle_, target_frame, source_frame, time);
59  if (h != NeverTransformable && h != TransformAvailable)
60  {
61  info->handles.insert(h);
62  requests_.emplace(h, info);
63  }
64  return h;
65  }
66 
67  void cancel_all_transformable_requests (std::shared_ptr<MessageInfo>& info) noexcept
68  {
69  for (const tf2::TransformableRequestHandle& h : info->handles)
70  {
71  bc_.cancelTransformableRequest(h);
72  requests_.erase(h);
73  }
74  info->handles.clear();
75  }
76 
78  TfFilter<Inputs...>* parent_;
79  std::mutex mutex_;
82  uint32_t cur_queue_size_, max_queue_size_;
85  std::map<tf2::TransformableRequestHandle, std::shared_ptr<MessageInfo>> requests_;
86  std::list<std::shared_ptr<MessageInfo>> messages_;
87 };
88 
89 template<class... Inputs>
90 class TfFilter<Inputs...>::RosCB : public ros::CallbackInterface
91 {
92 public:
93  RosCB(const std::weak_ptr<Impl>& parent, const MessageTuple& msg, TfFilterResult result) noexcept
94  : parent_(parent), msg_(msg), result_(result) {}
95 
96  CallResult call() noexcept override
97  {
98  std::shared_ptr<Impl> impl = parent_.lock();
99  if (impl)
100  {
101  if (result_ == TfFilterResult::TransformAvailable)
102  helpers::index_apply<sizeof...(Inputs)>
103  (
104  [this, &impl](auto... Is)
105  {
106  impl->parent_->send(std::get<Is>(this->msg_)...);
107  }
108  );
109  else
110  {
111  std::unique_lock<std::mutex> lock{impl->mutex_};
112  FilterFailureCB cb = impl->failure_cb_;
113  if (cb)
114  {
115  lock.unlock();
116  helpers::index_apply<sizeof...(Inputs)>
117  (
118  [this, &cb](auto... Is)
119  {
120  cb(std::get<Is>(this->msg_)..., this->result_);
121  }
122  );
123  }
124  }
125  }
126  return Success;
127  }
128 private:
129  std::weak_ptr<Impl> parent_;
132 };
133 
134 namespace
135 {
136 
137 std::string strip_slash(const std::string& s) noexcept
138 {
139  std::string::size_type n = s.find_first_not_of('/');
140  if (n == std::string::npos) return std::string();
141  return s.substr(n);
142 }
143 
144 }
145 
146 template<class... Inputs>
147 TfFilter<Inputs...>::TfFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh) noexcept
148 {
149  init(bc, queue_size, nh);
150  set_target_frame(target_frame);
151 }
152 
153 template<class... Inputs>
154 TfFilter<Inputs...>::TfFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbq) noexcept
155 {
156  init(bc, queue_size, cbq);
157  set_target_frame(target_frame);
158 }
159 
160 template<class... Inputs>
161 TfFilter<Inputs...>::TfFilter(tf2::BufferCore& bc, const ros::V_string& target_frames, uint32_t queue_size, const ros::NodeHandle& nh) noexcept
162 {
163  init(bc, queue_size, nh);
164  set_target_frames(target_frames);
165 }
166 
167 template<class... Inputs>
168 TfFilter<Inputs...>::TfFilter(tf2::BufferCore& bc, const ros::V_string& target_frames, uint32_t queue_size, ros::CallbackQueueInterface* cbq) noexcept
169 {
170  init(bc, queue_size, cbq);
171  set_target_frames(target_frames);
172 }
173 
174 template<class... Inputs>
175 void TfFilter<Inputs...>::init(tf2::BufferCore& bc, uint32_t queue_size, ros::CallbackQueueInterface* cbq) noexcept
176 {
177  impl_ = std::make_shared<Impl>(this, bc);
178  impl_->max_queue_size_ = queue_size;
179  impl_->cbq_ = cbq;
180  impl_->tf_handle_ = impl_->bc_.addTransformableCallback(
181  [this](tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame, const ros::Time& time, tf2::TransformableResult result)
182  {
183  this->transformable(request_handle, target_frame, source_frame, time, result);
184  }
185  );
186 }
187 
188 template<class... Inputs>
189 void TfFilter<Inputs...>::init(tf2::BufferCore& bc, uint32_t queue_size, const ros::NodeHandle& nh) noexcept
190 {
191  init(bc, queue_size, nh.getCallbackQueue());
192 }
193 
194 template<class... Inputs>
196 {
197  if (!impl_) throw std::logic_error("TfFilter object is not initialized");
198  std::unique_lock<std::mutex> lock{impl_->mutex_};
199  impl_->target_frames_.clear();
200  for (const std::string& f : target_frames)
201  {
202  std::string f2 = strip_slash(f);
203  if (!f2.empty()) impl_->target_frames_.push_back(f2);
204  }
205 }
206 
207 template<class... Inputs>
208 void TfFilter<Inputs...>::set_target_frame(const std::string& target_frame)
209 {
210  set_target_frames({target_frame});
211 }
212 
213 template<class... Inputs>
215 {
216  if (!impl_) return; // Nothing to do if object is not initialized
217  std::lock_guard<std::mutex> lock{impl_->mutex_};
218  for (auto& info : impl_->messages_)
219  {
220  impl_->cancel_all_transformable_requests(info);
221  }
222  impl_->messages_.clear();
223  impl_->cur_queue_size_ = 0;
224  assert(impl_->requests_.empty());
225 }
226 
227 template<class... Inputs>
229 {
230  if (!impl_) throw std::logic_error("TfFilter object is not initialized");
231  std::lock_guard<std::mutex> lock{impl_->mutex_};
232  impl_->failure_cb_ = f;
233 }
234 
235 template<class... Inputs>
236 void TfFilter<Inputs...>::receive (const Inputs&... in)
237 {
238  using MessageInfo = typename Impl::MessageInfo;
239  if (!impl_) return;
240  std::unique_lock<std::mutex> lock{impl_->mutex_};
241  if (impl_->target_frames_.empty()) return;
242  std::shared_ptr<MessageInfo> info = std::make_shared<MessageInfo>(std::forward_as_tuple(in...));
243  std::string source_frame = strip_slash(helpers::access_ros_header_frame_id(std::get<0>(info->message)));
244  if (source_frame.empty())
245  {
246  report_failure(lock, info->message, TfFilterResult::EmptyFrameID);
247  return;
248  }
249  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(info->message));
250  ros::V_string target_frames = impl_->target_frames_;
251  for (const std::string& target_frame : target_frames)
252  {
253  tf2::TransformableRequestHandle h = impl_->make_transformable_request(info, target_frame, source_frame, stamp);
254  if (h == Impl::NeverTransformable)
255  {
256  impl_->cancel_all_transformable_requests(info);
257  report_failure(lock, info->message, TfFilterResult::TransformExpired);
258  return;
259  }
260  }
261  if (!info->handles.empty())
262  {
263  impl_->messages_.push_back(info);
264  ++impl_->cur_queue_size_;
265  while (impl_->cur_queue_size_ > impl_->max_queue_size_)
266  {
267  std::shared_ptr<MessageInfo> old = impl_->messages_.front();
268  impl_->messages_.pop_front();
269  --impl_->cur_queue_size_;
270  impl_->cancel_all_transformable_requests(old);
271  report_failure(lock, old->message, TfFilterResult::QueueOverflow);
272  }
273  }
274  else
275  {
276  send_message(lock, info->message);
277  }
278 }
279 
280 template<class... Inputs>
281 void TfFilter<Inputs...>::transformable(tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame, ros::Time time, tf2::TransformableResult result)
282 {
283  using MessageInfo = typename Impl::MessageInfo;
284  std::unique_lock<std::mutex> lock{impl_->mutex_};
285  if (!impl_) return;
286  auto req = impl_->requests_.find(request_handle);
287  if (req == impl_->requests_.end()) return;
288  std::shared_ptr<MessageInfo> info = req->second;
289  impl_->requests_.erase(req);
290  if (!info) return; /* just in case */
291  info->handles.erase(request_handle);
292  if (info->handles.empty() || result != tf2::TransformAvailable)
293  {
294  impl_->cancel_all_transformable_requests(info);
295  auto msg = std::find(impl_->messages_.begin(), impl_->messages_.end(), info);
296  if (msg == impl_->messages_.end()) return;
297  impl_->messages_.erase(msg);
298  --impl_->cur_queue_size_;
299  if (result == tf2::TransformAvailable) /* Everything succeeded */
300  {
301  std::string source_frame = strip_slash(helpers::access_ros_header_frame_id(std::get<0>(info->message)));
302  ros::Time stamp = helpers::access_ros_header_stamp(std::get<0>(info->message));
303  for (const std::string& frame : impl_->target_frames_)
304  {
305  if (!impl_->bc_.canTransform(frame, source_frame, stamp))
306  {
307  report_failure(lock, info->message, TfFilterResult::UnknownFailure);
308  return;
309  }
310  }
311  send_message(lock, info->message);
312  }
313  else
314  {
315  report_failure(lock, info->message, TfFilterResult::TransformFailed);
316  }
317  }
318 }
319 
320 template<class... Inputs>
321 void TfFilter<Inputs...>::report_failure(std::unique_lock<std::mutex>& lock, const MessageTuple& msg, TfFilterResult reason)
322 {
323  if (!impl_) return;
324  if (impl_->cbq_)
325  {
326  impl_->cbq_->addCallback(boost::make_shared<RosCB>(impl_, msg, reason));
327  }
328  else
329  {
330  FilterFailureCB cb = impl_->failure_cb_;
331  if (cb)
332  {
333  auto unlock = helpers::with_scoped_unlock(lock);
334  helpers::index_apply<sizeof...(Inputs)>
335  (
336  [&cb, &msg, &reason](auto... Is)
337  {
338  cb(std::get<Is>(msg)..., reason);
339  }
340  );
341  }
342  }
343 }
344 
345 template<class... Inputs>
346 void TfFilter<Inputs...>::send_message(std::unique_lock<std::mutex>& lock, const MessageTuple& msg)
347 {
348  if (!impl_) return;
349  if (impl_->cbq_)
350  {
351  impl_->cbq_->addCallback(boost::make_shared<RosCB>(impl_, msg, TfFilterResult::TransformAvailable));
352  }
353  else
354  {
355  auto unlock = helpers::with_scoped_unlock(lock);
356  helpers::index_apply<sizeof...(Inputs)>
357  (
358  [this, &msg](auto... Is)
359  {
360  this->send(std::get<Is>(msg)...);
361  }
362  );
363  }
364 }
365 
366 } // namespace fkie_message_filters
367 
368 #endif /* INCLUDE_FKIE_MESSAGE_FILTERS_TF_FILTER_IMPL_H_ */
std::map< tf2::TransformableRequestHandle, std::shared_ptr< MessageInfo > > requests_
f
uint32_t TransformableCallbackHandle
std::string access_ros_header_frame_id(const M &m) noexcept
XmlRpcServer s
void set_filter_failure_function(FilterFailureCB cb)
Register callback for failed transforms.
Wait for TF transformations for incoming messages.
Definition: tf_filter.h:72
The message has been dropped because of a queue overflow.
The requested transform is unavailable.
static constexpr tf2::TransformableRequestHandle NeverTransformable
TransformableResult
void set_target_frames(const ros::V_string &target_frames)
Choose the TF target frames.
CallResult call() noexcept override
The message has an empty TF frame ID and cannot be transformed.
void cancel_all_transformable_requests(std::shared_ptr< MessageInfo > &info) noexcept
std::vector< std::string > V_string
void receive(const Inputs &... in) override
Process incoming data.
void reset() noexcept override
Reset filter state.
void transformable(tf2::TransformableRequestHandle request_handle, const std::string &target_frame, const std::string &source_frame, ros::Time time, tf2::TransformableResult result)
Impl(TfFilter< Inputs... > *parent, tf2::BufferCore &bc) noexcept
RosCB(const std::weak_ptr< Impl > &parent, const MessageTuple &msg, TfFilterResult result) noexcept
void init(tf2::BufferCore &bc, uint32_t queue_size, const ros::NodeHandle &nh) noexcept
Initialize the filter.
ros::CallbackQueueInterface * cbq_
void send_message(std::unique_lock< std::mutex > &, const MessageTuple &m)
Primary namespace.
Definition: buffer.h:33
ScopedUnlock< BasicLockable > with_scoped_unlock(BasicLockable &lockable)
Definition: scoped_unlock.h:59
void report_failure(std::unique_lock< std::mutex > &, const MessageTuple &, TfFilterResult)
void send(const Outputs &... out)
Pass data to all connected sinks.
Definition: source_impl.h:51
std::tuple< Inputs... > MessageTuple
Definition: tf_filter.h:192
std::set< tf2::TransformableRequestHandle > handles
TfFilterResult
TF transformation results.
Definition: tf_filter.h:37
tf2::TransformableRequestHandle make_transformable_request(std::shared_ptr< MessageInfo > &info, const std::string &target_frame, const std::string &source_frame, const ros::Time &time) noexcept
std::shared_ptr< Impl > impl_
Definition: tf_filter.h:194
tf2::TransformableCallbackHandle tf_handle_
The requested transform is no longer available, likely because the message is too old...
std::function< void(const Inputs &..., TfFilterResult)> FilterFailureCB
Callback for failed transform queries.
Definition: tf_filter.h:76
std::list< std::shared_ptr< MessageInfo > > messages_
TfFilter< Inputs... > * parent_
auto index_apply(Function f)
Definition: tuple.h:40
TfFilter() noexcept
Empty constructor.
Definition: tf_filter.h:82
void set_target_frame(const std::string &target_frame)
Choose the TF target frame.
uint64_t TransformableRequestHandle
ros::Time access_ros_header_stamp(const M &m) noexcept


fkie_message_filters
Author(s): Timo Röhling
autogenerated on Mon Feb 28 2022 22:21:43