Package rosbag :: Module bag
[frames] | no frames]

Source Code for Module rosbag.bag

   1  # Software License Agreement (BSD License) 
   2  # 
   3  # Copyright (c) 2010, Willow Garage, Inc. 
   4  # All rights reserved. 
   5  # 
   6  # Redistribution and use in source and binary forms, with or without 
   7  # modification, are permitted provided that the following conditions 
   8  # are met: 
   9  # 
  10  #  * Redistributions of source code must retain the above copyright 
  11  #    notice, this list of conditions and the following disclaimer. 
  12  #  * Redistributions in binary form must reproduce the above 
  13  #    copyright notice, this list of conditions and the following 
  14  #    disclaimer in the documentation and/or other materials provided 
  15  #    with the distribution. 
  16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
  17  #    contributors may be used to endorse or promote products derived 
  18  #    from this software without specific prior written permission. 
  19  # 
  20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
  21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
  22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
  23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
  24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
  25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
  26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
  27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
  28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
  29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
  30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  31  # POSSIBILITY OF SUCH DAMAGE. 
  32   
  33  """ 
  34  The rosbag Python API. 
  35   
  36  Provides serialization of bag files. 
  37  """ 
  38   
  39  PKG = 'rosbag' 
  40  import roslib; roslib.load_manifest(PKG) 
  41   
  42  import bisect 
  43  import bz2 
  44  from cStringIO import StringIO 
  45  import heapq 
  46  import os 
  47  import re 
  48  import struct 
  49  import sys 
  50  import threading 
  51  import time 
  52  import yaml 
  53   
  54  import roslib.genpy 
  55  import rospy 
