bag_writer.cpp
Go to the documentation of this file.
1 // Output thread
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
4 #include "bag_writer.h"
5 #include "message_queue.h"
6 #include "topic_manager.h"
7 
8 #include <rosfmt/rosfmt.h>
9 
10 #include <boost/filesystem.hpp>
11 #include <boost/range/iterator_range.hpp>
12 #include <map>
13 
14 #include <ros/node_handle.h>
15 
16 #include <tf2_msgs/TFMessage.h>
17 
18 namespace fs = boost::filesystem;
19 
20 namespace rosbag_fancy
21 {
22 
23 namespace
24 {
25  std::string timeToString(const ros::Time& cur_ros_t)
26  {
27  std::time_t cur_t = cur_ros_t.sec;
28  std::stringstream ss;
29  ss << std::put_time(std::localtime(&cur_t), "%Y-%m-%d-%H-%M-%S");
30  return ss.str();
31  }
32 
33  std::string getNewFilename(const std::string& old)
34  {
35  for(int i = 2; i < 10; ++i)
36  {
37  auto tst_filename = fmt::format("{}.{}", old, i);
38  if(!fs::exists(tst_filename))
39  return tst_filename;
40  }
41 
42  return {};
43  }
44 
45  std::vector<fs::path> getBagFilesInCurrentFolder(const std::string& filename)
46  {
47  std::vector<fs::path> bagFiles;
48  auto path = fs::absolute(fs::path(filename)).parent_path();
49 
50  for(auto& entry : boost::make_iterator_range(fs::directory_iterator(path), {}))
51  {
52  auto filePath = entry.path();
53  if(fs::extension(filePath) == ".bag")
54  {
55  bagFiles.emplace_back(filePath);
56  }
57  }
58 
59  return bagFiles;
60  }
61 
62  std::vector<fs::path> sortFilesByTime(const std::vector<fs::path>& files)
63  {
64  std::multimap<std::time_t,fs::path> timeFiles;
65  for(auto & entry : files)
66  {
67  timeFiles.emplace(fs::last_write_time(entry),entry);
68  }
69 
70  std::vector<fs::path> timeSortedFiles;
71  if(!timeFiles.empty())
72  std::transform(timeFiles.begin(), timeFiles.end(), std::back_inserter(timeSortedFiles), [](auto &kv){ return kv.second;});
73 
74  return timeSortedFiles;
75  }
76 
77  std::uint64_t getTotalSizeInBytes(const std::vector<fs::path>& files)
78  {
79  std::uint64_t totalSizeInBytes = 0;
80  for(auto& entry : files)
81  totalSizeInBytes += fs::file_size(entry);
82 
83  return totalSizeInBytes;
84  }
85 }
86 
87 BagWriter::BagWriter(rosbag_fancy::MessageQueue& queue, const std::string& filename, Naming namingMode, std::uint64_t splitSizeInBytes, std::uint64_t deleteOldAtInBytes)
88  : m_queue{queue}
89  , m_filename{filename}
90  , m_namingMode{namingMode}
91  , m_splitSizeInBytes{splitSizeInBytes}
92  , m_deleteOldAtInBytes{deleteOldAtInBytes}
93 {
94  ros::NodeHandle nh;
95  m_freeSpaceTimer = nh.createSteadyTimer(ros::WallDuration(5.0),
96  boost::bind(&BagWriter::checkFreeSpace, this)
97  );
98  checkFreeSpace();
99 
100  m_tf_header = boost::make_shared<std::map<std::string, std::string>>();
101  {
102  auto& connectionHeader = *m_tf_header;
103  connectionHeader["topic"] = "/tf_static";
104  connectionHeader["callerid"] = ros::this_node::getName();
105  connectionHeader["md5sum"] = ros::message_traits::MD5Sum<tf2_msgs::TFMessage>::value();
106  connectionHeader["type"] = "tf2_msgs/TFMessage";
107  connectionHeader["message_definition"] = ros::message_traits::Definition<tf2_msgs::TFMessage>::value();
108  connectionHeader["latching"] = "1";
109  }
110 
111  m_thread = std::thread{std::bind(&BagWriter::run, this)};
112 
113  if(m_deleteOldAtInBytes != 0)
114  m_cleanup_thread = std::thread{std::bind(&BagWriter::cleanupThread, this)};
115 }
116 
118 {
119  m_shouldShutdown = true;
120  m_queue.shutdown();
121  m_thread.join();
122 
123  if(m_deleteOldAtInBytes != 0)
124  {
125  m_cleanupCondition.notify_one();
126  m_cleanup_thread.join();
127  }
128 }
129 
131 {
132  while(!m_shouldShutdown)
133  {
134  auto msg = m_queue.pop();
135 
136  {
137  std::unique_lock<std::mutex> lock(m_mutex);
138  if(m_running && msg)
139  {
140  m_bag.write(msg->topic, msg->message);
142 
143  if(msg->topicData->id >= m_messageCounts.size())
144  {
145  m_messageCounts.resize(msg->topicData->id+1, 0);
146  m_byteCounts.resize(msg->topicData->id+1, 0);
147  }
148 
149  m_messageCounts[msg->topicData->id]++;
150  m_byteCounts[msg->topicData->id] += msg->size();
151  }
152  }
153 
154  if(msg && msg->topic == "/tf_static")
155  {
156  auto shifter = msg->message.getMessage();
157  auto tf_msg = shifter->instantiate<tf2_msgs::TFMessage>();
158  if(tf_msg)
159  {
160  for(auto& transformMsg : tf_msg->transforms)
161  m_tf_buf.setTransform(transformMsg, "bag", true);
162  }
163  }
164 
166  {
167  m_isReopeningBag = true;
168  stop();
169  start();
170  m_isReopeningBag = false;
171  }
172  }
173 }
174 
176 {
177  std::unique_lock<std::mutex> lock(m_mutex);
178 
179  if(m_running)
180  return;
181 
182  m_messageCounts.clear();
183 
184  std::string filename;
185  switch(m_namingMode)
186  {
188  filename = fmt::format("{}_{}.bag", m_filename, timeToString(ros::Time::now()));
189  break;
190  case Naming::Verbatim:
191  filename = m_filename;
192  break;
193  }
194 
195  if(!m_bagOpen)
196  {
197  // Don't overwrite existing files
198  if(fs::exists(filename))
199  {
200  auto replacement = getNewFilename(filename);
201  if(replacement.empty())
202  {
203  ROSFMT_ERROR("Could not find a bag file name to write to (template '{}')", filename);
204  ROSFMT_ERROR("Ignoring start request.");
205  m_bagOpen = false;
206  m_running = false;
207  return;
208  }
209 
210  filename = replacement;
211  }
212 
213  // We need to hold the cleanup mutex here to make sure the cleanup thread
214  // is aware of the current bag file name.
215  std::unique_lock<std::mutex> lock(m_cleanupMutex);
216 
217  ROSFMT_INFO("Opening bag file: {}", filename.c_str());
218  m_bag.open(filename, rosbag::bagmode::Write);
220 
221  m_expandedFilename = filename;
222 
223  // Write all known transforms to /tf_static
224  {
225  auto tf_msg = boost::make_shared<tf2_msgs::TFMessage>();
226  std::vector<std::string> frames;
227  m_tf_buf._getFrameStrings(frames);
228 
229  for(auto& frame : frames)
230  {
231  std::string parent;
232  if(!m_tf_buf._getParent(frame, ros::Time(0), parent))
233  continue;
234 
235  geometry_msgs::TransformStamped transform = m_tf_buf.lookupTransform(parent, frame, ros::Time(0));
236  tf_msg->transforms.push_back(std::move(transform));
237  }
238 
240 
241  m_bag.write("/tf_static", event);
242  }
243  }
244 
245  m_bagOpen = true;
246  m_running = true;
247 }
248 
250 {
251  std::unique_lock<std::mutex> lock(m_mutex);
252 
253  switch(m_namingMode)
254  {
256  m_bag.close();
257  m_bagOpen = false;
258  break;
259  case Naming::Verbatim:
260  break;
261  }
262 
263  m_running = false;
264 
265  ROSFMT_INFO("Recording stopped.");
266 }
267 
269 {
270  namespace fs = boost::filesystem;
271 
272  auto path = fs::absolute(fs::path(m_filename)).parent_path();
273  auto space = fs::space(path);
274 
275  m_freeSpace = space.available;
276 }
277 
279 {
280  namespace fs = boost::filesystem;
281 
282  using namespace std::literals;
283 
284  while(!m_shouldShutdown)
285  {
286  std::vector<fs::path> bagFiles = getBagFilesInCurrentFolder(m_filename);
287  m_directorySizeInBytes = getTotalSizeInBytes(bagFiles);
288 
290  {
291  std::unique_lock<std::mutex> lock(m_cleanupMutex);
292 
293  // explicit new computation, since there might have been some changes in the mean time.
294  // Here we are protected by the mutex, so filename change can happen in the meantime.
295  std::vector<fs::path> bagFiles = getBagFilesInCurrentFolder(m_filename);
296  std::uint64_t directorySizeInBytes = getTotalSizeInBytes(bagFiles);
297  std::vector<fs::path> timeSortedBagFiles = sortFilesByTime(bagFiles);
298 
299  fs::path currentPath = fs::path(bagfileName());
300 
301  for(auto& entry : timeSortedBagFiles)
302  {
303  // do not delete current one!
304  if(fs::equivalent(entry, currentPath))
305  continue;
306 
307  std::uint64_t bagSize = fs::file_size(entry);
308 
309  ROSFMT_INFO("Bag directory requires too much space. Removing old bag file: {}", entry.c_str());
310  fs::remove(entry);
311 
312  directorySizeInBytes -= bagSize;
313 
314  // Already finished?
316  break;
317  }
318 
320  {
321  ROSFMT_WARN("I could not remove enough bag files to make the bag file directory small enough.");
322  }
323 
325  }
326 
327  // Sleep for defined time, but exit when requested
328  std::unique_lock<std::mutex> lock(m_cleanupMutex);
329  m_cleanupCondition.wait_for(lock, 5s, [&]{ return m_shouldShutdown; });
330 
331  if(m_shouldShutdown)
332  break;
333  }
334 }
335 
337 {
338  std::unique_lock<std::mutex> lock{m_mutex};
339  m_compressionType = type;
340  m_bag.setCompression(type);
341 }
342 
343 }
rosfmt.h
rosbag_fancy::MessageQueue::shutdown
void shutdown()
Definition: message_queue.cpp:56
rosbag_fancy::MessageQueue
Definition: message_queue.h:23
rosbag_fancy::BagWriter::m_bag
rosbag::Bag m_bag
Definition: bag_writer.h:89
rosbag_fancy::BagWriter::m_freeSpace
std::uint64_t m_freeSpace
Definition: bag_writer.h:98
node_handle.h
rosbag_fancy::BagWriter::m_expandedFilename
std::string m_expandedFilename
Definition: bag_writer.h:82
rosbag_fancy::BagWriter::BagWriter
BagWriter(MessageQueue &queue, const std::string &filename, Naming namingMode, std::uint64_t splitSizeInBytes, std::uint64_t deleteOldAtInBytes)
Definition: bag_writer.cpp:87
rosbag_fancy::BagWriter::m_isReopeningBag
bool m_isReopeningBag
Definition: bag_writer.h:84
rosbag_fancy::MessageQueue::pop
boost::optional< Message > pop()
Definition: message_queue.cpp:32
rosbag_fancy::BagWriter::stop
void stop()
Definition: bag_writer.cpp:249
s
XmlRpcServer s
rosbag_fancy
Definition: bag_reader.cpp:240
tf2::BufferCore::setTransform
bool setTransform(const geometry_msgs::TransformStamped &transform, const std::string &authority, bool is_static=false)
message_queue.h
rosbag_fancy::BagWriter::Naming
Naming
Definition: bag_writer.h:26
rosbag_fancy::BagWriter::m_messageCounts
std::vector< std::uint64_t > m_messageCounts
Definition: bag_writer.h:111
rosbag::Bag::close
void close()
ROSFMT_WARN
#define ROSFMT_WARN(...)
topic_manager.h
rosbag_fancy::BagWriter::m_mutex
std::mutex m_mutex
Definition: bag_writer.h:103
rosbag_fancy::BagWriter::bagfileName
std::string bagfileName() const
Definition: bag_writer.h:58
rosbag::Bag::getSize
uint64_t getSize() const
tf2::BufferCore::_getFrameStrings
void _getFrameStrings(std::vector< std::string > &ids) const
rosbag_fancy::BagWriter::Naming::AppendTimestamp
@ AppendTimestamp
rosbag_fancy::BagWriter::m_namingMode
Naming m_namingMode
Definition: bag_writer.h:80
rosbag::Bag::setCompression
void setCompression(CompressionType compression)
rosbag_fancy::BagWriter::cleanupThread
void cleanupThread()
Definition: bag_writer.cpp:278
bag_writer.h
rosbag_fancy::BagWriter::m_running
std::atomic< bool > m_running
Definition: bag_writer.h:102
rosbag_fancy::BagWriter::start
void start()
Definition: bag_writer.cpp:175
ros::message_traits::MD5Sum::value
static const char * value()
rosbag_fancy::BagWriter::m_sizeInBytes
std::atomic< std::uint64_t > m_sizeInBytes
Definition: bag_writer.h:97
rosbag::bagmode::Write
Write
rosbag_fancy::BagWriter::checkFreeSpace
void checkFreeSpace()
Definition: bag_writer.cpp:268
rosbag_fancy::BagWriter::m_bagOpen
bool m_bagOpen
Definition: bag_writer.h:90
rosbag_fancy::BagWriter::m_shouldShutdown
bool m_shouldShutdown
Definition: bag_writer.h:95
rosbag_fancy::BagWriter::directorySizeInBytes
std::uint64_t directorySizeInBytes() const
Definition: bag_writer.h:49
rosbag_fancy::BagWriter::m_compressionType
rosbag::compression::CompressionType m_compressionType
Definition: bag_writer.h:114
ROSFMT_ERROR
#define ROSFMT_ERROR(...)
TimeBase< Time, Duration >::sec
uint32_t sec
format
std::string format
Definition: ui.cpp:76
rosbag_fancy::BagWriter::run
void run()
Definition: bag_writer.cpp:130
rosbag::compression::CompressionType
CompressionType
ros::this_node::getName
const ROSCPP_DECL std::string & getName()
rosbag::Bag::write
void write(std::string const &topic, ros::MessageEvent< T > const &event)
rosbag_fancy::BagWriter::setCompression
void setCompression(rosbag::compression::CompressionType type)
Definition: bag_writer.cpp:336
ros::Time
ros::NodeHandle::createSteadyTimer
SteadyTimer createSteadyTimer(SteadyTimerOptions &ops) const
rosbag_fancy::BagWriter::m_cleanupMutex
std::mutex m_cleanupMutex
Definition: bag_writer.h:105
rosbag_fancy::BagWriter::m_deleteOldAtInBytes
std::uint64_t m_deleteOldAtInBytes
Definition: bag_writer.h:86
rosbag_fancy::BagWriter::m_tf_header
boost::shared_ptr< std::map< std::string, std::string > > m_tf_header
Definition: bag_writer.h:109
rosbag_fancy::BagWriter::m_filename
std::string m_filename
Definition: bag_writer.h:79
ros::message_traits::Definition::value
static const char * value()
rosbag_fancy::BagWriter::m_tf_buf
tf2_ros::Buffer m_tf_buf
Definition: bag_writer.h:108
rosbag_fancy::BagWriter::m_directorySizeInBytes
std::atomic< std::uint64_t > m_directorySizeInBytes
Definition: bag_writer.h:87
rosbag_fancy::BagWriter::Naming::Verbatim
@ Verbatim
rosbag_fancy::BagWriter::~BagWriter
~BagWriter()
Definition: bag_writer.cpp:117
rosbag_fancy::BagWriter::m_cleanupCondition
std::condition_variable m_cleanupCondition
Definition: bag_writer.h:106
ros::WallDuration
rosbag_fancy::BagWriter::m_cleanup_thread
std::thread m_cleanup_thread
Definition: bag_writer.h:93
rosbag::Bag::open
void open(std::string const &filename, uint32_t mode=bagmode::Read)
rosbag_fancy::BagWriter::m_byteCounts
std::vector< std::uint64_t > m_byteCounts
Definition: bag_writer.h:112
rosbag_fancy::BagWriter::m_queue
MessageQueue & m_queue
Definition: bag_writer.h:77
rosbag_fancy::BagWriter::m_thread
std::thread m_thread
Definition: bag_writer.h:92
rosbag_fancy::BagWriter::m_splitSizeInBytes
std::uint64_t m_splitSizeInBytes
Definition: bag_writer.h:85
tf2_ros::Buffer::lookupTransform
virtual geometry_msgs::TransformStamped lookupTransform(const std::string &target_frame, const ros::Time &target_time, const std::string &source_frame, const ros::Time &source_time, const std::string &fixed_frame, const ros::Duration timeout) const
tf2::BufferCore::_getParent
bool _getParent(const std::string &frame_id, ros::Time time, std::string &parent) const
ros::NodeHandle
ros::MessageEvent
ros::Time::now
static Time now()
ROSFMT_INFO
#define ROSFMT_INFO(...)


rosbag_fancy
Author(s):
autogenerated on Tue Feb 20 2024 03:20:59