
Source Code for Module rosbag.bag

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2010, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32   
  33  """ 
  34  The rosbag Python API. 
  35   
  36  Provides serialization of bag files. 
  37  """ 
  38   
  39  from __future__ import print_function 
  40   
  41  import bisect 
  42  import bz2 
  43  import collections 
  44  import heapq 
  45  import os 
  46  import re 
  47  import struct 
  48  import sys 
  49  import threading 
  50  import time 
  51  import yaml 
  52   
  53  try: 
  54      from cStringIO import StringIO  # Python 2.x 
  55  except ImportError: 
  56      from io import BytesIO as StringIO  # Python 3.x 
  57   
  58  import genmsg 
  59  import genpy 
  60  import genpy.dynamic 
  61  import genpy.message 
  62   
  63  import roslib.names # still needed for roslib.names.canonicalize_name() 
  64  import rospy 
  65  try: 
  66      import roslz4 
  67      found_lz4 = True 
  68  except ImportError: 
  69      rospy.logwarn( 
  70          'Failed to load Python extension for LZ4 support. ' 
  71          'LZ4 compression will not be available.') 
  72      found_lz4 = False 
  73   
  74  class ROSBagException(Exception): 
  75      """ 
  76      Base class for exceptions in rosbag. 
  77      """ 
  78      def __init__(self, value): 
  79          self.value = value 
  80   
  81      def __str__(self): 
  82          return self.value 
  83   
  84  class ROSBagFormatException(ROSBagException): 
  85      """ 
  86      Exceptions for errors relating to the bag file format. 
  87      """ 
  88      def __init__(self, value): 
  89          ROSBagException.__init__(self, value) 
  90   
  91  class ROSBagUnindexedException(ROSBagException): 
  92      """ 
  93      Exception for unindexed bags. 
  94      """ 
  95      def __init__(self): 
  96          ROSBagException.__init__(self, 'Unindexed bag') 
  97   
  98  class Compression: 
  99      """ 
 100      Allowable compression types 
 101      """ 
 102      NONE = 'none' 
 103      BZ2 = 'bz2' 
 104      LZ4 = 'lz4' 
 105   
 106  BagMessage = collections.namedtuple('BagMessage', 'topic message timestamp') 
 107   
 108  class Bag(object): 
 109      """ 
 110      Bag serializes messages to and from a single file on disk using the bag format. 
 111      """ 
 112      def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False): 
 113          """ 
 114          Open a bag file.  The mode can be 'r', 'w', or 'a' for reading (default), 
 115          writing or appending.  The file will be created if it doesn't exist 
 116          when opened for writing or appending; it will be truncated when opened 
 117          for writing.  Simultaneous reading and writing is allowed when in writing 
 118          or appending mode. 
 119          @param f: filename of bag to open or a stream to read from 
 120          @type f: str or file 
 121          @param mode: mode, either 'r', 'w', or 'a' 
 122          @type mode: str 
 123          @param compression: compression mode, see U{rosbag.Compression} for valid modes 
 124          @type compression: str 
 125          @param chunk_threshold: minimum number of uncompressed bytes per chunk 
 126          @type chunk_threshold: int 
 127          @param allow_unindexed: if True, allow opening unindexed bags 
 128          @type allow_unindexed: bool 
 129          @param options: the bag options (currently: compression and chunk_threshold) 
 130          @type options: dict 
 131          @param skip_index: if True, don't read the connection index records on open [2.0+] 
 132          @type skip_index: bool 
 133          @raise ValueError: if any argument is invalid 
 134          @raise ROSBagException: if an error occurs opening the file 
 135          @raise ROSBagFormatException: if the bag format is corrupted 
 136          """ 
 137          if options is not None: 
 138              if type(options) is not dict: 
 139                  raise ValueError('options must be of type dict') 
 140              if 'compression' in options: 
 141                  compression = options['compression'] 
 142              if 'chunk_threshold' in options: 
 143                  chunk_threshold = options['chunk_threshold'] 
 144   
 145          self._file = None 
 146          self._filename = None 
 147          self._version = None 
 148   
 149          allowed_compressions = [Compression.NONE, Compression.BZ2] 
 150          if found_lz4: 
 151              allowed_compressions.append(Compression.LZ4) 
 152          if compression not in allowed_compressions: 
 153              raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 
 154          self._compression = compression 
 155   
 156          if chunk_threshold < 0: 
 157              raise ValueError('chunk_threshold must be greater than or equal to zero') 
 158          self._chunk_threshold = chunk_threshold 
 159   
 160          self._skip_index = skip_index 
 161   
 162          self._reader = None 
 163   
 164          self._file_header_pos = None 
 165          self._index_data_pos = 0  # (1.2+) 
 166   
 167          self._clear_index() 
 168   
 169          self._buffer = StringIO() 
 170   
 171          self._curr_compression = Compression.NONE 
 172   
 173          self._open(f, mode, allow_unindexed) 
 174   
 175          self._output_file = self._file 
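# Usage sketch (editor's illustration, not part of the original module):
# opening a bag for reading and another for writing.  The file names are
# hypothetical.
#
#     from rosbag.bag import Bag, Compression
#
#     with Bag('input.bag') as inbag:            # mode defaults to 'r'
#         print(inbag.version, inbag.size)
#
#     with Bag('output.bag', 'w', compression=Compression.BZ2) as outbag:
#         pass  # write messages here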
176
177 - def __iter__(self):
178 return self.read_messages()
179
180 - def __enter__(self):
181 return self
182
183 - def __exit__(self, exc_type, exc_value, traceback):
184 self.close()
185 186 @property
187 - def options(self):
188 """Get the options.""" 189 return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }
190 191 @property
192 - def filename(self):
193 """Get the filename.""" 194 return self._filename
195 196 @property
197 - def version(self):
198 """Get the version.""" 199 return self._version
200 201 @property
202 - def mode(self):
203 """Get the mode.""" 204 return self._mode
205 206 @property
207 - def size(self):
208 """Get the size in bytes.""" 209 if not self._file: 210 raise ValueError('I/O operation on closed bag') 211 212 pos = self._file.tell() 213 self._file.seek(0, os.SEEK_END) 214 size = self._file.tell() 215 self._file.seek(pos) 216 return size
217 218 # compression 219
220 - def _get_compression(self):
221 """Get the compression method to use for writing.""" 222 return self._compression
223
224 - def _set_compression(self, compression):
225 """Set the compression method to use for writing.""" 226 allowed_compressions = [Compression.NONE, Compression.BZ2] 227 if found_lz4: 228 allowed_compressions.append(Compression.LZ4) 229 if compression not in allowed_compressions: 230 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 231 232 self.flush() 233 self._compression = compression
234 235 compression = property(_get_compression, _set_compression) 236 237 # chunk_threshold 238
239 - def _get_chunk_threshold(self):
240 """Get the chunk threshold to use for writing.""" 241 return self._chunk_threshold
242
243 - def _set_chunk_threshold(self, chunk_threshold):
244 """Set the chunk threshold to use for writing.""" 245 if chunk_threshold < 0: 246 raise ValueError('chunk_threshold must be greater than or equal to zero') 247 248 self.flush() 249 self._chunk_threshold = chunk_threshold
250 251 chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold) 252
 253      def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False): 
 254          """ 
 255          Read messages from the bag, optionally filtered by topic, timestamp and connection details. 
 256          @param topics: list of topics or a single topic; if an empty list is given, all topics will be read [optional] 
 257          @type topics: list(str) or str 
 258          @param start_time: earliest timestamp of message to return [optional] 
 259          @type start_time: U{genpy.Time} 
 260          @param end_time: latest timestamp of message to return [optional] 
 261          @type end_time: U{genpy.Time} 
 262          @param connection_filter: function to filter connections to include [optional] 
 263          @type connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool 
 264          @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype) 
 265          @type raw: bool 
 266          @return: generator of BagMessage(topic, message, timestamp) namedtuples for each message in the bag file 
 267          @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw] 
 268          """ 
 269          self.flush() 
 270   
 271          if topics and type(topics) is str: 
 272              topics = [topics] 
 273   
 274          return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw) 
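# Usage sketch (editor's illustration): iterating over messages, optionally
# restricted to certain topics and a time window.  'input.bag' and the topic
# name are hypothetical.
#
#     import rospy
#     from rosbag.bag import Bag
#
#     with Bag('input.bag') as bag:
#         start = rospy.Time.from_sec(bag.get_start_time() + 1.0)
#         for topic, msg, t in bag.read_messages(topics=['/chatter'], start_time=start):
#             print(topic, t.to_sec(), msg)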
275
 276      def flush(self): 
 277          """ 
 278          Write the open chunk to disk so subsequent reads will read all messages. 
 279          @raise ValueError: if bag is closed 
 280          """ 
 281          if not self._file: 
 282              raise ValueError('I/O operation on closed bag') 
 283   
 284          if self._chunk_open: 
 285              self._stop_writing_chunk() 
286
287 - def write(self, topic, msg, t=None, raw=False):
288 """ 289 Write a message to the bag. 290 @param topic: name of topic 291 @type topic: str 292 @param msg: message to add to bag, or tuple (if raw) 293 @type msg: Message or tuple of raw message data 294 @param t: ROS time of message publication, if None specifed, use current time [optional] 295 @type t: U{genpy.Time} 296 @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype) 297 @type raw: bool 298 @raise ValueError: if arguments are invalid or bag is closed 299 """ 300 if not self._file: 301 raise ValueError('I/O operation on closed bag') 302 303 if not topic: 304 raise ValueError('topic is invalid') 305 if not msg: 306 raise ValueError('msg is invalid') 307 308 if t is None: 309 t = genpy.Time.from_sec(time.time()) 310 311 # Seek to end (in case previous operation was a read) 312 self._file.seek(0, os.SEEK_END) 313 314 # Open a chunk, if needed 315 if not self._chunk_open: 316 self._start_writing_chunk(t) 317 318 # Unpack raw 319 if raw: 320 if len(msg) == 5: 321 msg_type, serialized_bytes, md5sum, pos, pytype = msg 322 elif len(msg) == 4: 323 msg_type, serialized_bytes, md5sum, pytype = msg 324 else: 325 raise ValueError('msg must be of length 4 or 5') 326 327 # Write connection record, if necessary (currently using a connection per topic; ignoring message connection header) 328 if topic in self._topic_connections: 329 connection_info = self._topic_connections[topic] 330 conn_id = connection_info.id 331 else: 332 conn_id = len(self._connections) 333 334 if raw: 335 if pytype is None: 336 try: 337 pytype = genpy.message.get_message_class(msg_type) 338 except Exception: 339 pytype = None 340 if pytype is None: 341 raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type) 342 343 if pytype._md5sum != md5sum: 344 print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr) 345 #raise ROSBagException('md5sum of loaded type does not match that of data being recorded') 346 347 header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text } 348 else: 349 header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text } 350 351 connection_info = _ConnectionInfo(conn_id, topic, header) 352 353 self._write_connection_record(connection_info) 354 355 self._connections[conn_id] = connection_info 356 self._topic_connections[topic] = connection_info 357 358 # Create an index entry 359 index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset()) 360 361 # Update the indexes and current chunk info 362 if conn_id not in self._curr_chunk_connection_indexes: 363 # This is the first message on this connection in the chunk 364 self._curr_chunk_connection_indexes[conn_id] = [index_entry] 365 self._curr_chunk_info.connection_counts[conn_id] = 1 366 else: 367 curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id] 368 if index_entry >= curr_chunk_connection_index[-1]: 369 # Test if we're writing chronologically. Can skip binary search if so. 
370 curr_chunk_connection_index.append(index_entry) 371 else: 372 bisect.insort_right(curr_chunk_connection_index, index_entry) 373 374 self._curr_chunk_info.connection_counts[conn_id] += 1 375 376 if conn_id not in self._connection_indexes: 377 self._connection_indexes[conn_id] = [index_entry] 378 else: 379 bisect.insort_right(self._connection_indexes[conn_id], index_entry) 380 381 # Update the chunk start/end times 382 if t > self._curr_chunk_info.end_time: 383 self._curr_chunk_info.end_time = t 384 elif t < self._curr_chunk_info.start_time: 385 self._curr_chunk_info.start_time = t 386 387 if not raw: 388 # Serialize the message to the buffer 389 self._buffer.seek(0) 390 self._buffer.truncate(0) 391 msg.serialize(self._buffer) 392 serialized_bytes = self._buffer.getvalue() 393 394 # Write message data record 395 self._write_message_data_record(conn_id, t, serialized_bytes) 396 397 # Check if we want to stop this chunk 398 chunk_size = self._get_chunk_offset() 399 if chunk_size > self._chunk_threshold: 400 self._stop_writing_chunk()
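# Usage sketch (editor's illustration): writing messages to a new bag.  The
# topic name and message contents are hypothetical.
#
#     import rospy
#     from std_msgs.msg import String
#     from rosbag.bag import Bag
#
#     with Bag('output.bag', 'w') as bag:
#         for i in range(10):
#             bag.write('/chatter', String(data='hello %d' % i),
#                       t=rospy.Time.from_sec(1.0 + i))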
401
 402      def reindex(self): 
 403          """ 
 404          Reindexes the bag file.  Yields the position of each chunk, for progress reporting. 
 405          """ 
 406          self._clear_index() 
 407          return self._reader.reindex() 
408
 409      def close(self): 
 410          """ 
 411          Close the bag file.  Closing an already closed bag does nothing. 
 412          """ 
 413          if self._file: 
 414              if self._mode in 'wa': 
 415                  self._stop_writing() 
 416   
 417              self._close_file() 
418
 419      def get_compression_info(self): 
 420          """ 
 421          Returns information about the compression of the bag. 
 422          @return: a CompressionTuple(compression, uncompressed, compressed) describing the 
 423              type of compression used, the size of the uncompressed data in bytes and the size of the compressed data in bytes. If 
 424              no compression is used then the uncompressed and compressed sizes will be equal. 
 425          @rtype: CompressionTuple of (str, int, int) 
 426          """ 
 427   
 428          compression = self.compression 
 429          uncompressed = 0 
 430          compressed = 0 
 431   
 432          if self._chunk_headers: 
 433              compression_counts = {} 
 434              compression_uncompressed = {} 
 435              compression_compressed = {} 
 436   
 437              # determine which compression algorithm is dominant and 
 438              # add up the uncompressed and compressed bytes 
 439              for chunk_header in self._chunk_headers.values(): 
 440                  if chunk_header.compression not in compression_counts: 
 441                      compression_counts[chunk_header.compression] = 0 
 442                  if chunk_header.compression not in compression_uncompressed: 
 443                      compression_uncompressed[chunk_header.compression] = 0 
 444                  if chunk_header.compression not in compression_compressed: 
 445                      compression_compressed[chunk_header.compression] = 0 
 446   
 447                  compression_counts[chunk_header.compression] += 1 
 448                  compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 
 449                  uncompressed += chunk_header.uncompressed_size 
 450                  compression_compressed[chunk_header.compression] += chunk_header.compressed_size 
 451                  compressed += chunk_header.compressed_size 
 452   
 453              chunk_count = len(self._chunk_headers) 
 454   
 455              main_compression_count, main_compression = sorted([(v, k) for k, v in compression_counts.items()], reverse=True)[0] 
 456              compression = str(main_compression) 
 457   
 458          return collections.namedtuple("CompressionTuple", ["compression", 
 459              "uncompressed", "compressed"])(compression=compression, 
 460              uncompressed=uncompressed, compressed=compressed) 
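# Usage sketch (editor's illustration): summarizing how much the chunks of an
# existing bag were compressed.  'input.bag' is hypothetical.
#
#     from rosbag.bag import Bag
#
#     with Bag('input.bag') as bag:
#         info = bag.get_compression_info()
#         if info.uncompressed > 0:
#             ratio = 100.0 * info.compressed / info.uncompressed
#             print('%s: %d -> %d bytes (%.2f%%)' % (info.compression, info.uncompressed, info.compressed, ratio))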
461
 462      def get_message_count(self, topic_filters=None): 
 463          """ 
 464          Returns the number of messages in the bag, optionally filtered by topic. 
 465          @param topic_filters: one or more topics to filter by 
 466          @type topic_filters: str or list(str) 
 467          @return: the number of messages in the bag, optionally filtered by topic 
 468          @rtype: int 
 469          """ 
 470   
 471          num_messages = 0 
 472   
 473          if topic_filters is not None: 
 474              info = self.get_type_and_topic_info(topic_filters=topic_filters) 
 475              for topic in info.topics.values(): 
 476                  num_messages += topic.message_count 
 477          else: 
 478              if self._chunks: 
 479                  for c in self._chunks: 
 480                      for counts in c.connection_counts.values(): 
 481                          num_messages += counts 
 482              else: 
 483                  num_messages = sum([len(index) for index in self._connection_indexes.values()]) 
 484   
 485          return num_messages 
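# Usage sketch (editor's illustration): counting messages, overall and per
# topic.  The topic names are hypothetical.
#
#     from rosbag.bag import Bag
#
#     with Bag('input.bag') as bag:
#         print(bag.get_message_count())                      # all topics
#         print(bag.get_message_count('/chatter'))            # single topic
#         print(bag.get_message_count(['/chatter', '/imu']))  # several topics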
486
 487      def get_start_time(self): 
 488          """ 
 489          Returns the start time of the bag. 
 490          @return: a timestamp of the start of the bag 
 491          @rtype: float, timestamp in seconds, includes fractions of a second 
 492          """ 
 493   
 494          if self._chunks: 
 495              start_stamp = self._chunks[0].start_time.to_sec() 
 496          else: 
 497              if not self._connection_indexes: 
 498                  raise ROSBagException('Bag contains no message') 
 499              start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index] 
 500              start_stamp = min(start_stamps) if start_stamps else 0 
 501   
 502          return start_stamp 
 503   
 504      def get_end_time(self): 
 505          """ 
 506          Returns the end time of the bag. 
 507          @return: a timestamp of the end of the bag 
 508          @rtype: float, timestamp in seconds, includes fractions of a second 
 509          """ 
 510   
 511          if self._chunks: 
 512              end_stamp = self._chunks[-1].end_time.to_sec() 
 513          else: 
 514              if not self._connection_indexes: 
 515                  raise ROSBagException('Bag contains no message') 
 516              end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index] 
 517              end_stamp = max(end_stamps) if end_stamps else 0 
 518   
 519          return end_stamp 
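# Usage sketch (editor's illustration): computing the duration of a bag from
# its start and end times, both returned as floating-point seconds.
#
#     from rosbag.bag import Bag
#
#     with Bag('input.bag') as bag:
#         duration = bag.get_end_time() - bag.get_start_time()
#         print('duration: %.3f s' % duration)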
520
 521      def get_type_and_topic_info(self, topic_filters=None): 
 522          """ 
 523          Collates info about the types and topics in the bag. 
 524   
 525          Note: it would be nice to filter by type as well, but there appear to be some limitations in the current architecture 
 526          that prevent that from working when more than one message type is written on the same topic. 
 527   
 528          @param topic_filters: one or more topics to filter by 
 529          @type topic_filters: str or list(str) 
 530          @return: a TypesAndTopicsTuple(msg_types, topics) describing the types of messages present 
 531              and information about the topics.  msg_types is a dict mapping each type name 
 532              (e.g. "std_msgs/String") to its md5sum.  topics is a dict mapping each topic name 
 533              to a TopicTuple with fields 
 534              msg_type: the message type published on the topic, 
 535              message_count: the number of messages on the topic, 
 536              connections: the number of connections, 
 537              frequency: the median data frequency 
 538          @rtype: TypesAndTopicsTuple(dict(str, str), 
 539              dict(str, TopicTuple(str, int, int, float))) 
 540          """ 
 541   
 542          datatypes = set() 
 543          datatype_infos = [] 
 544   
 545          for connection in self._connections.values(): 
 546              if connection.datatype in datatypes: 
 547                  continue 
 548   
 549              datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 
 550              datatypes.add(connection.datatype) 
 551   
 552          topics = [] 
 553          # load our list of topics and optionally filter 
 554          if topic_filters is not None: 
 555              if not isinstance(topic_filters, list): 
 556                  topic_filters = [topic_filters] 
 557   
 558              topics = topic_filters 
 559          else: 
 560              topics = [c.topic for c in self._get_connections()] 
 561   
 562          topics = sorted(set(topics)) 
 563   
 564          topic_datatypes = {} 
 565          topic_conn_counts = {} 
 566          topic_msg_counts = {} 
 567          topic_freqs_median = {} 
 568   
 569          for topic in topics: 
 570              connections = list(self._get_connections(topic)) 
 571   
 572              if not connections: 
 573                  continue 
 574   
 575              topic_datatypes[topic] = connections[0].datatype 
 576              topic_conn_counts[topic] = len(connections) 
 577   
 578              msg_count = 0 
 579              for connection in connections: 
 580                  for chunk in self._chunks: 
 581                      msg_count += chunk.connection_counts.get(connection.id, 0) 
 582   
 583              topic_msg_counts[topic] = msg_count 
 584   
 585              if self._connection_indexes_read: 
 586                  stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 
 587                  if len(stamps) > 1: 
 588                      periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 
 589                      med_period = _median(periods) 
 590                      if med_period > 0.0: 
 591                          topic_freqs_median[topic] = 1.0 / med_period 
 592   
 593          # process datatypes 
 594          types = {} 
 595          for datatype, md5sum, msg_def in sorted(datatype_infos): 
 596              types[datatype] = md5sum 
 597   
 598          # process topics 
 599          topics_t = {} 
 600          TopicTuple = collections.namedtuple("TopicTuple", ["msg_type", "message_count", "connections", "frequency"]) 
 601          for topic in sorted(topic_datatypes.keys()): 
 602              topic_msg_count = topic_msg_counts[topic] 
 603              frequency = topic_freqs_median[topic] if topic in topic_freqs_median else None 
 604              topics_t[topic] = TopicTuple(msg_type=topic_datatypes[topic], 
 605                                           message_count=topic_msg_count, 
 606                                           connections=topic_conn_counts[topic], 
 607                                           frequency=frequency) 
 608   
 609          return collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])(msg_types=types, topics=topics_t) 
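# Usage sketch (editor's illustration): listing the message types and
# per-topic statistics of a bag.
#
#     from rosbag.bag import Bag
#
#     with Bag('input.bag') as bag:
#         info = bag.get_type_and_topic_info()
#         for datatype, md5sum in info.msg_types.items():
#             print('%s [%s]' % (datatype, md5sum))
#         for topic, details in info.topics.items():
#             print('%s: %d msgs of %s' % (topic, details.message_count, details.msg_type))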
610
611 - def __str__(self):
612 rows = [] 613 614 try: 615 if self._filename: 616 rows.append(('path', self._filename)) 617 618 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 619 rows.append(('version', '1.2 (unindexed)')) 620 else: 621 rows.append(('version', '%d.%d' % (int(self._version / 100), self._version % 100))) 622 623 if not self._connection_indexes and not self._chunks: 624 rows.append(('size', _human_readable_size(self.size))) 625 else: 626 if self._chunks: 627 start_stamp = self._chunks[ 0].start_time.to_sec() 628 end_stamp = self._chunks[-1].end_time.to_sec() 629 else: 630 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index] 631 start_stamp = min(start_stamps) if start_stamps else 0 632 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index] 633 end_stamp = max(end_stamps) if end_stamps else 0 634 635 # Show duration 636 duration = end_stamp - start_stamp 637 dur_secs = duration % 60 638 dur_mins = int(duration / 60) 639 dur_hrs = int(dur_mins / 60) 640 if dur_hrs > 0: 641 dur_mins = dur_mins % 60 642 duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration) 643 elif dur_mins > 0: 644 duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration) 645 else: 646 duration_str = '%.1fs' % duration 647 648 rows.append(('duration', duration_str)) 649 650 # Show start and end times 651 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp))) 652 rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp))) 653 654 rows.append(('size', _human_readable_size(self.size))) 655 656 if self._chunks: 657 num_messages = 0 658 for c in self._chunks: 659 for counts in c.connection_counts.values(): 660 num_messages += counts 661 else: 662 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 663 rows.append(('messages', str(num_messages))) 664 665 # Show compression information 666 if len(self._chunk_headers) == 0: 667 rows.append(('compression', 'none')) 668 else: 669 compression_counts = {} 670 compression_uncompressed = {} 671 compression_compressed = {} 672 for chunk_header in self._chunk_headers.values(): 673 if chunk_header.compression not in compression_counts: 674 compression_counts[chunk_header.compression] = 1 675 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 676 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 677 else: 678 compression_counts[chunk_header.compression] += 1 679 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 680 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 681 682 chunk_count = len(self._chunk_headers) 683 684 compressions = [] 685 for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])): 686 if compression != Compression.NONE: 687 fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression] 688 compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction)) 689 else: 690 compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count)) 691 rows.append(('compression', ', '.join(compressions))) 692 693 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 694 if not all_uncompressed: 695 total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.values())) 696 total_compressed_size = 
sum((h.compressed_size for h in self._chunk_headers.values())) 697 698 total_uncompressed_size_str = _human_readable_size(total_uncompressed_size) 699 total_compressed_size_str = _human_readable_size(total_compressed_size) 700 total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)]) 701 702 if duration > 0: 703 uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration) 704 compressed_rate_str = _human_readable_size(total_compressed_size / duration) 705 rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)]) 706 707 rows.append(('uncompressed', '%*s @ %*s/s' % 708 (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str))) 709 rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' % 710 (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size))) 711 else: 712 rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str))) 713 rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str))) 714 715 datatypes = set() 716 datatype_infos = [] 717 for connection in self._connections.values(): 718 if connection.datatype in datatypes: 719 continue 720 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 721 datatypes.add(connection.datatype) 722 723 topics = sorted(set([c.topic for c in self._get_connections()])) 724 topic_datatypes = {} 725 topic_conn_counts = {} 726 topic_msg_counts = {} 727 topic_freqs_median = {} 728 for topic in topics: 729 connections = list(self._get_connections(topic)) 730 731 topic_datatypes[topic] = connections[0].datatype 732 topic_conn_counts[topic] = len(connections) 733 734 msg_count = 0 735 for connection in connections: 736 if self._chunks: 737 for chunk in self._chunks: 738 msg_count += chunk.connection_counts.get(connection.id, 0) 739 else: 740 msg_count += len(self._connection_indexes.get(connection.id, [])) 741 topic_msg_counts[topic] = msg_count 742 743 if self._connection_indexes_read: 744 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 745 if len(stamps) > 1: 746 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 747 med_period = _median(periods) 748 if med_period > 0.0: 749 topic_freqs_median[topic] = 1.0 / med_period 750 751 topics = sorted(topic_datatypes.keys()) 752 max_topic_len = max([len(topic) for topic in topics]) 753 max_datatype_len = max([len(datatype) for datatype in datatypes]) 754 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()]) 755 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0 756 757 # Show datatypes 758 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 759 s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum) 760 if i == 0: 761 rows.append(('types', s)) 762 else: 763 rows.append(('', s)) 764 765 # Show topics 766 for i, topic in enumerate(topics): 767 topic_msg_count = topic_msg_counts[topic] 768 769 s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ') 770 if topic in topic_freqs_median: 771 s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic])) 772 else: 773 s += ' %*s' % (max_freq_median_len, '') 774 775 s += ' : %-*s' % (max_datatype_len, 
topic_datatypes[topic]) 776 if topic_conn_counts[topic] > 1: 777 s += ' (%d connections)' % topic_conn_counts[topic] 778 779 if i == 0: 780 rows.append(('topics', s)) 781 else: 782 rows.append(('', s)) 783 784 except Exception as ex: 785 raise 786 787 first_column_width = max([len(field) for field, _ in rows]) + 1 788 789 s = '' 790 for (field, value) in rows: 791 if field: 792 s += '%-*s %s\n' % (first_column_width, field + ':', value) 793 else: 794 s += '%-*s %s\n' % (first_column_width, '', value) 795 796 return s.rstrip()
797
798 - def _get_yaml_info(self, key=None):
799 s = '' 800 801 try: 802 if self._filename: 803 s += 'path: %s\n' % self._filename 804 805 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 806 s += 'version: 1.2 (unindexed)\n' 807 else: 808 s += 'version: %d.%d\n' % (int(self._version / 100), self._version % 100) 809 810 if not self._connection_indexes and not self._chunks: 811 s += 'size: %d\n' % self.size 812 s += 'indexed: False\n' 813 else: 814 if self._chunks: 815 start_stamp = self._chunks[ 0].start_time.to_sec() 816 end_stamp = self._chunks[-1].end_time.to_sec() 817 else: 818 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index] 819 start_stamp = min(start_stamps) if start_stamps else 0 820 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index] 821 end_stamp = max(end_stamps) if end_stamps else 0 822 823 duration = end_stamp - start_stamp 824 s += 'duration: %.6f\n' % duration 825 s += 'start: %.6f\n' % start_stamp 826 s += 'end: %.6f\n' % end_stamp 827 s += 'size: %d\n' % self.size 828 if self._chunks: 829 num_messages = 0 830 for c in self._chunks: 831 for counts in c.connection_counts.values(): 832 num_messages += counts 833 else: 834 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 835 s += 'messages: %d\n' % num_messages 836 s += 'indexed: True\n' 837 838 # Show compression information 839 if len(self._chunk_headers) == 0: 840 s += 'compression: none\n' 841 else: 842 compression_counts = {} 843 compression_uncompressed = {} 844 compression_compressed = {} 845 for chunk_header in self._chunk_headers.values(): 846 if chunk_header.compression not in compression_counts: 847 compression_counts[chunk_header.compression] = 1 848 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 849 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 850 else: 851 compression_counts[chunk_header.compression] += 1 852 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 853 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 854 855 chunk_count = len(self._chunk_headers) 856 857 main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0] 858 s += 'compression: %s\n' % str(main_compression) 859 860 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 861 if not all_uncompressed: 862 s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.values())) 863 s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.values())) 864 865 datatypes = set() 866 datatype_infos = [] 867 for connection in self._connections.values(): 868 if connection.datatype in datatypes: 869 continue 870 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 871 datatypes.add(connection.datatype) 872 873 topics = sorted(set([c.topic for c in self._get_connections()])) 874 topic_datatypes = {} 875 topic_conn_counts = {} 876 topic_msg_counts = {} 877 topic_freqs_median = {} 878 for topic in topics: 879 connections = list(self._get_connections(topic)) 880 881 topic_datatypes[topic] = connections[0].datatype 882 topic_conn_counts[topic] = len(connections) 883 884 msg_count = 0 885 for connection in connections: 886 for chunk in self._chunks: 887 msg_count += chunk.connection_counts.get(connection.id, 0) 888 topic_msg_counts[topic] = msg_count 
889 890 if self._connection_indexes_read: 891 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 892 if len(stamps) > 1: 893 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 894 med_period = _median(periods) 895 if med_period > 0.0: 896 topic_freqs_median[topic] = 1.0 / med_period 897 898 topics = sorted(topic_datatypes.keys()) 899 max_topic_len = max([len(topic) for topic in topics]) 900 max_datatype_len = max([len(datatype) for datatype in datatypes]) 901 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()]) 902 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0 903 904 # Show datatypes 905 s += 'types:\n' 906 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 907 s += ' - type: %s\n' % datatype 908 s += ' md5: %s\n' % md5sum 909 910 # Show topics 911 s += 'topics:\n' 912 for i, topic in enumerate(topics): 913 topic_msg_count = topic_msg_counts[topic] 914 915 s += ' - topic: %s\n' % topic 916 s += ' type: %s\n' % topic_datatypes[topic] 917 s += ' messages: %d\n' % topic_msg_count 918 919 if topic_conn_counts[topic] > 1: 920 s += ' connections: %d\n' % topic_conn_counts[topic] 921 922 if topic in topic_freqs_median: 923 s += ' frequency: %.4f\n' % topic_freqs_median[topic] 924 925 if not key: 926 return s 927 928 class DictObject(object): 929 def __init__(self, d): 930 for a, b in d.items(): 931 if isinstance(b, (list, tuple)): 932 setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b]) 933 else: 934 setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
935 936 obj = DictObject(yaml.load(s)) 937 try: 938 val = eval('obj.' + key) 939 except Exception as ex: 940 print('Error getting key "%s"' % key, file=sys.stderr) 941 return None 942 943 def print_yaml(val, indent=0): 944 indent_str = ' ' * indent 945 946 if type(val) is list: 947 s = '' 948 for item in val: 949 s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1)) 950 return s 951 elif type(val) is DictObject: 952 s = '' 953 for i, (k, v) in enumerate(val.__dict__.items()): 954 if i != 0: 955 s += indent_str 956 s += '%s: %s' % (k, str(v)) 957 if i < len(val.__dict__) - 1: 958 s += '\n' 959 return s 960 else: 961 return indent_str + str(val) 962 963 return print_yaml(val) 964 965 except Exception as ex: 966 raise 967 968 ### Internal API ### 969 970 @property
971 - def _has_compressed_chunks(self):
972 if not self._chunk_headers: 973 return False 974 975 return any((h.compression != Compression.NONE for h in self._chunk_headers.values()))
976 977 @property
978 - def _uncompressed_size(self):
979 if not self._chunk_headers: 980 return self.size 981 982 return sum((h.uncompressed_size for h in self._chunk_headers.values()))
983
984 - def _read_message(self, position, raw=False):
985 """ 986 Read the message from the given position in the file. 987 """ 988 self.flush() 989 return self._reader.seek_and_read_message_data_record(position, raw)
990 991 # Index accessing 992
993 - def _get_connections(self, topics=None, connection_filter=None):
994 """ 995 Yield the connections, optionally filtering by topic and/or connection information. 996 """ 997 if topics: 998 if type(topics) is str: 999 topics = set([roslib.names.canonicalize_name(topics)]) 1000 else: 1001 topics = set([roslib.names.canonicalize_name(t) for t in topics]) 1002 1003 for c in self._connections.values(): 1004 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics: 1005 continue 1006 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header): 1007 continue 1008 yield c
1009
1010 - def _get_entries(self, connections=None, start_time=None, end_time=None):
1011 """ 1012 Yield index entries on the given connections in the given time range. 1013 """ 1014 for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time): 1015 if start_time and entry.time < start_time: 1016 continue 1017 if end_time and entry.time > end_time: 1018 return 1019 yield entry
1020
1021 - def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
1022 """ 1023 Yield index entries on the given connections in the given time range in reverse order. 1024 """ 1025 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()): 1026 if end_time and entry.time > end_time: 1027 continue 1028 if start_time and entry.time < start_time: 1029 return 1030 yield entry
1031
1032 - def _get_entry(self, t, connections=None):
1033 """ 1034 Return the first index entry on/before the given time on the given connections 1035 """ 1036 indexes = self._get_indexes(connections) 1037 1038 entry = _IndexEntry(t) 1039 1040 first_entry = None 1041 1042 for index in indexes: 1043 i = bisect.bisect_right(index, entry) - 1 1044 if i >= 0: 1045 index_entry = index[i] 1046 if first_entry is None or index_entry > first_entry: 1047 first_entry = index_entry 1048 1049 return first_entry
1050
1051 - def _get_entry_after(self, t, connections=None):
1052 """ 1053 Return the first index entry after the given time on the given connections 1054 """ 1055 indexes = self._get_indexes(connections) 1056 1057 entry = _IndexEntry(t) 1058 1059 first_entry = None 1060 1061 for index in indexes: 1062 i = bisect.bisect_right(index, entry) 1063 if i <= len(index) - 1: 1064 index_entry = index[i] 1065 if first_entry is None or index_entry < first_entry: 1066 first_entry = index_entry 1067 1068 return first_entry
1069
1070 - def _get_indexes(self, connections):
1071 """ 1072 Get the indexes for the given connections. 1073 """ 1074 if not self._connection_indexes_read: 1075 self._reader._read_connection_index_records() 1076 1077 if connections is None: 1078 return self._connection_indexes.values() 1079 else: 1080 return (self._connection_indexes[c.id] for c in connections)
1081 1082 ### Implementation ### 1083
1084 - def _clear_index(self):
1085 self._connection_indexes_read = False 1086 self._connection_indexes = {} # id -> IndexEntry[] (1.2+) 1087 1088 self._topic_connections = {} # topic -> connection_id 1089 self._connections = {} # id -> ConnectionInfo 1090 self._connection_count = 0 # (2.0) 1091 self._chunk_count = 0 # (2.0) 1092 self._chunks = [] # ChunkInfo[] (2.0) 1093 self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0) 1094 1095 self._chunk_open = False 1096 self._curr_chunk_info = None 1097 self._curr_chunk_data_pos = None 1098 self._curr_chunk_connection_indexes = {}
1099
1100 - def _open(self, f, mode, allow_unindexed):
1101 if not f: 1102 raise ValueError('filename (or stream) is invalid') 1103 1104 try: 1105 if mode == 'r': self._open_read(f, allow_unindexed) 1106 elif mode == 'w': self._open_write(f) 1107 elif mode == 'a': self._open_append(f, allow_unindexed) 1108 else: 1109 raise ValueError('mode "%s" is invalid' % mode) 1110 if 'b' not in self._file.mode and not isinstance('', bytes): 1111 raise ROSBagException('In Python 3 the bag file must be opened in binary mode') 1112 except struct.error: 1113 raise ROSBagFormatException('error with bag')
1114
1115 - def _is_file(self, f):
1116 try: 1117 return isinstance(f, file) # Python 2 1118 except NameError: 1119 import io 1120 return isinstance(f, io.IOBase) # Python 3...this will return false in Python 2 always
1121
1122 - def _open_read(self, f, allow_unindexed):
1123 if self._is_file(f): 1124 self._file = f 1125 self._filename = None 1126 else: 1127 self._file = open(f, 'rb') 1128 self._filename = f 1129 1130 self._mode = 'r' 1131 1132 try: 1133 self._version = self._read_version() 1134 except: 1135 self._version = None 1136 self._close_file() 1137 raise 1138 1139 try: 1140 self._create_reader() 1141 self._reader.start_reading() 1142 except ROSBagUnindexedException as ex: 1143 if not allow_unindexed: 1144 self._close_file() 1145 raise 1146 except: 1147 self._close_file() 1148 raise
1149
1150 - def _open_write(self, f):
1151 if self._is_file(f): 1152 self._file = f 1153 self._filename = None 1154 else: 1155 self._file = open(f, 'w+b') 1156 self._filename = f 1157 1158 self._mode = 'w' 1159 1160 self._version = 200 1161 1162 try: 1163 self._create_reader() 1164 self._start_writing() 1165 except: 1166 self._close_file() 1167 raise
1168
1169 - def _open_append(self, f, allow_unindexed):
1170 if self._is_file(f): 1171 self._file = f 1172 self._filename = None 1173 else: 1174 try: 1175 # Test if the file already exists 1176 open(f, 'r').close() 1177 1178 # File exists: open in read with update mode 1179 self._file = open(f, 'r+b') 1180 except IOError: 1181 # File doesn't exist: open in write mode 1182 self._file = open(f, 'w+b') 1183 1184 self._filename = f 1185 1186 self._mode = 'a' 1187 1188 try: 1189 self._version = self._read_version() 1190 except: 1191 self._version = None 1192 self._close_file() 1193 raise 1194 1195 if self._version != 200: 1196 self._file.close() 1197 raise ROSBagException('bag version %d is unsupported for appending' % self._version) 1198 1199 try: 1200 self._start_appending() 1201 except ROSBagUnindexedException: 1202 if not allow_unindexed: 1203 self._close_file() 1204 raise 1205 except: 1206 self._close_file() 1207 raise
1208
1209 - def _close_file(self):
1210 self._file.close() 1211 self._file = None
1212
1213 - def _create_reader(self):
1214 """ 1215 @raise ROSBagException: if the bag version is unsupported 1216 """ 1217 major_version, minor_version = int(self._version / 100), self._version % 100 1218 if major_version == 2: 1219 self._reader = _BagReader200(self) 1220 elif major_version == 1: 1221 if minor_version == 1: 1222 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version) 1223 else: 1224 # Get the op code of the first record. If it's FILE_HEADER, then we have an indexed 1.2 bag. 1225 first_record_pos = self._file.tell() 1226 header = _read_header(self._file) 1227 op = _read_uint8_field(header, 'op') 1228 self._file.seek(first_record_pos) 1229 1230 if op == _OP_FILE_HEADER: 1231 self._reader = _BagReader102_Indexed(self) 1232 else: 1233 self._reader = _BagReader102_Unindexed(self) 1234 else: 1235 raise ROSBagException('unknown bag version %d' % self._version)
1236
1237 - def _read_version(self):
1238 """ 1239 @raise ROSBagException: if the file is empty, or the version line can't be parsed 1240 """ 1241 version_line = self._file.readline().rstrip().decode() 1242 if len(version_line) == 0: 1243 raise ROSBagException('empty file') 1244 1245 matches = re.match("#ROS(.*) V(\d).(\d)", version_line) 1246 if matches is None or len(matches.groups()) != 3: 1247 raise ROSBagException('This does not appear to be a bag file') 1248 1249 version_type, major_version_str, minor_version_str = matches.groups() 1250 1251 version = int(major_version_str) * 100 + int(minor_version_str) 1252 1253 return version
1254
1255 - def _start_writing(self):
1256 version = _VERSION + '\n' 1257 version = version.encode() 1258 self._file.write(version) 1259 self._file_header_pos = self._file.tell() 1260 self._write_file_header_record(0, 0, 0) 1261
1262 - def _stop_writing(self):
1263 # Write the open chunk (if any) to file 1264 self.flush() 1265 1266 # Remember this location as the start of the index 1267 self._index_data_pos = self._file.tell() 1268 1269 # Write connection infos 1270 for connection_info in self._connections.values(): 1271 self._write_connection_record(connection_info) 1272 1273 # Write chunk infos 1274 for chunk_info in self._chunks: 1275 self._write_chunk_info_record(chunk_info) 1276 1277 # Re-write the file header 1278 self._file.seek(self._file_header_pos) 1279 self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
1280
1281 - def _start_appending(self):
1282 self._file_header_pos = self._file.tell() 1283 1284 self._create_reader() 1285 self._reader.start_reading() 1286 1287 # Truncate the file to chop off the index 1288 self._file.truncate(self._index_data_pos) 1289 self._reader.index_data_pos = 0 1290 1291 # Rewrite the file header, clearing the index position (so we know if the index is invalid) 1292 self._file.seek(self._file_header_pos); 1293 self._write_file_header_record(0, 0, 0) 1294 1295 # Seek to the end of the file 1296 self._file.seek(0, os.SEEK_END)
1297
1298 - def _start_writing_chunk(self, t):
1299 self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t) 1300 self._write_chunk_header(_ChunkHeader(self._compression, 0, 0)) 1301 self._curr_chunk_data_pos = self._file.tell() 1302 self._set_compression_mode(self._compression) 1303 self._chunk_open = True
1304
1305 - def _get_chunk_offset(self):
1306 if self._compression == Compression.NONE: 1307 return self._file.tell() - self._curr_chunk_data_pos 1308 else: 1309 return self._output_file.compressed_bytes_in
1310
1311 - def _stop_writing_chunk(self):
1312 # Add this chunk to the index 1313 self._chunks.append(self._curr_chunk_info) 1314 1315 # Get the uncompressed and compressed sizes 1316 uncompressed_size = self._get_chunk_offset() 1317 self._set_compression_mode(Compression.NONE) 1318 compressed_size = self._file.tell() - self._curr_chunk_data_pos 1319 1320 # Rewrite the chunk header with the size of the chunk (remembering current offset) 1321 end_of_chunk_pos = self._file.tell() 1322 self._file.seek(self._curr_chunk_info.pos) 1323 chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos) 1324 self._write_chunk_header(chunk_header) 1325 self._chunk_headers[self._curr_chunk_info.pos] = chunk_header 1326 1327 # Write out the connection indexes and clear them 1328 self._file.seek(end_of_chunk_pos) 1329 for connection_id, entries in self._curr_chunk_connection_indexes.items(): 1330 self._write_connection_index_record(connection_id, entries) 1331 self._curr_chunk_connection_indexes.clear() 1332 1333 # Flag that we're starting a new chunk 1334 self._chunk_open = False
1335
1336 - def _set_compression_mode(self, compression):
1337 # Flush the compressor, if needed 1338 if self._curr_compression != Compression.NONE: 1339 self._output_file.flush() 1340 1341 # Create the compressor 1342 if compression == Compression.BZ2: 1343 self._output_file = _CompressorFileFacade(self._file, bz2.BZ2Compressor()) 1344 elif compression == Compression.LZ4 and found_lz4: 1345 self._output_file = _CompressorFileFacade(self._file, roslz4.LZ4Compressor()) 1346 elif compression == Compression.NONE: 1347 self._output_file = self._file 1348 else: 1349 raise ROSBagException('unsupported compression type: %s' % compression) 1350 1351 self._curr_compression = compression
1352
1353 - def _write_file_header_record(self, index_pos, connection_count, chunk_count):
1354 header = { 1355 'op': _pack_uint8(_OP_FILE_HEADER), 1356 'index_pos': _pack_uint64(index_pos), 1357 'conn_count': _pack_uint32(connection_count), 1358 'chunk_count': _pack_uint32(chunk_count) 1359 } 1360 _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
1361
1362 - def _write_connection_record(self, connection_info):
1363 header = { 1364 'op': _pack_uint8(_OP_CONNECTION), 1365 'topic': connection_info.topic, 1366 'conn': _pack_uint32(connection_info.id) 1367 } 1368 1369 _write_header(self._output_file, header) 1370 1371 _write_header(self._output_file, connection_info.header)
1372
1373 - def _write_message_data_record(self, connection_id, t, serialized_bytes):
1374 header = { 1375 'op': _pack_uint8(_OP_MSG_DATA), 1376 'conn': _pack_uint32(connection_id), 1377 'time': _pack_time(t) 1378 } 1379 _write_record(self._output_file, header, serialized_bytes)
1380
1381 - def _write_chunk_header(self, chunk_header):
1382 header = { 1383 'op': _pack_uint8(_OP_CHUNK), 1384 'compression': chunk_header.compression, 1385 'size': _pack_uint32(chunk_header.uncompressed_size) 1386 } 1387 _write_header(self._file, header) 1388 1389 self._file.write(_pack_uint32(chunk_header.compressed_size))
1390
1391 - def _write_connection_index_record(self, connection_id, entries):
1392 header = { 1393 'op': _pack_uint8(_OP_INDEX_DATA), 1394 'conn': _pack_uint32(connection_id), 1395 'ver': _pack_uint32(_INDEX_VERSION), 1396 'count': _pack_uint32(len(entries)) 1397 } 1398 1399 buffer = self._buffer 1400 buffer.seek(0) 1401 buffer.truncate(0) 1402 for entry in entries: 1403 buffer.write(_pack_time (entry.time)) 1404 buffer.write(_pack_uint32(entry.offset)) 1405 1406 _write_record(self._file, header, buffer.getvalue()) 1407
1408 - def _write_chunk_info_record(self, chunk_info):
1409 header = { 1410 'op': _pack_uint8 (_OP_CHUNK_INFO), 1411 'ver': _pack_uint32(_CHUNK_INDEX_VERSION), 1412 'chunk_pos': _pack_uint64(chunk_info.pos), 1413 'start_time': _pack_time(chunk_info.start_time), 1414 'end_time': _pack_time(chunk_info.end_time), 1415 'count': _pack_uint32(len(chunk_info.connection_counts)) 1416 } 1417 1418 buffer = self._buffer 1419 buffer.seek(0) 1420 buffer.truncate(0) 1421 for connection_id, count in chunk_info.connection_counts.items(): 1422 buffer.write(_pack_uint32(connection_id)) 1423 buffer.write(_pack_uint32(count)) 1424 1425 _write_record(self._file, header, buffer.getvalue())
1426 1427 ### Implementation ### 1428 1429 _message_types = {} # md5sum -> type 1430 1431 _OP_MSG_DEF = 0x01 1432 _OP_MSG_DATA = 0x02 1433 _OP_FILE_HEADER = 0x03 1434 _OP_INDEX_DATA = 0x04 1435 _OP_CHUNK = 0x05 1436 _OP_CHUNK_INFO = 0x06 1437 _OP_CONNECTION = 0x07 1438 1439 _OP_CODES = { 1440 _OP_MSG_DEF: 'MSG_DEF', 1441 _OP_MSG_DATA: 'MSG_DATA', 1442 _OP_FILE_HEADER: 'FILE_HEADER', 1443 _OP_INDEX_DATA: 'INDEX_DATA', 1444 _OP_CHUNK: 'CHUNK', 1445 _OP_CHUNK_INFO: 'CHUNK_INFO', 1446 _OP_CONNECTION: 'CONNECTION' 1447 } 1448 1449 _VERSION = '#ROSBAG V2.0' 1450 _FILE_HEADER_LENGTH = 4096 1451 _INDEX_VERSION = 1 1452 _CHUNK_INDEX_VERSION = 1
1453 1454 -class _ConnectionInfo(object):
1455 - def __init__(self, id, topic, header):
1456 try: 1457 datatype = _read_str_field(header, 'type') 1458 md5sum = _read_str_field(header, 'md5sum') 1459 msg_def = _read_str_field(header, 'message_definition') 1460 except KeyError as ex: 1461 raise ROSBagFormatException('connection header field %s not found' % str(ex)) 1462 1463 self.id = id 1464 self.topic = topic 1465 self.datatype = datatype 1466 self.md5sum = md5sum 1467 self.msg_def = msg_def 1468 self.header = header
1469
1470 - def __str__(self):
1471 return '%d on %s: %s' % (self.id, self.topic, str(self.header))
1472
1473 -class _ChunkInfo(object):
1474 - def __init__(self, pos, start_time, end_time):
1475 self.pos = pos 1476 self.start_time = start_time 1477 self.end_time = end_time 1478 1479 self.connection_counts = {}
1480
1481 - def __str__(self):
1482 s = 'chunk_pos: %d\n' % self.pos 1483 s += 'start_time: %s\n' % str(self.start_time) 1484 s += 'end_time: %s\n' % str(self.end_time) 1485 s += 'connections: %d\n' % len(self.connection_counts) 1486 s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()]) 1487 return s
1488
1489 -class _ChunkHeader(object):
1490 - def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
1491 self.compression = compression 1492 self.compressed_size = compressed_size 1493 self.uncompressed_size = uncompressed_size 1494 self.data_pos = data_pos
1495
1496 - def __str__(self):
1497 if self.uncompressed_size > 0: 1498 ratio = 100 * (float(self.compressed_size) / self.uncompressed_size) 1499 return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio) 1500 else: 1501 return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
1502
1503 -class ComparableMixin(object):
1504 __slots__ = [] 1505
1506 - def _compare(self, other, method):
1507 try: 1508 return method(self._cmpkey(), other._cmpkey()) 1509 except (AttributeError, TypeError): 1510 # _cmpkey not implemented or return different type 1511 # so can not compare with other 1512 return NotImplemented
1513
1514 - def __lt__(self, other):
1515 return self._compare(other, lambda s, o: s < o)
1516
1517 - def __le__(self, other):
1518 return self._compare(other, lambda s, o: s <= o)
1519
1520 - def __eq__(self, other):
1521 return self._compare(other, lambda s, o: s == o)
1522
1523 - def __ge__(self, other):
1524 return self._compare(other, lambda s, o: s >= o)
1525
1526 - def __gt__(self, other):
1527 return self._compare(other, lambda s, o: s > o)
1528
1529 - def __ne__(self, other):
1530 return self._compare(other, lambda s, o: s != o)
1531
1532 -class _IndexEntry(ComparableMixin):
1533 __slots__ = ['time'] 1534
1535 - def __init__(self, time):
1536 self.time = time
1537
1538 - def _cmpkey(self):
1539 return self.time
1540
1541 -class _IndexEntry102(_IndexEntry):
1542 __slots__ = ['offset'] 1543
1544 - def __init__(self, time, offset):
1545 self.time = time 1546 self.offset = offset
1547 1548 @property
1549 - def position(self):
1550 return self.offset
1551
1552 - def __str__(self):
1553 return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)
1554
1555 -class _IndexEntry200(_IndexEntry):
1556 __slots__ = ['chunk_pos', 'offset'] 1557
1558 - def __init__(self, time, chunk_pos, offset):
1559 self.time = time 1560 self.chunk_pos = chunk_pos 1561 self.offset = offset
1562 1563 @property
1564 - def position(self):
1565 return (self.chunk_pos, self.offset)
1566
1567 - def __str__(self):
1568 return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
1569
1570 -def _get_message_type(info):
1571 message_type = _message_types.get(info.md5sum) 1572 if message_type is None: 1573 try: 1574 message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype] 1575 if (message_type._md5sum != info.md5sum): 1576 print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr) 1577 except genmsg.InvalidMsgSpec: 1578 message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype] 1579 print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition."'%(info.datatype, info.md5sum), file=sys.stderr) 1580 except genmsg.MsgGenerationException as ex: 1581 raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex))) 1582 1583 _message_types[info.md5sum] = message_type 1584 1585 return message_type
1586
1587 -def _read_uint8 (f): return _unpack_uint8 (f.read(1))
1588 -def _read_uint32(f): return _unpack_uint32(f.read(4))
1589 -def _read_uint64(f): return _unpack_uint64(f.read(8))
1590 -def _read_time (f): return _unpack_time (f.read(8))
1591
1592 -def _decode_str(v): return v if type(v) is str else v.decode()
1593 -def _unpack_uint8(v): return struct.unpack('<B', v)[0]
1594 -def _unpack_uint32(v): return struct.unpack('<L', v)[0]
1595 -def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
1596 -def _unpack_time(v): return rospy.Time(*struct.unpack('<LL', v))
1597
1598 -def _pack_uint8(v): return struct.pack('<B', v)
1599 -def _pack_uint32(v): return struct.pack('<L', v)
1600 -def _pack_uint64(v): return struct.pack('<Q', v)
1601 -def _pack_time(v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
1602
1603 -def _read(f, size):
1604 data = f.read(size) 1605 if len(data) != size: 1606 raise ROSBagException('expecting %d bytes, read %d' % (size, len(data))) 1607 return data
1608
1609 -def _skip_record(f):
1610 _skip_sized(f) # skip header 1611 _skip_sized(f) # skip data
1612
1613 -def _skip_sized(f):
1614 size = _read_uint32(f) 1615 f.seek(size, os.SEEK_CUR)
1616
1617 -def _read_sized(f):
1618     try:
1619         size = _read_uint32(f)
1620     except struct.error as ex:
1621         raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))
1622     return _read(f, size)
1623
1624 -def _write_sized(f, v):
1625     if not isinstance(v, bytes):
1626         v = v.encode()
1627     f.write(_pack_uint32(len(v)))
1628     f.write(v)
1629
1630 -def _read_field(header, field, unpack_fn):
1631     if field not in header:
1632         raise ROSBagFormatException('expected "%s" field in record' % field)
1633
1634     try:
1635         value = unpack_fn(header[field])
1636     except Exception as ex:
1637         raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex)))
1638
1639     return value
1640
1641 -def _read_str_field (header, field): return _read_field(header, field, _decode_str)
1642 -def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
1643 -def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
1644 -def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
1645 -def _read_time_field (header, field): return _read_field(header, field, _unpack_time)
1646
1647 -def _write_record(f, header, data='', padded_size=None):
1648     header_str = _write_header(f, header)
1649
1650     if padded_size is not None:
1651         header_len = len(header_str)
1652         if header_len < padded_size:
1653             data = ' ' * (padded_size - header_len)
1654         else:
1655             data = ''
1656
1657     _write_sized(f, data)
1658
1659 -def _write_header(f, header):
1660     header_str = b''
1661     equal = b'='
1662     for k, v in header.items():
1663         if not isinstance(k, bytes):
1664             k = k.encode()
1665         if not isinstance(v, bytes):
1666             v = v.encode()
1667         header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v
1668     _write_sized(f, header_str)
1669     return header_str
1670
1671 -def _read_header(f, req_op=None):
1672     bag_pos = f.tell()
1673
1674     # Read header
1675     try:
1676         header = _read_sized(f)
1677     except ROSBagException as ex:
1678         raise ROSBagFormatException('Error reading header: %s' % str(ex))
1679
1680     # Parse header into a dict
1681     header_dict = {}
1682     while header != b'':
1683         # Read size
1684         if len(header) < 4:
1685             raise ROSBagFormatException('Error reading header field')
1686         (size,) = struct.unpack('<L', header[:4])  # @todo reindex: catch struct.error
1687         header = header[4:]
1688
1689         # Read bytes
1690         if len(header) < size:
1691             raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header)))
1692         (name, sep, value) = header[:size].partition(b'=')
1693         if sep == b'':
1694             raise ROSBagFormatException('Error reading header field')
1695
1696         name = name.decode()
1697         header_dict[name] = value  # @todo reindex: raise exception on empty name
1698
1699         header = header[size:]
1700
1701     # Check the op code of the header, if supplied
1702     if req_op is not None:
1703         op = _read_uint8_field(header_dict, 'op')
1704         if req_op != op:
1705             raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op]))
1706
1707     return header_dict
1708
1709 -def _peek_next_header_op(f):
1710     pos = f.tell()
1711     header = _read_header(f)
1712     op = _read_uint8_field(header, 'op')
1713     f.seek(pos)
1714     return op
1715
1716 -def _read_record_data(f):
1717     try:
1718         record_data = _read_sized(f)
1719     except ROSBagException as ex:
1720         raise ROSBagFormatException('Error reading record data: %s' % str(ex))
1721
1722     return record_data
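Every record in a bag is a length-prefixed header (a sequence of length, name=value fields) followed by a length-prefixed data blob, and the helpers above are the only code that touches that wire format. A minimal round-trip sketch with an in-memory buffer (the op value 0x02 and the topic name are placeholders):

    buf = StringIO()                                   # io.BytesIO on Python 3
    _write_header(buf, {'op': _pack_uint8(0x02), 'topic': '/chatter'})
    _write_sized(buf, b'payload')

    buf.seek(0)
    header = _read_header(buf)
    assert _read_uint8_field(header, 'op') == 0x02
    assert _read_str_field(header, 'topic') == '/chatter'
    assert _read_record_data(buf) == b'payload'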
1723
1724 -class _BagReader(object):
1725 - def __init__(self, bag):
1726 self.bag = bag
1727
1728 - def start_reading(self):
1729 raise NotImplementedError()
1730
1731 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1732 raise NotImplementedError()
1733
1734 - def reindex(self):
1735 raise NotImplementedError()
1736
1737 -class _BagReader102_Unindexed(_BagReader):
1738 """ 1739 Support class for reading unindexed v1.2 bag files. 1740 """
1741 - def __init__(self, bag):
1742 _BagReader.__init__(self, bag)
1743
1744 - def start_reading(self):
1745 self.bag._file_header_pos = self.bag._file.tell()
1746
1747 - def reindex(self):
1748 """Generates all bag index information by rereading the message records.""" 1749 f = self.bag._file 1750 1751 total_bytes = self.bag.size 1752 1753 # Re-read the file header to get to the start of the first message 1754 self.bag._file.seek(self.bag._file_header_pos) 1755 1756 offset = f.tell() 1757 1758 # Read message definition and data records 1759 while offset < total_bytes: 1760 yield offset 1761 1762 op = _peek_next_header_op(f) 1763 1764 if op == _OP_MSG_DEF: 1765 connection_info = self.read_message_definition_record() 1766 1767 if connection_info.topic not in self.bag._topic_connections: 1768 self.bag._topic_connections[connection_info.topic] = connection_info.id 1769 self.bag._connections[connection_info.id] = connection_info 1770 self.bag._connection_indexes[connection_info.id] = [] 1771 1772 elif op == _OP_MSG_DATA: 1773 # Read the topic and timestamp from the header 1774 header = _read_header(f) 1775 1776 topic = _read_str_field(header, 'topic') 1777 secs = _read_uint32_field(header, 'sec') 1778 nsecs = _read_uint32_field(header, 'nsec') 1779 t = genpy.Time(secs, nsecs) 1780 1781 if topic not in self.bag._topic_connections: 1782 datatype = _read_str_field(header, 'type') 1783 self._create_connection_info_for_datatype(topic, datatype) 1784 1785 connection_id = self.bag._topic_connections[topic] 1786 info = self.bag._connections[connection_id] 1787 1788 # Skip over the message content 1789 _skip_sized(f) 1790 1791 # Insert the message entry (in order) into the connection index 1792 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1793 1794 offset = f.tell()
1795
1796 - def read_messages(self, topics, start_time, end_time, topic_filter, raw):
1797     f = self.bag._file
1798
1799     f.seek(self.bag._file_header_pos)
1800
1801     while True:
1802         # Read MSG_DEF records
1803         while True:
1804             position = f.tell()
1805
1806             try:
1807                 header = _read_header(f)
1808             except Exception:
1809                 return
1810
1811             op = _read_uint8_field(header, 'op')
1812             if op != _OP_MSG_DEF:
1813                 break
1814
1815             connection_info = self.read_message_definition_record(header)
1816
1817             if connection_info.topic not in self.bag._topic_connections:
1818                 self.bag._topic_connections[connection_info.topic] = connection_info.id
1819
1820             self.bag._connections[connection_info.id] = connection_info
1821
1822         # Check that we have a MSG_DATA record
1823         if op != _OP_MSG_DATA:
1824             raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
1825
1826         topic = _read_str_field(header, 'topic')
1827
1828         if topic not in self.bag._topic_connections:
1829             datatype = _read_str_field(header, 'type')
1830             self._create_connection_info_for_datatype(topic, datatype)
1831
1832         connection_id = self.bag._topic_connections[topic]
1833         info = self.bag._connections[connection_id]
1834
1835         # Get the message type
1836         try:
1837             msg_type = _get_message_type(info)
1838         except KeyError:
1839             raise ROSBagException('Cannot deserialize messages of type [%s].  Message was not preceded in bagfile by definition' % info.datatype)
1840
1841         # Get the timestamp
1842         secs  = _read_uint32_field(header, 'sec')
1843         nsecs = _read_uint32_field(header, 'nsec')
1844         t = genpy.Time(secs, nsecs)
1845
1846         # Read the message content
1847         data = _read_record_data(f)
1848
1849         if raw:
1850             msg = (info.datatype, data, info.md5sum, position, msg_type)
1851         else:
1852             # Deserialize the message
1853             msg = msg_type()
1854             msg.deserialize(data)
1855
1856         yield BagMessage(topic, msg, t)
1857
1858     self.bag._connection_indexes_read = True
1859
1860 - def _create_connection_info_for_datatype(self, topic, datatype):
1861     for c in self.bag._connections.values():
1862         if c.datatype == datatype:
1863             connection_id     = len(self.bag._connections)
1864             connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] }
1865             connection_info   = _ConnectionInfo(connection_id, topic, connection_header)
1866
1867             self.bag._topic_connections[topic] = connection_id
1868             self.bag._connections[connection_id] = connection_info
1869             self.bag._connection_indexes[connection_id] = []
1870             return
1871
1872     raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
1873
1874 - def read_message_definition_record(self, header=None):
1875     if not header:
1876         header = _read_header(self.bag._file, _OP_MSG_DEF)
1877
1878     topic    = _read_str_field(header, 'topic')
1879     datatype = _read_str_field(header, 'type')
1880     md5sum   = _read_str_field(header, 'md5')
1881     msg_def  = _read_str_field(header, 'def')
1882
1883     _skip_sized(self.bag._file)  # skip the record data
1884
1885     connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def }
1886
1887     id = len(self.bag._connections)
1888
1889     return _ConnectionInfo(id, topic, connection_header)
1890
1891 -class _BagReader102_Indexed(_BagReader102_Unindexed):
1892 """ 1893 Support class for reading indexed v1.2 bag files. 1894 """
1895 - def __init__(self, bag):
1896 _BagReader.__init__(self, bag)
1897
1898 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1899     connections = self.bag._get_connections(topics, connection_filter)
1900     for entry in self.bag._get_entries(connections, start_time, end_time):
1901         yield self.seek_and_read_message_data_record(entry.offset, raw)
1902
1903 - def reindex(self):
1904 """Generates all bag index information by rereading the message records.""" 1905 f = self.bag._file 1906 1907 total_bytes = self.bag.size 1908 1909 # Re-read the file header to get to the start of the first message 1910 self.bag._file.seek(self.bag._file_header_pos) 1911 self.read_file_header_record() 1912 1913 offset = f.tell() 1914 1915 # Read message definition and data records 1916 while offset < total_bytes: 1917 yield offset 1918 1919 op = _peek_next_header_op(f) 1920 1921 if op == _OP_MSG_DEF: 1922 connection_info = self.read_message_definition_record() 1923 1924 if connection_info.topic not in self.bag._topic_connections: 1925 self.bag._topic_connections[connection_info.topic] = connection_info.id 1926 self.bag._connections[connection_info.id] = connection_info 1927 self.bag._connection_indexes[connection_info.id] = [] 1928 1929 elif op == _OP_MSG_DATA: 1930 # Read the topic and timestamp from the header 1931 header = _read_header(f) 1932 1933 topic = _read_str_field(header, 'topic') 1934 secs = _read_uint32_field(header, 'sec') 1935 nsecs = _read_uint32_field(header, 'nsec') 1936 t = genpy.Time(secs, nsecs) 1937 1938 if topic not in self.bag._topic_connections: 1939 datatype = _read_str_field(header, 'type') 1940 self._create_connection_info_for_datatype(topic, datatype) 1941 1942 connection_id = self.bag._topic_connections[topic] 1943 info = self.bag._connections[connection_id] 1944 1945 # Skip over the message content 1946 _skip_sized(f) 1947 1948 # Insert the message entry (in order) into the connection index 1949 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1950 1951 elif op == _OP_INDEX_DATA: 1952 _skip_record(f) 1953 1954 offset = f.tell()
1955
1956 - def start_reading(self):
1957 try: 1958 # Read the file header 1959 self.read_file_header_record() 1960 1961 total_bytes = self.bag.size 1962 1963 # Check if the index position has been written, i.e. the bag was closed successfully 1964 if self.bag._index_data_pos == 0: 1965 raise ROSBagUnindexedException() 1966 1967 # Seek to the beginning of the topic index records 1968 self.bag._file.seek(self.bag._index_data_pos) 1969 1970 # Read the topic indexes 1971 topic_indexes = {} 1972 while True: 1973 pos = self.bag._file.tell() 1974 if pos >= total_bytes: 1975 break 1976 1977 topic, index = self.read_topic_index_record() 1978 1979 topic_indexes[topic] = index 1980 1981 # Read the message definition records (one for each topic) 1982 for topic, index in topic_indexes.items(): 1983 self.bag._file.seek(index[0].offset) 1984 1985 connection_info = self.read_message_definition_record() 1986 1987 if connection_info.topic not in self.bag._topic_connections: 1988 self.bag._topic_connections[connection_info.topic] = connection_info.id 1989 self.bag._connections[connection_info.id] = connection_info 1990 1991 self.bag._connection_indexes[connection_info.id] = index 1992 1993 self.bag._connection_indexes_read = True 1994 1995 except Exception as ex: 1996 raise ROSBagUnindexedException()
1997
1998 - def read_file_header_record(self):
1999     self.bag._file_header_pos = self.bag._file.tell()
2000
2001     header = _read_header(self.bag._file, _OP_FILE_HEADER)
2002
2003     self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
2004
2005     _skip_sized(self.bag._file)  # skip the record data, i.e. padding
2006
2007 - def read_topic_index_record(self):
2008     f = self.bag._file
2009
2010     header = _read_header(f, _OP_INDEX_DATA)
2011
2012     index_version = _read_uint32_field(header, 'ver')
2013     topic         = _read_str_field   (header, 'topic')
2014     count         = _read_uint32_field(header, 'count')
2015
2016     if index_version != 0:
2017         raise ROSBagFormatException('expecting index version 0, got %d' % index_version)
2018
2019     _read_uint32(f)  # skip the record data size
2020
2021     topic_index = []
2022
2023     for i in range(count):
2024         time   = _read_time  (f)
2025         offset = _read_uint64(f)
2026
2027         topic_index.append(_IndexEntry102(time, offset))
2028
2029     return (topic, topic_index)
2030
2031 - def seek_and_read_message_data_record(self, position, raw):
2032     f = self.bag._file
2033
2034     # Seek to the message position
2035     f.seek(position)
2036
2037     # Skip any MSG_DEF records
2038     while True:
2039         header = _read_header(f)
2040         op = _read_uint8_field(header, 'op')
2041         if op != _OP_MSG_DEF:
2042             break
2043         _skip_sized(f)
2044
2045     # Check that we have a MSG_DATA record
2046     if op != _OP_MSG_DATA:
2047         raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
2048
2049     topic = _read_str_field(header, 'topic')
2050
2051     connection_id = self.bag._topic_connections[topic]
2052     info = self.bag._connections[connection_id]
2053
2054     # Get the message type
2055     try:
2056         msg_type = _get_message_type(info)
2057     except KeyError:
2058         raise ROSBagException('Cannot deserialize messages of type [%s].  Message was not preceded in bagfile by definition' % info.datatype)
2059
2060     # Get the timestamp
2061     secs  = _read_uint32_field(header, 'sec')
2062     nsecs = _read_uint32_field(header, 'nsec')
2063     t = genpy.Time(secs, nsecs)
2064
2065     # Read the message content
2066     data = _read_record_data(f)
2067
2068     if raw:
2069         msg = info.datatype, data, info.md5sum, position, msg_type
2070     else:
2071         # Deserialize the message
2072         msg = msg_type()
2073         msg.deserialize(data)
2074
2075     return BagMessage(topic, msg, t)
2076
2077 -class _BagReader200(_BagReader):
2078 """ 2079 Support class for reading v2.0 bag files. 2080 """
2081 - def __init__(self, bag):
2082     _BagReader.__init__(self, bag)
2083
2084     self.decompressed_chunk_pos = None
2085     self.decompressed_chunk     = None
2086     self.decompressed_chunk_io  = None
2087
2088 - def reindex(self):
2089 """ 2090 Generates all bag index information by rereading the chunks. 2091 Assumes the file header has been read. 2092 """ 2093 f = self.bag._file 2094 2095 f.seek(0, os.SEEK_END) 2096 total_bytes = f.tell() 2097 2098 # Read any connection records from after the chunk section. 2099 # This is to workaround a bug in rosbag record --split (fixed in r10390) 2100 # where connection records weren't always being written inside the chunk. 2101 self._read_terminal_connection_records() 2102 2103 # Re-read the file header to get to the start of the first chunk 2104 self.bag._file.seek(self.bag._file_header_pos) 2105 self.read_file_header_record() 2106 2107 trunc_pos = None 2108 2109 while True: 2110 chunk_pos = f.tell() 2111 if chunk_pos >= total_bytes: 2112 break 2113 2114 yield chunk_pos 2115 2116 try: 2117 self._reindex_read_chunk(f, chunk_pos, total_bytes) 2118 except Exception as ex: 2119 break 2120 2121 trunc_pos = f.tell() 2122 2123 if trunc_pos and trunc_pos < total_bytes: 2124 f.truncate(trunc_pos)
2125
2126 - def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
2127 # Read the chunk header 2128 chunk_header = self.read_chunk_header() 2129 2130 # If the chunk header size is 0, then the chunk wasn't correctly terminated - we're done 2131 if chunk_header.compressed_size == 0: 2132 raise ROSBagException('unterminated chunk at %d' % chunk_pos) 2133 2134 if chunk_header.compression == Compression.NONE: 2135 chunk_file = f 2136 else: 2137 # Read the compressed chunk 2138 compressed_chunk = _read(f, chunk_header.compressed_size) 2139 2140 # Decompress it 2141 if chunk_header.compression == Compression.BZ2: 2142 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2143 elif chunk_header.compression == Compression.LZ4 and found_lz4: 2144 self.decompressed_chunk = roslz4.decompress(compressed_chunk) 2145 else: 2146 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2147 2148 if self.decompressed_chunk_io: 2149 self.decompressed_chunk_io.close() 2150 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2151 2152 chunk_file = self.decompressed_chunk_io 2153 2154 # Read chunk connection and message records 2155 self.bag._curr_chunk_info = None 2156 2157 if chunk_header.compression == Compression.NONE: 2158 offset = chunk_file.tell() - chunk_pos 2159 else: 2160 offset = chunk_file.tell() 2161 2162 expected_index_length = 0 2163 2164 while offset < chunk_header.uncompressed_size: 2165 op = _peek_next_header_op(chunk_file) 2166 2167 if op == _OP_CONNECTION: 2168 connection_info = self.read_connection_record(chunk_file) 2169 2170 if connection_info.id not in self.bag._connections: 2171 self.bag._connections[connection_info.id] = connection_info 2172 if connection_info.id not in self.bag._connection_indexes: 2173 self.bag._connection_indexes[connection_info.id] = [] 2174 2175 elif op == _OP_MSG_DATA: 2176 # Read the connection id and timestamp from the header 2177 header = _read_header(chunk_file) 2178 2179 connection_id = _read_uint32_field(header, 'conn') 2180 t = _read_time_field (header, 'time') 2181 2182 # Update the chunk info with this timestamp 2183 if not self.bag._curr_chunk_info: 2184 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t) 2185 else: 2186 if t > self.bag._curr_chunk_info.end_time: 2187 self.bag._curr_chunk_info.end_time = t 2188 elif t < self.bag._curr_chunk_info.start_time: 2189 self.bag._curr_chunk_info.start_time = t 2190 if connection_id in self.bag._curr_chunk_info.connection_counts: 2191 self.bag._curr_chunk_info.connection_counts[connection_id] += 1 2192 else: 2193 self.bag._curr_chunk_info.connection_counts[connection_id] = 1 2194 2195 # Skip over the message content 2196 _skip_sized(chunk_file) 2197 2198 # Insert the message entry (in order) into the connection index 2199 if connection_id not in self.bag._connection_indexes: 2200 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos)) 2201 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset)) 2202 2203 expected_index_length += 1 2204 2205 else: 2206 # Unknown record type so skip 2207 _skip_record(chunk_file) 2208 2209 if chunk_header.compression == Compression.NONE: 2210 offset = chunk_file.tell() - chunk_pos 2211 else: 2212 offset = chunk_file.tell() 2213 2214 # Skip over index records, connection records and chunk info records 2215 next_op = _peek_next_header_op(f) 2216 2217 total_index_length = 0 2218 2219 while next_op != _OP_CHUNK: 2220 if next_op == _OP_INDEX_DATA: 2221 # Bug workaround: C Turtle 
bags (pre-1.1.15) were written with an incorrect data length 2222 _, index = self.read_connection_index_record() 2223 total_index_length += len(index) 2224 else: 2225 _skip_record(f) 2226 2227 if f.tell() >= total_bytes: 2228 if total_index_length != expected_index_length: 2229 raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length)) 2230 break 2231 2232 next_op = _peek_next_header_op(f) 2233 2234 # Chunk was read correctly - store info 2235 self.bag._chunk_headers[chunk_pos] = chunk_header 2236 self.bag._chunks.append(self.bag._curr_chunk_info)
2237
2238 - def _read_terminal_connection_records(self):
2239     b, f, r = self.bag, self.bag._file, self.bag._reader
2240
2241     # Seek to the first record after FILE_HEADER
2242     f.seek(b._file_header_pos)
2243     r.read_file_header_record()
2244
2245     # Advance to the first CONNECTION
2246     if self._advance_to_next_record(_OP_CONNECTION):
2247         # Read the CONNECTION records
2248         while True:
2249             connection_info = r.read_connection_record(f)
2250
2251             b._connections[connection_info.id] = connection_info
2252             b._connection_indexes[connection_info.id] = []
2253
2254             next_op = _peek_next_header_op(f)
2255             if next_op != _OP_CONNECTION:
2256                 break
2257
2258 - def _advance_to_next_record(self, op):
2259     b, f = self.bag, self.bag._file
2260
2261     try:
2262         while True:
2263             next_op = _peek_next_header_op(f)
2264             if next_op == op:
2265                 break
2266
2267             if next_op == _OP_INDEX_DATA:
2268                 # Workaround the possible invalid data length in INDEX_DATA records
2269
2270                 # read_connection_index_record() requires _curr_chunk_info is set
2271                 if b._curr_chunk_info is None:
2272                     b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1))
2273
2274                 b._reader.read_connection_index_record()
2275             else:
2276                 _skip_record(f)
2277
2278         return True
2279
2280     except Exception as ex:
2281         return False
2282
2283 - def start_reading(self):
2284 try: 2285 # Read the file header 2286 self.read_file_header_record() 2287 2288 # Check if the index position has been written, i.e. the bag was closed successfully 2289 if self.bag._index_data_pos == 0: 2290 raise ROSBagUnindexedException() 2291 2292 # Seek to the end of the chunks 2293 self.bag._file.seek(self.bag._index_data_pos) 2294 2295 # Read the connection records 2296 self.bag._connection_indexes = {} 2297 for i in range(self.bag._connection_count): 2298 connection_info = self.read_connection_record(self.bag._file) 2299 self.bag._connections[connection_info.id] = connection_info 2300 self.bag._connection_indexes[connection_info.id] = [] 2301 2302 # Read the chunk info records 2303 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)] 2304 2305 # Read the chunk headers 2306 self.bag._chunk_headers = {} 2307 for chunk_info in self.bag._chunks: 2308 self.bag._file.seek(chunk_info.pos) 2309 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header() 2310 2311 if not self.bag._skip_index: 2312 self._read_connection_index_records() 2313 2314 except Exception as ex: 2315 raise ROSBagUnindexedException()
2316
2317 - def _read_connection_index_records(self):
2318     for chunk_info in self.bag._chunks:
2319         self.bag._file.seek(chunk_info.pos)
2320         _skip_record(self.bag._file)
2321
2322         self.bag._curr_chunk_info = chunk_info
2323         for i in range(len(chunk_info.connection_counts)):
2324             connection_id, index = self.read_connection_index_record()
2325             self.bag._connection_indexes[connection_id].extend(index)
2326
2327     # Remove any connections with no entries
2328     # This is a workaround for a bug where connection records were being written for
2329     # connections which had no messages in the bag
2330     orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index]
2331     for id in orphan_connection_ids:
2332         del self.bag._connections[id]
2333         del self.bag._connection_indexes[id]
2334
2335     self.bag._connection_indexes_read = True
2336
2337 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2338     connections = self.bag._get_connections(topics, connection_filter)
2339     for entry in self.bag._get_entries(connections, start_time, end_time):
2340         yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
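All three reader classes are driven through the module's public Bag API; a typical read loop over an indexed bag might look like the sketch below (the file path and topic name are placeholders):

    import rosbag

    with rosbag.Bag('input.bag') as bag:
        for topic, msg, t in bag.read_messages(topics=['/chatter']):
            print(t.to_sec(), topic, msg)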
2341
2342     ###
2343
2344 - def read_file_header_record(self):
2345     self.bag._file_header_pos = self.bag._file.tell()
2346
2347     header = _read_header(self.bag._file, _OP_FILE_HEADER)
2348
2349     self.bag._index_data_pos   = _read_uint64_field(header, 'index_pos')
2350     self.bag._chunk_count      = _read_uint32_field(header, 'chunk_count')
2351     self.bag._connection_count = _read_uint32_field(header, 'conn_count')
2352
2353     _skip_sized(self.bag._file)  # skip over the record data, i.e. padding
2354
2355 - def read_connection_record(self, f):
2356     header = _read_header(f, _OP_CONNECTION)
2357
2358     conn_id = _read_uint32_field(header, 'conn')
2359     topic   = _read_str_field   (header, 'topic')
2360
2361     connection_header = _read_header(f)
2362
2363     return _ConnectionInfo(conn_id, topic, connection_header)
2364
2365 - def read_chunk_info_record(self):
2366     f = self.bag._file
2367
2368     header = _read_header(f, _OP_CHUNK_INFO)
2369
2370     chunk_info_version = _read_uint32_field(header, 'ver')
2371
2372     if chunk_info_version == 1:
2373         chunk_pos        = _read_uint64_field(header, 'chunk_pos')
2374         start_time       = _read_time_field  (header, 'start_time')
2375         end_time         = _read_time_field  (header, 'end_time')
2376         connection_count = _read_uint32_field(header, 'count')
2377
2378         chunk_info = _ChunkInfo(chunk_pos, start_time, end_time)
2379
2380         _read_uint32(f)  # skip the record data size
2381
2382         for i in range(connection_count):
2383             connection_id = _read_uint32(f)
2384             count         = _read_uint32(f)
2385
2386             chunk_info.connection_counts[connection_id] = count
2387
2388         return chunk_info
2389     else:
2390         raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2391
2392 - def read_chunk_header(self):
2393     header = _read_header(self.bag._file, _OP_CHUNK)
2394
2395     compression       = _read_str_field   (header, 'compression')
2396     uncompressed_size = _read_uint32_field(header, 'size')
2397
2398     compressed_size = _read_uint32(self.bag._file)  # read the record data size
2399
2400     data_pos = self.bag._file.tell()
2401
2402     return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
2403
2404 - def read_connection_index_record(self):
2405     f = self.bag._file
2406
2407     header = _read_header(f, _OP_INDEX_DATA)
2408
2409     index_version = _read_uint32_field(header, 'ver')
2410     connection_id = _read_uint32_field(header, 'conn')
2411     count         = _read_uint32_field(header, 'count')
2412
2413     if index_version != 1:
2414         raise ROSBagFormatException('expecting index version 1, got %d' % index_version)
2415
2416     record_size = _read_uint32(f)  # skip the record data size
2417
2418     index = []
2419
2420     for i in range(count):
2421         time   = _read_time  (f)
2422         offset = _read_uint32(f)
2423
2424         index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset))
2425
2426     return (connection_id, index)
2427
2428 - def seek_and_read_message_data_record(self, position, raw):
2429 chunk_pos, offset = position 2430 2431 chunk_header = self.bag._chunk_headers.get(chunk_pos) 2432 if chunk_header is None: 2433 raise ROSBagException('no chunk at position %d' % chunk_pos) 2434 2435 if chunk_header.compression == Compression.NONE: 2436 f = self.bag._file 2437 f.seek(chunk_header.data_pos + offset) 2438 else: 2439 if self.decompressed_chunk_pos != chunk_pos: 2440 # Seek to the chunk data, read and decompress 2441 self.bag._file.seek(chunk_header.data_pos) 2442 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size) 2443 2444 if chunk_header.compression == Compression.BZ2: 2445 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2446 elif chunk_header.compression == Compression.LZ4 and found_lz4: 2447 self.decompressed_chunk = roslz4.decompress(compressed_chunk) 2448 else: 2449 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2450 2451 self.decompressed_chunk_pos = chunk_pos 2452 2453 if self.decompressed_chunk_io: 2454 self.decompressed_chunk_io.close() 2455 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2456 2457 f = self.decompressed_chunk_io 2458 f.seek(offset) 2459 2460 # Skip any CONNECTION records 2461 while True: 2462 header = _read_header(f) 2463 op = _read_uint8_field(header, 'op') 2464 if op != _OP_CONNECTION: 2465 break 2466 _skip_sized(f) 2467 2468 # Check that we have a MSG_DATA record 2469 if op != _OP_MSG_DATA: 2470 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2471 2472 connection_id = _read_uint32_field(header, 'conn') 2473 t = _read_time_field (header, 'time') 2474 2475 # Get the message type 2476 connection_info = self.bag._connections[connection_id] 2477 try: 2478 msg_type = _get_message_type(connection_info) 2479 except KeyError: 2480 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype) 2481 2482 # Read the message content 2483 data = _read_record_data(f) 2484 2485 # Deserialize the message 2486 if raw: 2487 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type 2488 else: 2489 msg = msg_type() 2490 msg.deserialize(data) 2491 2492 return BagMessage(connection_info.topic, msg, t)
2493
2494 -def _time_to_str(secs):
2495     secs_frac = secs - int(secs)
2496     secs_frac_str = ('%.2f' % secs_frac)[1:]
2497
2498     return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
2499
2500 -def _human_readable_size(size):
2501     multiple = 1024.0
2502     for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
2503         size /= multiple
2504         if size < multiple:
2505             return '%.1f %s' % (size, suffix)
2506
2507     return '-'
2508
2509 -def _human_readable_frequency(freq):
2510     multiple = 1000.0
2511     for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']:
2512         if freq < multiple:
2513             return '%.1f %s' % (freq, suffix)
2514         freq /= multiple
2515
2516     return '-'
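Both helpers walk a fixed suffix table; note that sizes are divided before the first comparison, so they are always reported in at least KB. For example:

    print(_human_readable_size(5 * 1024 * 1024))   # '5.0 MB'
    print(_human_readable_frequency(2500.0))       # '2.5 kHz'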
2517
2518 ## See http://code.activestate.com/recipes/511509
2519 -def _mergesort(list_of_lists, key=None):
2520 """ 2521 Perform an N-way merge operation on sorted lists. 2522 2523 @param list_of_lists: (really iterable of iterable) of sorted elements 2524 (either by naturally or by C{key}) 2525 @param key: specify sort key function (like C{sort()}, C{sorted()}) 2526 @param iterfun: function that returns an iterator. 2527 2528 Yields tuples of the form C{(item, iterator)}, where the iterator is the 2529 built-in list iterator or something you pass in, if you pre-generate the 2530 iterators. 2531 2532 This is a stable merge; complexity O(N lg N) 2533 2534 Examples:: 2535 2536 print list(x[0] for x in mergesort([[1,2,3,4], 2537 [2,3.5,3.7,4.5,6,7], 2538 [2.6,3.6,6.6,9]])) 2539 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2540 2541 # note stability 2542 print list(x[0] for x in mergesort([[1,2,3,4], 2543 [2,3.5,3.7,4.5,6,7], 2544 [2.6,3.6,6.6,9]], key=int)) 2545 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2546 2547 print list(x[0] for x in mergesort([[4,3,2,1], 2548 [7,6.5,4,3.7,3.3,1.9], 2549 [9,8.6,7.6,6.6,5.5,4.4,3.3]], 2550 key=lambda x: -x)) 2551 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1] 2552 """ 2553 2554 heap = [] 2555 for i, itr in enumerate(iter(pl) for pl in list_of_lists): 2556 try: 2557 item = next(itr) 2558 toadd = (key(item), i, item, itr) if key else (item, i, itr) 2559 heap.append(toadd) 2560 except StopIteration: 2561 pass 2562 heapq.heapify(heap) 2563 2564 if key: 2565 while heap: 2566 _, idx, item, itr = heap[0] 2567 yield item, itr 2568 try: 2569 item = next(itr) 2570 heapq.heapreplace(heap, (key(item), idx, item, itr) ) 2571 except StopIteration: 2572 heapq.heappop(heap) 2573 2574 else: 2575 while heap: 2576 item, idx, itr = heap[0] 2577 yield item, itr 2578 try: 2579 heapq.heapreplace(heap, (next(itr), idx, itr)) 2580 except StopIteration: 2581 heapq.heappop(heap)
2582
2583 -class _CompressorFileFacade(object):
2584 """ 2585 A file facade for sequential compressors (e.g., bz2.BZ2Compressor). 2586 """
2587 - def __init__(self, file, compressor):
2588     self.file                = file
2589     self.compressor          = compressor
2590     self.compressed_bytes_in = 0
2591
2592 - def write(self, data):
2593     compressed = self.compressor.compress(data)
2594     if len(compressed) > 0:
2595         self.file.write(compressed)
2596     self.compressed_bytes_in += len(data)
2597
2598 - def flush(self):
2599     compressed = self.compressor.flush()
2600     if len(compressed) > 0:
2601         self.file.write(compressed)
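The facade only needs write() and flush(), so a chunk writer can target it exactly like a plain file. A minimal sketch with an in-memory buffer (the payload is arbitrary):

    import bz2

    out = StringIO()                                   # io.BytesIO on Python 3
    facade = _CompressorFileFacade(out, bz2.BZ2Compressor())
    facade.write(b'chunk data ' * 100)
    facade.flush()

    assert bz2.decompress(out.getvalue()) == b'chunk data ' * 100
    assert facade.compressed_bytes_in == len(b'chunk data ' * 100)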
2602
2603 -def _median(values):
2604     values_len = len(values)
2605     if values_len == 0:
2606         return float('nan')
2607
2608     sorted_values = sorted(values)
2609     if values_len % 2 == 1:
2610         return sorted_values[int(values_len / 2)]
2611
2612     lower = sorted_values[int(values_len / 2) - 1]
2613     upper = sorted_values[int(values_len / 2)]
2614
2615     return float(lower + upper) / 2
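For an odd number of samples the middle value is returned; for an even number the two middle values are averaged, e.g.:

    assert _median([3.0, 1.0, 2.0]) == 2.0
    assert _median([4.0, 1.0, 2.0, 3.0]) == 2.5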
2616