generic_throttle.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 
18 import rospy
19 import rostopic
20 import cv2
21 from threading import Lock
22 from threading import currentThread
23 from functools import partial
24 from cv_bridge import CvBridge, CvBridgeError
25 
26 
28  def __init__(self):
29  self.topics = None
30  self.bridge = CvBridge()
31 
32  mandatory_parameters = ['topic_rate','latched', 'lazy']
33 
34  topics_param_name = '~topics'
35  if not rospy.has_param(topics_param_name):
36  rospy.logerr('Parameter ' + topics_param_name + ' not available')
37 
38  # See README for format
39  topics_list = rospy.get_param(topics_param_name, []) # type: List[Mapping]
40 
41  # create dictionary out of the topic list
42  self.topics = {next(iter(item.keys())): next(iter(item.values())) for item in topics_list}
43 
44  # Check if each entry of topics has the mandatory parameters
45  mandatory_flag = all(set(mandatory_parameters) <=
46  set(element) for key, element in
47  self.topics.items())
48 
49  if(not(mandatory_flag)):
50  rospy.logerr('Each throttled topic needs 3 parameters ' +
51  str(mandatory_parameters))
52  exit(10)
53 
54  # Populate the dictionary for the ros topic throttling
56 
57  def timer_callback(self, event, topic_id):
58  # The False argument is for a non blocking call
59  locking = self.topics[topic_id]['lock'].acquire_lock(False)
60 
61  if not(locking):
62  current_t = currentThread()
63  rospy.logdebug(str(current_t.getName()) + ': cannot lock topic '
64  + topic_id)
65  return
66 
67  publisher = self.topics[topic_id]['publisher']
68  subscriber = self.topics[topic_id]['subscriber']
69 
70  # Check if pub and sub already exist, if not try to create them after
71  # checking the ros topic
72 
73  if None in(publisher, subscriber):
74  topic_info = rostopic.get_topic_class(topic_id)
75  if topic_info[0] is None:
76  rospy.logwarn('Cannot find topic ' + topic_id)
77 
78  self.topics[topic_id]['lock'].release_lock()
79  return
80  else:
81  # Create a (latched if needed) publisher
82  self.topics[topic_id]['publisher'] = \
83  rospy.Publisher(topic_id + '_throttled', topic_info[0],
84  queue_size = 1,
85  latch = self.topics[topic_id]['latched'])
86 
87  rospy.loginfo('Created publisher for ' + topic_id)
88 
89  # Create subscriber
90  subscriber_partial = partial(self.subscriber_callback,
91  topic_id=topic_id)
92  self.topics[topic_id]['subscriber'] = \
93  rospy.Subscriber(topic_id, topic_info[0],
94  subscriber_partial)
95  rospy.loginfo('Created subscriber for ' + topic_id)
96 
97  self.topics[topic_id]['lock'].release_lock()
98  return
99 
100  last_message = self.topics[topic_id]['last_message']
101 
102  if last_message is not None:
103  lazy_behavior = \
104  self.topics[topic_id]['publisher'].get_num_connections() == 0 \
105  and self.topics[topic_id]['lazy']
106  if not lazy_behavior:
107  # if requested and possible, apply the resolution change
108  try:
109  resolution = self.topics[topic_id]['resolution_factor']
110  except KeyError:
111  resolution = None
112 
113  if resolution is not None:
114  last_message = self._resize_image(last_message, resolution)
115  # publish the throttled message
116  self.topics[topic_id]['publisher'].publish(last_message)
117 
118  self.topics[topic_id]['last_message'] = None
119  self.topics[topic_id]['lock'].release_lock()
120  return
121 
122  self.topics[topic_id]['lock'].release_lock()
123  return
124 
125  def subscriber_callback(self, data, topic_id):
126  locking = self.topics[topic_id]['lock'].acquire_lock(False)
127  # The False argument is for a non blocking call
128 
129  if not (locking):
130  current_t = currentThread()
131  rospy.logdebug(str(current_t.getName()) + ': cannot lock topic '
132  + topic_id)
133  return
134 
135  try:
136  resolution = self.topics[topic_id]['resolution_factor']
137  except KeyError:
138  resolution = None
139 
140  if resolution is not None:
141  if data._type != 'sensor_msgs/Image':
142  rospy.logwarn('resolution_factor option is not available for ' +
143  data._type + '. Topic ' + topic_id +
144  ' will not be resolution throttled.')
145  self.topics[topic_id]['resolution_factor'] = None
146 
147 
148  self.topics[topic_id]['last_message'] = data
149  self.topics[topic_id]['lock'].release_lock()
150 
152  # Topic dictionary structure
153  # {topic_id: {topic_rate, lazy, latched, subscriber,
154  # publisher, lock, timer, last_message}
155 
156  for key, element in self.topics.items():
157  element['lock'] = Lock()
158  element['lock'].acquire_lock()
159  element['publisher'] = None
160  element['subscriber'] = None
161  # Create Timer for each topic
162  personal_callback = partial(self.timer_callback, topic_id=key)
163  element['timer'] = rospy.Timer(
164  rospy.Duration(1.0 / element['topic_rate']), personal_callback)
165  element['last_message'] = None
166  element['lock'].release_lock()
167 
168  def _shutdown(self):
169  rospy.loginfo('Stopping ' + str(rospy.get_name()))
170 
171  def _resize_image(self, data, resolution):
172  old_image = None
173 
174  try:
175  old_image = self.bridge.imgmsg_to_cv2(data)
176  except CvBridgeError as e:
177  print(e)
178  exit(20)
179 
180  try:
181  if resolution < 1:
182  # shrinking
183  new_image = cv2.resize(old_image, (0, 0), fx=resolution,
184  fy=resolution,
185  interpolation=cv2.INTER_AREA)
186  elif resolution > 1:
187  # enlarging
188  new_image = cv2.resize(old_image, (0, 0), fx=resolution,
189  fy=resolution)
190  else:
191  # resolution == 1 --> Don't resize at all...
192  new_image = old_image
193  except cv2.error as e:
194  print(e)
195  exit(20)
196 
197  try:
198  new_message = self.bridge.cv2_to_imgmsg(new_image,
199  encoding=data.encoding)
200  return new_message
201  except CvBridgeError as e:
202  print(e)
203  exit(20)


generic_throttle
Author(s): Mattia Racca
autogenerated on Wed Apr 7 2021 03:03:17