00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 """
00034 Recorder subscribes to ROS messages and writes them to a bag file.
00035 """
00036
00037 from __future__ import with_statement
00038
00039 PKG = 'rxbag'
00040 import roslib; roslib.load_manifest(PKG)
00041
00042 import Queue
00043 import re
00044 import threading
00045 import time
00046
00047 import rosbag
00048 import rosgraph.masterapi
00049 import rospy
00050
00051 import sys
00052
00053 class Recorder(object):
00054 def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
00055 """
00056 Subscribe to ROS messages and record them to a bag file.
00057
00058 @param filename: filename of bag to write to
00059 @type filename: str
00060 @param all: all topics are to be recorded [default: True]
00061 @type all: bool
00062 @param topics: topics (or regexes if regex is True) to record [default: empty list]
00063 @type topics: list of str
00064 @param regex: topics should be considered as regular expressions [default: False]
00065 @type regex: bool
00066 @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
00067 @type limit: int
00068 @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
00069 @type master_check_interval: float
00070 """
00071 self._all = all
00072 self._topics = topics
00073 self._regex = regex
00074 self._limit = limit
00075 self._master_check_interval = master_check_interval
00076
00077 self._bag = rosbag.Bag(filename, 'w')
00078 self._bag_lock = bag_lock if bag_lock else threading.Lock()
00079 self._listeners = []
00080 self._subscriber_helpers = {}
00081 self._limited_topics = set()
00082 self._failed_topics = set()
00083 self._last_update = time.time()
00084 self._write_queue = Queue.Queue()
00085 self._paused = False
00086 self._stop_condition = threading.Condition()
00087 self._stop_flag = False
00088
00089
00090 if self._regex:
00091 self._regexes = [re.compile(t) for t in self._topics]
00092 else:
00093 self._regexes = None
00094
00095 self._message_count = {}
00096
00097 self._master_check_thread = threading.Thread(target=self._run_master_check)
00098 self._write_thread = threading.Thread(target=self._run_write)
00099
00100 @property
00101 def bag(self): return self._bag
00102
00103 def add_listener(self, listener):
00104 """
00105 Add a listener which gets called whenever a message is recorded.
00106 @param listener: function to call
00107 @type listener: function taking (topic, message, time)
00108 """
00109 self._listeners.append(listener)
00110
00111 def start(self):
00112 """
00113 Start subscribing and recording messages to bag.
00114 """
00115 self._master_check_thread.start()
00116 self._write_thread.start()
00117
00118 @property
00119 def paused(self): return self._paused
00120 def pause(self): self._paused = True
00121 def unpause(self): self._paused = False
00122 def toggle_paused(self): self._paused = not self._paused
00123
00124 def stop(self):
00125 """
00126 Stop recording.
00127 """
00128 with self._stop_condition:
00129 self._stop_flag = True
00130 self._stop_condition.notify_all()
00131
00132 self._write_queue.put(self)
00133
00134
00135
00136 def _run_master_check(self):
00137 master = rosgraph.masterapi.Master('rxbag.recorder')
00138
00139 try:
00140 while not self._stop_flag:
00141
00142 for topic, datatype in master.getPublishedTopics(''):
00143
00144
00145
00146
00147
00148 if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
00149 continue
00150
00151 try:
00152 pytype = roslib.message.get_message_class(datatype)
00153
00154 self._message_count[topic] = 0
00155
00156 self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
00157 except Exception, ex:
00158 print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex))
00159 self._failed_topics.add(topic)
00160
00161
00162 self._stop_condition.acquire()
00163 self._stop_condition.wait(self._master_check_interval)
00164
00165 except Exception, ex:
00166 print >> sys.stderr, 'Error recording to bag: %s' % str(ex)
00167
00168
00169 for topic in list(self._subscriber_helpers.keys()):
00170 self._unsubscribe(topic)
00171
00172
00173 try:
00174 self._bag.close()
00175 except Exception, ex:
00176 print >> sys.stderr, 'Error closing bag [%s]: %s' % (self._bag.filename, str(ex))
00177
00178 def _should_subscribe_to(self, topic):
00179 if self._all:
00180 return True
00181
00182 if not self._regex:
00183 return topic in self._topics
00184
00185 for regex in self._regexes:
00186 if regex.match(topic):
00187 return True
00188
00189 return False
00190
00191 def _unsubscribe(self, topic):
00192 try:
00193 self._subscriber_helpers[topic].subscriber.unregister()
00194 except Exception:
00195 return
00196
00197 del self._subscriber_helpers[topic]
00198
00199 def _record(self, topic, m):
00200 if self._paused:
00201 return
00202
00203 if self._limit and self._message_count[topic] >= self._limit:
00204 self._limited_topics.add(topic)
00205 self._unsubscribe(topic)
00206 return
00207
00208 self._write_queue.put((topic, m, rospy.get_rostime()))
00209 self._message_count[topic] += 1
00210
00211 def _run_write(self):
00212 try:
00213 while not self._stop_flag:
00214
00215 item = self._write_queue.get()
00216
00217 if item == self:
00218 continue
00219
00220 topic, m, t = item
00221
00222
00223 with self._bag_lock:
00224 self._bag.write(topic, m, t)
00225
00226
00227 for listener in self._listeners:
00228 listener(topic, m, t)
00229
00230 except Exception, ex:
00231 print >> sys.stderr, 'Error write to bag: %s' % str(ex)
00232
00233 class _SubscriberHelper(object):
00234 def __init__(self, recorder, topic, pytype):
00235 self.recorder = recorder
00236 self.topic = topic
00237
00238 self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback)
00239
00240 def callback(self, m):
00241 self.recorder._record(self.topic, m)