rmf_demos_panel.rmf_msg_observer module

class rmf_demos_panel.rmf_msg_observer.AsyncRmfMsgObserver(callback_fn: Callable[[dict], None], server_url: str = 'localhost', server_port: str = '7878', msg_filters: Dict[RmfMsgType, List[str]] = {})

Bases: object

Helper class that filters the raw messages from the RMF server and returns only the useful data, according to the provided filter arguments. Note that this is a blocking class, since spin is done internally.

:param callback_fn: callback function invoked when a filtered msg is received
:param server_url: websocket server address
:param server_port: websocket server port number
:param json_str: input JSON string data
:param msg_type: type of msg to be filtered
:param data_filter: detailed filter for the different levels of the data obj
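A minimal usage sketch, based only on the signature shown above. The names `on_msg`, `received`, and `run_observer` are hypothetical. It assumes that an empty filter list applies no further key filtering and that `spin()` is the call that blocks; neither is confirmed by this page. The import is placed inside `run_observer` so the callback can be tried without the package installed.

```python
from typing import List, Tuple

# Collected payloads; a real callback would act on the data instead.
received: List[Tuple] = []


def on_msg(*payload) -> None:
    """Store whatever the observer passes to the callback.

    The signature above documents a single dict argument; accepting
    *payload is a defensive choice made for this sketch only.
    """
    received.append(payload)


def run_observer() -> None:
    """Create an observer with the default address and port, then spin."""
    # Imported lazily so the callback can be exercised without the package.
    from rmf_demos_panel.rmf_msg_observer import (
        AsyncRmfMsgObserver,
        RmfMsgType,
    )

    observer = AsyncRmfMsgObserver(
        on_msg,
        server_url="localhost",
        server_port="7878",
        # Assumption: an empty list applies no further key filtering.
        msg_filters={RmfMsgType.TaskState: []},
    )
    # Documented method; assumed here to block while messages are handled.
    observer.spin()
```

Calling `run_observer()` requires `rmf_demos_panel` to be installed and is expected not to return while the observer is running, so any other work would need to happen inside the callback or in a separate thread or process.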

spin(future=<Future pending>)

class rmf_demos_panel.rmf_msg_observer.RmfMsgType

Bases: object

FleetLog = 'fleet_log_update'
FleetState = 'fleet_state_update'
TaskLog = 'task_log_update'
TaskState = 'task_state_update'

rmf_demos_panel.rmf_msg_observer.filter_rmf_msg(json_str: str, filters: Dict[RmfMsgType, List[str]] = {}) → Tuple[RmfMsgType, Dict] | None
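To illustrate the idea behind the signature above, here is a self-contained sketch of filtering a JSON message by type and then by a list of keys. `filter_msg_sketch` is a hypothetical stand-in, not the library's implementation. It assumes that messages carry top-level `type` and `data` keys and that each filter entry selects one level deeper into the data; this page confirms neither.

```python
import json
from typing import Dict, List, Optional, Tuple

# Message type strings copied from RmfMsgType above.
TASK_STATE = "task_state_update"
FLEET_STATE = "fleet_state_update"


def filter_msg_sketch(
    json_str: str, filters: Optional[Dict[str, List[str]]] = None
) -> Optional[Tuple[str, object]]:
    """Hypothetical stand-in for filter_rmf_msg; not the library code."""
    filters = filters or {}
    obj = json.loads(json_str)
    msg_type = obj.get("type")
    # Drop messages whose type was not requested.
    if msg_type not in filters:
        return None
    data = obj.get("data")
    # Assumption: each filter entry is a key one level deeper in the data.
    for key in filters[msg_type]:
        if not isinstance(data, dict) or key not in data:
            return None
        data = data[key]
    return msg_type, data


# Example payload; its field names are made up for illustration.
msg = json.dumps({"type": TASK_STATE, "data": {"booking": {"id": "task_1"}}})
kept = filter_msg_sketch(msg, {TASK_STATE: ["booking"]})
dropped = filter_msg_sketch(msg, {FLEET_STATE: []})
print(kept)
print(dropped)
```

The sketch defaults `filters` to `None` rather than `{}` so that no mutable default object is shared between calls; the documented signature uses `{}`.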