39 #if !defined(_MSC_VER) 40 #include <sys/select.h> 43 #include <boost/format.hpp> 45 #include "rosgraph_msgs/Clock.h" 60 ros::M_string::const_iterator header_iter = c->
header->find(
"latching");
61 return (header_iter != c->
header->end() && header_iter->second ==
"1");
83 bag_time_frequency(0.0),
94 wait_for_subscribers(false),
95 rate_control_topic(
""),
96 rate_control_max_delay(1.0
f),
102 if (
bags.size() == 0)
103 throw Exception(
"You must specify at least one bag file to play from");
105 throw Exception(
"Invalid duration, must be > 0.0");
116 pause_change_requested_(false),
117 requested_pause_state_(false),
118 terminal_modified_(false)
136 ROS_INFO(
"Opening %s", filename.c_str());
142 bags_.push_back(bag);
145 std::cerr <<
"Bag file " << filename <<
" is unindexed. Run rosbag reindex." << std::endl;
168 const auto full_initial_time = full_view.
getBeginTime();
184 view.
addQuery(*bag, initial_time, finish_time);
187 view.
addQuery(*bag, topics, initial_time, finish_time);
190 if (view.
size() == 0)
192 std::cerr <<
"No messages to play on specified topics. Exiting." << std::endl;
205 std::cout <<
"Creating rate control topic subscriber..." << std::flush;
211 ops.
md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
212 ops.
datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
219 std::cout <<
" done." << std::endl;
225 std::cout <<
" done." << std::endl;
227 std::cout << std::endl <<
"Hit space to toggle paused, or 's' to step." << std::endl;
232 View full_latch_view;
235 for (
const auto& bag : bags_) {
236 full_latch_view.
addQuery(*bag, full_initial_time, initial_time);
239 for (
const auto& bag : bags_) {
240 full_latch_view.
addQuery(*bag, topics, full_initial_time, initial_time);
244 std::set<std::pair<std::string, std::string>> latch_topics;
247 const auto header_iter = c->header->find(
"callerid");
248 const auto callerid = (header_iter != c->header->end() ? header_iter->second : string(
""));
250 latch_topics.emplace(callerid, c->topic);
261 for (
const auto& item : latch_topics) {
262 const auto& callerid = item.first;
263 const auto& topic = item.second;
266 for (
const auto& bag : bags_) {
270 auto last_message = latch_view.
end();
271 for (
auto iter = latch_view.
begin(); iter != latch_view.
end(); ++iter) {
272 if (iter->getCallerId() == callerid) {
277 if (last_message != latch_view.
end()) {
278 const auto publisher =
publishers_.find(callerid + topic);
281 publisher->second.publish(*last_message);
327 std::cout << std::endl;
331 std::cout << std::endl <<
"Done." << std::endl;
342 std::string def = ssmsg->getMessageDefinition();
346 std::istringstream
f(def);
349 while(std::getline(f, s,
'\n')) {
350 if (!s.empty() && s.find(
"#") != 0) {
352 if (s.find(
"Header ") == 0) {
360 std::cout << std::endl <<
"WARNING: Rate control topic is bad, header is not first. MSG may be malformed." << std::endl;
364 std::vector<uint8_t> buffer(length);
370 int32_t header_timestamp_sec = buffer[4] | (uint32_t)buffer[5] << 8 | (uint32_t)buffer[6] << 16 | (uint32_t)buffer[7] << 24;
371 int32_t header_timestamp_nsec = buffer[8] | (uint32_t)buffer[9] << 8 | (uint32_t)buffer[10] << 16 | (uint32_t)buffer[11] << 24;
443 bool all_topics_subscribed =
false;
444 std::cout <<
"Waiting for subscribers." << std::endl;
445 while (!all_topics_subscribed) {
446 all_topics_subscribed = std::all_of(
448 [](
const PublisherMap::value_type& pub) {
449 return pub.second.getNumSubscribers() > 0;
453 std::cout <<
"Finished waiting for subscribers." << std::endl;
458 ros::M_string::const_iterator header_iter = c->
header->find(
"callerid");
459 std::string callerid = (header_iter != c->
header->end() ? header_iter->second : string(
""));
461 string callerid_topic = callerid + c->
topic;
463 map<string, ros::Publisher>::iterator pub_iter =
publishers_.find(callerid_topic);
485 string callerid_topic = callerid + topic;
487 map<string, ros::Publisher>::iterator pub_iter =
publishers_.find(callerid_topic);
496 pub_iter->second.publish(m);
540 bool charsleftorpaused =
true;
602 charsleftorpaused =
false;
611 pub_iter->second.publish(m);
634 bool charsleftorpaused =
true;
663 charsleftorpaused =
false;
679 #if defined(_MSC_VER) 680 input_handle = GetStdHandle(STD_INPUT_HANDLE);
681 if (input_handle == INVALID_HANDLE_VALUE)
683 std::cout <<
"Failed to set up standard input handle." << std::endl;
686 if (! GetConsoleMode(input_handle, &stdin_set) )
688 std::cout <<
"Failed to save the console mode." << std::endl;
700 const int fd = fileno(stdin);
704 flags.c_lflag &= ~ICANON;
705 flags.c_cc[VMIN] = 0;
706 flags.c_cc[VTIME] = 0;
707 tcsetattr(fd, TCSANOW, &flags);
720 #if defined(_MSC_VER) 721 SetConsoleMode(input_handle, stdin_set);
723 const int fd = fileno(stdin);
733 #elif !defined(_MSC_VER) 737 #if defined(_MSC_VER) 739 INPUT_RECORD input_record[1];
740 DWORD input_size = 1;
741 BOOL b = GetNumberOfConsoleInputEvents(input_handle, &events);
744 b = ReadConsoleInput(input_handle, input_record, input_size, &events);
747 for (
unsigned int i = 0; i < events; ++i)
749 if (input_record[i].EventType & KEY_EVENT & input_record[i].Event.KeyEvent.bKeyDown)
751 CHAR ch = input_record[i].Event.KeyEvent.uChar.AsciiChar;
762 if (select(
maxfd_, &testfd, NULL, NULL, &tv) <= 0)
812 rosgraph_msgs::Clock pub_msg;
875 rosgraph_msgs::Clock pub_msg;
891 rosgraph_msgs::Clock pub_msg;
const boost::shared_ptr< ConstMessage > & getConstMessage() const
static void write(Stream &stream, typename boost::call_traits< T >::param_type t)
void setWCHorizon(const ros::WallTime &horizon)
bool wait_for_subscribers
TimeTranslator time_translator_
std::string rate_control_topic
std::vector< const ConnectionInfo *> getConnections()
void setPublishFrequency(double publish_frequency)
ros::Time translate(ros::Time const &t)
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
void doPublish(rosbag::MessageInstance const &m)
std::vector< std::string > topics
ros::Time const & getTime() const
bool requested_pause_state_
std::vector< std::string > pause_topics
ros::WallDuration wall_step_
std::vector< std::string > bags
static bool sleepUntil(const WallTime &end)
ros::WallTime wc_horizon_
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
ros::Time const & getTime() const
float rate_control_max_delay
void setTranslatedStartTime(ros::Time const &t)
Increments the translated start time by shift. Useful for pausing.
ros::WallDuration advertise_sleep
double bag_time_frequency
void publish(const boost::shared_ptr< M > &message) const
SubscriptionCallbackHelperPtr helper
std::string const & getMD5Sum() const
void addQuery(Bag const &bag, ros::Time const &start_time=ros::TIME_MIN, ros::Time const &end_time=ros::TIME_MAX)
WallDuration & fromSec(double t)
void setTimeScale(double const &s)
ros::NodeHandle node_handle_
double publish_frequency_
void runClock(const ros::WallDuration &duration)
void setRealStartTime(ros::Time const &t)
ros::Subscriber rate_control_sub_
ROSTIME_DECL const Time TIME_MAX
void setTimeScale(double time_scale)
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
std::vector< boost::shared_ptr< Bag > > bags_
ros::ServiceServer pause_service_
void advertise(const ConnectionInfo *c)
ros::WallTime paused_time_
bool pauseCallback(std_srvs::SetBool::Request &req, std_srvs::SetBool::Response &res)
Player(PlayerOptions const &options)
std::string const & getDataType() const
boost::shared_ptr< ros::M_string > header
void waitForSubscribers() const
std::string const & getMessageDefinition() const
uint32_t serializationLength(const T &t)
#define ROS_INFO_STREAM(args)
ros::NodeHandle node_handle_
void setTime(const ros::Time &time)
ros::Duration bag_length_
void stepClock()
Step the clock to the horizon.
ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const &msg, uint32_t queue_size, const std::string &prefix="")
Helper function to create AdvertiseOptions from a MessageInstance.
void shift(ros::Duration const &d)
Increments the translated start time by shift. Useful for pausing.
ros::Time last_rate_control_
ROSCPP_DECL void shutdown()
bool isLatching(const ConnectionInfo *c)
std::string getCallerId() const
std::string const & getTopic() const
bool pause_change_requested_
void updateRateTopicTime(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event)
TimePublisher time_publisher_
ROSCPP_DECL void spinOnce()
void processPause(const bool paused, ros::WallTime &horizon)
ROSTIME_DECL const Duration DURATION_MAX
void setHorizon(const ros::Time &horizon)
void runStalledClock(const ros::WallDuration &duration)
Sleep as necessary, but don't let the click run.