33 """
34 The rosbag Python API.
35
36 Provides serialization of bag files.
37 """
PKG = 'rosbag'
import roslib; roslib.load_manifest(PKG)

import bisect
import bz2
from cStringIO import StringIO
import heapq
import os
import re
import struct
import sys
import threading
import time
import yaml

import roslib.genpy
import rospy
58 """
59 Base class for exceptions in rosbag.
60 """
63
66
73
75 """
76 Exception for unindexed bags.
77 """
80
82 """
83 Allowable compression types
84 """
85 NONE = 'none'
86 BZ2 = 'bz2'
87
89 """
90 Bag serialize messages to and from a single file on disk using the bag format.
91 """
    def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
        """
        Open a bag file.  The mode can be 'r', 'w', or 'a' for reading (default),
        writing or appending.  The file will be created if it doesn't exist
        when opened for writing or appending; it will be truncated when opened
        for writing.  Simultaneous reading and writing is allowed when in writing
        or appending mode.
        @param f: filename of bag to open or a stream to read from
        @type  f: str or file
        @param mode: mode, either 'r', 'w', or 'a'
        @type  mode: str
        @param compression: compression mode, see U{rosbag.Compression} for valid modes
        @type  compression: str
        @param chunk_threshold: minimum number of uncompressed bytes per chunk
        @type  chunk_threshold: int
        @param allow_unindexed: if True, allow opening unindexed bags
        @type  allow_unindexed: bool
        @param options: the bag options (currently: compression and chunk_threshold)
        @type  options: dict
        @param skip_index: if True, don't read the connection index records on open [2.0+]
        @type  skip_index: bool
        @raise ValueError: if any argument is invalid
        @raise ROSBagException: if an error occurs opening file
        @raise ROSBagFormatException: if bag format is corrupted
        """
        if options is not None:
            if type(options) is not dict:
                raise ValueError('options must be of type dict')
            if 'compression' in options:
                compression = options['compression']
            if 'chunk_threshold' in options:
                chunk_threshold = options['chunk_threshold']

        self._file = None
        self._filename = None
        self._version = None

        allowed_compressions = [Compression.NONE, Compression.BZ2]
        if compression not in allowed_compressions:
            raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))
        self._compression = compression

        if chunk_threshold < 0:
            raise ValueError('chunk_threshold must be greater than or equal to zero')
        self._chunk_threshold = chunk_threshold

        self._skip_index = skip_index

        self._reader = None

        self._file_header_pos = None
        self._index_data_pos = 0

        self._clear_index()

        self._buffer = StringIO()

        self._curr_compression = Compression.NONE

        self._open(f, mode, allow_unindexed)

        self._output_file = self._file
    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
    @property
    def options(self):
        """Get the options."""
        return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }

    @property
    def filename(self):
        """Get the filename."""
        return self._filename

    @property
    def version(self):
        """Get the version."""
        return self._version

    @property
    def mode(self):
        """Get the mode."""
        return self._mode

    @property
    def size(self):
        """Get the size in bytes."""
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        pos = self._file.tell()
        self._file.seek(0, os.SEEK_END)
        size = self._file.tell()
        self._file.seek(pos)
        return size
    def _get_compression(self):
        """Get the compression method to use for writing."""
        return self._compression

    def _set_compression(self, compression):
        """Set the compression method to use for writing."""
        allowed_compressions = [Compression.NONE, Compression.BZ2]
        if compression not in allowed_compressions:
            raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))

        self.flush()
        self._compression = compression

    compression = property(_get_compression, _set_compression)

    def _get_chunk_threshold(self):
        """Get the chunk threshold to use for writing."""
        return self._chunk_threshold

    def _set_chunk_threshold(self, chunk_threshold):
        """Set the chunk threshold to use for writing."""
        if chunk_threshold < 0:
            raise ValueError('chunk_threshold must be greater than or equal to zero')

        self.flush()
        self._chunk_threshold = chunk_threshold

    chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)
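    # Example (a sketch added for illustration, not part of the original
    # source): both writing knobs can be changed on an open bag through the
    # properties above; each setter flushes the current chunk first.
    #
    #   bag = Bag('output.bag', 'w')
    #   bag.compression = Compression.BZ2
    #   bag.chunk_threshold = 1024 * 1024    # ~1 MB of uncompressed data per chunk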
    def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
        """
        Read messages from the bag, optionally filtered by topic, timestamp and connection details.
        @param topics: list of topics or a single topic [optional]
        @type  topics: list(str) or str
        @param start_time: earliest timestamp of message to return [optional]
        @type  start_time: U{roslib.rostime.Time}
        @param end_time: latest timestamp of message to return [optional]
        @type  end_time: U{roslib.rostime.Time}
        @param connection_filter: function to filter connections to include [optional]
        @type  connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool
        @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype)
        @type  raw: bool
        @return: generator of (topic, message, timestamp) tuples for each message in the bag file
        @rtype:  generator of tuples of (str, U{roslib.message.Message}, U{roslib.rostime.Time}) [not raw] or (str, (str, str, str, tuple, class), U{roslib.rostime.Time}) [raw]
        """
        self.flush()

        if topics and type(topics) is str:
            topics = [topics]

        return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)
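    # Example (a sketch added for illustration, not part of the original
    # source; assumes 'input.bag' exists):
    #
    #   bag = Bag('input.bag')
    #   for topic, msg, t in bag.read_messages(topics='/chatter'):
    #       print '%s @ %s: %s' % (topic, str(t), str(msg))
    #   bag.close()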
    def flush(self):
        """
        Write the open chunk to disk so subsequent reads will read all messages.
        @raise ValueError: if bag is closed
        """
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        if self._chunk_open:
            self._stop_writing_chunk()
    def write(self, topic, msg, t=None, raw=False):
        """
        Write a message to the bag.
        @param topic: name of topic
        @type  topic: str
        @param msg: message to add to bag, or tuple (if raw)
        @type  msg: Message or tuple of raw message data
        @param t: ROS time of message publication; if None is specified, the current time is used [optional]
        @type  t: U{roslib.rostime.Time}
        @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
        @type  raw: bool
        @raise ValueError: if arguments are invalid or bag is closed
        """
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        if not topic:
            raise ValueError('topic is invalid')
        if not msg:
            raise ValueError('msg is invalid')

        if t is None:
            t = roslib.rostime.Time.from_sec(time.time())

        # Seek to the end of the file (in case the last operation was a read)
        self._file.seek(0, os.SEEK_END)

        # Open a chunk, if needed
        if not self._chunk_open:
            self._start_writing_chunk(t)

        # Unpack raw message data
        if raw:
            if len(msg) == 5:
                msg_type, serialized_bytes, md5sum, pos, pytype = msg
            elif len(msg) == 4:
                msg_type, serialized_bytes, md5sum, pytype = msg
            else:
                raise ValueError('msg must be of length 4 or 5')

        # Write a connection record, if necessary
        if topic in self._topic_connections:
            connection_info = self._topic_connections[topic]
            conn_id = connection_info.id
        else:
            conn_id = len(self._connections)

            if raw:
                if pytype is None:
                    try:
                        pytype = roslib.message.get_message_class(msg_type)
                    except Exception:
                        pytype = None
                    if pytype is None:
                        raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type)

                if pytype._md5sum != md5sum:
                    print >> sys.stderr, 'WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type

                header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text }
            else:
                header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text }

            connection_info = _ConnectionInfo(conn_id, topic, header)

            self._write_connection_record(connection_info)

            self._connections[conn_id] = connection_info
            self._topic_connections[topic] = connection_info

        # Create an index entry for this message
        index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())

        # Update the current chunk's connection index
        if conn_id not in self._curr_chunk_connection_indexes:
            # This is the first message on this connection in this chunk
            self._curr_chunk_connection_indexes[conn_id] = [index_entry]
            self._curr_chunk_info.connection_counts[conn_id] = 1
        else:
            curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id]
            if index_entry >= curr_chunk_connection_index[-1]:
                # Writing chronologically, so we can skip the binary search
                curr_chunk_connection_index.append(index_entry)
            else:
                bisect.insort_right(curr_chunk_connection_index, index_entry)

            self._curr_chunk_info.connection_counts[conn_id] += 1

        # Update the bag-wide connection index
        if conn_id not in self._connection_indexes:
            self._connection_indexes[conn_id] = [index_entry]
        else:
            bisect.insort_right(self._connection_indexes[conn_id], index_entry)

        # Update the chunk start/end times
        if t > self._curr_chunk_info.end_time:
            self._curr_chunk_info.end_time = t
        elif t < self._curr_chunk_info.start_time:
            self._curr_chunk_info.start_time = t

        if not raw:
            # Serialize the message into the buffer
            self._buffer.seek(0)
            self._buffer.truncate(0)
            msg.serialize(self._buffer)
            serialized_bytes = self._buffer.getvalue()

        # Write the message data record
        self._write_message_data_record(conn_id, t, serialized_bytes)

        # Close the chunk if it's over the size threshold
        chunk_size = self._get_chunk_offset()
        if chunk_size > self._chunk_threshold:
            self._stop_writing_chunk()
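    # Example (a sketch added for illustration, not part of the original
    # source): copying messages between bags using the raw form avoids
    # deserializing and reserializing each message.
    #
    #   inbag, outbag = Bag('in.bag'), Bag('out.bag', 'w')
    #   for topic, raw_msg, t in inbag.read_messages(raw=True):
    #       outbag.write(topic, raw_msg, t, raw=True)
    #   outbag.close(); inbag.close()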
379 """
380 Reindexes the bag file. Yields position of each chunk for progress.
381 """
382 self._clear_index()
383 return self._reader.reindex()
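    # Example (a sketch added for illustration, not part of the original
    # source): reindexing an unindexed bag, reporting progress per chunk.
    #
    #   bag = Bag('damaged.bag', 'a', allow_unindexed=True)
    #   for chunk_pos in bag.reindex():
    #       print 'reindexed through byte %d of %d' % (chunk_pos, bag.size)
    #   bag.close()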
    def close(self):
        """
        Close the bag file.  Closing an already closed bag does nothing.
        """
        if self._file:
            if self._mode in 'wa':
                self._stop_writing()

            self._close_file()
    def __str__(self):
        rows = []

        try:
            if self._filename:
                rows.append(('path', self._filename))

            if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
                rows.append(('version', '1.2 (unindexed)'))
            else:
                rows.append(('version', '%d.%d' % (self._version / 100, self._version % 100)))

            if not self._connection_indexes and not self._chunks:
                rows.append(('size', _human_readable_size(self.size)))
            else:
                if self._chunks:
                    start_stamp = self._chunks[ 0].start_time.to_sec()
                    end_stamp   = self._chunks[-1].end_time.to_sec()
                else:
                    start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()])
                    end_stamp   = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()])

                # Show duration
                duration = end_stamp - start_stamp
                dur_secs = duration % 60
                dur_mins = int(duration / 60)
                dur_hrs  = int(dur_mins / 60)
                if dur_hrs > 0:
                    dur_mins = dur_mins % 60
                    duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration)
                elif dur_mins > 0:
                    duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration)
                else:
                    duration_str = '%.1fs' % duration

                rows.append(('duration', duration_str))

                # Show start and end times
                rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp)))
                rows.append(('end',   '%s (%.2f)' % (_time_to_str(end_stamp),   end_stamp)))

                rows.append(('size', _human_readable_size(self.size)))

                if self._chunks:
                    num_messages = 0
                    for c in self._chunks:
                        for counts in c.connection_counts.itervalues():
                            num_messages += counts
                else:
                    num_messages = sum([len(index) for index in self._connection_indexes.itervalues()])
                rows.append(('messages', str(num_messages)))

                # Show compression information
                if len(self._chunk_headers) == 0:
                    rows.append(('compression', 'none'))
                else:
                    compression_counts = {}
                    compression_uncompressed = {}
                    compression_compressed = {}
                    for chunk_header in self._chunk_headers.itervalues():
                        if chunk_header.compression not in compression_counts:
                            compression_counts[chunk_header.compression] = 1
                            compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] = chunk_header.compressed_size
                        else:
                            compression_counts[chunk_header.compression] += 1
                            compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] += chunk_header.compressed_size

                    chunk_count = len(self._chunk_headers)

                    compressions = []
                    for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])):
                        if compression != Compression.NONE:
                            fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression]
                            compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction))
                        else:
                            compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count))
                    rows.append(('compression', ', '.join(compressions)))

                    all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
                    if not all_uncompressed:
                        total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
                        total_compressed_size   = sum((h.compressed_size   for h in self._chunk_headers.itervalues()))

                        total_uncompressed_size_str = _human_readable_size(total_uncompressed_size)
                        total_compressed_size_str   = _human_readable_size(total_compressed_size)
                        total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)])

                        if duration > 0:
                            uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration)
                            compressed_rate_str   = _human_readable_size(total_compressed_size / duration)
                            rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)])

                            rows.append(('uncompressed', '%*s @ %*s/s' %
                                         (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str)))
                            rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' %
                                         (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size)))
                        else:
                            rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str)))
                            rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str)))

                datatypes = set()
                datatype_infos = []
                for connection in self._connections.itervalues():
                    if connection.datatype in datatypes:
                        continue
                    datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
                    datatypes.add(connection.datatype)

                topics = sorted(set([c.topic for c in self._get_connections()]))
                topic_datatypes = {}
                topic_conn_counts = {}
                topic_msg_counts = {}
                topic_freqs_median = {}
                for topic in topics:
                    connections = list(self._get_connections(topic))

                    topic_datatypes[topic] = connections[0].datatype
                    topic_conn_counts[topic] = len(connections)

                    msg_count = 0
                    for connection in connections:
                        for chunk in self._chunks:
                            msg_count += chunk.connection_counts.get(connection.id, 0)
                    topic_msg_counts[topic] = msg_count

                    if self._connection_indexes_read:
                        stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
                        if len(stamps) > 1:
                            periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
                            med_period = _median(periods)
                            if med_period > 0.0:
                                topic_freqs_median[topic] = 1.0 / med_period

                topics = sorted(topic_datatypes.iterkeys())
                max_topic_len       = max([len(topic) for topic in topics])
                max_datatype_len    = max([len(datatype) for datatype in datatypes])
                max_msg_count_len   = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()])
                max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0

                # Show datatypes
                for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
                    s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum)
                    if i == 0:
                        rows.append(('types', s))
                    else:
                        rows.append(('', s))

                # Show topics
                for i, topic in enumerate(topics):
                    topic_msg_count = topic_msg_counts[topic]

                    s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ')
                    if topic in topic_freqs_median:
                        s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic]))
                    else:
                        s += '   %*s' % (max_freq_median_len, '')

                    s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic])
                    if topic_conn_counts[topic] > 1:
                        s += ' (%d connections)' % topic_conn_counts[topic]

                    if i == 0:
                        rows.append(('topics', s))
                    else:
                        rows.append(('', s))

        except Exception, ex:
            raise

        first_column_width = max([len(field) for field, _ in rows]) + 1

        s = ''
        for (field, value) in rows:
            if field:
                s += '%-*s %s\n' % (first_column_width, field + ':', value)
            else:
                s += '%-*s %s\n' % (first_column_width, '', value)

        return s.rstrip()
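    # Example (a sketch added for illustration, not part of the original
    # source): str(bag) renders the human-readable summary built above, the
    # same information that 'rosbag info' prints.
    #
    #   print str(Bag('input.bag'))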
    def _get_yaml_info(self, key=None):
        s = ''

        try:
            if self._filename:
                s += 'path: %s\n' % self._filename

            if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
                s += 'version: 1.2 (unindexed)\n'
            else:
                s += 'version: %d.%d\n' % (self._version / 100, self._version % 100)

            if not self._connection_indexes and not self._chunks:
                s += 'size: %d\n' % self.size
                s += 'indexed: False\n'
            else:
                if self._chunks:
                    start_stamp = self._chunks[ 0].start_time.to_sec()
                    end_stamp   = self._chunks[-1].end_time.to_sec()
                else:
                    start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()])
                    end_stamp   = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()])

                duration = end_stamp - start_stamp
                s += 'duration: %.6f\n' % duration
                s += 'start: %.6f\n' % start_stamp
                s += 'end: %.6f\n' % end_stamp
                s += 'size: %d\n' % self.size
                if self._chunks:
                    num_messages = 0
                    for c in self._chunks:
                        for counts in c.connection_counts.itervalues():
                            num_messages += counts
                else:
                    num_messages = sum([len(index) for index in self._connection_indexes.itervalues()])
                s += 'messages: %d\n' % num_messages
                s += 'indexed: True\n'

                # Show compression information
                if len(self._chunk_headers) == 0:
                    s += 'compression: none\n'
                else:
                    compression_counts = {}
                    compression_uncompressed = {}
                    compression_compressed = {}
                    for chunk_header in self._chunk_headers.itervalues():
                        if chunk_header.compression not in compression_counts:
                            compression_counts[chunk_header.compression] = 1
                            compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] = chunk_header.compressed_size
                        else:
                            compression_counts[chunk_header.compression] += 1
                            compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] += chunk_header.compressed_size

                    chunk_count = len(self._chunk_headers)

                    main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0]
                    s += 'compression: %s\n' % str(main_compression)

                    all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
                    if not all_uncompressed:
                        s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
                        s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.itervalues()))

                datatypes = set()
                datatype_infos = []
                for connection in self._connections.itervalues():
                    if connection.datatype in datatypes:
                        continue
                    datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
                    datatypes.add(connection.datatype)

                topics = sorted(set([c.topic for c in self._get_connections()]))
                topic_datatypes = {}
                topic_conn_counts = {}
                topic_msg_counts = {}
                topic_freqs_median = {}
                for topic in topics:
                    connections = list(self._get_connections(topic))

                    topic_datatypes[topic] = connections[0].datatype
                    topic_conn_counts[topic] = len(connections)

                    msg_count = 0
                    for connection in connections:
                        for chunk in self._chunks:
                            msg_count += chunk.connection_counts.get(connection.id, 0)
                    topic_msg_counts[topic] = msg_count

                    if self._connection_indexes_read:
                        stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
                        if len(stamps) > 1:
                            periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
                            med_period = _median(periods)
                            if med_period > 0.0:
                                topic_freqs_median[topic] = 1.0 / med_period

                topics = sorted(topic_datatypes.iterkeys())
                max_topic_len       = max([len(topic) for topic in topics])
                max_datatype_len    = max([len(datatype) for datatype in datatypes])
                max_msg_count_len   = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()])
                max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0

                # Show datatypes
                s += 'types:\n'
                for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
                    s += '  - type: %s\n' % datatype
                    s += '    md5: %s\n' % md5sum

                # Show topics
                s += 'topics:\n'
                for i, topic in enumerate(topics):
                    topic_msg_count = topic_msg_counts[topic]

                    s += '  - topic: %s\n' % topic
                    s += '    type: %s\n' % topic_datatypes[topic]
                    s += '    messages: %d\n' % topic_msg_count

                    if topic_conn_counts[topic] > 1:
                        s += '    connections: %d\n' % topic_conn_counts[topic]

                    if topic in topic_freqs_median:
                        s += '    frequency: %.4f\n' % topic_freqs_median[topic]

            if not key:
                return s

            class DictObject(object):
                def __init__(self, d):
                    for a, b in d.items():
                        if isinstance(b, (list, tuple)):
                            setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b])
                        else:
                            setattr(self, a, DictObject(b) if isinstance(b, dict) else b)

            obj = DictObject(yaml.load(s))
            try:
                val = eval('obj.' + key)
            except Exception, ex:
                print >> sys.stderr, 'Error getting key "%s"' % key
                return None

            def print_yaml(val, indent=0):
                indent_str = ' ' * indent

                if type(val) is list:
                    s = ''
                    for item in val:
                        s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1))
                    return s
                elif type(val) is DictObject:
                    s = ''
                    for i, (k, v) in enumerate(val.__dict__.items()):
                        if i != 0:
                            s += indent_str
                        s += '%s: %s' % (k, str(v))
                        if i < len(val.__dict__) - 1:
                            s += '\n'
                    return s
                else:
                    return indent_str + str(val)

            return print_yaml(val)

        except Exception, ex:
            raise
    @property
    def _has_compressed_chunks(self):
        if not self._chunk_headers:
            return False

        return any((h.compression != Compression.NONE for h in self._chunk_headers.itervalues()))

    @property
    def _uncompressed_size(self):
        if not self._chunk_headers:
            return self.size

        return sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
    def _read_message(self, position, raw=False):
        """
        Read the message from the given position in the file.
        """
        self.flush()
        return self._reader.seek_and_read_message_data_record(position, raw)
    def _get_connections(self, topics=None, connection_filter=None):
        """
        Yield the connections, optionally filtering by topic and/or connection information.
        """
        if topics:
            if type(topics) is str:
                topics = set([roslib.names.canonicalize_name(topics)])
            else:
                topics = set([roslib.names.canonicalize_name(t) for t in topics])

        for c in self._connections.itervalues():
            if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics:
                continue
            if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header):
                continue
            yield c
    def _get_entries(self, connections=None, start_time=None, end_time=None):
        """
        Yield index entries on the given connections in the given time range.
        """
        for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time):
            if start_time and entry.time < start_time:
                continue
            if end_time and entry.time > end_time:
                return
            yield entry
    def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
        """
        Yield index entries on the given connections in the given time range in reverse order.
        """
        for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()):
            if end_time and entry.time > end_time:
                continue
            if start_time and entry.time < start_time:
                return
            yield entry
    def _get_entry(self, t, connections=None):
        """
        Return the first index entry on/before the given time on the given connections.
        """
        indexes = self._get_indexes(connections)

        entry = _IndexEntry(t)

        first_entry = None

        for index in indexes:
            i = bisect.bisect_right(index, entry) - 1
            if i >= 0:
                index_entry = index[i]
                if first_entry is None or index_entry > first_entry:
                    first_entry = index_entry

        return first_entry
    def _get_entry_after(self, t, connections=None):
        """
        Return the first index entry after the given time on the given connections.
        """
        indexes = self._get_indexes(connections)

        entry = _IndexEntry(t)

        first_entry = None

        for index in indexes:
            i = bisect.bisect_right(index, entry)
            if i <= len(index) - 1:
                index_entry = index[i]
                if first_entry is None or index_entry < first_entry:
                    first_entry = index_entry

        return first_entry
848 """
849 Get the indexes for the given connections.
850 """
851 if not self._connection_indexes_read:
852 self._reader._read_connection_index_records()
853
854 if connections is None:
855 return self._connection_indexes.itervalues()
856 else:
857 return (self._connection_indexes[c.id] for c in connections)
858
859
860
    def _clear_index(self):
        self._connection_indexes_read = False
        self._connection_indexes = {}

        self._topic_connections = {}
        self._connections = {}
        self._connection_count = 0
        self._chunk_count = 0
        self._chunks = []
        self._chunk_headers = {}

        self._chunk_open = False
        self._curr_chunk_info = None
        self._curr_chunk_data_pos = None
        self._curr_chunk_connection_indexes = {}
    def _open(self, f, mode, allow_unindexed):
        if not f:
            raise ValueError('filename (or stream) is invalid')

        try:
            if mode == 'r':   self._open_read(f, allow_unindexed)
            elif mode == 'w': self._open_write(f)
            elif mode == 'a': self._open_append(f, allow_unindexed)
            else:
                raise ValueError('mode "%s" is invalid' % mode)
        except struct.error:
            raise ROSBagFormatException('error with bag')
    def _open_read(self, f, allow_unindexed):
        if isinstance(f, file):
            self._file = f
            self._filename = None
        else:
            self._file = open(f, 'rb')
            self._filename = f

        self._mode = 'r'

        try:
            self._version = self._read_version()
        except:
            self._version = None
            self._close_file()
            raise

        try:
            self._create_reader()
            self._reader.start_reading()
        except ROSBagUnindexedException, ex:
            if not allow_unindexed:
                self._close_file()
                raise
        except:
            self._close_file()
            raise
    def _open_write(self, f):
        if isinstance(f, file):
            self._file = f
            self._filename = None
        else:
            self._file = open(f, 'w+b')
            self._filename = f

        self._mode = 'w'

        self._version = 200

        try:
            self._create_reader()
            self._start_writing()
        except:
            self._close_file()
            raise
    def _open_append(self, f, allow_unindexed):
        if isinstance(f, file):
            self._file = f
            self._filename = None
        else:
            try:
                # Test if the file already exists
                open(f, 'r').close()

                # File exists: open it for updating
                self._file = open(f, 'r+b')
            except IOError:
                # File doesn't exist: create it
                self._file = open(f, 'w+b')

            self._filename = f

        self._mode = 'a'

        try:
            self._version = self._read_version()
        except:
            self._version = None
            self._close_file()
            raise

        if self._version != 200:
            self._file.close()
            raise ROSBagException('bag version %d is unsupported for appending' % self._version)

        try:
            self._start_appending()
        except ROSBagUnindexedException:
            if not allow_unindexed:
                self._close_file()
                raise
        except:
            self._close_file()
            raise
    def _close_file(self):
        self._file.close()
        self._file = None
982 """
983 @raise ROSBagException: if the bag version is unsupported
984 """
985 major_version, minor_version = self._version / 100, self._version % 100
986 if major_version == 2:
987 self._reader = _BagReader200(self)
988 elif major_version == 1:
989 if minor_version == 1:
990 self._reader = _BagReader101(self)
991 else:
992
993 first_record_pos = self._file.tell()
994 header = _read_header(self._file)
995 op = _read_uint8_field(header, 'op')
996 self._file.seek(first_record_pos)
997
998 if op == _OP_FILE_HEADER:
999 self._reader = _BagReader102_Indexed(self)
1000 else:
1001 self._reader = _BagReader102_Unindexed(self)
1002 else:
1003 raise ROSBagException('unknown bag version %d' % self._version)
1004
1006 """
1007 @raise ROSBagException: if the file is empty, or the version line can't be parsed
1008 """
1009 version_line = self._file.readline().rstrip()
1010 if len(version_line) == 0:
1011 raise ROSBagException('empty file')
1012
1013 matches = re.match("#ROS(.*) V(\d).(\d)", version_line)
1014 if matches is None or len(matches.groups()) != 3:
1015 raise ROSBagException('This does not appear to be a bag file')
1016
1017 version_type, major_version_str, minor_version_str = matches.groups()
1018
1019 version = int(major_version_str) * 100 + int(minor_version_str)
1020
1021 return version
1022
    def _start_writing(self):
        self._file.write(_VERSION + '\n')
        self._file_header_pos = self._file.tell()
        self._write_file_header_record(0, 0, 0)
    def _stop_writing(self):
        # Write the open chunk (if any) to disk
        self.flush()

        # Remember this location as the start of the index
        self._index_data_pos = self._file.tell()

        # Write the connection records
        for connection_info in self._connections.itervalues():
            self._write_connection_record(connection_info)

        # Write the chunk info records
        for chunk_info in self._chunks:
            self._write_chunk_info_record(chunk_info)

        # Re-write the file header with the index position and counts
        self._file.seek(self._file_header_pos)
        self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
    def _start_appending(self):
        self._file_header_pos = self._file.tell()

        self._create_reader()
        self._reader.start_reading()

        # Truncate the file to chop off the index
        self._file.truncate(self._index_data_pos)
        self._reader.index_data_pos = 0

        # Rewrite the file header, clearing the index position (it's invalid until the bag is closed again)
        self._file.seek(self._file_header_pos)
        self._write_file_header_record(0, 0, 0)

        # Seek to the end of the file
        self._file.seek(0, os.SEEK_END)
    def _start_writing_chunk(self, t):
        self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t)
        self._write_chunk_header(_ChunkHeader(self._compression, 0, 0))
        self._curr_chunk_data_pos = self._file.tell()
        self._set_compression_mode(self._compression)
        self._chunk_open = True
    def _get_chunk_offset(self):
        if self._compression == Compression.NONE:
            return self._file.tell() - self._curr_chunk_data_pos
        else:
            return self._output_file.compressed_bytes_in
    def _stop_writing_chunk(self):
        # Add this chunk to the index
        self._chunks.append(self._curr_chunk_info)

        # Get the uncompressed and compressed sizes
        uncompressed_size = self._get_chunk_offset()
        self._set_compression_mode(Compression.NONE)
        compressed_size = self._file.tell() - self._curr_chunk_data_pos

        # Rewrite the chunk header with the size of the chunk (remembering the current offset)
        end_of_chunk_pos = self._file.tell()
        self._file.seek(self._curr_chunk_info.pos)
        chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos)
        self._write_chunk_header(chunk_header)
        self._chunk_headers[self._curr_chunk_info.pos] = chunk_header

        # Write out the connection indexes and clear them
        self._file.seek(end_of_chunk_pos)
        for connection_id, entries in self._curr_chunk_connection_indexes.items():
            self._write_connection_index_record(connection_id, entries)
        self._curr_chunk_connection_indexes.clear()

        # Flag that we're starting a new chunk
        self._chunk_open = False
    def _set_compression_mode(self, compression):
        # NOTE: this method was lost in extraction; the body below is a
        # reconstruction inferred from its call sites in _start_writing_chunk
        # and _stop_writing_chunk.  _BZ2CompressorFileFacade (a file-like
        # wrapper that tracks compressed_bytes_in) is assumed to be defined
        # later in this module.

        # Flush the compressor, if one is active
        if self._curr_compression != Compression.NONE:
            self._output_file.flush()

        # Route writes through the compressor, or directly to the file
        if compression == Compression.BZ2:
            self._output_file = _BZ2CompressorFileFacade(self._file)
        elif compression == Compression.NONE:
            self._output_file = self._file

        self._curr_compression = compression
    def _write_file_header_record(self, index_pos, connection_count, chunk_count):
        header = {
            'op':          _pack_uint8(_OP_FILE_HEADER),
            'index_pos':   _pack_uint64(index_pos),
            'conn_count':  _pack_uint32(connection_count),
            'chunk_count': _pack_uint32(chunk_count)
        }
        _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
    def _write_connection_record(self, connection_info):
        header = {
            'op':    _pack_uint8(_OP_CONNECTION),
            'topic': connection_info.topic,
            'conn':  _pack_uint32(connection_info.id)
        }

        _write_header(self._output_file, header)

        _write_header(self._output_file, connection_info.header)
    def _write_message_data_record(self, connection_id, t, serialized_bytes):
        header = {
            'op':   _pack_uint8(_OP_MSG_DATA),
            'conn': _pack_uint32(connection_id),
            'time': _pack_time(t)
        }
        _write_record(self._output_file, header, serialized_bytes)
    def _write_chunk_header(self, chunk_header):
        header = {
            'op':          _pack_uint8(_OP_CHUNK),
            'compression': chunk_header.compression,
            'size':        _pack_uint32(chunk_header.uncompressed_size)
        }
        _write_header(self._file, header)

        self._file.write(_pack_uint32(chunk_header.compressed_size))
    def _write_connection_index_record(self, connection_id, entries):
        header = {
            'op':    _pack_uint8(_OP_INDEX_DATA),
            'conn':  _pack_uint32(connection_id),
            'ver':   _pack_uint32(_INDEX_VERSION),
            'count': _pack_uint32(len(entries))
        }

        buffer = self._buffer
        buffer.seek(0)
        buffer.truncate(0)
        for entry in entries:
            buffer.write(_pack_time  (entry.time))
            buffer.write(_pack_uint32(entry.offset))

        _write_record(self._file, header, buffer.getvalue())
    def _write_chunk_info_record(self, chunk_info):
        header = {
            'op':         _pack_uint8 (_OP_CHUNK_INFO),
            'ver':        _pack_uint32(_CHUNK_INDEX_VERSION),
            'chunk_pos':  _pack_uint64(chunk_info.pos),
            'start_time': _pack_time(chunk_info.start_time),
            'end_time':   _pack_time(chunk_info.end_time),
            'count':      _pack_uint32(len(chunk_info.connection_counts))
        }

        buffer = self._buffer
        buffer.seek(0)
        buffer.truncate(0)
        for connection_id, count in chunk_info.connection_counts.items():
            buffer.write(_pack_uint32(connection_id))
            buffer.write(_pack_uint32(count))

        _write_record(self._file, header, buffer.getvalue())
_message_types = {}

_OP_MSG_DEF     = 0x01
_OP_MSG_DATA    = 0x02
_OP_FILE_HEADER = 0x03
_OP_INDEX_DATA  = 0x04
_OP_CHUNK       = 0x05
_OP_CHUNK_INFO  = 0x06
_OP_CONNECTION  = 0x07

_OP_CODES = {
    _OP_MSG_DEF:     'MSG_DEF',
    _OP_MSG_DATA:    'MSG_DATA',
    _OP_FILE_HEADER: 'FILE_HEADER',
    _OP_INDEX_DATA:  'INDEX_DATA',
    _OP_CHUNK:       'CHUNK',
    _OP_CHUNK_INFO:  'CHUNK_INFO',
    _OP_CONNECTION:  'CONNECTION'
}

_VERSION             = '#ROSBAG V2.0'
_FILE_HEADER_LENGTH  = 4096
_INDEX_VERSION       = 1
_CHUNK_INDEX_VERSION = 1
class _ConnectionInfo(object):
    def __init__(self, id, topic, header):
        try:
            datatype, md5sum, msg_def = header['type'], header['md5sum'], header['message_definition']
        except KeyError, ex:
            raise ROSBagFormatException('connection header field %s not found' % str(ex))

        self.id = id
        self.topic = topic
        self.datatype = datatype
        self.md5sum = md5sum
        self.msg_def = msg_def
        self.header = header

    def __str__(self):
        return '%d on %s: %s' % (self.id, self.topic, str(self.header))
class _ChunkInfo(object):
    def __init__(self, pos, start_time, end_time):
        self.pos = pos
        self.start_time = start_time
        self.end_time = end_time

        self.connection_counts = {}

    def __str__(self):
        s = 'chunk_pos: %d\n' % self.pos
        s += 'start_time: %s\n' % str(self.start_time)
        s += 'end_time: %s\n' % str(self.end_time)
        s += 'connections: %d\n' % len(self.connection_counts)
        s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()])
        return s
class _ChunkHeader(object):
    def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
        self.compression = compression
        self.compressed_size = compressed_size
        self.uncompressed_size = uncompressed_size
        self.data_pos = data_pos

    def __str__(self):
        if self.uncompressed_size > 0:
            ratio = 100 * (float(self.compressed_size) / self.uncompressed_size)
            return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio)
        else:
            return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
class _IndexEntry(object):
    def __init__(self, time):
        self.time = time

    def __cmp__(self, other):
        return self.time.__cmp__(other.time)

class _IndexEntry102(_IndexEntry):
    def __init__(self, time, offset):
        self.time = time
        self.offset = offset

    @property
    def position(self):
        return self.offset

    def __str__(self):
        return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)

class _IndexEntry200(_IndexEntry):
    def __init__(self, time, chunk_pos, offset):
        self.time = time
        self.chunk_pos = chunk_pos
        self.offset = offset

    @property
    def position(self):
        return (self.chunk_pos, self.offset)

    def __str__(self):
        return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
def _get_message_type(info):
    message_type = _message_types.get(info.md5sum)
    if message_type is None:
        try:
            message_type = roslib.genpy.generate_dynamic(info.datatype, info.msg_def)[info.datatype]
            if message_type._md5sum != info.md5sum:
                print >> sys.stderr, 'WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n  Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."' % (info.datatype, info.md5sum, message_type._md5sum)
        except roslib.msgs.MsgSpecException:
            message_type = roslib.genpy.generate_dynamic(info.datatype, "")[info.datatype]
            print >> sys.stderr, 'WARNING: For type [%s] stored md5sum [%s] has invalid message definition.' % (info.datatype, info.md5sum)
        except roslib.genpy.MsgGenerationException, ex:
            raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex)))

        _message_types[info.md5sum] = message_type

    return message_type
def _read_uint8 (f): return _unpack_uint8 (f.read(1))
def _read_uint32(f): return _unpack_uint32(f.read(4))
def _read_uint64(f): return _unpack_uint64(f.read(8))
def _read_time  (f): return _unpack_time  (f.read(8))

def _unpack_uint8(v):  return struct.unpack('<B', v)[0]
def _unpack_uint32(v): return struct.unpack('<L', v)[0]
def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
def _unpack_time(v):   return rospy.Time(*struct.unpack('<LL', v))

def _pack_uint8(v):  return struct.pack('<B', v)
def _pack_uint32(v): return struct.pack('<L', v)
def _pack_uint64(v): return struct.pack('<Q', v)
def _pack_time(v):   return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
def _read(f, size):
    data = f.read(size)
    if len(data) != size:
        raise ROSBagException('expecting %d bytes, read %d' % (size, len(data)))
    return data
def _skip_record(f):
    _skip_sized(f)  # skip the header
    _skip_sized(f)  # skip the data
def _skip_sized(f):
    size = _read_uint32(f)
    f.seek(size, os.SEEK_CUR)
def _read_sized(f):
    try:
        size = _read_uint32(f)
    except struct.error, ex:
        raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))
    return _read(f, size)
def _write_sized(f, v):
    f.write(_pack_uint32(len(v)))
    f.write(v)
def _read_field(header, field, unpack_fn):
    if field not in header:
        raise ROSBagFormatException('expected "%s" field in record' % field)

    try:
        value = unpack_fn(header[field])
    except Exception, ex:
        raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex)))

    return value
def _read_str_field   (header, field): return _read_field(header, field, lambda v: v)
def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
def _read_time_field  (header, field): return _read_field(header, field, _unpack_time)
def _write_record(f, header, data='', padded_size=None):
    header_str = _write_header(f, header)

    if padded_size is not None:
        header_len = len(header_str)
        if header_len < padded_size:
            data = ' ' * (padded_size - header_len)
        else:
            data = ''

    _write_sized(f, data)
def _write_header(f, header):
    header_str = ''.join([_pack_uint32(len(k) + 1 + len(v)) + k + '=' + v for k, v in header.items()])
    _write_sized(f, header_str)
    return header_str
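# On-disk header layout implemented by _write_header/_read_header (comment
# added for clarity, not in the original source).  A header is written as a
# sized block: a 4-byte little-endian length, then one field after another,
# each serialized as
#
#   <uint32 len(name) + 1 + len(value)> name '=' value
#
# so, for example, the single-byte field op=\x02 occupies 4 + 2 + 1 + 1 bytes.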
def _read_header(f, req_op=None):
    bag_pos = f.tell()

    # Read the header
    try:
        header = _read_sized(f)
    except ROSBagException, ex:
        raise ROSBagFormatException('Error reading header: %s' % str(ex))

    # Parse the header into a dict
    header_dict = {}
    while header != '':
        # Read the field size
        if len(header) < 4:
            raise ROSBagFormatException('Error reading header field')
        (size,) = struct.unpack('<L', header[:4])
        header = header[4:]

        # Read the field bytes
        if len(header) < size:
            raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header)))
        (name, sep, value) = header[:size].partition('=')
        if sep == '':
            raise ROSBagFormatException('Error reading header field')

        header_dict[name] = value

        header = header[size:]

    # Check the op code of the header, if one was required
    if req_op is not None:
        op = _read_uint8_field(header_dict, 'op')
        if req_op != op:
            raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op]))

    return header_dict
def _peek_next_header_op(f):
    pos = f.tell()
    header = _read_header(f)
    op = _read_uint8_field(header, 'op')
    f.seek(pos)
    return op
def _read_record_data(f):
    try:
        record_data = _read_sized(f)
    except ROSBagException, ex:
        raise ROSBagFormatException('Error reading record data: %s' % str(ex))

    return record_data
class _BagReader(object):
    def __init__(self, bag):
        self.bag = bag

    def start_reading(self):
        raise NotImplementedError()

    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
        raise NotImplementedError()

    def reindex(self):
        raise NotImplementedError()
1454 """
1455 Support class for reading v1.1 bag files.
1456 """
1459
1462
    def read_messages(self, topics, start_time, end_time, topic_filter, raw):
        f = self.bag._file

        while True:
            bag_pos = f.tell()

            # Read the topic, md5sum and datatype lines
            topic = f.readline().rstrip()
            if not topic:
                return

            md5sum = f.readline().rstrip()
            datatype = f.readline().rstrip()

            # Migration for rostools->roslib renames
            if datatype in ['rostools/Header', 'rostools/Log', 'rostools/Time']:
                datatype = datatype.replace('rostools', 'roslib')

            # Read the timestamp and message length
            data = f.read(12)
            if len(data) != 12:
                raise ROSBagFormatException('Bag file appears to be corrupt (1)')
            (time_sec, time_nsec, length) = struct.unpack("<LLL", data)
            t = rospy.Time(time_sec, time_nsec)

            # Read the message data
            data = f.read(length)
            if len(data) != length:
                raise ROSBagFormatException('Bag file appears to be corrupt (2)')

            try:
                pytype = _message_types[md5sum]
            except KeyError:
                try:
                    pytype = roslib.message.get_message_class(datatype)
                except Exception:
                    pytype = None

                if pytype is None:
                    raise ROSBagException('Cannot deserialize messages of type [%s]: cannot locate message class' % datatype)
                else:
                    if pytype._md5sum != md5sum:
                        (package, type) = datatype.split('/')
                        if roslib.gentools.compute_md5_v1(roslib.gentools.get_file_dependencies(roslib.msgs.msg_file(package, type))) == md5sum:
                            print 'In V1.1 Logfile, found old md5sum for type [%s].  Allowing implicit migration to new md5sum.' % datatype
                        else:
                            raise ROSBagException('Cannot deserialize messages of type [%s]: md5sum is outdated in V1.1 bagfile' % datatype)
                    _message_types[md5sum] = pytype

            if raw:
                msg = datatype, data, pytype._md5sum, bag_pos, pytype
            else:
                msg = pytype()
                msg.deserialize(data)

            yield topic, msg, t
1521 """
1522 Support class for reading unindexed v1.2 bag files.
1523 """
1526
1528 self.bag._file_header_pos = self.bag._file.tell()
1529
1531 """Generates all bag index information by rereading the message records."""
1532 f = self.bag._file
1533
1534 total_bytes = self.bag.size
1535
1536
1537 self.bag._file.seek(self.bag._file_header_pos)
1538
1539 offset = f.tell()
1540
1541
1542 while offset < total_bytes:
1543 yield offset
1544
1545 op = _peek_next_header_op(f)
1546
1547 if op == _OP_MSG_DEF:
1548 connection_info = self.read_message_definition_record()
1549
1550 if connection_info.topic not in self.bag._topic_connections:
1551 self.bag._topic_connections[connection_info.topic] = connection_info.id
1552 self.bag._connections[connection_info.id] = connection_info
1553 self.bag._connection_indexes[connection_info.id] = []
1554
1555 elif op == _OP_MSG_DATA:
1556
1557 header = _read_header(f)
1558
1559 topic = _read_str_field(header, 'topic')
1560 secs = _read_uint32_field(header, 'sec')
1561 nsecs = _read_uint32_field(header, 'nsec')
1562 t = roslib.rostime.Time(secs, nsecs)
1563
1564 if topic not in self.bag._topic_connections:
1565 datatype = _read_str_field(header, 'type')
1566 self._create_connection_info_for_datatype(topic, datatype)
1567
1568 connection_id = self.bag._topic_connections[topic]
1569 info = self.bag._connections[connection_id]
1570
1571
1572 _skip_sized(f)
1573
1574
1575 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
1576
1577 offset = f.tell()
1578
    def read_messages(self, topics, start_time, end_time, topic_filter, raw):
        f = self.bag._file

        f.seek(self.bag._file_header_pos)

        while True:
            # Read any MSG_DEF records
            while True:
                position = f.tell()

                try:
                    header = _read_header(f)
                except Exception:
                    return

                op = _read_uint8_field(header, 'op')
                if op != _OP_MSG_DEF:
                    break

                connection_info = self.read_message_definition_record(header)

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id

                self.bag._connections[connection_info.id] = connection_info

            # Check that we have a MSG_DATA record
            if op != _OP_MSG_DATA:
                raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)

            topic = _read_str_field(header, 'topic')

            if topic not in self.bag._topic_connections:
                datatype = _read_str_field(header, 'type')
                self._create_connection_info_for_datatype(topic, datatype)

            connection_id = self.bag._topic_connections[topic]
            info = self.bag._connections[connection_id]

            # Get the message type
            try:
                msg_type = _get_message_type(info)
            except KeyError:
                raise ROSBagException('Cannot deserialize messages of type [%s].  Message was not preceded in bagfile by definition' % info.datatype)

            # Get the timestamp
            secs  = _read_uint32_field(header, 'sec')
            nsecs = _read_uint32_field(header, 'nsec')
            t = roslib.rostime.Time(secs, nsecs)

            # Read the message content
            data = _read_record_data(f)

            if raw:
                msg = (info.datatype, data, info.md5sum, position, msg_type)
            else:
                # Deserialize the message
                msg = msg_type()
                msg.deserialize(data)

            yield (topic, msg, t)

        self.bag._connection_indexes_read = True
    def _create_connection_info_for_datatype(self, topic, datatype):
        for c in self.bag._connections.itervalues():
            if c.datatype == datatype:
                connection_id = len(self.bag._connections)
                connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] }
                connection_info = _ConnectionInfo(connection_id, topic, connection_header)

                self.bag._topic_connections[topic] = connection_id
                self.bag._connections[connection_id] = connection_info
                self.bag._connection_indexes[connection_id] = []
                return

        raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
    def read_message_definition_record(self, header=None):
        if not header:
            header = _read_header(self.bag._file, _OP_MSG_DEF)

        topic    = _read_str_field(header, 'topic')
        datatype = _read_str_field(header, 'type')
        md5sum   = _read_str_field(header, 'md5')
        msg_def  = _read_str_field(header, 'def')

        _skip_sized(self.bag._file)  # skip the record data

        connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def }

        id = len(self.bag._connections)

        return _ConnectionInfo(id, topic, connection_header)
class _BagReader102_Indexed(_BagReader102_Unindexed):
    """
    Support class for reading indexed v1.2 bag files.
    """
    def __init__(self, bag):
        _BagReader102_Unindexed.__init__(self, bag)
    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
        connections = self.bag._get_connections(topics, connection_filter)
        for entry in self.bag._get_entries(connections, start_time, end_time):
            yield self.seek_and_read_message_data_record(entry.offset, raw)
    def reindex(self):
        """Generates all bag index information by rereading the message records."""
        f = self.bag._file

        total_bytes = self.bag.size

        # Re-read all the records, starting after the file header
        self.bag._file.seek(self.bag._file_header_pos)
        self.read_file_header_record()

        offset = f.tell()

        # Read the message definition and message data records
        while offset < total_bytes:
            yield offset

            op = _peek_next_header_op(f)

            if op == _OP_MSG_DEF:
                connection_info = self.read_message_definition_record()

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id
                    self.bag._connections[connection_info.id] = connection_info
                    self.bag._connection_indexes[connection_info.id] = []

            elif op == _OP_MSG_DATA:
                # Read the topic and timestamp from the header
                header = _read_header(f)

                topic = _read_str_field(header, 'topic')
                secs  = _read_uint32_field(header, 'sec')
                nsecs = _read_uint32_field(header, 'nsec')
                t = roslib.rostime.Time(secs, nsecs)

                if topic not in self.bag._topic_connections:
                    datatype = _read_str_field(header, 'type')
                    self._create_connection_info_for_datatype(topic, datatype)

                connection_id = self.bag._topic_connections[topic]
                info = self.bag._connections[connection_id]

                # Skip over the message content
                _skip_sized(f)

                # Insert the message entry (in order) into the connection index
                bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))

            elif op == _OP_INDEX_DATA:
                _skip_record(f)

            offset = f.tell()
    def start_reading(self):
        try:
            # Read the file header record
            self.read_file_header_record()

            total_bytes = self.bag.size

            # Check if the index position has been written, i.e. the bag was closed successfully
            if self.bag._index_data_pos == 0:
                raise ROSBagUnindexedException()

            # Seek to the beginning of the topic index records
            self.bag._file.seek(self.bag._index_data_pos)

            # Read the topic index records
            topic_indexes = {}
            while True:
                pos = self.bag._file.tell()
                if pos >= total_bytes:
                    break

                topic, index = self.read_topic_index_record()

                topic_indexes[topic] = index

            # Read the message definition records (one per topic)
            for topic, index in topic_indexes.items():
                self.bag._file.seek(index[0].offset)

                connection_info = self.read_message_definition_record()

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id
                    self.bag._connections[connection_info.id] = connection_info

                self.bag._connection_indexes[connection_info.id] = index

            self.bag._connection_indexes_read = True

        except Exception, ex:
            raise ROSBagUnindexedException()
    def read_file_header_record(self):
        self.bag._file_header_pos = self.bag._file.tell()

        header = _read_header(self.bag._file, _OP_FILE_HEADER)

        self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')

        _skip_sized(self.bag._file)  # skip the record data (padding)
    def read_topic_index_record(self):
        f = self.bag._file

        header = _read_header(f, _OP_INDEX_DATA)

        index_version = _read_uint32_field(header, 'ver')
        topic         = _read_str_field   (header, 'topic')
        count         = _read_uint32_field(header, 'count')

        if index_version != 0:
            raise ROSBagFormatException('expecting index version 0, got %d' % index_version)

        _read_uint32(f)  # skip the record data size

        topic_index = []

        for i in range(count):
            time   = _read_time  (f)
            offset = _read_uint64(f)

            topic_index.append(_IndexEntry102(time, offset))

        return (topic, topic_index)
    def seek_and_read_message_data_record(self, position, raw):
        f = self.bag._file

        # Seek to the message position
        f.seek(position)

        # Skip any MSG_DEF records
        while True:
            header = _read_header(f)
            op = _read_uint8_field(header, 'op')
            if op != _OP_MSG_DEF:
                break
            _skip_sized(f)

        # Check that we have a MSG_DATA record
        if op != _OP_MSG_DATA:
            raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)

        topic = _read_str_field(header, 'topic')

        connection_id = self.bag._topic_connections[topic]
        info = self.bag._connections[connection_id]

        # Get the message type
        try:
            msg_type = _get_message_type(info)
        except KeyError:
            raise ROSBagException('Cannot deserialize messages of type [%s].  Message was not preceded in bagfile by definition' % info.datatype)

        # Get the timestamp
        secs  = _read_uint32_field(header, 'sec')
        nsecs = _read_uint32_field(header, 'nsec')
        t = roslib.rostime.Time(secs, nsecs)

        # Read the message content
        data = _read_record_data(f)

        if raw:
            msg = info.datatype, data, info.md5sum, position, msg_type
        else:
            # Deserialize the message
            msg = msg_type()
            msg.deserialize(data)

        return (topic, msg, t)
1861 """
1862 Support class for reading v2.0 bag files.
1863 """
1865 _BagReader.__init__(self, bag)
1866
1867 self.decompressed_chunk_pos = None
1868 self.decompressed_chunk = None
1869 self.decompressed_chunk_io = None
1870
1872 """
1873 Generates all bag index information by rereading the chunks.
1874 Assumes the file header has been read.
1875 """
1876 f = self.bag._file
1877
1878 f.seek(0, os.SEEK_END)
1879 total_bytes = f.tell()
1880
1881
1882
1883
1884 self._read_terminal_connection_records()
1885
1886
1887 self.bag._file.seek(self.bag._file_header_pos)
1888 self.read_file_header_record()
1889
1890 trunc_pos = None
1891
1892 while True:
1893 chunk_pos = f.tell()
1894 if chunk_pos >= total_bytes:
1895 break
1896
1897 yield chunk_pos
1898
1899 try:
1900 self._reindex_read_chunk(f, chunk_pos, total_bytes)
1901 except Exception, ex:
1902 break
1903
1904 trunc_pos = f.tell()
1905
1906 if trunc_pos and trunc_pos < total_bytes:
1907 f.truncate(trunc_pos)
1908
1910
1911 chunk_header = self.read_chunk_header()
1912
1913
1914 if chunk_header.compressed_size == 0:
1915 raise ROSBagException('unterminated chunk at %d' % chunk_pos)
1916
1917 if chunk_header.compression == Compression.NONE:
1918 chunk_file = f
1919 else:
1920
1921 compressed_chunk = _read(f, chunk_header.compressed_size)
1922
1923
1924 if chunk_header.compression == Compression.BZ2:
1925 self.decompressed_chunk = bz2.decompress(compressed_chunk)
1926 else:
1927 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
1928
1929 if self.decompressed_chunk_io:
1930 self.decompressed_chunk_io.close()
1931 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
1932
1933 chunk_file = self.decompressed_chunk_io
1934
1935
1936 self.bag._curr_chunk_info = None
1937
1938 if chunk_header.compression == Compression.NONE:
1939 offset = chunk_file.tell() - chunk_pos
1940 else:
1941 offset = chunk_file.tell()
1942
1943 expected_index_length = 0
1944
1945 while offset < chunk_header.uncompressed_size:
1946 op = _peek_next_header_op(chunk_file)
1947
1948 if op == _OP_CONNECTION:
1949 connection_info = self.read_connection_record(chunk_file)
1950
1951 if connection_info.id not in self.bag._connections:
1952 self.bag._connections[connection_info.id] = connection_info
1953 if connection_info.id not in self.bag._connection_indexes:
1954 self.bag._connection_indexes[connection_info.id] = []
1955
1956 elif op == _OP_MSG_DATA:
1957
1958 header = _read_header(chunk_file)
1959
1960 connection_id = _read_uint32_field(header, 'conn')
1961 t = _read_time_field (header, 'time')
1962
1963
1964 if not self.bag._curr_chunk_info:
1965 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t)
1966 else:
1967 if t > self.bag._curr_chunk_info.end_time:
1968 self.bag._curr_chunk_info.end_time = t
1969 elif t < self.bag._curr_chunk_info.start_time:
1970 self.bag._curr_chunk_info.start_time = t
1971 if connection_id in self.bag._curr_chunk_info.connection_counts:
1972 self.bag._curr_chunk_info.connection_counts[connection_id] += 1
1973 else:
1974 self.bag._curr_chunk_info.connection_counts[connection_id] = 1
1975
1976
1977 _skip_sized(chunk_file)
1978
1979
1980 if connection_id not in self.bag._connection_indexes:
1981 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos))
1982 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset))
1983
1984 expected_index_length += 1
1985
1986 else:
1987
1988 _skip_record(chunk_file)
1989
1990 if chunk_header.compression == Compression.NONE:
1991 offset = chunk_file.tell() - chunk_pos
1992 else:
1993 offset = chunk_file.tell()
1994
1995
1996 next_op = _peek_next_header_op(f)
1997
1998 total_index_length = 0
1999
2000 while next_op != _OP_CHUNK:
2001 if next_op == _OP_INDEX_DATA:
2002
2003 _, index = self.read_connection_index_record()
2004 total_index_length += len(index)
2005 else:
2006 _skip_record(f)
2007
2008 if f.tell() >= total_bytes:
2009 if total_index_length != expected_index_length:
2010 raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length))
2011 break
2012
2013 next_op = _peek_next_header_op(f)
2014
2015
2016 self.bag._chunk_headers[chunk_pos] = chunk_header
2017 self.bag._chunks.append(self.bag._curr_chunk_info)
2018
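The compressed branch above reads the whole chunk, expands it with bz2, and wraps the bytes in a StringIO so the record parser can seek and read them like a file. A self-contained sketch of that round trip:

    import bz2
    from cStringIO import StringIO

    compressed_chunk = bz2.compress('record bytes go here')
    chunk_file = StringIO(bz2.decompress(compressed_chunk))
    print chunk_file.read(6)   # prints 'record'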
2019 - def _read_terminal_connection_records(self):
2020 b, f, r = self.bag, self.bag._file, self.bag._reader
2021
2022 # Seek to the first record after the file header.
2023 f.seek(b._file_header_pos)
2024 r.read_file_header_record()
2025
2026 # Advance to the first connection record outside the chunks, if there is one.
2027 if self._advance_to_next_record(_OP_CONNECTION):
2028
2029 while True:
2030 connection_info = r.read_connection_record(f)
2031
2032 b._connections[connection_info.id] = connection_info
2033 b._connection_indexes[connection_info.id] = []
2034
2035 next_op = _peek_next_header_op(f)
2036 if next_op != _OP_CONNECTION:
2037 break
2038
2039 - def _advance_to_next_record(self, op):
2040 b, f = self.bag, self.bag._file
2041
2042 try:
2043 while True:
2044 next_op = _peek_next_header_op(f)
2045 if next_op == op:
2046 break
2047
2048 if next_op == _OP_INDEX_DATA:
2049
2050 # Reading an index data record requires a current chunk info, which
2051 # doesn't exist yet at this point, so substitute a placeholder.
2052 if b._curr_chunk_info is None:
2053 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1))
2054
2055 b._reader.read_connection_index_record()
2056 else:
2057 _skip_record(f)
2058
2059 return True
2060
2061 except Exception, ex:
2062 return False
2063
2064 - def start_reading(self):
2065 try:
2066 # Read the file header record to find the index position and record counts.
2067 self.read_file_header_record()
2068
2069 # An index position of zero means the bag has no index (it wasn't closed properly).
2070 if self.bag._index_data_pos == 0:
2071 raise ROSBagUnindexedException()
2072
2073 # Seek to the start of the index data section.
2074 self.bag._file.seek(self.bag._index_data_pos)
2075
2076 # Read the connection records.
2077 self.bag._connection_indexes = {}
2078 for i in range(self.bag._connection_count):
2079 connection_info = self.read_connection_record(self.bag._file)
2080 self.bag._connections[connection_info.id] = connection_info
2081 self.bag._connection_indexes[connection_info.id] = []
2082
2083 # Read the chunk info records.
2084 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)]
2085
2086 # Read the header of each chunk.
2087 self.bag._chunk_headers = {}
2088 for chunk_info in self.bag._chunks:
2089 self.bag._file.seek(chunk_info.pos)
2090 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header()
2091
2092 if not self.bag._skip_index:
2093 self._read_connection_index_records()
2094
2095 except Exception, ex:
2096 raise ROSBagUnindexedException()
2097
2098 - def _read_connection_index_records(self):
2099 for chunk_info in self.bag._chunks:
2100 self.bag._file.seek(chunk_info.pos)
2101 _skip_record(self.bag._file)
2102
2103 self.bag._curr_chunk_info = chunk_info
2104 for i in range(len(chunk_info.connection_counts)):
2105 connection_id, index = self.read_connection_index_record()
2106 self.bag._connection_indexes[connection_id].extend(index)
2107
2108
2109 # Remove any connections that have no index entries; they carry no
2110 # messages that can be read back.
2111 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.iteritems() if not index]
2112 for id in orphan_connection_ids:
2113 del self.bag._connections[id]
2114 del self.bag._connection_indexes[id]
2115
2116 self.bag._connection_indexes_read = True
2117
2118 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2119 connections = self.bag._get_connections(topics, connection_filter)
2120 for entry in self.bag._get_entries(connections, start_time, end_time):
2121 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
2122
2123
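This generator is what the public Bag.read_messages() call ultimately drives. A sketch of a typical read loop, assuming the public wrapper defaults its filter arguments; the filename and topic are placeholders:

    from rosbag import Bag

    with Bag('input.bag') as b:
        for topic, msg, t in b.read_messages(topics=['/chatter']):
            print '%s @ %s' % (topic, t)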
2124
2125 - def read_file_header_record(self):
2126 self.bag._file_header_pos = self.bag._file.tell()
2127
2128 header = _read_header(self.bag._file, _OP_FILE_HEADER)
2129
2130 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
2131 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count')
2132 self.bag._connection_count = _read_uint32_field(header, 'conn_count')
2133
2134 _skip_sized(self.bag._file)
2135
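The _read_header() calls used throughout these readers parse the bag format's length-prefixed headers: a sequence of uint32-length-prefixed 'name=value' fields, where the 'op' field identifies the record type. A standalone sketch of that field parsing (parse_header_fields is a hypothetical helper, not part of this module):

    import struct

    def parse_header_fields(header_bytes):
        # header_bytes excludes the leading 4-byte header length
        fields, i = {}, 0
        while i < len(header_bytes):
            (field_len,) = struct.unpack('<L', header_bytes[i:i + 4])
            i += 4
            name, _, value = header_bytes[i:i + field_len].partition('=')
            fields[name] = value
            i += field_len
        return fields

    hdr = '\x04\x00\x00\x00op=\x07' + '\x0e\x00\x00\x00topic=/chatter'
    print parse_header_fields(hdr)   # {'op': '\x07', 'topic': '/chatter'} (dict order may vary)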
2136 - def read_connection_record(self, f):
2137 header = _read_header(f, _OP_CONNECTION)
2138
2139 conn_id = _read_uint32_field(header, 'conn')
2140 topic = _read_str_field (header, 'topic')
2141
2142 connection_header = _read_header(f)
2143
2144 return _ConnectionInfo(conn_id, topic, connection_header)
2145
2146 - def read_chunk_info_record(self):
2147 f = self.bag._file
2148
2149 header = _read_header(f, _OP_CHUNK_INFO)
2150
2151 chunk_info_version = _read_uint32_field(header, 'ver')
2152
2153 if chunk_info_version == 1:
2154 chunk_pos = _read_uint64_field(header, 'chunk_pos')
2155 start_time = _read_time_field (header, 'start_time')
2156 end_time = _read_time_field (header, 'end_time')
2157 connection_count = _read_uint32_field(header, 'count')
2158
2159 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time)
2160 # Skip the 4-byte length of the record data section.
2161 _read_uint32(f)
2162
2163 for i in range(connection_count):
2164 connection_id = _read_uint32(f)
2165 count = _read_uint32(f)
2166
2167 chunk_info.connection_counts[connection_id] = count
2168
2169 return chunk_info
2170 else:
2171 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2172
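After its header, a version-1 chunk info record's data section is just pairs of little-endian uint32s, (connection id, message count), which the loop above reads with _read_uint32(). Equivalent struct-level decoding on made-up values:

    import struct

    data = struct.pack('<LL', 3, 42) + struct.pack('<LL', 7, 5)
    for i in range(len(data) // 8):
        conn_id, count = struct.unpack('<LL', data[i * 8:i * 8 + 8])
        print conn_id, count   # prints '3 42' then '7 5'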
2173 - def read_chunk_header(self):
2174 header = _read_header(self.bag._file, _OP_CHUNK)
2175
2176 compression = _read_str_field (header, 'compression')
2177 uncompressed_size = _read_uint32_field(header, 'size')
2178
2179 compressed_size = _read_uint32(self.bag._file)
2180
2181 data_pos = self.bag._file.tell()
2182
2183 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
2184
2185 - def read_connection_index_record(self):
2186 f = self.bag._file
2187
2188 header = _read_header(f, _OP_INDEX_DATA)
2189
2190 index_version = _read_uint32_field(header, 'ver')
2191 connection_id = _read_uint32_field(header, 'conn')
2192 count = _read_uint32_field(header, 'count')
2193
2194 if index_version != 1:
2195 raise ROSBagFormatException('expecting index version 1, got %d' % index_version)
2196
2197 record_size = _read_uint32(f)
2198
2199 index = []
2200
2201 for i in range(count):
2202 time = _read_time (f)
2203 offset = _read_uint32(f)
2204
2205 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset))
2206
2207 return (connection_id, index)
2208
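Each version-1 index entry read above occupies 12 bytes: the message time as two little-endian uint32s (seconds, nanoseconds) followed by the message's offset within the uncompressed chunk. The struct-level equivalent, on made-up values:

    import struct
    import rospy

    entry = struct.pack('<LLL', 1300000000, 500000000, 1024)
    secs, nsecs, offset = struct.unpack('<LLL', entry)
    t = rospy.Time(secs, nsecs)   # offset locates the message inside its chunk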
2209 - def seek_and_read_message_data_record(self, position, raw):
2210 chunk_pos, offset = position
2211
2212 chunk_header = self.bag._chunk_headers.get(chunk_pos)
2213 if chunk_header is None:
2214 raise ROSBagException('no chunk at position %d' % chunk_pos)
2215
2216 if chunk_header.compression == Compression.NONE:
2217 f = self.bag._file
2218 f.seek(chunk_header.data_pos + offset)
2219 else:
2220 if self.decompressed_chunk_pos != chunk_pos:
2221 # Decompress the chunk containing the message and cache the result.
2222 self.bag._file.seek(chunk_header.data_pos)
2223 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size)
2224
2225 if chunk_header.compression == Compression.BZ2:
2226 self.decompressed_chunk = bz2.decompress(compressed_chunk)
2227 else:
2228 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
2229
2230 self.decompressed_chunk_pos = chunk_pos
2231
2232 if self.decompressed_chunk_io:
2233 self.decompressed_chunk_io.close()
2234 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
2235
2236 f = self.decompressed_chunk_io
2237 f.seek(offset)
2238
2239 # Skip any connection records that precede the message data.
2240 while True:
2241 header = _read_header(f)
2242 op = _read_uint8_field(header, 'op')
2243 if op != _OP_CONNECTION:
2244 break
2245 _skip_sized(f)
2246
2247 # After any connection records, the next record must be the message data.
2248 if op != _OP_MSG_DATA:
2249 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
2250
2251 connection_id = _read_uint32_field(header, 'conn')
2252 t = _read_time_field (header, 'time')
2253
2254 # Look up the message type from the connection's stored definition.
2255 connection_info = self.bag._connections[connection_id]
2256 try:
2257 msg_type = _get_message_type(connection_info)
2258 except KeyError:
2259 raise ROSBagException('Cannot deserialize messages of type [%s]: the message definition was not found in the bag' % connection_info.datatype)
2260
2261 # Read the serialized message data.
2262 data = _read_record_data(f)
2263
2264 # Return the raw tuple, or deserialize into a message instance.
2265 if raw:
2266 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type
2267 else:
2268 msg = msg_type()
2269 msg.deserialize(data)
2270
2271 return (connection_info.topic, msg, t)
2272
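With raw=True, the method above hands back the serialized bytes plus enough metadata to rewrite them without a deserialize/reserialize round trip. A sketch of consuming that tuple ('input.bag' is a placeholder):

    from rosbag import Bag

    for topic, raw_msg, t in Bag('input.bag').read_messages(raw=True):
        datatype, data, md5sum, position, msg_type = raw_msg
        print '%s: %d serialized bytes of %s' % (topic, len(data), datatype)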
2273 - def _time_to_str(secs):
2274 secs_frac = secs - int(secs)
2275 secs_frac_str = ('%.2f' % secs_frac)[1:]
2276
2277 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
2278
2279 - def _human_readable_size(size):
2280 multiple = 1024.0
2281 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
2282 size /= multiple
2283 if size < multiple:
2284 return '%.1f %s' % (size, suffix)
2285
2286 return '-'
2287
2288 - def _human_readable_frequency(freq):
2289 multiple = 1000.0
2290 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']:
2291 if freq < multiple:
2292 return '%.1f %s' % (freq, suffix)
2293 freq /= multiple
2294
2295 return '-'
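Sample outputs of the three formatting helpers above. Note that _human_readable_size divides before testing, so sizes under 1024 bytes are still reported in KB:

    print _human_readable_size(5242880)       # '5.0 MB'
    print _human_readable_size(512)           # '0.5 KB'
    print _human_readable_frequency(1234.0)   # '1.2 kHz'
    print _time_to_str(1300000000.25)         # e.g. 'Mar 13 2011 07:06:40.25', depending on time zone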
2296
2299 """
2300 Perform an N-way merge operation on sorted lists.
2301
2302 @param list_of_lists: an iterable of iterables of sorted elements
2303 (sorted either naturally or by C{key})
2304 @param key: sort key function (as for C{sort()} and C{sorted()})
2305
2306
2307 Yields tuples of the form C{(item, iterator)}, where the iterator is the
2308 iterator over the input list that produced C{item}.
2309
2310
2311 This is a stable merge (ties are broken by input list order); complexity is O(N lg K) for N elements from K lists
2312
2313 Examples::
2314
2315 print list(x[0] for x in mergesort([[1,2,3,4],
2316 [2,3.5,3.7,4.5,6,7],
2317 [2.6,3.6,6.6,9]]))
2318 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
2319
2320 # note stability: with key=int, ties are broken by input list order,
2321 # so 3.7 (second list) precedes 3.6 (third list)
2322 print list(x[0] for x in mergesort([[1,2,3,4],
2323 [2,3.5,3.7,4.5,6,7],
2324 [2.6,3.6,6.6,9]], key=int))
2325 [1, 2, 2, 2.6, 3, 3.5, 3.7, 3.6, 4, 4.5, 6, 6.6, 7, 9]
2325
2326 print list(x[0] for x in mergesort([[4,3,2,1],
2327 [7,6.5,4,3.7,3.3,1.9],
2328 [9,8.6,7.6,6.6,5.5,4.4,3.3]],
2329 key=lambda x: -x))
2330 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1]
2331 """
2332
2333 heap = []
2334 for i, itr in enumerate(iter(pl) for pl in list_of_lists):
2335 try:
2336 item = itr.next()
2337 toadd = (key(item), i, item, itr) if key else (item, i, itr)
2338 heap.append(toadd)
2339 except StopIteration:
2340 pass
2341 heapq.heapify(heap)
2342
2343 if key:
2344 while heap:
2345 _, idx, item, itr = heap[0]
2346 yield item, itr
2347 try:
2348 item = itr.next()
2349 heapq.heapreplace(heap, (key(item), idx, item, itr) )
2350 except StopIteration:
2351 heapq.heappop(heap)
2352
2353 else:
2354 while heap:
2355 item, idx, itr = heap[0]
2356 yield item, itr
2357 try:
2358 heapq.heapreplace(heap, (itr.next(), idx, itr))
2359 except StopIteration:
2360 heapq.heappop(heap)
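A quick usage sketch beyond the docstring examples, merging sorted (time, value) runs the way the reader merges per-connection index lists:

    runs = [[(1, 'a'), (4, 'b')], [(2, 'c'), (3, 'd')]]
    print [item for item, _ in _mergesort(runs, key=lambda entry: entry[0])]
    # [(1, 'a'), (2, 'c'), (3, 'd'), (4, 'b')]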
2361
2363 """
2364 A file facade for the bz2.BZ2Compressor.
2365 """
2366 - def __init__(self, file):
2367 self.file = file
2368 self.compressor = bz2.BZ2Compressor()
2369 self.compressed_bytes_in = 0
2370
2371 - def write(self, data):
2372 compressed = self.compressor.compress(data)
2373 if len(compressed) > 0:
2374 self.file.write(compressed)
2375 self.compressed_bytes_in += len(data)
2376
2377 - def flush(self):
2378 compressed = self.compressor.flush()
2379 if len(compressed) > 0:
2380 self.file.write(compressed)
2381
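The facade lets the chunk writer stream record bytes through a bz2.BZ2Compressor as though writing to a plain file; flush() emits whatever the compressor has buffered. A usage sketch with the class defined above and a placeholder output file:

    out = open('chunk.bz2', 'wb')   # stand-in destination
    facade = _CompressorFileFacade(out)
    facade.write('serialized records go here')
    facade.flush()
    print facade.compressed_bytes_in   # uncompressed bytes fed to the compressor
    out.close()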
2395