test-still-lidar2d.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 # ---------------------------------------------------------------------
4 # This example shows how to:
5 # - Subscribe to an MVSIM topic with a custom callback.
6 #
7 # Install python3-mvsim, or test with a local build with:
8 # export PYTHONPATH=$HOME/code/mvsim/build/:$PYTHONPATH
9 # ---------------------------------------------------------------------
10 
11 from mvsim_comms import pymvsim_comms
12 from mvsim_msgs import ObservationLidar2D_pb2
13 from mvsim_msgs import SrvShutdown_pb2
14 from mvsim_msgs import SrvGetPose_pb2
15 
16 import subprocess
17 import time
18 import os
19 
# Both environment variables are mandatory: a missing one raises KeyError
# as soon as this script is loaded.
TESTS_DIR = os.environ['TESTS_DIR']
MVSIM_CLI_EXE_PATH = os.environ['MVSIM_CLI_EXE_PATH']

# Result flag for the whole test. onMessage() sets it to True once a received
# scan satisfies its checks. (No `global` statement is needed at module scope;
# it only has an effect inside a function body.)
TEST_PASSED = False
25 
26 
def onMessage(msgType, msg):
    """Topic callback: decode a 2D lidar observation and record test success.

    Args:
        msgType: Type name delivered with the message. It must be
            "mvsim_msgs.ObservationLidar2D", otherwise the assertion fails.
        msg: Serialized message payload; it is converted with ``bytes()`` and
            parsed into an ``ObservationLidar2D`` protobuf object.

    Side effects:
        Prints the decoded ranges and validity flags, and sets the
        module-level ``TEST_PASSED`` flag to True when the scan has exactly
        181 ranges, the first range is within 1.5 of 9.0, and the first
        validity flag is set.
    """
    global TEST_PASSED
    assert(msgType == "mvsim_msgs.ObservationLidar2D")

    observation = ObservationLidar2D_pb2.ObservationLidar2D()
    observation.ParseFromString(bytes(msg))

    ranges_text = str(observation.scanRanges)
    valid_text = str(observation.validRanges)
    print("callback received:\n ranges=\n" + ranges_text +
          "\n validRanges=" + valid_text)

    ranges = list(observation.scanRanges)
    valid_flags = list(observation.validRanges)

    # TODO: Fix wildly different ranges between u20.04 and u22.04!
    # The length check comes first so the [0] accesses below are only
    # evaluated for a scan of the expected size.
    if len(ranges) != 181:
        return
    if not (abs(ranges[0] - 9.0) < 1.5):
        return
    if valid_flags[0] == True:
        TEST_PASSED = True
42 
43 
def call_mvsim_shutdown(client):
    """Call the 'shutdown' service through *client*.

    Args:
        client: Object exposing ``callService(name, payload)``; in this
            script it is the connected ``pymvsim_comms.mvsim.Client``.
    """
    # SrvShutdown has no fields to fill in, so a default-constructed
    # request is serialized and sent as-is.
    payload = SrvShutdown_pb2.SrvShutdown().SerializeToString()
    client.callService('shutdown', payload)
49 
50 
if __name__ == "__main__":

    # Upper bound (seconds) to wait for the simulator process to exit
    # before it is forcibly killed.
    sim_exit_timeout_s = 30

    # Important: Run with simulated slow-down time (--realtime-factor),
    # to prevent timeout in real-time check asserts within mvsim
    # when run inside cloud docker containers:
    #
    # The Popen handle is kept so the child can always be reaped below.
    sim_process = subprocess.Popen([MVSIM_CLI_EXE_PATH, "launch",
                                    TESTS_DIR + "/test-still-lidar2d.world.xml",
                                    "--headless", "-v DEBUG",
                                    "--realtime-factor 0.1"])

    shutdown_requested = False
    try:
        client = pymvsim_comms.mvsim.Client()
        print("Connecting to server...", flush=True)
        client.connect()
        print("Connected successfully.", flush=True)

        # Subscribe to "/r1/laser1_scan"
        client.subscribeTopic("/r1/laser1_scan", onMessage)

        # Poll the flag set by onMessage(): up to 300 iterations of 0.25 s.
        for _ in range(300):
            if TEST_PASSED:
                break
            print("Running and waiting...", flush=True)
            time.sleep(0.25)

        call_mvsim_shutdown(client)
        shutdown_requested = True
    finally:
        # If anything failed before the shutdown request was sent, the
        # simulator would otherwise be left running as an orphan process.
        if not shutdown_requested:
            sim_process.terminate()
        # Reap the child; kill it as a last resort if it does not exit.
        try:
            sim_process.wait(timeout=sim_exit_timeout_s)
        except subprocess.TimeoutExpired:
            sim_process.kill()
            sim_process.wait()

    assert(TEST_PASSED)
rapidxml::print
OutIt print(OutIt out, const xml_node< Ch > &node, int flags=0)
Definition: rapidxml_print.hpp:405
test-still-lidar2d.call_mvsim_shutdown
def call_mvsim_shutdown(client)
Definition: test-still-lidar2d.py:44
test-still-lidar2d.onMessage
def onMessage(msgType, msg)
Definition: test-still-lidar2d.py:27


mvsim
Author(s):
autogenerated on Wed May 28 2025 02:13:08