21 #ifndef INCLUDE_FKIE_MESSAGE_FILTERS_TF_FILTER_IMPL_H_ 22 #define INCLUDE_FKIE_MESSAGE_FILTERS_TF_FILTER_IMPL_H_ 32 template<
class... Inputs>
43 std::set<tf2::TransformableRequestHandle>
handles;
47 : bc_(bc), parent_(parent), cur_queue_size_(0), max_queue_size_(0), tf_handle_(0), cbq_(
nullptr) {}
51 std::lock_guard<std::mutex> lock{
mutex_};
53 bc_.removeTransformableCallback(tf_handle_);
59 if (h != NeverTransformable && h != TransformAvailable)
61 info->handles.insert(h);
62 requests_.emplace(h, info);
71 bc_.cancelTransformableRequest(h);
74 info->handles.clear();
85 std::map<tf2::TransformableRequestHandle, std::shared_ptr<MessageInfo>>
requests_;
89 template<
class... Inputs>
94 : parent_(parent), msg_(msg), result_(result) {}
98 std::shared_ptr<Impl> impl = parent_.lock();
104 [
this, &impl](
auto... Is)
106 impl->parent_->send(std::get<Is>(this->msg_)...);
111 std::unique_lock<std::mutex> lock{impl->mutex_};
118 [
this, &cb](
auto... Is)
120 cb(std::get<Is>(this->msg_)..., this->result_);
137 std::string strip_slash(
const std::string&
s) noexcept
139 std::string::size_type n = s.find_first_not_of(
'/');
140 if (n == std::string::npos)
return std::string();
146 template<
class... Inputs>
149 init(bc, queue_size, nh);
153 template<
class... Inputs>
156 init(bc, queue_size, cbq);
160 template<
class... Inputs>
163 init(bc, queue_size, nh);
167 template<
class... Inputs>
170 init(bc, queue_size, cbq);
174 template<
class... Inputs>
177 impl_ = std::make_shared<Impl>(
this, bc);
178 impl_->max_queue_size_ = queue_size;
180 impl_->tf_handle_ =
impl_->bc_.addTransformableCallback(
183 this->
transformable(request_handle, target_frame, source_frame, time, result);
188 template<
class... Inputs>
191 init(bc, queue_size, nh.getCallbackQueue());
194 template<
class... Inputs>
197 if (!
impl_)
throw std::logic_error(
"TfFilter object is not initialized");
198 std::unique_lock<std::mutex> lock{
impl_->mutex_};
199 impl_->target_frames_.clear();
200 for (
const std::string&
f : target_frames)
202 std::string f2 = strip_slash(
f);
203 if (!f2.empty())
impl_->target_frames_.push_back(f2);
207 template<
class... Inputs>
213 template<
class... Inputs>
217 std::lock_guard<std::mutex> lock{
impl_->mutex_};
218 for (
auto& info :
impl_->messages_)
220 impl_->cancel_all_transformable_requests(info);
222 impl_->messages_.clear();
223 impl_->cur_queue_size_ = 0;
224 assert(
impl_->requests_.empty());
227 template<
class... Inputs>
230 if (!
impl_)
throw std::logic_error(
"TfFilter object is not initialized");
231 std::lock_guard<std::mutex> lock{
impl_->mutex_};
232 impl_->failure_cb_ = f;
235 template<
class... Inputs>
238 using MessageInfo =
typename Impl::MessageInfo;
240 std::unique_lock<std::mutex> lock{
impl_->mutex_};
241 if (
impl_->target_frames_.empty())
return;
242 std::shared_ptr<MessageInfo> info = std::make_shared<MessageInfo>(std::forward_as_tuple(in...));
244 if (source_frame.empty())
251 for (
const std::string& target_frame : target_frames)
256 impl_->cancel_all_transformable_requests(info);
261 if (!info->handles.empty())
263 impl_->messages_.push_back(info);
264 ++
impl_->cur_queue_size_;
265 while (
impl_->cur_queue_size_ >
impl_->max_queue_size_)
267 std::shared_ptr<MessageInfo> old =
impl_->messages_.front();
268 impl_->messages_.pop_front();
269 --
impl_->cur_queue_size_;
270 impl_->cancel_all_transformable_requests(old);
280 template<
class... Inputs>
283 using MessageInfo =
typename Impl::MessageInfo;
284 std::unique_lock<std::mutex> lock{
impl_->mutex_};
286 auto req =
impl_->requests_.find(request_handle);
287 if (req ==
impl_->requests_.end())
return;
288 std::shared_ptr<MessageInfo> info = req->second;
289 impl_->requests_.erase(req);
291 info->handles.erase(request_handle);
294 impl_->cancel_all_transformable_requests(info);
295 auto msg = std::find(
impl_->messages_.begin(),
impl_->messages_.end(), info);
296 if (msg ==
impl_->messages_.end())
return;
297 impl_->messages_.erase(msg);
298 --
impl_->cur_queue_size_;
303 for (
const std::string& frame :
impl_->target_frames_)
305 if (!
impl_->bc_.canTransform(frame, source_frame, stamp))
320 template<
class... Inputs>
326 impl_->cbq_->addCallback(boost::make_shared<RosCB>(
impl_, msg, reason));
336 [&cb, &msg, &reason](
auto... Is)
338 cb(std::get<Is>(msg)..., reason);
345 template<
class... Inputs>
358 [
this, &msg](
auto... Is)
360 this->
send(std::get<Is>(msg)...);
std::map< tf2::TransformableRequestHandle, std::shared_ptr< MessageInfo > > requests_
MessageInfo(const MessageTuple &m)
uint32_t TransformableCallbackHandle
std::string access_ros_header_frame_id(const M &m) noexcept
void set_filter_failure_function(FilterFailureCB cb)
Register callback for failed transforms.
Wait for TF transformations for incoming messages.
FilterFailureCB failure_cb_
The message has been dropped because of a queue overflow.
The requested transform is unavailable.
static constexpr tf2::TransformableRequestHandle NeverTransformable
void set_target_frames(const ros::V_string &target_frames)
Choose the TF target frames.
CallResult call() noexcept override
The message has an empty TF frame ID and cannot be transformed.
void cancel_all_transformable_requests(std::shared_ptr< MessageInfo > &info) noexcept
std::vector< std::string > V_string
void receive(const Inputs &... in) override
Process incoming data.
void reset() noexcept override
Reset filter state.
void transformable(tf2::TransformableRequestHandle request_handle, const std::string &target_frame, const std::string &source_frame, ros::Time time, tf2::TransformableResult result)
Impl(TfFilter< Inputs... > *parent, tf2::BufferCore &bc) noexcept
RosCB(const std::weak_ptr< Impl > &parent, const MessageTuple &msg, TfFilterResult result) noexcept
void init(tf2::BufferCore &bc, uint32_t queue_size, const ros::NodeHandle &nh) noexcept
Initialize the filter.
ros::CallbackQueueInterface * cbq_
void send_message(std::unique_lock< std::mutex > &, const MessageTuple &m)
ScopedUnlock< BasicLockable > with_scoped_unlock(BasicLockable &lockable)
void report_failure(std::unique_lock< std::mutex > &, const MessageTuple &, TfFilterResult)
void send(const Outputs &... out)
Pass data to all connected sinks.
std::tuple< Inputs... > MessageTuple
std::set< tf2::TransformableRequestHandle > handles
TfFilterResult
TF transformation results.
tf2::TransformableRequestHandle make_transformable_request(std::shared_ptr< MessageInfo > &info, const std::string &target_frame, const std::string &source_frame, const ros::Time &time) noexcept
std::shared_ptr< Impl > impl_
tf2::TransformableCallbackHandle tf_handle_
The requested transform is no longer available, likely because the message is too old...
std::function< void(const Inputs &..., TfFilterResult)> FilterFailureCB
Callback for failed transform queries.
std::list< std::shared_ptr< MessageInfo > > messages_
std::weak_ptr< Impl > parent_
TfFilter< Inputs... > * parent_
ros::V_string target_frames_
auto index_apply(Function f)
TfFilter() noexcept
Empty constructor.
void set_target_frame(const std::string &target_frame)
Choose the TF target frame.
uint64_t TransformableRequestHandle
ros::Time access_ros_header_stamp(const M &m) noexcept