line_follower.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from re import S
3 import sys
4 import argparse
5 
6 import rospy
7 from sensor_msgs.msg import Image
8 from geometry_msgs.msg import Twist
9 from dynamic_reconfigure.server import Server
10 from leo_example_line_follower.cfg import ColorMaskConfig
11 import cv2
12 import cv_bridge
13 
14 import tflite_runtime.interpreter as tflite
15 import numpy as np
16 
17 
19  color_mask_config = None
20 
21  def __init__(self, modelPath=None, velocityTopic=None, videoTopic=None):
22  self.bridge = cv_bridge.CvBridge()
23 
24  try:
25  self.interpreter = tflite.Interpreter(model_path=modelPath)
26  self.interpreter.allocate_tensors()
27  except ValueError as e:
28  rospy.logerr("Couldnt load tflite model: %s" % (modelPath))
29  return
30 
31  self.mask_func = self.simple_mask
32 
33  self.srv = Server(ColorMaskConfig, self.param_callback)
34 
35  self.pub_mask = rospy.get_param("~pub_mask", default=False)
36 
37  self.vel_pub = rospy.Publisher(velocityTopic, Twist, queue_size=10)
38 
39  self.mask_pub = None
40  if self.pub_mask:
41  self.mask_pub = rospy.Publisher("color_mask", Image, queue_size=1)
42 
43  self.video_sub = rospy.Subscriber(videoTopic, Image, self.video_callback)
44 
45  def simple_mask(self, img):
46  color_mask = cv2.inRange(
47  img,
48  (
49  self.color_mask_config.hue_min,
50  self.color_mask_config.sat_min,
51  self.color_mask_config.val_min,
52  ),
53  (
54  self.color_mask_config.hue_max,
55  self.color_mask_config.sat_max,
56  self.color_mask_config.val_max,
57  ),
58  )
59  return color_mask
60 
61  def double_range_mask(self, img):
62  # hue_min > hue_max, so there are two masks:
63  # 1) 0-hue_max
64  # 2) hue_min - 180
65  lower_mask = cv2.inRange(
66  img,
67  (
68  0,
69  self.color_mask_config.sat_min,
70  self.color_mask_config.val_min,
71  ),
72  (
73  self.color_mask_config.hue_max,
74  self.color_mask_config.sat_max,
75  self.color_mask_config.val_max,
76  ),
77  )
78 
79  upper_mask = cv2.inRange(
80  img,
81  (
82  self.color_mask_config.hue_min,
83  self.color_mask_config.sat_min,
84  self.color_mask_config.val_min,
85  ),
86  (
87  179,
88  self.color_mask_config.sat_max,
89  self.color_mask_config.val_max,
90  ),
91  )
92 
93  final_mask = lower_mask + upper_mask
94 
95  return final_mask
96 
97 
98  def param_callback(self, config, level):
99  self.color_mask_config = config
100  self.mask_func = (
101  self.simple_mask
102  if config.hue_min < config.hue_max
103  else self.double_range_mask
104  )
105 
106 
107  return config
108 
109  def video_callback(self, data: Image):
110  cv_img = self.bridge.imgmsg_to_cv2(data, desired_encoding="bgr8")
111  processed_img = self.preprocess_img(cv_img)
112 
113  steering = self.get_steering(processed_img)
114 
115  if self.pub_mask:
116  self.publish_mask(processed_img)
117 
118  rospy.logdebug("steering: %f, %f" % (steering[0], steering[1]))
119 
120  self.publish_vel(steering)
121 
122  def publish_vel(self, steering):
123  vel_msg = Twist()
124  vel_msg.linear.x = steering[0]
125  vel_msg.angular.z = steering[1]
126 
127  self.vel_pub.publish(vel_msg)
128 
129  def publish_mask(self, mask):
130  mask *= 255.0
131  img_msg = self.bridge.cv2_to_imgmsg(mask, encoding="32FC1")
132  self.mask_pub.publish(img_msg)
133 
134  def get_steering(self, img):
135  input_details = self.interpreter.get_input_details()
136  output_details = self.interpreter.get_output_details()
137  self.interpreter.allocate_tensors()
138 
139  # providing input
140  self.interpreter.set_tensor(input_details[0]["index"], [img])
141  # running interferance
142  self.interpreter.invoke()
143 
144  # getting answer
145  linear_x = self.interpreter.get_tensor(output_details[0]["index"])[0][0]
146  angular_z = self.interpreter.get_tensor(output_details[1]["index"])[0][0]
147  rospy.logdebug("pred = (%f, %f)" % (linear_x, angular_z))
148 
149  return linear_x, angular_z
150 
151  def preprocess_img(self, img):
152  # changing BGR to HSV
153  hsv_img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
154  # cropping img
155  crop_img = hsv_img[200 : hsv_img.shape[0], :]
156  # getting color mask
157  color_mask = self.mask_func(crop_img)
158  # converting int balues to float
159  float_img = color_mask.astype(np.float32)
160  # resizing
161  resized_img = cv2.resize(float_img, (160, 120))
162  # normalize
163  final_img = resized_img / 255.0
164 
165  return final_img[:, :, np.newaxis]
166 
167 
168 def add_argparse_arguments(parser: argparse.ArgumentParser):
169  parser.add_argument(
170  "-p",
171  "--path",
172  default="~/Downloads/saved_model.tflite",
173  type=str,
174  required=True,
175  dest="model_path",
176  help="path to the saved CNN model",
177  )
178  parser.add_argument(
179  "-v",
180  "--velocity",
181  default="cmd_vel",
182  type=str,
183  required=False,
184  dest="velocity_topic",
185  help="name of the topic with Twist messages for rover",
186  )
187  parser.add_argument(
188  "-c",
189  "--camera",
190  default="camera/image_raw",
191  type=str,
192  required=False,
193  dest="image_topic",
194  help="name of the topic with Image messages from rover",
195  )
196 
197 
198 if __name__ == "__main__":
199  rospy.init_node("line_follower")
200 
201  parser = argparse.ArgumentParser(description="Record video from rover as images.")
202  add_argparse_arguments(parser)
203 
204  rosargv = rospy.myargv(argv=sys.argv)
205  args = parser.parse_args(rosargv[1:])
206 
207  follower = LineFollower(args.model_path, args.velocity_topic, args.image_topic)
208  rospy.spin()
line_follower.LineFollower.publish_mask
def publish_mask(self, mask)
Definition: line_follower.py:129
line_follower.LineFollower.double_range_mask
def double_range_mask(self, img)
Definition: line_follower.py:61
line_follower.LineFollower.__init__
def __init__(self, modelPath=None, velocityTopic=None, videoTopic=None)
Definition: line_follower.py:21
line_follower.LineFollower.simple_mask
def simple_mask(self, img)
Definition: line_follower.py:45
line_follower.LineFollower.preprocess_img
def preprocess_img(self, img)
Definition: line_follower.py:151
line_follower.LineFollower.param_callback
def param_callback(self, config, level)
Definition: line_follower.py:98
line_follower.LineFollower.vel_pub
vel_pub
Definition: line_follower.py:37
line_follower.LineFollower.color_mask_config
color_mask_config
Definition: line_follower.py:19
line_follower.LineFollower.interpreter
interpreter
Definition: line_follower.py:25
line_follower.LineFollower.srv
srv
Definition: line_follower.py:33
line_follower.LineFollower.bridge
bridge
Definition: line_follower.py:22
line_follower.LineFollower.publish_vel
def publish_vel(self, steering)
Definition: line_follower.py:122
line_follower.LineFollower.pub_mask
pub_mask
Definition: line_follower.py:35
line_follower.LineFollower.get_steering
def get_steering(self, img)
Definition: line_follower.py:134
line_follower.LineFollower
Definition: line_follower.py:18
line_follower.LineFollower.video_sub
video_sub
Definition: line_follower.py:43
line_follower.LineFollower.mask_pub
mask_pub
Definition: line_follower.py:39
line_follower.add_argparse_arguments
def add_argparse_arguments(argparse.ArgumentParser parser)
Definition: line_follower.py:168
line_follower.LineFollower.video_callback
def video_callback(self, Image data)
Definition: line_follower.py:109
line_follower.LineFollower.mask_func
mask_func
Definition: line_follower.py:31


leo_example_line_follower
Author(s): Aleksander SzymaƄski
autogenerated on Fri Nov 25 2022 03:11:50