# Source code for z_laser_zlp1.zlp_connections

# Copyright (c) 2020, FADA-CATEC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helper module for python thrift interface to ZLP Service. 
This module contains utility classes and methods which ease the usage of the thrift interface to 
communicate with the ZLP Service."""

import os
import sys
import time
import socket
import copy
import threading 

import thriftpy
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.server import TThreadedServer, TSimpleServer
from thriftpy.thrift import TProcessor, TClient
from thriftpy.transport import TBufferedTransportFactory, TServerSocket, TSocket

class EventChannelInterfaceHandler(object):
    """Dispatch events of the ClientEventChannel thrift interface to callbacks.

    Each event method simply forwards its arguments to the callback currently
    stored in the matching attribute. All callbacks default to no-ops, so
    events that nobody subscribed to are silently ignored.

    Attributes:
        property_changed_callback (object): callback to handle settings changes on laser projector system
        geo_tree_changed_callback (object): callback to handle changes on geotree operator
        service_state_changed_callback (object): callback to handle changes on services state
        function_module_changed_callback (object): callback to handle changes on function module state
        rc_command_received_callback (object): callback to handle remote control commands reception
        on_reflection_state_changed_callback (object): callback to handle changes on reflection state
    """

    def __init__(self):
        """Initialize the handler with a no-op callback for every event."""
        self.property_changed_callback = lambda x, y: None
        self.geo_tree_changed_callback = lambda x, y: None
        self.service_state_changed_callback = lambda x, y: None
        self.function_module_changed_callback = lambda x, y, z: None
        self.rc_command_received_callback = lambda a, b, c, d, e: None
        self.on_reflection_state_changed_callback = lambda a, b: None

    def PropertyChanged(self, name, value):
        """Forward a settings change on the laser projector system to its callback.

        Args:
            name (str): full path of property that was changed
            value (int): value of property
        """
        self.property_changed_callback(name, value)

    def GeoTreeChanged(self, changed_flags, element_names):
        """Forward a change on the geotree operator to its callback.

        Args:
            changed_flags (int): integer value with flags of type GeoTreeChangedFlags
            element_names (enum): identification of changed element (within the GeoTreeElemId enumeration)
        """
        self.geo_tree_changed_callback(changed_flags, element_names)

    def ServiceStateChanged(self, oldState, newState):
        """Forward a change of the services state to its callback.

        Args:
            oldState (enum): old state (within the ServiceStates enumeration) before change
            newState (enum): new state (within the ServiceStates enumeration) after change
        """
        self.service_state_changed_callback(oldState, newState)

    def FunctionModuleStateChanged(self, functionModID, oldState, newState):
        """Forward a change of a function module state to its callback.

        Args:
            functionModID (str): identificator name of function module
            oldState (enum): old state (within the FunctionModuleStates enumeration) before change
            newState (enum): new state (within the FunctionModuleStates enumeration) after change
        """
        self.function_module_changed_callback(functionModID, oldState, newState)

    def RemoteControlFrameReceived(self, rc_id, command, toggle, projector, timestamp):
        """Forward a received remote control command to its callback.

        Args:
            rc_id (str): address of RC-device
            command (enum): enum with command codes for remotecontrol functions
            toggle (bool): toggle function active
            projector (str): serial number of the projector
            timestamp (int): timestamp
        """
        self.rc_command_received_callback(rc_id, command, toggle, projector, timestamp)

    def onReflectionStateChanged(self, elementName, state):
        """Forward a change of the reflection state to its callback.

        Args:
            elementName (str): name of the element that changed state
            state (bool): true if a reflection was detected; False otherwise
        """
        self.on_reflection_state_changed_callback(elementName, state)
class ThriftClient(TClient):
    """This class implements the functions to carry out the connection with the ZLP Service.

    Args:
        event_handler (object): object with functions of ClientEventChannel thrift interface.
            When omitted, a fresh EventChannelInterfaceHandler is created for this client.
            Passing None explicitly disables event handling (the callback setters then
            raise ValueError and no event channel is created).

    Attributes:
        thrift_interface (obj): load the interface description file (interface.thrift) for the
            communication between ZLP-Service and a remote client
    """

    # Sentinel for "argument omitted". A handler instance used directly as the
    # default value would be created once and shared by every client, so a
    # callback installed on one client would leak into all the others.
    _DEFAULT_HANDLER = object()

    def __init__(self, event_handler=_DEFAULT_HANDLER):
        """Initialize the ThriftClient object and load the thrift interface description."""
        if event_handler is ThriftClient._DEFAULT_HANDLER:
            event_handler = EventChannelInterfaceHandler()

        self._event_channel = None
        self._event_channel_handler = event_handler

        # interface.thrift is expected next to this module.
        _interface_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "interface.thrift")
        self.thrift_interface = thriftpy.load(_interface_file, module_name="zlaser_thrift")

    def init_client(self, ip, port):
        """Establish a connection to thrift server of ZLP Service.

        Opens the client socket and initializes the TClient base class with the
        ServiceInterface of the loaded thrift description.

        Args:
            ip (str): network address of ZLP-Service (the socket is opened with AF_INET)
            port (str): port number on which ZLP-Service listens for requests
        """
        client_socket = TSocket(ip, port, socket_family=socket.AF_INET, socket_timeout=50000)
        transport = TBufferedTransportFactory().get_transport(client_socket)
        protocol = TBinaryProtocolFactory().get_protocol(transport)
        transport.open()

        # The base class is only initialized here, once a protocol exists.
        super().__init__(self.thrift_interface.ServiceInterface, protocol)

    def init_event_channel(self):
        """Create a thrift server and register it at ZLP Service to receive events.

        Does nothing when there is no event handler or when an event channel
        already exists.
        """
        if not self._event_channel_handler or self._event_channel:
            return

        processor = TProcessor(self.thrift_interface.ClientEventChannel, self._event_channel_handler)

        # NOTE(review): presumably milliseconds (i.e. 10 minutes) - confirm against thriftpy.
        client_timeout = 1000*60*10
        # port=0: no fixed port is requested; the actual one is read back below.
        server_socket = TServerSocket(host="0.0.0.0", port=0, socket_family=socket.AF_INET,
                                      client_timeout=client_timeout)
        self._event_channel = TSimpleServer(processor, server_socket)

        server_thread = threading.Thread(target=self._event_channel.serve, daemon=True)
        server_thread.start()
        # Give the server thread time to start before reading the socket name.
        # NOTE(review): presumably serve() is what creates the listening socket - confirm.
        time.sleep(1)

        connection = self._event_channel.trans.sock.getsockname()
        # connection[1] is the port of the local event server.
        self.ConnectClientEventChannel(connection[1])

    def _install_callback(self, attribute_name, callback):
        """Store callback in the given attribute of the event handler, or raise ValueError."""
        if not self._event_channel_handler:
            raise ValueError("Error: Can't install callback, because event_handler = none!")
        setattr(self._event_channel_handler, attribute_name, callback)

    def set_property_changed_callback(self, callback):
        """Set callback function related with laser projector settings changes.

        Args:
            callback (object): callback function to set

        Raises:
            ValueError: if the client has no event handler
        """
        self._install_callback("property_changed_callback", callback)

    def set_geotree_changed_callback(self, callback):
        """Set callback function related with geotree operator changes.

        Args:
            callback (object): callback function to set

        Raises:
            ValueError: if the client has no event handler
        """
        self._install_callback("geo_tree_changed_callback", callback)

    def set_function_module_state_changed_callback(self, callback):
        """Set callback function related with function module state changes.

        Args:
            callback (object): callback function to set

        Raises:
            ValueError: if the client has no event handler
        """
        self._install_callback("function_module_changed_callback", callback)

    def set_rc_command_received_callback(self, callback):
        """Set callback function related with remote control commands reception.

        Args:
            callback (object): callback function to set

        Raises:
            ValueError: if the client has no event handler
        """
        self._install_callback("rc_command_received_callback", callback)

    def set_reflection_state_changed_callback(self, callback):
        """Set callback function related with reflection state changes.

        Args:
            callback (object): callback function to set

        Raises:
            ValueError: if the client has no event handler
        """
        self._install_callback("on_reflection_state_changed_callback", callback)
class ProjectorClient(object):
    """This class implements the functions for connecting to the projector and basic projection features.

    Most methods never raise: failures are reported through a bool success
    value plus an information message string.

    Attributes:
        projector_id (str): serial number of the projector
        module_id (str): function module identification name
        cv (threading.Condition): condition used to wait for the property-changed
            event while activating a projector
    """

    def __init__(self):
        """Initialize the ProjectorClient object."""
        self.projector_id = ""
        self.module_id = ""
        self.__thrift_client = ThriftClient()
        self.cv = threading.Condition()

    @staticmethod
    def _error_message(error):
        """Return a non-empty text for an exception, so messages are always str."""
        return str(error) or repr(error)

    def get_thrift_client(self):
        """Return the object generated to communicate with the projector.

        Returns:
            object: thrift client object generated to communicate with the projector
        """
        return self.__thrift_client

    def connect(self, server_IP, connection_port):
        """Create and connect the client to thrift server (located at projector) of ZLP-Service and
        establish an event channel if needed.

        Args:
            server_IP (str): network address of ZLP-Service
            connection_port (str): port number on which ZLP-Service listens for requests

        Returns:
            tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value
            in the tuple is an information message string
        """
        try:
            # An existing event channel is what marks the client as connected.
            if not self.__thrift_client._event_channel:
                self.__thrift_client.init_client(server_IP, connection_port)
                self.__thrift_client.init_event_channel()
                success = True
                message = "Client connected"
            else:
                success = False
                message = "Projector already connected"
        except Exception as e:
            success = False
            message = self._error_message(e)
        return success, message

    def disconnect(self):
        """Disconnect from ZLP Service thrift server and close own event server.

        Returns:
            tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value
            in the tuple is an information message string
        """
        try:
            self.__thrift_client.RemoveGeoTreeElem("")
            self.__thrift_client.FunctionModuleRelease(self.module_id)
            self.__thrift_client.DisconnectClientEventChannel()
            self.__thrift_client.close()

            if self.__thrift_client._event_channel:
                self.__thrift_client._event_channel.close()
                self.__thrift_client._event_channel = None

            success = True
            message = "Projector disconnected"
        except Exception as e:
            success = False
            message = self._error_message(e)
        return success, message

    def connection_status(self):
        """Get status of projection connection.

        Returns:
            bool: True if an event channel exists (projector connected), False otherwise
        """
        return bool(self.__thrift_client._event_channel)

    def transfer_license(self, lic_path):
        """Transfer data of the local license file to remote file at ZLP-Service.

        Args:
            lic_path (str): license file path

        Returns:
            tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value
            in the tuple is an information message string
        """
        try:
            license_path = os.path.abspath(lic_path)
            license_file = os.path.basename(license_path)
            with open(license_path, 'r') as license_handle:
                content = license_handle.read()

            self.__thrift_client.TransferDataToFile(content, license_file, True)
            self.__thrift_client.LoadLicense(license_file)
            success = True
            message = "License transfered."
        except Exception as e:
            # Covers a missing local file as well as errors raised by the service.
            success = False
            message = self._error_message(e)
        return success, message

    def check_license(self):
        """Check if license is valid.

        Returns:
            tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value
            in the tuple is an information message string
        """
        try:
            success = self.__thrift_client.CheckLicense()
            if success:
                message = "License is valid"
            else:
                message = "License is not valid"
        except Exception as e:
            success = False
            message = self._error_message(e)
        return success, message

    def scan_projectors(self, scan_addresses=""):
        """Scan the network for projectors. Get a list of active projectors.

        Args:
            scan_addresses (str): addresses or address to scan

        Returns:
            tuple[list, bool, str]: the first value in the returned tuple is a list of serial numbers of the
            projectors found, the second a bool success value and the third value in the tuple is an information
            message string
        """
        try:
            self.__thrift_client.SetProperty("config.projectorManager.cmdGetProjectors.scan", "1")
            self.__thrift_client.SetProperty("config.projectorManager.cmdGetProjectors.scanAddresses", scan_addresses)
            self.__thrift_client.SetProperty("config.projectorManager.cmdGetProjectors", "1")
            serial_list = self.__thrift_client.GetProperty("config.projectorManager.cmdGetProjectors.result.entries")
            self.__thrift_client.SetProperty("config.projectorManager.cmdGetProjectors.scan", "0")

            if serial_list:
                # The result property is split on single spaces into serial numbers.
                serial_list = serial_list.split(" ")
                success = True
                message = ""
            else:
                serial_list = []
                success = False
                message = "No projectors found"
        except Exception as e:
            serial_list = []
            success = False
            message = self._error_message(e)
        return serial_list, success, message

    def property_changed_callback(self, prop, value):
        """Callback function related with laser projector settings changes.

        Wakes up one thread waiting on the condition variable; the arguments
        themselves are not inspected.

        Args:
            prop (str): full path of property that was changed
            value (int): value of property
        """
        with self.cv:
            self.cv.notify()

    def activate_projector(self, projector_IP):
        """Set properties to activate a projector.

        Scans the given address, activates the first projector found and waits
        until a property-changed event is received. The wait has no timeout, so
        the call blocks until that event arrives.

        Args:
            projector_IP (str): address of the projector to scan

        Returns:
            tuple[str, bool, str]: the first value in the returned tuple is the serial number string of the
            activated projector, the second a bool success value and the third value in the tuple is an information
            message string
        """
        try:
            projectors, success, message = self.scan_projectors(projector_IP)
            if success:
                self.__thrift_client.set_property_changed_callback(self.property_changed_callback)
                self.__thrift_client.RegisterForChangedProperty("config.licenseState.IsValid")

                # The condition is held while the commands are sent, so the
                # callback cannot notify before wait() is reached. The with
                # statement also releases it if one of the calls raises.
                with self.cv:
                    self.projector_id = projectors[0]
                    self.__thrift_client.SetProperty("config.projectorManager.cmdActivateProjector.serial", self.projector_id)
                    self.__thrift_client.SetProperty("config.projectorManager.cmdActivateProjector.active", "1")
                    self.__thrift_client.SetProperty("config.projectorManager.cmdActivateProjector", "1")
                    self.cv.wait()

                message = "Projector activated"
        except Exception as e:
            success = False
            message = self._error_message(e)
        return self.projector_id, success, message

    def deactivate_projector(self):
        """Set properties to deactivate a projector.

        Returns:
            tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value
            in the tuple is an information message string
        """
        try:
            projector_property_path = "config.projectorManager.projectors." + self.projector_id
            # Hide the current projection before deactivating the projector.
            self.__thrift_client.SetProperty(projector_property_path + ".cmdShowProjection.show", "0")
            self.__thrift_client.SetProperty(projector_property_path + ".cmdShowProjection", "1")

            self.__thrift_client.SetProperty("config.projectorManager.cmdActivateProjector.serial", self.projector_id)
            self.__thrift_client.SetProperty("config.projectorManager.cmdActivateProjector.active", "0")
            self.__thrift_client.SetProperty("config.projectorManager.cmdActivateProjector", "1")
            success = True
            message = "Projector deactivated:" + self.projector_id
        except Exception as e:
            success = False
            message = self._error_message(e)
        return success, message

    def function_module_create(self):
        """Create function module to operate with GeoTreeElements (coordinate systems and projection elements).

        Returns:
            tuple[str, bool, str]: the first value in the returned tuple is the function module identification name
            string, the second is a bool success value and the third value in the tuple is an information message
            string
        """
        try:
            self.module_id = self.__thrift_client.FunctionModuleCreate("zFunctModRegister3d", "3DReg")
            success = True
            message = "Function module created"
        except Exception as e:
            success = False
            message = self._error_message(e)
        return self.module_id, success, message

    def start_project(self, cs_name):
        """Start projection on the surface of all projection elements that belong to the active coordinate system.

        Projection is only triggered when a coordinate system name is given, that
        coordinate system is activated and it has at least one active projection
        element.

        Args:
            cs_name (str): name of the active coordinate system

        Returns:
            tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value
            in the tuple is an information message string
        """
        try:
            # The checks are chained: the first one that fails decides the
            # result and nothing is projected.
            if not cs_name:
                success = False
                message = "None Coordinate System set"
            elif not self.__thrift_client.GetGeoTreeElement(cs_name).activated:
                success = False
                message = "Coordinate_system is not activated"
            elif self.is_empty(cs_name):
                success = False
                message = "Nothing to project"
            else:
                self.__thrift_client.TriggerProjection()
                success = True
                message = "Projecting elements from [" + cs_name + "] coordinate system."
        except Exception as e:
            success = False
            message = self._error_message(e)
        return success, message

    def stop_project(self):
        """Stop projection of all elements.

        Returns:
            tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value
            in the tuple is an information message string
        """
        try:
            projector_property_path = "config.projectorManager.projectors." + self.projector_id
            self.__thrift_client.SetProperty(projector_property_path + ".cmdShowProjection.show", "0")
            self.__thrift_client.SetProperty(projector_property_path + ".cmdShowProjection", "1")
            success = True
            message = "Projection stopped"
        except Exception as e:
            success = False
            message = self._error_message(e)
        return success, message

    def update_project(self, cs_name):
        """Update changes on figures projected (restart projection).

        Both the stop and the start step are always attempted; the update is
        reported as failed if either of them failed.

        Args:
            cs_name (str): name of the coordinate system to update

        Returns:
            tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value
            in the tuple is an information message string
        """
        try:
            stopped, stop_message = self.stop_project()
            started, start_message = self.start_project(cs_name)

            if not stopped:
                success = False
                message = stop_message
            elif not started:
                success = False
                message = start_message
            else:
                success = True
                message = "Projection updated."
        except Exception as e:
            success = False
            message = self._error_message(e)
        return success, message

    def is_empty(self, cs_name):
        """Check if coordinate system has no associated active projection elements.

        Args:
            cs_name (str): name of the coordinate system to check

        Returns:
            bool: False if there is an activated projection element whose first coordinate system is cs_name,
            True otherwise (also when the query to the service fails)
        """
        try:
            geo_tree_list = self.__thrift_client.GetGeoTreeIds()
            # elemType = 1024, 1025, 1026, 1027 refers to projection element identificator at the projector device
            matches = [elem.name for elem in geo_tree_list if elem.elemType in (1024, 1025, 1026, 1027)]

            for name in matches:
                proj_elem = self.__thrift_client.GetProjectionElement(name)
                cs_list = proj_elem.coordinateSystemList
                if proj_elem.activated and cs_list and cs_list[0] == cs_name:
                    return False
        except Exception:
            # Best effort: if the elements cannot be queried, report the
            # coordinate system as empty so that nothing is projected.
            return True
        return True

    def on_reflection_change(self, name, reflection):
        """Default callback for reflection state change, events handler. It is used for running code when
        pointer is reflected.

        The default reaction is to stop the projection; the arguments are not inspected.

        Args:
            name (str): name of the pointer that changed state
            reflection (bool): true if a reflection was detected; False otherwise
        """
        self.stop_project()

    def scan_pointer(self, reflection_callback=None):
        """Set callback for reflection state change.

        Args:
            reflection_callback (object): callback function; on_reflection_change is used when None

        Returns:
            tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value
            in the tuple is an information message string
        """
        try:
            if reflection_callback is None:
                # use default
                reflection_callback = self.on_reflection_change

            self.__thrift_client.set_reflection_state_changed_callback(reflection_callback)
            success = True
            message = "Reflection callback set."
        except Exception as e:
            success = False
            message = self._error_message(e)
        return success, message