rko_lio.lio_pipeline module¶
Equivalent logic to the ros wrapper’s message buffering. A convenience class to buffer IMU and LiDAR messages to ensure the core cpp implementation always gets the data in sync. The difference is this is not multi-threaded, therefore is a bit slower.
- class rko_lio.lio_pipeline.LIOPipeline(config: PipelineConfig)¶
- Bases: - object- Minimal sequential pipeline for LIO processing with out-of-sync IMU/lidar. Buffers are managed internally; data is added via add_imu and add_lidar. When IMU data covers an already available lidar frame, registration is triggered. - add_imu(time: float, acceleration: numpy.ndarray, angular_velocity: numpy.ndarray)¶
- Add IMU measurement to pipeline (will be buffered until processed by lidar). - Parameters:
- time (float) – Measurement timestamp in seconds. 
- acceleration (array of float, shape (3,)) – Acceleration vector in m/s^2. 
- angular_velocity (array of float, shape (3,)) – Angular velocity in rad/s. 
 
 
 - add_lidar(scan: numpy.ndarray, timestamps: numpy.ndarray)¶
- Add a lidar point cloud and absolute timestamps. Scan start and end times are inferred from the timestamps vector. - Parameters:
- scan (array of float, shape (N,3)) – Point cloud. 
- timestamps (array of float, shape (N,)) – Absolute timestamps (seconds) for each point. 
 
 
 - dump_results_to_disk()¶
- Write LIO results to disk under LIOPipeline.output_dir. - Writes: - Trajectory (timestamps and poses) in TUM format text file. - Configuration as YAML file. 
 - property output_dir: Path¶
- The directory used for file logging if enabled. Folder is {log_dir}/{run_name}_{index}. Automatically bumps the index (from 0) if similar names exist, to avoid overwriting. 
 
- rko_lio.lio_pipeline.log_vector(rerun, entity_path_prefix: str, vector)¶
- Logs a vector as three scalar time-series in rerun. - Parameters:
- rerun – rerun module 
- entity_path_prefix – Base path for scalar logs (e.g. “imu/avg_acceleration”) 
- vector – Iterable or np.ndarray with 3 elements (x, y, z) 
 
 
- rko_lio.lio_pipeline.log_vector_columns(rerun, entity_path_prefix: str, times: numpy.ndarray, vectors: numpy.ndarray)¶
- Log a batch of 3D vectors over multiple timestamps in rerun, sending one column batch per vector axis. - Parameters:
- rerun – rerun module or rerun instance. 
- entity_path_prefix – base path e.g. ‘imu/acceleration’. 
- times – 1D np.ndarray of timestamps (float64). 
- vectors – 2D np.ndarray, shape (N, 3) where columns are x,y,z.