recorder.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 """
00034 Recorder subscribes to ROS messages and writes them to a bag file.
00035 """
00036 
00037 import Queue
00038 import re
00039 import threading
00040 import time
00041 
00042 import rosbag
00043 import rosgraph
00044 import roslib
00045 import rospy
00046 
00047 import sys
00048 
00049 
00050 class Recorder(object):
00051     def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
00052         """
00053         Subscribe to ROS messages and record them to a bag file.
00054 
00055         @param filename: filename of bag to write to
00056         @type  filename: str
00057         @param all: all topics are to be recorded [default: True]
00058         @type  all: bool
00059         @param topics: topics (or regexes if regex is True) to record [default: empty list]
00060         @type  topics: list of str
00061         @param regex: topics should be considered as regular expressions [default: False]
00062         @type  regex: bool
00063         @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
00064         @type  limit: int
00065         @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
00066         @type  master_check_interval: float
00067         """
00068         self._all = all
00069         self._topics = topics
00070         self._regex = regex
00071         self._limit = limit
00072         self._master_check_interval = master_check_interval
00073 
00074         self._bag = rosbag.Bag(filename, 'w')
00075         self._bag_lock = bag_lock if bag_lock else threading.Lock()
00076         self._listeners = []
00077         self._subscriber_helpers = {}
00078         self._limited_topics = set()
00079         self._failed_topics = set()
00080         self._last_update = time.time()
00081         self._write_queue = Queue.Queue()
00082         self._paused = False
00083         self._stop_condition = threading.Condition()
00084         self._stop_flag = False
00085 
00086         # Compile regular expressions
00087         if self._regex:
00088             self._regexes = [re.compile(t) for t in self._topics]
00089         else:
00090             self._regexes = None
00091 
00092         self._message_count = {}  # topic -> int (track number of messages recorded on each topic)
00093 
00094         self._master_check_thread = threading.Thread(target=self._run_master_check)
00095         self._write_thread = threading.Thread(target=self._run_write)
00096 
00097     @property
00098     def bag(self):
00099         return self._bag
00100 
00101     def add_listener(self, listener):
00102         """
00103         Add a listener which gets called whenever a message is recorded.
00104         @param listener: function to call
00105         @type  listener: function taking (topic, message, time)
00106         """
00107         self._listeners.append(listener)
00108 
00109     def start(self):
00110         """
00111         Start subscribing and recording messages to bag.
00112         """
00113         self._master_check_thread.start()
00114         self._write_thread.start()
00115 
00116     @property
00117     def paused(self):
00118         return self._paused
00119 
00120     def pause(self):
00121         self._paused = True
00122 
00123     def unpause(self):
00124         self._paused = False
00125 
00126     def toggle_paused(self):
00127         self._paused = not self._paused
00128 
00129     def stop(self):
00130         """
00131         Stop recording.
00132         """
00133         with self._stop_condition:
00134             self._stop_flag = True
00135             self._stop_condition.notify_all()
00136 
00137         self._write_queue.put(self)
00138 
00139     ## Implementation
00140 
00141     def _run_master_check(self):
00142         master = rosgraph.Master('rqt_bag_recorder')
00143 
00144         try:
00145             while not self._stop_flag:
00146                 # Check for new topics
00147                 for topic, datatype in master.getPublishedTopics(''):
00148                     # Check if:
00149                     #    the topic is already subscribed to, or
00150                     #    we've failed to subscribe to it already, or
00151                     #    we've already reached the message limit, or
00152                     #    we don't want to subscribe
00153                     if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
00154                         continue
00155 
00156                     try:
00157                         pytype = roslib.message.get_message_class(datatype)
00158 
00159                         self._message_count[topic] = 0
00160 
00161                         self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
00162                     except Exception, ex:
00163                         print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex))
00164                         self._failed_topics.add(topic)
00165 
00166                 # Wait a while
00167                 self._stop_condition.acquire()
00168                 self._stop_condition.wait(self._master_check_interval)
00169 
00170         except Exception, ex:
00171             print >> sys.stderr, 'Error recording to bag: %s' % str(ex)
00172 
00173         # Unsubscribe from all topics
00174         for topic in list(self._subscriber_helpers.keys()):
00175             self._unsubscribe(topic)
00176 
00177         # Close the bag file so that the index gets written
00178         try:
00179             self._bag.close()
00180         except Exception, ex:
00181             print >> sys.stderr, 'Error closing bag [%s]: %s' % (self._bag.filename, str(ex))
00182 
00183     def _should_subscribe_to(self, topic):
00184         if self._all:
00185             return True
00186 
00187         if not self._regex:
00188             return topic in self._topics
00189 
00190         for regex in self._regexes:
00191             if regex.match(topic):
00192                 return True
00193 
00194         return False
00195 
00196     def _unsubscribe(self, topic):
00197         try:
00198             self._subscriber_helpers[topic].subscriber.unregister()
00199         except Exception:
00200             return
00201 
00202         del self._subscriber_helpers[topic]
00203 
00204     def _record(self, topic, m):
00205         if self._paused:
00206             return
00207 
00208         if self._limit and self._message_count[topic] >= self._limit:
00209             self._limited_topics.add(topic)
00210             self._unsubscribe(topic)
00211             return
00212 
00213         self._write_queue.put((topic, m, rospy.get_rostime()))
00214         self._message_count[topic] += 1
00215 
00216     def _run_write(self):
00217         try:
00218             while not self._stop_flag:
00219                 # Wait for a message
00220                 item = self._write_queue.get()
00221 
00222                 if item == self:
00223                     continue
00224 
00225                 topic, m, t = item
00226 
00227                 # Write to the bag
00228                 with self._bag_lock:
00229                     self._bag.write(topic, m, t)
00230 
00231                 # Notify listeners that a message has been recorded
00232                 for listener in self._listeners:
00233                     listener(topic, m, t)
00234 
00235         except Exception, ex:
00236             print >> sys.stderr, 'Error write to bag: %s' % str(ex)
00237 
00238 
00239 class _SubscriberHelper(object):
00240     def __init__(self, recorder, topic, pytype):
00241         self.recorder = recorder
00242         self.topic = topic
00243 
00244         self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback)
00245 
00246     def callback(self, m):
00247         self.recorder._record(self.topic, m)


rqt_bag
Author(s): Aaron Blasdel
autogenerated on Fri Jan 3 2014 11:55:06