38 #include <boost/filesystem.hpp> 39 #include <boost/scope_exit.hpp> 40 #include <boost/thread/xtime.hpp> 41 #include <boost/thread/shared_mutex.hpp> 42 #include <boost/date_time/local_time/local_time.hpp> 47 #include <rosbag_snapshot_msgs/SnapshotStatus.h> 67 : duration_limit_(duration_limit), memory_limit_(memory_limit), count_limit_(count_limit)
73 : default_duration_limit_(default_duration_limit)
74 , default_memory_limit_(default_memory_limit)
75 , default_count_limit_(default_count_limit)
76 , status_period_(status_period)
84 std::pair<topics_t::iterator, bool> ret;
85 ret =
topics_.insert(topics_t::value_type(topic, ops));
95 : msg(_msg), connection_header(_connection_header), time(_time)
110 boost::mutex::scoped_lock l(
lock);
113 status.traffic =
size_;
114 status.delivered_msgs =
queue_.size();
115 status.window_start =
queue_.front().time;
116 status.window_stop =
queue_.back().time;
121 boost::mutex::scoped_lock l(
lock);
144 ROS_WARN(
"Time has gone backwards. Clearing buffer for this topic.");
167 dt = time -
queue_.front().time;
180 boost::mutex::scoped_try_lock l(
lock);
191 boost::mutex::scoped_lock l(
lock);
197 return snapshot_msg.
msg->size() +
199 snapshot_msg.
msg->getDataType().size() +
200 snapshot_msg.
msg->getMD5Sum().size() +
201 snapshot_msg.
msg->getMessageDefinition().size() +
207 int32_t size = _out.
msg->size();
227 range_t::first_type begin =
queue_.begin();
228 range_t::second_type end =
queue_.end();
233 while (begin != end && (*begin).time < start)
238 while (end != begin && (*(end - 1)).time > stop)
257 buffer.second->sub_->shutdown();
263 if (options.
duration_limit_ == SnapshotterTopicOptions::INHERIT_DURATION_LIMIT)
265 if (options.
memory_limit_ == SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT)
267 if (options.
count_limit_ == SnapshotterTopicOptions::INHERIT_COUNT_LIMIT)
273 size_t ind = file.rfind(
".bag");
275 if (ind != string::npos && ind == file.size() - 4)
286 std::stringstream msg;
287 const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
288 boost::posix_time::time_facet*
const f =
new boost::posix_time::time_facet(
"%Y-%m-%d-%H-%M-%S");
289 msg.imbue(std::locale(msg.getloc(), f));
299 boost::shared_lock<boost::upgrade_mutex> lock(
state_lock_);
313 ROS_INFO(
"Subscribing to %s", topic.c_str());
319 ops.
md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
320 ops.
datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
322 boost::make_shared<ros::SubscriptionCallbackHelperT<const ros::MessageEvent<topic_tools::ShapeShifter const>&> >(
325 queue->setSubscriber(sub);
329 rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
330 rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
333 boost::mutex::scoped_lock l(message_queue.
lock);
338 if (!bag.
isOpen() && range.second > range.first)
347 res.message = string(
"failed to open bag: ") + err.what();
350 ROS_INFO(
"Writing snapshot to %s", req.filename.c_str());
355 ROS_INFO(
"Bag compression type LZ4");
360 ROS_INFO(
"Bag compression type BZ2");
372 for (MessageQueue::range_t::first_type msg_it = range.first; msg_it != range.second; ++msg_it)
381 res.message = string(
"failed to write bag: ") + err.what();
387 rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
392 res.message =
"invalid";
395 bool recording_prior;
397 boost::upgrade_lock<boost::upgrade_mutex> read_lock(
state_lock_);
402 res.message =
"Already writing";
405 boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
415 boost::unique_lock<boost::upgrade_mutex> write_lock(
state_lock_);
427 if (req.topics.size() && req.topics.at(0).size())
429 for (std::string& topic : req.topics)
438 ROS_WARN(
"Requested topic %s is invalid, skipping.", topic.c_str());
443 buffers_t::iterator found =
buffers_.find(topic);
447 ROS_WARN(
"Requested topic %s is not subscribed, skipping.", topic.c_str());
451 if (!
writeTopic(bag, message_queue, topic, req, res))
458 for (
const buffers_t::value_type& pair :
buffers_)
461 std::string
const& topic = pair.first;
462 if (!
writeTopic(bag, message_queue, topic, req, res))
471 res.message = res.NO_DATA_MESSAGE;
481 for (
const buffers_t::value_type& pair :
buffers_)
483 pair.second->clear();
497 ROS_INFO(
"Buffering resumed and old data cleared.");
502 boost::upgrade_lock<boost::upgrade_mutex> read_lock(
state_lock_);
506 res.message =
"cannot enable recording while writing.";
512 boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
517 boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
533 rosbag_snapshot_msgs::SnapshotStatus msg;
535 boost::shared_lock<boost::upgrade_mutex> lock(
state_lock_);
539 for (
const buffers_t::value_type& pair :
buffers_)
541 rosgraph_msgs::TopicStatistics status;
542 status.node_sub = node_id;
543 status.topic = pair.first;
544 pair.second->fillStatus(status);
545 msg.topics.push_back(status);
559 std::string topic = t.name;
566 std::pair<buffers_t::iterator, bool> res =
buffers_.insert(buffers_t::value_type(topic, queue));
567 ROS_ASSERT_MSG(res.second,
"failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
584 for (SnapshotterOptions::topics_t::value_type& pair :
options_.
topics_)
590 std::pair<buffers_t::iterator, bool> res =
buffers_.insert(buffers_t::value_type(topic, queue));
591 ROS_ASSERT_MSG(res.second,
"failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
607 boost::placeholders::_1, &
options_));
626 ROS_ERROR(
"Service %s does not exist. Is snapshot running in this namespace?",
"trigger_snapshot");
629 rosbag_snapshot_msgs::TriggerSnapshotRequest req;
635 size_t ind = req.filename.rfind(
".bag");
636 if (ind != string::npos && ind == req.filename.size() - 4)
637 req.filename.erase(ind);
642 size_t ind = req.filename.rfind(
".bag");
643 if (ind == string::npos || ind != req.filename.size() - 4)
644 req.filename +=
".bag";
648 if (req.filename.empty())
650 boost::filesystem::path p(boost::filesystem::system_complete(req.filename));
651 req.filename = p.string();
653 rosbag_snapshot_msgs::TriggerSnapshotResponse res;
654 if (!client.
call(req, res))
671 ROS_ERROR(
"Service %s does not exist. Is snapshot running in this namespace?",
"enable_snapshot");
674 std_srvs::SetBoolRequest req;
676 std_srvs::SetBoolResponse res;
677 if (!client.
call(req, res))
void subscribe(std::string const &topic, boost::shared_ptr< MessageQueue > queue)
int32_t default_memory_limit_
ServiceClient serviceClient(const std::string &service_name, bool persistent=false, const M_string &header_values=M_string())
bool postfixFilename(std::string &file)
static const int32_t NO_COUNT_LIMIT
boost::shared_ptr< ros::M_string > connection_header
void pollTopics(ros::TimerEvent const &e, rosbag_snapshot::SnapshotterOptions *options)
boost::shared_ptr< ros::Subscriber > sub_
void publishStatus(ros::TimerEvent const &e)
std::string timeAsStr()
Return current local datetime as a string such as 2018-05-22-14-28-51. Used to generate bag filenames...
bool writeTopic(rosbag::Bag &bag, MessageQueue &message_queue, std::string const &topic, rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
void open(std::string const &filename, uint32_t mode=bagmode::Read)
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
bool enableCB(std_srvs::SetBool::Request &req, std_srvs::SetBool::Response &res)
ros::ServiceServer enable_server_
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
boost::shared_ptr< M > getMessage() const
static const int32_t INHERIT_MEMORY_LIMIT
void topicCB(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, boost::shared_ptr< MessageQueue > queue)
bool call(MReq &req, MRes &res)
void push(SnapshotMessage const &msg)
static const ros::Duration NO_DURATION_LIMIT
ROSCPP_DECL const std::string & getName()
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
ros::Duration status_period_
topic_tools::ShapeShifter::ConstPtr msg
void fixTopicOptions(SnapshotterTopicOptions &options)
std::vector< TopicInfo > V_TopicInfo
boost::upgrade_mutex state_lock_
bool preparePush(int32_t size, ros::Time const &time)
int64_t getMessageSize(SnapshotMessage const &msg) const
SnapshotterOptions(ros::Duration default_duration_limit=ros::Duration(30), int32_t default_memory_limit=-1, int32_t default_count_limit=-1, ros::Duration status_period=ros::Duration(1))
SnapshotterOptions options_
void publish(const boost::shared_ptr< M > &message) const
SubscriptionCallbackHelperPtr helper
static const int32_t INHERIT_COUNT_LIMIT
std::pair< queue_t::const_iterator, queue_t::const_iterator > range_t
void _push(SnapshotMessage const &msg)
#define ROS_WARN_THROTTLE(period,...)
ros::Duration duration_limit_
SnapshotterTopicOptions options_
std::vector< std::string > topics_
#define ROS_ASSERT_MSG(cond,...)
static const int QUEUE_SIZE
void setCompression(CompressionType compression)
ros::Duration duration() const
ros::Timer poll_topic_timer_
static const ros::Duration INHERIT_DURATION_LIMIT
ros::Duration default_duration_limit_
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
virtual void spin(CallbackQueue *queue=0)
MessageQueue(SnapshotterTopicOptions const &options)
ros::Time getReceiptTime() const
SnapshotterTopicOptions(ros::Duration duration_limit=INHERIT_DURATION_LIMIT, int32_t memory_limit=INHERIT_MEMORY_LIMIT, int32_t count_limit=INHERIT_COUNT_LIMIT)
const std::string & getNamespace() const
SnapshotterClientOptions()
bool triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
void fillStatus(rosgraph_msgs::TopicStatistics &status)
ros::ServiceServer trigger_snapshot_server_
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
SnapshotMessage(topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr< ros::M_string > _connection_header, ros::Time _time)
uint32_t getNumSubscribers() const
void setSubscriber(boost::shared_ptr< ros::Subscriber > sub)
int run(SnapshotterClientOptions const &opts)
static const int32_t NO_MEMORY_LIMIT
ros::Publisher status_pub_
void write(std::string const &topic, ros::MessageEvent< T > const &event)
range_t rangeFromTimes(ros::Time const &start, ros::Time const &end)
Snapshotter(SnapshotterOptions const &options)
bool addTopic(std::string const &topic, ros::Duration duration_limit=SnapshotterTopicOptions::INHERIT_DURATION_LIMIT, int32_t memory_limit=SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT, int32_t count_limit=SnapshotterTopicOptions::INHERIT_COUNT_LIMIT)