rmf_demos_panel.rmf_msg_observer module
- class rmf_demos_panel.rmf_msg_observer.AsyncRmfMsgObserver(callback_fn: Callable[[dict], None], server_url: str = 'localhost', server_port: str = '7878', msg_filters: Dict[RmfMsgType, List[str]] = {})
Bases:
object
This helper class filters the messy msg from RMF server and only return the useful data according to the provided filter args Note that this is a blocking class since spin is done internally. :param callback_fn: function callback when a filtered msg is received :param server_url: websocket server address :param server_port: websocket server port number :param json_str: input json string data :param msg_type: type of msg which is to be filter out :param data_filter: detailed filter different levels of the data obj
- spin(future=<Future pending>)
- class rmf_demos_panel.rmf_msg_observer.RmfMsgType
Bases:
object
- FleetLog = 'fleet_log_update'
- FleetState = 'fleet_state_update'
- TaskLog = 'task_log_update'
- TaskState = 'task_state_update'
- rmf_demos_panel.rmf_msg_observer.filter_rmf_msg(json_str: str, filters: Dict[RmfMsgType, List[str]] = {}) Tuple[RmfMsgType, Dict] | None