Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 """
00034 Recorder subscribes to ROS messages and writes them to a bag file.
00035 """
00036
00037 import Queue
00038 import re
00039 import threading
00040 import time
00041
00042 import rosbag
00043 import rosgraph
00044 import roslib
00045 import rospy
00046
00047 import sys
00048
00049
00050 class Recorder(object):
00051 def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
00052 """
00053 Subscribe to ROS messages and record them to a bag file.
00054
00055 @param filename: filename of bag to write to
00056 @type filename: str
00057 @param all: all topics are to be recorded [default: True]
00058 @type all: bool
00059 @param topics: topics (or regexes if regex is True) to record [default: empty list]
00060 @type topics: list of str
00061 @param regex: topics should be considered as regular expressions [default: False]
00062 @type regex: bool
00063 @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
00064 @type limit: int
00065 @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
00066 @type master_check_interval: float
00067 """
00068 self._all = all
00069 self._topics = topics
00070 self._regex = regex
00071 self._limit = limit
00072 self._master_check_interval = master_check_interval
00073
00074 self._bag = rosbag.Bag(filename, 'w')
00075 self._bag_lock = bag_lock if bag_lock else threading.Lock()
00076 self._listeners = []
00077 self._subscriber_helpers = {}
00078 self._limited_topics = set()
00079 self._failed_topics = set()
00080 self._last_update = time.time()
00081 self._write_queue = Queue.Queue()
00082 self._paused = False
00083 self._stop_condition = threading.Condition()
00084 self._stop_flag = False
00085
00086
00087 if self._regex:
00088 self._regexes = [re.compile(t) for t in self._topics]
00089 else:
00090 self._regexes = None
00091
00092 self._message_count = {}
00093
00094 self._master_check_thread = threading.Thread(target=self._run_master_check)
00095 self._write_thread = threading.Thread(target=self._run_write)
00096
00097 @property
00098 def bag(self):
00099 return self._bag
00100
00101 def add_listener(self, listener):
00102 """
00103 Add a listener which gets called whenever a message is recorded.
00104 @param listener: function to call
00105 @type listener: function taking (topic, message, time)
00106 """
00107 self._listeners.append(listener)
00108
00109 def start(self):
00110 """
00111 Start subscribing and recording messages to bag.
00112 """
00113 self._master_check_thread.start()
00114 self._write_thread.start()
00115
00116 @property
00117 def paused(self):
00118 return self._paused
00119
00120 def pause(self):
00121 self._paused = True
00122
00123 def unpause(self):
00124 self._paused = False
00125
00126 def toggle_paused(self):
00127 self._paused = not self._paused
00128
00129 def stop(self):
00130 """
00131 Stop recording.
00132 """
00133 with self._stop_condition:
00134 self._stop_flag = True
00135 self._stop_condition.notify_all()
00136
00137 self._write_queue.put(self)
00138
00139
00140
00141 def _run_master_check(self):
00142 master = rosgraph.Master('rqt_bag_recorder')
00143
00144 try:
00145 while not self._stop_flag:
00146
00147 for topic, datatype in master.getPublishedTopics(''):
00148
00149
00150
00151
00152
00153 if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
00154 continue
00155
00156 try:
00157 pytype = roslib.message.get_message_class(datatype)
00158
00159 self._message_count[topic] = 0
00160
00161 self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
00162 except Exception, ex:
00163 print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex))
00164 self._failed_topics.add(topic)
00165
00166
00167 self._stop_condition.acquire()
00168 self._stop_condition.wait(self._master_check_interval)
00169
00170 except Exception, ex:
00171 print >> sys.stderr, 'Error recording to bag: %s' % str(ex)
00172
00173
00174 for topic in list(self._subscriber_helpers.keys()):
00175 self._unsubscribe(topic)
00176
00177
00178 try:
00179 self._bag.close()
00180 except Exception, ex:
00181 print >> sys.stderr, 'Error closing bag [%s]: %s' % (self._bag.filename, str(ex))
00182
00183 def _should_subscribe_to(self, topic):
00184 if self._all:
00185 return True
00186
00187 if not self._regex:
00188 return topic in self._topics
00189
00190 for regex in self._regexes:
00191 if regex.match(topic):
00192 return True
00193
00194 return False
00195
00196 def _unsubscribe(self, topic):
00197 try:
00198 self._subscriber_helpers[topic].subscriber.unregister()
00199 except Exception:
00200 return
00201
00202 del self._subscriber_helpers[topic]
00203
00204 def _record(self, topic, m):
00205 if self._paused:
00206 return
00207
00208 if self._limit and self._message_count[topic] >= self._limit:
00209 self._limited_topics.add(topic)
00210 self._unsubscribe(topic)
00211 return
00212
00213 self._write_queue.put((topic, m, rospy.get_rostime()))
00214 self._message_count[topic] += 1
00215
00216 def _run_write(self):
00217 try:
00218 while not self._stop_flag:
00219
00220 item = self._write_queue.get()
00221
00222 if item == self:
00223 continue
00224
00225 topic, m, t = item
00226
00227
00228 with self._bag_lock:
00229 self._bag.write(topic, m, t)
00230
00231
00232 for listener in self._listeners:
00233 listener(topic, m, t)
00234
00235 except Exception, ex:
00236 print >> sys.stderr, 'Error write to bag: %s' % str(ex)
00237
00238
00239 class _SubscriberHelper(object):
00240 def __init__(self, recorder, topic, pytype):
00241 self.recorder = recorder
00242 self.topic = topic
00243
00244 self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback)
00245
00246 def callback(self, m):
00247 self.recorder._record(self.topic, m)