rqt_bag.rosbag2 module

A rosbag abstraction with functionality required by rqt_bag.

class rqt_bag.rosbag2.Entry(topic, data, timestamp)

Bases: tuple

data

Alias for field number 1

timestamp

Alias for field number 2

topic

Alias for field number 0

class rqt_bag.rosbag2.Rosbag2(bag_path, recording=False, topics={}, serialization_format='cdr', storage_id=rosbag2_py.get_default_storage_id)

Bases: object

deserialize_entry(entry)

Deserialize a bag entry into its corresponding ROS message.

entries_in_range_generator(t_start: rclpy.time.Time, t_end: rclpy.time.Time, topic: str | Iterable[str] | None = None, progress_cb: Callable[[int], None] | None = None) Iterator[Entry]

Get a generator of all entries in a given time range, sorted by receive stamp.

Parameters:
  • t_start – stamp to start at, ‘’rclpy.time.Time’’

  • t_end – stamp to end at, ‘’rclpy.time.Time’’

  • topic – topic or list of topics to query (if None, all topics are), ‘’list(str)’’

  • progress_cb – callback function to report progress, called once per each percent.

Returns:

generator of entries in the bag file, ‘’Generator(Entry)’’

estimate_num_entries_in_range(t_start: rclpy.time.Time, t_end: rclpy.time.Time, topic: str | Iterable[str] | None = None) int

Estimate the number of entries in the given time range.

The computation is only approximate, based on the assumption that messages are distributed evenly across the whole bag on every topic.

Parameters:
  • t_start – stamp to start at, ‘’rclpy.time.Time’’

  • t_end – stamp to end at, ‘’rclpy.time.Time’’

  • topic – topic or list of topics to query (if None, all topics are), ‘’list(str)’’

Returns:

the approximate number of entries, ‘’int’’

get_earliest_timestamp()

Get the timestamp of the earliest message in the bag.

get_entries_in_range(t_start: rclpy.time.Time, t_end: rclpy.time.Time, topic: str | Iterable[str] | None = None, progress_cb: Callable[[int], None] | None = None) List[Entry] | None

Get a list of all entries in a given time range, sorted by receive stamp.

Do not use this function for large bags. It will load all entries into memory. Use entries_in_range_generator() instead and process the data as they are returned.

Parameters:
  • t_start – stamp to start at, ‘’rclpy.time.Time’’

  • t_end – stamp to end at, ‘’rclpy.time.Time’’

  • topic – topic or list of topics to query (if None, all topics are), ‘’list(str)’’

  • progress_cb – callback function to report progress, called once per each percent.

Returns:

entries in the bag file, ‘’list(Entry)’’

get_entry(timestamp, topic=None)

Get the (serialized) entry for a specific timestamp.

Returns the entry that is closest in time (<=) to the provided timestamp.

get_entry_after(timestamp, topic=None)

Get the next entry after a given timestamp.

get_latest_timestamp()

Get the timestamp of the most recent message in the bag.

get_topic_metadata(topic)

Get the full metadata for a given topic name.

get_topic_type(topic)

Get the topic type for a given topic name.

get_topics()

Get all of the topics used in this bag.

get_topics_by_type()

Return a map of topic data types to a list of topics publishing that type.

read_next()
set_latest_timestamp(t)
size()

Get the size of the rosbag.