21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_TF_FILTER_IMPL_H_
22 #define INCLUDE_FKIE_MESSAGE_FILTERS_TF_FILTER_IMPL_H_
32 template<
class... Inputs>
33 struct TfFilter<Inputs...>::Impl
43 std::set<tf2::TransformableRequestHandle> handles;
47 : bc_(bc), parent_(parent), cur_queue_size_(0), max_queue_size_(0), tf_handle_(0), cbq_(
nullptr) {}
51 std::lock_guard<std::mutex> lock{
mutex_};
53 bc_.removeTransformableCallback(tf_handle_);
59 if (h != NeverTransformable && h != TransformAvailable)
61 info->handles.insert(h);
62 requests_.emplace(h, info);
67 void cancel_all_transformable_requests (std::shared_ptr<MessageInfo>& info) noexcept
71 bc_.cancelTransformableRequest(h);
74 info->handles.clear();
82 uint32_t cur_queue_size_, max_queue_size_;
85 std::map<tf2::TransformableRequestHandle, std::shared_ptr<MessageInfo>> requests_;
86 std::list<std::shared_ptr<MessageInfo>> messages_;
89 template<
class... Inputs>
94 : parent_(parent), msg_(msg), result_(result) {}
96 CallResult
call() noexcept
override
98 std::shared_ptr<Impl> impl = parent_.lock();
104 [
this, &impl](
auto... Is)
106 impl->parent_->send(std::get<Is>(this->msg_)...);
111 std::unique_lock<std::mutex> lock{impl->mutex_};
118 [
this, &cb](
auto... Is)
120 cb(std::get<Is>(this->msg_)..., this->result_);
129 std::weak_ptr<Impl> parent_;
137 std::string strip_slash(
const std::string&
s) noexcept
139 std::string::size_type n =
s.find_first_not_of(
'/');
140 if (n == std::string::npos)
return std::string();
146 template<
class... Inputs>
150 set_target_frame(target_frame);
153 template<
class... Inputs>
156 init(bc, queue_size, cbq);
157 set_target_frame(target_frame);
160 template<
class... Inputs>
163 init(bc, queue_size, nh);
164 set_target_frames(target_frames);
167 template<
class... Inputs>
170 init(bc, queue_size, cbq);
171 set_target_frames(target_frames);
174 template<
class... Inputs>
177 impl_ = std::make_shared<Impl>(
this, bc);
178 impl_->max_queue_size_ = queue_size;
180 impl_->tf_handle_ = impl_->bc_.addTransformableCallback(
183 this->transformable(request_handle, target_frame, source_frame, time, result);
188 template<
class... Inputs>
191 init(bc, queue_size, nh.getCallbackQueue());
194 template<
class... Inputs>
197 if (!impl_)
throw std::logic_error(
"TfFilter object is not initialized");
198 std::unique_lock<std::mutex> lock{impl_->mutex_};
199 impl_->target_frames_.clear();
200 for (
const std::string&
f : target_frames)
202 std::string f2 = strip_slash(
f);
203 if (!f2.empty()) impl_->target_frames_.push_back(f2);
207 template<
class... Inputs>
210 set_target_frames({target_frame});
213 template<
class... Inputs>
217 std::lock_guard<std::mutex> lock{impl_->
mutex_};
218 for (
auto& info : impl_->messages_)
220 impl_->cancel_all_transformable_requests(info);
222 impl_->messages_.clear();
223 impl_->cur_queue_size_ = 0;
224 assert(impl_->requests_.empty());
227 template<
class... Inputs>
230 if (!impl_)
throw std::logic_error(
"TfFilter object is not initialized");
231 std::lock_guard<std::mutex> lock{impl_->mutex_};
232 impl_->failure_cb_ =
f;
235 template<
class... Inputs>
238 using MessageInfo =
typename Impl::MessageInfo;
240 std::unique_lock<std::mutex> lock{impl_->
mutex_};
241 if (impl_->target_frames_.empty())
return;
242 std::shared_ptr<MessageInfo> info = std::make_shared<MessageInfo>(std::forward_as_tuple(in...));
244 if (source_frame.empty())
246 report_failure(lock, info->message, TfFilterResult::EmptyFrameID);
251 for (
const std::string& target_frame : target_frames)
254 if (h == Impl::NeverTransformable)
256 impl_->cancel_all_transformable_requests(info);
257 report_failure(lock, info->message, TfFilterResult::TransformExpired);
261 if (!info->handles.empty())
263 impl_->messages_.push_back(info);
264 ++impl_->cur_queue_size_;
265 while (impl_->cur_queue_size_ > impl_->max_queue_size_)
267 std::shared_ptr<MessageInfo> old = impl_->messages_.front();
268 impl_->messages_.pop_front();
269 --impl_->cur_queue_size_;
270 impl_->cancel_all_transformable_requests(old);
271 report_failure(lock, old->message, TfFilterResult::QueueOverflow);
276 send_message(lock, info->message);
280 template<
class... Inputs>
283 using MessageInfo =
typename Impl::MessageInfo;
284 std::unique_lock<std::mutex> lock{impl_->mutex_};
286 auto req = impl_->requests_.find(request_handle);
287 if (req == impl_->requests_.end())
return;
288 std::shared_ptr<MessageInfo> info = req->second;
289 impl_->requests_.erase(req);
291 info->handles.erase(request_handle);
294 impl_->cancel_all_transformable_requests(info);
295 auto msg = std::find(impl_->messages_.begin(), impl_->messages_.end(), info);
296 if (msg == impl_->messages_.end())
return;
297 impl_->messages_.erase(msg);
298 --impl_->cur_queue_size_;
303 for (
const std::string& frame : impl_->target_frames_)
305 if (!impl_->bc_.canTransform(frame, source_frame, stamp))
307 report_failure(lock, info->message, TfFilterResult::UnknownFailure);
311 send_message(lock, info->message);
315 report_failure(lock, info->message, TfFilterResult::TransformFailed);
320 template<
class... Inputs>
326 impl_->cbq_->addCallback(boost::make_shared<RosCB>(impl_, msg, reason));
330 FilterFailureCB cb = impl_->failure_cb_;
336 [&cb, &msg, &reason](
auto... Is)
338 cb(std::get<Is>(msg)..., reason);
345 template<
class... Inputs>
351 impl_->cbq_->addCallback(boost::make_shared<RosCB>(impl_, msg, TfFilterResult::TransformAvailable));
358 [
this, &msg](
auto... Is)
360 this->send(std::get<Is>(msg)...);