33 """
34 The rosbag Python API.
35
36 Provides serialization of bag files.
37 """
38
39 from __future__ import print_function
40
41 import bisect
42 import bz2
43 import collections
44 import heapq
45 import os
46 import re
47 import struct
48 import sys
49 import threading
50 import time
51 import yaml
52
53 try:
54 from cStringIO import StringIO
55 except ImportError:
56 from io import BytesIO as StringIO
57
58 import genmsg
59 import genpy
60 import genpy.dynamic
61 import genpy.message
62
63 import roslib.names
64 import rospy
65 try:
66 import roslz4
67 found_lz4 = True
68 except ImportError:
69 rospy.logwarn(
70 'Failed to load Python extension for LZ4 support. '
71 'LZ4 compression will not be available.')
72 found_lz4 = False
class ROSBagException(Exception):
    """
    Base class for exceptions in rosbag.
    """
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return self.value

class ROSBagFormatException(ROSBagException):
    """
    Exceptions for errors relating to the bag file format.
    """
    def __init__(self, value):
        ROSBagException.__init__(self, value)

class ROSBagUnindexedException(ROSBagException):
    """
    Exception for unindexed bags.
    """
    def __init__(self, *args):
        ROSBagException.__init__(self, 'Unindexed bag')

class Compression:
    """
    Allowable compression types
    """
    NONE = 'none'
    BZ2  = 'bz2'
    LZ4  = 'lz4'
105
106 BagMessage = collections.namedtuple('BagMessage', 'topic message timestamp')
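# Note (added): read_messages() yields BagMessage namedtuples, so each result can be
# unpacked positionally as (topic, message, timestamp) or accessed by field name.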
107
class Bag(object):
    """
    Bag serializes messages to and from a single file on disk using the bag format.
    """
    def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
113 """
114 Open a bag file. The mode can be 'r', 'w', or 'a' for reading (default),
115 writing or appending. The file will be created if it doesn't exist
116 when opened for writing or appending; it will be truncated when opened
117 for writing. Simultaneous reading and writing is allowed when in writing
118 or appending mode.
119 @param f: filename of bag to open or a stream to read from
120 @type f: str or file
121 @param mode: mode, either 'r', 'w', or 'a'
122 @type mode: str
123 @param compression: compression mode, see U{rosbag.Compression} for valid modes
124 @type compression: str
125 @param chunk_threshold: minimum number of uncompressed bytes per chunk
126 @type chunk_threshold: int
127 @param allow_unindexed: if True, allow opening unindexed bags
128 @type allow_unindexed: bool
129 @param options: the bag options (currently: compression and chunk_threshold)
130 @type options: dict
131 @param skip_index: if True, don't read the connection index records on open [2.0+]
132 @type skip_index: bool
133 @raise ValueError: if any argument is invalid
134 @raise ROSBagException: if an error occurs opening file
135 @raise ROSBagFormatException: if bag format is corrupted
136 """
137 if options is not None:
138 if type(options) is not dict:
139 raise ValueError('options must be of type dict')
140 if 'compression' in options:
141 compression = options['compression']
142 if 'chunk_threshold' in options:
143 chunk_threshold = options['chunk_threshold']
144
145 self._file = None
146 self._filename = None
147 self._version = None
148
149 allowed_compressions = [Compression.NONE, Compression.BZ2]
150 if found_lz4:
151 allowed_compressions.append(Compression.LZ4)
152 if compression not in allowed_compressions:
153 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))
154 self._compression = compression
155
156 if chunk_threshold < 0:
157 raise ValueError('chunk_threshold must be greater than or equal to zero')
158 self._chunk_threshold = chunk_threshold
159
160 self._skip_index = skip_index
161
162 self._reader = None
163
164 self._file_header_pos = None
165 self._index_data_pos = 0
166
167 self._clear_index()
168
169 self._buffer = StringIO()
170
171 self._curr_compression = Compression.NONE
172
173 self._open(f, mode, allow_unindexed)
174
175 self._output_file = self._file
176
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def options(self):
        """Get the options."""
        return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }

    @property
    def filename(self):
        """Get the filename."""
        return self._filename

    @property
    def version(self):
        """Get the version."""
        return self._version

    @property
    def mode(self):
        """Get the mode."""
        return self._mode

    @property
    def size(self):
        """Get the size in bytes."""
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        pos = self._file.tell()
        self._file.seek(0, os.SEEK_END)
        size = self._file.tell()
        self._file.seek(pos)
        return size

    def _get_compression(self):
        """Get the compression method to use for writing."""
        return self._compression

    def _set_compression(self, compression):
        """Set the compression method to use for writing."""
        allowed_compressions = [Compression.NONE, Compression.BZ2]
        if found_lz4:
            allowed_compressions.append(Compression.LZ4)
        if compression not in allowed_compressions:
            raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))

        self.flush()
        self._compression = compression

    compression = property(_get_compression, _set_compression)

    def _get_chunk_threshold(self):
        """Get the chunk threshold to use for writing."""
        return self._chunk_threshold

    def _set_chunk_threshold(self, chunk_threshold):
        """Set the chunk threshold to use for writing."""
        if chunk_threshold < 0:
            raise ValueError('chunk_threshold must be greater than or equal to zero')

        self.flush()
        self._chunk_threshold = chunk_threshold

    chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)
252
    def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
254 """
255 Read messages from the bag, optionally filtered by topic, timestamp and connection details.
256 @param topics: list of topics or a single topic. if an empty list is given all topics will be read [optional]
257 @type topics: list(str) or str
258 @param start_time: earliest timestamp of message to return [optional]
259 @type start_time: U{genpy.Time}
260 @param end_time: latest timestamp of message to return [optional]
261 @type end_time: U{genpy.Time}
262 @param connection_filter: function to filter connections to include [optional]
263 @type connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool
264 @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype)
265 @type raw: bool
266 @return: generator of BagMessage(topic, message, timestamp) namedtuples for each message in the bag file
267 @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw]
268 """
269 self.flush()
270
271 if topics and type(topics) is str:
272 topics = [topics]
273
274 return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)
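    # Illustrative usage (assumes a bag file 'input.bag' containing a '/chatter' topic;
    # both names are hypothetical):
    #
    #     with Bag('input.bag') as bag:
    #         for topic, msg, t in bag.read_messages(topics=['/chatter']):
    #             print('%s @ %.3f: %s' % (topic, t.to_sec(), msg))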

    def flush(self):
        """
        Write the open chunk to disk so subsequent reads will read all messages.
        @raise ValueError: if bag is closed
        """
281 if not self._file:
282 raise ValueError('I/O operation on closed bag')
283
284 if self._chunk_open:
285 self._stop_writing_chunk()
286
    def write(self, topic, msg, t=None, raw=False):
288 """
289 Write a message to the bag.
290 @param topic: name of topic
291 @type topic: str
292 @param msg: message to add to bag, or tuple (if raw)
293 @type msg: Message or tuple of raw message data
        @param t: ROS time of message publication; if None is specified, the current time is used [optional]
295 @type t: U{genpy.Time}
296 @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
297 @type raw: bool
298 @raise ValueError: if arguments are invalid or bag is closed
299 """
300 if not self._file:
301 raise ValueError('I/O operation on closed bag')
302
303 if not topic:
304 raise ValueError('topic is invalid')
305 if not msg:
306 raise ValueError('msg is invalid')
307
308 if t is None:
309 t = genpy.Time.from_sec(time.time())
310
311
312 self._file.seek(0, os.SEEK_END)
313
314
315 if not self._chunk_open:
316 self._start_writing_chunk(t)
317
318
319 if raw:
320 if len(msg) == 5:
321 msg_type, serialized_bytes, md5sum, pos, pytype = msg
322 elif len(msg) == 4:
323 msg_type, serialized_bytes, md5sum, pytype = msg
324 else:
325 raise ValueError('msg must be of length 4 or 5')
326
327
328 if topic in self._topic_connections:
329 connection_info = self._topic_connections[topic]
330 conn_id = connection_info.id
331 else:
332 conn_id = len(self._connections)
333
334 if raw:
335 if pytype is None:
336 try:
337 pytype = genpy.message.get_message_class(msg_type)
338 except Exception:
339 pytype = None
340 if pytype is None:
341 raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type)
342
343 if pytype._md5sum != md5sum:
344 print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr)
345
346
347 header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text }
348 else:
349 header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text }
350
351 connection_info = _ConnectionInfo(conn_id, topic, header)
352
353 self._write_connection_record(connection_info)
354
355 self._connections[conn_id] = connection_info
356 self._topic_connections[topic] = connection_info
357
358
359 index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())
360
361
362 if conn_id not in self._curr_chunk_connection_indexes:
363
364 self._curr_chunk_connection_indexes[conn_id] = [index_entry]
365 self._curr_chunk_info.connection_counts[conn_id] = 1
366 else:
367 curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id]
368 if index_entry >= curr_chunk_connection_index[-1]:
369
370 curr_chunk_connection_index.append(index_entry)
371 else:
372 bisect.insort_right(curr_chunk_connection_index, index_entry)
373
374 self._curr_chunk_info.connection_counts[conn_id] += 1
375
376 if conn_id not in self._connection_indexes:
377 self._connection_indexes[conn_id] = [index_entry]
378 else:
379 bisect.insort_right(self._connection_indexes[conn_id], index_entry)
380
381
382 if t > self._curr_chunk_info.end_time:
383 self._curr_chunk_info.end_time = t
384 elif t < self._curr_chunk_info.start_time:
385 self._curr_chunk_info.start_time = t
386
387 if not raw:
388
389 self._buffer.seek(0)
390 self._buffer.truncate(0)
391 msg.serialize(self._buffer)
392 serialized_bytes = self._buffer.getvalue()
393
394
395 self._write_message_data_record(conn_id, t, serialized_bytes)
396
397
398 chunk_size = self._get_chunk_offset()
399 if chunk_size > self._chunk_threshold:
400 self._stop_writing_chunk()
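    # Illustrative usage (assumes std_msgs is available; the filename and topic name
    # are hypothetical):
    #
    #     from std_msgs.msg import String
    #     with Bag('output.bag', 'w') as bag:
    #         bag.write('/chatter', String(data='hello'))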
401
    def reindex(self):
        """
        Reindexes the bag file. Yields position of each chunk for progress.
        """
406 self._clear_index()
407 return self._reader.reindex()
408
    def close(self):
        """
        Close the bag file. Closing an already closed bag does nothing.
        """
413 if self._file:
414 if self._mode in 'wa':
415 self._stop_writing()
416
417 self._close_file()
418
    def get_compression_info(self):
        """
        Returns information about the compression of the bag.
        @return: a CompressionTuple(compression, uncompressed, compressed) describing the type of
            compression used, the size of the uncompressed data in bytes, and the size of the
            compressed data in bytes. If no compression is used, the two sizes are equal.
        @rtype: CompressionTuple of (str, int, int)
        """
427
428 compression = self.compression
429 uncompressed = 0
430 compressed = 0
431
432 if self._chunk_headers:
433 compression_counts = {}
434 compression_uncompressed = {}
435 compression_compressed = {}
436
437
438
439 for chunk_header in self._chunk_headers.values():
440 if chunk_header.compression not in compression_counts:
441 compression_counts[chunk_header.compression] = 0
442 if chunk_header.compression not in compression_uncompressed:
443 compression_uncompressed[chunk_header.compression] = 0
444 if chunk_header.compression not in compression_compressed:
445 compression_compressed[chunk_header.compression] = 0
446
447 compression_counts[chunk_header.compression] += 1
448 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
449 uncompressed += chunk_header.uncompressed_size
450 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
451 compressed += chunk_header.compressed_size
452
453 chunk_count = len(self._chunk_headers)
454
455 main_compression_count, main_compression = sorted([(v, k) for k, v in compression_counts.items()], reverse=True)[0]
456 compression = str(main_compression)
457
458 return collections.namedtuple("CompressionTuple", ["compression",
459 "uncompressed", "compressed"])(compression=compression,
460 uncompressed=uncompressed, compressed=compressed)
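    # Illustrative usage: the returned namedtuple can be inspected directly, e.g.
    #
    #     info = bag.get_compression_info()
    #     ratio = float(info.compressed) / info.uncompressed if info.uncompressed else 1.0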
461
    def get_message_count(self, topic_filters=None):
        """
        Returns the number of messages in the bag, optionally filtered by topic.
        @param topic_filters: one or more topics to filter by
        @type topic_filters: str or list(str)
        @return: the number of messages in the bag, optionally filtered by topic
        @rtype: int
        """
470
471 num_messages = 0
472
473 if topic_filters is not None:
474 info = self.get_type_and_topic_info(topic_filters=topic_filters)
475 for topic in info.topics.values():
476 num_messages += topic.message_count
477 else:
478 if self._chunks:
479 for c in self._chunks:
480 for counts in c.connection_counts.values():
481 num_messages += counts
482 else:
483 num_messages = sum([len(index) for index in self._connection_indexes.values()])
484
485 return num_messages
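    # Illustrative usage (topic name hypothetical):
    #
    #     total   = bag.get_message_count()
    #     chatter = bag.get_message_count(topic_filters='/chatter')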
486
    def get_start_time(self):
        """
        Returns the start time of the bag.
        @return: a timestamp of the start of the bag, in seconds (including fractions of a second)
        @rtype: float
        """
493
494 if self._chunks:
495 start_stamp = self._chunks[0].start_time.to_sec()
496 else:
497 if not self._connection_indexes:
498 raise ROSBagException('Bag contains no message')
499 start_stamp = min([index[0].time.to_sec() for index in self._connection_indexes.values()])
500
501 return start_stamp
502
    def get_end_time(self):
        """
        Returns the end time of the bag.
        @return: a timestamp of the end of the bag, in seconds (including fractions of a second)
        @rtype: float
        """
509
510 if self._chunks:
511 end_stamp = self._chunks[-1].end_time.to_sec()
512 else:
513 if not self._connection_indexes:
514 raise ROSBagException('Bag contains no message')
515 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.values()])
516
517 return end_stamp
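    # Illustrative usage: start and end times are float seconds, so the bag duration is
    # simply their difference.
    #
    #     duration = bag.get_end_time() - bag.get_start_time()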
518
    def get_type_and_topic_info(self, topic_filters=None):
        """
        Collates info about the types and topics in the bag.

        Note: it would be nice to filter by type as well, but there appear to be some limitations in the current architecture
        that prevent that from working when more than one message type is written on the same topic.

        @param topic_filters: specify one or more topics to filter by
        @type topic_filters: either a single str or a list of str
        @return: a TypesAndTopicsTuple(msg_types, topics) where msg_types maps type name to md5sum,
            and topics maps topic name to a TopicTuple(msg_type, message_count, connections, frequency)
            describing the message type, number of messages, number of connections, and median frequency
            for that topic
        @rtype: TypesAndTopicsTuple(dict(str, str), dict(str, TopicTuple(str, int, int, float)))
        """
539
540 datatypes = set()
541 datatype_infos = []
542
543 for connection in self._connections.values():
544 if connection.datatype in datatypes:
545 continue
546
547 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
548 datatypes.add(connection.datatype)
549
550 topics = []
551
552 if topic_filters is not None:
553 if not isinstance(topic_filters, list):
554 topic_filters = [topic_filters]
555
556 topics = topic_filters
557 else:
558 topics = [c.topic for c in self._get_connections()]
559
560 topics = sorted(set(topics))
561
562 topic_datatypes = {}
563 topic_conn_counts = {}
564 topic_msg_counts = {}
565 topic_freqs_median = {}
566
567 for topic in topics:
568 connections = list(self._get_connections(topic))
569
570 if not connections:
571 continue
572
573 topic_datatypes[topic] = connections[0].datatype
574 topic_conn_counts[topic] = len(connections)
575
576 msg_count = 0
577 for connection in connections:
578 for chunk in self._chunks:
579 msg_count += chunk.connection_counts.get(connection.id, 0)
580
581 topic_msg_counts[topic] = msg_count
582
583 if self._connection_indexes_read:
584 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
585 if len(stamps) > 1:
586 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
587 med_period = _median(periods)
588 if med_period > 0.0:
589 topic_freqs_median[topic] = 1.0 / med_period
590
591
592 types = {}
593 for datatype, md5sum, msg_def in sorted(datatype_infos):
594 types[datatype] = md5sum
595
596
597 topics_t = {}
598 TopicTuple = collections.namedtuple("TopicTuple", ["msg_type", "message_count", "connections", "frequency"])
599 for topic in sorted(topic_datatypes.keys()):
600 topic_msg_count = topic_msg_counts[topic]
601 frequency = topic_freqs_median[topic] if topic in topic_freqs_median else None
602 topics_t[topic] = TopicTuple(msg_type=topic_datatypes[topic],
603 message_count=topic_msg_count,
604 connections=topic_conn_counts[topic],
605 frequency=frequency)
606
607 return collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])(msg_types=types, topics=topics_t)
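    # Illustrative usage (the '/chatter' topic name is hypothetical):
    #
    #     info = bag.get_type_and_topic_info()
    #     print(info.msg_types)                        # dict: type name -> md5sum
    #     print(info.topics['/chatter'].message_count) # per-topic TopicTuple fields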
608
    def __str__(self):
        rows = []
611
612 try:
613 if self._filename:
614 rows.append(('path', self._filename))
615
616 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
617 rows.append(('version', '1.2 (unindexed)'))
618 else:
619 rows.append(('version', '%d.%d' % (int(self._version / 100), self._version % 100)))
620
621 if not self._connection_indexes and not self._chunks:
622 rows.append(('size', _human_readable_size(self.size)))
623 else:
624 if self._chunks:
625 start_stamp = self._chunks[ 0].start_time.to_sec()
626 end_stamp = self._chunks[-1].end_time.to_sec()
627 else:
628 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.values()])
629 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.values()])
630
631
632 duration = end_stamp - start_stamp
633 dur_secs = duration % 60
634 dur_mins = int(duration / 60)
635 dur_hrs = int(dur_mins / 60)
636 if dur_hrs > 0:
637 dur_mins = dur_mins % 60
638 duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration)
639 elif dur_mins > 0:
640 duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration)
641 else:
642 duration_str = '%.1fs' % duration
643
644 rows.append(('duration', duration_str))
645
646
647 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp)))
648 rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp)))
649
650 rows.append(('size', _human_readable_size(self.size)))
651
652 if self._chunks:
653 num_messages = 0
654 for c in self._chunks:
655 for counts in c.connection_counts.values():
656 num_messages += counts
657 else:
658 num_messages = sum([len(index) for index in self._connection_indexes.values()])
659 rows.append(('messages', str(num_messages)))
660
661
662 if len(self._chunk_headers) == 0:
663 rows.append(('compression', 'none'))
664 else:
665 compression_counts = {}
666 compression_uncompressed = {}
667 compression_compressed = {}
668 for chunk_header in self._chunk_headers.values():
669 if chunk_header.compression not in compression_counts:
670 compression_counts[chunk_header.compression] = 1
671 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
672 compression_compressed[chunk_header.compression] = chunk_header.compressed_size
673 else:
674 compression_counts[chunk_header.compression] += 1
675 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
676 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
677
678 chunk_count = len(self._chunk_headers)
679
680 compressions = []
681 for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])):
682 if compression != Compression.NONE:
683 fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression]
684 compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction))
685 else:
686 compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count))
687 rows.append(('compression', ', '.join(compressions)))
688
689 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
690 if not all_uncompressed:
691 total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.values()))
692 total_compressed_size = sum((h.compressed_size for h in self._chunk_headers.values()))
693
694 total_uncompressed_size_str = _human_readable_size(total_uncompressed_size)
695 total_compressed_size_str = _human_readable_size(total_compressed_size)
696 total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)])
697
698 if duration > 0:
699 uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration)
700 compressed_rate_str = _human_readable_size(total_compressed_size / duration)
701 rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)])
702
703 rows.append(('uncompressed', '%*s @ %*s/s' %
704 (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str)))
705 rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' %
706 (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size)))
707 else:
708 rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str)))
709 rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str)))
710
711 datatypes = set()
712 datatype_infos = []
713 for connection in self._connections.values():
714 if connection.datatype in datatypes:
715 continue
716 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
717 datatypes.add(connection.datatype)
718
719 topics = sorted(set([c.topic for c in self._get_connections()]))
720 topic_datatypes = {}
721 topic_conn_counts = {}
722 topic_msg_counts = {}
723 topic_freqs_median = {}
724 for topic in topics:
725 connections = list(self._get_connections(topic))
726
727 topic_datatypes[topic] = connections[0].datatype
728 topic_conn_counts[topic] = len(connections)
729
730 msg_count = 0
731 for connection in connections:
732 for chunk in self._chunks:
733 msg_count += chunk.connection_counts.get(connection.id, 0)
734 topic_msg_counts[topic] = msg_count
735
736 if self._connection_indexes_read:
737 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
738 if len(stamps) > 1:
739 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
740 med_period = _median(periods)
741 if med_period > 0.0:
742 topic_freqs_median[topic] = 1.0 / med_period
743
744 topics = sorted(topic_datatypes.keys())
745 max_topic_len = max([len(topic) for topic in topics])
746 max_datatype_len = max([len(datatype) for datatype in datatypes])
747 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()])
748 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0
749
750
751 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
752 s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum)
753 if i == 0:
754 rows.append(('types', s))
755 else:
756 rows.append(('', s))
757
758
759 for i, topic in enumerate(topics):
760 topic_msg_count = topic_msg_counts[topic]
761
762 s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ')
763 if topic in topic_freqs_median:
764 s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic]))
765 else:
766 s += ' %*s' % (max_freq_median_len, '')
767
768 s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic])
769 if topic_conn_counts[topic] > 1:
770 s += ' (%d connections)' % topic_conn_counts[topic]
771
772 if i == 0:
773 rows.append(('topics', s))
774 else:
775 rows.append(('', s))
776
777 except Exception as ex:
778 raise
779
780 first_column_width = max([len(field) for field, _ in rows]) + 1
781
782 s = ''
783 for (field, value) in rows:
784 if field:
785 s += '%-*s %s\n' % (first_column_width, field + ':', value)
786 else:
787 s += '%-*s %s\n' % (first_column_width, '', value)
788
789 return s.rstrip()
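    # Illustrative usage: printing a Bag produces the same kind of human-readable summary
    # shown by the 'rosbag info' command line tool:
    #
    #     print(Bag('input.bag'))   # filename is hypothetical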
790
    def _get_yaml_info(self, key=None):
        s = ''
793
794 try:
795 if self._filename:
796 s += 'path: %s\n' % self._filename
797
798 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
799 s += 'version: 1.2 (unindexed)\n'
800 else:
801 s += 'version: %d.%d\n' % (int(self._version / 100), self._version % 100)
802
803 if not self._connection_indexes and not self._chunks:
804 s += 'size: %d\n' % self.size
805 s += 'indexed: False\n'
806 else:
807 if self._chunks:
808 start_stamp = self._chunks[ 0].start_time.to_sec()
809 end_stamp = self._chunks[-1].end_time.to_sec()
810 else:
811 start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.values()])
812 end_stamp = max([index[-1].time.to_sec() for index in self._connection_indexes.values()])
813
814 duration = end_stamp - start_stamp
815 s += 'duration: %.6f\n' % duration
816 s += 'start: %.6f\n' % start_stamp
817 s += 'end: %.6f\n' % end_stamp
818 s += 'size: %d\n' % self.size
819 if self._chunks:
820 num_messages = 0
821 for c in self._chunks:
822 for counts in c.connection_counts.values():
823 num_messages += counts
824 else:
825 num_messages = sum([len(index) for index in self._connection_indexes.values()])
826 s += 'messages: %d\n' % num_messages
827 s += 'indexed: True\n'
828
829
830 if len(self._chunk_headers) == 0:
831 s += 'compression: none\n'
832 else:
833 compression_counts = {}
834 compression_uncompressed = {}
835 compression_compressed = {}
836 for chunk_header in self._chunk_headers.values():
837 if chunk_header.compression not in compression_counts:
838 compression_counts[chunk_header.compression] = 1
839 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
840 compression_compressed[chunk_header.compression] = chunk_header.compressed_size
841 else:
842 compression_counts[chunk_header.compression] += 1
843 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
844 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
845
846 chunk_count = len(self._chunk_headers)
847
848 main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0]
849 s += 'compression: %s\n' % str(main_compression)
850
851 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
852 if not all_uncompressed:
853 s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.values()))
854 s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.values()))
855
856 datatypes = set()
857 datatype_infos = []
858 for connection in self._connections.values():
859 if connection.datatype in datatypes:
860 continue
861 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
862 datatypes.add(connection.datatype)
863
864 topics = sorted(set([c.topic for c in self._get_connections()]))
865 topic_datatypes = {}
866 topic_conn_counts = {}
867 topic_msg_counts = {}
868 topic_freqs_median = {}
869 for topic in topics:
870 connections = list(self._get_connections(topic))
871
872 topic_datatypes[topic] = connections[0].datatype
873 topic_conn_counts[topic] = len(connections)
874
875 msg_count = 0
876 for connection in connections:
877 for chunk in self._chunks:
878 msg_count += chunk.connection_counts.get(connection.id, 0)
879 topic_msg_counts[topic] = msg_count
880
881 if self._connection_indexes_read:
882 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
883 if len(stamps) > 1:
884 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
885 med_period = _median(periods)
886 if med_period > 0.0:
887 topic_freqs_median[topic] = 1.0 / med_period
888
889 topics = sorted(topic_datatypes.keys())
890 max_topic_len = max([len(topic) for topic in topics])
891 max_datatype_len = max([len(datatype) for datatype in datatypes])
892 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()])
893 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0
894
895
896 s += 'types:\n'
897 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
898 s += ' - type: %s\n' % datatype
899 s += ' md5: %s\n' % md5sum
900
901
902 s += 'topics:\n'
903 for i, topic in enumerate(topics):
904 topic_msg_count = topic_msg_counts[topic]
905
906 s += ' - topic: %s\n' % topic
907 s += ' type: %s\n' % topic_datatypes[topic]
908 s += ' messages: %d\n' % topic_msg_count
909
910 if topic_conn_counts[topic] > 1:
911 s += ' connections: %d\n' % topic_conn_counts[topic]
912
913 if topic in topic_freqs_median:
914 s += ' frequency: %.4f\n' % topic_freqs_median[topic]
915
916 if not key:
917 return s
918
919 class DictObject(object):
920 def __init__(self, d):
921 for a, b in d.items():
922 if isinstance(b, (list, tuple)):
923 setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b])
924 else:
925 setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
926
927 obj = DictObject(yaml.load(s))
928 try:
929 val = eval('obj.' + key)
930 except Exception as ex:
931 print('Error getting key "%s"' % key, file=sys.stderr)
932 return None
933
934 def print_yaml(val, indent=0):
935 indent_str = ' ' * indent
936
937 if type(val) is list:
938 s = ''
939 for item in val:
940 s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1))
941 return s
942 elif type(val) is DictObject:
943 s = ''
944 for i, (k, v) in enumerate(val.__dict__.items()):
945 if i != 0:
946 s += indent_str
947 s += '%s: %s' % (k, str(v))
948 if i < len(val.__dict__) - 1:
949 s += '\n'
950 return s
951 else:
952 return indent_str + str(val)
953
954 return print_yaml(val)
955
956 except Exception as ex:
957 raise
958
959
960
961 @property
    def _has_compressed_chunks(self):
        if not self._chunk_headers:
964 return False
965
966 return any((h.compression != Compression.NONE for h in self._chunk_headers.values()))
967
968 @property
    def _uncompressed_size(self):
        if not self._chunk_headers:
971 return self.size
972
973 return sum((h.uncompressed_size for h in self._chunk_headers.values()))
974
    def _read_message(self, position, raw=False):
        """
        Read the message from the given position in the file.
        """
979 self.flush()
980 return self._reader.seek_and_read_message_data_record(position, raw)
981
982
983
    def _get_connections(self, topics=None, connection_filter=None):
        """
        Yield the connections, optionally filtering by topic and/or connection information.
        """
988 if topics:
989 if type(topics) is str:
990 topics = set([roslib.names.canonicalize_name(topics)])
991 else:
992 topics = set([roslib.names.canonicalize_name(t) for t in topics])
993
994 for c in self._connections.values():
995 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics:
996 continue
997 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header):
998 continue
999 yield c
1000
    def _get_entries(self, connections=None, start_time=None, end_time=None):
1002 """
1003 Yield index entries on the given connections in the given time range.
1004 """
1005 for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time):
1006 if start_time and entry.time < start_time:
1007 continue
1008 if end_time and entry.time > end_time:
1009 return
1010 yield entry
1011
    def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
        """
        Yield index entries on the given connections in the given time range in reverse order.
        """
1016 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()):
1017 if end_time and entry.time > end_time:
1018 continue
1019 if start_time and entry.time < start_time:
1020 return
1021 yield entry
1022
    def _get_entry(self, t, connections=None):
1024 """
1025 Return the first index entry on/before the given time on the given connections
1026 """
1027 indexes = self._get_indexes(connections)
1028
1029 entry = _IndexEntry(t)
1030
1031 first_entry = None
1032
1033 for index in indexes:
1034 i = bisect.bisect_right(index, entry) - 1
1035 if i >= 0:
1036 index_entry = index[i]
1037 if first_entry is None or index_entry > first_entry:
1038 first_entry = index_entry
1039
1040 return first_entry
1041
    def _get_entry_after(self, t, connections=None):
1043 """
1044 Return the first index entry after the given time on the given connections
1045 """
1046 indexes = self._get_indexes(connections)
1047
1048 entry = _IndexEntry(t)
1049
1050 first_entry = None
1051
1052 for index in indexes:
1053 i = bisect.bisect_right(index, entry)
1054 if i <= len(index) - 1:
1055 index_entry = index[i]
1056 if first_entry is None or index_entry < first_entry:
1057 first_entry = index_entry
1058
1059 return first_entry
1060
    def _get_indexes(self, connections):
        """
        Get the indexes for the given connections.
        """
1065 if not self._connection_indexes_read:
1066 self._reader._read_connection_index_records()
1067
1068 if connections is None:
1069 return self._connection_indexes.values()
1070 else:
1071 return (self._connection_indexes[c.id] for c in connections)
1072
1073
1074
    def _clear_index(self):
        self._connection_indexes_read = False
1077 self._connection_indexes = {}
1078
1079 self._topic_connections = {}
1080 self._connections = {}
1081 self._connection_count = 0
1082 self._chunk_count = 0
1083 self._chunks = []
1084 self._chunk_headers = {}
1085
1086 self._chunk_open = False
1087 self._curr_chunk_info = None
1088 self._curr_chunk_data_pos = None
1089 self._curr_chunk_connection_indexes = {}
1090
    def _open(self, f, mode, allow_unindexed):
1092 if not f:
1093 raise ValueError('filename (or stream) is invalid')
1094
1095 try:
1096 if mode == 'r': self._open_read(f, allow_unindexed)
1097 elif mode == 'w': self._open_write(f)
1098 elif mode == 'a': self._open_append(f, allow_unindexed)
1099 else:
1100 raise ValueError('mode "%s" is invalid' % mode)
1101 if 'b' not in self._file.mode and not isinstance('', bytes):
1102 raise ROSBagException('In Python 3 the bag file must be opened in binary mode')
1103 except struct.error:
1104 raise ROSBagFormatException('error with bag')
1105
    def _is_file(self, f):
        try:
1108 return isinstance(f, file)
1109 except NameError:
1110 import io
1111 return isinstance(f, io.IOBase)
1112
    def _open_read(self, f, allow_unindexed):
        if self._is_file(f):
1115 self._file = f
1116 self._filename = None
1117 else:
1118 self._file = open(f, 'rb')
1119 self._filename = f
1120
1121 self._mode = 'r'
1122
1123 try:
1124 self._version = self._read_version()
1125 except:
1126 self._version = None
1127 self._close_file()
1128 raise
1129
1130 try:
1131 self._create_reader()
1132 self._reader.start_reading()
1133 except ROSBagUnindexedException as ex:
1134 if not allow_unindexed:
1135 self._close_file()
1136 raise
1137 except:
1138 self._close_file()
1139 raise
1140
    def _open_write(self, f):
        if self._is_file(f):
1143 self._file = f
1144 self._filename = None
1145 else:
1146 self._file = open(f, 'w+b')
1147 self._filename = f
1148
1149 self._mode = 'w'
1150
1151 self._version = 200
1152
1153 try:
1154 self._create_reader()
1155 self._start_writing()
1156 except:
1157 self._close_file()
1158 raise
1159
    def _open_append(self, f, allow_unindexed):
        if self._is_file(f):
1162 self._file = f
1163 self._filename = None
1164 else:
1165 try:
1166
1167 open(f, 'r').close()
1168
1169
1170 self._file = open(f, 'r+b')
1171 except IOError:
1172
1173 self._file = open(f, 'w+b')
1174
1175 self._filename = f
1176
1177 self._mode = 'a'
1178
1179 try:
1180 self._version = self._read_version()
1181 except:
1182 self._version = None
1183 self._close_file()
1184 raise
1185
1186 if self._version != 200:
1187 self._file.close()
1188 raise ROSBagException('bag version %d is unsupported for appending' % self._version)
1189
1190 try:
1191 self._start_appending()
1192 except ROSBagUnindexedException:
1193 if not allow_unindexed:
1194 self._close_file()
1195 raise
1196 except:
1197 self._close_file()
1198 raise
1199
    def _close_file(self):
        self._file.close()
1202 self._file = None
1203
    def _create_reader(self):
        """
        @raise ROSBagException: if the bag version is unsupported
        """
1208 major_version, minor_version = int(self._version / 100), self._version % 100
1209 if major_version == 2:
1210 self._reader = _BagReader200(self)
1211 elif major_version == 1:
1212 if minor_version == 1:
1213 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version)
1214 else:
1215
1216 first_record_pos = self._file.tell()
1217 header = _read_header(self._file)
1218 op = _read_uint8_field(header, 'op')
1219 self._file.seek(first_record_pos)
1220
1221 if op == _OP_FILE_HEADER:
1222 self._reader = _BagReader102_Indexed(self)
1223 else:
1224 self._reader = _BagReader102_Unindexed(self)
1225 else:
1226 raise ROSBagException('unknown bag version %d' % self._version)
1227
    def _read_version(self):
        """
        @raise ROSBagException: if the file is empty, or the version line can't be parsed
        """
1232 version_line = self._file.readline().rstrip().decode()
1233 if len(version_line) == 0:
1234 raise ROSBagException('empty file')
1235
1236 matches = re.match("#ROS(.*) V(\d).(\d)", version_line)
1237 if matches is None or len(matches.groups()) != 3:
1238 raise ROSBagException('This does not appear to be a bag file')
1239
1240 version_type, major_version_str, minor_version_str = matches.groups()
1241
1242 version = int(major_version_str) * 100 + int(minor_version_str)
1243
1244 return version
1245
    def _start_writing(self):
        version = _VERSION + '\n'
1248 version = version.encode()
1249 self._file.write(version)
1250 self._file_header_pos = self._file.tell()
1251 self._write_file_header_record(0, 0, 0)
1252
1254
    def _stop_writing(self):
        self.flush()
1256
1257
1258 self._index_data_pos = self._file.tell()
1259
1260
1261 for connection_info in self._connections.values():
1262 self._write_connection_record(connection_info)
1263
1264
1265 for chunk_info in self._chunks:
1266 self._write_chunk_info_record(chunk_info)
1267
1268
1269 self._file.seek(self._file_header_pos)
1270 self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
1271
    def _start_appending(self):
        self._file_header_pos = self._file.tell()
1274
1275 self._create_reader()
1276 self._reader.start_reading()
1277
1278
1279 self._file.truncate(self._index_data_pos)
1280 self._reader.index_data_pos = 0
1281
1282
1283 self._file.seek(self._file_header_pos);
1284 self._write_file_header_record(0, 0, 0)
1285
1286
1287 self._file.seek(0, os.SEEK_END)
1288
    def _start_writing_chunk(self, t):
        self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t)
1291 self._write_chunk_header(_ChunkHeader(self._compression, 0, 0))
1292 self._curr_chunk_data_pos = self._file.tell()
1293 self._set_compression_mode(self._compression)
1294 self._chunk_open = True
1295
    def _get_chunk_offset(self):
        if self._compression == Compression.NONE:
1298 return self._file.tell() - self._curr_chunk_data_pos
1299 else:
1300 return self._output_file.compressed_bytes_in
1301
1303
    def _stop_writing_chunk(self):
        self._chunks.append(self._curr_chunk_info)
1305
1306
1307 uncompressed_size = self._get_chunk_offset()
1308 self._set_compression_mode(Compression.NONE)
1309 compressed_size = self._file.tell() - self._curr_chunk_data_pos
1310
1311
1312 end_of_chunk_pos = self._file.tell()
1313 self._file.seek(self._curr_chunk_info.pos)
1314 chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos)
1315 self._write_chunk_header(chunk_header)
1316 self._chunk_headers[self._curr_chunk_info.pos] = chunk_header
1317
1318
1319 self._file.seek(end_of_chunk_pos)
1320 for connection_id, entries in self._curr_chunk_connection_indexes.items():
1321 self._write_connection_index_record(connection_id, entries)
1322 self._curr_chunk_connection_indexes.clear()
1323
1324
1325 self._chunk_open = False
1326
1343
    def _write_file_header_record(self, index_pos, connection_count, chunk_count):
        header = {
1346 'op': _pack_uint8(_OP_FILE_HEADER),
1347 'index_pos': _pack_uint64(index_pos),
1348 'conn_count': _pack_uint32(connection_count),
1349 'chunk_count': _pack_uint32(chunk_count)
1350 }
1351 _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
1352
    def _write_connection_record(self, connection_info):
        header = {
1355 'op': _pack_uint8(_OP_CONNECTION),
1356 'topic': connection_info.topic,
1357 'conn': _pack_uint32(connection_info.id)
1358 }
1359
1360 _write_header(self._output_file, header)
1361
1362 _write_header(self._output_file, connection_info.header)
1363
    def _write_message_data_record(self, connection_id, t, serialized_bytes):
        header = {
1366 'op': _pack_uint8(_OP_MSG_DATA),
1367 'conn': _pack_uint32(connection_id),
1368 'time': _pack_time(t)
1369 }
1370 _write_record(self._output_file, header, serialized_bytes)
1371
    def _write_chunk_header(self, chunk_header):
        header = {
1374 'op': _pack_uint8(_OP_CHUNK),
1375 'compression': chunk_header.compression,
1376 'size': _pack_uint32(chunk_header.uncompressed_size)
1377 }
1378 _write_header(self._file, header)
1379
1380 self._file.write(_pack_uint32(chunk_header.compressed_size))
1381
    def _write_connection_index_record(self, connection_id, entries):
        header = {
1384 'op': _pack_uint8(_OP_INDEX_DATA),
1385 'conn': _pack_uint32(connection_id),
1386 'ver': _pack_uint32(_INDEX_VERSION),
1387 'count': _pack_uint32(len(entries))
1388 }
1389
1390 buffer = self._buffer
1391 buffer.seek(0)
1392 buffer.truncate(0)
1393 for entry in entries:
1394 buffer.write(_pack_time (entry.time))
1395 buffer.write(_pack_uint32(entry.offset))
1396
1397 _write_record(self._file, header, buffer.getvalue())
1398
    def _write_chunk_info_record(self, chunk_info):
        header = {
1401 'op': _pack_uint8 (_OP_CHUNK_INFO),
1402 'ver': _pack_uint32(_CHUNK_INDEX_VERSION),
1403 'chunk_pos': _pack_uint64(chunk_info.pos),
1404 'start_time': _pack_time(chunk_info.start_time),
1405 'end_time': _pack_time(chunk_info.end_time),
1406 'count': _pack_uint32(len(chunk_info.connection_counts))
1407 }
1408
1409 buffer = self._buffer
1410 buffer.seek(0)
1411 buffer.truncate(0)
1412 for connection_id, count in chunk_info.connection_counts.items():
1413 buffer.write(_pack_uint32(connection_id))
1414 buffer.write(_pack_uint32(count))
1415
1416 _write_record(self._file, header, buffer.getvalue())
1417
1418
1419
1420 _message_types = {}
1421
1422 _OP_MSG_DEF = 0x01
1423 _OP_MSG_DATA = 0x02
1424 _OP_FILE_HEADER = 0x03
1425 _OP_INDEX_DATA = 0x04
1426 _OP_CHUNK = 0x05
1427 _OP_CHUNK_INFO = 0x06
1428 _OP_CONNECTION = 0x07
1429
1430 _OP_CODES = {
1431 _OP_MSG_DEF: 'MSG_DEF',
1432 _OP_MSG_DATA: 'MSG_DATA',
1433 _OP_FILE_HEADER: 'FILE_HEADER',
1434 _OP_INDEX_DATA: 'INDEX_DATA',
1435 _OP_CHUNK: 'CHUNK',
1436 _OP_CHUNK_INFO: 'CHUNK_INFO',
1437 _OP_CONNECTION: 'CONNECTION'
1438 }
1439
1440 _VERSION = '#ROSBAG V2.0'
1441 _FILE_HEADER_LENGTH = 4096
1442 _INDEX_VERSION = 1
1443 _CHUNK_INDEX_VERSION = 1
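# Note (added, inferred from _write_record/_write_header/_read_header below): every record
# in a v2.0 bag is a length-prefixed header followed by a length-prefixed data block, and
# each header field is itself length-prefixed:
#
#     <uint32 header_len><fields...><uint32 data_len><data>
#     field := <uint32 field_len><name>=<value>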
class _ConnectionInfo(object):
    def __init__(self, id, topic, header):
1447 try:
1448 datatype = _read_str_field(header, 'type')
1449 md5sum = _read_str_field(header, 'md5sum')
1450 msg_def = _read_str_field(header, 'message_definition')
1451 except KeyError as ex:
1452 raise ROSBagFormatException('connection header field %s not found' % str(ex))
1453
1454 self.id = id
1455 self.topic = topic
1456 self.datatype = datatype
1457 self.md5sum = md5sum
1458 self.msg_def = msg_def
1459 self.header = header
1460
    def __str__(self):
        return '%d on %s: %s' % (self.id, self.topic, str(self.header))
1463
class _ChunkInfo(object):
    def __init__(self, pos, start_time, end_time):
1466 self.pos = pos
1467 self.start_time = start_time
1468 self.end_time = end_time
1469
1470 self.connection_counts = {}
1471
    def __str__(self):
        s = 'chunk_pos: %d\n' % self.pos
1474 s += 'start_time: %s\n' % str(self.start_time)
1475 s += 'end_time: %s\n' % str(self.end_time)
1476 s += 'connections: %d\n' % len(self.connection_counts)
1477 s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()])
1478 return s
1479
class _ChunkHeader(object):
    def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
        self.compression = compression
1483 self.compressed_size = compressed_size
1484 self.uncompressed_size = uncompressed_size
1485 self.data_pos = data_pos
1486
    def __str__(self):
        if self.uncompressed_size > 0:
1489 ratio = 100 * (float(self.compressed_size) / self.uncompressed_size)
1490 return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio)
1491 else:
1492 return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
1493
class ComparableMixin(object):
    __slots__ = []

    def _compare(self, other, method):
        try:
1499 return method(self._cmpkey(), other._cmpkey())
1500 except (AttributeError, TypeError):
1501
1502
1503 return NotImplemented
1504
    def __lt__(self, other):
        return self._compare(other, lambda s, o: s < o)

    def __le__(self, other):
        return self._compare(other, lambda s, o: s <= o)

    def __eq__(self, other):
        return self._compare(other, lambda s, o: s == o)

    def __ge__(self, other):
        return self._compare(other, lambda s, o: s >= o)

    def __gt__(self, other):
        return self._compare(other, lambda s, o: s > o)

    def __ne__(self, other):
        return self._compare(other, lambda s, o: s != o)
1522
class _IndexEntry(ComparableMixin):
    __slots__ = ['time']

    def __init__(self, time):
        self.time = time

    def _cmpkey(self):
        return self.time
class _IndexEntry102(_IndexEntry):
    __slots__ = ['offset']

    def __init__(self, time, offset):
        self.time = time
        self.offset = offset

    @property
    def position(self):
        return self.offset

    def __str__(self):
        return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)
1545
class _IndexEntry200(_IndexEntry):
    __slots__ = ['chunk_pos', 'offset']

    def __init__(self, time, chunk_pos, offset):
        self.time = time
        self.chunk_pos = chunk_pos
        self.offset = offset

    @property
    def position(self):
        return (self.chunk_pos, self.offset)

    def __str__(self):
        return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
1560
def _get_message_type(info):
    message_type = _message_types.get(info.md5sum)
1563 if message_type is None:
1564 try:
1565 message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype]
1566 if (message_type._md5sum != info.md5sum):
1567 print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr)
1568 except genmsg.InvalidMsgSpec:
1569 message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype]
            print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition.' % (info.datatype, info.md5sum), file=sys.stderr)
1571 except genmsg.MsgGenerationException as ex:
1572 raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex)))
1573
1574 _message_types[info.md5sum] = message_type
1575
1576 return message_type
1577
def _read_time  (f): return _unpack_time  (f.read(8))

def _unpack_time(v): return rospy.Time(*struct.unpack('<LL', v))

def _pack_time(v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
1592
def _read(f, size):
    data = f.read(size)
1595 if len(data) != size:
1596 raise ROSBagException('expecting %d bytes, read %d' % (size, len(data)))
1597 return data
1598
def _skip_record(f):
    _skip_sized(f)
1601 _skip_sized(f)
1602
def _skip_sized(f):
    size = _read_uint32(f)
1605 f.seek(size, os.SEEK_CUR)
1606
def _read_sized(f):
    try:
1609 size = _read_uint32(f)
1610 except struct.error as ex:
1611 raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))
1612 return _read(f, size)
1613
def _write_sized(f, v):
    if not isinstance(v, bytes):
1616 v = v.encode()
1617 f.write(_pack_uint32(len(v)))
1618 f.write(v)
1619
def _read_field(header, field, unpack_fn):
    if field not in header:
1622 raise ROSBagFormatException('expected "%s" field in record' % field)
1623
1624 try:
1625 value = unpack_fn(header[field])
1626 except Exception as ex:
1627 raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex)))
1628
1629 return value
1630
def _read_str_field   (header, field): return _read_field(header, field, lambda v: v)
def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
def _read_time_field  (header, field): return _read_field(header, field, _unpack_time)
1637
def _write_record(f, header, data='', padded_size=None):
    header_str = _write_header(f, header)
1640
1641 if padded_size is not None:
1642 header_len = len(header_str)
1643 if header_len < padded_size:
1644 data = ' ' * (padded_size - header_len)
1645 else:
1646 data = ''
1647
1648 _write_sized(f, data)
1649
def _write_header(f, header):
    header_str = b''
1652 equal = b'='
1653 for k, v in header.items():
1654 if not isinstance(k, bytes):
1655 k = k.encode()
1656 if not isinstance(v, bytes):
1657 v = v.encode()
1658 header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v
1659 _write_sized(f, header_str)
1660 return header_str
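# Illustrative example (values hypothetical): _write_header(f, {'op': _pack_uint8(_OP_CONNECTION),
# 'conn': _pack_uint32(0)}) emits each field as a 4-byte little-endian length followed by the raw
# 'name=value' bytes, then _write_sized() prefixes the whole header with its total length.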
1661
def _read_header(f, req_op=None):
    bag_pos = f.tell()
1664
1665
1666 try:
1667 header = _read_sized(f)
1668 except ROSBagException as ex:
1669 raise ROSBagFormatException('Error reading header: %s' % str(ex))
1670
1671
1672 header_dict = {}
1673 while header != b'':
1674
1675 if len(header) < 4:
1676 raise ROSBagFormatException('Error reading header field')
1677 (size,) = struct.unpack('<L', header[:4])
1678 header = header[4:]
1679
1680
1681 if len(header) < size:
1682 raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header)))
1683 (name, sep, value) = header[:size].partition(b'=')
1684 if sep == b'':
1685 raise ROSBagFormatException('Error reading header field')
1686
1687 name = name.decode()
1688 header_dict[name] = value
1689
1690 header = header[size:]
1691
1692
1693 if req_op is not None:
1694 op = _read_uint8_field(header_dict, 'op')
1695 if req_op != op:
1696 raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op]))
1697
1698 return header_dict
1699
def _peek_next_header_op(f):
    pos = f.tell()
1702 header = _read_header(f)
1703 op = _read_uint8_field(header, 'op')
1704 f.seek(pos)
1705 return op
1706
def _read_record_data(f):
    try:
1709 record_data = _read_sized(f)
1710 except ROSBagException as ex:
1711 raise ROSBagFormatException('Error reading record data: %s' % str(ex))
1712
1713 return record_data
1714
class _BagReader(object):
    def __init__(self, bag):
        self.bag = bag

    def start_reading(self):
        raise NotImplementedError()

    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1723 raise NotImplementedError()

    def reindex(self):
        raise NotImplementedError()

class _BagReader102_Unindexed(_BagReader):
    """
    Support class for reading unindexed v1.2 bag files.
    """
    def __init__(self, bag):
        _BagReader.__init__(self, bag)

    def start_reading(self):
        self.bag._file_header_pos = self.bag._file.tell()

    def reindex(self):
        """Generates all bag index information by rereading the message records."""
1740 f = self.bag._file
1741
1742 total_bytes = self.bag.size
1743
1744
1745 self.bag._file.seek(self.bag._file_header_pos)
1746
1747 offset = f.tell()
1748
1749
1750 while offset < total_bytes:
1751 yield offset
1752
1753 op = _peek_next_header_op(f)
1754
1755 if op == _OP_MSG_DEF:
1756 connection_info = self.read_message_definition_record()
1757
1758 if connection_info.topic not in self.bag._topic_connections:
1759 self.bag._topic_connections[connection_info.topic] = connection_info.id
1760 self.bag._connections[connection_info.id] = connection_info
1761 self.bag._connection_indexes[connection_info.id] = []
1762
1763 elif op == _OP_MSG_DATA:
1764
1765 header = _read_header(f)
1766
1767 topic = _read_str_field(header, 'topic')
1768 secs = _read_uint32_field(header, 'sec')
1769 nsecs = _read_uint32_field(header, 'nsec')
1770 t = genpy.Time(secs, nsecs)
1771
1772 if topic not in self.bag._topic_connections:
1773 datatype = _read_str_field(header, 'type')
1774 self._create_connection_info_for_datatype(topic, datatype)
1775
1776 connection_id = self.bag._topic_connections[topic]
1777 info = self.bag._connections[connection_id]
1778
1779
1780 _skip_sized(f)
1781
1782
1783 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
1784
1785 offset = f.tell()
1786
    def read_messages(self, topics, start_time, end_time, topic_filter, raw):
1788 f = self.bag._file
1789
1790 f.seek(self.bag._file_header_pos)
1791
1792 while True:
1793
1794 while True:
1795 position = f.tell()
1796
1797 try:
1798 header = _read_header(f)
1799 except Exception:
1800 return
1801
1802 op = _read_uint8_field(header, 'op')
1803 if op != _OP_MSG_DEF:
1804 break
1805
1806 connection_info = self.read_message_definition_record(header)
1807
1808 if connection_info.topic not in self.bag._topic_connections:
1809 self.bag._topic_connections[connection_info.topic] = connection_info.id
1810
1811 self.bag._connections[connection_info.id] = connection_info
1812
1813
1814 if op != _OP_MSG_DATA:
1815 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
1816
1817 topic = _read_str_field(header, 'topic')
1818
1819 if topic not in self.bag._topic_connections:
1820 datatype = _read_str_field(header, 'type')
1821 self._create_connection_info_for_datatype(topic, datatype)
1822
1823 connection_id = self.bag._topic_connections[topic]
1824 info = self.bag._connections[connection_id]
1825
1826
1827 try:
1828 msg_type = _get_message_type(info)
1829 except KeyError:
                raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bagfile by definition' % info.datatype)
1831
1832
1833 secs = _read_uint32_field(header, 'sec')
1834 nsecs = _read_uint32_field(header, 'nsec')
1835 t = genpy.Time(secs, nsecs)
1836
1837
1838 data = _read_record_data(f)
1839
1840 if raw:
1841 msg = (info.datatype, data, info.md5sum, position, msg_type)
1842 else:
1843
1844 msg = msg_type()
1845 msg.deserialize(data)
1846
1847 yield BagMessage(topic, msg, t)
1848
1849 self.bag._connection_indexes_read = True
1850
    def _create_connection_info_for_datatype(self, topic, datatype):
        for c in self.bag._connections.values():
1853 if c.datatype == datatype:
1854 connection_id = len(self.bag._connections)
1855 connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] }
1856 connection_info = _ConnectionInfo(connection_id, topic, connection_header)
1857
1858 self.bag._topic_connections[topic] = connection_id
1859 self.bag._connections[connection_id] = connection_info
1860 self.bag._connection_indexes[connection_id] = []
1861 return
1862
1863 raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
1864
    def read_message_definition_record(self, header=None):
        if not header:
1867 header = _read_header(self.bag._file, _OP_MSG_DEF)
1868
1869 topic = _read_str_field(header, 'topic')
1870 datatype = _read_str_field(header, 'type')
1871 md5sum = _read_str_field(header, 'md5')
1872 msg_def = _read_str_field(header, 'def')
1873
1874 _skip_sized(self.bag._file)
1875
1876 connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def }
1877
1878 id = len(self.bag._connections)
1879
1880 return _ConnectionInfo(id, topic, connection_header)
1881
class _BagReader102_Indexed(_BagReader102_Unindexed):
    """
    Support class for reading indexed v1.2 bag files.
    """
    def __init__(self, bag):
        _BagReader102_Unindexed.__init__(self, bag)

    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1890 connections = self.bag._get_connections(topics, connection_filter)
1891 for entry in self.bag._get_entries(connections, start_time, end_time):
1892 yield self.seek_and_read_message_data_record(entry.offset, raw)
1893
    def reindex(self):
        """Generates all bag index information by rereading the message records."""
1896 f = self.bag._file
1897
1898 total_bytes = self.bag.size
1899
1900
1901 self.bag._file.seek(self.bag._file_header_pos)
1902 self.read_file_header_record()
1903
1904 offset = f.tell()
1905
1906
1907 while offset < total_bytes:
1908 yield offset
1909
1910 op = _peek_next_header_op(f)
1911
1912 if op == _OP_MSG_DEF:
1913 connection_info = self.read_message_definition_record()
1914
1915 if connection_info.topic not in self.bag._topic_connections:
1916 self.bag._topic_connections[connection_info.topic] = connection_info.id
1917 self.bag._connections[connection_info.id] = connection_info
1918 self.bag._connection_indexes[connection_info.id] = []
1919
1920 elif op == _OP_MSG_DATA:
1921
1922 header = _read_header(f)
1923
1924 topic = _read_str_field(header, 'topic')
1925 secs = _read_uint32_field(header, 'sec')
1926 nsecs = _read_uint32_field(header, 'nsec')
1927 t = genpy.Time(secs, nsecs)
1928
1929 if topic not in self.bag._topic_connections:
1930 datatype = _read_str_field(header, 'type')
1931 self._create_connection_info_for_datatype(topic, datatype)
1932
1933 connection_id = self.bag._topic_connections[topic]
1934 info = self.bag._connections[connection_id]
1935
1936
1937 _skip_sized(f)
1938
1939
1940 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
1941
1942 elif op == _OP_INDEX_DATA:
1943 _skip_record(f)
1944
1945 offset = f.tell()
1946
    def start_reading(self):
        try:
1949
1950 self.read_file_header_record()
1951
1952 total_bytes = self.bag.size
1953
1954
1955 if self.bag._index_data_pos == 0:
1956 raise ROSBagUnindexedException()
1957
1958
1959 self.bag._file.seek(self.bag._index_data_pos)
1960
1961
1962 topic_indexes = {}
1963 while True:
1964 pos = self.bag._file.tell()
1965 if pos >= total_bytes:
1966 break
1967
1968 topic, index = self.read_topic_index_record()
1969
1970 topic_indexes[topic] = index
1971
1972
1973 for topic, index in topic_indexes.items():
1974 self.bag._file.seek(index[0].offset)
1975
1976 connection_info = self.read_message_definition_record()
1977
1978 if connection_info.topic not in self.bag._topic_connections:
1979 self.bag._topic_connections[connection_info.topic] = connection_info.id
1980 self.bag._connections[connection_info.id] = connection_info
1981
1982 self.bag._connection_indexes[connection_info.id] = index
1983
1984 self.bag._connection_indexes_read = True
1985
1986 except Exception as ex:
1987 raise ROSBagUnindexedException()
1988
1989 - def read_file_header_record(self):
1990 self.bag._file_header_pos = self.bag._file.tell()
1991
1992 header = _read_header(self.bag._file, _OP_FILE_HEADER)
1993
1994 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
1995
1996 _skip_sized(self.bag._file)
1997
1998 - def read_topic_index_record(self):
1999 f = self.bag._file
2000
2001 header = _read_header(f, _OP_INDEX_DATA)
2002
2003 index_version = _read_uint32_field(header, 'ver')
2004 topic = _read_str_field (header, 'topic')
2005 count = _read_uint32_field(header, 'count')
2006
2007 if index_version != 0:
2008 raise ROSBagFormatException('expecting index version 0, got %d' % index_version)
2009
2010 _read_uint32(f)
2011
2012 topic_index = []
2013
2014 for i in range(count):
2015 time = _read_time (f)
2016 offset = _read_uint64(f)
2017
2018 topic_index.append(_IndexEntry102(time, offset))
2019
2020 return (topic, topic_index)
2021
2022 - def seek_and_read_message_data_record(self, position, raw):
2023 f = self.bag._file
2024
2025
2026 f.seek(position)
2027
2028
2029 while True:
2030 header = _read_header(f)
2031 op = _read_uint8_field(header, 'op')
2032 if op != _OP_MSG_DEF:
2033 break
2034 _skip_sized(f)
2035
2036
2037 if op != _OP_MSG_DATA:
2038 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
2039
2040 topic = _read_str_field(header, 'topic')
2041
2042 connection_id = self.bag._topic_connections[topic]
2043 info = self.bag._connections[connection_id]
2044
2045
2046 try:
2047 msg_type = _get_message_type(info)
2048 except KeyError:
2049 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bagfile by definition' % info.datatype)
2050
2051
2052 secs = _read_uint32_field(header, 'sec')
2053 nsecs = _read_uint32_field(header, 'nsec')
2054 t = genpy.Time(secs, nsecs)
2055
2056
2057 data = _read_record_data(f)
2058
2059 if raw:
2060 msg = info.datatype, data, info.md5sum, position, msg_type
2061 else:
2062
2063 msg = msg_type()
2064 msg.deserialize(data)
2065
2066 return BagMessage(topic, msg, t)
2067
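# Illustrative sketch (editor's example; hypothetical helper and bag path, not
# part of the rosbag source).  The read_messages() implementations above are
# normally exercised through the public rosbag.Bag interface, which yields
# BagMessage(topic, message, timestamp) tuples in time order:
def _example_print_bag_contents(path='/tmp/example.bag'):
    bag = Bag(path)  # mode defaults to 'r'
    try:
        # topics is optional; '/chatter' is just an assumed example topic
        for topic, msg, t in bag.read_messages(topics=['/chatter']):
            print('%s @ %s: %s' % (topic, t, msg))
    finally:
        bag.close()
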
2069 """
2070 Support class for reading v2.0 bag files.
2071 """
2072 - def __init__(self, bag):
2073 _BagReader.__init__(self, bag)
2074
2075 self.decompressed_chunk_pos = None
2076 self.decompressed_chunk = None
2077 self.decompressed_chunk_io = None
2078
2079 - def reindex(self):
2080 """
2081 Generates all bag index information by rereading the chunks.
2082 Assumes the file header has been read.
2083 """
2084 f = self.bag._file
2085
2086 f.seek(0, os.SEEK_END)
2087 total_bytes = f.tell()
2088
2089
2090
2091
2092 self._read_terminal_connection_records()
2093
2094
2095 self.bag._file.seek(self.bag._file_header_pos)
2096 self.read_file_header_record()
2097
2098 trunc_pos = None
2099
2100 while True:
2101 chunk_pos = f.tell()
2102 if chunk_pos >= total_bytes:
2103 break
2104
2105 yield chunk_pos
2106
2107 try:
2108 self._reindex_read_chunk(f, chunk_pos, total_bytes)
2109 except Exception as ex:
2110 break
2111
2112 trunc_pos = f.tell()
2113
2114 if trunc_pos and trunc_pos < total_bytes:
2115 f.truncate(trunc_pos)
2116
2117 - def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
2118
2119 chunk_header = self.read_chunk_header()
2120
2121
2122 if chunk_header.compressed_size == 0:
2123 raise ROSBagException('unterminated chunk at %d' % chunk_pos)
2124
2125 if chunk_header.compression == Compression.NONE:
2126 chunk_file = f
2127 else:
2128
2129 compressed_chunk = _read(f, chunk_header.compressed_size)
2130
2131
2132 if chunk_header.compression == Compression.BZ2:
2133 self.decompressed_chunk = bz2.decompress(compressed_chunk)
2134 elif chunk_header.compression == Compression.LZ4 and found_lz4:
2135 self.decompressed_chunk = roslz4.decompress(compressed_chunk)
2136 else:
2137 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
2138
2139 if self.decompressed_chunk_io:
2140 self.decompressed_chunk_io.close()
2141 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
2142
2143 chunk_file = self.decompressed_chunk_io
2144
2145
2146 self.bag._curr_chunk_info = None
2147
2148 if chunk_header.compression == Compression.NONE:
2149 offset = chunk_file.tell() - chunk_pos
2150 else:
2151 offset = chunk_file.tell()
2152
2153 expected_index_length = 0
2154
2155 while offset < chunk_header.uncompressed_size:
2156 op = _peek_next_header_op(chunk_file)
2157
2158 if op == _OP_CONNECTION:
2159 connection_info = self.read_connection_record(chunk_file)
2160
2161 if connection_info.id not in self.bag._connections:
2162 self.bag._connections[connection_info.id] = connection_info
2163 if connection_info.id not in self.bag._connection_indexes:
2164 self.bag._connection_indexes[connection_info.id] = []
2165
2166 elif op == _OP_MSG_DATA:
2167
2168 header = _read_header(chunk_file)
2169
2170 connection_id = _read_uint32_field(header, 'conn')
2171 t = _read_time_field (header, 'time')
2172
2173
2174 if not self.bag._curr_chunk_info:
2175 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t)
2176 else:
2177 if t > self.bag._curr_chunk_info.end_time:
2178 self.bag._curr_chunk_info.end_time = t
2179 elif t < self.bag._curr_chunk_info.start_time:
2180 self.bag._curr_chunk_info.start_time = t
2181 if connection_id in self.bag._curr_chunk_info.connection_counts:
2182 self.bag._curr_chunk_info.connection_counts[connection_id] += 1
2183 else:
2184 self.bag._curr_chunk_info.connection_counts[connection_id] = 1
2185
2186
2187 _skip_sized(chunk_file)
2188
2189
2190 if connection_id not in self.bag._connection_indexes:
2191 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos))
2192 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset))
2193
2194 expected_index_length += 1
2195
2196 else:
2197
2198 _skip_record(chunk_file)
2199
2200 if chunk_header.compression == Compression.NONE:
2201 offset = chunk_file.tell() - chunk_pos
2202 else:
2203 offset = chunk_file.tell()
2204
2205
2206 next_op = _peek_next_header_op(f)
2207
2208 total_index_length = 0
2209
2210 while next_op != _OP_CHUNK:
2211 if next_op == _OP_INDEX_DATA:
2212
2213 _, index = self.read_connection_index_record()
2214 total_index_length += len(index)
2215 else:
2216 _skip_record(f)
2217
2218 if f.tell() >= total_bytes:
2219 if total_index_length != expected_index_length:
2220 raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length))
2221 break
2222
2223 next_op = _peek_next_header_op(f)
2224
2225
2226 self.bag._chunk_headers[chunk_pos] = chunk_header
2227 self.bag._chunks.append(self.bag._curr_chunk_info)
2228
2229 - def _read_terminal_connection_records(self):
2230 b, f, r = self.bag, self.bag._file, self.bag._reader
2231
2232
2233 f.seek(b._file_header_pos)
2234 r.read_file_header_record()
2235
2236
2237 if self._advance_to_next_record(_OP_CONNECTION):
2238
2239 while True:
2240 connection_info = r.read_connection_record(f)
2241
2242 b._connections[connection_info.id] = connection_info
2243 b._connection_indexes[connection_info.id] = []
2244
2245 next_op = _peek_next_header_op(f)
2246 if next_op != _OP_CONNECTION:
2247 break
2248
2249 - def _advance_to_next_record(self, op):
2250 b, f = self.bag, self.bag._file
2251
2252 try:
2253 while True:
2254 next_op = _peek_next_header_op(f)
2255 if next_op == op:
2256 break
2257
2258 if next_op == _OP_INDEX_DATA:
2259
2260
2261
2262 if b._curr_chunk_info is None:
2263 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1))
2264
2265 b._reader.read_connection_index_record()
2266 else:
2267 _skip_record(f)
2268
2269 return True
2270
2271 except Exception as ex:
2272 return False
2273
2274 - def start_reading(self):
2275 try:
2276
2277 self.read_file_header_record()
2278
2279
2280 if self.bag._index_data_pos == 0:
2281 raise ROSBagUnindexedException()
2282
2283
2284 self.bag._file.seek(self.bag._index_data_pos)
2285
2286
2287 self.bag._connection_indexes = {}
2288 for i in range(self.bag._connection_count):
2289 connection_info = self.read_connection_record(self.bag._file)
2290 self.bag._connections[connection_info.id] = connection_info
2291 self.bag._connection_indexes[connection_info.id] = []
2292
2293
2294 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)]
2295
2296
2297 self.bag._chunk_headers = {}
2298 for chunk_info in self.bag._chunks:
2299 self.bag._file.seek(chunk_info.pos)
2300 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header()
2301
2302 if not self.bag._skip_index:
2303 self._read_connection_index_records()
2304
2305 except Exception as ex:
2306 raise ROSBagUnindexedException()
2307
2308 - def _read_connection_index_records(self):
2309 for chunk_info in self.bag._chunks:
2310 self.bag._file.seek(chunk_info.pos)
2311 _skip_record(self.bag._file)
2312
2313 self.bag._curr_chunk_info = chunk_info
2314 for i in range(len(chunk_info.connection_counts)):
2315 connection_id, index = self.read_connection_index_record()
2316 self.bag._connection_indexes[connection_id].extend(index)
2317
2318
2319
2320
2321 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index]
2322 for id in orphan_connection_ids:
2323 del self.bag._connections[id]
2324 del self.bag._connection_indexes[id]
2325
2326 self.bag._connection_indexes_read = True
2327
2328 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2329 connections = self.bag._get_connections(topics, connection_filter)
2330 for entry in self.bag._get_entries(connections, start_time, end_time):
2331 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
2332
2333
2334
2335 - def read_file_header_record(self):
2336 self.bag._file_header_pos = self.bag._file.tell()
2337
2338 header = _read_header(self.bag._file, _OP_FILE_HEADER)
2339
2340 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
2341 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count')
2342 self.bag._connection_count = _read_uint32_field(header, 'conn_count')
2343
2344 _skip_sized(self.bag._file)
2345
2346 - def read_connection_record(self, f):
2347 header = _read_header(f, _OP_CONNECTION)
2348
2349 conn_id = _read_uint32_field(header, 'conn')
2350 topic = _read_str_field (header, 'topic')
2351
2352 connection_header = _read_header(f)
2353
2354 return _ConnectionInfo(conn_id, topic, connection_header)
2355
2356 - def read_chunk_info_record(self):
2357 f = self.bag._file
2358
2359 header = _read_header(f, _OP_CHUNK_INFO)
2360
2361 chunk_info_version = _read_uint32_field(header, 'ver')
2362
2363 if chunk_info_version == 1:
2364 chunk_pos = _read_uint64_field(header, 'chunk_pos')
2365 start_time = _read_time_field (header, 'start_time')
2366 end_time = _read_time_field (header, 'end_time')
2367 connection_count = _read_uint32_field(header, 'count')
2368
2369 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time)
2370
2371 _read_uint32(f)
2372
2373 for i in range(connection_count):
2374 connection_id = _read_uint32(f)
2375 count = _read_uint32(f)
2376
2377 chunk_info.connection_counts[connection_id] = count
2378
2379 return chunk_info
2380 else:
2381 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2382
2383 - def read_chunk_header(self):
2384 header = _read_header(self.bag._file, _OP_CHUNK)
2385
2386 compression = _read_str_field (header, 'compression')
2387 uncompressed_size = _read_uint32_field(header, 'size')
2388
2389 compressed_size = _read_uint32(self.bag._file)
2390
2391 data_pos = self.bag._file.tell()
2392
2393 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
2394
2395 - def read_connection_index_record(self):
2396 f = self.bag._file
2397
2398 header = _read_header(f, _OP_INDEX_DATA)
2399
2400 index_version = _read_uint32_field(header, 'ver')
2401 connection_id = _read_uint32_field(header, 'conn')
2402 count = _read_uint32_field(header, 'count')
2403
2404 if index_version != 1:
2405 raise ROSBagFormatException('expecting index version 1, got %d' % index_version)
2406
2407 record_size = _read_uint32(f)
2408
2409 index = []
2410
2411 for i in range(count):
2412 time = _read_time (f)
2413 offset = _read_uint32(f)
2414
2415 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset))
2416
2417 return (connection_id, index)
2418
2419 - def seek_and_read_message_data_record(self, position, raw):
2420 chunk_pos, offset = position
2421
2422 chunk_header = self.bag._chunk_headers.get(chunk_pos)
2423 if chunk_header is None:
2424 raise ROSBagException('no chunk at position %d' % chunk_pos)
2425
2426 if chunk_header.compression == Compression.NONE:
2427 f = self.bag._file
2428 f.seek(chunk_header.data_pos + offset)
2429 else:
2430 if self.decompressed_chunk_pos != chunk_pos:
2431
2432 self.bag._file.seek(chunk_header.data_pos)
2433 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size)
2434
2435 if chunk_header.compression == Compression.BZ2:
2436 self.decompressed_chunk = bz2.decompress(compressed_chunk)
2437 elif chunk_header.compression == Compression.LZ4 and found_lz4:
2438 self.decompressed_chunk = roslz4.decompress(compressed_chunk)
2439 else:
2440 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
2441
2442 self.decompressed_chunk_pos = chunk_pos
2443
2444 if self.decompressed_chunk_io:
2445 self.decompressed_chunk_io.close()
2446 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
2447
2448 f = self.decompressed_chunk_io
2449 f.seek(offset)
2450
2451
2452 while True:
2453 header = _read_header(f)
2454 op = _read_uint8_field(header, 'op')
2455 if op != _OP_CONNECTION:
2456 break
2457 _skip_sized(f)
2458
2459
2460 if op != _OP_MSG_DATA:
2461 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
2462
2463 connection_id = _read_uint32_field(header, 'conn')
2464 t = _read_time_field (header, 'time')
2465
2466
2467 connection_info = self.bag._connections[connection_id]
2468 try:
2469 msg_type = _get_message_type(connection_info)
2470 except KeyError:
2471 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype)
2472
2473
2474 data = _read_record_data(f)
2475
2476
2477 if raw:
2478 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type
2479 else:
2480 msg = msg_type()
2481 msg.deserialize(data)
2482
2483 return BagMessage(connection_info.topic, msg, t)
2484
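# Illustrative sketch (editor's example; hypothetical helper and bag path, not
# part of the rosbag source).  The reader reindex() generators above yield
# file positions so callers can report progress; the usual way to drive them
# is through Bag.reindex() on a bag opened with allow_unindexed=True (this is
# how a "rosbag reindex"-style repair would typically proceed):
def _example_reindex_bag(path='/tmp/unindexed.bag'):
    bag = Bag(path, 'a', allow_unindexed=True)
    try:
        for position in bag.reindex():
            print('reindexed through byte %d' % position)
    finally:
        bag.close()  # closing writes the rebuilt index records back out
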
2485 -def _time_to_str(secs):
2486 secs_frac = secs - int(secs)
2487 secs_frac_str = ('%.2f' % secs_frac)[1:]
2488
2489 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
2490
2491 -def _human_readable_size(size):
2492 multiple = 1024.0
2493 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
2494 size /= multiple
2495 if size < multiple:
2496 return '%.1f %s' % (size, suffix)
2497
2498 return '-'
2499
2500 -def _human_readable_frequency(freq):
2501 multiple = 1000.0
2502 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']:
2503 if freq < multiple:
2504 return '%.1f %s' % (freq, suffix)
2505 freq /= multiple
2506
2507 return '-'
2508
2510 -def _mergesort(list_of_lists, key=None):
2511 """
2512 Perform an N-way merge operation on sorted lists.
2513
2514 @param list_of_lists: (really iterable of iterable) of sorted elements
2515 (either naturally or by C{key})
2516 @param key: specify sort key function (like C{sort()}, C{sorted()})
2517 @param iterfun: function that returns an iterator.
2518
2519 Yields tuples of the form C{(item, iterator)}, where the iterator is the
2520 built-in list iterator or something you pass in, if you pre-generate the
2521 iterators.
2522
2523 This is a stable merge; complexity O(N lg N)
2524
2525 Examples::
2526
2527 print(list(x[0] for x in _mergesort([[1,2,3,4],
2528 [2,3.5,3.7,4.5,6,7],
2529 [2.6,3.6,6.6,9]])))
2530 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
2531
2532 # note stability
2533 print(list(x[0] for x in _mergesort([[1,2,3,4],
2534 [2,3.5,3.7,4.5,6,7],
2535 [2.6,3.6,6.6,9]], key=int)))
2536 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
2537
2538 print(list(x[0] for x in _mergesort([[4,3,2,1],
2539 [7,6.5,4,3.7,3.3,1.9],
2540 [9,8.6,7.6,6.6,5.5,4.4,3.3]],
2541 key=lambda x: -x)))
2542 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1]
2543 """
2544
2545 heap = []
2546 for i, itr in enumerate(iter(pl) for pl in list_of_lists):
2547 try:
2548 item = next(itr)
2549 toadd = (key(item), i, item, itr) if key else (item, i, itr)
2550 heap.append(toadd)
2551 except StopIteration:
2552 pass
2553 heapq.heapify(heap)
2554
2555 if key:
2556 while heap:
2557 _, idx, item, itr = heap[0]
2558 yield item, itr
2559 try:
2560 item = next(itr)
2561 heapq.heapreplace(heap, (key(item), idx, item, itr) )
2562 except StopIteration:
2563 heapq.heappop(heap)
2564
2565 else:
2566 while heap:
2567 item, idx, itr = heap[0]
2568 yield item, itr
2569 try:
2570 heapq.heapreplace(heap, (next(itr), idx, itr))
2571 except StopIteration:
2572 heapq.heappop(heap)
2573
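# Illustrative sketch (editor's example; hypothetical helper, not part of the
# rosbag source).  _mergesort() above is what lets several per-connection
# index lists, each already sorted by time, be combined into one time-ordered
# stream; each yielded tuple is (item, iterator), where the iterator
# identifies the input list the item came from.
def _example_mergesort_demo():
    streams = [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
    merged = [item for item, _ in _mergesort(streams)]
    assert merged == [1, 2, 3, 4, 5, 6, 7, 8, 9]
    return merged
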
2575 """
2576 A file facade for sequential compressors (e.g., bz2.BZ2Compressor).
2577 """
2578 - def __init__(self, file, compressor):
2579 self.file = file
2580 self.compressor = compressor
2581 self.compressed_bytes_in = 0
2582
2583 - def write(self, data):
2584 compressed = self.compressor.compress(data)
2585 if len(compressed) > 0:
2586 self.file.write(compressed)
2587 self.compressed_bytes_in += len(data)
2588
2589 - def flush(self):
2590 compressed = self.compressor.flush()
2591 if len(compressed) > 0:
2592 self.file.write(compressed)
2593
2607
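# Illustrative sketch (editor's example; hypothetical helper, not part of the
# rosbag source, and assuming the (file, compressor) constructor shown above).
# The facade lets chunk data be written through a sequential compressor while
# tracking how many uncompressed bytes went in:
def _example_compressor_facade_demo():
    import bz2
    from io import BytesIO

    out = BytesIO()
    facade = _CompressorFileFacade(out, bz2.BZ2Compressor())

    facade.write(b'x' * 1024)   # counted in compressed_bytes_in
    facade.flush()              # push any bytes still buffered in the compressor

    assert facade.compressed_bytes_in == 1024
    return out.getvalue()       # the bz2-compressed byte stream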