Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 """
00034 Recorder subscribes to ROS messages and writes them to a bag file.
00035 """
00036
00037 import Queue
00038 import re
00039 import threading
00040 import time
00041
00042 import rosbag
00043 import rosgraph
00044 import rospy
00045
00046 import sys
00047
00048 class Recorder(object):
00049 def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
00050 """
00051 Subscribe to ROS messages and record them to a bag file.
00052
00053 @param filename: filename of bag to write to
00054 @type filename: str
00055 @param all: all topics are to be recorded [default: True]
00056 @type all: bool
00057 @param topics: topics (or regexes if regex is True) to record [default: empty list]
00058 @type topics: list of str
00059 @param regex: topics should be considered as regular expressions [default: False]
00060 @type regex: bool
00061 @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
00062 @type limit: int
00063 @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
00064 @type master_check_interval: float
00065 """
00066 self._all = all
00067 self._topics = topics
00068 self._regex = regex
00069 self._limit = limit
00070 self._master_check_interval = master_check_interval
00071
00072 self._bag = rosbag.Bag(filename, 'w')
00073 self._bag_lock = bag_lock if bag_lock else threading.Lock()
00074 self._listeners = []
00075 self._subscriber_helpers = {}
00076 self._limited_topics = set()
00077 self._failed_topics = set()
00078 self._last_update = time.time()
00079 self._write_queue = Queue.Queue()
00080 self._paused = False
00081 self._stop_condition = threading.Condition()
00082 self._stop_flag = False
00083
00084
00085 if self._regex:
00086 self._regexes = [re.compile(t) for t in self._topics]
00087 else:
00088 self._regexes = None
00089
00090 self._message_count = {}
00091
00092 self._master_check_thread = threading.Thread(target=self._run_master_check)
00093 self._write_thread = threading.Thread(target=self._run_write)
00094
00095 @property
00096 def bag(self): return self._bag
00097
00098 def add_listener(self, listener):
00099 """
00100 Add a listener which gets called whenever a message is recorded.
00101 @param listener: function to call
00102 @type listener: function taking (topic, message, time)
00103 """
00104 self._listeners.append(listener)
00105
00106 def start(self):
00107 """
00108 Start subscribing and recording messages to bag.
00109 """
00110 self._master_check_thread.start()
00111 self._write_thread.start()
00112
00113 @property
00114 def paused(self): return self._paused
00115 def pause(self): self._paused = True
00116 def unpause(self): self._paused = False
00117 def toggle_paused(self): self._paused = not self._paused
00118
00119 def stop(self):
00120 """
00121 Stop recording.
00122 """
00123 with self._stop_condition:
00124 self._stop_flag = True
00125 self._stop_condition.notify_all()
00126
00127 self._write_queue.put(self)
00128
00129
00130
00131 def _run_master_check(self):
00132 master = rosgraph.Master('rxbag_recorder')
00133
00134 try:
00135 while not self._stop_flag:
00136
00137 for topic, datatype in master.getPublishedTopics(''):
00138
00139
00140
00141
00142
00143 if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
00144 continue
00145
00146 try:
00147 pytype = roslib.message.get_message_class(datatype)
00148
00149 self._message_count[topic] = 0
00150
00151 self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
00152 except Exception, ex:
00153 print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex))
00154 self._failed_topics.add(topic)
00155
00156
00157 self._stop_condition.acquire()
00158 self._stop_condition.wait(self._master_check_interval)
00159
00160 except Exception, ex:
00161 print >> sys.stderr, 'Error recording to bag: %s' % str(ex)
00162
00163
00164 for topic in list(self._subscriber_helpers.keys()):
00165 self._unsubscribe(topic)
00166
00167
00168 try:
00169 self._bag.close()
00170 except Exception, ex:
00171 print >> sys.stderr, 'Error closing bag [%s]: %s' % (self._bag.filename, str(ex))
00172
00173 def _should_subscribe_to(self, topic):
00174 if self._all:
00175 return True
00176
00177 if not self._regex:
00178 return topic in self._topics
00179
00180 for regex in self._regexes:
00181 if regex.match(topic):
00182 return True
00183
00184 return False
00185
00186 def _unsubscribe(self, topic):
00187 try:
00188 self._subscriber_helpers[topic].subscriber.unregister()
00189 except Exception:
00190 return
00191
00192 del self._subscriber_helpers[topic]
00193
00194 def _record(self, topic, m):
00195 if self._paused:
00196 return
00197
00198 if self._limit and self._message_count[topic] >= self._limit:
00199 self._limited_topics.add(topic)
00200 self._unsubscribe(topic)
00201 return
00202
00203 self._write_queue.put((topic, m, rospy.get_rostime()))
00204 self._message_count[topic] += 1
00205
00206 def _run_write(self):
00207 try:
00208 while not self._stop_flag:
00209
00210 item = self._write_queue.get()
00211
00212 if item == self:
00213 continue
00214
00215 topic, m, t = item
00216
00217
00218 with self._bag_lock:
00219 self._bag.write(topic, m, t)
00220
00221
00222 for listener in self._listeners:
00223 listener(topic, m, t)
00224
00225 except Exception, ex:
00226 print >> sys.stderr, 'Error write to bag: %s' % str(ex)
00227
00228 class _SubscriberHelper(object):
00229 def __init__(self, recorder, topic, pytype):
00230 self.recorder = recorder
00231 self.topic = topic
00232
00233 self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback)
00234
00235 def callback(self, m):
00236 self.recorder._record(self.topic, m)