Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 from python_qt_binding.QtCore import QObject, Signal, pyqtSlot
00011
00012 import geometry_msgs.msg as geometry_msgs
00013 import rospy
00014 import sensor_msgs.msg as sensor_msgs
00015 import threading
00016
00017
00018
00019
00020
00021 class VideoTeleopInterface(QObject):
00022
00023 cmd_vel_publishing_interval = 0.1
00024
00025 __slots__ = [
00026 '_cmd_vel',
00027 '_cmd_vel_publisher_timer',
00028 '_cmd_vel_publisher',
00029 '_compressed_image_subscriber',
00030 '_lock'
00031 ]
00032 image_received = Signal(sensor_msgs.CompressedImage, name="image_received")
00033
00034 def __init__(self,
00035 image_received_slot,
00036 cmd_vel_topic_name='cmd_vel',
00037 compressed_image_topic_name='compressed_image'):
00038 """
00039 Initialise the robot interface with slot connections back to whatever
00040 qt views are attached and the ros topic names to be used.
00041
00042 :param slot image_received_slot: slot to connect to the ``image_received`` signal.
00043 :param str cmd_vel_topic_name: topic name for the command velocity publisher.
00044 :param str compressed_image_topic_name: topic name for the compressed image subscriber.
00045 """
00046 self._lock = threading.Lock()
00047 self._cmd_vel = geometry_msgs.Twist()
00048 self._cmd_vel.linear.x = self._cmd_vel.angular.z = 0.0
00049 super(VideoTeleopInterface, self).__init__()
00050 self._cmd_vel_publisher = None
00051 self._compressed_image_subscriber = None
00052 if image_received_slot is not None:
00053 self.image_received.connect(image_received_slot)
00054 self._cmd_vel_publisher = rospy.Publisher(cmd_vel_topic_name, geometry_msgs.Twist, latch=True, queue_size=10)
00055 self._compressed_image_subscriber = rospy.Subscriber(compressed_image_topic_name, sensor_msgs.CompressedImage, self._ros_subscriber_image_callback)
00056 self._cmd_vel_publisher_timer = \
00057 rospy.Timer(rospy.Duration(VideoTeleopInterface.cmd_vel_publishing_interval),
00058 self._publish_cmd_vel
00059 )
00060
00061 def _ros_subscriber_image_callback(self, msg):
00062 """
00063 Update the teleop image
00064
00065 :param sensor_msgs.CompressedImage msg: an image stream feed from the robot
00066 """
00067
00068 self.image_received.emit(msg)
00069
00070 def _publish_cmd_vel(self, unused_event):
00071 """
00072 Convert the command into an appropriate one for the
00073 cmd_vel topic and publish.
00074 """
00075 if self._cmd_vel_publisher is not None:
00076 with self._lock:
00077 self._cmd_vel_publisher.publish(self._cmd_vel)
00078
00079 def shutdown(self):
00080 """
00081 There is a problem in rospy here which spams you with a warning
00082 when the cmd_vel is unregistered. This is just noise I believe (it shouldn't
00083 be printing anything). The publisher gets unregistered properly. Should
00084 be safe to ignore (lots of work to actually follow up).
00085 """
00086 self._cmd_vel_publisher_timer.shutdown()
00087 with self._lock:
00088 if self._cmd_vel_publisher:
00089 self._cmd_vel_publisher.unregister()
00090 if self._compressed_image_subscriber:
00091 self._compressed_image_subscriber.unregister()
00092 self._cmd_vel_publisher = self._compressed_image_subscriber = None
00093
00094 @property
00095 def cmd_vel(self):
00096 """
00097 Returns this cmd_vel in a 2-tuple appropriate for 2d teleop'ing robots,
00098 i.e. (linear, angular).
00099
00100 :returns: the last configured cmd_vel as a 2-tuple.
00101 :rtype: (float, float)
00102 """
00103 return (self._cmd_vel.linear.x, self._cmd_vel.angular.z)
00104
00105 @cmd_vel.setter
00106 def cmd_vel(self, value):
00107 """
00108 Convenient setter from a 2-tuple. The 2-tuple takes the form (linear, angular)
00109 with units in (m/s, rad/s).
00110
00111 Usage:
00112
00113 .. code-block:: python
00114
00115 teleop_interaface.cmd_vel = (1.0, 0.35) # linear x, angular z values)
00116 """
00117 with self._lock:
00118 (self._cmd_vel.linear.x, self._cmd_vel.angular.z) = value