cmd_record.cpp
Go to the documentation of this file.
1 // record command
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
4 #include <boost/program_options.hpp>
5 
6 #include <iostream>
7 #include <rosfmt/full.h>
8 
9 #include <ros/node_handle.h>
10 
11 #include <rosbag_fancy_msgs/Status.h>
12 
13 #include <std_srvs/Trigger.h>
14 
15 #include "topic_manager.h"
16 #include "topic_subscriber.h"
17 #include "ui.h"
18 #include "message_queue.h"
19 #include "mem_str.h"
20 #include "bag_writer.h"
21 
22 
23 namespace po = boost::program_options;
24 using namespace rosbag_fancy;
25 using namespace rosbag_fancy_msgs;
26 
27 int record(const std::vector<std::string>& options)
28 {
29  po::variables_map vm;
30 
31  // Handle CLI arguments
32  {
33  po::options_description desc("Options");
34  desc.add_options()
35  ("help", "Display this help message")
36  ("prefix,p", po::value<std::string>()->default_value("bag"), "Prefix for output bag file. The prefix is extended with a timestamp.")
37  ("output,o", po::value<std::string>()->value_name("FILE"), "Output bag file (overrides --prefix)")
38  ("topic", po::value<std::vector<std::string>>()->required(), "Topics to record")
39  ("queue-size", po::value<std::string>()->value_name("SIZE")->default_value("500MB"), "Queue size")
40  ("delete-old-at", po::value<std::string>()->value_name("SIZE"), "Delete old bags at given size, e.g. 100GB or 1TB")
41  ("split-bag-size", po::value<std::string>()->value_name("SIZE"), "Bag size for splitting, e.g. 1GB")
42  ("paused", "Start paused")
43  ("no-ui", "Disable terminal UI")
44  ("udp", "Subscribe using UDP transport")
45  ("bz2", "Enable BZ2 compression")
46  ("lz4", "Enable LZ2 compression")
47  ;
48 
49  po::positional_options_description p;
50  p.add("topic", -1);
51 
52  auto usage = [&](){
53  std::cout << "Usage: rosbag_fancy record [options] -o <bag file> <topics...>\n\n";
54  std::cout << desc << "\n\n";
55  std::cout << "Topics may be annotated with a rate limit in Hz, e.g.:\n";
56  std::cout << " rosbag_fancy /camera/image_raw=10.0\n";
57  std::cout << "\n";
58  };
59 
60  try
61  {
62  po::store(
63  po::command_line_parser(options).options(desc).positional(p).run(),
64  vm
65  );
66 
67  if(vm.count("help"))
68  {
69  usage();
70  return 0;
71  }
72 
73  po::notify(vm);
74  }
75  catch(po::error& e)
76  {
77  std::cerr << "Could not parse arguments: " << e.what() << "\n\n";
78  usage();
79  return 1;
80  }
81  }
82 
83  ros::NodeHandle nh{"~"};
84 
85  std::vector<std::string> topics = vm["topic"].as<std::vector<std::string>>();
86  std::sort(topics.begin(), topics.end());
87 
88  TopicManager topicManager;
89  for(auto& topicSpec : topics)
90  {
91  std::string name = topicSpec;
92  float rateLimit = 0.0f;
93 
94  auto sepIdx = topicSpec.find('=');
95 
96  if(sepIdx != std::string::npos)
97  {
98  name = topicSpec.substr(0, sepIdx);
99 
100  try
101  {
102  rateLimit = boost::lexical_cast<float>(topicSpec.substr(sepIdx+1));
103  }
104  catch(boost::bad_lexical_cast&)
105  {
106  std::cerr << "Bad topic spec: '" << topicSpec << "'\n";
107  return 1;
108  }
109  }
110 
111  int flags = 0;
112  if(vm.count("udp"))
113  flags |= static_cast<int>(Topic::Flag::UDP);
114 
115  topicManager.addTopic(name, rateLimit, flags);
116  }
117 
118  std::uint64_t queueSize = mem_str::stringToMemory(vm["queue-size"].as<std::string>());
119  MessageQueue queue{queueSize};
120 
121  // Figure out the output file name
122  auto namingMode = BagWriter::Naming::Verbatim;
123  std::string bagName = "";
124  if(vm.count("output"))
125  {
126  bagName = vm["output"].as<std::string>();
127  namingMode = BagWriter::Naming::Verbatim;
128  }
129  else
130  {
131  bagName = vm["prefix"].as<std::string>();
133  }
134 
135  std::uint64_t splitBagSizeInBytes = 0;
136  if(vm.count("split-bag-size"))
137  {
138  splitBagSizeInBytes = mem_str::stringToMemory(vm["split-bag-size"].as<std::string>());
139  }
140 
141  std::uint64_t deleteOldAtInBytes = 0;
142  if(vm.count("delete-old-at"))
143  {
144  deleteOldAtInBytes = mem_str::stringToMemory(vm["delete-old-at"].as<std::string>());
145  if(splitBagSizeInBytes != 0 && deleteOldAtInBytes < splitBagSizeInBytes)
146  {
147  ROSFMT_WARN("Chosen split-bag-size is larger than delete-old-at size!");
148  }
149  }
150 
151  if(!ros::Time::isValid())
152  {
153  ROSFMT_INFO("Waiting for ros::Time to become valid...");
155  }
156 
157  BagWriter writer{queue, bagName, namingMode, splitBagSizeInBytes, deleteOldAtInBytes};
158 
159  if(vm.count("bz2") && vm.count("lz4"))
160  {
161  fmt::print(stderr, "Options --bz2 and --lz4 are mutually exclusive\n");
162  return 1;
163  }
164 
165  if(vm.count("bz2"))
166  writer.setCompression(rosbag::compression::BZ2);
167  if(vm.count("lz4"))
168  writer.setCompression(rosbag::compression::LZ4);
169 
170  auto start = [&](){
171  try
172  {
173  writer.start();
174  }
175  catch(rosbag::BagException& e)
176  {
177  ROSFMT_ERROR("Could not open output bag file: {}", e.what());
178  return false;
179  }
180  return true;
181  };
182 
183  // Start/Stop service calls
184  ros::ServiceServer srv_start = nh.advertiseService("start", boost::function<bool(std_srvs::TriggerRequest&, std_srvs::TriggerResponse&)>([&](auto&, auto& resp){
185  resp.success = start();
186  return true;
187  }));
188  ros::ServiceServer srv_stop = nh.advertiseService("stop", boost::function<bool(std_srvs::TriggerRequest&, std_srvs::TriggerResponse&)>([&](auto&, auto& resp){
189  writer.stop();
190  resp.success = true;
191  return true;
192  }));
193 
194  // Status publisher
195  ros::Publisher pub_status = nh.advertise<Status>("status", 1);
196  ros::SteadyTimer timer_status = nh.createSteadyTimer(ros::WallDuration(0.1), boost::function<void(const ros::SteadyTimerEvent&)>([&](auto&){
198 
199  StatusPtr msg = boost::make_shared<Status>();
200  msg->header.stamp = ros::Time::now();
201 
202  msg->status = writer.running() ? Status::STATUS_RUNNING : Status::STATUS_PAUSED;
203 
204  msg->bagfile = writer.bagfileName();
205 
206  msg->bytes = writer.sizeInBytes();
207  msg->free_bytes = writer.freeSpace();
208  msg->bandwidth = 0;
209 
210  auto& counts = writer.messageCounts();
211 
212  for(auto& topic : topicManager.topics())
213  {
214  msg->topics.emplace_back();
215  auto& topicMsg = msg->topics.back();
216 
217  msg->bandwidth += topic.bandwidth;
218 
219  topicMsg.name = topic.name;
220  topicMsg.publishers = topic.numPublishers;
221  topicMsg.bandwidth = topic.bandwidth;
222  topicMsg.bytes = topic.totalBytes;
223  topicMsg.messages = topic.totalMessages;
224 
225  if(topic.id < counts.size())
226  topicMsg.messages_in_current_bag = counts[topic.id];
227 
228  topicMsg.rate = topic.messageRateAt(now);
229  }
230 
231  pub_status.publish(msg);
232  }));
233 
234  // Start recording if --paused is not given
235  if(vm.count("paused") == 0)
236  {
237  if(!start())
238  return 1;
239  }
240 
241  TopicSubscriber subscriber{topicManager, queue};
242 
243  std::unique_ptr<UI> ui;
244 
245  if(!vm.count("no-ui"))
246  ui.reset(new UI{topicManager, queue, writer});
247 
248  ros::spin();
249 
250  return 0;
251 }
rosbag_fancy::MessageQueue
Definition: message_queue.h:23
ui.h
node_handle.h
ros::Publisher
run
void run(class_loader::ClassLoader *loader)
topic_subscriber.h
rosbag_fancy
Definition: bag_reader.cpp:240
message_queue.h
rosbag_fancy::TopicManager::topics
std::vector< Topic > & topics()
Definition: topic_manager.h:106
rosbag::compression::LZ4
LZ4
rosbag_fancy::mem_str::stringToMemory
uint64_t stringToMemory(std::string humanSize)
Definition: mem_str.cpp:29
record
int record(const std::vector< std::string > &options)
Definition: cmd_record.cpp:27
rosbag::compression::BZ2
BZ2
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ROSFMT_WARN
#define ROSFMT_WARN(...)
ros::ServiceServer
topic_manager.h
rosbag::BagException
ros::Time::isValid
static bool isValid()
ros::SteadyTimer
rosbag_fancy::BagWriter::Naming::AppendTimestamp
@ AppendTimestamp
bag_writer.h
ros::WallTime::now
static WallTime now()
rosbag_fancy::TopicManager::addTopic
void addTopic(const std::string &topic, float rateLimit=0.0f, int flags=0)
Definition: topic_manager.cpp:26
rosbag_fancy::TopicSubscriber
Definition: topic_subscriber.h:20
rosbag_fancy::Topic::Flag::UDP
@ UDP
ROSFMT_ERROR
#define ROSFMT_ERROR(...)
ros::WallTime
full.h
rosbag_fancy::BagWriter
Definition: bag_writer.h:23
start
ROSCPP_DECL void start()
ros::Time::waitForValid
static bool waitForValid()
ros::spin
ROSCPP_DECL void spin()
rosbag_fancy::BagWriter::Naming::Verbatim
@ Verbatim
ros::WallDuration
mem_str.h
rosbag_fancy::TopicManager
Definition: topic_manager.h:101
rosbag_fancy::UI
Definition: ui.h:21
ros::NodeHandle
ros::Time::now
static Time now()
usage
def usage(progname)
ROSFMT_INFO
#define ROSFMT_INFO(...)


rosbag_fancy
Author(s):
autogenerated on Tue Feb 20 2024 03:20:59