cx_rl_gym.cx_rl_gym module

CXRLGym: ROS 2 Reinforcement Learning Environment.

This module defines the CXRLGym class — a ROS 2-integrated reinforcement learning environment following the Gymnasium API.

It bridges ROS 2 cognitive execution (CX) systems and reinforcement learning frameworks such as Stable Baselines3. Agents interact through Gym-compatible step() and reset() interfaces, while environment state and actions are managed via ROS 2 services and actions.

class cx_rl_gym.cx_rl_gym.CXRLGym(*args: Any, **kwargs: Any)

Bases: Env

ROS 2 integrated Gymnasium environment for reinforcement learning.

This environment dynamically constructs its observation and action spaces from ROS 2 services and executes symbolic robot actions through ROS 2 actions.

Parameters

node (rclpy.node.Node): ROS 2 node used to create service and action clients. mode (str): Reinforcement learning mode (e.g., “train” or “eval”). number_robots (int): Number of managed robots in the environment.

action_masks() numpy.typing.NDArray.numpy.int8
action_selection_cancel_done(future: rclpy.task.Future, robot_index: int) None

Invoke via callback once a cancel request for action selection is completed.

Parameters

futureFuture

The future representing the cancel result.

robot_indexint

Index of the robot for which the cancel applies.

action_selection_feedback_callback(feedback_msg: cx_rl_interfaces.action.ActionSelection.Feedback, robot_index: int) None

Handle feedback messages during action selection via callback.

If the feedback indicates that an action selection fact was asserted, the robot is unlocked for further use.

Parameters

feedback_msgActionSelection.Feedback

Feedback message from the action.

robot_indexint

Index of the robot for which feedback applies.

action_selection_get_result_callback(future: rclpy.task.Future, robot_index: int) None

Handle action selection results via callback.

Parameters

futureFuture

The future containing the action result.

robot_indexint

Index of the robot for which the result applies.

action_selection_goal_response_callback(future: rclpy.task.Future, robot_index: int) None

Handle goal responses from the action selection server via callback.

Parameters

futureFuture

The future representing the goal response.

robot_indexint

Index of the robot associated with this goal.

close() None

Clean up environment resources.

create_observable_predicate_dict(predicate_names: list[str], param_counts: list[int], param_names: list[str], param_types: list[str]) dict

Construct a dictionary describing observable predicates and their parameters.

This method takes parallel lists describing predicates and their parameters, and aggregates them into a nested dictionary for easier lookup.

Parameters

predicate_nameslist[str]

Names of the predicates.

param_countslist[int]

Number of parameters for each predicate.

param_nameslist[str]

Flat list of parameter names for all predicates.

param_typeslist[str]

Flat list of parameter types corresponding to each name.

Returns

dict

A mapping from predicate names to dictionaries of parameter names and types.

create_rl_action_space() str

Create a new RL action space.

Calls the create_rl_action_space service to obtain a list of all available actions.

Returns

str: The generated environment state as a serialized string.

create_rl_env_state() str

Create a new RL environment state.

Calls the CreateRLEnvState service to generate a new environment state for reinforcement learning execution.

Returns

str: The generated environment state as a serialized string.

exec_action_selection(request: cx_rl_interfaces.srv.ExecActionSelection.Request, response: cx_rl_interfaces.srv.ExecActionSelection.Response) cx_rl_interfaces.srv.ExecActionSelection.Response

Execute the RL-based action selection process.

This method receives a serialized environment state, converts it into an observation for the RL model, predicts the next action using the model, and fills the ROS 2 service response with the selected action ID.

Parameters

requestExecActionSelection.Request

The incoming request containing the serialized environment state and a list of executable actions.

responseExecActionSelection.Response

The response object to populate.

Returns

ExecActionSelection.Response

The populated response containing the selected action ID.

expand_grid(dictionary: dict) pandas.DataFrame

Generate a Cartesian product (grid) of all dictionary value combinations.

Each key in the input dictionary represents a column, and its list of values defines the possible entries.

Parameters

dictionarydict

A dictionary where each key maps to a list of possible values.

Returns

pandas.DataFrame

A DataFrame containing one row per value combination.

generate_observation_space() list[str]
get_action_list_executable() dict

Retrieve the list of all executable actions.

Calls the GetActionList service to obtain all available actions that can currently be executed by the system.

Returns

dict: A mapping of action names to their corresponding action IDs.

get_action_list_executable_for_robot(robot: str) dict

Retrieve the list of executable actions for a specific robot.

Calls the GetActionListRobot service for the given robot and waits for the result.

Parameters

robotstr

The robot identifier for which to retrieve executable actions.

Returns

dict

A mapping of action names to action IDs for the specified robot.

get_episode_end() tuple[bool, int]

Check whether the current RL episode has ended.

Calls the CheckForEpisodeEnd service to determine if the episode should terminate and obtain the corresponding reward.

Returns

Tuple[bool, int]: (episode_end, reward)
  • episode_end (bool): Whether the episode has ended.

  • reward (int): Reward for the transition.

get_free_robot() str

Retrieve a currently available robot.

Sends a goal to the GetFreeRobot action server and waits for the result.

Returns

str

