publish_cpm_ts.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # ==============================================================================
4 # MIT License
5 #
6 # Copyright (c) 2023-2025 Institute for Automotive Engineering (ika), RWTH Aachen University
7 #
8 # Permission is hereby granted, free of charge, to any person obtaining a copy
9 # of this software and associated documentation files (the "Software"), to deal
10 # in the Software without restriction, including without limitation the rights
11 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 # copies of the Software, and to permit persons to whom the Software is
13 # furnished to do so, subject to the following conditions:
14 #
15 # The above copyright notice and this permission notice shall be included in all
16 # copies or substantial portions of the Software.
17 #
18 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 # SOFTWARE.
25 # ==============================================================================
26 
27 import rclpy
28 from rclpy.node import Node
29 from etsi_its_cpm_ts_msgs.msg import *
30 import utils
31 
32 class Publisher(Node):
33 
34  def __init__(self):
35 
36  super().__init__("cpm_publisher")
37  topic = "/etsi_its_conversion/cpm_ts/in"
38  self.publisher = self.create_publisher(CollectivePerceptionMessage, topic, 1)
39  self.timer = self.create_timer(0.1, self.publish)
40 
41  def publish(self):
42 
43  msg = CollectivePerceptionMessage()
44 
45  msg.header.protocol_version.value = 2
46  msg.header.message_id.value = msg.header.message_id.CPM
47 
48  msg.payload.management_container.reference_time.value = utils.get_t_its(self.get_clock().now().nanoseconds)
49  msg.payload.management_container.reference_position.latitude.value = int(50.78779641723146 * 1e7)
50  msg.payload.management_container.reference_position.longitude.value = int(6.047076274316094 * 1e7)
51 
52 
53  cpm_container = WrappedCpmContainer()
54  cpm_container.container_id.value = cpm_container.CHOICE_CONTAINER_DATA_PERCEIVED_OBJECT_CONTAINER
55 
56  perceived_object_container = PerceivedObjectContainer()
57  perceived_object_container.number_of_perceived_objects.value = 2
58  for i in range(perceived_object_container.number_of_perceived_objects.value):
59  perceived_object = PerceivedObject()
60  perceived_object.object_id_is_present = True
61  perceived_object.object_id.value = i
62  perceived_object.measurement_delta_time.value = 10
63  perceived_object.position.x_coordinate.value.value = int(10 * 1e2 * (i*3))
64  perceived_object.position.x_coordinate.confidence.value = perceived_object.position.x_coordinate.confidence.UNAVAILABLE
65  perceived_object.position.y_coordinate.value.value = int(2 * 1e2)
66  perceived_object.position.y_coordinate.confidence.value = perceived_object.position.y_coordinate.confidence.UNAVAILABLE
67  perceived_object.object_dimension_x_is_present = True
68  perceived_object.object_dimension_x.value.value = int(3.5 * 1e1)
69  perceived_object.object_dimension_x.confidence.value = perceived_object.object_dimension_x.confidence.UNAVAILABLE
70  perceived_object.object_dimension_y_is_present = True
71  perceived_object.object_dimension_y.value.value = int(1.8 * 1e1)
72  perceived_object.object_dimension_y.confidence.value = perceived_object.object_dimension_y.confidence.UNAVAILABLE
73  perceived_object.object_dimension_z_is_present = True
74  perceived_object.object_dimension_z.value.value = int(1.6 * 1e1)
75  perceived_object.object_dimension_z.confidence.value = perceived_object.object_dimension_z.confidence.UNAVAILABLE
76  perceived_object_container.perceived_objects.array.append(perceived_object)
77 
78  cpm_container.container_data_perceived_object_container = perceived_object_container
79  msg.payload.cpm_containers.value.array.append(cpm_container)
80 
81  self.get_logger().info(f"Publishing CPM")
82  self.publisher.publish(msg)
83 
84 if __name__ == "__main__":
85 
86  rclpy.init()
87  publisher = Publisher()
88  rclpy.spin(publisher)
89  rclpy.shutdown()
utils.get_t_its
def get_t_its(unix_nanoseconds)
Definition: utils.py:30
publish_cpm_ts.Publisher
Definition: publish_cpm_ts.py:32
publish_cpm_ts.Publisher.timer
timer
Definition: publish_cpm_ts.py:39
publish_cpm_ts.Publisher.publish
def publish(self)
Definition: publish_cpm_ts.py:41
msg
publish_cpm_ts.Publisher.__init__
def __init__(self)
Definition: publish_cpm_ts.py:34
publish_cpm_ts.Publisher.publisher
publisher
Definition: publish_cpm_ts.py:38


etsi_its_msgs_utils
Author(s): Jean-Pierre Busch , Guido Küppers , Lennart Reiher
autogenerated on Sun May 18 2025 02:32:12