recorder.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2012, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 """
34 Recorder subscribes to ROS messages and writes them to a bag file.
35 """
36 
37 from __future__ import print_function
38 try:
39  from queue import Queue
40 except ImportError:
41  from Queue import Queue
42 import re
43 import threading
44 import time
45 
46 import rosbag
47 import rosgraph
48 import roslib
49 import rospy
50 
51 import sys
52 
53 
54 class Recorder(object):
55 
56  def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
57  """
58  Subscribe to ROS messages and record them to a bag file.
59 
60  @param filename: filename of bag to write to
61  @type filename: str
62  @param all: all topics are to be recorded [default: True]
63  @type all: bool
64  @param topics: topics (or regexes if regex is True) to record [default: empty list]
65  @type topics: list of str
66  @param regex: topics should be considered as regular expressions [default: False]
67  @type regex: bool
68  @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
69  @type limit: int
70  @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
71  @type master_check_interval: float
72  """
73  self._all = all
74  self._topics = topics
75  self._regex = regex
76  self._limit = limit
77  self._master_check_interval = master_check_interval
78 
79  self._bag = rosbag.Bag(filename, 'w')
80  self._bag_lock = bag_lock if bag_lock else threading.Lock()
81  self._listeners = []
83  self._limited_topics = set()
84  self._failed_topics = set()
85  self._last_update = time.time()
86  self._write_queue = Queue()
87  self._paused = False
88  self._stop_condition = threading.Condition()
89  self._stop_flag = False
90 
91  # Compile regular expressions
92  if self._regex:
93  self._regexes = [re.compile(t) for t in self._topics]
94  else:
95  self._regexes = None
96 
97  self._message_count = {} # topic -> int (track number of messages recorded on each topic)
98 
99  self._master_check_thread = threading.Thread(target=self._run_master_check)
100  self._write_thread = threading.Thread(target=self._run_write)
101 
102  @property
103  def bag(self):
104  return self._bag
105 
106  def add_listener(self, listener):
107  """
108  Add a listener which gets called whenever a message is recorded.
109  @param listener: function to call
110  @type listener: function taking (topic, message, time)
111  """
112  self._listeners.append(listener)
113 
114  def start(self):
115  """
116  Start subscribing and recording messages to bag.
117  """
118  self._master_check_thread.start()
119  self._write_thread.start()
120 
121  @property
122  def paused(self):
123  return self._paused
124 
125  def pause(self):
126  self._paused = True
127 
128  def unpause(self):
129  self._paused = False
130 
131  def toggle_paused(self):
132  self._paused = not self._paused
133 
134  def stop(self):
135  """
136  Stop recording.
137  """
138  with self._stop_condition:
139  self._stop_flag = True
140  self._stop_condition.notify_all()
141 
142  self._write_queue.put(self)
143 
144  # Implementation
145 
146  def _run_master_check(self):
147  master = rosgraph.Master('rqt_bag_recorder')
148 
149  try:
150  while not self._stop_flag:
151  # Check for new topics
152  for topic, datatype in master.getPublishedTopics(''):
153  # Check if:
154  # the topic is already subscribed to, or
155  # we've failed to subscribe to it already, or
156  # we've already reached the message limit, or
157  # we don't want to subscribe
158  if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
159  continue
160 
161  try:
162  pytype = roslib.message.get_message_class(datatype)
163 
164  self._message_count[topic] = 0
165 
166  self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
167  except Exception as ex:
168  print('Error subscribing to %s (ignoring): %s' %
169  (topic, str(ex)), file=sys.stderr)
170  self._failed_topics.add(topic)
171 
172  # Wait a while
173  self._stop_condition.acquire()
174  self._stop_condition.wait(self._master_check_interval)
175 
176  except Exception as ex:
177  print('Error recording to bag: %s' % str(ex), file=sys.stderr)
178 
179  # Unsubscribe from all topics
180  for topic in list(self._subscriber_helpers.keys()):
181  self._unsubscribe(topic)
182 
183  # Close the bag file so that the index gets written
184  try:
185  self._bag.close()
186  except Exception as ex:
187  print('Error closing bag [%s]: %s' % (self._bag.filename, str(ex)))
188 
189  def _should_subscribe_to(self, topic):
190  if self._all:
191  return True
192 
193  if not self._regex:
194  return topic in self._topics
195 
196  for regex in self._regexes:
197  if regex.match(topic):
198  return True
199 
200  return False
201 
202  def _unsubscribe(self, topic):
203  try:
204  self._subscriber_helpers[topic].subscriber.unregister()
205  except Exception:
206  return
207 
208  del self._subscriber_helpers[topic]
209 
210  def _record(self, topic, m):
211  if self._paused:
212  return
213 
214  if self._limit and self._message_count[topic] >= self._limit:
215  self._limited_topics.add(topic)
216  self._unsubscribe(topic)
217  return
218 
219  self._write_queue.put((topic, m, rospy.get_rostime()))
220  self._message_count[topic] += 1
221 
222  def _run_write(self):
223  try:
224  while not self._stop_flag:
225  # Wait for a message
226  item = self._write_queue.get()
227 
228  if item == self:
229  continue
230 
231  topic, m, t = item
232 
233  # Write to the bag
234  with self._bag_lock:
235  self._bag.write(topic, m, t)
236 
237  # Notify listeners that a message has been recorded
238  for listener in self._listeners:
239  listener(topic, m, t)
240 
241  except Exception as ex:
242  print('Error write to bag: %s' % str(ex), file=sys.stderr)
243 
244 
245 class _SubscriberHelper(object):
246 
247  def __init__(self, recorder, topic, pytype):
248  self.recorder = recorder
249  self.topic = topic
250 
251  self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback)
252 
253  def callback(self, m):
254  self.recorder._record(self.topic, m)
def _record(self, topic, m)
Definition: recorder.py:210
def _should_subscribe_to(self, topic)
Definition: recorder.py:189
def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0)
Definition: recorder.py:56
def __init__(self, recorder, topic, pytype)
Definition: recorder.py:247
def _unsubscribe(self, topic)
Definition: recorder.py:202
def add_listener(self, listener)
Definition: recorder.py:106
def _run_master_check(self)
Definition: recorder.py:146


rqt_bag
Author(s): Dirk Thomas , Aaron Blasdel , Austin Hendrix , Tim Field
autogenerated on Fri Feb 19 2021 03:14:14