tf2_scanner.cpp
Go to the documentation of this file.
1 // Scans multiple bag files for tf2 messages and aggregates them
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
4 #include "tf2_scanner.h"
5 
6 #include <tf2_msgs/TFMessage.h>
7 
8 #include <mutex>
9 #include <thread>
10 #include <condition_variable>
11 
12 #include <rosbag/bag.h>
13 
14 #include "bag_view.h"
15 
16 #include "doctest.h"
17 
18 namespace rosbag_fancy
19 {
20 
21 namespace
22 {
23  bool tf2StaticPredicate(const BagReader::Connection& con)
24  {
25  return con.topicInBag == "/tf_static" && con.type == "tf2_msgs/TFMessage";
26  }
27 }
28 
30 {
31 public:
32  explicit Private(const std::vector<BagReader*>& readers)
33  {
34  for(auto& reader : readers)
35  m_view.addBag(reader, tf2StaticPredicate);
36 
37  m_thread = std::thread{[&](){ process(); }};
38  }
39 
41  {
42  m_shouldExit = true;
43  m_thread.join();
44  }
45 
46  const tf2_msgs::TFMessage* fetchUpdate(const ros::Time& time)
47  {
48  std::unique_lock lock{m_mutex};
49 
50  // Wait until the message list is populated until the point we are interested in
51  m_cond.wait(lock, [&]() -> bool {
52  return m_scanningFinished || (!m_msgs.empty() && m_scannedTime >= time);
53  });
54 
55  // No messages?
56  if(m_msgs.empty())
57  return {};
58 
59  // If we made a skip back in time, reset the cache idx
60  if(m_msgs[m_cacheIdx].stamp > time)
61  {
62  m_cacheIdx = 0;
63  m_sentIdx = -1;
64 
65  // If time is earlier than the first entry, we have no transforms.
66  if(m_msgs[m_cacheIdx].stamp > time)
67  return &m_emptyMessage;
68  }
69 
70  // Advance
71  while(m_cacheIdx < m_msgs.size()-1 && m_msgs[m_cacheIdx+1].stamp < time)
72  {
73  m_cacheIdx++;
74  }
75 
76  if(static_cast<int>(m_cacheIdx) != m_sentIdx)
77  {
79  return &m_msgs[m_cacheIdx].msg;
80  }
81  else
82  return {};
83  }
84 
85 private:
86  struct Msg
87  {
89  tf2_msgs::TFMessage msg;
90  };
91 
92  void process()
93  {
94  std::unordered_map<std::string, geometry_msgs::TransformStamped> transforms;
95 
96  for(auto& pmsg : m_view)
97  {
98  if(m_shouldExit)
99  return;
100 
101  auto msg = pmsg.msg->instantiate<tf2_msgs::TFMessage>();
102  if(!msg)
103  continue;
104 
105  bool changed = false;
106  for(auto& transform : msg->transforms)
107  {
108  auto [it, inserted] = transforms.try_emplace(transform.child_frame_id, transform);
109  if(inserted)
110  changed = true;
111  else
112  {
113  if(it->second != transform)
114  {
115  it->second = transform;
116  changed = true;
117  }
118  }
119  }
120 
121  if(changed)
122  {
123  std::unique_lock lock{m_mutex};
124 
125  auto& entry = m_msgs.emplace_back();
126  entry.stamp = pmsg.msg->stamp;
127  entry.msg.transforms.reserve(transforms.size());
128  for(auto& [_, transform] : transforms)
129  entry.msg.transforms.push_back(transform);
130 
131  m_scannedTime = pmsg.msg->stamp;
132 
133  m_cond.notify_all();
134  }
135  }
136 
137  {
138  std::unique_lock lock{m_mutex};
140  m_scanningFinished = true;
141  m_cond.notify_all();
142  }
143  }
144 
146 
147  std::atomic_bool m_shouldExit = false;
148 
149  std::mutex m_mutex;
150  std::vector<Msg> m_msgs;
152  bool m_scanningFinished = false;
153  std::condition_variable m_cond;
154 
155  std::thread m_thread;
156 
157  unsigned int m_cacheIdx = 0;
158  int m_sentIdx = -1;
159 
160  tf2_msgs::TFMessage m_emptyMessage{};
161 };
162 
163 TF2Scanner::TF2Scanner(const std::vector<BagReader*>& readers)
164  : m_d{std::make_unique<Private>(readers)}
165 {
166 }
167 
169 {
170 }
171 
172 const tf2_msgs::TFMessage* TF2Scanner::fetchUpdate(const ros::Time& time)
173 {
174  return m_d->fetchUpdate(time);
175 }
176 
177 
178 // TESTS
179 
180 TEST_CASE("TF2Scanner")
181 {
182  // Generate a bag file
183  // NOTE: This generates a bag file which is slightly different, since
184  // the latch=true header is not set for tf_static messages.
185  // For the purpose of testing the above, this is enough, however.
186  char bagfileName[] = "/tmp/rosbag_fancy_test_XXXXXX";
187  {
188  int fd = mkstemp(bagfileName);
189  REQUIRE(fd >= 0);
190  close(fd);
191 
192  rosbag::Bag bag{bagfileName, rosbag::BagMode::Write};
193 
194  {
195  std_msgs::Header msg;
196  msg.frame_id = "a";
197  bag.write("/topicA", ros::Time(1000, 0), msg);
198  }
199  {
200  std_msgs::Header msg;
201  msg.frame_id = "b";
202  bag.write("/topicB", ros::Time(1001, 0), msg);
203  }
204  {
205  tf2_msgs::TFMessage msg;
206  msg.transforms.resize(2);
207  msg.transforms[0].header.frame_id = "base_link";
208  msg.transforms[0].child_frame_id = "arm_link";
209  msg.transforms[0].transform.translation.x = 1.0;
210  msg.transforms[1].header.frame_id = "base_link";
211  msg.transforms[1].child_frame_id = "leg_link";
212  msg.transforms[1].transform.translation.x = 4.0;
213  bag.write("/tf_static", ros::Time(1002, 0), msg);
214  }
215  {
216  tf2_msgs::TFMessage msg;
217  msg.transforms.resize(1);
218  msg.transforms[0].header.frame_id = "base_link";
219  msg.transforms[0].child_frame_id = "arm_link";
220  msg.transforms[0].transform.translation.x = 2.0;
221  bag.write("/tf_static", ros::Time(1010, 0), msg);
222  }
223 
224  bag.close();
225  }
226 
227  BagReader reader{bagfileName};
228 
229  TF2Scanner scanner{{&reader}};
230 
231  {
232  auto msg = scanner.fetchUpdate(ros::Time(0));
233  REQUIRE(msg);
234  CAPTURE(*msg);
235  CHECK(msg->transforms.size() == 0);
236  }
237  {
238  auto msg = scanner.fetchUpdate(ros::Time(1005));
239  REQUIRE(msg);
240  CAPTURE(*msg);
241  REQUIRE(msg->transforms.size() == 2);
242 
243  auto it = std::find_if(msg->transforms.begin(), msg->transforms.end(), [&](auto& trans){ return trans.child_frame_id == "arm_link"; });
244  REQUIRE(it != msg->transforms.end());
245  CHECK(it->transform.translation.x == doctest::Approx(1.0));
246  }
247  {
248  auto msg = scanner.fetchUpdate(ros::Time(1006));
249  REQUIRE(!msg);
250  }
251  {
252  auto msg = scanner.fetchUpdate(ros::Time(1012));
253  REQUIRE(msg);
254  CAPTURE(*msg);
255  REQUIRE(msg->transforms.size() == 2);
256 
257  auto it = std::find_if(msg->transforms.begin(), msg->transforms.end(), [&](auto& trans){ return trans.child_frame_id == "arm_link"; });
258  REQUIRE(it != msg->transforms.end());
259  CHECK(it->transform.translation.x == doctest::Approx(2.0));
260  }
261 
262  {
263  auto msg = scanner.fetchUpdate(ros::Time(1015));
264  REQUIRE(!msg);
265  }
266 
267  {
268  auto msg = scanner.fetchUpdate(ros::Time(1005));
269  REQUIRE(msg);
270  CAPTURE(*msg);
271  REQUIRE(msg->transforms.size() == 2);
272 
273  auto it = std::find_if(msg->transforms.begin(), msg->transforms.end(), [&](auto& trans){ return trans.child_frame_id == "arm_link"; });
274  REQUIRE(it != msg->transforms.end());
275  CHECK(it->transform.translation.x == doctest::Approx(1.0));
276  }
277 
278  unlink(bagfileName);
279 }
280 
281 }
rosbag_fancy::TF2Scanner::fetchUpdate
const tf2_msgs::TFMessage * fetchUpdate(const ros::Time &time)
Fetch next aggregated message.
Definition: tf2_scanner.cpp:172
doctest::Approx
Definition: doctest.h:1171
rosbag_fancy::TF2Scanner::Private::m_cond
std::condition_variable m_cond
Definition: tf2_scanner.cpp:153
rosbag_fancy::TF2Scanner::Private::m_cacheIdx
unsigned int m_cacheIdx
Definition: tf2_scanner.cpp:157
rosbag::Bag
tf2_scanner.h
rosbag_fancy::TF2Scanner::Private::m_emptyMessage
tf2_msgs::TFMessage m_emptyMessage
Definition: tf2_scanner.cpp:160
bag_view.h
rosbag_fancy::TF2Scanner::Private::m_shouldExit
std::atomic_bool m_shouldExit
Definition: tf2_scanner.cpp:147
rosbag_fancy::TF2Scanner::Private::m_scanningFinished
bool m_scanningFinished
Definition: tf2_scanner.cpp:152
rosbag_fancy
Definition: bag_reader.cpp:240
rosbag_fancy::TF2Scanner::m_d
std::unique_ptr< Private > m_d
Definition: tf2_scanner.h:33
rosbag_fancy::TF2Scanner::~TF2Scanner
~TF2Scanner()
Definition: tf2_scanner.cpp:168
rosbag_fancy::TF2Scanner::Private
Definition: tf2_scanner.cpp:29
rosbag_fancy::BagView
Definition: bag_view.h:12
bag.h
rosbag_fancy::TF2Scanner::Private::m_view
BagView m_view
Definition: tf2_scanner.cpp:145
rosbag_fancy::TF2Scanner
Definition: tf2_scanner.h:17
REQUIRE
#define REQUIRE(...)
Definition: doctest.h:2934
rosbag_fancy::TF2Scanner::Private::Msg::stamp
ros::Time stamp
Definition: tf2_scanner.cpp:88
rosbag_fancy::BagView::endTime
ros::Time endTime() const
Definition: bag_view.cpp:237
rosbag_fancy::TF2Scanner::Private::~Private
~Private()
Definition: tf2_scanner.cpp:40
rosbag_fancy::TF2Scanner::Private::m_mutex
std::mutex m_mutex
Definition: tf2_scanner.cpp:149
rosbag_fancy::TF2Scanner::Private::Msg
Definition: tf2_scanner.cpp:86
rosbag_fancy::TF2Scanner::Private::Msg::msg
tf2_msgs::TFMessage msg
Definition: tf2_scanner.cpp:89
rosbag_fancy::TF2Scanner::Private::m_scannedTime
ros::Time m_scannedTime
Definition: tf2_scanner.cpp:151
CHECK
#define CHECK(...)
Definition: doctest.h:2927
rosbag_fancy::TF2Scanner::TF2Scanner
TF2Scanner(const std::vector< BagReader * > &readers)
Definition: tf2_scanner.cpp:163
rosbag_fancy::TF2Scanner::Private::process
void process()
Definition: tf2_scanner.cpp:92
rosbag_fancy::TF2Scanner::Private::m_sentIdx
int m_sentIdx
Definition: tf2_scanner.cpp:158
rosbag_fancy::BagView::addBag
void addBag(BagReader *reader)
Definition: bag_view.cpp:227
rosbag_fancy::BagReader
Definition: bag_reader.h:21
rosbag_fancy::TEST_CASE
TEST_CASE("BagView: One file")
Definition: bag_reader.cpp:831
rosbag::Bag::write
void write(std::string const &topic, ros::MessageEvent< T > const &event)
ros::Time
rosbag_fancy::TF2Scanner::Private::m_msgs
std::vector< Msg > m_msgs
Definition: tf2_scanner.cpp:150
rosbag_fancy::TF2Scanner::Private::fetchUpdate
const tf2_msgs::TFMessage * fetchUpdate(const ros::Time &time)
Definition: tf2_scanner.cpp:46
doctest.h
rosbag_fancy::TF2Scanner::Private::Private
Private(const std::vector< BagReader * > &readers)
Definition: tf2_scanner.cpp:32
rosbag_fancy::TF2Scanner::Private::m_thread
std::thread m_thread
Definition: tf2_scanner.cpp:155
CAPTURE
#define CAPTURE(x)
Definition: doctest.h:2911


rosbag_fancy
Author(s):
autogenerated on Tue Feb 20 2024 03:20:59