33 """
34 The rosbag Python API.
35
36 Provides serialization of bag files.
37 """
38
39 from __future__ import print_function
40
41 import bisect
42 import bz2
43 import collections
44 import heapq
45 import os
46 import re
47 import struct
48 import sys
49 import threading
50 import time
51 import yaml
52
53 from Cryptodome.Cipher import AES
54 from Cryptodome.Random import get_random_bytes
55
56 import gnupg
57
58 try:
59 from cStringIO import StringIO
60 except ImportError:
61 from io import BytesIO as StringIO
62
63 import genmsg
64 import genpy
65 import genpy.dynamic
66 import genpy.message
67
68 import roslib.names
69 import rospy
70 try:
71 import roslz4
72 found_lz4 = True
73 except ImportError:
74 rospy.logwarn(
75 'Failed to load Python extension for LZ4 support. '
76 'LZ4 compression will not be available.')
77 found_lz4 = False
80 """
81 Base class for exceptions in rosbag.
82 """
84 self.value = value
85
86
87 self.args = (value,)
88
91
98
100 """
101 Exception for unindexed bags.
102 """
106
108 """
109 Exception raised when encryption is not supported.
110 """
113
115 """
116 Exception raised when encryption or decryption failed.
117 """
120
122 """
123 Allowable compression types
124 """
125 NONE = 'none'
126 BZ2 = 'bz2'
127 LZ4 = 'lz4'
128
129 BagMessage = collections.namedtuple('BagMessage', 'topic message timestamp')
130 BagMessageWithConnectionHeader = collections.namedtuple('BagMessageWithConnectionHeader', 'topic message timestamp connection_header')
133 """
134 Base class for bag encryptor.
135 """
136 _ENCRYPTOR_FIELD_NAME = 'encryptor'
137
140
142 """
143 Class for unencrypted bags.
144 """
146 super(_ROSBagNoEncryptor, self).__init__()
147
150
153
156
159
162
164 return write_header(f, header)
165
167 return read_header(f, req_op)
168
171
174
176 """
177 Class for AES-CBC-encrypted bags.
178 """
179 NAME = 'rosbag/AesCbcEncryptor'
180 _GPG_USER_FIELD_NAME = 'gpg_user'
181 _ENCRYPTED_KEY_FIELD_NAME = 'encrypted_key'
182
184 """
185 Create AES encryptor.
186 """
187 super(_ROSBagAesCbcEncryptor, self).__init__()
188
189 self._gpg_key_user = None
190
191 self._gpg_passphrase = None
192
193 self._symmetric_key = None
194
195 self._encrypted_symmetric_key = None
196
    def initialize(self, bag, gpg_key_user, passphrase=None):
        """
        Initialize encryptor by composing AES symmetric key.
        @param bag: bag to be encrypted/decrypted
        @type  bag: Bag
        @param gpg_key_user: user name of GPG key used for symmetric key encryption
        @type  gpg_key_user: str
        @param passphrase: passphrase of the GPG key; if None, the ROSBAG_GPG_PASSPHRASE environment variable is used
        @type  passphrase: str
        @raise ROSBagException: if the GPG key user has already been set to a different user
        """
        if bag._mode != 'w':
            self._gpg_passphrase = passphrase or os.getenv('ROSBAG_GPG_PASSPHRASE', None)
            return
        if self._gpg_key_user == gpg_key_user:
            return
        if not self._gpg_key_user:
            self._gpg_key_user = gpg_key_user
            self._build_symmetric_key()
        else:
            raise ROSBagException('Encryption user has already been set to {}'.format(self._gpg_key_user))
218 """
219 Read chunk from file, encrypt it, and write back to file.
220 @param chunk_size: size of chunk
221 @type chunk_size: int
222 @param chunk_data_pos: position of chunk data portion
223 @type chunk_data_pos: int
224 @param f: file stream
225 @type f: file
226 @return: size of initialization vector and encrypted chunk
227 @rtype: int
228 """
229 f.seek(chunk_data_pos)
230 chunk = _read(f, chunk_size)
231
232 iv = get_random_bytes(AES.block_size)
233 f.seek(chunk_data_pos)
234 f.write(iv)
235 cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv)
236 encrypted_chunk = cipher.encrypt(_add_padding(chunk))
237
238 f.write(encrypted_chunk)
239 f.truncate(f.tell())
240 return AES.block_size + len(encrypted_chunk)
241
243 """
244 Decrypt chunk.
245 @param encrypted_chunk: chunk to decrypt
246 @type encrypted_chunk: str
247 @return: decrypted chunk
248 @rtype: str
249 @raise ROSBagFormatException: if size of input chunk is not multiple of AES block size
250 """
251 if len(encrypted_chunk) % AES.block_size != 0:
252 raise ROSBagFormatException('Error in encrypted chunk size: {}'.format(len(encrypted_chunk)))
253 if len(encrypted_chunk) < AES.block_size:
254 raise ROSBagFormatException('No initialization vector in encrypted chunk: {}'.format(len(encrypted_chunk)))
255
256 iv = encrypted_chunk[:AES.block_size]
257 cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv)
258 decrypted_chunk = cipher.decrypt(encrypted_chunk[AES.block_size:])
259 return _remove_padding(decrypted_chunk)
260
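    # Illustrative sketch (not from the original source): encrypt_chunk/decrypt_chunk
    # implement standard AES-CBC framing, with a random IV prepended to the
    # PKCS#7-padded ciphertext.  Assuming a 16-byte key `k`:
    #
    #   k  = get_random_bytes(AES.block_size)
    #   iv = get_random_bytes(AES.block_size)
    #   ct = iv + AES.new(k, AES.MODE_CBC, iv).encrypt(_add_padding(b'chunk data'))
    #   pt = _remove_padding(
    #       AES.new(k, AES.MODE_CBC, ct[:AES.block_size]).decrypt(ct[AES.block_size:]))
    #   assert pt == b'chunk data'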
262 """
263 Add encryptor information to bag file header.
264 @param header: bag file header
265 @type header: dict
266 """
267 header[self._ENCRYPTOR_FIELD_NAME] = self.NAME
268 header[self._GPG_USER_FIELD_NAME] = self._gpg_key_user
269 header[self._ENCRYPTED_KEY_FIELD_NAME] = self._encrypted_symmetric_key
270
272 """
273 Read encryptor information from bag file header.
274 @param header: bag file header
275 @type header: dict
276 @raise ROSBagFormatException: if GPG key user is not found in header
277 """
278 try:
279 self._encrypted_symmetric_key = _read_bytes_field(header, self._ENCRYPTED_KEY_FIELD_NAME)
280 except ROSBagFormatException:
281 raise ROSBagFormatException('Encrypted symmetric key is not found in header')
282 try:
283 self._gpg_key_user = _read_str_field(header, self._GPG_USER_FIELD_NAME)
284 except ROSBagFormatException:
285 raise ROSBagFormatException('GPG key user is not found in header')
286 try:
287 self._symmetric_key = _decrypt_string_gpg(self._encrypted_symmetric_key, self._gpg_passphrase)
288 except ROSBagFormatException:
289 raise
290
292 """
293 Write encrypted header to bag file.
294 @param f: file stream
295 @type f: file
296 @param header: unencrypted header
297 @type header: dict
298 @return: encrypted string representing header
299 @rtype: str
300 """
301 header_str = b''
302 equal = b'='
303 for k, v in header.items():
304 if not isinstance(k, bytes):
305 k = k.encode()
306 if not isinstance(v, bytes):
307 v = v.encode()
308 header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v
309
310 iv = get_random_bytes(AES.block_size)
311 enc_str = iv
312 cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv)
313 enc_str += cipher.encrypt(_add_padding(header_str))
314 _write_sized(f, enc_str)
315 return enc_str
316
318 """
319 Read encrypted header from bag file.
320 @param f: file stream
321 @type f: file
322 @param req_op: expected header op code
323 @type req_op: int
324 @return: decrypted header
325 @rtype: dict
326 @raise ROSBagFormatException: if error occurs while decrypting/reading header
327 """
328
329 try:
330 header = self._decrypt_encrypted_header(f)
331 except ROSBagException as ex:
332 raise ROSBagFormatException('Error reading header: %s' % str(ex))
333
334 return _build_header_from_str(header, req_op)
335
337 """
338 Add rows for rosbag info.
339 @param rows: information on bag encryption
340 @type rows: list of tuples
341 """
342 rows.append(('encryption', self.NAME))
343 rows.append(('GPG key user', self._gpg_key_user))
344
346 """
347 Return string for rosbag info.
348 @return: information on bag encryption
349 @rtype: str
350 """
351 return 'encryption: %s\nGPG key user: %s\n' % (self.NAME, self._gpg_key_user)
352
    def _build_symmetric_key(self):
        if not self._gpg_key_user:
            return
        self._symmetric_key = get_random_bytes(AES.block_size)
        self._encrypted_symmetric_key = _encrypt_string_gpg(self._gpg_key_user, self._symmetric_key)
    def _decrypt_encrypted_header(self, f):
        try:
            size = _read_uint32(f)
        except struct.error as ex:
            raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))

        if size % AES.block_size != 0:
            raise ROSBagFormatException('Error in encrypted header size: {}'.format(size))
        if size < AES.block_size:
            raise ROSBagFormatException('No initialization vector in encrypted header: {}'.format(size))

        iv = _read(f, AES.block_size)
        size -= AES.block_size
        encrypted_header = _read(f, size)
        cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv)
        header = cipher.decrypt(encrypted_header)
        return _remove_padding(header)
def _add_padding(input_bytes):
    # PKCS#7-style padding: pad to a multiple of the AES block size with bytes
    # whose value is the number of padding bytes added
    padding_num = AES.block_size - len(input_bytes) % AES.block_size
    return input_bytes + bytes((padding_num,) * padding_num)

def _remove_padding(input_str):
    # The last byte encodes how many padding bytes to strip
    return input_str[:-ord(input_str[len(input_str) - 1:])]
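# Illustrative example (not from the original source): padding always adds at
# least one byte, so the round trip is lossless:
#
#   assert _remove_padding(_add_padding(b'abc')) == b'abc'
#   assert len(_add_padding(b'')) == AES.block_size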
def _encrypt_string_gpg(key_user, input):
    gpg = gnupg.GPG()
    enc_data = gpg.encrypt(input, [key_user], always_trust=True)
    if not enc_data.ok:
        raise ROSBagEncryptException('Failed to encrypt bag: {}.  Have you installed the required public key?'.format(enc_data.status))
    return str(enc_data)

def _decrypt_string_gpg(input, passphrase=None):
    gpg = gnupg.GPG()
    dec_data = gpg.decrypt(input, passphrase=passphrase)
    if not dec_data.ok:
        raise ROSBagEncryptException('Failed to decrypt bag: {}.  Have you installed the required private key?'.format(dec_data.status))
    return dec_data.data
401 """
402 Bag serialize messages to and from a single file on disk using the bag format.
403 """
    def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
        """
        Open a bag file.  The mode can be 'r', 'w', or 'a' for reading (default),
        writing, or appending.  The file will be created if it doesn't exist
        when opened for writing or appending; it will be truncated when opened
        for writing.  Simultaneous reading and writing is allowed when in writing
        or appending mode.
        @param f: filename of bag to open or a stream to read from
        @type  f: str or file
        @param mode: mode, either 'r', 'w', or 'a'
        @type  mode: str
        @param compression: compression mode, see U{rosbag.Compression} for valid modes
        @type  compression: str
        @param chunk_threshold: minimum number of uncompressed bytes per chunk
        @type  chunk_threshold: int
        @param allow_unindexed: if True, allow opening unindexed bags
        @type  allow_unindexed: bool
        @param options: the bag options (currently: compression and chunk_threshold)
        @type  options: dict
        @param skip_index: if True, don't read the connection index records on open [2.0+]
        @type  skip_index: bool
        @raise ValueError: if any argument is invalid
        @raise ROSBagException: if an error occurs opening file
        @raise ROSBagFormatException: if bag format is corrupted
        """
        if options is not None:
            if type(options) is not dict:
                raise ValueError('options must be of type dict')
            if 'compression' in options:
                compression = options['compression']
            if 'chunk_threshold' in options:
                chunk_threshold = options['chunk_threshold']

        self._file = None
        self._filename = None
        self._version = None

        allowed_compressions = [Compression.NONE, Compression.BZ2]
        if found_lz4:
            allowed_compressions.append(Compression.LZ4)
        if compression not in allowed_compressions:
            raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))
        self._compression = compression

        if chunk_threshold < 0:
            raise ValueError('chunk_threshold must be greater than or equal to zero')
        self._chunk_threshold = chunk_threshold

        self._skip_index = skip_index

        self._reader = None

        self._file_header_pos = None
        self._index_data_pos = 0

        self._clear_index()

        self._buffer = StringIO()

        self._curr_compression = Compression.NONE

        self._encryptor = _ROSBagNoEncryptor()

        self._open(f, mode, allow_unindexed)

        self._output_file = self._file

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
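    # Illustrative usage (not from the original source): Bag supports the
    # context-manager protocol, so a typical write/read round trip looks like:
    #
    #   import rosbag
    #   from std_msgs.msg import Int32
    #
    #   with rosbag.Bag('test.bag', 'w') as outbag:
    #       outbag.write('/numbers', Int32(data=42))
    #
    #   with rosbag.Bag('test.bag') as inbag:
    #       for topic, msg, t in inbag.read_messages():
    #           print(topic, msg.data, t.to_sec())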
    @property
    def options(self):
        """Get the options."""
        return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }

    @property
    def filename(self):
        """Get the filename."""
        return self._filename

    @property
    def version(self):
        """Get the version."""
        return self._version

    @property
    def mode(self):
        """Get the mode."""
        return self._mode

    @property
    def size(self):
        """Get the size in bytes."""
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        pos = self._file.tell()
        self._file.seek(0, os.SEEK_END)
        size = self._file.tell()
        self._file.seek(pos)
        return size

    def _get_compression(self):
        """Get the compression method to use for writing."""
        return self._compression
    def _set_compression(self, compression):
        """Set the compression method to use for writing."""
        allowed_compressions = [Compression.NONE, Compression.BZ2]
        if found_lz4:
            allowed_compressions.append(Compression.LZ4)
        if compression not in allowed_compressions:
            raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))

        self.flush()
        self._compression = compression

    compression = property(_get_compression, _set_compression)

    def _get_chunk_threshold(self):
        """Get the chunk threshold to use for writing."""
        return self._chunk_threshold

    def _set_chunk_threshold(self, chunk_threshold):
        """Set the chunk threshold to use for writing."""
        if chunk_threshold < 0:
            raise ValueError('chunk_threshold must be greater than or equal to zero')

        self.flush()
        self._chunk_threshold = chunk_threshold

    chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)
    def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False, return_connection_header=False):
        """
        Read messages from the bag, optionally filtered by topic, timestamp and connection details.
        @param topics: list of topics or a single topic; if an empty list is given, all topics will be read [optional]
        @type  topics: list(str) or str
        @param start_time: earliest timestamp of message to return [optional]
        @type  start_time: U{genpy.Time}
        @param end_time: latest timestamp of message to return [optional]
        @type  end_time: U{genpy.Time}
        @param connection_filter: function to filter connections to include [optional]
        @type  connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool
        @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype)
        @type  raw: bool
        @param return_connection_header: if True, yield BagMessageWithConnectionHeader namedtuples that also carry the connection header [optional]
        @type  return_connection_header: bool
        @return: generator of BagMessage(topic, message, timestamp) namedtuples for each message in the bag file
        @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw]
        """
        self.flush()

        if topics and type(topics) is str:
            topics = [topics]

        return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw, return_connection_header)
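    # Illustrative example (not from the original source): filtering by topic and
    # start time, assuming a bag with a /chatter topic:
    #
    #   import rospy, rosbag
    #   with rosbag.Bag('input.bag') as bag:
    #       start = rospy.Time(bag.get_start_time() + 1.0)
    #       for topic, msg, t in bag.read_messages(topics='/chatter', start_time=start):
    #           print(t.to_sec(), msg)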
571 """
572 Write the open chunk to disk so subsequent reads will read all messages.
573 @raise ValueError: if bag is closed
574 """
575 if not self._file:
576 raise ValueError('I/O operation on closed bag')
577
578 if self._chunk_open:
579 self._stop_writing_chunk()
580
    def write(self, topic, msg, t=None, raw=False, connection_header=None):
        """
        Write a message to the bag.
        @param topic: name of topic
        @type  topic: str
        @param msg: message to add to bag, or tuple (if raw)
        @type  msg: Message or tuple of raw message data
        @param t: ROS time of message publication; if None, use current time [optional]
        @type  t: U{genpy.Time}
        @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
        @type  raw: bool
        @param connection_header: custom connection header to use for the topic's connection [optional]
        @type  connection_header: dict
        @raise ValueError: if arguments are invalid or bag is closed
        """
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        if not topic:
            raise ValueError('topic is invalid')
        if not msg:
            raise ValueError('msg is invalid')

        if t is None:
            t = genpy.Time.from_sec(time.time())

        # Seek to end (in case previous operation was a read)
        self._file.seek(0, os.SEEK_END)

        # Open a chunk, if needed
        if not self._chunk_open:
            self._start_writing_chunk(t)

        # Unpack raw message data
        if raw:
            if len(msg) == 5:
                msg_type, serialized_bytes, md5sum, pos, pytype = msg
            elif len(msg) == 4:
                msg_type, serialized_bytes, md5sum, pytype = msg
            else:
                raise ValueError('msg must be of length 4 or 5')

        # Write connection record, if necessary
        if topic in self._topic_connections:
            connection_info = self._topic_connections[topic]
            conn_id = connection_info.id
        else:
            conn_id = len(self._connections)

            if raw:
                if pytype is None:
                    try:
                        pytype = genpy.message.get_message_class(msg_type)
                    except Exception:
                        pytype = None
                if pytype is None:
                    raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type)

                if pytype._md5sum != md5sum:
                    print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr)

                header = connection_header if connection_header is not None else {
                    'topic': topic,
                    'type': msg_type,
                    'md5sum': md5sum,
                    'message_definition': pytype._full_text
                }
            else:
                header = connection_header if connection_header is not None else {
                    'topic': topic,
                    'type': msg.__class__._type,
                    'md5sum': msg.__class__._md5sum,
                    'message_definition': msg._full_text
                }

            connection_info = _ConnectionInfo(conn_id, topic, header)
            # Connection records in chunks are not encrypted (encrypt=False)
            self._write_connection_record(connection_info, False)

            self._connections[conn_id] = connection_info
            self._topic_connections[topic] = connection_info

        # Create an index entry
        index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())

        # Update the indexes and the current chunk info
        if conn_id not in self._curr_chunk_connection_indexes:
            # This is the first message on this connection in the chunk
            self._curr_chunk_connection_indexes[conn_id] = [index_entry]
            self._curr_chunk_info.connection_counts[conn_id] = 1
        else:
            curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id]
            if index_entry >= curr_chunk_connection_index[-1]:
                # Writing chronologically, so we can skip the binary search
                curr_chunk_connection_index.append(index_entry)
            else:
                bisect.insort_right(curr_chunk_connection_index, index_entry)

            self._curr_chunk_info.connection_counts[conn_id] += 1

        if conn_id not in self._connection_indexes:
            self._connection_indexes[conn_id] = [index_entry]
        else:
            bisect.insort_right(self._connection_indexes[conn_id], index_entry)

        # Update the chunk start/end times
        if t > self._curr_chunk_info.end_time:
            self._curr_chunk_info.end_time = t
        elif t < self._curr_chunk_info.start_time:
            self._curr_chunk_info.start_time = t

        if not raw:
            # Serialize the message to the buffer
            self._buffer.seek(0)
            self._buffer.truncate(0)
            msg.serialize(self._buffer)
            serialized_bytes = self._buffer.getvalue()

        # Write the message data record
        self._write_message_data_record(conn_id, t, serialized_bytes)

        # Check if we want to stop this chunk
        chunk_size = self._get_chunk_offset()
        if chunk_size > self._chunk_threshold:
            self._stop_writing_chunk()
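    # Illustrative example (not from the original source): copying a bag with
    # raw=True avoids deserializing and reserializing every message:
    #
    #   with rosbag.Bag('in.bag') as inbag, rosbag.Bag('out.bag', 'w') as outbag:
    #       for topic, raw_msg, t in inbag.read_messages(raw=True):
    #           outbag.write(topic, raw_msg, t, raw=True)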
707 """
708 Reindexes the bag file. Yields position of each chunk for progress.
709 """
710 self._clear_index()
711 return self._reader.reindex()
712
714 """
715 Close the bag file. Closing an already closed bag does nothing.
716 """
717 if self._file:
718 if self._mode in 'wa':
719 self._stop_writing()
720
721 self._close_file()
722
724 """
725 Returns information about the compression of the bag
726 @return: generator of CompressionTuple(compression, uncompressed, compressed) describing the
727 type of compression used, size of the uncompressed data in Bytes, size of the compressed data in Bytes. If
728 no compression is used then uncompressed and compressed data will be equal.
729 @rtype: generator of CompressionTuple of (str, int, int)
730 """
731
732 compression = self.compression
733 uncompressed = 0
734 compressed = 0
735
736 if self._chunk_headers:
737 compression_counts = {}
738 compression_uncompressed = {}
739 compression_compressed = {}
740
741
742
743 for chunk_header in self._chunk_headers.values():
744 if chunk_header.compression not in compression_counts:
745 compression_counts[chunk_header.compression] = 0
746 if chunk_header.compression not in compression_uncompressed:
747 compression_uncompressed[chunk_header.compression] = 0
748 if chunk_header.compression not in compression_compressed:
749 compression_compressed[chunk_header.compression] = 0
750
751 compression_counts[chunk_header.compression] += 1
752 compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
753 uncompressed += chunk_header.uncompressed_size
754 compression_compressed[chunk_header.compression] += chunk_header.compressed_size
755 compressed += chunk_header.compressed_size
756
757 chunk_count = len(self._chunk_headers)
758
759 main_compression_count, main_compression = sorted([(v, k) for k, v in compression_counts.items()], reverse=True)[0]
760 compression = str(main_compression)
761
762 return collections.namedtuple("CompressionTuple", ["compression",
763 "uncompressed", "compressed"])(compression=compression,
764 uncompressed=uncompressed, compressed=compressed)
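    # Illustrative example (not from the original source):
    #
    #   with rosbag.Bag('input.bag') as bag:
    #       info = bag.get_compression_info()
    #       print(info.compression, info.compressed, info.uncompressed)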
765
767 """
768 Returns the number of messages in the bag. Can be filtered by Topic
769 @param topic_filters: One or more topics to filter by
770 @type topic_filters: Could be either a single str or a list of str.
771 @return: The number of messages in the bag, optionally filtered by topic
772 @rtype: int
773 """
774
775 num_messages = 0
776
777 if topic_filters is not None:
778 info = self.get_type_and_topic_info(topic_filters=topic_filters)
779 for topic in info.topics.values():
780 num_messages += topic.message_count
781 else:
782 if self._chunks:
783 for c in self._chunks:
784 for counts in c.connection_counts.values():
785 num_messages += counts
786 else:
787 num_messages = sum([len(index) for index in self._connection_indexes.values()])
788
789 return num_messages
790
792 """
793 Returns the start time of the bag.
794 @return: a timestamp of the start of the bag
795 @rtype: float, timestamp in seconds, includes fractions of a second
796 """
797
798 if self._chunks:
799 start_stamp = self._chunks[0].start_time.to_sec()
800 else:
801 if not self._connection_indexes:
802 raise ROSBagException('Bag contains no message')
803 start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index]
804 start_stamp = min(start_stamps) if start_stamps else 0
805
806 return start_stamp
807
809 """
810 Returns the end time of the bag.
811 @return: a timestamp of the end of the bag
812 @rtype: float, timestamp in seconds, includes fractions of a second
813 """
814
815 if self._chunks:
816 end_stamp = self._chunks[-1].end_time.to_sec()
817 else:
818 if not self._connection_indexes:
819 raise ROSBagException('Bag contains no message')
820 end_stamps = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index]
821 end_stamp = max(end_stamps) if end_stamps else 0
822
823 return end_stamp
824
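    # Illustrative example (not from the original source): the bag duration in
    # seconds is the difference of the two accessors above:
    #
    #   with rosbag.Bag('input.bag') as bag:
    #       duration = bag.get_end_time() - bag.get_start_time()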
826 """
827 Coallates info about the type and topics in the bag.
828
829 Note, it would be nice to filter by type as well, but there appear to be some limitations in the current architecture
830 that prevent that from working when more than one message type is written on the same topic.
831
832 @param topic_filters: specify one or more topic to filter by.
833 @type topic_filters: either a single str or a list of str.
834 @return: generator of TypesAndTopicsTuple(types{key:type name, val: md5hash},
835 topics{type: msg type (Ex. "std_msgs/String"),
836 message_count: the number of messages of the particular type,
837 connections: the number of connections,
838 frequency: the data frequency,
839 key: type name,
840 val: md5hash}) describing the types of messages present
841 and information about the topics
842 @rtype: TypesAndTopicsTuple(dict(str, str),
843 TopicTuple(str, int, int, float, str, str))
844 """
845
846 datatypes = set()
847 datatype_infos = []
848
849 for connection in self._connections.values():
850 if connection.datatype in datatypes:
851 continue
852
853 datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
854 datatypes.add(connection.datatype)
855
856 topics = []
857
858 if topic_filters is not None:
859 if not isinstance(topic_filters, list):
860 topic_filters = [topic_filters]
861
862 topics = topic_filters
863 else:
864 topics = [c.topic for c in self._get_connections()]
865
866 topics = sorted(set(topics))
867
868 topic_datatypes = {}
869 topic_conn_counts = {}
870 topic_msg_counts = {}
871 topic_freqs_median = {}
872
873 for topic in topics:
874 connections = list(self._get_connections(topic))
875
876 if not connections:
877 continue
878
879 topic_datatypes[topic] = connections[0].datatype
880 topic_conn_counts[topic] = len(connections)
881
882 msg_count = 0
883 for connection in connections:
884 for chunk in self._chunks:
885 msg_count += chunk.connection_counts.get(connection.id, 0)
886
887 topic_msg_counts[topic] = msg_count
888
889 if self._connection_indexes_read:
890 stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
891 if len(stamps) > 1:
892 periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
893 med_period = _median(periods)
894 if med_period > 0.0:
895 topic_freqs_median[topic] = 1.0 / med_period
896
897
898 types = {}
899 for datatype, md5sum, msg_def in sorted(datatype_infos):
900 types[datatype] = md5sum
901
902
903 topics_t = {}
904 TopicTuple = collections.namedtuple("TopicTuple", ["msg_type", "message_count", "connections", "frequency"])
905 for topic in sorted(topic_datatypes.keys()):
906 topic_msg_count = topic_msg_counts[topic]
907 frequency = topic_freqs_median[topic] if topic in topic_freqs_median else None
908 topics_t[topic] = TopicTuple(msg_type=topic_datatypes[topic],
909 message_count=topic_msg_count,
910 connections=topic_conn_counts[topic],
911 frequency=frequency)
912
913 return collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])(msg_types=types, topics=topics_t)
914
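    # Illustrative example (not from the original source): listing each topic with
    # its type and message count:
    #
    #   with rosbag.Bag('input.bag') as bag:
    #       info = bag.get_type_and_topic_info()
    #       for topic, details in info.topics.items():
    #           print(topic, details.msg_type, details.message_count)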
    def set_encryptor(self, encryptor=None, param=None):
        if self._chunks:
            raise ROSBagException('Cannot set encryptor after chunks are written')
        if encryptor is None:
            self._encryptor = _ROSBagNoEncryptor()
        elif encryptor == _ROSBagAesCbcEncryptor.NAME:
            if sys.platform == 'win32':
                raise ROSBagEncryptNotSupportedException('AES CBC encryptor is not supported for Windows')
            else:
                self._encryptor = _ROSBagAesCbcEncryptor()
        else:
            self._encryptor = _ROSBagNoEncryptor()
        self._encryptor.initialize(self, param)
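    # Illustrative example (not from the original source): enabling AES-CBC
    # encryption on a bag opened for writing, assuming a GPG key for
    # 'user@example.com' exists in the local keyring:
    #
    #   bag = rosbag.Bag('secret.bag', 'w')
    #   bag.set_encryptor('rosbag/AesCbcEncryptor', 'user@example.com')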
    def __str__(self):
        rows = []

        try:
            if self._filename:
                rows.append(('path', self._filename))

            if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
                rows.append(('version', '1.2 (unindexed)'))
            else:
                rows.append(('version', '%d.%d' % (int(self._version / 100), self._version % 100)))

            if not self._connection_indexes and not self._chunks:
                rows.append(('size', _human_readable_size(self.size)))
            else:
                if self._chunks:
                    start_stamp = self._chunks[ 0].start_time.to_sec()
                    end_stamp   = self._chunks[-1].end_time.to_sec()
                else:
                    start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index]
                    start_stamp  = min(start_stamps) if start_stamps else 0
                    end_stamps   = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index]
                    end_stamp    = max(end_stamps) if end_stamps else 0

                # Show duration
                duration = end_stamp - start_stamp
                dur_secs = duration % 60
                dur_mins = int(duration / 60)
                dur_hrs  = int(dur_mins / 60)
                if dur_hrs > 0:
                    dur_mins = dur_mins % 60
                    duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration)
                elif dur_mins > 0:
                    duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration)
                else:
                    duration_str = '%.1fs' % duration

                rows.append(('duration', duration_str))

                # Show start and end times
                rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp)))
                rows.append(('end',   '%s (%.2f)' % (_time_to_str(end_stamp),   end_stamp)))

                rows.append(('size', _human_readable_size(self.size)))

                if self._chunks:
                    num_messages = 0
                    for c in self._chunks:
                        for counts in c.connection_counts.values():
                            num_messages += counts
                else:
                    num_messages = sum([len(index) for index in self._connection_indexes.values()])
                rows.append(('messages', str(num_messages)))

                # Show compression information
                if len(self._chunk_headers) == 0:
                    rows.append(('compression', 'none'))
                else:
                    compression_counts       = {}
                    compression_uncompressed = {}
                    compression_compressed   = {}
                    for chunk_header in self._chunk_headers.values():
                        if chunk_header.compression not in compression_counts:
                            compression_counts[chunk_header.compression] = 1
                            compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] = chunk_header.compressed_size
                        else:
                            compression_counts[chunk_header.compression] += 1
                            compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] += chunk_header.compressed_size

                    chunk_count = len(self._chunk_headers)

                    compressions = []
                    for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])):
                        if compression != Compression.NONE:
                            fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression]
                            compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction))
                        else:
                            compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count))
                    rows.append(('compression', ', '.join(compressions)))

                    all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
                    if not all_uncompressed:
                        total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.values()))
                        total_compressed_size   = sum((h.compressed_size   for h in self._chunk_headers.values()))

                        total_uncompressed_size_str = _human_readable_size(total_uncompressed_size)
                        total_compressed_size_str   = _human_readable_size(total_compressed_size)
                        total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)])

                        if duration > 0:
                            uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration)
                            compressed_rate_str   = _human_readable_size(total_compressed_size / duration)
                            rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)])

                            rows.append(('uncompressed', '%*s @ %*s/s' %
                                         (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str)))
                            rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' %
                                         (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size)))
                        else:
                            rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str)))
                            rows.append(('compressed',   '%*s' % (total_size_str_length, total_compressed_size_str)))

                self._encryptor.add_info_rows(rows)

                datatypes = set()
                datatype_infos = []
                for connection in self._connections.values():
                    if connection.datatype in datatypes:
                        continue
                    datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
                    datatypes.add(connection.datatype)

                topics = sorted(set([c.topic for c in self._get_connections()]))
                topic_datatypes    = {}
                topic_conn_counts  = {}
                topic_msg_counts   = {}
                topic_freqs_median = {}
                for topic in topics:
                    connections = list(self._get_connections(topic))

                    topic_datatypes[topic] = connections[0].datatype
                    topic_conn_counts[topic] = len(connections)

                    msg_count = 0
                    for connection in connections:
                        if self._chunks:
                            for chunk in self._chunks:
                                msg_count += chunk.connection_counts.get(connection.id, 0)
                        else:
                            msg_count += len(self._connection_indexes.get(connection.id, []))
                    topic_msg_counts[topic] = msg_count

                    if self._connection_indexes_read:
                        stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
                        if len(stamps) > 1:
                            periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
                            med_period = _median(periods)
                            if med_period > 0.0:
                                topic_freqs_median[topic] = 1.0 / med_period

                topics = sorted(topic_datatypes.keys())
                max_topic_len       = max([len(topic) for topic in topics])
                max_datatype_len    = max([len(datatype) for datatype in datatypes])
                max_msg_count_len   = max([len('%d' % msg_count) for msg_count in topic_msg_counts.values()])
                max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.values()]) if len(topic_freqs_median) > 0 else 0

                # Show datatypes
                for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
                    s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum)
                    if i == 0:
                        rows.append(('types', s))
                    else:
                        rows.append(('', s))

                # Show topics
                for i, topic in enumerate(topics):
                    topic_msg_count = topic_msg_counts[topic]

                    s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ')
                    if topic in topic_freqs_median:
                        s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic]))
                    else:
                        s += '   %*s' % (max_freq_median_len, '')

                    s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic])
                    if topic_conn_counts[topic] > 1:
                        s += ' (%d connections)' % topic_conn_counts[topic]

                    if i == 0:
                        rows.append(('topics', s))
                    else:
                        rows.append(('', s))

        except Exception as ex:
            raise

        first_column_width = max([len(field) for field, _ in rows]) + 1

        s = ''
        for (field, value) in rows:
            if field:
                s += '%-*s %s\n' % (first_column_width, field + ':', value)
            else:
                s += '%-*s %s\n' % (first_column_width, '', value)

        return s.rstrip()
    def _get_yaml_info(self, key=None):
        s = ''

        try:
            if self._filename:
                s += 'path: %s\n' % self._filename

            if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
                s += 'version: 1.2 (unindexed)\n'
            else:
                s += 'version: %d.%d\n' % (int(self._version / 100), self._version % 100)

            if not self._connection_indexes and not self._chunks:
                s += 'size: %d\n' % self.size
                s += 'indexed: False\n'
            else:
                if self._chunks:
                    start_stamp = self._chunks[ 0].start_time.to_sec()
                    end_stamp   = self._chunks[-1].end_time.to_sec()
                else:
                    start_stamps = [index[0].time.to_sec() for index in self._connection_indexes.values() if index]
                    start_stamp  = min(start_stamps) if start_stamps else 0
                    end_stamps   = [index[-1].time.to_sec() for index in self._connection_indexes.values() if index]
                    end_stamp    = max(end_stamps) if end_stamps else 0

                duration = end_stamp - start_stamp
                s += 'duration: %.6f\n' % duration
                s += 'start: %.6f\n' % start_stamp
                s += 'end: %.6f\n' % end_stamp
                s += 'size: %d\n' % self.size
                if self._chunks:
                    num_messages = 0
                    for c in self._chunks:
                        for counts in c.connection_counts.values():
                            num_messages += counts
                else:
                    num_messages = sum([len(index) for index in self._connection_indexes.values()])
                s += 'messages: %d\n' % num_messages
                s += 'indexed: True\n'

                # Show compression information
                if len(self._chunk_headers) == 0:
                    s += 'compression: none\n'
                else:
                    compression_counts       = {}
                    compression_uncompressed = {}
                    compression_compressed   = {}
                    for chunk_header in self._chunk_headers.values():
                        if chunk_header.compression not in compression_counts:
                            compression_counts[chunk_header.compression] = 1
                            compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] = chunk_header.compressed_size
                        else:
                            compression_counts[chunk_header.compression] += 1
                            compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] += chunk_header.compressed_size

                    chunk_count = len(self._chunk_headers)

                    main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0]
                    s += 'compression: %s\n' % str(main_compression)

                    all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
                    if not all_uncompressed:
                        s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.values()))
                        s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.values()))

                s += self._encryptor.get_info_str()

                datatypes = set()
                datatype_infos = []
                for connection in self._connections.values():
                    if connection.datatype in datatypes:
                        continue
                    datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
                    datatypes.add(connection.datatype)

                topics = sorted(set([c.topic for c in self._get_connections()]))
                topic_datatypes    = {}
                topic_conn_counts  = {}
                topic_msg_counts   = {}
                topic_freqs_median = {}
                for topic in topics:
                    connections = list(self._get_connections(topic))

                    topic_datatypes[topic] = connections[0].datatype
                    topic_conn_counts[topic] = len(connections)

                    msg_count = 0
                    for connection in connections:
                        for chunk in self._chunks:
                            msg_count += chunk.connection_counts.get(connection.id, 0)
                    topic_msg_counts[topic] = msg_count

                    if self._connection_indexes_read:
                        stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
                        if len(stamps) > 1:
                            periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
                            med_period = _median(periods)
                            if med_period > 0.0:
                                topic_freqs_median[topic] = 1.0 / med_period

                # Show datatypes
                s += 'types:\n'
                for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
                    s += '    - type: %s\n' % datatype
                    s += '      md5: %s\n' % md5sum

                # Show topics
                s += 'topics:\n'
                for i, topic in enumerate(topics):
                    topic_msg_count = topic_msg_counts[topic]

                    s += '    - topic: %s\n' % topic
                    s += '      type: %s\n' % topic_datatypes[topic]
                    s += '      messages: %d\n' % topic_msg_count

                    if topic_conn_counts[topic] > 1:
                        s += '      connections: %d\n' % topic_conn_counts[topic]

                    if topic in topic_freqs_median:
                        s += '      frequency: %.4f\n' % topic_freqs_median[topic]

            if not key:
                return s

            class DictObject(object):
                def __init__(self, d):
                    for a, b in d.items():
                        if isinstance(b, (list, tuple)):
                            setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b])
                        else:
                            setattr(self, a, DictObject(b) if isinstance(b, dict) else b)

            obj = DictObject(yaml.safe_load(s))
            try:
                val = eval('obj.' + key)
            except Exception as ex:
                print('Error getting key "%s"' % key, file=sys.stderr)
                return None

            def print_yaml(val, indent=0):
                indent_str = '  ' * indent

                if type(val) is list:
                    s = ''
                    for item in val:
                        s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1))
                    return s
                elif type(val) is DictObject:
                    s = ''
                    for i, (k, v) in enumerate(val.__dict__.items()):
                        if i != 0:
                            s += indent_str
                        s += '%s: %s' % (k, str(v))
                        if i < len(val.__dict__) - 1:
                            s += '\n'
                    return s
                else:
                    return indent_str + str(val)

            return print_yaml(val)

        except Exception as ex:
            raise
    @property
    def _has_compressed_chunks(self):
        if not self._chunk_headers:
            return False

        return any((h.compression != Compression.NONE for h in self._chunk_headers.values()))

    @property
    def _uncompressed_size(self):
        if not self._chunk_headers:
            return self.size

        return sum((h.uncompressed_size for h in self._chunk_headers.values()))
    def _read_message(self, position, raw=False, return_connection_header=False):
        """
        Read the message from the given position in the file.
        """
        self.flush()
        return self._reader.seek_and_read_message_data_record(position, raw, return_connection_header)
1316 """
1317 Yield the connections, optionally filtering by topic and/or connection information.
1318 """
1319 if topics:
1320 if type(topics) is str:
1321 topics = set([roslib.names.canonicalize_name(topics)])
1322 else:
1323 topics = set([roslib.names.canonicalize_name(t) for t in topics])
1324
1325 for c in self._connections.values():
1326 if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics:
1327 continue
1328 if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header):
1329 continue
1330 yield c
1331
    def _get_entries(self, connections=None, start_time=None, end_time=None):
        """
        Yield index entries on the given connections in the given time range.
        """
        for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time):
            if start_time and entry.time < start_time:
                continue
            if end_time and entry.time > end_time:
                return
            yield entry
1344 """
1345 Yield index entries on the given connections in the given time range in reverse order.
1346 """
1347 for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()):
1348 if end_time and entry.time > end_time:
1349 continue
1350 if start_time and entry.time < start_time:
1351 return
1352 yield entry
1353
    def _get_entry(self, t, connections=None):
        """
        Return the first index entry on/before the given time on the given connections.
        """
        indexes = self._get_indexes(connections)

        entry = _IndexEntry(t)

        first_entry = None

        for index in indexes:
            i = bisect.bisect_right(index, entry) - 1
            if i >= 0:
                index_entry = index[i]
                if first_entry is None or index_entry > first_entry:
                    first_entry = index_entry

        return first_entry
    def _get_entry_after(self, t, connections=None):
        """
        Return the first index entry after the given time on the given connections.
        """
        indexes = self._get_indexes(connections)

        entry = _IndexEntry(t)

        first_entry = None

        for index in indexes:
            i = bisect.bisect_right(index, entry)
            if i <= len(index) - 1:
                index_entry = index[i]
                if first_entry is None or index_entry < first_entry:
                    first_entry = index_entry

        return first_entry
1393 """
1394 Get the indexes for the given connections.
1395 """
1396 if not self._connection_indexes_read:
1397 self._reader._read_connection_index_records()
1398
1399 if connections is None:
1400 return self._connection_indexes.values()
1401 else:
1402 return (self._connection_indexes[c.id] for c in connections)
1403
1404
1405
    def _clear_index(self):
        self._connection_indexes_read = False
        self._connection_indexes = {}    # id -> IndexEntry[]

        self._topic_connections = {}     # topic -> connection_id
        self._connections = {}           # id -> _ConnectionInfo
        self._connection_count = 0       # (2.0)
        self._chunk_count = 0            # (2.0)
        self._chunks = []                # _ChunkInfo[] (2.0)
        self._chunk_headers = {}         # chunk_pos -> _ChunkHeader (2.0)

        self._chunk_open = False
        self._curr_chunk_info = None
        self._curr_chunk_data_pos = None
        self._curr_chunk_connection_indexes = {}
    def _open(self, f, mode, allow_unindexed):
        if not f:
            raise ValueError('filename (or stream) is invalid')

        try:
            if   mode == 'r': self._open_read(f, allow_unindexed)
            elif mode == 'w': self._open_write(f)
            elif mode == 'a': self._open_append(f, allow_unindexed)
            else:
                raise ValueError('mode "%s" is invalid' % mode)
            if 'b' not in self._file.mode and not isinstance('', bytes):
                raise ROSBagException('In Python 3 the bag file must be opened in binary mode')
        except struct.error:
            raise ROSBagFormatException('error with bag')
    def _is_file(self, f):
        try:
            return isinstance(f, file)  # Python 2
        except NameError:
            import io
            return isinstance(f, io.IOBase)  # Python 3
    def _open_read(self, f, allow_unindexed):
        if self._is_file(f):
            self._file = f
            self._filename = None
        else:
            self._file = open(f, 'rb')
            self._filename = f

        self._mode = 'r'

        try:
            self._version = self._read_version()
        except:
            self._version = None
            self._close_file()
            raise

        try:
            self._create_reader()
            self._reader.start_reading()
        except ROSBagUnindexedException as ex:
            if not allow_unindexed:
                self._close_file()
                raise
        except:
            self._close_file()
            raise
    def _open_write(self, f):
        if self._is_file(f):
            self._file = f
            self._filename = None
        else:
            self._file = open(f, 'w+b')
            self._filename = f

        self._mode = 'w'

        self._version = 200

        try:
            self._create_reader()
            self._start_writing()
        except:
            self._close_file()
            raise
    def _open_append(self, f, allow_unindexed):
        if self._is_file(f):
            self._file = f
            self._filename = None
        else:
            try:
                # Test if the file already exists
                open(f, 'r').close()

                # File exists: open in read-with-update mode
                self._file = open(f, 'r+b')
            except IOError:
                # File doesn't exist: open in write mode
                self._file = open(f, 'w+b')

            self._filename = f

        self._mode = 'a'

        try:
            self._version = self._read_version()
        except:
            self._version = None
            self._close_file()
            raise

        if self._version != 200:
            self._file.close()
            raise ROSBagException('bag version %d is unsupported for appending' % self._version)

        try:
            self._start_appending()
        except ROSBagUnindexedException:
            if not allow_unindexed:
                self._close_file()
                raise
        except:
            self._close_file()
            raise
    def _close_file(self):
        self._file.close()
        self._file = None
1536 """
1537 @raise ROSBagException: if the bag version is unsupported
1538 """
1539 major_version, minor_version = int(self._version / 100), self._version % 100
1540 if major_version == 2:
1541 self._reader = _BagReader200(self)
1542 elif major_version == 1:
1543 if minor_version == 1:
1544 raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version)
1545 else:
1546
1547 first_record_pos = self._file.tell()
1548 header = _read_header(self._file)
1549 op = _read_uint8_field(header, 'op')
1550 self._file.seek(first_record_pos)
1551
1552 if op == _OP_FILE_HEADER:
1553 self._reader = _BagReader102_Indexed(self)
1554 else:
1555 self._reader = _BagReader102_Unindexed(self)
1556 else:
1557 raise ROSBagException('unknown bag version %d' % self._version)
1558
1560 """
1561 @raise ROSBagException: if the file is empty, or the version line can't be parsed
1562 """
1563 version_line = self._file.readline().rstrip().decode()
1564 if len(version_line) == 0:
1565 raise ROSBagException('empty file')
1566
1567 matches = re.match("#ROS(.*) V(\d).(\d)", version_line)
1568 if matches is None or len(matches.groups()) != 3:
1569 raise ROSBagException('This does not appear to be a bag file')
1570
1571 version_type, major_version_str, minor_version_str = matches.groups()
1572
1573 version = int(major_version_str) * 100 + int(minor_version_str)
1574
1575 return version
1576
    def _start_writing(self):
        version = _VERSION + '\n'
        version = version.encode()
        self._file.write(version)
        self._file_header_pos = self._file.tell()
        self._write_file_header_record(0, 0, 0)
    def _stop_writing(self):
        # Write the open chunk (if any) to disk
        self.flush()

        # Sync the file position between read and write operations (no-op seek)
        self._file.seek(0, os.SEEK_CUR)

        # Remember this location as the start of the index
        self._index_data_pos = self._file.tell()

        # Write connection records
        for connection_info in self._connections.values():
            self._write_connection_record(connection_info, True)

        # Write chunk info records
        for chunk_info in self._chunks:
            self._write_chunk_info_record(chunk_info)

        # Re-write the file header record with the index position and counts
        self._file.seek(self._file_header_pos)
        self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))
    def _start_appending(self):
        self._file_header_pos = self._file.tell()

        self._create_reader()
        self._reader.start_reading()

        # Truncate the file to chop off the index
        self._file.truncate(self._index_data_pos)
        self._reader.index_data_pos = 0

        # Rewrite the file header, clearing the index position (so we know the index is invalid)
        self._file.seek(self._file_header_pos)
        self._write_file_header_record(0, 0, 0)

        # Seek to the end of the file
        self._file.seek(0, os.SEEK_END)
    def _start_writing_chunk(self, t):
        self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t)
        self._write_chunk_header(_ChunkHeader(self._compression, 0, 0))
        self._curr_chunk_data_pos = self._file.tell()
        self._set_compression_mode(self._compression)
        self._chunk_open = True
    def _get_chunk_offset(self):
        if self._compression == Compression.NONE:
            return self._file.tell() - self._curr_chunk_data_pos
        else:
            return self._output_file.compressed_bytes_in
    def _stop_writing_chunk(self):
        # Add this chunk to the index
        self._chunks.append(self._curr_chunk_info)

        # Get the uncompressed and compressed sizes
        uncompressed_size = self._get_chunk_offset()
        self._set_compression_mode(Compression.NONE)
        compressed_size = self._file.tell() - self._curr_chunk_data_pos

        # Encrypt the chunk data in place; the size may grow by the IV and padding
        compressed_size = self._encryptor.encrypt_chunk(compressed_size, self._curr_chunk_data_pos, self._file)

        # Rewrite the chunk header with the size of the chunk (remembering the current offset)
        end_of_chunk_pos = self._file.tell()
        self._file.seek(self._curr_chunk_info.pos)
        chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos)
        self._write_chunk_header(chunk_header)
        self._chunk_headers[self._curr_chunk_info.pos] = chunk_header

        # Write out the connection indexes and clear them
        self._file.seek(end_of_chunk_pos)
        for connection_id, entries in self._curr_chunk_connection_indexes.items():
            self._write_connection_index_record(connection_id, entries)
        self._curr_chunk_connection_indexes.clear()

        # Flag that we're starting a new chunk
        self._chunk_open = False
    def _set_compression_mode(self, compression):
        # Flush the compressor, if needed
        if self._curr_compression != Compression.NONE:
            self._output_file.flush()

        # Create the compressor
        if compression == Compression.BZ2:
            self._output_file = _CompressorFileFacade(self._file, bz2.BZ2Compressor())
        elif compression == Compression.LZ4 and found_lz4:
            self._output_file = _CompressorFileFacade(self._file, roslz4.LZ4Compressor())
        elif compression == Compression.NONE:
            self._output_file = self._file
        else:
            raise ROSBagException('unsupported compression type: %s' % compression)

        self._curr_compression = compression
    def _write_file_header_record(self, index_pos, connection_count, chunk_count):
        header = {
            'op':          _pack_uint8(_OP_FILE_HEADER),
            'index_pos':   _pack_uint64(index_pos),
            'conn_count':  _pack_uint32(connection_count),
            'chunk_count': _pack_uint32(chunk_count)
        }
        self._encryptor.add_fields_to_file_header(header)
        _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
    def _write_connection_record(self, connection_info, encrypt):
        header = {
            'op':    _pack_uint8(_OP_CONNECTION),
            'topic': connection_info.topic,
            'conn':  _pack_uint32(connection_info.id)
        }
        if encrypt:
            self._encryptor.write_encrypted_header(_write_header, self._output_file, header)
        else:
            _write_header(self._output_file, header)

        if encrypt:
            self._encryptor.write_encrypted_header(_write_header, self._output_file, connection_info.header)
        else:
            _write_header(self._output_file, connection_info.header)
    def _write_message_data_record(self, connection_id, t, serialized_bytes):
        header = {
            'op':   _pack_uint8(_OP_MSG_DATA),
            'conn': _pack_uint32(connection_id),
            'time': _pack_time(t)
        }
        _write_record(self._output_file, header, serialized_bytes)
    def _write_chunk_header(self, chunk_header):
        header = {
            'op':          _pack_uint8(_OP_CHUNK),
            'compression': chunk_header.compression,
            'size':        _pack_uint32(chunk_header.uncompressed_size)
        }
        _write_header(self._file, header)

        self._file.write(_pack_uint32(chunk_header.compressed_size))
    def _write_connection_index_record(self, connection_id, entries):
        header = {
            'op':    _pack_uint8(_OP_INDEX_DATA),
            'conn':  _pack_uint32(connection_id),
            'ver':   _pack_uint32(_INDEX_VERSION),
            'count': _pack_uint32(len(entries))
        }

        buffer = self._buffer
        buffer.seek(0)
        buffer.truncate(0)
        for entry in entries:
            buffer.write(_pack_time  (entry.time))
            buffer.write(_pack_uint32(entry.offset))

        _write_record(self._file, header, buffer.getvalue())
    def _write_chunk_info_record(self, chunk_info):
        header = {
            'op':         _pack_uint8 (_OP_CHUNK_INFO),
            'ver':        _pack_uint32(_CHUNK_INDEX_VERSION),
            'chunk_pos':  _pack_uint64(chunk_info.pos),
            'start_time': _pack_time(chunk_info.start_time),
            'end_time':   _pack_time(chunk_info.end_time),
            'count':      _pack_uint32(len(chunk_info.connection_counts))
        }

        buffer = self._buffer
        buffer.seek(0)
        buffer.truncate(0)
        for connection_id, count in chunk_info.connection_counts.items():
            buffer.write(_pack_uint32(connection_id))
            buffer.write(_pack_uint32(count))

        _write_record(self._file, header, buffer.getvalue())
_message_types = {}   # md5sum -> type

_OP_MSG_DEF     = 0x01
_OP_MSG_DATA    = 0x02
_OP_FILE_HEADER = 0x03
_OP_INDEX_DATA  = 0x04
_OP_CHUNK       = 0x05
_OP_CHUNK_INFO  = 0x06
_OP_CONNECTION  = 0x07

_OP_CODES = {
    _OP_MSG_DEF:     'MSG_DEF',
    _OP_MSG_DATA:    'MSG_DATA',
    _OP_FILE_HEADER: 'FILE_HEADER',
    _OP_INDEX_DATA:  'INDEX_DATA',
    _OP_CHUNK:       'CHUNK',
    _OP_CHUNK_INFO:  'CHUNK_INFO',
    _OP_CONNECTION:  'CONNECTION'
}

_VERSION             = '#ROSBAG V2.0'
_FILE_HEADER_LENGTH  = 4096
_INDEX_VERSION       = 1
_CHUNK_INDEX_VERSION = 1
class _ConnectionInfo(object):
    def __init__(self, id, topic, header):
        try:
            datatype = _read_str_field(header, 'type')
            md5sum   = _read_str_field(header, 'md5sum')
            msg_def  = _read_str_field(header, 'message_definition')
        except KeyError as ex:
            raise ROSBagFormatException('connection header field %s not found' % str(ex))

        self.id       = id
        self.topic    = topic
        self.datatype = datatype
        self.md5sum   = md5sum
        self.msg_def  = msg_def
        self.header   = header

    def __str__(self):
        return '%d on %s: %s' % (self.id, self.topic, str(self.header))
class _ChunkInfo(object):
    def __init__(self, pos, start_time, end_time):
        self.pos        = pos
        self.start_time = start_time
        self.end_time   = end_time

        self.connection_counts = {}

    def __str__(self):
        s  = 'chunk_pos:   %d\n' % self.pos
        s += 'start_time:  %s\n' % str(self.start_time)
        s += 'end_time:    %s\n' % str(self.end_time)
        s += 'connections: %d\n' % len(self.connection_counts)
        s += '\n'.join(['  - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()])
        return s
class _ChunkHeader(object):
    def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
        self.compression       = compression
        self.compressed_size   = compressed_size
        self.uncompressed_size = uncompressed_size
        self.data_pos          = data_pos

    def __str__(self):
        if self.uncompressed_size > 0:
            ratio = 100 * (float(self.compressed_size) / self.uncompressed_size)
            return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio)
        else:
            return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)
class ComparableMixin(object):
    __slots__ = []

    def _compare(self, other, method):
        try:
            return method(self._cmpkey(), other._cmpkey())
        except (AttributeError, TypeError):
            # _cmpkey is not implemented, or returns a different type,
            # so we can't compare with "other"
            return NotImplemented

    def __lt__(self, other):
        return self._compare(other, lambda s, o: s < o)

    def __le__(self, other):
        return self._compare(other, lambda s, o: s <= o)

    def __eq__(self, other):
        return self._compare(other, lambda s, o: s == o)

    def __ge__(self, other):
        return self._compare(other, lambda s, o: s >= o)

    def __gt__(self, other):
        return self._compare(other, lambda s, o: s > o)

    def __ne__(self, other):
        return self._compare(other, lambda s, o: s != o)
class _IndexEntry(ComparableMixin):
    __slots__ = ['time']

    def __init__(self, time):
        self.time = time

    def _cmpkey(self):
        return self.time

class _IndexEntry102(_IndexEntry):
    __slots__ = ['offset']

    def __init__(self, time, offset):
        self.time   = time
        self.offset = offset

    @property
    def position(self):
        return self.offset

    def __str__(self):
        return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)

class _IndexEntry200(_IndexEntry):
    __slots__ = ['chunk_pos', 'offset']

    def __init__(self, time, chunk_pos, offset):
        self.time      = time
        self.chunk_pos = chunk_pos
        self.offset    = offset

    @property
    def position(self):
        return (self.chunk_pos, self.offset)

    def __str__(self):
        return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)
def _get_message_type(info):
    message_type = _message_types.get(info.md5sum)
    if message_type is None:
        try:
            message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype]
            if (message_type._md5sum != info.md5sum):
                print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n  Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."' % (info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr)
        except genmsg.InvalidMsgSpec:
            message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype]
            print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition.' % (info.datatype, info.md5sum), file=sys.stderr)
        except genmsg.MsgGenerationException as ex:
            raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex)))

        _message_types[info.md5sum] = message_type

    return message_type
def _read_uint32(f): return _unpack_uint32(f.read(4))
def _read_uint64(f): return _unpack_uint64(f.read(8))
def _read_time  (f): return _unpack_time  (f.read(8))

def _decode_bytes(v):  return v
def _decode_str(v):    return v if type(v) is str else v.decode()
def _unpack_uint8(v):  return struct.unpack('<B', v)[0]
def _unpack_uint32(v): return struct.unpack('<L', v)[0]
def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
def _unpack_time(v):   return rospy.Time(*struct.unpack('<LL', v))

def _pack_uint32(v): return struct.pack('<L', v)
def _pack_time(v):   return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)

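# All on-disk integers are little-endian, matching the struct format strings
# above ('<B', '<L', '<Q'). A quick round trip (values are arbitrary):
#
#     _unpack_uint32(_pack_uint32(1234))            # 1234
#     _unpack_time(_pack_time(rospy.Time(5, 42)))   # rospy.Time(5, 42)
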
def _read(f, size):
    data = f.read(size)
    if len(data) != size:
        raise ROSBagException('expecting %d bytes, read %d' % (size, len(data)))
    return data

def _skip_record(f):
    _skip_sized(f)  # skip header
    _skip_sized(f)  # skip data

def _skip_sized(f):
    size = _read_uint32(f)
    f.seek(size, os.SEEK_CUR)

def _read_sized(f):
    try:
        size = _read_uint32(f)
    except struct.error as ex:
        raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))
    return _read(f, size)

def _write_sized(f, v):
    if not isinstance(v, bytes):
        v = v.encode()
    f.write(_pack_uint32(len(v)))
    f.write(v)

def _read_field(header, field, unpack_fn):
    if field not in header:
        raise ROSBagFormatException('expected "%s" field in record' % field)

    try:
        value = unpack_fn(header[field])
    except Exception as ex:
        raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex)))

    return value

def _read_bytes_field (header, field): return _read_field(header, field, _decode_bytes)
def _read_str_field   (header, field): return _read_field(header, field, _decode_str)
def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
def _read_time_field  (header, field): return _read_field(header, field, _unpack_time)

def _write_record(f, header, data='', padded_size=None):
    header_str = _write_header(f, header)

    if padded_size is not None:
        header_len = len(header_str)
        if header_len < padded_size:
            data = ' ' * (padded_size - header_len)
        else:
            data = ''

    _write_sized(f, data)

def _write_header(f, header):
    header_str = b''
    equal = b'='
    for k, v in header.items():
        if not isinstance(k, bytes):
            k = k.encode()
        if not isinstance(v, bytes):
            v = v.encode()
        header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v
    _write_sized(f, header_str)
    return header_str

def _read_header(f, req_op=None):
    bag_pos = f.tell()

    # Read the header
    try:
        header = _read_sized(f)
    except ROSBagException as ex:
        raise ROSBagFormatException('Error reading header: %s' % str(ex))

    return _build_header_from_str(header, req_op)

def _build_header_from_str(header, req_op):
    # Parse the header into a dict
    header_dict = {}
    while header != b'':
        # Read the field length
        if len(header) < 4:
            raise ROSBagFormatException('Error reading header field')
        (size,) = struct.unpack('<L', header[:4])
        header = header[4:]

        # Read the name=value field
        if len(header) < size:
            raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header)))
        (name, sep, value) = header[:size].partition(b'=')
        if sep == b'':
            raise ROSBagFormatException('Error reading header field')

        name = name.decode()
        header_dict[name] = value

        header = header[size:]

    # Check the op code of the header, if supplied
    if req_op is not None:
        op = _read_uint8_field(header_dict, 'op')
        if req_op != op:
            raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op]))

    return header_dict

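# The header wire format parsed above is a sequence of length-prefixed fields,
# each encoded as a uint32 length followed by b'name=value'. For example, a
# header {'op': b'\x03'} is serialized by _write_header as (sketch):
#
#     _pack_uint32(len(b'op') + 1 + len(b'\x03')) + b'op=' + b'\x03'
#
# i.e. b'\x04\x00\x00\x00op=\x03', wrapped in an outer size prefix by
# _write_sized().
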
def _peek_next_header_op(f):
    pos = f.tell()
    header = _read_header(f)
    op = _read_uint8_field(header, 'op')
    f.seek(pos)
    return op

def _read_record_data(f):
    try:
        record_data = _read_sized(f)
    except ROSBagException as ex:
        raise ROSBagFormatException('Error reading record data: %s' % str(ex))

    return record_data

class _BagReader(object):
    def __init__(self, bag):
        self.bag = bag

    def start_reading(self):
        raise NotImplementedError()

    def read_messages(self, topics, start_time, end_time, connection_filter, raw, return_connection_header=False):
        raise NotImplementedError()

    def reindex(self):
        raise NotImplementedError()

class _BagReader102_Unindexed(_BagReader):
    """
    Support class for reading unindexed v1.2 bag files.
    """
    def __init__(self, bag):
        _BagReader.__init__(self, bag)

    def start_reading(self):
        self.bag._file_header_pos = self.bag._file.tell()

    def reindex(self):
        """Generates all bag index information by rereading the message records."""
        f = self.bag._file

        total_bytes = self.bag.size

        # Re-read the file header to get to the start of the first message
        self.bag._file.seek(self.bag._file_header_pos)

        offset = f.tell()

        # Read the message definition and message data records
        while offset < total_bytes:
            yield offset

            op = _peek_next_header_op(f)

            if op == _OP_MSG_DEF:
                connection_info = self.read_message_definition_record()

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id
                    self.bag._connections[connection_info.id] = connection_info
                    self.bag._connection_indexes[connection_info.id] = []

            elif op == _OP_MSG_DATA:
                # Read the topic and timestamp from the header
                header = _read_header(f)

                topic = _read_str_field(header, 'topic')
                secs  = _read_uint32_field(header, 'sec')
                nsecs = _read_uint32_field(header, 'nsec')
                t = genpy.Time(secs, nsecs)

                if topic not in self.bag._topic_connections:
                    datatype = _read_str_field(header, 'type')
                    self._create_connection_info_for_datatype(topic, datatype)

                connection_id = self.bag._topic_connections[topic]
                info = self.bag._connections[connection_id]

                # Skip over the message content
                _skip_sized(f)

                # Insert the message entry (in order) into the connection index
                bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))

            offset = f.tell()

    def read_messages(self, topics, start_time, end_time, topic_filter, raw, return_connection_header=False):
        f = self.bag._file

        f.seek(self.bag._file_header_pos)

        while True:
            # Read any MSG_DEF records
            while True:
                position = f.tell()

                try:
                    header = _read_header(f)
                except Exception:
                    return

                op = _read_uint8_field(header, 'op')
                if op != _OP_MSG_DEF:
                    break

                connection_info = self.read_message_definition_record(header)

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id

                self.bag._connections[connection_info.id] = connection_info

            # Check that we have a MSG_DATA record
            if op != _OP_MSG_DATA:
                raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)

            topic = _read_str_field(header, 'topic')

            if topic not in self.bag._topic_connections:
                datatype = _read_str_field(header, 'type')
                self._create_connection_info_for_datatype(topic, datatype)

            connection_id = self.bag._topic_connections[topic]
            info = self.bag._connections[connection_id]

            # Get the message type
            try:
                msg_type = _get_message_type(info)
            except KeyError:
                raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bagfile by definition' % info.datatype)

            # Get the timestamp
            secs  = _read_uint32_field(header, 'sec')
            nsecs = _read_uint32_field(header, 'nsec')
            t = genpy.Time(secs, nsecs)

            # Read the message content
            data = _read_record_data(f)

            if raw:
                msg = (info.datatype, data, info.md5sum, position, msg_type)
            else:
                # Deserialize the message
                msg = msg_type()
                msg.deserialize(data)

            if return_connection_header:
                yield BagMessageWithConnectionHeader(topic, msg, t, info.header)
            else:
                yield BagMessage(topic, msg, t)

        self.bag._connection_indexes_read = True

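    # Example (typical use via the public Bag API, which dispatches to the
    # reader class matching the bag version; 'input.bag' and the topic name
    # below are hypothetical):
    #
    #     import rosbag
    #     with rosbag.Bag('input.bag') as bag:
    #         for topic, msg, t in bag.read_messages(topics=['/chatter']):
    #             print(topic, t, msg)
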
    def _create_connection_info_for_datatype(self, topic, datatype):
        for c in self.bag._connections.values():
            if c.datatype == datatype:
                connection_id     = len(self.bag._connections)
                connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] }
                connection_info   = _ConnectionInfo(connection_id, topic, connection_header)

                self.bag._topic_connections[topic]          = connection_id
                self.bag._connections[connection_id]        = connection_info
                self.bag._connection_indexes[connection_id] = []
                return

        raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))

    def read_message_definition_record(self, header=None):
        if not header:
            header = _read_header(self.bag._file, _OP_MSG_DEF)

        topic    = _read_str_field(header, 'topic')
        datatype = _read_str_field(header, 'type')
        md5sum   = _read_str_field(header, 'md5')
        msg_def  = _read_str_field(header, 'def')

        _skip_sized(self.bag._file)  # skip the record data

        connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def }

        id = len(self.bag._connections)

        return _ConnectionInfo(id, topic, connection_header)

class _BagReader102_Indexed(_BagReader102_Unindexed):
    """
    Support class for reading indexed v1.2 bag files.
    """
    def __init__(self, bag):
        _BagReader102_Unindexed.__init__(self, bag)

    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
        connections = self.bag._get_connections(topics, connection_filter)
        for entry in self.bag._get_entries(connections, start_time, end_time):
            yield self.seek_and_read_message_data_record(entry.offset, raw)

    def reindex(self):
        """Generates all bag index information by rereading the message records."""
        f = self.bag._file

        total_bytes = self.bag.size

        # Re-read the file header to get to the start of the first message
        self.bag._file.seek(self.bag._file_header_pos)
        self.read_file_header_record()

        offset = f.tell()

        # Read the message definition, message data and index data records
        while offset < total_bytes:
            yield offset

            op = _peek_next_header_op(f)

            if op == _OP_MSG_DEF:
                connection_info = self.read_message_definition_record()

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id
                    self.bag._connections[connection_info.id] = connection_info
                    self.bag._connection_indexes[connection_info.id] = []

            elif op == _OP_MSG_DATA:
                # Read the topic and timestamp from the header
                header = _read_header(f)

                topic = _read_str_field(header, 'topic')
                secs  = _read_uint32_field(header, 'sec')
                nsecs = _read_uint32_field(header, 'nsec')
                t = genpy.Time(secs, nsecs)

                if topic not in self.bag._topic_connections:
                    datatype = _read_str_field(header, 'type')
                    self._create_connection_info_for_datatype(topic, datatype)

                connection_id = self.bag._topic_connections[topic]
                info = self.bag._connections[connection_id]

                # Skip over the message content
                _skip_sized(f)

                # Insert the message entry (in order) into the connection index
                bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))

            elif op == _OP_INDEX_DATA:
                _skip_record(f)

            offset = f.tell()

    def start_reading(self):
        try:
            # Read the file header record, which points to the start of the topic indexes
            self.read_file_header_record()

            total_bytes = self.bag.size

            # Check if the index position has been written, i.e. the bag was closed successfully
            if self.bag._index_data_pos == 0:
                raise ROSBagUnindexedException()

            # Seek to the beginning of the topic index records
            self.bag._file.seek(self.bag._index_data_pos)

            # Read the topic index records, which give the offset of each message in the file
            topic_indexes = {}
            while True:
                pos = self.bag._file.tell()
                if pos >= total_bytes:
                    break

                topic, index = self.read_topic_index_record()

                topic_indexes[topic] = index

            # Read the message definition records (one per topic)
            for topic, index in topic_indexes.items():
                self.bag._file.seek(index[0].offset)

                connection_info = self.read_message_definition_record()

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id
                self.bag._connections[connection_info.id] = connection_info

                self.bag._connection_indexes[connection_info.id] = index

            self.bag._connection_indexes_read = True

        except Exception as ex:
            raise ROSBagUnindexedException()

    def read_file_header_record(self):
        self.bag._file_header_pos = self.bag._file.tell()

        header = _read_header(self.bag._file, _OP_FILE_HEADER)

        self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')

        _skip_sized(self.bag._file)  # skip the record data, i.e. padding

    def read_topic_index_record(self):
        f = self.bag._file

        header = _read_header(f, _OP_INDEX_DATA)

        index_version = _read_uint32_field(header, 'ver')
        topic         = _read_str_field   (header, 'topic')
        count         = _read_uint32_field(header, 'count')

        if index_version != 0:
            raise ROSBagFormatException('expecting index version 0, got %d' % index_version)

        _read_uint32(f)  # skip the record data size

        topic_index = []

        for i in range(count):
            time   = _read_time  (f)
            offset = _read_uint64(f)

            topic_index.append(_IndexEntry102(time, offset))

        return (topic, topic_index)

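    # On disk, each v0 index entry read above is 16 bytes, little-endian: two
    # uint32s (secs, nsecs) followed by a uint64 absolute file offset of the
    # corresponding message record.
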
    def seek_and_read_message_data_record(self, position, raw, return_connection_header=False):
        f = self.bag._file

        # Seek to the message position
        f.seek(position)

        # Skip any MSG_DEF records
        while True:
            header = _read_header(f)
            op = _read_uint8_field(header, 'op')
            if op != _OP_MSG_DEF:
                break
            _skip_sized(f)

        # Check that we have a MSG_DATA record
        if op != _OP_MSG_DATA:
            raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)

        topic = _read_str_field(header, 'topic')

        connection_id = self.bag._topic_connections[topic]
        info = self.bag._connections[connection_id]

        # Get the message type
        try:
            msg_type = _get_message_type(info)
        except KeyError:
            raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bagfile by definition' % info.datatype)

        # Get the timestamp
        secs  = _read_uint32_field(header, 'sec')
        nsecs = _read_uint32_field(header, 'nsec')
        t = genpy.Time(secs, nsecs)

        # Read the message content
        data = _read_record_data(f)

        if raw:
            msg = info.datatype, data, info.md5sum, position, msg_type
        else:
            # Deserialize the message
            msg = msg_type()
            msg.deserialize(data)

        if return_connection_header:
            return BagMessageWithConnectionHeader(topic, msg, t, header)
        else:
            return BagMessage(topic, msg, t)

class _BagReader200(_BagReader):
    """
    Support class for reading v2.0 bag files.
    """
    def __init__(self, bag):
        _BagReader.__init__(self, bag)

        self.decompressed_chunk_pos = None
        self.decompressed_chunk     = None
        self.decompressed_chunk_io  = None

    def reindex(self):
        """
        Generates all bag index information by rereading the chunks.
        Assumes the file header has been read.
        """
        f = self.bag._file

        f.seek(0, os.SEEK_END)
        total_bytes = f.tell()

        # Read any terminal connection records so that connection information
        # is available while reindexing the chunks
        self._read_terminal_connection_records()

        # Re-read the file header to get to the start of the first chunk
        self.bag._file.seek(self.bag._file_header_pos)
        self.read_file_header_record()

        trunc_pos = None

        while True:
            chunk_pos = f.tell()
            if chunk_pos >= total_bytes:
                break
            yield chunk_pos

            try:
                self._reindex_read_chunk(f, chunk_pos, total_bytes)
            except Exception as ex:
                break

            trunc_pos = f.tell()

        if trunc_pos and trunc_pos < total_bytes:
            f.truncate(trunc_pos)

    def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
        # Read the chunk header
        chunk_header = self.read_chunk_header()

        # If the chunk header size is 0, the chunk wasn't correctly terminated - we're done
        if chunk_header.compressed_size == 0:
            raise ROSBagException('unterminated chunk at %d' % chunk_pos)

        if chunk_header.compression == Compression.NONE:
            encrypted_chunk = _read(f, chunk_header.compressed_size)

            chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk)

            if self.decompressed_chunk_io:
                self.decompressed_chunk_io.close()
            self.decompressed_chunk_io = StringIO(chunk)

            chunk_file = self.decompressed_chunk_io
        else:
            # Read the compressed chunk
            encrypted_chunk = _read(f, chunk_header.compressed_size)

            compressed_chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk)

            # Decompress it
            if chunk_header.compression == Compression.BZ2:
                self.decompressed_chunk = bz2.decompress(compressed_chunk)
            elif chunk_header.compression == Compression.LZ4 and found_lz4:
                self.decompressed_chunk = roslz4.decompress(compressed_chunk)
            else:
                raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)

            if self.decompressed_chunk_io:
                self.decompressed_chunk_io.close()
            self.decompressed_chunk_io = StringIO(self.decompressed_chunk)

            chunk_file = self.decompressed_chunk_io

        # Read the connection and message records from the chunk
        self.bag._curr_chunk_info = None

        offset = chunk_file.tell()

        expected_index_length = 0

        while offset < chunk_header.uncompressed_size:
            op = _peek_next_header_op(chunk_file)

            if op == _OP_CONNECTION:
                connection_info = self.read_connection_record(chunk_file, False)

                if connection_info.id not in self.bag._connections:
                    self.bag._connections[connection_info.id] = connection_info
                if connection_info.id not in self.bag._connection_indexes:
                    self.bag._connection_indexes[connection_info.id] = []

            elif op == _OP_MSG_DATA:
                # Read the connection id and timestamp from the header
                header = _read_header(chunk_file)

                connection_id = _read_uint32_field(header, 'conn')
                t             = _read_time_field  (header, 'time')

                # Update the chunk info with this timestamp
                if not self.bag._curr_chunk_info:
                    self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t)
                else:
                    if t > self.bag._curr_chunk_info.end_time:
                        self.bag._curr_chunk_info.end_time = t
                    elif t < self.bag._curr_chunk_info.start_time:
                        self.bag._curr_chunk_info.start_time = t
                if connection_id in self.bag._curr_chunk_info.connection_counts:
                    self.bag._curr_chunk_info.connection_counts[connection_id] += 1
                else:
                    self.bag._curr_chunk_info.connection_counts[connection_id] = 1

                # Skip over the message content
                _skip_sized(chunk_file)

                # Insert the message entry (in order) into the connection index
                if connection_id not in self.bag._connection_indexes:
                    raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos))
                bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset))

                expected_index_length += 1

            else:
                # Unknown record type: skip it
                _skip_record(chunk_file)

            offset = chunk_file.tell()

        # Skip over the index records that follow the chunk, counting their
        # entries to validate the index built above
        next_op = _peek_next_header_op(f)

        total_index_length = 0

        while next_op != _OP_CHUNK:
            if next_op == _OP_INDEX_DATA:
                _, index = self.read_connection_index_record()
                total_index_length += len(index)
            else:
                _skip_record(f)

            if f.tell() >= total_bytes:
                if total_index_length != expected_index_length:
                    raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length))
                break

            next_op = _peek_next_header_op(f)

        # Chunk was read correctly - store its info
        self.bag._chunk_headers[chunk_pos] = chunk_header
        self.bag._chunks.append(self.bag._curr_chunk_info)
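
    # A chunk record therefore carries, in its (possibly compressed and
    # encrypted) data portion, a series of CONNECTION and MSG_DATA records;
    # the INDEX_DATA records that follow the chunk in the outer file are only
    # counted here to validate the reindexing pass.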

    def _read_terminal_connection_records(self):
        b, f, r = self.bag, self.bag._file, self.bag._reader

        # Seek to the first record after the file header
        f.seek(b._file_header_pos)
        r.read_file_header_record()

        # Advance to the first CONNECTION record
        if self._advance_to_next_record(_OP_CONNECTION):
            # Read the CONNECTION records
            while True:
                connection_info = r.read_connection_record(f, False)

                b._connections[connection_info.id] = connection_info
                b._connection_indexes[connection_info.id] = []

                next_op = _peek_next_header_op(f)
                if next_op != _OP_CONNECTION:
                    break

    def _advance_to_next_record(self, op):
        b, f = self.bag, self.bag._file

        try:
            while True:
                next_op = _peek_next_header_op(f)
                if next_op == op:
                    break

                if next_op == _OP_INDEX_DATA:
                    # read_connection_index_record() requires a valid
                    # _curr_chunk_info, so provide a placeholder if none
                    # exists yet
                    if b._curr_chunk_info is None:
                        b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1))

                    b._reader.read_connection_index_record()
                else:
                    _skip_record(f)

            return True

        except Exception as ex:
            return False

    def start_reading(self):
        try:
            # Read the file header record, which points to the end of the chunks
            self.read_file_header_record()

            # Check if the index position has been written, i.e. the bag was closed successfully
            if self.bag._index_data_pos == 0:
                raise ROSBagUnindexedException()

            # Seek to the end of the chunks
            self.bag._file.seek(self.bag._index_data_pos)

            # Read the connection records
            self.bag._connection_indexes = {}
            for i in range(self.bag._connection_count):
                connection_info = self.read_connection_record(self.bag._file, True)
                self.bag._connections[connection_info.id] = connection_info
                self.bag._connection_indexes[connection_info.id] = []

            # Read the chunk info records
            self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)]

            # Read the chunk headers
            self.bag._chunk_headers = {}
            for chunk_info in self.bag._chunks:
                self.bag._file.seek(chunk_info.pos)
                self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header()

            if not self.bag._skip_index:
                self._read_connection_index_records()

        except ROSBagEncryptNotSupportedException:
            raise
        except ROSBagEncryptException:
            raise
        except Exception as ex:
            raise ROSBagUnindexedException()

    def _read_connection_index_records(self):
        for chunk_info in self.bag._chunks:
            self.bag._file.seek(chunk_info.pos)
            _skip_record(self.bag._file)  # skip the chunk record itself

            self.bag._curr_chunk_info = chunk_info
            for i in range(len(chunk_info.connection_counts)):
                connection_id, index = self.read_connection_index_record()
                self.bag._connection_indexes[connection_id].extend(index)

        # Remove any connections with no entries. This is a workaround for a
        # bug where connection records were written for connections that had
        # no messages in the bag.
        orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index]
        for id in orphan_connection_ids:
            del self.bag._connections[id]
            del self.bag._connection_indexes[id]

        self.bag._connection_indexes_read = True

    def read_messages(self, topics, start_time, end_time, connection_filter, raw, return_connection_header=False):
        connections = self.bag._get_connections(topics, connection_filter)
        for entry in self.bag._get_entries(connections, start_time, end_time):
            yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw, return_connection_header)

    def read_file_header_record(self):
        self.bag._file_header_pos = self.bag._file.tell()

        header = _read_header(self.bag._file, _OP_FILE_HEADER)

        self.bag._index_data_pos   = _read_uint64_field(header, 'index_pos')
        self.bag._chunk_count      = _read_uint32_field(header, 'chunk_count')
        self.bag._connection_count = _read_uint32_field(header, 'conn_count')
        try:
            encryptor = _read_str_field(header, 'encryptor')
            self.bag.set_encryptor(encryptor)
            self.bag._encryptor.read_fields_from_file_header(header)
        except ROSBagFormatException:
            # The 'encryptor' field is optional; unencrypted bags do not have it
            pass

        _skip_sized(self.bag._file)  # skip the record data, i.e. padding

    def read_connection_record(self, f, encrypt):
        if encrypt:
            header = self.bag._encryptor.read_encrypted_header(_read_header, f, _OP_CONNECTION)
        else:
            header = _read_header(f, _OP_CONNECTION)

        conn_id = _read_uint32_field(header, 'conn')
        topic   = _read_str_field   (header, 'topic')

        if encrypt:
            connection_header = self.bag._encryptor.read_encrypted_header(_read_header, f)
        else:
            connection_header = _read_header(f)

        return _ConnectionInfo(conn_id, topic, connection_header)

    def read_chunk_info_record(self):
        f = self.bag._file

        header = _read_header(f, _OP_CHUNK_INFO)

        chunk_info_version = _read_uint32_field(header, 'ver')

        if chunk_info_version == 1:
            chunk_pos        = _read_uint64_field(header, 'chunk_pos')
            start_time       = _read_time_field  (header, 'start_time')
            end_time         = _read_time_field  (header, 'end_time')
            connection_count = _read_uint32_field(header, 'count')

            chunk_info = _ChunkInfo(chunk_pos, start_time, end_time)

            _read_uint32(f)  # skip the record data size

            for i in range(connection_count):
                connection_id = _read_uint32(f)
                count         = _read_uint32(f)

                chunk_info.connection_counts[connection_id] = count

            return chunk_info
        else:
            raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)

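    # The CHUNK_INFO (version 1) data portion read above is a packed array of
    # little-endian (uint32 connection id, uint32 message count) pairs, one
    # per connection present in the chunk.
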
    def read_chunk_header(self):
        header = _read_header(self.bag._file, _OP_CHUNK)

        compression       = _read_str_field   (header, 'compression')
        uncompressed_size = _read_uint32_field(header, 'size')

        compressed_size = _read_uint32(self.bag._file)  # the record data size

        data_pos = self.bag._file.tell()

        return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)

    def read_connection_index_record(self):
        f = self.bag._file

        header = _read_header(f, _OP_INDEX_DATA)

        index_version = _read_uint32_field(header, 'ver')
        connection_id = _read_uint32_field(header, 'conn')
        count         = _read_uint32_field(header, 'count')

        if index_version != 1:
            raise ROSBagFormatException('expecting index version 1, got %d' % index_version)

        record_size = _read_uint32(f)  # the record data size

        index = []

        for i in range(count):
            time   = _read_time  (f)
            offset = _read_uint32(f)

            index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset))

        return (connection_id, index)

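    # Each v1 index entry read above is 12 bytes, little-endian: two uint32s
    # (secs, nsecs) followed by a uint32 offset of the message record measured
    # from the start of the decompressed chunk data.
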
    def seek_and_read_message_data_record(self, position, raw, return_connection_header=False):
        chunk_pos, offset = position

        chunk_header = self.bag._chunk_headers.get(chunk_pos)
        if chunk_header is None:
            raise ROSBagException('no chunk at position %d' % chunk_pos)

        if chunk_header.compression == Compression.NONE:
            if self.decompressed_chunk_pos != chunk_pos:
                f = self.bag._file
                f.seek(chunk_header.data_pos)
                encrypted_chunk = _read(f, chunk_header.compressed_size)

                chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk)

                self.decompressed_chunk_pos = chunk_pos

                if self.decompressed_chunk_io:
                    self.decompressed_chunk_io.close()
                self.decompressed_chunk_io = StringIO(chunk)
        else:
            if self.decompressed_chunk_pos != chunk_pos:
                # Decompress the chunk
                self.bag._file.seek(chunk_header.data_pos)
                encrypted_chunk = _read(self.bag._file, chunk_header.compressed_size)

                compressed_chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk)

                if chunk_header.compression == Compression.BZ2:
                    self.decompressed_chunk = bz2.decompress(compressed_chunk)
                elif chunk_header.compression == Compression.LZ4 and found_lz4:
                    self.decompressed_chunk = roslz4.decompress(compressed_chunk)
                else:
                    raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)

                self.decompressed_chunk_pos = chunk_pos

                if self.decompressed_chunk_io:
                    self.decompressed_chunk_io.close()
                self.decompressed_chunk_io = StringIO(self.decompressed_chunk)

        f = self.decompressed_chunk_io
        f.seek(offset)

        # Skip any CONNECTION records
        while True:
            header = _read_header(f)
            op = _read_uint8_field(header, 'op')
            if op != _OP_CONNECTION:
                break
            _skip_sized(f)

        # Check that we have a MSG_DATA record
        if op != _OP_MSG_DATA:
            raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)

        connection_id = _read_uint32_field(header, 'conn')
        t             = _read_time_field  (header, 'time')

        # Get the message type
        connection_info = self.bag._connections[connection_id]
        try:
            msg_type = _get_message_type(connection_info)
        except KeyError:
            raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype)

        # Read the message content
        data = _read_record_data(f)

        if raw:
            msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type
        else:
            # Deserialize the message
            msg = msg_type()
            msg.deserialize(data)

        if return_connection_header:
            return BagMessageWithConnectionHeader(connection_info.topic, msg, t, connection_info.header)
        else:
            return BagMessage(connection_info.topic, msg, t)

def _time_to_str(secs):
    secs_frac = secs - int(secs)
    secs_frac_str = ('%.2f' % secs_frac)[1:]

    return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str

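# Example (output depends on the local timezone):
#
#     _time_to_str(0.25)   # e.g. 'Jan 01 1970 00:00:00.25' when running in UTC
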
def _human_readable_size(size):
    multiple = 1024.0
    for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
        size /= multiple
        if size < multiple:
            return '%.1f %s' % (size, suffix)

    return '-'

def _human_readable_frequency(freq):
    multiple = 1000.0
    for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']:
        if freq < multiple:
            return '%.1f %s' % (freq, suffix)
        freq /= multiple

    return '-'

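# Examples (exact strings, assuming the helpers above are unchanged):
#
#     _human_readable_size(2048)         # '2.0 KB'
#     _human_readable_size(3 * 1024**3)  # '3.0 GB'
#     _human_readable_frequency(1500)    # '1.5 kHz'
#     _human_readable_frequency(10)      # '10.0 Hz'
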
def _mergesort(list_of_lists, key=None):
    """
    Perform an N-way merge operation on sorted lists.

    @param list_of_lists: (really iterable of iterable) of sorted elements
    (either naturally sorted or sorted by C{key})
    @param key: specify sort key function (like C{sort()}, C{sorted()})

    Yields tuples of the form C{(item, iterator)}, where the iterator is the
    built-in list iterator or something you pass in, if you pre-generate the
    iterators.

    This is a stable merge; complexity O(N lg N)

    Examples::

        print(list(x[0] for x in _mergesort([[1, 2, 3, 4],
                                             [2, 3.5, 3.7, 4.5, 6, 7],
                                             [2.6, 3.6, 6.6, 9]])))
        [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]

        # note stability
        print(list(x[0] for x in _mergesort([[1, 2, 3, 4],
                                             [2, 3.5, 3.7, 4.5, 6, 7],
                                             [2.6, 3.6, 6.6, 9]], key=int)))
        [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]

        print(list(x[0] for x in _mergesort([[4, 3, 2, 1],
                                             [7, 6.5, 4, 3.7, 3.3, 1.9],
                                             [9, 8.6, 7.6, 6.6, 5.5, 4.4, 3.3]],
                                            key=lambda x: -x)))
        [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1]
    """
    heap = []
    for i, itr in enumerate(iter(pl) for pl in list_of_lists):
        try:
            item = next(itr)
            toadd = (key(item), i, item, itr) if key else (item, i, itr)
            heap.append(toadd)
        except StopIteration:
            pass
    heapq.heapify(heap)

    if key:
        while heap:
            _, idx, item, itr = heap[0]
            yield item, itr
            try:
                item = next(itr)
                heapq.heapreplace(heap, (key(item), idx, item, itr))
            except StopIteration:
                heapq.heappop(heap)

    else:
        while heap:
            item, idx, itr = heap[0]
            yield item, itr
            try:
                heapq.heapreplace(heap, (next(itr), idx, itr))
            except StopIteration:
                heapq.heappop(heap)

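# Sketch of how the bag index can use this: per-connection index entry lists
# are each sorted by time, so merging them yields one globally time-ordered
# stream of entries ('indexes' below is a hypothetical list of such lists):
#
#     for entry, _ in _mergesort(indexes, key=lambda entry: entry.time):
#         ...
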
class _CompressorFileFacade(object):
    """
    A file facade for sequential compressors (e.g., bz2.BZ2Compressor).
    """
    def __init__(self, file, compressor):
        self.file                = file
        self.compressor          = compressor
        self.compressed_bytes_in = 0

    def write(self, data):
        compressed = self.compressor.compress(data)
        if len(compressed) > 0:
            self.file.write(compressed)
        self.compressed_bytes_in += len(data)

    def flush(self):
        compressed = self.compressor.flush()
        if len(compressed) > 0:
            self.file.write(compressed)

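# Example (illustrative; 'chunk.bin' is a hypothetical path): wrap a file in a
# sequential compressor so chunk data can be written through a file-like API.
#
#     facade = _CompressorFileFacade(open('chunk.bin', 'wb'), bz2.BZ2Compressor())
#     facade.write(b'x' * 4096)
#     facade.flush()
#     facade.compressed_bytes_in   # 4096 (uncompressed bytes fed in)
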