recorder.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 """
00034 Recorder subscribes to ROS messages and writes them to a bag file.
00035 """
00036 
00037 from __future__ import print_function
00038 try:
00039     from queue import Queue
00040 except ImportError:
00041     from Queue import Queue
00042 import re
00043 import threading
00044 import time
00045 
00046 import rosbag
00047 import rosgraph
00048 import roslib
00049 import rospy
00050 
00051 import sys
00052 
00053 
00054 class Recorder(object):
00055 
00056     def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
00057         """
00058         Subscribe to ROS messages and record them to a bag file.
00059 
00060         @param filename: filename of bag to write to
00061         @type  filename: str
00062         @param all: all topics are to be recorded [default: True]
00063         @type  all: bool
00064         @param topics: topics (or regexes if regex is True) to record [default: empty list]
00065         @type  topics: list of str
00066         @param regex: topics should be considered as regular expressions [default: False]
00067         @type  regex: bool
00068         @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
00069         @type  limit: int
00070         @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
00071         @type  master_check_interval: float
00072         """
00073         self._all = all
00074         self._topics = topics
00075         self._regex = regex
00076         self._limit = limit
00077         self._master_check_interval = master_check_interval
00078 
00079         self._bag = rosbag.Bag(filename, 'w')
00080         self._bag_lock = bag_lock if bag_lock else threading.Lock()
00081         self._listeners = []
00082         self._subscriber_helpers = {}
00083         self._limited_topics = set()
00084         self._failed_topics = set()
00085         self._last_update = time.time()
00086         self._write_queue = Queue()
00087         self._paused = False
00088         self._stop_condition = threading.Condition()
00089         self._stop_flag = False
00090 
00091         # Compile regular expressions
00092         if self._regex:
00093             self._regexes = [re.compile(t) for t in self._topics]
00094         else:
00095             self._regexes = None
00096 
00097         self._message_count = {}  # topic -> int (track number of messages recorded on each topic)
00098 
00099         self._master_check_thread = threading.Thread(target=self._run_master_check)
00100         self._write_thread = threading.Thread(target=self._run_write)
00101 
00102     @property
00103     def bag(self):
00104         return self._bag
00105 
00106     def add_listener(self, listener):
00107         """
00108         Add a listener which gets called whenever a message is recorded.
00109         @param listener: function to call
00110         @type  listener: function taking (topic, message, time)
00111         """
00112         self._listeners.append(listener)
00113 
00114     def start(self):
00115         """
00116         Start subscribing and recording messages to bag.
00117         """
00118         self._master_check_thread.start()
00119         self._write_thread.start()
00120 
00121     @property
00122     def paused(self):
00123         return self._paused
00124 
00125     def pause(self):
00126         self._paused = True
00127 
00128     def unpause(self):
00129         self._paused = False
00130 
00131     def toggle_paused(self):
00132         self._paused = not self._paused
00133 
00134     def stop(self):
00135         """
00136         Stop recording.
00137         """
00138         with self._stop_condition:
00139             self._stop_flag = True
00140             self._stop_condition.notify_all()
00141 
00142         self._write_queue.put(self)
00143 
00144     # Implementation
00145 
00146     def _run_master_check(self):
00147         master = rosgraph.Master('rqt_bag_recorder')
00148 
00149         try:
00150             while not self._stop_flag:
00151                 # Check for new topics
00152                 for topic, datatype in master.getPublishedTopics(''):
00153                     # Check if:
00154                     #    the topic is already subscribed to, or
00155                     #    we've failed to subscribe to it already, or
00156                     #    we've already reached the message limit, or
00157                     #    we don't want to subscribe
00158                     if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
00159                         continue
00160 
00161                     try:
00162                         pytype = roslib.message.get_message_class(datatype)
00163 
00164                         self._message_count[topic] = 0
00165 
00166                         self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
00167                     except Exception as ex:
00168                         print('Error subscribing to %s (ignoring): %s' %
00169                               (topic, str(ex)), file=sys.stderr)
00170                         self._failed_topics.add(topic)
00171 
00172                 # Wait a while
00173                 self._stop_condition.acquire()
00174                 self._stop_condition.wait(self._master_check_interval)
00175 
00176         except Exception as ex:
00177             print('Error recording to bag: %s' % str(ex), file=sys.stderr)
00178 
00179         # Unsubscribe from all topics
00180         for topic in list(self._subscriber_helpers.keys()):
00181             self._unsubscribe(topic)
00182 
00183         # Close the bag file so that the index gets written
00184         try:
00185             self._bag.close()
00186         except Exception as ex:
00187             print('Error closing bag [%s]: %s' % (self._bag.filename, str(ex)))
00188 
00189     def _should_subscribe_to(self, topic):
00190         if self._all:
00191             return True
00192 
00193         if not self._regex:
00194             return topic in self._topics
00195 
00196         for regex in self._regexes:
00197             if regex.match(topic):
00198                 return True
00199 
00200         return False
00201 
00202     def _unsubscribe(self, topic):
00203         try:
00204             self._subscriber_helpers[topic].subscriber.unregister()
00205         except Exception:
00206             return
00207 
00208         del self._subscriber_helpers[topic]
00209 
00210     def _record(self, topic, m):
00211         if self._paused:
00212             return
00213 
00214         if self._limit and self._message_count[topic] >= self._limit:
00215             self._limited_topics.add(topic)
00216             self._unsubscribe(topic)
00217             return
00218 
00219         self._write_queue.put((topic, m, rospy.get_rostime()))
00220         self._message_count[topic] += 1
00221 
00222     def _run_write(self):
00223         try:
00224             while not self._stop_flag:
00225                 # Wait for a message
00226                 item = self._write_queue.get()
00227 
00228                 if item == self:
00229                     continue
00230 
00231                 topic, m, t = item
00232 
00233                 # Write to the bag
00234                 with self._bag_lock:
00235                     self._bag.write(topic, m, t)
00236 
00237                 # Notify listeners that a message has been recorded
00238                 for listener in self._listeners:
00239                     listener(topic, m, t)
00240 
00241         except Exception as ex:
00242             print('Error write to bag: %s' % str(ex), file=sys.stderr)
00243 
00244 
00245 class _SubscriberHelper(object):
00246 
00247     def __init__(self, recorder, topic, pytype):
00248         self.recorder = recorder
00249         self.topic = topic
00250 
00251         self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback)
00252 
00253     def callback(self, m):
00254         self.recorder._record(self.topic, m)


rqt_bag
Author(s): Aaron Blasdel, Tim Field
autogenerated on Thu Jun 6 2019 18:52:48