33 """
34 The rosbag Python API.
35
36 Provides serialization of bag files.
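
Example usage (a minimal sketch; 'test.bag' and '/chatter' are placeholder names, and the
std_msgs package is assumed to be available)::

    import rosbag
    from std_msgs.msg import String

    with rosbag.Bag('test.bag', 'w') as outbag:
        outbag.write('/chatter', String(data='hello'))

    with rosbag.Bag('test.bag') as inbag:
        for topic, msg, t in inbag.read_messages(topics=['/chatter']):
            print('%s %s %.3f' % (topic, msg.data, t.to_sec()))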
37 """
38
39 from __future__ import print_function
40
41 import bisect
42 import bz2
43 import collections
44 import heapq
45 import os
46 import re
47 import struct
48 import sys
49 import threading
50 import time
51 import yaml
52
53 try:
54 from cStringIO import StringIO
55 except ImportError:
56 from io import BytesIO as StringIO
57
58 import genmsg
59 import genpy
60 import genpy.dynamic
61 import genpy.message
62
63 import roslib.names
64 import rospy
65 try:
66 import roslz4
67 found_lz4 = True
68 except ImportError:
69 rospy.logwarn(
70 'Failed to load Python extension for LZ4 support. '
71 'LZ4 compression will not be available.')
72 found_lz4 = False
75 """
76 Base class for exceptions in rosbag.
77 """
80
83
90
92 """
93 Exception for unindexed bags.
94 """
97
99 """
100 Allowable compression types
101 """
102 NONE = 'none'
103 BZ2 = 'bz2'
104 LZ4 = 'lz4'
105
106 BagMessage = collections.namedtuple('BagMessage', 'topic message timestamp')
107
class Bag(object):
    """
    Bag serializes messages to and from a single file on disk using the bag format.
    """
    def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
113 """
114 Open a bag file. The mode can be 'r', 'w', or 'a' for reading (default),
115 writing or appending. The file will be created if it doesn't exist
116 when opened for writing or appending; it will be truncated when opened
117 for writing. Simultaneous reading and writing is allowed when in writing
118 or appending mode.
119 @param f: filename of bag to open or a stream to read from
120 @type f: str or file
121 @param mode: mode, either 'r', 'w', or 'a'
122 @type mode: str
123 @param compression: compression mode, see U{rosbag.Compression} for valid modes
124 @type compression: str
125 @param chunk_threshold: minimum number of uncompressed bytes per chunk
126 @type chunk_threshold: int
127 @param allow_unindexed: if True, allow opening unindexed bags
128 @type allow_unindexed: bool
129 @param options: the bag options (currently: compression and chunk_threshold)
130 @type options: dict
131 @param skip_index: if True, don't read the connection index records on open [2.0+]
132 @type skip_index: bool
133 @raise ValueError: if any argument is invalid
134 @raise ROSBagException: if an error occurs opening file
135 @raise ROSBagFormatException: if bag format is corrupted
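
Example (a sketch; 'output.bag' is a placeholder filename)::

    bag = Bag('output.bag', 'w', compression=Compression.BZ2,
              chunk_threshold=1024 * 1024)
    try:
        ...  # write messages here
    finally:
        bag.close()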
136 """
137 if options is not None:
138 if type(options) is not dict:
139 raise ValueError('options must be of type dict')
140 if 'compression' in options:
141 compression = options['compression']
142 if 'chunk_threshold' in options:
143 chunk_threshold = options['chunk_threshold']
144
145 self._file = None
146 self._filename = None
147 self._version = None
148
149 allowed_compressions = [Compression.NONE, Compression.BZ2]
150 if found_lz4:
151 allowed_compressions.append(Compression.LZ4)
152 if compression not in allowed_compressions:
153 raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))
154 self._compression = compression
155
156 if chunk_threshold < 0:
157 raise ValueError('chunk_threshold must be greater than or equal to zero')
158 self._chunk_threshold = chunk_threshold
159
160 self._skip_index = skip_index
161
162 self._reader = None
163
164 self._file_header_pos = None
165 self._index_data_pos = 0
166
167 self._clear_index()
168
169 self._buffer = StringIO()
170
171 self._curr_compression = Compression.NONE
172
173 self._open(f, mode, allow_unindexed)
174
175 self._output_file = self._file
176

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

186 @property
188 """Get the options."""
189 return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }
190
191 @property
193 """Get the filename."""
194 return self._filename
195
196 @property
198 """Get the version."""
199 return self._version
200
201 @property
203 """Get the mode."""
204 return self._mode
205
206 @property
208 """Get the size in bytes."""
209 if not self._file:
210 raise ValueError('I/O operation on closed bag')
211
212 pos = self._file.tell()
213 self._file.seek(0, os.SEEK_END)
214 size = self._file.tell()
215 self._file.seek(pos)
216 return size
217
218
219
221 """Get the compression method to use for writing."""
222 return self._compression
223
234
235 compression = property(_get_compression, _set_compression)
236
237
238
240 """Get the chunk threshold to use for writing."""
241 return self._chunk_threshold
242
244 """Set the chunk threshold to use for writing."""
245 if chunk_threshold < 0:
246 raise ValueError('chunk_threshold must be greater than or equal to zero')
247
248 self.flush()
249 self._chunk_threshold = chunk_threshold
250
251 chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)
252
    def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
254 """
255 Read messages from the bag, optionally filtered by topic, timestamp and connection details.
@param topics: list of topics or a single topic. If an empty list is given, all topics will be read [optional]
257 @type topics: list(str) or str
258 @param start_time: earliest timestamp of message to return [optional]
259 @type start_time: U{genpy.Time}
260 @param end_time: latest timestamp of message to return [optional]
261 @type end_time: U{genpy.Time}
262 @param connection_filter: function to filter connections to include [optional]
263 @type connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool
264 @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype)
265 @type raw: bool
266 @return: generator of BagMessage(topic, message, timestamp) namedtuples for each message in the bag file
267 @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw]
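
Example of filtering by topic and start time (a sketch; the topic names are placeholders)::

    start = rospy.Time.from_sec(bag.get_start_time() + 5.0)
    for topic, msg, t in bag.read_messages(topics=['/imu', '/odom'], start_time=start):
        print('%s at %.3f' % (topic, t.to_sec()))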
268 """
269 self.flush()
270
271 if topics and type(topics) is str:
272 topics = [topics]
273
274 return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)
275
277 """
278 Write the open chunk to disk so subsequent reads will read all messages.
279 @raise ValueError: if bag is closed
280 """
281 if not self._file:
282 raise ValueError('I/O operation on closed bag')
283
284 if self._chunk_open:
285 self._stop_writing_chunk()
286
    def write(self, topic, msg, t=None, raw=False):
288 """
289 Write a message to the bag.
290 @param topic: name of topic
291 @type topic: str
292 @param msg: message to add to bag, or tuple (if raw)
293 @type msg: Message or tuple of raw message data
@param t: ROS time of message publication; if None is specified, the current time is used [optional]
295 @type t: U{genpy.Time}
296 @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
297 @type raw: bool
298 @raise ValueError: if arguments are invalid or bag is closed
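
Example (a sketch assuming the std_msgs package is available; '/counter' is a placeholder topic)::

    from io import BytesIO
    from std_msgs.msg import Int32

    msg = Int32(data=42)
    bag.write('/counter', msg)       # write a deserialized message

    buf = BytesIO()
    msg.serialize(buf)               # raw tuple: (datatype, bytes, md5sum, pytype)
    bag.write('/counter', (msg._type, buf.getvalue(), msg._md5sum, Int32), raw=True)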
299 """
300 if not self._file:
301 raise ValueError('I/O operation on closed bag')
302
303 if not topic:
304 raise ValueError('topic is invalid')
305 if not msg:
306 raise ValueError('msg is invalid')
307
308 if t is None:
309 t = genpy.Time.from_sec(time.time())
310
311
312 self._file.seek(0, os.SEEK_END)
313
314
315 if not self._chunk_open:
316 self._start_writing_chunk(t)
317
318
319 if raw:
320 if len(msg) == 5:
321 msg_type, serialized_bytes, md5sum, pos, pytype = msg
322 elif len(msg) == 4:
323 msg_type, serialized_bytes, md5sum, pytype = msg
324 else:
325 raise ValueError('msg must be of length 4 or 5')
326
327
328 if topic in self._topic_connections:
329 connection_info = self._topic_connections[topic]
330 conn_id = connection_info.id
331 else:
332 conn_id = len(self._connections)
333
334 if raw:
335 if pytype is None:
336 try:
337 pytype = genpy.message.get_message_class(msg_type)
338 except Exception:
339 pytype = None
340 if pytype is None:
341 raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type)
342
343 if pytype._md5sum != md5sum:
344 print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr)
345
346
347 header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text }
348 else:
349 header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text }
350
351 connection_info = _ConnectionInfo(conn_id, topic, header)
352
353 self._write_connection_record(connection_info)
354
355 self._connections[conn_id] = connection_info
356 self._topic_connections[topic] = connection_info
357
358
359 index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())
360
361
362 if conn_id not in self._curr_chunk_connection_indexes:
363
364 self._curr_chunk_connection_indexes[conn_id] = [index_entry]
365 self._curr_chunk_info.connection_counts[conn_id] = 1
366 else:
367 curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id]
368 if index_entry >= curr_chunk_connection_index[-1]:
369
370 curr_chunk_connection_index.append(index_entry)
371 else:
372 bisect.insort_right(curr_chunk_connection_index, index_entry)
373
374 self._curr_chunk_info.connection_counts[conn_id] += 1
375
376 if conn_id not in self._connection_indexes:
377 self._connection_indexes[conn_id] = [index_entry]
378 else:
379 bisect.insort_right(self._connection_indexes[conn_id], index_entry)
380
381
382 if t > self._curr_chunk_info.end_time:
383 self._curr_chunk_info.end_time = t
384 elif t < self._curr_chunk_info.start_time:
385 self._curr_chunk_info.start_time = t
386
387 if not raw:
388
389 self._buffer.seek(0)
390 self._buffer.truncate(0)
391 msg.serialize(self._buffer)
392 serialized_bytes = self._buffer.getvalue()
393
394
395 self._write_message_data_record(conn_id, t, serialized_bytes)
396
397
398 chunk_size = self._get_chunk_offset()
399 if chunk_size > self._chunk_threshold:
400 self._stop_writing_chunk()
401
403 """
404 Reindexes the bag file. Yields position of each chunk for progress.
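
Example (a sketch; 'broken.bag' is a placeholder, and the bag is assumed to lack an index)::

    with Bag('broken.bag', 'a', allow_unindexed=True) as bag:
        for offset in bag.reindex():
            print('reindexed through byte %d of %d' % (offset, bag.size))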
405 """
406 self._clear_index()
407 return self._reader.reindex()
408
410 """
411 Close the bag file. Closing an already closed bag does nothing.
412 """
413 if self._file:
414 if self._mode in 'wa':
415 self._stop_writing()
416
417 self._close_file()
418
420 """
Returns information about the compression of the bag.
@return: a CompressionTuple(compression, uncompressed, compressed) namedtuple describing the
    type of compression used, the size of the uncompressed data in bytes, and the size of the
    compressed data in bytes. If no compression is used then the uncompressed and compressed
    sizes will be equal.
@rtype: CompressionTuple of (str, int, int)
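
Example (a sketch)::

    info = bag.get_compression_info()
    if info.uncompressed > 0:
        print('%s: %.1f%% of original size' % (info.compression, 100.0 * info.compressed / info.uncompressed))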
426 """
427
428 compression = self.compression
429 uncompressed = 0
430 compressed = 0
431
432 if self._chunk_headers:
433 compression_counts = {}
434 compression_uncompressed = {}
435 compression_compressed = {}
436
437
438
439 for chunk_header in self._chunk_headers.values():
440 if chunk_header.compression not in compression_counts:
441 compression_counts[chunk_header.compression] = 0
442 if chunk_header.compression not in compression_uncompressed:
443 compression_uncompressed[chunk_header.compression] = 0
444 if chunk_header.compression not in compression_compressed:
445 compression_compressed[chunk_header.compression] = 0
446
447 compression_counts[chunk_header.compression] += 1
448 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
449 uncompressed += chunk_header.uncompressed_size
450 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
451 compressed += chunk_header.compressed_size
452
453 chunk_count = len(self._chunk_headers)
454
455 main_compression_count, main_compression = sorted([(v, k) for k, v in compression_counts.items()], reverse=True)[0]
456 compression = str(main_compression)
457
458 return collections.namedtuple("CompressionTuple", ["compression",
459 "uncompressed", "compressed"])(compression=compression,
460 uncompressed=uncompressed, compressed=compressed)
461
463 """
Returns the number of messages in the bag. Can be filtered by topic.
@param topic_filters: one or more topics to filter by
@type topic_filters: str or list(str)
467 @return: The number of messages in the bag, optionally filtered by topic
468 @rtype: int
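
Example (a sketch; '/chatter' is a placeholder topic)::

    total = bag.get_message_count()
    chatter_only = bag.get_message_count(topic_filters='/chatter')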
469 """
470
471 num_messages = 0
472
473 if topic_filters is not None:
474 info = self.get_type_and_topic_info(topic_filters=topic_filters)
475 for topic in info.topics.values():
476 num_messages += topic.message_count
477 else:
478 if self._chunks:
479 for c in self._chunks:
480 for counts in c.connection_counts.values():
481 num_messages += counts
482 else:
483 num_messages = sum([len(index) for index in self._connection_indexes.values()])
484
485 return num_messages
486
488 """
489 Returns the start time of the bag.
490 @return: a timestamp of the start of the bag
491 @rtype: float, timestamp in seconds, includes fractions of a second
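
Example (a sketch, using get_end_time() as well)::

    duration_secs = bag.get_end_time() - bag.get_start_time()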
492 """
493
494 if self._chunks:
495 start_stamp = self._chunks[0].start_time.to_sec()
496 else:
497 if not self._connection_indexes:
498 raise ROSBagException('Bag contains no message')
499 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index]
500 start_stamp = min(start_stamps) if start_stamps else 0
501
502 return start_stamp
503
505 """
506 Returns the end time of the bag.
507 @return: a timestamp of the end of the bag
508 @rtype: float, timestamp in seconds, includes fractions of a second
509 """
510
511 if self._chunks:
512 end_stamp = self._chunks[-1].end_time.to_sec()
513 else:
514 if not self._connection_indexes:
515 raise ROSBagException('Bag contains no message')
516 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index]
517 end_stamp = max(end_stamps) if end_stamps else 0
518
519 return end_stamp
520
522 """
Collates info about the types and topics in the bag.
524
525 Note, it would be nice to filter by type as well, but there appear to be some limitations in the current architecture
526 that prevent that from working when more than one message type is written on the same topic.
527
@param topic_filters: specify one or more topics to filter by.
529 @type topic_filters: either a single str or a list of str.
@return: a TypesAndTopicsTuple(msg_types, topics) namedtuple. msg_types maps each message
    type name (e.g. "std_msgs/String") to its md5sum. topics maps each topic name to a
    TopicTuple(msg_type, message_count, connections, frequency) giving the topic's message
    type, number of messages, number of connections, and median frequency.
@rtype: TypesAndTopicsTuple(dict(str, str), dict(str, TopicTuple(str, int, int, float)))
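
Example (a sketch)::

    info = bag.get_type_and_topic_info()
    for topic, details in info.topics.items():
        print('%s: %s, %d msgs' % (topic, details.msg_type, details.message_count))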
540 """
541
542 datatypes = set()
543 datatype_infos = []
544
545 for connection in self._connections.values():
546 if connection.datatype in datatypes:
547 continue
548
549 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
550 datatypes.add(connection.datatype)
551
552 topics = []
553
554 if topic_filters is not None:
555 if not isinstance(topic_filters, list):
556 topic_filters = [topic_filters]
557
558 topics = topic_filters
559 else:
560 topics = [c.topic for c in self._get_connections()]
561
562 topics = sorted(set(topics))
563
564 topic_datatypes = {}
565 topic_conn_counts = {}
566 topic_msg_counts = {}
567 topic_freqs_median = {}
568
569 for topic in topics:
570 connections = list(self._get_connections(topic))
571
572 if not connections:
573 continue
574
575 topic_datatypes[topic] = connections[0].datatype
576 topic_conn_counts[topic] = len(connections)
577
578 msg_count = 0
579 for connection in connections:
580 for chunk in self._chunks:
581 msg_count += chunk.connection_counts.get(connection.id, 0)
582
583 topic_msg_counts[topic] = msg_count
584
585 if self._connection_indexes_read:
586 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
587 if len(stamps) > 1:
588 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
589 med_period = _median(periods)
590 if med_period > 0.0:
591 topic_freqs_median[topic] = 1.0 / med_period
592
593
594 types = {}
595 for datatype, md5sum, msg_def in sorted(datatype_infos):
596 types[datatype] = md5sum
597
598
599 topics_t = {}
600 TopicTuple = collections.namedtuple("TopicTuple", ["msg_type", "message_count", "connections", "frequency"])
601 for topic in sorted(topic_datatypes.keys()):
602 topic_msg_count = topic_msg_counts[topic]
603 frequency = topic_freqs_median[topic] if topic in topic_freqs_median else None
604 topics_t[topic] = TopicTuple(msg_type=topic_datatypes[topic],
605 message_count=topic_msg_count,
606 connections=topic_conn_counts[topic],
607 frequency=frequency)
608
609 return collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])(msg_types=types, topics=topics_t)
610
612 rows = []
613
614 try:
615 if self._filename:
616 rows.append(('path', self._filename))
617
618 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
619 rows.append(('version', '1.2 (unindexed)'))
620 else:
621 rows.append(('version', '%d.%d' % (int(self._version / 100), self._version % 100)))
622
623 if not self._connection_indexes and not self._chunks:
624 rows.append(('size', _human_readable_size(self.size)))
625 else:
626 if self._chunks:
627 start_stamp = self._chunks[ 0].start_time.to_sec()
628 end_stamp = self._chunks[-1].end_time.to_sec()
629 else:
630 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index]
631 start_stamp = min(start_stamps) if start_stamps else 0
632 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index]
633 end_stamp = max(end_stamps) if end_stamps else 0
634
635
636 duration = end_stamp - start_stamp
637 dur_secs = duration % 60
638 dur_mins = int(duration / 60)
639 dur_hrs = int(dur_mins / 60)
640 if dur_hrs > 0:
641 dur_mins = dur_mins % 60
642 duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration)
643 elif dur_mins > 0:
644 duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration)
645 else:
646 duration_str = '%.1fs' % duration
647
648 rows.append(('duration', duration_str))
649
650
651 rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp)))
652 rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp)))
653
654 rows.append(('size', _human_readable_size(self.size)))
655
656 if self._chunks:
657 num_messages = 0
658 for c in self._chunks:
659 for counts in c.connection_counts.values():
660 num_messages += counts
661 else:
662 num_messages = sum([len(index) for index in self._connection_indexes.values()])
663 rows.append(('messages', str(num_messages)))
664
665
666 if len(self._chunk_headers) == 0:
667 rows.append(('compression', 'none'))
668 else:
669 compression_counts = {}
670 compression_uncompressed = {}
671 compression_compressed = {}
672 for chunk_header in self._chunk_headers.values():
673 if chunk_header.compression not in compression_counts:
674 compression_counts[chunk_header.compression] = 1
675 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
676 compression_compressed[chunk_header.compression] = chunk_header.compressed_size
677 else:
678 compression_counts[chunk_header.compression] += 1
679 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
680 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
681
682 chunk_count = len(self._chunk_headers)
683
684 compressions = []
685 for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])):
686 if compression != Compression.NONE:
687 fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression]
688 compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction))
689 else:
690 compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count))
691 rows.append(('compression', ', '.join(compressions)))
692
693 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
694 if not all_uncompressed:
695 total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.values()))
696 total_compressed_size = sum((h.compressed_size for h in self._chunk_headers.values()))
697
698 total_uncompressed_size_str = _human_readable_size(total_uncompressed_size)
699 total_compressed_size_str = _human_readable_size(total_compressed_size)
700 total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)])
701
702 if duration > 0:
703 uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration)
704 compressed_rate_str = _human_readable_size(total_compressed_size / duration)
705 rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)])
706
707 rows.append(('uncompressed', '%*s @ %*s/s' %
708 (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str)))
709 rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' %
710 (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size)))
711 else:
712 rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str)))
713 rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str)))
714
715 datatypes = set()
716 datatype_infos = []
717 for connection in self._connections.values():
718 if connection.datatype in datatypes:
719 continue
720 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
721 datatypes.add(connection.datatype)
722
723 topics = sorted(set([c.topic for c in self._get_connections()]))
724 topic_datatypes = {}
725 topic_conn_counts = {}
726 topic_msg_counts = {}
727 topic_freqs_median = {}
728 for topic in topics:
729 connections = list(self._get_connections(topic))
730
731 topic_datatypes[topic] = connections[0].datatype
732 topic_conn_counts[topic] = len(connections)
733
734 msg_count = 0
735 for connection in connections:
736 if self._chunks:
737 for chunk in self._chunks:
738 msg_count += chunk.connection_counts.get(connection.id, 0)
739 else:
740 msg_count += len(self._connection_indexes.get(connection.id, []))
741 topic_msg_counts[topic] = msg_count
742
743 if self._connection_indexes_read:
744 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
745 if len(stamps) > 1:
746 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
747 med_period = _median(periods)
748 if med_period > 0.0:
749 topic_freqs_median[topic] = 1.0 / med_period
750
751 topics = sorted(topic_datatypes.keys())
752 max_topic_len = max([len(topic) for topic in topics])
753 max_datatype_len = max([len(datatype) for datatype in datatypes])
754 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()])
755 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0
756
757
758 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
759 s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum)
760 if i == 0:
761 rows.append(('types', s))
762 else:
763 rows.append(('', s))
764
765
766 for i, topic in enumerate(topics):
767 topic_msg_count = topic_msg_counts[topic]
768
769 s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ')
770 if topic in topic_freqs_median:
771 s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic]))
772 else:
773 s += ' %*s' % (max_freq_median_len, '')
774
775 s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic])
776 if topic_conn_counts[topic] > 1:
777 s += ' (%d connections)' % topic_conn_counts[topic]
778
779 if i == 0:
780 rows.append(('topics', s))
781 else:
782 rows.append(('', s))
783
784 except Exception as ex:
785 raise
786
787 first_column_width = max([len(field) for field, _ in rows]) + 1
788
789 s = ''
790 for (field, value) in rows:
791 if field:
792 s += '%-*s %s\n' % (first_column_width, field + ':', value)
793 else:
794 s += '%-*s %s\n' % (first_column_width, '', value)
795
796 return s.rstrip()
797
799 s = ''
800
801 try:
802 if self._filename:
803 s += 'path: %s\n' % self._filename
804
805 if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
806 s += 'version: 1.2 (unindexed)\n'
807 else:
808 s += 'version: %d.%d\n' % (int(self._version / 100), self._version % 100)
809
810 if not self._connection_indexes and not self._chunks:
811 s += 'size: %d\n' % self.size
812 s += 'indexed: False\n'
813 else:
814 if self._chunks:
815 start_stamp = self._chunks[ 0].start_time.to_sec()
816 end_stamp = self._chunks[-1].end_time.to_sec()
817 else:
818 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index]
819 start_stamp = min(start_stamps) if start_stamps else 0
820 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index]
821 end_stamp = max(end_stamps) if end_stamps else 0
822
823 duration = end_stamp - start_stamp
824 s += 'duration: %.6f\n' % duration
825 s += 'start: %.6f\n' % start_stamp
826 s += 'end: %.6f\n' % end_stamp
827 s += 'size: %d\n' % self.size
828 if self._chunks:
829 num_messages = 0
830 for c in self._chunks:
831 for counts in c.connection_counts.values():
832 num_messages += counts
833 else:
834 num_messages = sum([len(index) for index in self._connection_indexes.values()])
835 s += 'messages: %d\n' % num_messages
836 s += 'indexed: True\n'
837
838
839 if len(self._chunk_headers) == 0:
840 s += 'compression: none\n'
841 else:
842 compression_counts = {}
843 compression_uncompressed = {}
844 compression_compressed = {}
845 for chunk_header in self._chunk_headers.values():
846 if chunk_header.compression not in compression_counts:
847 compression_counts[chunk_header.compression] = 1
848 compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
849 compression_compressed[chunk_header.compression] = chunk_header.compressed_size
850 else:
851 compression_counts[chunk_header.compression] += 1
852 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
853 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
854
855 chunk_count = len(self._chunk_headers)
856
857 main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0]
858 s += 'compression: %s\n' % str(main_compression)
859
860 all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
861 if not all_uncompressed:
862 s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.values()))
863 s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.values()))
864
865 datatypes = set()
866 datatype_infos = []
867 for connection in self._connections.values():
868 if connection.datatype in datatypes:
869 continue
870 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
871 datatypes.add(connection.datatype)
872
873 topics = sorted(set([c.topic for c in self._get_connections()]))
874 topic_datatypes = {}
875 topic_conn_counts = {}
876 topic_msg_counts = {}
877 topic_freqs_median = {}
878 for topic in topics:
879 connections = list(self._get_connections(topic))
880
881 topic_datatypes[topic] = connections[0].datatype
882 topic_conn_counts[topic] = len(connections)
883
884 msg_count = 0
885 for connection in connections:
886 for chunk in self._chunks:
887 msg_count += chunk.connection_counts.get(connection.id, 0)
888 topic_msg_counts[topic] = msg_count
889
890 if self._connection_indexes_read:
891 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
892 if len(stamps) > 1:
893 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
894 med_period = _median(periods)
895 if med_period > 0.0:
896 topic_freqs_median[topic] = 1.0 / med_period
897
898 topics = sorted(topic_datatypes.keys())
899 max_topic_len = max([len(topic) for topic in topics])
900 max_datatype_len = max([len(datatype) for datatype in datatypes])
901 max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()])
902 max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0
903
904
905 s += 'types:\n'
906 for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
907 s += ' - type: %s\n' % datatype
908 s += ' md5: %s\n' % md5sum
909
910
911 s += 'topics:\n'
912 for i, topic in enumerate(topics):
913 topic_msg_count = topic_msg_counts[topic]
914
915 s += ' - topic: %s\n' % topic
916 s += ' type: %s\n' % topic_datatypes[topic]
917 s += ' messages: %d\n' % topic_msg_count
918
919 if topic_conn_counts[topic] > 1:
920 s += ' connections: %d\n' % topic_conn_counts[topic]
921
922 if topic in topic_freqs_median:
923 s += ' frequency: %.4f\n' % topic_freqs_median[topic]
924
925 if not key:
926 return s
927
928 class DictObject(object):
929 def __init__(self, d):
930 for a, b in d.items():
931 if isinstance(b, (list, tuple)):
932 setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b])
933 else:
934 setattr(self, a, DictObject(b) if isinstance(b, dict) else b)
935
obj = DictObject(yaml.safe_load(s))
937 try:
938 val = eval('obj.' + key)
939 except Exception as ex:
940 print('Error getting key "%s"' % key, file=sys.stderr)
941 return None
942
943 def print_yaml(val, indent=0):
944 indent_str = ' ' * indent
945
946 if type(val) is list:
947 s = ''
948 for item in val:
949 s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1))
950 return s
951 elif type(val) is DictObject:
952 s = ''
953 for i, (k, v) in enumerate(val.__dict__.items()):
954 if i != 0:
955 s += indent_str
956 s += '%s: %s' % (k, str(v))
957 if i < len(val.__dict__) - 1:
958 s += '\n'
959 return s
960 else:
961 return indent_str + str(val)
962
963 return print_yaml(val)
964
965 except Exception as ex:
966 raise
967
968
969
970 @property
972 if not self._chunk_headers:
973 return False
974
975 return any((h.compression != Compression.NONE for h in self._chunk_headers.values()))
976
977 @property
979 if not self._chunk_headers:
980 return self.size
981
982 return sum((h.uncompressed_size for h in self._chunk_headers.values()))
983
985 """
986 Read the message from the given position in the file.
987 """
988 self.flush()
989 return self._reader.seek_and_read_message_data_record(position, raw)
990
991
992
994 """
995 Yield the connections, optionally filtering by topic and/or connection information.
996 """
997 if topics:
998 if type(topics) is str:
999 topics = set([roslib.names.canonicalize_name(topics)])
1000 else:
1001 topics = set([roslib.names.canonicalize_name(t) for t in topics])
1002
1003 for c in self._connections.values():
1004 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics:
1005 continue
1006 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header):
1007 continue
1008 yield c
1009
    def _get_entries(self, connections=None, start_time=None, end_time=None):
1011 """
1012 Yield index entries on the given connections in the given time range.
1013 """
1014 for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time):
1015 if start_time and entry.time < start_time:
1016 continue
1017 if end_time and entry.time > end_time:
1018 return
1019 yield entry
1020
1022 """
1023 Yield index entries on the given connections in the given time range in reverse order.
1024 """
1025 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()):
1026 if end_time and entry.time > end_time:
1027 continue
1028 if start_time and entry.time < start_time:
1029 return
1030 yield entry
1031
    def _get_entry(self, t, connections=None):
1033 """
1034 Return the first index entry on/before the given time on the given connections
1035 """
1036 indexes = self._get_indexes(connections)
1037
1038 entry = _IndexEntry(t)
1039
1040 first_entry = None
1041
1042 for index in indexes:
1043 i = bisect.bisect_right(index, entry) - 1
1044 if i >= 0:
1045 index_entry = index[i]
1046 if first_entry is None or index_entry > first_entry:
1047 first_entry = index_entry
1048
1049 return first_entry
1050
    def _get_entry_after(self, t, connections=None):
1052 """
1053 Return the first index entry after the given time on the given connections
1054 """
1055 indexes = self._get_indexes(connections)
1056
1057 entry = _IndexEntry(t)
1058
1059 first_entry = None
1060
1061 for index in indexes:
1062 i = bisect.bisect_right(index, entry)
1063 if i <= len(index) - 1:
1064 index_entry = index[i]
1065 if first_entry is None or index_entry < first_entry:
1066 first_entry = index_entry
1067
1068 return first_entry
1069
1071 """
1072 Get the indexes for the given connections.
1073 """
1074 if not self._connection_indexes_read:
1075 self._reader._read_connection_index_records()
1076
1077 if connections is None:
1078 return self._connection_indexes.values()
1079 else:
1080 return (self._connection_indexes[c.id] for c in connections)
1081
1082
1083
    def _clear_index(self):
        self._connection_indexes_read = False
1086 self._connection_indexes = {}
1087
1088 self._topic_connections = {}
1089 self._connections = {}
1090 self._connection_count = 0
1091 self._chunk_count = 0
1092 self._chunks = []
1093 self._chunk_headers = {}
1094
1095 self._chunk_open = False
1096 self._curr_chunk_info = None
1097 self._curr_chunk_data_pos = None
1098 self._curr_chunk_connection_indexes = {}
1099
    def _open(self, f, mode, allow_unindexed):
1101 if not f:
1102 raise ValueError('filename (or stream) is invalid')
1103
1104 try:
1105 if mode == 'r': self._open_read(f, allow_unindexed)
1106 elif mode == 'w': self._open_write(f)
1107 elif mode == 'a': self._open_append(f, allow_unindexed)
1108 else:
1109 raise ValueError('mode "%s" is invalid' % mode)
1110 if 'b' not in self._file.mode and not isinstance('', bytes):
1111 raise ROSBagException('In Python 3 the bag file must be opened in binary mode')
1112 except struct.error:
1113 raise ROSBagFormatException('error with bag')
1114
1116 try:
1117 return isinstance(f, file)
1118 except NameError:
1119 import io
1120 return isinstance(f, io.IOBase)
1121
1123 if self._is_file(f):
1124 self._file = f
1125 self._filename = None
1126 else:
1127 self._file = open(f, 'rb')
1128 self._filename = f
1129
1130 self._mode = 'r'
1131
1132 try:
1133 self._version = self._read_version()
1134 except:
1135 self._version = None
1136 self._close_file()
1137 raise
1138
1139 try:
1140 self._create_reader()
1141 self._reader.start_reading()
1142 except ROSBagUnindexedException as ex:
1143 if not allow_unindexed:
1144 self._close_file()
1145 raise
1146 except:
1147 self._close_file()
1148 raise
1149
1151 if self._is_file(f):
1152 self._file = f
1153 self._filename = None
1154 else:
1155 self._file = open(f, 'w+b')
1156 self._filename = f
1157
1158 self._mode = 'w'
1159
1160 self._version = 200
1161
1162 try:
1163 self._create_reader()
1164 self._start_writing()
1165 except:
1166 self._close_file()
1167 raise
1168
1170 if self._is_file(f):
1171 self._file = f
1172 self._filename = None
1173 else:
1174 try:
1175
1176 open(f, 'r').close()
1177
1178
1179 self._file = open(f, 'r+b')
1180 except IOError:
1181
1182 self._file = open(f, 'w+b')
1183
1184 self._filename = f
1185
1186 self._mode = 'a'
1187
1188 try:
1189 self._version = self._read_version()
1190 except:
1191 self._version = None
1192 self._close_file()
1193 raise
1194
1195 if self._version != 200:
1196 self._file.close()
1197 raise ROSBagException('bag version %d is unsupported for appending' % self._version)
1198
1199 try:
1200 self._start_appending()
1201 except ROSBagUnindexedException:
1202 if not allow_unindexed:
1203 self._close_file()
1204 raise
1205 except:
1206 self._close_file()
1207 raise
1208
1210 self._file.close()
1211 self._file = None
1212
1214 """
1215 @raise ROSBagException: if the bag version is unsupported
1216 """
1217 major_version, minor_version = int(self._version / 100), self._version % 100
1218 if major_version == 2:
1219 self._reader = _BagReader200(self)
1220 elif major_version == 1:
1221 if minor_version == 1:
1222 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version)
1223 else:
1224
1225 first_record_pos = self._file.tell()
1226 header = _read_header(self._file)
1227 op = _read_uint8_field(header, 'op')
1228 self._file.seek(first_record_pos)
1229
1230 if op == _OP_FILE_HEADER:
1231 self._reader = _BagReader102_Indexed(self)
1232 else:
1233 self._reader = _BagReader102_Unindexed(self)
1234 else:
1235 raise ROSBagException('unknown bag version %d' % self._version)
1236
1238 """
1239 @raise ROSBagException: if the file is empty, or the version line can't be parsed
1240 """
1241 version_line = self._file.readline().rstrip().decode()
1242 if len(version_line) == 0:
1243 raise ROSBagException('empty file')
1244
matches = re.match(r"#ROS(.*) V(\d)\.(\d)", version_line)
1246 if matches is None or len(matches.groups()) != 3:
1247 raise ROSBagException('This does not appear to be a bag file')
1248
1249 version_type, major_version_str, minor_version_str = matches.groups()
1250
1251 version = int(major_version_str) * 100 + int(minor_version_str)
1252
1253 return version
1254
1256 version = _VERSION + '\n'
1257 version = version.encode()
1258 self._file.write(version)
1259 self._file_header_pos = self._file.tell()
1260 self._write_file_header_record(0, 0, 0)
1261
1263
1264 self.flush()
1265
1266
1267 self._index_data_pos = self._file.tell()
1268
1269
1270 for connection_info in self._connections.values():
1271 self._write_connection_record(connection_info)
1272
1273
1274 for chunk_info in self._chunks:
1275 self._write_chunk_info_record(chunk_info)
1276
1277
1278 self._file.seek(self._file_header_pos)
1279 self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
1280
1282 self._file_header_pos = self._file.tell()
1283
1284 self._create_reader()
1285 self._reader.start_reading()
1286
1287
1288 self._file.truncate(self._index_data_pos)
1289 self._reader.index_data_pos = 0
1290
1291
1292 self._file.seek(self._file_header_pos);
1293 self._write_file_header_record(0, 0, 0)
1294
1295
1296 self._file.seek(0, os.SEEK_END)
1297
1299 self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t)
1300 self._write_chunk_header(_ChunkHeader(self._compression, 0, 0))
1301 self._curr_chunk_data_pos = self._file.tell()
1302 self._set_compression_mode(self._compression)
1303 self._chunk_open = True
1304
1306 if self._compression == Compression.NONE:
1307 return self._file.tell() - self._curr_chunk_data_pos
1308 else:
1309 return self._output_file.compressed_bytes_in
1310
1312
1313 self._chunks.append(self._curr_chunk_info)
1314
1315
1316 uncompressed_size = self._get_chunk_offset()
1317 self._set_compression_mode(Compression.NONE)
1318 compressed_size = self._file.tell() - self._curr_chunk_data_pos
1319
1320
1321 end_of_chunk_pos = self._file.tell()
1322 self._file.seek(self._curr_chunk_info.pos)
1323 chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos)
1324 self._write_chunk_header(chunk_header)
1325 self._chunk_headers[self._curr_chunk_info.pos] = chunk_header
1326
1327
1328 self._file.seek(end_of_chunk_pos)
1329 for connection_id, entries in self._curr_chunk_connection_indexes.items():
1330 self._write_connection_index_record(connection_id, entries)
1331 self._curr_chunk_connection_indexes.clear()
1332
1333
1334 self._chunk_open = False
1335
1352
1354 header = {
1355 'op': _pack_uint8(_OP_FILE_HEADER),
1356 'index_pos': _pack_uint64(index_pos),
1357 'conn_count': _pack_uint32(connection_count),
1358 'chunk_count': _pack_uint32(chunk_count)
1359 }
1360 _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
1361
1363 header = {
1364 'op': _pack_uint8(_OP_CONNECTION),
1365 'topic': connection_info.topic,
1366 'conn': _pack_uint32(connection_info.id)
1367 }
1368
1369 _write_header(self._output_file, header)
1370
1371 _write_header(self._output_file, connection_info.header)
1372
1374 header = {
1375 'op': _pack_uint8(_OP_MSG_DATA),
1376 'conn': _pack_uint32(connection_id),
1377 'time': _pack_time(t)
1378 }
1379 _write_record(self._output_file, header, serialized_bytes)
1380
1382 header = {
1383 'op': _pack_uint8(_OP_CHUNK),
1384 'compression': chunk_header.compression,
1385 'size': _pack_uint32(chunk_header.uncompressed_size)
1386 }
1387 _write_header(self._file, header)
1388
1389 self._file.write(_pack_uint32(chunk_header.compressed_size))
1390
1392 header = {
1393 'op': _pack_uint8(_OP_INDEX_DATA),
1394 'conn': _pack_uint32(connection_id),
1395 'ver': _pack_uint32(_INDEX_VERSION),
1396 'count': _pack_uint32(len(entries))
1397 }
1398
1399 buffer = self._buffer
1400 buffer.seek(0)
1401 buffer.truncate(0)
1402 for entry in entries:
1403 buffer.write(_pack_time (entry.time))
1404 buffer.write(_pack_uint32(entry.offset))
1405
1406 _write_record(self._file, header, buffer.getvalue())
1407
1409 header = {
1410 'op': _pack_uint8 (_OP_CHUNK_INFO),
1411 'ver': _pack_uint32(_CHUNK_INDEX_VERSION),
1412 'chunk_pos': _pack_uint64(chunk_info.pos),
1413 'start_time': _pack_time(chunk_info.start_time),
1414 'end_time': _pack_time(chunk_info.end_time),
1415 'count': _pack_uint32(len(chunk_info.connection_counts))
1416 }
1417
1418 buffer = self._buffer
1419 buffer.seek(0)
1420 buffer.truncate(0)
1421 for connection_id, count in chunk_info.connection_counts.items():
1422 buffer.write(_pack_uint32(connection_id))
1423 buffer.write(_pack_uint32(count))
1424
1425 _write_record(self._file, header, buffer.getvalue())
1426
1427
1428
1429 _message_types = {}
1430
1431 _OP_MSG_DEF = 0x01
1432 _OP_MSG_DATA = 0x02
1433 _OP_FILE_HEADER = 0x03
1434 _OP_INDEX_DATA = 0x04
1435 _OP_CHUNK = 0x05
1436 _OP_CHUNK_INFO = 0x06
1437 _OP_CONNECTION = 0x07
1438
1439 _OP_CODES = {
1440 _OP_MSG_DEF: 'MSG_DEF',
1441 _OP_MSG_DATA: 'MSG_DATA',
1442 _OP_FILE_HEADER: 'FILE_HEADER',
1443 _OP_INDEX_DATA: 'INDEX_DATA',
1444 _OP_CHUNK: 'CHUNK',
1445 _OP_CHUNK_INFO: 'CHUNK_INFO',
1446 _OP_CONNECTION: 'CONNECTION'
1447 }
1448
1449 _VERSION = '#ROSBAG V2.0'
1450 _FILE_HEADER_LENGTH = 4096
1451 _INDEX_VERSION = 1
1452 _CHUNK_INDEX_VERSION = 1
class _ConnectionInfo(object):
    def __init__(self, id, topic, header):
1456 try:
1457 datatype = _read_str_field(header, 'type')
1458 md5sum = _read_str_field(header, 'md5sum')
1459 msg_def = _read_str_field(header, 'message_definition')
1460 except KeyError as ex:
1461 raise ROSBagFormatException('connection header field %s not found' % str(ex))
1462
1463 self.id = id
1464 self.topic = topic
1465 self.datatype = datatype
1466 self.md5sum = md5sum
1467 self.msg_def = msg_def
1468 self.header = header
1469
    def __str__(self):
        return '%d on %s: %s' % (self.id, self.topic, str(self.header))
1472
class _ChunkInfo(object):
    def __init__(self, pos, start_time, end_time):
1475 self.pos = pos
1476 self.start_time = start_time
1477 self.end_time = end_time
1478
1479 self.connection_counts = {}
1480
    def __str__(self):
        s = 'chunk_pos: %d\n' % self.pos
1483 s += 'start_time: %s\n' % str(self.start_time)
1484 s += 'end_time: %s\n' % str(self.end_time)
1485 s += 'connections: %d\n' % len(self.connection_counts)
1486 s += '\n'.join([' - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()])
1487 return s
1488
class _ChunkHeader(object):
    def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
        self.compression = compression
1492 self.compressed_size = compressed_size
1493 self.uncompressed_size = uncompressed_size
1494 self.data_pos = data_pos
1495
    def __str__(self):
        if self.uncompressed_size > 0:
1498 ratio = 100 * (float(self.compressed_size) / self.uncompressed_size)
1499 return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio)
1500 else:
1501 return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
1502
class ComparableMixin(object):
    __slots__ = []

    def _compare(self, other, method):
        try:
            return method(self._cmpkey(), other._cmpkey())
        except (AttributeError, TypeError):
            # _cmpkey is not implemented, or returns a type that can't be compared
            return NotImplemented

    def __lt__(self, other):
        return self._compare(other, lambda s, o: s < o)

    def __le__(self, other):
        return self._compare(other, lambda s, o: s <= o)

    def __eq__(self, other):
        return self._compare(other, lambda s, o: s == o)

    def __ge__(self, other):
        return self._compare(other, lambda s, o: s >= o)

    def __gt__(self, other):
        return self._compare(other, lambda s, o: s > o)

    def __ne__(self, other):
        return self._compare(other, lambda s, o: s != o)
1531
class _IndexEntry(ComparableMixin):
    __slots__ = ['time']

    def __init__(self, time):
        self.time = time

    def _cmpkey(self):
        return self.time

class _IndexEntry102(_IndexEntry):
    __slots__ = ['offset']

    def __init__(self, time, offset):
        self.time = time
        self.offset = offset

    @property
    def position(self):
        return self.offset

    def __str__(self):
        return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)

class _IndexEntry200(_IndexEntry):
    __slots__ = ['chunk_pos', 'offset']

    def __init__(self, time, chunk_pos, offset):
        self.time = time
        self.chunk_pos = chunk_pos
        self.offset = offset

    @property
    def position(self):
        return (self.chunk_pos, self.offset)

    def __str__(self):
        return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
1569
def _get_message_type(info):
    message_type = _message_types.get(info.md5sum)
1572 if message_type is None:
1573 try:
1574 message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype]
1575 if (message_type._md5sum != info.md5sum):
1576 print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."'%(info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr)
1577 except genmsg.InvalidMsgSpec:
1578 message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype]
1579 print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition."'%(info.datatype, info.md5sum), file=sys.stderr)
1580 except genmsg.MsgGenerationException as ex:
1581 raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex)))
1582
1583 _message_types[info.md5sum] = message_type
1584
1585 return message_type
1586
def _read_uint32(f): return _unpack_uint32(f.read(4))
def _read_uint64(f): return _unpack_uint64(f.read(8))
def _read_time  (f): return _unpack_time  (f.read(8))
def _decode_str(v): return v if type(v) is str else v.decode()
def _unpack_uint8 (v): return struct.unpack('<B', v)[0]
def _unpack_uint32(v): return struct.unpack('<L', v)[0]
def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
def _unpack_time  (v): return rospy.Time(*struct.unpack('<LL', v))
def _pack_uint8 (v): return struct.pack('<B', v)
def _pack_uint32(v): return struct.pack('<L', v)
def _pack_uint64(v): return struct.pack('<Q', v)
def _pack_time  (v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)
1602
1604 data = f.read(size)
1605 if len(data) != size:
1606 raise ROSBagException('expecting %d bytes, read %d' % (size, len(data)))
1607 return data
1608
1610 _skip_sized(f)
1611 _skip_sized(f)
1612
1614 size = _read_uint32(f)
1615 f.seek(size, os.SEEK_CUR)
1616
1618 try:
1619 size = _read_uint32(f)
1620 except struct.error as ex:
1621 raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))
1622 return _read(f, size)
1623
1625 if not isinstance(v, bytes):
1626 v = v.encode()
1627 f.write(_pack_uint32(len(v)))
1628 f.write(v)
1629
1631 if field not in header:
1632 raise ROSBagFormatException('expected "%s" field in record' % field)
1633
1634 try:
1635 value = unpack_fn(header[field])
1636 except Exception as ex:
1637 raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex)))
1638
1639 return value
1640
def _read_str_field   (header, field): return _read_field(header, field, _decode_str)
def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
def _read_time_field  (header, field): return _read_field(header, field, _unpack_time)
1646
1648 header_str = _write_header(f, header)
1649
1650 if padded_size is not None:
1651 header_len = len(header_str)
1652 if header_len < padded_size:
1653 data = ' ' * (padded_size - header_len)
1654 else:
1655 data = ''
1656
1657 _write_sized(f, data)
1658
1660 header_str = b''
1661 equal = b'='
1662 for k, v in header.items():
1663 if not isinstance(k, bytes):
1664 k = k.encode()
1665 if not isinstance(v, bytes):
1666 v = v.encode()
1667 header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v
1668 _write_sized(f, header_str)
1669 return header_str
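
# Record header layout, as written by _write_header/_write_sized above (a sketch; the byte
# values shown are illustrative only): a header is a uint32 total length followed by a
# sequence of fields, each a uint32 field length followed by 'name=value' bytes, e.g.
#
#   <uint32 header_len> <uint32 field_len> op=\x03 <uint32 field_len> index_pos=... <...>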
1670
1672 bag_pos = f.tell()
1673
1674
1675 try:
1676 header = _read_sized(f)
1677 except ROSBagException as ex:
1678 raise ROSBagFormatException('Error reading header: %s' % str(ex))
1679
1680
1681 header_dict = {}
1682 while header != b'':
1683
1684 if len(header) < 4:
1685 raise ROSBagFormatException('Error reading header field')
1686 (size,) = struct.unpack('<L', header[:4])
1687 header = header[4:]
1688
1689
1690 if len(header) < size:
1691 raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header)))
1692 (name, sep, value) = header[:size].partition(b'=')
1693 if sep == b'':
1694 raise ROSBagFormatException('Error reading header field')
1695
1696 name = name.decode()
1697 header_dict[name] = value
1698
1699 header = header[size:]
1700
1701
1702 if req_op is not None:
1703 op = _read_uint8_field(header_dict, 'op')
1704 if req_op != op:
1705 raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op]))
1706
1707 return header_dict
1708
1710 pos = f.tell()
1711 header = _read_header(f)
1712 op = _read_uint8_field(header, 'op')
1713 f.seek(pos)
1714 return op
1715
1717 try:
1718 record_data = _read_sized(f)
1719 except ROSBagException as ex:
1720 raise ROSBagFormatException('Error reading record data: %s' % str(ex))
1721
1722 return record_data
1723
1727
1729 raise NotImplementedError()
1730
    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1732 raise NotImplementedError()
1733
1735 raise NotImplementedError()
1736
1738 """
1739 Support class for reading unindexed v1.2 bag files.
1740 """
1743
1745 self.bag._file_header_pos = self.bag._file.tell()
1746
1748 """Generates all bag index information by rereading the message records."""
1749 f = self.bag._file
1750
1751 total_bytes = self.bag.size
1752
1753
1754 self.bag._file.seek(self.bag._file_header_pos)
1755
1756 offset = f.tell()
1757
1758
1759 while offset < total_bytes:
1760 yield offset
1761
1762 op = _peek_next_header_op(f)
1763
1764 if op == _OP_MSG_DEF:
1765 connection_info = self.read_message_definition_record()
1766
1767 if connection_info.topic not in self.bag._topic_connections:
1768 self.bag._topic_connections[connection_info.topic] = connection_info.id
1769 self.bag._connections[connection_info.id] = connection_info
1770 self.bag._connection_indexes[connection_info.id] = []
1771
1772 elif op == _OP_MSG_DATA:
1773
1774 header = _read_header(f)
1775
1776 topic = _read_str_field(header, 'topic')
1777 secs = _read_uint32_field(header, 'sec')
1778 nsecs = _read_uint32_field(header, 'nsec')
1779 t = genpy.Time(secs, nsecs)
1780
1781 if topic not in self.bag._topic_connections:
1782 datatype = _read_str_field(header, 'type')
1783 self._create_connection_info_for_datatype(topic, datatype)
1784
1785 connection_id = self.bag._topic_connections[topic]
1786 info = self.bag._connections[connection_id]
1787
1788
1789 _skip_sized(f)
1790
1791
1792 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
1793
1794 offset = f.tell()
1795
    def read_messages(self, topics, start_time, end_time, topic_filter, raw):
1797 f = self.bag._file
1798
1799 f.seek(self.bag._file_header_pos)
1800
1801 while True:
1802
1803 while True:
1804 position = f.tell()
1805
1806 try:
1807 header = _read_header(f)
1808 except Exception:
1809 return
1810
1811 op = _read_uint8_field(header, 'op')
1812 if op != _OP_MSG_DEF:
1813 break
1814
1815 connection_info = self.read_message_definition_record(header)
1816
1817 if connection_info.topic not in self.bag._topic_connections:
1818 self.bag._topic_connections[connection_info.topic] = connection_info.id
1819
1820 self.bag._connections[connection_info.id] = connection_info
1821
1822
1823 if op != _OP_MSG_DATA:
1824 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
1825
1826 topic = _read_str_field(header, 'topic')
1827
1828 if topic not in self.bag._topic_connections:
1829 datatype = _read_str_field(header, 'type')
1830 self._create_connection_info_for_datatype(topic, datatype)
1831
1832 connection_id = self.bag._topic_connections[topic]
1833 info = self.bag._connections[connection_id]
1834
1835
1836 try:
1837 msg_type = _get_message_type(info)
1838 except KeyError:
raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag file by definition' % info.datatype)
1840
1841
1842 secs = _read_uint32_field(header, 'sec')
1843 nsecs = _read_uint32_field(header, 'nsec')
1844 t = genpy.Time(secs, nsecs)
1845
1846
1847 data = _read_record_data(f)
1848
1849 if raw:
1850 msg = (info.datatype, data, info.md5sum, position, msg_type)
1851 else:
1852
1853 msg = msg_type()
1854 msg.deserialize(data)
1855
1856 yield BagMessage(topic, msg, t)
1857
1858 self.bag._connection_indexes_read = True
1859
1861 for c in self.bag._connections.values():
1862 if c.datatype == datatype:
1863 connection_id = len(self.bag._connections)
1864 connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] }
1865 connection_info = _ConnectionInfo(connection_id, topic, connection_header)
1866
1867 self.bag._topic_connections[topic] = connection_id
1868 self.bag._connections[connection_id] = connection_info
1869 self.bag._connection_indexes[connection_id] = []
1870 return
1871
1872 raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))
1873
1875 if not header:
1876 header = _read_header(self.bag._file, _OP_MSG_DEF)
1877
1878 topic = _read_str_field(header, 'topic')
1879 datatype = _read_str_field(header, 'type')
1880 md5sum = _read_str_field(header, 'md5')
1881 msg_def = _read_str_field(header, 'def')
1882
1883 _skip_sized(self.bag._file)
1884
1885 connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def }
1886
1887 id = len(self.bag._connections)
1888
1889 return _ConnectionInfo(id, topic, connection_header)
1890
1892 """
1893 Support class for reading indexed v1.2 bag files.
1894 """
1897
    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
1899 connections = self.bag._get_connections(topics, connection_filter)
1900 for entry in self.bag._get_entries(connections, start_time, end_time):
1901 yield self.seek_and_read_message_data_record(entry.offset, raw)
1902
1904 """Generates all bag index information by rereading the message records."""
1905 f = self.bag._file
1906
1907 total_bytes = self.bag.size
1908
1909
1910 self.bag._file.seek(self.bag._file_header_pos)
1911 self.read_file_header_record()
1912
1913 offset = f.tell()
1914
1915
1916 while offset < total_bytes:
1917 yield offset
1918
1919 op = _peek_next_header_op(f)
1920
1921 if op == _OP_MSG_DEF:
1922 connection_info = self.read_message_definition_record()
1923
1924 if connection_info.topic not in self.bag._topic_connections:
1925 self.bag._topic_connections[connection_info.topic] = connection_info.id
1926 self.bag._connections[connection_info.id] = connection_info
1927 self.bag._connection_indexes[connection_info.id] = []
1928
1929 elif op == _OP_MSG_DATA:
1930
1931 header = _read_header(f)
1932
1933 topic = _read_str_field(header, 'topic')
1934 secs = _read_uint32_field(header, 'sec')
1935 nsecs = _read_uint32_field(header, 'nsec')
1936 t = genpy.Time(secs, nsecs)
1937
1938 if topic not in self.bag._topic_connections:
1939 datatype = _read_str_field(header, 'type')
1940 self._create_connection_info_for_datatype(topic, datatype)
1941
1942 connection_id = self.bag._topic_connections[topic]
1943 info = self.bag._connections[connection_id]
1944
1945
1946 _skip_sized(f)
1947
1948
1949 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
1950
1951 elif op == _OP_INDEX_DATA:
1952 _skip_record(f)
1953
1954 offset = f.tell()
1955
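# Illustrative sketch: reindex() above is a generator that yields the file
# offset of each record it scans, so callers can report progress.  The public
# Bag.reindex() entry point and the filename below are assumptions used only
# for illustration.
#
#     import rosbag
#     with rosbag.Bag('unindexed.bag', 'a', allow_unindexed=True) as bag:
#         for offset in bag.reindex():
#             print('scanned up to byte %d' % offset)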
1956 - def start_reading(self):
1957 try:
1958
1959 self.read_file_header_record()
1960
1961 total_bytes = self.bag.size
1962
1963
1964 if self.bag._index_data_pos == 0:
1965 raise ROSBagUnindexedException()
1966
1967
1968 self.bag._file.seek(self.bag._index_data_pos)
1969
1970
1971 topic_indexes = {}
1972 while True:
1973 pos = self.bag._file.tell()
1974 if pos >= total_bytes:
1975 break
1976
1977 topic, index = self.read_topic_index_record()
1978
1979 topic_indexes[topic] = index
1980
1981
1982 for topic, index in topic_indexes.items():
1983 self.bag._file.seek(index[0].offset)
1984
1985 connection_info = self.read_message_definition_record()
1986
1987 if connection_info.topic not in self.bag._topic_connections:
1988 self.bag._topic_connections[connection_info.topic] = connection_info.id
1989 self.bag._connections[connection_info.id] = connection_info
1990
1991 self.bag._connection_indexes[connection_info.id] = index
1992
1993 self.bag._connection_indexes_read = True
1994
1995 except Exception as ex:
1996 raise ROSBagUnindexedException()
1997
1998 - def read_file_header_record(self):
1999 self.bag._file_header_pos = self.bag._file.tell()
2000
2001 header = _read_header(self.bag._file, _OP_FILE_HEADER)
2002
2003 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
2004
2005 _skip_sized(self.bag._file)
2006
2007 - def read_topic_index_record(self):
2008 f = self.bag._file
2009
2010 header = _read_header(f, _OP_INDEX_DATA)
2011
2012 index_version = _read_uint32_field(header, 'ver')
2013 topic = _read_str_field (header, 'topic')
2014 count = _read_uint32_field(header, 'count')
2015
2016 if index_version != 0:
2017 raise ROSBagFormatException('expecting index version 0, got %d' % index_version)
2018
2019 _read_uint32(f)
2020
2021 topic_index = []
2022
2023 for i in range(count):
2024 time = _read_time (f)
2025 offset = _read_uint64(f)
2026
2027 topic_index.append(_IndexEntry102(time, offset))
2028
2029 return (topic, topic_index)
2030
2031 - def seek_and_read_message_data_record(self, position, raw):
2032 f = self.bag._file
2033
2034
2035 f.seek(position)
2036
2037
2038 while True:
2039 header = _read_header(f)
2040 op = _read_uint8_field(header, 'op')
2041 if op != _OP_MSG_DEF:
2042 break
2043 _skip_sized(f)
2044
2045
2046 if op != _OP_MSG_DATA:
2047 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
2048
2049 topic = _read_str_field(header, 'topic')
2050
2051 connection_id = self.bag._topic_connections[topic]
2052 info = self.bag._connections[connection_id]
2053
2054
2055 try:
2056 msg_type = _get_message_type(info)
2057 except KeyError:
2058 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag file by definition' % info.datatype)
2059
2060
2061 secs = _read_uint32_field(header, 'sec')
2062 nsecs = _read_uint32_field(header, 'nsec')
2063 t = genpy.Time(secs, nsecs)
2064
2065
2066 data = _read_record_data(f)
2067
2068 if raw:
2069 msg = info.datatype, data, info.md5sum, position, msg_type
2070 else:
2071
2072 msg = msg_type()
2073 msg.deserialize(data)
2074
2075 return BagMessage(topic, msg, t)
2076
2078 """
2079 Support class for reading v2.0 bag files.
2080 """
2081 - def __init__(self, bag):
2082 _BagReader.__init__(self, bag)
2083
2084 self.decompressed_chunk_pos = None
2085 self.decompressed_chunk = None
2086 self.decompressed_chunk_io = None
2087
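# The three attributes above cache the most recently decompressed chunk
# (its position, raw bytes, and a file-like wrapper) so that consecutive
# reads from the same compressed chunk are not decompressed again; see
# seek_and_read_message_data_record() below.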
2089 """
2090 Generates all bag index information by rereading the chunks.
2091 Assumes the file header has been read.
2092 """
2093 f = self.bag._file
2094
2095 f.seek(0, os.SEEK_END)
2096 total_bytes = f.tell()
2097
2098
2099
2100
2101 self._read_terminal_connection_records()
2102
2103
2104 self.bag._file.seek(self.bag._file_header_pos)
2105 self.read_file_header_record()
2106
2107 trunc_pos = None
2108
2109 while True:
2110 chunk_pos = f.tell()
2111 if chunk_pos >= total_bytes:
2112 break
2113
2114 yield chunk_pos
2115
2116 try:
2117 self._reindex_read_chunk(f, chunk_pos, total_bytes)
2118 except Exception as ex:
2119 break
2120
2121 trunc_pos = f.tell()
2122
2123 if trunc_pos and trunc_pos < total_bytes:
2124 f.truncate(trunc_pos)
2125
2126 - def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
2127
2128 chunk_header = self.read_chunk_header()
2129
2130
2131 if chunk_header.compressed_size == 0:
2132 raise ROSBagException('unterminated chunk at %d' % chunk_pos)
2133
2134 if chunk_header.compression == Compression.NONE:
2135 chunk_file = f
2136 else:
2137
2138 compressed_chunk = _read(f, chunk_header.compressed_size)
2139
2140
2141 if chunk_header.compression == Compression.BZ2:
2142 self.decompressed_chunk = bz2.decompress(compressed_chunk)
2143 elif chunk_header.compression == Compression.LZ4 and found_lz4:
2144 self.decompressed_chunk = roslz4.decompress(compressed_chunk)
2145 else:
2146 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
2147
2148 if self.decompressed_chunk_io:
2149 self.decompressed_chunk_io.close()
2150 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
2151
2152 chunk_file = self.decompressed_chunk_io
2153
2154
2155 self.bag._curr_chunk_info = None
2156
2157 if chunk_header.compression == Compression.NONE:
2158 offset = chunk_file.tell() - chunk_pos
2159 else:
2160 offset = chunk_file.tell()
2161
2162 expected_index_length = 0
2163
2164 while offset < chunk_header.uncompressed_size:
2165 op = _peek_next_header_op(chunk_file)
2166
2167 if op == _OP_CONNECTION:
2168 connection_info = self.read_connection_record(chunk_file)
2169
2170 if connection_info.id not in self.bag._connections:
2171 self.bag._connections[connection_info.id] = connection_info
2172 if connection_info.id not in self.bag._connection_indexes:
2173 self.bag._connection_indexes[connection_info.id] = []
2174
2175 elif op == _OP_MSG_DATA:
2176
2177 header = _read_header(chunk_file)
2178
2179 connection_id = _read_uint32_field(header, 'conn')
2180 t = _read_time_field (header, 'time')
2181
2182
2183 if not self.bag._curr_chunk_info:
2184 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t)
2185 else:
2186 if t > self.bag._curr_chunk_info.end_time:
2187 self.bag._curr_chunk_info.end_time = t
2188 elif t < self.bag._curr_chunk_info.start_time:
2189 self.bag._curr_chunk_info.start_time = t
2190 if connection_id in self.bag._curr_chunk_info.connection_counts:
2191 self.bag._curr_chunk_info.connection_counts[connection_id] += 1
2192 else:
2193 self.bag._curr_chunk_info.connection_counts[connection_id] = 1
2194
2195
2196 _skip_sized(chunk_file)
2197
2198
2199 if connection_id not in self.bag._connection_indexes:
2200 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos))
2201 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset))
2202
2203 expected_index_length += 1
2204
2205 else:
2206
2207 _skip_record(chunk_file)
2208
2209 if chunk_header.compression == Compression.NONE:
2210 offset = chunk_file.tell() - chunk_pos
2211 else:
2212 offset = chunk_file.tell()
2213
2214
2215 next_op = _peek_next_header_op(f)
2216
2217 total_index_length = 0
2218
2219 while next_op != _OP_CHUNK:
2220 if next_op == _OP_INDEX_DATA:
2221
2222 _, index = self.read_connection_index_record()
2223 total_index_length += len(index)
2224 else:
2225 _skip_record(f)
2226
2227 if f.tell() >= total_bytes:
2228 if total_index_length != expected_index_length:
2229 raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length))
2230 break
2231
2232 next_op = _peek_next_header_op(f)
2233
2234
2235 self.bag._chunk_headers[chunk_pos] = chunk_header
2236 self.bag._chunks.append(self.bag._curr_chunk_info)
2237
2238 - def _read_terminal_connection_records(self):
2239 b, f, r = self.bag, self.bag._file, self.bag._reader
2240
2241
2242 f.seek(b._file_header_pos)
2243 r.read_file_header_record()
2244
2245
2246 if self._advance_to_next_record(_OP_CONNECTION):
2247
2248 while True:
2249 connection_info = r.read_connection_record(f)
2250
2251 b._connections[connection_info.id] = connection_info
2252 b._connection_indexes[connection_info.id] = []
2253
2254 next_op = _peek_next_header_op(f)
2255 if next_op != _OP_CONNECTION:
2256 break
2257
2258 - def _advance_to_next_record(self, op):
2259 b, f = self.bag, self.bag._file
2260
2261 try:
2262 while True:
2263 next_op = _peek_next_header_op(f)
2264 if next_op == op:
2265 break
2266
2267 if next_op == _OP_INDEX_DATA:
2268
2269
2270
2271 if b._curr_chunk_info is None:
2272 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1))
2273
2274 b._reader.read_connection_index_record()
2275 else:
2276 _skip_record(f)
2277
2278 return True
2279
2280 except Exception as ex:
2281 return False
2282
2283 - def start_reading(self):
2284 try:
2285
2286 self.read_file_header_record()
2287
2288
2289 if self.bag._index_data_pos == 0:
2290 raise ROSBagUnindexedException()
2291
2292
2293 self.bag._file.seek(self.bag._index_data_pos)
2294
2295
2296 self.bag._connection_indexes = {}
2297 for i in range(self.bag._connection_count):
2298 connection_info = self.read_connection_record(self.bag._file)
2299 self.bag._connections[connection_info.id] = connection_info
2300 self.bag._connection_indexes[connection_info.id] = []
2301
2302
2303 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)]
2304
2305
2306 self.bag._chunk_headers = {}
2307 for chunk_info in self.bag._chunks:
2308 self.bag._file.seek(chunk_info.pos)
2309 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header()
2310
2311 if not self.bag._skip_index:
2312 self._read_connection_index_records()
2313
2314 except Exception as ex:
2315 raise ROSBagUnindexedException()
2316
2317 - def _read_connection_index_records(self):
2318 for chunk_info in self.bag._chunks:
2319 self.bag._file.seek(chunk_info.pos)
2320 _skip_record(self.bag._file)
2321
2322 self.bag._curr_chunk_info = chunk_info
2323 for i in range(len(chunk_info.connection_counts)):
2324 connection_id, index = self.read_connection_index_record()
2325 self.bag._connection_indexes[connection_id].extend(index)
2326
2327
2328
2329
2330 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index]
2331 for id in orphan_connection_ids:
2332 del self.bag._connections[id]
2333 del self.bag._connection_indexes[id]
2334
2335 self.bag._connection_indexes_read = True
2336
2337 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2338 connections = self.bag._get_connections(topics, connection_filter)
2339 for entry in self.bag._get_entries(connections, start_time, end_time):
2340 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
2341
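# Illustrative sketch (bag filename is hypothetical): passing raw=True to the
# public read_messages() API skips deserialization and yields the raw tuple
# assembled in seek_and_read_message_data_record() below.
#
#     import rosbag
#     with rosbag.Bag('example.bag') as bag:
#         for topic, raw_msg, t in bag.read_messages(raw=True):
#             datatype, data, md5sum, position, msg_type = raw_msg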
2342
2343
2344 - def read_file_header_record(self):
2345 self.bag._file_header_pos = self.bag._file.tell()
2346
2347 header = _read_header(self.bag._file, _OP_FILE_HEADER)
2348
2349 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
2350 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count')
2351 self.bag._connection_count = _read_uint32_field(header, 'conn_count')
2352
2353 _skip_sized(self.bag._file)
2354
2355 - def read_connection_record(self, f):
2356 header = _read_header(f, _OP_CONNECTION)
2357
2358 conn_id = _read_uint32_field(header, 'conn')
2359 topic = _read_str_field (header, 'topic')
2360
2361 connection_header = _read_header(f)
2362
2363 return _ConnectionInfo(conn_id, topic, connection_header)
2364
2365 - def read_chunk_info_record(self):
2366 f = self.bag._file
2367
2368 header = _read_header(f, _OP_CHUNK_INFO)
2369
2370 chunk_info_version = _read_uint32_field(header, 'ver')
2371
2372 if chunk_info_version == 1:
2373 chunk_pos = _read_uint64_field(header, 'chunk_pos')
2374 start_time = _read_time_field (header, 'start_time')
2375 end_time = _read_time_field (header, 'end_time')
2376 connection_count = _read_uint32_field(header, 'count')
2377
2378 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time)
2379
2380 _read_uint32(f)
2381
2382 for i in range(connection_count):
2383 connection_id = _read_uint32(f)
2384 count = _read_uint32(f)
2385
2386 chunk_info.connection_counts[connection_id] = count
2387
2388 return chunk_info
2389 else:
2390 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2391
2392 - def read_chunk_header(self):
2393 header = _read_header(self.bag._file, _OP_CHUNK)
2394
2395 compression = _read_str_field (header, 'compression')
2396 uncompressed_size = _read_uint32_field(header, 'size')
2397
2398 compressed_size = _read_uint32(self.bag._file)
2399
2400 data_pos = self.bag._file.tell()
2401
2402 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
2403
2404 - def read_connection_index_record(self):
2405 f = self.bag._file
2406
2407 header = _read_header(f, _OP_INDEX_DATA)
2408
2409 index_version = _read_uint32_field(header, 'ver')
2410 connection_id = _read_uint32_field(header, 'conn')
2411 count = _read_uint32_field(header, 'count')
2412
2413 if index_version != 1:
2414 raise ROSBagFormatException('expecting index version 1, got %d' % index_version)
2415
2416 record_size = _read_uint32(f)
2417
2418 index = []
2419
2420 for i in range(count):
2421 time = _read_time (f)
2422 offset = _read_uint32(f)
2423
2424 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset))
2425
2426 return (connection_id, index)
2427
2428 - def seek_and_read_message_data_record(self, position, raw):
2429 chunk_pos, offset = position
2430
2431 chunk_header = self.bag._chunk_headers.get(chunk_pos)
2432 if chunk_header is None:
2433 raise ROSBagException('no chunk at position %d' % chunk_pos)
2434
2435 if chunk_header.compression == Compression.NONE:
2436 f = self.bag._file
2437 f.seek(chunk_header.data_pos + offset)
2438 else:
2439 if self.decompressed_chunk_pos != chunk_pos:
2440
2441 self.bag._file.seek(chunk_header.data_pos)
2442 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size)
2443
2444 if chunk_header.compression == Compression.BZ2:
2445 self.decompressed_chunk = bz2.decompress(compressed_chunk)
2446 elif chunk_header.compression == Compression.LZ4 and found_lz4:
2447 self.decompressed_chunk = roslz4.decompress(compressed_chunk)
2448 else:
2449 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
2450
2451 self.decompressed_chunk_pos = chunk_pos
2452
2453 if self.decompressed_chunk_io:
2454 self.decompressed_chunk_io.close()
2455 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
2456
2457 f = self.decompressed_chunk_io
2458 f.seek(offset)
2459
2460
2461 while True:
2462 header = _read_header(f)
2463 op = _read_uint8_field(header, 'op')
2464 if op != _OP_CONNECTION:
2465 break
2466 _skip_sized(f)
2467
2468
2469 if op != _OP_MSG_DATA:
2470 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
2471
2472 connection_id = _read_uint32_field(header, 'conn')
2473 t = _read_time_field (header, 'time')
2474
2475
2476 connection_info = self.bag._connections[connection_id]
2477 try:
2478 msg_type = _get_message_type(connection_info)
2479 except KeyError:
2480 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype)
2481
2482
2483 data = _read_record_data(f)
2484
2485
2486 if raw:
2487 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type
2488 else:
2489 msg = msg_type()
2490 msg.deserialize(data)
2491
2492 return BagMessage(connection_info.topic, msg, t)
2493
2494 -def _time_to_str(secs):
2495 secs_frac = secs - int(secs)
2496 secs_frac_str = ('%.2f' % secs_frac)[1:]
2497
2498 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
2499
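# Illustrative example of the helper above (output depends on the local
# timezone): the seconds are formatted as a local date string plus a
# two-digit fractional suffix.
#
#     _time_to_str(1500000000.25)   # -> 'Jul 14 2017 02:40:00.25' if local time is UTC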
2500 -def _human_readable_size(size):
2501 multiple = 1024.0
2502 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
2503 size /= multiple
2504 if size < multiple:
2505 return '%.1f %s' % (size, suffix)
2506
2507 return '-'
2508
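# Illustrative examples of the helper above (values follow directly from the
# loop: the size is divided by 1024 before each comparison):
#
#     _human_readable_size(1536)         # -> '1.5 KB'
#     _human_readable_size(3 * 1024**2)  # -> '3.0 MB'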
2509 -def _human_readable_frequency(freq):
2510 multiple = 1000.0
2511 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']:
2512 if freq < multiple:
2513 return '%.1f %s' % (freq, suffix)
2514 freq /= multiple
2515
2516 return '-'
2517
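# Illustrative examples of the helper above (the frequency is compared first,
# then divided by 1000 on each pass):
#
#     _human_readable_frequency(0.5)     # -> '0.5 Hz'
#     _human_readable_frequency(2500.0)  # -> '2.5 kHz'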
2520 """
2521 Perform an N-way merge operation on sorted lists.
2522
2523 @param list_of_lists: (really an iterable of iterables) of sorted elements
2524 (either naturally or by C{key})
2525 @param key: specify sort key function (like C{sort()}, C{sorted()})
2526 @param iterfun: function that returns an iterator.
2527
2528 Yields tuples of the form C{(item, iterator)}, where the iterator is the
2529 built-in list iterator or something you pass in, if you pre-generate the
2530 iterators.
2531
2532 This is a stable merge; complexity O(N lg N)
2533
2534 Examples::
2535
2536 print(list(x[0] for x in _mergesort([[1,2,3,4],
2537 [2,3.5,3.7,4.5,6,7],
2538 [2.6,3.6,6.6,9]])))
2539 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
2540
2541 # note stability
2542 print(list(x[0] for x in _mergesort([[1,2,3,4],
2543 [2,3.5,3.7,4.5,6,7],
2544 [2.6,3.6,6.6,9]], key=int)))
2545 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
2546
2547 print(list(x[0] for x in _mergesort([[4,3,2,1],
2548 [7,6.5,4,3.7,3.3,1.9],
2549 [9,8.6,7.6,6.6,5.5,4.4,3.3]],
2550 key=lambda x: -x)))
2551 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1]
2552 """
2553
2554 heap = []
2555 for i, itr in enumerate(iter(pl) for pl in list_of_lists):
2556 try:
2557 item = next(itr)
2558 toadd = (key(item), i, item, itr) if key else (item, i, itr)
2559 heap.append(toadd)
2560 except StopIteration:
2561 pass
2562 heapq.heapify(heap)
2563
2564 if key:
2565 while heap:
2566 _, idx, item, itr = heap[0]
2567 yield item, itr
2568 try:
2569 item = next(itr)
2570 heapq.heapreplace(heap, (key(item), idx, item, itr) )
2571 except StopIteration:
2572 heapq.heappop(heap)
2573
2574 else:
2575 while heap:
2576 item, idx, itr = heap[0]
2577 yield item, itr
2578 try:
2579 heapq.heapreplace(heap, (next(itr), idx, itr))
2580 except StopIteration:
2581 heapq.heappop(heap)
2582
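# Illustrative sketch with plain lists of numbers; this mirrors the kind of
# time-ordered merge the bag reader performs over per-connection index lists,
# each of which is already sorted:
#
#     for item, _ in _mergesort([[1, 4, 9], [2, 3, 10]]):
#         print(item)   # 1, 2, 3, 4, 9, 10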
2584 """
2585 A file facade for sequential compressors (e.g., bz2.BZ2Compressor).
2586 """
2587 - def __init__(self, file, compressor):
2588 self.file = file
2589 self.compressor = compressor
2590 self.compressed_bytes_in = 0
2591
2593 compressed = self.compressor.compress(data)
2594 if len(compressed) > 0:
2595 self.file.write(compressed)
2596 self.compressed_bytes_in += len(data)
2597
2598 - def flush(self):
2599 compressed = self.compressor.flush()
2600 if len(compressed) > 0:
2601 self.file.write(compressed)
2602
2616