rko_lio.lio_pipeline module

Equivalent logic to the ROS node, but mostly synchronous. Only the Rerun visualization is multi-threaded.

class rko_lio.lio_pipeline.LIOPipeline(config: PipelineConfig, map_log_period_s: float = 1.0)

Bases: object

Minimal sequential pipeline for LIO processing.

add_imu(time: int, acceleration: numpy.ndarray, angular_velocity: numpy.ndarray)

Add an IMU measurement to the pipeline. The measurement is buffered until a subsequent lidar scan consumes it.

Parameters:
  • time (int) – Measurement timestamp in nanoseconds (absolute, since the Unix epoch).

  • acceleration (array of float, shape (3,)) – Acceleration vector in m/s^2.

  • angular_velocity (array of float, shape (3,)) – Angular velocity in rad/s.
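Because time is an integer nanosecond count, a driver that reports stamps as separate seconds and nanoseconds fields needs them combined before calling add_imu. The following is a minimal sketch of that conversion; the helper name stamp_to_ns and the sample values are hypothetical and not part of rko_lio. The call to add_imu appears only as a comment, since it needs a configured pipeline.

```python
NS_PER_S = 1_000_000_000


def stamp_to_ns(sec: int, nanosec: int) -> int:
    """Combine integer seconds and nanoseconds into one nanosecond timestamp.

    Hypothetical helper, not part of rko_lio. Staying in integer arithmetic
    avoids the rounding that a float64 seconds value would introduce at
    Unix-epoch magnitudes.
    """
    return sec * NS_PER_S + nanosec


# Example values (made up for illustration).
t_ns = stamp_to_ns(1_700_000_000, 123_456_789)

# With a constructed pipeline and numpy imported as np, the documented call
# would then look like:
#   pipeline.add_imu(t_ns, np.array([0.0, 0.0, 9.81]), np.array([0.0, 0.0, 0.0]))
```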

close()
cloud_log_loop()
dump_results_to_disk()

Write LIO results to disk under LIOPipeline.output_dir.

Writes:
  • Trajectory (timestamps and poses) as a text file in TUM format.

  • Configuration as a YAML file.
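For reference, a TUM-format trajectory stores one pose per line as `timestamp tx ty tz qx qy qz qw`, separated by spaces. The sketch below formats a single line in that layout. The function name tum_line, the choice of seconds for the timestamp, and the number of decimal places are assumptions for illustration; the exact formatting written by dump_results_to_disk is not specified here.

```python
def tum_line(t_s: float, position, quaternion_xyzw) -> str:
    """Format one pose as a TUM trajectory line (hypothetical helper).

    position is (tx, ty, tz); quaternion_xyzw is (qx, qy, qz, qw).
    The precision chosen here is an assumption, not the library's.
    """
    tx, ty, tz = position
    qx, qy, qz, qw = quaternion_xyzw
    return (
        f"{t_s:.9f} {tx:.6f} {ty:.6f} {tz:.6f} "
        f"{qx:.6f} {qy:.6f} {qz:.6f} {qw:.6f}"
    )


# Identity rotation at position (1, 2, 3), stamped at 1.5 s.
line = tum_line(1.5, (1.0, 2.0, 3.0), (0.0, 0.0, 0.0, 1.0))
```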

property output_dir: Path

The directory used for file logging, if enabled. The folder is {log_dir}/{run_name}_{index}. The index starts at 0 and is bumped automatically if a folder with that name already exists, to avoid overwriting earlier results.
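The index-bumping rule described above can be sketched as follows. This is an illustration only, assuming that a name counts as taken when a path with the exact name {run_name}_{index} already exists; the function next_run_dir is hypothetical, and the library's own matching logic may differ.

```python
from pathlib import Path


def next_run_dir(log_dir: Path, run_name: str) -> Path:
    """Return the first {log_dir}/{run_name}_{index} that does not exist yet.

    Hypothetical helper: starts at index 0 and counts upward, so an
    existing run folder is never reused.
    """
    index = 0
    while (log_dir / f"{run_name}_{index}").exists():
        index += 1
    return log_dir / f"{run_name}_{index}"
```

With an empty log directory this yields run_0; once run_0 exists, the next call yields run_1.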

register_scan(**kwargs)
visualize(**kwargs)
class rko_lio.lio_pipeline.LatestMailbox

Bases: object

Single-slot, latest-wins handoff between a producer and a single consumer. The producer never blocks.

close()
get()
put(item)
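To illustrate the single-slot, latest-wins pattern, here is a minimal sketch built on threading.Condition. It is not the rko_lio implementation: the class name LatestSlot is hypothetical, and the behaviour of get() (blocking until an item arrives, returning None once closed and empty) is an assumption, since the semantics of LatestMailbox.get() are not documented above.

```python
import threading


class LatestSlot:
    """Hypothetical single-slot mailbox: put() overwrites any unread item."""

    def __init__(self):
        self._cond = threading.Condition()
        self._item = None
        self._has_item = False
        self._closed = False

    def put(self, item):
        # Never waits for the consumer: an unread item is simply replaced.
        with self._cond:
            if self._closed:
                return
            self._item = item
            self._has_item = True
            self._cond.notify()

    def get(self):
        # Assumed semantics: wait for an item, or return None after close().
        with self._cond:
            while not self._has_item and not self._closed:
                self._cond.wait()
            if not self._has_item:
                return None
            item, self._item, self._has_item = self._item, None, False
            return item

    def close(self):
        # Wake any waiting consumer so it can exit.
        with self._cond:
            self._closed = True
            self._cond.notify_all()


box = LatestSlot()
box.put("scan 1")
box.put("scan 2")      # replaces "scan 1" before it was read
latest = box.get()     # the consumer only ever sees the newest item
box.close()
after_close = box.get()
```

A design like this suits visualization: a slow consumer drops stale frames instead of stalling the producer or growing an unbounded queue.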