38 #include <boost/filesystem.hpp>
39 #include <boost/scope_exit.hpp>
40 #include <boost/thread/xtime.hpp>
41 #include <boost/thread/shared_mutex.hpp>
42 #include <boost/date_time/local_time/local_time.hpp>
47 #include <rosbag_snapshot_msgs/SnapshotStatus.h>
67 : duration_limit_(duration_limit), memory_limit_(memory_limit), count_limit_(count_limit)
72 int32_t default_count_limit,
ros::Duration status_period,
bool clear_buffer)
73 : default_duration_limit_(default_duration_limit)
74 , default_memory_limit_(default_memory_limit)
75 , default_count_limit_(default_count_limit)
76 , status_period_(status_period)
77 , clear_buffer_(clear_buffer)
85 std::pair<topics_t::iterator, bool> ret;
86 ret =
topics_.insert(topics_t::value_type(topic, ops));
96 : msg(_msg), connection_header(_connection_header), time(_time)
111 boost::mutex::scoped_lock l(
lock);
114 status.traffic =
size_;
115 status.delivered_msgs =
queue_.size();
116 status.window_start =
queue_.front().time;
117 status.window_stop =
queue_.back().time;
122 boost::mutex::scoped_lock l(
lock);
145 ROS_WARN(
"Time has gone backwards. Clearing buffer for this topic.");
168 dt = time -
queue_.front().time;
181 boost::mutex::scoped_try_lock l(
lock);
192 boost::mutex::scoped_lock l(
lock);
198 return snapshot_msg.
msg->size() +
200 snapshot_msg.
msg->getDataType().size() +
201 snapshot_msg.
msg->getMD5Sum().size() +
202 snapshot_msg.
msg->getMessageDefinition().size() +
208 int32_t size = _out.
msg->size();
228 range_t::first_type begin =
queue_.begin();
229 range_t::second_type end =
queue_.end();
234 while (begin != end && (*begin).time <
start)
239 while (end != begin && (*(end - 1)).time > stop)
256 buffer.second->sub_->shutdown();
272 size_t ind = file.rfind(
".bag");
274 if (ind != string::npos && ind == file.size() - 4)
285 std::stringstream msg;
286 const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
287 boost::posix_time::time_facet*
const f =
new boost::posix_time::time_facet(
"%Y-%m-%d-%H-%M-%S");
288 msg.imbue(std::locale(msg.getloc(),
f));
298 boost::shared_lock<boost::upgrade_mutex> lock(
state_lock_);
312 ROS_INFO(
"Subscribing to %s", topic.c_str());
318 ops.
md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
319 ops.
datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
321 boost::make_shared<ros::SubscriptionCallbackHelperT<const ros::MessageEvent<topic_tools::ShapeShifter const>&> >(
324 queue->setSubscriber(sub);
328 rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
329 rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
332 boost::mutex::scoped_lock l(message_queue.
lock);
337 if (!bag.
isOpen() && range.second > range.first)
346 res.message = string(
"failed to open bag: ") + err.what();
349 ROS_INFO(
"Writing snapshot to %s", req.filename.c_str());
354 ROS_INFO(
"Bag compression type LZ4");
359 ROS_INFO(
"Bag compression type BZ2");
371 for (MessageQueue::range_t::first_type msg_it = range.first; msg_it != range.second; ++msg_it)
380 res.message = string(
"failed to write bag: ") + err.what();
386 rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
391 res.message =
"invalid";
394 bool recording_prior;
396 boost::upgrade_lock<boost::upgrade_mutex> read_lock(
state_lock_);
401 res.message =
"Already writing";
404 boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
414 boost::unique_lock<boost::upgrade_mutex> write_lock(
state_lock_);
426 if (req.topics.size() && req.topics.at(0).size())
428 for (std::string& topic : req.topics)
437 ROS_WARN(
"Requested topic %s is invalid, skipping.", topic.c_str());
442 buffers_t::iterator found =
buffers_.find(topic);
446 ROS_WARN(
"Requested topic %s is not subscribed, skipping.", topic.c_str());
450 if (!
writeTopic(bag, message_queue, topic, req, res))
457 for (
const buffers_t::value_type& pair :
buffers_)
460 std::string
const& topic = pair.first;
461 if (!
writeTopic(bag, message_queue, topic, req, res))
470 res.message = res.NO_DATA_MESSAGE;
480 for (
const buffers_t::value_type& pair :
buffers_)
482 pair.second->clear();
505 boost::upgrade_lock<boost::upgrade_mutex> read_lock(
state_lock_);
509 res.message =
"cannot enable recording while writing.";
515 boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
520 boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
536 rosbag_snapshot_msgs::SnapshotStatus msg;
538 boost::shared_lock<boost::upgrade_mutex> lock(
state_lock_);
542 for (
const buffers_t::value_type& pair :
buffers_)
544 rosgraph_msgs::TopicStatistics status;
545 status.node_sub = node_id;
546 status.topic = pair.first;
547 pair.second->fillStatus(status);
548 msg.topics.push_back(status);
562 std::string topic = t.name;
569 std::pair<buffers_t::iterator, bool> res =
buffers_.insert(buffers_t::value_type(topic, queue));
570 ROS_ASSERT_MSG(res.second,
"failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
587 for (SnapshotterOptions::topics_t::value_type& pair :
options_.
topics_)
593 std::pair<buffers_t::iterator, bool> res =
buffers_.insert(buffers_t::value_type(topic, queue));
594 ROS_ASSERT_MSG(res.second,
"failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
629 ROS_ERROR(
"Service %s does not exist. Is snapshot running in this namespace?",
"trigger_snapshot");
632 rosbag_snapshot_msgs::TriggerSnapshotRequest req;
638 size_t ind = req.filename.rfind(
".bag");
639 if (ind != string::npos && ind == req.filename.size() - 4)
640 req.filename.erase(ind);
645 size_t ind = req.filename.rfind(
".bag");
646 if (ind == string::npos || ind != req.filename.size() - 4)
647 req.filename +=
".bag";
651 if (req.filename.empty())
653 boost::filesystem::path p(boost::filesystem::system_complete(req.filename));
654 req.filename = p.string();
656 rosbag_snapshot_msgs::TriggerSnapshotResponse res;
657 if (!client.
call(req, res))
674 ROS_ERROR(
"Service %s does not exist. Is snapshot running in this namespace?",
"enable_snapshot");
677 std_srvs::SetBoolRequest req;
679 std_srvs::SetBoolResponse res;
680 if (!client.
call(req, res))