The name or identifier of the free robot.

get_free_robot_cancel_done(future: rclpy.task.Future) None

Invoke via callback once a cancel request for GetFreeRobot is completed.

Parameters

futureFuture

The future representing the cancel result.

get_free_robot_feedback_callback(feedback_msg: cx_rl_interfaces.action.GetFreeRobot.Feedback) None

Handle feedback messages from the GetFreeRobot action via callback.

Parameters

feedback_msgGetFreeRobot.Feedback

The feedback message received from the action.

get_free_robot_get_result_callback(future: rclpy.task.Future) None

Handle the result of the GetFreeRobot action via callback.

Parameters

futureFuture

The future containing the action result.

get_free_robot_goal_response_callback(future: rclpy.task.Future) None

Handle goal responses from the GetFreeRobot action server via callback.

Parameters

futureFuture

The future representing the goal response.

get_observable_objects(obj_type: str) list[str]

Retrieve all observable objects of a given type.

Calls the GetObservableObjects service to get a list of currently observable objects matching the specified type.

Parameters

obj_typestr

The type of observable object (e.g., “robot”, “item”).

Returns

list[str]

List of observable object names.

get_observable_predicates() dict

Retrieve all observable predicates and their parameters.

Calls the GetObservablePredicates service and constructs a dictionary mapping predicate names to their parameter information.

Returns

dict: Dictionary where keys are predicate names and values describe parameter

counts, names, and types.

get_observation() numpy.typing.NDArray.numpy.float32

Generate the current RL observation vector.

Creates a new RL environment state using create_rl_env_state(), parses the returned fact string into Python objects, and converts them into a numerical observation vector suitable for the RL model.

Returns

numpy.ndarray

The observation vector as a NumPy array of type float32.

get_predefined_observables() list

Retrieve the list of predefined observables.

Calls the GetPredefinedObservables service to get predefined observables used for RL environment construction or monitoring.

Returns

list: A list of predefined observables.

get_state_from_facts(obs_facts) numpy.typing.NDArray.numpy.float32

Convert a set of observation facts into a binary state vector.

Each fact is matched against the internal observation dictionary. The corresponding index is set to 1.0 if the fact is present, 0.0 otherwise.

Parameters

obs_factsiterable

Collection of fact strings representing the current state.

Returns

numpy.ndarray

Binary state vector as a NumPy array of type float32.

render()

Render the environment (no-op).

This environment is symbolic and does not provide visualization.

reset(seed: int = None, options: dict[str, any] = None) tuple[numpy.typing.NDArray.numpy.float32, dict]

Reset the environment and return the initial observation.

Calls the /reset_cx action and queries /create_rl_env_state for the initial environment state.

Parameters

seedint, optional

Random seed for reproducibility.

optionsdict, optional

Additional reset parameters.

Returns

tuple
observationnp.ndarray

The initial observation.

infodict

Metadata about the reset process.

reset_cx() str

Reset the CLIPS executive (CX) node.

Sends a goal to the ResetCX action server and waits for completion.

Returns

str: Confirmation message from the CX node after reset.

reset_cx_cancel_done(future: rclpy.task.Future) None

Invoke once a cancel request for ResetCX is completed via callback.

Parameters

futureFuture

The future representing the cancel result.

reset_cx_feedback_callback(feedback_msg: cx_rl_interfaces.action.ResetCX.Feedback) None

Handle feedback messages from the ResetCX action via callback.

Parameters

feedback_msgResetCX.Feedback

The feedback message received from the action.

reset_cx_get_result_callback(future: rclpy.task.Future) None

Handle the result returned from the ResetCX action via callback.

Parameters

futureFuture

The future containing the action result.

reset_cx_goal_response_callback(future: rclpy.task.Future) None

Handle the goal response from the ResetCX action server via callback.

Parameters

futureFuture

The future object representing the goal response.

set_rl_mode(mode: str) str

Set the current reinforcement learning (RL) mode on the remote system.

This method calls the SetRLMode ROS 2 service and waits until a confirmation is received. It blocks until the service is ready and the asynchronous call completes.

Parameters

modestr

The RL mode to set (e.g., “train”, “eval”).

Returns

str

Confirmation message returned by the service.

set_rl_model(model) None

Attach an RL model for autonomous action selection.

Registers a service /exec_action_selection to let ROS query the model when actions are needed.

Parameters

modelobject

Trained RL model supporting predict(observation).

step(action: int) tuple[numpy.typing.NDArray.numpy.float32, int, bool, bool, dict]

Execute one environment step.

This sends an action to the CX system and returns the resulting observation, reward, and done state.

Parameters

actionint

Index of the chosen action.

Returns

tuple
observationnp.ndarray

The next state observation.

rewardfloat

Scalar reward.

terminatedbool

Whether the episode ended normally.

truncatedbool

Whether the episode ended early.

infodict

Additional debug information.

unpack_transmitted_action_list(action_list: list[str]) dict

Parse a list of encoded action strings into a dictionary mapping names to IDs.

Each action string is expected to have the format: "<action_id>|<action_name>".

Parameters

action_listlist[str]

List of encoded action strings.

Returns

dict

Mapping from action names to action IDs.