Package rosbag :: Module bag

Source Code for Module rosbag.bag

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2010, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32   
  33  """ 
  34  The rosbag Python API. 
  35   
  36  Provides serialization of bag files. 
  37  """ 
  38   
  39  from __future__ import print_function 
  40   
  41  import bisect 
  42  import bz2 
  43  import collections 
  44  import heapq 
  45  import os 
  46  import re 
  47  import struct 
  48  import sys 
  49  import threading 
  50  import time 
  51  import yaml 
  52   
  53  try: 
  54      from cStringIO import StringIO  # Python 2.x 
  55  except ImportError: 
  56      from io import BytesIO as StringIO  # Python 3.x 
  57   
  58  import genmsg 
  59  import genpy 
  60  import genpy.dynamic 
  61  import genpy.message 
  62   
  63  import roslib.names # still needed for roslib.names.canonicalize_name() 
  64  import rospy 
  65  try: 
  66      import roslz4 
  67      found_lz4 = True 
  68  except ImportError: 
  69      rospy.logwarn( 
  70          'Failed to load Python extension for LZ4 support. ' 
  71          'LZ4 compression will not be available.') 
  72      found_lz4 = False 
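As a quick orientation (a usage sketch, not part of the module source), a bag can be opened and iterated as follows; the filename 'input.bag' is a placeholder:

    from rosbag import Bag

    with Bag('input.bag') as bag:                   # placeholder path to an existing bag
        for topic, msg, t in bag.read_messages():   # yields BagMessage(topic, message, timestamp)
            print(topic, t.to_sec(), msg.__class__._type)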
  73   
  74  class ROSBagException(Exception):
  75      """
  76      Base class for exceptions in rosbag.
  77      """
  78      def __init__(self, value):
  79          self.value = value
  80   
  81      def __str__(self):
  82          return self.value
  83   
  84  class ROSBagFormatException(ROSBagException):
  85      """
  86      Exceptions for errors relating to the bag file format.
  87      """
  88      def __init__(self, value):
  89          ROSBagException.__init__(self, value)
  90   
  91  class ROSBagUnindexedException(ROSBagException):
  92      """
  93      Exception for unindexed bags.
  94      """
  95      def __init__(self):
  96          ROSBagException.__init__(self, 'Unindexed bag')
  97   
  98  class Compression:
  99      """
 100      Allowable compression types
 101      """
 102      NONE = 'none'
 103      BZ2 = 'bz2'
 104      LZ4 = 'lz4'
 105   
 106  BagMessage = collections.namedtuple('BagMessage', 'topic message timestamp')
 107   
 108  class Bag(object):
 109      """
 110      Bag serializes messages to and from a single file on disk using the bag format.
 111      """
 112      def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
 113          """
 114          Open a bag file. The mode can be 'r', 'w', or 'a' for reading (default),
 115          writing or appending. The file will be created if it doesn't exist
 116          when opened for writing or appending; it will be truncated when opened
 117          for writing. Simultaneous reading and writing is allowed when in writing
 118          or appending mode.
 119          @param f: filename of bag to open or a stream to read from
 120          @type f: str or file
 121          @param mode: mode, either 'r', 'w', or 'a'
 122          @type mode: str
 123          @param compression: compression mode, see U{rosbag.Compression} for valid modes
 124          @type compression: str
 125          @param chunk_threshold: minimum number of uncompressed bytes per chunk
 126          @type chunk_threshold: int
 127          @param allow_unindexed: if True, allow opening unindexed bags
 128          @type allow_unindexed: bool
 129          @param options: the bag options (currently: compression and chunk_threshold)
 130          @type options: dict
 131          @param skip_index: if True, don't read the connection index records on open [2.0+]
 132          @type skip_index: bool
 133          @raise ValueError: if any argument is invalid
 134          @raise ROSBagException: if an error occurs opening file
 135          @raise ROSBagFormatException: if bag format is corrupted
 136          """
 137          if options is not None:
 138              if type(options) is not dict:
 139                  raise ValueError('options must be of type dict')
 140              if 'compression' in options:
 141                  compression = options['compression']
 142              if 'chunk_threshold' in options:
 143                  chunk_threshold = options['chunk_threshold']
 144   
 145          self._file = None
 146          self._filename = None
 147          self._version = None
 148   
 149          allowed_compressions = [Compression.NONE, Compression.BZ2]
 150          if found_lz4:
 151              allowed_compressions.append(Compression.LZ4)
 152          if compression not in allowed_compressions:
 153              raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))
 154          self._compression = compression
 155   
 156          if chunk_threshold < 0:
 157              raise ValueError('chunk_threshold must be greater than or equal to zero')
 158          self._chunk_threshold = chunk_threshold
 159   
 160          self._skip_index = skip_index
 161   
 162          self._reader = None
 163   
 164          self._file_header_pos = None
 165          self._index_data_pos = 0  # (1.2+)
 166   
 167          self._clear_index()
 168   
 169          self._buffer = StringIO()
 170   
 171          self._curr_compression = Compression.NONE
 172   
 173          self._open(f, mode, allow_unindexed)
 174   
 175          self._output_file = self._file
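A sketch of the open modes and options documented above; the paths and values are illustrative only:

    from rosbag import Bag, Compression

    bag_r = Bag('existing.bag')                       # read (default mode 'r')
    bag_w = Bag('new.bag', 'w',                       # create/truncate for writing
                compression=Compression.BZ2,
                chunk_threshold=1024 * 1024)
    bag_w2 = Bag('new2.bag', 'w',                     # same settings via the options dict
                 options={'compression': Compression.BZ2, 'chunk_threshold': 1024 * 1024})
    bag_a = Bag('existing.bag', 'a')                  # append, creating the file if needed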
176
177 - def __iter__(self):
178 return self.read_messages()
179
180 - def __enter__(self):
181 return self
182
183 - def __exit__(self, exc_type, exc_value, traceback):
184 self.close()
185 186 @property
187 - def options(self):
188 """Get the options.""" 189 return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }
190 191 @property
192 - def filename(self):
193 """Get the filename.""" 194 return self._filename
195 196 @property
197 - def version(self):
198 """Get the version.""" 199 return self._version
200 201 @property
202 - def mode(self):
203 """Get the mode.""" 204 return self._mode
205 206 @property
207 - def size(self):
208 """Get the size in bytes.""" 209 if not self._file: 210 raise ValueError('I/O operation on closed bag') 211 212 pos = self._file.tell() 213 self._file.seek(0, os.SEEK_END) 214 size = self._file.tell() 215 self._file.seek(pos) 216 return size
217 218 # compression 219
220 - def _get_compression(self):
221 """Get the compression method to use for writing.""" 222 return self._compression
223
224 - def _set_compression(self, compression):
225 """Set the compression method to use for writing.""" 226 allowed_compressions = [Compression.NONE, Compression.BZ2] 227 if found_lz4: 228 allowed_compressions.append(Compression.LZ4) 229 if compression not in allowed_compressions: 230 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 231 232 self.flush() 233 self._compression = compression
234 235 compression = property(_get_compression, _set_compression) 236 237 # chunk_threshold 238
239 - def _get_chunk_threshold(self):
240 """Get the chunk threshold to use for writing.""" 241 return self._chunk_threshold
242
243 - def _set_chunk_threshold(self, chunk_threshold):
244 """Set the chunk threshold to use for writing.""" 245 if chunk_threshold < 0: 246 raise ValueError('chunk_threshold must be greater than or equal to zero') 247 248 self.flush() 249 self._chunk_threshold = chunk_threshold
250 251 chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold) 252
 253      def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
 254          """
 255          Read messages from the bag, optionally filtered by topic, timestamp and connection details.
 256          @param topics: list of topics or a single topic. If an empty list is given, all topics will be read [optional]
 257          @type topics: list(str) or str
 258          @param start_time: earliest timestamp of message to return [optional]
 259          @type start_time: U{genpy.Time}
 260          @param end_time: latest timestamp of message to return [optional]
 261          @type end_time: U{genpy.Time}
 262          @param connection_filter: function to filter connections to include [optional]
 263          @type connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool
 264          @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype)
 265          @type raw: bool
 266          @return: generator of BagMessage(topic, message, timestamp) namedtuples for each message in the bag file
 267          @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw]
 268          """
 269          self.flush()
 270   
 271          if topics and type(topics) is str:
 272              topics = [topics]
 273   
 274          return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)
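For example, a read restricted to a single topic and time window could look like this; the topic name and window bounds are assumptions:

    import rospy
    from rosbag import Bag

    start = rospy.Time(1420000000)    # assumed window start (seconds)
    end = rospy.Time(1420000010)      # assumed window end
    with Bag('input.bag') as bag:
        for topic, msg, t in bag.read_messages(topics='/odom', start_time=start, end_time=end):
            print('%s @ %.3f' % (topic, t.to_sec()))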
275
276 - def flush(self):
277 """ 278 Write the open chunk to disk so subsequent reads will read all messages. 279 @raise ValueError: if bag is closed 280 """ 281 if not self._file: 282 raise ValueError('I/O operation on closed bag') 283 284 if self._chunk_open: 285 self._stop_writing_chunk()
286
 287      def write(self, topic, msg, t=None, raw=False):
 288          """
 289          Write a message to the bag.
 290          @param topic: name of topic
 291          @type topic: str
 292          @param msg: message to add to bag, or tuple (if raw)
 293          @type msg: Message or tuple of raw message data
 294          @param t: ROS time of message publication, if None specified, use current time [optional]
 295          @type t: U{genpy.Time}
 296          @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
 297          @type raw: bool
 298          @raise ValueError: if arguments are invalid or bag is closed
 299          """
 300          if not self._file:
 301              raise ValueError('I/O operation on closed bag')
 302   
 303          if not topic:
 304              raise ValueError('topic is invalid')
 305          if not msg:
 306              raise ValueError('msg is invalid')
 307   
 308          if t is None:
 309              t = genpy.Time.from_sec(time.time())
 310   
 311          # Seek to end (in case previous operation was a read)
 312          self._file.seek(0, os.SEEK_END)
 313   
 314          # Open a chunk, if needed
 315          if not self._chunk_open:
 316              self._start_writing_chunk(t)
 317   
 318          # Unpack raw
 319          if raw:
 320              if len(msg) == 5:
 321                  msg_type, serialized_bytes, md5sum, pos, pytype = msg
 322              elif len(msg) == 4:
 323                  msg_type, serialized_bytes, md5sum, pytype = msg
 324              else:
 325                  raise ValueError('msg must be of length 4 or 5')
 326   
 327          # Write connection record, if necessary (currently using a connection per topic; ignoring message connection header)
 328          if topic in self._topic_connections:
 329              connection_info = self._topic_connections[topic]
 330              conn_id = connection_info.id
 331          else:
 332              conn_id = len(self._connections)
 333   
 334              if raw:
 335                  if pytype is None:
 336                      try:
 337                          pytype = genpy.message.get_message_class(msg_type)
 338                      except Exception:
 339                          pytype = None
 340                      if pytype is None:
 341                          raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type)
 342   
 343                  if pytype._md5sum != md5sum:
 344                      print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr)
 345                      #raise ROSBagException('md5sum of loaded type does not match that of data being recorded')
 346   
 347                  header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text }
 348              else:
 349                  header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text }
 350   
 351              connection_info = _ConnectionInfo(conn_id, topic, header)
 352   
 353              self._write_connection_record(connection_info)
 354   
 355              self._connections[conn_id] = connection_info
 356              self._topic_connections[topic] = connection_info
 357   
 358          # Create an index entry
 359          index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())
 360   
 361          # Update the indexes and current chunk info
 362          if conn_id not in self._curr_chunk_connection_indexes:
 363              # This is the first message on this connection in the chunk
 364              self._curr_chunk_connection_indexes[conn_id] = [index_entry]
 365              self._curr_chunk_info.connection_counts[conn_id] = 1
 366          else:
 367              curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id]
 368              if index_entry >= curr_chunk_connection_index[-1]:
 369                  # Test if we're writing chronologically. Can skip binary search if so.
 370                  curr_chunk_connection_index.append(index_entry)
 371              else:
 372                  bisect.insort_right(curr_chunk_connection_index, index_entry)
 373   
 374              self._curr_chunk_info.connection_counts[conn_id] += 1
 375   
 376          if conn_id not in self._connection_indexes:
 377              self._connection_indexes[conn_id] = [index_entry]
 378          else:
 379              bisect.insort_right(self._connection_indexes[conn_id], index_entry)
 380   
 381          # Update the chunk start/end times
 382          if t > self._curr_chunk_info.end_time:
 383              self._curr_chunk_info.end_time = t
 384          elif t < self._curr_chunk_info.start_time:
 385              self._curr_chunk_info.start_time = t
 386   
 387          if not raw:
 388              # Serialize the message to the buffer
 389              self._buffer.seek(0)
 390              self._buffer.truncate(0)
 391              msg.serialize(self._buffer)
 392              serialized_bytes = self._buffer.getvalue()
 393   
 394          # Write message data record
 395          self._write_message_data_record(conn_id, t, serialized_bytes)
 396   
 397          # Check if we want to stop this chunk
 398          chunk_size = self._get_chunk_offset()
 399          if chunk_size > self._chunk_threshold:
 400              self._stop_writing_chunk()
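A minimal write sketch, assuming std_msgs is available; the output filename and topic are placeholders:

    from std_msgs.msg import String
    from rosbag import Bag

    with Bag('output.bag', 'w') as bag:
        for i in range(10):
            bag.write('/chatter', String(data='hello %d' % i))  # t defaults to the current time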
401
402 - def reindex(self):
403 """ 404 Reindexes the bag file. Yields position of each chunk for progress. 405 """ 406 self._clear_index() 407 return self._reader.reindex()
408
409 - def close(self):
410 """ 411 Close the bag file. Closing an already closed bag does nothing. 412 """ 413 if self._file: 414 if self._mode in 'wa': 415 self._stop_writing() 416 417 self._close_file()
418
 419      def get_compression_info(self):
 420          """
 421          Returns information about the compression of the bag.
 422          @return: a CompressionTuple(compression, uncompressed, compressed) describing the
 423              type of compression used, size of the uncompressed data in bytes, and size of the compressed data in bytes. If
 424              no compression is used then uncompressed and compressed data will be equal.
 425          @rtype: CompressionTuple of (str, int, int)
 426          """
 427   
 428          compression = self.compression
 429          uncompressed = 0
 430          compressed = 0
 431   
 432          if self._chunk_headers:
 433              compression_counts = {}
 434              compression_uncompressed = {}
 435              compression_compressed = {}
 436   
 437              # the rest of this determines which compression algorithm is dominant and
 438              # adds up the uncompressed and compressed bytes
 439              for chunk_header in self._chunk_headers.values():
 440                  if chunk_header.compression not in compression_counts:
 441                      compression_counts[chunk_header.compression] = 0
 442                  if chunk_header.compression not in compression_uncompressed:
 443                      compression_uncompressed[chunk_header.compression] = 0
 444                  if chunk_header.compression not in compression_compressed:
 445                      compression_compressed[chunk_header.compression] = 0
 446   
 447                  compression_counts[chunk_header.compression] += 1
 448                  compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
 449                  uncompressed += chunk_header.uncompressed_size
 450                  compression_compressed[chunk_header.compression] += chunk_header.compressed_size
 451                  compressed += chunk_header.compressed_size
 452   
 453              chunk_count = len(self._chunk_headers)
 454   
 455              main_compression_count, main_compression = sorted([(v, k) for k, v in compression_counts.items()], reverse=True)[0]
 456              compression = str(main_compression)
 457   
 458          return collections.namedtuple("CompressionTuple", ["compression",
 459              "uncompressed", "compressed"])(compression=compression,
 460              uncompressed=uncompressed, compressed=compressed)
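For instance, the returned tuple can be used to report an overall compression ratio (guarding against an empty bag); the filename is a placeholder:

    from rosbag import Bag

    with Bag('input.bag') as bag:
        info = bag.get_compression_info()
        if info.uncompressed > 0:
            ratio = 100.0 * info.compressed / info.uncompressed
            print('%s: %d -> %d bytes (%.1f%%)' % (info.compression, info.uncompressed, info.compressed, ratio))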
461
 462      def get_message_count(self, topic_filters=None):
 463          """
 464          Returns the number of messages in the bag. Can be filtered by topic.
 465          @param topic_filters: one or more topics to filter by
 466          @type topic_filters: a single str or a list of str
 467          @return: the number of messages in the bag, optionally filtered by topic
 468          @rtype: int
 469          """
 470   
 471          num_messages = 0
 472   
 473          if topic_filters is not None:
 474              info = self.get_type_and_topic_info(topic_filters=topic_filters)
 475              for topic in info.topics.values():
 476                  num_messages += topic.message_count
 477          else:
 478              if self._chunks:
 479                  for c in self._chunks:
 480                      for counts in c.connection_counts.values():
 481                          num_messages += counts
 482              else:
 483                  num_messages = sum([len(index) for index in self._connection_indexes.values()])
 484   
 485          return num_messages
486
487 - def get_start_time(self):
488 """ 489 Returns the start time of the bag. 490 @return: a timestamp of the start of the bag 491 @rtype: float, timestamp in seconds, includes fractions of a second 492 """ 493 494 if self._chunks: 495 start_stamp = self._chunks[0].start_time.to_sec() 496 else: 497 if not self._connection_indexes: 498 raise ROSBagException('Bag contains no message') 499 start_stamp = min([index[0].time.to_sec() for index in self._connection_indexes.values()]) 500 501 return start_stamp
502
503 - def get_end_time(self):
504 """ 505 Returns the end time of the bag. 506 @return: a timestamp of the end of the bag 507 @rtype: float, timestamp in seconds, includes fractions of a second 508 """ 509 510 if self._chunks: 511 end_stamp = self._chunks[-1].end_time.to_sec() 512 else: 513 if not self._connection_indexes: 514 raise ROSBagException('Bag contains no message') 515 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.values()]) 516 517 return end_stamp
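Together with get_message_count(), the start and end times give a quick summary of an indexed bag; the filename is a placeholder:

    from rosbag import Bag

    with Bag('input.bag') as bag:
        duration = bag.get_end_time() - bag.get_start_time()
        count = bag.get_message_count()
        print('%d messages over %.1f s' % (count, duration))
        if duration > 0:
            print('average rate: %.2f msg/s' % (count / duration))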
518
 519      def get_type_and_topic_info(self, topic_filters=None):
 520          """
 521          Collates info about the types and topics in the bag.
 522   
 523          Note, it would be nice to filter by type as well, but there appear to be some limitations in the current architecture
 524          that prevent that from working when more than one message type is written on the same topic.
 525   
 526          @param topic_filters: specify one or more topics to filter by.
 527          @type topic_filters: either a single str or a list of str.
 528          @return: generator of TypesAndTopicsTuple(types{key:type name, val: md5hash},
 529              topics{type: msg type (Ex. "std_msgs/String"),
 530              message_count: the number of messages of the particular type,
 531              connections: the number of connections,
 532              frequency: the data frequency,
 533              key: type name,
 534              val: md5hash}) describing the types of messages present
 535              and information about the topics
 536          @rtype: TypesAndTopicsTuple(dict(str, str),
 537              TopicTuple(str, int, int, float, str, str))
 538          """
 539   
 540          datatypes = set()
 541          datatype_infos = []
 542   
 543          for connection in self._connections.values():
 544              if connection.datatype in datatypes:
 545                  continue
 546   
 547              datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
 548              datatypes.add(connection.datatype)
 549   
 550          topics = []
 551          # load our list of topics and optionally filter
 552          if topic_filters is not None:
 553              if not isinstance(topic_filters, list):
 554                  topic_filters = [topic_filters]
 555   
 556              topics = topic_filters
 557          else:
 558              topics = [c.topic for c in self._get_connections()]
 559   
 560          topics = sorted(set(topics))
 561   
 562          topic_datatypes = {}
 563          topic_conn_counts = {}
 564          topic_msg_counts = {}
 565          topic_freqs_median = {}
 566   
 567          for topic in topics:
 568              connections = list(self._get_connections(topic))
 569   
 570              if not connections:
 571                  continue
 572   
 573              topic_datatypes[topic] = connections[0].datatype
 574              topic_conn_counts[topic] = len(connections)
 575   
 576              msg_count = 0
 577              for connection in connections:
 578                  for chunk in self._chunks:
 579                      msg_count += chunk.connection_counts.get(connection.id, 0)
 580   
 581              topic_msg_counts[topic] = msg_count
 582   
 583              if self._connection_indexes_read:
 584                  stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
 585                  if len(stamps) > 1:
 586                      periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
 587                      med_period = _median(periods)
 588                      if med_period > 0.0:
 589                          topic_freqs_median[topic] = 1.0 / med_period
 590   
 591          # process datatypes
 592          types = {}
 593          for datatype, md5sum, msg_def in sorted(datatype_infos):
 594              types[datatype] = md5sum
 595   
 596          # process topics
 597          topics_t = {}
 598          TopicTuple = collections.namedtuple("TopicTuple", ["msg_type", "message_count", "connections", "frequency"])
 599          for topic in sorted(topic_datatypes.keys()):
 600              topic_msg_count = topic_msg_counts[topic]
 601              frequency = topic_freqs_median[topic] if topic in topic_freqs_median else None
 602              topics_t[topic] = TopicTuple(msg_type=topic_datatypes[topic],
 603                                           message_count=topic_msg_count,
 604                                           connections=topic_conn_counts[topic],
 605                                           frequency=frequency)
 606   
 607          return collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])(msg_types=types, topics=topics_t)
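A sketch of walking the returned structure; the filename is a placeholder:

    from rosbag import Bag

    with Bag('input.bag') as bag:
        info = bag.get_type_and_topic_info()
        for datatype, md5sum in info.msg_types.items():
            print('%s [%s]' % (datatype, md5sum))
        for topic, details in info.topics.items():
            print('%s: %d msgs of %s on %d connection(s)' % (
                topic, details.message_count, details.msg_type, details.connections))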
608
609 - def __str__(self):
610 rows = [] 611 612 try: 613 if self._filename: 614 rows.append(('path', self._filename)) 615 616 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 617 rows.append(('version', '1.2 (unindexed)')) 618 else: 619 rows.append(('version', '%d.%d' % (int(self._version / 100), self._version % 100))) 620 621 if not self._connection_indexes and not self._chunks: 622 rows.append(('size', _human_readable_size(self.size))) 623 else: 624 if self._chunks: 625 start_stamp = self._chunks[ 0].start_time.to_sec() 626 end_stamp = self._chunks[-1].end_time.to_sec() 627 else: 628 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.values()]) 629 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.values()]) 630 631 # Show duration 632 duration = end_stamp - start_stamp 633 dur_secs = duration % 60 634 dur_mins = int(duration / 60) 635 dur_hrs = int(dur_mins / 60) 636 if dur_hrs > 0: 637 dur_mins = dur_mins % 60 638 duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration) 639 elif dur_mins > 0: 640 duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration) 641 else: 642 duration_str = '%.1fs' % duration 643 644 rows.append(('duration', duration_str)) 645 646 # Show start and end times 647 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp))) 648 rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp))) 649 650 rows.append(('size', _human_readable_size(self.size))) 651 652 if self._chunks: 653 num_messages = 0 654 for c in self._chunks: 655 for counts in c.connection_counts.values(): 656 num_messages += counts 657 else: 658 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 659 rows.append(('messages', str(num_messages))) 660 661 # Show compression information 662 if len(self._chunk_headers) == 0: 663 rows.append(('compression', 'none')) 664 else: 665 compression_counts = {} 666 compression_uncompressed = {} 667 compression_compressed = {} 668 for chunk_header in self._chunk_headers.values(): 669 if chunk_header.compression not in compression_counts: 670 compression_counts[chunk_header.compression] = 1 671 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 672 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 673 else: 674 compression_counts[chunk_header.compression] += 1 675 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 676 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 677 678 chunk_count = len(self._chunk_headers) 679 680 compressions = [] 681 for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])): 682 if compression != Compression.NONE: 683 fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression] 684 compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction)) 685 else: 686 compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count)) 687 rows.append(('compression', ', '.join(compressions))) 688 689 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 690 if not all_uncompressed: 691 total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.values())) 692 total_compressed_size = sum((h.compressed_size for h in self._chunk_headers.values())) 693 694 total_uncompressed_size_str = 
_human_readable_size(total_uncompressed_size) 695 total_compressed_size_str = _human_readable_size(total_compressed_size) 696 total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)]) 697 698 if duration > 0: 699 uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration) 700 compressed_rate_str = _human_readable_size(total_compressed_size / duration) 701 rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)]) 702 703 rows.append(('uncompressed', '%*s @ %*s/s' % 704 (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str))) 705 rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' % 706 (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size))) 707 else: 708 rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str))) 709 rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str))) 710 711 datatypes = set() 712 datatype_infos = [] 713 for connection in self._connections.values(): 714 if connection.datatype in datatypes: 715 continue 716 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 717 datatypes.add(connection.datatype) 718 719 topics = sorted(set([c.topic for c in self._get_connections()])) 720 topic_datatypes = {} 721 topic_conn_counts = {} 722 topic_msg_counts = {} 723 topic_freqs_median = {} 724 for topic in topics: 725 connections = list(self._get_connections(topic)) 726 727 topic_datatypes[topic] = connections[0].datatype 728 topic_conn_counts[topic] = len(connections) 729 730 msg_count = 0 731 for connection in connections: 732 for chunk in self._chunks: 733 msg_count += chunk.connection_counts.get(connection.id, 0) 734 topic_msg_counts[topic] = msg_count 735 736 if self._connection_indexes_read: 737 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 738 if len(stamps) > 1: 739 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 740 med_period = _median(periods) 741 if med_period > 0.0: 742 topic_freqs_median[topic] = 1.0 / med_period 743 744 topics = sorted(topic_datatypes.keys()) 745 max_topic_len = max([len(topic) for topic in topics]) 746 max_datatype_len = max([len(datatype) for datatype in datatypes]) 747 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()]) 748 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0 749 750 # Show datatypes 751 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 752 s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum) 753 if i == 0: 754 rows.append(('types', s)) 755 else: 756 rows.append(('', s)) 757 758 # Show topics 759 for i, topic in enumerate(topics): 760 topic_msg_count = topic_msg_counts[topic] 761 762 s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ') 763 if topic in topic_freqs_median: 764 s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic])) 765 else: 766 s += ' %*s' % (max_freq_median_len, '') 767 768 s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic]) 769 if topic_conn_counts[topic] > 1: 770 s += ' (%d connections)' % topic_conn_counts[topic] 771 772 if i == 0: 773 rows.append(('topics', s)) 774 else: 775 rows.append(('', s)) 776 777 except 
Exception as ex: 778 raise 779 780 first_column_width = max([len(field) for field, _ in rows]) + 1 781 782 s = '' 783 for (field, value) in rows: 784 if field: 785 s += '%-*s %s\n' % (first_column_width, field + ':', value) 786 else: 787 s += '%-*s %s\n' % (first_column_width, '', value) 788 789 return s.rstrip()
790
791 - def _get_yaml_info(self, key=None):
792 s = '' 793 794 try: 795 if self._filename: 796 s += 'path: %s\n' % self._filename 797 798 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 799 s += 'version: 1.2 (unindexed)\n' 800 else: 801 s += 'version: %d.%d\n' % (int(self._version / 100), self._version % 100) 802 803 if not self._connection_indexes and not self._chunks: 804 s += 'size: %d\n' % self.size 805 s += 'indexed: False\n' 806 else: 807 if self._chunks: 808 start_stamp = self._chunks[ 0].start_time.to_sec() 809 end_stamp = self._chunks[-1].end_time.to_sec() 810 else: 811 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.values()]) 812 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.values()]) 813 814 duration = end_stamp - start_stamp 815 s += 'duration: %.6f\n' % duration 816 s += 'start: %.6f\n' % start_stamp 817 s += 'end: %.6f\n' % end_stamp 818 s += 'size: %d\n' % self.size 819 if self._chunks: 820 num_messages = 0 821 for c in self._chunks: 822 for counts in c.connection_counts.values(): 823 num_messages += counts 824 else: 825 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 826 s += 'messages: %d\n' % num_messages 827 s += 'indexed: True\n' 828 829 # Show compression information 830 if len(self._chunk_headers) == 0: 831 s += 'compression: none\n' 832 else: 833 compression_counts = {} 834 compression_uncompressed = {} 835 compression_compressed = {} 836 for chunk_header in self._chunk_headers.values(): 837 if chunk_header.compression not in compression_counts: 838 compression_counts[chunk_header.compression] = 1 839 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 840 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 841 else: 842 compression_counts[chunk_header.compression] += 1 843 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 844 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 845 846 chunk_count = len(self._chunk_headers) 847 848 main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0] 849 s += 'compression: %s\n' % str(main_compression) 850 851 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 852 if not all_uncompressed: 853 s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.values())) 854 s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.values())) 855 856 datatypes = set() 857 datatype_infos = [] 858 for connection in self._connections.values(): 859 if connection.datatype in datatypes: 860 continue 861 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 862 datatypes.add(connection.datatype) 863 864 topics = sorted(set([c.topic for c in self._get_connections()])) 865 topic_datatypes = {} 866 topic_conn_counts = {} 867 topic_msg_counts = {} 868 topic_freqs_median = {} 869 for topic in topics: 870 connections = list(self._get_connections(topic)) 871 872 topic_datatypes[topic] = connections[0].datatype 873 topic_conn_counts[topic] = len(connections) 874 875 msg_count = 0 876 for connection in connections: 877 for chunk in self._chunks: 878 msg_count += chunk.connection_counts.get(connection.id, 0) 879 topic_msg_counts[topic] = msg_count 880 881 if self._connection_indexes_read: 882 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 
883 if len(stamps) > 1: 884 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 885 med_period = _median(periods) 886 if med_period > 0.0: 887 topic_freqs_median[topic] = 1.0 / med_period 888 889 topics = sorted(topic_datatypes.keys()) 890 max_topic_len = max([len(topic) for topic in topics]) 891 max_datatype_len = max([len(datatype) for datatype in datatypes]) 892 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()]) 893 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0 894 895 # Show datatypes 896 s += 'types:\n' 897 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 898 s += ' - type: %s\n' % datatype 899 s += ' md5: %s\n' % md5sum 900 901 # Show topics 902 s += 'topics:\n' 903 for i, topic in enumerate(topics): 904 topic_msg_count = topic_msg_counts[topic] 905 906 s += ' - topic: %s\n' % topic 907 s += ' type: %s\n' % topic_datatypes[topic] 908 s += ' messages: %d\n' % topic_msg_count 909 910 if topic_conn_counts[topic] > 1: 911 s += ' connections: %d\n' % topic_conn_counts[topic] 912 913 if topic in topic_freqs_median: 914 s += ' frequency: %.4f\n' % topic_freqs_median[topic] 915 916 if not key: 917 return s 918 919 class DictObject(object): 920 def __init__(self, d): 921 for a, b in d.items(): 922 if isinstance(b, (list, tuple)): 923 setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b]) 924 else: 925 setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
926 927 obj = DictObject(yaml.load(s)) 928 try: 929 val = eval('obj.' + key) 930 except Exception as ex: 931 print('Error getting key "%s"' % key, file=sys.stderr) 932 return None 933 934 def print_yaml(val, indent=0): 935 indent_str = ' ' * indent 936 937 if type(val) is list: 938 s = '' 939 for item in val: 940 s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1)) 941 return s 942 elif type(val) is DictObject: 943 s = '' 944 for i, (k, v) in enumerate(val.__dict__.items()): 945 if i != 0: 946 s += indent_str 947 s += '%s: %s' % (k, str(v)) 948 if i < len(val.__dict__) - 1: 949 s += '\n' 950 return s 951 else: 952 return indent_str + str(val) 953 954 return print_yaml(val) 955 956 except Exception as ex: 957 raise 958 959 ### Internal API ### 960 961 @property
962 - def _has_compressed_chunks(self):
963 if not self._chunk_headers: 964 return False 965 966 return any((h.compression != Compression.NONE for h in self._chunk_headers.values()))
967 968 @property
969 - def _uncompressed_size(self):
970 if not self._chunk_headers: 971 return self.size 972 973 return sum((h.uncompressed_size for h in self._chunk_headers.values()))
974
975 - def _read_message(self, position, raw=False):
976 """ 977 Read the message from the given position in the file. 978 """ 979 self.flush() 980 return self._reader.seek_and_read_message_data_record(position, raw)
981 982 # Index accessing 983
984 - def _get_connections(self, topics=None, connection_filter=None):
985 """ 986 Yield the connections, optionally filtering by topic and/or connection information. 987 """ 988 if topics: 989 if type(topics) is str: 990 topics = set([roslib.names.canonicalize_name(topics)]) 991 else: 992 topics = set([roslib.names.canonicalize_name(t) for t in topics]) 993 994 for c in self._connections.values(): 995 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics: 996 continue 997 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header): 998 continue 999 yield c
1000
1001 - def _get_entries(self, connections=None, start_time=None, end_time=None):
1002 """ 1003 Yield index entries on the given connections in the given time range. 1004 """ 1005 for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time): 1006 if start_time and entry.time < start_time: 1007 continue 1008 if end_time and entry.time > end_time: 1009 return 1010 yield entry
1011
1012 - def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
1013 """ 1014 Yield index entries on the given connections in the given time range in reverse order. 1015 """ 1016 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()): 1017 if end_time and entry.time > end_time: 1018 continue 1019 if start_time and entry.time < start_time: 1020 return 1021 yield entry
1022
1023 - def _get_entry(self, t, connections=None):
1024 """ 1025 Return the first index entry on/before the given time on the given connections 1026 """ 1027 indexes = self._get_indexes(connections) 1028 1029 entry = _IndexEntry(t) 1030 1031 first_entry = None 1032 1033 for index in indexes: 1034 i = bisect.bisect_right(index, entry) - 1 1035 if i >= 0: 1036 index_entry = index[i] 1037 if first_entry is None or index_entry > first_entry: 1038 first_entry = index_entry 1039 1040 return first_entry
1041
1042 - def _get_entry_after(self, t, connections=None):
1043 """ 1044 Return the first index entry after the given time on the given connections 1045 """ 1046 indexes = self._get_indexes(connections) 1047 1048 entry = _IndexEntry(t) 1049 1050 first_entry = None 1051 1052 for index in indexes: 1053 i = bisect.bisect_right(index, entry) 1054 if i <= len(index) - 1: 1055 index_entry = index[i] 1056 if first_entry is None or index_entry < first_entry: 1057 first_entry = index_entry 1058 1059 return first_entry
1060
1061 - def _get_indexes(self, connections):
1062 """ 1063 Get the indexes for the given connections. 1064 """ 1065 if not self._connection_indexes_read: 1066 self._reader._read_connection_index_records() 1067 1068 if connections is None: 1069 return self._connection_indexes.values() 1070 else: 1071 return (self._connection_indexes[c.id] for c in connections)
1072 1073 ### Implementation ### 1074
1075 - def _clear_index(self):
1076 self._connection_indexes_read = False 1077 self._connection_indexes = {} # id -> IndexEntry[] (1.2+) 1078 1079 self._topic_connections = {} # topic -> connection_id 1080 self._connections = {} # id -> ConnectionInfo 1081 self._connection_count = 0 # (2.0) 1082 self._chunk_count = 0 # (2.0) 1083 self._chunks = [] # ChunkInfo[] (2.0) 1084 self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0) 1085 1086 self._chunk_open = False 1087 self._curr_chunk_info = None 1088 self._curr_chunk_data_pos = None 1089 self._curr_chunk_connection_indexes = {}
1090
1091 - def _open(self, f, mode, allow_unindexed):
1092 if not f: 1093 raise ValueError('filename (or stream) is invalid') 1094 1095 try: 1096 if mode == 'r': self._open_read(f, allow_unindexed) 1097 elif mode == 'w': self._open_write(f) 1098 elif mode == 'a': self._open_append(f, allow_unindexed) 1099 else: 1100 raise ValueError('mode "%s" is invalid' % mode) 1101 if 'b' not in self._file.mode and not isinstance('', bytes): 1102 raise ROSBagException('In Python 3 the bag file must be opened in binary mode') 1103 except struct.error: 1104 raise ROSBagFormatException('error with bag')
1105
1106 - def _is_file(self, f):
1107 try: 1108 return isinstance(f, file) # Python 2 1109 except NameError: 1110 import io 1111 return isinstance(f, io.IOBase) # Python 3...this will return false in Python 2 always
1112
1113 - def _open_read(self, f, allow_unindexed):
1114 if self._is_file(f): 1115 self._file = f 1116 self._filename = None 1117 else: 1118 self._file = open(f, 'rb') 1119 self._filename = f 1120 1121 self._mode = 'r' 1122 1123 try: 1124 self._version = self._read_version() 1125 except: 1126 self._version = None 1127 self._close_file() 1128 raise 1129 1130 try: 1131 self._create_reader() 1132 self._reader.start_reading() 1133 except ROSBagUnindexedException as ex: 1134 if not allow_unindexed: 1135 self._close_file() 1136 raise 1137 except: 1138 self._close_file() 1139 raise
1140
1141 - def _open_write(self, f):
1142 if self._is_file(f): 1143 self._file = f 1144 self._filename = None 1145 else: 1146 self._file = open(f, 'w+b') 1147 self._filename = f 1148 1149 self._mode = 'w' 1150 1151 self._version = 200 1152 1153 try: 1154 self._create_reader() 1155 self._start_writing() 1156 except: 1157 self._close_file() 1158 raise
1159
1160 - def _open_append(self, f, allow_unindexed):
1161 if self._is_file(f): 1162 self._file = f 1163 self._filename = None 1164 else: 1165 try: 1166 # Test if the file already exists 1167 open(f, 'r').close() 1168 1169 # File exists: open in read with update mode 1170 self._file = open(f, 'r+b') 1171 except IOError: 1172 # File doesn't exist: open in write mode 1173 self._file = open(f, 'w+b') 1174 1175 self._filename = f 1176 1177 self._mode = 'a' 1178 1179 try: 1180 self._version = self._read_version() 1181 except: 1182 self._version = None 1183 self._close_file() 1184 raise 1185 1186 if self._version != 200: 1187 self._file.close() 1188 raise ROSBagException('bag version %d is unsupported for appending' % self._version) 1189 1190 try: 1191 self._start_appending() 1192 except ROSBagUnindexedException: 1193 if not allow_unindexed: 1194 self._close_file() 1195 raise 1196 except: 1197 self._close_file() 1198 raise
1199
1200 - def _close_file(self):
1201 self._file.close() 1202 self._file = None
1203
1204 - def _create_reader(self):
1205 """ 1206 @raise ROSBagException: if the bag version is unsupported 1207 """ 1208 major_version, minor_version = int(self._version / 100), self._version % 100 1209 if major_version == 2: 1210 self._reader = _BagReader200(self) 1211 elif major_version == 1: 1212 if minor_version == 1: 1213 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version) 1214 else: 1215 # Get the op code of the first record. If it's FILE_HEADER, then we have an indexed 1.2 bag. 1216 first_record_pos = self._file.tell() 1217 header = _read_header(self._file) 1218 op = _read_uint8_field(header, 'op') 1219 self._file.seek(first_record_pos) 1220 1221 if op == _OP_FILE_HEADER: 1222 self._reader = _BagReader102_Indexed(self) 1223 else: 1224 self._reader = _BagReader102_Unindexed(self) 1225 else: 1226 raise ROSBagException('unknown bag version %d' % self._version)
1227
1228 - def _read_version(self):
1229 """ 1230 @raise ROSBagException: if the file is empty, or the version line can't be parsed 1231 """ 1232 version_line = self._file.readline().rstrip().decode() 1233 if len(version_line) == 0: 1234 raise ROSBagException('empty file') 1235 1236 matches = re.match("#ROS(.*) V(\d).(\d)", version_line) 1237 if matches is None or len(matches.groups()) != 3: 1238 raise ROSBagException('This does not appear to be a bag file') 1239 1240 version_type, major_version_str, minor_version_str = matches.groups() 1241 1242 version = int(major_version_str) * 100 + int(minor_version_str) 1243 1244 return version
1245
1246 - def _start_writing(self):
1247 version = _VERSION + '\n' 1248 version = version.encode() 1249 self._file.write(version) 1250 self._file_header_pos = self._file.tell() 1251 self._write_file_header_record(0, 0, 0) 1252
1253 - def _stop_writing(self):
1254 # Write the open chunk (if any) to file 1255 self.flush() 1256 1257 # Remember this location as the start of the index 1258 self._index_data_pos = self._file.tell() 1259 1260 # Write connection infos 1261 for connection_info in self._connections.values(): 1262 self._write_connection_record(connection_info) 1263 1264 # Write chunk infos 1265 for chunk_info in self._chunks: 1266 self._write_chunk_info_record(chunk_info) 1267 1268 # Re-write the file header 1269 self._file.seek(self._file_header_pos) 1270 self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
1271
1272 - def _start_appending(self):
1273 self._file_header_pos = self._file.tell() 1274 1275 self._create_reader() 1276 self._reader.start_reading() 1277 1278 # Truncate the file to chop off the index 1279 self._file.truncate(self._index_data_pos) 1280 self._reader.index_data_pos = 0 1281 1282 # Rewrite the file header, clearing the index position (so we know if the index is invalid) 1283 self._file.seek(self._file_header_pos); 1284 self._write_file_header_record(0, 0, 0) 1285 1286 # Seek to the end of the file 1287 self._file.seek(0, os.SEEK_END)
1288
1289 - def _start_writing_chunk(self, t):
1290 self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t) 1291 self._write_chunk_header(_ChunkHeader(self._compression, 0, 0)) 1292 self._curr_chunk_data_pos = self._file.tell() 1293 self._set_compression_mode(self._compression) 1294 self._chunk_open = True
1295
1296 - def _get_chunk_offset(self):
1297 if self._compression == Compression.NONE: 1298 return self._file.tell() - self._curr_chunk_data_pos 1299 else: 1300 return self._output_file.compressed_bytes_in
1301
1302 - def _stop_writing_chunk(self):
1303 # Add this chunk to the index 1304 self._chunks.append(self._curr_chunk_info) 1305 1306 # Get the uncompressed and compressed sizes 1307 uncompressed_size = self._get_chunk_offset() 1308 self._set_compression_mode(Compression.NONE) 1309 compressed_size = self._file.tell() - self._curr_chunk_data_pos 1310 1311 # Rewrite the chunk header with the size of the chunk (remembering current offset) 1312 end_of_chunk_pos = self._file.tell() 1313 self._file.seek(self._curr_chunk_info.pos) 1314 chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos) 1315 self._write_chunk_header(chunk_header) 1316 self._chunk_headers[self._curr_chunk_info.pos] = chunk_header 1317 1318 # Write out the connection indexes and clear them 1319 self._file.seek(end_of_chunk_pos) 1320 for connection_id, entries in self._curr_chunk_connection_indexes.items(): 1321 self._write_connection_index_record(connection_id, entries) 1322 self._curr_chunk_connection_indexes.clear() 1323 1324 # Flag that we're starting a new chunk 1325 self._chunk_open = False
1326
1327 - def _set_compression_mode(self, compression):
1328 # Flush the compressor, if needed 1329 if self._curr_compression != Compression.NONE: 1330 self._output_file.flush() 1331 1332 # Create the compressor 1333 if compression == Compression.BZ2: 1334 self._output_file = _CompressorFileFacade(self._file, bz2.BZ2Compressor()) 1335 elif compression == Compression.LZ4 and found_lz4: 1336 self._output_file = _CompressorFileFacade(self._file, roslz4.LZ4Compressor()) 1337 elif compression == Compression.NONE: 1338 self._output_file = self._file 1339 else: 1340 raise ROSBagException('unsupported compression type: %s' % compression) 1341 1342 self._curr_compression = compression
1343
1344 - def _write_file_header_record(self, index_pos, connection_count, chunk_count):
1345 header = { 1346 'op': _pack_uint8(_OP_FILE_HEADER), 1347 'index_pos': _pack_uint64(index_pos), 1348 'conn_count': _pack_uint32(connection_count), 1349 'chunk_count': _pack_uint32(chunk_count) 1350 } 1351 _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
1352
1353 - def _write_connection_record(self, connection_info):
1354 header = { 1355 'op': _pack_uint8(_OP_CONNECTION), 1356 'topic': connection_info.topic, 1357 'conn': _pack_uint32(connection_info.id) 1358 } 1359 1360 _write_header(self._output_file, header) 1361 1362 _write_header(self._output_file, connection_info.header)
1363
1364 - def _write_message_data_record(self, connection_id, t, serialized_bytes):
1365 header = { 1366 'op': _pack_uint8(_OP_MSG_DATA), 1367 'conn': _pack_uint32(connection_id), 1368 'time': _pack_time(t) 1369 } 1370 _write_record(self._output_file, header, serialized_bytes)
1371
1372 - def _write_chunk_header(self, chunk_header):
1373 header = { 1374 'op': _pack_uint8(_OP_CHUNK), 1375 'compression': chunk_header.compression, 1376 'size': _pack_uint32(chunk_header.uncompressed_size) 1377 } 1378 _write_header(self._file, header) 1379 1380 self._file.write(_pack_uint32(chunk_header.compressed_size))
1381
1382 - def _write_connection_index_record(self, connection_id, entries):
1383 header = { 1384 'op': _pack_uint8(_OP_INDEX_DATA), 1385 'conn': _pack_uint32(connection_id), 1386 'ver': _pack_uint32(_INDEX_VERSION), 1387 'count': _pack_uint32(len(entries)) 1388 } 1389 1390 buffer = self._buffer 1391 buffer.seek(0) 1392 buffer.truncate(0) 1393 for entry in entries: 1394 buffer.write(_pack_time (entry.time)) 1395 buffer.write(_pack_uint32(entry.offset)) 1396 1397 _write_record(self._file, header, buffer.getvalue()) 1398
1399 - def _write_chunk_info_record(self, chunk_info):
1400 header = { 1401 'op': _pack_uint8 (_OP_CHUNK_INFO), 1402 'ver': _pack_uint32(_CHUNK_INDEX_VERSION), 1403 'chunk_pos': _pack_uint64(chunk_info.pos), 1404 'start_time': _pack_time(chunk_info.start_time), 1405 'end_time': _pack_time(chunk_info.end_time), 1406 'count': _pack_uint32(len(chunk_info.connection_counts)) 1407 } 1408 1409 buffer = self._buffer 1410 buffer.seek(0) 1411 buffer.truncate(0) 1412 for connection_id, count in chunk_info.connection_counts.items(): 1413 buffer.write(_pack_uint32(connection_id)) 1414 buffer.write(_pack_uint32(count)) 1415 1416 _write_record(self._file, header, buffer.getvalue())
1417 1418 ### Implementation ### 1419 1420 _message_types = {} # md5sum -> type 1421 1422 _OP_MSG_DEF = 0x01 1423 _OP_MSG_DATA = 0x02 1424 _OP_FILE_HEADER = 0x03 1425 _OP_INDEX_DATA = 0x04 1426 _OP_CHUNK = 0x05 1427 _OP_CHUNK_INFO = 0x06 1428 _OP_CONNECTION = 0x07 1429 1430 _OP_CODES = { 1431 _OP_MSG_DEF: 'MSG_DEF', 1432 _OP_MSG_DATA: 'MSG_DATA', 1433 _OP_FILE_HEADER: 'FILE_HEADER', 1434 _OP_INDEX_DATA: 'INDEX_DATA', 1435 _OP_CHUNK: 'CHUNK', 1436 _OP_CHUNK_INFO: 'CHUNK_INFO', 1437 _OP_CONNECTION: 'CONNECTION' 1438 } 1439 1440 _VERSION = '#ROSBAG V2.0' 1441 _FILE_HEADER_LENGTH = 4096 1442 _INDEX_VERSION = 1 1443 _CHUNK_INDEX_VERSION = 1
1444 1445 -class _ConnectionInfo(object):
1446 - def __init__(self, id, topic, header):
1447 try: 1448 datatype = _read_str_field(header, 'type') 1449 md5sum = _read_str_field(header, 'md5sum') 1450 msg_def = _read_str_field(header, 'message_definition') 1451 except KeyError as ex: 1452 raise ROSBagFormatException('connection header field %s not found' % str(ex)) 1453 1454 self.id = id 1455 self.topic = topic 1456 self.datatype = datatype 1457 self.md5sum = md5sum 1458 self.msg_def = msg_def 1459 self.header = header
1460
1461 - def __str__(self):
1462 return '%d on %s: %s' % (self.id, self.topic, str(self.header))
1463
1464 -class _ChunkInfo(object):
1465 - def __init__(self, pos, start_time, end_time):
1466 self.pos = pos 1467 self.start_time = start_time 1468 self.end_time = end_time 1469 1470 self.connection_counts = {}
1471
1472 - def __str__(self):
1473 s = 'chunk_pos: %d\n' % self.pos 1474 s += 'start_time: %s\n' % str(self.start_time) 1475 s += 'end_time: %s\n' % str(self.end_time) 1476 s += 'connections: %d\n' % len(self.connection_counts) 1477 s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()]) 1478 return s
1479
1480 -class _ChunkHeader(object):
1481 - def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
1482 self.compression = compression 1483 self.compressed_size = compressed_size 1484 self.uncompressed_size = uncompressed_size 1485 self.data_pos = data_pos
1486
1487 - def __str__(self):
1488 if self.uncompressed_size > 0: 1489 ratio = 100 * (float(self.compressed_size) / self.uncompressed_size) 1490 return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio) 1491 else: 1492 return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
1493
1494 -class ComparableMixin(object):
1495 __slots__ = [] 1496
1497 - def _compare(self, other, method):
1498 try: 1499 return method(self._cmpkey(), other._cmpkey()) 1500 except (AttributeError, TypeError): 1501 # _cmpkey not implemented or return different type 1502 # so can not compare with other 1503 return NotImplemented
1504
1505 - def __lt__(self, other):
1506 return self._compare(other, lambda s, o: s < o)
1507
1508 - def __le__(self, other):
1509 return self._compare(other, lambda s, o: s <= o)
1510
1511 - def __eq__(self, other):
1512 return self._compare(other, lambda s, o: s == o)
1513
1514 - def __ge__(self, other):
1515 return self._compare(other, lambda s, o: s >= o)
1516
1517 - def __gt__(self, other):
1518 return self._compare(other, lambda s, o: s > o)
1519
1520 - def __ne__(self, other):
1521 return self._compare(other, lambda s, o: s != o)
1522
1523 -class _IndexEntry(ComparableMixin):
1524 __slots__ = ['time'] 1525
1526 - def __init__(self, time):
1527 self.time = time
1528
1529 - def _cmpkey(self):
1530 return self.time
1531
1532 -class _IndexEntry102(_IndexEntry):
1533 __slots__ = ['offset'] 1534
1535 - def __init__(self, time, offset):
1536 self.time = time 1537 self.offset = offset
1538 1539 @property
1540 - def position(self):
1541 return self.offset
1542
1543 - def __str__(self):
1544 return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)
1545
1546 -class _IndexEntry200(_IndexEntry):
1547 __slots__ = ['chunk_pos', 'offset'] 1548
1549 - def __init__(self, time, chunk_pos, offset):
1550 self.time = time 1551 self.chunk_pos = chunk_pos 1552 self.offset = offset
1553 1554 @property
1555 - def position(self):
1556 return (self.chunk_pos, self.offset)
1557
1558 - def __str__(self):
1559 return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
1560
1561 -def _get_message_type(info):
1562 message_type = _message_types.get(info.md5sum) 1563 if message_type is None: 1564 try: 1565 message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype] 1566 if (message_type._md5sum != info.md5sum): 1567 print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr) 1568 except genmsg.InvalidMsgSpec: 1569 message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype] 1570 print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition."'%(info.datatype, info.md5sum), file=sys.stderr) 1571 except genmsg.MsgGenerationException as ex: 1572 raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex))) 1573 1574 _message_types[info.md5sum] = message_type 1575 1576 return message_type
1577
1578 -def _read_uint8 (f): return _unpack_uint8 (f.read(1))
1579 -def _read_uint32(f): return _unpack_uint32(f.read(4))
1580 -def _read_uint64(f): return _unpack_uint64(f.read(8))
1581 -def _read_time (f): return _unpack_time (f.read(8))
1582
1583 -def _unpack_uint8(v): return struct.unpack('<B', v)[0]
1584 -def _unpack_uint32(v): return struct.unpack('<L', v)[0]
1585 -def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
1586 -def _unpack_time(v): return rospy.Time(*struct.unpack('<LL', v))
1587
1588 -def _pack_uint8(v): return struct.pack('<B', v)
1589 -def _pack_uint32(v): return struct.pack('<L', v)
1590 -def _pack_uint64(v): return struct.pack('<Q', v)
1591 -def _pack_time(v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
1592
1593 -def _read(f, size):
1594 data = f.read(size) 1595 if len(data) != size: 1596 raise ROSBagException('expecting %d bytes, read %d' % (size, len(data))) 1597 return data
1598
1599 -def _skip_record(f):
1600 _skip_sized(f) # skip header 1601 _skip_sized(f) # skip data
1602
1603 -def _skip_sized(f):
1604 size = _read_uint32(f) 1605 f.seek(size, os.SEEK_CUR)
1606
1607 -def _read_sized(f):
1608 try: 1609 size = _read_uint32(f) 1610 except struct.error as ex: 1611 raise ROSBagFormatException('error unpacking uint32: %s' % str(ex)) 1612 return _read(f, size)
1613
1614 -def _write_sized(f, v):
1615 if not isinstance(v, bytes): 1616 v = v.encode() 1617 f.write(_pack_uint32(len(v))) 1618 f.write(v)
1619
1620 -def _read_field(header, field, unpack_fn):
1621 if field not in header: 1622 raise ROSBagFormatException('expected "%s" field in record' % field) 1623 1624 try: 1625 value = unpack_fn(header[field]) 1626 except Exception as ex: 1627 raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex))) 1628 1629 return value
1630
1631 -def _read_str_field (header, field):
1632 return _read_field(header, field, lambda v: v)
1633 -def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
1634 -def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
1635 -def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
1636 -def _read_time_field (header, field): return _read_field(header, field, _unpack_time)
1637
1638 -def _write_record(f, header, data='', padded_size=None):
1639 header_str = _write_header(f, header) 1640 1641 if padded_size is not None: 1642 header_len = len(header_str) 1643 if header_len < padded_size: 1644 data = ' ' * (padded_size - header_len) 1645 else: 1646 data = '' 1647 1648 _write_sized(f, data)
1649
1650 -def _write_header(f, header):
1651 header_str = b'' 1652 equal = b'=' 1653 for k, v in header.items(): 1654 if not isinstance(k, bytes): 1655 k = k.encode() 1656 if not isinstance(v, bytes): 1657 v = v.encode() 1658 header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v 1659 _write_sized(f, header_str) 1660 return header_str
1661
1662 -def _read_header(f, req_op=None):
1663 bag_pos = f.tell() 1664 1665 # Read header 1666 try: 1667 header = _read_sized(f) 1668 except ROSBagException as ex: 1669 raise ROSBagFormatException('Error reading header: %s' % str(ex)) 1670 1671 # Parse header into a dict 1672 header_dict = {} 1673 while header != b'': 1674 # Read size 1675 if len(header) < 4: 1676 raise ROSBagFormatException('Error reading header field') 1677 (size,) = struct.unpack('<L', header[:4]) # @todo reindex: catch struct.error 1678 header = header[4:] 1679 1680 # Read bytes 1681 if len(header) < size: 1682 raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header))) 1683 (name, sep, value) = header[:size].partition(b'=') 1684 if sep == b'': 1685 raise ROSBagFormatException('Error reading header field') 1686 1687 name = name.decode() 1688 header_dict[name] = value # @todo reindex: raise exception on empty name 1689 1690 header = header[size:] 1691 1692 # Check the op code of the header, if supplied 1693 if req_op is not None: 1694 op = _read_uint8_field(header_dict, 'op') 1695 if req_op != op: 1696 raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op])) 1697 1698 return header_dict
1699
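# Record headers are a flat sequence of length-prefixed 'name=value' fields.
# Minimal round trip through the helpers above (sketch; the field names and
# values here are arbitrary):
#
#     from io import BytesIO
#     buf = BytesIO()
#     _write_header(buf, {'op': _pack_uint8(0x03), 'chunk_count': _pack_uint32(7)})
#     buf.seek(0)
#     fields = _read_header(buf)
#     _read_uint8_field(fields, 'op')            # -> 3
#     _read_uint32_field(fields, 'chunk_count')  # -> 7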
1700 -def _peek_next_header_op(f):
1701 pos = f.tell() 1702 header = _read_header(f) 1703 op = _read_uint8_field(header, 'op') 1704 f.seek(pos) 1705 return op
1706
1707 -def _read_record_data(f):
1708 try: 1709 record_data = _read_sized(f) 1710 except ROSBagException as ex: 1711 raise ROSBagFormatException('Error reading record data: %s' % str(ex)) 1712 1713 return record_data
1714
1715 -class _BagReader(object):
1716 - def __init__(self, bag):
1717 self.bag = bag
1718
1719 - def start_reading(self):
1720 raise NotImplementedError()
1721
1722 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1723 raise NotImplementedError()
1724
1725 - def reindex(self):
1726 raise NotImplementedError()
1727
1728 -class _BagReader102_Unindexed(_BagReader):
1729 """ 1730 Support class for reading unindexed v1.2 bag files. 1731 """
1732 - def __init__(self, bag):
1733 _BagReader.__init__(self, bag)
1734
1735 - def start_reading(self):
1736 self.bag._file_header_pos = self.bag._file.tell()
1737
1738 - def reindex(self):
1739 """Generates all bag index information by rereading the message records.""" 1740 f = self.bag._file 1741 1742 total_bytes = self.bag.size 1743 1744 # Re-read the file header to get to the start of the first message 1745 self.bag._file.seek(self.bag._file_header_pos) 1746 1747 offset = f.tell() 1748 1749 # Read message definition and data records 1750 while offset < total_bytes: 1751 yield offset 1752 1753 op = _peek_next_header_op(f) 1754 1755 if op == _OP_MSG_DEF: 1756 connection_info = self.read_message_definition_record() 1757 1758 if connection_info.topic not in self.bag._topic_connections: 1759 self.bag._topic_connections[connection_info.topic] = connection_info.id 1760 self.bag._connections[connection_info.id] = connection_info 1761 self.bag._connection_indexes[connection_info.id] = [] 1762 1763 elif op == _OP_MSG_DATA: 1764 # Read the topic and timestamp from the header 1765 header = _read_header(f) 1766 1767 topic = _read_str_field(header, 'topic') 1768 secs = _read_uint32_field(header, 'sec') 1769 nsecs = _read_uint32_field(header, 'nsec') 1770 t = genpy.Time(secs, nsecs) 1771 1772 if topic not in self.bag._topic_connections: 1773 datatype = _read_str_field(header, 'type') 1774 self._create_connection_info_for_datatype(topic, datatype) 1775 1776 connection_id = self.bag._topic_connections[topic] 1777 info = self.bag._connections[connection_id] 1778 1779 # Skip over the message content 1780 _skip_sized(f) 1781 1782 # Insert the message entry (in order) into the connection index 1783 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1784 1785 offset = f.tell()
1786
1787 - def read_messages(self, topics, start_time, end_time, topic_filter, raw):
1788 f = self.bag._file 1789 1790 f.seek(self.bag._file_header_pos) 1791 1792 while True: 1793 # Read MSG_DEF records 1794 while True: 1795 position = f.tell() 1796 1797 try: 1798 header = _read_header(f) 1799 except Exception: 1800 return 1801 1802 op = _read_uint8_field(header, 'op') 1803 if op != _OP_MSG_DEF: 1804 break 1805 1806 connection_info = self.read_message_definition_record(header) 1807 1808 if connection_info.topic not in self.bag._topic_connections: 1809 self.bag._topic_connections[connection_info.topic] = connection_info.id 1810 1811 self.bag._connections[connection_info.id] = connection_info 1812 1813 # Check that we have a MSG_DATA record 1814 if op != _OP_MSG_DATA: 1815 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 1816 1817 topic = _read_str_field(header, 'topic') 1818 1819 if topic not in self.bag._topic_connections: 1820 datatype = _read_str_field(header, 'type') 1821 self._create_connection_info_for_datatype(topic, datatype) 1822 1823 connection_id = self.bag._topic_connections[topic] 1824 info = self.bag._connections[connection_id] 1825 1826 # Get the message type 1827 try: 1828 msg_type = _get_message_type(info) 1829 except KeyError: 1830 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bagfile by definition' % info.datatype) 1831 1832 # Get the timestamp 1833 secs = _read_uint32_field(header, 'sec') 1834 nsecs = _read_uint32_field(header, 'nsec') 1835 t = genpy.Time(secs, nsecs) 1836 1837 # Read the message content 1838 data = _read_record_data(f) 1839 1840 if raw: 1841 msg = (info.datatype, data, info.md5sum, position, msg_type) 1842 else: 1843 # Deserialize the message 1844 msg = msg_type() 1845 msg.deserialize(data) 1846 1847 yield BagMessage(topic, msg, t) 1848 1849 self.bag._connection_indexes_read = True
1850
1851 - def _create_connection_info_for_datatype(self, topic, datatype):
1852 for c in self.bag._connections.values(): 1853 if c.datatype == datatype: 1854 connection_id = len(self.bag._connections) 1855 connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] } 1856 connection_info = _ConnectionInfo(connection_id, topic, connection_header) 1857 1858 self.bag._topic_connections[topic] = connection_id 1859 self.bag._connections[connection_id] = connection_info 1860 self.bag._connection_indexes[connection_id] = [] 1861 return 1862 1863 raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
1864
1865 - def read_message_definition_record(self, header=None):
1866 if not header: 1867 header = _read_header(self.bag._file, _OP_MSG_DEF) 1868 1869 topic = _read_str_field(header, 'topic') 1870 datatype = _read_str_field(header, 'type') 1871 md5sum = _read_str_field(header, 'md5') 1872 msg_def = _read_str_field(header, 'def') 1873 1874 _skip_sized(self.bag._file) # skip the record data 1875 1876 connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def } 1877 1878 id = len(self.bag._connections) 1879 1880 return _ConnectionInfo(id, topic, connection_header)
1881
1882 -class _BagReader102_Indexed(_BagReader102_Unindexed):
1883 """ 1884 Support class for reading indexed v1.2 bag files. 1885 """
1886 - def __init__(self, bag):
1887 _BagReader.__init__(self, bag)
1888
1889 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1890 connections = self.bag._get_connections(topics, connection_filter) 1891 for entry in self.bag._get_entries(connections, start_time, end_time): 1892 yield self.seek_and_read_message_data_record(entry.offset, raw)
1893
1894 - def reindex(self):
1895 """Generates all bag index information by rereading the message records.""" 1896 f = self.bag._file 1897 1898 total_bytes = self.bag.size 1899 1900 # Re-read the file header to get to the start of the first message 1901 self.bag._file.seek(self.bag._file_header_pos) 1902 self.read_file_header_record() 1903 1904 offset = f.tell() 1905 1906 # Read message definition and data records 1907 while offset < total_bytes: 1908 yield offset 1909 1910 op = _peek_next_header_op(f) 1911 1912 if op == _OP_MSG_DEF: 1913 connection_info = self.read_message_definition_record() 1914 1915 if connection_info.topic not in self.bag._topic_connections: 1916 self.bag._topic_connections[connection_info.topic] = connection_info.id 1917 self.bag._connections[connection_info.id] = connection_info 1918 self.bag._connection_indexes[connection_info.id] = [] 1919 1920 elif op == _OP_MSG_DATA: 1921 # Read the topic and timestamp from the header 1922 header = _read_header(f) 1923 1924 topic = _read_str_field(header, 'topic') 1925 secs = _read_uint32_field(header, 'sec') 1926 nsecs = _read_uint32_field(header, 'nsec') 1927 t = genpy.Time(secs, nsecs) 1928 1929 if topic not in self.bag._topic_connections: 1930 datatype = _read_str_field(header, 'type') 1931 self._create_connection_info_for_datatype(topic, datatype) 1932 1933 connection_id = self.bag._topic_connections[topic] 1934 info = self.bag._connections[connection_id] 1935 1936 # Skip over the message content 1937 _skip_sized(f) 1938 1939 # Insert the message entry (in order) into the connection index 1940 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1941 1942 elif op == _OP_INDEX_DATA: 1943 _skip_record(f) 1944 1945 offset = f.tell()
1946
1947 - def start_reading(self):
1948 try: 1949 # Read the file header 1950 self.read_file_header_record() 1951 1952 total_bytes = self.bag.size 1953 1954 # Check if the index position has been written, i.e. the bag was closed successfully 1955 if self.bag._index_data_pos == 0: 1956 raise ROSBagUnindexedException() 1957 1958 # Seek to the beginning of the topic index records 1959 self.bag._file.seek(self.bag._index_data_pos) 1960 1961 # Read the topic indexes 1962 topic_indexes = {} 1963 while True: 1964 pos = self.bag._file.tell() 1965 if pos >= total_bytes: 1966 break 1967 1968 topic, index = self.read_topic_index_record() 1969 1970 topic_indexes[topic] = index 1971 1972 # Read the message definition records (one for each topic) 1973 for topic, index in topic_indexes.items(): 1974 self.bag._file.seek(index[0].offset) 1975 1976 connection_info = self.read_message_definition_record() 1977 1978 if connection_info.topic not in self.bag._topic_connections: 1979 self.bag._topic_connections[connection_info.topic] = connection_info.id 1980 self.bag._connections[connection_info.id] = connection_info 1981 1982 self.bag._connection_indexes[connection_info.id] = index 1983 1984 self.bag._connection_indexes_read = True 1985 1986 except Exception as ex: 1987 raise ROSBagUnindexedException()
1988
1989 - def read_file_header_record(self):
1990 self.bag._file_header_pos = self.bag._file.tell() 1991 1992 header = _read_header(self.bag._file, _OP_FILE_HEADER) 1993 1994 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 1995 1996 _skip_sized(self.bag._file) # skip the record data, i.e. padding
1997
1998 - def read_topic_index_record(self):
1999 f = self.bag._file 2000 2001 header = _read_header(f, _OP_INDEX_DATA) 2002 2003 index_version = _read_uint32_field(header, 'ver') 2004 topic = _read_str_field (header, 'topic') 2005 count = _read_uint32_field(header, 'count') 2006 2007 if index_version != 0: 2008 raise ROSBagFormatException('expecting index version 0, got %d' % index_version) 2009 2010 _read_uint32(f) # skip the record data size 2011 2012 topic_index = [] 2013 2014 for i in range(count): 2015 time = _read_time (f) 2016 offset = _read_uint64(f) 2017 2018 topic_index.append(_IndexEntry102(time, offset)) 2019 2020 return (topic, topic_index)
2021
2022 - def seek_and_read_message_data_record(self, position, raw):
2023 f = self.bag._file 2024 2025 # Seek to the message position 2026 f.seek(position) 2027 2028 # Skip any MSG_DEF records 2029 while True: 2030 header = _read_header(f) 2031 op = _read_uint8_field(header, 'op') 2032 if op != _OP_MSG_DEF: 2033 break 2034 _skip_sized(f) 2035 2036 # Check that we have a MSG_DATA record 2037 if op != _OP_MSG_DATA: 2038 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2039 2040 topic = _read_str_field(header, 'topic') 2041 2042 connection_id = self.bag._topic_connections[topic] 2043 info = self.bag._connections[connection_id] 2044 2045 # Get the message type 2046 try: 2047 msg_type = _get_message_type(info) 2048 except KeyError: 2049 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bagfile by definition' % info.datatype) 2050 2051 # Get the timestamp 2052 secs = _read_uint32_field(header, 'sec') 2053 nsecs = _read_uint32_field(header, 'nsec') 2054 t = genpy.Time(secs, nsecs) 2055 2056 # Read the message content 2057 data = _read_record_data(f) 2058 2059 if raw: 2060 msg = info.datatype, data, info.md5sum, position, msg_type 2061 else: 2062 # Deserialize the message 2063 msg = msg_type() 2064 msg.deserialize(data) 2065 2066 return BagMessage(topic, msg, t)
2067
2068 -class _BagReader200(_BagReader):
2069 """ 2070 Support class for reading v2.0 bag files. 2071 """
2072 - def __init__(self, bag):
2073 _BagReader.__init__(self, bag) 2074 2075 self.decompressed_chunk_pos = None 2076 self.decompressed_chunk = None 2077 self.decompressed_chunk_io = None
2078
2079 - def reindex(self):
2080 """ 2081 Generates all bag index information by rereading the chunks. 2082 Assumes the file header has been read. 2083 """ 2084 f = self.bag._file 2085 2086 f.seek(0, os.SEEK_END) 2087 total_bytes = f.tell() 2088 2089 # Read any connection records from after the chunk section. 2090 # This is to workaround a bug in rosbag record --split (fixed in r10390) 2091 # where connection records weren't always being written inside the chunk. 2092 self._read_terminal_connection_records() 2093 2094 # Re-read the file header to get to the start of the first chunk 2095 self.bag._file.seek(self.bag._file_header_pos) 2096 self.read_file_header_record() 2097 2098 trunc_pos = None 2099 2100 while True: 2101 chunk_pos = f.tell() 2102 if chunk_pos >= total_bytes: 2103 break 2104 2105 yield chunk_pos 2106 2107 try: 2108 self._reindex_read_chunk(f, chunk_pos, total_bytes) 2109 except Exception as ex: 2110 break 2111 2112 trunc_pos = f.tell() 2113 2114 if trunc_pos and trunc_pos < total_bytes: 2115 f.truncate(trunc_pos)
2116
2117 - def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
2118 # Read the chunk header 2119 chunk_header = self.read_chunk_header() 2120 2121 # If the chunk header size is 0, then the chunk wasn't correctly terminated - we're done 2122 if chunk_header.compressed_size == 0: 2123 raise ROSBagException('unterminated chunk at %d' % chunk_pos) 2124 2125 if chunk_header.compression == Compression.NONE: 2126 chunk_file = f 2127 else: 2128 # Read the compressed chunk 2129 compressed_chunk = _read(f, chunk_header.compressed_size) 2130 2131 # Decompress it 2132 if chunk_header.compression == Compression.BZ2: 2133 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2134 elif chunk_header.compression == Compression.LZ4 and found_lz4: 2135 self.decompressed_chunk = roslz4.decompress(compressed_chunk) 2136 else: 2137 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2138 2139 if self.decompressed_chunk_io: 2140 self.decompressed_chunk_io.close() 2141 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2142 2143 chunk_file = self.decompressed_chunk_io 2144 2145 # Read chunk connection and message records 2146 self.bag._curr_chunk_info = None 2147 2148 if chunk_header.compression == Compression.NONE: 2149 offset = chunk_file.tell() - chunk_pos 2150 else: 2151 offset = chunk_file.tell() 2152 2153 expected_index_length = 0 2154 2155 while offset < chunk_header.uncompressed_size: 2156 op = _peek_next_header_op(chunk_file) 2157 2158 if op == _OP_CONNECTION: 2159 connection_info = self.read_connection_record(chunk_file) 2160 2161 if connection_info.id not in self.bag._connections: 2162 self.bag._connections[connection_info.id] = connection_info 2163 if connection_info.id not in self.bag._connection_indexes: 2164 self.bag._connection_indexes[connection_info.id] = [] 2165 2166 elif op == _OP_MSG_DATA: 2167 # Read the connection id and timestamp from the header 2168 header = _read_header(chunk_file) 2169 2170 connection_id = _read_uint32_field(header, 'conn') 2171 t = _read_time_field (header, 'time') 2172 2173 # Update the chunk info with this timestamp 2174 if not self.bag._curr_chunk_info: 2175 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t) 2176 else: 2177 if t > self.bag._curr_chunk_info.end_time: 2178 self.bag._curr_chunk_info.end_time = t 2179 elif t < self.bag._curr_chunk_info.start_time: 2180 self.bag._curr_chunk_info.start_time = t 2181 if connection_id in self.bag._curr_chunk_info.connection_counts: 2182 self.bag._curr_chunk_info.connection_counts[connection_id] += 1 2183 else: 2184 self.bag._curr_chunk_info.connection_counts[connection_id] = 1 2185 2186 # Skip over the message content 2187 _skip_sized(chunk_file) 2188 2189 # Insert the message entry (in order) into the connection index 2190 if connection_id not in self.bag._connection_indexes: 2191 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos)) 2192 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset)) 2193 2194 expected_index_length += 1 2195 2196 else: 2197 # Unknown record type so skip 2198 _skip_record(chunk_file) 2199 2200 if chunk_header.compression == Compression.NONE: 2201 offset = chunk_file.tell() - chunk_pos 2202 else: 2203 offset = chunk_file.tell() 2204 2205 # Skip over index records, connection records and chunk info records 2206 next_op = _peek_next_header_op(f) 2207 2208 total_index_length = 0 2209 2210 while next_op != _OP_CHUNK: 2211 if next_op == _OP_INDEX_DATA: 2212 # Bug workaround: C Turtle 
bags (pre-1.1.15) were written with an incorrect data length 2213 _, index = self.read_connection_index_record() 2214 total_index_length += len(index) 2215 else: 2216 _skip_record(f) 2217 2218 if f.tell() >= total_bytes: 2219 if total_index_length != expected_index_length: 2220 raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length)) 2221 break 2222 2223 next_op = _peek_next_header_op(f) 2224 2225 # Chunk was read correctly - store info 2226 self.bag._chunk_headers[chunk_pos] = chunk_header 2227 self.bag._chunks.append(self.bag._curr_chunk_info)
2228
2229 - def _read_terminal_connection_records(self):
2230 b, f, r = self.bag, self.bag._file, self.bag._reader 2231 2232 # Seek to the first record after FILE_HEADER 2233 f.seek(b._file_header_pos) 2234 r.read_file_header_record() 2235 2236 # Advance to the first CONNECTION 2237 if self._advance_to_next_record(_OP_CONNECTION): 2238 # Read the CONNECTION records 2239 while True: 2240 connection_info = r.read_connection_record(f) 2241 2242 b._connections[connection_info.id] = connection_info 2243 b._connection_indexes[connection_info.id] = [] 2244 2245 next_op = _peek_next_header_op(f) 2246 if next_op != _OP_CONNECTION: 2247 break
2248
2249 - def _advance_to_next_record(self, op):
2250 b, f = self.bag, self.bag._file 2251 2252 try: 2253 while True: 2254 next_op = _peek_next_header_op(f) 2255 if next_op == op: 2256 break 2257 2258 if next_op == _OP_INDEX_DATA: 2259 # Workaround the possible invalid data length in INDEX_DATA records 2260 2261 # read_connection_index_record() requires _curr_chunk_info is set 2262 if b._curr_chunk_info is None: 2263 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1)) 2264 2265 b._reader.read_connection_index_record() 2266 else: 2267 _skip_record(f) 2268 2269 return True 2270 2271 except Exception as ex: 2272 return False
2273
2274 - def start_reading(self):
2275 try: 2276 # Read the file header 2277 self.read_file_header_record() 2278 2279 # Check if the index position has been written, i.e. the bag was closed successfully 2280 if self.bag._index_data_pos == 0: 2281 raise ROSBagUnindexedException() 2282 2283 # Seek to the end of the chunks 2284 self.bag._file.seek(self.bag._index_data_pos) 2285 2286 # Read the connection records 2287 self.bag._connection_indexes = {} 2288 for i in range(self.bag._connection_count): 2289 connection_info = self.read_connection_record(self.bag._file) 2290 self.bag._connections[connection_info.id] = connection_info 2291 self.bag._connection_indexes[connection_info.id] = [] 2292 2293 # Read the chunk info records 2294 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)] 2295 2296 # Read the chunk headers 2297 self.bag._chunk_headers = {} 2298 for chunk_info in self.bag._chunks: 2299 self.bag._file.seek(chunk_info.pos) 2300 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header() 2301 2302 if not self.bag._skip_index: 2303 self._read_connection_index_records() 2304 2305 except Exception as ex: 2306 raise ROSBagUnindexedException()
2307
2308 - def _read_connection_index_records(self):
2309 for chunk_info in self.bag._chunks: 2310 self.bag._file.seek(chunk_info.pos) 2311 _skip_record(self.bag._file) 2312 2313 self.bag._curr_chunk_info = chunk_info 2314 for i in range(len(chunk_info.connection_counts)): 2315 connection_id, index = self.read_connection_index_record() 2316 self.bag._connection_indexes[connection_id].extend(index) 2317 2318 # Remove any connections with no entries 2319 # This is a workaround for a bug where connection records were being written for 2320 # connections which had no messages in the bag 2321 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index] 2322 for id in orphan_connection_ids: 2323 del self.bag._connections[id] 2324 del self.bag._connection_indexes[id] 2325 2326 self.bag._connection_indexes_read = True
2327
2328 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2329 connections = self.bag._get_connections(topics, connection_filter) 2330 for entry in self.bag._get_entries(connections, start_time, end_time): 2331 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
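# In practice these reader classes are driven through the public Bag API
# rather than instantiated directly; a typical consumer looks like this
# (sketch; the file name and topic are only examples):
#
#     import rosbag
#     with rosbag.Bag('input.bag') as bag:
#         for topic, msg, t in bag.read_messages(topics=['/chatter']):
#             print(topic, t.to_sec())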
2332 2333 ### 2334
2335 - def read_file_header_record(self):
2336 self.bag._file_header_pos = self.bag._file.tell() 2337 2338 header = _read_header(self.bag._file, _OP_FILE_HEADER) 2339 2340 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 2341 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count') 2342 self.bag._connection_count = _read_uint32_field(header, 'conn_count') 2343 2344 _skip_sized(self.bag._file) # skip over the record data, i.e. padding
2345
2346 - def read_connection_record(self, f):
2347 header = _read_header(f, _OP_CONNECTION) 2348 2349 conn_id = _read_uint32_field(header, 'conn') 2350 topic = _read_str_field (header, 'topic') 2351 2352 connection_header = _read_header(f) 2353 2354 return _ConnectionInfo(conn_id, topic, connection_header)
2355
2356 - def read_chunk_info_record(self):
2357 f = self.bag._file 2358 2359 header = _read_header(f, _OP_CHUNK_INFO) 2360 2361 chunk_info_version = _read_uint32_field(header, 'ver') 2362 2363 if chunk_info_version == 1: 2364 chunk_pos = _read_uint64_field(header, 'chunk_pos') 2365 start_time = _read_time_field (header, 'start_time') 2366 end_time = _read_time_field (header, 'end_time') 2367 connection_count = _read_uint32_field(header, 'count') 2368 2369 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time) 2370 2371 _read_uint32(f) # skip the record data size 2372 2373 for i in range(connection_count): 2374 connection_id = _read_uint32(f) 2375 count = _read_uint32(f) 2376 2377 chunk_info.connection_counts[connection_id] = count 2378 2379 return chunk_info 2380 else: 2381 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2382
2383 - def read_chunk_header(self):
2384 header = _read_header(self.bag._file, _OP_CHUNK) 2385 2386 compression = _read_str_field (header, 'compression') 2387 uncompressed_size = _read_uint32_field(header, 'size') 2388 2389 compressed_size = _read_uint32(self.bag._file) # read the record data size 2390 2391 data_pos = self.bag._file.tell() 2392 2393 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
2394
2395 - def read_connection_index_record(self):
2396 f = self.bag._file 2397 2398 header = _read_header(f, _OP_INDEX_DATA) 2399 2400 index_version = _read_uint32_field(header, 'ver') 2401 connection_id = _read_uint32_field(header, 'conn') 2402 count = _read_uint32_field(header, 'count') 2403 2404 if index_version != 1: 2405 raise ROSBagFormatException('expecting index version 1, got %d' % index_version) 2406 2407 record_size = _read_uint32(f) # skip the record data size 2408 2409 index = [] 2410 2411 for i in range(count): 2412 time = _read_time (f) 2413 offset = _read_uint32(f) 2414 2415 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset)) 2416 2417 return (connection_id, index)
2418
2419 - def seek_and_read_message_data_record(self, position, raw):
2420 chunk_pos, offset = position 2421 2422 chunk_header = self.bag._chunk_headers.get(chunk_pos) 2423 if chunk_header is None: 2424 raise ROSBagException('no chunk at position %d' % chunk_pos) 2425 2426 if chunk_header.compression == Compression.NONE: 2427 f = self.bag._file 2428 f.seek(chunk_header.data_pos + offset) 2429 else: 2430 if self.decompressed_chunk_pos != chunk_pos: 2431 # Seek to the chunk data, read and decompress 2432 self.bag._file.seek(chunk_header.data_pos) 2433 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size) 2434 2435 if chunk_header.compression == Compression.BZ2: 2436 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2437 elif chunk_header.compression == Compression.LZ4 and found_lz4: 2438 self.decompressed_chunk = roslz4.decompress(compressed_chunk) 2439 else: 2440 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2441 2442 self.decompressed_chunk_pos = chunk_pos 2443 2444 if self.decompressed_chunk_io: 2445 self.decompressed_chunk_io.close() 2446 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2447 2448 f = self.decompressed_chunk_io 2449 f.seek(offset) 2450 2451 # Skip any CONNECTION records 2452 while True: 2453 header = _read_header(f) 2454 op = _read_uint8_field(header, 'op') 2455 if op != _OP_CONNECTION: 2456 break 2457 _skip_sized(f) 2458 2459 # Check that we have a MSG_DATA record 2460 if op != _OP_MSG_DATA: 2461 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2462 2463 connection_id = _read_uint32_field(header, 'conn') 2464 t = _read_time_field (header, 'time') 2465 2466 # Get the message type 2467 connection_info = self.bag._connections[connection_id] 2468 try: 2469 msg_type = _get_message_type(connection_info) 2470 except KeyError: 2471 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype) 2472 2473 # Read the message content 2474 data = _read_record_data(f) 2475 2476 # Deserialize the message 2477 if raw: 2478 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type 2479 else: 2480 msg = msg_type() 2481 msg.deserialize(data) 2482 2483 return BagMessage(connection_info.topic, msg, t)
2484
2485 -def _time_to_str(secs):
2486 secs_frac = secs - int(secs) 2487 secs_frac_str = ('%.2f' % secs_frac)[1:] 2488 2489 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
2490
2491 -def _human_readable_size(size):
2492 multiple = 1024.0 2493 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2494 size /= multiple 2495 if size < multiple: 2496 return '%.1f %s' % (size, suffix) 2497 2498 return '-'
2499
2500 -def _human_readable_frequency(freq):
2501 multiple = 1000.0 2502 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']: 2503 if freq < multiple: 2504 return '%.1f %s' % (freq, suffix) 2505 freq /= multiple 2506 2507 return '-'
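# Example outputs (sketch):
#
#     _human_readable_size(2048.0)       # -> '2.0 KB'
#     _human_readable_frequency(1500.0)  # -> '1.5 kHz'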
2508
2509 ## See http://code.activestate.com/recipes/511509 2510 -def _mergesort(list_of_lists, key=None):
2511 """ 2512 Perform an N-way merge operation on sorted lists. 2513 2514 @param list_of_lists: (really iterable of iterable) of sorted elements 2515 (either by naturally or by C{key}) 2516 @param key: specify sort key function (like C{sort()}, C{sorted()}) 2517 @param iterfun: function that returns an iterator. 2518 2519 Yields tuples of the form C{(item, iterator)}, where the iterator is the 2520 built-in list iterator or something you pass in, if you pre-generate the 2521 iterators. 2522 2523 This is a stable merge; complexity O(N lg N) 2524 2525 Examples:: 2526 2527 print list(x[0] for x in mergesort([[1,2,3,4], 2528 [2,3.5,3.7,4.5,6,7], 2529 [2.6,3.6,6.6,9]])) 2530 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2531 2532 # note stability 2533 print list(x[0] for x in mergesort([[1,2,3,4], 2534 [2,3.5,3.7,4.5,6,7], 2535 [2.6,3.6,6.6,9]], key=int)) 2536 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2537 2538 print list(x[0] for x in mergesort([[4,3,2,1], 2539 [7,6.5,4,3.7,3.3,1.9], 2540 [9,8.6,7.6,6.6,5.5,4.4,3.3]], 2541 key=lambda x: -x)) 2542 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1] 2543 """ 2544 2545 heap = [] 2546 for i, itr in enumerate(iter(pl) for pl in list_of_lists): 2547 try: 2548 item = next(itr) 2549 toadd = (key(item), i, item, itr) if key else (item, i, itr) 2550 heap.append(toadd) 2551 except StopIteration: 2552 pass 2553 heapq.heapify(heap) 2554 2555 if key: 2556 while heap: 2557 _, idx, item, itr = heap[0] 2558 yield item, itr 2559 try: 2560 item = next(itr) 2561 heapq.heapreplace(heap, (key(item), idx, item, itr) ) 2562 except StopIteration: 2563 heapq.heappop(heap) 2564 2565 else: 2566 while heap: 2567 item, idx, itr = heap[0] 2568 yield item, itr 2569 try: 2570 heapq.heapreplace(heap, (next(itr), idx, itr)) 2571 except StopIteration: 2572 heapq.heappop(heap)
2573
2574 -class _CompressorFileFacade(object):
2575 """ 2576 A file facade for sequential compressors (e.g., bz2.BZ2Compressor). 2577 """
2578 - def __init__(self, file, compressor):
2579 self.file = file 2580 self.compressor = compressor 2581 self.compressed_bytes_in = 0
2582
2583 - def write(self, data):
2584 compressed = self.compressor.compress(data) 2585 if len(compressed) > 0: 2586 self.file.write(compressed) 2587 self.compressed_bytes_in += len(data)
2588
2589 - def flush(self):
2590 compressed = self.compressor.flush() 2591 if len(compressed) > 0: 2592 self.file.write(compressed)
2593
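# Usage sketch (the output file name is an example); this mirrors how chunk
# data can be compressed on the fly while it is written:
#
#     out = open('chunk.bz2', 'wb')
#     facade = _CompressorFileFacade(out, bz2.BZ2Compressor())
#     facade.write(b'serialized records')
#     facade.flush()                 # emit any compressed bytes still buffered
#     facade.compressed_bytes_in     # -> number of uncompressed bytes fed in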
2594 -def _median(values):
2595 values_len = len(values) 2596 if values_len == 0: 2597 return float('nan') 2598 2599 sorted_values = sorted(values) 2600 if values_len % 2 == 1: 2601 return sorted_values[int(values_len / 2)] 2602 2603 lower = sorted_values[int(values_len / 2) - 1] 2604 upper = sorted_values[int(values_len / 2)] 2605 2606 return float(lower + upper) / 2
2607
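# Example (sketch):
#
#     _median([4.0, 1.0, 3.0])        # -> 3.0
#     _median([4.0, 1.0, 3.0, 2.0])   # -> 2.5
#     _median([])                     # -> nan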