38 #include <boost/filesystem.hpp> 41 #if BOOST_FILESYSTEM_VERSION < 3 42 #include <sys/statvfs.h> 51 #include <boost/lexical_cast.hpp> 52 #include <boost/regex.hpp> 53 #include <boost/thread.hpp> 54 #include <boost/thread/xtime.hpp> 55 #include <boost/date_time/local_time/local_time.hpp> 77 topic(_topic), msg(_msg), connection_header(_connection_header), time(_time)
84 filename(_filename), queue(_queue),
time(_time)
104 buffer_size(1048576 * 256),
105 chunk_size(1024 * 768),
112 min_space(1024 * 1024 * 1024),
125 writing_enabled_(true)
138 fprintf(stderr,
"Specifying a count is not valid with automatic topic subscription.\n");
144 fprintf(stderr,
"No topics specified.\n");
159 queue_ =
new std::queue<OutgoingMessage>;
168 ROS_WARN(
"/use_sim_time set to true and no clock published. Still waiting for valid time...");
181 boost::thread record_thread;
184 record_thread = boost::thread([
this]() {
194 catch (
const std::exception& ex)
211 record_thread = boost::thread([
this]() {
221 catch (
const std::exception& ex)
247 record_thread.join();
255 ROS_INFO(
"Subscribing to %s", topic.c_str());
264 ops.
md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
265 ops.
datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
270 *sub = nh.subscribe(ops);
301 [&topic] (
string const& regex_str){
302 boost::regex e(regex_str);
304 return boost::regex_match(topic, what, e, boost::match_extra);
316 std::stringstream msg;
317 const boost::posix_time::ptime now=
318 boost::posix_time::second_clock::local_time();
319 boost::posix_time::time_facet *
const f=
320 new boost::posix_time::time_facet(
"%Y-%m-%d-%H-%M-%S");
321 msg.imbue(std::locale(msg.getloc(),f));
329 Time rectime = Time::now();
332 cout <<
"Received message on topic " << subscriber->getTopic() << endl;
349 Time now = Time::now();
351 ROS_WARN(
"rosbag record buffer exceeded. Dropping oldest queued message.");
365 subscriber->shutdown();
376 vector<string> parts;
379 size_t ind = prefix.rfind(
".bag");
381 if (ind != std::string::npos && ind == prefix.size() - 4)
386 if (prefix.length() > 0)
387 parts.push_back(prefix);
391 parts.push_back(boost::lexical_cast<string>(
split_count_));
393 if (parts.size() == 0)
395 throw BagException(
"Bag filename is empty (neither of these was specified: prefix, append_date, split)");
399 for (
unsigned int i = 1; i < parts.size(); i++)
416 queue_ =
new std::queue<OutgoingMessage>;
432 ROS_ERROR(
"Error writing: %s", e.what());
440 std_msgs::String msg;
542 while (nh.
ok() || !
queue_->empty()) {
545 bool finished =
false;
548 lock.release()->unlock();
553 #if BOOST_VERSION >= 105000 554 boost::xtime_get(&xt, boost::TIME_UTC_);
556 boost::xtime_get(&xt, boost::TIME_UTC);
558 xt.nsec += 250000000;
573 lock.release()->unlock();
611 lock.release()->unlock();
613 string target_filename = out_queue.
filename;
614 string write_filename = target_filename + string(
".active");
620 ROS_ERROR(
"Error writing: %s", ex.what());
624 while (!out_queue.
queue->empty()) {
626 out_queue.
queue->pop();
657 std::string peer_host;
662 ROS_ERROR(
"Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
669 c.execute(
"getSubscriptions", req2, resp2);
671 if (!c.isFault() && resp2.
valid() && resp2.
size() > 0 &&
static_cast<int>(resp2[0]) == 1)
673 for(
int i = 0; i < resp2[2].
size(); i++)
679 ROS_ERROR(
"Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
689 pub.
publish(std_msgs::Empty());
706 #if BOOST_FILESYSTEM_VERSION < 3 707 struct statvfs fiData;
710 ROS_WARN(
"Failed to check filesystem stats.");
713 unsigned long long free_space = 0;
714 free_space = (
unsigned long long) (fiData.f_bsize) * (
unsigned long long) (fiData.f_bavail);
730 boost::filesystem::path p(boost::filesystem::system_complete(
bag_.
getFileName().c_str()));
732 boost::filesystem::space_info info;
735 info = boost::filesystem::space(p);
737 catch (
const boost::filesystem::filesystem_error& e)
739 ROS_WARN(
"Failed to check filesystem stats [%s].", e.what());
768 ROS_WARN(
"Not logging message because logging disabled. Most likely cause is a full disk.");
ros::TransportHints transport_hints
void doRecordSnapshotter()
ros::Time last_buffer_warn_
int exit_code_
eventual exit code
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
uint64_t queue_size_
queue size
ros::WallTime check_disk_next_
boost::shared_ptr< M > getMessage() const
boost::mutex queue_mutex_
mutex for queue
void setChunkThreshold(uint32_t chunk_threshold)
std::string target_filename_
boost::regex exclude_regex
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
OutgoingQueue(std::string const &_filename, std::queue< OutgoingMessage > *_queue, ros::Time _time)
ROSCPP_DECL const std::string & getName()
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
topic_tools::ShapeShifter::ConstPtr msg
std::vector< TopicInfo > V_TopicInfo
bool isSubscribed(std::string const &topic) const
std::string write_filename_
ros::Duration max_duration
std::set< std::string > currently_recording_
set of currently recording topics
OutgoingMessage(std::string const &_topic, topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr< ros::M_string > _connection_header, ros::Time _time)
void doCheckMaster(ros::TimerEvent const &e, ros::NodeHandle &node_handle)
static std::string timeToStr(T ros_t)
std::queue< OutgoingMessage > * queue
void publish(const boost::shared_ptr< M > &message) const
SubscriptionCallbackHelperPtr helper
WallDuration & fromSec(double t)
bool scheduledCheckDisk()
void doQueue(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, std::string const &topic, boost::shared_ptr< ros::Subscriber > subscriber, boost::shared_ptr< int > count)
Callback to be invoked to save messages into a queue.
void setCompression(CompressionType compression)
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
std::list< std::string > current_files_
boost::condition_variable_any queue_condition_
conditional variable for queue
boost::shared_ptr< ros::Subscriber > subscribe(std::string const &topic)
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
TransportHints transport_hints
unsigned long long min_space
boost::mutex check_disk_mutex_
Recorder(RecorderOptions const &options)
bool checkDuration(const ros::Time &)
void snapshotTrigger(std_msgs::Empty::ConstPtr trigger)
Callback to be invoked to actually do the recording.
uint64_t split_count_
split count
std::queue< OutgoingQueue > queue_queue_
queue of queues to be used by the snapshot recorders
std::string getFileName() const
ros::Publisher pub_begin_write
ROSCPP_DECL void shutdown()
void doRecord()
Thread that actually does writing to file.
int num_subscribers_
used for book-keeping of our number of subscribers
std::queue< OutgoingMessage > * queue_
queue for storing
CompressionType compression
#define ROS_ERROR_STREAM(args)
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
static bool waitForValid()
std::vector< std::string > topics
std::string min_space_str
bool shouldSubscribeToTopic(std::string const &topic, bool from_node=false)
void write(std::string const &topic, ros::MessageEvent< T > const &event)
boost::shared_ptr< ros::M_string > connection_header