Go to the documentation of this file.
38 #include <boost/filesystem.hpp>
41 #if BOOST_FILESYSTEM_VERSION < 3
42 #include <sys/statvfs.h>
51 #include <boost/lexical_cast.hpp>
52 #include <boost/regex.hpp>
53 #include <boost/thread.hpp>
54 #include <boost/thread/xtime.hpp>
55 #include <boost/date_time/local_time/local_time.hpp>
77 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
84 filename(_filename), queue(_queue), time(_time)
100 repeat_latched(false),
105 buffer_size(1048576 * 256),
106 chunk_size(1024 * 768),
113 min_space(1024 * 1024 * 1024),
126 writing_enabled_(true)
139 fprintf(stderr,
"Specifying a count is not valid with automatic topic subscription.\n");
145 fprintf(stderr,
"No topics specified.\n");
160 queue_ =
new std::queue<OutgoingMessage>;
169 ROS_WARN(
"/use_sim_time set to true and no clock published. Still waiting for valid time...");
182 boost::thread record_thread;
185 record_thread = boost::thread([
this]() {
195 catch (
const std::exception& ex)
212 record_thread = boost::thread([
this]() {
222 catch (
const std::exception& ex)
248 record_thread.join();
256 ROS_INFO(
"Subscribing to %s", topic.c_str());
265 ops.
md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
266 ops.
datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
269 boost::bind(&
Recorder::doQueue,
this, boost::placeholders::_1, topic, sub, count));
302 [&topic] (
string const& regex_str){
303 boost::regex e(regex_str);
305 return boost::regex_match(topic, what, e, boost::match_extra);
317 std::stringstream msg;
318 const boost::posix_time::ptime now=
319 boost::posix_time::second_clock::local_time();
320 boost::posix_time::time_facet *
const f=
321 new boost::posix_time::time_facet(
"%Y-%m-%d-%H-%M-%S");
322 msg.imbue(std::locale(msg.getloc(),
f));
330 Time rectime = Time::now();
333 cout <<
"Received message on topic " << subscriber->getTopic() << endl;
351 latched_msgs_.insert({{subscriber->getTopic(), it2->second}, out});
363 Time now = Time::now();
365 ROS_WARN(
"rosbag record buffer exceeded. Dropping oldest queued message.");
379 subscriber->shutdown();
390 vector<string> parts;
393 size_t ind = prefix.rfind(
".bag");
395 if (ind != std::string::npos && ind == prefix.size() - 4)
400 if (prefix.length() > 0)
401 parts.push_back(prefix);
405 parts.push_back(boost::lexical_cast<string>(
split_count_));
407 if (parts.size() == 0)
409 throw BagException(
"Bag filename is empty (neither of these was specified: prefix, append_date, split)");
413 for (
unsigned int i = 1; i < parts.size(); i++)
430 queue_ =
new std::queue<OutgoingMessage>;
446 ROS_ERROR(
"Error writing: %s", e.what());
460 bag_.
write(out.second.topic, now, *out.second.msg, out.second.connection_header);
466 std_msgs::String msg;
568 while (nh.
ok() || !
queue_->empty()) {
571 bool finished =
false;
574 lock.release()->unlock();
579 boost::xtime_get(&xt, boost::TIME_UTC_);
580 xt.nsec += 250000000;
595 lock.release()->unlock();
633 lock.release()->unlock();
635 string target_filename = out_queue.
filename;
636 string write_filename = target_filename + string(
".active");
642 ROS_ERROR(
"Error writing: %s", ex.what());
646 while (!out_queue.
queue->empty()) {
648 out_queue.
queue->pop();
679 std::string peer_host;
684 ROS_ERROR(
"Bad xml-rpc URI trying to inspect node at: [%s]",
static_cast<std::string
>(resp[2]).c_str());
691 c.
execute(
"getSubscriptions", req2, resp2);
693 if (!c.
isFault() && resp2.
valid() && resp2.
size() > 0 &&
static_cast<int>(resp2[0]) == 1)
695 for(
int i = 0; i < resp2[2].
size(); i++)
701 ROS_ERROR(
"Node at: [%s] failed to return subscriptions.",
static_cast<std::string
>(resp[2]).c_str());
711 pub.
publish(std_msgs::Empty());
728 #if BOOST_FILESYSTEM_VERSION < 3
729 struct statvfs fiData;
732 ROS_WARN(
"Failed to check filesystem stats.");
735 unsigned long long free_space = 0;
736 free_space = (
unsigned long long) (fiData.f_bsize) * (
unsigned long long) (fiData.f_bavail);
752 boost::filesystem::path p(boost::filesystem::system_complete(
bag_.
getFileName().c_str()));
754 boost::filesystem::space_info info;
757 info = boost::filesystem::space(p);
759 catch (
const boost::filesystem::filesystem_error& e)
761 ROS_WARN(
"Failed to check filesystem stats [%s].", e.what());
790 ROS_WARN(
"Not logging message because logging disabled. Most likely cause is a full disk.");
ros::WallTime check_disk_next_
uint64_t split_count_
split count
void doRecord()
Thread that actually does writing to file.
WallDuration & fromSec(double t)
bool scheduledCheckDisk()
#define ROS_ERROR_STREAM(args)
OutgoingQueue(std::string const &_filename, std::queue< OutgoingMessage > *_queue, ros::Time _time)
Recorder(RecorderOptions const &options)
std::string min_space_str
uint64_t queue_size_
queue size
OutgoingMessage(std::string const &_topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr< ros::M_string > _connection_header, ros::Time _time)
boost::shared_ptr< ros::Subscriber > subscribe(std::string const &topic)
boost::mutex queue_mutex_
mutex for queue
TransportHints transport_hints
bool execute(const char *method, XmlRpcValue const ¶ms, XmlRpcValue &result)
ROSCPP_DECL void shutdown()
ros::TransportHints transport_hints
std::map< std::pair< std::string, std::string >, OutgoingMessage > latched_msgs_
int exit_code_
eventual exit code
std::string write_filename_
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
unsigned long long min_space
void publish(const boost::shared_ptr< M > &message) const
Publisher advertise(AdvertiseOptions &ops)
boost::condition_variable_any queue_condition_
conditional variable for queue
int num_subscribers_
used for book-keeping of our number of subscribers
std::queue< OutgoingMessage > * queue_
queue for storing
topic_tools::ShapeShifter::ConstPtr msg
static std::string timeToStr(T ros_t)
void setCompression(CompressionType compression)
void doCheckMaster(ros::TimerEvent const &e, ros::NodeHandle &node_handle)
std::vector< TopicInfo > V_TopicInfo
std::string getFileName() const
void doQueue(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, std::string const &topic, boost::shared_ptr< ros::Subscriber > subscriber, boost::shared_ptr< int > count)
Callback to be invoked to save messages into a queue.
CompressionType compression
void snapshotTrigger(std_msgs::Empty::ConstPtr trigger)
Callback to be invoked to actually do the recording.
std::vector< std::string > topics
std::queue< OutgoingMessage > * queue
std::list< std::string > current_files_
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
const ROSCPP_DECL std::string & getName()
ros::Duration max_duration
void write(std::string const &topic, ros::MessageEvent< T > const &event)
void doRecordSnapshotter()
bool checkDuration(const ros::Time &)
std::string target_filename_
bool shouldSubscribeToTopic(std::string const &topic, bool from_node=false)
static bool waitForValid()
std::queue< OutgoingQueue > queue_queue_
queue of queues to be used by the snapshot recorders
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
boost::shared_ptr< ros::M_string > connection_header
boost::shared_ptr< M > getMessage() const
boost::mutex check_disk_mutex_
ros::Publisher pub_begin_write
ros::Time last_buffer_warn_
void open(std::string const &filename, uint32_t mode=bagmode::Read)
SubscriptionCallbackHelperPtr helper
void setChunkThreshold(uint32_t chunk_threshold)
boost::regex exclude_regex
Timer createTimer(Duration period, const TimerCallback &callback, bool oneshot=false, bool autostart=true) const
bool isSubscribed(std::string const &topic) const
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
std::set< std::string > currently_recording_
set of currently recording topics
rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman, Dirk Thomas
, Jacob Perron
autogenerated on Sat Sep 14 2024 03:00:07