00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 import itertools
00020 import random
00021
00022 import rospy
00023 from tf import transformations
00024 from cartographer_ros_msgs.msg import LandmarkEntry, LandmarkList
00025
00026 DESC = '''
00027 Samples a number of random landmarks at a specified rate.
00028 Can be used to test the timing of landmark input or the effect of erroneous
00029 landmarks with duplicate IDs (if --allow_duplicate_ids is set).
00030
00031 For example:
00032
00033 ./publish_fake_random_landmarks.py \\
00034 --publish_period 0.1 \\
00035 --id_vocabulary A B C \\
00036 --id_length 5 \\
00037 --sample_period 1.0
00038
00039 will publish empty landmark lists at 10Hz and random landmarks every second.
00040 IDs are also sampled, using the cartesian product of the provided "vocabulary".
00041 In the above example, a sampled ID could be e.g. "AACBC" (length=5).
00042 '''
00043
00044 TOPIC = "landmark"
00045
00046
00047 class LandmarkSamplerOptions(object):
00048
00049 def __init__(self, *args, **kwargs):
00050 self.allow_duplicate_ids = False
00051 self.id_vocabulary = {}
00052 self.id_length = 0
00053 self.max_distance = 0.
00054 self.num_landmarks = 0
00055 self.rotation_weight = 0.
00056 self.translation_weight = 0.
00057 for name, arg in kwargs.items():
00058 setattr(self, name, arg)
00059
00060
00061 class LandmarkIdSampler(object):
00062
00063 def __init__(self, id_vocabulary, id_length):
00064
00065
00066 if len(id_vocabulary)**id_length > 1e6:
00067 raise ValueError("ID sampling space is too large")
00068 self.sampling_space = list(
00069 itertools.product(*(id_vocabulary for i in range(id_length))))
00070
00071 def sample_id(self):
00072
00073 random_index = random.randint(0, len(self.sampling_space) - 1)
00074 sampled_id = "".join(self.sampling_space[random_index])
00075 return sampled_id
00076
00077
00078 class LandmarkSampler(object):
00079
00080 def __init__(self, options):
00081 if not isinstance(options, LandmarkSamplerOptions):
00082 raise TypeError("expected LandmarkSamplerOptions")
00083 self.options = options
00084 rospy.loginfo("Initializing landmark ID sampler...")
00085 self.landmark_id_sampler = LandmarkIdSampler(options.id_vocabulary,
00086 options.id_length)
00087 self._sampled_ids = []
00088
00089 def random_landmark(self):
00090 landmark = LandmarkEntry()
00091 landmark.translation_weight = self.options.translation_weight
00092 landmark.rotation_weight = self.options.rotation_weight
00093 landmark.id = self.landmark_id_sampler.sample_id()
00094 if landmark.id in self._sampled_ids:
00095 if not self.options.allow_duplicate_ids:
00096 rospy.logwarn("Ignoring duplicate ID: {}".format(landmark.id))
00097 return None
00098 else:
00099 rospy.logwarn("Duplicate ID: {}".format(landmark.id))
00100 self._sampled_ids.append(landmark.id)
00101
00102 vector = transformations.random_vector(3) * self.options.max_distance
00103 landmark.tracking_from_landmark_transform.position.x = vector[0]
00104 landmark.tracking_from_landmark_transform.position.y = vector[1]
00105 landmark.tracking_from_landmark_transform.position.z = vector[2]
00106
00107 quaternion = transformations.random_quaternion()
00108 landmark.tracking_from_landmark_transform.orientation.x = quaternion[0]
00109 landmark.tracking_from_landmark_transform.orientation.y = quaternion[1]
00110 landmark.tracking_from_landmark_transform.orientation.z = quaternion[2]
00111 landmark.tracking_from_landmark_transform.orientation.w = quaternion[3]
00112 return landmark
00113
00114 def random_landmark_list(self):
00115 landmark_list = LandmarkList()
00116 landmark_list.header.stamp = rospy.Time.now()
00117 for _ in range(self.options.num_landmarks):
00118 random_landmark = self.random_landmark()
00119 if random_landmark is not None:
00120 landmark_list.landmarks.append(random_landmark)
00121 return landmark_list
00122
00123
00124 class SampledLandmarkPublisher(object):
00125
00126 def __init__(self, publish_period, sample_period, landmark_sampler_options):
00127 self.landmark_sampler = LandmarkSampler(landmark_sampler_options)
00128 rospy.loginfo("Publishing landmarks to topic: {}".format(TOPIC))
00129 self.publisher = rospy.Publisher(TOPIC, LandmarkList, queue_size=1)
00130 self.publish_timer = rospy.Timer(
00131 rospy.Duration(publish_period), self.publish_empty_landmark_list)
00132 self.sample_timer = rospy.Timer(
00133 rospy.Duration(sample_period), self.publish_random_landmark_list)
00134
00135 def publish_random_landmark_list(self, timer_event):
00136 self.publisher.publish(self.landmark_sampler.random_landmark_list())
00137
00138 def publish_empty_landmark_list(self, timer_event):
00139 landmark_list = LandmarkList(rospy.Header(stamp=rospy.Time.now()), [])
00140 self.publisher.publish(landmark_list)
00141
00142
00143 if __name__ == '__main__':
00144 import argparse
00145 parser = argparse.ArgumentParser(
00146 description=DESC, formatter_class=argparse.RawTextHelpFormatter)
00147 parser.add_argument("--translation_weight", type=float, default=1e5)
00148 parser.add_argument("--rotation_weight", type=float, default=1e5)
00149 parser.add_argument(
00150 "--publish_period",
00151 type=float,
00152 default=0.1,
00153 help="Baseline period for publishing empty landmark lists.")
00154 parser.add_argument(
00155 "--sample_period",
00156 type=float,
00157 default=5.,
00158 help="Period at which randomly sampled landmarks are published.")
00159 parser.add_argument(
00160 "--num_landmarks",
00161 type=int,
00162 default=5,
00163 help="The number of random landmarks published simultaneously.")
00164 parser.add_argument(
00165 "--max_distance",
00166 type=float,
00167 default=1.0,
00168 help="Maximum distance of a random landmark to the tracking frame.")
00169 parser.add_argument(
00170 "--id_vocabulary",
00171 nargs='+',
00172 default={"a", "b", "c", "1", "2", "3"},
00173 help="Set of symbols that can appear in random landmark IDs.")
00174 parser.add_argument(
00175 "--id_length",
00176 type=int,
00177 default=5,
00178 help="The length of the random landmark IDs (number of symbols).")
00179 parser.add_argument(
00180 "--allow_duplicate_ids",
00181 action="store_true",
00182 help="Publish landmarks with IDs that have already been published.")
00183
00184 args, unknown = parser.parse_known_args(rospy.myargv()[1:])
00185 rospy.init_node("landmark_sampler")
00186
00187 landmark_sampler_options = LandmarkSamplerOptions(**args.__dict__)
00188 sampler = SampledLandmarkPublisher(args.publish_period, args.sample_period,
00189 landmark_sampler_options)
00190
00191 rospy.spin()