generic_throttle.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
00004 #
00005 # Licensed under the Apache License, Version 2.0 (the "License");
00006 # you may not use this file except in compliance with the License.
00007 # You may obtain a copy of the License at
00008 #
00009 #   http://www.apache.org/licenses/LICENSE-2.0
00010 #
00011 # Unless required by applicable law or agreed to in writing, software
00012 # distributed under the License is distributed on an "AS IS" BASIS,
00013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00014 # See the License for the specific language governing permissions and
00015 # limitations under the License.
00016 
00017 
00018 import rospy
00019 import rostopic
00020 import cv2
00021 from threading import Lock
00022 from threading import currentThread
00023 from functools import partial
00024 from cv_bridge import CvBridge, CvBridgeError
00025 
00026 
00027 class GenericThrottle:
00028     def __init__(self):
00029         self.topics = None
00030         self.bridge = CvBridge()
00031 
00032         mandatory_parameters = ['topic_rate','latched', 'lazy']
00033 
00034         topics_param_name = '~topics'
00035         if not rospy.has_param(topics_param_name):
00036             rospy.logerr('Parameter ' + topics_param_name + ' not available')
00037         topics_list = rospy.get_param(topics_param_name, [])
00038 
00039         # create dictionary out of the topic list
00040         self.topics = {item.keys()[0]: item.values()[0] for item in topics_list}
00041 
00042         # Check if each entry of topics has the mandatory parameters
00043         mandatory_flag = all(set(mandatory_parameters) <=
00044                              set(element) for key, element in
00045                              self.topics.iteritems())
00046 
00047         if(not(mandatory_flag)):
00048             rospy.logerr('Each throttled topic needs 3 parameters ' +
00049                          str(mandatory_parameters))
00050             exit(10)
00051 
00052         # Populate the dictionary for the ros topic throttling
00053         self._populate_dictionary()
00054 
00055     def timer_callback(self, event, topic_id):
00056         # The False argument is for a non blocking call
00057         locking = self.topics[topic_id]['lock'].acquire_lock(False)
00058 
00059         if not(locking):
00060             current_t = currentThread()
00061             rospy.logdebug(str(current_t._Thread__name) + ': cannot lock topic '
00062                           + topic_id)
00063             return
00064 
00065         publisher = self.topics[topic_id]['publisher']
00066         subscriber = self.topics[topic_id]['subscriber']
00067 
00068         # Check if pub and sub already exist, if not try to create them after
00069         # checking the ros topic
00070 
00071         if None in(publisher, subscriber):
00072             topic_info = rostopic.get_topic_class(topic_id)
00073             if topic_info[0] is None:
00074                 rospy.logwarn('Cannot find topic ' + topic_id)
00075 
00076                 self.topics[topic_id]['lock'].release_lock()
00077                 return
00078             else:
00079                 # Create a (latched if needed) publisher
00080                 self.topics[topic_id]['publisher'] = \
00081                     rospy.Publisher(topic_id + '_throttled', topic_info[0],
00082                                     queue_size = 1,
00083                                     latch = self.topics[topic_id]['latched'])
00084 
00085                 rospy.loginfo('Created publisher for ' + topic_id)
00086 
00087                 # Create subscriber
00088                 subscriber_partial = partial(self.subscriber_callback,
00089                                              topic_id=topic_id)
00090                 self.topics[topic_id]['subscriber'] = \
00091                     rospy.Subscriber(topic_id, topic_info[0],
00092                                      subscriber_partial)
00093                 rospy.loginfo('Created subscriber for ' + topic_id)
00094 
00095                 self.topics[topic_id]['lock'].release_lock()
00096                 return
00097 
00098         last_message = self.topics[topic_id]['last_message']
00099 
00100         if last_message is not None:
00101             lazy_behavior = \
00102                 self.topics[topic_id]['publisher'].get_num_connections() == 0 \
00103                 and self.topics[topic_id]['lazy']
00104             if not lazy_behavior:
00105                 # if requested and possible, apply the resolution change
00106                 try:
00107                     resolution = self.topics[topic_id]['resolution_factor']
00108                 except KeyError:
00109                     resolution = None
00110 
00111                 if resolution is not None:
00112                     last_message = self._resize_image(last_message, resolution)
00113                 # publish the throttled message
00114                 self.topics[topic_id]['publisher'].publish(last_message)
00115 
00116             self.topics[topic_id]['last_message'] = None
00117             self.topics[topic_id]['lock'].release_lock()
00118             return
00119 
00120         self.topics[topic_id]['lock'].release_lock()
00121         return
00122 
00123     def subscriber_callback(self, data, topic_id):
00124         locking = self.topics[topic_id]['lock'].acquire_lock(False)
00125         # The False argument is for a non blocking call
00126 
00127         if not (locking):
00128             current_t = currentThread()
00129             rospy.logdebug(str(current_t._Thread__name) + ': cannot lock topic '
00130                           + topic_id)
00131             return
00132 
00133         try:
00134             resolution = self.topics[topic_id]['resolution_factor']
00135         except KeyError:
00136             resolution = None
00137 
00138         if resolution is not None:
00139             if data._type != 'sensor_msgs/Image':
00140                 rospy.logwarn('resolution_factor option is not available for ' +
00141                                 data._type + '. Topic ' + topic_id +
00142                                 ' will not be resolution throttled.')
00143                 self.topics[topic_id]['resolution_factor'] = None
00144 
00145 
00146         self.topics[topic_id]['last_message'] = data
00147         self.topics[topic_id]['lock'].release_lock()
00148 
00149     def _populate_dictionary(self):
00150         # Topic dictionary structure
00151         # {topic_id: {topic_rate, lazy, latched, subscriber,
00152         # publisher, lock, timer, last_message}
00153 
00154         for key, element in self.topics.iteritems():
00155             element['lock'] = Lock()
00156             element['lock'].acquire_lock()
00157             element['publisher'] = None
00158             element['subscriber'] = None
00159             # Create Timer for each topic
00160             personal_callback = partial(self.timer_callback, topic_id=key)
00161             element['timer'] = rospy.Timer(
00162                 rospy.Duration(1.0 / element['topic_rate']), personal_callback)
00163             element['last_message'] = None
00164             element['lock'].release_lock()
00165 
00166     def _shutdown(self):
00167         rospy.loginfo('Stopping ' + str(rospy.get_name()))
00168 
00169     def _resize_image(self, data, resolution):
00170         old_image = None
00171 
00172         try:
00173             old_image = self.bridge.imgmsg_to_cv2(data)
00174         except CvBridgeError as e:
00175             print(e)
00176             exit(20)
00177 
00178         try:
00179             if resolution < 1:
00180                 # shrinking
00181                 new_image = cv2.resize(old_image, (0, 0), fx=resolution,
00182                                        fy=resolution,
00183                                        interpolation=cv2.INTER_AREA)
00184             elif resolution > 1:
00185                 # enlarging
00186                 new_image = cv2.resize(old_image, (0, 0), fx=resolution,
00187                                        fy=resolution)
00188             else:
00189                 # resolution == 1 --> Don't resize at all...
00190                 new_image = old_image
00191         except cv2.error as e:
00192             print(e)
00193             exit(20)
00194 
00195         try:
00196             new_message = self.bridge.cv2_to_imgmsg(new_image,
00197                                                     encoding=data.encoding)
00198             return new_message
00199         except CvBridgeError as e:
00200             print(e)
00201             exit(20)


generic_throttle
Author(s): Mattia Racca
autogenerated on Sun Jun 9 2019 20:20:26