cx_rl_gym.cx_rl_gym module

CXRLGym: ROS 2 Reinforcement Learning Environment.

This module defines the CXRLGym class — a ROS 2-integrated reinforcement learning environment following the Gymnasium API.

It bridges ROS 2 cognitive execution (CX) systems and reinforcement learning frameworks such as Stable Baselines3. Agents interact through Gym-compatible step() and reset() interfaces, while environment state and actions are managed via ROS 2 services and actions.

class cx_rl_gym.cx_rl_gym.CXRLGym(*args: Any, **kwargs: Any)

Bases: Env

ROS 2 integrated Gymnasium environment for reinforcement learning.

This environment dynamically constructs its observation and action spaces from ROS 2 services and executes symbolic robot actions through ROS 2 actions.

Parameters

node : rclpy.node.Node

ROS 2 node used to create service and action clients.

mode : str

Reinforcement learning mode (e.g., “TRAINING” or “EXECUTION”).

action_masks() numpy.typing.NDArray[numpy.int8]
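The signature above follows the action-masking convention used by maskable RL algorithms (e.g. MaskablePPO from sb3-contrib): the returned vector flags which discrete actions are currently executable. A minimal sketch of the idea; the action names and executable set here are hypothetical, not taken from the class:

```python
import numpy as np

# Hypothetical flat action space and the subset currently executable.
action_space = ["move(r1,loc1)", "move(r1,loc2)", "pick(r1,item1)"]
executable = {"move(r1,loc2)"}

# 1 marks an executable action, 0 a masked-out one.
mask = np.array([1 if a in executable else 0 for a in action_space],
                dtype=np.int8)
```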
action_selection_cancel_done(future: rclpy.task.Future, robot: str) None

Invoke via callback once a cancel request for action selection is completed.

Parameters

future : Future

The future representing the cancel result.

robot : str

Name of the robot for which the cancel applies.

action_selection_feedback_callback(feedback_msg: cx_rl_interfaces.action.ActionSelection.Feedback, robot: str) None

Handle feedback messages during action selection via callback.

If the feedback indicates that an action selection fact was asserted, the robot is unlocked for further use.

Parameters

feedback_msg : ActionSelection.Feedback

Feedback message from the action.

robot : str

Name of the robot for which feedback applies.

action_selection_get_result_callback(future: rclpy.task.Future, robot: str) None

Handle action selection results via callback.

Parameters

future : Future

The future containing the action result.

robot : str

Name of the robot for which the result applies.

action_selection_goal_response_callback(future: rclpy.task.Future, robot: str) None

Handle goal responses from the action selection server via callback.

Parameters

future : Future

The future representing the goal response.

robot : str

Name of the robot associated with this goal.

close() None

Clean up environment resources.

create_observable_dict(names: list[str], param_counts: list[int], param_names: list[str], param_types: list[str]) dict

Construct a dictionary describing entities and their parameters.

This method takes parallel lists describing either observable predicates (for observation space) or parameterized actions (for action space), and aggregates them into a nested dictionary for easier lookup.

Parameters

names : list[str]

Names of the predicates or actions.

param_counts : list[int]

Number of parameters for each predicate or action.

param_names : list[str]

Flat list of parameter names for all predicates or actions.

param_types : list[str]

Flat list of parameter types corresponding to each name.

Returns

dict

A mapping from entity names (predicates or actions) to dictionaries of parameter names and types.
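The aggregation of the four parallel lists can be sketched as follows. This is an illustrative reimplementation, not the class's actual code; it assumes the flat param_names/param_types lists are ordered to match names and param_counts:

```python
def create_observable_dict(names, param_counts, param_names, param_types):
    """Aggregate parallel lists into {entity: {param_name: param_type}}."""
    result = {}
    idx = 0
    for name, count in zip(names, param_counts):
        # Consume `count` entries from the flat parameter lists.
        result[name] = dict(zip(param_names[idx:idx + count],
                                param_types[idx:idx + count]))
        idx += count
    return result

obs = create_observable_dict(
    names=["holding", "at"],
    param_counts=[1, 2],
    param_names=["i", "r", "l"],
    param_types=["item", "robot", "location"],
)
```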

encode_name_args(name: str, args: list[str]) str
exec_action_selection(request: cx_rl_interfaces.srv.ExecActionSelection.Request, response: cx_rl_interfaces.srv.ExecActionSelection.Response) cx_rl_interfaces.srv.ExecActionSelection.Response

Execute the RL-based action selection process.

This method receives a serialized environment state, converts it into an observation for the RL model, predicts the next action using the model, and fills the ROS 2 service response with the selected action ID.

Parameters

request : ExecActionSelection.Request

The incoming request containing the serialized environment state and a list of executable actions.

response : ExecActionSelection.Response

The response object to populate.

Returns

ExecActionSelection.Response

The populated response containing the selected action ID.

expand_grid(dictionary: dict) pandas.DataFrame

Generate a Cartesian product (grid) of all dictionary value combinations.

Each key in the input dictionary represents a column, and its list of values defines the possible entries.

Parameters

dictionary : dict

A dictionary where each key maps to a list of possible values.

Returns

pandas.DataFrame

A DataFrame containing one row per value combination.
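The grid expansion described above can be sketched with itertools.product; this is an illustrative equivalent, not necessarily the class's own implementation:

```python
import itertools
import pandas as pd

def expand_grid(dictionary):
    # One column per key; one row per combination of the value lists.
    rows = itertools.product(*dictionary.values())
    return pd.DataFrame(rows, columns=list(dictionary.keys()))

grid = expand_grid({"robot": ["r1", "r2"], "location": ["base", "dock"]})
# 2 robots x 2 locations -> 4 rows
```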

generate_action_space() list[str]

Generate the RL action space dynamically.

This includes predefined actions as well as actions with parameters, which are expanded over all currently observable objects of the corresponding types.

Returns

list[str]

A list of action strings for the RL environment.

generate_observation_space() list[str]
get_action_list_executable_for_robot(robot: str) dict

Retrieve the list of executable actions for a specific robot.

Calls the GetActionListRobot service for the given robot and waits for the result.

Parameters

robot : str

The robot identifier for which to retrieve executable actions.

Returns

dict

A mapping of action names to action IDs for the specified robot.

get_current_observations() str

Create a new RL environment state.

Calls the GetObservations service to generate a new environment state for reinforcement learning execution.

Returns

str: The generated environment state as a serialized string.

get_episode_end() tuple[bool, int]

Check whether the current RL episode has ended.

Calls the CheckForEpisodeEnd service to determine if the episode should terminate and obtain the corresponding reward.

Returns

Tuple[bool, int]: (episode_end, reward)
  • episode_end (bool): Whether the episode has ended.

  • reward (int): Reward for the transition.

get_free_robot() tuple[bool, str]

Retrieve a currently available robot.

Sends a goal to the GetFreeRobot action server and waits for the result.

Returns

bool

Indicates whether the retrieval was successful.

str

The name or identifier of the free robot.

get_free_robot_cancel_done(future: rclpy.task.Future) None

Invoke via callback once a cancel request for GetFreeRobot is completed.

Parameters

future : Future

The future representing the cancel result.

get_free_robot_feedback_callback(feedback_msg: cx_rl_interfaces.action.GetFreeRobot.Feedback) None

Handle feedback messages from the GetFreeRobot action via callback.

Parameters

feedback_msg : GetFreeRobot.Feedback

The feedback message received from the action.

get_free_robot_get_result_callback(future: rclpy.task.Future) None

Handle the result of the GetFreeRobot action via callback.

Parameters

future : Future

The future containing the action result.

get_free_robot_goal_response_callback(future: rclpy.task.Future) None

Handle goal responses from the GetFreeRobot action server via callback.

Parameters

future : Future

The future representing the goal response.

get_id(name: str, args: list[str]) str
get_observable_actions() dict

Retrieve all observable actions and their parameters.

Calls the GetObservableActions service and constructs a dictionary mapping predicate names to their parameter information.

Returns

dict: Dictionary where keys are predicate names and values describe parameter counts, names, and types.

get_observable_objects(obj_type: str) list[str]

Retrieve all observable objects of a given type.

Calls the GetObservableObjects service to get a list of currently observable objects matching the specified type.

Parameters

obj_type : str

The type of observable object (e.g., “robot”, “item”).

Returns

list[str]

List of observable object names.

get_observable_predicates() dict

Retrieve all observable predicates and their parameters.

Calls the GetObservablePredicates service and constructs a dictionary mapping predicate names to their parameter information.

Returns

dict: Dictionary where keys are predicate names and values describe parameter counts, names, and types.

get_observation() numpy.typing.NDArray[numpy.float32]

Generate the current RL observation vector.

Creates a new RL environment state using get_current_observations(), parses the returned fact string into Python objects, and converts them into a numerical observation vector suitable for the RL model.

Returns

numpy.ndarray

The observation vector as a NumPy array of type float32.

get_observation_encoding_fron_ids(obs_facts) numpy.typing.NDArray[numpy.float32]

Convert a set of observation ids into a binary state vector.

Each id is matched against the internal observation dictionary. The corresponding index is set to 1.0 if the fact is present, 0.0 otherwise.

Parameters

obs_facts : iterable

Collection of fact strings representing the current state.

Returns

numpy.ndarray

Binary state vector as a NumPy array of type float32.
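The binary encoding described above can be sketched as follows; `observation_index`, a mapping from fact id to vector position, is a hypothetical stand-in for the class's internal observation dictionary:

```python
import numpy as np

def encode_observations(obs_facts, observation_index):
    # Start from an all-zero vector and flip present facts to 1.0.
    vec = np.zeros(len(observation_index), dtype=np.float32)
    for fact in obs_facts:
        idx = observation_index.get(fact)
        if idx is not None:
            vec[idx] = 1.0
    return vec

index = {"at(r1,base)": 0, "holding(r1,item1)": 1, "at(r1,dock)": 2}
state = encode_observations(["at(r1,base)"], index)
```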

get_predefined_actions() list

Retrieve the list of predefined actions.

Calls the GetPredefinedActions service to get predefined actions used for RL environment construction or monitoring.

Returns

list: A list of predefined actions.

get_predefined_observables() list

Retrieve the list of predefined observables.

Calls the GetPredefinedObservables service to get predefined observables used for RL environment construction or monitoring.

Returns

list: A list of predefined observables.

get_status(request: cx_rl_interfaces.srv.GetStatus.Request, response: cx_rl_interfaces.srv.GetStatus.Response) cx_rl_interfaces.srv.GetStatus.Response

Fetch information regarding environment and training progress.

This method gathers information about the current environment state and training progress and fills the ROS 2 service response with it.

Parameters

request : GetStatus.Request

Empty request.

response : GetStatus.Response

The response object to populate.

Returns

GetStatus.Response

The populated response containing status information.

on_training_end()
render()

Render the environment (no-op).

This environment is symbolic and does not provide visualization.

reset(seed: int = None, options: dict[str, Any] = None) tuple[numpy.typing.NDArray[numpy.float32], dict]

Reset the environment and return the initial observation.

Calls the /reset_env action and queries /get_current_observations for the initial environment state.

Parameters

seed : int, optional

Random seed for reproducibility.

options : dict, optional

Additional reset parameters.

Returns

tuple

observation : np.ndarray

The initial observation.

info : dict

Metadata about the reset process.

reset_env() str

Reset the CLIPS executive (CX) node.

Sends a goal to the ResetEnv action server and waits for completion.

Returns

str: Confirmation message from the CX node after reset.

reset_env_cancel_done(future: rclpy.task.Future) None

Invoke once a cancel request for ResetEnv is completed via callback.

Parameters

future : Future

The future representing the cancel result.

reset_env_feedback_callback(feedback_msg: cx_rl_interfaces.action.ResetEnv.Feedback) None

Handle feedback messages from the ResetEnv action via callback.

Parameters

feedback_msg : ResetEnv.Feedback

The feedback message received from the action.

reset_env_get_result_callback(future: rclpy.task.Future) None

Handle the result returned from the ResetEnv action via callback.

Parameters

future : Future

The future containing the action result.

reset_env_goal_response_callback(future: rclpy.task.Future) None

Handle the goal response from the ResetEnv action server via callback.

Parameters

future : Future

The future object representing the goal response.

set_rl_model(model) None

Attach an RL model for autonomous action selection.

Registers a service /exec_action_selection to let ROS query the model when actions are needed.

Parameters

model : object

Trained RL model supporting predict(observation).

step(action: int) tuple[numpy.typing.NDArray[numpy.float32], int, bool, bool, dict]

Execute one environment step.

This sends an action to the CX system and returns the resulting observation, reward, and done state.

Parameters

action : int

Index of the chosen action.

Returns

tuple

observation : np.ndarray

The next state observation.

reward : int

Scalar reward.

terminated : bool

Whether the episode ended normally.

truncated : bool

Whether the episode ended early.

info : dict

Additional debug information.
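The five-element return contract can be exercised with an ordinary Gymnasium-style loop. Since the real environment needs a live ROS 2 node, the sketch below uses a hypothetical stub with the same step()/reset() signatures:

```python
import numpy as np

class StubEnv:
    """Stand-in mimicking CXRLGym's step()/reset() contract (no ROS 2)."""

    def reset(self, seed=None, options=None):
        return np.zeros(3, dtype=np.float32), {}

    def step(self, action):
        obs = np.zeros(3, dtype=np.float32)
        obs[action] = 1.0
        terminated = action == 2   # pretend action 2 ends the episode
        return obs, 1, terminated, False, {}

env = StubEnv()
obs, info = env.reset()
terminated = truncated = False
while not (terminated or truncated):
    action = 2  # a real agent would call model.predict(obs) here
    obs, reward, terminated, truncated, info = env.step(action)
```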

unpack_transmitted_actions(action_list: list[cx_rl_interfaces.msg.Action], action_ids: list[str])