33 """
34 The rosbag Python API.
35
36 Provides serialization of bag files.
37 """

from __future__ import print_function

import bisect
import bz2
import collections
import heapq
import os
import re
import struct
import sys
import threading
import time
import yaml

try:
    from cStringIO import StringIO  # Python 2
except ImportError:
    from io import BytesIO as StringIO  # Python 3

import genmsg
import genpy
import genpy.dynamic
import genpy.message

import roslib.names
import rospy
try:
    import roslz4
    found_lz4 = True
except ImportError:
    rospy.logwarn(
        'Failed to load Python extension for LZ4 support. '
        'LZ4 compression will not be available.')
    found_lz4 = False
75 """
76 Base class for exceptions in rosbag.
77 """
80
83
90
92 """
93 Exception for unindexed bags.
94 """
97
99 """
100 Allowable compression types
101 """
102 NONE = 'none'
103 BZ2 = 'bz2'
104 LZ4 = 'lz4'
105
106 BagMessage = collections.namedtuple('BagMessage', 'topic message timestamp')
107
class Bag(object):
    """
    Bag serializes messages to and from a single file on disk using the bag format.
    """
    def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
        """
        Open a bag file.  The mode can be 'r', 'w', or 'a' for reading (default),
        writing or appending.  The file will be created if it doesn't exist
        when opened for writing or appending; it will be truncated when opened
        for writing.  Simultaneous reading and writing is allowed when in writing
        or appending mode.
        @param f: filename of bag to open or a stream to read from
        @type  f: str or file
        @param mode: mode, either 'r', 'w', or 'a'
        @type  mode: str
        @param compression: compression mode, see U{rosbag.Compression} for valid modes
        @type  compression: str
        @param chunk_threshold: minimum number of uncompressed bytes per chunk
        @type  chunk_threshold: int
        @param allow_unindexed: if True, allow opening unindexed bags
        @type  allow_unindexed: bool
        @param options: the bag options (currently: compression and chunk_threshold)
        @type  options: dict
        @param skip_index: if True, don't read the connection index records on open [2.0+]
        @type  skip_index: bool
        @raise ValueError: if any argument is invalid
        @raise ROSBagException: if an error occurs opening file
        @raise ROSBagFormatException: if bag format is corrupted
        """
        if options is not None:
            if type(options) is not dict:
                raise ValueError('options must be of type dict')
            if 'compression' in options:
                compression = options['compression']
            if 'chunk_threshold' in options:
                chunk_threshold = options['chunk_threshold']

        self._file = None
        self._filename = None
        self._version = None

        allowed_compressions = [Compression.NONE, Compression.BZ2]
        if found_lz4:
            allowed_compressions.append(Compression.LZ4)
        if compression not in allowed_compressions:
            raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))
        self._compression = compression

        if chunk_threshold < 0:
            raise ValueError('chunk_threshold must be greater than or equal to zero')
        self._chunk_threshold = chunk_threshold

        self._skip_index = skip_index

        self._reader = None

        self._file_header_pos = None
        self._index_data_pos = 0

        self._clear_index()

        self._buffer = StringIO()

        self._curr_compression = Compression.NONE

        self._open(f, mode, allow_unindexed)

        self._output_file = self._file

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def options(self):
        """Get the options."""
        return {'compression': self._compression, 'chunk_threshold': self._chunk_threshold}

    @property
    def filename(self):
        """Get the filename."""
        return self._filename

    @property
    def version(self):
        """Get the version."""
        return self._version

    @property
    def mode(self):
        """Get the mode."""
        return self._mode

    @property
    def size(self):
        """Get the size in bytes."""
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        pos = self._file.tell()
        self._file.seek(0, os.SEEK_END)
        size = self._file.tell()
        self._file.seek(pos)
        return size

    # compression

    def _get_compression(self):
        """Get the compression method to use for writing."""
        return self._compression

    def _set_compression(self, compression):
        """Set the compression method to use for writing."""
        allowed_compressions = [Compression.NONE, Compression.BZ2]
        if found_lz4:
            allowed_compressions.append(Compression.LZ4)
        if compression not in allowed_compressions:
            raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))

        self.flush()
        self._compression = compression

    compression = property(_get_compression, _set_compression)

    # chunk_threshold

    def _get_chunk_threshold(self):
        """Get the chunk threshold to use for writing."""
        return self._chunk_threshold

    def _set_chunk_threshold(self, chunk_threshold):
        """Set the chunk threshold to use for writing."""
        if chunk_threshold < 0:
            raise ValueError('chunk_threshold must be greater than or equal to zero')

        self.flush()
        self._chunk_threshold = chunk_threshold

    chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)

    def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
        """
        Read messages from the bag, optionally filtered by topic, timestamp and connection details.
        @param topics: list of topics or a single topic.  If an empty list is given, all topics will be read [optional]
        @type  topics: list(str) or str
        @param start_time: earliest timestamp of message to return [optional]
        @type  start_time: U{genpy.Time}
        @param end_time: latest timestamp of message to return [optional]
        @type  end_time: U{genpy.Time}
        @param connection_filter: function to filter connections to include [optional]
        @type  connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool
        @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype)
        @type  raw: bool
        @return: generator of BagMessage(topic, message, timestamp) namedtuples for each message in the bag file
        @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw]
        """
        self.flush()

        if topics and type(topics) is str:
            topics = [topics]

        return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)
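    # Example usage (a sketch; 'input.bag' and '/chatter' are illustrative):
    #
    #   import rospy
    #   bag = Bag('input.bag')
    #   start = rospy.Time(bag.get_start_time())
    #   for topic, msg, t in bag.read_messages(topics=['/chatter'],
    #                                          start_time=start,
    #                                          end_time=start + rospy.Duration(10)):
    #       print(topic, t, msg)
    #   bag.close()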

    def flush(self):
        """
        Write the open chunk to disk so subsequent reads will read all messages.
        @raise ValueError: if bag is closed
        """
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        if self._chunk_open:
            self._stop_writing_chunk()

    def write(self, topic, msg, t=None, raw=False):
        """
        Write a message to the bag.
        @param topic: name of topic
        @type  topic: str
        @param msg: message to add to bag, or tuple (if raw)
        @type  msg: Message or tuple of raw message data
        @param t: ROS time of message publication; if None specified, use current time [optional]
        @type  t: U{genpy.Time}
        @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
        @type  raw: bool
        @raise ValueError: if arguments are invalid or bag is closed
        """
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        if not topic:
            raise ValueError('topic is invalid')
        if not msg:
            raise ValueError('msg is invalid')

        if t is None:
            t = genpy.Time.from_sec(time.time())

        # Seek to the end (in case the previous operation was a read)
        self._file.seek(0, os.SEEK_END)

        # Open a chunk, if needed
        if not self._chunk_open:
            self._start_writing_chunk(t)

        # Unpack raw message data
        if raw:
            if len(msg) == 5:
                msg_type, serialized_bytes, md5sum, pos, pytype = msg
            elif len(msg) == 4:
                msg_type, serialized_bytes, md5sum, pytype = msg
            else:
                raise ValueError('msg must be of length 4 or 5')

        # Write a connection record, if necessary
        if topic in self._topic_connections:
            connection_info = self._topic_connections[topic]
            conn_id = connection_info.id
        else:
            conn_id = len(self._connections)

            if raw:
                if pytype is None:
                    try:
                        pytype = genpy.message.get_message_class(msg_type)
                    except Exception:
                        pytype = None
                if pytype is None:
                    raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type)

                if pytype._md5sum != md5sum:
                    print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr)

                header = {'topic': topic, 'type': msg_type, 'md5sum': md5sum, 'message_definition': pytype._full_text}
            else:
                header = {'topic': topic, 'type': msg.__class__._type, 'md5sum': msg.__class__._md5sum, 'message_definition': msg._full_text}

            connection_info = _ConnectionInfo(conn_id, topic, header)

            self._write_connection_record(connection_info)

            self._connections[conn_id] = connection_info
            self._topic_connections[topic] = connection_info

        # Create the index entry
        index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())

        # Update the index for the current chunk
        if conn_id not in self._curr_chunk_connection_indexes:
            # This is the first message on this connection in this chunk
            self._curr_chunk_connection_indexes[conn_id] = [index_entry]
            self._curr_chunk_info.connection_counts[conn_id] = 1
        else:
            curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id]
            if index_entry >= curr_chunk_connection_index[-1]:
                # Writing chronologically, so we can skip the binary search
                curr_chunk_connection_index.append(index_entry)
            else:
                bisect.insort_right(curr_chunk_connection_index, index_entry)

            self._curr_chunk_info.connection_counts[conn_id] += 1

        # Update the connection index
        if conn_id not in self._connection_indexes:
            self._connection_indexes[conn_id] = [index_entry]
        else:
            bisect.insort_right(self._connection_indexes[conn_id], index_entry)

        # Update the chunk start/end times
        if t > self._curr_chunk_info.end_time:
            self._curr_chunk_info.end_time = t
        elif t < self._curr_chunk_info.start_time:
            self._curr_chunk_info.start_time = t

        if not raw:
            # Serialize the message to the buffer
            self._buffer.seek(0)
            self._buffer.truncate(0)
            msg.serialize(self._buffer)
            serialized_bytes = self._buffer.getvalue()

        # Write the message data record
        self._write_message_data_record(conn_id, t, serialized_bytes)

        # Check if we want to stop this chunk
        chunk_size = self._get_chunk_offset()
        if chunk_size > self._chunk_threshold:
            self._stop_writing_chunk()
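    # Raw round-trip example (a sketch; 'in.bag'/'out.bag' are illustrative).
    # With raw=True no message is deserialized, which makes copying fast:
    #
    #   with Bag('out.bag', 'w') as outbag:
    #       for topic, raw_msg, t in Bag('in.bag').read_messages(raw=True):
    #           # raw_msg is (datatype, data, md5sum, position, msg_type)
    #           outbag.write(topic, raw_msg, t, raw=True)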

    def reindex(self):
        """
        Reindexes the bag file.  Yields the position of each chunk for progress.
        """
        self._clear_index()
        return self._reader.reindex()

    def close(self):
        """
        Close the bag file.  Closing an already closed bag does nothing.
        """
        if self._file:
            if self._mode in 'wa':
                self._stop_writing()

            self._close_file()

420 """
421 Returns information about the compression of the bag
422 @return: generator of CompressionTuple(compression, uncompressed, compressed) describing the
423 type of compression used, size of the uncompressed data in Bytes, size of the compressed data in Bytes. If
424 no compression is used then uncompressed and compressed data will be equal.
425 @rtype: generator of CompressionTuple of (str, int, int)
426 """
427
428 compression = self.compression
429 uncompressed = 0
430 compressed = 0
431
432 if self._chunk_headers:
433 compression_counts = {}
434 compression_uncompressed = {}
435 compression_compressed = {}
436
437
438
439 for chunk_header in self._chunk_headers.values():
440 if chunk_header.compression not in compression_counts:
441 compression_counts[chunk_header.compression] = 0
442 if chunk_header.compression not in compression_uncompressed:
443 compression_uncompressed[chunk_header.compression] = 0
444 if chunk_header.compression not in compression_compressed:
445 compression_compressed[chunk_header.compression] = 0
446
447 compression_counts[chunk_header.compression] += 1
448 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
449 uncompressed += chunk_header.uncompressed_size
450 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
451 compressed += chunk_header.compressed_size
452
453 chunk_count = len(self._chunk_headers)
454
455 main_compression_count, main_compression = sorted([(v, k) for k, v in compression_counts.items()], reverse=True)[0]
456 compression = str(main_compression)
457
458 return collections.namedtuple("CompressionTuple", ["compression",
459 "uncompressed", "compressed"])(compression=compression,
460 uncompressed=uncompressed, compressed=compressed)
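    # Example of consuming the tuple above (a sketch):
    #
    #   info = bag.get_compression_info()
    #   if info.uncompressed > 0:
    #       ratio = float(info.compressed) / info.uncompressed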

    def get_message_count(self, topic_filters=None):
        """
        Returns the number of messages in the bag.  Can be filtered by topic.
        @param topic_filters: one or more topics to filter by
        @type  topic_filters: a single str or a list of str
        @return: the number of messages in the bag, optionally filtered by topic
        @rtype: int
        """
        num_messages = 0

        if topic_filters is not None:
            info = self.get_type_and_topic_info(topic_filters=topic_filters)
            for topic in info.topics.values():
                num_messages += topic.message_count
        else:
            if self._chunks:
                for c in self._chunks:
                    for counts in c.connection_counts.values():
                        num_messages += counts
            else:
                num_messages = sum([len(index) for index in self._connection_indexes.values()])

        return num_messages

    def get_start_time(self):
        """
        Returns the start time of the bag.
        @return: a timestamp of the start of the bag, in seconds, including fractions of a second
        @rtype: float
        """
        if self._chunks:
            start_stamp = self._chunks[0].start_time.to_sec()
        else:
            if not self._connection_indexes:
                raise ROSBagException('Bag contains no message')
            start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index]
            start_stamp = min(start_stamps) if start_stamps else 0

        return start_stamp

    def get_end_time(self):
        """
        Returns the end time of the bag.
        @return: a timestamp of the end of the bag, in seconds, including fractions of a second
        @rtype: float
        """
        if self._chunks:
            end_stamp = self._chunks[-1].end_time.to_sec()
        else:
            if not self._connection_indexes:
                raise ROSBagException('Bag contains no message')
            end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index]
            end_stamp = max(end_stamps) if end_stamps else 0

        return end_stamp
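    # The bag's duration in seconds follows from the two accessors above
    # (a sketch):
    #
    #   duration = bag.get_end_time() - bag.get_start_time()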

    def get_type_and_topic_info(self, topic_filters=None):
        """
        Collates info about the types and topics in the bag.

        Note: it would be nice to filter by type as well, but there appear to be some limitations
        in the current architecture that prevent that from working when more than one message type
        is written on the same topic.

        @param topic_filters: specify one or more topics to filter by
        @type  topic_filters: either a single str or a list of str
        @return: a TypesAndTopicsTuple(msg_types{key: type name, val: md5sum},
            topics{key: topic name, val: TopicTuple(msg_type: msg type (e.g. "std_msgs/String"),
                message_count: the number of messages of that type,
                connections: the number of connections,
                frequency: the data frequency)})
            describing the types of messages present and information about the topics
        @rtype: TypesAndTopicsTuple(dict(str, str), dict(str, TopicTuple(str, int, int, float)))
        """
        datatypes = set()
        datatype_infos = []

        for connection in self._connections.values():
            if connection.datatype in datatypes:
                continue

            datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
            datatypes.add(connection.datatype)

        topics = []

        if topic_filters is not None:
            if not isinstance(topic_filters, list):
                topic_filters = [topic_filters]

            topics = topic_filters
        else:
            topics = [c.topic for c in self._get_connections()]

        topics = sorted(set(topics))

        topic_datatypes = {}
        topic_conn_counts = {}
        topic_msg_counts = {}
        topic_freqs_median = {}

        for topic in topics:
            connections = list(self._get_connections(topic))

            if not connections:
                continue

            topic_datatypes[topic] = connections[0].datatype
            topic_conn_counts[topic] = len(connections)

            msg_count = 0
            for connection in connections:
                for chunk in self._chunks:
                    msg_count += chunk.connection_counts.get(connection.id, 0)

            topic_msg_counts[topic] = msg_count

            if self._connection_indexes_read:
                stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
                if len(stamps) > 1:
                    periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
                    med_period = _median(periods)
                    if med_period > 0.0:
                        topic_freqs_median[topic] = 1.0 / med_period

        # Collect the datatypes
        types = {}
        for datatype, md5sum, msg_def in sorted(datatype_infos):
            types[datatype] = md5sum

        # Collect the topics
        topics_t = {}
        TopicTuple = collections.namedtuple("TopicTuple", ["msg_type", "message_count", "connections", "frequency"])
        for topic in sorted(topic_datatypes.keys()):
            topic_msg_count = topic_msg_counts[topic]
            frequency = topic_freqs_median[topic] if topic in topic_freqs_median else None
            topics_t[topic] = TopicTuple(msg_type=topic_datatypes[topic],
                                         message_count=topic_msg_count,
                                         connections=topic_conn_counts[topic],
                                         frequency=frequency)

        return collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])(msg_types=types, topics=topics_t)
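    # Example of consuming the returned tuple (a sketch; '/chatter' is illustrative):
    #
    #   info = bag.get_type_and_topic_info()
    #   print(info.msg_types)                    # {type name: md5sum}
    #   print(info.topics['/chatter'].msg_type)
    #   print(info.topics['/chatter'].message_count)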

    def __str__(self):
        rows = []

        try:
            if self._filename:
                rows.append(('path', self._filename))

            if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
                rows.append(('version', '1.2 (unindexed)'))
            else:
                rows.append(('version', '%d.%d' % (int(self._version / 100), self._version % 100)))

            if not self._connection_indexes and not self._chunks:
                rows.append(('size', _human_readable_size(self.size)))
            else:
                if self._chunks:
                    start_stamp = self._chunks[0].start_time.to_sec()
                    end_stamp = self._chunks[-1].end_time.to_sec()
                else:
                    start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index]
                    start_stamp = min(start_stamps) if start_stamps else 0
                    end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index]
                    end_stamp = max(end_stamps) if end_stamps else 0

                # Show duration
                duration = end_stamp - start_stamp
                dur_secs = duration % 60
                dur_mins = int(duration / 60)
                dur_hrs = int(dur_mins / 60)
                if dur_hrs > 0:
                    dur_mins = dur_mins % 60
                    duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration)
                elif dur_mins > 0:
                    duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration)
                else:
                    duration_str = '%.1fs' % duration

                rows.append(('duration', duration_str))

                # Show start and end times
                rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp)))
                rows.append(('end', '%s (%.2f)' % (_time_to_str(end_stamp), end_stamp)))

                rows.append(('size', _human_readable_size(self.size)))

                if self._chunks:
                    num_messages = 0
                    for c in self._chunks:
                        for counts in c.connection_counts.values():
                            num_messages += counts
                else:
                    num_messages = sum([len(index) for index in self._connection_indexes.values()])
                rows.append(('messages', str(num_messages)))

                # Show compression information
                if len(self._chunk_headers) == 0:
                    rows.append(('compression', 'none'))
                else:
                    compression_counts = {}
                    compression_uncompressed = {}
                    compression_compressed = {}
                    for chunk_header in self._chunk_headers.values():
                        if chunk_header.compression not in compression_counts:
                            compression_counts[chunk_header.compression] = 1
                            compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] = chunk_header.compressed_size
                        else:
                            compression_counts[chunk_header.compression] += 1
                            compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] += chunk_header.compressed_size

                    chunk_count = len(self._chunk_headers)

                    compressions = []
                    for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])):
                        if compression != Compression.NONE:
                            fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression]
                            compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction))
                        else:
                            compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count))
                    rows.append(('compression', ', '.join(compressions)))

                    all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
                    if not all_uncompressed:
                        total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.values()))
                        total_compressed_size = sum((h.compressed_size for h in self._chunk_headers.values()))

                        total_uncompressed_size_str = _human_readable_size(total_uncompressed_size)
                        total_compressed_size_str = _human_readable_size(total_compressed_size)
                        total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)])

                        if duration > 0:
                            uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration)
                            compressed_rate_str = _human_readable_size(total_compressed_size / duration)
                            rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)])

                            rows.append(('uncompressed', '%*s @ %*s/s' %
                                         (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str)))
                            rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' %
                                         (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size)))
                        else:
                            rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str)))
                            rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str)))

                datatypes = set()
                datatype_infos = []
                for connection in self._connections.values():
                    if connection.datatype in datatypes:
                        continue
                    datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
                    datatypes.add(connection.datatype)

                topics = sorted(set([c.topic for c in self._get_connections()]))
                topic_datatypes = {}
                topic_conn_counts = {}
                topic_msg_counts = {}
                topic_freqs_median = {}
                for topic in topics:
                    connections = list(self._get_connections(topic))

                    topic_datatypes[topic] = connections[0].datatype
                    topic_conn_counts[topic] = len(connections)

                    msg_count = 0
                    for connection in connections:
                        for chunk in self._chunks:
                            msg_count += chunk.connection_counts.get(connection.id, 0)
                    topic_msg_counts[topic] = msg_count

                    if self._connection_indexes_read:
                        stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
                        if len(stamps) > 1:
                            periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
                            med_period = _median(periods)
                            if med_period > 0.0:
                                topic_freqs_median[topic] = 1.0 / med_period

                topics = sorted(topic_datatypes.keys())
                max_topic_len = max([len(topic) for topic in topics])
                max_datatype_len = max([len(datatype) for datatype in datatypes])
                max_msg_count_len = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()])
                max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0

                # Show the datatypes
                for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
                    s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum)
                    if i == 0:
                        rows.append(('types', s))
                    else:
                        rows.append(('', s))

                # Show the topics
                for i, topic in enumerate(topics):
                    topic_msg_count = topic_msg_counts[topic]

                    s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ')
                    if topic in topic_freqs_median:
                        s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic]))
                    else:
                        s += '   %*s' % (max_freq_median_len, '')

                    s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic])
                    if topic_conn_counts[topic] > 1:
                        s += ' (%d connections)' % topic_conn_counts[topic]

                    if i == 0:
                        rows.append(('topics', s))
                    else:
                        rows.append(('', s))

        except Exception:
            raise

        first_column_width = max([len(field) for field, _ in rows]) + 1

        s = ''
        for (field, value) in rows:
            if field:
                s += '%-*s %s\n' % (first_column_width, field + ':', value)
            else:
                s += '%-*s %s\n' % (first_column_width, '', value)

        return s.rstrip()

    def _get_yaml_info(self, key=None):
        s = ''

        try:
            if self._filename:
                s += 'path: %s\n' % self._filename

            if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
                s += 'version: 1.2 (unindexed)\n'
            else:
                s += 'version: %d.%d\n' % (int(self._version / 100), self._version % 100)

            if not self._connection_indexes and not self._chunks:
                s += 'size: %d\n' % self.size
                s += 'indexed: False\n'
            else:
                if self._chunks:
                    start_stamp = self._chunks[0].start_time.to_sec()
                    end_stamp = self._chunks[-1].end_time.to_sec()
                else:
                    start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index]
                    start_stamp = min(start_stamps) if start_stamps else 0
                    end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index]
                    end_stamp = max(end_stamps) if end_stamps else 0

                duration = end_stamp - start_stamp
                s += 'duration: %.6f\n' % duration
                s += 'start: %.6f\n' % start_stamp
                s += 'end: %.6f\n' % end_stamp
                s += 'size: %d\n' % self.size
                if self._chunks:
                    num_messages = 0
                    for c in self._chunks:
                        for counts in c.connection_counts.values():
                            num_messages += counts
                else:
                    num_messages = sum([len(index) for index in self._connection_indexes.values()])
                s += 'messages: %d\n' % num_messages
                s += 'indexed: True\n'

                # Show compression information
                if len(self._chunk_headers) == 0:
                    s += 'compression: none\n'
                else:
                    compression_counts = {}
                    compression_uncompressed = {}
                    compression_compressed = {}
                    for chunk_header in self._chunk_headers.values():
                        if chunk_header.compression not in compression_counts:
                            compression_counts[chunk_header.compression] = 1
                            compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] = chunk_header.compressed_size
                        else:
                            compression_counts[chunk_header.compression] += 1
                            compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] += chunk_header.compressed_size

                    chunk_count = len(self._chunk_headers)

                    main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0]
                    s += 'compression: %s\n' % str(main_compression)

                    all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
                    if not all_uncompressed:
                        s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.values()))
                        s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.values()))

                datatypes = set()
                datatype_infos = []
                for connection in self._connections.values():
                    if connection.datatype in datatypes:
                        continue
                    datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
                    datatypes.add(connection.datatype)

                topics = sorted(set([c.topic for c in self._get_connections()]))
                topic_datatypes = {}
                topic_conn_counts = {}
                topic_msg_counts = {}
                topic_freqs_median = {}
                for topic in topics:
                    connections = list(self._get_connections(topic))

                    topic_datatypes[topic] = connections[0].datatype
                    topic_conn_counts[topic] = len(connections)

                    msg_count = 0
                    for connection in connections:
                        for chunk in self._chunks:
                            msg_count += chunk.connection_counts.get(connection.id, 0)
                    topic_msg_counts[topic] = msg_count

                    if self._connection_indexes_read:
                        stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
                        if len(stamps) > 1:
                            periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
                            med_period = _median(periods)
                            if med_period > 0.0:
                                topic_freqs_median[topic] = 1.0 / med_period

                # Show the datatypes
                s += 'types:\n'
                for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
                    s += '  - type: %s\n' % datatype
                    s += '    md5: %s\n' % md5sum

                # Show the topics
                s += 'topics:\n'
                for i, topic in enumerate(topics):
                    topic_msg_count = topic_msg_counts[topic]

                    s += '  - topic: %s\n' % topic
                    s += '    type: %s\n' % topic_datatypes[topic]
                    s += '    messages: %d\n' % topic_msg_count

                    if topic_conn_counts[topic] > 1:
                        s += '    connections: %d\n' % topic_conn_counts[topic]

                    if topic in topic_freqs_median:
                        s += '    frequency: %.4f\n' % topic_freqs_median[topic]

            if not key:
                return s

            class DictObject(object):
                def __init__(self, d):
                    for a, b in d.items():
                        if isinstance(b, (list, tuple)):
                            setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b])
                        else:
                            setattr(self, a, DictObject(b) if isinstance(b, dict) else b)

            obj = DictObject(yaml.safe_load(s))
            try:
                val = eval('obj.' + key)
            except Exception:
                print('Error getting key "%s"' % key, file=sys.stderr)
                return None

            def print_yaml(val, indent=0):
                indent_str = '  ' * indent

                if type(val) is list:
                    s = ''
                    for item in val:
                        s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1))
                    return s
                elif type(val) is DictObject:
                    s = ''
                    for i, (k, v) in enumerate(val.__dict__.items()):
                        if i != 0:
                            s += indent_str
                        s += '%s: %s' % (k, str(v))
                        if i < len(val.__dict__) - 1:
                            s += '\n'
                    return s
                else:
                    return indent_str + str(val)

            return print_yaml(val)

        except Exception:
            raise

    ### Internal API ###

    @property
    def _has_compressed_chunks(self):
        if not self._chunk_headers:
            return False

        return any((h.compression != Compression.NONE for h in self._chunk_headers.values()))

    @property
    def _uncompressed_size(self):
        if not self._chunk_headers:
            return self.size

        return sum((h.uncompressed_size for h in self._chunk_headers.values()))

    def _read_message(self, position, raw=False):
        """
        Read the message from the given position in the file.
        """
        self.flush()
        return self._reader.seek_and_read_message_data_record(position, raw)

    def _get_connections(self, topics=None, connection_filter=None):
        """
        Yield the connections, optionally filtering by topic and/or connection information.
        """
        if topics:
            if type(topics) is str:
                topics = set([roslib.names.canonicalize_name(topics)])
            else:
                topics = set([roslib.names.canonicalize_name(t) for t in topics])

        for c in self._connections.values():
            if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics:
                continue
            if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header):
                continue
            yield c
    def _get_entries(self, connections=None, start_time=None, end_time=None):
        """
        Yield index entries on the given connections in the given time range.
        """
        for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time):
            if start_time and entry.time < start_time:
                continue
            if end_time and entry.time > end_time:
                return
            yield entry

    def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
        """
        Yield index entries on the given connections in the given time range in reverse order.
        """
        for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()):
            if end_time and entry.time > end_time:
                continue
            if start_time and entry.time < start_time:
                return
            yield entry

    def _get_entry(self, t, connections=None):
        """
        Return the first index entry on/before the given time on the given connections.
        """
        indexes = self._get_indexes(connections)

        entry = _IndexEntry(t)

        first_entry = None

        for index in indexes:
            i = bisect.bisect_right(index, entry) - 1
            if i >= 0:
                index_entry = index[i]
                if first_entry is None or index_entry > first_entry:
                    first_entry = index_entry

        return first_entry

    def _get_entry_after(self, t, connections=None):
        """
        Return the first index entry after the given time on the given connections.
        """
        indexes = self._get_indexes(connections)

        entry = _IndexEntry(t)

        first_entry = None

        for index in indexes:
            i = bisect.bisect_right(index, entry)
            if i <= len(index) - 1:
                index_entry = index[i]
                if first_entry is None or index_entry < first_entry:
                    first_entry = index_entry

        return first_entry

    def _get_indexes(self, connections):
        """
        Get the indexes for the given connections.
        """
        if not self._connection_indexes_read:
            self._reader._read_connection_index_records()

        if connections is None:
            return self._connection_indexes.values()
        else:
            return (self._connection_indexes[c.id] for c in connections)

    ### Implementation ###

    def _clear_index(self):
        self._connection_indexes_read = False
        self._connection_indexes = {}    # id -> IndexEntry[]

        self._topic_connections = {}     # topic -> connection_info
        self._connections = {}           # id -> ConnectionInfo
        self._connection_count = 0
        self._chunk_count = 0
        self._chunks = []                # ChunkInfo[]
        self._chunk_headers = {}         # chunk_pos -> ChunkHeader

        self._chunk_open = False
        self._curr_chunk_info = None
        self._curr_chunk_data_pos = None
        self._curr_chunk_connection_indexes = {}

    def _open(self, f, mode, allow_unindexed):
        if not f:
            raise ValueError('filename (or stream) is invalid')

        try:
            if mode == 'r': self._open_read(f, allow_unindexed)
            elif mode == 'w': self._open_write(f)
            elif mode == 'a': self._open_append(f, allow_unindexed)
            else:
                raise ValueError('mode "%s" is invalid' % mode)
            if 'b' not in self._file.mode and not isinstance('', bytes):
                raise ROSBagException('In Python 3 the bag file must be opened in binary mode')
        except struct.error:
            raise ROSBagFormatException('error with bag')

    def _is_file(self, f):
        try:
            return isinstance(f, file)  # Python 2
        except NameError:
            import io
            return isinstance(f, io.IOBase)  # Python 3

    def _open_read(self, f, allow_unindexed):
        if self._is_file(f):
            self._file = f
            self._filename = None
        else:
            self._file = open(f, 'rb')
            self._filename = f

        self._mode = 'r'

        try:
            self._version = self._read_version()
        except:
            self._version = None
            self._close_file()
            raise

        try:
            self._create_reader()
            self._reader.start_reading()
        except ROSBagUnindexedException:
            if not allow_unindexed:
                self._close_file()
                raise
        except:
            self._close_file()
            raise

    def _open_write(self, f):
        if self._is_file(f):
            self._file = f
            self._filename = None
        else:
            self._file = open(f, 'w+b')
            self._filename = f

        self._mode = 'w'

        self._version = 200

        try:
            self._create_reader()
            self._start_writing()
        except:
            self._close_file()
            raise

    def _open_append(self, f, allow_unindexed):
        if self._is_file(f):
            self._file = f
            self._filename = None
        else:
            try:
                # Test if the file already exists
                open(f, 'r').close()

                # File exists: open in read with update mode
                self._file = open(f, 'r+b')
            except IOError:
                # File doesn't exist: open in write mode
                self._file = open(f, 'w+b')

            self._filename = f

        self._mode = 'a'

        try:
            self._version = self._read_version()
        except:
            self._version = None
            self._close_file()
            raise

        if self._version != 200:
            self._file.close()
            raise ROSBagException('bag version %d is unsupported for appending' % self._version)

        try:
            self._start_appending()
        except ROSBagUnindexedException:
            if not allow_unindexed:
                self._close_file()
                raise
        except:
            self._close_file()
            raise

    def _close_file(self):
        self._file.close()
        self._file = None

    def _create_reader(self):
        """
        @raise ROSBagException: if the bag version is unsupported
        """
        major_version, minor_version = int(self._version / 100), self._version % 100
        if major_version == 2:
            self._reader = _BagReader200(self)
        elif major_version == 1:
            if minor_version == 1:
                raise ROSBagException('unsupported bag version %d.  Please convert bag to newer format.' % self._version)
            else:
                # Get the op code of the first record.  If it's FILE_HEADER, then we have an indexed 1.2 bag.
                first_record_pos = self._file.tell()
                header = _read_header(self._file)
                op = _read_uint8_field(header, 'op')
                self._file.seek(first_record_pos)

                if op == _OP_FILE_HEADER:
                    self._reader = _BagReader102_Indexed(self)
                else:
                    self._reader = _BagReader102_Unindexed(self)
        else:
            raise ROSBagException('unknown bag version %d' % self._version)

    def _read_version(self):
        """
        @raise ROSBagException: if the file is empty, or the version line can't be parsed
        """
        version_line = self._file.readline().rstrip().decode()
        if len(version_line) == 0:
            raise ROSBagException('empty file')

        matches = re.match(r'#ROS(.*) V(\d)\.(\d)', version_line)
        if matches is None or len(matches.groups()) != 3:
            raise ROSBagException('This does not appear to be a bag file')

        version_type, major_version_str, minor_version_str = matches.groups()

        version = int(major_version_str) * 100 + int(minor_version_str)

        return version

    def _start_writing(self):
        version = _VERSION + '\n'
        version = version.encode()
        self._file.write(version)
        self._file_header_pos = self._file.tell()
        self._write_file_header_record(0, 0, 0)

    def _stop_writing(self):
        # Write the open chunk (if any) to disk
        self.flush()

        # Remember this location as the start of the index
        self._index_data_pos = self._file.tell()

        # Write connection infos
        for connection_info in self._connections.values():
            self._write_connection_record(connection_info)

        # Write chunk infos
        for chunk_info in self._chunks:
            self._write_chunk_info_record(chunk_info)

        # Re-write the file header
        self._file.seek(self._file_header_pos)
        self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))

    def _start_appending(self):
        self._file_header_pos = self._file.tell()

        self._create_reader()
        self._reader.start_reading()

        # Truncate the file to chop off the index
        self._file.truncate(self._index_data_pos)
        self._reader.index_data_pos = 0

        # Rewrite the file header, clearing the index position (so we know if the index is invalid)
        self._file.seek(self._file_header_pos)
        self._write_file_header_record(0, 0, 0)

        # Seek to the end of the file
        self._file.seek(0, os.SEEK_END)

    def _start_writing_chunk(self, t):
        self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t)
        self._write_chunk_header(_ChunkHeader(self._compression, 0, 0))
        self._curr_chunk_data_pos = self._file.tell()
        self._set_compression_mode(self._compression)
        self._chunk_open = True

    def _get_chunk_offset(self):
        if self._compression == Compression.NONE:
            return self._file.tell() - self._curr_chunk_data_pos
        else:
            return self._output_file.compressed_bytes_in

    def _stop_writing_chunk(self):
        # Add this chunk to the index
        self._chunks.append(self._curr_chunk_info)

        # Get the uncompressed and compressed sizes
        uncompressed_size = self._get_chunk_offset()
        self._set_compression_mode(Compression.NONE)
        compressed_size = self._file.tell() - self._curr_chunk_data_pos

        # Rewrite the chunk header with the size of the chunk (remembering the current offset)
        end_of_chunk_pos = self._file.tell()
        self._file.seek(self._curr_chunk_info.pos)
        chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos)
        self._write_chunk_header(chunk_header)
        self._chunk_headers[self._curr_chunk_info.pos] = chunk_header

        # Write out the connection indexes and clear them
        self._file.seek(end_of_chunk_pos)
        for connection_id, entries in self._curr_chunk_connection_indexes.items():
            self._write_connection_index_record(connection_id, entries)
        self._curr_chunk_connection_indexes.clear()

        # Flag that we're starting a new chunk
        self._chunk_open = False

    def _set_compression_mode(self, compression):
        # Restored from a gap in the extracted source; _CompressorFileFacade is
        # the compressor wrapper defined later in this module.
        # Flush the compressor, if needed
        if self._curr_compression != Compression.NONE:
            self._output_file.flush()

        # Create the compressor
        if compression == Compression.BZ2:
            self._output_file = _CompressorFileFacade(self._file, bz2.BZ2Compressor())
        elif compression == Compression.LZ4 and found_lz4:
            self._output_file = _CompressorFileFacade(self._file, roslz4.LZ4Compressor())
        elif compression == Compression.NONE:
            self._output_file = self._file
        else:
            raise ROSBagException('unsupported compression type: %s' % compression)

        self._curr_compression = compression
    def _write_file_header_record(self, index_pos, connection_count, chunk_count):
        header = {
            'op': _pack_uint8(_OP_FILE_HEADER),
            'index_pos': _pack_uint64(index_pos),
            'conn_count': _pack_uint32(connection_count),
            'chunk_count': _pack_uint32(chunk_count)
        }
        _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
1358
1360 header = {
1361 'op': _pack_uint8(_OP_CONNECTION),
1362 'topic': connection_info.topic,
1363 'conn': _pack_uint32(connection_info.id)
1364 }
1365
1366 _write_header(self._output_file, header)
1367
1368 _write_header(self._output_file, connection_info.header)
1369
1371 header = {
1372 'op': _pack_uint8(_OP_MSG_DATA),
1373 'conn': _pack_uint32(connection_id),
1374 'time': _pack_time(t)
1375 }
1376 _write_record(self._output_file, header, serialized_bytes)
1377
1379 header = {
1380 'op': _pack_uint8(_OP_CHUNK),
1381 'compression': chunk_header.compression,
1382 'size': _pack_uint32(chunk_header.uncompressed_size)
1383 }
1384 _write_header(self._file, header)
1385
1386 self._file.write(_pack_uint32(chunk_header.compressed_size))
1387
1389 header = {
1390 'op': _pack_uint8(_OP_INDEX_DATA),
1391 'conn': _pack_uint32(connection_id),
1392 'ver': _pack_uint32(_INDEX_VERSION),
1393 'count': _pack_uint32(len(entries))
1394 }
1395
1396 buffer = self._buffer
1397 buffer.seek(0)
1398 buffer.truncate(0)
1399 for entry in entries:
1400 buffer.write(_pack_time (entry.time))
1401 buffer.write(_pack_uint32(entry.offset))
1402
1403 _write_record(self._file, header, buffer.getvalue())

    def _write_chunk_info_record(self, chunk_info):
        header = {
            'op': _pack_uint8(_OP_CHUNK_INFO),
            'ver': _pack_uint32(_CHUNK_INDEX_VERSION),
            'chunk_pos': _pack_uint64(chunk_info.pos),
            'start_time': _pack_time(chunk_info.start_time),
            'end_time': _pack_time(chunk_info.end_time),
            'count': _pack_uint32(len(chunk_info.connection_counts))
        }

        buffer = self._buffer
        buffer.seek(0)
        buffer.truncate(0)
        for connection_id, count in chunk_info.connection_counts.items():
            buffer.write(_pack_uint32(connection_id))
            buffer.write(_pack_uint32(count))

        _write_record(self._file, header, buffer.getvalue())

_message_types = {}  # md5sum -> type

_OP_MSG_DEF = 0x01
_OP_MSG_DATA = 0x02
_OP_FILE_HEADER = 0x03
_OP_INDEX_DATA = 0x04
_OP_CHUNK = 0x05
_OP_CHUNK_INFO = 0x06
_OP_CONNECTION = 0x07

_OP_CODES = {
    _OP_MSG_DEF: 'MSG_DEF',
    _OP_MSG_DATA: 'MSG_DATA',
    _OP_FILE_HEADER: 'FILE_HEADER',
    _OP_INDEX_DATA: 'INDEX_DATA',
    _OP_CHUNK: 'CHUNK',
    _OP_CHUNK_INFO: 'CHUNK_INFO',
    _OP_CONNECTION: 'CONNECTION'
}

_VERSION = '#ROSBAG V2.0'
_FILE_HEADER_LENGTH = 4096
_INDEX_VERSION = 1
_CHUNK_INDEX_VERSION = 1

class _ConnectionInfo(object):
    def __init__(self, id, topic, header):
        try:
            datatype = _read_str_field(header, 'type')
            md5sum = _read_str_field(header, 'md5sum')
            msg_def = _read_str_field(header, 'message_definition')
        except KeyError as ex:
            raise ROSBagFormatException('connection header field %s not found' % str(ex))

        self.id = id
        self.topic = topic
        self.datatype = datatype
        self.md5sum = md5sum
        self.msg_def = msg_def
        self.header = header

    def __str__(self):
        return '%d on %s: %s' % (self.id, self.topic, str(self.header))

class _ChunkInfo(object):
    def __init__(self, pos, start_time, end_time):
        self.pos = pos
        self.start_time = start_time
        self.end_time = end_time

        self.connection_counts = {}

    def __str__(self):
        s = 'chunk_pos: %d\n' % self.pos
        s += 'start_time: %s\n' % str(self.start_time)
        s += 'end_time: %s\n' % str(self.end_time)
        s += 'connections: %d\n' % len(self.connection_counts)
        s += '\n'.join(['  - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()])
        return s

class _ChunkHeader(object):
    def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
        self.compression = compression
        self.compressed_size = compressed_size
        self.uncompressed_size = uncompressed_size
        self.data_pos = data_pos

    def __str__(self):
        if self.uncompressed_size > 0:
            ratio = 100 * (float(self.compressed_size) / self.uncompressed_size)
            return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio)
        else:
            return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)

class ComparableMixin(object):
    __slots__ = []

    def _compare(self, other, method):
        try:
            return method(self._cmpkey(), other._cmpkey())
        except (AttributeError, TypeError):
            # _cmpkey not implemented, or it returned a different type,
            # so we can't compare with "other"
            return NotImplemented

    def __lt__(self, other):
        return self._compare(other, lambda s, o: s < o)

    def __le__(self, other):
        return self._compare(other, lambda s, o: s <= o)

    def __eq__(self, other):
        return self._compare(other, lambda s, o: s == o)

    def __ge__(self, other):
        return self._compare(other, lambda s, o: s >= o)

    def __gt__(self, other):
        return self._compare(other, lambda s, o: s > o)

    def __ne__(self, other):
        return self._compare(other, lambda s, o: s != o)

class _IndexEntry(ComparableMixin):
    __slots__ = ['time']

    def __init__(self, time):
        self.time = time

    def _cmpkey(self):
        return self.time

class _IndexEntry102(_IndexEntry):
    __slots__ = ['offset']

    def __init__(self, time, offset):
        self.time = time
        self.offset = offset

    @property
    def position(self):
        return self.offset

    def __str__(self):
        return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)

class _IndexEntry200(_IndexEntry):
    __slots__ = ['chunk_pos', 'offset']

    def __init__(self, time, chunk_pos, offset):
        self.time = time
        self.chunk_pos = chunk_pos
        self.offset = offset

    @property
    def position(self):
        return (self.chunk_pos, self.offset)

    def __str__(self):
        return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)

def _get_message_type(info):
    message_type = _message_types.get(info.md5sum)
    if message_type is None:
        try:
            message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype]
            if (message_type._md5sum != info.md5sum):
                print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n  Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."' % (info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr)
        except genmsg.InvalidMsgSpec:
            message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype]
            print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition.' % (info.datatype, info.md5sum), file=sys.stderr)
        except genmsg.MsgGenerationException as ex:
            raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex)))

        _message_types[info.md5sum] = message_type

    return message_type

def _read_uint8 (f): return _unpack_uint8 (f.read(1))
def _read_uint32(f): return _unpack_uint32(f.read(4))
def _read_uint64(f): return _unpack_uint64(f.read(8))
def _read_time  (f): return _unpack_time  (f.read(8))

def _decode_str(v): return v if type(v) is str else v.decode()

def _unpack_uint8 (v): return struct.unpack('<B', v)[0]
def _unpack_uint32(v): return struct.unpack('<L', v)[0]
def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
def _unpack_time  (v): return rospy.Time(*struct.unpack('<LL', v))

def _pack_uint8 (v): return struct.pack('<B', v)
def _pack_uint32(v): return struct.pack('<L', v)
def _pack_uint64(v): return struct.pack('<Q', v)
def _pack_time  (v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)

def _read(f, size):
    data = f.read(size)
    if len(data) != size:
        raise ROSBagException('expecting %d bytes, read %d' % (size, len(data)))
    return data

def _skip_record(f):
    _skip_sized(f)  # skip the header
    _skip_sized(f)  # skip the data

def _skip_sized(f):
    size = _read_uint32(f)
    f.seek(size, os.SEEK_CUR)

def _read_sized(f):
    try:
        size = _read_uint32(f)
    except struct.error as ex:
        raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))
    return _read(f, size)

def _write_sized(f, v):
    if not isinstance(v, bytes):
        v = v.encode()
    f.write(_pack_uint32(len(v)))
    f.write(v)

def _read_field(header, field, unpack_fn):
    if field not in header:
        raise ROSBagFormatException('expected "%s" field in record' % field)

    try:
        value = unpack_fn(header[field])
    except Exception as ex:
        raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex)))

    return value

def _read_str_field   (header, field): return _read_field(header, field, _decode_str)
def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
def _read_time_field  (header, field): return _read_field(header, field, _unpack_time)

def _write_record(f, header, data='', padded_size=None):
    header_str = _write_header(f, header)

    if padded_size is not None:
        header_len = len(header_str)
        if header_len < padded_size:
            data = ' ' * (padded_size - header_len)
        else:
            data = ''

    _write_sized(f, data)

def _write_header(f, header):
    header_str = b''
    equal = b'='
    for k, v in header.items():
        if not isinstance(k, bytes):
            k = k.encode()
        if not isinstance(v, bytes):
            v = v.encode()
        header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v
    _write_sized(f, header_str)
    return header_str
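
# On-disk record layout, as produced by _write_header/_write_record above and
# consumed by _read_header/_read_record_data below: a record is a
# length-prefixed header block followed by a length-prefixed data block, and
# each header field is itself length-prefixed:
#
#   <uint32 header_len> { <uint32 field_len> <name>=<value> }*
#   <uint32 data_len> <data>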

def _read_header(f, req_op=None):
    bag_pos = f.tell()

    # Read the header
    try:
        header = _read_sized(f)
    except ROSBagException as ex:
        raise ROSBagFormatException('Error reading header: %s' % str(ex))

    # Parse the header into a dict
    header_dict = {}
    while header != b'':
        # Read the field size
        if len(header) < 4:
            raise ROSBagFormatException('Error reading header field')
        (size,) = struct.unpack('<L', header[:4])
        header = header[4:]

        # Read the field bytes
        if len(header) < size:
            raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header)))
        (name, sep, value) = header[:size].partition(b'=')
        if sep == b'':
            raise ROSBagFormatException('Error reading header field')

        name = name.decode()
        header_dict[name] = value

        header = header[size:]

    # Check the op code of the header, if supplied
    if req_op is not None:
        op = _read_uint8_field(header_dict, 'op')
        if req_op != op:
            raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op]))

    return header_dict

def _peek_next_header_op(f):
    pos = f.tell()
    header = _read_header(f)
    op = _read_uint8_field(header, 'op')
    f.seek(pos)
    return op

def _read_record_data(f):
    try:
        record_data = _read_sized(f)
    except ROSBagException as ex:
        raise ROSBagFormatException('Error reading record data: %s' % str(ex))

    return record_data

class _BagReader(object):
    def __init__(self, bag):
        self.bag = bag

    def start_reading(self):
        raise NotImplementedError()

    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
        raise NotImplementedError()

    def reindex(self):
        raise NotImplementedError()

class _BagReader102_Unindexed(_BagReader):
    """
    Support class for reading unindexed v1.2 bag files.
    """
    def __init__(self, bag):
        _BagReader.__init__(self, bag)

    def start_reading(self):
        self.bag._file_header_pos = self.bag._file.tell()

    def reindex(self):
        """Generates all bag index information by rereading the message records."""
        f = self.bag._file

        total_bytes = self.bag.size

        # Re-read the file header
        self.bag._file.seek(self.bag._file_header_pos)

        offset = f.tell()

        # Read the message definition and data records
        while offset < total_bytes:
            yield offset

            op = _peek_next_header_op(f)

            if op == _OP_MSG_DEF:
                connection_info = self.read_message_definition_record()

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id
                    self.bag._connections[connection_info.id] = connection_info
                    self.bag._connection_indexes[connection_info.id] = []

            elif op == _OP_MSG_DATA:
                # Read the topic and timestamp from the header
                header = _read_header(f)

                topic = _read_str_field(header, 'topic')
                secs = _read_uint32_field(header, 'sec')
                nsecs = _read_uint32_field(header, 'nsec')
                t = genpy.Time(secs, nsecs)

                if topic not in self.bag._topic_connections:
                    datatype = _read_str_field(header, 'type')
                    self._create_connection_info_for_datatype(topic, datatype)

                connection_id = self.bag._topic_connections[topic]
                info = self.bag._connections[connection_id]

                # Skip over the message content
                _skip_sized(f)

                # Insert the message entry (in order) into the connection index
                bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))

            offset = f.tell()
    def read_messages(self, topics, start_time, end_time, topic_filter, raw):
        f = self.bag._file

        f.seek(self.bag._file_header_pos)

        while True:
            # Read any MSG_DEF records
            while True:
                position = f.tell()

                try:
                    header = _read_header(f)
                except Exception:
                    return

                op = _read_uint8_field(header, 'op')
                if op != _OP_MSG_DEF:
                    break

                connection_info = self.read_message_definition_record(header)

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id

                self.bag._connections[connection_info.id] = connection_info

            # Check that we have a MSG_DATA record
            if op != _OP_MSG_DATA:
                raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)

            topic = _read_str_field(header, 'topic')

            if topic not in self.bag._topic_connections:
                datatype = _read_str_field(header, 'type')
                self._create_connection_info_for_datatype(topic, datatype)

            connection_id = self.bag._topic_connections[topic]
            info = self.bag._connections[connection_id]

            # Get the message type
            try:
                msg_type = _get_message_type(info)
            except KeyError:
                raise ROSBagException('Cannot deserialize messages of type [%s].  Message was not preceded in bag file by definition' % info.datatype)

            # Get the timestamp
            secs = _read_uint32_field(header, 'sec')
            nsecs = _read_uint32_field(header, 'nsec')
            t = genpy.Time(secs, nsecs)

            # Read the message content
            data = _read_record_data(f)

            if raw:
                msg = (info.datatype, data, info.md5sum, position, msg_type)
            else:
                # Deserialize the message
                msg = msg_type()
                msg.deserialize(data)

            yield BagMessage(topic, msg, t)

        self.bag._connection_indexes_read = True

    def _create_connection_info_for_datatype(self, topic, datatype):
        for c in self.bag._connections.values():
            if c.datatype == datatype:
                connection_id = len(self.bag._connections)
                connection_header = {'topic': topic, 'type': c.header['type'], 'md5sum': c.header['md5sum'], 'message_definition': c.header['message_definition']}
                connection_info = _ConnectionInfo(connection_id, topic, connection_header)

                self.bag._topic_connections[topic] = connection_id
                self.bag._connections[connection_id] = connection_info
                self.bag._connection_indexes[connection_id] = []
                return

        raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))

    def read_message_definition_record(self, header=None):
        if not header:
            header = _read_header(self.bag._file, _OP_MSG_DEF)

        topic = _read_str_field(header, 'topic')
        datatype = _read_str_field(header, 'type')
        md5sum = _read_str_field(header, 'md5')
        msg_def = _read_str_field(header, 'def')

        _skip_sized(self.bag._file)  # skip the record data

        connection_header = {'topic': topic, 'type': datatype, 'md5sum': md5sum, 'message_definition': msg_def}

        id = len(self.bag._connections)

        return _ConnectionInfo(id, topic, connection_header)

class _BagReader102_Indexed(_BagReader102_Unindexed):
    """
    Support class for reading indexed v1.2 bag files.
    """
    def __init__(self, bag):
        _BagReader102_Unindexed.__init__(self, bag)

    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
        connections = self.bag._get_connections(topics, connection_filter)
        for entry in self.bag._get_entries(connections, start_time, end_time):
            yield self.seek_and_read_message_data_record(entry.offset, raw)

1901 """Generates all bag index information by rereading the message records."""
1902 f = self.bag._file
1903
1904 total_bytes = self.bag.size
1905
1906
1907 self.bag._file.seek(self.bag._file_header_pos)
1908 self.read_file_header_record()
1909
1910 offset = f.tell()
1911
1912
1913 while offset < total_bytes:
1914 yield offset
1915
1916 op = _peek_next_header_op(f)
1917
1918 if op == _OP_MSG_DEF:
1919 connection_info = self.read_message_definition_record()
1920
1921 if connection_info.topic not in self.bag._topic_connections:
1922 self.bag._topic_connections[connection_info.topic] = connection_info.id
1923 self.bag._connections[connection_info.id] = connection_info
1924 self.bag._connection_indexes[connection_info.id] = []
1925
1926 elif op == _OP_MSG_DATA:
1927
1928 header = _read_header(f)
1929
1930 topic = _read_str_field(header, 'topic')
1931 secs = _read_uint32_field(header, 'sec')
1932 nsecs = _read_uint32_field(header, 'nsec')
1933 t = genpy.Time(secs, nsecs)
1934
1935 if topic not in self.bag._topic_connections:
1936 datatype = _read_str_field(header, 'type')
1937 self._create_connection_info_for_datatype(topic, datatype)
1938
1939 connection_id = self.bag._topic_connections[topic]
1940 info = self.bag._connections[connection_id]
1941
1942
1943 _skip_sized(f)
1944
1945
1946 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))
1947
1948 elif op == _OP_INDEX_DATA:
1949 _skip_record(f)
1950
1951 offset = f.tell()
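# Editor's example (not part of the original source): reindex() is a
# generator that yields byte offsets so callers can report progress, as
# the rosbag reindex tool does. A minimal driver ('old.bag' is a
# placeholder):
#
#     import rosbag
#     b = rosbag.Bag('old.bag', 'a', allow_unindexed=True)
#     for offset in b.reindex():
#         print('reindexed through byte %d' % offset)
#     b.close()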
1952
1953 - def start_reading(self):
1954 try:
1955
1956 self.read_file_header_record()
1957
1958 total_bytes = self.bag.size
1959
1960
1961 if self.bag._index_data_pos == 0:
1962 raise ROSBagUnindexedException()
1963
1964
1965 self.bag._file.seek(self.bag._index_data_pos)
1966
1967
1968 topic_indexes = {}
1969 while True:
1970 pos = self.bag._file.tell()
1971 if pos >= total_bytes:
1972 break
1973
1974 topic, index = self.read_topic_index_record()
1975
1976 topic_indexes[topic] = index
1977
1978
1979 for topic, index in topic_indexes.items():
1980 self.bag._file.seek(index[0].offset)
1981
1982 connection_info = self.read_message_definition_record()
1983
1984 if connection_info.topic not in self.bag._topic_connections:
1985 self.bag._topic_connections[connection_info.topic] = connection_info.id
1986 self.bag._connections[connection_info.id] = connection_info
1987
1988 self.bag._connection_indexes[connection_info.id] = index
1989
1990 self.bag._connection_indexes_read = True
1991
1992 except Exception as ex:
1993 raise ROSBagUnindexedException()
1994
1995 - def read_file_header_record(self):
1996 self.bag._file_header_pos = self.bag._file.tell()
1997
1998 header = _read_header(self.bag._file, _OP_FILE_HEADER)
1999
2000 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
2001
2002 _skip_sized(self.bag._file)
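# Editor's example (not part of the original source): header fields are
# raw little-endian bytes; 'index_pos' is an 8-byte unsigned integer,
# unpacked the way _read_uint64_field does internally ('field_bytes' is
# a placeholder for the 8 raw bytes):
#
#     import struct
#     (index_pos,) = struct.unpack('<Q', field_bytes)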
2003
2004 - def read_topic_index_record(self):
2005 f = self.bag._file
2006
2007 header = _read_header(f, _OP_INDEX_DATA)
2008
2009 index_version = _read_uint32_field(header, 'ver')
2010 topic = _read_str_field (header, 'topic')
2011 count = _read_uint32_field(header, 'count')
2012
2013 if index_version != 0:
2014 raise ROSBagFormatException('expecting index version 0, got %d' % index_version)
2015
2016 _read_uint32(f) # skip the record data size
2017
2018 topic_index = []
2019
2020 for i in range(count):
2021 time = _read_time (f)
2022 offset = _read_uint64(f)
2023
2024 topic_index.append(_IndexEntry102(time, offset))
2025
2026 return (topic, topic_index)
2027
2028 - def seek_and_read_message_data_record(self, position, raw):
2029 f = self.bag._file
2030
2031
2032 f.seek(position)
2033
2034
2035 while True:
2036 header = _read_header(f)
2037 op = _read_uint8_field(header, 'op')
2038 if op != _OP_MSG_DEF:
2039 break
2040 _skip_sized(f)
2041
2042
2043 if op != _OP_MSG_DATA:
2044 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
2045
2046 topic = _read_str_field(header, 'topic')
2047
2048 connection_id = self.bag._topic_connections[topic]
2049 info = self.bag._connections[connection_id]
2050
2051
2052 try:
2053 msg_type = _get_message_type(info)
2054 except KeyError:
2055 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag file by definition' % info.datatype)
2056
2057
2058 secs = _read_uint32_field(header, 'sec')
2059 nsecs = _read_uint32_field(header, 'nsec')
2060 t = genpy.Time(secs, nsecs)
2061
2062
2063 data = _read_record_data(f)
2064
2065 if raw:
2066 msg = info.datatype, data, info.md5sum, position, msg_type
2067 else:
2068
2069 msg = msg_type()
2070 msg.deserialize(data)
2071
2072 return BagMessage(topic, msg, t)
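# Editor's example (not part of the original source): with raw=True the
# caller receives a tuple instead of a deserialized message and can
# defer deserialization ('raw_msg' is a placeholder name):
#
#     datatype, data, md5sum, position, msg_type = raw_msg
#     msg = msg_type()
#     msg.deserialize(data)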
2073
2074 -class _BagReader200(_BagReader):
2075 """
2076 Support class for reading v2.0 bag files.
2077 """
2078 - def __init__(self, bag):
2079 _BagReader.__init__(self, bag)
2080
2081 self.decompressed_chunk_pos = None
2082 self.decompressed_chunk = None
2083 self.decompressed_chunk_io = None
2084
2085 - def reindex(self):
2086 """
2087 Generates all bag index information by rereading the chunks.
2088 Assumes the file header has been read.
2089 """
2090 f = self.bag._file
2091
2092 f.seek(0, os.SEEK_END)
2093 total_bytes = f.tell()
2094
2095
2096
2097
2098 self._read_terminal_connection_records()
2099
2100
2101 self.bag._file.seek(self.bag._file_header_pos)
2102 self.read_file_header_record()
2103
2104 trunc_pos = None
2105
2106 while True:
2107 chunk_pos = f.tell()
2108 if chunk_pos >= total_bytes:
2109 break
2110
2111 yield chunk_pos
2112
2113 try:
2114 self._reindex_read_chunk(f, chunk_pos, total_bytes)
2115 except Exception as ex:
2116 break
2117
2118 trunc_pos = f.tell()
2119
2120 if trunc_pos and trunc_pos < total_bytes:
2121 f.truncate(trunc_pos)
2122
2123 - def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
2124
2125 chunk_header = self.read_chunk_header()
2126
2127
2128 if chunk_header.compressed_size == 0:
2129 raise ROSBagException('unterminated chunk at %d' % chunk_pos)
2130
2131 if chunk_header.compression == Compression.NONE:
2132 chunk_file = f
2133 else:
2134
2135 compressed_chunk = _read(f, chunk_header.compressed_size)
2136
2137
2138 if chunk_header.compression == Compression.BZ2:
2139 self.decompressed_chunk = bz2.decompress(compressed_chunk)
2140 elif chunk_header.compression == Compression.LZ4 and found_lz4:
2141 self.decompressed_chunk = roslz4.decompress(compressed_chunk)
2142 else:
2143 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
2144
2145 if self.decompressed_chunk_io:
2146 self.decompressed_chunk_io.close()
2147 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
2148
2149 chunk_file = self.decompressed_chunk_io
2150
2151
2152 self.bag._curr_chunk_info = None
2153
2154 if chunk_header.compression == Compression.NONE:
2155 offset = chunk_file.tell() - chunk_pos
2156 else:
2157 offset = chunk_file.tell()
2158
2159 expected_index_length = 0
2160
2161 while offset < chunk_header.uncompressed_size:
2162 op = _peek_next_header_op(chunk_file)
2163
2164 if op == _OP_CONNECTION:
2165 connection_info = self.read_connection_record(chunk_file)
2166
2167 if connection_info.id not in self.bag._connections:
2168 self.bag._connections[connection_info.id] = connection_info
2169 if connection_info.id not in self.bag._connection_indexes:
2170 self.bag._connection_indexes[connection_info.id] = []
2171
2172 elif op == _OP_MSG_DATA:
2173
2174 header = _read_header(chunk_file)
2175
2176 connection_id = _read_uint32_field(header, 'conn')
2177 t = _read_time_field (header, 'time')
2178
2179
2180 if not self.bag._curr_chunk_info:
2181 self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t)
2182 else:
2183 if t > self.bag._curr_chunk_info.end_time:
2184 self.bag._curr_chunk_info.end_time = t
2185 elif t < self.bag._curr_chunk_info.start_time:
2186 self.bag._curr_chunk_info.start_time = t
2187 if connection_id in self.bag._curr_chunk_info.connection_counts:
2188 self.bag._curr_chunk_info.connection_counts[connection_id] += 1
2189 else:
2190 self.bag._curr_chunk_info.connection_counts[connection_id] = 1
2191
2192
2193 _skip_sized(chunk_file)
2194
2195
2196 if connection_id not in self.bag._connection_indexes:
2197 raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos))
2198 bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset))
2199
2200 expected_index_length += 1
2201
2202 else:
2203
2204 _skip_record(chunk_file)
2205
2206 if chunk_header.compression == Compression.NONE:
2207 offset = chunk_file.tell() - chunk_pos
2208 else:
2209 offset = chunk_file.tell()
2210
2211
2212 next_op = _peek_next_header_op(f)
2213
2214 total_index_length = 0
2215
2216 while next_op != _OP_CHUNK:
2217 if next_op == _OP_INDEX_DATA:
2218
2219 _, index = self.read_connection_index_record()
2220 total_index_length += len(index)
2221 else:
2222 _skip_record(f)
2223
2224 if f.tell() >= total_bytes:
2225 if total_index_length != expected_index_length:
2226 raise ROSBagException('index length mismatch: found %d entries, expected %d' % (total_index_length, expected_index_length))
2227 break
2228
2229 next_op = _peek_next_header_op(f)
2230
2231
2232 self.bag._chunk_headers[chunk_pos] = chunk_header
2233 self.bag._chunks.append(self.bag._curr_chunk_info)
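# Editor's example (not part of the original source): a chunk payload
# can be decompressed by hand exactly as above; 'compressed' stands for
# chunk_header.compressed_size bytes read from the file of a bz2 chunk:
#
#     import bz2
#     uncompressed = bz2.decompress(compressed)
#     assert len(uncompressed) == chunk_header.uncompressed_size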
2234
2235 - def _read_terminal_connection_records(self):
2236 b, f, r = self.bag, self.bag._file, self.bag._reader
2237
2238
2239 f.seek(b._file_header_pos)
2240 r.read_file_header_record()
2241
2242
2243 if self._advance_to_next_record(_OP_CONNECTION):
2244
2245 while True:
2246 connection_info = r.read_connection_record(f)
2247
2248 b._connections[connection_info.id] = connection_info
2249 b._connection_indexes[connection_info.id] = []
2250
2251 next_op = _peek_next_header_op(f)
2252 if next_op != _OP_CONNECTION:
2253 break
2254
2255 - def _advance_to_next_record(self, op):
2256 b, f = self.bag, self.bag._file
2257
2258 try:
2259 while True:
2260 next_op = _peek_next_header_op(f)
2261 if next_op == op:
2262 break
2263
2264 if next_op == _OP_INDEX_DATA:
2265
2266
2267
2268 if b._curr_chunk_info is None:
2269 b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1))
2270
2271 b._reader.read_connection_index_record()
2272 else:
2273 _skip_record(f)
2274
2275 return True
2276
2277 except Exception as ex:
2278 return False
2279
2280 - def start_reading(self):
2281 try:
2282
2283 self.read_file_header_record()
2284
2285
2286 if self.bag._index_data_pos == 0:
2287 raise ROSBagUnindexedException()
2288
2289
2290 self.bag._file.seek(self.bag._index_data_pos)
2291
2292
2293 self.bag._connection_indexes = {}
2294 for i in range(self.bag._connection_count):
2295 connection_info = self.read_connection_record(self.bag._file)
2296 self.bag._connections[connection_info.id] = connection_info
2297 self.bag._connection_indexes[connection_info.id] = []
2298
2299
2300 self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)]
2301
2302
2303 self.bag._chunk_headers = {}
2304 for chunk_info in self.bag._chunks:
2305 self.bag._file.seek(chunk_info.pos)
2306 self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header()
2307
2308 if not self.bag._skip_index:
2309 self._read_connection_index_records()
2310
2311 except Exception as ex:
2312 raise ROSBagUnindexedException()
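# Editor's example (not part of the original source): opening a bag
# whose index is missing or damaged raises ROSBagUnindexedException; a
# caller can retry with allow_unindexed=True ('maybe.bag' is a
# placeholder):
#
#     import rosbag
#     try:
#         bag = rosbag.Bag('maybe.bag')
#     except rosbag.ROSBagUnindexedException:
#         bag = rosbag.Bag('maybe.bag', allow_unindexed=True)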
2313
2314 - def _read_connection_index_records(self):
2315 for chunk_info in self.bag._chunks:
2316 self.bag._file.seek(chunk_info.pos)
2317 _skip_record(self.bag._file)
2318
2319 self.bag._curr_chunk_info = chunk_info
2320 for i in range(len(chunk_info.connection_counts)):
2321 connection_id, index = self.read_connection_index_record()
2322 self.bag._connection_indexes[connection_id].extend(index)
2323
2324
2325
2326
2327 orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index]
2328 for id in orphan_connection_ids:
2329 del self.bag._connections[id]
2330 del self.bag._connection_indexes[id]
2331
2332 self.bag._connection_indexes_read = True
2333
2334 - def read_messages(self, topics, start_time, end_time, connection_filter, raw):
2335 connections = self.bag._get_connections(topics, connection_filter)
2336 for entry in self.bag._get_entries(connections, start_time, end_time):
2337 yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)
2338
2339
2340
2341 - def read_file_header_record(self):
2342 self.bag._file_header_pos = self.bag._file.tell()
2343
2344 header = _read_header(self.bag._file, _OP_FILE_HEADER)
2345
2346 self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
2347 self.bag._chunk_count = _read_uint32_field(header, 'chunk_count')
2348 self.bag._connection_count = _read_uint32_field(header, 'conn_count')
2349
2350 _skip_sized(self.bag._file)
2351
2352 - def read_connection_record(self, f):
2353 header = _read_header(f, _OP_CONNECTION)
2354
2355 conn_id = _read_uint32_field(header, 'conn')
2356 topic = _read_str_field (header, 'topic')
2357
2358 connection_header = _read_header(f)
2359
2360 return _ConnectionInfo(conn_id, topic, connection_header)
2361
2362 - def read_chunk_info_record(self):
2363 f = self.bag._file
2364
2365 header = _read_header(f, _OP_CHUNK_INFO)
2366
2367 chunk_info_version = _read_uint32_field(header, 'ver')
2368
2369 if chunk_info_version == 1:
2370 chunk_pos = _read_uint64_field(header, 'chunk_pos')
2371 start_time = _read_time_field (header, 'start_time')
2372 end_time = _read_time_field (header, 'end_time')
2373 connection_count = _read_uint32_field(header, 'count')
2374
2375 chunk_info = _ChunkInfo(chunk_pos, start_time, end_time)
2376
2377 _read_uint32(f) # skip the record data size
2378
2379 for i in range(connection_count):
2380 connection_id = _read_uint32(f)
2381 count = _read_uint32(f)
2382
2383 chunk_info.connection_counts[connection_id] = count
2384
2385 return chunk_info
2386 else:
2387 raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
2388
2389 - def read_chunk_header(self):
2390 header = _read_header(self.bag._file, _OP_CHUNK)
2391
2392 compression = _read_str_field (header, 'compression')
2393 uncompressed_size = _read_uint32_field(header, 'size')
2394
2395 compressed_size = _read_uint32(self.bag._file)
2396
2397 data_pos = self.bag._file.tell()
2398
2399 return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)
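# Editor's example (not part of the original source): the 4-byte
# little-endian compressed size read above prefixes the chunk data;
# unpacked from raw bytes it would be ('size_bytes' is a placeholder):
#
#     import struct
#     (compressed_size,) = struct.unpack('<L', size_bytes)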
2400
2401 - def read_connection_index_record(self):
2402 f = self.bag._file
2403
2404 header = _read_header(f, _OP_INDEX_DATA)
2405
2406 index_version = _read_uint32_field(header, 'ver')
2407 connection_id = _read_uint32_field(header, 'conn')
2408 count = _read_uint32_field(header, 'count')
2409
2410 if index_version != 1:
2411 raise ROSBagFormatException('expecting index version 1, got %d' % index_version)
2412
2413 record_size = _read_uint32(f) # size of the index data (unused; count entries follow)
2414
2415 index = []
2416
2417 for i in range(count):
2418 time = _read_time (f)
2419 offset = _read_uint32(f)
2420
2421 index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset))
2422
2423 return (connection_id, index)
2424
2425 - def seek_and_read_message_data_record(self, position, raw):
2426 chunk_pos, offset = position
2427
2428 chunk_header = self.bag._chunk_headers.get(chunk_pos)
2429 if chunk_header is None:
2430 raise ROSBagException('no chunk at position %d' % chunk_pos)
2431
2432 if chunk_header.compression == Compression.NONE:
2433 f = self.bag._file
2434 f.seek(chunk_header.data_pos + offset)
2435 else:
2436 if self.decompressed_chunk_pos != chunk_pos:
2437
2438 self.bag._file.seek(chunk_header.data_pos)
2439 compressed_chunk = _read(self.bag._file, chunk_header.compressed_size)
2440
2441 if chunk_header.compression == Compression.BZ2:
2442 self.decompressed_chunk = bz2.decompress(compressed_chunk)
2443 elif chunk_header.compression == Compression.LZ4 and found_lz4:
2444 self.decompressed_chunk = roslz4.decompress(compressed_chunk)
2445 else:
2446 raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)
2447
2448 self.decompressed_chunk_pos = chunk_pos
2449
2450 if self.decompressed_chunk_io:
2451 self.decompressed_chunk_io.close()
2452 self.decompressed_chunk_io = StringIO(self.decompressed_chunk)
2453
2454 f = self.decompressed_chunk_io
2455 f.seek(offset)
2456
2457
2458 while True:
2459 header = _read_header(f)
2460 op = _read_uint8_field(header, 'op')
2461 if op != _OP_CONNECTION:
2462 break
2463 _skip_sized(f)
2464
2465
2466 if op != _OP_MSG_DATA:
2467 raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
2468
2469 connection_id = _read_uint32_field(header, 'conn')
2470 t = _read_time_field (header, 'time')
2471
2472
2473 connection_info = self.bag._connections[connection_id]
2474 try:
2475 msg_type = _get_message_type(connection_info)
2476 except KeyError:
2477 raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype)
2478
2479
2480 data = _read_record_data(f)
2481
2482
2483 if raw:
2484 msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type
2485 else:
2486 msg = msg_type()
2487 msg.deserialize(data)
2488
2489 return BagMessage(connection_info.topic, msg, t)
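# Editor's example (not part of the original source): raw reads through
# the public API; for a v2.0 bag the position element is the
# (chunk_pos, offset) pair built above ('test.bag' is a placeholder):
#
#     import rosbag
#     with rosbag.Bag('test.bag') as bag:
#         for topic, raw_msg, t in bag.read_messages(raw=True):
#             datatype, data, md5sum, position, msg_type = raw_msg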
2490
2491 -def _time_to_str(secs):
2492 secs_frac = secs - int(secs)
2493 secs_frac_str = ('%.2f' % secs_frac)[1:]
2494
2495 return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
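# Editor's example (not part of the original source); the output
# depends on the local timezone:
#
#     _time_to_str(1300000000.25)  # e.g. 'Mar 13 2011 07:06:40.25'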
2496
2497 -def _human_readable_size(size):
2498 multiple = 1024.0
2499 for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
2500 size /= multiple
2501 if size < multiple:
2502 return '%.1f %s' % (size, suffix)
2503
2504 return '-'
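# Editor's example (not part of the original source): sizes are
# reported starting at kilobytes:
#
#     _human_readable_size(2048)         # '2.0 KB'
#     _human_readable_size(5 * 1024**3)  # '5.0 GB'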
2505
2506 -def _human_readable_frequency(freq):
2507 multiple = 1000.0
2508 for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']:
2509 if freq < multiple:
2510 return '%.1f %s' % (freq, suffix)
2511 freq /= multiple
2512
2513 return '-'
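# Editor's example (not part of the original source):
#
#     _human_readable_frequency(0.5)     # '0.5 Hz'
#     _human_readable_frequency(2500.0)  # '2.5 kHz'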
2514
2516 -def _mergesort(list_of_lists, key=None):
2517 """
2518 Perform an N-way merge operation on sorted lists.
2519
2520 @param list_of_lists: an iterable of iterables of sorted elements
2521 (sorted either naturally or by C{key})
2522 @param key: specify sort key function (like C{sort()}, C{sorted()})
2524
2525 Yields tuples of the form C{(item, iterator)}, where the iterator is the
2526 iterator over the list the item came from (or the iterator you passed in,
2527 if you pre-generate the iterators).
2528
2529 This is a stable merge; complexity O(N lg N)
2530
2531 Examples::
2532
2533 print(list(x[0] for x in _mergesort([[1,2,3,4],
2534 [2,3.5,3.7,4.5,6,7],
2535 [2.6,3.6,6.6,9]])))
2536 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
2537
2538 # note stability
2539 print(list(x[0] for x in _mergesort([[1,2,3,4],
2540 [2,3.5,3.7,4.5,6,7],
2541 [2.6,3.6,6.6,9]], key=int)))
2542 [1, 2, 2, 2.6, 3, 3.5, 3.7, 3.6, 4, 4.5, 6, 6.6, 7, 9]
2543
2544 print(list(x[0] for x in _mergesort([[4,3,2,1],
2545 [7,6.5,4,3.7,3.3,1.9],
2546 [9,8.6,7.6,6.6,5.5,4.4,3.3]],
2547 key=lambda x: -x)))
2548 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1]
2549 """
2550
2551 heap = []
2552 for i, itr in enumerate(iter(pl) for pl in list_of_lists):
2553 try:
2554 item = next(itr)
2555 toadd = (key(item), i, item, itr) if key else (item, i, itr)
2556 heap.append(toadd)
2557 except StopIteration:
2558 pass
2559 heapq.heapify(heap)
2560
2561 if key:
2562 while heap:
2563 _, idx, item, itr = heap[0]
2564 yield item, itr
2565 try:
2566 item = next(itr)
2567 heapq.heapreplace(heap, (key(item), idx, item, itr))
2568 except StopIteration:
2569 heapq.heappop(heap)
2570
2571 else:
2572 while heap:
2573 item, idx, itr = heap[0]
2574 yield item, itr
2575 try:
2576 heapq.heapreplace(heap, (next(itr), idx, itr))
2577 except StopIteration:
2578 heapq.heappop(heap)
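# Editor's example (not part of the original source): the v2.0 reader
# merges per-connection index lists into time order in much this way
# ('indexes' stands for a list of sorted _IndexEntry lists):
#
#     entries = [entry for entry, _ in _mergesort(indexes,
#                                                 key=lambda entry: entry.time)]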
2579
2580 -class _CompressorFileFacade(object):
2581 """
2582 A file facade for sequential compressors (e.g., bz2.BZ2Compressor).
2583 """
2584 - def __init__(self, file, compressor):
2585 self.file = file
2586 self.compressor = compressor
2587 self.compressed_bytes_in = 0
2588
2589 - def write(self, data):
2590 compressed = self.compressor.compress(data)
2591 if len(compressed) > 0:
2592 self.file.write(compressed)
2593 self.compressed_bytes_in += len(data)
2594
2595 - def flush(self):
2596 compressed = self.compressor.flush()
2597 if len(compressed) > 0:
2598 self.file.write(compressed)
2599
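# Editor's example (not part of the original source): wrapping an output
# file with a bz2 compressor, as the bag writer does when writing
# compressed chunks ('chunk.bz2' is a placeholder):
#
#     import bz2
#     with open('chunk.bz2', 'wb') as out:
#         facade = _CompressorFileFacade(out, bz2.BZ2Compressor())
#         facade.write(b'rosbag chunk payload')
#         facade.flush()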