start_test_publisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import zmq
4 import math
5 import json
6 import argparse
7 
8 from time import sleep, time_ns
9 import numpy as np
10 
11 PORT = 9872
12 
13 parser = argparse.ArgumentParser("start_test_publisher")
14 
15 parser.add_argument(
16  "--topic|-t",
17  dest="topic",
18  help="Topic on which messages will be published",
19  type=str,
20  required=False,
21 )
22 parser.add_argument(
23  "--timestamp",
24  dest="timestamp",
25  help="Send timestamp as message part, requires topic to work properly",
26  required=False,
27  action="store_true",
28 )
29 
30 args = parser.parse_args()
31 topic = args.topic
32 timestamp = args.timestamp
33 
34 
35 def main():
36  context = zmq.Context()
37  server_socket = context.socket(zmq.PUB)
38  server_socket.bind("tcp://*:" + str(PORT))
39  ticks = 0
40 
41  while True:
42  out_str = []
43  packet = []
44  data = {
45  "ticks": ticks,
46  "data": {
47  "cos": math.cos(ticks),
48  "sin": math.sin(ticks),
49  "floor": np.floor(np.cos(ticks)),
50  "ceil": np.ceil(np.cos(ticks)),
51  },
52  }
53 
54  if topic:
55  out_str.append(f"[{topic}] - ")
56  packet.append(topic.encode())
57 
58  out_str.append(json.dumps(data))
59  packet.append(out_str[-1].encode())
60 
61  if timestamp:
62  timestamp_s = str(time_ns() * 1e-9)
63  out_str.append(" - timestamp: " + timestamp_s)
64  packet.append(timestamp_s.encode())
65 
66  print("".join(out_str))
67  server_socket.send_multipart(packet)
68 
69  ticks += 1
70 
71  sleep(0.1)
72 
73 
74 if __name__ == "__main__":
75  main()
start_test_publisher.main
def main()
Definition: start_test_publisher.py:35
print
void print(std::FILE *f, const text_style &ts, const S &format_str, const Args &... args)
Definition: color.h:497
join
auto join(It begin, Sentinel end, string_view sep) -> join_view< It, Sentinel >
Definition: format.h:4280


plotjuggler
Author(s): Davide Faconti
autogenerated on Mon May 26 2025 02:22:38