38 #include <boost/filesystem.hpp> 41 #if BOOST_FILESYSTEM_VERSION < 3 42 #include <sys/statvfs.h> 51 #include <boost/foreach.hpp> 52 #include <boost/lexical_cast.hpp> 53 #include <boost/regex.hpp> 54 #include <boost/thread.hpp> 55 #include <boost/thread/xtime.hpp> 56 #include <boost/date_time/local_time/local_time.hpp> 65 #define foreach BOOST_FOREACH 80 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
87 filename(_filename), queue(_queue),
time(_time)
106 buffer_size(1048576 * 256),
107 chunk_size(1024 * 768),
114 min_space(1024 * 1024 * 1024),
127 writing_enabled_(true)
140 fprintf(stderr,
"Specifing a count is not valid with automatic topic subscription.\n");
146 fprintf(stderr,
"No topics specified.\n");
156 queue_ =
new std::queue<OutgoingMessage>;
165 ROS_WARN(
"/use_sim_time set to true and no clock published. Still waiting for valid time...");
178 boost::thread record_thread;
204 record_thread.join();
212 ROS_INFO(
"Subscribing to %s", topic.c_str());
221 ops.
md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
222 ops.
datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
227 *sub = nh.subscribe(ops);
257 boost::regex e(regex_str);
259 if (boost::regex_match(topic, what, e, boost::match_extra))
276 std::stringstream msg;
277 const boost::posix_time::ptime now=
278 boost::posix_time::second_clock::local_time();
279 boost::posix_time::time_facet *
const f=
280 new boost::posix_time::time_facet(
"%Y-%m-%d-%H-%M-%S");
281 msg.imbue(std::locale(msg.getloc(),f));
289 Time rectime = Time::now();
292 cout <<
"Received message on topic " << subscriber->getTopic() << endl;
309 Time now = Time::now();
311 ROS_WARN(
"rosbag record buffer exceeded. Dropping oldest queued message.");
325 subscriber->shutdown();
336 vector<string> parts;
339 size_t ind = prefix.rfind(
".bag");
341 if (ind != std::string::npos && ind == prefix.size() - 4)
346 if (prefix.length() > 0)
347 parts.push_back(prefix);
351 parts.push_back(boost::lexical_cast<string>(
split_count_));
353 if (parts.size() == 0)
355 throw BagException(
"Bag filename is empty (neither of these was specified: prefix, append_date, split)");
359 for (
unsigned int i = 1; i < parts.size(); i++)
376 queue_ =
new std::queue<OutgoingMessage>;
392 ROS_ERROR(
"Error writing: %s", e.what());
483 while (nh.
ok() || !
queue_->empty()) {
486 bool finished =
false;
489 lock.release()->unlock();
494 #if BOOST_VERSION >= 105000 495 boost::xtime_get(&xt, boost::TIME_UTC_);
497 boost::xtime_get(&xt, boost::TIME_UTC);
499 xt.nsec += 250000000;
514 lock.release()->unlock();
543 lock.release()->unlock();
545 string target_filename = out_queue.
filename;
546 string write_filename = target_filename + string(
".active");
552 ROS_ERROR(
"Error writing: %s", ex.what());
556 while (!out_queue.
queue->empty()) {
558 out_queue.
queue->pop();
589 std::string peer_host;
594 ROS_ERROR(
"Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
601 c.execute(
"getSubscriptions", req2, resp2);
603 if (!c.isFault() && resp2.
valid() && resp2.
size() > 0 &&
static_cast<int>(resp2[0]) == 1)
605 for(
int i = 0; i < resp2[2].
size(); i++)
611 ROS_ERROR(
"Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
621 pub.
publish(std_msgs::Empty());
638 #if BOOST_FILESYSTEM_VERSION < 3 639 struct statvfs fiData;
642 ROS_WARN(
"Failed to check filesystem stats.");
645 unsigned long long free_space = 0;
646 free_space = (
unsigned long long) (fiData.f_bsize) * (
unsigned long long) (fiData.f_bavail);
662 boost::filesystem::path p(boost::filesystem::system_complete(
bag_.
getFileName().c_str()));
664 boost::filesystem::space_info info;
667 info = boost::filesystem::space(p);
669 catch (boost::filesystem::filesystem_error &e)
671 ROS_WARN(
"Failed to check filesystem stats [%s].", e.what());
701 ROS_WARN(
"Not logging message because logging disabled. Most likely cause is a full disk.");
ros::TransportHints transport_hints
void doRecordSnapshotter()
ros::Time last_buffer_warn_
int exit_code_
eventual exit code
void publish(const boost::shared_ptr< M > &message) const
void open(std::string const &filename, uint32_t mode=bagmode::Read)
std::string getFileName() const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
uint64_t queue_size_
queue size
ros::WallTime check_disk_next_
boost::mutex queue_mutex_
mutex for queue
void setChunkThreshold(uint32_t chunk_threshold)
std::string target_filename_
boost::regex exclude_regex
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
OutgoingQueue(std::string const &_filename, std::queue< OutgoingMessage > *_queue, ros::Time _time)
ROSCPP_DECL const std::string & getName()
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
topic_tools::ShapeShifter::ConstPtr msg
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
std::vector< TopicInfo > V_TopicInfo
std::string write_filename_
ros::Duration max_duration
std::set< std::string > currently_recording_
set of currenly recording topics
OutgoingMessage(std::string const &_topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr< ros::M_string > _connection_header, ros::Time _time)
void doCheckMaster(ros::TimerEvent const &e, ros::NodeHandle &node_handle)
static std::string timeToStr(T ros_t)
std::queue< OutgoingMessage > * queue
SubscriptionCallbackHelperPtr helper
WallDuration & fromSec(double t)
bool scheduledCheckDisk()
bool isSubscribed(std::string const &topic) const
void setCompression(CompressionType compression)
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
std::list< std::string > current_files_
boost::condition_variable_any queue_condition_
conditional variable for queue
boost::shared_ptr< ros::Subscriber > subscribe(std::string const &topic)
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
TransportHints transport_hints
unsigned long long min_space
void doQueue(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, std::string const &topic, boost::shared_ptr< ros::Subscriber > subscriber, boost::shared_ptr< int > count)
Callback to be invoked to save messages into a queue.
boost::mutex check_disk_mutex_
Recorder(RecorderOptions const &options)
bool checkDuration(const ros::Time &)
void snapshotTrigger(std_msgs::Empty::ConstPtr trigger)
Callback to be invoked to actually do the recording.
uint64_t split_count_
split count
std::queue< OutgoingQueue > queue_queue_
queue of queues to be used by the snapshot recorders
ROSCPP_DECL void shutdown()
void doRecord()
Thread that actually does writing to file.
int num_subscribers_
used for book-keeping of our number of subscribers
std::queue< OutgoingMessage > * queue_
queue for storing
CompressionType compression
static bool waitForValid()
std::vector< std::string > topics
std::string min_space_str
bool shouldSubscribeToTopic(std::string const &topic, bool from_node=false)
void write(std::string const &topic, ros::MessageEvent< T > const &event)
boost::shared_ptr< M > getMessage() const
boost::shared_ptr< ros::M_string > connection_header