cmd_play.cpp
Go to the documentation of this file.
1 // Play command
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
4 #include <boost/program_options.hpp>
5 
6 #include <iostream>
7 #include <rosfmt/full.h>
8 #include <fmt/chrono.h>
9 
10 #include <chrono>
11 
12 #include <ros/node_handle.h>
13 #include <ros/publisher.h>
14 
15 #include <rosgraph_msgs/Clock.h>
16 
17 #include "bag_reader.h"
18 #include "bag_view.h"
19 #include "topic_manager.h"
20 #include "ui.h"
21 #include "tf2_scanner.h"
22 
23 namespace po = boost::program_options;
24 using namespace rosbag_fancy;
25 
26 int play(const std::vector<std::string>& options)
27 {
28  po::variables_map vm;
29 
30  // Handle CLI arguments
31  {
32  po::options_description desc("Options");
33  desc.add_options()
34  ("help", "Display this help message")
35  ("clock", "Publish clock (requires use_sim_time)")
36  ;
37 
38  po::options_description hidden("Hidden");
39  hidden.add_options()
40  ("input", po::value<std::vector<std::string>>()->required(), "Input file")
41  ;
42 
43  po::options_description all("All");
44  all.add(desc).add(hidden);
45 
46  po::positional_options_description p;
47  p.add("input", -1);
48 
49  auto usage = [&](){
50  std::cout << "Usage: rosbag_fancy play [options] <bag file(s)>\n\n";
51  std::cout << desc << "\n\n";
52  };
53 
54  try
55  {
56  po::store(
57  po::command_line_parser(options).options(all).positional(p).run(),
58  vm
59  );
60 
61  if(vm.count("help"))
62  {
63  usage();
64  return 0;
65  }
66 
67  po::notify(vm);
68  }
69  catch(po::error& e)
70  {
71  std::cerr << "Could not parse arguments: " << e.what() << "\n\n";
72  usage();
73  return 1;
74  }
75  }
76 
77  bool publishClock = vm.count("clock");
78 
79  ros::Publisher pub_clock;
80  ros::SteadyTime lastClockPublishTime;
81 
82  if(publishClock)
83  pub_clock = ros::NodeHandle{}.advertise<rosgraph_msgs::Clock>("/clock", 1, true);
84 
85  struct Bag
86  {
87  explicit Bag(const std::string& filename)
88  : reader{filename}
89  {}
90 
91  BagReader reader;
92  std::unordered_map<int, ros::Publisher> publishers;
93  };
94 
95  std::vector<Bag> bags;
96 
97  TopicManager topicManager;
98  std::map<std::string, int> topicMap;
99 
100  ros::NodeHandle nh;
101 
102  ros::Time startTime;
103  ros::Time endTime;
104 
105  for(auto& filename : vm["input"].as<std::vector<std::string>>())
106  {
107  auto& bag = bags.emplace_back(filename);
108 
109  if(startTime == ros::Time(0) || bag.reader.startTime() < startTime)
110  startTime = bag.reader.startTime();
111 
112  if(endTime == ros::Time(0) || bag.reader.endTime() > endTime)
113  endTime = bag.reader.endTime();
114 
115  for(auto& [id, con] : bag.reader.connections())
116  {
118  opts.datatype = con.type;
119  opts.md5sum = con.md5sum;
120  opts.message_definition = con.msgDef;
121  opts.topic = con.topicInBag;
122  opts.latch = con.latching;
123  opts.queue_size = 100;
124  opts.has_header = false; // FIXME: Is this correct?
125 
126  bag.publishers[id] = nh.advertise(opts);
127 
128  auto it = topicMap.find(con.topicInBag);
129  if(it == topicMap.end())
130  {
131  topicMap[con.topicInBag] = topicManager.topics().size();
132  topicManager.addTopic(con.topicInBag);
133  }
134  }
135  }
136 
137  std::vector<BagReader*> bagReaders;
138  for(auto& bag : bags)
139  bagReaders.push_back(&bag.reader);
140 
141  TF2Scanner tf2Scanner{bagReaders};
142 
143  ros::Publisher pub_tf_static = nh.advertise<tf2_msgs::TFMessage>("/tf_static", 20, true);
144 
145  BagView view;
146  auto topicPredicate = [&](const BagReader::Connection& connection){
147  return connection.topicInBag != "/tf_static";
148  };
149 
150  for(auto& bag : bags)
151  view.addBag(&bag.reader, topicPredicate);
152 
153  ros::WallDuration(0.2).sleep();
154 
155  ros::Time bagRefTime = startTime;
156  ros::SteadyTime wallRefTime = ros::SteadyTime::now();
157  ros::Time currentTime = bagRefTime;
158 
159  std::unique_ptr<PlaybackUI> ui;
160 
161  bool paused = false;
162 
163  BagView::Iterator it = view.begin();
164 
165  if(!vm.count("no-ui"))
166  {
167  ui.reset(new PlaybackUI{topicManager, startTime, endTime});
168 
169  ui->seekForwardRequested.connect([&](){
170  currentTime += ros::Duration{5.0};
171  bagRefTime += ros::Duration{5.0};
172 
173  it = view.findTime(currentTime);
174  });
175  ui->seekBackwardRequested.connect([&](){
176  currentTime -= ros::Duration{5.0};
177  bagRefTime -= ros::Duration{5.0};
178 
179  if(currentTime < startTime)
180  {
181  currentTime = startTime;
182  bagRefTime = currentTime;
183  wallRefTime = ros::SteadyTime::now();
184  }
185 
186  it = view.findTime(currentTime);
187  });
188  ui->pauseRequested.connect([&](){
189  paused = !paused;
190  bagRefTime = currentTime;
191  wallRefTime = ros::SteadyTime::now();
192  });
193  ui->exitRequested.connect([&](){
194  ros::shutdown();
195  });
196  }
197 
198  while(ros::ok())
199  {
200  if(paused)
201  ros::WallDuration{0.1}.sleep();
202  else
203  {
204  if(it == view.end())
205  break;
206 
207  auto& msg = *it->msg;
208 
209  if(msg.stamp < bagRefTime)
210  continue;
211 
212  ros::SteadyTime wallStamp = wallRefTime + ros::WallDuration().fromNSec((msg.stamp - bagRefTime).toNSec());
213  ros::SteadyTime::sleepUntil(wallStamp);
214 
215  currentTime = msg.stamp;
216  ui->setPositionInBag(msg.stamp);
217 
218  bags[it->bagIndex].publishers[msg.connection->id].publish(msg);
219 
220  auto& topic = topicManager.topics()[topicMap[msg.connection->topicInBag]];
221  topic.notifyMessage(msg.size());
222 
223  if(publishClock && wallStamp - lastClockPublishTime > ros::WallDuration{0.001})
224  {
225  rosgraph_msgs::Clock msg;
226  msg.clock = currentTime;
227  pub_clock.publish(msg);
228  lastClockPublishTime = wallStamp;
229  }
230 
231  // Publish matching tf_static
232  if(auto tfmsg = tf2Scanner.fetchUpdate(currentTime))
233  pub_tf_static.publish(*tfmsg);
234 
235  ++it;
236  }
237 
238  ros::spinOnce();
239 
240  // Handle key input
241  if(ui)
242  {
243  fd_set fds{};
244  FD_ZERO(&fds);
245  FD_SET(STDIN_FILENO, &fds);
246  timeval timeout{};
247  int ret = select(STDIN_FILENO+1, &fds, nullptr, nullptr, &timeout);
248  if(ret < 0)
249  {
250  if(errno != EAGAIN && errno != EINTR)
251  throw std::runtime_error{fmt::format("select() error: {}", strerror(errno))};
252  }
253  else if(ret != 0)
254  ui->handleInput();
255  }
256 
257  ui->setPaused(paused);
258  }
259 
260  return 0;
261 }
std::string format
Definition: ui.cpp:76
filename
std::string message_definition
void run(class_loader::ClassLoader *loader)
std::vector< Topic > & topics()
int play(const std::vector< std::string > &options)
Definition: cmd_play.cpp:26
void addTopic(const std::string &topic, float rateLimit=0.0f, int flags=0)
void publish(const boost::shared_ptr< M > &message) const
static bool sleepUntil(const SteadyTime &end)
static SteadyTime now()
ROSCPP_DECL bool ok()
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
bool sleep() const
const BagReader::Message * msg
Definition: bag_view.h:20
void addBag(BagReader *reader)
Definition: bag_view.cpp:227
def usage(progname)
WallDuration & fromNSec(int64_t t)
ROSCPP_DECL void shutdown()
ROSCPP_DECL void spinOnce()


rosbag_fancy
Author(s):
autogenerated on Fri Dec 9 2022 04:00:08