00033 """
00034 The rosbag Python API.
00035
00036 Provides serialization of bag files.
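
Example usage (an illustrative sketch, not part of this module; the topic name
'chatter' and the std_msgs message type are assumptions made for the example):

    import rosbag
    from std_msgs.msg import String

    # Write a few messages to a new bag.
    with rosbag.Bag('test.bag', 'w') as outbag:
        for i in range(10):
            outbag.write('chatter', String(data='hello %d' % i))

    # Read them back, optionally filtering by topic.
    with rosbag.Bag('test.bag') as inbag:
        for topic, msg, t in inbag.read_messages(topics=['chatter']):
            print(topic, msg.data, t.to_sec())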
00037 """
00038
00039 from __future__ import print_function
00040
00041 import bisect
00042 import bz2
00043 import heapq
00044 import os
00045 import re
00046 import struct
00047 import sys
00048 import threading
00049 import time
00050 import yaml
00051
00052 try:
00053 from cStringIO import StringIO
00054 except ImportError:
00055 from io import StringIO
00056
00057 import genmsg
00058 import genpy
00059 import genpy.dynamic
00060 import genpy.message
00061
00062 import roslib.names
00063 import rospy
00064
00065 class ROSBagException(Exception):
00066 """
00067 Base class for exceptions in rosbag.
00068 """
00069 def __init__(self, value):
00070 self.value = value
00071
00072 def __str__(self):
00073 return self.value
00074
00075 class ROSBagFormatException(ROSBagException):
00076 """
00077 Exceptions for errors relating to the bag file format.
00078 """
00079 def __init__(self, value):
00080 ROSBagException.__init__(self, value)
00081
00082 class ROSBagUnindexedException(ROSBagException):
00083 """
00084 Exception for unindexed bags.
00085 """
00086 def __init__(self):
00087 ROSBagException.__init__(self, 'Unindexed bag')
00088
00089 class Compression:
00090 """
00091 Allowable compression types
00092 """
00093 NONE = 'none'
00094 BZ2 = 'bz2'
00095
00096 class Bag(object):
00097 """
00098 Bag serializes messages to and from a single file on disk using the bag format.
00099 """
00100 def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
00101 """
00102 Open a bag file. The mode can be 'r', 'w', or 'a' for reading (default),
00103 writing or appending. The file will be created if it doesn't exist
00104 when opened for writing or appending; it will be truncated when opened
00105 for writing. Simultaneous reading and writing is allowed when in writing
00106 or appending mode.
00107 @param f: filename of bag to open or a stream to read from
00108 @type f: str or file
00109 @param mode: mode, either 'r', 'w', or 'a'
00110 @type mode: str
00111 @param compression: compression mode, see U{rosbag.Compression} for valid modes
00112 @type compression: str
00113 @param chunk_threshold: minimum number of uncompressed bytes per chunk
00114 @type chunk_threshold: int
00115 @param allow_unindexed: if True, allow opening unindexed bags
00116 @type allow_unindexed: bool
00117 @param options: the bag options (currently: compression and chunk_threshold)
00118 @type options: dict
00119 @param skip_index: if True, don't read the connection index records on open [2.0+]
00120 @type skip_index: bool
00121 @raise ValueError: if any argument is invalid
00122 @raise ROSBagException: if an error occurs opening file
00123 @raise ROSBagFormatException: if bag format is corrupted
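
Example (an illustrative sketch): the options dict mirrors the compression and
chunk_threshold keyword arguments, e.g.:

    bag = Bag('output.bag', 'w', options={'compression': Compression.BZ2, 'chunk_threshold': 1024 * 1024})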
00124 """
00125 if options is not None:
00126 if type(options) is not dict:
00127 raise ValueError('options must be of type dict')
00128 if 'compression' in options:
00129 compression = options['compression']
00130 if 'chunk_threshold' in options:
00131 chunk_threshold = options['chunk_threshold']
00132
00133 self._file = None
00134 self._filename = None
00135 self._version = None
00136
00137 allowed_compressions = [Compression.NONE, Compression.BZ2]
00138 if compression not in allowed_compressions:
00139 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))
00140 self._compression = compression
00141
00142 if chunk_threshold < 0:
00143 raise ValueError('chunk_threshold must be greater than or equal to zero')
00144 self._chunk_threshold = chunk_threshold
00145
00146 self._skip_index = skip_index
00147
00148 self._reader = None
00149
00150 self._file_header_pos = None
00151 self._index_data_pos = 0 # (1.2+)
00152
00153 self._clear_index()
00154
00155 self._buffer = StringIO()
00156
00157 self._curr_compression = Compression.NONE
00158
00159 self._open(f, mode, allow_unindexed)
00160
00161 self._output_file = self._file
00162
00163 def __iter__(self):
00164 return self.read_messages()
00165
00166 def __enter__(self):
00167 return self
00168
00169 def __exit__(self, exc_type, exc_value, traceback):
00170 self.close()
00171
00172 @property
00173 def options(self):
00174 """Get the options."""
00175 return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }
00176
00177 @property
00178 def filename(self):
00179 """Get the filename."""
00180 return self._filename
00181
00182 @property
00183 def version(self):
00184 """Get the version."""
00185 return self._version
00186
00187 @property
00188 def mode(self):
00189 """Get the mode."""
00190 return self._mode
00191
00192 @property
00193 def size(self):
00194 """Get the size in bytes."""
00195 if not self._file:
00196 raise ValueError('I/O operation on closed bag')
00197
00198 pos = self._file.tell()
00199 self._file.seek(0, os.SEEK_END)
00200 size = self._file.tell()
00201 self._file.seek(pos)
00202 return size
00203
00204 # compression
00205
00206 def _get_compression(self):
00207 """Get the compression method to use for writing."""
00208 return self._compression
00209
00210 def _set_compression(self, compression):
00211 """Set the compression method to use for writing."""
00212 allowed_compressions = [Compression.NONE, Compression.BZ2]
00213 if compression not in allowed_compressions:
00214 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))
00215
00216 self.flush()
00217 self._compression = compression
00218
00219 compression = property(_get_compression, _set_compression)
00220
00221 # chunk_threshold
00222
00223 def _get_chunk_threshold(self):
00224 """Get the chunk threshold to use for writing."""
00225 return self._chunk_threshold
00226
00227 def _set_chunk_threshold(self, chunk_threshold):
00228 """Set the chunk threshold to use for writing."""
00229 if chunk_threshold < 0:
00230 raise ValueError('chunk_threshold must be greater than or equal to zero')
00231
00232 self.flush()
00233 self._chunk_threshold = chunk_threshold
00234
00235 chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)
00236
00237 def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
00238 """
00239 Read messages from the bag, optionally filtered by topic, timestamp and connection details.
00240 @param topics: list of topics or a single topic [optional]
00241 @type topics: list(str) or str
00242 @param start_time: earliest timestamp of message to return [optional]
00243 @type start_time: U{genpy.Time}
00244 @param end_time: latest timestamp of message to return [optional]
00245 @type end_time: U{genpy.Time}
00246 @param connection_filter: function to filter connections to include [optional]
00247 @type connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool
00248 @param raw: if True, then yield the message data as a tuple (datatype, data, md5sum, position, pytype) instead of a deserialized message
00249 @type raw: bool
00250 @return: generator of (topic, message, timestamp) tuples for each message in the bag file
00251 @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw]
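
Example (an illustrative sketch, given a Bag 'bag' open for reading; the topic names are assumptions):

    import rospy
    start = rospy.Time(1300000000)
    for topic, msg, t in bag.read_messages(topics=['/imu', '/odom'], start_time=start):
        handle(topic, msg, t)   # handle() is a placeholder for user code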
00252 """
00253 self.flush()
00254
00255 if topics and type(topics) is str:
00256 topics = [topics]
00257
00258 return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)
00259
00260 def flush(self):
00261 """
00262 Write the open chunk to disk so subsequent reads will read all messages.
00263 @raise ValueError: if bag is closed
00264 """
00265 if not self._file:
00266 raise ValueError('I/O operation on closed bag')
00267
00268 if self._chunk_open:
00269 self._stop_writing_chunk()
00270
00271 def write(self, topic, msg, t=None, raw=False):
00272 """
00273 Write a message to the bag.
00274 @param topic: name of topic
00275 @type topic: str
00276 @param msg: message to add to bag, or tuple (if raw)
00277 @type msg: Message or tuple of raw message data
00278 @param t: ROS time of message publication; if None, the current time is used [optional]
00279 @type t: U{genpy.Time}
00280 @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
00281 @type raw: bool
00282 @raise ValueError: if arguments are invalid or bag is closed
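
Example (an illustrative sketch, given a Bag 'bag' open for writing; the message type and topic are assumptions):

    from std_msgs.msg import Int32
    bag.write('/counter', Int32(data=42))                           # stamped with the current time
    bag.write('/counter', Int32(data=43), t=rospy.Time(1300000001))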
00283 """
00284 if not self._file:
00285 raise ValueError('I/O operation on closed bag')
00286
00287 if not topic:
00288 raise ValueError('topic is invalid')
00289 if not msg:
00290 raise ValueError('msg is invalid')
00291
00292 if t is None:
00293 t = genpy.Time.from_sec(time.time())
00294
00295 # Seek to end (in case previous operation was a read)
00296 self._file.seek(0, os.SEEK_END)
00297
00298 # Open a chunk, if needed
00299 if not self._chunk_open:
00300 self._start_writing_chunk(t)
00301
00302 # Unpack raw
00303 if raw:
00304 if len(msg) == 5:
00305 msg_type, serialized_bytes, md5sum, pos, pytype = msg
00306 elif len(msg) == 4:
00307 msg_type, serialized_bytes, md5sum, pytype = msg
00308 else:
00309 raise ValueError('msg must be of length 4 or 5')
00310
00311 # Write connection record, if necessary (currently using a connection per topic; ignoring message connection header)
00312 if topic in self._topic_connections:
00313 connection_info = self._topic_connections[topic]
00314 conn_id = connection_info.id
00315 else:
00316 conn_id = len(self._connections)
00317
00318 if raw:
00319 if pytype is None:
00320 try:
00321 pytype = genpy.message.get_message_class(msg_type)
00322 except Exception:
00323 pytype = None
00324 if pytype is None:
00325 raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type)
00326
00327 if pytype._md5sum != md5sum:
00328 print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr)
00329 #raise ROSBagException('md5sum of loaded type does not match that of data being recorded')
00330
00331 header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text }
00332 else:
00333 header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text }
00334
00335 connection_info = _ConnectionInfo(conn_id, topic, header)
00336
00337 self._write_connection_record(connection_info)
00338
00339 self._connections[conn_id] = connection_info
00340 self._topic_connections[topic] = connection_info
00341
00342 # Create an index entry
00343 index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())
00344
00345 # Update the indexes and current chunk info
00346 if conn_id not in self._curr_chunk_connection_indexes:
00347 # This is the first message on this connection in the chunk
00348 self._curr_chunk_connection_indexes[conn_id] = [index_entry]
00349 self._curr_chunk_info.connection_counts[conn_id] = 1
00350 else:
00351 curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id]
00352 if index_entry >= curr_chunk_connection_index[-1]:
00353 # Test if we're writing chronologically. Can skip binary search if so.
00354 curr_chunk_connection_index.append(index_entry)
00355 else:
00356 bisect.insort_right(curr_chunk_connection_index, index_entry)
00357
00358 self._curr_chunk_info.connection_counts[conn_id] += 1
00359
00360 if conn_id not in self._connection_indexes:
00361 self._connection_indexes[conn_id] = [index_entry]
00362 else:
00363 bisect.insort_right(self._connection_indexes[conn_id], index_entry)
00364
00365 # Update the chunk start/end times
00366 if t > self._curr_chunk_info.end_time:
00367 self._curr_chunk_info.end_time = t
00368 elif t < self._curr_chunk_info.start_time:
00369 self._curr_chunk_info.start_time = t
00370
00371 if not raw:
00372 # Serialize the message to the buffer
00373 self._buffer.seek(0)
00374 self._buffer.truncate(0)
00375 msg.serialize(self._buffer)
00376 serialized_bytes = self._buffer.getvalue()
00377
00378 # Write message data record
00379 self._write_message_data_record(conn_id, t, serialized_bytes)
00380
00381 # Check if we want to stop this chunk
00382 chunk_size = self._get_chunk_offset()
00383 if chunk_size > self._chunk_threshold:
00384 self._stop_writing_chunk()
00385
00386 def reindex(self):
00387 """
00388 Reindexes the bag file. Yields position of each chunk for progress.
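
Example (an illustrative sketch): reindexing an unindexed bag in place, using the
yielded positions only for progress reporting:

    with Bag('broken.bag', 'a', allow_unindexed=True) as b:
        for offset in b.reindex():
            print('reindexed through byte %d' % offset)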
00389 """
00390 self._clear_index()
00391 return self._reader.reindex()
00392
00393 def close(self):
00394 """
00395 Close the bag file. Closing an already closed bag does nothing.
00396 """
00397 if self._file:
00398 if self._mode in 'wa':
00399 self._stop_writing()
00400
00401 self._close_file()
00402
00403 def __str__(self):
00404 rows = []
00405
00406 try:
00407 if self._filename:
00408 rows.append(('path', self._filename))
00409
00410 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
00411 rows.append(('version', '1.2 (unindexed)'))
00412 else:
00413 rows.append(('version', '%d.%d' % (self._version / 100, self._version % 100)))
00414
00415 if not self._connection_indexes and not self._chunks:
00416 rows.append(('size', _human_readable_size(self.size)))
00417 else:
00418 if self._chunks:
00419 start_stamp = self._chunks[ 0].start_time.to_sec()
00420 end_stamp = self._chunks[-1].end_time.to_sec()
00421 else:
00422 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()])
00423 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()])
00424
00425 # Show duration
00426 duration = end_stamp - start_stamp
00427 dur_secs = duration % 60
00428 dur_mins = int(duration / 60)
00429 dur_hrs = int(dur_mins / 60)
00430 if dur_hrs > 0:
00431 dur_mins = dur_mins % 60
00432 duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration)
00433 elif dur_mins > 0:
00434 duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration)
00435 else:
00436 duration_str = '%.1fs' % duration
00437
00438 rows.append(('duration', duration_str))
00439
00440 # Show start and end times
00441 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp)))
00442 rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp)))
00443
00444 rows.append(('size', _human_readable_size(self.size)))
00445
00446 if self._chunks:
00447 num_messages = 0
00448 for c in self._chunks:
00449 for counts in c.connection_counts.itervalues():
00450 num_messages += counts
00451 else:
00452 num_messages = sum([len(index) for index in self._connection_indexes.itervalues()])
00453 rows.append(('messages', str(num_messages)))
00454
00455 # Show compression information
00456 if len(self._chunk_headers) == 0:
00457 rows.append(('compression', 'none'))
00458 else:
00459 compression_counts = {}
00460 compression_uncompressed = {}
00461 compression_compressed = {}
00462 for chunk_header in self._chunk_headers.itervalues():
00463 if chunk_header.compression not in compression_counts:
00464 compression_counts[chunk_header.compression] = 1
00465 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
00466 compression_compressed[chunk_header.compression] = chunk_header.compressed_size
00467 else:
00468 compression_counts[chunk_header.compression] += 1
00469 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
00470 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
00471
00472 chunk_count = len(self._chunk_headers)
00473
00474 compressions = []
00475 for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])):
00476 if compression != Compression.NONE:
00477 fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression]
00478 compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction))
00479 else:
00480 compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count))
00481 rows.append(('compression', ', '.join(compressions)))
00482
00483 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
00484 if not all_uncompressed:
00485 total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
00486 total_compressed_size = sum((h.compressed_size for h in self._chunk_headers.itervalues()))
00487
00488 total_uncompressed_size_str = _human_readable_size(total_uncompressed_size)
00489 total_compressed_size_str = _human_readable_size(total_compressed_size)
00490 total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)])
00491
00492 if duration > 0:
00493 uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration)
00494 compressed_rate_str = _human_readable_size(total_compressed_size / duration)
00495 rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)])
00496
00497 rows.append(('uncompressed', '%*s @ %*s/s' %
00498 (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str)))
00499 rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' %
00500 (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size)))
00501 else:
00502 rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str)))
00503 rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str)))
00504
00505 datatypes = set()
00506 datatype_infos = []
00507 for connection in self._connections.itervalues():
00508 if connection.datatype in datatypes:
00509 continue
00510 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
00511 datatypes.add(connection.datatype)
00512
00513 topics = sorted(set([c.topic for c in self._get_connections()]))
00514 topic_datatypes = {}
00515 topic_conn_counts = {}
00516 topic_msg_counts = {}
00517 topic_freqs_median = {}
00518 for topic in topics:
00519 connections = list(self._get_connections(topic))
00520
00521 topic_datatypes[topic] = connections[0].datatype
00522 topic_conn_counts[topic] = len(connections)
00523
00524 msg_count = 0
00525 for connection in connections:
00526 for chunk in self._chunks:
00527 msg_count += chunk.connection_counts.get(connection.id, 0)
00528 topic_msg_counts[topic] = msg_count
00529
00530 if self._connection_indexes_read:
00531 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
00532 if len(stamps) > 1:
00533 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
00534 med_period = _median(periods)
00535 if med_period > 0.0:
00536 topic_freqs_median[topic] = 1.0 / med_period
00537
00538 topics = sorted(topic_datatypes.iterkeys())
00539 max_topic_len = max([len(topic) for topic in topics])
00540 max_datatype_len = max([len(datatype) for datatype in datatypes])
00541 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()])
00542 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0
00543
00544 # Show datatypes
00545 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
00546 s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum)
00547 if i == 0:
00548 rows.append(('types', s))
00549 else:
00550 rows.append(('', s))
00551
00552 # Show topics
00553 for i, topic in enumerate(topics):
00554 topic_msg_count = topic_msg_counts[topic]
00555
00556 s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ')
00557 if topic in topic_freqs_median:
00558 s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic]))
00559 else:
00560 s += ' %*s' % (max_freq_median_len, '')
00561
00562 s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic])
00563 if topic_conn_counts[topic] > 1:
00564 s += ' (%d connections)' % topic_conn_counts[topic]
00565
00566 if i == 0:
00567 rows.append(('topics', s))
00568 else:
00569 rows.append(('', s))
00570
00571 except Exception as ex:
00572 raise
00573
00574 first_column_width = max([len(field) for field, _ in rows]) + 1
00575
00576 s = ''
00577 for (field, value) in rows:
00578 if field:
00579 s += '%-*s %s\n' % (first_column_width, field + ':', value)
00580 else:
00581 s += '%-*s %s\n' % (first_column_width, '', value)
00582
00583 return s.rstrip()
00584
00585 def _get_yaml_info(self, key=None):
00586 s = ''
00587
00588 try:
00589 if self._filename:
00590 s += 'path: %s\n' % self._filename
00591
00592 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
00593 s += 'version: 1.2 (unindexed)\n'
00594 else:
00595 s += 'version: %d.%d\n' % (self._version / 100, self._version % 100)
00596
00597 if not self._connection_indexes and not self._chunks:
00598 s += 'size: %d\n' % self.size
00599 s += 'indexed: False\n'
00600 else:
00601 if self._chunks:
00602 start_stamp = self._chunks[ 0].start_time.to_sec()
00603 end_stamp = self._chunks[-1].end_time.to_sec()
00604 else:
00605 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()])
00606 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()])
00607
00608 duration = end_stamp - start_stamp
00609 s += 'duration: %.6f\n' % duration
00610 s += 'start: %.6f\n' % start_stamp
00611 s += 'end: %.6f\n' % end_stamp
00612 s += 'size: %d\n' % self.size
00613 if self._chunks:
00614 num_messages = 0
00615 for c in self._chunks:
00616 for counts in c.connection_counts.itervalues():
00617 num_messages += counts
00618 else:
00619 num_messages = sum([len(index) for index in self._connection_indexes.itervalues()])
00620 s += 'messages: %d\n' % num_messages
00621 s += 'indexed: True\n'
00622
00623 # Show compression information
00624 if len(self._chunk_headers) == 0:
00625 s += 'compression: none\n'
00626 else:
00627 compression_counts = {}
00628 compression_uncompressed = {}
00629 compression_compressed = {}
00630 for chunk_header in self._chunk_headers.itervalues():
00631 if chunk_header.compression not in compression_counts:
00632 compression_counts[chunk_header.compression] = 1
00633 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
00634 compression_compressed[chunk_header.compression] = chunk_header.compressed_size
00635 else:
00636 compression_counts[chunk_header.compression] += 1
00637 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
00638 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
00639
00640 chunk_count = len(self._chunk_headers)
00641
00642 main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0]
00643 s += 'compression: %s\n' % str(main_compression)
00644
00645 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
00646 if not all_uncompressed:
00647 s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
00648 s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.itervalues()))
00649
00650 datatypes = set()
00651 datatype_infos = []
00652 for connection in self._connections.itervalues():
00653 if connection.datatype in datatypes:
00654 continue
00655 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
00656 datatypes.add(connection.datatype)
00657
00658 topics = sorted(set([c.topic for c in self._get_connections()]))
00659 topic_datatypes = {}
00660 topic_conn_counts = {}
00661 topic_msg_counts = {}
00662 topic_freqs_median = {}
00663 for topic in topics:
00664 connections = list(self._get_connections(topic))
00665
00666 topic_datatypes[topic] = connections[0].datatype
00667 topic_conn_counts[topic] = len(connections)
00668
00669 msg_count = 0
00670 for connection in connections:
00671 for chunk in self._chunks:
00672 msg_count += chunk.connection_counts.get(connection.id, 0)
00673 topic_msg_counts[topic] = msg_count
00674
00675 if self._connection_indexes_read:
00676 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
00677 if len(stamps) > 1:
00678 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
00679 med_period = _median(periods)
00680 if med_period > 0.0:
00681 topic_freqs_median[topic] = 1.0 / med_period
00682
00683 topics = sorted(topic_datatypes.iterkeys())
00684 max_topic_len = max([len(topic) for topic in topics])
00685 max_datatype_len = max([len(datatype) for datatype in datatypes])
00686 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()])
00687 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0
00688
00689 # Show datatypes
00690 s += 'types:\n'
00691 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
00692 s += ' - type: %s\n' % datatype
00693 s += ' md5: %s\n' % md5sum
00694
00695 # Show topics
00696 s += 'topics:\n'
00697 for i, topic in enumerate(topics):
00698 topic_msg_count = topic_msg_counts[topic]
00699
00700 s += ' - topic: %s\n' % topic
00701 s += ' type: %s\n' % topic_datatypes[topic]
00702 s += ' messages: %d\n' % topic_msg_count
00703
00704 if topic_conn_counts[topic] > 1:
00705 s += ' connections: %d\n' % topic_conn_counts[topic]
00706
00707 if topic in topic_freqs_median:
00708 s += ' frequency: %.4f\n' % topic_freqs_median[topic]
00709
00710 if not key:
00711 return s
00712
00713 class DictObject(object):
00714 def __init__(self, d):
00715 for a, b in d.items():
00716 if isinstance(b, (list, tuple)):
00717 setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b])
00718 else:
00719 setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
00720
00721 obj = DictObject(yaml.load(s))
00722 try:
00723 val = eval('obj.' + key)
00724 except Exception as ex:
00725 print('Error getting key "%s"' % key, file=sys.stderr)
00726 return None
00727
00728 def print_yaml(val, indent=0):
00729 indent_str = ' ' * indent
00730
00731 if type(val) is list:
00732 s = ''
00733 for item in val:
00734 s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1))
00735 return s
00736 elif type(val) is DictObject:
00737 s = ''
00738 for i, (k, v) in enumerate(val.__dict__.items()):
00739 if i != 0:
00740 s += indent_str
00741 s += '%s: %s' % (k, str(v))
00742 if i < len(val.__dict__) - 1:
00743 s += '\n'
00744 return s
00745 else:
00746 return indent_str + str(val)
00747
00748 return print_yaml(val)
00749
00750 except Exception as ex:
00751 raise
00752
00753 ### Internal API ###
00754
00755 @property
00756 def _has_compressed_chunks(self):
00757 if not self._chunk_headers:
00758 return False
00759
00760 return any((h.compression != Compression.NONE for h in self._chunk_headers.itervalues()))
00761
00762 @property
00763 def _uncompressed_size(self):
00764 if not self._chunk_headers:
00765 return self.size
00766
00767 return sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
00768
00769 def _read_message(self, position, raw=False):
00770 """
00771 Read the message from the given position in the file.
00772 """
00773 self.flush()
00774 return self._reader.seek_and_read_message_data_record(position, raw)
00775
00776 # Index accessing
00777
00778 def _get_connections(self, topics=None, connection_filter=None):
00779 """
00780 Yield the connections, optionally filtering by topic and/or connection information.
00781 """
00782 if topics:
00783 if type(topics) is str:
00784 topics = set([roslib.names.canonicalize_name(topics)])
00785 else:
00786 topics = set([roslib.names.canonicalize_name(t) for t in topics])
00787
00788 for c in self._connections.itervalues():
00789 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics:
00790 continue
00791 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header):
00792 continue
00793 yield c
00794
00795 def _get_entries(self, connections=None, start_time=None, end_time=None):
00796 """
00797 Yield index entries on the given connections in the given time range.
00798 """
00799 for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time):
00800 if start_time and entry.time < start_time:
00801 continue
00802 if end_time and entry.time > end_time:
00803 return
00804 yield entry
00805
00806 def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
00807 """
00808 Yield index entries on the given connections in the given time range in reverse order.
00809 """
00810 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()):
00811 if end_time and entry.time > end_time:
00812 continue
00813 if start_time and entry.time < start_time:
00814 return
00815 yield entry
00816
00817 def _get_entry(self, t, connections=None):
00818 """
00819 Return the first index entry on/before the given time on the given connections
00820 """
00821 indexes = self._get_indexes(connections)
00822
00823 entry = _IndexEntry(t)
00824
00825 first_entry = None
00826
00827 for index in indexes:
00828 i = bisect.bisect_right(index, entry) - 1
00829 if i >= 0:
00830 index_entry = index[i]
00831 if first_entry is None or index_entry > first_entry:
00832 first_entry = index_entry
00833
00834 return first_entry
00835
00836 def _get_entry_after(self, t, connections=None):
00837 """
00838 Return the first index entry after the given time on the given connections
00839 """
00840 indexes = self._get_indexes(connections)
00841
00842 entry = _IndexEntry(t)
00843
00844 first_entry = None
00845
00846 for index in indexes:
00847 i = bisect.bisect_right(index, entry)
00848 if i <= len(index) - 1:
00849 index_entry = index[i]
00850 if first_entry is None or index_entry < first_entry:
00851 first_entry = index_entry
00852
00853 return first_entry
00854
00855 def _get_indexes(self, connections):
00856 """
00857 Get the indexes for the given connections.
00858 """
00859 if not self._connection_indexes_read:
00860 self._reader._read_connection_index_records()
00861
00862 if connections is None:
00863 return self._connection_indexes.itervalues()
00864 else:
00865 return (self._connection_indexes[c.id] for c in connections)
00866
00867 ### Implementation ###
00868
00869 def _clear_index(self):
00870 self._connection_indexes_read = False
00871 self._connection_indexes = {} # id -> IndexEntry[] (1.2+)
00872
00873 self._topic_connections = {} # topic -> connection_id
00874 self._connections = {} # id -> ConnectionInfo
00875 self._connection_count = 0 # (2.0)
00876 self._chunk_count = 0 # (2.0)
00877 self._chunks = [] # ChunkInfo[] (2.0)
00878 self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0)
00879
00880 self._chunk_open = False
00881 self._curr_chunk_info = None
00882 self._curr_chunk_data_pos = None
00883 self._curr_chunk_connection_indexes = {}
00884
00885 def _open(self, f, mode, allow_unindexed):
00886 if not f:
00887 raise ValueError('filename (or stream) is invalid')
00888
00889 try:
00890 if mode == 'r': self._open_read(f, allow_unindexed)
00891 elif mode == 'w': self._open_write(f)
00892 elif mode == 'a': self._open_append(f, allow_unindexed)
00893 else:
00894 raise ValueError('mode "%s" is invalid' % mode)
00895 except struct.error:
00896 raise ROSBagFormatException('error with bag')
00897
00898 def _open_read(self, f, allow_unindexed):
00899 if isinstance(f, file):
00900 self._file = f
00901 self._filename = None
00902 else:
00903 self._file = open(f, 'rb')
00904 self._filename = f
00905
00906 self._mode = 'r'
00907
00908 try:
00909 self._version = self._read_version()
00910 except:
00911 self._version = None
00912 self._close_file()
00913 raise
00914
00915 try:
00916 self._create_reader()
00917 self._reader.start_reading()
00918 except ROSBagUnindexedException as ex:
00919 if not allow_unindexed:
00920 self._close_file()
00921 raise
00922 except:
00923 self._close_file()
00924 raise
00925
00926 def _open_write(self, f):
00927 if isinstance(f, file):
00928 self._file = f
00929 self._filename = None
00930 else:
00931 self._file = open(f, 'w+b')
00932 self._filename = f
00933
00934 self._mode = 'w'
00935
00936 self._version = 200
00937
00938 try:
00939 self._create_reader()
00940 self._start_writing()
00941 except:
00942 self._close_file()
00943 raise
00944
00945 def _open_append(self, f, allow_unindexed):
00946 if isinstance(f, file):
00947 self._file = f
00948 self._filename = None
00949 else:
00950 try:
00951 # Check whether the file already exists
00952 open(f, 'r').close()
00953
00954 # File exists: open it for reading and updating
00955 self._file = open(f, 'r+b')
00956 except IOError:
00957 # File doesn't exist: create it
00958 self._file = open(f, 'w+b')
00959
00960 self._filename = f
00961
00962 self._mode = 'a'
00963
00964 try:
00965 self._version = self._read_version()
00966 except:
00967 self._version = None
00968 self._close_file()
00969 raise
00970
00971 if self._version != 200:
00972 self._file.close()
00973 raise ROSBagException('bag version %d is unsupported for appending' % self._version)
00974
00975 try:
00976 self._start_appending()
00977 except ROSBagUnindexedException:
00978 if not allow_unindexed:
00979 self._close_file()
00980 raise
00981 except:
00982 self._close_file()
00983 raise
00984
00985 def _close_file(self):
00986 self._file.close()
00987 self._file = None
00988
00989 def _create_reader(self):
00990 """
00991 @raise ROSBagException: if the bag version is unsupported
00992 """
00993 major_version, minor_version = self._version / 100, self._version % 100
00994 if major_version == 2:
00995 self._reader = _BagReader200(self)
00996 elif major_version == 1:
00997 if minor_version == 1:
00998 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version)
00999 else:
01000 # Peek at the op code of the first record to distinguish indexed from unindexed 1.2 bags
01001 first_record_pos = self._file.tell()
01002 header = _read_header(self._file)
01003 op = _read_uint8_field(header, 'op')
01004 self._file.seek(first_record_pos)
01005
01006 if op == _OP_FILE_HEADER:
01007 self._reader = _BagReader102_Indexed(self)
01008 else:
01009 self._reader = _BagReader102_Unindexed(self)
01010 else:
01011 raise ROSBagException('unknown bag version %d' % self._version)
01012
01013 def _read_version(self):
01014 """
01015 @raise ROSBagException: if the file is empty, or the version line can't be parsed
01016 """
01017 version_line = self._file.readline().rstrip()
01018 if len(version_line) == 0:
01019 raise ROSBagException('empty file')
01020
01021 matches = re.match(r"#ROS(.*) V(\d)\.(\d)", version_line)
01022 if matches is None or len(matches.groups()) != 3:
01023 raise ROSBagException('This does not appear to be a bag file')
01024
01025 version_type, major_version_str, minor_version_str = matches.groups()
01026
01027 version = int(major_version_str) * 100 + int(minor_version_str)
01028
01029 return version
01030
01031 def _start_writing(self):
01032 self._file.write(_VERSION + '\n')
01033 self._file_header_pos = self._file.tell()
01034 self._write_file_header_record(0, 0, 0)
01035
01036 def _stop_writing(self):
01037 # Write out any open chunk
01038 self.flush()
01039
01040 # Remember this position as the start of the index
01041 self._index_data_pos = self._file.tell()
01042
01043 # Write connection records
01044 for connection_info in self._connections.itervalues():
01045 self._write_connection_record(connection_info)
01046
01047 # Write chunk info records
01048 for chunk_info in self._chunks:
01049 self._write_chunk_info_record(chunk_info)
01050
01051 # Re-write the file header record with the real index position and record counts
01052 self._file.seek(self._file_header_pos)
01053 self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
01054
01055 def _start_appending(self):
01056 self._file_header_pos = self._file.tell()
01057
01058 self._create_reader()
01059 self._reader.start_reading()
01060
01061 # Truncate the file to chop off the old index
01062 self._file.truncate(self._index_data_pos)
01063 self._reader.index_data_pos = 0
01064
01065 # Rewrite the file header record with a zeroed index position, so the bag reads as unindexed if appending is interrupted
01066 self._file.seek(self._file_header_pos)
01067 self._write_file_header_record(0, 0, 0)
01068
01069 # Seek to the end, ready to append
01070 self._file.seek(0, os.SEEK_END)
01071
01072 def _start_writing_chunk(self, t):
01073 self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t)
01074 self._write_chunk_header(_ChunkHeader(self._compression, 0, 0))
01075 self._curr_chunk_data_pos = self._file.tell()
01076 self._set_compression_mode(self._compression)
01077 self._chunk_open = True
01078
01079 def _get_chunk_offset(self):
01080 if self._compression == Compression.NONE:
01081 return self._file.tell() - self._curr_chunk_data_pos
01082 else:
01083 return self._output_file.compressed_bytes_in
01084
01085 def _stop_writing_chunk(self):
01086 # Add this chunk to the bag's chunk index
01087 self._chunks.append(self._curr_chunk_info)
01088
01089 # Get the uncompressed size, then stop compressing and measure the compressed size
01090 uncompressed_size = self._get_chunk_offset()
01091 self._set_compression_mode(Compression.NONE)
01092 compressed_size = self._file.tell() - self._curr_chunk_data_pos
01093
01094 # Rewrite the chunk header with the true compressed and uncompressed sizes
01095 end_of_chunk_pos = self._file.tell()
01096 self._file.seek(self._curr_chunk_info.pos)
01097 chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos)
01098 self._write_chunk_header(chunk_header)
01099 self._chunk_headers[self._curr_chunk_info.pos] = chunk_header
01100
01101 # Write the connection index records after the chunk data
01102 self._file.seek(end_of_chunk_pos)
01103 for connection_id, entries in self._curr_chunk_connection_indexes.items():
01104 self._write_connection_index_record(connection_id, entries)
01105 self._curr_chunk_connection_indexes.clear()
01106
01107 # Mark the chunk as closed
01108 self._chunk_open = False
01109
01110 def _set_compression_mode(self, compression):
01111
01112 if self._curr_compression != Compression.NONE:
01113 self._output_file.flush()
01114
01115
01116 if compression == Compression.BZ2:
01117 self._output_file = _BZ2CompressorFileFacade(self._file)
01118 elif compression == Compression.NONE:
01119 self._output_file = self._file
01120 else:
01121 raise ROSBagException('unsupported compression type: %s' % compression)
01122
01123 self._curr_compression = compression
01124
01125 def _write_file_header_record(self, index_pos, connection_count, chunk_count):
01126 header = {
01127 'op': _pack_uint8(_OP_FILE_HEADER),
01128 'index_pos': _pack_uint64(index_pos),
01129 'conn_count': _pack_uint32(connection_count),
01130 'chunk_count': _pack_uint32(chunk_count)
01131 }
01132 _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
01133
01134 def _write_connection_record(self, connection_info):
01135 header = {
01136 'op': _pack_uint8(_OP_CONNECTION),
01137 'topic': connection_info.topic,
01138 'conn': _pack_uint32(connection_info.id)
01139 }
01140
01141 _write_header(self._output_file, header)
01142
01143 _write_header(self._output_file, connection_info.header)
01144
01145 def _write_message_data_record(self, connection_id, t, serialized_bytes):
01146 header = {
01147 'op': _pack_uint8(_OP_MSG_DATA),
01148 'conn': _pack_uint32(connection_id),
01149 'time': _pack_time(t)
01150 }
01151 _write_record(self._output_file, header, serialized_bytes)
01152
01153 def _write_chunk_header(self, chunk_header):
01154 header = {
01155 'op': _pack_uint8(_OP_CHUNK),
01156 'compression': chunk_header.compression,
01157 'size': _pack_uint32(chunk_header.uncompressed_size)
01158 }
01159 _write_header(self._file, header)
01160
01161 self._file.write(_pack_uint32(chunk_header.compressed_size))
01162
01163 def _write_connection_index_record(self, connection_id, entries):
01164 header = {
01165 'op': _pack_uint8(_OP_INDEX_DATA),
01166 'conn': _pack_uint32(connection_id),
01167 'ver': _pack_uint32(_INDEX_VERSION),
01168 'count': _pack_uint32(len(entries))
01169 }
01170
01171 buffer = self._buffer
01172 buffer.seek(0)
01173 buffer.truncate(0)
01174 for entry in entries:
01175 buffer.write(_pack_time (entry.time))
01176 buffer.write(_pack_uint32(entry.offset))
01177
01178 _write_record(self._file, header, buffer.getvalue())
01179
01180 def _write_chunk_info_record(self, chunk_info):
01181 header = {
01182 'op': _pack_uint8 (_OP_CHUNK_INFO),
01183 'ver': _pack_uint32(_CHUNK_INDEX_VERSION),
01184 'chunk_pos': _pack_uint64(chunk_info.pos),
01185 'start_time': _pack_time(chunk_info.start_time),
01186 'end_time': _pack_time(chunk_info.end_time),
01187 'count': _pack_uint32(len(chunk_info.connection_counts))
01188 }
01189
01190 buffer = self._buffer
01191 buffer.seek(0)
01192 buffer.truncate(0)
01193 for connection_id, count in chunk_info.connection_counts.items():
01194 buffer.write(_pack_uint32(connection_id))
01195 buffer.write(_pack_uint32(count))
01196
01197 _write_record(self._file, header, buffer.getvalue())
01198
01199
01200
01201 _message_types = {}
01202
01203 _OP_MSG_DEF = 0x01
01204 _OP_MSG_DATA = 0x02
01205 _OP_FILE_HEADER = 0x03
01206 _OP_INDEX_DATA = 0x04
01207 _OP_CHUNK = 0x05
01208 _OP_CHUNK_INFO = 0x06
01209 _OP_CONNECTION = 0x07
01210
01211 _OP_CODES = {
01212 _OP_MSG_DEF: 'MSG_DEF',
01213 _OP_MSG_DATA: 'MSG_DATA',
01214 _OP_FILE_HEADER: 'FILE_HEADER',
01215 _OP_INDEX_DATA: 'INDEX_DATA',
01216 _OP_CHUNK: 'CHUNK',
01217 _OP_CHUNK_INFO: 'CHUNK_INFO',
01218 _OP_CONNECTION: 'CONNECTION'
01219 }
01220
01221 _VERSION = '#ROSBAG V2.0'
01222 _FILE_HEADER_LENGTH = 4096
01223 _INDEX_VERSION = 1
01224 _CHUNK_INDEX_VERSION = 1
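
# Layout note (a summary inferred from _write_record, _write_header and _read_header
# below, not an independent specification): every record is a length-prefixed header
# followed by a length-prefixed data blob. The header is itself a sequence of fields,
# each encoded as a little-endian uint32 length followed by 'name=value' bytes; the
# 'op' field selects one of the _OP_* record types above. The file header record is
# padded out to _FILE_HEADER_LENGTH bytes.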
01225
01226 class _ConnectionInfo(object):
01227 def __init__(self, id, topic, header):
01228 try:
01229 datatype, md5sum, msg_def = header['type'], header['md5sum'], header['message_definition']
01230 except KeyError as ex:
01231 raise ROSBagFormatException('connection header field %s not found' % str(ex))
01232
01233 self.id = id
01234 self.topic = topic
01235 self.datatype = datatype
01236 self.md5sum = md5sum
01237 self.msg_def = msg_def
01238 self.header = header
01239
01240 def __str__(self):
01241 return '%d on %s: %s' % (self.id, self.topic, str(self.header))
01242
01243 class _ChunkInfo(object):
01244 def __init__(self, pos, start_time, end_time):
01245 self.pos = pos
01246 self.start_time = start_time
01247 self.end_time = end_time
01248
01249 self.connection_counts = {}
01250
01251 def __str__(self):
01252 s = 'chunk_pos: %d\n' % self.pos
01253 s += 'start_time: %s\n' % str(self.start_time)
01254 s += 'end_time: %s\n' % str(self.end_time)
01255 s += 'connections: %d\n' % len(self.connection_counts)
01256 s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()])
01257 return s
01258
01259 class _ChunkHeader(object):
01260 def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
01261 self.compression = compression
01262 self.compressed_size = compressed_size
01263 self.uncompressed_size = uncompressed_size
01264 self.data_pos = data_pos
01265
01266 def __str__(self):
01267 if self.uncompressed_size > 0:
01268 ratio = 100 * (float(self.compressed_size) / self.uncompressed_size)
01269 return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio)
01270 else:
01271 return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
01272
01273 class _IndexEntry(object):
01274 def __init__(self, time):
01275 self.time = time
01276
01277 def __cmp__(self, other):
01278 return self.time.__cmp__(other.time)
01279
01280 class _IndexEntry102(_IndexEntry):
01281 def __init__(self, time, offset):
01282 self.time = time
01283 self.offset = offset
01284
01285 @property
01286 def position(self):
01287 return self.offset
01288
01289 def __str__(self):
01290 return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)
01291
01292 class _IndexEntry200(_IndexEntry):
01293 def __init__(self, time, chunk_pos, offset):
01294 self.time = time
01295 self.chunk_pos = chunk_pos
01296 self.offset = offset
01297
01298 @property
01299 def position(self):
01300 return (self.chunk_pos, self.offset)
01301
01302 def __str__(self):
01303 return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
01304
01305 def _get_message_type(info):
01306 message_type = _message_types.get(info.md5sum)
01307 if message_type is None:
01308 try:
01309 message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype]
01310 if (message_type._md5sum != info.md5sum):
01311 print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr)
01312 except genmsg.InvalidMsgSpec:
01313 message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype]
01314 print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition.' % (info.datatype, info.md5sum), file=sys.stderr)
01315 except genmsg.MsgGenerationException as ex:
01316 raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex)))
01317
01318 _message_types[info.md5sum] = message_type
01319
01320 return message_type
01321
01322 def _read_uint8 (f): return _unpack_uint8 (f.read(1))
01323 def _read_uint32(f): return _unpack_uint32(f.read(4))
01324 def _read_uint64(f): return _unpack_uint64(f.read(8))
01325 def _read_time (f): return _unpack_time (f.read(8))
01326
01327 def _unpack_uint8(v): return struct.unpack('<B', v)[0]
01328 def _unpack_uint32(v): return struct.unpack('<L', v)[0]
01329 def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
01330 def _unpack_time(v): return rospy.Time(*struct.unpack('<LL', v))
01331
01332 def _pack_uint8(v): return struct.pack('<B', v)
01333 def _pack_uint32(v): return struct.pack('<L', v)
01334 def _pack_uint64(v): return struct.pack('<Q', v)
01335 def _pack_time(v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
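
# Note: the struct format strings above all use '<', i.e. every integer and time field
# in the bag format is serialized little-endian.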
01336
01337 def _read(f, size):
01338 data = f.read(size)
01339 if len(data) != size:
01340 raise ROSBagException('expecting %d bytes, read %d' % (size, len(data)))
01341 return data
01342
01343 def _skip_record(f):
01344 _skip_sized(f)
01345 _skip_sized(f)
01346
01347 def _skip_sized(f):
01348 size = _read_uint32(f)
01349 f.seek(size, os.SEEK_CUR)
01350
01351 def _read_sized(f):
01352 try:
01353 size = _read_uint32(f)
01354 except struct.error as ex:
01355 raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))
01356 return _read(f, size)
01357
01358 def _write_sized(f, v):
01359 f.write(_pack_uint32(len(v)))
01360 f.write(v)
01361
01362 def _read_field(header, field, unpack_fn):
01363 if field not in header:
01364 raise ROSBagFormatException('expected "%s" field in record' % field)
01365
01366 try:
01367 value = unpack_fn(header[field])
01368 except Exception as ex:
01369 raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex)))
01370
01371 return value
01372
01373 def _read_str_field (header, field): return _read_field(header, field, lambda v: v)
01374 def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
01375 def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
01376 def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
01377 def _read_time_field (header, field): return _read_field(header, field, _unpack_time)
01378
01379 def _write_record(f, header, data='', padded_size=None):
01380 header_str = _write_header(f, header)
01381
01382 if padded_size is not None:
01383 header_len = len(header_str)
01384 if header_len < padded_size:
01385 data = ' ' * (padded_size - header_len)
01386 else:
01387 data = ''
01388
01389 _write_sized(f, data)
01390
01391 def _write_header(f, header):
01392 header_str = ''.join([_pack_uint32(len(k) + 1 + len(v)) + k + '=' + v for k, v in header.items()])
01393 _write_sized(f, header_str)
01394 return header_str
01395
01396 def _read_header(f, req_op=None):
01397 bag_pos = f.tell()
01398
01399
01400 try:
01401 header = _read_sized(f)
01402 except ROSBagException as ex:
01403 raise ROSBagFormatException('Error reading header: %s' % str(ex))
01404
01405
01406 header_dict = {}
01407 while header != '':
01408
01409 if len(header) < 4:
01410 raise ROSBagFormatException('Error reading header field')
01411 (size,) = struct.unpack('<L', header[:4])
01412 header = header[4:]
01413
01414
01415 if len(header) < size:
01416 raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header)))
01417 (name, sep, value) = header[:size].partition('=')
01418 if sep == '':
01419 raise ROSBagFormatException('Error reading header field')
01420
01421 header_dict[name] = value
01422
01423 header = header[size:]
01424
01425
01426 if req_op is not None:
01427 op = _read_uint8_field(header_dict, 'op')
01428 if req_op != op:
01429 raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op]))
01430
01431 return header_dict
01432
01433 def _peek_next_header_op(f):
01434 pos = f.tell()
01435 header = _read_header(f)
01436 op = _read_uint8_field(header, 'op')
01437 f.seek(pos)
01438 return op
01439
01440 def _read_record_data(f):
01441 try:
01442 record_data = _read_sized(f)
01443 except ROSBagException as ex:
01444 raise ROSBagFormatException('Error reading record data: %s' % str(ex))
01445
01446 return record_data
01447
01448 class _BagReader(object):
01449 def __init__(self, bag):
01450 self.bag = bag
01451
01452 def start_reading(self):
01453 raise NotImplementedError()
01454
01455 def read_messages(self, topics, start_time, end_time, connection_filter, raw):
01456 raise NotImplementedError()
01457
01458 def reindex(self):
01459 raise NotImplementedError()
01460
01461 class _BagReader102_Unindexed(_BagReader):
01462 """
01463 Support class for reading unindexed v1.2 bag files.
01464 """
01465 def __init__(self, bag):
01466 _BagReader.__init__(self, bag)
01467
01468 def start_reading(self):
01469 self.bag._file_header_pos = self.bag._file.tell()
01470
01471 def reindex(self):
01472 """Generates all bag index information by rereading the message records."""
01473 f = self.bag._file
01474
01475 total_bytes = self.bag.size
01476
01477
01478 self.bag._file.seek(self.bag._file_header_pos)
01479
01480 offset = f.tell()
01481
01482
01483 while offset < total_bytes:
01484 yield offset
01485
01486 op = _peek_next_header_op(f)
01487
01488 if op == _OP_MSG_DEF:
01489 connection_info = self.read_message_definition_record()
01490
01491 if connection_info.topic not in self.bag._topic_connections:
01492 self.bag._topic_connections[connection_info.topic] = connection_info.id
01493 self.bag._connections[connection_info.id] = connection_info
01494 self.bag._connection_indexes[connection_info.id] = []
01495
01496 elif op == _OP_MSG_DATA:
01497
01498 header = _read_header(f)
01499
01500 topic = _read_str_field(header, 'topic')
01501 secs = _read_uint32_field(header, 'sec')
01502 nsecs = _read_uint32_field(header, 'nsec')
01503 t = genpy.Time(secs, nsecs)
01504
01505 if topic not in self.bag._topic_connections:
01506 datatype = _read_str_field(header, 'type')
01507 self._create_connection_info_for_datatype(topic, datatype)
01508
01509 connection_id = self.bag._topic_connections[topic]
01510 info = self.bag._connections[connection_id]
01511
01512
01513 _skip_sized(f)
01514
01515
01516 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
01517
01518 offset = f.tell()
01519
01520 def read_messages(self, topics, start_time, end_time, topic_filter, raw):
01521 f = self.bag._file
01522
01523 f.seek(self.bag._file_header_pos)
01524
01525 while True:
01526
01527 while True:
01528 position = f.tell()
01529
01530 try:
01531 header = _read_header(f)
01532 except Exception:
01533 return
01534
01535 op = _read_uint8_field(header, 'op')
01536 if op != _OP_MSG_DEF:
01537 break
01538
01539 connection_info = self.read_message_definition_record(header)
01540
01541 if connection_info.topic not in self.bag._topic_connections:
01542 self.bag._topic_connections[connection_info.topic] = connection_info.id
01543
01544 self.bag._connections[connection_info.id] = connection_info
01545
01546
01547 if op != _OP_MSG_DATA:
01548 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
01549
01550 topic = _read_str_field(header, 'topic')
01551
01552 if topic not in self.bag._topic_connections:
01553 datatype = _read_str_field(header, 'type')
01554 self._create_connection_info_for_datatype(topic, datatype)
01555
01556 connection_id = self.bag._topic_connections[topic]
01557 info = self.bag._connections[connection_id]
01558
01559
01560 try:
01561 msg_type = _get_message_type(info)
01562 except KeyError:
01563 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag file by definition' % info.datatype)
01564
01565
01566 secs = _read_uint32_field(header, 'sec')
01567 nsecs = _read_uint32_field(header, 'nsec')
01568 t = genpy.Time(secs, nsecs)
01569
01570
01571 data = _read_record_data(f)
01572
01573 if raw:
01574 msg = (info.datatype, data, info.md5sum, position, msg_type)
01575 else:
01576
01577 msg = msg_type()
01578 msg.deserialize(data)
01579
01580 yield (topic, msg, t)
01581
01582 self.bag._connection_indexes_read = True
01583
01584 def _create_connection_info_for_datatype(self, topic, datatype):
01585 for c in self.bag._connections.itervalues():
01586 if c.datatype == datatype:
01587 connection_id = len(self.bag._connections)
01588 connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] }
01589 connection_info = _ConnectionInfo(connection_id, topic, connection_header)
01590
01591 self.bag._topic_connections[topic] = connection_id
01592 self.bag._connections[connection_id] = connection_info
01593 self.bag._connection_indexes[connection_id] = []
01594 return
01595
01596 raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
01597
01598 def read_message_definition_record(self, header=None):
01599 if not header:
01600 header = _read_header(self.bag._file, _OP_MSG_DEF)
01601
01602 topic = _read_str_field(header, 'topic')
01603 datatype = _read_str_field(header, 'type')
01604 md5sum = _read_str_field(header, 'md5')
01605 msg_def = _read_str_field(header, 'def')
01606
01607 _skip_sized(self.bag._file)
01608
01609 connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def }
01610
01611 id = len(self.bag._connections)
01612
01613 return _ConnectionInfo(id, topic, connection_header)
01614
01615 class _BagReader102_Indexed(_BagReader102_Unindexed):
01616 """
01617 Support class for reading indexed v1.2 bag files.
01618 """
01619 def __init__(self, bag):
01620 _BagReader.__init__(self, bag)
01621
01622 def read_messages(self, topics, start_time, end_time, connection_filter, raw):
01623 connections = self.bag._get_connections(topics, connection_filter)
01624 for entry in self.bag._get_entries(connections, start_time, end_time):
01625 yield self.seek_and_read_message_data_record(entry.offset, raw)
01626
01627 def reindex(self):
01628 """Generates all bag index information by rereading the message records."""
01629 f = self.bag._file
01630
01631 total_bytes = self.bag.size
01632
01633
01634 self.bag._file.seek(self.bag._file_header_pos)
01635 self.read_file_header_record()
01636
01637 offset = f.tell()
01638
01639
01640 while offset < total_bytes:
01641 yield offset
01642
01643 op = _peek_next_header_op(f)
01644
01645 if op == _OP_MSG_DEF:
01646 connection_info = self.read_message_definition_record()
01647
01648 if connection_info.topic not in self.bag._topic_connections:
01649 self.bag._topic_connections[connection_info.topic] = connection_info.id
01650 self.bag._connections[connection_info.id] = connection_info
01651 self.bag._connection_indexes[connection_info.id] = []
01652
01653 elif op == _OP_MSG_DATA:
01654
01655 header = _read_header(f)
01656
01657 topic = _read_str_field(header, 'topic')
01658 secs = _read_uint32_field(header, 'sec')
01659 nsecs = _read_uint32_field(header, 'nsec')
01660 t = genpy.Time(secs, nsecs)
01661
01662 if topic not in self.bag._topic_connections:
01663 datatype = _read_str_field(header, 'type')
01664 self._create_connection_info_for_datatype(topic, datatype)
01665
01666 connection_id = self.bag._topic_connections[topic]
01667 info = self.bag._connections[connection_id]
01668
01669
01670 _skip_sized(f)
01671
01672
01673 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
01674
01675 elif op == _OP_INDEX_DATA:
01676 _skip_record(f)
01677
01678 offset = f.tell()
01679
01680 def start_reading(self):
01681 try:
01682
01683 self.read_file_header_record()
01684
01685 total_bytes = self.bag.size
01686
01687
01688 if self.bag._index_data_pos == 0:
01689 raise ROSBagUnindexedException()
01690
01691
01692 self.bag._file.seek(self.bag._index_data_pos)
01693
01694
01695 topic_indexes = {}
01696 while True:
01697 pos = self.bag._file.tell()
01698 if pos >= total_bytes:
01699 break
01700
01701 topic, index = self.read_topic_index_record()
01702
01703 topic_indexes[topic] = index
01704
01705
01706 for topic, index in topic_indexes.items():
01707 self.bag._file.seek(index[0].offset)
01708
01709 connection_info = self.read_message_definition_record()
01710
01711 if connection_info.topic not in self.bag._topic_connections:
01712 self.bag._topic_connections[connection_info.topic] = connection_info.id
01713 self.bag._connections[connection_info.id] = connection_info
01714
01715 self.bag._connection_indexes[connection_info.id] = index
01716
01717 self.bag._connection_indexes_read = True
01718
01719 except Exception as ex:
01720 raise ROSBagUnindexedException()
01721
01722 def read_file_header_record(self):
01723 self.bag._file_header_pos = self.bag._file.tell()
01724
01725 header = _read_header(self.bag._file, _OP_FILE_HEADER)
01726
01727 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
01728
01729 _skip_sized(self.bag._file)
01730
01731 def read_topic_index_record(self):
01732 f = self.bag._file
01733
01734 header = _read_header(f, _OP_INDEX_DATA)
01735
01736 index_version = _read_uint32_field(header, 'ver')
01737 topic = _read_str_field (header, 'topic')
01738 count = _read_uint32_field(header, 'count')
01739
01740 if index_version != 0:
01741 raise ROSBagFormatException('expecting index version 0, got %d' % index_version)
01742
01743 _read_uint32(f)
01744
01745 topic_index = []
01746
01747 for i in range(count):
01748 time = _read_time (f)
01749 offset = _read_uint64(f)
01750
01751 topic_index.append(_IndexEntry102(time, offset))
01752
01753 return (topic, topic_index)
01754
01755 def seek_and_read_message_data_record(self, position, raw):
01756 f = self.bag._file
01757
01758
01759 f.seek(position)
01760
01761
01762 while True:
01763 header = _read_header(f)
01764 op = _read_uint8_field(header, 'op')
01765 if op != _OP_MSG_DEF:
01766 break
01767 _skip_sized(f)
01768
01769
01770 if op != _OP_MSG_DATA:
01771 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
01772
01773 topic = _read_str_field(header, 'topic')
01774
01775 connection_id = self.bag._topic_connections[topic]
01776 info = self.bag._connections[connection_id]
01777
01778
01779 try:
01780 msg_type = _get_message_type(info)
01781 except KeyError:
01782 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag file by a definition' % info.datatype)
01783
01784
01785 secs = _read_uint32_field(header, 'sec')
01786 nsecs = _read_uint32_field(header, 'nsec')
01787 t = genpy.Time(secs, nsecs)
01788
01789
01790 data = _read_record_data(f)
01791
01792 if raw:
01793 msg = info.datatype, data, info.md5sum, position, msg_type
01794 else:
01795
01796 msg = msg_type()
01797 msg.deserialize(data)
01798
01799 return (topic, msg, t)
01800
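# --- Illustrative sketch (editorial addition, not part of the original module) ---
# How the indexed v1.2 reader above is normally driven: the public Bag object
# defined earlier in this module delegates read_messages() to
# _BagReader102_Indexed.read_messages(), which walks the per-connection index
# entries in time order.  The bag path below is hypothetical, and the keyword
# defaults on the public read_messages() wrapper are assumed.
def _example_read_indexed_bag(path='/tmp/example.bag'):
    bag = Bag(path)                     # mode defaults to 'r'
    try:
        for topic, msg, t in bag.read_messages():
            print(topic, t, type(msg).__name__)
    finally:
        bag.close()
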
01801 class _BagReader200(_BagReader):
01802 """
01803 Support class for reading v2.0 bag files.
01804 """
01805 def __init__(self, bag):
01806 _BagReader.__init__(self, bag)
01807
01808 self.decompressed_chunk_pos = None
01809 self.decompressed_chunk = None
01810 self.decompressed_chunk_io = None
01811
01812 def reindex(self):
01813 """
01814 Generates all bag index information by rereading the chunks.
01815 Assumes the file header has been read.
01816 """
01817 f = self.bag._file
01818
01819 f.seek(0, os.SEEK_END)
01820 total_bytes = f.tell()
01821
01822
01823
01824
01825 self._read_terminal_connection_records()
01826
01827
01828 self.bag._file.seek(self.bag._file_header_pos)
01829 self.read_file_header_record()
01830
01831 trunc_pos = None
01832
01833 while True:
01834 chunk_pos = f.tell()
01835 if chunk_pos >= total_bytes:
01836 break
01837
01838 yield chunk_pos
01839
01840 try:
01841 self._reindex_read_chunk(f, chunk_pos, total_bytes)
01842 except Exception as ex:
01843 break
01844
01845 trunc_pos = f.tell()
01846
01847 if trunc_pos and trunc_pos < total_bytes:
01848 f.truncate(trunc_pos)
01849
01850 def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
01851
01852 chunk_header = self.read_chunk_header()
01853
01854
01855 if chunk_header.compressed_size == 0:
01856 raise ROSBagException('unterminated chunk at %d' % chunk_pos)
01857
01858 if chunk_header.compression == Compression.NONE:
01859 chunk_file = f
01860 else:
01861
01862 compressed_chunk = _read(f, chunk_header.compressed_size)
01863
01864
01865 if chunk_header.compression == Compression.BZ2:
01866 self.decompressed_chunk = bz2.decompress(compressed_chunk)
01867 else:
01868 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
01869
01870 if self.decompressed_chunk_io:
01871 self.decompressed_chunk_io.close()
01872 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
01873
01874 chunk_file = self.decompressed_chunk_io
01875
01876
01877 self.bag._curr_chunk_info = None
01878
01879 if chunk_header.compression == Compression.NONE:
01880 offset = chunk_file.tell() - chunk_pos
01881 else:
01882 offset = chunk_file.tell()
01883
01884 expected_index_length = 0
01885
01886 while offset < chunk_header.uncompressed_size:
01887 op = _peek_next_header_op(chunk_file)
01888
01889 if op == _OP_CONNECTION:
01890 connection_info = self.read_connection_record(chunk_file)
01891
01892 if connection_info.id not in self.bag._connections:
01893 self.bag._connections[connection_info.id] = connection_info
01894 if connection_info.id not in self.bag._connection_indexes:
01895 self.bag._connection_indexes[connection_info.id] = []
01896
01897 elif op == _OP_MSG_DATA:
01898
01899 header = _read_header(chunk_file)
01900
01901 connection_id = _read_uint32_field(header, 'conn')
01902 t = _read_time_field (header, 'time')
01903
01904
01905 if not self.bag._curr_chunk_info:
01906 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t)
01907 else:
01908 if t > self.bag._curr_chunk_info.end_time:
01909 self.bag._curr_chunk_info.end_time = t
01910 elif t < self.bag._curr_chunk_info.start_time:
01911 self.bag._curr_chunk_info.start_time = t
01912 if connection_id in self.bag._curr_chunk_info.connection_counts:
01913 self.bag._curr_chunk_info.connection_counts[connection_id] += 1
01914 else:
01915 self.bag._curr_chunk_info.connection_counts[connection_id] = 1
01916
01917
01918 _skip_sized(chunk_file)
01919
01920
01921 if connection_id not in self.bag._connection_indexes:
01922 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos))
01923 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset))
01924
01925 expected_index_length += 1
01926
01927 else:
01928
01929 _skip_record(chunk_file)
01930
01931 if chunk_header.compression == Compression.NONE:
01932 offset = chunk_file.tell() - chunk_pos
01933 else:
01934 offset = chunk_file.tell()
01935
01936
01937 next_op = _peek_next_header_op(f)
01938
01939 total_index_length = 0
01940
01941 while next_op != _OP_CHUNK:
01942 if next_op == _OP_INDEX_DATA:
01943
01944 _, index = self.read_connection_index_record()
01945 total_index_length += len(index)
01946 else:
01947 _skip_record(f)
01948
01949 if f.tell() >= total_bytes:
01950 if total_index_length != expected_index_length:
01951 raise ROSBagException('index size mismatch: found %d entries, expected %d' % (total_index_length, expected_index_length))
01952 break
01953
01954 next_op = _peek_next_header_op(f)
01955
01956
01957 self.bag._chunk_headers[chunk_pos] = chunk_header
01958 self.bag._chunks.append(self.bag._curr_chunk_info)
01959
01960 def _read_terminal_connection_records(self):
01961 b, f, r = self.bag, self.bag._file, self.bag._reader
01962
01963
01964 f.seek(b._file_header_pos)
01965 r.read_file_header_record()
01966
01967
01968 if self._advance_to_next_record(_OP_CONNECTION):
01969
01970 while True:
01971 connection_info = r.read_connection_record(f)
01972
01973 b._connections[connection_info.id] = connection_info
01974 b._connection_indexes[connection_info.id] = []
01975
01976 next_op = _peek_next_header_op(f)
01977 if next_op != _OP_CONNECTION:
01978 break
01979
01980 def _advance_to_next_record(self, op):
01981 b, f = self.bag, self.bag._file
01982
01983 try:
01984 while True:
01985 next_op = _peek_next_header_op(f)
01986 if next_op == op:
01987 break
01988
01989 if next_op == _OP_INDEX_DATA:
01990
01991
01992
01993 if b._curr_chunk_info is None:
01994 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1))
01995
01996 b._reader.read_connection_index_record()
01997 else:
01998 _skip_record(f)
01999
02000 return True
02001
02002 except Exception as ex:
02003 return False
02004
02005 def start_reading(self):
02006 try:
02007
02008 self.read_file_header_record()
02009
02010
02011 if self.bag._index_data_pos == 0:
02012 raise ROSBagUnindexedException()
02013
02014
02015 self.bag._file.seek(self.bag._index_data_pos)
02016
02017
02018 self.bag._connection_indexes = {}
02019 for i in range(self.bag._connection_count):
02020 connection_info = self.read_connection_record(self.bag._file)
02021 self.bag._connections[connection_info.id] = connection_info
02022 self.bag._connection_indexes[connection_info.id] = []
02023
02024
02025 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)]
02026
02027
02028 self.bag._chunk_headers = {}
02029 for chunk_info in self.bag._chunks:
02030 self.bag._file.seek(chunk_info.pos)
02031 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header()
02032
02033 if not self.bag._skip_index:
02034 self._read_connection_index_records()
02035
02036 except Exception as ex:
02037 raise ROSBagUnindexedException()
02038
02039 def _read_connection_index_records(self):
02040 for chunk_info in self.bag._chunks:
02041 self.bag._file.seek(chunk_info.pos)
02042 _skip_record(self.bag._file)
02043
02044 self.bag._curr_chunk_info = chunk_info
02045 for i in range(len(chunk_info.connection_counts)):
02046 connection_id, index = self.read_connection_index_record()
02047 self.bag._connection_indexes[connection_id].extend(index)
02048
02049
02050
02051
02052 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index]
02053 for id in orphan_connection_ids:
02054 del self.bag._connections[id]
02055 del self.bag._connection_indexes[id]
02056
02057 self.bag._connection_indexes_read = True
02058
02059 def read_messages(self, topics, start_time, end_time, connection_filter, raw):
02060 connections = self.bag._get_connections(topics, connection_filter)
02061 for entry in self.bag._get_entries(connections, start_time, end_time):
02062 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
02063
02064
02065
02066 def read_file_header_record(self):
02067 self.bag._file_header_pos = self.bag._file.tell()
02068
02069 header = _read_header(self.bag._file, _OP_FILE_HEADER)
02070
02071 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
02072 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count')
02073 self.bag._connection_count = _read_uint32_field(header, 'conn_count')
02074
02075 _skip_sized(self.bag._file)
02076
02077 def read_connection_record(self, f):
02078 header = _read_header(f, _OP_CONNECTION)
02079
02080 conn_id = _read_uint32_field(header, 'conn')
02081 topic = _read_str_field (header, 'topic')
02082
02083 connection_header = _read_header(f)
02084
02085 return _ConnectionInfo(conn_id, topic, connection_header)
02086
02087 def read_chunk_info_record(self):
02088 f = self.bag._file
02089
02090 header = _read_header(f, _OP_CHUNK_INFO)
02091
02092 chunk_info_version = _read_uint32_field(header, 'ver')
02093
02094 if chunk_info_version == 1:
02095 chunk_pos = _read_uint64_field(header, 'chunk_pos')
02096 start_time = _read_time_field (header, 'start_time')
02097 end_time = _read_time_field (header, 'end_time')
02098 connection_count = _read_uint32_field(header, 'count')
02099
02100 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time)
02101
02102 _read_uint32(f)
02103
02104 for i in range(connection_count):
02105 connection_id = _read_uint32(f)
02106 count = _read_uint32(f)
02107
02108 chunk_info.connection_counts[connection_id] = count
02109
02110 return chunk_info
02111 else:
02112 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
02113
02114 def read_chunk_header(self):
02115 header = _read_header(self.bag._file, _OP_CHUNK)
02116
02117 compression = _read_str_field (header, 'compression')
02118 uncompressed_size = _read_uint32_field(header, 'size')
02119
02120 compressed_size = _read_uint32(self.bag._file)
02121
02122 data_pos = self.bag._file.tell()
02123
02124 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
02125
02126 def read_connection_index_record(self):
02127 f = self.bag._file
02128
02129 header = _read_header(f, _OP_INDEX_DATA)
02130
02131 index_version = _read_uint32_field(header, 'ver')
02132 connection_id = _read_uint32_field(header, 'conn')
02133 count = _read_uint32_field(header, 'count')
02134
02135 if index_version != 1:
02136 raise ROSBagFormatException('expecting index version 1, got %d' % index_version)
02137
02138 record_size = _read_uint32(f)
02139
02140 index = []
02141
02142 for i in range(count):
02143 time = _read_time (f)
02144 offset = _read_uint32(f)
02145
02146 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset))
02147
02148 return (connection_id, index)
02149
02150 def seek_and_read_message_data_record(self, position, raw):
02151 chunk_pos, offset = position
02152
02153 chunk_header = self.bag._chunk_headers.get(chunk_pos)
02154 if chunk_header is None:
02155 raise ROSBagException('no chunk at position %d' % chunk_pos)
02156
02157 if chunk_header.compression == Compression.NONE:
02158 f = self.bag._file
02159 f.seek(chunk_header.data_pos + offset)
02160 else:
02161 if self.decompressed_chunk_pos != chunk_pos:
02162
02163 self.bag._file.seek(chunk_header.data_pos)
02164 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size)
02165
02166 if chunk_header.compression == Compression.BZ2:
02167 self.decompressed_chunk = bz2.decompress(compressed_chunk)
02168 else:
02169 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
02170
02171 self.decompressed_chunk_pos = chunk_pos
02172
02173 if self.decompressed_chunk_io:
02174 self.decompressed_chunk_io.close()
02175 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
02176
02177 f = self.decompressed_chunk_io
02178 f.seek(offset)
02179
02180
02181 while True:
02182 header = _read_header(f)
02183 op = _read_uint8_field(header, 'op')
02184 if op != _OP_CONNECTION:
02185 break
02186 _skip_sized(f)
02187
02188
02189 if op != _OP_MSG_DATA:
02190 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
02191
02192 connection_id = _read_uint32_field(header, 'conn')
02193 t = _read_time_field (header, 'time')
02194
02195
02196 connection_info = self.bag._connections[connection_id]
02197 try:
02198 msg_type = _get_message_type(connection_info)
02199 except KeyError:
02200 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype)
02201
02202
02203 data = _read_record_data(f)
02204
02205
02206 if raw:
02207 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type
02208 else:
02209 msg = msg_type()
02210 msg.deserialize(data)
02211
02212 return (connection_info.topic, msg, t)
02213
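# --- Illustrative sketch (editorial addition) ---
# Consuming the raw tuples built by _BagReader200.seek_and_read_message_data_record()
# above when raw=True: each message comes back still serialized, together with
# enough metadata to deserialize it later.  The path is hypothetical and the raw
# keyword is assumed to be forwarded by the public Bag.read_messages() wrapper.
def _example_read_raw(path='/tmp/example.bag'):
    bag = Bag(path)
    try:
        for topic, raw_msg, t in bag.read_messages(raw=True):
            # raw_msg mirrors the tuple assembled above:
            # (datatype, serialized bytes, md5sum, (chunk_pos, offset), message class)
            datatype, data, md5sum, position, msg_type = raw_msg
            msg = msg_type()
            msg.deserialize(data)       # deferred deserialization, as in the non-raw path
            print(topic, datatype, t)
    finally:
        bag.close()
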
02214 def _time_to_str(secs):
02215 secs_frac = secs - int(secs)
02216 secs_frac_str = ('%.2f' % secs_frac)[1:]
02217
02218 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
02219
02220 def _human_readable_size(size):
02221 multiple = 1024.0
02222 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
02223 size /= multiple
02224 if size < multiple:
02225 return '%.1f %s' % (size, suffix)
02226
02227 return '-'
02228
02229 def _human_readable_frequency(freq):
02230 multiple = 1000.0
02231 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']:
02232 if freq < multiple:
02233 return '%.1f %s' % (freq, suffix)
02234 freq /= multiple
02235
02236 return '-'
02237
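# --- Illustrative sketch (editorial addition) ---
# Expected behaviour of the formatting helpers above; _time_to_str() is omitted
# because its output depends on the local timezone.
def _example_formatting_helpers():
    assert _human_readable_size(3 * 1024 * 1024) == '3.0 MB'   # bytes -> MB
    assert _human_readable_frequency(0.5) == '0.5 Hz'
    assert _human_readable_frequency(12000.0) == '12.0 kHz'
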
02238
02239 def _mergesort(list_of_lists, key=None):
02240 """
02241 Perform an N-way merge operation on sorted lists.
02242
02243 @param list_of_lists: iterable of iterables of sorted elements
02244 (sorted either naturally or by C{key})
02245 @param key: sort key function (as accepted by C{sort()} and C{sorted()})
02246
02247 Yields tuples of the form C{(item, iterator)}, where the iterator is the
02248 iterator over the input list that C{item} was drawn from.
02251
02252 This is a stable merge; complexity O(n log k) for n total elements drawn from k input lists.
02253
02254 Examples::
02255
02256 print(list(x[0] for x in _mergesort([[1,2,3,4],
02257 [2,3.5,3.7,4.5,6,7],
02258 [2.6,3.6,6.6,9]])))
02259 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
02260
02261 # note stability
02262 print(list(x[0] for x in _mergesort([[1,2,3,4],
02263 [2,3.5,3.7,4.5,6,7],
02264 [2.6,3.6,6.6,9]], key=int)))
02265 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
02266
02267 print(list(x[0] for x in _mergesort([[4,3,2,1],
02268 [7,6.5,4,3.7,3.3,1.9],
02269 [9,8.6,7.6,6.6,5.5,4.4,3.3]],
02270 key=lambda x: -x)))
02271 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1]
02272 """
02273
02274 heap = []
02275 for i, itr in enumerate(iter(pl) for pl in list_of_lists):
02276 try:
02277 item = next(itr)
02278 toadd = (key(item), i, item, itr) if key else (item, i, itr)
02279 heap.append(toadd)
02280 except StopIteration:
02281 pass
02282 heapq.heapify(heap)
02283
02284 if key:
02285 while heap:
02286 _, idx, item, itr = heap[0]
02287 yield item, itr
02288 try:
02289 item = next(itr)
02290 heapq.heapreplace(heap, (key(item), idx, item, itr) )
02291 except StopIteration:
02292 heapq.heappop(heap)
02293
02294 else:
02295 while heap:
02296 item, idx, itr = heap[0]
02297 yield item, itr
02298 try:
02299 heapq.heapreplace(heap, (next(itr), idx, itr))
02300 except StopIteration:
02301 heapq.heappop(heap)
02302
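# --- Illustrative sketch (editorial addition) ---
# _mergesort() above merges already-sorted sequences while preserving their
# relative order; this is the kind of merge used to interleave per-connection
# index lists by timestamp when reading messages in time order.  The inputs
# here are plain floats standing in for index entry times.
def _example_mergesort():
    a = [1.0, 3.0, 5.0]
    b = [2.0, 2.5, 6.0]
    merged = [item for item, _ in _mergesort([a, b])]
    assert merged == [1.0, 2.0, 2.5, 3.0, 5.0, 6.0]
    # With index entries, a key such as lambda entry: entry.time would be passed.
    return merged
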
02303 class _BZ2CompressorFileFacade(object):
02304 """
02305 A file facade for the bz2.BZ2Compressor.
02306 """
02307 def __init__(self, file):
02308 self.file = file
02309 self.compressor = bz2.BZ2Compressor()
02310 self.compressed_bytes_in = 0
02311
02312 def write(self, data):
02313 compressed = self.compressor.compress(data)
02314 if len(compressed) > 0:
02315 self.file.write(compressed)
02316 self.compressed_bytes_in += len(data)
02317
02318 def flush(self):
02319 compressed = self.compressor.flush()
02320 if len(compressed) > 0:
02321 self.file.write(compressed)
02322
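# --- Illustrative sketch (editorial addition) ---
# Using _BZ2CompressorFileFacade above directly: it streams data through a
# bz2.BZ2Compressor into any file-like object and counts the uncompressed
# bytes it has been fed (compressed_bytes_in).
def _example_bz2_facade():
    from io import BytesIO
    buf = BytesIO()
    facade = _BZ2CompressorFileFacade(buf)
    facade.write(b'hello ')
    facade.write(b'world')
    facade.flush()                               # emit any data still buffered in the compressor
    assert facade.compressed_bytes_in == 11      # counts bytes *before* compression
    assert bz2.decompress(buf.getvalue()) == b'hello world'
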
02323 def _median(values):
02324 values_len = len(values)
02325 if values_len == 0:
02326 return float('nan')
02327
02328 sorted_values = sorted(values)
02329 if values_len % 2 == 1:
02330 return sorted_values[(values_len + 1) // 2 - 1]
02331
02332 lower = sorted_values[values_len // 2 - 1]
02333 upper = sorted_values[values_len // 2]
02334
02335 return float(lower + upper) / 2
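
# --- Illustrative sketch (editorial addition) ---
# Behaviour of _median() above for odd, even and empty inputs.
def _example_median():
    import math
    assert _median([3.0, 1.0, 2.0]) == 2.0           # odd length: middle element
    assert _median([4.0, 1.0, 3.0, 2.0]) == 2.5      # even length: mean of the two middle elements
    assert math.isnan(_median([]))                   # empty input yields NaN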