Package rosbag :: Module bag
[frames] | no frames]

Source Code for Module rosbag.bag

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2010, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32   
  33  """ 
  34  The rosbag Python API. 
  35   
  36  Provides serialization of bag files. 
  37  """ 
  38   
  39  from __future__ import print_function 
  40   
  41  import bisect 
  42  import bz2 
  43  import collections 
  44  import heapq 
  45  import os 
  46  import re 
  47  import struct 
  48  import sys 
  49  import threading 
  50  import time 
  51  import yaml 
  52   
  53  try: 
  54      from cStringIO import StringIO  # Python 2.x 
  55  except ImportError: 
  56      from io import BytesIO as StringIO  # Python 3.x 
  57   
  58  import genmsg 
  59  import genpy 
  60  import genpy.dynamic 
  61  import genpy.message 
  62   
  63  import roslib.names # still needed for roslib.names.canonicalize_name() 
  64  import rospy 
  65  try: 
  66      import roslz4 
  67      found_lz4 = True 
  68  except ImportError: 
  69      rospy.logwarn( 
  70          'Failed to load Python extension for LZ4 support. ' 
  71          'LZ4 compression will not be available.') 
  72      found_lz4 = False 
73 74 -class ROSBagException(Exception):
75 """ 76 Base class for exceptions in rosbag. 77 """
78 - def __init__(self, value):
79 self.value = value
80
81 - def __str__(self):
82 return self.value
83
84 -class ROSBagFormatException(ROSBagException):
85 """ 86 Exceptions for errors relating to the bag file format. 87 """
88 - def __init__(self, value):
89 ROSBagException.__init__(self, value)
90
91 -class ROSBagUnindexedException(ROSBagException):
92 """ 93 Exception for unindexed bags. 94 """
95 - def __init__(self):
96 ROSBagException.__init__(self, 'Unindexed bag')
97
98 -class Compression:
99 """ 100 Allowable compression types 101 """ 102 NONE = 'none' 103 BZ2 = 'bz2' 104 LZ4 = 'lz4'
105 106 BagMessage = collections.namedtuple('BagMessage', 'topic message timestamp')
107 108 -class Bag(object):
109 """ 110 Bag serialize messages to and from a single file on disk using the bag format. 111 """
112 - def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
113 """ 114 Open a bag file. The mode can be 'r', 'w', or 'a' for reading (default), 115 writing or appending. The file will be created if it doesn't exist 116 when opened for writing or appending; it will be truncated when opened 117 for writing. Simultaneous reading and writing is allowed when in writing 118 or appending mode. 119 @param f: filename of bag to open or a stream to read from 120 @type f: str or file 121 @param mode: mode, either 'r', 'w', or 'a' 122 @type mode: str 123 @param compression: compression mode, see U{rosbag.Compression} for valid modes 124 @type compression: str 125 @param chunk_threshold: minimum number of uncompressed bytes per chunk 126 @type chunk_threshold: int 127 @param allow_unindexed: if True, allow opening unindexed bags 128 @type allow_unindexed: bool 129 @param options: the bag options (currently: compression and chunk_threshold) 130 @type options: dict 131 @param skip_index: if True, don't read the connection index records on open [2.0+] 132 @type skip_index: bool 133 @raise ValueError: if any argument is invalid 134 @raise ROSBagException: if an error occurs opening file 135 @raise ROSBagFormatException: if bag format is corrupted 136 """ 137 if options is not None: 138 if type(options) is not dict: 139 raise ValueError('options must be of type dict') 140 if 'compression' in options: 141 compression = options['compression'] 142 if 'chunk_threshold' in options: 143 chunk_threshold = options['chunk_threshold'] 144 145 self._file = None 146 self._filename = None 147 self._version = None 148 149 allowed_compressions = [Compression.NONE, Compression.BZ2] 150 if found_lz4: 151 allowed_compressions.append(Compression.LZ4) 152 if compression not in allowed_compressions: 153 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 154 self._compression = compression 155 156 if chunk_threshold < 0: 157 raise ValueError('chunk_threshold must be greater than or equal to zero') 158 self._chunk_threshold = chunk_threshold 159 160 self._skip_index = skip_index 161 162 self._reader = None 163 164 self._file_header_pos = None 165 self._index_data_pos = 0 # (1.2+) 166 167 self._clear_index() 168 169 self._buffer = StringIO() 170 171 self._curr_compression = Compression.NONE 172 173 self._open(f, mode, allow_unindexed) 174 175 self._output_file = self._file
176
177 - def __iter__(self):
178 return self.read_messages()
179
180 - def __enter__(self):
181 return self
182
183 - def __exit__(self, exc_type, exc_value, traceback):
184 self.close()
185 186 @property
187 - def options(self):
188 """Get the options.""" 189 return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }
190 191 @property
192 - def filename(self):
193 """Get the filename.""" 194 return self._filename
195 196 @property
197 - def version(self):
198 """Get the version.""" 199 return self._version
200 201 @property
202 - def mode(self):
203 """Get the mode.""" 204 return self._mode
205 206 @property
207 - def size(self):
208 """Get the size in bytes.""" 209 if not self._file: 210 raise ValueError('I/O operation on closed bag') 211 212 pos = self._file.tell() 213 self._file.seek(0, os.SEEK_END) 214 size = self._file.tell() 215 self._file.seek(pos) 216 return size
217 218 # compression 219
220 - def _get_compression(self):
221 """Get the compression method to use for writing.""" 222 return self._compression
223
224 - def _set_compression(self, compression):
225 """Set the compression method to use for writing.""" 226 allowed_compressions = [Compression.NONE, Compression.BZ2] 227 if found_lz4: 228 allowed_compressions.append(Compression.LZ4) 229 if compression not in allowed_compressions: 230 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 231 232 self.flush() 233 self._compression = compression
234 235 compression = property(_get_compression, _set_compression) 236 237 # chunk_threshold 238
239 - def _get_chunk_threshold(self):
240 """Get the chunk threshold to use for writing.""" 241 return self._chunk_threshold
242
243 - def _set_chunk_threshold(self, chunk_threshold):
244 """Set the chunk threshold to use for writing.""" 245 if chunk_threshold < 0: 246 raise ValueError('chunk_threshold must be greater than or equal to zero') 247 248 self.flush() 249 self._chunk_threshold = chunk_threshold
250 251 chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold) 252
253 - def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
254 """ 255 Read messages from the bag, optionally filtered by topic, timestamp and connection details. 256 @param topics: list of topics or a single topic. if an empty list is given all topics will be read [optional] 257 @type topics: list(str) or str 258 @param start_time: earliest timestamp of message to return [optional] 259 @type start_time: U{genpy.Time} 260 @param end_time: latest timestamp of message to return [optional] 261 @type end_time: U{genpy.Time} 262 @param connection_filter: function to filter connections to include [optional] 263 @type connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool 264 @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype) 265 @type raw: bool 266 @return: generator of BagMessage(topic, message, timestamp) namedtuples for each message in the bag file 267 @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw] 268 """ 269 self.flush() 270 271 if topics and type(topics) is str: 272 topics = [topics] 273 274 return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)
275
276 - def flush(self):
277 """ 278 Write the open chunk to disk so subsequent reads will read all messages. 279 @raise ValueError: if bag is closed 280 """ 281 if not self._file: 282 raise ValueError('I/O operation on closed bag') 283 284 if self._chunk_open: 285 self._stop_writing_chunk()
286
287 - def write(self, topic, msg, t=None, raw=False):
288 """ 289 Write a message to the bag. 290 @param topic: name of topic 291 @type topic: str 292 @param msg: message to add to bag, or tuple (if raw) 293 @type msg: Message or tuple of raw message data 294 @param t: ROS time of message publication, if None specifed, use current time [optional] 295 @type t: U{genpy.Time} 296 @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype) 297 @type raw: bool 298 @raise ValueError: if arguments are invalid or bag is closed 299 """ 300 if not self._file: 301 raise ValueError('I/O operation on closed bag') 302 303 if not topic: 304 raise ValueError('topic is invalid') 305 if not msg: 306 raise ValueError('msg is invalid') 307 308 if t is None: 309 t = genpy.Time.from_sec(time.time()) 310 311 # Seek to end (in case previous operation was a read) 312 self._file.seek(0, os.SEEK_END) 313 314 # Open a chunk, if needed 315 if not self._chunk_open: 316 self._start_writing_chunk(t) 317 318 # Unpack raw 319 if raw: 320 if len(msg) == 5: 321 msg_type, serialized_bytes, md5sum, pos, pytype = msg 322 elif len(msg) == 4: 323 msg_type, serialized_bytes, md5sum, pytype = msg 324 else: 325 raise ValueError('msg must be of length 4 or 5') 326 327 # Write connection record, if necessary (currently using a connection per topic; ignoring message connection header) 328 if topic in self._topic_connections: 329 connection_info = self._topic_connections[topic] 330 conn_id = connection_info.id 331 else: 332 conn_id = len(self._connections) 333 334 if raw: 335 if pytype is None: 336 try: 337 pytype = genpy.message.get_message_class(msg_type) 338 except Exception: 339 pytype = None 340 if pytype is None: 341 raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type) 342 343 if pytype._md5sum != md5sum: 344 print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr) 345 #raise ROSBagException('md5sum of loaded type does not match that of data being recorded') 346 347 header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text } 348 else: 349 header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text } 350 351 connection_info = _ConnectionInfo(conn_id, topic, header) 352 353 self._write_connection_record(connection_info) 354 355 self._connections[conn_id] = connection_info 356 self._topic_connections[topic] = connection_info 357 358 # Create an index entry 359 index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset()) 360 361 # Update the indexes and current chunk info 362 if conn_id not in self._curr_chunk_connection_indexes: 363 # This is the first message on this connection in the chunk 364 self._curr_chunk_connection_indexes[conn_id] = [index_entry] 365 self._curr_chunk_info.connection_counts[conn_id] = 1 366 else: 367 curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id] 368 if index_entry >= curr_chunk_connection_index[-1]: 369 # Test if we're writing chronologically. Can skip binary search if so. 370 curr_chunk_connection_index.append(index_entry) 371 else: 372 bisect.insort_right(curr_chunk_connection_index, index_entry) 373 374 self._curr_chunk_info.connection_counts[conn_id] += 1 375 376 if conn_id not in self._connection_indexes: 377 self._connection_indexes[conn_id] = [index_entry] 378 else: 379 bisect.insort_right(self._connection_indexes[conn_id], index_entry) 380 381 # Update the chunk start/end times 382 if t > self._curr_chunk_info.end_time: 383 self._curr_chunk_info.end_time = t 384 elif t < self._curr_chunk_info.start_time: 385 self._curr_chunk_info.start_time = t 386 387 if not raw: 388 # Serialize the message to the buffer 389 self._buffer.seek(0) 390 self._buffer.truncate(0) 391 msg.serialize(self._buffer) 392 serialized_bytes = self._buffer.getvalue() 393 394 # Write message data record 395 self._write_message_data_record(conn_id, t, serialized_bytes) 396 397 # Check if we want to stop this chunk 398 chunk_size = self._get_chunk_offset() 399 if chunk_size > self._chunk_threshold: 400 self._stop_writing_chunk()
401
402 - def reindex(self):
403 """ 404 Reindexes the bag file. Yields position of each chunk for progress. 405 """ 406 self._clear_index() 407 return self._reader.reindex()
408
409 - def close(self):
410 """ 411 Close the bag file. Closing an already closed bag does nothing. 412 """ 413 if self._file: 414 if self._mode in 'wa': 415 self._stop_writing() 416 417 self._close_file()
418
419 - def get_compression_info(self):
420 """ 421 Returns information about the compression of the bag 422 @return: generator of CompressionTuple(compression, uncompressed, compressed) describing the 423 type of compression used, size of the uncompressed data in Bytes, size of the compressed data in Bytes. If 424 no compression is used then uncompressed and compressed data will be equal. 425 @rtype: generator of CompressionTuple of (str, int, int) 426 """ 427 428 compression = self.compression 429 uncompressed = 0 430 compressed = 0 431 432 if self._chunk_headers: 433 compression_counts = {} 434 compression_uncompressed = {} 435 compression_compressed = {} 436 437 # the rest of this is determine which compression algorithm is dominant and 438 # to add up the uncompressed and compressed Bytes 439 for chunk_header in self._chunk_headers.values(): 440 if chunk_header.compression not in compression_counts: 441 compression_counts[chunk_header.compression] = 0 442 if chunk_header.compression not in compression_uncompressed: 443 compression_uncompressed[chunk_header.compression] = 0 444 if chunk_header.compression not in compression_compressed: 445 compression_compressed[chunk_header.compression] = 0 446 447 compression_counts[chunk_header.compression] += 1 448 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 449 uncompressed += chunk_header.uncompressed_size 450 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 451 compressed += chunk_header.compressed_size 452 453 chunk_count = len(self._chunk_headers) 454 455 main_compression_count, main_compression = sorted([(v, k) for k, v in compression_counts.items()], reverse=True)[0] 456 compression = str(main_compression) 457 458 return collections.namedtuple("CompressionTuple", ["compression", 459 "uncompressed", "compressed"])(compression=compression, 460 uncompressed=uncompressed, compressed=compressed)
461
462 - def get_message_count(self, topic_filters=None):
463 """ 464 Returns the number of messages in the bag. Can be filtered by Topic 465 @param topic_filters: One or more topics to filter by 466 @type topic_filters: Could be either a single str or a list of str. 467 @return: The number of messages in the bag, optionally filtered by topic 468 @rtype: int 469 """ 470 471 num_messages = 0 472 473 if topic_filters is not None: 474 info = self.get_type_and_topic_info(topic_filters=topic_filters) 475 for topic in info.topics.values(): 476 num_messages += topic.message_count 477 else: 478 if self._chunks: 479 for c in self._chunks: 480 for counts in c.connection_counts.values(): 481 num_messages += counts 482 else: 483 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 484 485 return num_messages
486
487 - def get_start_time(self):
488 """ 489 Returns the start time of the bag. 490 @return: a timestamp of the start of the bag 491 @rtype: float, timestamp in seconds, includes fractions of a second 492 """ 493 494 if self._chunks: 495 start_stamp = self._chunks[0].start_time.to_sec() 496 else: 497 if not self._connection_indexes: 498 raise ROSBagException('Bag contains no message') 499 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index] 500 start_stamp = min(start_stamps) if start_stamps else 0 501 502 return start_stamp
503
504 - def get_end_time(self):
505 """ 506 Returns the end time of the bag. 507 @return: a timestamp of the end of the bag 508 @rtype: float, timestamp in seconds, includes fractions of a second 509 """ 510 511 if self._chunks: 512 end_stamp = self._chunks[-1].end_time.to_sec() 513 else: 514 if not self._connection_indexes: 515 raise ROSBagException('Bag contains no message') 516 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index] 517 end_stamp = max(end_stamps) if end_stamps else 0 518 519 return end_stamp
520
521 - def get_type_and_topic_info(self, topic_filters=None):
522 """ 523 Coallates info about the type and topics in the bag. 524 525 Note, it would be nice to filter by type as well, but there appear to be some limitations in the current architecture 526 that prevent that from working when more than one message type is written on the same topic. 527 528 @param topic_filters: specify one or more topic to filter by. 529 @type topic_filters: either a single str or a list of str. 530 @return: generator of TypesAndTopicsTuple(types{key:type name, val: md5hash}, 531 topics{type: msg type (Ex. "std_msgs/String"), 532 message_count: the number of messages of the particular type, 533 connections: the number of connections, 534 frequency: the data frequency, 535 key: type name, 536 val: md5hash}) describing the types of messages present 537 and information about the topics 538 @rtype: TypesAndTopicsTuple(dict(str, str), 539 TopicTuple(str, int, int, float, str, str)) 540 """ 541 542 datatypes = set() 543 datatype_infos = [] 544 545 for connection in self._connections.values(): 546 if connection.datatype in datatypes: 547 continue 548 549 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 550 datatypes.add(connection.datatype) 551 552 topics = [] 553 # load our list of topics and optionally filter 554 if topic_filters is not None: 555 if not isinstance(topic_filters, list): 556 topic_filters = [topic_filters] 557 558 topics = topic_filters 559 else: 560 topics = [c.topic for c in self._get_connections()] 561 562 topics = sorted(set(topics)) 563 564 topic_datatypes = {} 565 topic_conn_counts = {} 566 topic_msg_counts = {} 567 topic_freqs_median = {} 568 569 for topic in topics: 570 connections = list(self._get_connections(topic)) 571 572 if not connections: 573 continue 574 575 topic_datatypes[topic] = connections[0].datatype 576 topic_conn_counts[topic] = len(connections) 577 578 msg_count = 0 579 for connection in connections: 580 for chunk in self._chunks: 581 msg_count += chunk.connection_counts.get(connection.id, 0) 582 583 topic_msg_counts[topic] = msg_count 584 585 if self._connection_indexes_read: 586 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 587 if len(stamps) > 1: 588 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 589 med_period = _median(periods) 590 if med_period > 0.0: 591 topic_freqs_median[topic] = 1.0 / med_period 592 593 # process datatypes 594 types = {} 595 for datatype, md5sum, msg_def in sorted(datatype_infos): 596 types[datatype] = md5sum 597 598 # process topics 599 topics_t = {} 600 TopicTuple = collections.namedtuple("TopicTuple", ["msg_type", "message_count", "connections", "frequency"]) 601 for topic in sorted(topic_datatypes.keys()): 602 topic_msg_count = topic_msg_counts[topic] 603 frequency = topic_freqs_median[topic] if topic in topic_freqs_median else None 604 topics_t[topic] = TopicTuple(msg_type=topic_datatypes[topic], 605 message_count=topic_msg_count, 606 connections=topic_conn_counts[topic], 607 frequency=frequency) 608 609 return collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])(msg_types=types, topics=topics_t)
610
611 - def __str__(self):
612 rows = [] 613 614 try: 615 if self._filename: 616 rows.append(('path', self._filename)) 617 618 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 619 rows.append(('version', '1.2 (unindexed)')) 620 else: 621 rows.append(('version', '%d.%d' % (int(self._version / 100), self._version % 100))) 622 623 if not self._connection_indexes and not self._chunks: 624 rows.append(('size', _human_readable_size(self.size))) 625 else: 626 if self._chunks: 627 start_stamp = self._chunks[ 0].start_time.to_sec() 628 end_stamp = self._chunks[-1].end_time.to_sec() 629 else: 630 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index] 631 start_stamp = min(start_stamps) if start_stamps else 0 632 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index] 633 end_stamp = max(end_stamps) if end_stamps else 0 634 635 # Show duration 636 duration = end_stamp - start_stamp 637 dur_secs = duration % 60 638 dur_mins = int(duration / 60) 639 dur_hrs = int(dur_mins / 60) 640 if dur_hrs > 0: 641 dur_mins = dur_mins % 60 642 duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration) 643 elif dur_mins > 0: 644 duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration) 645 else: 646 duration_str = '%.1fs' % duration 647 648 rows.append(('duration', duration_str)) 649 650 # Show start and end times 651 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp))) 652 rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp))) 653 654 rows.append(('size', _human_readable_size(self.size))) 655 656 if self._chunks: 657 num_messages = 0 658 for c in self._chunks: 659 for counts in c.connection_counts.values(): 660 num_messages += counts 661 else: 662 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 663 rows.append(('messages', str(num_messages))) 664 665 # Show compression information 666 if len(self._chunk_headers) == 0: 667 rows.append(('compression', 'none')) 668 else: 669 compression_counts = {} 670 compression_uncompressed = {} 671 compression_compressed = {} 672 for chunk_header in self._chunk_headers.values(): 673 if chunk_header.compression not in compression_counts: 674 compression_counts[chunk_header.compression] = 1 675 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 676 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 677 else: 678 compression_counts[chunk_header.compression] += 1 679 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 680 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 681 682 chunk_count = len(self._chunk_headers) 683 684 compressions = [] 685 for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])): 686 if compression != Compression.NONE: 687 fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression] 688 compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction)) 689 else: 690 compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count)) 691 rows.append(('compression', ', '.join(compressions))) 692 693 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 694 if not all_uncompressed: 695 total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.values())) 696 total_compressed_size = sum((h.compressed_size for h in self._chunk_headers.values())) 697 698 total_uncompressed_size_str = _human_readable_size(total_uncompressed_size) 699 total_compressed_size_str = _human_readable_size(total_compressed_size) 700 total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)]) 701 702 if duration > 0: 703 uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration) 704 compressed_rate_str = _human_readable_size(total_compressed_size / duration) 705 rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)]) 706 707 rows.append(('uncompressed', '%*s @ %*s/s' % 708 (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str))) 709 rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' % 710 (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size))) 711 else: 712 rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str))) 713 rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str))) 714 715 datatypes = set() 716 datatype_infos = [] 717 for connection in self._connections.values(): 718 if connection.datatype in datatypes: 719 continue 720 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 721 datatypes.add(connection.datatype) 722 723 topics = sorted(set([c.topic for c in self._get_connections()])) 724 topic_datatypes = {} 725 topic_conn_counts = {} 726 topic_msg_counts = {} 727 topic_freqs_median = {} 728 for topic in topics: 729 connections = list(self._get_connections(topic)) 730 731 topic_datatypes[topic] = connections[0].datatype 732 topic_conn_counts[topic] = len(connections) 733 734 msg_count = 0 735 for connection in connections: 736 for chunk in self._chunks: 737 msg_count += chunk.connection_counts.get(connection.id, 0) 738 topic_msg_counts[topic] = msg_count 739 740 if self._connection_indexes_read: 741 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 742 if len(stamps) > 1: 743 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 744 med_period = _median(periods) 745 if med_period > 0.0: 746 topic_freqs_median[topic] = 1.0 / med_period 747 748 topics = sorted(topic_datatypes.keys()) 749 max_topic_len = max([len(topic) for topic in topics]) 750 max_datatype_len = max([len(datatype) for datatype in datatypes]) 751 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()]) 752 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0 753 754 # Show datatypes 755 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 756 s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum) 757 if i == 0: 758 rows.append(('types', s)) 759 else: 760 rows.append(('', s)) 761 762 # Show topics 763 for i, topic in enumerate(topics): 764 topic_msg_count = topic_msg_counts[topic] 765 766 s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ') 767 if topic in topic_freqs_median: 768 s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic])) 769 else: 770 s += ' %*s' % (max_freq_median_len, '') 771 772 s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic]) 773 if topic_conn_counts[topic] > 1: 774 s += ' (%d connections)' % topic_conn_counts[topic] 775 776 if i == 0: 777 rows.append(('topics', s)) 778 else: 779 rows.append(('', s)) 780 781 except Exception as ex: 782 raise 783 784 first_column_width = max([len(field) for field, _ in rows]) + 1 785 786 s = '' 787 for (field, value) in rows: 788 if field: 789 s += '%-*s %s\n' % (first_column_width, field + ':', value) 790 else: 791 s += '%-*s %s\n' % (first_column_width, '', value) 792 793 return s.rstrip()
794
795 - def _get_yaml_info(self, key=None):
796 s = '' 797 798 try: 799 if self._filename: 800 s += 'path: %s\n' % self._filename 801 802 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 803 s += 'version: 1.2 (unindexed)\n' 804 else: 805 s += 'version: %d.%d\n' % (int(self._version / 100), self._version % 100) 806 807 if not self._connection_indexes and not self._chunks: 808 s += 'size: %d\n' % self.size 809 s += 'indexed: False\n' 810 else: 811 if self._chunks: 812 start_stamp = self._chunks[ 0].start_time.to_sec() 813 end_stamp = self._chunks[-1].end_time.to_sec() 814 else: 815 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index] 816 start_stamp = min(start_stamps) if start_stamps else 0 817 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index] 818 end_stamp = max(end_stamps) if end_stamps else 0 819 820 duration = end_stamp - start_stamp 821 s += 'duration: %.6f\n' % duration 822 s += 'start: %.6f\n' % start_stamp 823 s += 'end: %.6f\n' % end_stamp 824 s += 'size: %d\n' % self.size 825 if self._chunks: 826 num_messages = 0 827 for c in self._chunks: 828 for counts in c.connection_counts.values(): 829 num_messages += counts 830 else: 831 num_messages = sum([len(index) for index in self._connection_indexes.values()]) 832 s += 'messages: %d\n' % num_messages 833 s += 'indexed: True\n' 834 835 # Show compression information 836 if len(self._chunk_headers) == 0: 837 s += 'compression: none\n' 838 else: 839 compression_counts = {} 840 compression_uncompressed = {} 841 compression_compressed = {} 842 for chunk_header in self._chunk_headers.values(): 843 if chunk_header.compression not in compression_counts: 844 compression_counts[chunk_header.compression] = 1 845 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 846 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 847 else: 848 compression_counts[chunk_header.compression] += 1 849 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 850 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 851 852 chunk_count = len(self._chunk_headers) 853 854 main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0] 855 s += 'compression: %s\n' % str(main_compression) 856 857 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 858 if not all_uncompressed: 859 s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.values())) 860 s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.values())) 861 862 datatypes = set() 863 datatype_infos = [] 864 for connection in self._connections.values(): 865 if connection.datatype in datatypes: 866 continue 867 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 868 datatypes.add(connection.datatype) 869 870 topics = sorted(set([c.topic for c in self._get_connections()])) 871 topic_datatypes = {} 872 topic_conn_counts = {} 873 topic_msg_counts = {} 874 topic_freqs_median = {} 875 for topic in topics: 876 connections = list(self._get_connections(topic)) 877 878 topic_datatypes[topic] = connections[0].datatype 879 topic_conn_counts[topic] = len(connections) 880 881 msg_count = 0 882 for connection in connections: 883 for chunk in self._chunks: 884 msg_count += chunk.connection_counts.get(connection.id, 0) 885 topic_msg_counts[topic] = msg_count 886 887 if self._connection_indexes_read: 888 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 889 if len(stamps) > 1: 890 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 891 med_period = _median(periods) 892 if med_period > 0.0: 893 topic_freqs_median[topic] = 1.0 / med_period 894 895 topics = sorted(topic_datatypes.keys()) 896 max_topic_len = max([len(topic) for topic in topics]) 897 max_datatype_len = max([len(datatype) for datatype in datatypes]) 898 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()]) 899 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0 900 901 # Show datatypes 902 s += 'types:\n' 903 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 904 s += ' - type: %s\n' % datatype 905 s += ' md5: %s\n' % md5sum 906 907 # Show topics 908 s += 'topics:\n' 909 for i, topic in enumerate(topics): 910 topic_msg_count = topic_msg_counts[topic] 911 912 s += ' - topic: %s\n' % topic 913 s += ' type: %s\n' % topic_datatypes[topic] 914 s += ' messages: %d\n' % topic_msg_count 915 916 if topic_conn_counts[topic] > 1: 917 s += ' connections: %d\n' % topic_conn_counts[topic] 918 919 if topic in topic_freqs_median: 920 s += ' frequency: %.4f\n' % topic_freqs_median[topic] 921 922 if not key: 923 return s 924 925 class DictObject(object): 926 def __init__(self, d): 927 for a, b in d.items(): 928 if isinstance(b, (list, tuple)): 929 setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b]) 930 else: 931 setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
932 933 obj = DictObject(yaml.load(s)) 934 try: 935 val = eval('obj.' + key) 936 except Exception as ex: 937 print('Error getting key "%s"' % key, file=sys.stderr) 938 return None 939 940 def print_yaml(val, indent=0): 941 indent_str = ' ' * indent 942 943 if type(val) is list: 944 s = '' 945 for item in val: 946 s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1)) 947 return s 948 elif type(val) is DictObject: 949 s = '' 950 for i, (k, v) in enumerate(val.__dict__.items()): 951 if i != 0: 952 s += indent_str 953 s += '%s: %s' % (k, str(v)) 954 if i < len(val.__dict__) - 1: 955 s += '\n' 956 return s 957 else: 958 return indent_str + str(val) 959 960 return print_yaml(val) 961 962 except Exception as ex: 963 raise 964 965 ### Internal API ### 966 967 @property
968 - def _has_compressed_chunks(self):
969 if not self._chunk_headers: 970 return False 971 972 return any((h.compression != Compression.NONE for h in self._chunk_headers.values()))
973 974 @property
975 - def _uncompressed_size(self):
976 if not self._chunk_headers: 977 return self.size 978 979 return sum((h.uncompressed_size for h in self._chunk_headers.values()))
980
981 - def _read_message(self, position, raw=False):
982 """ 983 Read the message from the given position in the file. 984 """ 985 self.flush() 986 return self._reader.seek_and_read_message_data_record(position, raw)
987 988 # Index accessing 989
990 - def _get_connections(self, topics=None, connection_filter=None):
991 """ 992 Yield the connections, optionally filtering by topic and/or connection information. 993 """ 994 if topics: 995 if type(topics) is str: 996 topics = set([roslib.names.canonicalize_name(topics)]) 997 else: 998 topics = set([roslib.names.canonicalize_name(t) for t in topics]) 999 1000 for c in self._connections.values(): 1001 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics: 1002 continue 1003 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header): 1004 continue 1005 yield c
1006
1007 - def _get_entries(self, connections=None, start_time=None, end_time=None):
1008 """ 1009 Yield index entries on the given connections in the given time range. 1010 """ 1011 for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time): 1012 if start_time and entry.time < start_time: 1013 continue 1014 if end_time and entry.time > end_time: 1015 return 1016 yield entry
1017
1018 - def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
1019 """ 1020 Yield index entries on the given connections in the given time range in reverse order. 1021 """ 1022 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()): 1023 if end_time and entry.time > end_time: 1024 continue 1025 if start_time and entry.time < start_time: 1026 return 1027 yield entry
1028
1029 - def _get_entry(self, t, connections=None):
1030 """ 1031 Return the first index entry on/before the given time on the given connections 1032 """ 1033 indexes = self._get_indexes(connections) 1034 1035 entry = _IndexEntry(t) 1036 1037 first_entry = None 1038 1039 for index in indexes: 1040 i = bisect.bisect_right(index, entry) - 1 1041 if i >= 0: 1042 index_entry = index[i] 1043 if first_entry is None or index_entry > first_entry: 1044 first_entry = index_entry 1045 1046 return first_entry
1047
1048 - def _get_entry_after(self, t, connections=None):
1049 """ 1050 Return the first index entry after the given time on the given connections 1051 """ 1052 indexes = self._get_indexes(connections) 1053 1054 entry = _IndexEntry(t) 1055 1056 first_entry = None 1057 1058 for index in indexes: 1059 i = bisect.bisect_right(index, entry) 1060 if i <= len(index) - 1: 1061 index_entry = index[i] 1062 if first_entry is None or index_entry < first_entry: 1063 first_entry = index_entry 1064 1065 return first_entry
1066
1067 - def _get_indexes(self, connections):
1068 """ 1069 Get the indexes for the given connections. 1070 """ 1071 if not self._connection_indexes_read: 1072 self._reader._read_connection_index_records() 1073 1074 if connections is None: 1075 return self._connection_indexes.values() 1076 else: 1077 return (self._connection_indexes[c.id] for c in connections)
1078 1079 ### Implementation ### 1080
1081 - def _clear_index(self):
1082 self._connection_indexes_read = False 1083 self._connection_indexes = {} # id -> IndexEntry[] (1.2+) 1084 1085 self._topic_connections = {} # topic -> connection_id 1086 self._connections = {} # id -> ConnectionInfo 1087 self._connection_count = 0 # (2.0) 1088 self._chunk_count = 0 # (2.0) 1089 self._chunks = [] # ChunkInfo[] (2.0) 1090 self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0) 1091 1092 self._chunk_open = False 1093 self._curr_chunk_info = None 1094 self._curr_chunk_data_pos = None 1095 self._curr_chunk_connection_indexes = {}
1096
1097 - def _open(self, f, mode, allow_unindexed):
1098 if not f: 1099 raise ValueError('filename (or stream) is invalid') 1100 1101 try: 1102 if mode == 'r': self._open_read(f, allow_unindexed) 1103 elif mode == 'w': self._open_write(f) 1104 elif mode == 'a': self._open_append(f, allow_unindexed) 1105 else: 1106 raise ValueError('mode "%s" is invalid' % mode) 1107 if 'b' not in self._file.mode and not isinstance('', bytes): 1108 raise ROSBagException('In Python 3 the bag file must be opened in binary mode') 1109 except struct.error: 1110 raise ROSBagFormatException('error with bag')
1111
1112 - def _is_file(self, f):
1113 try: 1114 return isinstance(f, file) # Python 2 1115 except NameError: 1116 import io 1117 return isinstance(f, io.IOBase) # Python 3...this will return false in Python 2 always
1118
1119 - def _open_read(self, f, allow_unindexed):
1120 if self._is_file(f): 1121 self._file = f 1122 self._filename = None 1123 else: 1124 self._file = open(f, 'rb') 1125 self._filename = f 1126 1127 self._mode = 'r' 1128 1129 try: 1130 self._version = self._read_version() 1131 except: 1132 self._version = None 1133 self._close_file() 1134 raise 1135 1136 try: 1137 self._create_reader() 1138 self._reader.start_reading() 1139 except ROSBagUnindexedException as ex: 1140 if not allow_unindexed: 1141 self._close_file() 1142 raise 1143 except: 1144 self._close_file() 1145 raise
1146
1147 - def _open_write(self, f):
1148 if self._is_file(f): 1149 self._file = f 1150 self._filename = None 1151 else: 1152 self._file = open(f, 'w+b') 1153 self._filename = f 1154 1155 self._mode = 'w' 1156 1157 self._version = 200 1158 1159 try: 1160 self._create_reader() 1161 self._start_writing() 1162 except: 1163 self._close_file() 1164 raise
1165
1166 - def _open_append(self, f, allow_unindexed):
1167 if self._is_file(f): 1168 self._file = f 1169 self._filename = None 1170 else: 1171 try: 1172 # Test if the file already exists 1173 open(f, 'r').close() 1174 1175 # File exists: open in read with update mode 1176 self._file = open(f, 'r+b') 1177 except IOError: 1178 # File doesn't exist: open in write mode 1179 self._file = open(f, 'w+b') 1180 1181 self._filename = f 1182 1183 self._mode = 'a' 1184 1185 try: 1186 self._version = self._read_version() 1187 except: 1188 self._version = None 1189 self._close_file() 1190 raise 1191 1192 if self._version != 200: 1193 self._file.close() 1194 raise ROSBagException('bag version %d is unsupported for appending' % self._version) 1195 1196 try: 1197 self._start_appending() 1198 except ROSBagUnindexedException: 1199 if not allow_unindexed: 1200 self._close_file() 1201 raise 1202 except: 1203 self._close_file() 1204 raise
1205
1206 - def _close_file(self):
1207 self._file.close() 1208 self._file = None
1209
1210 - def _create_reader(self):
1211 """ 1212 @raise ROSBagException: if the bag version is unsupported 1213 """ 1214 major_version, minor_version = int(self._version / 100), self._version % 100 1215 if major_version == 2: 1216 self._reader = _BagReader200(self) 1217 elif major_version == 1: 1218 if minor_version == 1: 1219 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version) 1220 else: 1221 # Get the op code of the first record. If it's FILE_HEADER, then we have an indexed 1.2 bag. 1222 first_record_pos = self._file.tell() 1223 header = _read_header(self._file) 1224 op = _read_uint8_field(header, 'op') 1225 self._file.seek(first_record_pos) 1226 1227 if op == _OP_FILE_HEADER: 1228 self._reader = _BagReader102_Indexed(self) 1229 else: 1230 self._reader = _BagReader102_Unindexed(self) 1231 else: 1232 raise ROSBagException('unknown bag version %d' % self._version)
1233
1234 - def _read_version(self):
1235 """ 1236 @raise ROSBagException: if the file is empty, or the version line can't be parsed 1237 """ 1238 version_line = self._file.readline().rstrip().decode() 1239 if len(version_line) == 0: 1240 raise ROSBagException('empty file') 1241 1242 matches = re.match("#ROS(.*) V(\d).(\d)", version_line) 1243 if matches is None or len(matches.groups()) != 3: 1244 raise ROSBagException('This does not appear to be a bag file') 1245 1246 version_type, major_version_str, minor_version_str = matches.groups() 1247 1248 version = int(major_version_str) * 100 + int(minor_version_str) 1249 1250 return version
1251
1252 - def _start_writing(self):
1253 version = _VERSION + '\n' 1254 version = version.encode() 1255 self._file.write(version) 1256 self._file_header_pos = self._file.tell() 1257 self._write_file_header_record(0, 0, 0) 1258
1259 - def _stop_writing(self):
1260 # Write the open chunk (if any) to file 1261 self.flush() 1262 1263 # Remember this location as the start of the index 1264 self._index_data_pos = self._file.tell() 1265 1266 # Write connection infos 1267 for connection_info in self._connections.values(): 1268 self._write_connection_record(connection_info) 1269 1270 # Write chunk infos 1271 for chunk_info in self._chunks: 1272 self._write_chunk_info_record(chunk_info) 1273 1274 # Re-write the file header 1275 self._file.seek(self._file_header_pos) 1276 self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
1277
1278 - def _start_appending(self):
1279 self._file_header_pos = self._file.tell() 1280 1281 self._create_reader() 1282 self._reader.start_reading() 1283 1284 # Truncate the file to chop off the index 1285 self._file.truncate(self._index_data_pos) 1286 self._reader.index_data_pos = 0 1287 1288 # Rewrite the file header, clearing the index position (so we know if the index is invalid) 1289 self._file.seek(self._file_header_pos); 1290 self._write_file_header_record(0, 0, 0) 1291 1292 # Seek to the end of the file 1293 self._file.seek(0, os.SEEK_END)
1294
1295 - def _start_writing_chunk(self, t):
1296 self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t) 1297 self._write_chunk_header(_ChunkHeader(self._compression, 0, 0)) 1298 self._curr_chunk_data_pos = self._file.tell() 1299 self._set_compression_mode(self._compression) 1300 self._chunk_open = True
1301
1302 - def _get_chunk_offset(self):
1303 if self._compression == Compression.NONE: 1304 return self._file.tell() - self._curr_chunk_data_pos 1305 else: 1306 return self._output_file.compressed_bytes_in
1307
1308 - def _stop_writing_chunk(self):
1309 # Add this chunk to the index 1310 self._chunks.append(self._curr_chunk_info) 1311 1312 # Get the uncompressed and compressed sizes 1313 uncompressed_size = self._get_chunk_offset() 1314 self._set_compression_mode(Compression.NONE) 1315 compressed_size = self._file.tell() - self._curr_chunk_data_pos 1316 1317 # Rewrite the chunk header with the size of the chunk (remembering current offset) 1318 end_of_chunk_pos = self._file.tell() 1319 self._file.seek(self._curr_chunk_info.pos) 1320 chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos) 1321 self._write_chunk_header(chunk_header) 1322 self._chunk_headers[self._curr_chunk_info.pos] = chunk_header 1323 1324 # Write out the connection indexes and clear them 1325 self._file.seek(end_of_chunk_pos) 1326 for connection_id, entries in self._curr_chunk_connection_indexes.items(): 1327 self._write_connection_index_record(connection_id, entries) 1328 self._curr_chunk_connection_indexes.clear() 1329 1330 # Flag that we're starting a new chunk 1331 self._chunk_open = False
1332
1333 - def _set_compression_mode(self, compression):
1334 # Flush the compressor, if needed 1335 if self._curr_compression != Compression.NONE: 1336 self._output_file.flush() 1337 1338 # Create the compressor 1339 if compression == Compression.BZ2: 1340 self._output_file = _CompressorFileFacade(self._file, bz2.BZ2Compressor()) 1341 elif compression == Compression.LZ4 and found_lz4: 1342 self._output_file = _CompressorFileFacade(self._file, roslz4.LZ4Compressor()) 1343 elif compression == Compression.NONE: 1344 self._output_file = self._file 1345 else: 1346 raise ROSBagException('unsupported compression type: %s' % compression) 1347 1348 self._curr_compression = compression
1349
1350 - def _write_file_header_record(self, index_pos, connection_count, chunk_count):
1351 header = { 1352 'op': _pack_uint8(_OP_FILE_HEADER), 1353 'index_pos': _pack_uint64(index_pos), 1354 'conn_count': _pack_uint32(connection_count), 1355 'chunk_count': _pack_uint32(chunk_count) 1356 } 1357 _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
1358
1359 - def _write_connection_record(self, connection_info):
1360 header = { 1361 'op': _pack_uint8(_OP_CONNECTION), 1362 'topic': connection_info.topic, 1363 'conn': _pack_uint32(connection_info.id) 1364 } 1365 1366 _write_header(self._output_file, header) 1367 1368 _write_header(self._output_file, connection_info.header)
1369
1370 - def _write_message_data_record(self, connection_id, t, serialized_bytes):
1371 header = { 1372 'op': _pack_uint8(_OP_MSG_DATA), 1373 'conn': _pack_uint32(connection_id), 1374 'time': _pack_time(t) 1375 } 1376 _write_record(self._output_file, header, serialized_bytes)
1377
1378 - def _write_chunk_header(self, chunk_header):
1379 header = { 1380 'op': _pack_uint8(_OP_CHUNK), 1381 'compression': chunk_header.compression, 1382 'size': _pack_uint32(chunk_header.uncompressed_size) 1383 } 1384 _write_header(self._file, header) 1385 1386 self._file.write(_pack_uint32(chunk_header.compressed_size))
1387
1388 - def _write_connection_index_record(self, connection_id, entries):
1389 header = { 1390 'op': _pack_uint8(_OP_INDEX_DATA), 1391 'conn': _pack_uint32(connection_id), 1392 'ver': _pack_uint32(_INDEX_VERSION), 1393 'count': _pack_uint32(len(entries)) 1394 } 1395 1396 buffer = self._buffer 1397 buffer.seek(0) 1398 buffer.truncate(0) 1399 for entry in entries: 1400 buffer.write(_pack_time (entry.time)) 1401 buffer.write(_pack_uint32(entry.offset)) 1402 1403 _write_record(self._file, header, buffer.getvalue()) 1404
1405 - def _write_chunk_info_record(self, chunk_info):
1406 header = { 1407 'op': _pack_uint8 (_OP_CHUNK_INFO), 1408 'ver': _pack_uint32(_CHUNK_INDEX_VERSION), 1409 'chunk_pos': _pack_uint64(chunk_info.pos), 1410 'start_time': _pack_time(chunk_info.start_time), 1411 'end_time': _pack_time(chunk_info.end_time), 1412 'count': _pack_uint32(len(chunk_info.connection_counts)) 1413 } 1414 1415 buffer = self._buffer 1416 buffer.seek(0) 1417 buffer.truncate(0) 1418 for connection_id, count in chunk_info.connection_counts.items(): 1419 buffer.write(_pack_uint32(connection_id)) 1420 buffer.write(_pack_uint32(count)) 1421 1422 _write_record(self._file, header, buffer.getvalue())
1423 1424 ### Implementation ### 1425 1426 _message_types = {} # md5sum -> type 1427 1428 _OP_MSG_DEF = 0x01 1429 _OP_MSG_DATA = 0x02 1430 _OP_FILE_HEADER = 0x03 1431 _OP_INDEX_DATA = 0x04 1432 _OP_CHUNK = 0x05 1433 _OP_CHUNK_INFO = 0x06 1434 _OP_CONNECTION = 0x07 1435 1436 _OP_CODES = { 1437 _OP_MSG_DEF: 'MSG_DEF', 1438 _OP_MSG_DATA: 'MSG_DATA', 1439 _OP_FILE_HEADER: 'FILE_HEADER', 1440 _OP_INDEX_DATA: 'INDEX_DATA', 1441 _OP_CHUNK: 'CHUNK', 1442 _OP_CHUNK_INFO: 'CHUNK_INFO', 1443 _OP_CONNECTION: 'CONNECTION' 1444 } 1445 1446 _VERSION = '#ROSBAG V2.0' 1447 _FILE_HEADER_LENGTH = 4096 1448 _INDEX_VERSION = 1 1449 _CHUNK_INDEX_VERSION = 1
1450 1451 -class _ConnectionInfo(object):
1452 - def __init__(self, id, topic, header):
1453 try: 1454 datatype = _read_str_field(header, 'type') 1455 md5sum = _read_str_field(header, 'md5sum') 1456 msg_def = _read_str_field(header, 'message_definition') 1457 except KeyError as ex: 1458 raise ROSBagFormatException('connection header field %s not found' % str(ex)) 1459 1460 self.id = id 1461 self.topic = topic 1462 self.datatype = datatype 1463 self.md5sum = md5sum 1464 self.msg_def = msg_def 1465 self.header = header
1466
1467 - def __str__(self):
1468 return '%d on %s: %s' % (self.id, self.topic, str(self.header))
1469
1470 -class _ChunkInfo(object):
1471 - def __init__(self, pos, start_time, end_time):
1472 self.pos = pos 1473 self.start_time = start_time 1474 self.end_time = end_time 1475 1476 self.connection_counts = {}
1477
1478 - def __str__(self):
1479 s = 'chunk_pos: %d\n' % self.pos 1480 s += 'start_time: %s\n' % str(self.start_time) 1481 s += 'end_time: %s\n' % str(self.end_time) 1482 s += 'connections: %d\n' % len(self.connection_counts) 1483 s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()]) 1484 return s
1485
1486 -class _ChunkHeader(object):
1487 - def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
1488 self.compression = compression 1489 self.compressed_size = compressed_size 1490 self.uncompressed_size = uncompressed_size 1491 self.data_pos = data_pos
1492
1493 - def __str__(self):
1494 if self.uncompressed_size > 0: 1495 ratio = 100 * (float(self.compressed_size) / self.uncompressed_size) 1496 return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio) 1497 else: 1498 return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
1499
1500 -class ComparableMixin(object):
1501 __slots__ = [] 1502
1503 - def _compare(self, other, method):
1504 try: 1505 return method(self._cmpkey(), other._cmpkey()) 1506 except (AttributeError, TypeError): 1507 # _cmpkey not implemented or return different type 1508 # so can not compare with other 1509 return NotImplemented
1510
1511 - def __lt__(self, other):
1512 return self._compare(other, lambda s, o: s < o)
1513
1514 - def __le__(self, other):
1515 return self._compare(other, lambda s, o: s <= o)
1516
1517 - def __eq__(self, other):
1518 return self._compare(other, lambda s, o: s == o)
1519
1520 - def __ge__(self, other):
1521 return self._compare(other, lambda s, o: s >= o)
1522
1523 - def __gt__(self, other):
1524 return self._compare(other, lambda s, o: s > o)
1525
1526 - def __ne__(self, other):
1527 return self._compare(other, lambda s, o: s != o)
1528
1529 -class _IndexEntry(ComparableMixin):
1530 __slots__ = ['time'] 1531
1532 - def __init__(self, time):
1533 self.time = time
1534
1535 - def _cmpkey(self):
1536 return self.time
1537
1538 -class _IndexEntry102(_IndexEntry):
1539 __slots__ = ['offset'] 1540
1541 - def __init__(self, time, offset):
1542 self.time = time 1543 self.offset = offset
1544 1545 @property
1546 - def position(self):
1547 return self.offset
1548
1549 - def __str__(self):
1550 return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)
1551
1552 -class _IndexEntry200(_IndexEntry):
1553 __slots__ = ['chunk_pos', 'offset'] 1554
1555 - def __init__(self, time, chunk_pos, offset):
1556 self.time = time 1557 self.chunk_pos = chunk_pos 1558 self.offset = offset
1559 1560 @property
1561 - def position(self):
1562 return (self.chunk_pos, self.offset)
1563
1564 - def __str__(self):
1565 return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
1566
1567 -def _get_message_type(info):
1568 message_type = _message_types.get(info.md5sum) 1569 if message_type is None: 1570 try: 1571 message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype] 1572 if (message_type._md5sum != info.md5sum): 1573 print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr) 1574 except genmsg.InvalidMsgSpec: 1575 message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype] 1576 print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition."'%(info.datatype, info.md5sum), file=sys.stderr) 1577 except genmsg.MsgGenerationException as ex: 1578 raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex))) 1579 1580 _message_types[info.md5sum] = message_type 1581 1582 return message_type
1583
1584 -def _read_uint8 (f): return _unpack_uint8 (f.read(1))
1585 -def _read_uint32(f): return _unpack_uint32(f.read(4))
1586 -def _read_uint64(f): return _unpack_uint64(f.read(8))
1587 -def _read_time (f): return _unpack_time (f.read(8))
1588
1589 -def _decode_str(v): return v if type(v) is str else v.decode()
1590 -def _unpack_uint8(v): return struct.unpack('<B', v)[0]
1591 -def _unpack_uint32(v): return struct.unpack('<L', v)[0]
1592 -def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
1593 -def _unpack_time(v): return rospy.Time(*struct.unpack('<LL', v))
1594
1595 -def _pack_uint8(v): return struct.pack('<B', v)
1596 -def _pack_uint32(v): return struct.pack('<L', v)
1597 -def _pack_uint64(v): return struct.pack('<Q', v)
1598 -def _pack_time(v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
1599
1600 -def _read(f, size):
1601 data = f.read(size) 1602 if len(data) != size: 1603 raise ROSBagException('expecting %d bytes, read %d' % (size, len(data))) 1604 return data
1605
1606 -def _skip_record(f):
1607 _skip_sized(f) # skip header 1608 _skip_sized(f) # skip data
1609
1610 -def _skip_sized(f):
1611 size = _read_uint32(f) 1612 f.seek(size, os.SEEK_CUR)
1613
1614 -def _read_sized(f):
1615 try: 1616 size = _read_uint32(f) 1617 except struct.error as ex: 1618 raise ROSBagFormatException('error unpacking uint32: %s' % str(ex)) 1619 return _read(f, size)
1620
1621 -def _write_sized(f, v):
1622 if not isinstance(v, bytes): 1623 v = v.encode() 1624 f.write(_pack_uint32(len(v))) 1625 f.write(v)
1626
1627 -def _read_field(header, field, unpack_fn):
1628 if field not in header: 1629 raise ROSBagFormatException('expected "%s" field in record' % field) 1630 1631 try: 1632 value = unpack_fn(header[field]) 1633 except Exception as ex: 1634 raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex))) 1635 1636 return value
1637
1638 -def _read_str_field (header, field): return _read_field(header, field, _decode_str)
1639 -def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
1640 -def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
1641 -def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
1642 -def _read_time_field (header, field): return _read_field(header, field, _unpack_time)
1643
1644 -def _write_record(f, header, data='', padded_size=None):
1645 header_str = _write_header(f, header) 1646 1647 if padded_size is not None: 1648 header_len = len(header_str) 1649 if header_len < padded_size: 1650 data = ' ' * (padded_size - header_len) 1651 else: 1652 data = '' 1653 1654 _write_sized(f, data)
1655
1656 -def _write_header(f, header):
1657 header_str = b'' 1658 equal = b'=' 1659 for k, v in header.items(): 1660 if not isinstance(k, bytes): 1661 k = k.encode() 1662 if not isinstance(v, bytes): 1663 v = v.encode() 1664 header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v 1665 _write_sized(f, header_str) 1666 return header_str
1667
1668 -def _read_header(f, req_op=None):
1669 bag_pos = f.tell() 1670 1671 # Read header 1672 try: 1673 header = _read_sized(f) 1674 except ROSBagException as ex: 1675 raise ROSBagFormatException('Error reading header: %s' % str(ex)) 1676 1677 # Parse header into a dict 1678 header_dict = {} 1679 while header != b'': 1680 # Read size 1681 if len(header) < 4: 1682 raise ROSBagFormatException('Error reading header field') 1683 (size,) = struct.unpack('<L', header[:4]) # @todo reindex: catch struct.error 1684 header = header[4:] 1685 1686 # Read bytes 1687 if len(header) < size: 1688 raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header))) 1689 (name, sep, value) = header[:size].partition(b'=') 1690 if sep == b'': 1691 raise ROSBagFormatException('Error reading header field') 1692 1693 name = name.decode() 1694 header_dict[name] = value # @todo reindex: raise exception on empty name 1695 1696 header = header[size:] 1697 1698 # Check the op code of the header, if supplied 1699 if req_op is not None: 1700 op = _read_uint8_field(header_dict, 'op') 1701 if req_op != op: 1702 raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op])) 1703 1704 return header_dict
1705
1706 -def _peek_next_header_op(f):
1707 pos = f.tell() 1708 header = _read_header(f) 1709 op = _read_uint8_field(header, 'op') 1710 f.seek(pos) 1711 return op
1712
1713 -def _read_record_data(f):
1714 try: 1715 record_data = _read_sized(f) 1716 except ROSBagException as ex: 1717 raise ROSBagFormatException('Error reading record data: %s' % str(ex)) 1718 1719 return record_data
1720
1721 -class _BagReader(object):
1722 - def __init__(self, bag):
1723 self.bag = bag
1724
1725 - def start_reading(self):
1726 raise NotImplementedError()
1727
1728 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1729 raise NotImplementedError()
1730
1731 - def reindex(self):
1732 raise NotImplementedError()
1733
1734 -class _BagReader102_Unindexed(_BagReader):
1735 """ 1736 Support class for reading unindexed v1.2 bag files. 1737 """
1738 - def __init__(self, bag):
1739 _BagReader.__init__(self, bag)
1740
1741 - def start_reading(self):
1742 self.bag._file_header_pos = self.bag._file.tell()
1743
1744 - def reindex(self):
1745 """Generates all bag index information by rereading the message records.""" 1746 f = self.bag._file 1747 1748 total_bytes = self.bag.size 1749 1750 # Re-read the file header to get to the start of the first message 1751 self.bag._file.seek(self.bag._file_header_pos) 1752 1753 offset = f.tell() 1754 1755 # Read message definition and data records 1756 while offset < total_bytes: 1757 yield offset 1758 1759 op = _peek_next_header_op(f) 1760 1761 if op == _OP_MSG_DEF: 1762 connection_info = self.read_message_definition_record() 1763 1764 if connection_info.topic not in self.bag._topic_connections: 1765 self.bag._topic_connections[connection_info.topic] = connection_info.id 1766 self.bag._connections[connection_info.id] = connection_info 1767 self.bag._connection_indexes[connection_info.id] = [] 1768 1769 elif op == _OP_MSG_DATA: 1770 # Read the topic and timestamp from the header 1771 header = _read_header(f) 1772 1773 topic = _read_str_field(header, 'topic') 1774 secs = _read_uint32_field(header, 'sec') 1775 nsecs = _read_uint32_field(header, 'nsec') 1776 t = genpy.Time(secs, nsecs) 1777 1778 if topic not in self.bag._topic_connections: 1779 datatype = _read_str_field(header, 'type') 1780 self._create_connection_info_for_datatype(topic, datatype) 1781 1782 connection_id = self.bag._topic_connections[topic] 1783 info = self.bag._connections[connection_id] 1784 1785 # Skip over the message content 1786 _skip_sized(f) 1787 1788 # Insert the message entry (in order) into the connection index 1789 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1790 1791 offset = f.tell()
1792
1793 - def read_messages(self, topics, start_time, end_time, topic_filter, raw):
1794 f = self.bag._file 1795 1796 f.seek(self.bag._file_header_pos) 1797 1798 while True: 1799 # Read MSG_DEF records 1800 while True: 1801 position = f.tell() 1802 1803 try: 1804 header = _read_header(f) 1805 except Exception: 1806 return 1807 1808 op = _read_uint8_field(header, 'op') 1809 if op != _OP_MSG_DEF: 1810 break 1811 1812 connection_info = self.read_message_definition_record(header) 1813 1814 if connection_info.topic not in self.bag._topic_connections: 1815 self.bag._topic_connections[connection_info.topic] = connection_info.id 1816 1817 self.bag._connections[connection_info.id] = connection_info 1818 1819 # Check that we have a MSG_DATA record 1820 if op != _OP_MSG_DATA: 1821 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 1822 1823 topic = _read_str_field(header, 'topic') 1824 1825 if topic not in self.bag._topic_connections: 1826 datatype = _read_str_field(header, 'type') 1827 self._create_connection_info_for_datatype(topic, datatype) 1828 1829 connection_id = self.bag._topic_connections[topic] 1830 info = self.bag._connections[connection_id] 1831 1832 # Get the message type 1833 try: 1834 msg_type = _get_message_type(info) 1835 except KeyError: 1836 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceeded in bagfile by definition' % info.datatype) 1837 1838 # Get the timestamp 1839 secs = _read_uint32_field(header, 'sec') 1840 nsecs = _read_uint32_field(header, 'nsec') 1841 t = genpy.Time(secs, nsecs) 1842 1843 # Read the message content 1844 data = _read_record_data(f) 1845 1846 if raw: 1847 msg = (info.datatype, data, info.md5sum, position, msg_type) 1848 else: 1849 # Deserialize the message 1850 msg = msg_type() 1851 msg.deserialize(data) 1852 1853 yield BagMessage(topic, msg, t) 1854 1855 self.bag._connection_indexes_read = True
1856
1857 - def _create_connection_info_for_datatype(self, topic, datatype):
1858 for c in self.bag._connections.values(): 1859 if c.datatype == datatype: 1860 connection_id = len(self.bag._connections) 1861 connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] } 1862 connection_info = _ConnectionInfo(connection_id, topic, connection_header) 1863 1864 self.bag._topic_connections[topic] = connection_id 1865 self.bag._connections[connection_id] = connection_info 1866 self.bag._connection_indexes[connection_id] = [] 1867 return 1868 1869 raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
1870
1871 - def read_message_definition_record(self, header=None):
1872 if not header: 1873 header = _read_header(self.bag._file, _OP_MSG_DEF) 1874 1875 topic = _read_str_field(header, 'topic') 1876 datatype = _read_str_field(header, 'type') 1877 md5sum = _read_str_field(header, 'md5') 1878 msg_def = _read_str_field(header, 'def') 1879 1880 _skip_sized(self.bag._file) # skip the record data 1881 1882 connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def } 1883 1884 id = len(self.bag._connections) 1885 1886 return _ConnectionInfo(id, topic, connection_header)
1887
1888 -class _BagReader102_Indexed(_BagReader102_Unindexed):
1889 """ 1890 Support class for reading indexed v1.2 bag files. 1891 """
1892 - def __init__(self, bag):
1893 _BagReader.__init__(self, bag)
1894
1895 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1896 connections = self.bag._get_connections(topics, connection_filter) 1897 for entry in self.bag._get_entries(connections, start_time, end_time): 1898 yield self.seek_and_read_message_data_record(entry.offset, raw)
1899
1900 - def reindex(self):
1901 """Generates all bag index information by rereading the message records.""" 1902 f = self.bag._file 1903 1904 total_bytes = self.bag.size 1905 1906 # Re-read the file header to get to the start of the first message 1907 self.bag._file.seek(self.bag._file_header_pos) 1908 self.read_file_header_record() 1909 1910 offset = f.tell() 1911 1912 # Read message definition and data records 1913 while offset < total_bytes: 1914 yield offset 1915 1916 op = _peek_next_header_op(f) 1917 1918 if op == _OP_MSG_DEF: 1919 connection_info = self.read_message_definition_record() 1920 1921 if connection_info.topic not in self.bag._topic_connections: 1922 self.bag._topic_connections[connection_info.topic] = connection_info.id 1923 self.bag._connections[connection_info.id] = connection_info 1924 self.bag._connection_indexes[connection_info.id] = [] 1925 1926 elif op == _OP_MSG_DATA: 1927 # Read the topic and timestamp from the header 1928 header = _read_header(f) 1929 1930 topic = _read_str_field(header, 'topic') 1931 secs = _read_uint32_field(header, 'sec') 1932 nsecs = _read_uint32_field(header, 'nsec') 1933 t = genpy.Time(secs, nsecs) 1934 1935 if topic not in self.bag._topic_connections: 1936 datatype = _read_str_field(header, 'type') 1937 self._create_connection_info_for_datatype(topic, datatype) 1938 1939 connection_id = self.bag._topic_connections[topic] 1940 info = self.bag._connections[connection_id] 1941 1942 # Skip over the message content 1943 _skip_sized(f) 1944 1945 # Insert the message entry (in order) into the connection index 1946 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1947 1948 elif op == _OP_INDEX_DATA: 1949 _skip_record(f) 1950 1951 offset = f.tell()
1952
1953 - def start_reading(self):
1954 try: 1955 # Read the file header 1956 self.read_file_header_record() 1957 1958 total_bytes = self.bag.size 1959 1960 # Check if the index position has been written, i.e. the bag was closed successfully 1961 if self.bag._index_data_pos == 0: 1962 raise ROSBagUnindexedException() 1963 1964 # Seek to the beginning of the topic index records 1965 self.bag._file.seek(self.bag._index_data_pos) 1966 1967 # Read the topic indexes 1968 topic_indexes = {} 1969 while True: 1970 pos = self.bag._file.tell() 1971 if pos >= total_bytes: 1972 break 1973 1974 topic, index = self.read_topic_index_record() 1975 1976 topic_indexes[topic] = index 1977 1978 # Read the message definition records (one for each topic) 1979 for topic, index in topic_indexes.items(): 1980 self.bag._file.seek(index[0].offset) 1981 1982 connection_info = self.read_message_definition_record() 1983 1984 if connection_info.topic not in self.bag._topic_connections: 1985 self.bag._topic_connections[connection_info.topic] = connection_info.id 1986 self.bag._connections[connection_info.id] = connection_info 1987 1988 self.bag._connection_indexes[connection_info.id] = index 1989 1990 self.bag._connection_indexes_read = True 1991 1992 except Exception as ex: 1993 raise ROSBagUnindexedException()
1994
1995 - def read_file_header_record(self):
1996 self.bag._file_header_pos = self.bag._file.tell() 1997 1998 header = _read_header(self.bag._file, _OP_FILE_HEADER) 1999 2000 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 2001 2002 _skip_sized(self.bag._file) # skip the record data, i.e. padding
2003
2004 - def read_topic_index_record(self):
2005 f = self.bag._file 2006 2007 header = _read_header(f, _OP_INDEX_DATA) 2008 2009 index_version = _read_uint32_field(header, 'ver') 2010 topic = _read_str_field (header, 'topic') 2011 count = _read_uint32_field(header, 'count') 2012 2013 if index_version != 0: 2014 raise ROSBagFormatException('expecting index version 0, got %d' % index_version) 2015 2016 _read_uint32(f) # skip the record data size 2017 2018 topic_index = [] 2019 2020 for i in range(count): 2021 time = _read_time (f) 2022 offset = _read_uint64(f) 2023 2024 topic_index.append(_IndexEntry102(time, offset)) 2025 2026 return (topic, topic_index)
2027
2028 - def seek_and_read_message_data_record(self, position, raw):
2029 f = self.bag._file 2030 2031 # Seek to the message position 2032 f.seek(position) 2033 2034 # Skip any MSG_DEF records 2035 while True: 2036 header = _read_header(f) 2037 op = _read_uint8_field(header, 'op') 2038 if op != _OP_MSG_DEF: 2039 break 2040 _skip_sized(f) 2041 2042 # Check that we have a MSG_DATA record 2043 if op != _OP_MSG_DATA: 2044 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2045 2046 topic = _read_str_field(header, 'topic') 2047 2048 connection_id = self.bag._topic_connections[topic] 2049 info = self.bag._connections[connection_id] 2050 2051 # Get the message type 2052 try: 2053 msg_type = _get_message_type(info) 2054 except KeyError: 2055 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceeded in bagfile by definition' % info.datatype) 2056 2057 # Get the timestamp 2058 secs = _read_uint32_field(header, 'sec') 2059 nsecs = _read_uint32_field(header, 'nsec') 2060 t = genpy.Time(secs, nsecs) 2061 2062 # Read the message content 2063 data = _read_record_data(f) 2064 2065 if raw: 2066 msg = info.datatype, data, info.md5sum, position, msg_type 2067 else: 2068 # Deserialize the message 2069 msg = msg_type() 2070 msg.deserialize(data) 2071 2072 return BagMessage(topic, msg, t)
2073
2074 -class _BagReader200(_BagReader):
2075 """ 2076 Support class for reading v2.0 bag files. 2077 """
2078 - def __init__(self, bag):
2079 _BagReader.__init__(self, bag) 2080 2081 self.decompressed_chunk_pos = None 2082 self.decompressed_chunk = None 2083 self.decompressed_chunk_io = None
2084
2085 - def reindex(self):
2086 """ 2087 Generates all bag index information by rereading the chunks. 2088 Assumes the file header has been read. 2089 """ 2090 f = self.bag._file 2091 2092 f.seek(0, os.SEEK_END) 2093 total_bytes = f.tell() 2094 2095 # Read any connection records from after the chunk section. 2096 # This is to workaround a bug in rosbag record --split (fixed in r10390) 2097 # where connection records weren't always being written inside the chunk. 2098 self._read_terminal_connection_records() 2099 2100 # Re-read the file header to get to the start of the first chunk 2101 self.bag._file.seek(self.bag._file_header_pos) 2102 self.read_file_header_record() 2103 2104 trunc_pos = None 2105 2106 while True: 2107 chunk_pos = f.tell() 2108 if chunk_pos >= total_bytes: 2109 break 2110 2111 yield chunk_pos 2112 2113 try: 2114 self._reindex_read_chunk(f, chunk_pos, total_bytes) 2115 except Exception as ex: 2116 break 2117 2118 trunc_pos = f.tell() 2119 2120 if trunc_pos and trunc_pos < total_bytes: 2121 f.truncate(trunc_pos)
2122
2123 - def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
2124 # Read the chunk header 2125 chunk_header = self.read_chunk_header() 2126 2127 # If the chunk header size is 0, then the chunk wasn't correctly terminated - we're done 2128 if chunk_header.compressed_size == 0: 2129 raise ROSBagException('unterminated chunk at %d' % chunk_pos) 2130 2131 if chunk_header.compression == Compression.NONE: 2132 chunk_file = f 2133 else: 2134 # Read the compressed chunk 2135 compressed_chunk = _read(f, chunk_header.compressed_size) 2136 2137 # Decompress it 2138 if chunk_header.compression == Compression.BZ2: 2139 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2140 elif chunk_header.compression == Compression.LZ4 and found_lz4: 2141 self.decompressed_chunk = roslz4.decompress(compressed_chunk) 2142 else: 2143 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2144 2145 if self.decompressed_chunk_io: 2146 self.decompressed_chunk_io.close() 2147 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2148 2149 chunk_file = self.decompressed_chunk_io 2150 2151 # Read chunk connection and message records 2152 self.bag._curr_chunk_info = None 2153 2154 if chunk_header.compression == Compression.NONE: 2155 offset = chunk_file.tell() - chunk_pos 2156 else: 2157 offset = chunk_file.tell() 2158 2159 expected_index_length = 0 2160 2161 while offset < chunk_header.uncompressed_size: 2162 op = _peek_next_header_op(chunk_file) 2163 2164 if op == _OP_CONNECTION: 2165 connection_info = self.read_connection_record(chunk_file) 2166 2167 if connection_info.id not in self.bag._connections: 2168 self.bag._connections[connection_info.id] = connection_info 2169 if connection_info.id not in self.bag._connection_indexes: 2170 self.bag._connection_indexes[connection_info.id] = [] 2171 2172 elif op == _OP_MSG_DATA: 2173 # Read the connection id and timestamp from the header 2174 header = _read_header(chunk_file) 2175 2176 connection_id = _read_uint32_field(header, 'conn') 2177 t = _read_time_field (header, 'time') 2178 2179 # Update the chunk info with this timestamp 2180 if not self.bag._curr_chunk_info: 2181 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t) 2182 else: 2183 if t > self.bag._curr_chunk_info.end_time: 2184 self.bag._curr_chunk_info.end_time = t 2185 elif t < self.bag._curr_chunk_info.start_time: 2186 self.bag._curr_chunk_info.start_time = t 2187 if connection_id in self.bag._curr_chunk_info.connection_counts: 2188 self.bag._curr_chunk_info.connection_counts[connection_id] += 1 2189 else: 2190 self.bag._curr_chunk_info.connection_counts[connection_id] = 1 2191 2192 # Skip over the message content 2193 _skip_sized(chunk_file) 2194 2195 # Insert the message entry (in order) into the connection index 2196 if connection_id not in self.bag._connection_indexes: 2197 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos)) 2198 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset)) 2199 2200 expected_index_length += 1 2201 2202 else: 2203 # Unknown record type so skip 2204 _skip_record(chunk_file) 2205 2206 if chunk_header.compression == Compression.NONE: 2207 offset = chunk_file.tell() - chunk_pos 2208 else: 2209 offset = chunk_file.tell() 2210 2211 # Skip over index records, connection records and chunk info records 2212 next_op = _peek_next_header_op(f) 2213 2214 total_index_length = 0 2215 2216 while next_op != _OP_CHUNK: 2217 if next_op == _OP_INDEX_DATA: 2218 # Bug workaround: C Turtle bags (pre-1.1.15) were written with an incorrect data length 2219 _, index = self.read_connection_index_record() 2220 total_index_length += len(index) 2221 else: 2222 _skip_record(f) 2223 2224 if f.tell() >= total_bytes: 2225 if total_index_length != expected_index_length: 2226 raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length)) 2227 break 2228 2229 next_op = _peek_next_header_op(f) 2230 2231 # Chunk was read correctly - store info 2232 self.bag._chunk_headers[chunk_pos] = chunk_header 2233 self.bag._chunks.append(self.bag._curr_chunk_info)
2234
2236 b, f, r = self.bag, self.bag._file, self.bag._reader 2237 2238 # Seek to the first record after FILE_HEADER 2239 f.seek(b._file_header_pos) 2240 r.read_file_header_record() 2241 2242 # Advance to the first CONNECTION 2243 if self._advance_to_next_record(_OP_CONNECTION): 2244 # Read the CONNECTION records 2245 while True: 2246 connection_info = r.read_connection_record(f) 2247 2248 b._connections[connection_info.id] = connection_info 2249 b._connection_indexes[connection_info.id] = [] 2250 2251 next_op = _peek_next_header_op(f) 2252 if next_op != _OP_CONNECTION: 2253 break
2254
2255 - def _advance_to_next_record(self, op):
2256 b, f = self.bag, self.bag._file 2257 2258 try: 2259 while True: 2260 next_op = _peek_next_header_op(f) 2261 if next_op == op: 2262 break 2263 2264 if next_op == _OP_INDEX_DATA: 2265 # Workaround the possible invalid data length in INDEX_DATA records 2266 2267 # read_connection_index_record() requires _curr_chunk_info is set 2268 if b._curr_chunk_info is None: 2269 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1)) 2270 2271 b._reader.read_connection_index_record() 2272 else: 2273 _skip_record(f) 2274 2275 return True 2276 2277 except Exception as ex: 2278 return False
2279
2280 - def start_reading(self):
2281 try: 2282 # Read the file header 2283 self.read_file_header_record() 2284 2285 # Check if the index position has been written, i.e. the bag was closed successfully 2286 if self.bag._index_data_pos == 0: 2287 raise ROSBagUnindexedException() 2288 2289 # Seek to the end of the chunks 2290 self.bag._file.seek(self.bag._index_data_pos) 2291 2292 # Read the connection records 2293 self.bag._connection_indexes = {} 2294 for i in range(self.bag._connection_count): 2295 connection_info = self.read_connection_record(self.bag._file) 2296 self.bag._connections[connection_info.id] = connection_info 2297 self.bag._connection_indexes[connection_info.id] = [] 2298 2299 # Read the chunk info records 2300 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)] 2301 2302 # Read the chunk headers 2303 self.bag._chunk_headers = {} 2304 for chunk_info in self.bag._chunks: 2305 self.bag._file.seek(chunk_info.pos) 2306 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header() 2307 2308 if not self.bag._skip_index: 2309 self._read_connection_index_records() 2310 2311 except Exception as ex: 2312 raise ROSBagUnindexedException()
2313
2315 for chunk_info in self.bag._chunks: 2316 self.bag._file.seek(chunk_info.pos) 2317 _skip_record(self.bag._file) 2318 2319 self.bag._curr_chunk_info = chunk_info 2320 for i in range(len(chunk_info.connection_counts)): 2321 connection_id, index = self.read_connection_index_record() 2322 self.bag._connection_indexes[connection_id].extend(index) 2323 2324 # Remove any connections with no entries 2325 # This is a workaround for a bug where connection records were being written for 2326 # connections which had no messages in the bag 2327 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index] 2328 for id in orphan_connection_ids: 2329 del self.bag._connections[id] 2330 del self.bag._connection_indexes[id] 2331 2332 self.bag._connection_indexes_read = True
2333
2334 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2335 connections = self.bag._get_connections(topics, connection_filter) 2336 for entry in self.bag._get_entries(connections, start_time, end_time): 2337 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
2338 2339 ### 2340
2341 - def read_file_header_record(self):
2342 self.bag._file_header_pos = self.bag._file.tell() 2343 2344 header = _read_header(self.bag._file, _OP_FILE_HEADER) 2345 2346 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 2347 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count') 2348 self.bag._connection_count = _read_uint32_field(header, 'conn_count') 2349 2350 _skip_sized(self.bag._file) # skip over the record data, i.e. padding
2351
2352 - def read_connection_record(self, f):
2353 header = _read_header(f, _OP_CONNECTION) 2354 2355 conn_id = _read_uint32_field(header, 'conn') 2356 topic = _read_str_field (header, 'topic') 2357 2358 connection_header = _read_header(f) 2359 2360 return _ConnectionInfo(conn_id, topic, connection_header)
2361
2362 - def read_chunk_info_record(self):
2363 f = self.bag._file 2364 2365 header = _read_header(f, _OP_CHUNK_INFO) 2366 2367 chunk_info_version = _read_uint32_field(header, 'ver') 2368 2369 if chunk_info_version == 1: 2370 chunk_pos = _read_uint64_field(header, 'chunk_pos') 2371 start_time = _read_time_field (header, 'start_time') 2372 end_time = _read_time_field (header, 'end_time') 2373 connection_count = _read_uint32_field(header, 'count') 2374 2375 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time) 2376 2377 _read_uint32(f) # skip the record data size 2378 2379 for i in range(connection_count): 2380 connection_id = _read_uint32(f) 2381 count = _read_uint32(f) 2382 2383 chunk_info.connection_counts[connection_id] = count 2384 2385 return chunk_info 2386 else: 2387 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2388
2389 - def read_chunk_header(self):
2390 header = _read_header(self.bag._file, _OP_CHUNK) 2391 2392 compression = _read_str_field (header, 'compression') 2393 uncompressed_size = _read_uint32_field(header, 'size') 2394 2395 compressed_size = _read_uint32(self.bag._file) # read the record data size 2396 2397 data_pos = self.bag._file.tell() 2398 2399 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
2400
2402 f = self.bag._file 2403 2404 header = _read_header(f, _OP_INDEX_DATA) 2405 2406 index_version = _read_uint32_field(header, 'ver') 2407 connection_id = _read_uint32_field(header, 'conn') 2408 count = _read_uint32_field(header, 'count') 2409 2410 if index_version != 1: 2411 raise ROSBagFormatException('expecting index version 1, got %d' % index_version) 2412 2413 record_size = _read_uint32(f) # skip the record data size 2414 2415 index = [] 2416 2417 for i in range(count): 2418 time = _read_time (f) 2419 offset = _read_uint32(f) 2420 2421 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset)) 2422 2423 return (connection_id, index)
2424
2425 - def seek_and_read_message_data_record(self, position, raw):
2426 chunk_pos, offset = position 2427 2428 chunk_header = self.bag._chunk_headers.get(chunk_pos) 2429 if chunk_header is None: 2430 raise ROSBagException('no chunk at position %d' % chunk_pos) 2431 2432 if chunk_header.compression == Compression.NONE: 2433 f = self.bag._file 2434 f.seek(chunk_header.data_pos + offset) 2435 else: 2436 if self.decompressed_chunk_pos != chunk_pos: 2437 # Seek to the chunk data, read and decompress 2438 self.bag._file.seek(chunk_header.data_pos) 2439 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size) 2440 2441 if chunk_header.compression == Compression.BZ2: 2442 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2443 elif chunk_header.compression == Compression.LZ4 and found_lz4: 2444 self.decompressed_chunk = roslz4.decompress(compressed_chunk) 2445 else: 2446 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2447 2448 self.decompressed_chunk_pos = chunk_pos 2449 2450 if self.decompressed_chunk_io: 2451 self.decompressed_chunk_io.close() 2452 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2453 2454 f = self.decompressed_chunk_io 2455 f.seek(offset) 2456 2457 # Skip any CONNECTION records 2458 while True: 2459 header = _read_header(f) 2460 op = _read_uint8_field(header, 'op') 2461 if op != _OP_CONNECTION: 2462 break 2463 _skip_sized(f) 2464 2465 # Check that we have a MSG_DATA record 2466 if op != _OP_MSG_DATA: 2467 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2468 2469 connection_id = _read_uint32_field(header, 'conn') 2470 t = _read_time_field (header, 'time') 2471 2472 # Get the message type 2473 connection_info = self.bag._connections[connection_id] 2474 try: 2475 msg_type = _get_message_type(connection_info) 2476 except KeyError: 2477 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype) 2478 2479 # Read the message content 2480 data = _read_record_data(f) 2481 2482 # Deserialize the message 2483 if raw: 2484 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type 2485 else: 2486 msg = msg_type() 2487 msg.deserialize(data) 2488 2489 return BagMessage(connection_info.topic, msg, t)
2490
2491 -def _time_to_str(secs):
2492 secs_frac = secs - int(secs) 2493 secs_frac_str = ('%.2f' % secs_frac)[1:] 2494 2495 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
2496
2497 -def _human_readable_size(size):
2498 multiple = 1024.0 2499 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2500 size /= multiple 2501 if size < multiple: 2502 return '%.1f %s' % (size, suffix) 2503 2504 return '-'
2505
2506 -def _human_readable_frequency(freq):
2507 multiple = 1000.0 2508 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']: 2509 if freq < multiple: 2510 return '%.1f %s' % (freq, suffix) 2511 freq /= multiple 2512 2513 return '-'
2514
2515 ## See http://code.activestate.com/recipes/511509 2516 -def _mergesort(list_of_lists, key=None):
2517 """ 2518 Perform an N-way merge operation on sorted lists. 2519 2520 @param list_of_lists: (really iterable of iterable) of sorted elements 2521 (either by naturally or by C{key}) 2522 @param key: specify sort key function (like C{sort()}, C{sorted()}) 2523 @param iterfun: function that returns an iterator. 2524 2525 Yields tuples of the form C{(item, iterator)}, where the iterator is the 2526 built-in list iterator or something you pass in, if you pre-generate the 2527 iterators. 2528 2529 This is a stable merge; complexity O(N lg N) 2530 2531 Examples:: 2532 2533 print list(x[0] for x in mergesort([[1,2,3,4], 2534 [2,3.5,3.7,4.5,6,7], 2535 [2.6,3.6,6.6,9]])) 2536 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2537 2538 # note stability 2539 print list(x[0] for x in mergesort([[1,2,3,4], 2540 [2,3.5,3.7,4.5,6,7], 2541 [2.6,3.6,6.6,9]], key=int)) 2542 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2543 2544 print list(x[0] for x in mergesort([[4,3,2,1], 2545 [7,6.5,4,3.7,3.3,1.9], 2546 [9,8.6,7.6,6.6,5.5,4.4,3.3]], 2547 key=lambda x: -x)) 2548 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1] 2549 """ 2550 2551 heap = [] 2552 for i, itr in enumerate(iter(pl) for pl in list_of_lists): 2553 try: 2554 item = next(itr) 2555 toadd = (key(item), i, item, itr) if key else (item, i, itr) 2556 heap.append(toadd) 2557 except StopIteration: 2558 pass 2559 heapq.heapify(heap) 2560 2561 if key: 2562 while heap: 2563 _, idx, item, itr = heap[0] 2564 yield item, itr 2565 try: 2566 item = next(itr) 2567 heapq.heapreplace(heap, (key(item), idx, item, itr) ) 2568 except StopIteration: 2569 heapq.heappop(heap) 2570 2571 else: 2572 while heap: 2573 item, idx, itr = heap[0] 2574 yield item, itr 2575 try: 2576 heapq.heapreplace(heap, (next(itr), idx, itr)) 2577 except StopIteration: 2578 heapq.heappop(heap)
2579
2580 -class _CompressorFileFacade(object):
2581 """ 2582 A file facade for sequential compressors (e.g., bz2.BZ2Compressor). 2583 """
2584 - def __init__(self, file, compressor):
2585 self.file = file 2586 self.compressor = compressor 2587 self.compressed_bytes_in = 0
2588
2589 - def write(self, data):
2590 compressed = self.compressor.compress(data) 2591 if len(compressed) > 0: 2592 self.file.write(compressed) 2593 self.compressed_bytes_in += len(data)
2594
2595 - def flush(self):
2596 compressed = self.compressor.flush() 2597 if len(compressed) > 0: 2598 self.file.write(compressed)
2599
2600 -def _median(values):
2601 values_len = len(values) 2602 if values_len == 0: 2603 return float('nan') 2604 2605 sorted_values = sorted(values) 2606 if values_len % 2 == 1: 2607 return sorted_values[int(values_len / 2)] 2608 2609 lower = sorted_values[int(values_len / 2) - 1] 2610 upper = sorted_values[int(values_len / 2)] 2611 2612 return float(lower + upper) / 2
2613