Source code for z_laser_zlp1.zlp_projector_manager

# Copyright (c) 2020, FADA-CATEC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module imports all the other modules and manages the functionalities provided from a layer of abstraction, 
simplifying the task of developing advanced applications."""

from z_laser_zlp1.zlp_connections import ProjectorClient
from z_laser_zlp1.zlp_coordinate_system import CoordinateSystem
from z_laser_zlp1.zlp_projection_element import ProjectionElement
from z_laser_zlp1.zlp_keyboard import KeyboardControl
from z_laser_zlp1.zlp_utils import CoordinateSystemParameters, ProjectionElementParameters

from z_laser_msgs.msg import Figure

class ZLPProjectorManager(object):
    """Single entry point wrapping the methods of the modules imported by this file.

    Every operation delegates to a lower-level object (``ProjectorClient``,
    ``CoordinateSystem``, ``ProjectionElement``, ``KeyboardControl``) that reports
    its outcome as a ``(success, message)`` pair; this class converts failed
    outcomes into exceptions.

    Args:
        projector_IP (str): IP address of the projector device
        server_IP (str): IP address of the service running at the projector device
        connection_port (int): connection port number
        lic_path (str): path of the license file transferred by connect_and_setup()

    Attributes:
        projector_client (object): ProjectorClient instance created in __init__
        cs_element (object): CoordinateSystem instance; only exists after
            geotree_operator_create() has succeeded
        projection_element (object): ProjectionElement instance; only exists after
            geotree_operator_create() has succeeded
        keyboard_control (object): KeyboardControl instance; only exists after
            geotree_operator_create() has succeeded
        cs_scanned (object): third value returned by the last define_cs() call made
            from define_coordinate_system()
    """

    def __init__(self, projector_IP = "192.168.10.10", server_IP = "192.168.10.11", connection_port = 9090, lic_path=""):
        """Initialize the ZLPProjectorManager object."""
        self.__projector_IP = projector_IP
        self.__server_IP = server_IP
        self.__connection_port = connection_port
        self.__license_path = lic_path
        self.__projector_id = ""
        # Empty string means "no active coordinate system"; many methods test its truthiness.
        self.__active_coord_sys = ""

        self.projector_client = ProjectorClient()

    def connect_and_setup(self):
        """Prepare projector to be used: connect, load license, activate and create the geotree operator.

        Raises:
            SystemError: if any of the setup steps fails, whatever the original
                exception type was (the original exception is kept as ``__cause__``)
        """
        try:
            self.client_server_connect()
            self.load_license(self.__license_path)
            self.activate()
            self.geotree_operator_create()
        except Exception as e:
            # Callers only have to handle SystemError; chain explicitly so the
            # root cause (ConnectionError, FileNotFoundError, ...) is not lost.
            raise SystemError(e) from e

    def client_server_connect(self):
        """Connect to thrift server of ZLP-Service.

        Raises:
            ConnectionError: if the client reports that the connection failed
        """
        success, message = self.projector_client.connect(self.__server_IP, self.__connection_port)
        if not success:
            raise ConnectionError(message)

    def client_server_disconnect(self):
        """Disconnect from thrift server of ZLP-Service.

        On success the active coordinate system is cleared.

        Raises:
            ConnectionError: if the client reports that the disconnection failed
        """
        success, message = self.projector_client.disconnect()
        if not success:
            raise ConnectionError(message)
        self.__active_coord_sys = ""

    def get_connection_status(self):
        """Get connection status of projector device.

        Returns:
            bool: true if projector is connected, false otherwise
        """
        return self.projector_client.connection_status()

    def load_license(self, license_path):
        """Transfer license file to service.

        Args:
            license_path (str): path of the license file

        Raises:
            FileNotFoundError: if the client reports that the transfer failed
        """
        success, message = self.projector_client.transfer_license(license_path)
        if not success:
            raise FileNotFoundError(message)

    def activate(self):
        """Activate projector once it is connected and license is transferred.

        The projector id returned by the client is stored even when activation
        fails, as in the original implementation.

        Raises:
            SystemError: if activation or the subsequent license check fails
        """
        self.__projector_id, success, message = self.projector_client.activate_projector(self.__projector_IP)
        if not success:
            raise SystemError(message)

        success, message = self.projector_client.check_license()
        if not success:
            raise SystemError(message)

    def deactivate(self):
        """Deactivate projector.

        Raises:
            SystemError: if the client reports that deactivation failed
        """
        success, message = self.projector_client.deactivate_projector()
        if not success:
            raise SystemError(message)

    def geotree_operator_create(self):
        """Create geotree operator to handle reference systems and projection elements.

        Creates the ``cs_element``, ``projection_element`` and ``keyboard_control``
        attributes used by most of the other methods.

        Raises:
            SystemError: if the function module cannot be created
        """
        module_id, success, message = self.projector_client.function_module_create()
        if not success:
            raise SystemError(message)

        thrift_client = self.projector_client.get_thrift_client()

        self.cs_element = CoordinateSystem(self.__projector_id, module_id, thrift_client)
        self.projection_element = ProjectionElement(module_id, thrift_client)
        self.keyboard_control = KeyboardControl(self.projector_client, self.projection_element)

    def start_projection(self):
        """Start projection of elements associated to the active reference system.

        Raises:
            Warning: if there is no active coordinate system
            SystemError: if the client reports that projection could not start
        """
        if not self.__active_coord_sys:
            raise Warning("No Active Coordinate System set yet.")

        success, message = self.projector_client.start_project(self.__active_coord_sys)
        if not success:
            raise SystemError(message)

    def stop_projection(self):
        """Stop projection of all elements.

        Raises:
            SystemError: if the client reports that projection could not stop
        """
        success, message = self.projector_client.stop_project()
        if not success:
            raise SystemError(message)

    def get_coordinate_systems_list(self):
        """Get list of all defined reference systems.

        Raises:
            SystemError: if the list cannot be retrieved

        Returns:
            tuple: (list of defined coordinate systems, name of the active
            coordinate system or an empty string if there is none)
        """
        cs_list, success, message = self.cs_element.coordinate_system_list()
        if not success:
            raise SystemError(message)

        return cs_list, self.__active_coord_sys

    def get_coordinate_system_params(self, cs_name):
        """Get parameters values of a defined coordinate system.

        Args:
            cs_name (str): name of reference coordinate system

        Raises:
            SystemError: if the parameters cannot be retrieved

        Returns:
            object: first value returned by ``cs_element.get_cs``, which is handed
            a fresh CoordinateSystemParameters object
        """
        cs_params, success, message = self.cs_element.get_cs(cs_name, CoordinateSystemParameters())
        if not success:
            raise SystemError(message)

        return cs_params

    def define_coordinate_system(self, cs_params, do_target_search):
        """Define a new coordinate reference system, register it and make it the active one.

        Args:
            cs_params (object): parameters of the new reference system; its ``name``
                attribute is used to register and activate the system
            do_target_search (bool): forwarded to ``define_cs``; when true, the system
                is defined a second time (without search) from the values returned
                by the first definition

        Raises:
            SystemError: if defining, re-defining, registering or activating the
                coordinate system fails
        """
        success, message, self.cs_scanned = self.cs_element.define_cs(cs_params, do_target_search)
        if not success:
            raise SystemError(message)

        if do_target_search:
            # Second pass with the scanned result. Its outcome used to be ignored,
            # which let a failed re-definition go unnoticed.
            success, message, _ = self.cs_element.define_cs(self.cs_scanned, False)
            if not success:
                raise SystemError(message)

        # A failed registration used to be ignored and the unregistered system was
        # then activated anyway; report it instead.
        success, message = self.cs_element.register_cs(cs_params.name)
        if not success:
            raise SystemError(message)

        self.set_coordinate_system(cs_params.name)

    def set_coordinate_system(self, cs_name):
        """Set the active reference system.

        Args:
            cs_name (str): name of reference system

        Raises:
            SystemError: if the coordinate system cannot be set
        """
        success, message = self.cs_element.set_cs(cs_name)
        if not success:
            raise SystemError(message)

        self.__active_coord_sys = cs_name

    def show_coordinate_system(self):
        """Project the reference points of the active reference system on the projection surface.

        Raises:
            SystemError: if there is no active coordinate system or it cannot be shown
        """
        if not self.__active_coord_sys:
            raise SystemError("No active coordinate system set yet.")

        success, message = self.cs_element.show_cs(self.__active_coord_sys)
        if not success:
            raise SystemError(message)

    def hide_coordinate_system(self):
        """Hide the reference points of the active reference system.

        Raises:
            SystemError: if there is no active coordinate system or it cannot be hidden
        """
        if not self.__active_coord_sys:
            raise SystemError("Coordinate system does not exist.")

        success, message = self.cs_element.hide_cs(self.__active_coord_sys)
        if not success:
            raise SystemError(message)

    def remove_coordinate_system(self, cs_name):
        """Delete a reference system.

        Args:
            cs_name (str): name of reference system

        Returns:
            bool: true if the reference system to remove is the active coordinate system, false otherwise

        Raises:
            SystemError: if the coordinate system cannot be removed
        """
        success, message = self.cs_element.remove_cs(cs_name)
        if not success:
            raise SystemError(message)

        if cs_name != self.__active_coord_sys:
            return False

        # The active system no longer exists, so nothing is active any more.
        self.__active_coord_sys = ""
        return True

    def create_polyline(self, proj_elem_params):
        """Create a polyline as new projection element, associated to the active reference system.

        Args:
            proj_elem_params (object): object with the necessary parameters to define a new polyline

        Raises:
            SystemError: if there is no active coordinate system or the polyline cannot be defined
        """
        if not self.__active_coord_sys:
            raise SystemError("There is not an active coordinate system. Define or set one first.")

        success, message = self.projection_element.define_polyline(self.__active_coord_sys, proj_elem_params)
        if not success:
            raise SystemError(message)

    def create_curve(self, proj_elem_params):
        """Create a curve (circle, oval, arc) as new projection element, associated to the active reference system.

        Args:
            proj_elem_params (object): object with the necessary parameters to define a new curve;
                its ``figure_type`` selects the curve: 1 circle, 2 arc, 3 oval

        Raises:
            SystemError: if there is no active coordinate system, ``figure_type`` is not
                one of the values above, or the curve cannot be defined
        """
        if not self.__active_coord_sys:
            raise SystemError("There is not an active coordinate system. Define or set one first.")

        # NOTE(review): 1/2/3 presumably mirror constants of z_laser_msgs Figure
        # (only Figure.POLYLINE is used in this file) - confirm before replacing them.
        if proj_elem_params.figure_type == 1:
            success, message = self.projection_element.define_circle(self.__active_coord_sys, proj_elem_params)
        elif proj_elem_params.figure_type == 2:
            success, message = self.projection_element.define_arc(self.__active_coord_sys, proj_elem_params)
        elif proj_elem_params.figure_type == 3:
            success, message = self.projection_element.define_oval(self.__active_coord_sys, proj_elem_params)
        else:
            raise SystemError("curve_type does not correspond to any possible figure")

        if not success:
            raise SystemError(message)

    def create_text(self, proj_elem_params):
        """Create a text as new projection element, associated to the active reference system.

        Args:
            proj_elem_params (object): object with the necessary parameters to define a new text

        Raises:
            SystemError: if there is no active coordinate system or the text cannot be defined
        """
        if not self.__active_coord_sys:
            raise SystemError("There is not an active coordinate system. Define or set one first.")

        success, message = self.projection_element.define_text(self.__active_coord_sys, proj_elem_params)
        if not success:
            raise SystemError(message)

    def get_proj_elem(self, params):
        """Get the parameters of a defined projection element.

        Args:
            params (object): object with the necessary parameters to identify a projection element

        Raises:
            SystemError: if the projection element cannot be retrieved

        Returns:
            object: first value returned by ``projection_element.get_figure``
        """
        proj_elem, success, message = self.projection_element.get_figure(params)
        if not success:
            raise SystemError(message)

        return proj_elem

    def hide_proj_elem(self, params):
        """Hide a projection element from active reference system.

        Args:
            params (object): object with necessary parameters to identify a projection element

        Raises:
            SystemError: if the projection element cannot be hidden
        """
        success, message = self.projection_element.activate_figure(params, False)
        if not success:
            raise SystemError(message)

    def unhide_proj_elem(self, params):
        """Unhide a projection element from active reference system.

        Args:
            params (object): object with necessary parameters to identify a projection element

        Raises:
            SystemError: if the projection element cannot be unhidden
        """
        success, message = self.projection_element.activate_figure(params, True)
        if not success:
            raise SystemError(message)

    def remove_proj_elem(self,params):
        """Delete a projection element from active reference system.

        Args:
            params (object): object with necessary parameters to identify a projection element

        Raises:
            SystemError: if the projection element cannot be deleted
        """
        success, message = self.projection_element.delete_figure(params)
        if not success:
            raise SystemError(message)

    def monitor_proj_elem(self, params):
        """Monitor transformation operations (translation, rotation, scalation) to a specific
        projection element on real time projection by the use of keyboard.

        Args:
            params (object): object with necessary parameters to identify a projection element

        Raises:
            Warning: if there is no active coordinate system
            SystemError: if projection cannot start or the keyboard listener cannot be initialised
        """
        if not self.__active_coord_sys:
            raise Warning("No Active Coordinate System set yet.")

        success, message = self.projector_client.start_project(self.__active_coord_sys)
        if success:
            # Only listen to the keyboard once projection is actually running.
            success, message = self.keyboard_control.init_keyboard_listener(self.__active_coord_sys, params)

        # Covers a failure of either step; message belongs to the step that failed.
        if not success:
            raise SystemError(message)

    def create_pointer(self, proj_elem_params):
        """Create a pointer to be scanned as new projection element, associated to the active reference system.

        Args:
            proj_elem_params (object): object with the necessary parameters to define a new pointer

        Raises:
            SystemError: if there is no active coordinate system or the pointer cannot be defined
        """
        if not self.__active_coord_sys:
            raise SystemError("There is not an active coordinate system. Define or set one first.")

        success, message = self.projection_element.define_pointer(self.__active_coord_sys, proj_elem_params)
        if not success:
            raise SystemError(message)

    def scan_pointer(self, reflection_callback=None):
        """Set callback for reflection state change and start pointers scanning.

        Args:
            reflection_callback (object): callback function

        Raises:
            Warning: if there is no active coordinate system
            SystemError: if the scan cannot be set up or projection cannot start
        """
        if not self.__active_coord_sys:
            raise Warning("No Active Coordinate System set yet.")

        success, message = self.projector_client.scan_pointer(reflection_callback)
        if not success:
            raise SystemError(message)

        self.start_projection()

    def show_frame(self):
        """Project the origin axes and frame of the active reference system on the projection surface.

        Raises:
            SystemError: if there is no active coordinate system, or unhiding the
                elements or starting projection fails
        """
        if not self.__active_coord_sys:
            raise SystemError("Coordinate system does not exist.")

        self.cs_frame_unhide()
        self.cs_axes_unhide()
        self.start_projection()

    def hide_frame(self):
        """Hide the origin axes and frame of the active reference system.

        Projection is stopped before the elements are hidden.

        Raises:
            SystemError: if there is no active coordinate system, or stopping
                projection or hiding the elements fails
        """
        if not self.__active_coord_sys:
            raise SystemError("Coordinate system does not exist.")

        self.stop_projection()
        self.cs_frame_hide()
        self.cs_axes_hide()

    def cs_axes_create(self, cs_params):
        """Create the projection elements of the origin axes for the user reference system,
        then hide them.

        Args:
            cs_params (object): object with the parameters of the new reference system defined

        Raises:
            SystemError: if the axes cannot be created or hidden
        """
        success, message = self.projection_element.cs_axes_create(cs_params, ProjectionElementParameters())
        if not success:
            raise SystemError(message)

        # NOTE(review): hiding targets the *active* coordinate system's "_origin"
        # group, not cs_params - confirm callers activate cs_params first.
        self.cs_axes_hide()

    def cs_frame_create(self, cs_params):
        """Create the projection element of the system frame for the user reference system,
        then hide it.

        Args:
            cs_params (object): object with the parameters of the new reference system defined;
                its ``name`` and ``T`` attributes are forwarded to the projection element

        Raises:
            SystemError: if the frame cannot be created or hidden
        """
        success, message = self.projection_element.cs_frame_create(cs_params.name, ProjectionElementParameters(), cs_params.T)
        if not success:
            raise SystemError(message)

        # NOTE(review): hiding targets the *active* coordinate system's "_origin"
        # group, not cs_params.name - confirm callers activate cs_params first.
        self.cs_frame_hide()

    def _set_origin_elements_visibility(self, figure_names, visible):
        """Hide or unhide polylines of the active coordinate system's "_origin" group.

        Args:
            figure_names (list): names of the figures to update, processed in order
            visible (bool): true to unhide the figures, false to hide them

        Raises:
            SystemError: if any of the figures cannot be updated; figures after the
                failing one are left untouched
        """
        proj_elem_params = ProjectionElementParameters()
        proj_elem_params.projection_group = self.__active_coord_sys + "_origin"
        proj_elem_params.figure_type = Figure.POLYLINE

        # Go through the public methods so subclass overrides are still honoured.
        change_visibility = self.unhide_proj_elem if visible else self.hide_proj_elem
        for figure_name in figure_names:
            proj_elem_params.figure_name = figure_name
            change_visibility(proj_elem_params)

    def cs_axes_unhide(self):
        """Unhide user reference system origin axes.

        Raises:
            SystemError: if an axis cannot be unhidden
        """
        self._set_origin_elements_visibility(["axis_x", "axis_y"], True)

    def cs_axes_hide(self):
        """Hide user reference system origin axes.

        Raises:
            SystemError: if an axis cannot be hidden
        """
        self._set_origin_elements_visibility(["axis_x", "axis_y"], False)

    def cs_frame_unhide(self):
        """Unhide frame of user reference system.

        Raises:
            SystemError: if the frame cannot be unhidden
        """
        self._set_origin_elements_visibility(["frame"], True)

    def cs_frame_hide(self):
        """Hide frame of user reference system.

        Raises:
            SystemError: if the frame cannot be hidden
        """
        self._set_origin_elements_visibility(["frame"], False)

    @property
    def projector_IP(self):
        """Get or set the projector IP address."""
        return self.__projector_IP

    @projector_IP.setter
    def projector_IP(self, IP_address):
        self.__projector_IP = IP_address

    @property
    def server_IP(self):
        """Get or set IP address of server running at projector device."""
        return self.__server_IP

    @server_IP.setter
    def server_IP(self, IP_address):
        self.__server_IP = IP_address

    @property
    def connection_port(self):
        """Get or set connection port number."""
        return self.__connection_port

    @connection_port.setter
    def connection_port(self, port):
        self.__connection_port = port

    @property
    def license_path(self):
        """Get or set license file path."""
        return self.__license_path

    @license_path.setter
    def license_path(self, path):
        self.__license_path = path