46 #include <boost/filesystem.hpp> 49 #if BOOST_FILESYSTEM_VERSION < 3 50 #include <sys/statvfs.h> 58 #include <boost/lexical_cast.hpp> 59 #include <boost/regex.hpp> 60 #include <boost/thread.hpp> 61 #include <boost/thread/xtime.hpp> 62 #include <boost/date_time/local_time/local_time.hpp> 87 topic(
std::move(_topic)), msg(
std::move(_msg)), connection_header(
std::move(_connection_header)), time(_time)
93 OutgoingQueue::OutgoingQueue(
string _filename, std::shared_ptr<std::queue<OutgoingMessage>> _queue,
Time _time) :
94 filename(
std::move(_filename)), queue(
std::move(_queue)), time(_time)
102 options_(
std::move(options)),
106 writing_enabled_(true)
119 fprintf(
stderr,
"Specifing a count is not valid with automatic topic subscription.\n");
125 fprintf(
stderr,
"No topics specified.\n");
141 queue_ = std::make_shared<std::queue<OutgoingMessage>>();
153 ROS_WARN(
"/use_sim_time set to true and no clock published. Still waiting for valid time...");
168 boost::thread record_thread;
192 record_thread.join();
195 check_master_timer.
stop();
204 ROS_INFO(
"Subscribing to %s", topic.c_str());
212 ops.
md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
213 ops.
datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
246 [&topic] (
string const& regex_str){
247 boost::regex e(regex_str);
249 return boost::regex_match(topic, what, e, boost::match_extra);
261 std::stringstream msg;
262 const boost::posix_time::ptime now=
263 boost::posix_time::second_clock::local_time();
264 boost::posix_time::time_facet *
const f=
265 new boost::posix_time::time_facet(
"%Y-%m-%d-%H-%M-%S");
266 msg.imbue(std::locale(msg.getloc(),f));
277 Time rectime = Time::now();
281 cout <<
"Received message on topic " << subscriber->
getTopic() << endl;
299 Time now = Time::now();
301 ROS_WARN(
"rosbag record buffer exceeded. Dropping oldest queued message.");
323 vector<string> parts;
326 size_t ind = prefix.rfind(
".bag");
328 if (ind != std::string::npos && ind == prefix.size() - 4)
333 if (prefix.length() > 0)
335 parts.push_back(prefix);
343 parts.push_back(boost::lexical_cast<string>(
split_count_));
347 throw rosbag::BagException(
"Bag filename is empty (neither of these was specified: prefix, append_date, split)");
351 for (
unsigned int i = 1; i < parts.size(); i++)
370 queue_ = std::make_shared<std::queue<OutgoingMessage>>();
386 ROS_ERROR(
"Error writing: %s", e.what());
394 std_msgs::String msg;
494 while (nh.
ok() || !
queue_->empty()) {
497 bool finished =
false;
500 lock.release()->unlock();
505 #if BOOST_VERSION >= 105000 506 boost::xtime_get(&xt, boost::TIME_UTC_);
508 boost::xtime_get(&xt, boost::TIME_UTC);
510 xt.nsec += 250000000;
527 lock.release()->unlock();
573 lock.release()->unlock();
575 string target_filename = out_queue.
filename;
576 string write_filename = target_filename + string(
".active");
582 ROS_ERROR(
"Error writing: %s", ex.what());
586 while (!out_queue.
queue->empty()) {
588 out_queue.
queue->pop();
621 std::string peer_host;
626 ROS_ERROR(
"Bad xml-rpc URI trying to inspect node at: [%s]", static_cast<std::string>(resp[2]).c_str());
633 c.execute(
"getSubscriptions", req2, resp2);
635 if (!c.isFault() && resp2.
valid() && resp2.
size() > 0 &&
static_cast<int>(resp2[0]) == 1)
637 for(
int i = 0; i < resp2[2].
size(); i++)
647 ROS_ERROR(
"Node at: [%s] failed to return subscriptions.", static_cast<std::string>(resp[2]).c_str());
657 pub.
publish(std_msgs::Empty());
676 #if BOOST_FILESYSTEM_VERSION < 3 677 struct statvfs fiData;
680 ROS_WARN(
"Failed to check filesystem stats.");
683 unsigned long long free_space = 0;
684 free_space = (
unsigned long long) (fiData.f_bsize) * (
unsigned long long) (fiData.f_bavail);
700 boost::filesystem::path p(boost::filesystem::system_complete(
bag_.
getFileName().c_str()));
702 boost::filesystem::space_info info {};
705 info = boost::filesystem::space(p);
707 catch (
const boost::filesystem::filesystem_error& e)
709 ROS_WARN(
"Failed to check filesystem stats [%s].", e.what());
740 ROS_WARN(
"Not logging message because logging disabled. Most likely cause is a full disk.");
ros::WallTime check_disk_next_
uint64_t split_count_
split count
topic_tools::ShapeShifter::ConstPtr msg
void publish(const boost::shared_ptr< M > &message) const
OutgoingMessage()=default
boost::regex exclude_regex
void open(std::string const &filename, uint32_t mode=bagmode::Read)
void DoRecord()
Thread that actually does writing to file.
std::list< std::string > current_files_
std::string getFileName() const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
std::shared_ptr< std::queue< OutgoingMessage > > queue
void setChunkThreshold(uint32_t chunk_threshold)
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
ROSCPP_DECL const std::string & getName()
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
int exit_code_
eventual exit code
std::string target_filename_
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
std::vector< TopicInfo > V_TopicInfo
boost::shared_ptr< ros::Subscriber > Subscribe(ros::NodeHandle &nh, std::string const &topic)
std::string write_filename_
boost::mutex queue_mutex_
mutex for queue
ros::Time last_buffer_warn_
bool IsSubscribed(std::string const &topic) const
std::set< std::string > currently_recording_
set of currenly recording topics
boost::condition_variable_any queue_condition_
conditional variable for queue
SubscriptionCallbackHelperPtr helper
WallDuration & fromSec(double t)
uint64_t queue_size_
queue size
void setCompression(CompressionType compression)
static std::string TimeToStr(T ros_t)
std::string getTopic() const
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
rosbag::CompressionType compression
bool ShouldSubscribeToTopic(std::string const &topic, bool from_node=false)
std::string min_space_str
void DoCheckMaster(ros::TimerEvent const &e, ros::NodeHandle &node_handle)
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
std::queue< OutgoingQueue > queue_queue_
queue of queues to be used by the snapshot recorders
TransportHints transport_hints
std::vector< boost::shared_ptr< ros::Subscriber > > subscribers_
ros::Publisher pub_begin_write_
std::shared_ptr< std::queue< OutgoingMessage > > queue_
queue for storing
boost::mutex check_disk_mutex_
std::vector< std::string > topics
ROSCPP_DECL void shutdown()
void DoRecordSnapshotter()
bool ScheduledCheckDisk()
bool CheckDuration(const ros::Time &t)
#define ROS_ERROR_STREAM(args)
boost::shared_ptr< ros::M_string > connection_header
ros::Duration max_duration
static bool waitForValid()
void write(std::string const &topic, ros::MessageEvent< T > const &event)
void SnapshotTrigger(std_msgs::Empty::ConstPtr trigger)
Callback to be invoked to actually do the recording.
void DoQueue(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, std::string const &topic, ros::Subscriber *subscriber, const boost::shared_ptr< int > &count)
Callback to be invoked to save messages into a queue.
boost::shared_ptr< M > getMessage() const
ros::TransportHints transport_hints