cx_rl_gym.cx_rl_gym module
CXRLGym: ROS 2 Reinforcement Learning Environment.
This module defines the CXRLGym class — a ROS 2-integrated
reinforcement learning environment following the Gymnasium API.
It bridges ROS 2 cognitive execution (CX) systems and reinforcement learning frameworks such as Stable Baselines3. Agents interact through Gym-compatible step() and reset() interfaces, while environment state and actions are managed via ROS 2 services and actions.
- class cx_rl_gym.cx_rl_gym.CXRLGym(*args: Any, **kwargs: Any)
Bases:
EnvROS 2 integrated Gymnasium environment for reinforcement learning.
This environment dynamically constructs its observation and action spaces from ROS 2 services and executes symbolic robot actions through ROS 2 actions.
Parameters
node (rclpy.node.Node): ROS 2 node used to create service and action clients. mode (str): Reinforcement learning mode (e.g., “train” or “eval”). number_robots (int): Number of managed robots in the environment.
- action_masks() numpy.typing.NDArray.numpy.int8
- action_selection_cancel_done(future: rclpy.task.Future, robot_index: int) None
Invoke via callback once a cancel request for action selection is completed.
Parameters
- futureFuture
The future representing the cancel result.
- robot_indexint
Index of the robot for which the cancel applies.
- action_selection_feedback_callback(feedback_msg: cx_rl_interfaces.action.ActionSelection.Feedback, robot_index: int) None
Handle feedback messages during action selection via callback.
If the feedback indicates that an action selection fact was asserted, the robot is unlocked for further use.
Parameters
- feedback_msgActionSelection.Feedback
Feedback message from the action.
- robot_indexint
Index of the robot for which feedback applies.
- action_selection_get_result_callback(future: rclpy.task.Future, robot_index: int) None
Handle action selection results via callback.
Parameters
- futureFuture
The future containing the action result.
- robot_indexint
Index of the robot for which the result applies.
- action_selection_goal_response_callback(future: rclpy.task.Future, robot_index: int) None
Handle goal responses from the action selection server via callback.
Parameters
- futureFuture
The future representing the goal response.
- robot_indexint
Index of the robot associated with this goal.
- close() None
Clean up environment resources.
- create_observable_predicate_dict(predicate_names: list[str], param_counts: list[int], param_names: list[str], param_types: list[str]) dict
Construct a dictionary describing observable predicates and their parameters.
This method takes parallel lists describing predicates and their parameters, and aggregates them into a nested dictionary for easier lookup.
Parameters
- predicate_nameslist[str]
Names of the predicates.
- param_countslist[int]
Number of parameters for each predicate.
- param_nameslist[str]
Flat list of parameter names for all predicates.
- param_typeslist[str]
Flat list of parameter types corresponding to each name.
Returns
- dict
A mapping from predicate names to dictionaries of parameter names and types.
- create_rl_action_space() str
Create a new RL action space.
Calls the create_rl_action_space service to obtain a list of all available actions.
Returns
str: The generated environment state as a serialized string.
- create_rl_env_state() str
Create a new RL environment state.
Calls the CreateRLEnvState service to generate a new environment state for reinforcement learning execution.
Returns
str: The generated environment state as a serialized string.
- exec_action_selection(request: cx_rl_interfaces.srv.ExecActionSelection.Request, response: cx_rl_interfaces.srv.ExecActionSelection.Response) cx_rl_interfaces.srv.ExecActionSelection.Response
Execute the RL-based action selection process.
This method receives a serialized environment state, converts it into an observation for the RL model, predicts the next action using the model, and fills the ROS 2 service response with the selected action ID.
Parameters
- requestExecActionSelection.Request
The incoming request containing the serialized environment state and a list of executable actions.
- responseExecActionSelection.Response
The response object to populate.
Returns
- ExecActionSelection.Response
The populated response containing the selected action ID.
- expand_grid(dictionary: dict) pandas.DataFrame
Generate a Cartesian product (grid) of all dictionary value combinations.
Each key in the input dictionary represents a column, and its list of values defines the possible entries.
Parameters
- dictionarydict
A dictionary where each key maps to a list of possible values.
Returns
- pandas.DataFrame
A DataFrame containing one row per value combination.
- generate_observation_space() list[str]
- get_action_list_executable() dict
Retrieve the list of all executable actions.
Calls the GetActionList service to obtain all available actions that can currently be executed by the system.
Returns
dict: A mapping of action names to their corresponding action IDs.
- get_action_list_executable_for_robot(robot: str) dict
Retrieve the list of executable actions for a specific robot.
Calls the GetActionListRobot service for the given robot and waits for the result.
Parameters
- robotstr
The robot identifier for which to retrieve executable actions.
Returns
- dict
A mapping of action names to action IDs for the specified robot.
- get_episode_end() tuple[bool, int]
Check whether the current RL episode has ended.
Calls the CheckForEpisodeEnd service to determine if the episode should terminate and obtain the corresponding reward.
Returns
- Tuple[bool, int]: (episode_end, reward)
episode_end (bool): Whether the episode has ended.
reward (int): Reward for the transition.
- get_free_robot() str
Retrieve a currently available robot.
Sends a goal to the GetFreeRobot action server and waits for the result.
Returns
- str
The name or identifier of the free robot.
- get_free_robot_cancel_done(future: rclpy.task.Future) None
Invoke via callback once a cancel request for GetFreeRobot is completed.
Parameters
- futureFuture
The future representing the cancel result.
- get_free_robot_feedback_callback(feedback_msg: cx_rl_interfaces.action.GetFreeRobot.Feedback) None
Handle feedback messages from the GetFreeRobot action via callback.
Parameters
- feedback_msgGetFreeRobot.Feedback
The feedback message received from the action.
- get_free_robot_get_result_callback(future: rclpy.task.Future) None
Handle the result of the GetFreeRobot action via callback.
Parameters
- futureFuture
The future containing the action result.
- get_free_robot_goal_response_callback(future: rclpy.task.Future) None
Handle goal responses from the GetFreeRobot action server via callback.
Parameters
- futureFuture
The future representing the goal response.
- get_observable_objects(obj_type: str) list[str]
Retrieve all observable objects of a given type.
Calls the GetObservableObjects service to get a list of currently observable objects matching the specified type.
Parameters
- obj_typestr
The type of observable object (e.g., “robot”, “item”).
Returns
- list[str]
List of observable object names.
- get_observable_predicates() dict
Retrieve all observable predicates and their parameters.
Calls the GetObservablePredicates service and constructs a dictionary mapping predicate names to their parameter information.
Returns
- dict: Dictionary where keys are predicate names and values describe parameter
counts, names, and types.
- get_observation() numpy.typing.NDArray.numpy.float32
Generate the current RL observation vector.
Creates a new RL environment state using create_rl_env_state(), parses the returned fact string into Python objects, and converts them into a numerical observation vector suitable for the RL model.
Returns
- numpy.ndarray
The observation vector as a NumPy array of type float32.
- get_predefined_observables() list
Retrieve the list of predefined observables.
Calls the GetPredefinedObservables service to get predefined observables used for RL environment construction or monitoring.
Returns
list: A list of predefined observables.
- get_state_from_facts(obs_facts) numpy.typing.NDArray.numpy.float32
Convert a set of observation facts into a binary state vector.
Each fact is matched against the internal observation dictionary. The corresponding index is set to 1.0 if the fact is present, 0.0 otherwise.
Parameters
- obs_factsiterable
Collection of fact strings representing the current state.
Returns
- numpy.ndarray
Binary state vector as a NumPy array of type float32.
- render()
Render the environment (no-op).
This environment is symbolic and does not provide visualization.
- reset(seed: int = None, options: dict[str, any] = None) tuple[numpy.typing.NDArray.numpy.float32, dict]
Reset the environment and return the initial observation.
Calls the /reset_cx action and queries /create_rl_env_state for the initial environment state.
Parameters
- seedint, optional
Random seed for reproducibility.
- optionsdict, optional
Additional reset parameters.
Returns
- tuple
- observationnp.ndarray
The initial observation.
- infodict
Metadata about the reset process.
- reset_cx() str
Reset the CLIPS executive (CX) node.
Sends a goal to the ResetCX action server and waits for completion.
Returns
str: Confirmation message from the CX node after reset.
- reset_cx_cancel_done(future: rclpy.task.Future) None
Invoke once a cancel request for ResetCX is completed via callback.
Parameters
- futureFuture
The future representing the cancel result.
- reset_cx_feedback_callback(feedback_msg: cx_rl_interfaces.action.ResetCX.Feedback) None
Handle feedback messages from the ResetCX action via callback.
Parameters
- feedback_msgResetCX.Feedback
The feedback message received from the action.
- reset_cx_get_result_callback(future: rclpy.task.Future) None
Handle the result returned from the ResetCX action via callback.
Parameters
- futureFuture
The future containing the action result.
- reset_cx_goal_response_callback(future: rclpy.task.Future) None
Handle the goal response from the ResetCX action server via callback.
Parameters
- futureFuture
The future object representing the goal response.
- set_rl_mode(mode: str) str
Set the current reinforcement learning (RL) mode on the remote system.
This method calls the SetRLMode ROS 2 service and waits until a confirmation is received. It blocks until the service is ready and the asynchronous call completes.
Parameters
- modestr
The RL mode to set (e.g., “train”, “eval”).
Returns
- str
Confirmation message returned by the service.
- set_rl_model(model) None
Attach an RL model for autonomous action selection.
Registers a service /exec_action_selection to let ROS query the model when actions are needed.
Parameters
- modelobject
Trained RL model supporting
predict(observation).
- step(action: int) tuple[numpy.typing.NDArray.numpy.float32, int, bool, bool, dict]
Execute one environment step.
This sends an action to the CX system and returns the resulting observation, reward, and done state.
Parameters
- actionint
Index of the chosen action.
Returns
- tuple
- observationnp.ndarray
The next state observation.
- rewardfloat
Scalar reward.
- terminatedbool
Whether the episode ended normally.
- truncatedbool
Whether the episode ended early.
- infodict
Additional debug information.
- unpack_transmitted_action_list(action_list: list[str]) dict
Parse a list of encoded action strings into a dictionary mapping names to IDs.
Each action string is expected to have the format:
"<action_id>|<action_name>".Parameters
- action_listlist[str]
List of encoded action strings.
Returns
- dict
Mapping from action names to action IDs.