38 #include <boost/filesystem.hpp> 39 #include <boost/scope_exit.hpp> 40 #include <boost/thread/xtime.hpp> 41 #include <boost/thread/shared_mutex.hpp> 42 #include <boost/date_time/local_time/local_time.hpp> 46 #include <rosbag_snapshot_msgs/SnapshotStatus.h> 63 : duration_limit_(duration_limit), memory_limit_(memory_limit)
69 : default_duration_limit_(default_duration_limit)
70 , default_memory_limit_(default_memory_limit)
71 , status_period_(status_period)
79 std::pair<topics_t::iterator, bool> ret;
80 ret =
topics_.insert(topics_t::value_type(topic, ops));
90 : msg(_msg), connection_header(_connection_header), time(_time)
105 boost::mutex::scoped_lock l(
lock);
108 status.traffic =
size_;
109 status.delivered_msgs =
queue_.size();
110 status.window_start =
queue_.front().time;
111 status.window_stop =
queue_.back().time;
116 boost::mutex::scoped_lock l(
lock);
139 ROS_WARN(
"Time has gone backwards. Clearing buffer for this topic.");
162 dt = time -
queue_.front().time;
169 boost::mutex::scoped_try_lock l(
lock);
180 boost::mutex::scoped_lock l(
lock);
185 int32_t size = _out.
msg->size();
205 range_t::first_type begin =
queue_.begin();
206 range_t::second_type end =
queue_.end();
211 while (begin != end && (*begin).time < start)
216 while (end != begin && (*(end - 1)).time > stop)
235 buffer.second->sub_->shutdown();
241 if (options.
duration_limit_ == SnapshotterTopicOptions::INHERIT_DURATION_LIMIT)
243 if (options.
memory_limit_ == SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT)
249 size_t ind = file.rfind(
".bag");
251 if (ind != string::npos && ind == file.size() - 4)
262 std::stringstream msg;
263 const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
264 boost::posix_time::time_facet*
const f =
new boost::posix_time::time_facet(
"%Y-%m-%d-%H-%M-%S");
265 msg.imbue(std::locale(msg.getloc(), f));
275 boost::shared_lock<boost::upgrade_mutex> lock(
state_lock_);
289 ROS_INFO(
"Subscribing to %s", topic.c_str());
295 ops.
md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
296 ops.
datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
298 boost::make_shared<ros::SubscriptionCallbackHelperT<const ros::MessageEvent<topic_tools::ShapeShifter const>&> >(
301 queue->setSubscriber(sub);
305 rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
306 rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
309 boost::mutex::scoped_lock l(message_queue.
lock);
314 if (!bag.
isOpen() && range.second > range.first)
323 res.message = string(
"failed to open bag: ") + err.what();
326 ROS_INFO(
"Writing snapshot to %s", req.filename.c_str());
332 for (MessageQueue::range_t::first_type msg_it = range.first; msg_it != range.second; ++msg_it)
341 res.message = string(
"failed to write bag: ") + err.what();
347 rosbag_snapshot_msgs::TriggerSnapshot::Response& res)
352 res.message =
"invalid";
355 bool recording_prior;
357 boost::upgrade_lock<boost::upgrade_mutex> read_lock(
state_lock_);
362 res.message =
"Already writing";
365 boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
375 boost::unique_lock<boost::upgrade_mutex> write_lock(
state_lock_);
387 if (req.topics.size() && req.topics.at(0).size())
389 for (std::string& topic : req.topics)
398 ROS_WARN(
"Requested topic %s is invalid, skipping.", topic.c_str());
403 buffers_t::iterator found =
buffers_.find(topic);
407 ROS_WARN(
"Requested topic %s is not subscribed, skipping.", topic.c_str());
411 if (!
writeTopic(bag, message_queue, topic, req, res))
418 for (
const buffers_t::value_type& pair :
buffers_)
421 std::string
const& topic = pair.first;
422 if (!
writeTopic(bag, message_queue, topic, req, res))
431 res.message = res.NO_DATA_MESSAGE;
441 for (
const buffers_t::value_type& pair :
buffers_)
443 pair.second->clear();
457 ROS_INFO(
"Buffering resumed and old data cleared.");
462 boost::upgrade_lock<boost::upgrade_mutex> read_lock(
state_lock_);
466 res.message =
"cannot enable recording while writing.";
472 boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
477 boost::upgrade_to_unique_lock<boost::upgrade_mutex> write_lock(read_lock);
493 rosbag_snapshot_msgs::SnapshotStatus msg;
495 boost::shared_lock<boost::upgrade_mutex> lock(
state_lock_);
499 for (
const buffers_t::value_type& pair :
buffers_)
501 rosgraph_msgs::TopicStatistics status;
502 status.node_sub = node_id;
503 status.topic = pair.first;
504 pair.second->fillStatus(status);
505 msg.topics.push_back(status);
519 std::string topic = t.name;
526 std::pair<buffers_t::iterator, bool> res =
buffers_.insert(buffers_t::value_type(topic, queue));
527 ROS_ASSERT_MSG(res.second,
"failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
544 for (SnapshotterOptions::topics_t::value_type& pair :
options_.
topics_)
550 std::pair<buffers_t::iterator, bool> res =
buffers_.insert(buffers_t::value_type(topic, queue));
551 ROS_ASSERT_MSG(res.second,
"failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str());
584 ROS_ERROR(
"Service %s does not exist. Is snapshot running in this namespace?",
"trigger_snapshot");
587 rosbag_snapshot_msgs::TriggerSnapshotRequest req;
593 size_t ind = req.filename.rfind(
".bag");
594 if (ind != string::npos && ind == req.filename.size() - 4)
595 req.filename.erase(ind);
600 size_t ind = req.filename.rfind(
".bag");
601 if (ind == string::npos || ind != req.filename.size() - 4)
602 req.filename +=
".bag";
606 if (req.filename.empty())
608 boost::filesystem::path p(boost::filesystem::system_complete(req.filename));
609 req.filename = p.string();
611 rosbag_snapshot_msgs::TriggerSnapshotResponse res;
612 if (!client.
call(req, res))
629 ROS_ERROR(
"Service %s does not exist. Is snapshot running in this namespace?",
"enable_snapshot");
632 std_srvs::SetBoolRequest req;
634 std_srvs::SetBoolResponse res;
635 if (!client.
call(req, res))
void subscribe(std::string const &topic, boost::shared_ptr< MessageQueue > queue)
int32_t default_memory_limit_
ServiceClient serviceClient(const std::string &service_name, bool persistent=false, const M_string &header_values=M_string())
bool postfixFilename(std::string &file)
void topicCB(const ros::MessageEvent< topic_tools::ShapeShifter const > &msg_event, boost::shared_ptr< MessageQueue > queue)
boost::shared_ptr< ros::M_string > connection_header
void pollTopics(ros::TimerEvent const &e, rosbag_snapshot::SnapshotterOptions *options)
ros::Time getReceiptTime() const
#define ROS_WARN_THROTTLE(rate,...)
boost::shared_ptr< ros::Subscriber > sub_
void publishStatus(ros::TimerEvent const &e)
std::string timeAsStr()
Return current local datetime as a string such as 2018-05-22-14-28-51. Used to generate bag filenames...
void publish(const boost::shared_ptr< M > &message) const
bool writeTopic(rosbag::Bag &bag, MessageQueue &message_queue, std::string const &topic, rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
void open(std::string const &filename, uint32_t mode=bagmode::Read)
bool enableCB(std_srvs::SetBool::Request &req, std_srvs::SetBool::Response &res)
ros::ServiceServer enable_server_
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
SnapshotterTopicOptions(ros::Duration duration_limit=INHERIT_DURATION_LIMIT, int32_t memory_limit=INHERIT_MEMORY_LIMIT)
static const int32_t INHERIT_MEMORY_LIMIT
bool call(MReq &req, MRes &res)
void push(SnapshotMessage const &msg)
static const ros::Duration NO_DURATION_LIMIT
ROSCPP_DECL const std::string & getName()
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
ros::Duration status_period_
topic_tools::ShapeShifter::ConstPtr msg
void fixTopicOptions(SnapshotterTopicOptions &options)
std::vector< TopicInfo > V_TopicInfo
boost::upgrade_mutex state_lock_
bool preparePush(int32_t size, ros::Time const &time)
bool addTopic(std::string const &topic, ros::Duration duration_limit=SnapshotterTopicOptions::INHERIT_DURATION_LIMIT, int32_t memory_limit=SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT)
SnapshotterOptions options_
SubscriptionCallbackHelperPtr helper
std::pair< queue_t::const_iterator, queue_t::const_iterator > range_t
void _push(SnapshotMessage const &msg)
ros::Duration duration_limit_
SnapshotterTopicOptions options_
std::vector< std::string > topics_
#define ROS_ASSERT_MSG(cond,...)
static const int QUEUE_SIZE
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
const std::string & getNamespace() const
ros::Timer poll_topic_timer_
static const ros::Duration INHERIT_DURATION_LIMIT
ros::Duration default_duration_limit_
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
virtual void spin(CallbackQueue *queue=0)
MessageQueue(SnapshotterTopicOptions const &options)
uint32_t getNumSubscribers() const
SnapshotterClientOptions()
bool triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request &req, rosbag_snapshot_msgs::TriggerSnapshot::Response &res)
ros::Duration duration() const
void fillStatus(rosgraph_msgs::TopicStatistics &status)
SnapshotterOptions(ros::Duration default_duration_limit=ros::Duration(30), int32_t default_memory_limit=-1, ros::Duration status_period=ros::Duration(1))
ros::ServiceServer trigger_snapshot_server_
SnapshotMessage(topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr< ros::M_string > _connection_header, ros::Time _time)
void setSubscriber(boost::shared_ptr< ros::Subscriber > sub)
int run(SnapshotterClientOptions const &opts)
static const int32_t NO_MEMORY_LIMIT
ros::Publisher status_pub_
void write(std::string const &topic, ros::MessageEvent< T > const &event)
range_t rangeFromTimes(ros::Time const &start, ros::Time const &end)
boost::shared_ptr< M > getMessage() const
Snapshotter(SnapshotterOptions const &options)