Source code for z_laser_viz.zlp_viz

# Copyright (c) 2020, FADA-CATEC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module contains utility classes and methods to run a projections visualizer."""

import rospy
import math
import numpy as np
from math import sin, cos, pi, radians
from scipy.spatial.transform import Rotation

from z_laser_zlp1.zlp_keyboard import KeyboardParameters
from z_laser_zlp1.zlp_utils import CoordinateSystemParameters, ProjectionElementParameters

from geometry_msgs.msg import Point, Quaternion, Vector3, Pose, Quaternion
from visualization_msgs.msg import Marker, MarkerArray
from std_srvs.srv import Trigger, TriggerResponse
from z_laser_msgs.msg import Figure
from z_laser_msgs.srv import CoordinateSystem, CoordinateSystemResponse
from z_laser_msgs.srv import CoordinateSystemName, CoordinateSystemNameResponse
from z_laser_msgs.srv import CoordinateSystemShow, CoordinateSystemShowResponse
from z_laser_msgs.srv import CoordinateSystemList, CoordinateSystemListResponse
from z_laser_msgs.srv import ProjectionElement, ProjectionElementResponse

[docs]class ZLPVisualizer(object): """This class implement the functions related with projection elements. Attributes: cs_marker_array (list): coordinate systems' markers list (origin axes and frame of each system) pe_marker_array (list): markers list of projection elements active_cs (str): name of active reference system cs_reference (str): auxiliar variable to differentiate and find the origin axes and frames markers STD_WAIT_TIME (int): predefined number of projection seconds in reference system definition figures_list (list): list with the figures' identificator names """
[docs] def __init__(self): """Initialize the ZLPVisualizer object.""" self.cs_marker_array = MarkerArray() self.pe_marker_array = MarkerArray() self.active_cs = "" self.cs_reference = "" self.STD_WAIT_TIME = CoordinateSystemParameters().DEFAULT_SHOW_TIME self.figures_list = ProjectionElementParameters().figures_list self.scale_factor = 1
[docs] def open_services(self): """Open ROS services for visualizer.""" self.start_proj = rospy.Service('projection_start', Trigger, self.projection_start_cb) self.stop_proj = rospy.Service('projection_stop', Trigger, self.projection_stop_cb) self.manual_cs = rospy.Service('define_coordinate_system', CoordinateSystem, self.manual_define_coord_sys_cb) self.set_cs = rospy.Service('set_coordinate_system', CoordinateSystemName, self.set_coord_sys_cb) self.rem_cs = rospy.Service('remove_coordinate_system', CoordinateSystemName, self.remove_coord_sys_cb) self.show_cs = rospy.Service('show_active_coordinate_system', CoordinateSystemShow, self.show_coord_sys_cb) self.hide_proj_elem = rospy.Service('hide_projection_element', ProjectionElement, self.hide_proj_elem_cb) self.unhide_proj_elem = rospy.Service('unhide_projection_element', ProjectionElement, self.unhide_proj_elem_cb) self.remove_proj_elem = rospy.Service('remove_projection_element', ProjectionElement, self.remove_proj_elem_cb) self.add_proj_elem = rospy.Subscriber("add_projection_element", Figure, self.add_fig_cb) self.monit_proj_elem = rospy.Subscriber("monitor_projection_element", Figure, self.init_keyboard_listener_cb)
[docs] def projection_start_cb(self, req): """Callback of ROS service to start projection of elements related to the active reference system on the surface. Args: req (object): trigger request ROS service object Returns: tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value in the tuple is an information message string """ if not self.active_cs: return TriggerResponse(False, "No Coordinate System set as active.") for i, marker in enumerate(self.pe_marker_array.markers): if marker.ns.find(self.active_cs)>-1: self.pe_marker_array.markers[i].action = Marker.ADD return TriggerResponse(True, "Projection started.")
[docs] def projection_stop_cb(self, req): """Callback of ROS service to stop projection of all elements. Args: req (object): trigger request ROS service object Returns: tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value in the tuple is an information message string """ for i in range(len(self.pe_marker_array.markers)): self.pe_marker_array.markers[i].action = Marker.DELETE return TriggerResponse(True, "Projection stopped.")
[docs] def manual_define_coord_sys_cb(self, req): """Callback of ROS service to define a new reference system, stating the points coordinates manually by the user. Args: req (object): object with the necessary info to define a new coordinate system Returns: tuple[list, bool, str]: the first value in the returned tuple is a list of the user reference points T0, T1, T2, T3, the second is a bool success value and the third s an information message string """ for marker in self.cs_marker_array.markers: if req.name in marker.ns: return CoordinateSystemResponse([], False, "Coordinate System already exists.") self.active_cs = req.name axis_x_marker, axis_y_marker = self.coord_sys_axes(req) self.cs_marker_array.markers.append(axis_x_marker) self.cs_marker_array.markers.append(axis_y_marker) self.cs_marker_array.markers.append(self.coord_sys_frame(req)) self.cs_reference = "_origin" self.timer_secs = self.STD_WAIT_TIME self.update_cs_markers() return CoordinateSystemResponse([], True, "Coordinate System added manually.")
[docs] def timer_cb(self, timer): """Timer for controlling the projection pause between the reference systems's different markers.""" for i in range(len(self.cs_marker_array.markers)): self.cs_marker_array.markers[i].action = Marker.DELETE self.update_cs_markers()
[docs] def update_cs_markers(self): """Change projection between origin axes and frame markers.""" for marker in self.cs_marker_array.markers: if (self.active_cs + self.cs_reference) in marker.ns: marker.action = Marker.ADD if self.cs_reference in ["_origin","_frame"]: rospy.Timer(rospy.Duration(self.timer_secs), self.timer_cb, oneshot=True) self.cs_reference = "_frame" if self.cs_reference == "_origin" else "empty"
[docs] def base_marker(self, cs_name): """Initialize the common and basic parameters of a marker. Args: cs_name (object): name of the reference system with which the marker is associated Returns: object: marker initialized """ # define marker common fields marker = Marker() marker.type = Marker.LINE_STRIP marker.action = Marker.DELETE marker.scale.x = 0.01 # Vector3(0.01, 0.01, 0) marker.color.g = 1.0 marker.color.a = 1.0 marker.header.frame_id = cs_name marker.pose.orientation = Quaternion(0,0,0,1) return marker
[docs] def coord_sys_axes(self, cs_points): """Create the origin axes markers. Args: cs_points (object): object with the x,y,z position of the reference points from the reference system Returns: tuple[object, object]: the first value in the returned tuple is the x-axis marker and the second is the y-axis marker """ # read axes points orig = Point() # axes origin point orig.x = cs_points.P[0].x * 0.001 orig.y = cs_points.P[0].y * 0.001 axis_x = Point() # axis x line end point axis_x.x = cs_points.P[1].x * 0.001 axis_x.y = orig.y axis_y = Point() # axis y line end point axis_y.x = orig.x axis_y.y = cs_points.P[2].y * 0.001 # create one marker for each axis line # and append the correspondent points axis_x_marker = self.base_marker("[P]") axis_y_marker = self.base_marker("[P]") axis_x_marker.points.append(orig) axis_x_marker.points.append(axis_x) axis_y_marker.points.append(orig) axis_y_marker.points.append(axis_y) # update frame and namespace axis_x_marker.ns = cs_points.name + "_origin/polyline/axis_x" axis_y_marker.ns = cs_points.name + "_origin/polyline/axis_y" return axis_x_marker, axis_y_marker
[docs] def coord_sys_frame(self, cs_points): """Create the frame marker. Args: cs_points (object): object with the x,y,z position of the reference points from the reference system Returns: object: frame marker """ frame = self.base_marker("[P]") # read frame points for i in [0,1,2,3,0]: point = Point() point.x = cs_points.P[i].x * 0.001 point.y = cs_points.P[i].y * 0.001 frame.points.append(point) frame.ns = cs_points.name + "_frame/polyline/T1_T2_T3_T4" return frame
[docs] def set_coord_sys_cb(self, req): """Callback of ROS service to set the indicated reference system as 'active reference system'. Args: req (object): object with the necessary parameters to identify a coordinate system Returns: tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value in the tuple is an information message string """ self.active_cs = req.name return CoordinateSystemNameResponse(True, "Coordinate System set as active.")
[docs] def show_coord_sys_cb(self, req): """Callback of ROS service to project reference points, origin axes and frame of the active reference system. Args: req (object): object with the necessary parameters to identify a reference system Returns: tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value in the tuple is an information message string """ if not self.active_cs: return CoordinateSystemShowResponse(False, "None Coordinate System is set.") if not req.secs > 0: return CoordinateSystemShowResponse(False, "Seconds projection is set to 0.") self.timer_secs = req.secs self.cs_reference = "_origin" self.update_cs_markers() return CoordinateSystemShowResponse(True, "Active Coordinate System showed correctly.")
[docs] def remove_coord_sys_cb(self, req): """Callback of ROS service to remove a reference system. Args: req (object): object with the necessary parameters to identify a reference system Returns: tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value in the tuple is an information message string """ if any(req.name in cs.ns for cs in self.cs_marker_array.markers): self.cs_marker_array.markers = [cs for cs in self.cs_marker_array.markers if cs.ns.find(req.name)==-1] self.pe_marker_array.markers = [pe for pe in self.pe_marker_array.markers if pe.ns.find(req.name)==-1] if req.name == self.active_cs: self.active_cs = "" return CoordinateSystemNameResponse(True, "Coordinate System removed.") else: return CoordinateSystemNameResponse(False, "Coordinate System does not exist.")
[docs] def add_fig_cb(self, msg): """Callback of ROS topic to define a new projection element. Args: msg (object): object with the necessary parameters to define a new projection element """ # define marker common fields marker = self.base_marker(self.active_cs) step = self.compute_step() marker.pose.position.x = msg.position.x * step marker.pose.position.y = msg.position.y * step if msg.figure_type == Figure.POLYLINE: length = msg.size[0] * step angle = radians(msg.angle[0]) # middle point for polyline () marker.pose.position.x += length/2*cos(angle) marker.pose.position.y += length/2*sin(angle) figure = self.line_eq(length, angle) elif msg.figure_type == Figure.CIRCLE: radius = msg.size[0] * step figure = self.circle_eq(radius, 0.0, 2*pi) elif msg.figure_type == Figure.ARC: radius = msg.size[0] * step start_angle = radians(msg.angle[0]) end_angle = radians(msg.angle[1]) figure = self.circle_eq(radius, start_angle, end_angle) elif msg.figure_type == Figure.OVAL: wide_size = msg.size[0] * step height_size = msg.size[1] * step angle = radians(msg.angle[0]) figure = self.oval_eq(wide_size, height_size, angle) elif msg.figure_type == Figure.TEXT: angle = radians(msg.angle[0]) marker.type = Marker.TEXT_VIEW_FACING marker.scale.z = msg.size[0] * step # overwrite some marker fields for text rotation = Rotation.from_euler('xyz', [0, 0, angle], degrees=False) marker.pose.orientation = Quaternion(*rotation.as_quat()) marker.text = msg.text if msg.figure_type != Figure.TEXT: marker.points = figure marker.ns = self.active_cs+ "/" + msg.projection_group + self.figures_list[msg.figure_type] + msg.figure_name self.pe_marker_array.markers.append(marker)
[docs] def line_eq(self, length, ang): """Calculate points array of a new line from its parametrics equation. Args: length (float): line length ang (float): line angle slope Returns: list: list of calculated points """ line_points = [] delta_th = 0.01 for th in np.arange(-length/2, length/2, delta_th): point = Point() point.x = th * cos(ang) point.y = th * sin(ang) line_points.append(point) return line_points
[docs] def circle_eq(self, radius, start_ang, end_ang): """Calculate points array of a new circle or arc from its parametrics equation. Args: radius (float): circle or arc radius start_ang (float): arc start angle end_ang (float): arc end angle Returns: list: list of calculated points """ circle_points = [] delta_th = 0.01 for th in np.arange(start_ang, end_ang, delta_th): point = Point() point.x = radius * sin(th) point.y = radius * cos(th) circle_points.append(point) return circle_points
[docs] def oval_eq(self, a, b, angle): """Calculate points array of a new ellipse from its parametrics equation. Args: a (float): ellipse width b (float): ellipse height angle (float): rotation angle Returns: list: list of calculated points """ oval_points = [] delta_th = 0.01 for th in np.arange(0.0, 2*pi+delta_th, delta_th): point = Point() point.x = a * cos(th)*cos(angle) - b * sin(th)*sin(angle) point.y = a * cos(th)*sin(angle) + b * sin(th)*cos(angle) oval_points.append(point) return oval_points
[docs] def hide_proj_elem_cb(self, req): """Callback of ROS service to hide specific projection element from active reference system. Args: req (object): object with the necessary parameters to identify a projection element Returns: tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value in the tuple is an information message string """ for i, marker in enumerate(self.pe_marker_array.markers): if marker.ns.find(req.projection_group)>-1 and marker.ns.find(req.figure_name)>-1: self.pe_marker_array.markers[i].color.a = 0 return ProjectionElementResponse(True, "Figure hidden correctly.") return ProjectionElementResponse(False, "Figure not found.")
[docs] def unhide_proj_elem_cb(self, req): """Callback of ROS service to unhide specific projection element from active reference system. Args: req (object): object with the necessary parameters to identify a projection element Returns: tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value in the tuple is an information message string """ for i, marker in enumerate(self.pe_marker_array.markers): if marker.ns.find(req.projection_group)>-1 and marker.ns.find(req.figure_name)>-1: self.pe_marker_array.markers[i].color.a = 1 return ProjectionElementResponse(True, "Figure unhidden correctly.") return ProjectionElementResponse(False, "Figure not found.")
[docs] def remove_proj_elem_cb(self, req): """Callback of ROS service to remove specific figure from active reference system. Args: req (object): object with the necessary parameters to identify a projection element Returns: tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value in the tuple is an information message string """ for i, marker in enumerate(self.pe_marker_array.markers): if marker.ns.find(req.projection_group)>-1 and marker.ns.find(req.figure_name)>-1: self.pe_marker_array.markers.pop(i) return ProjectionElementResponse(True, "Figure removed correctly.") return ProjectionElementResponse(False, "Figure not found.")
[docs] def translate(self, marker, dx=0, dy=0, dz=0): """Translate marker from one position to another. Args: marker (object): marker object to translate dx (float): offset in x direction dy (float): offset in y direction dz (float): offset in z direction """ marker.action = Marker.DELETE marker.pose.position.x += dx marker.pose.position.y += dy marker.pose.position.z += dz marker.action = Marker.ADD
[docs] def compute_step(self): """Calculate the resolution step of the active reference system. Returns: float: resolution step (real dimension system {P} in mm / user dimension system {T}) """ res = rospy.get_param('/zlaser/coordinate_system_resolution', 1000) P0_x = rospy.get_param('/zlaser/P0/x', 1000) * 0.001 P1_x = rospy.get_param('/zlaser/P1/x', 1000) * 0.001 step = (P1_x - P0_x)/res return step
[docs] def rotate(self, marker, angle): """Rotate marker an angle. Args: marker (object): marker object to rotate angle (float): rotation angle [degrees] """ marker.action = Marker.DELETE q = marker.pose.orientation rotation = Rotation.from_euler('xyz', [0, 0, angle], degrees=True) q_rot = Quaternion(*rotation.as_quat()) marker.pose.orientation = self.quat_multiply(q_rot, q) marker.action = Marker.ADD
[docs] def quat_multiply(self, q1, q0): """Calculate the product of two quaternions. Returns: object: object with the x,y,z,w values of the result quaternion """ return Quaternion( q1.x*q0.w + q1.y*q0.z - q1.z*q0.y + q1.w*q0.x, -q1.x*q0.z + q1.y*q0.w + q1.z*q0.x + q1.w*q0.y, q1.x*q0.y - q1.y*q0.x + q1.z*q0.w + q1.w*q0.z, -q1.x*q0.x - q1.y*q0.y - q1.z*q0.z + q1.w*q0.w)
[docs] def scale(self, marker, factor, proj_elem_params): """Scale size of marker by redefining figure equation. Args: marker (object): marker object to scale factor (float): scale factor proj_elem_params (object): object with the parameters of the projection element to transform """ marker.action = Marker.DELETE self.scale_factor *= factor # update factor size = proj_elem_params.size[0]*0.001 * self.scale_factor angle = radians(proj_elem_params.angle[0]) if proj_elem_params.figure_type == Figure.POLYLINE: figure = self.line_eq(size, angle) # size is line length elif proj_elem_params.figure_type == Figure.CIRCLE: figure = self.circle_eq(size, 0.0, 2*pi) # size is circle radius elif proj_elem_params.figure_type == Figure.ARC: end_ang = radians(proj_elem_params.angle[1]) figure = self.circle_eq(size, angle, end_ang) # size is arc radius elif proj_elem_params.figure_type == Figure.OVAL: height_size = proj_elem_params.size[1]*0.001 * self.scale_factor figure = self.oval_eq(size, height_size, angle) # size is oval width elif proj_elem_params.figure_type == Figure.TEXT: marker.scale.z = marker.scale.z*0.001 * self.scale_factor figure = [] marker.points = figure marker.action = Marker.ADD
[docs] def on_press(self, key, marker, proj_elem_params): """Check if the key pressed if one of the list and execute the respective tasks. Args: key (enum): key pressed marker (object): monitored marker object proj_elem_params (object): object with the parameters of the projection element to monitor """ if any([key in COMBO for COMBO in self.keyboard_params.COMBINATIONS]): self.current.add(key) if self.current == self.keyboard_params.KEY_UP: rospy.loginfo("VIZ_KEY_UP") self.translate(marker, dy=self.compute_step()) elif self.current == self.keyboard_params.KEY_DOWN: rospy.loginfo("VIZ_KEY_DOWN") self.translate(marker, dy=-self.compute_step()) elif self.current == self.keyboard_params.KEY_LEFT: rospy.loginfo("VIZ_KEY_LEFT") self.translate(marker, dx=-self.compute_step()) elif self.current == self.keyboard_params.KEY_RIGHT: rospy.loginfo("VIZ_KEY_RIGHT") self.translate(marker, dx=self.compute_step()) elif self.current == self.keyboard_params.KEY_PLUS: rospy.loginfo("VIZ_KEY_PLUS") self.scale(marker, 2, proj_elem_params) elif self.current == self.keyboard_params.KEY_MINUS: rospy.loginfo("VIZ_KEY_MINUS") self.scale(marker, 0.5, proj_elem_params) elif self.current == self.keyboard_params.CTRL_LEFT: rospy.loginfo("VIZ_CTRL_LEFT") self.rotate(marker, 1) elif self.current == self.keyboard_params.CTRL_RIGHT: rospy.loginfo("VIZ_CTRL_RIGHT") self.rotate(marker, -1) elif self.current == self.keyboard_params.ESC: rospy.loginfo("VIZ_ESC") marker.action = Marker.DELETE
[docs] def on_release(self, key): """Remove current stored key, on release. Args: key (enum): key pressed """ if any([key in COMBO for COMBO in self.keyboard_params.COMBINATIONS]): if self.current == self.keyboard_params.ESC: return False # stop listener self.current.remove(key)
[docs] def marker_from_name(self, name): """Find marker object in the markers array with the name. Args: name (str): name of the marker Returns: object: marker found """ for marker in self.pe_marker_array.markers: if name in marker.ns: marker.action = Marker.ADD return marker return []
[docs] def init_keyboard_listener_cb(self, msg): """Start keyboard listener for monitoring key presses. Args: msg (object): object with the necessary parameters to identify a projection element Returns: tuple[bool, str]: the first value in the returned tuple is a bool success value and the second value in the tuple is an information message string """ from pynput import keyboard self.keyboard_params = KeyboardParameters() self.current = set() name = self.active_cs + "/" + msg.projection_group + self.figures_list[msg.figure_type] + msg.figure_name marker = self.marker_from_name(name) if not marker: return ProjectionElementResponse(False, "Marker not found.") try: on_press_handler = lambda event: self.on_press(event, marker=marker, proj_elem_params=msg) listener = keyboard.Listener(on_press = on_press_handler, on_release = self.on_release) listener.start() return ProjectionElementResponse(True, "Viz monitor.") except Exception as e: rospy.logerr(e) return ProjectionElementResponse(False, "Error viz monitor.")