56 57 -class ROSBagException(Exception):
58 """ 59 Base class for exceptions in rosbag. 60 """
61 - def __init__(self, value):
62 self.value = value
63
64 - def __str__(self):
65 return self.value
66
67 -class ROSBagFormatException(ROSBagException):
68 """ 69 Exceptions for errors relating to the bag file format. 70 """
71 - def __init__(self, value):
72 ROSBagException.__init__(self, value)
73
74 -class ROSBagUnindexedException(ROSBagException):
75 """ 76 Exception for unindexed bags. 77 """
78 - def __init__(self):
79 ROSBagException.__init__(self, 'Unindexed bag')
80
81 -class Compression:
82 """ 83 Allowable compression types 84 """ 85 NONE = 'none' 86 BZ2 = 'bz2'
87
88 -class Bag(object):
89 """ 90 Bag serialize messages to and from a single file on disk using the bag format. 91 """
92 - def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
93 """ 94 Open a bag file. The mode can be 'r', 'w', or 'a' for reading (default), 95 writing or appending. The file will be created if it doesn't exist 96 when opened for writing or appending; it will be truncated when opened 97 for writing. Simultaneous reading and writing is allowed when in writing 98 or appending mode. 99 @param f: filename of bag to open or a stream to read from 100 @type f: str or file 101 @param mode: mode, either 'r', 'w', or 'a' 102 @type mode: str 103 @param compression: compression mode, see U{rosbag.Compression} for valid modes 104 @type compression: str 105 @param chunk_threshold: minimum number of uncompressed bytes per chunk 106 @type chunk_threshold: int 107 @param allow_unindexed: if True, allow opening unindexed bags 108 @type allow_unindexed: bool 109 @param options: the bag options (currently: compression and chunk_threshold) 110 @type options: dict 111 @param skip_index: if True, don't read the connection index records on open [2.0+] 112 @type skip_index: bool 113 @raise ValueError: if any argument is invalid 114 @raise ROSBagException: if an error occurs opening file 115 @raise ROSBagFormatException: if bag format is corrupted 116 """ 117 if options is not None: 118 if type(options) is not dict: 119 raise ValueError('options must be of type dict') 120 if 'compression' in options: 121 compression = options['compression'] 122 if 'chunk_threshold' in options: 123 chunk_threshold = options['chunk_threshold'] 124 125 self._file = None 126 self._filename = None 127 self._version = None 128 129 allowed_compressions = [Compression.NONE, Compression.BZ2] 130 if compression not in allowed_compressions: 131 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 132 self._compression = compression 133 134 if chunk_threshold < 0: 135 raise ValueError('chunk_threshold must be greater than or equal to zero') 136 self._chunk_threshold = chunk_threshold 137 138 self._skip_index = skip_index 139 140 self._reader = None 141 142 self._file_header_pos = None 143 self._index_data_pos = 0 # (1.2+) 144 145 self._clear_index() 146 147 self._buffer = StringIO() 148 149 self._curr_compression = Compression.NONE 150 151 self._open(f, mode, allow_unindexed) 152 153 self._output_file = self._file
154
155 - def __iter__(self):
156 return self.read_messages()
157
158 - def __enter__(self):
159 return self
160
161 - def __exit__(self, exc_type, exc_value, traceback):
162 self.close()
163 164 @property
165 - def options(self):
166 """Get the options.""" 167 return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }
168 169 @property
170 - def filename(self):
171 """Get the filename.""" 172 return self._filename
173 174 @property
175 - def version(self):
176 """Get the version.""" 177 return self._version
178 179 @property
180 - def mode(self):
181 """Get the mode.""" 182 return self._mode
183 184 @property
185 - def size(self):
186 """Get the size in bytes.""" 187 if not self._file: 188 raise ValueError('I/O operation on closed bag') 189 190 pos = self._file.tell() 191 self._file.seek(0, os.SEEK_END) 192 size = self._file.tell() 193 self._file.seek(pos) 194 return size
195 196 # compression 197
198 - def _get_compression(self):
199 """Get the compression method to use for writing.""" 200 return self._compression
201
202 - def _set_compression(self, compression):
203 """Set the compression method to use for writing.""" 204 allowed_compressions = [Compression.NONE, Compression.BZ2] 205 if compression not in allowed_compressions: 206 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions)) 207 208 self.flush() 209 self._compression = compression
210 211 compression = property(_get_compression, _set_compression) 212 213 # chunk_threshold 214
215 - def _get_chunk_threshold(self):
216 """Get the chunk threshold to use for writing.""" 217 return self._chunk_threshold
218
219 - def _set_chunk_threshold(self, chunk_threshold):
220 """Set the chunk threshold to use for writing.""" 221 if chunk_threshold < 0: 222 raise ValueError('chunk_threshold must be greater than or equal to zero') 223 224 self.flush() 225 self._chunk_threshold = chunk_threshold
226 227 chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold) 228
229 - def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
230 """ 231 Read messages from the bag, optionally filtered by topic, timestamp and connection details. 232 @param topics: list of topics or a single topic [optional] 233 @type topics: list(str) or str 234 @param start_time: earliest timestamp of message to return [optional] 235 @type start_time: U{roslib.rostime.Time} 236 @param end_time: latest timestamp of message to return [optional] 237 @type end_time: U{roslib.rostime.Time} 238 @param connection_filter: function to filter connections to include [optional] 239 @type connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool 240 @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype) 241 @type raw: bool 242 @return: generator of (topic, message, timestamp) tuples for each message in the bag file 243 @rtype: generator of tuples of (str, U{roslib.message.Message}, U{roslib.rostime.Time}) [not raw] or (str, (str, str, str, tuple, class), U{roslib.rostime.Time}) [raw] 244 """ 245 self.flush() 246 247 if topics and type(topics) is str: 248 topics = [topics] 249 250 return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)
251
252 - def flush(self):
253 """ 254 Write the open chunk to disk so subsequent reads will read all messages. 255 @raise ValueError: if bag is closed 256 """ 257 if not self._file: 258 raise ValueError('I/O operation on closed bag') 259 260 if self._chunk_open: 261 self._stop_writing_chunk()
262
263 - def write(self, topic, msg, t=None, raw=False):
264 """ 265 Write a message to the bag. 266 @param topic: name of topic 267 @type topic: str 268 @param msg: message to add to bag, or tuple (if raw) 269 @type msg: Message or tuple of raw message data 270 @param t: ROS time of message publication, if None specifed, use current time [optional] 271 @type t: U{roslib.rostime.Time} 272 @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype) 273 @type raw: bool 274 @raise ValueError: if arguments are invalid or bag is closed 275 """ 276 if not self._file: 277 raise ValueError('I/O operation on closed bag') 278 279 if not topic: 280 raise ValueError('topic is invalid') 281 if not msg: 282 raise ValueError('msg is invalid') 283 284 if t is None: 285 t = roslib.rostime.Time.from_sec(time.time()) 286 287 # Seek to end (in case previous operation was a read) 288 self._file.seek(0, os.SEEK_END) 289 290 # Open a chunk, if needed 291 if not self._chunk_open: 292 self._start_writing_chunk(t) 293 294 # Unpack raw 295 if raw: 296 if len(msg) == 5: 297 msg_type, serialized_bytes, md5sum, pos, pytype = msg 298 elif len(msg) == 4: 299 msg_type, serialized_bytes, md5sum, pytype = msg 300 else: 301 raise ValueError('msg must be of length 4 or 5') 302 303 # Write connection record, if necessary (currently using a connection per topic; ignoring message connection header) 304 if topic in self._topic_connections: 305 connection_info = self._topic_connections[topic] 306 conn_id = connection_info.id 307 else: 308 conn_id = len(self._connections) 309 310 if raw: 311 if pytype is None: 312 try: 313 pytype = roslib.message.get_message_class(msg_type) 314 except Exception: 315 pytype = None 316 if pytype is None: 317 raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type) 318 319 if pytype._md5sum != md5sum: 320 print >> sys.stderr, 'WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type 321 #raise ROSBagException('md5sum of loaded type does not match that of data being recorded') 322 323 header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text } 324 else: 325 header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text } 326 327 connection_info = _ConnectionInfo(conn_id, topic, header) 328 329 self._write_connection_record(connection_info) 330 331 self._connections[conn_id] = connection_info 332 self._topic_connections[topic] = connection_info 333 334 # Create an index entry 335 index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset()) 336 337 # Update the indexes and current chunk info 338 if conn_id not in self._curr_chunk_connection_indexes: 339 # This is the first message on this connection in the chunk 340 self._curr_chunk_connection_indexes[conn_id] = [index_entry] 341 self._curr_chunk_info.connection_counts[conn_id] = 1 342 else: 343 curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id] 344 if index_entry >= curr_chunk_connection_index[-1]: 345 # Test if we're writing chronologically. Can skip binary search if so. 346 curr_chunk_connection_index.append(index_entry) 347 else: 348 bisect.insort_right(curr_chunk_connection_index, index_entry) 349 350 self._curr_chunk_info.connection_counts[conn_id] += 1 351 352 if conn_id not in self._connection_indexes: 353 self._connection_indexes[conn_id] = [index_entry] 354 else: 355 bisect.insort_right(self._connection_indexes[conn_id], index_entry) 356 357 # Update the chunk start/end times 358 if t > self._curr_chunk_info.end_time: 359 self._curr_chunk_info.end_time = t 360 elif t < self._curr_chunk_info.start_time: 361 self._curr_chunk_info.start_time = t 362 363 if not raw: 364 # Serialize the message to the buffer 365 self._buffer.seek(0) 366 self._buffer.truncate(0) 367 msg.serialize(self._buffer) 368 serialized_bytes = self._buffer.getvalue() 369 370 # Write message data record 371 self._write_message_data_record(conn_id, t, serialized_bytes) 372 373 # Check if we want to stop this chunk 374 chunk_size = self._get_chunk_offset() 375 if chunk_size > self._chunk_threshold: 376 self._stop_writing_chunk()
377
378 - def reindex(self):
379 """ 380 Reindexes the bag file. Yields position of each chunk for progress. 381 """ 382 self._clear_index() 383 return self._reader.reindex()
384
385 - def close(self):
386 """ 387 Close the bag file. Closing an already closed bag does nothing. 388 """ 389 if self._file: 390 if self._mode in 'wa': 391 self._stop_writing() 392 393 self._close_file()
394
395 - def __str__(self):
396 rows = [] 397 398 try: 399 if self._filename: 400 rows.append(('path', self._filename)) 401 402 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 403 rows.append(('version', '1.2 (unindexed)')) 404 else: 405 rows.append(('version', '%d.%d' % (self._version / 100, self._version % 100))) 406 407 if not self._connection_indexes and not self._chunks: 408 rows.append(('size', _human_readable_size(self.size))) 409 else: 410 if self._chunks: 411 start_stamp = self._chunks[ 0].start_time.to_sec() 412 end_stamp = self._chunks[-1].end_time.to_sec() 413 else: 414 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()]) 415 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()]) 416 417 # Show duration 418 duration = end_stamp - start_stamp 419 dur_secs = duration % 60 420 dur_mins = int(duration / 60) 421 dur_hrs = int(dur_mins / 60) 422 if dur_hrs > 0: 423 dur_mins = dur_mins % 60 424 duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration) 425 elif dur_mins > 0: 426 duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration) 427 else: 428 duration_str = '%.1fs' % duration 429 430 rows.append(('duration', duration_str)) 431 432 # Show start and end times 433 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp))) 434 rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp))) 435 436 rows.append(('size', _human_readable_size(self.size))) 437 438 if self._chunks: 439 num_messages = 0 440 for c in self._chunks: 441 for counts in c.connection_counts.itervalues(): 442 num_messages += counts 443 else: 444 num_messages = sum([len(index) for index in self._connection_indexes.itervalues()]) 445 rows.append(('messages', str(num_messages))) 446 447 # Show compression information 448 if len(self._chunk_headers) == 0: 449 rows.append(('compression', 'none')) 450 else: 451 compression_counts = {} 452 compression_uncompressed = {} 453 compression_compressed = {} 454 for chunk_header in self._chunk_headers.itervalues(): 455 if chunk_header.compression not in compression_counts: 456 compression_counts[chunk_header.compression] = 1 457 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 458 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 459 else: 460 compression_counts[chunk_header.compression] += 1 461 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 462 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 463 464 chunk_count = len(self._chunk_headers) 465 466 compressions = [] 467 for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])): 468 if compression != Compression.NONE: 469 fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression] 470 compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction)) 471 else: 472 compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count)) 473 rows.append(('compression', ', '.join(compressions))) 474 475 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 476 if not all_uncompressed: 477 total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.itervalues())) 478 total_compressed_size = sum((h.compressed_size for h in self._chunk_headers.itervalues())) 479 480 total_uncompressed_size_str = _human_readable_size(total_uncompressed_size) 481 total_compressed_size_str = _human_readable_size(total_compressed_size) 482 total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)]) 483 484 if duration > 0: 485 uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration) 486 compressed_rate_str = _human_readable_size(total_compressed_size / duration) 487 rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)]) 488 489 rows.append(('uncompressed', '%*s @ %*s/s' % 490 (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str))) 491 rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' % 492 (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size))) 493 else: 494 rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str))) 495 rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str))) 496 497 datatypes = set() 498 datatype_infos = [] 499 for connection in self._connections.itervalues(): 500 if connection.datatype in datatypes: 501 continue 502 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 503 datatypes.add(connection.datatype) 504 505 topics = sorted(set([c.topic for c in self._get_connections()])) 506 topic_datatypes = {} 507 topic_conn_counts = {} 508 topic_msg_counts = {} 509 topic_freqs_median = {} 510 for topic in topics: 511 connections = list(self._get_connections(topic)) 512 513 topic_datatypes[topic] = connections[0].datatype 514 topic_conn_counts[topic] = len(connections) 515 516 msg_count = 0 517 for connection in connections: 518 for chunk in self._chunks: 519 msg_count += chunk.connection_counts.get(connection.id, 0) 520 topic_msg_counts[topic] = msg_count 521 522 if self._connection_indexes_read: 523 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 524 if len(stamps) > 1: 525 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 526 med_period = _median(periods) 527 if med_period > 0.0: 528 topic_freqs_median[topic] = 1.0 / med_period 529 530 topics = sorted(topic_datatypes.iterkeys()) 531 max_topic_len = max([len(topic) for topic in topics]) 532 max_datatype_len = max([len(datatype) for datatype in datatypes]) 533 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()]) 534 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0 535 536 # Show datatypes 537 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 538 s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum) 539 if i == 0: 540 rows.append(('types', s)) 541 else: 542 rows.append(('', s)) 543 544 # Show topics 545 for i, topic in enumerate(topics): 546 topic_msg_count = topic_msg_counts[topic] 547 548 s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ') 549 if topic in topic_freqs_median: 550 s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic])) 551 else: 552 s += ' %*s' % (max_freq_median_len, '') 553 554 s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic]) 555 if topic_conn_counts[topic] > 1: 556 s += ' (%d connections)' % topic_conn_counts[topic] 557 558 if i == 0: 559 rows.append(('topics', s)) 560 else: 561 rows.append(('', s)) 562 563 except Exception, ex: 564 raise 565 566 first_column_width = max([len(field) for field, _ in rows]) + 1 567 568 s = '' 569 for (field, value) in rows: 570 if field: 571 s += '%-*s %s\n' % (first_column_width, field + ':', value) 572 else: 573 s += '%-*s %s\n' % (first_column_width, '', value) 574 575 return s.rstrip()
576
577 - def _get_yaml_info(self, key=None):
578 s = '' 579 580 try: 581 if self._filename: 582 s += 'path: %s\n' % self._filename 583 584 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed: 585 s += 'version: 1.2 (unindexed)\n' 586 else: 587 s += 'version: %d.%d\n' % (self._version / 100, self._version % 100) 588 589 if not self._connection_indexes and not self._chunks: 590 s += 'size: %d\n' % self.size 591 s += 'indexed: False\n' 592 else: 593 if self._chunks: 594 start_stamp = self._chunks[ 0].start_time.to_sec() 595 end_stamp = self._chunks[-1].end_time.to_sec() 596 else: 597 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()]) 598 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()]) 599 600 duration = end_stamp - start_stamp 601 s += 'duration: %.6f\n' % duration 602 s += 'start: %.6f\n' % start_stamp 603 s += 'end: %.6f\n' % end_stamp 604 s += 'size: %d\n' % self.size 605 if self._chunks: 606 num_messages = 0 607 for c in self._chunks: 608 for counts in c.connection_counts.itervalues(): 609 num_messages += counts 610 else: 611 num_messages = sum([len(index) for index in self._connection_indexes.itervalues()]) 612 s += 'messages: %d\n' % num_messages 613 s += 'indexed: True\n' 614 615 # Show compression information 616 if len(self._chunk_headers) == 0: 617 s += 'compression: none\n' 618 else: 619 compression_counts = {} 620 compression_uncompressed = {} 621 compression_compressed = {} 622 for chunk_header in self._chunk_headers.itervalues(): 623 if chunk_header.compression not in compression_counts: 624 compression_counts[chunk_header.compression] = 1 625 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size 626 compression_compressed[chunk_header.compression] = chunk_header.compressed_size 627 else: 628 compression_counts[chunk_header.compression] += 1 629 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size 630 compression_compressed[chunk_header.compression] += chunk_header.compressed_size 631 632 chunk_count = len(self._chunk_headers) 633 634 main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0] 635 s += 'compression: %s\n' % str(main_compression) 636 637 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0) 638 if not all_uncompressed: 639 s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.itervalues())) 640 s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.itervalues())) 641 642 datatypes = set() 643 datatype_infos = [] 644 for connection in self._connections.itervalues(): 645 if connection.datatype in datatypes: 646 continue 647 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def)) 648 datatypes.add(connection.datatype) 649 650 topics = sorted(set([c.topic for c in self._get_connections()])) 651 topic_datatypes = {} 652 topic_conn_counts = {} 653 topic_msg_counts = {} 654 topic_freqs_median = {} 655 for topic in topics: 656 connections = list(self._get_connections(topic)) 657 658 topic_datatypes[topic] = connections[0].datatype 659 topic_conn_counts[topic] = len(connections) 660 661 msg_count = 0 662 for connection in connections: 663 for chunk in self._chunks: 664 msg_count += chunk.connection_counts.get(connection.id, 0) 665 topic_msg_counts[topic] = msg_count 666 667 if self._connection_indexes_read: 668 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)] 669 if len(stamps) > 1: 670 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])] 671 med_period = _median(periods) 672 if med_period > 0.0: 673 topic_freqs_median[topic] = 1.0 / med_period 674 675 topics = sorted(topic_datatypes.iterkeys()) 676 max_topic_len = max([len(topic) for topic in topics]) 677 max_datatype_len = max([len(datatype) for datatype in datatypes]) 678 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()]) 679 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0 680 681 # Show datatypes 682 s += 'types:\n' 683 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)): 684 s += ' - type: %s\n' % datatype 685 s += ' md5: %s\n' % md5sum 686 687 # Show topics 688 s += 'topics:\n' 689 for i, topic in enumerate(topics): 690 topic_msg_count = topic_msg_counts[topic] 691 692 s += ' - topic: %s\n' % topic 693 s += ' type: %s\n' % topic_datatypes[topic] 694 s += ' messages: %d\n' % topic_msg_count 695 696 if topic_conn_counts[topic] > 1: 697 s += ' connections: %d\n' % topic_conn_counts[topic] 698 699 if topic in topic_freqs_median: 700 s += ' frequency: %.4f\n' % topic_freqs_median[topic] 701 702 if not key: 703 return s 704 705 class DictObject(object): 706 def __init__(self, d): 707 for a, b in d.items(): 708 if isinstance(b, (list, tuple)): 709 setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b]) 710 else: 711 setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
712 713 obj = DictObject(yaml.load(s)) 714 try: 715 val = eval('obj.' + key) 716 except Exception, ex: 717 print >> sys.stderr, 'Error getting key "%s"' % key 718 return None 719 720 def print_yaml(val, indent=0): 721 indent_str = ' ' * indent 722 723 if type(val) is list: 724 s = '' 725 for item in val: 726 s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1)) 727 return s 728 elif type(val) is DictObject: 729 s = '' 730 for i, (k, v) in enumerate(val.__dict__.items()): 731 if i != 0: 732 s += indent_str 733 s += '%s: %s' % (k, str(v)) 734 if i < len(val.__dict__) - 1: 735 s += '\n' 736 return s 737 else: 738 return indent_str + str(val) 739 740 return print_yaml(val) 741 742 except Exception, ex: 743 raise 744 745 ### Internal API ### 746 747 @property
748 - def _has_compressed_chunks(self):
749 if not self._chunk_headers: 750 return False 751 752 return any((h.compression != Compression.NONE for h in self._chunk_headers.itervalues()))
753 754 @property
755 - def _uncompressed_size(self):
756 if not self._chunk_headers: 757 return self.size 758 759 return sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
760
761 - def _read_message(self, position, raw=False):
762 """ 763 Read the message from the given position in the file. 764 """ 765 self.flush() 766 return self._reader.seek_and_read_message_data_record(position, raw)
767 768 # Index accessing 769
770 - def _get_connections(self, topics=None, connection_filter=None):
771 """ 772 Yield the connections, optionally filtering by topic and/or connection information. 773 """ 774 if topics: 775 if type(topics) is str: 776 topics = set([roslib.names.canonicalize_name(topics)]) 777 else: 778 topics = set([roslib.names.canonicalize_name(t) for t in topics]) 779 780 for c in self._connections.itervalues(): 781 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics: 782 continue 783 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header): 784 continue 785 yield c
786
787 - def _get_entries(self, connections=None, start_time=None, end_time=None):
788 """ 789 Yield index entries on the given connections in the given time range. 790 """ 791 for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time): 792 if start_time and entry.time < start_time: 793 continue 794 if end_time and entry.time > end_time: 795 return 796 yield entry
797
798 - def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
799 """ 800 Yield index entries on the given connections in the given time range in reverse order. 801 """ 802 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()): 803 if end_time and entry.time > end_time: 804 continue 805 if start_time and entry.time < start_time: 806 return 807 yield entry
808
809 - def _get_entry(self, t, connections=None):
810 """ 811 Return the first index entry on/before the given time on the given connections 812 """ 813 indexes = self._get_indexes(connections) 814 815 entry = _IndexEntry(t) 816 817 first_entry = None 818 819 for index in indexes: 820 i = bisect.bisect_right(index, entry) - 1 821 if i >= 0: 822 index_entry = index[i] 823 if first_entry is None or index_entry > first_entry: 824 first_entry = index_entry 825 826 return first_entry
827
828 - def _get_entry_after(self, t, connections=None):
829 """ 830 Return the first index entry after the given time on the given connections 831 """ 832 indexes = self._get_indexes(connections) 833 834 entry = _IndexEntry(t) 835 836 first_entry = None 837 838 for index in indexes: 839 i = bisect.bisect_right(index, entry) 840 if i <= len(index) - 1: 841 index_entry = index[i] 842 if first_entry is None or index_entry < first_entry: 843 first_entry = index_entry 844 845 return first_entry
846
847 - def _get_indexes(self, connections):
848 """ 849 Get the indexes for the given connections. 850 """ 851 if not self._connection_indexes_read: 852 self._reader._read_connection_index_records() 853 854 if connections is None: 855 return self._connection_indexes.itervalues() 856 else: 857 return (self._connection_indexes[c.id] for c in connections)
858 859 ### Implementation ### 860
861 - def _clear_index(self):
862 self._connection_indexes_read = False 863 self._connection_indexes = {} # id -> IndexEntry[] (1.2+) 864 865 self._topic_connections = {} # topic -> connection_id 866 self._connections = {} # id -> ConnectionInfo 867 self._connection_count = 0 # (2.0) 868 self._chunk_count = 0 # (2.0) 869 self._chunks = [] # ChunkInfo[] (2.0) 870 self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0) 871 872 self._chunk_open = False 873 self._curr_chunk_info = None 874 self._curr_chunk_data_pos = None 875 self._curr_chunk_connection_indexes = {}
876
877 - def _open(self, f, mode, allow_unindexed):
878 if not f: 879 raise ValueError('filename (or stream) is invalid') 880 881 try: 882 if mode == 'r': self._open_read(f, allow_unindexed) 883 elif mode == 'w': self._open_write(f) 884 elif mode == 'a': self._open_append(f, allow_unindexed) 885 else: 886 raise ValueError('mode "%s" is invalid' % mode) 887 except struct.error: 888 raise ROSBagFormatException('error with bag')
889
890 - def _open_read(self, f, allow_unindexed):
891 if isinstance(f, file): 892 self._file = f 893 self._filename = None 894 else: 895 self._file = open(f, 'rb') 896 self._filename = f 897 898 self._mode = 'r' 899 900 try: 901 self._version = self._read_version() 902 except: 903 self._version = None 904 self._close_file() 905 raise 906 907 try: 908 self._create_reader() 909 self._reader.start_reading() 910 except ROSBagUnindexedException, ex: 911 if not allow_unindexed: 912 self._close_file() 913 raise 914 except: 915 self._close_file() 916 raise
917
918 - def _open_write(self, f):
919 if isinstance(f, file): 920 self._file = f 921 self._filename = None 922 else: 923 self._file = open(f, 'w+b') 924 self._filename = f 925 926 self._mode = 'w' 927 928 self._version = 200 929 930 try: 931 self._create_reader() 932 self._start_writing() 933 except: 934 self._close_file() 935 raise
936
937 - def _open_append(self, f, allow_unindexed):
938 if isinstance(f, file): 939 self._file = f 940 self._filename = None 941 else: 942 try: 943 # Test if the file already exists 944 open(f, 'r').close() 945 946 # File exists: open in read with update mode 947 self._file = open(f, 'r+b') 948 except IOError: 949 # File doesn't exist: open in write mode 950 self._file = open(f, 'w+b') 951 952 self._filename = f 953 954 self._mode = 'a' 955 956 try: 957 self._version = self._read_version() 958 except: 959 self._version = None 960 self._close_file() 961 raise 962 963 if self._version != 200: 964 self._file.close() 965 raise ROSBagException('bag version %d is unsupported for appending' % self._version) 966 967 try: 968 self._start_appending() 969 except ROSBagUnindexedException: 970 if not allow_unindexed: 971 self._close_file() 972 raise 973 except: 974 self._close_file() 975 raise
976
977 - def _close_file(self):
978 self._file.close() 979 self._file = None
980
981 - def _create_reader(self):
982 """ 983 @raise ROSBagException: if the bag version is unsupported 984 """ 985 major_version, minor_version = self._version / 100, self._version % 100 986 if major_version == 2: 987 self._reader = _BagReader200(self) 988 elif major_version == 1: 989 if minor_version == 1: 990 self._reader = _BagReader101(self) 991 else: 992 # Get the op code of the first record. If it's FILE_HEADER, then we have an indexed 1.2 bag. 993 first_record_pos = self._file.tell() 994 header = _read_header(self._file) 995 op = _read_uint8_field(header, 'op') 996 self._file.seek(first_record_pos) 997 998 if op == _OP_FILE_HEADER: 999 self._reader = _BagReader102_Indexed(self) 1000 else: 1001 self._reader = _BagReader102_Unindexed(self) 1002 else: 1003 raise ROSBagException('unknown bag version %d' % self._version)
1004
1005 - def _read_version(self):
1006 """ 1007 @raise ROSBagException: if the file is empty, or the version line can't be parsed 1008 """ 1009 version_line = self._file.readline().rstrip() 1010 if len(version_line) == 0: 1011 raise ROSBagException('empty file') 1012 1013 matches = re.match("#ROS(.*) V(\d).(\d)", version_line) 1014 if matches is None or len(matches.groups()) != 3: 1015 raise ROSBagException('This does not appear to be a bag file') 1016 1017 version_type, major_version_str, minor_version_str = matches.groups() 1018 1019 version = int(major_version_str) * 100 + int(minor_version_str) 1020 1021 return version
1022
1023 - def _start_writing(self):
1024 self._file.write(_VERSION + '\n') 1025 self._file_header_pos = self._file.tell() 1026 self._write_file_header_record(0, 0, 0) 1027
1028 - def _stop_writing(self):
1029 # Write the open chunk (if any) to file 1030 self.flush() 1031 1032 # Remember this location as the start of the index 1033 self._index_data_pos = self._file.tell() 1034 1035 # Write connection infos 1036 for connection_info in self._connections.itervalues(): 1037 self._write_connection_record(connection_info) 1038 1039 # Write chunk infos 1040 for chunk_info in self._chunks: 1041 self._write_chunk_info_record(chunk_info) 1042 1043 # Re-write the file header 1044 self._file.seek(self._file_header_pos) 1045 self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
1046
1047 - def _start_appending(self):
1048 self._file_header_pos = self._file.tell() 1049 1050 self._create_reader() 1051 self._reader.start_reading() 1052 1053 # Truncate the file to chop off the index 1054 self._file.truncate(self._index_data_pos) 1055 self._reader.index_data_pos = 0 1056 1057 # Rewrite the file header, clearing the index position (so we know if the index is invalid) 1058 self._file.seek(self._file_header_pos); 1059 self._write_file_header_record(0, 0, 0) 1060 1061 # Seek to the end of the file 1062 self._file.seek(0, os.SEEK_END)
1063
1064 - def _start_writing_chunk(self, t):
1065 self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t) 1066 self._write_chunk_header(_ChunkHeader(self._compression, 0, 0)) 1067 self._curr_chunk_data_pos = self._file.tell() 1068 self._set_compression_mode(self._compression) 1069 self._chunk_open = True
1070
1071 - def _get_chunk_offset(self):
1072 if self._compression == Compression.NONE: 1073 return self._file.tell() - self._curr_chunk_data_pos 1074 else: 1075 return self._output_file.compressed_bytes_in
1076
1077 - def _stop_writing_chunk(self):
1078 # Add this chunk to the index 1079 self._chunks.append(self._curr_chunk_info) 1080 1081 # Get the uncompressed and compressed sizes 1082 uncompressed_size = self._get_chunk_offset() 1083 self._set_compression_mode(Compression.NONE) 1084 compressed_size = self._file.tell() - self._curr_chunk_data_pos 1085 1086 # Rewrite the chunk header with the size of the chunk (remembering current offset) 1087 end_of_chunk_pos = self._file.tell() 1088 self._file.seek(self._curr_chunk_info.pos) 1089 chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos) 1090 self._write_chunk_header(chunk_header) 1091 self._chunk_headers[self._curr_chunk_info.pos] = chunk_header 1092 1093 # Write out the connection indexes and clear them 1094 self._file.seek(end_of_chunk_pos) 1095 for connection_id, entries in self._curr_chunk_connection_indexes.items(): 1096 self._write_connection_index_record(connection_id, entries) 1097 self._curr_chunk_connection_indexes.clear() 1098 1099 # Flag that we're starting a new chunk 1100 self._chunk_open = False
1101
1102 - def _set_compression_mode(self, compression):
1103 # Flush the compressor, if needed 1104 if self._curr_compression != Compression.NONE: 1105 self._output_file.flush() 1106 1107 # Create the compressor 1108 if compression == Compression.BZ2: 1109 self._output_file = _BZ2CompressorFileFacade(self._file) 1110 elif compression == Compression.NONE: 1111 self._output_file = self._file 1112 else: 1113 raise ROSBagException('unsupported compression type: %s' % compression) 1114 1115 self._curr_compression = compression
1116
1117 - def _write_file_header_record(self, index_pos, connection_count, chunk_count):
1118 header = { 1119 'op': _pack_uint8(_OP_FILE_HEADER), 1120 'index_pos': _pack_uint64(index_pos), 1121 'conn_count': _pack_uint32(connection_count), 1122 'chunk_count': _pack_uint32(chunk_count) 1123 } 1124 _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
1125
1126 - def _write_connection_record(self, connection_info):
1127 header = { 1128 'op': _pack_uint8(_OP_CONNECTION), 1129 'topic': connection_info.topic, 1130 'conn': _pack_uint32(connection_info.id) 1131 } 1132 1133 _write_header(self._output_file, header) 1134 1135 _write_header(self._output_file, connection_info.header)
1136
1137 - def _write_message_data_record(self, connection_id, t, serialized_bytes):
1138 header = { 1139 'op': _pack_uint8(_OP_MSG_DATA), 1140 'conn': _pack_uint32(connection_id), 1141 'time': _pack_time(t) 1142 } 1143 _write_record(self._output_file, header, serialized_bytes)
1144
1145 - def _write_chunk_header(self, chunk_header):
1146 header = { 1147 'op': _pack_uint8(_OP_CHUNK), 1148 'compression': chunk_header.compression, 1149 'size': _pack_uint32(chunk_header.uncompressed_size) 1150 } 1151 _write_header(self._file, header) 1152 1153 self._file.write(_pack_uint32(chunk_header.compressed_size))
1154
1155 - def _write_connection_index_record(self, connection_id, entries):
1156 header = { 1157 'op': _pack_uint8(_OP_INDEX_DATA), 1158 'conn': _pack_uint32(connection_id), 1159 'ver': _pack_uint32(_INDEX_VERSION), 1160 'count': _pack_uint32(len(entries)) 1161 } 1162 1163 buffer = self._buffer 1164 buffer.seek(0) 1165 buffer.truncate(0) 1166 for entry in entries: 1167 buffer.write(_pack_time (entry.time)) 1168 buffer.write(_pack_uint32(entry.offset)) 1169 1170 _write_record(self._file, header, buffer.getvalue()) 1171
1172 - def _write_chunk_info_record(self, chunk_info):
1173 header = { 1174 'op': _pack_uint8 (_OP_CHUNK_INFO), 1175 'ver': _pack_uint32(_CHUNK_INDEX_VERSION), 1176 'chunk_pos': _pack_uint64(chunk_info.pos), 1177 'start_time': _pack_time(chunk_info.start_time), 1178 'end_time': _pack_time(chunk_info.end_time), 1179 'count': _pack_uint32(len(chunk_info.connection_counts)) 1180 } 1181 1182 buffer = self._buffer 1183 buffer.seek(0) 1184 buffer.truncate(0) 1185 for connection_id, count in chunk_info.connection_counts.items(): 1186 buffer.write(_pack_uint32(connection_id)) 1187 buffer.write(_pack_uint32(count)) 1188 1189 _write_record(self._file, header, buffer.getvalue())
1190 1191 ### Implementation ### 1192 1193 _message_types = {} # md5sum -> type 1194 1195 _OP_MSG_DEF = 0x01 1196 _OP_MSG_DATA = 0x02 1197 _OP_FILE_HEADER = 0x03 1198 _OP_INDEX_DATA = 0x04 1199 _OP_CHUNK = 0x05 1200 _OP_CHUNK_INFO = 0x06 1201 _OP_CONNECTION = 0x07 1202 1203 _OP_CODES = { 1204 _OP_MSG_DEF: 'MSG_DEF', 1205 _OP_MSG_DATA: 'MSG_DATA', 1206 _OP_FILE_HEADER: 'FILE_HEADER', 1207 _OP_INDEX_DATA: 'INDEX_DATA', 1208 _OP_CHUNK: 'CHUNK', 1209 _OP_CHUNK_INFO: 'CHUNK_INFO', 1210 _OP_CONNECTION: 'CONNECTION' 1211 } 1212 1213 _VERSION = '#ROSBAG V2.0' 1214 _FILE_HEADER_LENGTH = 4096 1215 _INDEX_VERSION = 1 1216 _CHUNK_INDEX_VERSION = 1
1217 1218 -class _ConnectionInfo(object):
1219 - def __init__(self, id, topic, header):
1220 try: 1221 datatype, md5sum, msg_def = header['type'], header['md5sum'], header['message_definition'] 1222 except KeyError, ex: 1223 raise ROSBagFormatException('connection header field %s not found' % str(ex)) 1224 1225 self.id = id 1226 self.topic = topic 1227 self.datatype = datatype 1228 self.md5sum = md5sum 1229 self.msg_def = msg_def 1230 self.header = header
1231
1232 - def __str__(self):
1233 return '%d on %s: %s' % (self.id, self.topic, str(self.header))
1234
1235 -class _ChunkInfo(object):
1236 - def __init__(self, pos, start_time, end_time):
1237 self.pos = pos 1238 self.start_time = start_time 1239 self.end_time = end_time 1240 1241 self.connection_counts = {}
1242
1243 - def __str__(self):
1244 s = 'chunk_pos: %d\n' % self.pos 1245 s += 'start_time: %s\n' % str(self.start_time) 1246 s += 'end_time: %s\n' % str(self.end_time) 1247 s += 'connections: %d\n' % len(self.connection_counts) 1248 s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()]) 1249 return s
1250
1251 -class _ChunkHeader(object):
1252 - def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
1253 self.compression = compression 1254 self.compressed_size = compressed_size 1255 self.uncompressed_size = uncompressed_size 1256 self.data_pos = data_pos
1257
1258 - def __str__(self):
1259 if self.uncompressed_size > 0: 1260 ratio = 100 * (float(self.compressed_size) / self.uncompressed_size) 1261 return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio) 1262 else: 1263 return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
1264
1265 -class _IndexEntry(object):
1266 - def __init__(self, time):
1267 self.time = time
1268
1269 - def __cmp__(self, other):
1270 return self.time.__cmp__(other.time)
1271
1272 -class _IndexEntry102(_IndexEntry):
1273 - def __init__(self, time, offset):
1274 self.time = time 1275 self.offset = offset
1276 1277 @property
1278 - def position(self):
1279 return self.offset
1280
1281 - def __str__(self):
1282 return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)
1283
1284 -class _IndexEntry200(_IndexEntry):
1285 - def __init__(self, time, chunk_pos, offset):
1286 self.time = time 1287 self.chunk_pos = chunk_pos 1288 self.offset = offset
1289 1290 @property
1291 - def position(self):
1292 return (self.chunk_pos, self.offset)
1293
1294 - def __str__(self):
1295 return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
1296
1297 -def _get_message_type(info):
1298 message_type = _message_types.get(info.md5sum) 1299 if message_type is None: 1300 try: 1301 message_type = roslib.genpy.generate_dynamic(info.datatype, info.msg_def)[info.datatype] 1302 if (message_type._md5sum != info.md5sum): 1303 print >> sys.stderr, 'WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum) 1304 except roslib.msgs.MsgSpecException: 1305 message_type = roslib.genpy.generate_dynamic(info.datatype, "")[info.datatype] 1306 print >> sys.stderr, 'WARNING: For type [%s] stored md5sum [%s] has invalid message definition."'%(info.datatype, info.md5sum) 1307 except roslib.genpy.MsgGenerationException, ex: 1308 raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex))) 1309 1310 _message_types[info.md5sum] = message_type 1311 1312 return message_type
1313
1314 -def _read_uint8 (f): return _unpack_uint8 (f.read(1))
1315 -def _read_uint32(f): return _unpack_uint32(f.read(4))
1316 -def _read_uint64(f): return _unpack_uint64(f.read(8))
1317 -def _read_time (f): return _unpack_time (f.read(8))
1318
1319 -def _unpack_uint8(v): return struct.unpack('<B', v)[0]
1320 -def _unpack_uint32(v): return struct.unpack('<L', v)[0]
1321 -def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
1322 -def _unpack_time(v): return rospy.Time(*struct.unpack('<LL', v))
1323
1324 -def _pack_uint8(v): return struct.pack('<B', v)
1325 -def _pack_uint32(v): return struct.pack('<L', v)
1326 -def _pack_uint64(v): return struct.pack('<Q', v)
1327 -def _pack_time(v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
1328
1329 -def _read(f, size):
1330 data = f.read(size) 1331 if len(data) != size: 1332 raise ROSBagException('expecting %d bytes, read %d' % (size, len(data))) 1333 return data
1334
1335 -def _skip_record(f):
1336 _skip_sized(f) # skip header 1337 _skip_sized(f) # skip data
1338
1339 -def _skip_sized(f):
1340 size = _read_uint32(f) 1341 f.seek(size, os.SEEK_CUR)
1342
1343 -def _read_sized(f):
1344 try: 1345 size = _read_uint32(f) 1346 except struct.error, ex: 1347 raise ROSBagFormatException('error unpacking uint32: %s' % str(ex)) 1348 return _read(f, size)
1349
1350 -def _write_sized(f, v):
1351 f.write(_pack_uint32(len(v))) 1352 f.write(v)
1353
1354 -def _read_field(header, field, unpack_fn):
1355 if field not in header: 1356 raise ROSBagFormatException('expected "%s" field in record' % field) 1357 1358 try: 1359 value = unpack_fn(header[field]) 1360 except Exception, ex: 1361 raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex))) 1362 1363 return value
1364
1365 -def _read_str_field (header, field): return _read_field(header, field, lambda v: v)
1366 -def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
1367 -def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
1368 -def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
1369 -def _read_time_field (header, field): return _read_field(header, field, _unpack_time)
1370
1371 -def _write_record(f, header, data='', padded_size=None):
1372 header_str = _write_header(f, header) 1373 1374 if padded_size is not None: 1375 header_len = len(header_str) 1376 if header_len < padded_size: 1377 data = ' ' * (padded_size - header_len) 1378 else: 1379 data = '' 1380 1381 _write_sized(f, data)
1382
1383 -def _write_header(f, header):
1384 header_str = ''.join([_pack_uint32(len(k) + 1 + len(v)) + k + '=' + v for k, v in header.items()]) 1385 _write_sized(f, header_str) 1386 return header_str
1387
1388 -def _read_header(f, req_op=None):
1389 bag_pos = f.tell() 1390 1391 # Read header 1392 try: 1393 header = _read_sized(f) 1394 except ROSBagException, ex: 1395 raise ROSBagFormatException('Error reading header: %s' % str(ex)) 1396 1397 # Parse header into a dict 1398 header_dict = {} 1399 while header != '': 1400 # Read size 1401 if len(header) < 4: 1402 raise ROSBagFormatException('Error reading header field') 1403 (size,) = struct.unpack('<L', header[:4]) # @todo reindex: catch struct.error 1404 header = header[4:] 1405 1406 # Read bytes 1407 if len(header) < size: 1408 raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header))) 1409 (name, sep, value) = header[:size].partition('=') 1410 if sep == '': 1411 raise ROSBagFormatException('Error reading header field') 1412 1413 header_dict[name] = value # @todo reindex: raise exception on empty name 1414 1415 header = header[size:] 1416 1417 # Check the op code of the header, if supplied 1418 if req_op is not None: 1419 op = _read_uint8_field(header_dict, 'op') 1420 if req_op != op: 1421 raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op])) 1422 1423 return header_dict
1424
1425 -def _peek_next_header_op(f):
1426 pos = f.tell() 1427 header = _read_header(f) 1428 op = _read_uint8_field(header, 'op') 1429 f.seek(pos) 1430 return op
1431
1432 -def _read_record_data(f):
1433 try: 1434 record_data = _read_sized(f) 1435 except ROSBagException, ex: 1436 raise ROSBagFormatException('Error reading record data: %s' % str(ex)) 1437 1438 return record_data
1439
1440 -class _BagReader(object):
1441 - def __init__(self, bag):
1442 self.bag = bag
1443
1444 - def start_reading(self):
1445 raise NotImplementedError()
1446
1447 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1448 raise NotImplementedError()
1449
1450 - def reindex(self):
1451 raise NotImplementedError()
1452
1453 -class _BagReader101(_BagReader):
1454 """ 1455 Support class for reading v1.1 bag files. 1456 """
1457 - def __init__(self, bag):
1458 _BagReader.__init__(self, bag)
1459
1460 - def start_reading(self):
1461 pass
1462
1463 - def read_messages(self, topics, start_time, end_time, topic_filter, raw):
1464 f = self.bag._file 1465 1466 while True: 1467 bag_pos = f.tell() 1468 1469 # Read topic/md5/type string headers 1470 topic = f.readline().rstrip() 1471 if not topic: 1472 return 1473 1474 md5sum = f.readline().rstrip() 1475 datatype = f.readline().rstrip() 1476 1477 # Migration for rostools->roslib rename 1478 if datatype in ['rostools/Header', 'rostools/Log', 'rostools/Time']: 1479 datatype = datatype.replace('rostools', 'roslib') 1480 1481 # Read timestamp 1482 data = f.read(12) 1483 if len(data) != 12: 1484 raise ROSBagFormatException('Bag file appears to be corrupt (1)') 1485 (time_sec, time_nsec, length) = struct.unpack("<LLL", data) 1486 t = rospy.Time(time_sec, time_nsec) 1487 1488 # Read message 1489 data = f.read(length) 1490 if len(data) != length: 1491 raise ROSBagFormatException('Bag file appears to be corrupt (2)') 1492 1493 try: 1494 pytype = _message_types[md5sum] 1495 except KeyError: 1496 try: 1497 pytype = roslib.message.get_message_class(datatype) 1498 except Exception: 1499 pytype = None 1500 1501 if pytype is None: 1502 raise ROSBagException('Cannot deserialize messages of type [%s]: cannot locate message class' % datatype) 1503 else: 1504 if pytype._md5sum != md5sum: 1505 (package, type) = datatype.split('/') 1506 if roslib.gentools.compute_md5_v1(roslib.gentools.get_file_dependencies(roslib.msgs.msg_file(package,type))) == md5sum: 1507 print 'In V1.1 Logfile, found old md5sum for type [%s]. Allowing implicit migration to new md5sum.' % datatype 1508 else: 1509 raise ROSBagException('Cannot deserialize messages of type [%s]: md5sum is outdated in V1.1 bagfile' % datatype) 1510 _message_types[md5sum] = pytype 1511 1512 if raw: 1513 msg = datatype, data, pytype._md5sum, bag_pos, pytype 1514 else: 1515 msg = pytype() 1516 msg.deserialize(data) 1517 1518 yield topic, msg, t
1519
1520 -class _BagReader102_Unindexed(_BagReader):
1521 """ 1522 Support class for reading unindexed v1.2 bag files. 1523 """
1524 - def __init__(self, bag):
1525 _BagReader.__init__(self, bag)
1526
1527 - def start_reading(self):
1528 self.bag._file_header_pos = self.bag._file.tell()
1529
1530 - def reindex(self):
1531 """Generates all bag index information by rereading the message records.""" 1532 f = self.bag._file 1533 1534 total_bytes = self.bag.size 1535 1536 # Re-read the file header to get to the start of the first message 1537 self.bag._file.seek(self.bag._file_header_pos) 1538 1539 offset = f.tell() 1540 1541 # Read message definition and data records 1542 while offset < total_bytes: 1543 yield offset 1544 1545 op = _peek_next_header_op(f) 1546 1547 if op == _OP_MSG_DEF: 1548 connection_info = self.read_message_definition_record() 1549 1550 if connection_info.topic not in self.bag._topic_connections: 1551 self.bag._topic_connections[connection_info.topic] = connection_info.id 1552 self.bag._connections[connection_info.id] = connection_info 1553 self.bag._connection_indexes[connection_info.id] = [] 1554 1555 elif op == _OP_MSG_DATA: 1556 # Read the topic and timestamp from the header 1557 header = _read_header(f) 1558 1559 topic = _read_str_field(header, 'topic') 1560 secs = _read_uint32_field(header, 'sec') 1561 nsecs = _read_uint32_field(header, 'nsec') 1562 t = roslib.rostime.Time(secs, nsecs) 1563 1564 if topic not in self.bag._topic_connections: 1565 datatype = _read_str_field(header, 'type') 1566 self._create_connection_info_for_datatype(topic, datatype) 1567 1568 connection_id = self.bag._topic_connections[topic] 1569 info = self.bag._connections[connection_id] 1570 1571 # Skip over the message content 1572 _skip_sized(f) 1573 1574 # Insert the message entry (in order) into the connection index 1575 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1576 1577 offset = f.tell()
1578
1579 - def read_messages(self, topics, start_time, end_time, topic_filter, raw):
1580 f = self.bag._file 1581 1582 f.seek(self.bag._file_header_pos) 1583 1584 while True: 1585 # Read MSG_DEF records 1586 while True: 1587 position = f.tell() 1588 1589 try: 1590 header = _read_header(f) 1591 except Exception: 1592 return 1593 1594 op = _read_uint8_field(header, 'op') 1595 if op != _OP_MSG_DEF: 1596 break 1597 1598 connection_info = self.read_message_definition_record(header) 1599 1600 if connection_info.topic not in self.bag._topic_connections: 1601 self.bag._topic_connections[connection_info.topic] = connection_info.id 1602 1603 self.bag._connections[connection_info.id] = connection_info 1604 1605 # Check that we have a MSG_DATA record 1606 if op != _OP_MSG_DATA: 1607 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 1608 1609 topic = _read_str_field(header, 'topic') 1610 1611 if topic not in self.bag._topic_connections: 1612 datatype = _read_str_field(header, 'type') 1613 self._create_connection_info_for_datatype(topic, datatype) 1614 1615 connection_id = self.bag._topic_connections[topic] 1616 info = self.bag._connections[connection_id] 1617 1618 # Get the message type 1619 try: 1620 msg_type = _get_message_type(info) 1621 except KeyError: 1622 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceeded in bagfile by definition' % info.datatype) 1623 1624 # Get the timestamp 1625 secs = _read_uint32_field(header, 'sec') 1626 nsecs = _read_uint32_field(header, 'nsec') 1627 t = roslib.rostime.Time(secs, nsecs) 1628 1629 # Read the message content 1630 data = _read_record_data(f) 1631 1632 if raw: 1633 msg = (info.datatype, data, info.md5sum, position, msg_type) 1634 else: 1635 # Deserialize the message 1636 msg = msg_type() 1637 msg.deserialize(data) 1638 1639 yield (topic, msg, t) 1640 1641 self.bag._connection_indexes_read = True
1642
1643 - def _create_connection_info_for_datatype(self, topic, datatype):
1644 for c in self.bag._connections.itervalues(): 1645 if c.datatype == datatype: 1646 connection_id = len(self.bag._connections) 1647 connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] } 1648 connection_info = _ConnectionInfo(connection_id, topic, connection_header) 1649 1650 self.bag._topic_connections[topic] = connection_id 1651 self.bag._connections[connection_id] = connection_info 1652 self.bag._connection_indexes[connection_id] = [] 1653 return 1654 1655 raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
1656
1657 - def read_message_definition_record(self, header=None):
1658 if not header: 1659 header = _read_header(self.bag._file, _OP_MSG_DEF) 1660 1661 topic = _read_str_field(header, 'topic') 1662 datatype = _read_str_field(header, 'type') 1663 md5sum = _read_str_field(header, 'md5') 1664 msg_def = _read_str_field(header, 'def') 1665 1666 _skip_sized(self.bag._file) # skip the record data 1667 1668 connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def } 1669 1670 id = len(self.bag._connections) 1671 1672 return _ConnectionInfo(id, topic, connection_header)
1673
1674 -class _BagReader102_Indexed(_BagReader102_Unindexed):
1675 """ 1676 Support class for reading indexed v1.2 bag files. 1677 """
1678 - def __init__(self, bag):
1679 _BagReader.__init__(self, bag)
1680
1681 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1682 connections = self.bag._get_connections(topics, connection_filter) 1683 for entry in self.bag._get_entries(connections, start_time, end_time): 1684 yield self.seek_and_read_message_data_record(entry.offset, raw)
1685
1686 - def reindex(self):
1687 """Generates all bag index information by rereading the message records.""" 1688 f = self.bag._file 1689 1690 total_bytes = self.bag.size 1691 1692 # Re-read the file header to get to the start of the first message 1693 self.bag._file.seek(self.bag._file_header_pos) 1694 self.read_file_header_record() 1695 1696 offset = f.tell() 1697 1698 # Read message definition and data records 1699 while offset < total_bytes: 1700 yield offset 1701 1702 op = _peek_next_header_op(f) 1703 1704 if op == _OP_MSG_DEF: 1705 connection_info = self.read_message_definition_record() 1706 1707 if connection_info.topic not in self.bag._topic_connections: 1708 self.bag._topic_connections[connection_info.topic] = connection_info.id 1709 self.bag._connections[connection_info.id] = connection_info 1710 self.bag._connection_indexes[connection_info.id] = [] 1711 1712 elif op == _OP_MSG_DATA: 1713 # Read the topic and timestamp from the header 1714 header = _read_header(f) 1715 1716 topic = _read_str_field(header, 'topic') 1717 secs = _read_uint32_field(header, 'sec') 1718 nsecs = _read_uint32_field(header, 'nsec') 1719 t = roslib.rostime.Time(secs, nsecs) 1720 1721 if topic not in self.bag._topic_connections: 1722 datatype = _read_str_field(header, 'type') 1723 self._create_connection_info_for_datatype(topic, datatype) 1724 1725 connection_id = self.bag._topic_connections[topic] 1726 info = self.bag._connections[connection_id] 1727 1728 # Skip over the message content 1729 _skip_sized(f) 1730 1731 # Insert the message entry (in order) into the connection index 1732 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset)) 1733 1734 elif op == _OP_INDEX_DATA: 1735 _skip_record(f) 1736 1737 offset = f.tell()
1738
1739 - def start_reading(self):
1740 try: 1741 # Read the file header 1742 self.read_file_header_record() 1743 1744 total_bytes = self.bag.size 1745 1746 # Check if the index position has been written, i.e. the bag was closed successfully 1747 if self.bag._index_data_pos == 0: 1748 raise ROSBagUnindexedException() 1749 1750 # Seek to the beginning of the topic index records 1751 self.bag._file.seek(self.bag._index_data_pos) 1752 1753 # Read the topic indexes 1754 topic_indexes = {} 1755 while True: 1756 pos = self.bag._file.tell() 1757 if pos >= total_bytes: 1758 break 1759 1760 topic, index = self.read_topic_index_record() 1761 1762 topic_indexes[topic] = index 1763 1764 # Read the message definition records (one for each topic) 1765 for topic, index in topic_indexes.items(): 1766 self.bag._file.seek(index[0].offset) 1767 1768 connection_info = self.read_message_definition_record() 1769 1770 if connection_info.topic not in self.bag._topic_connections: 1771 self.bag._topic_connections[connection_info.topic] = connection_info.id 1772 self.bag._connections[connection_info.id] = connection_info 1773 1774 self.bag._connection_indexes[connection_info.id] = index 1775 1776 self.bag._connection_indexes_read = True 1777 1778 except Exception, ex: 1779 raise ROSBagUnindexedException()
1780
1781 - def read_file_header_record(self):
1782 self.bag._file_header_pos = self.bag._file.tell() 1783 1784 header = _read_header(self.bag._file, _OP_FILE_HEADER) 1785 1786 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 1787 1788 _skip_sized(self.bag._file) # skip the record data, i.e. padding
1789
1790 - def read_topic_index_record(self):
1791 f = self.bag._file 1792 1793 header = _read_header(f, _OP_INDEX_DATA) 1794 1795 index_version = _read_uint32_field(header, 'ver') 1796 topic = _read_str_field (header, 'topic') 1797 count = _read_uint32_field(header, 'count') 1798 1799 if index_version != 0: 1800 raise ROSBagFormatException('expecting index version 0, got %d' % index_version) 1801 1802 _read_uint32(f) # skip the record data size 1803 1804 topic_index = [] 1805 1806 for i in range(count): 1807 time = _read_time (f) 1808 offset = _read_uint64(f) 1809 1810 topic_index.append(_IndexEntry102(time, offset)) 1811 1812 return (topic, topic_index)
1813
1814 - def seek_and_read_message_data_record(self, position, raw):
1815 f = self.bag._file 1816 1817 # Seek to the message position 1818 f.seek(position) 1819 1820 # Skip any MSG_DEF records 1821 while True: 1822 header = _read_header(f) 1823 op = _read_uint8_field(header, 'op') 1824 if op != _OP_MSG_DEF: 1825 break 1826 _skip_sized(f) 1827 1828 # Check that we have a MSG_DATA record 1829 if op != _OP_MSG_DATA: 1830 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 1831 1832 topic = _read_str_field(header, 'topic') 1833 1834 connection_id = self.bag._topic_connections[topic] 1835 info = self.bag._connections[connection_id] 1836 1837 # Get the message type 1838 try: 1839 msg_type = _get_message_type(info) 1840 except KeyError: 1841 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceeded in bagfile by definition' % info.datatype) 1842 1843 # Get the timestamp 1844 secs = _read_uint32_field(header, 'sec') 1845 nsecs = _read_uint32_field(header, 'nsec') 1846 t = roslib.rostime.Time(secs, nsecs) 1847 1848 # Read the message content 1849 data = _read_record_data(f) 1850 1851 if raw: 1852 msg = info.datatype, data, info.md5sum, position, msg_type 1853 else: 1854 # Deserialize the message 1855 msg = msg_type() 1856 msg.deserialize(data) 1857 1858 return (topic, msg, t)
1859
1860 -class _BagReader200(_BagReader):
1861 """ 1862 Support class for reading v2.0 bag files. 1863 """
1864 - def __init__(self, bag):
1865 _BagReader.__init__(self, bag) 1866 1867 self.decompressed_chunk_pos = None 1868 self.decompressed_chunk = None 1869 self.decompressed_chunk_io = None
1870
1871 - def reindex(self):
1872 """ 1873 Generates all bag index information by rereading the chunks. 1874 Assumes the file header has been read. 1875 """ 1876 f = self.bag._file 1877 1878 f.seek(0, os.SEEK_END) 1879 total_bytes = f.tell() 1880 1881 # Read any connection records from after the chunk section. 1882 # This is to workaround a bug in rosbag record --split (fixed in r10390) 1883 # where connection records weren't always being written inside the chunk. 1884 self._read_terminal_connection_records() 1885 1886 # Re-read the file header to get to the start of the first chunk 1887 self.bag._file.seek(self.bag._file_header_pos) 1888 self.read_file_header_record() 1889 1890 trunc_pos = None 1891 1892 while True: 1893 chunk_pos = f.tell() 1894 if chunk_pos >= total_bytes: 1895 break 1896 1897 yield chunk_pos 1898 1899 try: 1900 self._reindex_read_chunk(f, chunk_pos, total_bytes) 1901 except Exception, ex: 1902 break 1903 1904 trunc_pos = f.tell() 1905 1906 if trunc_pos and trunc_pos < total_bytes: 1907 f.truncate(trunc_pos)
1908
1909 - def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
1910 # Read the chunk header 1911 chunk_header = self.read_chunk_header() 1912 1913 # If the chunk header size is 0, then the chunk wasn't correctly terminated - we're done 1914 if chunk_header.compressed_size == 0: 1915 raise ROSBagException('unterminated chunk at %d' % chunk_pos) 1916 1917 if chunk_header.compression == Compression.NONE: 1918 chunk_file = f 1919 else: 1920 # Read the compressed chunk 1921 compressed_chunk = _read(f, chunk_header.compressed_size) 1922 1923 # Decompress it 1924 if chunk_header.compression == Compression.BZ2: 1925 self.decompressed_chunk = bz2.decompress(compressed_chunk) 1926 else: 1927 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 1928 1929 if self.decompressed_chunk_io: 1930 self.decompressed_chunk_io.close() 1931 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 1932 1933 chunk_file = self.decompressed_chunk_io 1934 1935 # Read chunk connection and message records 1936 self.bag._curr_chunk_info = None 1937 1938 if chunk_header.compression == Compression.NONE: 1939 offset = chunk_file.tell() - chunk_pos 1940 else: 1941 offset = chunk_file.tell() 1942 1943 expected_index_length = 0 1944 1945 while offset < chunk_header.uncompressed_size: 1946 op = _peek_next_header_op(chunk_file) 1947 1948 if op == _OP_CONNECTION: 1949 connection_info = self.read_connection_record(chunk_file) 1950 1951 if connection_info.id not in self.bag._connections: 1952 self.bag._connections[connection_info.id] = connection_info 1953 if connection_info.id not in self.bag._connection_indexes: 1954 self.bag._connection_indexes[connection_info.id] = [] 1955 1956 elif op == _OP_MSG_DATA: 1957 # Read the connection id and timestamp from the header 1958 header = _read_header(chunk_file) 1959 1960 connection_id = _read_uint32_field(header, 'conn') 1961 t = _read_time_field (header, 'time') 1962 1963 # Update the chunk info with this timestamp 1964 if not self.bag._curr_chunk_info: 1965 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t) 1966 else: 1967 if t > self.bag._curr_chunk_info.end_time: 1968 self.bag._curr_chunk_info.end_time = t 1969 elif t < self.bag._curr_chunk_info.start_time: 1970 self.bag._curr_chunk_info.start_time = t 1971 if connection_id in self.bag._curr_chunk_info.connection_counts: 1972 self.bag._curr_chunk_info.connection_counts[connection_id] += 1 1973 else: 1974 self.bag._curr_chunk_info.connection_counts[connection_id] = 1 1975 1976 # Skip over the message content 1977 _skip_sized(chunk_file) 1978 1979 # Insert the message entry (in order) into the connection index 1980 if connection_id not in self.bag._connection_indexes: 1981 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos)) 1982 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset)) 1983 1984 expected_index_length += 1 1985 1986 else: 1987 # Unknown record type so skip 1988 _skip_record(chunk_file) 1989 1990 if chunk_header.compression == Compression.NONE: 1991 offset = chunk_file.tell() - chunk_pos 1992 else: 1993 offset = chunk_file.tell() 1994 1995 # Skip over index records, connection records and chunk info records 1996 next_op = _peek_next_header_op(f) 1997 1998 total_index_length = 0 1999 2000 while next_op != _OP_CHUNK: 2001 if next_op == _OP_INDEX_DATA: 2002 # Bug workaround: C Turtle bags (pre-1.1.15) were written with an incorrect data length 2003 _, index = self.read_connection_index_record() 2004 total_index_length += len(index) 2005 else: 2006 _skip_record(f) 2007 2008 if f.tell() >= total_bytes: 2009 if total_index_length != expected_index_length: 2010 raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length)) 2011 break 2012 2013 next_op = _peek_next_header_op(f) 2014 2015 # Chunk was read correctly - store info 2016 self.bag._chunk_headers[chunk_pos] = chunk_header 2017 self.bag._chunks.append(self.bag._curr_chunk_info)
2018
2020 b, f, r = self.bag, self.bag._file, self.bag._reader 2021 2022 # Seek to the first record after FILE_HEADER 2023 f.seek(b._file_header_pos) 2024 r.read_file_header_record() 2025 2026 # Advance to the first CONNECTION 2027 if self._advance_to_next_record(_OP_CONNECTION): 2028 # Read the CONNECTION records 2029 while True: 2030 connection_info = r.read_connection_record(f) 2031 2032 b._connections[connection_info.id] = connection_info 2033 b._connection_indexes[connection_info.id] = [] 2034 2035 next_op = _peek_next_header_op(f) 2036 if next_op != _OP_CONNECTION: 2037 break
2038
2039 - def _advance_to_next_record(self, op):
2040 b, f = self.bag, self.bag._file 2041 2042 try: 2043 while True: 2044 next_op = _peek_next_header_op(f) 2045 if next_op == op: 2046 break 2047 2048 if next_op == _OP_INDEX_DATA: 2049 # Workaround the possible invalid data length in INDEX_DATA records 2050 2051 # read_connection_index_record() requires _curr_chunk_info is set 2052 if b._curr_chunk_info is None: 2053 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1)) 2054 2055 b._reader.read_connection_index_record() 2056 else: 2057 _skip_record(f) 2058 2059 return True 2060 2061 except Exception, ex: 2062 return False
2063
2064 - def start_reading(self):
2065 try: 2066 # Read the file header 2067 self.read_file_header_record() 2068 2069 # Check if the index position has been written, i.e. the bag was closed successfully 2070 if self.bag._index_data_pos == 0: 2071 raise ROSBagUnindexedException() 2072 2073 # Seek to the end of the chunks 2074 self.bag._file.seek(self.bag._index_data_pos) 2075 2076 # Read the connection records 2077 self.bag._connection_indexes = {} 2078 for i in range(self.bag._connection_count): 2079 connection_info = self.read_connection_record(self.bag._file) 2080 self.bag._connections[connection_info.id] = connection_info 2081 self.bag._connection_indexes[connection_info.id] = [] 2082 2083 # Read the chunk info records 2084 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)] 2085 2086 # Read the chunk headers 2087 self.bag._chunk_headers = {} 2088 for chunk_info in self.bag._chunks: 2089 self.bag._file.seek(chunk_info.pos) 2090 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header() 2091 2092 if not self.bag._skip_index: 2093 self._read_connection_index_records() 2094 2095 except Exception, ex: 2096 raise ROSBagUnindexedException()
2097
2099 for chunk_info in self.bag._chunks: 2100 self.bag._file.seek(chunk_info.pos) 2101 _skip_record(self.bag._file) 2102 2103 self.bag._curr_chunk_info = chunk_info 2104 for i in range(len(chunk_info.connection_counts)): 2105 connection_id, index = self.read_connection_index_record() 2106 self.bag._connection_indexes[connection_id].extend(index) 2107 2108 # Remove any connections with no entries 2109 # This is a workaround for a bug where connection records were being written for 2110 # connections which had no messages in the bag 2111 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.iteritems() if not index] 2112 for id in orphan_connection_ids: 2113 del self.bag._connections[id] 2114 del self.bag._connection_indexes[id] 2115 2116 self.bag._connection_indexes_read = True
2117
2118 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2119 connections = self.bag._get_connections(topics, connection_filter) 2120 for entry in self.bag._get_entries(connections, start_time, end_time): 2121 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
2122 2123 ### 2124
2125 - def read_file_header_record(self):
2126 self.bag._file_header_pos = self.bag._file.tell() 2127 2128 header = _read_header(self.bag._file, _OP_FILE_HEADER) 2129 2130 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos') 2131 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count') 2132 self.bag._connection_count = _read_uint32_field(header, 'conn_count') 2133 2134 _skip_sized(self.bag._file) # skip over the record data, i.e. padding
2135
2136 - def read_connection_record(self, f):
2137 header = _read_header(f, _OP_CONNECTION) 2138 2139 conn_id = _read_uint32_field(header, 'conn') 2140 topic = _read_str_field (header, 'topic') 2141 2142 connection_header = _read_header(f) 2143 2144 return _ConnectionInfo(conn_id, topic, connection_header)
2145
2146 - def read_chunk_info_record(self):
2147 f = self.bag._file 2148 2149 header = _read_header(f, _OP_CHUNK_INFO) 2150 2151 chunk_info_version = _read_uint32_field(header, 'ver') 2152 2153 if chunk_info_version == 1: 2154 chunk_pos = _read_uint64_field(header, 'chunk_pos') 2155 start_time = _read_time_field (header, 'start_time') 2156 end_time = _read_time_field (header, 'end_time') 2157 connection_count = _read_uint32_field(header, 'count') 2158 2159 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time) 2160 2161 _read_uint32(f) # skip the record data size 2162 2163 for i in range(connection_count): 2164 connection_id = _read_uint32(f) 2165 count = _read_uint32(f) 2166 2167 chunk_info.connection_counts[connection_id] = count 2168 2169 return chunk_info 2170 else: 2171 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2172
2173 - def read_chunk_header(self):
2174 header = _read_header(self.bag._file, _OP_CHUNK) 2175 2176 compression = _read_str_field (header, 'compression') 2177 uncompressed_size = _read_uint32_field(header, 'size') 2178 2179 compressed_size = _read_uint32(self.bag._file) # read the record data size 2180 2181 data_pos = self.bag._file.tell() 2182 2183 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
2184
2186 f = self.bag._file 2187 2188 header = _read_header(f, _OP_INDEX_DATA) 2189 2190 index_version = _read_uint32_field(header, 'ver') 2191 connection_id = _read_uint32_field(header, 'conn') 2192 count = _read_uint32_field(header, 'count') 2193 2194 if index_version != 1: 2195 raise ROSBagFormatException('expecting index version 1, got %d' % index_version) 2196 2197 record_size = _read_uint32(f) # skip the record data size 2198 2199 index = [] 2200 2201 for i in range(count): 2202 time = _read_time (f) 2203 offset = _read_uint32(f) 2204 2205 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset)) 2206 2207 return (connection_id, index)
2208
2209 - def seek_and_read_message_data_record(self, position, raw):
2210 chunk_pos, offset = position 2211 2212 chunk_header = self.bag._chunk_headers.get(chunk_pos) 2213 if chunk_header is None: 2214 raise ROSBagException('no chunk at position %d' % chunk_pos) 2215 2216 if chunk_header.compression == Compression.NONE: 2217 f = self.bag._file 2218 f.seek(chunk_header.data_pos + offset) 2219 else: 2220 if self.decompressed_chunk_pos != chunk_pos: 2221 # Seek to the chunk data, read and decompress 2222 self.bag._file.seek(chunk_header.data_pos) 2223 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size) 2224 2225 if chunk_header.compression == Compression.BZ2: 2226 self.decompressed_chunk = bz2.decompress(compressed_chunk) 2227 else: 2228 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression) 2229 2230 self.decompressed_chunk_pos = chunk_pos 2231 2232 if self.decompressed_chunk_io: 2233 self.decompressed_chunk_io.close() 2234 self.decompressed_chunk_io = StringIO(self.decompressed_chunk) 2235 2236 f = self.decompressed_chunk_io 2237 f.seek(offset) 2238 2239 # Skip any CONNECTION records 2240 while True: 2241 header = _read_header(f) 2242 op = _read_uint8_field(header, 'op') 2243 if op != _OP_CONNECTION: 2244 break 2245 _skip_sized(f) 2246 2247 # Check that we have a MSG_DATA record 2248 if op != _OP_MSG_DATA: 2249 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op) 2250 2251 connection_id = _read_uint32_field(header, 'conn') 2252 t = _read_time_field (header, 'time') 2253 2254 # Get the message type 2255 connection_info = self.bag._connections[connection_id] 2256 try: 2257 msg_type = _get_message_type(connection_info) 2258 except KeyError: 2259 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype) 2260 2261 # Read the message content 2262 data = _read_record_data(f) 2263 2264 # Deserialize the message 2265 if raw: 2266 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type 2267 else: 2268 msg = msg_type() 2269 msg.deserialize(data) 2270 2271 return (connection_info.topic, msg, t)
2272
2273 -def _time_to_str(secs):
2274 secs_frac = secs - int(secs) 2275 secs_frac_str = ('%.2f' % secs_frac)[1:] 2276 2277 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
2278
2279 -def _human_readable_size(size):
2280 multiple = 1024.0 2281 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2282 size /= multiple 2283 if size < multiple: 2284 return '%.1f %s' % (size, suffix) 2285 2286 return '-'
2287
2288 -def _human_readable_frequency(freq):
2289 multiple = 1000.0 2290 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']: 2291 if freq < multiple: 2292 return '%.1f %s' % (freq, suffix) 2293 freq /= multiple 2294 2295 return '-'
2296
2297 ## See http://code.activestate.com/recipes/511509 2298 -def _mergesort(list_of_lists, key=None):
2299 """ 2300 Perform an N-way merge operation on sorted lists. 2301 2302 @param list_of_lists: (really iterable of iterable) of sorted elements 2303 (either by naturally or by C{key}) 2304 @param key: specify sort key function (like C{sort()}, C{sorted()}) 2305 @param iterfun: function that returns an iterator. 2306 2307 Yields tuples of the form C{(item, iterator)}, where the iterator is the 2308 built-in list iterator or something you pass in, if you pre-generate the 2309 iterators. 2310 2311 This is a stable merge; complexity O(N lg N) 2312 2313 Examples:: 2314 2315 print list(x[0] for x in mergesort([[1,2,3,4], 2316 [2,3.5,3.7,4.5,6,7], 2317 [2.6,3.6,6.6,9]])) 2318 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2319 2320 # note stability 2321 print list(x[0] for x in mergesort([[1,2,3,4], 2322 [2,3.5,3.7,4.5,6,7], 2323 [2.6,3.6,6.6,9]], key=int)) 2324 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9] 2325 2326 print list(x[0] for x in mergesort([[4,3,2,1], 2327 [7,6.5,4,3.7,3.3,1.9], 2328 [9,8.6,7.6,6.6,5.5,4.4,3.3]], 2329 key=lambda x: -x)) 2330 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1] 2331 """ 2332 2333 heap = [] 2334 for i, itr in enumerate(iter(pl) for pl in list_of_lists): 2335 try: 2336 item = itr.next() 2337 toadd = (key(item), i, item, itr) if key else (item, i, itr) 2338 heap.append(toadd) 2339 except StopIteration: 2340 pass 2341 heapq.heapify(heap) 2342 2343 if key: 2344 while heap: 2345 _, idx, item, itr = heap[0] 2346 yield item, itr 2347 try: 2348 item = itr.next() 2349 heapq.heapreplace(heap, (key(item), idx, item, itr) ) 2350 except StopIteration: 2351 heapq.heappop(heap) 2352 2353 else: 2354 while heap: 2355 item, idx, itr = heap[0] 2356 yield item, itr 2357 try: 2358 heapq.heapreplace(heap, (itr.next(), idx, itr)) 2359 except StopIteration: 2360 heapq.heappop(heap)
2361
2362 -class _BZ2CompressorFileFacade(object):
2363 """ 2364 A file facade for the bz2.BZ2Compressor. 2365 """
2366 - def __init__(self, file):
2367 self.file = file 2368 self.compressor = bz2.BZ2Compressor() 2369 self.compressed_bytes_in = 0
2370
2371 - def write(self, data):
2372 compressed = self.compressor.compress(data) 2373 if len(compressed) > 0: 2374 self.file.write(compressed) 2375 self.compressed_bytes_in += len(data)
2376
2377 - def flush(self):
2378 compressed = self.compressor.flush() 2379 if len(compressed) > 0: 2380 self.file.write(compressed)
2381
2382 -def _median(values):
2383 values_len = len(values) 2384 if values_len == 0: 2385 return float('nan') 2386 2387 sorted_values = sorted(values) 2388 if values_len % 2 == 1: 2389 return sorted_values[(values_len + 1) / 2 - 1] 2390 2391 lower = sorted_values[values_len / 2 - 1] 2392 upper = sorted_values[values_len / 2] 2393 2394 return float(lower + upper) / 2
2395