cmd_play.cpp
Go to the documentation of this file.
1 // Play command
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
4 #include <boost/program_options.hpp>
5 
6 #include <iostream>
7 #include <rosfmt/full.h>
8 #include <fmt/chrono.h>
9 
10 #include <chrono>
11 
12 #include <ros/node_handle.h>
13 #include <ros/publisher.h>
14 #include <rosbag_fancy_msgs/PlayStatus.h>
15 #include <std_srvs/Trigger.h>
16 
17 #include <rosgraph_msgs/Clock.h>
18 
19 #include "bag_reader.h"
20 #include "bag_view.h"
21 #include "topic_manager.h"
22 #include "ui.h"
23 #include "tf2_scanner.h"
24 
25 namespace po = boost::program_options;
26 using namespace rosbag_fancy;
27 using namespace rosbag_fancy_msgs;
28 
29 int play(const std::vector<std::string>& options)
30 {
31  po::variables_map vm;
32 
33  // Handle CLI arguments
34  {
35  po::options_description desc("Options");
36  desc.add_options()
37  ("help", "Display this help message")
38  ("clock", "Publish clock (requires use_sim_time)")
39  ("no-ui", "Disable terminal UI")
40  ;
41 
42  po::options_description hidden("Hidden");
43  hidden.add_options()
44  ("input", po::value<std::vector<std::string>>()->required(), "Input file")
45  ;
46 
47  po::options_description all("All");
48  all.add(desc).add(hidden);
49 
50  po::positional_options_description p;
51  p.add("input", -1);
52 
53  auto usage = [&](){
54  std::cout << "Usage: rosbag_fancy play [options] <bag file(s)>\n\n";
55  std::cout << desc << "\n\n";
56  };
57 
58  try
59  {
60  po::store(
61  po::command_line_parser(options).options(all).positional(p).run(),
62  vm
63  );
64 
65  if(vm.count("help"))
66  {
67  usage();
68  return 0;
69  }
70 
71  po::notify(vm);
72  }
73  catch(po::error& e)
74  {
75  std::cerr << "Could not parse arguments: " << e.what() << "\n\n";
76  usage();
77  return 1;
78  }
79  }
80 
81  bool publishClock = vm.count("clock");
82 
83  ros::Publisher pub_clock;
84  ros::SteadyTime lastClockPublishTime;
85 
86  if(publishClock)
87  pub_clock = ros::NodeHandle{}.advertise<rosgraph_msgs::Clock>("/clock", 1, true);
88 
89  struct Bag
90  {
91  explicit Bag(const std::string& filename)
92  : reader{filename}, filename_ ( filename )
93  {}
94 
95  BagReader reader;
96  std::string filename_;
97  std::unordered_map<int, ros::Publisher> publishers;
98  };
99 
100  std::vector<Bag> bags;
101 
102  TopicManager topicManager;
103  std::map<std::string, int> topicMap;
104 
105  ros::NodeHandle nh;
106 
107  ros::Time startTime;
108  ros::Time endTime;
109 
110  for(auto& filename : vm["input"].as<std::vector<std::string>>())
111  {
112  auto& bag = bags.emplace_back(filename);
113 
114  if(startTime == ros::Time(0) || bag.reader.startTime() < startTime)
115  startTime = bag.reader.startTime();
116 
117  if(endTime == ros::Time(0) || bag.reader.endTime() > endTime)
118  endTime = bag.reader.endTime();
119 
120  for(auto& [id, con] : bag.reader.connections())
121  {
123  opts.datatype = con.type;
124  opts.md5sum = con.md5sum;
125  opts.message_definition = con.msgDef;
126  opts.topic = con.topicInBag;
127  opts.latch = con.latching;
128  opts.queue_size = 100;
129  opts.has_header = false; // FIXME: Is this correct?
130 
131  bag.publishers[id] = nh.advertise(opts);
132 
133  auto it = topicMap.find(con.topicInBag);
134  if(it == topicMap.end())
135  {
136  topicMap[con.topicInBag] = topicManager.topics().size();
137  topicManager.addTopic(con.topicInBag);
138  }
139  }
140  }
141 
142  std::vector<BagReader*> bagReaders;
143  for(auto& bag : bags)
144  bagReaders.push_back(&bag.reader);
145 
146  TF2Scanner tf2Scanner{bagReaders};
147 
148  ros::Publisher pub_tf_static = nh.advertise<tf2_msgs::TFMessage>("/tf_static", 20, true);
149 
150  BagView view;
151  auto topicPredicate = [&](const BagReader::Connection& connection){
152  return connection.topicInBag != "/tf_static";
153  };
154 
155  for(auto& bag : bags)
156  view.addBag(&bag.reader, topicPredicate);
157 
158  ros::WallDuration(0.2).sleep();
159 
160  ros::Time bagRefTime = startTime;
161  ros::SteadyTime wallRefTime = ros::SteadyTime::now();
162  ros::Time currentTime = bagRefTime;
163 
164  std::unique_ptr<PlaybackUI> ui;
165 
166  bool paused = false;
167 
168  BagView::Iterator it = view.begin();
169 
170  if(!vm.count("no-ui"))
171  {
172  ui.reset(new PlaybackUI{topicManager, startTime, endTime});
173 
174  ui->seekForwardRequested.connect([&](){
175  currentTime += ros::Duration{5.0};
176  bagRefTime += ros::Duration{5.0};
177 
178  it = view.findTime(currentTime);
179  });
180  ui->seekBackwardRequested.connect([&](){
181  currentTime -= ros::Duration{5.0};
182  bagRefTime -= ros::Duration{5.0};
183 
184  if(currentTime < startTime)
185  {
186  currentTime = startTime;
187  bagRefTime = currentTime;
188  wallRefTime = ros::SteadyTime::now();
189  }
190 
191  it = view.findTime(currentTime);
192  });
193  ui->pauseRequested.connect([&](){
194  paused = !paused;
195  bagRefTime = currentTime;
196  wallRefTime = ros::SteadyTime::now();
197  });
198  ui->exitRequested.connect([&](){
199  ros::shutdown();
200  });
201  }
202 
203  // Start/Stop service calls
204  ros::ServiceServer srv_start = nh.advertiseService("start", boost::function<bool(std_srvs::TriggerRequest&, std_srvs::TriggerResponse&)>([&](auto&, auto& resp) {
205  paused = false;
206  resp.success = !paused;
207  if(ui)
208  ui->setPaused(paused);
209  return true;
210  }));
211  ros::ServiceServer srv_pause = nh.advertiseService("pause", boost::function<bool(std_srvs::TriggerRequest&, std_srvs::TriggerResponse&)>([&](auto&, auto& resp) {
212  paused = true;
213  resp.success = paused;
214  if(ui)
215  ui->setPaused(paused);
216  return true;
217  }));
218  ros::ServiceServer srv_toggle = nh.advertiseService("toggle", boost::function<bool(std_srvs::TriggerRequest&, std_srvs::TriggerResponse&)>([&](auto&, auto& resp) {
219  paused = !paused;
220  if(ui)
221  ui->setPaused(paused);
222  resp.success = true;
223  return true;
224  }));
225 
226  // Status publisher
227  ros::Publisher pub_status = nh.advertise<PlayStatus>("status", 1);
228  ros::SteadyTimer timer_status = nh.createSteadyTimer(ros::WallDuration(0.1), boost::function<void(const ros::SteadyTimerEvent&)>([&](auto&) {
229  auto msg = boost::make_shared<PlayStatus>();
230  msg->header.stamp = ros::Time::now();
231 
232  msg->status = paused ? PlayStatus::STATUS_PAUSED : PlayStatus::STATUS_RUNNING;
233 
234  msg->currentTime = currentTime;
235  msg->startTime = startTime;
236  msg->endTime = endTime;
237  msg->current = currentTime-startTime;
238  msg->duration = endTime-startTime;
239 
240  msg->bagfiles.reserve(bags.size());
241  for(const auto& bag : bags)
242  msg->bagfiles.emplace_back(bag.filename_);
243 
244  msg->topics.reserve(topicManager.topics().size());
245  for(const auto& topic : topicManager.topics())
246  {
247  msg->topics.emplace_back();
248  auto& topicMsg = msg->topics.back();
249 
250  topicMsg.name = topic.name;
251  topicMsg.bandwidth = topic.bandwidth;
252  topicMsg.rate = topic.messageRate;
253  }
254 
255  pub_status.publish(msg);
256  }));
257 
258  while(ros::ok())
259  {
260  if(paused)
261  ros::WallDuration{0.1}.sleep();
262  else
263  {
264  if(it == view.end())
265  break;
266 
267  auto& msg = *it->msg;
268 
269  if(msg.stamp < bagRefTime)
270  continue;
271 
272  ros::SteadyTime wallStamp = wallRefTime + ros::WallDuration().fromNSec((msg.stamp - bagRefTime).toNSec());
273  ros::SteadyTime::sleepUntil(wallStamp);
274 
275  currentTime = msg.stamp;
276  if(ui)
277  ui->setPositionInBag(msg.stamp);
278 
279  bags[it->bagIndex].publishers[msg.connection->id].publish(msg);
280 
281  auto& topic = topicManager.topics()[topicMap[msg.connection->topicInBag]];
282  topic.notifyMessage(msg.size());
283 
284  if(publishClock && wallStamp - lastClockPublishTime > ros::WallDuration{0.001})
285  {
286  rosgraph_msgs::Clock msg;
287  msg.clock = currentTime;
288  pub_clock.publish(msg);
289  lastClockPublishTime = wallStamp;
290  }
291 
292  // Publish matching tf_static
293  if(auto tfmsg = tf2Scanner.fetchUpdate(currentTime))
294  pub_tf_static.publish(*tfmsg);
295 
296  ++it;
297  }
298 
299  ros::spinOnce();
300 
301  if(ui)
302  {
303  // Handle key input
304  fd_set fds{};
305  FD_ZERO(&fds);
306  FD_SET(STDIN_FILENO, &fds);
307  timeval timeout{};
308  int ret = select(STDIN_FILENO+1, &fds, nullptr, nullptr, &timeout);
309  if(ret < 0)
310  {
311  if(errno != EAGAIN && errno != EINTR)
312  throw std::runtime_error{fmt::format("select() error: {}", strerror(errno))};
313  }
314  else if(ret != 0)
315  ui->handleInput();
316 
317  ui->setPaused(paused);
318  }
319  }
320 
321  return 0;
322 }
ui.h
ros::AdvertiseOptions::topic
std::string topic
ros::WallDuration::sleep
bool sleep() const
node_handle.h
ros::Publisher
run
void run(class_loader::ClassLoader *loader)
tf2_scanner.h
bag_view.h
rosbag_fancy::BagReader::Connection
Definition: bag_reader.h:32
rosbag_fancy
Definition: bag_reader.cpp:240
ros::spinOnce
ROSCPP_DECL void spinOnce()
rosbag_fancy::TopicManager::topics
std::vector< Topic > & topics()
Definition: topic_manager.h:106
rosbag_fancy::BagView::MultiBagMessage::msg
const BagReader::Message * msg
Definition: bag_view.h:20
ros::shutdown
ROSCPP_DECL void shutdown()
ros::NodeHandle::advertiseService
ServiceServer advertiseService(AdvertiseServiceOptions &ops)
ros::AdvertiseOptions::datatype
std::string datatype
rosbag_fancy::BagView
Definition: bag_view.h:12
ros::AdvertiseOptions::has_header
bool has_header
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ros::NodeHandle::advertise
Publisher advertise(AdvertiseOptions &ops)
publisher.h
ros::AdvertiseOptions
ros::ok
ROSCPP_DECL bool ok()
ros::ServiceServer
rosbag_fancy::TF2Scanner
Definition: tf2_scanner.h:17
topic_manager.h
ros::SteadyTime::now
static SteadyTime now()
ros::SteadyTimer
ros::AdvertiseOptions::latch
bool latch
ros::AdvertiseOptions::queue_size
uint32_t queue_size
ros::SteadyTime
rosbag_fancy::TopicManager::addTopic
void addTopic(const std::string &topic, float rateLimit=0.0f, int flags=0)
Definition: topic_manager.cpp:26
rosbag_fancy::BagView::Iterator
Definition: bag_view.h:24
ros::SteadyTime::sleepUntil
static bool sleepUntil(const SteadyTime &end)
rosbag_fancy::BagView::MultiBagMessage::bagIndex
unsigned int bagIndex
Definition: bag_view.h:21
rosbag_fancy::BagView::addBag
void addBag(BagReader *reader)
Definition: bag_view.cpp:227
full.h
rosbag_fancy::BagReader
Definition: bag_reader.h:21
format
std::string format
Definition: ui.cpp:76
rosbag_fancy::PlaybackUI
Definition: ui.h:42
ros::Time
ros::NodeHandle::createSteadyTimer
SteadyTimer createSteadyTimer(SteadyTimerOptions &ops) const
bag_reader.h
ros::AdvertiseOptions::md5sum
std::string md5sum
ros::AdvertiseOptions::message_definition
std::string message_definition
play
int play(const std::vector< std::string > &options)
Definition: cmd_play.cpp:29
ros::WallDuration
ros::Duration
rosbag_fancy::TopicManager
Definition: topic_manager.h:101
ros::NodeHandle
ros::Time::now
static Time now()
DurationBase< WallDuration >::fromNSec
WallDuration & fromNSec(int64_t t)
usage
def usage(progname)


rosbag_fancy
Author(s):
autogenerated on Tue Feb 20 2024 03:20:59