recorder.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2009, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 """
00034 Recorder subscribes to ROS messages and writes them to a bag file.
00035 """
00036 
00037 import Queue
00038 import re
00039 import threading
00040 import time
00041 
00042 import rosbag
00043 import rosgraph
00044 import rospy
00045 
00046 import sys
00047 
00048 class Recorder(object):
00049     def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
00050         """
00051         Subscribe to ROS messages and record them to a bag file.
00052         
00053         @param filename: filename of bag to write to
00054         @type  filename: str
00055         @param all: all topics are to be recorded [default: True]
00056         @type  all: bool
00057         @param topics: topics (or regexes if regex is True) to record [default: empty list]
00058         @type  topics: list of str
00059         @param regex: topics should be considered as regular expressions [default: False]
00060         @type  regex: bool
00061         @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
00062         @type  limit: int
00063         @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
00064         @type  master_check_interval: float
00065         """
00066         self._all                   = all
00067         self._topics                = topics
00068         self._regex                 = regex
00069         self._limit                 = limit
00070         self._master_check_interval = master_check_interval
00071 
00072         self._bag                = rosbag.Bag(filename, 'w')
00073         self._bag_lock           = bag_lock if bag_lock else threading.Lock()
00074         self._listeners          = []
00075         self._subscriber_helpers = {}
00076         self._limited_topics     = set()
00077         self._failed_topics      = set()       
00078         self._last_update        = time.time()
00079         self._write_queue        = Queue.Queue()
00080         self._paused             = False
00081         self._stop_condition     = threading.Condition()
00082         self._stop_flag          = False
00083 
00084         # Compile regular expressions
00085         if self._regex:
00086             self._regexes = [re.compile(t) for t in self._topics]
00087         else:
00088             self._regexes = None
00089 
00090         self._message_count = {}  # topic -> int (track number of messages recorded on each topic)
00091 
00092         self._master_check_thread = threading.Thread(target=self._run_master_check)
00093         self._write_thread        = threading.Thread(target=self._run_write)
00094 
00095     @property
00096     def bag(self): return self._bag
00097 
00098     def add_listener(self, listener):
00099         """
00100         Add a listener which gets called whenever a message is recorded.
00101         @param listener: function to call
00102         @type  listener: function taking (topic, message, time)
00103         """
00104         self._listeners.append(listener)
00105 
00106     def start(self):
00107         """
00108         Start subscribing and recording messages to bag.
00109         """
00110         self._master_check_thread.start()
00111         self._write_thread.start()
00112 
00113     @property
00114     def paused(self):        return self._paused
00115     def pause(self):         self._paused = True
00116     def unpause(self):       self._paused = False
00117     def toggle_paused(self): self._paused = not self._paused
00118 
00119     def stop(self):
00120         """
00121         Stop recording.
00122         """
00123         with self._stop_condition:
00124             self._stop_flag = True
00125             self._stop_condition.notify_all()
00126         
00127         self._write_queue.put(self)
00128 
00129     ## Implementation
00130 
00131     def _run_master_check(self):
00132         master = rosgraph.Master('rxbag_recorder')
00133 
00134         try:
00135             while not self._stop_flag:
00136                 # Check for new topics
00137                 for topic, datatype in master.getPublishedTopics(''):
00138                     # Check if: 
00139                     #    the topic is already subscribed to, or
00140                     #    we've failed to subscribe to it already, or
00141                     #    we've already reached the message limit, or
00142                     #    we don't want to subscribe 
00143                     if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
00144                         continue
00145 
00146                     try:
00147                         pytype = roslib.message.get_message_class(datatype)
00148                         
00149                         self._message_count[topic] = 0
00150 
00151                         self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
00152                     except Exception, ex:
00153                         print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex))
00154                         self._failed_topics.add(topic)
00155 
00156                 # Wait a while
00157                 self._stop_condition.acquire()
00158                 self._stop_condition.wait(self._master_check_interval)
00159 
00160         except Exception, ex:
00161             print >> sys.stderr, 'Error recording to bag: %s' % str(ex)
00162 
00163         # Unsubscribe from all topics
00164         for topic in list(self._subscriber_helpers.keys()):
00165             self._unsubscribe(topic)
00166         
00167         # Close the bag file so that the index gets written
00168         try:
00169             self._bag.close()
00170         except Exception, ex:
00171             print >> sys.stderr, 'Error closing bag [%s]: %s' % (self._bag.filename, str(ex))
00172 
00173     def _should_subscribe_to(self, topic):
00174         if self._all:
00175             return True
00176         
00177         if not self._regex:
00178             return topic in self._topics
00179 
00180         for regex in self._regexes:
00181             if regex.match(topic):
00182                 return True
00183             
00184         return False
00185     
00186     def _unsubscribe(self, topic):
00187         try:
00188             self._subscriber_helpers[topic].subscriber.unregister()
00189         except Exception:
00190             return
00191 
00192         del self._subscriber_helpers[topic]
00193 
00194     def _record(self, topic, m):
00195         if self._paused:
00196             return
00197 
00198         if self._limit and self._message_count[topic] >= self._limit:
00199             self._limited_topics.add(topic)
00200             self._unsubscribe(topic)
00201             return
00202 
00203         self._write_queue.put((topic, m, rospy.get_rostime()))
00204         self._message_count[topic] += 1
00205 
00206     def _run_write(self):
00207         try:
00208             while not self._stop_flag:
00209                 # Wait for a message
00210                 item = self._write_queue.get()
00211                 
00212                 if item == self:
00213                     continue
00214                 
00215                 topic, m, t = item
00216                 
00217                 # Write to the bag
00218                 with self._bag_lock:
00219                     self._bag.write(topic, m, t)
00220 
00221                 # Notify listeners that a message has been recorded
00222                 for listener in self._listeners:
00223                     listener(topic, m, t)
00224 
00225         except Exception, ex:
00226             print >> sys.stderr, 'Error write to bag: %s' % str(ex)
00227 
00228 class _SubscriberHelper(object):
00229     def __init__(self, recorder, topic, pytype):
00230         self.recorder = recorder
00231         self.topic    = topic
00232 
00233         self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback)
00234 
00235     def callback(self, m):
00236         self.recorder._record(self.topic, m)


rxbag
Author(s): Tim Field
autogenerated on Mon Oct 6 2014 07:26:07