Package rosbag :: Module bag
[frames] | no frames]

Source Code for Module rosbag.bag

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2010, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32   
  33  """ 
  34  The rosbag Python API. 
  35   
  36  Provides serialization of bag files. 
  37  """ 
  38   
  39  from __future__ import print_function 
  40   
  41  import bisect 
  42  import bz2 
  43  import collections 
  44  import heapq 
  45  import os 
  46  import re 
  47  import struct 
  48  import sys 
  49  import threading 
  50  import time 
  51  import yaml 
  52   
  53  from Cryptodome.Cipher import AES 
  54  from Cryptodome.Random import get_random_bytes 
  55   
  56  import gnupg 
  57   
  58  try: 
  59      from cStringIO import StringIO  # Python 2.x 
  60  except ImportError: 
  61      from io import BytesIO as StringIO  # Python 3.x 
  62   
  63  import genmsg 
  64  import genpy 
  65  import genpy.dynamic 
  66  import genpy.message 
  67   
  68  import roslib.names # still needed for roslib.names.canonicalize_name() 
  69  import rospy 
  70  try: 
  71      import roslz4 
  72      found_lz4 = True 
  73  except ImportError: 
  74      rospy.logwarn( 
  75          'Failed to load Python extension for LZ4 support. ' 
  76          'LZ4 compression will not be available.') 
  77      found_lz4 = False 
