"""
The rosbag Python API.

Provides serialization of bag files.
"""

from __future__ import print_function

import bisect
import bz2
import heapq
import os
import re
import struct
import sys
import threading
import time
import yaml

try:
    from cStringIO import StringIO
except ImportError:
    from io import StringIO

import genmsg
import genpy
import genpy.dynamic
import genpy.message

import roslib.names
import rospy

class ROSBagException(Exception):
    """
    Base class for exceptions in rosbag.
    """
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return self.value

class ROSBagFormatException(ROSBagException):
    """
    Exceptions for errors relating to the bag file format.
    """
    def __init__(self, value):
        ROSBagException.__init__(self, value)

class ROSBagUnindexedException(ROSBagException):
    """
    Exception for unindexed bags.
    """
    def __init__(self):
        ROSBagException.__init__(self, 'Unindexed bag')

class Compression:
    """
    Allowable compression types
    """
    NONE = 'none'
    BZ2  = 'bz2'

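# A minimal usage sketch (illustrative, not part of this module): the
# compression mode is chosen when a bag is opened for writing, e.g.
#
#   bag = Bag('output.bag', 'w', compression=Compression.BZ2)
#
# after which each chunk is compressed with bz2 until the compression
# property is changed.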
class Bag(object):
    """
    Bag serializes messages to and from a single file on disk using the bag format.
    """
    def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, allow_unindexed=False, options=None, skip_index=False):
        """
        Open a bag file.  The mode can be 'r', 'w', or 'a' for reading (default),
        writing or appending.  The file will be created if it doesn't exist
        when opened for writing or appending; it will be truncated when opened
        for writing.  Simultaneous reading and writing is allowed when in writing
        or appending mode.
        @param f: filename of bag to open or a stream to read from
        @type  f: str or file
        @param mode: mode, either 'r', 'w', or 'a'
        @type  mode: str
        @param compression: compression mode, see U{rosbag.Compression} for valid modes
        @type  compression: str
        @param chunk_threshold: minimum number of uncompressed bytes per chunk
        @type  chunk_threshold: int
        @param allow_unindexed: if True, allow opening unindexed bags
        @type  allow_unindexed: bool
        @param options: the bag options (currently: compression and chunk_threshold)
        @type  options: dict
        @param skip_index: if True, don't read the connection index records on open [2.0+]
        @type  skip_index: bool
        @raise ValueError: if any argument is invalid
        @raise ROSBagException: if an error occurs opening file
        @raise ROSBagFormatException: if bag format is corrupted
        """
        if options is not None:
            if type(options) is not dict:
                raise ValueError('options must be of type dict')
            if 'compression' in options:
                compression = options['compression']
            if 'chunk_threshold' in options:
                chunk_threshold = options['chunk_threshold']

        self._file     = None
        self._filename = None
        self._version  = None

        allowed_compressions = [Compression.NONE, Compression.BZ2]
        if compression not in allowed_compressions:
            raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))
        self._compression = compression

        if chunk_threshold < 0:
            raise ValueError('chunk_threshold must be greater than or equal to zero')
        self._chunk_threshold = chunk_threshold

        self._skip_index = skip_index

        self._reader = None

        self._file_header_pos = None
        self._index_data_pos  = 0

        self._clear_index()

        self._buffer = StringIO()

        self._curr_compression = Compression.NONE

        self._open(f, mode, allow_unindexed)

        self._output_file = self._file
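
    # Example (an illustrative sketch, not part of the API): round-tripping a
    # message through a bag.  Assumes the std_msgs package is available and
    # 'test.bag' is writable; Bag supports the context-manager protocol via
    # __enter__/__exit__ below.
    #
    #   from std_msgs.msg import Int32
    #   with Bag('test.bag', 'w') as outbag:
    #       outbag.write('/numbers', Int32(data=42))
    #   with Bag('test.bag') as inbag:
    #       for topic, msg, t in inbag.read_messages():
    #           print(topic, msg.data, t.to_sec())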

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def options(self):
        """Get the options."""
        return { 'compression' : self._compression, 'chunk_threshold' : self._chunk_threshold }

    @property
    def filename(self):
        """Get the filename."""
        return self._filename

    @property
    def version(self):
        """Get the version."""
        return self._version

    @property
    def mode(self):
        """Get the mode."""
        return self._mode

    @property
    def size(self):
        """Get the size in bytes."""
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        pos = self._file.tell()
        self._file.seek(0, os.SEEK_END)
        size = self._file.tell()
        self._file.seek(pos)
        return size

    # compression

    def _get_compression(self):
        """Get the compression method to use for writing."""
        return self._compression

    def _set_compression(self, compression):
        """Set the compression method to use for writing."""
        allowed_compressions = [Compression.NONE, Compression.BZ2]
        if compression not in allowed_compressions:
            raise ValueError('compression must be one of: %s' % ', '.join(allowed_compressions))

        self.flush()
        self._compression = compression

    compression = property(_get_compression, _set_compression)

    # chunk_threshold

    def _get_chunk_threshold(self):
        """Get the chunk threshold to use for writing."""
        return self._chunk_threshold

    def _set_chunk_threshold(self, chunk_threshold):
        """Set the chunk threshold to use for writing."""
        if chunk_threshold < 0:
            raise ValueError('chunk_threshold must be greater than or equal to zero')

        self.flush()
        self._chunk_threshold = chunk_threshold

    chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)

    def read_messages(self, topics=None, start_time=None, end_time=None, connection_filter=None, raw=False):
        """
        Read messages from the bag, optionally filtered by topic, timestamp and connection details.
        @param topics: list of topics or a single topic [optional]
        @type  topics: list(str) or str
        @param start_time: earliest timestamp of message to return [optional]
        @type  start_time: U{genpy.Time}
        @param end_time: latest timestamp of message to return [optional]
        @type  end_time: U{genpy.Time}
        @param connection_filter: function to filter connections to include [optional]
        @type  connection_filter: function taking (topic, datatype, md5sum, msg_def, header) and returning bool
        @param raw: if True, then generate tuples of (datatype, (data, md5sum, position), pytype)
        @type  raw: bool
        @return: generator of (topic, message, timestamp) tuples for each message in the bag file
        @rtype: generator of tuples of (str, U{genpy.Message}, U{genpy.Time}) [not raw] or (str, (str, str, str, tuple, class), U{genpy.Time}) [raw]
        """
        self.flush()

        if topics and type(topics) is str:
            topics = [topics]

        return self._reader.read_messages(topics, start_time, end_time, connection_filter, raw)

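    # For example (an illustrative sketch; 'start' and 'end' are hypothetical
    # genpy.Time bounds):
    #
    #   for topic, msg, t in bag.read_messages(
    #           topics='/tf', start_time=start, end_time=end,
    #           connection_filter=lambda topic, datatype, md5sum, msg_def, header:
    #               datatype == 'tf/tfMessage'):
    #       ...
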
    def flush(self):
        """
        Write the open chunk to disk so subsequent reads will read all messages.
        @raise ValueError: if bag is closed
        """
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        if self._chunk_open:
            self._stop_writing_chunk()

    def write(self, topic, msg, t=None, raw=False):
        """
        Write a message to the bag.
        @param topic: name of topic
        @type  topic: str
        @param msg: message to add to bag, or tuple (if raw)
        @type  msg: Message or tuple of raw message data
        @param t: ROS time of message publication, if None specified, use current time [optional]
        @type  t: U{genpy.Time}
        @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
        @type  raw: bool
        @raise ValueError: if arguments are invalid or bag is closed
        """
        if not self._file:
            raise ValueError('I/O operation on closed bag')

        if not topic:
            raise ValueError('topic is invalid')
        if not msg:
            raise ValueError('msg is invalid')

        if t is None:
            t = genpy.Time.from_sec(time.time())

        # Seek to the end (in case the previous operation was a read)
        self._file.seek(0, os.SEEK_END)

        # Open a chunk, if needed
        if not self._chunk_open:
            self._start_writing_chunk(t)

        # Unpack raw
        if raw:
            if len(msg) == 5:
                msg_type, serialized_bytes, md5sum, pos, pytype = msg
            elif len(msg) == 4:
                msg_type, serialized_bytes, md5sum, pytype = msg
            else:
                raise ValueError('msg must be of length 4 or 5')

        # Write a connection record, if necessary
        if topic in self._topic_connections:
            connection_info = self._topic_connections[topic]
            conn_id = connection_info.id
        else:
            conn_id = len(self._connections)

            if raw:
                if pytype is None:
                    try:
                        pytype = genpy.message.get_message_class(msg_type)
                    except Exception:
                        pytype = None
                    if pytype is None:
                        raise ROSBagException('cannot locate message class and no message class provided for [%s]' % msg_type)

                if pytype._md5sum != md5sum:
                    print('WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type, file=sys.stderr)

                header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text }
            else:
                header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text }

            connection_info = _ConnectionInfo(conn_id, topic, header)

            self._write_connection_record(connection_info)

            self._connections[conn_id] = connection_info
            self._topic_connections[topic] = connection_info

        # Create an index entry
        index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())

        # Update the indexes and current chunk info
        if conn_id not in self._curr_chunk_connection_indexes:
            # This is the first message on this connection in the chunk
            self._curr_chunk_connection_indexes[conn_id] = [index_entry]
            self._curr_chunk_info.connection_counts[conn_id] = 1
        else:
            curr_chunk_connection_index = self._curr_chunk_connection_indexes[conn_id]
            if index_entry >= curr_chunk_connection_index[-1]:
                # Writing chronologically; can skip the binary search
                curr_chunk_connection_index.append(index_entry)
            else:
                bisect.insort_right(curr_chunk_connection_index, index_entry)

            self._curr_chunk_info.connection_counts[conn_id] += 1

        if conn_id not in self._connection_indexes:
            self._connection_indexes[conn_id] = [index_entry]
        else:
            bisect.insort_right(self._connection_indexes[conn_id], index_entry)

        # Update the chunk start/end times
        if t > self._curr_chunk_info.end_time:
            self._curr_chunk_info.end_time = t
        elif t < self._curr_chunk_info.start_time:
            self._curr_chunk_info.start_time = t

        if not raw:
            # Serialize the message into the buffer
            self._buffer.seek(0)
            self._buffer.truncate(0)
            msg.serialize(self._buffer)
            serialized_bytes = self._buffer.getvalue()

        # Write the message data record
        self._write_message_data_record(conn_id, t, serialized_bytes)

        # Check if we want to stop this chunk
        chunk_size = self._get_chunk_offset()
        if chunk_size > self._chunk_threshold:
            self._stop_writing_chunk()
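
        # Example (an illustrative sketch): raw mode avoids deserializing and
        # re-serializing messages when copying between bags:
        #
        #   with Bag('in.bag') as inbag, Bag('out.bag', 'w') as outbag:
        #       for topic, raw_msg, t in inbag.read_messages(raw=True):
        #           outbag.write(topic, raw_msg, t, raw=True)
        #
        # where raw_msg is the (datatype, data, md5sum, position, pytype)
        # tuple produced by read_messages(raw=True).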

    def reindex(self):
        """
        Reindexes the bag file.  Yields position of each chunk for progress.
        """
        self._clear_index()
        return self._reader.reindex()

    def close(self):
        """
        Close the bag file.  Closing an already closed bag does nothing.
        """
        if self._file:
            if self._mode in 'wa':
                self._stop_writing()

            self._close_file()

    def __str__(self):
        rows = []

        try:
            if self._filename:
                rows.append(('path', self._filename))

            if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
                rows.append(('version', '1.2 (unindexed)'))
            else:
                rows.append(('version', '%d.%d' % (self._version / 100, self._version % 100)))

            if not self._connection_indexes and not self._chunks:
                rows.append(('size', _human_readable_size(self.size)))
            else:
                if self._chunks:
                    start_stamp = self._chunks[ 0].start_time.to_sec()
                    end_stamp   = self._chunks[-1].end_time.to_sec()
                else:
                    start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()])
                    end_stamp   = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()])

                # Show duration
                duration = end_stamp - start_stamp
                dur_secs = duration % 60
                dur_mins = int(duration / 60)
                dur_hrs  = int(dur_mins / 60)
                if dur_hrs > 0:
                    dur_mins = dur_mins % 60
                    duration_str = '%dhr %d:%02ds (%ds)' % (dur_hrs, dur_mins, dur_secs, duration)
                elif dur_mins > 0:
                    duration_str = '%d:%02ds (%ds)' % (dur_mins, dur_secs, duration)
                else:
                    duration_str = '%.1fs' % duration

                rows.append(('duration', duration_str))

                # Show start and end times
                rows.append(('start', '%s (%.2f)' % (_time_to_str(start_stamp), start_stamp)))
                rows.append(('end',   '%s (%.2f)' % (_time_to_str(end_stamp),   end_stamp)))

                rows.append(('size', _human_readable_size(self.size)))

                if self._chunks:
                    num_messages = 0
                    for c in self._chunks:
                        for counts in c.connection_counts.itervalues():
                            num_messages += counts
                else:
                    num_messages = sum([len(index) for index in self._connection_indexes.itervalues()])
                rows.append(('messages', str(num_messages)))

                # Show compression information
                if len(self._chunk_headers) == 0:
                    rows.append(('compression', 'none'))
                else:
                    compression_counts       = {}
                    compression_uncompressed = {}
                    compression_compressed   = {}
                    for chunk_header in self._chunk_headers.itervalues():
                        if chunk_header.compression not in compression_counts:
                            compression_counts[chunk_header.compression] = 1
                            compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] = chunk_header.compressed_size
                        else:
                            compression_counts[chunk_header.compression] += 1
                            compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] += chunk_header.compressed_size

                    chunk_count = len(self._chunk_headers)

                    compressions = []
                    for count, compression in reversed(sorted([(v, k) for k, v in compression_counts.items()])):
                        if compression != Compression.NONE:
                            fraction = (100.0 * compression_compressed[compression]) / compression_uncompressed[compression]
                            compressions.append('%s [%d/%d chunks; %.2f%%]' % (compression, count, chunk_count, fraction))
                        else:
                            compressions.append('%s [%d/%d chunks]' % (compression, count, chunk_count))
                    rows.append(('compression', ', '.join(compressions)))

                    all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
                    if not all_uncompressed:
                        total_uncompressed_size = sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
                        total_compressed_size   = sum((h.compressed_size   for h in self._chunk_headers.itervalues()))

                        total_uncompressed_size_str = _human_readable_size(total_uncompressed_size)
                        total_compressed_size_str   = _human_readable_size(total_compressed_size)
                        total_size_str_length = max([len(total_uncompressed_size_str), len(total_compressed_size_str)])

                        if duration > 0:
                            uncompressed_rate_str = _human_readable_size(total_uncompressed_size / duration)
                            compressed_rate_str   = _human_readable_size(total_compressed_size / duration)
                            rate_str_length = max([len(uncompressed_rate_str), len(compressed_rate_str)])

                            rows.append(('uncompressed', '%*s @ %*s/s' %
                                         (total_size_str_length, total_uncompressed_size_str, rate_str_length, uncompressed_rate_str)))
                            rows.append(('compressed', '%*s @ %*s/s (%.2f%%)' %
                                         (total_size_str_length, total_compressed_size_str, rate_str_length, compressed_rate_str, (100.0 * total_compressed_size) / total_uncompressed_size)))
                        else:
                            rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str)))
                            rows.append(('compressed',   '%*s' % (total_size_str_length, total_compressed_size_str)))

                datatypes = set()
                datatype_infos = []
                for connection in self._connections.itervalues():
                    if connection.datatype in datatypes:
                        continue
                    datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
                    datatypes.add(connection.datatype)

                topics = sorted(set([c.topic for c in self._get_connections()]))
                topic_datatypes    = {}
                topic_conn_counts  = {}
                topic_msg_counts   = {}
                topic_freqs_median = {}
                for topic in topics:
                    connections = list(self._get_connections(topic))

                    topic_datatypes[topic] = connections[0].datatype
                    topic_conn_counts[topic] = len(connections)

                    msg_count = 0
                    for connection in connections:
                        for chunk in self._chunks:
                            msg_count += chunk.connection_counts.get(connection.id, 0)
                    topic_msg_counts[topic] = msg_count

                    if self._connection_indexes_read:
                        stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
                        if len(stamps) > 1:
                            periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
                            med_period = _median(periods)
                            if med_period > 0.0:
                                topic_freqs_median[topic] = 1.0 / med_period

                topics = sorted(topic_datatypes.iterkeys())
                max_topic_len       = max([len(topic) for topic in topics])
                max_datatype_len    = max([len(datatype) for datatype in datatypes])
                max_msg_count_len   = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()])
                max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0

                # Show datatypes
                for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
                    s = '%-*s [%s]' % (max_datatype_len, datatype, md5sum)
                    if i == 0:
                        rows.append(('types', s))
                    else:
                        rows.append(('', s))

                # Show topics
                for i, topic in enumerate(topics):
                    topic_msg_count = topic_msg_counts[topic]

                    s = '%-*s %*d %s' % (max_topic_len, topic, max_msg_count_len, topic_msg_count, 'msgs' if topic_msg_count > 1 else 'msg ')
                    if topic in topic_freqs_median:
                        s += ' @ %*s' % (max_freq_median_len, _human_readable_frequency(topic_freqs_median[topic]))
                    else:
                        s += '   %*s' % (max_freq_median_len, '')

                    s += ' : %-*s' % (max_datatype_len, topic_datatypes[topic])
                    if topic_conn_counts[topic] > 1:
                        s += ' (%d connections)' % topic_conn_counts[topic]

                    if i == 0:
                        rows.append(('topics', s))
                    else:
                        rows.append(('', s))

        except Exception as ex:
            raise

        first_column_width = max([len(field) for field, _ in rows]) + 1

        s = ''
        for (field, value) in rows:
            if field:
                s += '%-*s %s\n' % (first_column_width, field + ':', value)
            else:
                s += '%-*s %s\n' % (first_column_width, '', value)

        return s.rstrip()

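    # A hypothetical rendering of the summary built above (the same layout
    # that 'rosbag info' prints):
    #
    #   path:        test.bag
    #   version:     2.0
    #   duration:    9.9s
    #   start:       Jan 01 2010 00:00:00.00 (1262304000.00)
    #   end:         Jan 01 2010 00:00:09.90 (1262304009.90)
    #   size:        12.5 KB
    #   messages:    100
    #   compression: none [1/1 chunks]
    #   types:       std_msgs/Int32 [da5909fbe378aeaf85e547e830cc1bb7]
    #   topics:      /numbers   100 msgs @ 10.1 Hz : std_msgs/Int32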

    def _get_yaml_info(self, key=None):
        s = ''

        try:
            if self._filename:
                s += 'path: %s\n' % self._filename

            if self._version == 102 and type(self._reader) == _BagReader102_Unindexed:
                s += 'version: 1.2 (unindexed)\n'
            else:
                s += 'version: %d.%d\n' % (self._version / 100, self._version % 100)

            if not self._connection_indexes and not self._chunks:
                s += 'size: %d\n' % self.size
                s += 'indexed: False\n'
            else:
                if self._chunks:
                    start_stamp = self._chunks[ 0].start_time.to_sec()
                    end_stamp   = self._chunks[-1].end_time.to_sec()
                else:
                    start_stamp = min([index[ 0].time.to_sec() for index in self._connection_indexes.itervalues()])
                    end_stamp   = max([index[-1].time.to_sec() for index in self._connection_indexes.itervalues()])

                duration = end_stamp - start_stamp
                s += 'duration: %.6f\n' % duration
                s += 'start: %.6f\n' % start_stamp
                s += 'end: %.6f\n' % end_stamp
                s += 'size: %d\n' % self.size
                if self._chunks:
                    num_messages = 0
                    for c in self._chunks:
                        for counts in c.connection_counts.itervalues():
                            num_messages += counts
                else:
                    num_messages = sum([len(index) for index in self._connection_indexes.itervalues()])
                s += 'messages: %d\n' % num_messages
                s += 'indexed: True\n'

                # Show compression information
                if len(self._chunk_headers) == 0:
                    s += 'compression: none\n'
                else:
                    compression_counts       = {}
                    compression_uncompressed = {}
                    compression_compressed   = {}
                    for chunk_header in self._chunk_headers.itervalues():
                        if chunk_header.compression not in compression_counts:
                            compression_counts[chunk_header.compression] = 1
                            compression_uncompressed[chunk_header.compression] = chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] = chunk_header.compressed_size
                        else:
                            compression_counts[chunk_header.compression] += 1
                            compression_uncompressed[chunk_header.compression] += chunk_header.uncompressed_size
                            compression_compressed[chunk_header.compression] += chunk_header.compressed_size

                    chunk_count = len(self._chunk_headers)

                    main_compression_count, main_compression = list(reversed(sorted([(v, k) for k, v in compression_counts.items()])))[0]
                    s += 'compression: %s\n' % str(main_compression)

                    all_uncompressed = (sum([count for c, count in compression_counts.items() if c != Compression.NONE]) == 0)
                    if not all_uncompressed:
                        s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))
                        s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.itervalues()))

                datatypes = set()
                datatype_infos = []
                for connection in self._connections.itervalues():
                    if connection.datatype in datatypes:
                        continue
                    datatype_infos.append((connection.datatype, connection.md5sum, connection.msg_def))
                    datatypes.add(connection.datatype)

                topics = sorted(set([c.topic for c in self._get_connections()]))
                topic_datatypes    = {}
                topic_conn_counts  = {}
                topic_msg_counts   = {}
                topic_freqs_median = {}
                for topic in topics:
                    connections = list(self._get_connections(topic))

                    topic_datatypes[topic] = connections[0].datatype
                    topic_conn_counts[topic] = len(connections)

                    msg_count = 0
                    for connection in connections:
                        for chunk in self._chunks:
                            msg_count += chunk.connection_counts.get(connection.id, 0)
                    topic_msg_counts[topic] = msg_count

                    if self._connection_indexes_read:
                        stamps = [entry.time.to_sec() for entry in self._get_entries(connections)]
                        if len(stamps) > 1:
                            periods = [s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])]
                            med_period = _median(periods)
                            if med_period > 0.0:
                                topic_freqs_median[topic] = 1.0 / med_period

                topics = sorted(topic_datatypes.iterkeys())
                max_topic_len       = max([len(topic) for topic in topics])
                max_datatype_len    = max([len(datatype) for datatype in datatypes])
                max_msg_count_len   = max([len('%d' % msg_count) for msg_count in topic_msg_counts.itervalues()])
                max_freq_median_len = max([len(_human_readable_frequency(freq)) for freq in topic_freqs_median.itervalues()]) if len(topic_freqs_median) > 0 else 0

                # Show datatypes
                s += 'types:\n'
                for i, (datatype, md5sum, msg_def) in enumerate(sorted(datatype_infos)):
                    s += '  - type: %s\n' % datatype
                    s += '    md5: %s\n' % md5sum

                # Show topics
                s += 'topics:\n'
                for i, topic in enumerate(topics):
                    topic_msg_count = topic_msg_counts[topic]

                    s += '  - topic: %s\n' % topic
                    s += '    type: %s\n' % topic_datatypes[topic]
                    s += '    messages: %d\n' % topic_msg_count

                    if topic_conn_counts[topic] > 1:
                        s += '    connections: %d\n' % topic_conn_counts[topic]

                    if topic in topic_freqs_median:
                        s += '    frequency: %.4f\n' % topic_freqs_median[topic]

            if not key:
                return s

            class DictObject(object):
                def __init__(self, d):
                    for a, b in d.items():
                        if isinstance(b, (list, tuple)):
                            setattr(self, a, [DictObject(x) if isinstance(x, dict) else x for x in b])
                        else:
                            setattr(self, a, DictObject(b) if isinstance(b, dict) else b)

            obj = DictObject(yaml.load(s))
            try:
                val = eval('obj.' + key)
            except Exception as ex:
                print('Error getting key "%s"' % key, file=sys.stderr)
                return None

            def print_yaml(val, indent=0):
                indent_str = '  ' * indent

                if type(val) is list:
                    s = ''
                    for item in val:
                        s += '%s- %s\n' % (indent_str, print_yaml(item, indent + 1))
                    return s
                elif type(val) is DictObject:
                    s = ''
                    for i, (k, v) in enumerate(val.__dict__.items()):
                        if i != 0:
                            s += indent_str
                        s += '%s: %s' % (k, str(v))
                        if i < len(val.__dict__) - 1:
                            s += '\n'
                    return s
                else:
                    return indent_str + str(val)

            return print_yaml(val)

        except Exception as ex:
            raise

    ### Internal API ###

    @property
    def _has_compressed_chunks(self):
        if not self._chunk_headers:
            return False

        return any((h.compression != Compression.NONE for h in self._chunk_headers.itervalues()))

    @property
    def _uncompressed_size(self):
        if not self._chunk_headers:
            return self.size

        return sum((h.uncompressed_size for h in self._chunk_headers.itervalues()))

    def _read_message(self, position, raw=False):
        """
        Read the message from the given position in the file.
        """
        self.flush()
        return self._reader.seek_and_read_message_data_record(position, raw)

    def _get_connections(self, topics=None, connection_filter=None):
        """
        Yield the connections, optionally filtering by topic and/or connection information.
        """
        if topics:
            if type(topics) is str:
                topics = set([roslib.names.canonicalize_name(topics)])
            else:
                topics = set([roslib.names.canonicalize_name(t) for t in topics])

        for c in self._connections.itervalues():
            if topics and c.topic not in topics and roslib.names.canonicalize_name(c.topic) not in topics:
                continue
            if connection_filter and not connection_filter(c.topic, c.datatype, c.md5sum, c.msg_def, c.header):
                continue
            yield c

    def _get_entries(self, connections=None, start_time=None, end_time=None):
        """
        Yield index entries on the given connections in the given time range.
        """
        for entry, _ in _mergesort(self._get_indexes(connections), key=lambda entry: entry.time):
            if start_time and entry.time < start_time:
                continue
            if end_time and entry.time > end_time:
                return
            yield entry

    def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
        """
        Yield index entries on the given connections in the given time range in reverse order.
        """
        for entry, _ in _mergesort((reversed(index) for index in self._get_indexes(connections)), key=lambda entry: -entry.time.to_sec()):
            if end_time and entry.time > end_time:
                continue
            if start_time and entry.time < start_time:
                return
            yield entry

    def _get_entry(self, t, connections=None):
        """
        Return the first index entry on/before the given time on the given connections.
        """
        indexes = self._get_indexes(connections)

        entry = _IndexEntry(t)

        first_entry = None

        for index in indexes:
            i = bisect.bisect_right(index, entry) - 1
            if i >= 0:
                index_entry = index[i]
                if first_entry is None or index_entry > first_entry:
                    first_entry = index_entry

        return first_entry

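    # Sketch of the lookup above: _IndexEntry orders by time (via __cmp__), so
    # wrapping the query time in a bare _IndexEntry lets bisect_right locate
    # the insertion point in each sorted per-connection index.  With entries
    # at t = 1, 3 and 5, a query of t = 4 gives bisect_right(...) == 2, so the
    # entry at t = 3 (the latest one at or before the query) is returned.
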
    def _get_entry_after(self, t, connections=None):
        """
        Return the first index entry after the given time on the given connections.
        """
        indexes = self._get_indexes(connections)

        entry = _IndexEntry(t)

        first_entry = None

        for index in indexes:
            i = bisect.bisect_right(index, entry)
            if i <= len(index) - 1:
                index_entry = index[i]
                if first_entry is None or index_entry < first_entry:
                    first_entry = index_entry

        return first_entry

    def _get_indexes(self, connections):
        """
        Get the indexes for the given connections.
        """
        if not self._connection_indexes_read:
            self._reader._read_connection_index_records()

        if connections is None:
            return self._connection_indexes.itervalues()
        else:
            return (self._connection_indexes[c.id] for c in connections)

    ### Implementation ###

    def _clear_index(self):
        self._connection_indexes_read = False
        self._connection_indexes      = {}    # connection id -> sorted list of index entries

        self._topic_connections = {}          # topic -> connection
        self._connections       = {}          # connection id -> _ConnectionInfo
        self._connection_count  = 0
        self._chunk_count       = 0
        self._chunks            = []          # list of _ChunkInfo
        self._chunk_headers     = {}          # chunk position -> _ChunkHeader

        self._chunk_open                    = False
        self._curr_chunk_info               = None
        self._curr_chunk_data_pos           = None
        self._curr_chunk_connection_indexes = {}

    def _open(self, f, mode, allow_unindexed):
        if not f:
            raise ValueError('filename (or stream) is invalid')

        try:
            if   mode == 'r': self._open_read(f, allow_unindexed)
            elif mode == 'w': self._open_write(f)
            elif mode == 'a': self._open_append(f, allow_unindexed)
            else:
                raise ValueError('mode "%s" is invalid' % mode)
        except struct.error:
            raise ROSBagFormatException('error with bag')

    def _open_read(self, f, allow_unindexed):
        if isinstance(f, file):
            self._file     = f
            self._filename = None
        else:
            self._file     = open(f, 'rb')
            self._filename = f

        self._mode = 'r'

        try:
            self._version = self._read_version()
        except:
            self._version = None
            self._close_file()
            raise

        try:
            self._create_reader()
            self._reader.start_reading()
        except ROSBagUnindexedException as ex:
            if not allow_unindexed:
                self._close_file()
                raise
        except:
            self._close_file()
            raise

    def _open_write(self, f):
        if isinstance(f, file):
            self._file     = f
            self._filename = None
        else:
            self._file     = open(f, 'w+b')
            self._filename = f

        self._mode = 'w'

        self._version = 200

        try:
            self._create_reader()
            self._start_writing()
        except:
            self._close_file()
            raise

    def _open_append(self, f, allow_unindexed):
        if isinstance(f, file):
            self._file     = f
            self._filename = None
        else:
            try:
                # Test if the file already exists
                open(f, 'r').close()

                # File exists: open in read/update mode
                self._file = open(f, 'r+b')
            except IOError:
                # File doesn't exist: open in write mode
                self._file = open(f, 'w+b')

            self._filename = f

        self._mode = 'a'

        try:
            self._version = self._read_version()
        except:
            self._version = None
            self._close_file()
            raise

        if self._version != 200:
            self._file.close()
            raise ROSBagException('bag version %d is unsupported for appending' % self._version)

        try:
            self._start_appending()
        except ROSBagUnindexedException:
            if not allow_unindexed:
                self._close_file()
                raise
        except:
            self._close_file()
            raise

    def _close_file(self):
        self._file.close()
        self._file = None

    def _create_reader(self):
        """
        @raise ROSBagException: if the bag version is unsupported
        """
        major_version, minor_version = self._version / 100, self._version % 100
        if major_version == 2:
            self._reader = _BagReader200(self)
        elif major_version == 1:
            if minor_version == 1:
                raise ROSBagException('unsupported bag version %d. Please convert bag to newer format.' % self._version)
            else:
                # Peek at the op code of the first record.  If it's FILE_HEADER, then the bag is indexed.
                first_record_pos = self._file.tell()
                header = _read_header(self._file)
                op = _read_uint8_field(header, 'op')
                self._file.seek(first_record_pos)

                if op == _OP_FILE_HEADER:
                    self._reader = _BagReader102_Indexed(self)
                else:
                    self._reader = _BagReader102_Unindexed(self)
        else:
            raise ROSBagException('unknown bag version %d' % self._version)

    def _read_version(self):
        """
        @raise ROSBagException: if the file is empty, or the version line can't be parsed
        """
        version_line = self._file.readline().rstrip()
        if len(version_line) == 0:
            raise ROSBagException('empty file')

        matches = re.match(r'#ROS(.*) V(\d)\.(\d)', version_line)
        if matches is None or len(matches.groups()) != 3:
            raise ROSBagException('This does not appear to be a bag file')

        version_type, major_version_str, minor_version_str = matches.groups()

        version = int(major_version_str) * 100 + int(minor_version_str)

        return version

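    # For example, the '#ROSBAG V2.0' version line parses to 2 * 100 + 0 = 200,
    # and a 1.2-era bag parses to 102; _create_reader() dispatches on this value.
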
    def _start_writing(self):
        self._file.write(_VERSION + '\n')
        self._file_header_pos = self._file.tell()
        self._write_file_header_record(0, 0, 0)

    def _stop_writing(self):
        # Write the open chunk to disk
        self.flush()

        # Remember this location as the start of the index
        self._index_data_pos = self._file.tell()

        # Write the connection records
        for connection_info in self._connections.itervalues():
            self._write_connection_record(connection_info)

        # Write the chunk info records
        for chunk_info in self._chunks:
            self._write_chunk_info_record(chunk_info)

        # Re-write the file header with the final index position and counts
        self._file.seek(self._file_header_pos)
        self._write_file_header_record(self._index_data_pos, len(self._connections), len(self._chunks))

    def _start_appending(self):
        self._file_header_pos = self._file.tell()

        self._create_reader()
        self._reader.start_reading()

        # Truncate the file to chop off the index
        self._file.truncate(self._index_data_pos)
        self._reader.index_data_pos = 0

        # Rewrite the file header, clearing the index position (so the bag is marked unindexed while appending)
        self._file.seek(self._file_header_pos)
        self._write_file_header_record(0, 0, 0)

        # Seek to the end of the file
        self._file.seek(0, os.SEEK_END)

    def _start_writing_chunk(self, t):
        self._curr_chunk_info = _ChunkInfo(self._file.tell(), t, t)
        self._write_chunk_header(_ChunkHeader(self._compression, 0, 0))
        self._curr_chunk_data_pos = self._file.tell()
        self._set_compression_mode(self._compression)
        self._chunk_open = True

    def _get_chunk_offset(self):
        if self._compression == Compression.NONE:
            return self._file.tell() - self._curr_chunk_data_pos
        else:
            return self._output_file.compressed_bytes_in

    def _stop_writing_chunk(self):
        # Add this chunk to the index
        self._chunks.append(self._curr_chunk_info)

        # Get the uncompressed and compressed sizes
        uncompressed_size = self._get_chunk_offset()
        self._set_compression_mode(Compression.NONE)
        compressed_size = self._file.tell() - self._curr_chunk_data_pos

        # Rewrite the chunk header with the size of the chunk (remembering the current offset)
        end_of_chunk_pos = self._file.tell()
        self._file.seek(self._curr_chunk_info.pos)
        chunk_header = _ChunkHeader(self._compression, compressed_size, uncompressed_size, self._curr_chunk_data_pos)
        self._write_chunk_header(chunk_header)
        self._chunk_headers[self._curr_chunk_info.pos] = chunk_header

        # Write out the connection indexes and clear them
        self._file.seek(end_of_chunk_pos)
        for connection_id, entries in self._curr_chunk_connection_indexes.items():
            self._write_connection_index_record(connection_id, entries)
        self._curr_chunk_connection_indexes.clear()

        # Flag that we're starting a new chunk
        self._chunk_open = False

    def _set_compression_mode(self, compression):
        # Flush the compressor, if needed
        if self._curr_compression != Compression.NONE:
            self._output_file.flush()

        # Create the compressor (_CompressorFileFacade is defined later in this module)
        if compression == Compression.BZ2:
            self._output_file = _CompressorFileFacade(self._file, bz2.BZ2Compressor())
        elif compression == Compression.NONE:
            self._output_file = self._file
        else:
            raise ROSBagException('unsupported compression type: %s' % compression)

        self._curr_compression = compression

    def _write_file_header_record(self, index_pos, connection_count, chunk_count):
        header = {
            'op':          _pack_uint8(_OP_FILE_HEADER),
            'index_pos':   _pack_uint64(index_pos),
            'conn_count':  _pack_uint32(connection_count),
            'chunk_count': _pack_uint32(chunk_count)
        }
        _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)

    def _write_connection_record(self, connection_info):
        header = {
            'op':    _pack_uint8(_OP_CONNECTION),
            'topic': connection_info.topic,
            'conn':  _pack_uint32(connection_info.id)
        }

        _write_header(self._output_file, header)

        _write_header(self._output_file, connection_info.header)

    def _write_message_data_record(self, connection_id, t, serialized_bytes):
        header = {
            'op':   _pack_uint8(_OP_MSG_DATA),
            'conn': _pack_uint32(connection_id),
            'time': _pack_time(t)
        }
        _write_record(self._output_file, header, serialized_bytes)

    def _write_chunk_header(self, chunk_header):
        header = {
            'op':          _pack_uint8(_OP_CHUNK),
            'compression': chunk_header.compression,
            'size':        _pack_uint32(chunk_header.uncompressed_size)
        }
        _write_header(self._file, header)

        self._file.write(_pack_uint32(chunk_header.compressed_size))

    def _write_connection_index_record(self, connection_id, entries):
        header = {
            'op':    _pack_uint8(_OP_INDEX_DATA),
            'conn':  _pack_uint32(connection_id),
            'ver':   _pack_uint32(_INDEX_VERSION),
            'count': _pack_uint32(len(entries))
        }

        buffer = self._buffer
        buffer.seek(0)
        buffer.truncate(0)
        for entry in entries:
            buffer.write(_pack_time  (entry.time))
            buffer.write(_pack_uint32(entry.offset))

        _write_record(self._file, header, buffer.getvalue())

    def _write_chunk_info_record(self, chunk_info):
        header = {
            'op':         _pack_uint8 (_OP_CHUNK_INFO),
            'ver':        _pack_uint32(_CHUNK_INDEX_VERSION),
            'chunk_pos':  _pack_uint64(chunk_info.pos),
            'start_time': _pack_time(chunk_info.start_time),
            'end_time':   _pack_time(chunk_info.end_time),
            'count':      _pack_uint32(len(chunk_info.connection_counts))
        }

        buffer = self._buffer
        buffer.seek(0)
        buffer.truncate(0)
        for connection_id, count in chunk_info.connection_counts.items():
            buffer.write(_pack_uint32(connection_id))
            buffer.write(_pack_uint32(count))

        _write_record(self._file, header, buffer.getvalue())

_message_types = {}    # md5sum -> message type

_OP_MSG_DEF     = 0x01
_OP_MSG_DATA    = 0x02
_OP_FILE_HEADER = 0x03
_OP_INDEX_DATA  = 0x04
_OP_CHUNK       = 0x05
_OP_CHUNK_INFO  = 0x06
_OP_CONNECTION  = 0x07

_OP_CODES = {
    _OP_MSG_DEF:     'MSG_DEF',
    _OP_MSG_DATA:    'MSG_DATA',
    _OP_FILE_HEADER: 'FILE_HEADER',
    _OP_INDEX_DATA:  'INDEX_DATA',
    _OP_CHUNK:       'CHUNK',
    _OP_CHUNK_INFO:  'CHUNK_INFO',
    _OP_CONNECTION:  'CONNECTION'
}

_VERSION             = '#ROSBAG V2.0'
_FILE_HEADER_LENGTH  = 4096
_INDEX_VERSION       = 1
_CHUNK_INDEX_VERSION = 1
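
# Record layout sketch (as consumed and produced by the helpers below): every
# record is a length-prefixed header followed by a length-prefixed data block,
# and the header is itself a sequence of length-prefixed 'name=value' fields:
#
#   <uint32 header_len> <field>...  <uint32 data_len> <data bytes>
#   field := <uint32 len> name '=' value
#
# The 'op' field identifies the record type using the _OP_* codes above.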

class _ConnectionInfo(object):
    def __init__(self, id, topic, header):
        try:
            datatype, md5sum, msg_def = header['type'], header['md5sum'], header['message_definition']
        except KeyError as ex:
            raise ROSBagFormatException('connection header field %s not found' % str(ex))

        self.id       = id
        self.topic    = topic
        self.datatype = datatype
        self.md5sum   = md5sum
        self.msg_def  = msg_def
        self.header   = header

    def __str__(self):
        return '%d on %s: %s' % (self.id, self.topic, str(self.header))

class _ChunkInfo(object):
    def __init__(self, pos, start_time, end_time):
        self.pos        = pos
        self.start_time = start_time
        self.end_time   = end_time

        self.connection_counts = {}

    def __str__(self):
        s  = 'chunk_pos:   %d\n' % self.pos
        s += 'start_time:  %s\n' % str(self.start_time)
        s += 'end_time:    %s\n' % str(self.end_time)
        s += 'connections: %d\n' % len(self.connection_counts)
        s += '\n'.join(['  - [%4d] %d' % (connection_id, count) for connection_id, count in self.connection_counts.items()])
        return s

class _ChunkHeader(object):
    def __init__(self, compression, compressed_size, uncompressed_size, data_pos=0):
        self.compression       = compression
        self.compressed_size   = compressed_size
        self.uncompressed_size = uncompressed_size
        self.data_pos          = data_pos

    def __str__(self):
        if self.uncompressed_size > 0:
            ratio = 100 * (float(self.compressed_size) / self.uncompressed_size)
            return 'compression: %s, size: %d, uncompressed: %d (%.2f%%)' % (self.compression, self.compressed_size, self.uncompressed_size, ratio)
        else:
            return 'compression: %s, size: %d, uncompressed: %d' % (self.compression, self.compressed_size, self.uncompressed_size)

class _IndexEntry(object):
    def __init__(self, time):
        self.time = time

    def __cmp__(self, other):
        return self.time.__cmp__(other.time)

class _IndexEntry102(_IndexEntry):
    def __init__(self, time, offset):
        self.time   = time
        self.offset = offset

    @property
    def position(self):
        return self.offset

    def __str__(self):
        return '%d.%d: %d' % (self.time.secs, self.time.nsecs, self.offset)

class _IndexEntry200(_IndexEntry):
    def __init__(self, time, chunk_pos, offset):
        self.time      = time
        self.chunk_pos = chunk_pos
        self.offset    = offset

    @property
    def position(self):
        return (self.chunk_pos, self.offset)

    def __str__(self):
        return '%d.%d: %d+%d' % (self.time.secs, self.time.nsecs, self.chunk_pos, self.offset)

def _get_message_type(info):
    message_type = _message_types.get(info.md5sum)
    if message_type is None:
        try:
            message_type = genpy.dynamic.generate_dynamic(info.datatype, info.msg_def)[info.datatype]
            if (message_type._md5sum != info.md5sum):
                print('WARNING: For type [%s] stored md5sum [%s] does not match message definition [%s].\n  Try: "rosrun rosbag fix_msg_defs.py old_bag new_bag."' % (info.datatype, info.md5sum, message_type._md5sum), file=sys.stderr)
        except genmsg.InvalidMsgSpec:
            message_type = genpy.dynamic.generate_dynamic(info.datatype, "")[info.datatype]
            print('WARNING: For type [%s] stored md5sum [%s] has invalid message definition.' % (info.datatype, info.md5sum), file=sys.stderr)
        except genmsg.MsgGenerationException as ex:
            raise ROSBagException('Error generating datatype %s: %s' % (info.datatype, str(ex)))

        _message_types[info.md5sum] = message_type

    return message_type

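# Illustrative sketch of the dynamic generation used above: given a stored
# definition, genpy can rebuild a usable message class without the original
# package on disk (the type and definition here are hypothetical inputs):
#
#   msg_classes = genpy.dynamic.generate_dynamic('std_msgs/Int32', 'int32 data\n')
#   Int32 = msg_classes['std_msgs/Int32']
#   m = Int32(data=7)
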
def _read_uint8 (f): return _unpack_uint8 (f.read(1))
def _read_uint32(f): return _unpack_uint32(f.read(4))
def _read_uint64(f): return _unpack_uint64(f.read(8))
def _read_time  (f): return _unpack_time  (f.read(8))

def _unpack_uint8 (v): return struct.unpack('<B', v)[0]
def _unpack_uint32(v): return struct.unpack('<L', v)[0]
def _unpack_uint64(v): return struct.unpack('<Q', v)[0]
def _unpack_time  (v): return rospy.Time(*struct.unpack('<LL', v))

def _pack_uint8 (v): return struct.pack('<B', v)
def _pack_uint32(v): return struct.pack('<L', v)
def _pack_uint64(v): return struct.pack('<Q', v)
def _pack_time  (v): return _pack_uint32(v.secs) + _pack_uint32(v.nsecs)

def _read(f, size):
    data = f.read(size)
    if len(data) != size:
        raise ROSBagException('expecting %d bytes, read %d' % (size, len(data)))
    return data

def _skip_record(f):
    _skip_sized(f)  # skip the header
    _skip_sized(f)  # skip the data

def _skip_sized(f):
    size = _read_uint32(f)
    f.seek(size, os.SEEK_CUR)

def _read_sized(f):
    try:
        size = _read_uint32(f)
    except struct.error as ex:
        raise ROSBagFormatException('error unpacking uint32: %s' % str(ex))
    return _read(f, size)

def _write_sized(f, v):
    f.write(_pack_uint32(len(v)))
    f.write(v)

def _read_field(header, field, unpack_fn):
    if field not in header:
        raise ROSBagFormatException('expected "%s" field in record' % field)

    try:
        value = unpack_fn(header[field])
    except Exception as ex:
        raise ROSBagFormatException('error reading field "%s": %s' % (field, str(ex)))

    return value

def _read_str_field   (header, field): return _read_field(header, field, lambda v: v)
def _read_uint8_field (header, field): return _read_field(header, field, _unpack_uint8)
def _read_uint32_field(header, field): return _read_field(header, field, _unpack_uint32)
def _read_uint64_field(header, field): return _read_field(header, field, _unpack_uint64)
def _read_time_field  (header, field): return _read_field(header, field, _unpack_time)

def _write_record(f, header, data='', padded_size=None):
    header_str = _write_header(f, header)

    if padded_size is not None:
        header_len = len(header_str)
        if header_len < padded_size:
            data = ' ' * (padded_size - header_len)
        else:
            data = ''

    _write_sized(f, data)

def _write_header(f, header):
    header_str = ''.join([_pack_uint32(len(k) + 1 + len(v)) + k + '=' + v for k, v in header.items()])
    _write_sized(f, header_str)
    return header_str

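# For example (illustrative): _write_header(f, {'op': _pack_uint8(_OP_CHUNK)})
# emits the field bytes
#
#   04 00 00 00  6f 70 3d 05
#
# i.e. a uint32 field length (4) followed by 'op=' and the raw op byte, with
# the whole header additionally prefixed by its own uint32 length via
# _write_sized.
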
def _read_header(f, req_op=None):
    bag_pos = f.tell()

    # Read the header
    try:
        header = _read_sized(f)
    except ROSBagException as ex:
        raise ROSBagFormatException('Error reading header: %s' % str(ex))

    # Parse the header into a dict
    header_dict = {}
    while header != '':
        # Read the field size
        if len(header) < 4:
            raise ROSBagFormatException('Error reading header field')
        (size,) = struct.unpack('<L', header[:4])
        header = header[4:]

        # Read the field bytes
        if len(header) < size:
            raise ROSBagFormatException('Error reading header field: expected %d bytes, read %d' % (size, len(header)))
        (name, sep, value) = header[:size].partition('=')
        if sep == '':
            raise ROSBagFormatException('Error reading header field')

        header_dict[name] = value

        header = header[size:]

    # Check the op code of the header, if supplied
    if req_op is not None:
        op = _read_uint8_field(header_dict, 'op')
        if req_op != op:
            raise ROSBagFormatException('Expected op code: %s, got %s' % (_OP_CODES[req_op], _OP_CODES[op]))

    return header_dict

def _peek_next_header_op(f):
    pos = f.tell()
    header = _read_header(f)
    op = _read_uint8_field(header, 'op')
    f.seek(pos)
    return op

def _read_record_data(f):
    try:
        record_data = _read_sized(f)
    except ROSBagException as ex:
        raise ROSBagFormatException('Error reading record data: %s' % str(ex))

    return record_data

class _BagReader(object):
    def __init__(self, bag):
        self.bag = bag

    def start_reading(self):
        raise NotImplementedError()

    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
        raise NotImplementedError()

    def reindex(self):
        raise NotImplementedError()

class _BagReader102_Unindexed(_BagReader):
    """
    Support class for reading unindexed v1.2 bag files.
    """
    def __init__(self, bag):
        _BagReader.__init__(self, bag)

    def start_reading(self):
        self.bag._file_header_pos = self.bag._file.tell()

    def reindex(self):
        """Generates all bag index information by rereading the message records."""
        f = self.bag._file

        total_bytes = self.bag.size

        # Re-read the file header
        self.bag._file.seek(self.bag._file_header_pos)

        offset = f.tell()

        # Read the message definition and message data records
        while offset < total_bytes:
            yield offset

            op = _peek_next_header_op(f)

            if op == _OP_MSG_DEF:
                connection_info = self.read_message_definition_record()

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id
                    self.bag._connections[connection_info.id] = connection_info
                    self.bag._connection_indexes[connection_info.id] = []

            elif op == _OP_MSG_DATA:
                # Read the topic and timestamp from the header
                header = _read_header(f)

                topic = _read_str_field(header, 'topic')
                secs  = _read_uint32_field(header, 'sec')
                nsecs = _read_uint32_field(header, 'nsec')
                t = genpy.Time(secs, nsecs)

                if topic not in self.bag._topic_connections:
                    datatype = _read_str_field(header, 'type')
                    self._create_connection_info_for_datatype(topic, datatype)

                connection_id = self.bag._topic_connections[topic]
                info = self.bag._connections[connection_id]

                # Skip over the message content
                _skip_sized(f)

                # Insert the message entry (in order) into the connection index
                bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))

            offset = f.tell()

    def read_messages(self, topics, start_time, end_time, topic_filter, raw):
        f = self.bag._file

        f.seek(self.bag._file_header_pos)

        while True:
            # Read any MSG_DEF records
            while True:
                position = f.tell()

                try:
                    header = _read_header(f)
                except Exception:
                    return

                op = _read_uint8_field(header, 'op')
                if op != _OP_MSG_DEF:
                    break

                connection_info = self.read_message_definition_record(header)

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id

                self.bag._connections[connection_info.id] = connection_info

            # Check that we have a MSG_DATA record
            if op != _OP_MSG_DATA:
                raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)

            topic = _read_str_field(header, 'topic')

            if topic not in self.bag._topic_connections:
                datatype = _read_str_field(header, 'type')
                self._create_connection_info_for_datatype(topic, datatype)

            connection_id = self.bag._topic_connections[topic]
            info = self.bag._connections[connection_id]

            # Get the message type
            try:
                msg_type = _get_message_type(info)
            except KeyError:
                raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag file by definition' % info.datatype)

            # Get the timestamp
            secs  = _read_uint32_field(header, 'sec')
            nsecs = _read_uint32_field(header, 'nsec')
            t = genpy.Time(secs, nsecs)

            # Read the message content
            data = _read_record_data(f)

            if raw:
                msg = (info.datatype, data, info.md5sum, position, msg_type)
            else:
                # Deserialize the message
                msg = msg_type()
                msg.deserialize(data)

            yield (topic, msg, t)

        self.bag._connection_indexes_read = True

    def _create_connection_info_for_datatype(self, topic, datatype):
        for c in self.bag._connections.itervalues():
            if c.datatype == datatype:
                connection_id     = len(self.bag._connections)
                connection_header = { 'topic' : topic, 'type' : c.header['type'], 'md5sum' : c.header['md5sum'], 'message_definition' : c.header['message_definition'] }
                connection_info   = _ConnectionInfo(connection_id, topic, connection_header)

                self.bag._topic_connections[topic] = connection_id
                self.bag._connections[connection_id] = connection_info
                self.bag._connection_indexes[connection_id] = []
                return

        raise ROSBagFormatException('Topic %s of datatype %s not preceded by message definition' % (topic, datatype))

    def read_message_definition_record(self, header=None):
        if not header:
            header = _read_header(self.bag._file, _OP_MSG_DEF)

        topic    = _read_str_field(header, 'topic')
        datatype = _read_str_field(header, 'type')
        md5sum   = _read_str_field(header, 'md5')
        msg_def  = _read_str_field(header, 'def')

        _skip_sized(self.bag._file)  # skip the record data

        connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def }

        id = len(self.bag._connections)

        return _ConnectionInfo(id, topic, connection_header)

class _BagReader102_Indexed(_BagReader102_Unindexed):
    """
    Support class for reading indexed v1.2 bag files.
    """
    def __init__(self, bag):
        _BagReader102_Unindexed.__init__(self, bag)

    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
        connections = self.bag._get_connections(topics, connection_filter)
        for entry in self.bag._get_entries(connections, start_time, end_time):
            yield self.seek_and_read_message_data_record(entry.offset, raw)

    def reindex(self):
        """Generates all bag index information by rereading the message records."""
        f = self.bag._file

        total_bytes = self.bag.size

        # Re-read the file header
        self.bag._file.seek(self.bag._file_header_pos)
        self.read_file_header_record()

        offset = f.tell()

        # Read the message definition and message data records
        while offset < total_bytes:
            yield offset

            op = _peek_next_header_op(f)

            if op == _OP_MSG_DEF:
                connection_info = self.read_message_definition_record()

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id
                    self.bag._connections[connection_info.id] = connection_info
                    self.bag._connection_indexes[connection_info.id] = []

            elif op == _OP_MSG_DATA:
                # Read the topic and timestamp from the header
                header = _read_header(f)

                topic = _read_str_field(header, 'topic')
                secs  = _read_uint32_field(header, 'sec')
                nsecs = _read_uint32_field(header, 'nsec')
                t = genpy.Time(secs, nsecs)

                if topic not in self.bag._topic_connections:
                    datatype = _read_str_field(header, 'type')
                    self._create_connection_info_for_datatype(topic, datatype)

                connection_id = self.bag._topic_connections[topic]
                info = self.bag._connections[connection_id]

                # Skip over the message content
                _skip_sized(f)

                # Insert the message entry (in order) into the connection index
                bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry102(t, offset))

            elif op == _OP_INDEX_DATA:
                _skip_record(f)

            offset = f.tell()

    def start_reading(self):
        try:
            # Read the file header
            self.read_file_header_record()

            total_bytes = self.bag.size

            # Check if the index position has been written, i.e. the bag was closed successfully
            if self.bag._index_data_pos == 0:
                raise ROSBagUnindexedException()

            # Seek to the beginning of the topic index records
            self.bag._file.seek(self.bag._index_data_pos)

            # Read the topic indexes
            topic_indexes = {}
            while True:
                pos = self.bag._file.tell()
                if pos >= total_bytes:
                    break

                topic, index = self.read_topic_index_record()

                topic_indexes[topic] = index

            # Read the message definition records (one for each topic)
            for topic, index in topic_indexes.items():
                self.bag._file.seek(index[0].offset)

                connection_info = self.read_message_definition_record()

                if connection_info.topic not in self.bag._topic_connections:
                    self.bag._topic_connections[connection_info.topic] = connection_info.id
                self.bag._connections[connection_info.id] = connection_info

                self.bag._connection_indexes[connection_info.id] = index

            self.bag._connection_indexes_read = True

        except Exception as ex:
            raise ROSBagUnindexedException()

    def read_file_header_record(self):
        self.bag._file_header_pos = self.bag._file.tell()

        header = _read_header(self.bag._file, _OP_FILE_HEADER)

        self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')

        _skip_sized(self.bag._file)  # skip the record data (padding)

    def read_topic_index_record(self):
        f = self.bag._file

        header = _read_header(f, _OP_INDEX_DATA)

        index_version = _read_uint32_field(header, 'ver')
        topic         = _read_str_field   (header, 'topic')
        count         = _read_uint32_field(header, 'count')

        if index_version != 0:
            raise ROSBagFormatException('expecting index version 0, got %d' % index_version)

        _read_uint32(f)  # skip the record data size

        topic_index = []

        for i in range(count):
            time   = _read_time  (f)
            offset = _read_uint64(f)

            topic_index.append(_IndexEntry102(time, offset))

        return (topic, topic_index)

    def seek_and_read_message_data_record(self, position, raw):
        f = self.bag._file

        # Seek to the message position
        f.seek(position)

        # Skip any MSG_DEF records
        while True:
            header = _read_header(f)
            op = _read_uint8_field(header, 'op')
            if op != _OP_MSG_DEF:
                break
            _skip_sized(f)

        # Check that we have a MSG_DATA record
        if op != _OP_MSG_DATA:
            raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)

        topic = _read_str_field(header, 'topic')

        connection_id = self.bag._topic_connections[topic]
        info          = self.bag._connections[connection_id]

        # Get the message type
        try:
            msg_type = _get_message_type(info)
        except KeyError:
            raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag file by definition' % info.datatype)

        # Get the timestamp
        secs  = _read_uint32_field(header, 'sec')
        nsecs = _read_uint32_field(header, 'nsec')
        t = genpy.Time(secs, nsecs)

        # Read the message content
        data = _read_record_data(f)

        if raw:
            msg = info.datatype, data, info.md5sum, position, msg_type
        else:
            # Deserialize the message
            msg = msg_type()
            msg.deserialize(data)

        return (topic, msg, t)

class _BagReader200(_BagReader):
    """
    Support class for reading v2.0 bag files.
    """
    def __init__(self, bag):
        _BagReader.__init__(self, bag)

        self.decompressed_chunk_pos = None
        self.decompressed_chunk     = None
        self.decompressed_chunk_io  = None

    def reindex(self):
        """
        Generates all bag index information by rereading the chunks.
        Assumes the file header has been read.
        """
        f = self.bag._file

        f.seek(0, os.SEEK_END)
        total_bytes = f.tell()

        # Read any connection records from after the chunk section, to handle
        # bags whose connection records were written outside the chunks
        self._read_terminal_connection_records()

        # Re-read the file header to get to the start of the first chunk
        self.bag._file.seek(self.bag._file_header_pos)
        self.read_file_header_record()

        trunc_pos = None

        while True:
            chunk_pos = f.tell()
            if chunk_pos >= total_bytes:
                break

            yield chunk_pos

            try:
                self._reindex_read_chunk(f, chunk_pos, total_bytes)
            except Exception as ex:
                break

            trunc_pos = f.tell()

        if trunc_pos and trunc_pos < total_bytes:
            f.truncate(trunc_pos)

    def _reindex_read_chunk(self, f, chunk_pos, total_bytes):
        # Read the chunk header
        chunk_header = self.read_chunk_header()

        # If the chunk header size is 0, the chunk wasn't correctly terminated - we're done
        if chunk_header.compressed_size == 0:
            raise ROSBagException('unterminated chunk at %d' % chunk_pos)

        if chunk_header.compression == Compression.NONE:
            chunk_file = f
        else:
            # Read the compressed chunk
            compressed_chunk = _read(f, chunk_header.compressed_size)

            # Decompress it
            if chunk_header.compression == Compression.BZ2:
                self.decompressed_chunk = bz2.decompress(compressed_chunk)
            else:
                raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)

            if self.decompressed_chunk_io:
                self.decompressed_chunk_io.close()
            self.decompressed_chunk_io = StringIO(self.decompressed_chunk)

            chunk_file = self.decompressed_chunk_io

        # Read the chunk connection and message records
        self.bag._curr_chunk_info = None

        if chunk_header.compression == Compression.NONE:
            offset = chunk_file.tell() - chunk_pos
        else:
            offset = chunk_file.tell()

        expected_index_length = 0

        while offset < chunk_header.uncompressed_size:
            op = _peek_next_header_op(chunk_file)

            if op == _OP_CONNECTION:
                connection_info = self.read_connection_record(chunk_file)

                if connection_info.id not in self.bag._connections:
                    self.bag._connections[connection_info.id] = connection_info
                if connection_info.id not in self.bag._connection_indexes:
                    self.bag._connection_indexes[connection_info.id] = []

            elif op == _OP_MSG_DATA:
                # Read the connection id and timestamp from the header
                header = _read_header(chunk_file)

                connection_id = _read_uint32_field(header, 'conn')
                t             = _read_time_field  (header, 'time')

                # Update the chunk info with this timestamp
                if not self.bag._curr_chunk_info:
                    self.bag._curr_chunk_info = _ChunkInfo(chunk_pos, t, t)
                else:
                    if t > self.bag._curr_chunk_info.end_time:
                        self.bag._curr_chunk_info.end_time = t
                    elif t < self.bag._curr_chunk_info.start_time:
                        self.bag._curr_chunk_info.start_time = t
                if connection_id in self.bag._curr_chunk_info.connection_counts:
                    self.bag._curr_chunk_info.connection_counts[connection_id] += 1
                else:
                    self.bag._curr_chunk_info.connection_counts[connection_id] = 1

                # Skip over the message content
                _skip_sized(chunk_file)

                # Insert the message entry (in order) into the connection index
                if connection_id not in self.bag._connection_indexes:
                    raise ROSBagException('connection id (id=%d) in chunk at position %d not preceded by connection record' % (connection_id, chunk_pos))
                bisect.insort_right(self.bag._connection_indexes[connection_id], _IndexEntry200(t, chunk_pos, offset))

                expected_index_length += 1

            else:
                # Unknown record type: skip
                _skip_record(chunk_file)

            if chunk_header.compression == Compression.NONE:
                offset = chunk_file.tell() - chunk_pos
            else:
                offset = chunk_file.tell()

        # Skip over the records following the chunk, reading the index records
        # so their total length can be validated
        next_op = _peek_next_header_op(f)

        total_index_length = 0

        while next_op != _OP_CHUNK:
            if next_op == _OP_INDEX_DATA:
                _, index = self.read_connection_index_record()
                total_index_length += len(index)
            else:
                _skip_record(f)

            if f.tell() >= total_bytes:
                if total_index_length != expected_index_length:
                    raise ROSBagException('index shorter than expected (%d vs %d)' % (total_index_length, expected_index_length))
                break

            next_op = _peek_next_header_op(f)

        # Chunk was read correctly - store its info
        self.bag._chunk_headers[chunk_pos] = chunk_header
        self.bag._chunks.append(self.bag._curr_chunk_info)

    def _read_terminal_connection_records(self):
        b, f, r = self.bag, self.bag._file, self.bag._reader

        # Seek to the first record after the file header
        f.seek(b._file_header_pos)
        r.read_file_header_record()

        # Advance to the first CONNECTION record
        if self._advance_to_next_record(_OP_CONNECTION):
            # Read the CONNECTION records
            while True:
                connection_info = r.read_connection_record(f)

                b._connections[connection_info.id] = connection_info
                b._connection_indexes[connection_info.id] = []

                next_op = _peek_next_header_op(f)
                if next_op != _OP_CONNECTION:
                    break

    def _advance_to_next_record(self, op):
        b, f = self.bag, self.bag._file

        try:
            while True:
                next_op = _peek_next_header_op(f)
                if next_op == op:
                    break

                if next_op == _OP_INDEX_DATA:
                    # Reading an index record requires a current chunk info;
                    # fabricate one if necessary so the record can be consumed
                    if b._curr_chunk_info is None:
                        b._curr_chunk_info = _ChunkInfo(0, rospy.Time(0, 1), rospy.Time(0, 1))

                    b._reader.read_connection_index_record()
                else:
                    _skip_record(f)

            return True

        except Exception as ex:
            return False

    def start_reading(self):
        try:
            # Read the file header record
            self.read_file_header_record()

            # Check if the index position has been written, i.e. the bag was closed successfully
            if self.bag._index_data_pos == 0:
                raise ROSBagUnindexedException()

            # Seek to the end of the chunks
            self.bag._file.seek(self.bag._index_data_pos)

            # Read the connection records
            self.bag._connection_indexes = {}
            for i in range(self.bag._connection_count):
                connection_info = self.read_connection_record(self.bag._file)
                self.bag._connections[connection_info.id] = connection_info
                self.bag._connection_indexes[connection_info.id] = []

            # Read the chunk info records
            self.bag._chunks = [self.read_chunk_info_record() for i in range(self.bag._chunk_count)]

            # Read the chunk headers
            self.bag._chunk_headers = {}
            for chunk_info in self.bag._chunks:
                self.bag._file.seek(chunk_info.pos)
                self.bag._chunk_headers[chunk_info.pos] = self.read_chunk_header()

            if not self.bag._skip_index:
                self._read_connection_index_records()

        except Exception:
            raise ROSBagUnindexedException()

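    # For reference, start_reading() assumes the on-disk layout of a version
    # 2.0 bag (http://wiki.ros.org/Bags/Format/2.0):
    #
    #     #ROSBAG V2.0           <- version line
    #     FILE_HEADER record     <- index_pos, chunk_count, conn_count
    #     CHUNK records          <- message data, each followed by INDEX_DATA
    #     CONNECTION records     <- starting at index_pos
    #     CHUNK_INFO records     <- one per chunk
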
    def _read_connection_index_records(self):
        for chunk_info in self.bag._chunks:
            self.bag._file.seek(chunk_info.pos)
            _skip_record(self.bag._file)

            self.bag._curr_chunk_info = chunk_info
            for i in range(len(chunk_info.connection_counts)):
                connection_id, index = self.read_connection_index_record()
                self.bag._connection_indexes[connection_id].extend(index)

        # Remove any connections with no entries: these are connection records
        # which were written to the bag even though no messages arrived on them
        orphan_connection_ids = [id for id, index in self.bag._connection_indexes.items() if not index]
        for id in orphan_connection_ids:
            del self.bag._connections[id]
            del self.bag._connection_indexes[id]

        self.bag._connection_indexes_read = True

    def read_messages(self, topics, start_time, end_time, connection_filter, raw):
        connections = self.bag._get_connections(topics, connection_filter)
        for entry in self.bag._get_entries(connections, start_time, end_time):
            yield self.seek_and_read_message_data_record((entry.chunk_pos, entry.offset), raw)

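    # Typical use goes through the public Bag API, which forwards to the
    # generator above, e.g.:
    #
    #     import rosbag
    #     for topic, msg, t in rosbag.Bag('input.bag').read_messages(topics=['/chatter']):
    #         print(msg)
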
    def read_file_header_record(self):
        self.bag._file_header_pos = self.bag._file.tell()

        header = _read_header(self.bag._file, _OP_FILE_HEADER)

        self.bag._index_data_pos   = _read_uint64_field(header, 'index_pos')
        self.bag._chunk_count      = _read_uint32_field(header, 'chunk_count')
        self.bag._connection_count = _read_uint32_field(header, 'conn_count')

        _skip_sized(self.bag._file)  # skip over the record data (padding)

    def read_connection_record(self, f):
        header = _read_header(f, _OP_CONNECTION)

        conn_id = _read_uint32_field(header, 'conn')
        topic   = _read_str_field   (header, 'topic')

        connection_header = _read_header(f)

        return _ConnectionInfo(conn_id, topic, connection_header)

    def read_chunk_info_record(self):
        f = self.bag._file

        header = _read_header(f, _OP_CHUNK_INFO)

        chunk_info_version = _read_uint32_field(header, 'ver')

        if chunk_info_version == 1:
            chunk_pos        = _read_uint64_field(header, 'chunk_pos')
            start_time       = _read_time_field  (header, 'start_time')
            end_time         = _read_time_field  (header, 'end_time')
            connection_count = _read_uint32_field(header, 'count')

            chunk_info = _ChunkInfo(chunk_pos, start_time, end_time)

            _read_uint32(f)  # skip the record data size

            for i in range(connection_count):
                connection_id = _read_uint32(f)
                count         = _read_uint32(f)

                chunk_info.connection_counts[connection_id] = count

            return chunk_info
        else:
            raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)

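    # Layout of a version-1 CHUNK_INFO record, as read above: the header
    # carries ver, chunk_pos, start_time, end_time and count, and the record
    # data is `count` pairs of uint32s: (connection id, message count).
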
    def read_chunk_header(self):
        header = _read_header(self.bag._file, _OP_CHUNK)

        compression       = _read_str_field   (header, 'compression')
        uncompressed_size = _read_uint32_field(header, 'size')

        compressed_size = _read_uint32(self.bag._file)  # the record data size

        data_pos = self.bag._file.tell()

        return _ChunkHeader(compression, compressed_size, uncompressed_size, data_pos)

    def read_connection_index_record(self):
        f = self.bag._file

        header = _read_header(f, _OP_INDEX_DATA)

        index_version = _read_uint32_field(header, 'ver')
        connection_id = _read_uint32_field(header, 'conn')
        count         = _read_uint32_field(header, 'count')

        if index_version != 1:
            raise ROSBagFormatException('expecting index version 1, got %d' % index_version)

        _read_uint32(f)  # skip the record data size

        index = []

        for i in range(count):
            time   = _read_time  (f)
            offset = _read_uint32(f)

            index.append(_IndexEntry200(time, self.bag._curr_chunk_info.pos, offset))

        return (connection_id, index)

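    # Each entry in a version-1 INDEX_DATA record is 12 bytes on disk: a time
    # (uint32 secs + uint32 nsecs) followed by a uint32 offset of the message
    # within the (uncompressed) chunk, matching the reads above.
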
    def seek_and_read_message_data_record(self, position, raw):
        chunk_pos, offset = position

        # Find the chunk containing the message
        chunk_header = self.bag._chunk_headers.get(chunk_pos)
        if chunk_header is None:
            raise ROSBagException('no chunk at position %d' % chunk_pos)

        if chunk_header.compression == Compression.NONE:
            f = self.bag._file
            f.seek(chunk_header.data_pos + offset)
        else:
            if self.decompressed_chunk_pos != chunk_pos:
                # Read and decompress the chunk
                self.bag._file.seek(chunk_header.data_pos)
                compressed_chunk = _read(self.bag._file, chunk_header.compressed_size)

                if chunk_header.compression == Compression.BZ2:
                    self.decompressed_chunk = bz2.decompress(compressed_chunk)
                else:
                    raise ROSBagException('unsupported compression type: %s' % chunk_header.compression)

                self.decompressed_chunk_pos = chunk_pos

                if self.decompressed_chunk_io:
                    self.decompressed_chunk_io.close()
                self.decompressed_chunk_io = StringIO(self.decompressed_chunk)

            f = self.decompressed_chunk_io
            f.seek(offset)

        # Skip any CONNECTION records
        while True:
            header = _read_header(f)
            op = _read_uint8_field(header, 'op')
            if op != _OP_CONNECTION:
                break
            _skip_sized(f)

        # Check that we have a MSG_DATA record
        if op != _OP_MSG_DATA:
            raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)

        connection_id = _read_uint32_field(header, 'conn')
        t             = _read_time_field  (header, 'time')

        # Get the message type from the connection's definition
        connection_info = self.bag._connections[connection_id]
        try:
            msg_type = _get_message_type(connection_info)
        except KeyError:
            raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceded in bag by definition' % connection_info.datatype)

        # Read the message content
        data = _read_record_data(f)

        if raw:
            msg = connection_info.datatype, data, connection_info.md5sum, (chunk_pos, offset), msg_type
        else:
            # Deserialize the message
            msg = msg_type()
            msg.deserialize(data)

        return (connection_info.topic, msg, t)

def _time_to_str(secs):
    secs_frac = secs - int(secs)
    secs_frac_str = ('%.2f' % secs_frac)[1:]

    return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str

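# Example (timezone-dependent): with the epoch at Dec 31 1969 16:00:00 local
# time (US Pacific), _time_to_str(0.25) returns 'Dec 31 1969 16:00:00.25' --
# the fractional seconds are formatted to two digits and appended.
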
def _human_readable_size(size):
    multiple = 1024.0
    for suffix in ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
        size /= multiple
        if size < multiple:
            return '%.1f %s' % (size, suffix)

    return '-'

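# Examples: _human_readable_size(2048) == '2.0 KB', and because the size is
# divided before the comparison, sub-kilobyte sizes still report in KB, e.g.
# _human_readable_size(512) == '0.5 KB'.
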
def _human_readable_frequency(freq):
    multiple = 1000.0
    for suffix in ['Hz', 'kHz', 'MHz', 'GHz', 'THz', 'PHz', 'EHz', 'ZHz', 'YHz']:
        if freq < multiple:
            return '%.1f %s' % (freq, suffix)
        freq /= multiple

    return '-'

2240 """
2241 Perform an N-way merge operation on sorted lists.
2242
2243 @param list_of_lists: (really iterable of iterable) of sorted elements
2244 (either by naturally or by C{key})
2245 @param key: specify sort key function (like C{sort()}, C{sorted()})
2246 @param iterfun: function that returns an iterator.
2247
2248 Yields tuples of the form C{(item, iterator)}, where the iterator is the
2249 built-in list iterator or something you pass in, if you pre-generate the
2250 iterators.
2251
2252 This is a stable merge; complexity O(N lg N)
2253
2254 Examples::
2255
2256 print list(x[0] for x in mergesort([[1,2,3,4],
2257 [2,3.5,3.7,4.5,6,7],
2258 [2.6,3.6,6.6,9]]))
2259 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
2260
2261 # note stability
2262 print list(x[0] for x in mergesort([[1,2,3,4],
2263 [2,3.5,3.7,4.5,6,7],
2264 [2.6,3.6,6.6,9]], key=int))
2265 [1, 2, 2, 2.6, 3, 3.5, 3.6, 3.7, 4, 4.5, 6, 6.6, 7, 9]
2266
2267 print list(x[0] for x in mergesort([[4,3,2,1],
2268 [7,6.5,4,3.7,3.3,1.9],
2269 [9,8.6,7.6,6.6,5.5,4.4,3.3]],
2270 key=lambda x: -x))
2271 [9, 8.6, 7.6, 7, 6.6, 6.5, 5.5, 4.4, 4, 4, 3.7, 3.3, 3.3, 3, 2, 1.9, 1]
2272 """
2273
    # Seed the heap with the first item of each non-empty input, then
    # repeatedly yield the smallest root and replace it with that iterator's
    # next item; the list index in each tuple makes the merge stable
    heap = []
    for i, itr in enumerate(iter(pl) for pl in list_of_lists):
        try:
            item = next(itr)
            toadd = (key(item), i, item, itr) if key else (item, i, itr)
            heap.append(toadd)
        except StopIteration:
            pass
    heapq.heapify(heap)

    if key:
        while heap:
            _, idx, item, itr = heap[0]
            yield item, itr
            try:
                item = next(itr)
                heapq.heapreplace(heap, (key(item), idx, item, itr))
            except StopIteration:
                heapq.heappop(heap)

    else:
        while heap:
            item, idx, itr = heap[0]
            yield item, itr
            try:
                heapq.heapreplace(heap, (next(itr), idx, itr))
            except StopIteration:
                heapq.heappop(heap)

2304 """
2305 A file facade for the bz2.BZ2Compressor.
2306 """
2308 self.file = file
2309 self.compressor = bz2.BZ2Compressor()
2310 self.compressed_bytes_in = 0
2311
2313 compressed = self.compressor.compress(data)
2314 if len(compressed) > 0:
2315 self.file.write(compressed)
2316 self.compressed_bytes_in += len(data)
2317
2319 compressed = self.compressor.flush()
2320 if len(compressed) > 0:
2321 self.file.write(compressed)
2322
2336
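# Example usage (a sketch, not in the original source): the facade lets
# chunk-writing code treat a bz2 compressor like a plain file, while
# compressed_bytes_in tracks the uncompressed byte count fed in:
#
#     with open('chunk.bz2', 'wb') as out:
#         c = _CompressorFileFacade(out)
#         c.write(b'x' * 4096)
#         c.flush()
#         print(c.compressed_bytes_in)  # 4096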