Package rosbag :: Module bag

Source Code for Module rosbag.bag

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2010, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32   
  33  """ 
  34  The rosbag Python API. 
  35   
  36  Provides serialization of bag files. 
  37  """ 
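A minimal usage sketch (illustrative only; the file names, topic name, and message type below are examples, not part of this module):

    import rosbag
    from std_msgs.msg import String

    # Read every message from an existing bag.
    with rosbag.Bag('input.bag') as bag:
        for topic, msg, t in bag.read_messages():
            print('%s @ %.3f' % (topic, t.to_sec()))

    # Write a message to a new bag.
    with rosbag.Bag('output.bag', 'w') as outbag:
        outbag.write('/chatter', String(data='hello'))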
  38   
  39  from __future__ import print_function 
  40   
  41  import bisect 
  42  import bz2 
  43  import heapq 
  44  import os 
  45  import re 
  46  import struct 
  47  import sys 
  48  import threading 
  49  import time 
  50  import yaml 
  51   
  52  try: 
  53      from cStringIO import StringIO  # Python 2.x 
  54  except ImportError: 
  55      from io import StringIO  # Python 3.x 
  56   
  57  import genmsg 
  58  import genpy 
  59  import genpy.dynamic 
  60  import genpy.message 
  61   
  62  import roslib.names # still needed for roslib.names.canonicalize_name() 
  63  import rospy 
  64   
  65  class ROSBagException(Exception): 
  66      """ 
  67      Base class for exceptions in rosbag. 
  68      """ 
  69      def __init__(self, value): 
  70          self.value = value 
  71   
  72      def __str__(self): 
  73          return self.value 
  74   
  75  class ROSBagFormatException(ROSBagException): 
  76      """ 
  77      Exceptions for errors relating to the bag file format. 
  78      """ 
  79      def __init__(self, value): 
  80          ROSBagException.__init__(self, value) 
  81   
  82  class ROSBagUnindexedException(ROSBagException): 
  83      """ 
  84      Exception for unindexed bags. 
  85      """ 
  86      def __init__(self): 
  87          ROSBagException.__init__(self, 'Unindexed bag') 
  88   
  89  class Compression: 
  90      """ 
  91      Allowable compression types 
  92      """ 
  93      NONE = 'none' 
  94      BZ2  = 'bz2' 
  95   
  96  class Bag(object): 
  97      """ 
  98      Bag serializes messages to and from a single file on disk using the bag format. 
  99      """ 
 100      def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False): 
 101          """ 
 102          Open a bag file. The mode can be 'r', 'w', or 'a' for reading (default), 
 103          writing or appending. The file will be created if it doesn't exist 
 104          when opened for writing or appending; it will be truncated when opened 
 105          for writing. Simultaneous reading and writing is allowed when in writing 
 106          or appending mode. 
 107          @param f: filename of bag to open or a stream to read from 
 108          @type f: str or file 
 109          @param mode: mode, either 'r', 'w', or 'a' 
 110          @type mode: str 
 111          @param compression: compression mode, see U{rosbag.Compression} for valid modes 
 112          @type compression: str 
 113          @param chunk_threshold: minimum number of uncompressed bytes per chunk 
 114          @type chunk_threshold: int 
 115          @param allow_unindexed: if True, allow opening unindexed bags 
 116          @type allow_unindexed: bool 
 117          @param options: the bag options (currently: compression and chunk_threshold) 
 118          @type options: dict 
 119          @param skip_index: if True, don't read the connection index records on open [2.0+] 
 120          @type skip_index: bool 
 121          @raise ValueError: if any argument is invalid 
 122          @raise ROSBagException: if an error occurs opening file 
 123          @raise ROSBagFormatException: if bag format is corrupted 
 124          """ 
 125          if options is not None: 
 126              if type(options) is not dict: 
 127                  raise ValueError('options must be of type dict') 
 128              if 'compression' in options: 
 129                  compression = options['compression'] 
 130              if 'chunk_threshold' in options: 
 131                  chunk_threshold = options['chunk_threshold'] 
 132   
 133          self._file = None 
 134          self._filename = None 
 135          self._version = None 
 136   
 137          allowed_compressions = [Compression.NONE, Compression.BZ2] 
 138          if compression not in allowed_compressions: 
 139              raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 
 140          self._compression = compression 
 141   
 142          if chunk_threshold < 0: 
 143              raise ValueError('chunk_threshold must be greater than or equal to zero') 
 144          self._chunk_threshold = chunk_threshold 
 145   
 146          self._skip_index = skip_index 
 147   
 148          self._reader = None 
 149   
 150          self._file_header_pos = None 
 151          self._index_data_pos = 0    # (1.2+) 
 152   
 153          self._clear_index() 
 154   
 155          self._buffer = StringIO() 
 156   
 157          self._curr_compression = Compression.NONE 
 158   
 159          self._open(f, mode, allow_unindexed) 
 160   
 161          self._output_file = self._file 
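The options dict handled above is interchangeable with the explicit keyword arguments; a hedged example (file name hypothetical):

    bag = Bag('output.bag', 'w', compression=Compression.BZ2, chunk_threshold=1024 * 1024)
    # is equivalent to
    bag = Bag('output.bag', 'w', options={'compression': Compression.BZ2,
                                          'chunk_threshold': 1024 * 1024})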
162
 163      def __iter__(self): 
 164          return self.read_messages() 
 165   
 166      def __enter__(self): 
 167          return self 
 168   
 169      def __exit__(self, exc_type, exc_value, traceback): 
 170          self.close() 
 171   
 172      @property 
173 - def options(self):
174 """Get the options.""" 175 return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }
176 177 @property
178 - def filename(self):
179 """Get the filename.""" 180 return self._filename
181 182 @property
183 - def version(self):
184 """Get the version.""" 185 return self._version
186 187 @property
188 - def mode(self):
189 """Get the mode.""" 190 return self._mode
191 192 @property
193 - def size(self):
194 """Get the size in bytes.""" 195 if not self._file: 196 raise ValueError('I/O operation on closed bag') 197 198 pos = self._file.tell() 199 self._file.seek(0, os.SEEK_END) 200 size = self._file.tell() 201 self._file.seek(pos) 202 return size
203 204 # compression 205
206 - def _get_compression(self):
207 """Get the compression method to use for writing.""" 208 return self._compression
209
210 - def _set_compression(self, compression):
211 """Set the compression method to use for writing.""" 212 allowed_compressions = [Compression.NONE, Compression.BZ2] 213 if compression not in allowed_compressions: 214 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 215 216 self.flush() 217 self._compression = compression
218 219 compression = property(_get_compression, _set_compression) 220 221 # chunk_threshold 222
223 - def _get_chunk_threshold(self):
224 """Get the chunk threshold to use for writing.""" 225 return self._chunk_threshold
226
227 - def _set_chunk_threshold(self, chunk_threshold):
228 """Set the chunk threshold to use for writing.""" 229 if chunk_threshold < 0: 230 raise ValueError('chunk_threshold must be greater than or equal to zero') 231 232 self.flush() 233 self._chunk_threshold = chunk_threshold
234 235 chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold) 236
 237      def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False): 
 238          """ 
 239          Read messages from the bag, optionally filtered by topic, timestamp and connection details. 
 240          @param topics: list of topics or a single topic [optional] 
 241          @type topics: list(str) or str 
 242          @param start_time: earliest timestamp of message to return [optional] 
 243          @type start_time: U{genpy.Time} 
 244          @param end_time: latest timestamp of message to return [optional] 
 245          @type end_time: U{genpy.Time} 
 246          @param connection_filter: function to filter connections to include [optional] 
 247          @type connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool 
 248          @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype) 
 249          @type raw: bool 
 250          @return: generator of (topic, message, timestamp) tuples for each message in the bag file 
 251          @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw] 
 252          """ 
 253          self.flush() 
 254   
 255          if topics and type(topics) is str: 
 256              topics = [topics] 
 257   
 258          return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw) 
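A sketch of the filtering parameters above (topic names and timestamps are hypothetical; rospy.Time is a genpy.Time subclass):

    import rospy

    with rosbag.Bag('input.bag') as bag:
        messages = bag.read_messages(topics=['/imu', '/odom'],
                                     start_time=rospy.Time(1285709600),
                                     end_time=rospy.Time(1285709700))
        for topic, msg, t in messages:
            pass  # process each (topic, message, timestamp) tuple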
259
 260      def flush(self): 
 261          """ 
 262          Write the open chunk to disk so subsequent reads will read all messages. 
 263          @raise ValueError: if bag is closed 
 264          """ 
 265          if not self._file: 
 266              raise ValueError('I/O operation on closed bag') 
 267   
 268          if self._chunk_open: 
 269              self._stop_writing_chunk() 
270
271 - def write(self, topic, msg, t=None, raw=False):
272 """ 273 Write a message to the bag. 274 @param topic: name of topic 275 @type topic: str 276 @param msg: message to add to bag, or tuple (if raw) 277 @type msg: Message or tuple of raw message data 278 @param t: ROS time of message publication, if None specifed, use current time [optional] 279 @type t: U{genpy.Time} 280 @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype) 281 @type raw: bool 282 @raise ValueError: if arguments are invalid or bag is closed 283 """ 284 if not self._file: 285 raise ValueError('I/O operation on closed bag') 286 287 if not topic: 288 raise ValueError('topic is invalid') 289 if not msg: 290 raise ValueError('msg is invalid') 291 292 if t is None: 293 t = genpy.Time.from_sec(time.time()) 294 295 # Seek to end (in case previous operation was a read) 296 self._file.seek(0, os.SEEK_END) 297 298 # Open a chunk, if needed 299 if not self._chunk_open: 300 self._start_writing_chunk(t) 301 302 # Unpack raw 303 if raw: 304 if len(msg) == 5: 305 msg_type, serialized_bytes, md5sum, pos, pytype = msg 306 elif len(msg) == 4: 307 msg_type, serialized_bytes, md5sum, pytype = msg 308 else: 309 raise ValueError('msg must be of length 4 or 5') 310 311 # Write connection record, if necessary (currently using a connection per topic; ignoring message connection header) 312 if topic in self._topic_connections: 313 connection_info = self._topic_connections[topic] 314 conn_id = connection_info.id 315 else: 316 conn_id = len(self._connections) 317 318 if raw: 319 if pytype is None: 320 try: 321 pytype = genpy.message.get_message_class(msg_type) 322 except Exception: 323 pytype = None 324 if pytype is None: 325 raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type) 326 327 if pytype._md5sum != md5sum: 328 print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr) 329 #raise ROSBagException('md5sum of loaded type does not match that of data being recorded') 330 331 header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text } 332 else: 333 header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text } 334 335 connection_info = _ConnectionInfo(conn_id, topic, header) 336 337 self._write_connection_record(connection_info) 338 339 self._connections[conn_id] = connection_info 340 self._topic_connections[topic] = connection_info 341 342 # Create an index entry 343 index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset()) 344 345 # Update the indexes and current chunk info 346 if conn_id not in self._curr_chunk_connection_indexes: 347 # This is the first message on this connection in the chunk 348 self._curr_chunk_connection_indexes[conn_id] = [index_entry] 349 self._curr_chunk_info.connection_counts[conn_id] = 1 350 else: 351 curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id] 352 if index_entry >= curr_chunk_connection_index[-1]: 353 # Test if we're writing chronologically. Can skip binary search if so. 
354 curr_chunk_connection_index.append(index_entry) 355 else: 356 bisect.insort_right(curr_chunk_connection_index, index_entry) 357 358 self._curr_chunk_info.connection_counts[conn_id] += 1 359 360 if conn_id not in self._connection_indexes: 361 self._connection_indexes[conn_id] = [index_entry] 362 else: 363 bisect.insort_right(self._connection_indexes[conn_id], index_entry) 364 365 # Update the chunk start/end times 366 if t > self._curr_chunk_info.end_time: 367 self._curr_chunk_info.end_time = t 368 elif t < self._curr_chunk_info.start_time: 369 self._curr_chunk_info.start_time = t 370 371 if not raw: 372 # Serialize the message to the buffer 373 self._buffer.seek(0) 374 self._buffer.truncate(0) 375 msg.serialize(self._buffer) 376 serialized_bytes = self._buffer.getvalue() 377 378 # Write message data record 379 self._write_message_data_record(conn_id, t, serialized_bytes) 380 381 # Check if we want to stop this chunk 382 chunk_size = self._get_chunk_offset() 383 if chunk_size > self._chunk_threshold: 384 self._stop_writing_chunk()
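One pattern the raw mode above supports is copying messages between bags without deserializing them; a sketch (file names hypothetical):

    with rosbag.Bag('copy.bag', 'w') as outbag:
        with rosbag.Bag('input.bag') as inbag:
            # In raw mode each msg is a (datatype, data, md5sum, position, pytype) tuple,
            # which write() accepts unchanged when raw=True.
            for topic, msg, t in inbag.read_messages(raw=True):
                outbag.write(topic, msg, t, raw=True)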
385
386 - def reindex(self):
387 """ 388 Reindexes the bag file. Yields position of each chunk for progress. 389 """ 390 self._clear_index() 391 return self._reader.reindex()
392
393 - def close(self):
394 """ 395 Close the bag file. Closing an already closed bag does nothing. 396 """ 397 if self._file: 398 if self._mode in 'wa': 399 self._stop_writing() 400 401 self._close_file()
402
403 - def __str__(self):
404 rows = [] 405 406 try: 407 if self._filename: 408 rows.append(('path', self._filename)) 409 410 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 411 rows.append(('version', '1.2 (unindexed)')) 412 else: 413 rows.append(('version', '%d.%d' % (self._version / 100, self._version % 100))) 414 415 if not self._connection_indexes and not self._chunks: 416 rows.append(('size', _human_readable_size(self.size))) 417 else: 418 if self._chunks: 419 start_stamp = self._chunks[ 0].start_time.to_sec() 420 end_stamp = self._chunks[-1].end_time.to_sec() 421 else: 422 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()]) 423 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()]) 424 425 # Show duration 426 duration = end_stamp - start_stamp 427 dur_secs = duration % 60 428 dur_mins = int(duration / 60) 429 dur_hrs = int(dur_mins / 60) 430 if dur_hrs > 0: 431 dur_mins = dur_mins % 60 432 duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration) 433 elif dur_mins > 0: 434 duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration) 435 else: 436 duration_str = '%.1fs' % duration 437 438 rows.append(('duration', duration_str)) 439 440 # Show start and end times 441 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp))) 442 rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp))) 443 444 rows.append(('size', _human_readable_size(self.size))) 445 446 if self._chunks: 447 num_messages = 0 448 for c in self._chunks: 449 for counts in c.connection_counts.itervalues(): 450 num_messages += counts 451 else: 452 num_messages = sum([len(index) for index in self._connection_indexes.itervalues()]) 453 rows.append(('messages', str(num_messages))) 454 455 # Show compression information 456 if len(self._chunk_headers) == 0: 457 rows.append(('compression', 'none')) 458 else: 459 compression_counts = {} 460 compression_uncompressed = {} 461 compression_compressed = {} 462 for chunk_header in self._chunk_headers.itervalues(): 463 if chunk_header.compression not in compression_counts: 464 compression_counts[chunk_header.compression] = 1 465 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 466 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 467 else: 468 compression_counts[chunk_header.compression] += 1 469 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 470 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 471 472 chunk_count = len(self._chunk_headers) 473 474 compressions = [] 475 for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])): 476 if compression != Compression.NONE: 477 fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression] 478 compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction)) 479 else: 480 compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count)) 481 rows.append(('compression', ', '.join(compressions))) 482 483 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 484 if not all_uncompressed: 485 total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.itervalues())) 486 total_compressed_size = sum((h.compressed_size for h in self._chunk_headers.itervalues())) 487 488 total_uncompressed_size_str = 
_human_readable_size(total_uncompressed_size) 489 total_compressed_size_str = _human_readable_size(total_compressed_size) 490 total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)]) 491 492 if duration > 0: 493 uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration) 494 compressed_rate_str = _human_readable_size(total_compressed_size / duration) 495 rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)]) 496 497 rows.append(('uncompressed', '%*s @ %*s/s' % 498 (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str))) 499 rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' % 500 (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size))) 501 else: 502 rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str))) 503 rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str))) 504 505 datatypes = set() 506 datatype_infos = [] 507 for connection in self._connections.itervalues(): 508 if connection.datatype in datatypes: 509 continue 510 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 511 datatypes.add(connection.datatype) 512 513 topics = sorted(set([c.topic for c in self._get_connections()])) 514 topic_datatypes = {} 515 topic_conn_counts = {} 516 topic_msg_counts = {} 517 topic_freqs_median = {} 518 for topic in topics: 519 connections = list(self._get_connections(topic)) 520 521 topic_datatypes[topic] = connections[0].datatype 522 topic_conn_counts[topic] = len(connections) 523 524 msg_count = 0 525 for connection in connections: 526 for chunk in self._chunks: 527 msg_count += chunk.connection_counts.get(connection.id, 0) 528 topic_msg_counts[topic] = msg_count 529 530 if self._connection_indexes_read: 531 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 532 if len(stamps) > 1: 533 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 534 med_period = _median(periods) 535 if med_period > 0.0: 536 topic_freqs_median[topic] = 1.0 / med_period 537 538 topics = sorted(topic_datatypes.iterkeys()) 539 max_topic_len = max([len(topic) for topic in topics]) 540 max_datatype_len = max([len(datatype) for datatype in datatypes]) 541 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()]) 542 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0 543 544 # Show datatypes 545 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 546 s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum) 547 if i == 0: 548 rows.append(('types', s)) 549 else: 550 rows.append(('', s)) 551 552 # Show topics 553 for i, topic in enumerate(topics): 554 topic_msg_count = topic_msg_counts[topic] 555 556 s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ') 557 if topic in topic_freqs_median: 558 s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic])) 559 else: 560 s += ' %*s' % (max_freq_median_len, '') 561 562 s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic]) 563 if topic_conn_counts[topic] > 1: 564 s += ' (%d connections)' % topic_conn_counts[topic] 565 566 if i == 0: 567 rows.append(('topics', s)) 568 else: 569 rows.append(('', s)) 
570 571 except Exception as ex: 572 raise 573 574 first_column_width = max([len(field) for field, _ in rows]) + 1 575 576 s = '' 577 for (field, value) in rows: 578 if field: 579 s += '%-*s %s\n' % (first_column_width, field + ':', value) 580 else: 581 s += '%-*s %s\n' % (first_column_width, '', value) 582 583 return s.rstrip()
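The summary assembled above is returned from str(), so printing an open bag gives a quick overview (file name hypothetical; the fields mirror those shown by the rosbag info command-line tool):

    with rosbag.Bag('input.bag') as bag:
        print(bag)  # path, version, duration, start/end, size, messages, types, topics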
584
585 - def _get_yaml_info(self, key=None):
586 s = '' 587 588 try: 589 if self._filename: 590 s += 'path: %s\n' % self._filename 591 592 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 593 s += 'version: 1.2 (unindexed)\n' 594 else: 595 s += 'version: %d.%d\n' % (self._version / 100, self._version % 100) 596 597 if not self._connection_indexes and not self._chunks: 598 s += 'size: %d\n' % self.size 599 s += 'indexed: False\n' 600 else: 601 if self._chunks: 602 start_stamp = self._chunks[ 0].start_time.to_sec() 603 end_stamp = self._chunks[-1].end_time.to_sec() 604 else: 605 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()]) 606 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()]) 607 608 duration = end_stamp - start_stamp 609 s += 'duration: %.6f\n' % duration 610 s += 'start: %.6f\n' % start_stamp 611 s += 'end: %.6f\n' % end_stamp 612 s += 'size: %d\n' % self.size 613 if self._chunks: 614 num_messages = 0 615 for c in self._chunks: 616 for counts in c.connection_counts.itervalues(): 617 num_messages += counts 618 else: 619 num_messages = sum([len(index) for index in self._connection_indexes.itervalues()]) 620 s += 'messages: %d\n' % num_messages 621 s += 'indexed: True\n' 622 623 # Show compression information 624 if len(self._chunk_headers) == 0: 625 s += 'compression: none\n' 626 else: 627 compression_counts = {} 628 compression_uncompressed = {} 629 compression_compressed = {} 630 for chunk_header in self._chunk_headers.itervalues(): 631 if chunk_header.compression not in compression_counts: 632 compression_counts[chunk_header.compression] = 1 633 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 634 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 635 else: 636 compression_counts[chunk_header.compression] += 1 637 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 638 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 639 640 chunk_count = len(self._chunk_headers) 641 642 main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0] 643 s += 'compression: %s\n' % str(main_compression) 644 645 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 646 if not all_uncompressed: 647 s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.itervalues())) 648 s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.itervalues())) 649 650 datatypes = set() 651 datatype_infos = [] 652 for connection in self._connections.itervalues(): 653 if connection.datatype in datatypes: 654 continue 655 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 656 datatypes.add(connection.datatype) 657 658 topics = sorted(set([c.topic for c in self._get_connections()])) 659 topic_datatypes = {} 660 topic_conn_counts = {} 661 topic_msg_counts = {} 662 topic_freqs_median = {} 663 for topic in topics: 664 connections = list(self._get_connections(topic)) 665 666 topic_datatypes[topic] = connections[0].datatype 667 topic_conn_counts[topic] = len(connections) 668 669 msg_count = 0 670 for connection in connections: 671 for chunk in self._chunks: 672 msg_count += chunk.connection_counts.get(connection.id, 0) 673 topic_msg_counts[topic] = msg_count 674 675 if self._connection_indexes_read: 676 stamps = [entry.time.to_sec() for entry in 
self._get_entries(connections)] 677 if len(stamps) > 1: 678 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 679 med_period = _median(periods) 680 if med_period > 0.0: 681 topic_freqs_median[topic] = 1.0 / med_period 682 683 topics = sorted(topic_datatypes.iterkeys()) 684 max_topic_len = max([len(topic) for topic in topics]) 685 max_datatype_len = max([len(datatype) for datatype in datatypes]) 686 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()]) 687 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0 688 689 # Show datatypes 690 s += 'types:\n' 691 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 692 s += ' - type: %s\n' % datatype 693 s += ' md5: %s\n' % md5sum 694 695 # Show topics 696 s += 'topics:\n' 697 for i, topic in enumerate(topics): 698 topic_msg_count = topic_msg_counts[topic] 699 700 s += ' - topic: %s\n' % topic 701 s += ' type: %s\n' % topic_datatypes[topic] 702 s += ' messages: %d\n' % topic_msg_count 703 704 if topic_conn_counts[topic] > 1: 705 s += ' connections: %d\n' % topic_conn_counts[topic] 706 707 if topic in topic_freqs_median: 708 s += ' frequency: %.4f\n' % topic_freqs_median[topic] 709 710 if not key: 711 return s 712 713 class DictObject(object): 714 def __init__(self, d): 715 for a, b in d.items(): 716 if isinstance(b, (list, tuple)): 717 setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b]) 718 else: 719 setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
720 721 obj = DictObject(yaml.load(s)) 722 try: 723 val = eval('obj.' + key) 724 except Exception as ex: 725 print('Error getting key "%s"' % key, file=sys.stderr) 726 return None 727 728 def print_yaml(val, indent=0): 729 indent_str = ' ' * indent 730 731 if type(val) is list: 732 s = '' 733 for item in val: 734 s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1)) 735 return s 736 elif type(val) is DictObject: 737 s = '' 738 for i, (k, v) in enumerate(val.__dict__.items()): 739 if i != 0: 740 s += indent_str 741 s += '%s: %s' % (k, str(v)) 742 if i < len(val.__dict__) - 1: 743 s += '\n' 744 return s 745 else: 746 return indent_str + str(val) 747 748 return print_yaml(val) 749 750 except Exception as ex: 751 raise 752 753 ### Internal API ### 754 755 @property
756 - def _has_compressed_chunks(self):
757 if not self._chunk_headers: 758 return False 759 760 return any((h.compression != Compression.NONE for h in self._chunk_headers.itervalues()))
761 762 @property
763 - def _uncompressed_size(self):
764 if not self._chunk_headers: 765 return self.size 766 767 return sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
768
769 - def _read_message(self, position, raw=False):
770 """ 771 Read the message from the given position in the file. 772 """ 773 self.flush() 774 return self._reader.seek_and_read_message_data_record(position, raw)
775 776 # Index accessing 777
778 - def _get_connections(self, topics=None, connection_filter=None):
779 """ 780 Yield the connections, optionally filtering by topic and/or connection information. 781 """ 782 if topics: 783 if type(topics) is str: 784 topics = set([roslib.names.canonicalize_name(topics)]) 785 else: 786 topics = set([roslib.names.canonicalize_name(t) for t in topics]) 787 788 for c in self._connections.itervalues(): 789 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics: 790 continue 791 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header): 792 continue 793 yield c
794
795 - def _get_entries(self, connections=None, start_time=None, end_time=None):
796 """ 797 Yield index entries on the given connections in the given time range. 798 """ 799 for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time): 800 if start_time and entry.time < start_time: 801 continue 802 if end_time and entry.time > end_time: 803 return 804 yield entry
805
806 - def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
807 """ 808 Yield index entries on the given connections in the given time range in reverse order. 809 """ 810 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()): 811 if end_time and entry.time > end_time: 812 continue 813 if start_time and entry.time < start_time: 814 return 815 yield entry
816
817 - def _get_entry(self, t, connections=None):
818 """ 819 Return the first index entry on/before the given time on the given connections 820 """ 821 indexes = self._get_indexes(connections) 822 823 entry = _IndexEntry(t) 824 825 first_entry = None 826 827 for index in indexes: 828 i = bisect.bisect_right(index, entry) - 1 829 if i >= 0: 830 index_entry = index[i] 831 if first_entry is None or index_entry > first_entry: 832 first_entry = index_entry 833 834 return first_entry
835
836 - def _get_entry_after(self, t, connections=None):
837 """ 838 Return the first index entry after the given time on the given connections 839 """ 840 indexes = self._get_indexes(connections) 841 842 entry = _IndexEntry(t) 843 844 first_entry = None 845 846 for index in indexes: 847 i = bisect.bisect_right(index, entry) 848 if i <= len(index) - 1: 849 index_entry = index[i] 850 if first_entry is None or index_entry < first_entry: 851 first_entry = index_entry 852 853 return first_entry
854
855 - def _get_indexes(self, connections):
856 """ 857 Get the indexes for the given connections. 858 """ 859 if not self._connection_indexes_read: 860 self._reader._read_connection_index_records() 861 862 if connections is None: 863 return self._connection_indexes.itervalues() 864 else: 865 return (self._connection_indexes[c.id] for c in connections)
866 867 ### Implementation ### 868
869 - def _clear_index(self):
870 self._connection_indexes_read = False 871 self._connection_indexes = {} # id -> IndexEntry[] (1.2+) 872 873 self._topic_connections = {} # topic -> connection_id 874 self._connections = {} # id -> ConnectionInfo 875 self._connection_count = 0 # (2.0) 876 self._chunk_count = 0 # (2.0) 877 self._chunks = [] # ChunkInfo[] (2.0) 878 self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0) 879 880 self._chunk_open = False 881 self._curr_chunk_info = None 882 self._curr_chunk_data_pos = None 883 self._curr_chunk_connection_indexes = {}
884
885 - def _open(self, f, mode, allow_unindexed):
886 if not f: 887 raise ValueError('filename (or stream) is invalid') 888 889 try: 890 if mode == 'r': self._open_read(f, allow_unindexed) 891 elif mode == 'w': self._open_write(f) 892 elif mode == 'a': self._open_append(f, allow_unindexed) 893 else: 894 raise ValueError('mode "%s" is invalid' % mode) 895 except struct.error: 896 raise ROSBagFormatException('error with bag')
897
898 - def _open_read(self, f, allow_unindexed):
899 if isinstance(f, file): 900 self._file = f 901 self._filename = None 902 else: 903 self._file = open(f, 'rb') 904 self._filename = f 905 906 self._mode = 'r' 907 908 try: 909 self._version = self._read_version() 910 except: 911 self._version = None 912 self._close_file() 913 raise 914 915 try: 916 self._create_reader() 917 self._reader.start_reading() 918 except ROSBagUnindexedException as ex: 919 if not allow_unindexed: 920 self._close_file() 921 raise 922 except: 923 self._close_file() 924 raise
925
926 - def _open_write(self, f):
927 if isinstance(f, file): 928 self._file = f 929 self._filename = None 930 else: 931 self._file = open(f, 'w+b') 932 self._filename = f 933 934 self._mode = 'w' 935 936 self._version = 200 937 938 try: 939 self._create_reader() 940 self._start_writing() 941 except: 942 self._close_file() 943 raise
944
945 - def _open_append(self, f, allow_unindexed):
946 if isinstance(f, file): 947 self._file = f 948 self._filename = None 949 else: 950 try: 951 # Test if the file already exists 952 open(f, 'r').close() 953 954 # File exists: open in read with update mode 955 self._file = open(f, 'r+b') 956 except IOError: 957 # File doesn't exist: open in write mode 958 self._file = open(f, 'w+b') 959 960 self._filename = f 961 962 self._mode = 'a' 963 964 try: 965 self._version = self._read_version() 966 except: 967 self._version = None 968 self._close_file() 969 raise 970 971 if self._version != 200: 972 self._file.close() 973 raise ROSBagException('bag version %d is unsupported for appending' % self._version) 974 975 try: 976 self._start_appending() 977 except ROSBagUnindexedException: 978 if not allow_unindexed: 979 self._close_file() 980 raise 981 except: 982 self._close_file() 983 raise
984
985 - def _close_file(self):
986 self._file.close() 987 self._file = None
988
989 - def _create_reader(self):
990 """ 991 @raise ROSBagException: if the bag version is unsupported 992 """ 993 major_version, minor_version = self._version / 100, self._version % 100 994 if major_version == 2: 995 self._reader = _BagReader200(self) 996 elif major_version == 1: 997 if minor_version == 1: 998 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version) 999 else: 1000 # Get the op code of the first record. If it's FILE_HEADER, then we have an indexed 1.2 bag. 1001 first_record_pos = self._file.tell() 1002 header = _read_header(self._file) 1003 op = _read_uint8_field(header, 'op') 1004 self._file.seek(first_record_pos) 1005 1006 if op == _OP_FILE_HEADER: 1007 self._reader = _BagReader102_Indexed(self) 1008 else: 1009 self._reader = _BagReader102_Unindexed(self) 1010 else: 1011 raise ROSBagException('unknown bag version %d' % self._version)
1012
1013 - def _read_version(self):
1014 """ 1015 @raise ROSBagException: if the file is empty, or the version line can't be parsed 1016 """ 1017 version_line = self._file.readline().rstrip() 1018 if len(version_line) == 0: 1019 raise ROSBagException('empty file') 1020 1021 matches = re.match("#ROS(.*) V(\d).(\d)", version_line) 1022 if matches is None or len(matches.groups()) != 3: 1023 raise ROSBagException('This does not appear to be a bag file') 1024 1025 version_type, major_version_str, minor_version_str = matches.groups() 1026 1027 version = int(major_version_str) * 100 + int(minor_version_str) 1028 1029 return version
1030
1031 - def _start_writing(self):
1032 self._file.write(_VERSION + '\n') 1033 self._file_header_pos = self._file.tell() 1034 self._write_file_header_record(0, 0, 0) 1035
1036 - def _stop_writing(self):
1037 # Write the open chunk (if any) to file 1038 self.flush() 1039 1040 # Remember this location as the start of the index 1041 self._index_data_pos = self._file.tell() 1042 1043 # Write connection infos 1044 for connection_info in self._connections.itervalues(): 1045 self._write_connection_record(connection_info) 1046 1047 # Write chunk infos 1048 for chunk_info in self._chunks: 1049 self._write_chunk_info_record(chunk_info) 1050 1051 # Re-write the file header 1052 self._file.seek(self._file_header_pos) 1053 self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
1054
1055 - def _start_appending(self):
1056 self._file_header_pos = self._file.tell() 1057 1058 self._create_reader() 1059 self._reader.start_reading() 1060 1061 # Truncate the file to chop off the index 1062 self._file.truncate(self._index_data_pos) 1063 self._reader.index_data_pos = 0 1064 1065 # Rewrite the file header, clearing the index position (so we know if the index is invalid) 1066 self._file.seek(self._file_header_pos); 1067 self._write_file_header_record(0, 0, 0) 1068 1069 # Seek to the end of the file 1070 self._file.seek(0, os.SEEK_END)
1071
1072 - def _start_writing_chunk(self, t):
1073 self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t) 1074 self._write_chunk_header(_ChunkHeader(self._compression, 0, 0)) 1075 self._curr_chunk_data_pos = self._file.tell() 1076 self._set_compression_mode(self._compression) 1077 self._chunk_open = True
1078
1079 - def _get_chunk_offset(self):
1080 if self._compression == Compression.NONE: 1081 return self._file.tell() - self._curr_chunk_data_pos 1082 else: 1083 return self._output_file.compressed_bytes_in
1084
1085 - def _stop_writing_chunk(self):
1086 # Add this chunk to the index 1087 self._chunks.append(self._curr_chunk_info) 1088 1089 # Get the uncompressed and compressed sizes 1090 uncompressed_size = self._get_chunk_offset() 1091 self._set_compression_mode(Compression.NONE) 1092 compressed_size = self._file.tell() - self._curr_chunk_data_pos 1093 1094 # Rewrite the chunk header with the size of the chunk (remembering current offset) 1095 end_of_chunk_pos = self._file.tell() 1096 self._file.seek(self._curr_chunk_info.pos) 1097 chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos) 1098 self._write_chunk_header(chunk_header) 1099 self._chunk_headers[self._curr_chunk_info.pos] = chunk_header 1100 1101 # Write out the connection indexes and clear them 1102 self._file.seek(end_of_chunk_pos) 1103 for connection_id, entries in self._curr_chunk_connection_indexes.items(): 1104 self._write_connection_index_record(connection_id, entries) 1105 self._curr_chunk_connection_indexes.clear() 1106 1107 # Flag that we're starting a new chunk 1108 self._chunk_open = False
1109
1110 - def _set_compression_mode(self, compression):
1111 # Flush the compressor, if needed 1112 if self._curr_compression != Compression.NONE: 1113 self._output_file.flush() 1114 1115 # Create the compressor 1116 if compression == Compression.BZ2: 1117 self._output_file = _BZ2CompressorFileFacade(self._file) 1118 elif compression == Compression.NONE: 1119 self._output_file = self._file 1120 else: 1121 raise ROSBagException('unsupported compression type: %s' % compression) 1122 1123 self._curr_compression = compression
1124
1125 - def _write_file_header_record(self, index_pos, connection_count, chunk_count):
1126 header = { 1127 'op': _pack_uint8(_OP_FILE_HEADER), 1128 'index_pos': _pack_uint64(index_pos), 1129 'conn_count': _pack_uint32(connection_count), 1130 'chunk_count': _pack_uint32(chunk_count) 1131 } 1132 _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
1133
1134 - def _write_connection_record(self, connection_info):
1135 header = { 1136 'op': _pack_uint8(_OP_CONNECTION), 1137 'topic': connection_info.topic, 1138 'conn': _pack_uint32(connection_info.id) 1139 } 1140 1141 _write_header(self._output_file, header) 1142 1143 _write_header(self._output_file, connection_info.header)
1144
1145 - def _write_message_data_record(self, connection_id, t, serialized_bytes):
1146 header = { 1147 'op': _pack_uint8(_OP_MSG_DATA), 1148 'conn': _pack_uint32(connection_id), 1149 'time': _pack_time(t) 1150 } 1151 _write_record(self._output_file, header, serialized_bytes)
1152
1153 - def _write_chunk_header(self, chunk_header):
1154 header = { 1155 'op': _pack_uint8(_OP_CHUNK), 1156 'compression': chunk_header.compression, 1157 'size': _pack_uint32(chunk_header.uncompressed_size) 1158 } 1159 _write_header(self._file, header) 1160 1161 self._file.write(_pack_uint32(chunk_header.compressed_size))
1162
1163 - def _write_connection_index_record(self, connection_id, entries):
1164 header = { 1165 'op': _pack_uint8(_OP_INDEX_DATA), 1166 'conn': _pack_uint32(connection_id), 1167 'ver': _pack_uint32(_INDEX_VERSION), 1168 'count': _pack_uint32(len(entries)) 1169 } 1170 1171 buffer = self._buffer 1172 buffer.seek(0) 1173 buffer.truncate(0) 1174 for entry in entries: 1175 buffer.write(_pack_time (entry.time)) 1176 buffer.write(_pack_uint32(entry.offset)) 1177 1178 _write_record(self._file, header, buffer.getvalue()) 1179
1180 - def _write_chunk_info_record(self, chunk_info):
1181 header = { 1182 'op': _pack_uint8 (_OP_CHUNK_INFO), 1183 'ver': _pack_uint32(_CHUNK_INDEX_VERSION), 1184 'chunk_pos': _pack_uint64(chunk_info.pos), 1185 'start_time': _pack_time(chunk_info.start_time), 1186 'end_time': _pack_time(chunk_info.end_time), 1187 'count': _pack_uint32(len(chunk_info.connection_counts)) 1188 } 1189 1190 buffer = self._buffer 1191 buffer.seek(0) 1192 buffer.truncate(0) 1193 for connection_id, count in chunk_info.connection_counts.items(): 1194 buffer.write(_pack_uint32(connection_id)) 1195 buffer.write(_pack_uint32(count)) 1196 1197 _write_record(self._file, header, buffer.getvalue())
1198   
1199  ### Implementation ### 
1200   
1201  _message_types = {}    # md5sum -> type 
1202   
1203  _OP_MSG_DEF     = 0x01 
1204  _OP_MSG_DATA    = 0x02 
1205  _OP_FILE_HEADER = 0x03 
1206  _OP_INDEX_DATA  = 0x04 
1207  _OP_CHUNK       = 0x05 
1208  _OP_CHUNK_INFO  = 0x06 
1209  _OP_CONNECTION  = 0x07 
1210   
1211  _OP_CODES = { 
1212      _OP_MSG_DEF:     'MSG_DEF', 
1213      _OP_MSG_DATA:    'MSG_DATA', 
1214      _OP_FILE_HEADER: 'FILE_HEADER', 
1215      _OP_INDEX_DATA:  'INDEX_DATA', 
1216      _OP_CHUNK:       'CHUNK', 
1217      _OP_CHUNK_INFO:  'CHUNK_INFO', 
1218      _OP_CONNECTION:  'CONNECTION' 
1219  } 
1220   
1221  _VERSION = '#ROSBAG V2.0' 
1222  _FILE_HEADER_LENGTH = 4096 
1223  _INDEX_VERSION = 1 
1224  _CHUNK_INDEX_VERSION = 1 
1225 1226 -class _ConnectionInfo(object):
1227 - def __init__(self, id, topic, header):
1228 try: 1229 datatype, md5sum, msg_def = header['type'], header['md5sum'], header['message_definition'] 1230 except KeyError as ex: 1231 raise ROSBagFormatException('connection header field %s not found' % str(ex)) 1232 1233 self.id = id 1234 self.topic = topic 1235 self.datatype = datatype 1236 self.md5sum = md5sum 1237 self.msg_def = msg_def 1238 self.header = header
1239
1240 - def __str__(self):
1241 return '%d on %s: %s' % (self.id, self.topic, str(self.header))
1242
1243 -class _ChunkInfo(object):
1244 - def __init__(self, pos, start_time, end_time):
1245 self.pos = pos 1246 self.start_time = start_time 1247 self.end_time = end_time 1248 1249 self.connection_counts = {}
1250
1251 - def __str__(self):
1252 s = 'chunk_pos: %d\n' % self.pos 1253 s += 'start_time: %s\n' % str(self.start_time) 1254 s += 'end_time: %s\n' % str(self.end_time) 1255 s += 'connections: %d\n' % len(self.connection_counts) 1256 s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()]) 1257 return s
1258
1259 -class _ChunkHeader(object):
1260 - def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
1261 self.compression = compression 1262 self.compressed_size = compressed_size 1263 self.uncompressed_size = uncompressed_size 1264 self.data_pos = data_pos
1265
1266 - def __str__(self):
1267 if self.uncompressed_size > 0: 1268 ratio = 100 * (float(self.compressed_size) / self.uncompressed_size) 1269 return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio) 1270 else: 1271 return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
1272
1273 -class _IndexEntry(object):
1274 - def __init__(self, time):
1275 self.time = time
1276
1277 - def __cmp__(self, other):
1278 return self.time.__cmp__(other.time)
1279
1280 -class _IndexEntry102(_IndexEntry):
1281 - def __init__(self, time, offset):
1282 self.time = time 1283 self.offset = offset
1284 1285 @property
1286 - def position(self):
1287 return self.offset
1288
1289 - def __str__(self):
1290 return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)
1291
1292 -class _IndexEntry200(_IndexEntry):
1293 - def __init__(self, time, chunk_pos, offset):
1294 self.time = time 1295 self.chunk_pos = chunk_pos 1296 self.offset = offset
1297 1298 @property
1299 - def position(self):
1300 return (self.chunk_pos, self.offset)
1301
1302 - def __str__(self):
1303 return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
1304
1305 -def _get_message_type(info):
1306 message_type = _message_types.get(info.md5sum) 1307 if message_type is None: 1308 try: 1309 message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype] 1310 if (message_type._md5sum != info.md5sum): 1311 print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr) 1312 except genmsg.InvalidMsgSpec: 1313 message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype] 1314 print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition."'%(info.datatype, info.md5sum), file=sys.stderr) 1315 except genmsg.MsgGenerationException as ex: 1316 raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex))) 1317 1318 _message_types[info.md5sum] = message_type 1319 1320 return message_type
1321
1322 -def _read_uint8 (f): return _unpack_uint8 (f.read(1))
1323 -def _read_uint32(f): return _unpack_uint32(f.read(4))
1324 -def _read_uint64(f): return _unpack_uint64(f.read(8))
1325 -def _read_time (f): return _unpack_time (f.read(8))
1326
1327 -def _unpack_uint8(v): return struct.unpack('<B', v)[0]
1328 -def _unpack_uint32(v): return struct.unpack('<L', v)[0]
1329 -def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
1330 -def _unpack_time(v): return rospy.Time(*struct.unpack('<LL', v))
1331
1332 -def _pack_uint8(v): return struct.pack('<B', v)
1333 -def _pack_uint32(v): return struct.pack('<L', v)
1334 -def _pack_uint64(v): return struct.pack('<Q', v)
1335 -def _pack_time(v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
1336
1337 -def _read(f, size):
1338 data = f.read(size) 1339 if len(data) != size: 1340 raise ROSBagException('expecting %d bytes, read %d' % (size, len(data))) 1341 return data
1342
1343 -def _skip_record(f):
1344 _skip_sized(f) # skip header 1345 _skip_sized(f) # skip data
1346
1347 -def _skip_sized(f):
1348 size = _read_uint32(f) 1349 f.seek(size, os.SEEK_CUR)
1350
1351 -def _read_sized(f):
1352 try: 1353 size = _read_uint32(f) 1354 except struct.error as ex: 1355 raise ROSBagFormatException('error unpacking uint32: %s' % str(ex)) 1356 return _read(f, size)
1357
1358 -def _write_sized(f, v):
1359 f.write(_pack_uint32(len(v))) 1360 f.write(v)
1361
1362 -def _read_field(header, field, unpack_fn):
1363 if field not in header: 1364 raise ROSBagFormatException('expected "%s" field in record' % field) 1365 1366 try: 1367 value = unpack_fn(header[field]) 1368 except Exception as ex: 1369 raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex))) 1370 1371 return value
1372
1373 -def _read_str_field (header, field): return _read_field(header, field, lambda v: v)
1374 -def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
1375 -def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
1376 -def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
1377 -def _read_time_field (header, field): return _read_field(header, field, _unpack_time)
1378
1379 -def _write_record(f, header, data='', padded_size=None):
1380 header_str = _write_header(f, header) 1381 1382 if padded_size is not None: 1383 header_len = len(header_str) 1384 if header_len < padded_size: 1385 data = ' ' * (padded_size - header_len) 1386 else: 1387 data = '' 1388 1389 _write_sized(f, data)
1390
1391 -def _write_header(f, header):
1392 header_str = ''.join([_pack_uint32(len(k) + 1 + len(v)) + k + '=' + v for k, v in header.items()]) 1393 _write_sized(f, header_str) 1394 return header_str
1395
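To make the header layout written by _write_header() concrete: each field is serialized as a 4-byte little-endian length followed by 'name=value', and a full record is a length-prefixed header followed by length-prefixed data (see _write_sized() and _write_record()). A small illustrative sketch, not part of the module:

    import struct

    def pack_header_field(name, value):
        # <uint32 length><name>=<value>, matching _write_header() above
        field = name + '=' + value
        return struct.pack('<L', len(field)) + field

    # e.g. the 'compression' field of a chunk record header:
    pack_header_field('compression', 'bz2')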
1396 -def _read_header(f, req_op=None):
1397 bag_pos = f.tell() 1398 1399 # Read header 1400 try: 1401 header = _read_sized(f) 1402 except ROSBagException as ex: 1403 raise ROSBagFormatException('Error reading header: %s' % str(ex)) 1404 1405 # Parse header into a dict 1406 header_dict = {} 1407 while header != '': 1408 # Read size 1409 if len(header) < 4: 1410 raise ROSBagFormatException('Error reading header field') 1411 (size,) = struct.unpack('<L', header[:4]) # @todo reindex: catch struct.error 1412 header = header[4:] 1413 1414 # Read bytes 1415 if len(header) < size: 1416 raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header))) 1417 (name, sep, value) = header[:size].partition('=') 1418 if sep == '': 1419 raise ROSBagFormatException('Error reading header field') 1420 1421 header_dict[name] = value # @todo reindex: raise exception on empty name 1422 1423 header = header[size:] 1424 1425 # Check the op code of the header, if supplied 1426 if req_op is not None: 1427 op = _read_uint8_field(header_dict, 'op') 1428 if req_op != op: 1429 raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op])) 1430 1431 return header_dict
1432
1433 -def _peek_next_header_op(f):
1434 pos = f.tell() 1435 header = _read_header(f) 1436 op = _read_uint8_field(header, 'op') 1437 f.seek(pos) 1438 return op
1439
1440 -def _read_record_data(f):
1441 try: 1442 record_data = _read_sized(f) 1443 except ROSBagException as ex: 1444 raise ROSBagFormatException('Error reading record data: %s' % str(ex)) 1445 1446 return record_data
1447
1448 -class _BagReader(object):
1449 - def __init__(self, bag):
1450 self.bag = bag
1451
1452 - def start_reading(self):
1453 raise NotImplementedError()
1454
1455 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1456 raise NotImplementedError()
1457
1458 - def reindex(self):
1459 raise NotImplementedError()
1460
1461 -class _BagReader102_Unindexed(_BagReader):
1462      """ 
1463      Support class for reading unindexed v1.2 bag files. 
1464      """ 
1465 - def __init__(self, bag):
1466 _BagReader.__init__(self, bag)
1467
1468 - def start_reading(self):
1469 self.bag._file_header_pos = self.bag._file.tell()
1470
1471 - def reindex(self):
1472 """Generates all bag index information by rereading the message records.""" 1473 f = self.bag._file 1474 1475 total_bytes = self.bag.size 1476 1477 # Re-read the file header to get to the start of the first message 1478 self.bag._file.seek(self.bag._file_header_pos) 1479 1480 offset = f.tell() 1481 1482 # Read message definition and data records 1483 while offset < total_bytes: 1484 yield offset 1485 1486 op = _peek_next_header_op(f) 1487 1488 if op == _OP_MSG_DEF: 1489 connection_info = self.read_message_definition_record() 1490 1491 if connection_info.topic not in self.bag._topic_connections: 1492 self.bag._topic_connections[connection_info.topic] = connection_info.id 1493 self.bag._connections[connection_info.id] = connection_info 1494 self.bag._connection_indexes[connection_info.id] = [] 1495 1496 elif op == _OP_MSG_DATA: 1497 # Read the topic and timestamp from the header 1498 header = _read_header(f) 1499 1500 topic = _read_str_field(header, 'topic') 1501 secs = _read_uint32_field(header, 'sec') 1502 nsecs = _read_uint32_field(header, 'nsec') 1503 t = genpy.Time(secs, nsecs) 1504 1505 if topic not in self.bag._topic_connections: 1506 datatype = _read_str_field(header, 'type') 1507 self._create_connection_info_for_datatype(topic, datatype) 1508 1509 connection_id = self.bag._topic_connections[topic] 1510 info = self.bag._connections[connection_id] 1511 1512 # Skip over the message content 1513 _skip_sized(f) 1514 1515 # Insert the message entry (in order) into the connection index 1516 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1517 1518 offset = f.tell()
1519
1520 - def read_messages(self, topics, start_time, end_time, topic_filter, raw):
1521 f = self.bag._file 1522 1523 f.seek(self.bag._file_header_pos) 1524 1525 while True: 1526 # Read MSG_DEF records 1527 while True: 1528 position = f.tell() 1529 1530 try: 1531 header = _read_header(f) 1532 except Exception: 1533 return 1534 1535 op = _read_uint8_field(header, 'op') 1536 if op != _OP_MSG_DEF: 1537 break 1538 1539 connection_info = self.read_message_definition_record(header) 1540 1541 if connection_info.topic not in self.bag._topic_connections: 1542 self.bag._topic_connections[connection_info.topic] = connection_info.id 1543 1544 self.bag._connections[connection_info.id] = connection_info 1545 1546 # Check that we have a MSG_DATA record 1547 if op != _OP_MSG_DATA: 1548 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 1549 1550 topic = _read_str_field(header, 'topic') 1551 1552 if topic not in self.bag._topic_connections: 1553 datatype = _read_str_field(header, 'type') 1554 self._create_connection_info_for_datatype(topic, datatype) 1555 1556 connection_id = self.bag._topic_connections[topic] 1557 info = self.bag._connections[connection_id] 1558 1559 # Get the message type 1560 try: 1561 msg_type = _get_message_type(info) 1562 except KeyError: 1563 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceeded in bagfile by definition' % info.datatype) 1564 1565 # Get the timestamp 1566 secs = _read_uint32_field(header, 'sec') 1567 nsecs = _read_uint32_field(header, 'nsec') 1568 t = genpy.Time(secs, nsecs) 1569 1570 # Read the message content 1571 data = _read_record_data(f) 1572 1573 if raw: 1574 msg = (info.datatype, data, info.md5sum, position, msg_type) 1575 else: 1576 # Deserialize the message 1577 msg = msg_type() 1578 msg.deserialize(data) 1579 1580 yield (topic, msg, t) 1581 1582 self.bag._connection_indexes_read = True
1583
1584 - def _create_connection_info_for_datatype(self, topic, datatype):
1585 for c in self.bag._connections.itervalues(): 1586 if c.datatype == datatype: 1587 connection_id = len(self.bag._connections) 1588 connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] } 1589 connection_info = _ConnectionInfo(connection_id, topic, connection_header) 1590 1591 self.bag._topic_connections[topic] = connection_id 1592 self.bag._connections[connection_id] = connection_info 1593 self.bag._connection_indexes[connection_id] = [] 1594 return 1595 1596 raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
1597
1598 - def read_message_definition_record(self, header=None):
1599 if not header: 1600 header = _read_header(self.bag._file, _OP_MSG_DEF) 1601 1602 topic = _read_str_field(header, 'topic') 1603 datatype = _read_str_field(header, 'type') 1604 md5sum = _read_str_field(header, 'md5') 1605 msg_def = _read_str_field(header, 'def') 1606 1607 _skip_sized(self.bag._file) # skip the record data 1608 1609 connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def } 1610 1611 id = len(self.bag._connections) 1612 1613 return _ConnectionInfo(id, topic, connection_header)
1614
1615 -class _BagReader102_Indexed(_BagReader102_Unindexed):
1616      """ 
1617      Support class for reading indexed v1.2 bag files. 
1618      """ 
1619 - def __init__(self, bag):
1620 _BagReader.__init__(self, bag)
1621
1622 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1623 connections = self.bag._get_connections(topics, connection_filter) 1624 for entry in self.bag._get_entries(connections, start_time, end_time): 1625 yield self.seek_and_read_message_data_record(entry.offset, raw)
1626
1627 - def reindex(self):
1628 """Generates all bag index information by rereading the message records.""" 1629 f = self.bag._file 1630 1631 total_bytes = self.bag.size 1632 1633 # Re-read the file header to get to the start of the first message 1634 self.bag._file.seek(self.bag._file_header_pos) 1635 self.read_file_header_record() 1636 1637 offset = f.tell() 1638 1639 # Read message definition and data records 1640 while offset < total_bytes: 1641 yield offset 1642 1643 op = _peek_next_header_op(f) 1644 1645 if op == _OP_MSG_DEF: 1646 connection_info = self.read_message_definition_record() 1647 1648 if connection_info.topic not in self.bag._topic_connections: 1649 self.bag._topic_connections[connection_info.topic] = connection_info.id 1650 self.bag._connections[connection_info.id] = connection_info 1651 self.bag._connection_indexes[connection_info.id] = [] 1652 1653 elif op == _OP_MSG_DATA: 1654 # Read the topic and timestamp from the header 1655 header = _read_header(f) 1656 1657 topic = _read_str_field(header, 'topic') 1658 secs = _read_uint32_field(header, 'sec') 1659 nsecs = _read_uint32_field(header, 'nsec') 1660 t = genpy.Time(secs, nsecs) 1661 1662 if topic not in self.bag._topic_connections: 1663 datatype = _read_str_field(header, 'type') 1664 self._create_connection_info_for_datatype(topic, datatype) 1665 1666 connection_id = self.bag._topic_connections[topic] 1667 info = self.bag._connections[connection_id] 1668 1669 # Skip over the message content 1670 _skip_sized(f) 1671 1672 # Insert the message entry (in order) into the connection index 1673 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1674 1675 elif op == _OP_INDEX_DATA: 1676 _skip_record(f) 1677 1678 offset = f.tell()
1679
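Since reindex() yields the byte offset of each record as it is re-read, a caller can report progress against the total file size. A minimal sketch, assuming 'damaged.bag' is a hypothetical unindexed bag and that the public Bag.reindex() delegates to these reader generators, as the rosbag reindex tool does:

    from rosbag.bag import Bag

    bag = Bag('damaged.bag', 'a', allow_unindexed=True)
    total = bag.size
    for offset in bag.reindex():
        print('reindexed %d of %d bytes' % (offset, total))
    bag.close()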
1680 - def start_reading(self):
1681 try: 1682 # Read the file header 1683 self.read_file_header_record() 1684 1685 total_bytes = self.bag.size 1686 1687 # Check if the index position has been written, i.e. the bag was closed successfully 1688 if self.bag._index_data_pos == 0: 1689 raise ROSBagUnindexedException() 1690 1691 # Seek to the beginning of the topic index records 1692 self.bag._file.seek(self.bag._index_data_pos) 1693 1694 # Read the topic indexes 1695 topic_indexes = {} 1696 while True: 1697 pos = self.bag._file.tell() 1698 if pos >= total_bytes: 1699 break 1700 1701 topic, index = self.read_topic_index_record() 1702 1703 topic_indexes[topic] = index 1704 1705 # Read the message definition records (one for each topic) 1706 for topic, index in topic_indexes.items(): 1707 self.bag._file.seek(index[0].offset) 1708 1709 connection_info = self.read_message_definition_record() 1710 1711 if connection_info.topic not in self.bag._topic_connections: 1712 self.bag._topic_connections[connection_info.topic] = connection_info.id 1713 self.bag._connections[connection_info.id] = connection_info 1714 1715 self.bag._connection_indexes[connection_info.id] = index 1716 1717 self.bag._connection_indexes_read = True 1718 1719 except Exception as ex: 1720 raise ROSBagUnindexedException()
1721
1722 - def read_file_header_record(self):
1723 self.bag._file_header_pos = self.bag._file.tell() 1724 1725 header = _read_header(self.bag._file, _OP_FILE_HEADER) 1726 1727 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 1728 1729 _skip_sized(self.bag._file) # skip the record data, i.e. padding
1730
1731 - def read_topic_index_record(self):
1732 f = self.bag._file 1733 1734 header = _read_header(f, _OP_INDEX_DATA) 1735 1736 index_version = _read_uint32_field(header, 'ver') 1737 topic = _read_str_field (header, 'topic') 1738 count = _read_uint32_field(header, 'count') 1739 1740 if index_version != 0: 1741 raise ROSBagFormatException('expecting index version 0, got %d' % index_version) 1742 1743 _read_uint32(f) # skip the record data size 1744 1745 topic_index = [] 1746 1747 for i in range(count): 1748 time = _read_time (f) 1749 offset = _read_uint64(f) 1750 1751 topic_index.append(_IndexEntry102(time, offset)) 1752 1753 return (topic, topic_index)
1754
1755 - def seek_and_read_message_data_record(self, position, raw):
1756 f = self.bag._file 1757 1758 # Seek to the message position 1759 f.seek(position) 1760 1761 # Skip any MSG_DEF records 1762 while True: 1763 header = _read_header(f) 1764 op = _read_uint8_field(header, 'op') 1765 if op != _OP_MSG_DEF: 1766 break 1767 _skip_sized(f) 1768 1769 # Check that we have a MSG_DATA record 1770 if op != _OP_MSG_DATA: 1771 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 1772 1773 topic = _read_str_field(header, 'topic') 1774 1775 connection_id = self.bag._topic_connections[topic] 1776 info = self.bag._connections[connection_id] 1777 1778 # Get the message type 1779 try: 1780 msg_type = _get_message_type(info) 1781 except KeyError: 1782 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bagfile by definition' % info.datatype) 1783 1784 # Get the timestamp 1785 secs = _read_uint32_field(header, 'sec') 1786 nsecs = _read_uint32_field(header, 'nsec') 1787 t = genpy.Time(secs, nsecs) 1788 1789 # Read the message content 1790 data = _read_record_data(f) 1791 1792 if raw: 1793 msg = info.datatype, data, info.md5sum, position, msg_type 1794 else: 1795 # Deserialize the message 1796 msg = msg_type() 1797 msg.deserialize(data) 1798 1799 return (topic, msg, t)
1800
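With raw=True the record is handed back undeserialized as the (datatype, data, md5sum, position, msg_type) tuple built above, which is useful for copying messages between bags without paying for deserialization. A short sketch, again with a hypothetical 'input.bag':

    import rosbag

    bag = rosbag.Bag('input.bag')
    for topic, raw_msg, t in bag.read_messages(raw=True):
        datatype, data, md5sum, position, msg_type = raw_msg
        print('%s: %s, %d serialized bytes' % (topic, datatype, len(data)))
    bag.close()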
1801 -class _BagReader200(_BagReader):
1802 """ 1803 Support class for reading v2.0 bag files. 1804 """
1805 - def __init__(self, bag):
1806 _BagReader.__init__(self, bag) 1807 1808 self.decompressed_chunk_pos = None 1809 self.decompressed_chunk = None 1810 self.decompressed_chunk_io = None
1811
1812 - def reindex(self):
1813 """ 1814 Generates all bag index information by rereading the chunks. 1815 Assumes the file header has been read. 1816 """ 1817 f = self.bag._file 1818 1819 f.seek(0, os.SEEK_END) 1820 total_bytes = f.tell() 1821 1822 # Read any connection records from after the chunk section. 1823 # This is to workaround a bug in rosbag record --split (fixed in r10390) 1824 # where connection records weren't always being written inside the chunk. 1825 self._read_terminal_connection_records() 1826 1827 # Re-read the file header to get to the start of the first chunk 1828 self.bag._file.seek(self.bag._file_header_pos) 1829 self.read_file_header_record() 1830 1831 trunc_pos = None 1832 1833 while True: 1834 chunk_pos = f.tell() 1835 if chunk_pos >= total_bytes: 1836 break 1837 1838 yield chunk_pos 1839 1840 try: 1841 self._reindex_read_chunk(f, chunk_pos, total_bytes) 1842 except Exception as ex: 1843 break 1844 1845 trunc_pos = f.tell() 1846 1847 if trunc_pos and trunc_pos < total_bytes: 1848 f.truncate(trunc_pos)
1849
1850 - def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
1851 # Read the chunk header 1852 chunk_header = self.read_chunk_header() 1853 1854 # If the chunk header size is 0, then the chunk wasn't correctly terminated - we're done 1855 if chunk_header.compressed_size == 0: 1856 raise ROSBagException('unterminated chunk at %d' % chunk_pos) 1857 1858 if chunk_header.compression == Compression.NONE: 1859 chunk_file = f 1860 else: 1861 # Read the compressed chunk 1862 compressed_chunk = _read(f, chunk_header.compressed_size) 1863 1864 # Decompress it 1865 if chunk_header.compression == Compression.BZ2: 1866 self.decompressed_chunk = bz2.decompress(compressed_chunk) 1867 else: 1868 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 1869 1870 if self.decompressed_chunk_io: 1871 self.decompressed_chunk_io.close() 1872 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 1873 1874 chunk_file = self.decompressed_chunk_io 1875 1876 # Read chunk connection and message records 1877 self.bag._curr_chunk_info = None 1878 1879 if chunk_header.compression == Compression.NONE: 1880 offset = chunk_file.tell() - chunk_pos 1881 else: 1882 offset = chunk_file.tell() 1883 1884 expected_index_length = 0 1885 1886 while offset < chunk_header.uncompressed_size: 1887 op = _peek_next_header_op(chunk_file) 1888 1889 if op == _OP_CONNECTION: 1890 connection_info = self.read_connection_record(chunk_file) 1891 1892 if connection_info.id not in self.bag._connections: 1893 self.bag._connections[connection_info.id] = connection_info 1894 if connection_info.id not in self.bag._connection_indexes: 1895 self.bag._connection_indexes[connection_info.id] = [] 1896 1897 elif op == _OP_MSG_DATA: 1898 # Read the connection id and timestamp from the header 1899 header = _read_header(chunk_file) 1900 1901 connection_id = _read_uint32_field(header, 'conn') 1902 t = _read_time_field (header, 'time') 1903 1904 # Update the chunk info with this timestamp 1905 if not self.bag._curr_chunk_info: 1906 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t) 1907 else: 1908 if t > self.bag._curr_chunk_info.end_time: 1909 self.bag._curr_chunk_info.end_time = t 1910 elif t < self.bag._curr_chunk_info.start_time: 1911 self.bag._curr_chunk_info.start_time = t 1912 if connection_id in self.bag._curr_chunk_info.connection_counts: 1913 self.bag._curr_chunk_info.connection_counts[connection_id] += 1 1914 else: 1915 self.bag._curr_chunk_info.connection_counts[connection_id] = 1 1916 1917 # Skip over the message content 1918 _skip_sized(chunk_file) 1919 1920 # Insert the message entry (in order) into the connection index 1921 if connection_id not in self.bag._connection_indexes: 1922 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos)) 1923 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset)) 1924 1925 expected_index_length += 1 1926 1927 else: 1928 # Unknown record type so skip 1929 _skip_record(chunk_file) 1930 1931 if chunk_header.compression == Compression.NONE: 1932 offset = chunk_file.tell() - chunk_pos 1933 else: 1934 offset = chunk_file.tell() 1935 1936 # Skip over index records, connection records and chunk info records 1937 next_op = _peek_next_header_op(f) 1938 1939 total_index_length = 0 1940 1941 while next_op != _OP_CHUNK: 1942 if next_op == _OP_INDEX_DATA: 1943 # Bug workaround: C Turtle bags (pre-1.1.15) were written with an incorrect data length 1944 _, index = self.read_connection_index_record()
1945 total_index_length += len(index) 1946 else: 1947 _skip_record(f) 1948 1949 if f.tell() >= total_bytes: 1950 if total_index_length != expected_index_length: 1951 raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length)) 1952 break 1953 1954 next_op = _peek_next_header_op(f) 1955 1956 # Chunk was read correctly - store info 1957 self.bag._chunk_headers[chunk_pos] = chunk_header 1958 self.bag._chunks.append(self.bag._curr_chunk_info)
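The compressed branch above follows a simple pattern: read the compressed chunk bytes, decompress them, and wrap the result in an in-memory file object so the record parsers can treat it like the underlying file. A standalone sketch of that pattern, using io.BytesIO in place of StringIO since bz2.decompress() returns bytes:

    import bz2
    import io

    compressed_chunk = bz2.compress(b'stand-in chunk payload')  # placeholder data
    chunk_file = io.BytesIO(bz2.decompress(compressed_chunk))
    print(chunk_file.read())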
1959
1960 - def _read_terminal_connection_records(self):
1961 b, f, r = self.bag, self.bag._file, self.bag._reader 1962 1963 # Seek to the first record after FILE_HEADER 1964 f.seek(b._file_header_pos) 1965 r.read_file_header_record() 1966 1967 # Advance to the first CONNECTION 1968 if self._advance_to_next_record(_OP_CONNECTION): 1969 # Read the CONNECTION records 1970 while True: 1971 connection_info = r.read_connection_record(f) 1972 1973 b._connections[connection_info.id] = connection_info 1974 b._connection_indexes[connection_info.id] = [] 1975 1976 next_op = _peek_next_header_op(f) 1977 if next_op != _OP_CONNECTION: 1978 break
1979
1980 - def _advance_to_next_record(self, op):
1981 b, f = self.bag, self.bag._file 1982 1983 try: 1984 while True: 1985 next_op = _peek_next_header_op(f) 1986 if next_op == op: 1987 break 1988 1989 if next_op == _OP_INDEX_DATA: 1990 # Workaround the possible invalid data length in INDEX_DATA records 1991 1992 # read_connection_index_record() requires _curr_chunk_info is set 1993 if b._curr_chunk_info is None: 1994 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1)) 1995 1996 b._reader.read_connection_index_record() 1997 else: 1998 _skip_record(f) 1999 2000 return True 2001 2002 except Exception as ex: 2003 return False
2004
2005 - def start_reading(self):
2006 try: 2007 # Read the file header 2008 self.read_file_header_record() 2009 2010 # Check if the index position has been written, i.e. the bag was closed successfully 2011 if self.bag._index_data_pos == 0: 2012 raise ROSBagUnindexedException() 2013 2014 # Seek to the end of the chunks 2015 self.bag._file.seek(self.bag._index_data_pos) 2016 2017 # Read the connection records 2018 self.bag._connection_indexes = {} 2019 for i in range(self.bag._connection_count): 2020 connection_info = self.read_connection_record(self.bag._file) 2021 self.bag._connections[connection_info.id] = connection_info 2022 self.bag._connection_indexes[connection_info.id] = [] 2023 2024 # Read the chunk info records 2025 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)] 2026 2027 # Read the chunk headers 2028 self.bag._chunk_headers = {} 2029 for chunk_info in self.bag._chunks: 2030 self.bag._file.seek(chunk_info.pos) 2031 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header() 2032 2033 if not self.bag._skip_index: 2034 self._read_connection_index_records() 2035 2036 except Exception as ex: 2037 raise ROSBagUnindexedException()
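Any failure while reading the index is surfaced as ROSBagUnindexedException, so callers typically catch it and reopen the bag with allow_unindexed=True (and usually reindex it). A minimal sketch, with 'crashed.bag' as a hypothetical path:

    from rosbag.bag import Bag, ROSBagUnindexedException

    try:
        b = Bag('crashed.bag')
    except ROSBagUnindexedException:
        # e.g. the recording process died before the index was written
        b = Bag('crashed.bag', allow_unindexed=True)
    b.close()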
2038
2039 - def _read_connection_index_records(self):
2040 for chunk_info in self.bag._chunks: 2041 self.bag._file.seek(chunk_info.pos) 2042 _skip_record(self.bag._file) 2043 2044 self.bag._curr_chunk_info = chunk_info 2045 for i in range(len(chunk_info.connection_counts)): 2046 connection_id, index = self.read_connection_index_record() 2047 self.bag._connection_indexes[connection_id].extend(index) 2048 2049 # Remove any connections with no entries 2050 # This is a workaround for a bug where connection records were being written for 2051 # connections which had no messages in the bag 2052 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.iteritems() if not index] 2053 for id in orphan_connection_ids: 2054 del self.bag._connections[id] 2055 del self.bag._connection_indexes[id] 2056 2057 self.bag._connection_indexes_read = True
2058
2059 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2060 connections = self.bag._get_connections(topics, connection_filter) 2061 for entry in self.bag._get_entries(connections, start_time, end_time): 2062 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
2063 2064 ### 2065
2066 - def read_file_header_record(self):
2067 self.bag._file_header_pos = self.bag._file.tell() 2068 2069 header = _read_header(self.bag._file, _OP_FILE_HEADER) 2070 2071 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 2072 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count') 2073 self.bag._connection_count = _read_uint32_field(header, 'conn_count') 2074 2075 _skip_sized(self.bag._file) # skip over the record data, i.e. padding
2076
2077 - def read_connection_record(self, f):
2078 header = _read_header(f, _OP_CONNECTION) 2079 2080 conn_id = _read_uint32_field(header, 'conn') 2081 topic = _read_str_field (header, 'topic') 2082 2083 connection_header = _read_header(f) 2084 2085 return _ConnectionInfo(conn_id, topic, connection_header)
2086
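read_connection_record() parses two headers back to back: the record header (carrying 'conn' and 'topic') and the stored connection header itself. Both use the bag format's field encoding, which the _read_header()/_read_*_field() helpers earlier in this module decode: each field is a little-endian uint32 length followed by that many bytes of 'name=value'. A standalone sketch of that encoding; parse_header_fields() is a hypothetical helper, not part of this module:

    import struct

    def parse_header_fields(buf):
        # buf holds one record header (the bytes after its uint32 length prefix)
        fields = {}
        offset = 0
        while offset < len(buf):
            (field_len,) = struct.unpack_from('<L', buf, offset)
            offset += 4
            name, _, value = buf[offset:offset + field_len].partition(b'=')
            fields[name] = value
            offset += field_len
        return fields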
2087 - def read_chunk_info_record(self):
2088 f = self.bag._file 2089 2090 header = _read_header(f, _OP_CHUNK_INFO) 2091 2092 chunk_info_version = _read_uint32_field(header, 'ver') 2093 2094 if chunk_info_version == 1: 2095 chunk_pos = _read_uint64_field(header, 'chunk_pos') 2096 start_time = _read_time_field (header, 'start_time') 2097 end_time = _read_time_field (header, 'end_time') 2098 connection_count = _read_uint32_field(header, 'count') 2099 2100 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time) 2101 2102 _read_uint32(f) # skip the record data size 2103 2104 for i in range(connection_count): 2105 connection_id = _read_uint32(f) 2106 count = _read_uint32(f) 2107 2108 chunk_info.connection_counts[connection_id] = count 2109 2110 return chunk_info 2111 else: 2112 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2113
2114 - def read_chunk_header(self):
2115 header = _read_header(self.bag._file, _OP_CHUNK) 2116 2117 compression = _read_str_field (header, 'compression') 2118 uncompressed_size = _read_uint32_field(header, 'size') 2119 2120 compressed_size = _read_uint32(self.bag._file) # read the record data size 2121 2122 data_pos = self.bag._file.tell() 2123 2124 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
2125
2126 - def read_connection_index_record(self):
2127 f = self.bag._file 2128 2129 header = _read_header(f, _OP_INDEX_DATA) 2130 2131 index_version = _read_uint32_field(header, 'ver') 2132 connection_id = _read_uint32_field(header, 'conn') 2133 count = _read_uint32_field(header, 'count') 2134 2135 if index_version != 1: 2136 raise ROSBagFormatException('expecting index version 1, got %d' % index_version) 2137 2138 record_size = _read_uint32(f) # skip the record data size 2139 2140 index = [] 2141 2142 for i in range(count): 2143 time = _read_time (f) 2144 offset = _read_uint32(f) 2145 2146 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset)) 2147 2148 return (connection_id, index)
2149
2150 - def seek_and_read_message_data_record(self, position, raw):
2151 chunk_pos, offset = position 2152 2153 chunk_header = self.bag._chunk_headers.get(chunk_pos) 2154 if chunk_header is None: 2155 raise ROSBagException('no chunk at position %d' % chunk_pos) 2156 2157 if chunk_header.compression == Compression.NONE: 2158 f = self.bag._file 2159 f.seek(chunk_header.data_pos + offset) 2160 else: 2161 if self.decompressed_chunk_pos != chunk_pos: 2162 # Seek to the chunk data, read and decompress 2163 self.bag._file.seek(chunk_header.data_pos) 2164 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size) 2165 2166 if chunk_header.compression == Compression.BZ2: 2167 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2168 else: 2169 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2170 2171 self.decompressed_chunk_pos = chunk_pos 2172 2173 if self.decompressed_chunk_io: 2174 self.decompressed_chunk_io.close() 2175 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2176 2177 f = self.decompressed_chunk_io 2178 f.seek(offset) 2179 2180 # Skip any CONNECTION records 2181 while True: 2182 header = _read_header(f) 2183 op = _read_uint8_field(header, 'op') 2184 if op != _OP_CONNECTION: 2185 break 2186 _skip_sized(f) 2187 2188 # Check that we have a MSG_DATA record 2189 if op != _OP_MSG_DATA: 2190 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2191 2192 connection_id = _read_uint32_field(header, 'conn') 2193 t = _read_time_field (header, 'time') 2194 2195 # Get the message type 2196 connection_info = self.bag._connections[connection_id] 2197 try: 2198 msg_type = _get_message_type(connection_info) 2199 except KeyError: 2200 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype) 2201 2202 # Read the message content 2203 data = _read_record_data(f) 2204 2205 # Deserialize the message 2206 if raw: 2207 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type 2208 else: 2209 msg = msg_type() 2210 msg.deserialize(data) 2211 2212 return (connection_info.topic, msg, t)
2213
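Because compression is applied per chunk and the reader caches the most recently decompressed chunk, reading a bz2-compressed bag is transparent to callers. A minimal round-trip sketch, assuming std_msgs is available and 'compressed.bag' is a hypothetical path:

    from rosbag.bag import Bag, Compression
    from std_msgs.msg import String

    outbag = Bag('compressed.bag', 'w', compression=Compression.BZ2)
    outbag.write('/chatter', String(data='hello'))
    outbag.close()

    inbag = Bag('compressed.bag')
    for topic, msg, t in inbag.read_messages():
        print('%s: %s' % (topic, msg.data))
    inbag.close()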
2214 -def _time_to_str(secs):
2215 secs_frac = secs - int(secs) 2216 secs_frac_str = ('%.2f' % secs_frac)[1:] 2217 2218 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
2219
2220 -def _human_readable_size(size):
2221 multiple = 1024.0 2222 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2223 size /= multiple 2224 if size < multiple: 2225 return '%.1f %s' % (size, suffix) 2226 2227 return '-'
2228
2229 -def _human_readable_frequency(freq):
2230 multiple = 1000.0 2231 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']: 2232 if freq < multiple: 2233 return '%.1f %s' % (freq, suffix) 2234 freq /= multiple 2235 2236 return '-'
2237
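For reference, the two module-private formatting helpers above produce values like the following (note that _human_readable_size() divides before comparing, so it always reports at least KB):

    from rosbag.bag import _human_readable_size, _human_readable_frequency

    print(_human_readable_size(4096))             # '4.0 KB'
    print(_human_readable_size(5 * 1024 * 1024))  # '5.0 MB'
    print(_human_readable_frequency(15.0))        # '15.0 Hz'
    print(_human_readable_frequency(2500.0))      # '2.5 kHz'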
2238 ## See http://code.activestate.com/recipes/511509 2239 -def _mergesort(list_of_lists, key=None):
2240 """ 2241 Perform an N-way merge operation on sorted lists. 2242 2243 @param list_of_lists: (really iterable of iterable) of sorted elements 2244 (either naturally or by C{key}) 2245 @param key: specify sort key function (like C{sort()}, C{sorted()}) 2246 2247 2248 Yields tuples of the form C{(item, iterator)}, where the iterator is the 2249 built-in list iterator or something you pass in, if you pre-generate the 2250 iterators. 2251 2252 This is a stable merge; complexity O(N lg N) 2253 2254 Examples:: 2255 2256 print list(x[0] for x in mergesort([[1,2,3,4], 2257 [2,3.5,3.7,4.5,6,7], 2258 [2.6,3.6,6.6,9]])) 2259 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2260 2261 # note stability 2262 print list(x[0] for x in mergesort([[1,2,3,4], 2263 [2,3.5,3.7,4.5,6,7], 2264 [2.6,3.6,6.6,9]], key=int)) 2265 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2266 2267 print list(x[0] for x in mergesort([[4,3,2,1], 2268 [7,6.5,4,3.7,3.3,1.9], 2269 [9,8.6,7.6,6.6,5.5,4.4,3.3]], 2270 key=lambda x: -x)) 2271 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1] 2272 """ 2273 2274 heap = [] 2275 for i, itr in enumerate(iter(pl) for pl in list_of_lists): 2276 try: 2277 item = itr.next() 2278 toadd = (key(item), i, item, itr) if key else (item, i, itr) 2279 heap.append(toadd) 2280 except StopIteration: 2281 pass 2282 heapq.heapify(heap) 2283 2284 if key: 2285 while heap: 2286 _, idx, item, itr = heap[0] 2287 yield item, itr 2288 try: 2289 item = itr.next() 2290 heapq.heapreplace(heap, (key(item), idx, item, itr) ) 2291 except StopIteration: 2292 heapq.heappop(heap) 2293 2294 else: 2295 while heap: 2296 item, idx, itr = heap[0] 2297 yield item, itr 2298 try: 2299 heapq.heapreplace(heap, (itr.next(), idx, itr)) 2300 except StopIteration: 2301 heapq.heappop(heap)
2302
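Within this module the merge is used with a key function to interleave per-connection index lists that are each already sorted by time. A small sketch of that usage, with (secs, offset) tuples standing in for index entries; note it runs under Python 2, matching the itr.next() calls above:

    from rosbag.bag import _mergesort

    indexes = [[(1, 100), (4, 400)],
               [(2, 200), (3, 300)]]
    for entry, _ in _mergesort(indexes, key=lambda e: e[0]):
        print(entry)   # (1, 100), (2, 200), (3, 300), (4, 400), in timestamp order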
2303 -class _BZ2CompressorFileFacade(object):
2304 """ 2305 A file facade for the bz2.BZ2Compressor. 2306 """
2307 - def __init__(self, file):
2308 self.file = file 2309 self.compressor = bz2.BZ2Compressor() 2310 self.compressed_bytes_in = 0
2311
2312 - def write(self, data):
2313 compressed = self.compressor.compress(data) 2314 if len(compressed) > 0: 2315 self.file.write(compressed) 2316 self.compressed_bytes_in += len(data)
2317
2318 - def flush(self):
2319 compressed = self.compressor.flush() 2320 if len(compressed) > 0: 2321 self.file.write(compressed)
2322
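The facade lets the chunk writer stream records through a bz2.BZ2Compressor while still being used like a plain file object. A minimal sketch of it in use, with 'chunk.bz2' as a hypothetical output path:

    from rosbag.bag import _BZ2CompressorFileFacade

    out = open('chunk.bz2', 'wb')
    facade = _BZ2CompressorFileFacade(out)
    facade.write(b'serialized records ...')
    facade.flush()
    print(facade.compressed_bytes_in)  # uncompressed bytes fed to the compressor so far
    out.close()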
2323 -def _median(values):
2324 values_len = len(values) 2325 if values_len == 0: 2326 return float('nan') 2327 2328 sorted_values = sorted(values) 2329 if values_len % 2 == 1: 2330 return sorted_values[(values_len + 1) / 2 - 1] 2331 2332 lower = sorted_values[values_len / 2 - 1] 2333 upper = sorted_values[values_len / 2] 2334 2335 return float(lower + upper) / 2
2336
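For reference, the helper above behaves as follows (Python 2 integer division is assumed, as in the rest of the module):

    from rosbag.bag import _median

    print(_median([]))            # nan
    print(_median([3, 1, 2]))     # 2
    print(_median([4, 1, 3, 2]))  # 2.5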