Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 """
00034 Recorder subscribes to ROS messages and writes them to a bag file.
00035 """
00036
00037 from __future__ import print_function
00038 try:
00039 from queue import Queue
00040 except ImportError:
00041 from Queue import Queue
00042 import re
00043 import threading
00044 import time
00045
00046 import rosbag
00047 import rosgraph
00048 import roslib
00049 import rospy
00050
00051 import sys
00052
00053
00054 class Recorder(object):
00055 def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
00056 """
00057 Subscribe to ROS messages and record them to a bag file.
00058
00059 @param filename: filename of bag to write to
00060 @type filename: str
00061 @param all: all topics are to be recorded [default: True]
00062 @type all: bool
00063 @param topics: topics (or regexes if regex is True) to record [default: empty list]
00064 @type topics: list of str
00065 @param regex: topics should be considered as regular expressions [default: False]
00066 @type regex: bool
00067 @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
00068 @type limit: int
00069 @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
00070 @type master_check_interval: float
00071 """
00072 self._all = all
00073 self._topics = topics
00074 self._regex = regex
00075 self._limit = limit
00076 self._master_check_interval = master_check_interval
00077
00078 self._bag = rosbag.Bag(filename, 'w')
00079 self._bag_lock = bag_lock if bag_lock else threading.Lock()
00080 self._listeners = []
00081 self._subscriber_helpers = {}
00082 self._limited_topics = set()
00083 self._failed_topics = set()
00084 self._last_update = time.time()
00085 self._write_queue = Queue()
00086 self._paused = False
00087 self._stop_condition = threading.Condition()
00088 self._stop_flag = False
00089
00090
00091 if self._regex:
00092 self._regexes = [re.compile(t) for t in self._topics]
00093 else:
00094 self._regexes = None
00095
00096 self._message_count = {}
00097
00098 self._master_check_thread = threading.Thread(target=self._run_master_check)
00099 self._write_thread = threading.Thread(target=self._run_write)
00100
00101 @property
00102 def bag(self):
00103 return self._bag
00104
00105 def add_listener(self, listener):
00106 """
00107 Add a listener which gets called whenever a message is recorded.
00108 @param listener: function to call
00109 @type listener: function taking (topic, message, time)
00110 """
00111 self._listeners.append(listener)
00112
00113 def start(self):
00114 """
00115 Start subscribing and recording messages to bag.
00116 """
00117 self._master_check_thread.start()
00118 self._write_thread.start()
00119
00120 @property
00121 def paused(self):
00122 return self._paused
00123
00124 def pause(self):
00125 self._paused = True
00126
00127 def unpause(self):
00128 self._paused = False
00129
00130 def toggle_paused(self):
00131 self._paused = not self._paused
00132
00133 def stop(self):
00134 """
00135 Stop recording.
00136 """
00137 with self._stop_condition:
00138 self._stop_flag = True
00139 self._stop_condition.notify_all()
00140
00141 self._write_queue.put(self)
00142
00143
00144
00145 def _run_master_check(self):
00146 master = rosgraph.Master('rqt_bag_recorder')
00147
00148 try:
00149 while not self._stop_flag:
00150
00151 for topic, datatype in master.getPublishedTopics(''):
00152
00153
00154
00155
00156
00157 if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
00158 continue
00159
00160 try:
00161 pytype = roslib.message.get_message_class(datatype)
00162
00163 self._message_count[topic] = 0
00164
00165 self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
00166 except Exception as ex:
00167 print('Error subscribing to %s (ignoring): %s' % (topic, str(ex)), file=sys.stderr)
00168 self._failed_topics.add(topic)
00169
00170
00171 self._stop_condition.acquire()
00172 self._stop_condition.wait(self._master_check_interval)
00173
00174 except Exception as ex:
00175 print('Error recording to bag: %s' % str(ex), file=sys.stderr)
00176
00177
00178 for topic in list(self._subscriber_helpers.keys()):
00179 self._unsubscribe(topic)
00180
00181
00182 try:
00183 self._bag.close()
00184 except Exception as ex:
00185 print('Error closing bag [%s]: %s' % (self._bag.filename, str(ex)))
00186
00187 def _should_subscribe_to(self, topic):
00188 if self._all:
00189 return True
00190
00191 if not self._regex:
00192 return topic in self._topics
00193
00194 for regex in self._regexes:
00195 if regex.match(topic):
00196 return True
00197
00198 return False
00199
00200 def _unsubscribe(self, topic):
00201 try:
00202 self._subscriber_helpers[topic].subscriber.unregister()
00203 except Exception:
00204 return
00205
00206 del self._subscriber_helpers[topic]
00207
00208 def _record(self, topic, m):
00209 if self._paused:
00210 return
00211
00212 if self._limit and self._message_count[topic] >= self._limit:
00213 self._limited_topics.add(topic)
00214 self._unsubscribe(topic)
00215 return
00216
00217 self._write_queue.put((topic, m, rospy.get_rostime()))
00218 self._message_count[topic] += 1
00219
00220 def _run_write(self):
00221 try:
00222 while not self._stop_flag:
00223
00224 item = self._write_queue.get()
00225
00226 if item == self:
00227 continue
00228
00229 topic, m, t = item
00230
00231
00232 with self._bag_lock:
00233 self._bag.write(topic, m, t)
00234
00235
00236 for listener in self._listeners:
00237 listener(topic, m, t)
00238
00239 except Exception as ex:
00240 print('Error write to bag: %s' % str(ex), file=sys.stderr)
00241
00242
00243 class _SubscriberHelper(object):
00244 def __init__(self, recorder, topic, pytype):
00245 self.recorder = recorder
00246 self.topic = topic
00247
00248 self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback)
00249
00250 def callback(self, m):
00251 self.recorder._record(self.topic, m)