78 79 -class ROSBagException(Exception):
80 """ 81 Base class for exceptions in rosbag. 82 """
83 - def __init__(self, value=None):
84 self.value = value 85 #fix for #1209. needed in Python 2.7. 86 # For details: https://stackoverflow.com/questions/41808912/cannot-unpickle-exception-subclass 87 self.args = (value,)
88
89 - def __str__(self):
90 return self.value
91
92 -class ROSBagFormatException(ROSBagException):
93 """ 94 Exceptions for errors relating to the bag file format. 95 """
96 - def __init__(self, value):
97 ROSBagException.__init__(self, value)
98
99 -class ROSBagUnindexedException(ROSBagException):
100 """ 101 Exception for unindexed bags. 102 """
103 - def __init__(self, *args):
104 #*args needed for #1209 105 ROSBagException.__init__(self, 'Unindexed bag')
106
107 -class ROSBagEncryptNotSupportedException(ROSBagException):
108 """ 109 Exception raised when encryption is not supported. 110 """
111 - def __init__(self, value):
112 ROSBagException.__init__(self, value)
113
114 -class ROSBagEncryptException(ROSBagException):
115 """ 116 Exception raised when encryption or decryption failed. 117 """
118 - def __init__(self, value):
119 ROSBagException.__init__(self, value)
120
121 -class Compression:
122 """ 123 Allowable compression types 124 """ 125 NONE = 'none' 126 BZ2 = 'bz2' 127 LZ4 = 'lz4'
128 129 BagMessage = collections.namedtuple('BagMessage', 'topic message timestamp') 130 BagMessageWithConnectionHeader = collections.namedtuple('BagMessageWithConnectionHeader', 'topic message timestamp connection_header')
131 132 -class _ROSBagEncryptor(object):
133 """ 134 Base class for bag encryptor. 135 """ 136 _ENCRYPTOR_FIELD_NAME = 'encryptor' 137
138 - def __init__(self):
139 pass
140
141 -class _ROSBagNoEncryptor(_ROSBagEncryptor):
142 """ 143 Class for unencrypted bags. 144 """
145 - def __init__(self):
146 super(_ROSBagNoEncryptor, self).__init__()
147
148 - def initialize(self, _, __):
149 pass
150
151 - def encrypt_chunk(self, chunk_size, _, __):
152 return chunk_size
153
154 - def decrypt_chunk(self, chunk):
155 return chunk
156
157 - def add_fields_to_file_header(self, _):
158 pass
159
160 - def read_fields_from_file_header(self, _):
161 pass
162
163 - def write_encrypted_header(self, write_header, f, header):
164 return write_header(f, header)
165
166 - def read_encrypted_header(self, read_header, f, req_op=None):
167 return read_header(f, req_op)
168
169 - def add_info_rows(self, rows):
170 pass
171
172 - def get_info_str(self):
173 return ''
174
175 -class _ROSBagAesCbcEncryptor(_ROSBagEncryptor):
176 """ 177 Class for AES-CBC-encrypted bags. 178 """ 179 NAME = 'rosbag/AesCbcEncryptor' 180 _GPG_USER_FIELD_NAME = 'gpg_user' 181 _ENCRYPTED_KEY_FIELD_NAME = 'encrypted_key' 182
183 - def __init__(self):
184 """ 185 Create AES encryptor. 186 """ 187 super(_ROSBagAesCbcEncryptor, self).__init__() 188 # User name of GPG key used for symmetric key encryption 189 self._gpg_key_user = None 190 # GPG passphrase 191 self._gpg_passphrase = None 192 # Symmetric key for encryption/decryption 193 self._symmetric_key = None 194 # Encrypted symmetric key 195 self._encrypted_symmetric_key = None
196
197 - def initialize(self, bag, gpg_key_user, passphrase=None):
198 """ 199 Initialize encryptor by composing AES symmetric key. 200 @param bag: bag to be encrypted/decrypted 201 @type bag: Bag 202 @param gpg_key_user: user name of GPG key used for symmetric key encryption 203 @type gpg_key_user: str 204 @raise ROSBagException: if GPG key user has already been set 205 """ 206 if bag._mode != 'w': 207 self._gpg_passphrase = passphrase or os.getenv('ROSBAG_GPG_PASSPHRASE', None) 208 return 209 if self._gpg_key_user == gpg_key_user: 210 return 211 if not self._gpg_key_user: 212 self._gpg_key_user = gpg_key_user 213 self._build_symmetric_key() 214 else: 215 raise ROSBagException('Encryption user has already been set to {}'.format(self._gpg_key_user))
216
217 - def encrypt_chunk(self, chunk_size, chunk_data_pos, f):
218 """ 219 Read chunk from file, encrypt it, and write back to file. 220 @param chunk_size: size of chunk 221 @type chunk_size: int 222 @param chunk_data_pos: position of chunk data portion 223 @type chunk_data_pos: int 224 @param f: file stream 225 @type f: file 226 @return: size of initialization vector and encrypted chunk 227 @rtype: int 228 """ 229 f.seek(chunk_data_pos) 230 chunk = _read(f, chunk_size) 231 # Encrypt chunk 232 iv = get_random_bytes(AES.block_size) 233 f.seek(chunk_data_pos) 234 f.write(iv) 235 cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv) 236 encrypted_chunk = cipher.encrypt(_add_padding(chunk)) 237 # Write encrypted chunk 238 f.write(encrypted_chunk) 239 f.truncate(f.tell()) 240 return AES.block_size + len(encrypted_chunk)
241
242 - def decrypt_chunk(self, encrypted_chunk):
243 """ 244 Decrypt chunk. 245 @param encrypted_chunk: chunk to decrypt 246 @type encrypted_chunk: str 247 @return: decrypted chunk 248 @rtype: str 249 @raise ROSBagFormatException: if size of input chunk is not multiple of AES block size 250 """ 251 if len(encrypted_chunk) % AES.block_size != 0: 252 raise ROSBagFormatException('Error in encrypted chunk size: {}'.format(len(encrypted_chunk))) 253 if len(encrypted_chunk) < AES.block_size: 254 raise ROSBagFormatException('No initialization vector in encrypted chunk: {}'.format(len(encrypted_chunk))) 255 256 iv = encrypted_chunk[:AES.block_size] 257 cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv) 258 decrypted_chunk = cipher.decrypt(encrypted_chunk[AES.block_size:]) 259 return _remove_padding(decrypted_chunk)
260
261 - def add_fields_to_file_header(self, header):
262 """ 263 Add encryptor information to bag file header. 264 @param header: bag file header 265 @type header: dict 266 """ 267 header[self._ENCRYPTOR_FIELD_NAME] = self.NAME 268 header[self._GPG_USER_FIELD_NAME] = self._gpg_key_user 269 header[self._ENCRYPTED_KEY_FIELD_NAME] = self._encrypted_symmetric_key
270
271 - def read_fields_from_file_header(self, header):
272 """ 273 Read encryptor information from bag file header. 274 @param header: bag file header 275 @type header: dict 276 @raise ROSBagFormatException: if GPG key user is not found in header 277 """ 278 try: 279 self._encrypted_symmetric_key = _read_bytes_field(header, self._ENCRYPTED_KEY_FIELD_NAME) 280 except ROSBagFormatException: 281 raise ROSBagFormatException('Encrypted symmetric key is not found in header') 282 try: 283 self._gpg_key_user = _read_str_field(header, self._GPG_USER_FIELD_NAME) 284 except ROSBagFormatException: 285 raise ROSBagFormatException('GPG key user is not found in header') 286 try: 287 self._symmetric_key = _decrypt_string_gpg(self._encrypted_symmetric_key, self._gpg_passphrase) 288 except ROSBagFormatException: 289 raise
290
291 - def write_encrypted_header(self, _, f, header):
292 """ 293 Write encrypted header to bag file. 294 @param f: file stream 295 @type f: file 296 @param header: unencrypted header 297 @type header: dict 298 @return: encrypted string representing header 299 @rtype: str 300 """ 301 header_str = b'' 302 equal = b'=' 303 for k, v in header.items(): 304 if not isinstance(k, bytes): 305 k = k.encode() 306 if not isinstance(v, bytes): 307 v = v.encode() 308 header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v 309 310 iv = get_random_bytes(AES.block_size) 311 enc_str = iv 312 cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv) 313 enc_str += cipher.encrypt(_add_padding(header_str)) 314 _write_sized(f, enc_str) 315 return enc_str
316
317 - def read_encrypted_header(self, _, f, req_op=None):
318 """ 319 Read encrypted header from bag file. 320 @param f: file stream 321 @type f: file 322 @param req_op: expected header op code 323 @type req_op: int 324 @return: decrypted header 325 @rtype: dict 326 @raise ROSBagFormatException: if error occurs while decrypting/reading header 327 """ 328 # Read header 329 try: 330 header = self._decrypt_encrypted_header(f) 331 except ROSBagException as ex: 332 raise ROSBagFormatException('Error reading header: %s' % str(ex)) 333 334 return _build_header_from_str(header, req_op)
335
336 - def add_info_rows(self, rows):
337 """ 338 Add rows for rosbag info. 339 @param rows: information on bag encryption 340 @type rows: list of tuples 341 """ 342 rows.append(('encryption', self.NAME)) 343 rows.append(('GPG key user', self._gpg_key_user))
344
345 - def get_info_str(self):
346 """ 347 Return string for rosbag info. 348 @return: information on bag encryption 349 @rtype: str 350 """ 351 return 'encryption: %s\nGPG key user: %s\n' % (self.NAME, self._gpg_key_user)
352
353 - def _build_symmetric_key(self):
354 if not self._gpg_key_user: 355 return 356 self._symmetric_key = get_random_bytes(AES.block_size) 357 self._encrypted_symmetric_key = _encrypt_string_gpg(self._gpg_key_user, self._symmetric_key)
358
359 - def _decrypt_encrypted_header(self, f):
360 try: 361 size = _read_uint32(f) 362 except struct.error as ex: 363 raise ROSBagFormatException('error unpacking uint32: %s' % str(ex)) 364 365 if size % AES.block_size != 0: 366 raise ROSBagFormatException('Error in encrypted header size: {}'.format(size)) 367 if size < AES.block_size: 368 raise ROSBagFormatException('No initialization vector in encrypted header: {}'.format(size)) 369 370 iv = _read(f, AES.block_size) 371 size -= AES.block_size 372 encrypted_header = _read(f, size) 373 cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv) 374 header = cipher.decrypt(encrypted_header) 375 return _remove_padding(header)
376
377 -def _add_padding(input_bytes):
378 # Add PKCS#7 padding to input string 379 padding_num = AES.block_size - len(input_bytes) % AES.block_size 380 return input_bytes + bytes((padding_num,) * padding_num)
381
382 -def _remove_padding(input_str):
383 # Remove PKCS#7 padding from input string 384 return input_str[:-ord(input_str[len(input_str) - 1:])]
385
386 -def _encrypt_string_gpg(key_user, input):
387 gpg = gnupg.GPG() 388 enc_data = gpg.encrypt(input, [key_user], always_trust=True) 389 if not enc_data.ok: 390 raise ROSBagEncryptException('Failed to encrypt bag: {}. Have you installed a required public key?'.format(enc_data.status)) 391 return str(enc_data)
392
393 -def _decrypt_string_gpg(input, passphrase=None):
394 gpg = gnupg.GPG() 395 dec_data = gpg.decrypt(input, passphrase=passphrase) 396 if not dec_data.ok: 397 raise ROSBagEncryptException('Failed to decrypt bag: {}. Have you installed a required private key?'.format(dec_data.status)) 398 return dec_data.data
399
400 -class Bag(object):
401 """ 402 Bag serialize messages to and from a single file on disk using the bag format. 403 """
404 - def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
405 """ 406 Open a bag file. The mode can be 'r', 'w', or 'a' for reading (default), 407 writing or appending. The file will be created if it doesn't exist 408 when opened for writing or appending; it will be truncated when opened 409 for writing. Simultaneous reading and writing is allowed when in writing 410 or appending mode. 411 @param f: filename of bag to open or a stream to read from 412 @type f: str or file 413 @param mode: mode, either 'r', 'w', or 'a' 414 @type mode: str 415 @param compression: compression mode, see U{rosbag.Compression} for valid modes 416 @type compression: str 417 @param chunk_threshold: minimum number of uncompressed bytes per chunk 418 @type chunk_threshold: int 419 @param allow_unindexed: if True, allow opening unindexed bags 420 @type allow_unindexed: bool 421 @param options: the bag options (currently: compression and chunk_threshold) 422 @type options: dict 423 @param skip_index: if True, don't read the connection index records on open [2.0+] 424 @type skip_index: bool 425 @raise ValueError: if any argument is invalid 426 @raise ROSBagException: if an error occurs opening file 427 @raise ROSBagFormatException: if bag format is corrupted 428 """ 429 if options is not None: 430 if type(options) is not dict: 431 raise ValueError('options must be of type dict') 432 if 'compression' in options: 433 compression = options['compression'] 434 if 'chunk_threshold' in options: 435 chunk_threshold = options['chunk_threshold'] 436 437 self._file = None 438 self._filename = None 439 self._version = None 440 441 allowed_compressions = [Compression.NONE, Compression.BZ2] 442 if found_lz4: 443 allowed_compressions.append(Compression.LZ4) 444 if compression not in allowed_compressions: 445 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 446 self._compression = compression 447 448 if chunk_threshold < 0: 449 raise ValueError('chunk_threshold must be greater than or equal to zero') 450 self._chunk_threshold = chunk_threshold 451 452 self._skip_index = skip_index 453 454 self._reader = None 455 456 self._file_header_pos = None 457 self._index_data_pos = 0 # (1.2+) 458 459 self._clear_index() 460 461 self._buffer = StringIO() 462 463 self._curr_compression = Compression.NONE 464 465 self._encryptor = _ROSBagNoEncryptor() 466 467 self._open(f, mode, allow_unindexed) 468 469 self._output_file = self._file
470
471 - def __iter__(self):
472 return self.read_messages()
473
474 - def __enter__(self):
475 return self
476
477 - def __exit__(self, exc_type, exc_value, traceback):
478 self.close()
479 480 @property
481 - def options(self):
482 """Get the options.""" 483 return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }
484 485 @property
486 - def filename(self):
487 """Get the filename.""" 488 return self._filename
489 490 @property
491 - def version(self):
492 """Get the version.""" 493 return self._version
494 495 @property
496 - def mode(self):
497 """Get the mode.""" 498 return self._mode
499 500 @property
501 - def size(self):
502 """Get the size in bytes.""" 503 if not self._file: 504 raise ValueError('I/O operation on closed bag') 505 506 pos = self._file.tell() 507 self._file.seek(0, os.SEEK_END) 508 size = self._file.tell() 509 self._file.seek(pos) 510 return size
511 512 # compression 513
514 - def _get_compression(self):
515 """Get the compression method to use for writing.""" 516 return self._compression
517
518 - def _set_compression(self, compression):
519 """Set the compression method to use for writing.""" 520 allowed_compressions = [Compression.NONE, Compression.BZ2] 521 if found_lz4: 522 allowed_compressions.append(Compression.LZ4) 523 if compression not in allowed_compressions: 524 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 525 526 self.flush() 527 self._compression = compression
528 529 compression = property(_get_compression, _set_compression) 530 531 # chunk_threshold 532
533 - def _get_chunk_threshold(self):
534 """Get the chunk threshold to use for writing.""" 535 return self._chunk_threshold
536
537 - def _set_chunk_threshold(self, chunk_threshold):
538 """Set the chunk threshold to use for writing.""" 539 if chunk_threshold < 0: 540 raise ValueError('chunk_threshold must be greater than or equal to zero') 541 542 self.flush() 543 self._chunk_threshold = chunk_threshold
544 545 chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold) 546
547 - def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False, return_connection_header=False):
548 """ 549 Read messages from the bag, optionally filtered by topic, timestamp and connection details. 550 @param topics: list of topics or a single topic. if an empty list is given all topics will be read [optional] 551 @type topics: list(str) or str 552 @param start_time: earliest timestamp of message to return [optional] 553 @type start_time: U{genpy.Time} 554 @param end_time: latest timestamp of message to return [optional] 555 @type end_time: U{genpy.Time} 556 @param connection_filter: function to filter connections to include [optional] 557 @type connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool 558 @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype) 559 @type raw: bool 560 @return: generator of BagMessage(topic, message, timestamp) namedtuples for each message in the bag file 561 @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw] 562 """ 563 self.flush() 564 565 if topics and type(topics) is str: 566 topics = [topics] 567 568 return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw, return_connection_header)
569
570 - def flush(self):
571 """ 572 Write the open chunk to disk so subsequent reads will read all messages. 573 @raise ValueError: if bag is closed 574 """ 575 if not self._file: 576 raise ValueError('I/O operation on closed bag') 577 578 if self._chunk_open: 579 self._stop_writing_chunk()
580
581 - def write(self, topic, msg, t=None, raw=False, connection_header=None):
582 """ 583 Write a message to the bag. 584 @param topic: name of topic 585 @type topic: str 586 @param msg: message to add to bag, or tuple (if raw) 587 @type msg: Message or tuple of raw message data 588 @param t: ROS time of message publication, if None specified, use current time [optional] 589 @type t: U{genpy.Time} 590 @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype) 591 @type raw: bool 592 @raise ValueError: if arguments are invalid or bag is closed 593 """ 594 if not self._file: 595 raise ValueError('I/O operation on closed bag') 596 597 if not topic: 598 raise ValueError('topic is invalid') 599 if not msg: 600 raise ValueError('msg is invalid') 601 602 if t is None: 603 t = genpy.Time.from_sec(time.time()) 604 605 # Seek to end (in case previous operation was a read) 606 self._file.seek(0, os.SEEK_END) 607 608 # Open a chunk, if needed 609 if not self._chunk_open: 610 self._start_writing_chunk(t) 611 612 # Unpack raw 613 if raw: 614 if len(msg) == 5: 615 msg_type, serialized_bytes, md5sum, pos, pytype = msg 616 elif len(msg) == 4: 617 msg_type, serialized_bytes, md5sum, pytype = msg 618 else: 619 raise ValueError('msg must be of length 4 or 5') 620 621 # Write connection record, if necessary (currently using a connection per topic; ignoring message connection header) 622 if topic in self._topic_connections: 623 connection_info = self._topic_connections[topic] 624 conn_id = connection_info.id 625 else: 626 conn_id = len(self._connections) 627 628 if raw: 629 if pytype is None: 630 try: 631 pytype = genpy.message.get_message_class(msg_type) 632 except Exception: 633 pytype = None 634 if pytype is None: 635 raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type) 636 637 if pytype._md5sum != md5sum: 638 print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr) 639 #raise ROSBagException('md5sum of loaded type does not match that of data being recorded') 640 641 header = connection_header if connection_header is not None else { 642 'topic': topic, 643 'type': msg_type, 644 'md5sum': md5sum, 645 'message_definition': pytype._full_text 646 } 647 else: 648 header = connection_header if connection_header is not None else { 649 'topic': topic, 650 'type': msg.__class__._type, 651 'md5sum': msg.__class__._md5sum, 652 'message_definition': msg._full_text 653 } 654 655 connection_info = _ConnectionInfo(conn_id, topic, header) 656 # No need to encrypt connection records in chunk (encrypt=False) 657 self._write_connection_record(connection_info, False) 658 659 self._connections[conn_id] = connection_info 660 self._topic_connections[topic] = connection_info 661 662 # Create an index entry 663 index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset()) 664 665 # Update the indexes and current chunk info 666 if conn_id not in self._curr_chunk_connection_indexes: 667 # This is the first message on this connection in the chunk 668 self._curr_chunk_connection_indexes[conn_id] = [index_entry] 669 self._curr_chunk_info.connection_counts[conn_id] = 1 670 else: 671 curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id] 672 if index_entry >= curr_chunk_connection_index[-1]: 673 # Test if we're writing chronologically. Can skip binary search if so. 674 curr_chunk_connection_index.append(index_entry) 675 else: 676 bisect.insort_right(curr_chunk_connection_index, index_entry) 677 678 self._curr_chunk_info.connection_counts[conn_id] += 1 679 680 if conn_id not in self._connection_indexes: 681 self._connection_indexes[conn_id] = [index_entry] 682 else: 683 bisect.insort_right(self._connection_indexes[conn_id], index_entry) 684 685 # Update the chunk start/end times 686 if t > self._curr_chunk_info.end_time: 687 self._curr_chunk_info.end_time = t 688 elif t < self._curr_chunk_info.start_time: 689 self._curr_chunk_info.start_time = t 690 691 if not raw: 692 # Serialize the message to the buffer 693 self._buffer.seek(0) 694 self._buffer.truncate(0) 695 msg.serialize(self._buffer) 696 serialized_bytes = self._buffer.getvalue() 697 698 # Write message data record 699 self._write_message_data_record(conn_id, t, serialized_bytes) 700 701 # Check if we want to stop this chunk 702 chunk_size = self._get_chunk_offset() 703 if chunk_size > self._chunk_threshold: 704 self._stop_writing_chunk()
705
706 - def reindex(self):
707 """ 708 Reindexes the bag file. Yields position of each chunk for progress. 709 """ 710 self._clear_index() 711 return self._reader.reindex()
712
713 - def close(self):
714 """ 715 Close the bag file. Closing an already closed bag does nothing. 716 """ 717 if self._file: 718 if self._mode in 'wa': 719 self._stop_writing() 720 721 self._close_file()
722
723 - def get_compression_info(self):
724 """ 725 Returns information about the compression of the bag 726 @return: generator of CompressionTuple(compression, uncompressed, compressed) describing the 727 type of compression used, size of the uncompressed data in Bytes, size of the compressed data in Bytes. If 728 no compression is used then uncompressed and compressed data will be equal. 729 @rtype: generator of CompressionTuple of (str, int, int) 730 """ 731 732 compression = self.compression 733 uncompressed = 0 734 compressed = 0 735 736 if self._chunk_headers: 737 compression_counts = {} 738 compression_uncompressed = {} 739 compression_compressed = {} 740 741 # the rest of this is determine which compression algorithm is dominant and 742 # to add up the uncompressed and compressed Bytes 743 for chunk_header in self._chunk_headers.values(): 744 if chunk_header.compression not in compression_counts: 745 compression_counts[chunk_header.compression] = 0 746 if chunk_header.compression not in compression_uncompressed: 747 compression_uncompressed[chunk_header.compression] = 0 748 if chunk_header.compression not in compression_compressed: 749 compression_compressed[chunk_header.compression] = 0 750 751 compression_counts[chunk_header.compression] += 1 752 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 753 uncompressed += chunk_header.uncompressed_size 754 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 755 compressed += chunk_header.compressed_size 756 757 chunk_count = len(self._chunk_headers) 758 759 main_compression_count, main_compression = sorted([(v, k) for k, v in compression_counts.items()], reverse=True)[0] 760 compression = str(main_compression) 761 762 return collections.namedtuple("CompressionTuple", ["compression", 763 "uncompressed", "compressed"])(compression=compression, 764 uncompressed=uncompressed, compressed=compressed)
765
766 - def get_message_count(self, topic_filters=None):
767 """ 768 Returns the number of messages in the bag. Can be filtered by Topic 769 @param topic_filters: One or more topics to filter by 770 @type topic_filters: Could be either a single str or a list of str. 771 @return: The number of messages in the bag, optionally filtered by topic 772 @rtype: int 773 """ 774 775 num_messages = 0 776 777 if topic_filters is not None: 778 info = self.get_type_and_topic_info(topic_filters=topic_filters) 779 for topic in info.topics.values(): 780 num_messages += topic.message_count 781 else: 782 if self._chunks: 783 for c in self._chunks: 784 for counts in c.connection_counts.values(): 785 num_messages += counts 786 else: 787 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 788 789 return num_messages
790
791 - def get_start_time(self):
792 """ 793 Returns the start time of the bag. 794 @return: a timestamp of the start of the bag 795 @rtype: float, timestamp in seconds, includes fractions of a second 796 """ 797 798 if self._chunks: 799 start_stamp = self._chunks[0].start_time.to_sec() 800 else: 801 if not self._connection_indexes: 802 raise ROSBagException('Bag contains no message') 803 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index] 804 start_stamp = min(start_stamps) if start_stamps else 0 805 806 return start_stamp
807
808 - def get_end_time(self):
809 """ 810 Returns the end time of the bag. 811 @return: a timestamp of the end of the bag 812 @rtype: float, timestamp in seconds, includes fractions of a second 813 """ 814 815 if self._chunks: 816 end_stamp = self._chunks[-1].end_time.to_sec() 817 else: 818 if not self._connection_indexes: 819 raise ROSBagException('Bag contains no message') 820 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index] 821 end_stamp = max(end_stamps) if end_stamps else 0 822 823 return end_stamp
824
825 - def get_type_and_topic_info(self, topic_filters=None):
826 """ 827 Coallates info about the type and topics in the bag. 828 829 Note, it would be nice to filter by type as well, but there appear to be some limitations in the current architecture 830 that prevent that from working when more than one message type is written on the same topic. 831 832 @param topic_filters: specify one or more topic to filter by. 833 @type topic_filters: either a single str or a list of str. 834 @return: generator of TypesAndTopicsTuple(types{key:type name, val: md5hash}, 835 topics{type: msg type (Ex. "std_msgs/String"), 836 message_count: the number of messages of the particular type, 837 connections: the number of connections, 838 frequency: the data frequency, 839 key: type name, 840 val: md5hash}) describing the types of messages present 841 and information about the topics 842 @rtype: TypesAndTopicsTuple(dict(str, str), 843 TopicTuple(str, int, int, float, str, str)) 844 """ 845 846 datatypes = set() 847 datatype_infos = [] 848 849 for connection in self._connections.values(): 850 if connection.datatype in datatypes: 851 continue 852 853 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 854 datatypes.add(connection.datatype) 855 856 topics = [] 857 # load our list of topics and optionally filter 858 if topic_filters is not None: 859 if not isinstance(topic_filters, list): 860 topic_filters = [topic_filters] 861 862 topics = topic_filters 863 else: 864 topics = [c.topic for c in self._get_connections()] 865 866 topics = sorted(set(topics)) 867 868 topic_datatypes = {} 869 topic_conn_counts = {} 870 topic_msg_counts = {} 871 topic_freqs_median = {} 872 873 for topic in topics: 874 connections = list(self._get_connections(topic)) 875 876 if not connections: 877 continue 878 879 topic_datatypes[topic] = connections[0].datatype 880 topic_conn_counts[topic] = len(connections) 881 882 msg_count = 0 883 for connection in connections: 884 for chunk in self._chunks: 885 msg_count += chunk.connection_counts.get(connection.id, 0) 886 887 topic_msg_counts[topic] = msg_count 888 889 if self._connection_indexes_read: 890 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 891 if len(stamps) > 1: 892 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 893 med_period = _median(periods) 894 if med_period > 0.0: 895 topic_freqs_median[topic] = 1.0 / med_period 896 897 # process datatypes 898 types = {} 899 for datatype, md5sum, msg_def in sorted(datatype_infos): 900 types[datatype] = md5sum 901 902 # process topics 903 topics_t = {} 904 TopicTuple = collections.namedtuple("TopicTuple", ["msg_type", "message_count", "connections", "frequency"]) 905 for topic in sorted(topic_datatypes.keys()): 906 topic_msg_count = topic_msg_counts[topic] 907 frequency = topic_freqs_median[topic] if topic in topic_freqs_median else None 908 topics_t[topic] = TopicTuple(msg_type=topic_datatypes[topic], 909 message_count=topic_msg_count, 910 connections=topic_conn_counts[topic], 911 frequency=frequency) 912 913 return collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])(msg_types=types, topics=topics_t)
914
915 - def set_encryptor(self, encryptor=None, param=None):
916 if self._chunks: 917 raise ROSBagException('Cannot set encryptor after chunks are written') 918 if encryptor is None: 919 self._encryptor = _ROSBagNoEncryptor() 920 elif encryptor == _ROSBagAesCbcEncryptor.NAME: 921 if sys.platform == 'win32': 922 raise ROSBagEncryptNotSupportedException('AES CBC encryptor is not supported for Windows') 923 else: 924 self._encryptor = _ROSBagAesCbcEncryptor() 925 else: 926 self._encryptor = _ROSBagNoEncryptor() 927 self._encryptor.initialize(self, param)
928
929 - def __str__(self):
930 rows = [] 931 932 try: 933 if self._filename: 934 rows.append(('path', self._filename)) 935 936 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 937 rows.append(('version', '1.2 (unindexed)')) 938 else: 939 rows.append(('version', '%d.%d' % (int(self._version / 100), self._version % 100))) 940 941 if not self._connection_indexes and not self._chunks: 942 rows.append(('size', _human_readable_size(self.size))) 943 else: 944 if self._chunks: 945 start_stamp = self._chunks[ 0].start_time.to_sec() 946 end_stamp = self._chunks[-1].end_time.to_sec() 947 else: 948 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index] 949 start_stamp = min(start_stamps) if start_stamps else 0 950 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index] 951 end_stamp = max(end_stamps) if end_stamps else 0 952 953 # Show duration 954 duration = end_stamp - start_stamp 955 dur_secs = duration % 60 956 dur_mins = int(duration / 60) 957 dur_hrs = int(dur_mins / 60) 958 if dur_hrs > 0: 959 dur_mins = dur_mins % 60 960 duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration) 961 elif dur_mins > 0: 962 duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration) 963 else: 964 duration_str = '%.1fs' % duration 965 966 rows.append(('duration', duration_str)) 967 968 # Show start and end times 969 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp))) 970 rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp))) 971 972 rows.append(('size', _human_readable_size(self.size))) 973 974 if self._chunks: 975 num_messages = 0 976 for c in self._chunks: 977 for counts in c.connection_counts.values(): 978 num_messages += counts 979 else: 980 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 981 rows.append(('messages', str(num_messages))) 982 983 # Show compression information 984 if len(self._chunk_headers) == 0: 985 rows.append(('compression', 'none')) 986 else: 987 compression_counts = {} 988 compression_uncompressed = {} 989 compression_compressed = {} 990 for chunk_header in self._chunk_headers.values(): 991 if chunk_header.compression not in compression_counts: 992 compression_counts[chunk_header.compression] = 1 993 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 994 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 995 else: 996 compression_counts[chunk_header.compression] += 1 997 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 998 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 999 1000 chunk_count = len(self._chunk_headers) 1001 1002 compressions = [] 1003 for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])): 1004 if compression != Compression.NONE: 1005 fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression] 1006 compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction)) 1007 else: 1008 compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count)) 1009 rows.append(('compression', ', '.join(compressions))) 1010 1011 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 1012 if not all_uncompressed: 1013 total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.values())) 1014 total_compressed_size = sum((h.compressed_size for h in self._chunk_headers.values())) 1015 1016 total_uncompressed_size_str = _human_readable_size(total_uncompressed_size) 1017 total_compressed_size_str = _human_readable_size(total_compressed_size) 1018 total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)]) 1019 1020 if duration > 0: 1021 uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration) 1022 compressed_rate_str = _human_readable_size(total_compressed_size / duration) 1023 rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)]) 1024 1025 rows.append(('uncompressed', '%*s @ %*s/s' % 1026 (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str))) 1027 rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' % 1028 (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size))) 1029 else: 1030 rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str))) 1031 rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str))) 1032 1033 self._encryptor.add_info_rows(rows) 1034 1035 datatypes = set() 1036 datatype_infos = [] 1037 for connection in self._connections.values(): 1038 if connection.datatype in datatypes: 1039 continue 1040 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 1041 datatypes.add(connection.datatype) 1042 1043 topics = sorted(set([c.topic for c in self._get_connections()])) 1044 topic_datatypes = {} 1045 topic_conn_counts = {} 1046 topic_msg_counts = {} 1047 topic_freqs_median = {} 1048 for topic in topics: 1049 connections = list(self._get_connections(topic)) 1050 1051 topic_datatypes[topic] = connections[0].datatype 1052 topic_conn_counts[topic] = len(connections) 1053 1054 msg_count = 0 1055 for connection in connections: 1056 if self._chunks: 1057 for chunk in self._chunks: 1058 msg_count += chunk.connection_counts.get(connection.id, 0) 1059 else: 1060 msg_count += len(self._connection_indexes.get(connection.id, [])) 1061 topic_msg_counts[topic] = msg_count 1062 1063 if self._connection_indexes_read: 1064 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 1065 if len(stamps) > 1: 1066 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 1067 med_period = _median(periods) 1068 if med_period > 0.0: 1069 topic_freqs_median[topic] = 1.0 / med_period 1070 1071 topics = sorted(topic_datatypes.keys()) 1072 max_topic_len = max([len(topic) for topic in topics]) 1073 max_datatype_len = max([len(datatype) for datatype in datatypes]) 1074 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()]) 1075 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0 1076 1077 # Show datatypes 1078 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 1079 s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum) 1080 if i == 0: 1081 rows.append(('types', s)) 1082 else: 1083 rows.append(('', s)) 1084 1085 # Show topics 1086 for i, topic in enumerate(topics): 1087 topic_msg_count = topic_msg_counts[topic] 1088 1089 s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ') 1090 if topic in topic_freqs_median: 1091 s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic])) 1092 else: 1093 s += ' %*s' % (max_freq_median_len, '') 1094 1095 s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic]) 1096 if topic_conn_counts[topic] > 1: 1097 s += ' (%d connections)' % topic_conn_counts[topic] 1098 1099 if i == 0: 1100 rows.append(('topics', s)) 1101 else: 1102 rows.append(('', s)) 1103 1104 except Exception as ex: 1105 raise 1106 1107 first_column_width = max([len(field) for field, _ in rows]) + 1 1108 1109 s = '' 1110 for (field, value) in rows: 1111 if field: 1112 s += '%-*s %s\n' % (first_column_width, field + ':', value) 1113 else: 1114 s += '%-*s %s\n' % (first_column_width, '', value) 1115 1116 return s.rstrip()
1117
1118 - def _get_yaml_info(self, key=None):
1119 s = '' 1120 1121 try: 1122 if self._filename: 1123 s += 'path: %s\n' % self._filename 1124 1125 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 1126 s += 'version: 1.2 (unindexed)\n' 1127 else: 1128 s += 'version: %d.%d\n' % (int(self._version / 100), self._version % 100) 1129 1130 if not self._connection_indexes and not self._chunks: 1131 s += 'size: %d\n' % self.size 1132 s += 'indexed: False\n' 1133 else: 1134 if self._chunks: 1135 start_stamp = self._chunks[ 0].start_time.to_sec() 1136 end_stamp = self._chunks[-1].end_time.to_sec() 1137 else: 1138 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index] 1139 start_stamp = min(start_stamps) if start_stamps else 0 1140 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index] 1141 end_stamp = max(end_stamps) if end_stamps else 0 1142 1143 duration = end_stamp - start_stamp 1144 s += 'duration: %.6f\n' % duration 1145 s += 'start: %.6f\n' % start_stamp 1146 s += 'end: %.6f\n' % end_stamp 1147 s += 'size: %d\n' % self.size 1148 if self._chunks: 1149 num_messages = 0 1150 for c in self._chunks: 1151 for counts in c.connection_counts.values(): 1152 num_messages += counts 1153 else: 1154 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 1155 s += 'messages: %d\n' % num_messages 1156 s += 'indexed: True\n' 1157 1158 # Show compression information 1159 if len(self._chunk_headers) == 0: 1160 s += 'compression: none\n' 1161 else: 1162 compression_counts = {} 1163 compression_uncompressed = {} 1164 compression_compressed = {} 1165 for chunk_header in self._chunk_headers.values(): 1166 if chunk_header.compression not in compression_counts: 1167 compression_counts[chunk_header.compression] = 1 1168 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 1169 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 1170 else: 1171 compression_counts[chunk_header.compression] += 1 1172 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 1173 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 1174 1175 chunk_count = len(self._chunk_headers) 1176 1177 main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0] 1178 s += 'compression: %s\n' % str(main_compression) 1179 1180 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 1181 if not all_uncompressed: 1182 s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.values())) 1183 s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.values())) 1184 1185 s += self._encryptor.get_info_str() 1186 1187 datatypes = set() 1188 datatype_infos = [] 1189 for connection in self._connections.values(): 1190 if connection.datatype in datatypes: 1191 continue 1192 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 1193 datatypes.add(connection.datatype) 1194 1195 topics = sorted(set([c.topic for c in self._get_connections()])) 1196 topic_datatypes = {} 1197 topic_conn_counts = {} 1198 topic_msg_counts = {} 1199 topic_freqs_median = {} 1200 for topic in topics: 1201 connections = list(self._get_connections(topic)) 1202 1203 topic_datatypes[topic] = connections[0].datatype 1204 topic_conn_counts[topic] = len(connections) 1205 1206 msg_count = 0 1207 for connection in connections: 1208 for chunk in self._chunks: 1209 msg_count += chunk.connection_counts.get(connection.id, 0) 1210 topic_msg_counts[topic] = msg_count 1211 1212 if self._connection_indexes_read: 1213 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 1214 if len(stamps) > 1: 1215 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 1216 med_period = _median(periods) 1217 if med_period > 0.0: 1218 topic_freqs_median[topic] = 1.0 / med_period 1219 1220 topics = sorted(topic_datatypes.keys()) 1221 max_topic_len = max([len(topic) for topic in topics]) 1222 max_datatype_len = max([len(datatype) for datatype in datatypes]) 1223 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()]) 1224 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0 1225 1226 # Show datatypes 1227 s += 'types:\n' 1228 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 1229 s += ' - type: %s\n' % datatype 1230 s += ' md5: %s\n' % md5sum 1231 1232 # Show topics 1233 s += 'topics:\n' 1234 for i, topic in enumerate(topics): 1235 topic_msg_count = topic_msg_counts[topic] 1236 1237 s += ' - topic: %s\n' % topic 1238 s += ' type: %s\n' % topic_datatypes[topic] 1239 s += ' messages: %d\n' % topic_msg_count 1240 1241 if topic_conn_counts[topic] > 1: 1242 s += ' connections: %d\n' % topic_conn_counts[topic] 1243 1244 if topic in topic_freqs_median: 1245 s += ' frequency: %.4f\n' % topic_freqs_median[topic] 1246 1247 if not key: 1248 return s 1249 1250 class DictObject(object): 1251 def __init__(self, d): 1252 for a, b in d.items(): 1253 if isinstance(b, (list, tuple)): 1254 setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b]) 1255 else: 1256 setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
1257 1258 obj = DictObject(yaml.safe_load(s)) 1259 try: 1260 val = eval('obj.' + key) 1261 except Exception as ex: 1262 print('Error getting key "%s"' % key, file=sys.stderr) 1263 return None 1264 1265 def print_yaml(val, indent=0): 1266 indent_str = ' ' * indent 1267 1268 if type(val) is list: 1269 s = '' 1270 for item in val: 1271 s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1)) 1272 return s 1273 elif type(val) is DictObject: 1274 s = '' 1275 for i, (k, v) in enumerate(val.__dict__.items()): 1276 if i != 0: 1277 s += indent_str 1278 s += '%s: %s' % (k, str(v)) 1279 if i < len(val.__dict__) - 1: 1280 s += '\n' 1281 return s 1282 else: 1283 return indent_str + str(val) 1284 1285 return print_yaml(val) 1286 1287 except Exception as ex: 1288 raise 1289 1290 ### Internal API ### 1291 1292 @property
1293 - def _has_compressed_chunks(self):
1294 if not self._chunk_headers: 1295 return False 1296 1297 return any((h.compression != Compression.NONE for h in self._chunk_headers.values()))
1298 1299 @property
1300 - def _uncompressed_size(self):
1301 if not self._chunk_headers: 1302 return self.size 1303 1304 return sum((h.uncompressed_size for h in self._chunk_headers.values()))
1305
1306 - def _read_message(self, position, raw=False, return_connection_header=False):
1307 """ 1308 Read the message from the given position in the file. 1309 """ 1310 self.flush() 1311 return self._reader.seek_and_read_message_data_record(position, raw, return_connection_header)
1312 1313 # Index accessing 1314
1315 - def _get_connections(self, topics=None, connection_filter=None):
1316 """ 1317 Yield the connections, optionally filtering by topic and/or connection information. 1318 """ 1319 if topics: 1320 if type(topics) is str: 1321 topics = set([roslib.names.canonicalize_name(topics)]) 1322 else: 1323 topics = set([roslib.names.canonicalize_name(t) for t in topics]) 1324 1325 for c in self._connections.values(): 1326 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics: 1327 continue 1328 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header): 1329 continue 1330 yield c
1331
1332 - def _get_entries(self, connections=None, start_time=None, end_time=None):
1333 """ 1334 Yield index entries on the given connections in the given time range. 1335 """ 1336 for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time): 1337 if start_time and entry.time < start_time: 1338 continue 1339 if end_time and entry.time > end_time: 1340 return 1341 yield entry
1342
1343 - def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
1344 """ 1345 Yield index entries on the given connections in the given time range in reverse order. 1346 """ 1347 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()): 1348 if end_time and entry.time > end_time: 1349 continue 1350 if start_time and entry.time < start_time: 1351 return 1352 yield entry
1353
1354 - def _get_entry(self, t, connections=None):
1355 """ 1356 Return the first index entry on/before the given time on the given connections 1357 """ 1358 indexes = self._get_indexes(connections) 1359 1360 entry = _IndexEntry(t) 1361 1362 first_entry = None 1363 1364 for index in indexes: 1365 i = bisect.bisect_right(index, entry) - 1 1366 if i >= 0: 1367 index_entry = index[i] 1368 if first_entry is None or index_entry > first_entry: 1369 first_entry = index_entry 1370 1371 return first_entry
1372
1373 - def _get_entry_after(self, t, connections=None):
1374 """ 1375 Return the first index entry after the given time on the given connections 1376 """ 1377 indexes = self._get_indexes(connections) 1378 1379 entry = _IndexEntry(t) 1380 1381 first_entry = None 1382 1383 for index in indexes: 1384 i = bisect.bisect_right(index, entry) 1385 if i <= len(index) - 1: 1386 index_entry = index[i] 1387 if first_entry is None or index_entry < first_entry: 1388 first_entry = index_entry 1389 1390 return first_entry
1391
1392 - def _get_indexes(self, connections):
1393 """ 1394 Get the indexes for the given connections. 1395 """ 1396 if not self._connection_indexes_read: 1397 self._reader._read_connection_index_records() 1398 1399 if connections is None: 1400 return self._connection_indexes.values() 1401 else: 1402 return (self._connection_indexes[c.id] for c in connections)
1403 1404 ### Implementation ### 1405
1406 - def _clear_index(self):
1407 self._connection_indexes_read = False 1408 self._connection_indexes = {} # id -> IndexEntry[] (1.2+) 1409 1410 self._topic_connections = {} # topic -> connection_id 1411 self._connections = {} # id -> ConnectionInfo 1412 self._connection_count = 0 # (2.0) 1413 self._chunk_count = 0 # (2.0) 1414 self._chunks = [] # ChunkInfo[] (2.0) 1415 self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0) 1416 1417 self._chunk_open = False 1418 self._curr_chunk_info = None 1419 self._curr_chunk_data_pos = None 1420 self._curr_chunk_connection_indexes = {}
1421
1422 - def _open(self, f, mode, allow_unindexed):
1423 if not f: 1424 raise ValueError('filename (or stream) is invalid') 1425 1426 try: 1427 if mode == 'r': self._open_read(f, allow_unindexed) 1428 elif mode == 'w': self._open_write(f) 1429 elif mode == 'a': self._open_append(f, allow_unindexed) 1430 else: 1431 raise ValueError('mode "%s" is invalid' % mode) 1432 if 'b' not in self._file.mode and not isinstance('', bytes): 1433 raise ROSBagException('In Python 3 the bag file must be opened in binary mode') 1434 except struct.error: 1435 raise ROSBagFormatException('error with bag')
1436
1437 - def _is_file(self, f):
1438 try: 1439 return isinstance(f, file) # Python 2 1440 except NameError: 1441 import io 1442 return isinstance(f, io.IOBase) # Python 3...this will return false in Python 2 always
1443
1444 - def _open_read(self, f, allow_unindexed):
1445 if self._is_file(f): 1446 self._file = f 1447 self._filename = None 1448 else: 1449 self._file = open(f, 'rb') 1450 self._filename = f 1451 1452 self._mode = 'r' 1453 1454 try: 1455 self._version = self._read_version() 1456 except: 1457 self._version = None 1458 self._close_file() 1459 raise 1460 1461 try: 1462 self._create_reader() 1463 self._reader.start_reading() 1464 except ROSBagUnindexedException as ex: 1465 if not allow_unindexed: 1466 self._close_file() 1467 raise 1468 except: 1469 self._close_file() 1470 raise
1471
1472 - def _open_write(self, f):
1473 if self._is_file(f): 1474 self._file = f 1475 self._filename = None 1476 else: 1477 self._file = open(f, 'w+b') 1478 self._filename = f 1479 1480 self._mode = 'w' 1481 1482 self._version = 200 1483 1484 try: 1485 self._create_reader() 1486 self._start_writing() 1487 except: 1488 self._close_file() 1489 raise
1490
1491 - def _open_append(self, f, allow_unindexed):
1492 if self._is_file(f): 1493 self._file = f 1494 self._filename = None 1495 else: 1496 try: 1497 # Test if the file already exists 1498 open(f, 'r').close() 1499 1500 # File exists: open in read with update mode 1501 self._file = open(f, 'r+b') 1502 except IOError: 1503 # File doesn't exist: open in write mode 1504 self._file = open(f, 'w+b') 1505 1506 self._filename = f 1507 1508 self._mode = 'a' 1509 1510 try: 1511 self._version = self._read_version() 1512 except: 1513 self._version = None 1514 self._close_file() 1515 raise 1516 1517 if self._version != 200: 1518 self._file.close() 1519 raise ROSBagException('bag version %d is unsupported for appending' % self._version) 1520 1521 try: 1522 self._start_appending() 1523 except ROSBagUnindexedException: 1524 if not allow_unindexed: 1525 self._close_file() 1526 raise 1527 except: 1528 self._close_file() 1529 raise
1530
1531 - def _close_file(self):
1532 self._file.close() 1533 self._file = None
1534
1535 - def _create_reader(self):
1536 """ 1537 @raise ROSBagException: if the bag version is unsupported 1538 """ 1539 major_version, minor_version = int(self._version / 100), self._version % 100 1540 if major_version == 2: 1541 self._reader = _BagReader200(self) 1542 elif major_version == 1: 1543 if minor_version == 1: 1544 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version) 1545 else: 1546 # Get the op code of the first record. If it's FILE_HEADER, then we have an indexed 1.2 bag. 1547 first_record_pos = self._file.tell() 1548 header = _read_header(self._file) 1549 op = _read_uint8_field(header, 'op') 1550 self._file.seek(first_record_pos) 1551 1552 if op == _OP_FILE_HEADER: 1553 self._reader = _BagReader102_Indexed(self) 1554 else: 1555 self._reader = _BagReader102_Unindexed(self) 1556 else: 1557 raise ROSBagException('unknown bag version %d' % self._version)
1558
1559 - def _read_version(self):
1560 """ 1561 @raise ROSBagException: if the file is empty, or the version line can't be parsed 1562 """ 1563 version_line = self._file.readline().rstrip().decode() 1564 if len(version_line) == 0: 1565 raise ROSBagException('empty file') 1566 1567 matches = re.match("#ROS(.*) V(\d).(\d)", version_line) 1568 if matches is None or len(matches.groups()) != 3: 1569 raise ROSBagException('This does not appear to be a bag file') 1570 1571 version_type, major_version_str, minor_version_str = matches.groups() 1572 1573 version = int(major_version_str) * 100 + int(minor_version_str) 1574 1575 return version
1576
1577 - def _start_writing(self):
1578 version = _VERSION + '\n' 1579 version = version.encode() 1580 self._file.write(version) 1581 self._file_header_pos = self._file.tell() 1582 self._write_file_header_record(0, 0, 0) 1583
1584 - def _stop_writing(self):
1585 # Write the open chunk (if any) to file 1586 self.flush() 1587 1588 # When read and write operations are mixed (e.g. bags in 'a' mode might be opened in 'r+b' mode) 1589 # it could cause problems on Windows: 1590 # https://stackoverflow.com/questions/14279658/mixing-read-and-write-on-python-files-in-windows 1591 # to fix this, f.seek(0, os.SEEK_CUR) needs to be added after a read() before the next write() 1592 self._file.seek(0, os.SEEK_CUR) 1593 1594 # Remember this location as the start of the index 1595 self._index_data_pos = self._file.tell() 1596 1597 # Write connection infos 1598 for connection_info in self._connections.values(): 1599 # Encrypt connection records in index data (encrypt: True) 1600 self._write_connection_record(connection_info, True) 1601 1602 # Write chunk infos 1603 for chunk_info in self._chunks: 1604 self._write_chunk_info_record(chunk_info) 1605 1606 # Re-write the file header 1607 self._file.seek(self._file_header_pos) 1608 self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
1609
1610 - def _start_appending(self):
1611 self._file_header_pos = self._file.tell() 1612 1613 self._create_reader() 1614 self._reader.start_reading() 1615 1616 # Truncate the file to chop off the index 1617 self._file.truncate(self._index_data_pos) 1618 self._reader.index_data_pos = 0 1619 1620 # Rewrite the file header, clearing the index position (so we know if the index is invalid) 1621 self._file.seek(self._file_header_pos); 1622 self._write_file_header_record(0, 0, 0) 1623 1624 # Seek to the end of the file 1625 self._file.seek(0, os.SEEK_END)
1626
1627 - def _start_writing_chunk(self, t):
1628 self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t) 1629 self._write_chunk_header(_ChunkHeader(self._compression, 0, 0)) 1630 self._curr_chunk_data_pos = self._file.tell() 1631 self._set_compression_mode(self._compression) 1632 self._chunk_open = True
1633
1634 - def _get_chunk_offset(self):
1635 if self._compression == Compression.NONE: 1636 return self._file.tell() - self._curr_chunk_data_pos 1637 else: 1638 return self._output_file.compressed_bytes_in
1639
1640 - def _stop_writing_chunk(self):
1641 # Add this chunk to the index 1642 self._chunks.append(self._curr_chunk_info) 1643 1644 # Get the uncompressed and compressed sizes 1645 uncompressed_size = self._get_chunk_offset() 1646 self._set_compression_mode(Compression.NONE) 1647 compressed_size = self._file.tell() - self._curr_chunk_data_pos 1648 1649 # When encryption is on, compressed_size represents encrypted chunk size; 1650 # When decrypting, the actual compressed size can be deduced from the decrypted chunk 1651 compressed_size = self._encryptor.encrypt_chunk(compressed_size, self._curr_chunk_data_pos, self._file) 1652 1653 # Rewrite the chunk header with the size of the chunk (remembering current offset) 1654 end_of_chunk_pos = self._file.tell() 1655 self._file.seek(self._curr_chunk_info.pos) 1656 chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos) 1657 self._write_chunk_header(chunk_header) 1658 self._chunk_headers[self._curr_chunk_info.pos] = chunk_header 1659 1660 # Write out the connection indexes and clear them 1661 self._file.seek(end_of_chunk_pos) 1662 for connection_id, entries in self._curr_chunk_connection_indexes.items(): 1663 self._write_connection_index_record(connection_id, entries) 1664 self._curr_chunk_connection_indexes.clear() 1665 1666 # Flag that we're starting a new chunk 1667 self._chunk_open = False
1668
1669 - def _set_compression_mode(self, compression):
1670 # Flush the compressor, if needed 1671 if self._curr_compression != Compression.NONE: 1672 self._output_file.flush() 1673 1674 # Create the compressor 1675 if compression == Compression.BZ2: 1676 self._output_file = _CompressorFileFacade(self._file, bz2.BZ2Compressor()) 1677 elif compression == Compression.LZ4 and found_lz4: 1678 self._output_file = _CompressorFileFacade(self._file, roslz4.LZ4Compressor()) 1679 elif compression == Compression.NONE: 1680 self._output_file = self._file 1681 else: 1682 raise ROSBagException('unsupported compression type: %s' % compression) 1683 1684 self._curr_compression = compression
1685
1686 - def _write_file_header_record(self, index_pos, connection_count, chunk_count):
1687 header = { 1688 'op': _pack_uint8(_OP_FILE_HEADER), 1689 'index_pos': _pack_uint64(index_pos), 1690 'conn_count': _pack_uint32(connection_count), 1691 'chunk_count': _pack_uint32(chunk_count) 1692 } 1693 self._encryptor.add_fields_to_file_header(header) 1694 _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
1695
1696 - def _write_connection_record(self, connection_info, encrypt):
1697 header = { 1698 'op': _pack_uint8(_OP_CONNECTION), 1699 'topic': connection_info.topic, 1700 'conn': _pack_uint32(connection_info.id) 1701 } 1702 if encrypt: 1703 self._encryptor.write_encrypted_header(_write_header, self._output_file, header) 1704 else: 1705 _write_header(self._output_file, header) 1706 1707 if encrypt: 1708 self._encryptor.write_encrypted_header(_write_header, self._output_file, connection_info.header) 1709 else: 1710 _write_header(self._output_file, connection_info.header)
1711
1712 - def _write_message_data_record(self, connection_id, t, serialized_bytes):
1713 header = { 1714 'op': _pack_uint8(_OP_MSG_DATA), 1715 'conn': _pack_uint32(connection_id), 1716 'time': _pack_time(t) 1717 } 1718 _write_record(self._output_file, header, serialized_bytes)
1719
1720 - def _write_chunk_header(self, chunk_header):
1721 header = { 1722 'op': _pack_uint8(_OP_CHUNK), 1723 'compression': chunk_header.compression, 1724 'size': _pack_uint32(chunk_header.uncompressed_size) 1725 } 1726 _write_header(self._file, header) 1727 1728 self._file.write(_pack_uint32(chunk_header.compressed_size))
1729
1730 - def _write_connection_index_record(self, connection_id, entries):
1731 header = { 1732 'op': _pack_uint8(_OP_INDEX_DATA), 1733 'conn': _pack_uint32(connection_id), 1734 'ver': _pack_uint32(_INDEX_VERSION), 1735 'count': _pack_uint32(len(entries)) 1736 } 1737 1738 buffer = self._buffer 1739 buffer.seek(0) 1740 buffer.truncate(0) 1741 for entry in entries: 1742 buffer.write(_pack_time (entry.time)) 1743 buffer.write(_pack_uint32(entry.offset)) 1744 1745 _write_record(self._file, header, buffer.getvalue()) 1746
1747 - def _write_chunk_info_record(self, chunk_info):
1748 header = { 1749 'op': _pack_uint8 (_OP_CHUNK_INFO), 1750 'ver': _pack_uint32(_CHUNK_INDEX_VERSION), 1751 'chunk_pos': _pack_uint64(chunk_info.pos), 1752 'start_time': _pack_time(chunk_info.start_time), 1753 'end_time': _pack_time(chunk_info.end_time), 1754 'count': _pack_uint32(len(chunk_info.connection_counts)) 1755 } 1756 1757 buffer = self._buffer 1758 buffer.seek(0) 1759 buffer.truncate(0) 1760 for connection_id, count in chunk_info.connection_counts.items(): 1761 buffer.write(_pack_uint32(connection_id)) 1762 buffer.write(_pack_uint32(count)) 1763 1764 _write_record(self._file, header, buffer.getvalue())
1765 1766 ### Implementation ### 1767 1768 _message_types = {} # md5sum -> type 1769 1770 _OP_MSG_DEF = 0x01 1771 _OP_MSG_DATA = 0x02 1772 _OP_FILE_HEADER = 0x03 1773 _OP_INDEX_DATA = 0x04 1774 _OP_CHUNK = 0x05 1775 _OP_CHUNK_INFO = 0x06 1776 _OP_CONNECTION = 0x07 1777 1778 _OP_CODES = { 1779 _OP_MSG_DEF: 'MSG_DEF', 1780 _OP_MSG_DATA: 'MSG_DATA', 1781 _OP_FILE_HEADER: 'FILE_HEADER', 1782 _OP_INDEX_DATA: 'INDEX_DATA', 1783 _OP_CHUNK: 'CHUNK', 1784 _OP_CHUNK_INFO: 'CHUNK_INFO', 1785 _OP_CONNECTION: 'CONNECTION' 1786 } 1787 1788 _VERSION = '#ROSBAG V2.0' 1789 _FILE_HEADER_LENGTH = 4096 1790 _INDEX_VERSION = 1 1791 _CHUNK_INDEX_VERSION = 1
1792 1793 -class _ConnectionInfo(object):
1794 - def __init__(self, id, topic, header):
1795 try: 1796 datatype = _read_str_field(header, 'type') 1797 md5sum = _read_str_field(header, 'md5sum') 1798 msg_def = _read_str_field(header, 'message_definition') 1799 except KeyError as ex: 1800 raise ROSBagFormatException('connection header field %s not found' % str(ex)) 1801 1802 self.id = id 1803 self.topic = topic 1804 self.datatype = datatype 1805 self.md5sum = md5sum 1806 self.msg_def = msg_def 1807 self.header = header
1808
1809 - def __str__(self):
1810 return '%d on %s: %s' % (self.id, self.topic, str(self.header))
1811
1812 -class _ChunkInfo(object):
1813 - def __init__(self, pos, start_time, end_time):
1814 self.pos = pos 1815 self.start_time = start_time 1816 self.end_time = end_time 1817 1818 self.connection_counts = {}
1819
1820 - def __str__(self):
1821 s = 'chunk_pos: %d\n' % self.pos 1822 s += 'start_time: %s\n' % str(self.start_time) 1823 s += 'end_time: %s\n' % str(self.end_time) 1824 s += 'connections: %d\n' % len(self.connection_counts) 1825 s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()]) 1826 return s
1827
1828 -class _ChunkHeader(object):
1829 - def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
1830 self.compression = compression 1831 self.compressed_size = compressed_size 1832 self.uncompressed_size = uncompressed_size 1833 self.data_pos = data_pos
1834
1835 - def __str__(self):
1836 if self.uncompressed_size > 0: 1837 ratio = 100 * (float(self.compressed_size) / self.uncompressed_size) 1838 return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio) 1839 else: 1840 return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
1841
1842 -class ComparableMixin(object):
1843 __slots__ = [] 1844
1845 - def _compare(self, other, method):
1846 try: 1847 return method(self._cmpkey(), other._cmpkey()) 1848 except (AttributeError, TypeError): 1849 # _cmpkey not implemented or return different type 1850 # so can not compare with other 1851 return NotImplemented
1852
1853 - def __lt__(self, other):
1854 return self._compare(other, lambda s, o: s < o)
1855
1856 - def __le__(self, other):
1857 return self._compare(other, lambda s, o: s <= o)
1858
1859 - def __eq__(self, other):
1860 return self._compare(other, lambda s, o: s == o)
1861
1862 - def __ge__(self, other):
1863 return self._compare(other, lambda s, o: s >= o)
1864
1865 - def __gt__(self, other):
1866 return self._compare(other, lambda s, o: s > o)
1867
1868 - def __ne__(self, other):
1869 return self._compare(other, lambda s, o: s != o)
1870
1871 -class _IndexEntry(ComparableMixin):
1872 __slots__ = ['time'] 1873
1874 - def __init__(self, time):
1875 self.time = time
1876
1877 - def _cmpkey(self):
1878 return self.time
1879
1880 -class _IndexEntry102(_IndexEntry):
1881 __slots__ = ['offset'] 1882
1883 - def __init__(self, time, offset):
1884 self.time = time 1885 self.offset = offset
1886 1887 @property
1888 - def position(self):
1889 return self.offset
1890
1891 - def __str__(self):
1892 return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)
1893
1894 -class _IndexEntry200(_IndexEntry):
1895 __slots__ = ['chunk_pos', 'offset'] 1896
1897 - def __init__(self, time, chunk_pos, offset):
1898 self.time = time 1899 self.chunk_pos = chunk_pos 1900 self.offset = offset
1901 1902 @property
1903 - def position(self):
1904 return (self.chunk_pos, self.offset)
1905
1906 - def __str__(self):
1907 return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
1908
1909 -def _get_message_type(info):
1910 message_type = _message_types.get(info.md5sum) 1911 if message_type is None: 1912 try: 1913 message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype] 1914 if (message_type._md5sum != info.md5sum): 1915 print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr) 1916 except genmsg.InvalidMsgSpec: 1917 message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype] 1918 print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition."'%(info.datatype, info.md5sum), file=sys.stderr) 1919 except genmsg.MsgGenerationException as ex: 1920 raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex))) 1921 1922 _message_types[info.md5sum] = message_type 1923 1924 return message_type
1925
1926 -def _read_uint8 (f): return _unpack_uint8 (f.read(1))
1927 -def _read_uint32(f): return _unpack_uint32(f.read(4))
1928 -def _read_uint64(f): return _unpack_uint64(f.read(8))
1929 -def _read_time (f): return _unpack_time (f.read(8))
1930
1931 -def _decode_bytes(v): return v
1932 -def _decode_str(v): return v if type(v) is str else v.decode()
1933 -def _unpack_uint8(v): return struct.unpack('<B', v)[0]
1934 -def _unpack_uint32(v): return struct.unpack('<L', v)[0]
1935 -def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
1936 -def _unpack_time(v): return rospy.Time(*struct.unpack('<LL', v))
1937
1938 -def _pack_uint8(v): return struct.pack('<B', v)
1939 -def _pack_uint32(v): return struct.pack('<L', v)
1940 -def _pack_uint64(v): return struct.pack('<Q', v)
1941 -def _pack_time(v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
1942
1943 -def _read(f, size):
1944 data = f.read(size) 1945 if len(data) != size: 1946 raise ROSBagException('expecting %d bytes, read %d' % (size, len(data))) 1947 return data
1948
1949 -def _skip_record(f):
1950 _skip_sized(f) # skip header 1951 _skip_sized(f) # skip data
1952
1953 -def _skip_sized(f):
1954 size = _read_uint32(f) 1955 f.seek(size, os.SEEK_CUR)
1956
1957 -def _read_sized(f):
1958 try: 1959 size = _read_uint32(f) 1960 except struct.error as ex: 1961 raise ROSBagFormatException('error unpacking uint32: %s' % str(ex)) 1962 return _read(f, size)
1963
1964 -def _write_sized(f, v):
1965 if not isinstance(v, bytes): 1966 v = v.encode() 1967 f.write(_pack_uint32(len(v))) 1968 f.write(v)
1969
1970 -def _read_field(header, field, unpack_fn):
1971 if field not in header: 1972 raise ROSBagFormatException('expected "%s" field in record' % field) 1973 1974 try: 1975 value = unpack_fn(header[field]) 1976 except Exception as ex: 1977 raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex))) 1978 1979 return value
1980
1981 -def _read_bytes_field (header, field): return _read_field(header, field, _decode_bytes)
1982 -def _read_str_field (header, field): return _read_field(header, field, _decode_str)
1983 -def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
1984 -def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
1985 -def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
1986 -def _read_time_field (header, field): return _read_field(header, field, _unpack_time)
1987
1988 -def _write_record(f, header, data='', padded_size=None):
1989 header_str = _write_header(f, header) 1990 1991 if padded_size is not None: 1992 header_len = len(header_str) 1993 if header_len < padded_size: 1994 data = ' ' * (padded_size - header_len) 1995 else: 1996 data = '' 1997 1998 _write_sized(f, data)
1999
2000 -def _write_header(f, header):
2001 header_str = b'' 2002 equal = b'=' 2003 for k, v in header.items(): 2004 if not isinstance(k, bytes): 2005 k = k.encode() 2006 if not isinstance(v, bytes): 2007 v = v.encode() 2008 header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v 2009 _write_sized(f, header_str) 2010 return header_str
2011
2012 -def _read_header(f, req_op=None):
2013 bag_pos = f.tell() 2014 2015 # Read header 2016 try: 2017 header = _read_sized(f) 2018 except ROSBagException as ex: 2019 raise ROSBagFormatException('Error reading header: %s' % str(ex)) 2020 2021 return _build_header_from_str(header, req_op)
2022
2023 -def _build_header_from_str(header, req_op):
2024 # Parse header into a dict 2025 header_dict = {} 2026 while header != b'': 2027 # Read size 2028 if len(header) < 4: 2029 raise ROSBagFormatException('Error reading header field') 2030 (size,) = struct.unpack('<L', header[:4]) # @todo reindex: catch struct.error 2031 header = header[4:] 2032 2033 # Read bytes 2034 if len(header) < size: 2035 raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header))) 2036 (name, sep, value) = header[:size].partition(b'=') 2037 if sep == b'': 2038 raise ROSBagFormatException('Error reading header field') 2039 2040 name = name.decode() 2041 header_dict[name] = value # @todo reindex: raise exception on empty name 2042 2043 header = header[size:] 2044 2045 # Check the op code of the header, if supplied 2046 if req_op is not None: 2047 op = _read_uint8_field(header_dict, 'op') 2048 if req_op != op: 2049 raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op])) 2050 2051 return header_dict
2052
2053 -def _peek_next_header_op(f):
2054 pos = f.tell() 2055 header = _read_header(f) 2056 op = _read_uint8_field(header, 'op') 2057 f.seek(pos) 2058 return op
2059
2060 -def _read_record_data(f):
2061 try: 2062 record_data = _read_sized(f) 2063 except ROSBagException as ex: 2064 raise ROSBagFormatException('Error reading record data: %s' % str(ex)) 2065 2066 return record_data
2067
2068 -class _BagReader(object):
2069 - def __init__(self, bag):
2070 self.bag = bag
2071
2072 - def start_reading(self):
2073 raise NotImplementedError()
2074
2075 - def read_messages(self, topics, start_time, end_time, connection_filter, raw, return_connection_header=False):
2076 raise NotImplementedError()
2077
2078 - def reindex(self):
2079 raise NotImplementedError()
2080
2081 -class _BagReader102_Unindexed(_BagReader):
2082 """ 2083 Support class for reading unindexed v1.2 bag files. 2084 """
2085 - def __init__(self, bag):
2086 _BagReader.__init__(self, bag)
2087
2088 - def start_reading(self):
2089 self.bag._file_header_pos = self.bag._file.tell()
2090
2091 - def reindex(self):
2092 """Generates all bag index information by rereading the message records.""" 2093 f = self.bag._file 2094 2095 total_bytes = self.bag.size 2096 2097 # Re-read the file header to get to the start of the first message 2098 self.bag._file.seek(self.bag._file_header_pos) 2099 2100 offset = f.tell() 2101 2102 # Read message definition and data records 2103 while offset < total_bytes: 2104 yield offset 2105 2106 op = _peek_next_header_op(f) 2107 2108 if op == _OP_MSG_DEF: 2109 connection_info = self.read_message_definition_record() 2110 2111 if connection_info.topic not in self.bag._topic_connections: 2112 self.bag._topic_connections[connection_info.topic] = connection_info.id 2113 self.bag._connections[connection_info.id] = connection_info 2114 self.bag._connection_indexes[connection_info.id] = [] 2115 2116 elif op == _OP_MSG_DATA: 2117 # Read the topic and timestamp from the header 2118 header = _read_header(f) 2119 2120 topic = _read_str_field(header, 'topic') 2121 secs = _read_uint32_field(header, 'sec') 2122 nsecs = _read_uint32_field(header, 'nsec') 2123 t = genpy.Time(secs, nsecs) 2124 2125 if topic not in self.bag._topic_connections: 2126 datatype = _read_str_field(header, 'type') 2127 self._create_connection_info_for_datatype(topic, datatype) 2128 2129 connection_id = self.bag._topic_connections[topic] 2130 info = self.bag._connections[connection_id] 2131 2132 # Skip over the message content 2133 _skip_sized(f) 2134 2135 # Insert the message entry (in order) into the connection index 2136 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 2137 2138 offset = f.tell()
2139
2140 - def read_messages(self, topics, start_time, end_time, topic_filter, raw, return_connection_header=False):
2141 f = self.bag._file 2142 2143 f.seek(self.bag._file_header_pos) 2144 2145 while True: 2146 # Read MSG_DEF records 2147 while True: 2148 position = f.tell() 2149 2150 try: 2151 header = _read_header(f) 2152 except Exception: 2153 return 2154 2155 op = _read_uint8_field(header, 'op') 2156 if op != _OP_MSG_DEF: 2157 break 2158 2159 connection_info = self.read_message_definition_record(header) 2160 2161 if connection_info.topic not in self.bag._topic_connections: 2162 self.bag._topic_connections[connection_info.topic] = connection_info.id 2163 2164 self.bag._connections[connection_info.id] = connection_info 2165 2166 # Check that we have a MSG_DATA record 2167 if op != _OP_MSG_DATA: 2168 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2169 2170 topic = _read_str_field(header, 'topic') 2171 2172 if topic not in self.bag._topic_connections: 2173 datatype = _read_str_field(header, 'type') 2174 self._create_connection_info_for_datatype(topic, datatype) 2175 2176 connection_id = self.bag._topic_connections[topic] 2177 info = self.bag._connections[connection_id] 2178 2179 # Get the message type 2180 try: 2181 msg_type = _get_message_type(info) 2182 except KeyError: 2183 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bagfile by definition' % info.datatype) 2184 2185 # Get the timestamp 2186 secs = _read_uint32_field(header, 'sec') 2187 nsecs = _read_uint32_field(header, 'nsec') 2188 t = genpy.Time(secs, nsecs) 2189 2190 # Read the message content 2191 data = _read_record_data(f) 2192 2193 if raw: 2194 msg = (info.datatype, data, info.md5sum, position, msg_type) 2195 else: 2196 # Deserialize the message 2197 msg = msg_type() 2198 msg.deserialize(data) 2199 2200 if return_connection_header: 2201 yield BagMessageWithConnectionHeader(topic, msg, t, info.header) 2202 else: 2203 yield BagMessage(topic, msg, t) 2204 2205 self.bag._connection_indexes_read = True
2206
2207 - def _create_connection_info_for_datatype(self, topic, datatype):
2208 for c in self.bag._connections.values(): 2209 if c.datatype == datatype: 2210 connection_id = len(self.bag._connections) 2211 connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] } 2212 connection_info = _ConnectionInfo(connection_id, topic, connection_header) 2213 2214 self.bag._topic_connections[topic] = connection_id 2215 self.bag._connections[connection_id] = connection_info 2216 self.bag._connection_indexes[connection_id] = [] 2217 return 2218 2219 raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
2220
2221 - def read_message_definition_record(self, header=None):
2222 if not header: 2223 header = _read_header(self.bag._file, _OP_MSG_DEF) 2224 2225 topic = _read_str_field(header, 'topic') 2226 datatype = _read_str_field(header, 'type') 2227 md5sum = _read_str_field(header, 'md5') 2228 msg_def = _read_str_field(header, 'def') 2229 2230 _skip_sized(self.bag._file) # skip the record data 2231 2232 connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def } 2233 2234 id = len(self.bag._connections) 2235 2236 return _ConnectionInfo(id, topic, connection_header)
2237
2238 -class _BagReader102_Indexed(_BagReader102_Unindexed):
2239 """ 2240 Support class for reading indexed v1.2 bag files. 2241 """
2242 - def __init__(self, bag):
2243 _BagReader.__init__(self, bag)
2244
2245 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2246 connections = self.bag._get_connections(topics, connection_filter) 2247 for entry in self.bag._get_entries(connections, start_time, end_time): 2248 yield self.seek_and_read_message_data_record(entry.offset, raw)
2249
2250 - def reindex(self):
2251 """Generates all bag index information by rereading the message records.""" 2252 f = self.bag._file 2253 2254 total_bytes = self.bag.size 2255 2256 # Re-read the file header to get to the start of the first message 2257 self.bag._file.seek(self.bag._file_header_pos) 2258 self.read_file_header_record() 2259 2260 offset = f.tell() 2261 2262 # Read message definition and data records 2263 while offset < total_bytes: 2264 yield offset 2265 2266 op = _peek_next_header_op(f) 2267 2268 if op == _OP_MSG_DEF: 2269 connection_info = self.read_message_definition_record() 2270 2271 if connection_info.topic not in self.bag._topic_connections: 2272 self.bag._topic_connections[connection_info.topic] = connection_info.id 2273 self.bag._connections[connection_info.id] = connection_info 2274 self.bag._connection_indexes[connection_info.id] = [] 2275 2276 elif op == _OP_MSG_DATA: 2277 # Read the topic and timestamp from the header 2278 header = _read_header(f) 2279 2280 topic = _read_str_field(header, 'topic') 2281 secs = _read_uint32_field(header, 'sec') 2282 nsecs = _read_uint32_field(header, 'nsec') 2283 t = genpy.Time(secs, nsecs) 2284 2285 if topic not in self.bag._topic_connections: 2286 datatype = _read_str_field(header, 'type') 2287 self._create_connection_info_for_datatype(topic, datatype) 2288 2289 connection_id = self.bag._topic_connections[topic] 2290 info = self.bag._connections[connection_id] 2291 2292 # Skip over the message content 2293 _skip_sized(f) 2294 2295 # Insert the message entry (in order) into the connection index 2296 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 2297 2298 elif op == _OP_INDEX_DATA: 2299 _skip_record(f) 2300 2301 offset = f.tell()
2302
2303 - def start_reading(self):
2304 try: 2305 # Read the file header 2306 self.read_file_header_record() 2307 2308 total_bytes = self.bag.size 2309 2310 # Check if the index position has been written, i.e. the bag was closed successfully 2311 if self.bag._index_data_pos == 0: 2312 raise ROSBagUnindexedException() 2313 2314 # Seek to the beginning of the topic index records 2315 self.bag._file.seek(self.bag._index_data_pos) 2316 2317 # Read the topic indexes 2318 topic_indexes = {} 2319 while True: 2320 pos = self.bag._file.tell() 2321 if pos >= total_bytes: 2322 break 2323 2324 topic, index = self.read_topic_index_record() 2325 2326 topic_indexes[topic] = index 2327 2328 # Read the message definition records (one for each topic) 2329 for topic, index in topic_indexes.items(): 2330 self.bag._file.seek(index[0].offset) 2331 2332 connection_info = self.read_message_definition_record() 2333 2334 if connection_info.topic not in self.bag._topic_connections: 2335 self.bag._topic_connections[connection_info.topic] = connection_info.id 2336 self.bag._connections[connection_info.id] = connection_info 2337 2338 self.bag._connection_indexes[connection_info.id] = index 2339 2340 self.bag._connection_indexes_read = True 2341 2342 except Exception as ex: 2343 raise ROSBagUnindexedException()
2344
2345 - def read_file_header_record(self):
2346 self.bag._file_header_pos = self.bag._file.tell() 2347 2348 header = _read_header(self.bag._file, _OP_FILE_HEADER) 2349 2350 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 2351 2352 _skip_sized(self.bag._file) # skip the record data, i.e. padding
2353
2354 - def read_topic_index_record(self):
2355 f = self.bag._file 2356 2357 header = _read_header(f, _OP_INDEX_DATA) 2358 2359 index_version = _read_uint32_field(header, 'ver') 2360 topic = _read_str_field (header, 'topic') 2361 count = _read_uint32_field(header, 'count') 2362 2363 if index_version != 0: 2364 raise ROSBagFormatException('expecting index version 0, got %d' % index_version) 2365 2366 _read_uint32(f) # skip the record data size 2367 2368 topic_index = [] 2369 2370 for i in range(count): 2371 time = _read_time (f) 2372 offset = _read_uint64(f) 2373 2374 topic_index.append(_IndexEntry102(time, offset)) 2375 2376 return (topic, topic_index)
2377
2378 - def seek_and_read_message_data_record(self, position, raw, return_connection_header=False):
2379 f = self.bag._file 2380 2381 # Seek to the message position 2382 f.seek(position) 2383 2384 # Skip any MSG_DEF records 2385 while True: 2386 header = _read_header(f) 2387 op = _read_uint8_field(header, 'op') 2388 if op != _OP_MSG_DEF: 2389 break 2390 _skip_sized(f) 2391 2392 # Check that we have a MSG_DATA record 2393 if op != _OP_MSG_DATA: 2394 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2395 2396 topic = _read_str_field(header, 'topic') 2397 2398 connection_id = self.bag._topic_connections[topic] 2399 info = self.bag._connections[connection_id] 2400 2401 # Get the message type 2402 try: 2403 msg_type = _get_message_type(info) 2404 except KeyError: 2405 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bagfile by definition' % info.datatype) 2406 2407 # Get the timestamp 2408 secs = _read_uint32_field(header, 'sec') 2409 nsecs = _read_uint32_field(header, 'nsec') 2410 t = genpy.Time(secs, nsecs) 2411 2412 # Read the message content 2413 data = _read_record_data(f) 2414 2415 if raw: 2416 msg = info.datatype, data, info.md5sum, position, msg_type 2417 else: 2418 # Deserialize the message 2419 msg = msg_type() 2420 msg.deserialize(data) 2421 2422 if return_connection_header: 2423 return BagMessageWithConnectionHeader(topic, msg, t, header) 2424 else: 2425 return BagMessage(topic, msg, t)
2426
2427 -class _BagReader200(_BagReader):
2428 """ 2429 Support class for reading v2.0 bag files. 2430 """
2431 - def __init__(self, bag):
2432 _BagReader.__init__(self, bag) 2433 2434 self.decompressed_chunk_pos = None 2435 self.decompressed_chunk = None 2436 self.decompressed_chunk_io = None
2437
2438 - def reindex(self):
2439 """ 2440 Generates all bag index information by rereading the chunks. 2441 Assumes the file header has been read. 2442 """ 2443 f = self.bag._file 2444 2445 f.seek(0, os.SEEK_END) 2446 total_bytes = f.tell() 2447 2448 # Read any connection records from after the chunk section. 2449 # This is to workaround a bug in rosbag record --split (fixed in r10390) 2450 # where connection records weren't always being written inside the chunk. 2451 self._read_terminal_connection_records() 2452 2453 # Re-read the file header to get to the start of the first chunk 2454 self.bag._file.seek(self.bag._file_header_pos) 2455 self.read_file_header_record() 2456 2457 trunc_pos = None 2458 2459 while True: 2460 chunk_pos = f.tell() 2461 if chunk_pos >= total_bytes: 2462 break 2463 yield chunk_pos 2464 2465 try: 2466 self._reindex_read_chunk(f, chunk_pos, total_bytes) 2467 except Exception as ex: 2468 break 2469 2470 trunc_pos = f.tell() 2471 2472 if trunc_pos and trunc_pos < total_bytes: 2473 f.truncate(trunc_pos)
2474
2475 - def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
2476 # Read the chunk header 2477 chunk_header = self.read_chunk_header() 2478 2479 # If the chunk header size is 0, then the chunk wasn't correctly terminated - we're done 2480 if chunk_header.compressed_size == 0: 2481 raise ROSBagException('unterminated chunk at %d' % chunk_pos) 2482 2483 if chunk_header.compression == Compression.NONE: 2484 encrypted_chunk = _read(f, chunk_header.compressed_size) 2485 2486 chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk) 2487 2488 if self.decompressed_chunk_io: 2489 self.decompressed_chunk_io.close() 2490 self.decompressed_chunk_io = StringIO(chunk) 2491 2492 chunk_file = self.decompressed_chunk_io 2493 2494 else: 2495 # Read the chunk, and decrypt/decompress it 2496 encrypted_chunk = _read(f, chunk_header.compressed_size) 2497 2498 compressed_chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk) 2499 2500 # Decompress it 2501 if chunk_header.compression == Compression.BZ2: 2502 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2503 elif chunk_header.compression == Compression.LZ4 and found_lz4: 2504 self.decompressed_chunk = roslz4.decompress(compressed_chunk) 2505 else: 2506 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2507 2508 if self.decompressed_chunk_io: 2509 self.decompressed_chunk_io.close() 2510 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2511 2512 chunk_file = self.decompressed_chunk_io 2513 2514 # Read chunk connection and message records 2515 self.bag._curr_chunk_info = None 2516 2517 offset = chunk_file.tell() 2518 2519 expected_index_length = 0 2520 2521 while offset < chunk_header.uncompressed_size: 2522 op = _peek_next_header_op(chunk_file) 2523 if op == _OP_CONNECTION: 2524 # Connection records in chunk are not encrypted (encrypt: False) 2525 connection_info = self.read_connection_record(chunk_file, False) 2526 2527 if connection_info.id not in self.bag._connections: 2528 self.bag._connections[connection_info.id] = connection_info 2529 if connection_info.id not in self.bag._connection_indexes: 2530 self.bag._connection_indexes[connection_info.id] = [] 2531 2532 elif op == _OP_MSG_DATA: 2533 # Read the connection id and timestamp from the header 2534 header = _read_header(chunk_file) 2535 2536 connection_id = _read_uint32_field(header, 'conn') 2537 t = _read_time_field (header, 'time') 2538 2539 # Update the chunk info with this timestamp 2540 if not self.bag._curr_chunk_info: 2541 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t) 2542 else: 2543 if t > self.bag._curr_chunk_info.end_time: 2544 self.bag._curr_chunk_info.end_time = t 2545 elif t < self.bag._curr_chunk_info.start_time: 2546 self.bag._curr_chunk_info.start_time = t 2547 if connection_id in self.bag._curr_chunk_info.connection_counts: 2548 self.bag._curr_chunk_info.connection_counts[connection_id] += 1 2549 else: 2550 self.bag._curr_chunk_info.connection_counts[connection_id] = 1 2551 2552 # Skip over the message content 2553 _skip_sized(chunk_file) 2554 2555 # Insert the message entry (in order) into the connection index 2556 if connection_id not in self.bag._connection_indexes: 2557 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos)) 2558 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset)) 2559 2560 expected_index_length += 1 2561 2562 else: 2563 # Unknown record type so skip 2564 _skip_record(chunk_file) 2565 2566 offset = chunk_file.tell() 2567 2568 # Skip over index records, connection records and chunk info records 2569 next_op = _peek_next_header_op(f) 2570 2571 total_index_length = 0 2572 2573 while next_op != _OP_CHUNK: 2574 if next_op == _OP_INDEX_DATA: 2575 # Bug workaround: C Turtle bags (pre-1.1.15) were written with an incorrect data length 2576 _, index = self.read_connection_index_record() 2577 total_index_length += len(index) 2578 else: 2579 _skip_record(f) 2580 2581 if f.tell() >= total_bytes: 2582 if total_index_length != expected_index_length: 2583 raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length)) 2584 break 2585 2586 next_op = _peek_next_header_op(f) 2587 2588 # Chunk was read correctly - store info 2589 self.bag._chunk_headers[chunk_pos] = chunk_header 2590 self.bag._chunks.append(self.bag._curr_chunk_info)
2591
2593 b, f, r = self.bag, self.bag._file, self.bag._reader 2594 2595 # Seek to the first record after FILE_HEADER 2596 f.seek(b._file_header_pos) 2597 r.read_file_header_record() 2598 2599 # Advance to the first CONNECTION 2600 if self._advance_to_next_record(_OP_CONNECTION): 2601 # Read the CONNECTION records 2602 while True: 2603 connection_info = r.read_connection_record(f, False) 2604 2605 b._connections[connection_info.id] = connection_info 2606 b._connection_indexes[connection_info.id] = [] 2607 2608 next_op = _peek_next_header_op(f) 2609 if next_op != _OP_CONNECTION: 2610 break
2611
2612 - def _advance_to_next_record(self, op):
2613 b, f = self.bag, self.bag._file 2614 2615 try: 2616 while True: 2617 next_op = _peek_next_header_op(f) 2618 if next_op == op: 2619 break 2620 2621 if next_op == _OP_INDEX_DATA: 2622 # Workaround the possible invalid data length in INDEX_DATA records 2623 2624 # read_connection_index_record() requires _curr_chunk_info is set 2625 if b._curr_chunk_info is None: 2626 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1)) 2627 2628 b._reader.read_connection_index_record() 2629 else: 2630 _skip_record(f) 2631 2632 return True 2633 2634 except Exception as ex: 2635 return False
2636
2637 - def start_reading(self):
2638 try: 2639 # Read the file header 2640 self.read_file_header_record() 2641 2642 # Check if the index position has been written, i.e. the bag was closed successfully 2643 if self.bag._index_data_pos == 0: 2644 raise ROSBagUnindexedException() 2645 2646 # Seek to the end of the chunks 2647 self.bag._file.seek(self.bag._index_data_pos) 2648 2649 # Read the connection records 2650 self.bag._connection_indexes = {} 2651 for i in range(self.bag._connection_count): 2652 # Connection records in index data are encrypted (encrypt: True) 2653 connection_info = self.read_connection_record(self.bag._file, True) 2654 self.bag._connections[connection_info.id] = connection_info 2655 self.bag._connection_indexes[connection_info.id] = [] 2656 2657 # Read the chunk info records 2658 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)] 2659 2660 # Read the chunk headers 2661 self.bag._chunk_headers = {} 2662 for chunk_info in self.bag._chunks: 2663 self.bag._file.seek(chunk_info.pos) 2664 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header() 2665 2666 if not self.bag._skip_index: 2667 self._read_connection_index_records() 2668 2669 except ROSBagEncryptNotSupportedException: 2670 raise 2671 except ROSBagEncryptException: 2672 raise 2673 except Exception as ex: 2674 raise ROSBagUnindexedException()
2675
2677 for chunk_info in self.bag._chunks: 2678 self.bag._file.seek(chunk_info.pos) 2679 _skip_record(self.bag._file) 2680 2681 self.bag._curr_chunk_info = chunk_info 2682 for i in range(len(chunk_info.connection_counts)): 2683 connection_id, index = self.read_connection_index_record() 2684 self.bag._connection_indexes[connection_id].extend(index) 2685 2686 # Remove any connections with no entries 2687 # This is a workaround for a bug where connection records were being written for 2688 # connections which had no messages in the bag 2689 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index] 2690 for id in orphan_connection_ids: 2691 del self.bag._connections[id] 2692 del self.bag._connection_indexes[id] 2693 2694 self.bag._connection_indexes_read = True
2695
2696 - def read_messages(self, topics, start_time, end_time, connection_filter, raw, return_connection_header=False):
2697 connections = self.bag._get_connections(topics, connection_filter) 2698 for entry in self.bag._get_entries(connections, start_time, end_time): 2699 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw, return_connection_header)
2700 2701 ### 2702
2703 - def read_file_header_record(self):
2704 self.bag._file_header_pos = self.bag._file.tell() 2705 2706 header = _read_header(self.bag._file, _OP_FILE_HEADER) 2707 2708 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 2709 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count') 2710 self.bag._connection_count = _read_uint32_field(header, 'conn_count') 2711 try: 2712 encryptor = _read_str_field(header, 'encryptor') 2713 self.bag.set_encryptor(encryptor) 2714 self.bag._encryptor.read_fields_from_file_header(header) 2715 except ROSBagFormatException: 2716 # If encryptor header is not found, keep going 2717 pass 2718 2719 _skip_sized(self.bag._file) # skip over the record data, i.e. padding
2720
2721 - def read_connection_record(self, f, encrypt):
2722 if encrypt: 2723 header = self.bag._encryptor.read_encrypted_header(_read_header, f, _OP_CONNECTION) 2724 else: 2725 header = _read_header(f, _OP_CONNECTION) 2726 2727 conn_id = _read_uint32_field(header, 'conn') 2728 topic = _read_str_field (header, 'topic') 2729 2730 if encrypt: 2731 connection_header = self.bag._encryptor.read_encrypted_header(_read_header, f) 2732 else: 2733 connection_header = _read_header(f) 2734 2735 return _ConnectionInfo(conn_id, topic, connection_header)
2736
2737 - def read_chunk_info_record(self):
2738 f = self.bag._file 2739 2740 header = _read_header(f, _OP_CHUNK_INFO) 2741 2742 chunk_info_version = _read_uint32_field(header, 'ver') 2743 2744 if chunk_info_version == 1: 2745 chunk_pos = _read_uint64_field(header, 'chunk_pos') 2746 start_time = _read_time_field (header, 'start_time') 2747 end_time = _read_time_field (header, 'end_time') 2748 connection_count = _read_uint32_field(header, 'count') 2749 2750 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time) 2751 2752 _read_uint32(f) # skip the record data size 2753 2754 for i in range(connection_count): 2755 connection_id = _read_uint32(f) 2756 count = _read_uint32(f) 2757 2758 chunk_info.connection_counts[connection_id] = count 2759 2760 return chunk_info 2761 else: 2762 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2763
2764 - def read_chunk_header(self):
2765 header = _read_header(self.bag._file, _OP_CHUNK) 2766 2767 compression = _read_str_field (header, 'compression') 2768 uncompressed_size = _read_uint32_field(header, 'size') 2769 2770 compressed_size = _read_uint32(self.bag._file) # read the record data size 2771 2772 data_pos = self.bag._file.tell() 2773 2774 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
2775
2777 f = self.bag._file 2778 2779 header = _read_header(f, _OP_INDEX_DATA) 2780 2781 index_version = _read_uint32_field(header, 'ver') 2782 connection_id = _read_uint32_field(header, 'conn') 2783 count = _read_uint32_field(header, 'count') 2784 2785 if index_version != 1: 2786 raise ROSBagFormatException('expecting index version 1, got %d' % index_version) 2787 2788 record_size = _read_uint32(f) # skip the record data size 2789 2790 index = [] 2791 2792 for i in range(count): 2793 time = _read_time (f) 2794 offset = _read_uint32(f) 2795 2796 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset)) 2797 2798 return (connection_id, index)
2799
2800 - def seek_and_read_message_data_record(self, position, raw, return_connection_header=False):
2801 chunk_pos, offset = position 2802 2803 chunk_header = self.bag._chunk_headers.get(chunk_pos) 2804 if chunk_header is None: 2805 raise ROSBagException('no chunk at position %d' % chunk_pos) 2806 2807 if chunk_header.compression == Compression.NONE: 2808 if self.decompressed_chunk_pos != chunk_pos: 2809 f = self.bag._file 2810 f.seek(chunk_header.data_pos) 2811 encrypted_chunk = _read(f, chunk_header.compressed_size) 2812 2813 chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk) 2814 2815 self.decompressed_chunk_pos = chunk_pos 2816 2817 if self.decompressed_chunk_io: 2818 self.decompressed_chunk_io.close() 2819 self.decompressed_chunk_io = StringIO(chunk) 2820 else: 2821 if self.decompressed_chunk_pos != chunk_pos: 2822 # Seek to the chunk data, read and decompress 2823 self.bag._file.seek(chunk_header.data_pos) 2824 encrypted_chunk = _read(self.bag._file, chunk_header.compressed_size) 2825 2826 compressed_chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk) 2827 2828 if chunk_header.compression == Compression.BZ2: 2829 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2830 elif chunk_header.compression == Compression.LZ4 and found_lz4: 2831 self.decompressed_chunk = roslz4.decompress(compressed_chunk) 2832 else: 2833 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2834 2835 self.decompressed_chunk_pos = chunk_pos 2836 2837 if self.decompressed_chunk_io: 2838 self.decompressed_chunk_io.close() 2839 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2840 2841 f = self.decompressed_chunk_io 2842 f.seek(offset) 2843 2844 # Skip any CONNECTION records 2845 while True: 2846 header = _read_header(f) 2847 op = _read_uint8_field(header, 'op') 2848 if op != _OP_CONNECTION: 2849 break 2850 _skip_sized(f) 2851 2852 # Check that we have a MSG_DATA record 2853 if op != _OP_MSG_DATA: 2854 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2855 2856 connection_id = _read_uint32_field(header, 'conn') 2857 t = _read_time_field (header, 'time') 2858 2859 # Get the message type 2860 connection_info = self.bag._connections[connection_id] 2861 try: 2862 msg_type = _get_message_type(connection_info) 2863 except KeyError: 2864 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype) 2865 2866 # Read the message content 2867 data = _read_record_data(f) 2868 2869 # Deserialize the message 2870 if raw: 2871 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type 2872 else: 2873 msg = msg_type() 2874 msg.deserialize(data) 2875 2876 if return_connection_header: 2877 return BagMessageWithConnectionHeader(connection_info.topic, msg, t, connection_info.header) 2878 else: 2879 return BagMessage(connection_info.topic, msg, t)
2880
2881 -def _time_to_str(secs):
2882 secs_frac = secs - int(secs) 2883 secs_frac_str = ('%.2f' % secs_frac)[1:] 2884 2885 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
2886
2887 -def _human_readable_size(size):
2888 multiple = 1024.0 2889 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2890 size /= multiple 2891 if size < multiple: 2892 return '%.1f %s' % (size, suffix) 2893 2894 return '-'
2895
2896 -def _human_readable_frequency(freq):
2897 multiple = 1000.0 2898 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']: 2899 if freq < multiple: 2900 return '%.1f %s' % (freq, suffix) 2901 freq /= multiple 2902 2903 return '-'
2904
2905 ## See http://code.activestate.com/recipes/511509 2906 -def _mergesort(list_of_lists, key=None):
2907 """ 2908 Perform an N-way merge operation on sorted lists. 2909 2910 @param list_of_lists: (really iterable of iterable) of sorted elements 2911 (either by naturally or by C{key}) 2912 @param key: specify sort key function (like C{sort()}, C{sorted()}) 2913 @param iterfun: function that returns an iterator. 2914 2915 Yields tuples of the form C{(item, iterator)}, where the iterator is the 2916 built-in list iterator or something you pass in, if you pre-generate the 2917 iterators. 2918 2919 This is a stable merge; complexity O(N lg N) 2920 2921 Examples:: 2922 2923 print list(x[0] for x in mergesort([[1,2,3,4], 2924 [2,3.5,3.7,4.5,6,7], 2925 [2.6,3.6,6.6,9]])) 2926 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2927 2928 # note stability 2929 print list(x[0] for x in mergesort([[1,2,3,4], 2930 [2,3.5,3.7,4.5,6,7], 2931 [2.6,3.6,6.6,9]], key=int)) 2932 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2933 2934 print list(x[0] for x in mergesort([[4,3,2,1], 2935 [7,6.5,4,3.7,3.3,1.9], 2936 [9,8.6,7.6,6.6,5.5,4.4,3.3]], 2937 key=lambda x: -x)) 2938 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1] 2939 """ 2940 2941 heap = [] 2942 for i, itr in enumerate(iter(pl) for pl in list_of_lists): 2943 try: 2944 item = next(itr) 2945 toadd = (key(item), i, item, itr) if key else (item, i, itr) 2946 heap.append(toadd) 2947 except StopIteration: 2948 pass 2949 heapq.heapify(heap) 2950 2951 if key: 2952 while heap: 2953 _, idx, item, itr = heap[0] 2954 yield item, itr 2955 try: 2956 item = next(itr) 2957 heapq.heapreplace(heap, (key(item), idx, item, itr) ) 2958 except StopIteration: 2959 heapq.heappop(heap) 2960 2961 else: 2962 while heap: 2963 item, idx, itr = heap[0] 2964 yield item, itr 2965 try: 2966 heapq.heapreplace(heap, (next(itr), idx, itr)) 2967 except StopIteration: 2968 heapq.heappop(heap)
2969
2970 -class _CompressorFileFacade(object):
2971 """ 2972 A file facade for sequential compressors (e.g., bz2.BZ2Compressor). 2973 """
2974 - def __init__(self, file, compressor):
2975 self.file = file 2976 self.compressor = compressor 2977 self.compressed_bytes_in = 0
2978
2979 - def write(self, data):
2980 compressed = self.compressor.compress(data) 2981 if len(compressed) > 0: 2982 self.file.write(compressed) 2983 self.compressed_bytes_in += len(data)
2984
2985 - def flush(self):
2986 compressed = self.compressor.flush() 2987 if len(compressed) > 0: 2988 self.file.write(compressed)
2989
2990 -def _median(values):
2991 values_len = len(values) 2992 if values_len == 0: 2993 return float('nan') 2994 2995 sorted_values = sorted(values) 2996 if values_len % 2 == 1: 2997 return sorted_values[int(values_len / 2)] 2998 2999 lower = sorted_values[int(values_len / 2) - 1] 3000 upper = sorted_values[int(values_len / 2)] 3001 3002 return float(lower + upper) / 2
3003