start_test_publisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import zmq
4 import math
5 import json
6 import argparse
7 
8 from time import sleep
9 import numpy as np
10 
11 PORT = 9872
12 
13 parser = argparse.ArgumentParser("start_test_publisher")
14 
15 parser.add_argument("--topic|-t",
16  dest="topic",
17  help="Topic on which messages will be published",
18  type=str,
19  required=False)
20 
21 args = parser.parse_args()
22 topic = args.topic
23 
24 
25 def main():
26  context = zmq.Context()
27  server_socket = context.socket(zmq.PUB)
28  server_socket.bind("tcp://*:" + str(PORT))
29  ticks = 0
30 
31  while True:
32  data = {
33  "ticks": ticks,
34  "data": {
35  "cos": math.cos(ticks),
36  "sin": math.sin(ticks),
37  "floor": np.floor(np.cos(ticks)),
38  "ceil": np.ceil(np.cos(ticks))
39  }
40  }
41 
42  if topic:
43  print(f"[{topic}] - " + json.dumps(data))
44  server_socket.send_multipart(
45  [topic.encode(), json.dumps(data).encode()])
46  else:
47  print(json.dumps(data))
48  server_socket.send(json.dumps(data).encode())
49 
50  ticks += 1
51 
52  sleep(0.1)
53 
54 
55 if __name__ == '__main__':
56  main()
start_test_publisher.main
def main()
Definition: start_test_publisher.py:25
print
void print(std::FILE *f, const text_style &ts, const S &format_str, const Args &... args)
Definition: color.h:497


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Aug 11 2024 02:24:26