recorder.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2012, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 """
00034 Recorder subscribes to ROS messages and writes them to a bag file.
00035 """
00036 
00037 from __future__ import print_function
00038 try:
00039     from queue import Queue
00040 except ImportError:
00041     from Queue import Queue
00042 import re
00043 import threading
00044 import time
00045 
00046 import rosbag
00047 import rosgraph
00048 import roslib
00049 import rospy
00050 
00051 import sys
00052 
00053 
00054 class Recorder(object):
00055     def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
00056         """
00057         Subscribe to ROS messages and record them to a bag file.
00058 
00059         @param filename: filename of bag to write to
00060         @type  filename: str
00061         @param all: all topics are to be recorded [default: True]
00062         @type  all: bool
00063         @param topics: topics (or regexes if regex is True) to record [default: empty list]
00064         @type  topics: list of str
00065         @param regex: topics should be considered as regular expressions [default: False]
00066         @type  regex: bool
00067         @param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
00068         @type  limit: int
00069         @param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
00070         @type  master_check_interval: float
00071         """
00072         self._all = all
00073         self._topics = topics
00074         self._regex = regex
00075         self._limit = limit
00076         self._master_check_interval = master_check_interval
00077 
00078         self._bag = rosbag.Bag(filename, 'w')
00079         self._bag_lock = bag_lock if bag_lock else threading.Lock()
00080         self._listeners = []
00081         self._subscriber_helpers = {}
00082         self._limited_topics = set()
00083         self._failed_topics = set()
00084         self._last_update = time.time()
00085         self._write_queue = Queue()
00086         self._paused = False
00087         self._stop_condition = threading.Condition()
00088         self._stop_flag = False
00089 
00090         # Compile regular expressions
00091         if self._regex:
00092             self._regexes = [re.compile(t) for t in self._topics]
00093         else:
00094             self._regexes = None
00095 
00096         self._message_count = {}  # topic -> int (track number of messages recorded on each topic)
00097 
00098         self._master_check_thread = threading.Thread(target=self._run_master_check)
00099         self._write_thread = threading.Thread(target=self._run_write)
00100 
00101     @property
00102     def bag(self):
00103         return self._bag
00104 
00105     def add_listener(self, listener):
00106         """
00107         Add a listener which gets called whenever a message is recorded.
00108         @param listener: function to call
00109         @type  listener: function taking (topic, message, time)
00110         """
00111         self._listeners.append(listener)
00112 
00113     def start(self):
00114         """
00115         Start subscribing and recording messages to bag.
00116         """
00117         self._master_check_thread.start()
00118         self._write_thread.start()
00119 
00120     @property
00121     def paused(self):
00122         return self._paused
00123 
00124     def pause(self):
00125         self._paused = True
00126 
00127     def unpause(self):
00128         self._paused = False
00129 
00130     def toggle_paused(self):
00131         self._paused = not self._paused
00132 
00133     def stop(self):
00134         """
00135         Stop recording.
00136         """
00137         with self._stop_condition:
00138             self._stop_flag = True
00139             self._stop_condition.notify_all()
00140 
00141         self._write_queue.put(self)
00142 
00143     ## Implementation
00144 
00145     def _run_master_check(self):
00146         master = rosgraph.Master('rqt_bag_recorder')
00147 
00148         try:
00149             while not self._stop_flag:
00150                 # Check for new topics
00151                 for topic, datatype in master.getPublishedTopics(''):
00152                     # Check if:
00153                     #    the topic is already subscribed to, or
00154                     #    we've failed to subscribe to it already, or
00155                     #    we've already reached the message limit, or
00156                     #    we don't want to subscribe
00157                     if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
00158                         continue
00159 
00160                     try:
00161                         pytype = roslib.message.get_message_class(datatype)
00162 
00163                         self._message_count[topic] = 0
00164 
00165                         self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
00166                     except Exception as ex:
00167                         print('Error subscribing to %s (ignoring): %s' % (topic, str(ex)), file=sys.stderr)
00168                         self._failed_topics.add(topic)
00169 
00170                 # Wait a while
00171                 self._stop_condition.acquire()
00172                 self._stop_condition.wait(self._master_check_interval)
00173 
00174         except Exception as ex:
00175             print('Error recording to bag: %s' % str(ex), file=sys.stderr)
00176 
00177         # Unsubscribe from all topics
00178         for topic in list(self._subscriber_helpers.keys()):
00179             self._unsubscribe(topic)
00180 
00181         # Close the bag file so that the index gets written
00182         try:
00183             self._bag.close()
00184         except Exception as ex:
00185             print('Error closing bag [%s]: %s' % (self._bag.filename, str(ex)))
00186 
00187     def _should_subscribe_to(self, topic):
00188         if self._all:
00189             return True
00190 
00191         if not self._regex:
00192             return topic in self._topics
00193 
00194         for regex in self._regexes:
00195             if regex.match(topic):
00196                 return True
00197 
00198         return False
00199 
00200     def _unsubscribe(self, topic):
00201         try:
00202             self._subscriber_helpers[topic].subscriber.unregister()
00203         except Exception:
00204             return
00205 
00206         del self._subscriber_helpers[topic]
00207 
00208     def _record(self, topic, m):
00209         if self._paused:
00210             return
00211 
00212         if self._limit and self._message_count[topic] >= self._limit:
00213             self._limited_topics.add(topic)
00214             self._unsubscribe(topic)
00215             return
00216 
00217         self._write_queue.put((topic, m, rospy.get_rostime()))
00218         self._message_count[topic] += 1
00219 
00220     def _run_write(self):
00221         try:
00222             while not self._stop_flag:
00223                 # Wait for a message
00224                 item = self._write_queue.get()
00225 
00226                 if item == self:
00227                     continue
00228 
00229                 topic, m, t = item
00230 
00231                 # Write to the bag
00232                 with self._bag_lock:
00233                     self._bag.write(topic, m, t)
00234 
00235                 # Notify listeners that a message has been recorded
00236                 for listener in self._listeners:
00237                     listener(topic, m, t)
00238 
00239         except Exception as ex:
00240             print('Error write to bag: %s' % str(ex), file=sys.stderr)
00241 
00242 
00243 class _SubscriberHelper(object):
00244     def __init__(self, recorder, topic, pytype):
00245         self.recorder = recorder
00246         self.topic = topic
00247 
00248         self.subscriber = rospy.Subscriber(self.topic, pytype, self.callback)
00249 
00250     def callback(self, m):
00251         self.recorder._record(self.topic, m)


rqt_bag
Author(s): Aaron Blasdel, Tim Field
autogenerated on Thu Aug 17 2017 02:19:27