bag.py
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2010, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 
00033 """
00034 The rosbag Python API.
00035 
00036 Provides serialization of bag files.
00037 """
00038 
00039 from __future__ import print_function
00040 
00041 import bisect
00042 import bz2
00043 import heapq
00044 import os
00045 import re
00046 import struct
00047 import sys
00048 import threading
00049 import time
00050 import yaml
00051 
00052 try:
00053     from cStringIO import StringIO  # Python 2.x
00054 except ImportError:
00055     from io import StringIO  # Python 3.x
00056 
00057 import genmsg
00058 import genpy
00059 import genpy.dynamic
00060 import genpy.message
00061 
00062 import roslib.names # still needed for roslib.names.canonicalize_name()
00063 import rospy
00064 
00065 class ROSBagException(Exception):
00066     """
00067     Base class for exceptions in rosbag.
00068     """
00069     def __init__(self, value):
00070         self.value = value
00071 
00072     def __str__(self):
00073         return self.value
00074 
00075 class ROSBagFormatException(ROSBagException):
00076     """
00077     Exceptions for errors relating to the bag file format.
00078     """
00079     def __init__(self, value):
00080         ROSBagException.__init__(self, value)
00081 
00082 class ROSBagUnindexedException(ROSBagException):
00083     """
00084     Exception for unindexed bags.
00085     """
00086     def __init__(self):
00087         ROSBagException.__init__(self, 'Unindexed bag')
00088 
00089 class Compression:
00090     """
00091     Allowable compression types
00092     """
00093     NONE = 'none'
00094     BZ2  = 'bz2'
00095 
00096 class Bag(object):
00097     """
00098     Bag objects serialize messages to and from a single file on disk using the bag format.
00099     """
00100     def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
00101         """
00102         Open a bag file.  The mode can be 'r', 'w', or 'a' for reading (default),
00103         writing or appending.  The file will be created if it doesn't exist
00104         when opened for writing or appending; it will be truncated when opened
00105         for writing.  Simultaneous reading and writing is allowed when in writing
00106         or appending mode.
00107         @param f: filename of bag to open or a stream to read from
00108         @type  f: str or file
00109         @param mode: mode, either 'r', 'w', or 'a'
00110         @type  mode: str
00111         @param compression: compression mode, see U{rosbag.Compression} for valid modes
00112         @type  compression: str
00113         @param chunk_threshold: minimum number of uncompressed bytes per chunk
00114         @type  chunk_threshold: int
00115         @param allow_unindexed: if True, allow opening unindexed bags
00116         @type  allow_unindexed: bool
00117         @param options: the bag options (currently: compression and chunk_threshold)
00118         @type  options: dict
00119         @param skip_index: if True, don't read the connection index records on open [2.0+]
00120         @type  skip_index: bool
00121         @raise ValueError: if any argument is invalid
00122         @raise ROSBagException: if an error occurs while opening the file
00123         @raise ROSBagFormatException: if bag format is corrupted
00124         """
00125         if options is not None:
00126             if type(options) is not dict:
00127                 raise ValueError('options must be of type dict')                
00128             if 'compression' in options:
00129                 compression = options['compression']
00130             if 'chunk_threshold' in options:
00131                 chunk_threshold = options['chunk_threshold']
00132 
00133         self._file     = None
00134         self._filename = None
00135         self._version  = None
00136 
00137         allowed_compressions = [Compression.NONE, Compression.BZ2]
00138         if compression not in allowed_compressions:
00139             raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))  
00140         self._compression = compression      
00141 
00142         if chunk_threshold < 0:
00143             raise ValueError('chunk_threshold must be greater than or equal to zero')        
00144         self._chunk_threshold = chunk_threshold
00145 
00146         self._skip_index = skip_index
00147 
00148         self._reader          = None
00149 
00150         self._file_header_pos = None
00151         self._index_data_pos  = 0       # (1.2+)
00152 
00153         self._clear_index()
00154 
00155         self._buffer = StringIO()        
00156 
00157         self._curr_compression = Compression.NONE
00158         
00159         self._open(f, mode, allow_unindexed)
00160 
00161         self._output_file = self._file
00162 
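    # Illustrative usage sketch (not part of the original source); 'input.bag' and
    # 'output.bag' are placeholder filenames:
    #
    #     inbag  = Bag('input.bag')                 # open for reading (default mode)
    #     outbag = Bag('output.bag', 'w',
    #                  compression=Compression.BZ2,
    #                  chunk_threshold=768 * 1024)  # bytes buffered before a chunk is closed
    #     outbag.close()
    #     inbag.close()
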
00163     def __iter__(self):
00164         return self.read_messages()
00165 
00166     def __enter__(self):
00167         return self
00168         
00169     def __exit__(self, exc_type, exc_value, traceback):
00170         self.close()
00171 
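    # Because __enter__/__exit__ are defined, a Bag can also be used as a context
    # manager so that close() is called automatically (illustrative sketch;
    # 'input.bag' is a placeholder filename):
    #
    #     with Bag('input.bag') as bag:
    #         for topic, msg, t in bag:   # __iter__ delegates to read_messages()
    #             pass
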
00172     @property
00173     def options(self):
00174         """Get the options."""
00175         return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }
00176     
00177     @property
00178     def filename(self):
00179         """Get the filename."""
00180         return self._filename
00181     
00182     @property
00183     def version(self):
00184         """Get the version."""
00185         return self._version
00186     
00187     @property
00188     def mode(self):
00189         """Get the mode."""
00190         return self._mode
00191 
00192     @property
00193     def size(self):
00194         """Get the size in bytes."""
00195         if not self._file:
00196             raise ValueError('I/O operation on closed bag')
00197         
00198         pos = self._file.tell()
00199         self._file.seek(0, os.SEEK_END)
00200         size = self._file.tell()
00201         self._file.seek(pos)
00202         return size
00203 
00204     # compression
00205         
00206     def _get_compression(self):
00207         """Get the compression method to use for writing."""
00208         return self._compression
00209     
00210     def _set_compression(self, compression):
00211         """Set the compression method to use for writing."""
00212         allowed_compressions = [Compression.NONE, Compression.BZ2]
00213         if compression not in allowed_compressions:
00214             raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))        
00215         
00216         self.flush()
00217         self._compression = compression
00218         
00219     compression = property(_get_compression, _set_compression)
00220     
00221     # chunk_threshold
00222     
00223     def _get_chunk_threshold(self):
00224         """Get the chunk threshold to use for writing."""
00225         return self._chunk_threshold
00226     
00227     def _set_chunk_threshold(self, chunk_threshold):
00228         """Set the chunk threshold to use for writing."""
00229         if chunk_threshold < 0:
00230             raise ValueError('chunk_threshold must be greater than or equal to zero')
00231 
00232         self.flush()
00233         self._chunk_threshold = chunk_threshold
00234         
00235     chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)
00236 
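    # Illustrative sketch (not part of the original source): both properties can be
    # changed on an open bag; the setters call flush(), so the new settings apply
    # from the next chunk onward.
    #
    #     bag = Bag('output.bag', 'w')          # placeholder filename
    #     bag.compression = Compression.BZ2
    #     bag.chunk_threshold = 1024 * 1024
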
00237     def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
00238         """
00239         Read messages from the bag, optionally filtered by topic, timestamp and connection details.
00240         @param topics: list of topics or a single topic [optional]
00241         @type  topics: list(str) or str
00242         @param start_time: earliest timestamp of message to return [optional]
00243         @type  start_time: U{genpy.Time}
00244         @param end_time: latest timestamp of message to return [optional]
00245         @type  end_time: U{genpy.Time}
00246         @param connection_filter: function to filter connections to include [optional]
00247         @type  connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool
00248         @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype)
00249         @type  raw: bool
00250         @return: generator of (topic, message, timestamp) tuples for each message in the bag file
00251         @rtype:  generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw]
00252         """
00253         self.flush()
00254 
00255         if topics and type(topics) is str:
00256             topics = [topics]
00257         
00258         return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)
00259 
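    # Illustrative sketch (not part of the original source); the filename, topic
    # name and time window below are placeholders:
    #
    #     with Bag('input.bag') as bag:
    #         for topic, msg, t in bag.read_messages(topics=['/chatter'],
    #                                                start_time=genpy.Time.from_sec(10.0),
    #                                                end_time=genpy.Time.from_sec(20.0)):
    #             print(topic, t.to_sec(), msg)
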
00260     def flush(self):
00261         """
00262         Write the open chunk to disk so subsequent reads will read all messages.
00263         @raise ValueError: if bag is closed 
00264         """
00265         if not self._file:
00266             raise ValueError('I/O operation on closed bag')
00267 
00268         if self._chunk_open:
00269             self._stop_writing_chunk()
00270 
00271     def write(self, topic, msg, t=None, raw=False):
00272         """
00273         Write a message to the bag.
00274         @param topic: name of topic
00275         @type  topic: str
00276         @param msg: message to add to bag, or tuple (if raw)
00277         @type  msg: Message or tuple of raw message data
00278         @param t: ROS time of message publication; if None is specified, the current time is used [optional]
00279         @type  t: U{genpy.Time}
00280         @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
00281         @type  raw: bool
00282         @raise ValueError: if arguments are invalid or bag is closed
00283         """
00284         if not self._file:
00285             raise ValueError('I/O operation on closed bag')
00286 
00287         if not topic:
00288             raise ValueError('topic is invalid')
00289         if not msg:
00290             raise ValueError('msg is invalid')
00291 
00292         if t is None:
00293             t = genpy.Time.from_sec(time.time())
00294 
00295         # Seek to end (in case previous operation was a read)
00296         self._file.seek(0, os.SEEK_END)
00297 
00298         # Open a chunk, if needed
00299         if not self._chunk_open:
00300             self._start_writing_chunk(t)
00301 
00302         # Unpack raw
00303         if raw:
00304             if len(msg) == 5:
00305                 msg_type, serialized_bytes, md5sum, pos, pytype = msg
00306             elif len(msg) == 4:
00307                 msg_type, serialized_bytes, md5sum, pytype = msg
00308             else:
00309                 raise ValueError('msg must be of length 4 or 5')
00310 
00311         # Write connection record, if necessary (currently using a connection per topic; ignoring message connection header)
00312         if topic in self._topic_connections:
00313             connection_info = self._topic_connections[topic]
00314             conn_id = connection_info.id
00315         else:
00316             conn_id = len(self._connections)
00317 
00318             if raw:
00319                 if pytype is None:
00320                     try:
00321                         pytype = genpy.message.get_message_class(msg_type)
00322                     except Exception:
00323                         pytype = None
00324                 if pytype is None:
00325                     raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type)
00326     
00327                 if pytype._md5sum != md5sum:
00328                     print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr)
00329                     #raise ROSBagException('md5sum of loaded type does not match that of data being recorded')
00330             
00331                 header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text }
00332             else:
00333                 header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text }
00334 
00335             connection_info = _ConnectionInfo(conn_id, topic, header)
00336 
00337             self._write_connection_record(connection_info)
00338 
00339             self._connections[conn_id] = connection_info
00340             self._topic_connections[topic] = connection_info
00341 
00342         # Create an index entry
00343         index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())
00344 
00345         # Update the indexes and current chunk info 
00346         if conn_id not in self._curr_chunk_connection_indexes:
00347             # This is the first message on this connection in the chunk
00348             self._curr_chunk_connection_indexes[conn_id] = [index_entry]
00349             self._curr_chunk_info.connection_counts[conn_id] = 1
00350         else:
00351             curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id]
00352             if index_entry >= curr_chunk_connection_index[-1]:
00353                 # Test if we're writing chronologically.  Can skip binary search if so.
00354                 curr_chunk_connection_index.append(index_entry)
00355             else:
00356                 bisect.insort_right(curr_chunk_connection_index, index_entry)
00357 
00358             self._curr_chunk_info.connection_counts[conn_id] += 1
00359 
00360         if conn_id not in self._connection_indexes:
00361             self._connection_indexes[conn_id] = [index_entry]
00362         else:
00363             bisect.insort_right(self._connection_indexes[conn_id], index_entry)
00364 
00365         # Update the chunk start/end times
00366         if t > self._curr_chunk_info.end_time:
00367             self._curr_chunk_info.end_time = t
00368         elif t < self._curr_chunk_info.start_time:
00369             self._curr_chunk_info.start_time = t
00370 
00371         if not raw:
00372             # Serialize the message to the buffer
00373             self._buffer.seek(0)
00374             self._buffer.truncate(0)
00375             msg.serialize(self._buffer)
00376             serialized_bytes = self._buffer.getvalue()
00377 
00378         # Write message data record
00379         self._write_message_data_record(conn_id, t, serialized_bytes)
00380         
00381         # Check if we want to stop this chunk
00382         chunk_size = self._get_chunk_offset()
00383         if chunk_size > self._chunk_threshold:
00384             self._stop_writing_chunk()
00385 
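    # Illustrative sketch (not part of the original source); std_msgs/String and the
    # filename are placeholders, any generated message class is written the same way:
    #
    #     from std_msgs.msg import String
    #
    #     with Bag('output.bag', 'w') as bag:
    #         bag.write('/chatter', String(data='hello'))   # stamped with the current time
    #         bag.write('/chatter', String(data='world'), t=genpy.Time.from_sec(1.5))
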
00386     def reindex(self):
00387         """
00388         Reindexes the bag file.  Yields the position of each chunk to report progress.
00389         """
00390         self._clear_index()
00391         return self._reader.reindex()
00392 
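    # Illustrative sketch (not part of the original source): repairing an unindexed
    # bag and driving a progress display from the yielded chunk positions.  The
    # filename and the choice of append mode are assumptions for the example.
    #
    #     with Bag('unindexed.bag', 'a', allow_unindexed=True) as bag:
    #         for chunk_pos in bag.reindex():
    #             print('reindexed chunk at byte offset %d' % chunk_pos)
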
00393     def close(self):
00394         """
00395         Close the bag file.  Closing an already closed bag does nothing.
00396         """
00397         if self._file:
00398             if self._mode in 'wa':
00399                 self._stop_writing()
00400             
00401             self._close_file()
00402 
00403     def __str__(self):
00404         rows = []
00405 
00406         try:
00407             if self._filename:
00408                 rows.append(('path', self._filename))
00409 
00410             if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
00411                 rows.append(('version', '1.2 (unindexed)'))
00412             else:
00413                 rows.append(('version', '%d.%d' % (self._version / 100, self._version % 100)))
00414 
00415             if not self._connection_indexes and not self._chunks:
00416                 rows.append(('size', _human_readable_size(self.size)))
00417             else:
00418                 if self._chunks:
00419                     start_stamp = self._chunks[ 0].start_time.to_sec()
00420                     end_stamp   = self._chunks[-1].end_time.to_sec()
00421                 else:
00422                     start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()])
00423                     end_stamp   = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()])
00424     
00425                 # Show duration
00426                 duration = end_stamp - start_stamp
00427                 dur_secs = duration % 60
00428                 dur_mins = int(duration / 60)
00429                 dur_hrs  = int(dur_mins / 60)
00430                 if dur_hrs > 0:
00431                     dur_mins = dur_mins % 60
00432                     duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration)
00433                 elif dur_mins > 0:
00434                     duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration)
00435                 else:
00436                     duration_str = '%.1fs' % duration   
00437 
00438                 rows.append(('duration', duration_str))
00439         
00440                 # Show start and end times
00441                 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp)))
00442                 rows.append(('end',   '%s (%.2f)' % (_time_to_str(end_stamp),   end_stamp)))
00443     
00444                 rows.append(('size', _human_readable_size(self.size)))
00445 
00446                 if self._chunks:
00447                     num_messages = 0
00448                     for c in self._chunks:
00449                         for counts in c.connection_counts.itervalues():
00450                             num_messages += counts
00451                 else:
00452                     num_messages = sum([len(index) for index in self._connection_indexes.itervalues()])
00453                 rows.append(('messages', str(num_messages)))
00454 
00455                 # Show compression information
00456                 if len(self._chunk_headers) == 0:
00457                     rows.append(('compression', 'none'))
00458                 else:
00459                     compression_counts       = {}
00460                     compression_uncompressed = {}
00461                     compression_compressed   = {}
00462                     for chunk_header in self._chunk_headers.itervalues():
00463                         if chunk_header.compression not in compression_counts:
00464                             compression_counts[chunk_header.compression] = 1
00465                             compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
00466                             compression_compressed[chunk_header.compression] = chunk_header.compressed_size
00467                         else:
00468                             compression_counts[chunk_header.compression] += 1
00469                             compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
00470                             compression_compressed[chunk_header.compression] += chunk_header.compressed_size
00471     
00472                     chunk_count = len(self._chunk_headers)
00473     
00474                     compressions = []
00475                     for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])):
00476                         if compression != Compression.NONE:
00477                             fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression]
00478                             compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction))
00479                         else:
00480                             compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count))
00481                     rows.append(('compression', ', '.join(compressions)))
00482     
00483                     all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
00484                     if not all_uncompressed:    
00485                         total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
00486                         total_compressed_size   = sum((h.compressed_size   for h in self._chunk_headers.itervalues()))
00487                         
00488                         total_uncompressed_size_str = _human_readable_size(total_uncompressed_size)
00489                         total_compressed_size_str   = _human_readable_size(total_compressed_size)
00490                         total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)])
00491 
00492                         if duration > 0:
00493                             uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration)
00494                             compressed_rate_str   = _human_readable_size(total_compressed_size / duration)
00495                             rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)])
00496 
00497                             rows.append(('uncompressed', '%*s @ %*s/s' %
00498                                          (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str)))
00499                             rows.append(('compressed',   '%*s @ %*s/s (%.2f%%)' %
00500                                          (total_size_str_length, total_compressed_size_str,   rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size)))
00501                         else:
00502                             rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str)))
00503                             rows.append(('compressed',   '%*s' % (total_size_str_length, total_compressed_size_str)))
00504 
00505                 datatypes = set()
00506                 datatype_infos = []
00507                 for connection in self._connections.itervalues():
00508                     if connection.datatype in datatypes:
00509                         continue
00510                     datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
00511                     datatypes.add(connection.datatype)
00512                     
00513                 topics = sorted(set([c.topic for c in self._get_connections()]))
00514                 topic_datatypes    = {}
00515                 topic_conn_counts  = {}
00516                 topic_msg_counts   = {}
00517                 topic_freqs_median = {}
00518                 for topic in topics:
00519                     connections = list(self._get_connections(topic))
00520 
00521                     topic_datatypes[topic] = connections[0].datatype
00522                     topic_conn_counts[topic] = len(connections)
00523 
00524                     msg_count = 0
00525                     for connection in connections:
00526                         for chunk in self._chunks:
00527                             msg_count += chunk.connection_counts.get(connection.id, 0)
00528                     topic_msg_counts[topic] = msg_count
00529 
00530                     if self._connection_indexes_read:
00531                         stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
00532                         if len(stamps) > 1:
00533                             periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
00534                             med_period = _median(periods)
00535                             if med_period > 0.0:
00536                                 topic_freqs_median[topic] = 1.0 / med_period
00537 
00538                 topics = sorted(topic_datatypes.iterkeys())
00539                 max_topic_len       = max([len(topic) for topic in topics])
00540                 max_datatype_len    = max([len(datatype) for datatype in datatypes])
00541                 max_msg_count_len   = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()])
00542                 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0
00543 
00544                 # Show datatypes       
00545                 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
00546                     s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum)
00547                     if i == 0:
00548                         rows.append(('types', s))
00549                     else:
00550                         rows.append(('', s))
00551                     
00552                 # Show topics
00553                 for i, topic in enumerate(topics):
00554                     topic_msg_count = topic_msg_counts[topic]
00555                     
00556                     s = '%-*s   %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ')
00557                     if topic in topic_freqs_median:
00558                         s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic]))
00559                     else:
00560                         s += '   %*s' % (max_freq_median_len, '')
00561 
00562                     s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic])
00563                     if topic_conn_counts[topic] > 1:
00564                         s += ' (%d connections)' % topic_conn_counts[topic]
00565         
00566                     if i == 0:
00567                         rows.append(('topics', s))
00568                     else:
00569                         rows.append(('', s))
00570         
00571         except Exception as ex:
00572             raise
00573 
00574         first_column_width = max([len(field) for field, _ in rows]) + 1
00575 
00576         s = ''
00577         for (field, value) in rows:
00578             if field:
00579                 s += '%-*s %s\n' % (first_column_width, field + ':', value)
00580             else:
00581                 s += '%-*s %s\n' % (first_column_width, '', value)
00582 
00583         return s.rstrip()
00584 
00585     def _get_yaml_info(self, key=None):
00586         s = ''
00587 
00588         try:
00589             if self._filename:
00590                 s += 'path: %s\n' % self._filename
00591 
00592             if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
00593                 s += 'version: 1.2 (unindexed)\n'
00594             else:
00595                 s += 'version: %d.%d\n' % (self._version / 100, self._version % 100)
00596 
00597             if not self._connection_indexes and not self._chunks:
00598                 s += 'size: %d\n' % self.size
00599                 s += 'indexed: False\n'
00600             else:
00601                 if self._chunks:
00602                     start_stamp = self._chunks[ 0].start_time.to_sec()
00603                     end_stamp   = self._chunks[-1].end_time.to_sec()
00604                 else:
00605                     start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()])
00606                     end_stamp   = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()])
00607                 
00608                 duration = end_stamp - start_stamp
00609                 s += 'duration: %.6f\n' % duration
00610                 s += 'start: %.6f\n' % start_stamp
00611                 s += 'end: %.6f\n' % end_stamp
00612                 s += 'size: %d\n' % self.size
00613                 if self._chunks:
00614                     num_messages = 0
00615                     for c in self._chunks:
00616                         for counts in c.connection_counts.itervalues():
00617                             num_messages += counts
00618                 else:
00619                     num_messages = sum([len(index) for index in self._connection_indexes.itervalues()])
00620                 s += 'messages: %d\n' % num_messages
00621                 s += 'indexed: True\n'
00622 
00623                 # Show compression information
00624                 if len(self._chunk_headers) == 0:
00625                     s += 'compression: none\n'
00626                 else:
00627                     compression_counts       = {}
00628                     compression_uncompressed = {}
00629                     compression_compressed   = {}
00630                     for chunk_header in self._chunk_headers.itervalues():
00631                         if chunk_header.compression not in compression_counts:
00632                             compression_counts[chunk_header.compression] = 1
00633                             compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
00634                             compression_compressed[chunk_header.compression] = chunk_header.compressed_size
00635                         else:
00636                             compression_counts[chunk_header.compression] += 1
00637                             compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
00638                             compression_compressed[chunk_header.compression] += chunk_header.compressed_size
00639     
00640                     chunk_count = len(self._chunk_headers)
00641     
00642                     main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0]
00643                     s += 'compression: %s\n' % str(main_compression)
00644     
00645                     all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
00646                     if not all_uncompressed:    
00647                         s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
00648                         s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.itervalues()))
00649 
00650                 datatypes = set()
00651                 datatype_infos = []
00652                 for connection in self._connections.itervalues():
00653                     if connection.datatype in datatypes:
00654                         continue
00655                     datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
00656                     datatypes.add(connection.datatype)
00657                     
00658                 topics = sorted(set([c.topic for c in self._get_connections()]))
00659                 topic_datatypes    = {}
00660                 topic_conn_counts  = {}
00661                 topic_msg_counts   = {}
00662                 topic_freqs_median = {}
00663                 for topic in topics:
00664                     connections = list(self._get_connections(topic))
00665 
00666                     topic_datatypes[topic] = connections[0].datatype
00667                     topic_conn_counts[topic] = len(connections)
00668 
00669                     msg_count = 0
00670                     for connection in connections:
00671                         for chunk in self._chunks:
00672                             msg_count += chunk.connection_counts.get(connection.id, 0)
00673                     topic_msg_counts[topic] = msg_count
00674 
00675                     if self._connection_indexes_read:
00676                         stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
00677                         if len(stamps) > 1:
00678                             periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
00679                             med_period = _median(periods)
00680                             if med_period > 0.0:
00681                                 topic_freqs_median[topic] = 1.0 / med_period
00682 
00683                 topics = sorted(topic_datatypes.iterkeys())
00684                 max_topic_len       = max([len(topic) for topic in topics])
00685                 max_datatype_len    = max([len(datatype) for datatype in datatypes])
00686                 max_msg_count_len   = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()])
00687                 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0
00688 
00689                 # Show datatypes       
00690                 s += 'types:\n'
00691                 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
00692                     s += '    - type: %s\n' % datatype
00693                     s += '      md5: %s\n' % md5sum
00694                     
00695                 # Show topics
00696                 s += 'topics:\n'
00697                 for i, topic in enumerate(topics):
00698                     topic_msg_count = topic_msg_counts[topic]
00699                     
00700                     s += '    - topic: %s\n' % topic
00701                     s += '      type: %s\n' % topic_datatypes[topic]
00702                     s += '      messages: %d\n' % topic_msg_count
00703                         
00704                     if topic_conn_counts[topic] > 1:
00705                         s += '      connections: %d\n' % topic_conn_counts[topic]
00706 
00707                     if topic in topic_freqs_median:
00708                         s += '      frequency: %.4f\n' % topic_freqs_median[topic]
00709 
00710             if not key:
00711                 return s
00712             
00713             class DictObject(object):
00714                 def __init__(self, d):
00715                     for a, b in d.items():
00716                         if isinstance(b, (list, tuple)):
00717                            setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b])
00718                         else:
00719                            setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
00720 
00721             obj = DictObject(yaml.load(s))
00722             try:
00723                 val = eval('obj.' + key)
00724             except Exception as ex:
00725                 print('Error getting key "%s"' % key, file=sys.stderr)
00726                 return None
00727 
00728             def print_yaml(val, indent=0):
00729                 indent_str = '  ' * indent
00730 
00731                 if type(val) is list:
00732                     s = ''
00733                     for item in val:
00734                         s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1))
00735                     return s
00736                 elif type(val) is DictObject:
00737                     s = ''
00738                     for i, (k, v) in enumerate(val.__dict__.items()):
00739                         if i != 0:
00740                             s += indent_str
00741                         s += '%s: %s' % (k, str(v))
00742                         if i < len(val.__dict__) - 1:
00743                             s += '\n'
00744                     return s
00745                 else:
00746                     return indent_str + str(val)
00747 
00748             return print_yaml(val)
00749 
00750         except Exception as ex:
00751             raise
00752 
00753     ### Internal API ###
00754 
00755     @property
00756     def _has_compressed_chunks(self):
00757         if not self._chunk_headers:
00758             return False
00759 
00760         return any((h.compression != Compression.NONE for h in self._chunk_headers.itervalues()))
00761 
00762     @property
00763     def _uncompressed_size(self):
00764         if not self._chunk_headers:
00765             return self.size
00766 
00767         return sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
00768 
00769     def _read_message(self, position, raw=False):
00770         """
00771         Read the message from the given position in the file.
00772         """
00773         self.flush()
00774         return self._reader.seek_and_read_message_data_record(position, raw)
00775 
00776     # Index accessing
00777 
00778     def _get_connections(self, topics=None, connection_filter=None):
00779         """
00780         Yield the connections, optionally filtering by topic and/or connection information.
00781         """
00782         if topics:
00783             if type(topics) is str:
00784                 topics = set([roslib.names.canonicalize_name(topics)])
00785             else:
00786                 topics = set([roslib.names.canonicalize_name(t) for t in topics])
00787 
00788         for c in self._connections.itervalues():
00789             if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics:
00790                 continue
00791             if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header):
00792                 continue
00793             yield c
00794 
00795     def _get_entries(self, connections=None, start_time=None, end_time=None):
00796         """
00797         Yield index entries on the given connections in the given time range.
00798         """
00799         for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time):
00800             if start_time and entry.time < start_time:
00801                 continue
00802             if end_time and entry.time > end_time:
00803                 return
00804             yield entry
00805 
00806     def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
00807         """
00808         Yield index entries on the given connections in the given time range in reverse order.
00809         """
00810         for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()):
00811             if end_time and entry.time > end_time:
00812                 continue
00813             if start_time and entry.time < start_time:
00814                 return
00815             yield entry
00816 
00817     def _get_entry(self, t, connections=None):
00818         """
00819         Return the first index entry on or before the given time on the given connections.
00820         """
00821         indexes = self._get_indexes(connections)
00822 
00823         entry = _IndexEntry(t)
00824 
00825         first_entry = None
00826 
00827         for index in indexes:
00828             i = bisect.bisect_right(index, entry) - 1
00829             if i >= 0:
00830                 index_entry = index[i]
00831                 if first_entry is None or index_entry > first_entry:
00832                     first_entry = index_entry
00833 
00834         return first_entry
00835     
00836     def _get_entry_after(self, t, connections=None):
00837         """
00838         Return the first index entry after the given time on the given connections.
00839         """
00840         indexes = self._get_indexes(connections)
00841 
00842         entry = _IndexEntry(t)
00843 
00844         first_entry = None
00845 
00846         for index in indexes:
00847             i = bisect.bisect_right(index, entry) 
00848             if i <= len(index) - 1:
00849                 index_entry = index[i]
00850                 if first_entry is None or index_entry < first_entry:
00851                     first_entry = index_entry
00852 
00853         return first_entry
00854 
00855     def _get_indexes(self, connections):
00856         """
00857         Get the indexes for the given connections.
00858         """
00859         if not self._connection_indexes_read:
00860             self._reader._read_connection_index_records()
00861 
00862         if connections is None:
00863             return self._connection_indexes.itervalues()
00864         else:
00865             return (self._connection_indexes[c.id] for c in connections)
00866 
00867     ### Implementation ###
00868 
00869     def _clear_index(self):
00870         self._connection_indexes_read = False
00871         self._connection_indexes      = {}    # id    -> IndexEntry[] (1.2+)
00872 
00873         self._topic_connections  = {}    # topic -> connection_id
00874         self._connections        = {}    # id -> ConnectionInfo
00875         self._connection_count   = 0     # (2.0)
00876         self._chunk_count        = 0     # (2.0)
00877         self._chunks             = []    # ChunkInfo[] (2.0)
00878         self._chunk_headers      = {}    # chunk_pos -> ChunkHeader (2.0)
00879 
00880         self._chunk_open                    = False
00881         self._curr_chunk_info               = None
00882         self._curr_chunk_data_pos           = None
00883         self._curr_chunk_connection_indexes = {}
00884     
00885     def _open(self, f, mode, allow_unindexed):
00886         if not f:
00887             raise ValueError('filename (or stream) is invalid')
00888 
00889         try:
00890             if   mode == 'r': self._open_read(f, allow_unindexed)
00891             elif mode == 'w': self._open_write(f)
00892             elif mode == 'a': self._open_append(f, allow_unindexed)
00893             else:
00894                 raise ValueError('mode "%s" is invalid' % mode)
00895         except struct.error:
00896             raise ROSBagFormatException('error with bag')
00897 
00898     def _open_read(self, f, allow_unindexed):
00899         if isinstance(f, file):
00900             self._file     = f
00901             self._filename = None
00902         else:
00903             self._file     = open(f, 'rb')
00904             self._filename = f        
00905 
00906         self._mode = 'r'
00907 
00908         try:
00909             self._version = self._read_version()
00910         except:
00911             self._version = None
00912             self._close_file()
00913             raise
00914 
00915         try:
00916             self._create_reader()
00917             self._reader.start_reading()
00918         except ROSBagUnindexedException as ex:
00919             if not allow_unindexed:
00920                 self._close_file()
00921                 raise
00922         except:
00923             self._close_file()
00924             raise
00925 
00926     def _open_write(self, f):
00927         if isinstance(f, file):
00928             self._file     = f
00929             self._filename = None
00930         else:
00931             self._file     = open(f, 'w+b')
00932             self._filename = f
00933 
00934         self._mode = 'w'
00935 
00936         self._version = 200
00937 
00938         try:
00939             self._create_reader()
00940             self._start_writing()
00941         except:
00942             self._close_file()
00943             raise
00944 
00945     def _open_append(self, f, allow_unindexed):
00946         if isinstance(f, file):
00947             self._file     = f
00948             self._filename = None
00949         else:        
00950             try:
00951                 # Test if the file already exists
00952                 open(f, 'r').close()
00953 
00954                 # File exists: open in read with update mode
00955                 self._file = open(f, 'r+b')
00956             except IOError:
00957                 # File doesn't exist: open in write mode
00958                 self._file = open(f, 'w+b')
00959         
00960             self._filename = f
00961 
00962         self._mode = 'a'
00963 
00964         try:
00965             self._version = self._read_version()
00966         except:
00967             self._version = None
00968             self._close_file()
00969             raise
00970 
00971         if self._version != 200:
00972             self._file.close()
00973             raise ROSBagException('bag version %d is unsupported for appending' % self._version)
00974 
00975         try:
00976             self._start_appending()
00977         except ROSBagUnindexedException:
00978             if not allow_unindexed:
00979                 self._close_file()
00980                 raise
00981         except:
00982             self._close_file()
00983             raise
00984 
00985     def _close_file(self):
00986         self._file.close()
00987         self._file = None
00988 
00989     def _create_reader(self):
00990         """
00991         @raise ROSBagException: if the bag version is unsupported
00992         """
00993         major_version, minor_version = self._version / 100, self._version % 100
00994         if major_version == 2:
00995             self._reader = _BagReader200(self)
00996         elif major_version == 1:
00997             if minor_version == 1:
00998                 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version)
00999             else:
01000                 # Get the op code of the first record.  If it's FILE_HEADER, then we have an indexed 1.2 bag.
01001                 first_record_pos = self._file.tell()
01002                 header = _read_header(self._file)
01003                 op = _read_uint8_field(header, 'op')
01004                 self._file.seek(first_record_pos)
01005     
01006                 if op == _OP_FILE_HEADER:
01007                     self._reader = _BagReader102_Indexed(self)
01008                 else:
01009                     self._reader = _BagReader102_Unindexed(self)
01010         else:
01011             raise ROSBagException('unknown bag version %d' % self._version)
01012 
01013     def _read_version(self):
01014         """
01015         @raise ROSBagException: if the file is empty, or the version line can't be parsed
01016         """
01017         version_line = self._file.readline().rstrip()
01018         if len(version_line) == 0:
01019             raise ROSBagException('empty file')
01020         
01021         matches = re.match(r"#ROS(.*) V(\d)\.(\d)", version_line)
01022         if matches is None or len(matches.groups()) != 3:
01023             raise ROSBagException('This does not appear to be a bag file')
01024         
01025         version_type, major_version_str, minor_version_str = matches.groups()
01026 
01027         version = int(major_version_str) * 100 + int(minor_version_str)
01028         
01029         return version
01030 
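    # For example, the version line '#ROSBAG V2.0' parses to major 2 and minor 0,
    # giving the integer 2 * 100 + 0 = 200 used throughout this module; a 1.2-format
    # bag yields 102.
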
01031     def _start_writing(self):        
01032         self._file.write(_VERSION + '\n')
01033         self._file_header_pos = self._file.tell()
01034         self._write_file_header_record(0, 0, 0)
01035 
01036     def _stop_writing(self):
01037         # Write the open chunk (if any) to file
01038         self.flush()
01039 
01040         # Remember this location as the start of the index
01041         self._index_data_pos = self._file.tell()
01042 
01043         # Write connection infos
01044         for connection_info in self._connections.itervalues():
01045             self._write_connection_record(connection_info)
01046 
01047         # Write chunk infos
01048         for chunk_info in self._chunks:
01049             self._write_chunk_info_record(chunk_info)
01050 
01051         # Re-write the file header
01052         self._file.seek(self._file_header_pos)
01053         self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
01054 
01055     def _start_appending(self):
01056         self._file_header_pos = self._file.tell()
01057 
01058         self._create_reader()
01059         self._reader.start_reading()
01060 
01061         # Truncate the file to chop off the index
01062         self._file.truncate(self._index_data_pos)
01063         self._reader.index_data_pos = 0
01064     
01065         # Rewrite the file header, clearing the index position (so we know if the index is invalid)
01066         self._file.seek(self._file_header_pos)
01067         self._write_file_header_record(0, 0, 0)
01068 
01069         # Seek to the end of the file
01070         self._file.seek(0, os.SEEK_END)
01071 
01072     def _start_writing_chunk(self, t):
01073         self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t)
01074         self._write_chunk_header(_ChunkHeader(self._compression, 0, 0))
01075         self._curr_chunk_data_pos = self._file.tell()
01076         self._set_compression_mode(self._compression)
01077         self._chunk_open = True
01078     
01079     def _get_chunk_offset(self):
01080         if self._compression == Compression.NONE:
01081             return self._file.tell() - self._curr_chunk_data_pos
01082         else:
01083             return self._output_file.compressed_bytes_in
01084 
01085     def _stop_writing_chunk(self):
01086         # Add this chunk to the index
01087         self._chunks.append(self._curr_chunk_info)
01088 
01089         # Get the uncompressed and compressed sizes
01090         uncompressed_size = self._get_chunk_offset()
01091         self._set_compression_mode(Compression.NONE)
01092         compressed_size = self._file.tell() - self._curr_chunk_data_pos
01093 
01094         # Rewrite the chunk header with the size of the chunk (remembering current offset)
01095         end_of_chunk_pos = self._file.tell()
01096         self._file.seek(self._curr_chunk_info.pos)
01097         chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos)
01098         self._write_chunk_header(chunk_header)
01099         self._chunk_headers[self._curr_chunk_info.pos] = chunk_header
01100 
01101         # Write out the connection indexes and clear them
01102         self._file.seek(end_of_chunk_pos)
01103         for connection_id, entries in self._curr_chunk_connection_indexes.items():
01104             self._write_connection_index_record(connection_id, entries)
01105         self._curr_chunk_connection_indexes.clear()
01106 
01107         # Flag that we're starting a new chunk
01108         self._chunk_open = False
01109 
01110     def _set_compression_mode(self, compression):
01111         # Flush the compressor, if needed
01112         if self._curr_compression != Compression.NONE:
01113             self._output_file.flush()
01114         
01115         # Create the compressor
01116         if compression == Compression.BZ2:
01117             self._output_file = _BZ2CompressorFileFacade(self._file)
01118         elif compression == Compression.NONE:
01119             self._output_file = self._file
01120         else:
01121             raise ROSBagException('unsupported compression type: %s' % compression)
01122 
01123         self._curr_compression = compression
01124 
01125     def _write_file_header_record(self, index_pos, connection_count, chunk_count):
01126         header = {
01127             'op':          _pack_uint8(_OP_FILE_HEADER),
01128             'index_pos':   _pack_uint64(index_pos),
01129             'conn_count':  _pack_uint32(connection_count),
01130             'chunk_count': _pack_uint32(chunk_count)
01131         }
01132         _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
01133 
01134     def _write_connection_record(self, connection_info):
01135         header = {
01136             'op':    _pack_uint8(_OP_CONNECTION),
01137             'topic': connection_info.topic,
01138             'conn':  _pack_uint32(connection_info.id)
01139         }
01140         
01141         _write_header(self._output_file, header)
01142         
01143         _write_header(self._output_file, connection_info.header)
01144 
01145     def _write_message_data_record(self, connection_id, t, serialized_bytes):
01146         header = {
01147             'op':   _pack_uint8(_OP_MSG_DATA),
01148             'conn': _pack_uint32(connection_id),
01149             'time': _pack_time(t)
01150         }
01151         _write_record(self._output_file, header, serialized_bytes)
01152 
01153     def _write_chunk_header(self, chunk_header):
01154         header = {
01155             'op':          _pack_uint8(_OP_CHUNK),
01156             'compression': chunk_header.compression,
01157             'size':        _pack_uint32(chunk_header.uncompressed_size)
01158         }
01159         _write_header(self._file, header)
01160 
01161         self._file.write(_pack_uint32(chunk_header.compressed_size))
01162 
01163     def _write_connection_index_record(self, connection_id, entries):        
01164         header = {
01165             'op':    _pack_uint8(_OP_INDEX_DATA),
01166             'conn':  _pack_uint32(connection_id),
01167             'ver':   _pack_uint32(_INDEX_VERSION),
01168             'count': _pack_uint32(len(entries))
01169         }
01170 
01171         buffer = self._buffer
01172         buffer.seek(0)
01173         buffer.truncate(0)            
01174         for entry in entries:
01175             buffer.write(_pack_time  (entry.time))
01176             buffer.write(_pack_uint32(entry.offset))
01177             
01178         _write_record(self._file, header, buffer.getvalue())            
01179 
01180     def _write_chunk_info_record(self, chunk_info):
01181         header = {
01182             'op':         _pack_uint8 (_OP_CHUNK_INFO),
01183             'ver':        _pack_uint32(_CHUNK_INDEX_VERSION),
01184             'chunk_pos':  _pack_uint64(chunk_info.pos),
01185             'start_time': _pack_time(chunk_info.start_time),
01186             'end_time':   _pack_time(chunk_info.end_time),
01187             'count':      _pack_uint32(len(chunk_info.connection_counts))
01188         }
01189         
01190         buffer = self._buffer
01191         buffer.seek(0)
01192         buffer.truncate(0)
01193         for connection_id, count in chunk_info.connection_counts.items():
01194             buffer.write(_pack_uint32(connection_id))
01195             buffer.write(_pack_uint32(count))
01196 
01197         _write_record(self._file, header, buffer.getvalue())    
01198 
01199 ### Implementation ###
01200 
01201 _message_types = {}   # md5sum -> type
01202 
01203 _OP_MSG_DEF     = 0x01
01204 _OP_MSG_DATA    = 0x02
01205 _OP_FILE_HEADER = 0x03
01206 _OP_INDEX_DATA  = 0x04
01207 _OP_CHUNK       = 0x05
01208 _OP_CHUNK_INFO  = 0x06
01209 _OP_CONNECTION  = 0x07
01210 
01211 _OP_CODES = {
01212     _OP_MSG_DEF:     'MSG_DEF',
01213     _OP_MSG_DATA:    'MSG_DATA',
01214     _OP_FILE_HEADER: 'FILE_HEADER',
01215     _OP_INDEX_DATA:  'INDEX_DATA',
01216     _OP_CHUNK:       'CHUNK',
01217     _OP_CHUNK_INFO:  'CHUNK_INFO',
01218     _OP_CONNECTION:  'CONNECTION'
01219 }
01220 
01221 _VERSION             = '#ROSBAG V2.0'
01222 _FILE_HEADER_LENGTH  = 4096
01223 _INDEX_VERSION       = 1
01224 _CHUNK_INDEX_VERSION = 1
01225 
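# Illustrative sketch (not part of the original source): every record header carries
# an 'op' byte, and _OP_CODES maps it back to a readable name, e.g.
#
#     _OP_CODES[_OP_CHUNK_INFO]   # -> 'CHUNK_INFO'
#     _OP_CODES[0x02]             # -> 'MSG_DATA'
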
01226 class _ConnectionInfo(object):
01227     def __init__(self, id, topic, header):
01228         try:
01229             datatype, md5sum, msg_def = header['type'], header['md5sum'], header['message_definition']
01230         except KeyError as ex:
01231             raise ROSBagFormatException('connection header field %s not found' % str(ex))
01232 
01233         self.id       = id
01234         self.topic    = topic
01235         self.datatype = datatype
01236         self.md5sum   = md5sum
01237         self.msg_def  = msg_def
01238         self.header   = header
01239 
01240     def __str__(self):
01241         return '%d on %s: %s' % (self.id, self.topic, str(self.header))
01242 
01243 class _ChunkInfo(object):
01244     def __init__(self, pos, start_time, end_time):
01245         self.pos        = pos
01246         self.start_time = start_time
01247         self.end_time   = end_time
01248         
01249         self.connection_counts = {}
01250 
01251     def __str__(self):
01252         s  = 'chunk_pos:   %d\n' % self.pos
01253         s += 'start_time:  %s\n' % str(self.start_time)
01254         s += 'end_time:    %s\n' % str(self.end_time)
01255         s += 'connections: %d\n' % len(self.connection_counts)
01256         s += '\n'.join(['  - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()])
01257         return s
01258 
01259 class _ChunkHeader(object):
01260     def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
01261         self.compression       = compression
01262         self.compressed_size   = compressed_size
01263         self.uncompressed_size = uncompressed_size
01264         self.data_pos          = data_pos
01265 
01266     def __str__(self):
01267         if self.uncompressed_size > 0:
01268             ratio = 100 * (float(self.compressed_size) / self.uncompressed_size)
01269             return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio)
01270         else:
01271             return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
01272 
01273 class _IndexEntry(object):
01274     def __init__(self, time):
01275         self.time = time
01276 
01277     def __lt__(self, other):
01278         return self.time < other.time
01279 
01280 class _IndexEntry102(_IndexEntry):
01281     def __init__(self, time, offset):
01282         self.time   = time
01283         self.offset = offset
01284         
01285     @property
01286     def position(self):
01287         return self.offset
01288         
01289     def __str__(self):
01290         return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)
01291 
01292 class _IndexEntry200(_IndexEntry):
01293     def __init__(self, time, chunk_pos, offset):
01294         self.time      = time
01295         self.chunk_pos = chunk_pos
01296         self.offset    = offset
01297 
01298     @property
01299     def position(self):
01300         return (self.chunk_pos, self.offset)
01301 
01302     def __str__(self):
01303         return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
01304     
01305 def _get_message_type(info):
01306     message_type = _message_types.get(info.md5sum)
01307     if message_type is None:
01308         try:
01309             message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype]
01310             if (message_type._md5sum != info.md5sum):
01311                 print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n  Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr)
01312         except genmsg.InvalidMsgSpec:
01313             message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype]
01314             print('WARNING: For type [%s] stored md5sum [%s] has an invalid message definition.' % (info.datatype, info.md5sum), file=sys.stderr)
01315         except genmsg.MsgGenerationException as ex:
01316             raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex)))
01317 
01318         _message_types[info.md5sum] = message_type
01319 
01320     return message_type
01321 
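# --- Editor's note: a hedged illustration (not part of the original source)
# of the dynamic type generation used by _get_message_type above.  The type
# name and its one-line definition are illustrative (std_msgs/String).
def _example_dynamic_message_class():
    """Hedged sketch: build a message class from its textual definition."""
    String = genpy.dynamic.generate_dynamic('std_msgs/String', 'string data')['std_msgs/String']
    # genpy attaches the type name and md5sum as class attributes
    return String._type, String._md5sum
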
01322 def _read_uint8 (f): return _unpack_uint8 (f.read(1))
01323 def _read_uint32(f): return _unpack_uint32(f.read(4))
01324 def _read_uint64(f): return _unpack_uint64(f.read(8))
01325 def _read_time  (f): return _unpack_time  (f.read(8))
01326 
01327 def _unpack_uint8(v):  return struct.unpack('<B', v)[0]
01328 def _unpack_uint32(v): return struct.unpack('<L', v)[0]
01329 def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
01330 def _unpack_time(v):   return rospy.Time(*struct.unpack('<LL', v))
01331 
01332 def _pack_uint8(v):  return struct.pack('<B', v)
01333 def _pack_uint32(v): return struct.pack('<L', v)
01334 def _pack_uint64(v): return struct.pack('<Q', v)
01335 def _pack_time(v):   return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
01336 
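# --- Editor's note: a hedged round-trip check (not part of the original
# source) for the little-endian pack/unpack helpers above.
def _example_pack_roundtrip():
    """Hedged sketch: values survive a pack/unpack round trip."""
    assert _unpack_uint8 (_pack_uint8 (7))        == 7
    assert _unpack_uint32(_pack_uint32(1234))     == 1234
    assert _unpack_uint64(_pack_uint64(2 ** 40))  == 2 ** 40
    t = rospy.Time(1440765232, 500)
    assert _unpack_time(_pack_time(t))            == t
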
01337 def _read(f, size):
01338     data = f.read(size)
01339     if len(data) != size:
01340         raise ROSBagException('expecting %d bytes, read %d' % (size, len(data)))   
01341     return data
01342 
01343 def _skip_record(f):
01344     _skip_sized(f)  # skip header
01345     _skip_sized(f)  # skip data
01346 
01347 def _skip_sized(f):
01348     size = _read_uint32(f)
01349     f.seek(size, os.SEEK_CUR)
01350 
01351 def _read_sized(f):
01352     try:
01353         size = _read_uint32(f)
01354     except struct.error as ex:
01355         raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))
01356     return _read(f, size)
01357 
01358 def _write_sized(f, v):
01359     f.write(_pack_uint32(len(v)))
01360     f.write(v)
01361 
01362 def _read_field(header, field, unpack_fn):
01363     if field not in header:
01364         raise ROSBagFormatException('expected "%s" field in record' % field)
01365     
01366     try:
01367         value = unpack_fn(header[field])
01368     except Exception as ex:
01369         raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex)))
01370     
01371     return value
01372 
01373 def _read_str_field   (header, field): return _read_field(header, field, lambda v: v)
01374 def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
01375 def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
01376 def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
01377 def _read_time_field  (header, field): return _read_field(header, field, _unpack_time)
01378 
01379 def _write_record(f, header, data='', padded_size=None):
01380     header_str = _write_header(f, header)
01381 
01382     if padded_size is not None:
01383         header_len = len(header_str)
01384         if header_len < padded_size:
01385             data = ' ' * (padded_size - header_len)
01386         else:
01387             data = ''
01388 
01389     _write_sized(f, data)
01390 
01391 def _write_header(f, header):
01392     header_str = ''.join([_pack_uint32(len(k) + 1 + len(v)) + k + '=' + v for k, v in header.items()])
01393     _write_sized(f, header_str)
01394     return header_str
01395 
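# --- Editor's note: a hedged sketch (not part of the original source) of the
# header wire format produced by _write_header above and parsed by
# _read_header below: each field is a 4-byte little-endian length followed by
# 'name=value', and the whole block is prefixed by its own 4-byte length.
# Like the rest of this module it assumes Python 2 str/bytes semantics.
def _example_encode_header(fields):
    """Hedged sketch: encode a {name: value} dict as a record header."""
    body = ''
    for name, value in fields.items():
        field = name + '=' + value
        body += _pack_uint32(len(field)) + field
    return _pack_uint32(len(body)) + body
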
01396 def _read_header(f, req_op=None):
01397     bag_pos = f.tell()
01398 
01399     # Read header
01400     try:
01401         header = _read_sized(f)
01402     except ROSBagException as ex:
01403         raise ROSBagFormatException('Error reading header: %s' % str(ex))
01404 
01405     # Parse header into a dict
01406     header_dict = {}
01407     while header != '':
01408         # Read size
01409         if len(header) < 4:
01410             raise ROSBagFormatException('Error reading header field')           
01411         (size,) = struct.unpack('<L', header[:4])                          # @todo reindex: catch struct.error
01412         header = header[4:]
01413 
01414         # Read bytes
01415         if len(header) < size:
01416             raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header)))
01417         (name, sep, value) = header[:size].partition('=')
01418         if sep == '':
01419             raise ROSBagFormatException('Error reading header field')
01420 
01421         header_dict[name] = value                                          # @todo reindex: raise exception on empty name
01422         
01423         header = header[size:]
01424 
01425     # Check the op code of the header, if supplied
01426     if req_op is not None:
01427         op = _read_uint8_field(header_dict, 'op')
01428         if req_op != op:
01429             raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op]))
01430 
01431     return header_dict
01432 
01433 def _peek_next_header_op(f):
01434     pos = f.tell()
01435     header = _read_header(f)
01436     op = _read_uint8_field(header, 'op')
01437     f.seek(pos)
01438     return op
01439 
01440 def _read_record_data(f):
01441     try:
01442         record_data = _read_sized(f)
01443     except ROSBagException as ex:
01444         raise ROSBagFormatException('Error reading record data: %s' % str(ex))
01445 
01446     return record_data
01447 
01448 class _BagReader(object):
01449     def __init__(self, bag):
01450         self.bag = bag
01451         
01452     def start_reading(self):
01453         raise NotImplementedError()
01454 
01455     def read_messages(self, topics, start_time, end_time, connection_filter, raw):
01456         raise NotImplementedError()
01457 
01458     def reindex(self):
01459         raise NotImplementedError()
01460 
01461 class _BagReader102_Unindexed(_BagReader):
01462     """
01463     Support class for reading unindexed v1.2 bag files.
01464     """
01465     def __init__(self, bag):
01466         _BagReader.__init__(self, bag)
01467         
01468     def start_reading(self):
01469         self.bag._file_header_pos = self.bag._file.tell()
01470 
01471     def reindex(self):
01472         """Generates all bag index information by rereading the message records."""
01473         f = self.bag._file
01474         
01475         total_bytes = self.bag.size
01476         
01477         # Re-read the file header to get to the start of the first message
01478         self.bag._file.seek(self.bag._file_header_pos)
01479 
01480         offset = f.tell()
01481 
01482         # Read message definition and data records
01483         while offset < total_bytes:
01484             yield offset
01485             
01486             op = _peek_next_header_op(f)
01487 
01488             if op == _OP_MSG_DEF:
01489                 connection_info = self.read_message_definition_record()
01490     
01491                 if connection_info.topic not in self.bag._topic_connections:
01492                     self.bag._topic_connections[connection_info.topic] = connection_info.id
01493                     self.bag._connections[connection_info.id]          = connection_info
01494                     self.bag._connection_indexes[connection_info.id]   = []
01495 
01496             elif op == _OP_MSG_DATA:
01497                 # Read the topic and timestamp from the header
01498                 header = _read_header(f)
01499                 
01500                 topic = _read_str_field(header, 'topic')
01501                 secs  = _read_uint32_field(header, 'sec')
01502                 nsecs = _read_uint32_field(header, 'nsec')
01503                 t = genpy.Time(secs, nsecs)
01504 
01505                 if topic not in self.bag._topic_connections:
01506                     datatype = _read_str_field(header, 'type')
01507                     self._create_connection_info_for_datatype(topic, datatype)
01508 
01509                 connection_id = self.bag._topic_connections[topic]
01510                 info = self.bag._connections[connection_id]
01511 
01512                 # Skip over the message content
01513                 _skip_sized(f)
01514 
01515                 # Insert the message entry (in order) into the connection index
01516                 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
01517             
01518             offset = f.tell()
01519 
01520     def read_messages(self, topics, start_time, end_time, topic_filter, raw):
01521         f = self.bag._file
01522 
01523         f.seek(self.bag._file_header_pos)
01524 
01525         while True:
01526             # Read MSG_DEF records
01527             while True:
01528                 position = f.tell()
01529                 
01530                 try:
01531                     header = _read_header(f)
01532                 except Exception:
01533                     return
01534 
01535                 op = _read_uint8_field(header, 'op')
01536                 if op != _OP_MSG_DEF:
01537                     break
01538 
01539                 connection_info = self.read_message_definition_record(header)
01540                 
01541                 if connection_info.topic not in self.bag._topic_connections:
01542                     self.bag._topic_connections[connection_info.topic] = connection_info.id
01543 
01544                 self.bag._connections[connection_info.id] = connection_info
01545 
01546             # Check that we have a MSG_DATA record
01547             if op != _OP_MSG_DATA:
01548                 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
01549 
01550             topic = _read_str_field(header, 'topic')
01551             
01552             if topic not in self.bag._topic_connections:
01553                 datatype = _read_str_field(header, 'type')
01554                 self._create_connection_info_for_datatype(topic, datatype)
01555 
01556             connection_id = self.bag._topic_connections[topic]
01557             info = self.bag._connections[connection_id]
01558     
01559             # Get the message type
01560             try:
01561                 msg_type = _get_message_type(info)
01562             except KeyError:
01563                 raise ROSBagException('Cannot deserialize messages of type [%s].  Message was not preceded in bag file by definition' % info.datatype)
01564 
01565             # Get the timestamp
01566             secs  = _read_uint32_field(header, 'sec')
01567             nsecs = _read_uint32_field(header, 'nsec')
01568             t = genpy.Time(secs, nsecs)
01569 
01570             # Read the message content
01571             data = _read_record_data(f)
01572             
01573             if raw:
01574                 msg = (info.datatype, data, info.md5sum, position, msg_type)
01575             else:
01576                 # Deserialize the message
01577                 msg = msg_type()
01578                 msg.deserialize(data)
01579 
01580             yield (topic, msg, t)
01581 
01582         self.bag._connection_indexes_read = True
01583 
01584     def _create_connection_info_for_datatype(self, topic, datatype):
01585         for c in self.bag._connections.values():
01586             if c.datatype == datatype:
01587                 connection_id     = len(self.bag._connections)
01588                 connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] }
01589                 connection_info   = _ConnectionInfo(connection_id, topic, connection_header)
01590 
01591                 self.bag._topic_connections[topic]          = connection_id
01592                 self.bag._connections[connection_id]        = connection_info
01593                 self.bag._connection_indexes[connection_id] = []
01594                 return
01595 
01596         raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
01597 
01598     def read_message_definition_record(self, header=None):
01599         if not header:
01600             header = _read_header(self.bag._file, _OP_MSG_DEF)
01601 
01602         topic    = _read_str_field(header, 'topic')
01603         datatype = _read_str_field(header, 'type')
01604         md5sum   = _read_str_field(header, 'md5')
01605         msg_def  = _read_str_field(header, 'def')
01606 
01607         _skip_sized(self.bag._file)  # skip the record data
01608 
01609         connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def }
01610 
01611         id = len(self.bag._connections)
01612 
01613         return _ConnectionInfo(id, topic, connection_header)
01614 
01615 class _BagReader102_Indexed(_BagReader102_Unindexed):
01616     """
01617     Support class for reading indexed v1.2 bag files.
01618     """
01619     def __init__(self, bag):
01620         _BagReader.__init__(self, bag)
01621 
01622     def read_messages(self, topics, start_time, end_time, connection_filter, raw):
01623         connections = self.bag._get_connections(topics, connection_filter)
01624         for entry in self.bag._get_entries(connections, start_time, end_time):
01625             yield self.seek_and_read_message_data_record(entry.offset, raw)
01626 
01627     def reindex(self):
01628         """Generates all bag index information by rereading the message records."""
01629         f = self.bag._file
01630         
01631         total_bytes = self.bag.size
01632         
01633         # Re-read the file header to get to the start of the first message
01634         self.bag._file.seek(self.bag._file_header_pos)
01635         self.read_file_header_record()
01636 
01637         offset = f.tell()
01638 
01639         # Read message definition and data records
01640         while offset < total_bytes:
01641             yield offset
01642             
01643             op = _peek_next_header_op(f)
01644 
01645             if op == _OP_MSG_DEF:
01646                 connection_info = self.read_message_definition_record()
01647     
01648                 if connection_info.topic not in self.bag._topic_connections:
01649                     self.bag._topic_connections[connection_info.topic] = connection_info.id
01650                     self.bag._connections[connection_info.id] = connection_info
01651                     self.bag._connection_indexes[connection_info.id] = []
01652 
01653             elif op == _OP_MSG_DATA:
01654                 # Read the topic and timestamp from the header
01655                 header = _read_header(f)
01656                 
01657                 topic = _read_str_field(header, 'topic')
01658                 secs  = _read_uint32_field(header, 'sec')
01659                 nsecs = _read_uint32_field(header, 'nsec')
01660                 t = genpy.Time(secs, nsecs)
01661 
01662                 if topic not in self.bag._topic_connections:
01663                     datatype = _read_str_field(header, 'type')
01664                     self._create_connection_info_for_datatype(topic, datatype)
01665 
01666                 connection_id = self.bag._topic_connections[topic]
01667                 info = self.bag._connections[connection_id]
01668 
01669                 # Skip over the message content
01670                 _skip_sized(f)
01671 
01672                 # Insert the message entry (in order) into the connection index
01673                 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
01674 
01675             elif op == _OP_INDEX_DATA:
01676                 _skip_record(f)
01677 
01678             offset = f.tell()
01679 
01680     def start_reading(self):
01681         try:
01682             # Read the file header
01683             self.read_file_header_record()
01684             
01685             total_bytes = self.bag.size
01686     
01687             # Check if the index position has been written, i.e. the bag was closed successfully
01688             if self.bag._index_data_pos == 0:
01689                 raise ROSBagUnindexedException()
01690     
01691             # Seek to the beginning of the topic index records
01692             self.bag._file.seek(self.bag._index_data_pos)
01693 
01694             # Read the topic indexes
01695             topic_indexes = {}
01696             while True:
01697                 pos = self.bag._file.tell()
01698                 if pos >= total_bytes:
01699                     break
01700 
01701                 topic, index = self.read_topic_index_record()
01702 
01703                 topic_indexes[topic] = index
01704 
01705             # Read the message definition records (one for each topic)
01706             for topic, index in topic_indexes.items():
01707                 self.bag._file.seek(index[0].offset)
01708     
01709                 connection_info = self.read_message_definition_record()
01710     
01711                 if connection_info.topic not in self.bag._topic_connections:
01712                     self.bag._topic_connections[connection_info.topic] = connection_info.id
01713                 self.bag._connections[connection_info.id] = connection_info
01714     
01715                 self.bag._connection_indexes[connection_info.id] = index
01716 
01717             self.bag._connection_indexes_read = True
01718 
01719         except Exception as ex:
01720             raise ROSBagUnindexedException()
01721 
01722     def read_file_header_record(self):
01723         self.bag._file_header_pos = self.bag._file.tell()
01724 
01725         header = _read_header(self.bag._file, _OP_FILE_HEADER)
01726 
01727         self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
01728 
01729         _skip_sized(self.bag._file)  # skip the record data, i.e. padding
01730 
01731     def read_topic_index_record(self):
01732         f = self.bag._file
01733 
01734         header = _read_header(f, _OP_INDEX_DATA)
01735 
01736         index_version = _read_uint32_field(header, 'ver')
01737         topic         = _read_str_field   (header, 'topic')
01738         count         = _read_uint32_field(header, 'count')
01739         
01740         if index_version != 0:
01741             raise ROSBagFormatException('expecting index version 0, got %d' % index_version)
01742     
01743         _read_uint32(f) # skip the record data size
01744 
01745         topic_index = []
01746                 
01747         for i in range(count):
01748             time   = _read_time  (f)
01749             offset = _read_uint64(f)
01750             
01751             topic_index.append(_IndexEntry102(time, offset))
01752             
01753         return (topic, topic_index)
01754 
01755     def seek_and_read_message_data_record(self, position, raw):
01756         f = self.bag._file
01757 
01758         # Seek to the message position
01759         f.seek(position)
01760 
01761         # Skip any MSG_DEF records
01762         while True:
01763             header = _read_header(f)
01764             op = _read_uint8_field(header, 'op')
01765             if op != _OP_MSG_DEF:
01766                 break
01767             _skip_sized(f)
01768 
01769         # Check that we have a MSG_DATA record
01770         if op != _OP_MSG_DATA:
01771             raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
01772         
01773         topic = _read_str_field(header, 'topic')
01774 
01775         connection_id = self.bag._topic_connections[topic]
01776         info = self.bag._connections[connection_id]
01777 
01778         # Get the message type
01779         try:
01780             msg_type = _get_message_type(info)
01781         except KeyError:
01782             raise ROSBagException('Cannot deserialize messages of type [%s].  Message was not preceded in bag file by definition' % info.datatype)
01783 
01784         # Get the timestamp
01785         secs  = _read_uint32_field(header, 'sec')
01786         nsecs = _read_uint32_field(header, 'nsec')
01787         t = genpy.Time(secs, nsecs)
01788 
01789         # Read the message content
01790         data = _read_record_data(f)
01791         
01792         if raw:
01793             msg = info.datatype, data, info.md5sum, position, msg_type
01794         else:
01795             # Deserialize the message
01796             msg = msg_type()
01797             msg.deserialize(data)
01798         
01799         return (topic, msg, t)
01800 
01801 class _BagReader200(_BagReader):
01802     """
01803     Support class for reading v2.0 bag files.
01804     """
01805     def __init__(self, bag):
01806         _BagReader.__init__(self, bag)
01807         
01808         self.decompressed_chunk_pos = None
01809         self.decompressed_chunk     = None
01810         self.decompressed_chunk_io  = None
01811 
01812     def reindex(self):
01813         """
01814         Generates all bag index information by rereading the chunks.
01815         Assumes the file header has been read.
01816         """
01817         f = self.bag._file
01818 
01819         f.seek(0, os.SEEK_END)
01820         total_bytes = f.tell()
01821 
01822         # Read any connection records from after the chunk section.
01823         # This is to workaround a bug in rosbag record --split (fixed in r10390)
01824         # where connection records weren't always being written inside the chunk.
01825         self._read_terminal_connection_records()
01826 
01827         # Re-read the file header to get to the start of the first chunk
01828         self.bag._file.seek(self.bag._file_header_pos)
01829         self.read_file_header_record()
01830 
01831         trunc_pos = None
01832 
01833         while True:
01834             chunk_pos = f.tell()
01835             if chunk_pos >= total_bytes:
01836                 break
01837             
01838             yield chunk_pos
01839 
01840             try:
01841                 self._reindex_read_chunk(f, chunk_pos, total_bytes)
01842             except Exception as ex:
01843                 break
01844             
01845             trunc_pos = f.tell()
01846 
01847         if trunc_pos and trunc_pos < total_bytes:
01848             f.truncate(trunc_pos)
01849 
01850     def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
01851         # Read the chunk header
01852         chunk_header = self.read_chunk_header()
01853 
01854         # If the chunk header size is 0, then the chunk wasn't correctly terminated - we're done
01855         if chunk_header.compressed_size == 0:
01856             raise ROSBagException('unterminated chunk at %d' % chunk_pos)
01857 
01858         if chunk_header.compression == Compression.NONE:
01859             chunk_file = f
01860         else:
01861             # Read the compressed chunk
01862             compressed_chunk = _read(f, chunk_header.compressed_size)
01863 
01864             # Decompress it
01865             if chunk_header.compression == Compression.BZ2:
01866                 self.decompressed_chunk = bz2.decompress(compressed_chunk)
01867             else:
01868                 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
01869 
01870             if self.decompressed_chunk_io:
01871                 self.decompressed_chunk_io.close()
01872             self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
01873 
01874             chunk_file = self.decompressed_chunk_io
01875 
01876         # Read chunk connection and message records
01877         self.bag._curr_chunk_info = None
01878 
01879         if chunk_header.compression == Compression.NONE:
01880             offset = chunk_file.tell() - chunk_pos
01881         else:
01882             offset = chunk_file.tell()
01883 
01884         expected_index_length = 0
01885 
01886         while offset < chunk_header.uncompressed_size:
01887             op = _peek_next_header_op(chunk_file)
01888 
01889             if op == _OP_CONNECTION:
01890                 connection_info = self.read_connection_record(chunk_file)
01891 
01892                 if connection_info.id not in self.bag._connections:
01893                     self.bag._connections[connection_info.id] = connection_info
01894                 if connection_info.id not in self.bag._connection_indexes:
01895                     self.bag._connection_indexes[connection_info.id] = []
01896 
01897             elif op == _OP_MSG_DATA:
01898                 # Read the connection id and timestamp from the header
01899                 header = _read_header(chunk_file)
01900 
01901                 connection_id = _read_uint32_field(header, 'conn')
01902                 t             = _read_time_field  (header, 'time')
01903 
01904                 # Update the chunk info with this timestamp
01905                 if not self.bag._curr_chunk_info:
01906                     self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t)
01907                 else:
01908                     if t > self.bag._curr_chunk_info.end_time:
01909                         self.bag._curr_chunk_info.end_time = t
01910                     elif t < self.bag._curr_chunk_info.start_time:
01911                         self.bag._curr_chunk_info.start_time = t
01912                 if connection_id in self.bag._curr_chunk_info.connection_counts:
01913                     self.bag._curr_chunk_info.connection_counts[connection_id] += 1
01914                 else:
01915                     self.bag._curr_chunk_info.connection_counts[connection_id] = 1
01916 
01917                 # Skip over the message content
01918                 _skip_sized(chunk_file)
01919 
01920                 # Insert the message entry (in order) into the connection index
01921                 if connection_id not in self.bag._connection_indexes:
01922                     raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos))
01923                 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset)) 
01924 
01925                 expected_index_length += 1
01926 
01927             else:
01928                 # Unknown record type so skip
01929                 _skip_record(chunk_file)
01930 
01931             if chunk_header.compression == Compression.NONE:
01932                 offset = chunk_file.tell() - chunk_pos
01933             else:
01934                 offset = chunk_file.tell()
01935 
01936         # Skip over index records, connection records and chunk info records
01937         next_op = _peek_next_header_op(f)
01938         
01939         total_index_length = 0
01940         
01941         while next_op != _OP_CHUNK:
01942             if next_op == _OP_INDEX_DATA:
01943                 # Bug workaround: C Turtle bags (pre-1.1.15) were written with an incorrect data length
01944                 _, index = self.read_connection_index_record()
01945                 total_index_length += len(index)
01946             else:
01947                 _skip_record(f)
01948 
01949             if f.tell() >= total_bytes:
01950                 if total_index_length != expected_index_length:
01951                     raise ROSBagException('index size mismatch: %d entries found, %d expected' % (total_index_length, expected_index_length))
01952                 break
01953 
01954             next_op = _peek_next_header_op(f)
01955 
01956         # Chunk was read correctly - store info
01957         self.bag._chunk_headers[chunk_pos] = chunk_header
01958         self.bag._chunks.append(self.bag._curr_chunk_info)
01959 
01960     def _read_terminal_connection_records(self):
01961         b, f, r = self.bag, self.bag._file, self.bag._reader
01962 
01963         # Seek to the first record after FILE_HEADER
01964         f.seek(b._file_header_pos)
01965         r.read_file_header_record()
01966 
01967         # Advance to the first CONNECTION
01968         if self._advance_to_next_record(_OP_CONNECTION):
01969             # Read the CONNECTION records
01970             while True:
01971                 connection_info = r.read_connection_record(f)
01972 
01973                 b._connections[connection_info.id] = connection_info
01974                 b._connection_indexes[connection_info.id] = []
01975 
01976                 next_op = _peek_next_header_op(f)
01977                 if next_op != _OP_CONNECTION:
01978                     break
01979 
01980     def _advance_to_next_record(self, op):
01981         b, f = self.bag, self.bag._file
01982 
01983         try:
01984             while True:
01985                 next_op = _peek_next_header_op(f)
01986                 if next_op == op:
01987                     break
01988 
01989                 if next_op == _OP_INDEX_DATA:
01990                     # Workaround the possible invalid data length in INDEX_DATA records
01991 
01992                     # read_connection_index_record() requires _curr_chunk_info is set
01993                     if b._curr_chunk_info is None:
01994                         b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1))
01995 
01996                     b._reader.read_connection_index_record()
01997                 else:
01998                     _skip_record(f)
01999 
02000             return True
02001 
02002         except Exception as ex:
02003             return False
02004 
02005     def start_reading(self):
02006         try:
02007             # Read the file header
02008             self.read_file_header_record()
02009     
02010             # Check if the index position has been written, i.e. the bag was closed successfully
02011             if self.bag._index_data_pos == 0:
02012                 raise ROSBagUnindexedException()
02013     
02014             # Seek to the end of the chunks
02015             self.bag._file.seek(self.bag._index_data_pos)
02016 
02017             # Read the connection records
02018             self.bag._connection_indexes = {}
02019             for i in range(self.bag._connection_count):
02020                 connection_info = self.read_connection_record(self.bag._file)
02021                 self.bag._connections[connection_info.id] = connection_info
02022                 self.bag._connection_indexes[connection_info.id] = []
02023 
02024             # Read the chunk info records
02025             self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)]
02026     
02027             # Read the chunk headers
02028             self.bag._chunk_headers = {}
02029             for chunk_info in self.bag._chunks:
02030                 self.bag._file.seek(chunk_info.pos)
02031                 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header()
02032 
02033             if not self.bag._skip_index:
02034                 self._read_connection_index_records()
02035 
02036         except Exception as ex:
02037             raise ROSBagUnindexedException()
02038 
02039     def _read_connection_index_records(self):
02040         for chunk_info in self.bag._chunks:
02041             self.bag._file.seek(chunk_info.pos)
02042             _skip_record(self.bag._file)
02043 
02044             self.bag._curr_chunk_info = chunk_info
02045             for i in range(len(chunk_info.connection_counts)):
02046                 connection_id, index = self.read_connection_index_record()
02047                 self.bag._connection_indexes[connection_id].extend(index)
02048 
02049         # Remove any connections with no entries
02050         # This is a workaround for a bug where connection records were being written for
02051         # connections which had no messages in the bag
02052         orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index]
02053         for id in orphan_connection_ids:
02054             del self.bag._connections[id]
02055             del self.bag._connection_indexes[id]
02056 
02057         self.bag._connection_indexes_read = True
02058 
02059     def read_messages(self, topics, start_time, end_time, connection_filter, raw):
02060         connections = self.bag._get_connections(topics, connection_filter)
02061         for entry in self.bag._get_entries(connections, start_time, end_time):
02062             yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
02063 
02064     ###
02065 
02066     def read_file_header_record(self):
02067         self.bag._file_header_pos = self.bag._file.tell()
02068 
02069         header = _read_header(self.bag._file, _OP_FILE_HEADER)
02070 
02071         self.bag._index_data_pos   = _read_uint64_field(header, 'index_pos')
02072         self.bag._chunk_count      = _read_uint32_field(header, 'chunk_count')
02073         self.bag._connection_count = _read_uint32_field(header, 'conn_count')
02074 
02075         _skip_sized(self.bag._file)  # skip over the record data, i.e. padding
02076 
02077     def read_connection_record(self, f):
02078         header = _read_header(f, _OP_CONNECTION)
02079 
02080         conn_id = _read_uint32_field(header, 'conn')
02081         topic   = _read_str_field   (header, 'topic')
02082 
02083         connection_header = _read_header(f)
02084 
02085         return _ConnectionInfo(conn_id, topic, connection_header)
02086 
02087     def read_chunk_info_record(self):
02088         f = self.bag._file
02089         
02090         header = _read_header(f, _OP_CHUNK_INFO)
02091 
02092         chunk_info_version = _read_uint32_field(header, 'ver')
02093         
02094         if chunk_info_version == 1:
02095             chunk_pos        = _read_uint64_field(header, 'chunk_pos')
02096             start_time       = _read_time_field  (header, 'start_time')
02097             end_time         = _read_time_field  (header, 'end_time')
02098             connection_count = _read_uint32_field(header, 'count') 
02099 
02100             chunk_info = _ChunkInfo(chunk_pos, start_time, end_time)
02101 
02102             _read_uint32(f)   # skip the record data size
02103 
02104             for i in range(connection_count):
02105                 connection_id = _read_uint32(f)
02106                 count         = _read_uint32(f)
02107     
02108                 chunk_info.connection_counts[connection_id] = count
02109                 
02110             return chunk_info
02111         else:
02112             raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
02113 
02114     def read_chunk_header(self):
02115         header = _read_header(self.bag._file, _OP_CHUNK)
02116 
02117         compression       = _read_str_field   (header, 'compression')
02118         uncompressed_size = _read_uint32_field(header, 'size')
02119 
02120         compressed_size = _read_uint32(self.bag._file)  # read the record data size
02121         
02122         data_pos = self.bag._file.tell()
02123 
02124         return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
02125 
02126     def read_connection_index_record(self):
02127         f = self.bag._file
02128 
02129         header = _read_header(f, _OP_INDEX_DATA)
02130         
02131         index_version = _read_uint32_field(header, 'ver')
02132         connection_id = _read_uint32_field(header, 'conn')
02133         count         = _read_uint32_field(header, 'count')
02134         
02135         if index_version != 1:
02136             raise ROSBagFormatException('expecting index version 1, got %d' % index_version)
02137     
02138         record_size = _read_uint32(f) # skip the record data size
02139 
02140         index = []
02141                 
02142         for i in range(count):
02143             time   = _read_time  (f)
02144             offset = _read_uint32(f)
02145             
02146             index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset))
02147 
02148         return (connection_id, index)
02149 
02150     def seek_and_read_message_data_record(self, position, raw):
02151         chunk_pos, offset = position
02152 
02153         chunk_header = self.bag._chunk_headers.get(chunk_pos)
02154         if chunk_header is None:
02155             raise ROSBagException('no chunk at position %d' % chunk_pos)
02156 
02157         if chunk_header.compression == Compression.NONE:
02158             f = self.bag._file
02159             f.seek(chunk_header.data_pos + offset)
02160         else:
02161             if self.decompressed_chunk_pos != chunk_pos:
02162                 # Seek to the chunk data, read and decompress
02163                 self.bag._file.seek(chunk_header.data_pos)
02164                 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size)
02165 
02166                 if chunk_header.compression == Compression.BZ2:
02167                     self.decompressed_chunk = bz2.decompress(compressed_chunk)
02168                 else:
02169                     raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
02170                 
02171                 self.decompressed_chunk_pos = chunk_pos
02172 
02173                 if self.decompressed_chunk_io:
02174                     self.decompressed_chunk_io.close()
02175                 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
02176 
02177             f = self.decompressed_chunk_io
02178             f.seek(offset)
02179 
02180         # Skip any CONNECTION records
02181         while True:
02182             header = _read_header(f)
02183             op = _read_uint8_field(header, 'op')
02184             if op != _OP_CONNECTION:
02185                 break
02186             _skip_sized(f)
02187 
02188         # Check that we have a MSG_DATA record
02189         if op != _OP_MSG_DATA:
02190             raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
02191 
02192         connection_id = _read_uint32_field(header, 'conn')
02193         t             = _read_time_field  (header, 'time')
02194 
02195         # Get the message type
02196         connection_info = self.bag._connections[connection_id]
02197         try:
02198             msg_type = _get_message_type(connection_info)
02199         except KeyError:
02200             raise ROSBagException('Cannot deserialize messages of type [%s].  Message was not preceded in bag by definition' % connection_info.datatype)
02201 
02202         # Read the message content
02203         data = _read_record_data(f)
02204         
02205         # Deserialize the message
02206         if raw:
02207             msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type
02208         else:
02209             msg = msg_type()
02210             msg.deserialize(data)
02211         
02212         return (connection_info.topic, msg, t)
02213 
02214 def _time_to_str(secs):
02215     secs_frac = secs - int(secs) 
02216     secs_frac_str = ('%.2f' % secs_frac)[1:]
02217 
02218     return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
02219 
02220 def _human_readable_size(size):
02221     multiple = 1024.0
02222     for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
02223         size /= multiple
02224         if size < multiple:
02225             return '%.1f %s' % (size, suffix)
02226 
02227     return '-'
02228 
02229 def _human_readable_frequency(freq):
02230     multiple = 1000.0
02231     for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']:
02232         if freq < multiple:
02233             return '%.1f %s' % (freq, suffix)
02234         freq /= multiple
02235 
02236     return '-'
02237 
02238 ## See http://code.activestate.com/recipes/511509
02239 def _mergesort(list_of_lists, key=None):
02240     """
02241     Perform an N-way merge operation on sorted lists.
02242 
02243     @param list_of_lists: an iterable of iterables of sorted elements
02244     (sorted either naturally or by C{key})
02245     @param key: specify sort key function (like C{sort()}, C{sorted()})
02247 
02248     Yields tuples of the form C{(item, iterator)}, where the iterator is the
02249     built-in list iterator or something you pass in, if you pre-generate the
02250     iterators.
02251 
02252     This is a stable merge; complexity O(N lg N)
02253 
02254     Examples::
02255 
02256     print list(x[0] for x in mergesort([[1,2,3,4],
02257                                         [2,3.5,3.7,4.5,6,7],
02258                                         [2.6,3.6,6.6,9]]))
02259     [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
02260 
02261     # note stability
02262     print list(x[0] for x in mergesort([[1,2,3,4],
02263                                         [2,3.5,3.7,4.5,6,7],
02264                                         [2.6,3.6,6.6,9]], key=int))
02265     [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
02266 
02267     print list(x[0] for x in mergesort([[4,3,2,1],
02268                                         [7,6.5,4,3.7,3.3,1.9],
02269                                         [9,8.6,7.6,6.6,5.5,4.4,3.3]],
02270                                         key=lambda x: -x))
02271     [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1]
02272     """
02273 
02274     heap = []
02275     for i, itr in enumerate(iter(pl) for pl in list_of_lists):
02276         try:
02277             item = next(itr)
02278             toadd = (key(item), i, item, itr) if key else (item, i, itr)
02279             heap.append(toadd)
02280         except StopIteration:
02281             pass
02282     heapq.heapify(heap)
02283 
02284     if key:
02285         while heap:
02286             _, idx, item, itr = heap[0]
02287             yield item, itr
02288             try:
02289                 item = next(itr)
02290                 heapq.heapreplace(heap, (key(item), idx, item, itr) )
02291             except StopIteration:
02292                 heapq.heappop(heap)
02293 
02294     else:
02295         while heap:
02296             item, idx, itr = heap[0]
02297             yield item, itr
02298             try:
02299                 heapq.heapreplace(heap, (next(itr), idx, itr))
02300             except StopIteration:
02301                 heapq.heappop(heap)
02302 
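# --- Editor's note: a hedged usage sketch (not part of the original source)
# for _mergesort above.  The lists are illustrative; this mirrors how several
# per-connection index lists, each already sorted by time, can be merged into
# one globally ordered stream.
def _example_merge_sorted_lists():
    """Hedged sketch: merge pre-sorted lists into one sorted sequence."""
    lists = [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
    return [item for item, _ in _mergesort(lists)]   # -> [1, 2, 3, ..., 9]
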
02303 class _BZ2CompressorFileFacade(object):
02304     """
02305     A file facade for the bz2.BZ2Compressor.
02306     """
02307     def __init__(self, file):
02308         self.file                = file
02309         self.compressor          = bz2.BZ2Compressor()
02310         self.compressed_bytes_in = 0
02311     
02312     def write(self, data):
02313         compressed = self.compressor.compress(data)
02314         if len(compressed) > 0:
02315             self.file.write(compressed)
02316         self.compressed_bytes_in += len(data)
02317     
02318     def flush(self):
02319         compressed = self.compressor.flush()
02320         if len(compressed) > 0:
02321             self.file.write(compressed)
02322 
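# --- Editor's note: a hedged usage sketch (not part of the original source)
# for the facade above: it streams data through a BZ2 compressor into a file
# while counting the bytes handed to write() before compression.  The path is
# illustrative only.
def _example_write_compressed(path='/tmp/chunk.bz2'):
    """Hedged sketch: compress data on the fly while tracking input size."""
    f = open(path, 'wb')
    try:
        facade = _BZ2CompressorFileFacade(f)
        facade.write(b'x' * 100000)
        facade.flush()
        return facade.compressed_bytes_in   # counts uncompressed bytes passed in (100000)
    finally:
        f.close()
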
02323 def _median(values):
02324     values_len = len(values)
02325     if values_len == 0:
02326         return float('nan')
02327 
02328     sorted_values = sorted(values)
02329     if values_len % 2 == 1:
02330         return sorted_values[(values_len + 1) // 2 - 1]
02331 
02332     lower = sorted_values[values_len // 2 - 1]
02333     upper = sorted_values[values_len // 2]
02334 
02335     return float(lower + upper) / 2

