$search
00001 # Software License Agreement (BSD License) 00002 # 00003 # Copyright (c) 2009, Willow Garage, Inc. 00004 # All rights reserved. 00005 # 00006 # Redistribution and use in source and binary forms, with or without 00007 # modification, are permitted provided that the following conditions 00008 # are met: 00009 # 00010 # * Redistributions of source code must retain the above copyright 00011 # notice, this list of conditions and the following disclaimer. 00012 # * Redistributions in binary form must reproduce the above 00013 # copyright notice, this list of conditions and the following 00014 # disclaimer in the documentation and/or other materials provided 00015 # with the distribution. 00016 # * Neither the name of Willow Garage, Inc. nor the names of its 00017 # contributors may be used to endorse or promote products derived 00018 # from this software without specific prior written permission. 00019 # 00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00031 # POSSIBILITY OF SUCH DAMAGE. 00032 00033 """ 00034 Recorder subscribes to ROS messages and writes them to a bag file. 00035 """ 00036 00037 from __future__ import with_statement 00038 00039 PKG = 'rxbag' 00040 import roslib; roslib.load_manifest(PKG) 00041 00042 import Queue 00043 import re 00044 import threading 00045 import time 00046 00047 import rosbag 00048 import rosgraph.masterapi 00049 import rospy 00050 00051 import sys 00052 00053 class Recorder(object): 00054 def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0): 00055 """ 00056 Subscribe to ROS messages and record them to a bag file. 00057 00058 @param filename: filename of bag to write to 00059 @type filename: str 00060 @param all: all topics are to be recorded [default: True] 00061 @type all: bool 00062 @param topics: topics (or regexes if regex is True) to record [default: empty list] 00063 @type topics: list of str 00064 @param regex: topics should be considered as regular expressions [default: False] 00065 @type regex: bool 00066 @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0] 00067 @type limit: int 00068 @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1] 00069 @type master_check_interval: float 00070 """ 00071 self._all = all 00072 self._topics = topics 00073 self._regex = regex 00074 self._limit = limit 00075 self._master_check_interval = master_check_interval 00076 00077 self._bag = rosbag.Bag(filename, 'w') 00078 self._bag_lock = bag_lock if bag_lock else threading.Lock() 00079 self._listeners = [] 00080 self._subscriber_helpers = {} 00081 self._limited_topics = set() 00082 self._failed_topics = set() 00083 self._last_update = time.time() 00084 self._write_queue = Queue.Queue() 00085 self._paused = False 00086 self._stop_condition = threading.Condition() 00087 self._stop_flag = False 00088 00089 # Compile regular expressions 00090 if self._regex: 00091 self._regexes = [re.compile(t) for t in self._topics] 00092 else: 00093 self._regexes = None 00094 00095 self._message_count = {} # topic -> int (track number of messages recorded on each topic) 00096 00097 self._master_check_thread = threading.Thread(target=self._run_master_check) 00098 self._write_thread = threading.Thread(target=self._run_write) 00099 00100 @property 00101 def bag(self): return self._bag 00102 00103 def add_listener(self, listener): 00104 """ 00105 Add a listener which gets called whenever a message is recorded. 00106 @param listener: function to call 00107 @type listener: function taking (topic, message, time) 00108 """ 00109 self._listeners.append(listener) 00110 00111 def start(self): 00112 """ 00113 Start subscribing and recording messages to bag. 00114 """ 00115 self._master_check_thread.start() 00116 self._write_thread.start() 00117 00118 @property 00119 def paused(self): return self._paused 00120 def pause(self): self._paused = True 00121 def unpause(self): self._paused = False 00122 def toggle_paused(self): self._paused = not self._paused 00123 00124 def stop(self): 00125 """ 00126 Stop recording. 00127 """ 00128 with self._stop_condition: 00129 self._stop_flag = True 00130 self._stop_condition.notify_all() 00131 00132 self._write_queue.put(self) 00133 00134 ## Implementation 00135 00136 def _run_master_check(self): 00137 master = rosgraph.masterapi.Master('rxbag.recorder') 00138 00139 try: 00140 while not self._stop_flag: 00141 # Check for new topics 00142 for topic, datatype in master.getPublishedTopics(''): 00143 # Check if: 00144 # the topic is already subscribed to, or 00145 # we've failed to subscribe to it already, or 00146 # we've already reached the message limit, or 00147 # we don't want to subscribe 00148 if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic): 00149 continue 00150 00151 try: 00152 pytype = roslib.message.get_message_class(datatype) 00153 00154 self._message_count[topic] = 0 00155 00156 self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype) 00157 except Exception, ex: 00158 print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex)) 00159 self._failed_topics.add(topic) 00160 00161 # Wait a while 00162 self._stop_condition.acquire() 00163 self._stop_condition.wait(self._master_check_interval) 00164 00165 except Exception, ex: 00166 print >> sys.stderr, 'Error recording to bag: %s' % str(ex) 00167 00168 # Unsubscribe from all topics 00169 for topic in list(self._subscriber_helpers.keys()): 00170 self._unsubscribe(topic) 00171 00172 # Close the bag file so that the index gets written 00173 try: 00174 self._bag.close() 00175 except Exception, ex: 00176 print >> sys.stderr, 'Error closing bag [%s]: %s' % (self._bag.filename, str(ex)) 00177 00178 def _should_subscribe_to(self, topic): 00179 if self._all: 00180 return True 00181 00182 if not self._regex: 00183 return topic in self._topics 00184 00185 for regex in self._regexes: 00186 if regex.match(topic): 00187 return True 00188 00189 return False 00190 00191 def _unsubscribe(self, topic): 00192 try: 00193 self._subscriber_helpers[topic].subscriber.unregister() 00194 except Exception: 00195 return 00196 00197 del self._subscriber_helpers[topic] 00198 00199 def _record(self, topic, m): 00200 if self._paused: 00201 return 00202 00203 if self._limit and self._message_count[topic] >= self._limit: 00204 self._limited_topics.add(topic) 00205 self._unsubscribe(topic) 00206 return 00207 00208 self._write_queue.put((topic, m, rospy.get_rostime())) 00209 self._message_count[topic] += 1 00210 00211 def _run_write(self): 00212 try: 00213 while not self._stop_flag: 00214 # Wait for a message 00215 item = self._write_queue.get() 00216 00217 if item == self: 00218 continue 00219 00220 topic, m, t = item 00221 00222 # Write to the bag 00223 with self._bag_lock: 00224 self._bag.write(topic, m, t) 00225 00226 # Notify listeners that a message has been recorded 00227 for listener in self._listeners: 00228 listener(topic, m, t) 00229 00230 except Exception, ex: 00231 print >> sys.stderr, 'Error write to bag: %s' % str(ex) 00232 00233 class _SubscriberHelper(object): 00234 def __init__(self, recorder, topic, pytype): 00235 self.recorder = recorder 00236 self.topic = topic 00237 00238 self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback) 00239 00240 def callback(self, m): 00241 self.recorder._record(self.topic, m)