sopas_json_test_server.py
Go to the documentation of this file.
1 """
2  A simple sopas test server using json input. A listening tcp socket is opened, an incoming connection is accepted and client requests are answered with cola telegrams.
3  After a configurable delay (--scandata_after, default: 5 seconds), scandata telegrams are sent continuously.
4  All data (sopas responses and telegrams) are read from json file, which can be created from pcapng-file by pcap_json_converter.py
5  Note: This is just a simple test server for sick_scan_xd unittests. It does not emulate any device.
6 
7  Usage:
8  python sopas_json_test_server.py --tcp_port=<int> --json_file=<filepath>
9 
10  Example:
11  python ../test/python/sopas_json_test_server.py --tcp_port=2111 --json_file=../emulator/scandata/20221018_rms_1xxx_ascii_rawtarget_object.pcapng.json
12 
13 """
14 
15 import argparse
16 import datetime
17 import json
18 import select
19 import socket
20 import time
21 import threading
22 
class SopasTestServer:
    """
    SopasTestServer connects to a tcp client, receives cola telegrams and sends a response to the client.

    Responses are looked up in a list of recorded tcp payloads: a request found at index i
    is answered with the payload at index i + 1. A few requests are answered with fixed
    telegrams (see _lookupResponses).
    """

    # Offset of the sopas command in a binary cola telegram: 4 byte 0x02020202 + 4 byte payload length
    _COMMAND_OFFSET = 8

    # "sRN SCdevicestate" is always answered with "sRA SCdevicestate 1" (simulate device ready),
    # even if the json file recorded "sRA SCdevicestate 0" (device not ready)
    _DEVICE_STATE_REQUEST = b"sRN SCdevicestate"
    _DEVICE_STATE_RESPONSE = b"\x02\x02\x02\x02\x00\x00\x00\x13\x73\x52\x41\x20\x53\x43\x64\x65\x76\x69\x63\x65\x73\x74\x61\x74\x65\x20\x01\x1e"

    # Some NAV-350 sopas requests have 2 responses (1. response: acknowledge, 2. response: data)
    _TWO_RESPONSE_REQUESTS = (b"sMN mNEVAChangeState", b"sMN mNMAPDoMapping")

    # Fixed responses for requests which are not found in the json file, checked in this order
    _FALLBACK_RESPONSES = (
        # "....... sAN mNLAYAddLandmark ............"
        (b"sMN mNLAYAddLandmark", b"\x02\x02\x02\x02\x00\x00\x00\x20\x73\x41\x4e\x20\x6d\x4e\x4c\x41\x59\x41\x64\x64\x4c\x61\x6e\x64\x6d\x61\x72\x6b\x20\x00\x00\x04\x00\x00\x00\x01\x00\x02\x00\x03\x7c"),
        (b"sRN SetActiveApplications", b"\x02\x02\x02\x02\x00\x00\x00\x19sAN SetActiveApplications\x1b"),
        # "........sWA ScanLayerFilter"
        (b"sWN ScanLayerFilter", b"\x02\x02\x02\x02\x00\x00\x00\x14\x73\x57\x41\x20\x53\x63\x61\x6e\x4c\x61\x79\x65\x72\x46\x69\x6c\x74\x65\x72\x20\x39"),
        # "........sWA FREchoFilter"
        (b"sWN FREchoFilter", b"\x02\x02\x02\x02\x00\x00\x00\x11\x73\x57\x41\x20\x46\x52\x45\x63\x68\x6f\x46\x69\x6c\x74\x65\x72\x20\x70"),
    )

    def __init__(self, tcp_port=2112, json_tcp_payloads=None, verbosity=0):
        """
        Constructor.

        :param tcp_port: tcp port to listen on
        :param json_tcp_payloads: list of bytearray with the recorded tcp payloads (requests and
            responses); the given list object is stored as is. Defaults to a new empty list.
        :param verbosity: 0: quiet, 1: print telegram sizes, 2: print all telegrams
        """
        self.tcp_port = tcp_port
        # A fresh list per instance: a mutable default argument would be shared between instances
        self.json_tcp_payloads = [] if json_tcp_payloads is None else json_tcp_payloads
        self.verbosity = verbosity
        self.run_message_loop = False

    def connect(self):
        """
        Waits for an incoming tcp connection and connects to the tcp client.

        Blocks until a client connects. The client socket is switched to non-blocking mode.
        """
        self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.serversocket.bind(("", self.tcp_port))
        print("SopasTestServer: listening on tcp port {}".format(self.tcp_port))
        self.serversocket.listen(1)
        (clientsocket, clientaddress) = self.serversocket.accept()
        self.clientsocket = clientsocket
        self.clientaddress = clientaddress
        self.clientsocket.setblocking(0)
        self.run_message_loop = True
        print("SopasTestServer: tcp connection to {} established".format(self.clientaddress))

    def receiveTelegram(self, recv_timeout_sec):
        """
        Receives a cola telegram.

        All bytes up to and including the first 0x02 are collected, then bytes are appended
        until the collected bytes equal one of the recorded payloads or, for binary telegrams
        (4 byte 0x02020202 + 4 byte payload length + payload + 1 byte CRC), until the announced
        length has been received.

        If the client closes the connection, the message loop is stopped (run_message_loop is
        set to False) and the bytes received so far are returned. If the non-blocking socket runs
        out of data before a telegram is complete, the error is printed and the bytes received
        so far are returned.

        :param recv_timeout_sec: timeout in seconds to wait for the first byte
        :return: tuple (received bytes as bytearray, index of the telegram in json_tcp_payloads
            or -1 if not found); an empty bytearray is returned on timeout
        """
        payload = bytearray(b"")
        payload_idx = -1
        payload_len = -1
        ready_to_recv = select.select([self.clientsocket], [], [], recv_timeout_sec)
        if ready_to_recv[0]:
            try:
                stx_received = False
                while True:
                    byte_recv = self.clientsocket.recv(1)
                    if not byte_recv:
                        # recv returns an empty result once the peer has closed the connection;
                        # without this check the loop would spin forever
                        print("SopasTestServer.receiveTelegram(): tcp connection closed by client, stopping message loop")
                        self.run_message_loop = False
                        break
                    payload += byte_recv
                    if not stx_received:
                        # Still searching for the start of the telegram
                        stx_received = (byte_recv == b"\x02")
                        continue
                    if len(payload) == 8 and payload[0:4] == b"\x02\x02\x02\x02": # i.e. 4 byte 0x02020202 + 4 byte payload length
                        payload_len = int.from_bytes(payload[4:8], byteorder='big', signed=False)
                    if payload in self.json_tcp_payloads:
                        payload_idx = self.json_tcp_payloads.index(payload)
                        break
                    if payload_len > 0 and len(payload) >= payload_len + 9: # 4 byte 0x02020202 + 4 byte payload length + payload + 1 byte CRC
                        break
            except OSError as exc:
                # E.g. BlockingIOError if no more data is available on the non-blocking socket
                print("## ERROR SopasTestServer.receiveTelegram(): exception {}".format(exc))
                print("## ERROR SopasTestServer.receiveTelegram(): received {} byte telegram {}".format(len(payload), payload))
        if self.verbosity > 1:
            print("SopasTestServer.receiveTelegram(): received {} byte telegram {}".format(len(payload), payload))
        elif self.verbosity > 0:
            print("SopasTestServer.receiveTelegram(): received {} byte telegram".format(len(payload)))
        return payload, payload_idx

    def sendTelegram(self, telegram):
        """
        Sends a cola telegram "as is".

        :param telegram: bytes or bytearray to send to the client
        """
        if self.verbosity > 1:
            print("SopasTestServer.sendTelegram(): sending {} byte telegram {}".format((len(telegram)), telegram))
        elif self.verbosity > 0:
            print("SopasTestServer.sendTelegram(): sending {} byte telegram".format(len(telegram)))
        # send() may transmit only a part of the data, in particular on a non-blocking socket.
        # Repeat until the complete telegram has been sent.
        remaining = memoryview(telegram)
        while len(remaining) > 0:
            select.select([], [self.clientsocket], []) # wait until the socket accepts data
            num_sent = self.clientsocket.send(remaining)
            remaining = remaining[num_sent:]

    def _lookupResponses(self, received_telegram, json_tcp_payload_idx):
        """
        Returns the list of responses to a sopas request (empty list if the request is unknown).

        :param received_telegram: the request as returned by receiveTelegram
        :param json_tcp_payload_idx: index of the request in json_tcp_payloads or -1 if not found
        """
        offset = self._COMMAND_OFFSET
        if received_telegram.startswith(self._DEVICE_STATE_REQUEST, offset):
            return [self._DEVICE_STATE_RESPONSE]
        num_payloads = len(self.json_tcp_payloads)
        if json_tcp_payload_idx >= 0 and json_tcp_payload_idx + 1 < num_payloads:
            # The recorded response follows the recorded request
            responses = [self.json_tcp_payloads[json_tcp_payload_idx + 1]]
            if received_telegram.startswith(self._TWO_RESPONSE_REQUESTS, offset) and json_tcp_payload_idx + 2 < num_payloads:
                responses.append(self.json_tcp_payloads[json_tcp_payload_idx + 2])
            return responses
        for request, response in self._FALLBACK_RESPONSES:
            if received_telegram.startswith(request, offset):
                return [response]
        return []

    def run(self):
        """
        Runs the message loop, i.e. receives sopas telegrams and sends a response to the client.

        Returns when run_message_loop is False.
        """
        print("SopasTestServer: running event loop...")
        while self.run_message_loop:
            # Receive a cola telegram
            received_telegram, json_tcp_payload_idx = self.receiveTelegram(1)
            if len(received_telegram) <= 0: # timeout (no message received)
                continue
            # Lookup sopas response(s) to sopas request
            responses = self._lookupResponses(received_telegram, json_tcp_payload_idx)
            if not responses:
                print("## ERROR SopasTestServer: request={} not found in json file".format(received_telegram))
            for response_payload in responses:
                if self.verbosity > 0:
                    print("SopasTestServer: request={}, response={}".format(received_telegram, response_payload))
                self.sendTelegram(response_payload)
            time.sleep(0.01)

    def start(self):
        """
        Starts the message loop in a background (daemon) thread.
        """
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True
        thread.start()

    def stop(self):
        """
        Stops the message loop thread, i.e. run() returns after its current iteration.
        """
        self.run_message_loop = False
155 
"""
 Run sopas test server using json input for sopas requests and responses
"""

def parse_json_tcp_payloads(json_input, json_file=""):
    """
    Extracts the tcp payloads from the entries of a parsed json file.

    Each entry is expected to provide entry["_source"]["layers"]["tcp"] with the keys
    "tcp.description" and "tcp.payload" (hex bytes separated by ":"). Entries which cannot
    be decoded are reported on stdout and skipped.

    :param json_input: iterable of json entries (result of json.load)
    :param json_file: name of the json file, used in error messages only
    :return: list of bytearray, one per decoded tcp payload, in input order
    """
    json_tcp_payloads = []
    for json_entry in json_input:
        try:
            tcp_layer = json_entry["_source"]["layers"]["tcp"]
            _ = tcp_layer["tcp.description"] # not used, but entries without a description are skipped
            tcp_payload_hex_str = "".join(tcp_layer["tcp.payload"].split(":"))
            json_tcp_payloads.append(bytearray.fromhex(tcp_payload_hex_str))
        except (LookupError, TypeError, AttributeError, ValueError) as exc:
            print("## ERROR parsing file {}: \"{}\", exception {}".format(json_file, json_entry, exc))
    return json_tcp_payloads


def select_scandata_telegrams(json_tcp_payloads, scandata_ids):
    """
    Returns the payloads containing one of the scandata ids, in send order.

    A payload is listed once for each scandata id it contains.

    :param json_tcp_payloads: list of bytearray with the recorded tcp payloads
    :param scandata_ids: list of str, e.g. [ "sSN LMDscandata" ]
    :return: list of payloads to be sent as scandata
    """
    scandata_keys = [scandata_id.encode() for scandata_id in scandata_ids]
    return [payload for payload in json_tcp_payloads for scandata_key in scandata_keys if payload.find(scandata_key, 0) >= 0]


if __name__ == "__main__":

    # Configuration
    tcp_port = 2111 # tcp port to listen for tcp connections
    scandata_ids = [ "sSN LMDradardata", "sSN LMDscandata", "sSN InertialMeasurementUnit" ]
    json_file = "../emulator/scandata/20221018_rms_1xxx_ascii_rawtarget_object.pcapng.json" # input jsonfile with sopas requests, responses and telegrams
    verbosity = 2 # print all telegrams
    send_rate = 10 # send 10 scandata telegrams per second
    send_scandata_after = 5 # start to send scandata 5 seconds after server start

    # Overwrite with command line arguments
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--tcp_port", help="tcp port to listen for tcp connections", default=tcp_port, type=int)
    arg_parser.add_argument("--json_file", help="input jsonfile with sopas requests, responses and telegrams", default=json_file, type=str)
    arg_parser.add_argument("--scandata_id", help="sopas id of scandata telegrams, e.g. \"sSN LMDradardata\"", default="", type=str)
    arg_parser.add_argument("--send_rate", help="send rate in telegrams per second", default=send_rate, type=float)
    arg_parser.add_argument("--verbosity", help="verbosity (0, 1 or 2)", default=verbosity, type=int)
    arg_parser.add_argument("--scandata_after", help="start to send scandata after some seconds", default=send_scandata_after, type=float)
    cli_args = arg_parser.parse_args()
    if cli_args.send_rate <= 0:
        # Fail early: the send interval is 1 / send_rate
        arg_parser.error("--send_rate must be greater than 0")
    tcp_port = cli_args.tcp_port
    json_file = cli_args.json_file
    if len(cli_args.scandata_id) > 0:
        scandata_ids = cli_args.scandata_id.split(",") # comma separated list of scandata ids
    # "?" can be used on the command line instead of a space, quotation marks are removed
    scandata_ids = [scandata_id.replace("?", " ").replace("\"", "") for scandata_id in scandata_ids]
    print("sopas_json_test_server: scandata_ids = {}".format(scandata_ids))
    verbosity = cli_args.verbosity
    send_rate = cli_args.send_rate
    send_scandata_after = cli_args.scandata_after

    # Parse json file
    print("sopas_json_test_server: parsing json file \"{}\":".format(json_file))
    with open(json_file, 'r') as file_stream:
        json_input = json.load(file_stream)
    json_tcp_payloads = parse_json_tcp_payloads(json_input, json_file) # list of bytearray of the tcp payload
    if verbosity > 1:
        for json_tcp_payload in json_tcp_payloads:
            print("{}".format(json_tcp_payload))

    # Run sopas test server
    print("sopas_json_test_server: running event loop ...")
    server = SopasTestServer(tcp_port, json_tcp_payloads, verbosity)
    server.connect()
    server.start()

    # Send sopas telegrams, e.g. "sSN LMDradardata ..." or "sSN LMDscandata ..."
    time.sleep(send_scandata_after)
    print("sopas_json_test_server: start sending telegrams {} ...".format(" , ".join(scandata_ids)))
    # The recorded payloads do not change, so the scandata telegrams are selected once
    scandata_telegrams = select_scandata_telegrams(json_tcp_payloads, scandata_ids)
    if not scandata_telegrams:
        print("## WARNING sopas_json_test_server: no scandata telegrams found in json file, only sopas requests are answered")
    send_interval_sec = 1.0 / send_rate
    try:
        while True:
            for payload in scandata_telegrams:
                server.sendTelegram(payload)
                time.sleep(send_interval_sec)
            if not scandata_telegrams:
                # Nothing to send: idle instead of busy-spinning, the server thread keeps answering requests
                time.sleep(send_interval_sec)
    finally:
        # Reached on KeyboardInterrupt or on a send error (e.g. client disconnected)
        server.stop()
sopas_json_test_server.SopasTestServer.start
def start(self)
Definition: sopas_json_test_server.py:147
sopas_json_test_server.SopasTestServer.run_message_loop
run_message_loop
Definition: sopas_json_test_server.py:33
roswrap::console::print
ROSCONSOLE_DECL void print(FilterBase *filter, void *logger, Level level, const char *file, int line, const char *function, const char *fmt,...) ROSCONSOLE_PRINTF_ATTRIBUTE(7
Don't call this directly. Use the ROS_LOG() macro instead.
sopas_json_test_server.SopasTestServer.__init__
def __init__(self, tcp_port=2112, json_tcp_payloads=[], verbosity=0)
Definition: sopas_json_test_server.py:29
sopas_json_test_server.SopasTestServer.verbosity
verbosity
Definition: sopas_json_test_server.py:32
sopas_json_test_server.SopasTestServer
Definition: sopas_json_test_server.py:26
sopas_json_test_server.SopasTestServer.tcp_port
tcp_port
Definition: sopas_json_test_server.py:30
sopas_json_test_server.SopasTestServer.stop
def stop(self)
Definition: sopas_json_test_server.py:153
sopas_json_test_server.SopasTestServer.connect
def connect(self)
Definition: sopas_json_test_server.py:36
sopas_json_test_server.SopasTestServer.serversocket
serversocket
Definition: sopas_json_test_server.py:37
sopas_json_test_server.SopasTestServer.run
def run(self)
Definition: sopas_json_test_server.py:90
sopas_json_test_server.SopasTestServer.clientsocket
clientsocket
Definition: sopas_json_test_server.py:43
sopas_json_test_server.SopasTestServer.receiveTelegram
def receiveTelegram(self, recv_timeout_sec)
Definition: sopas_json_test_server.py:50
sopas_json_test_server.SopasTestServer.sendTelegram
def sendTelegram(self, telegram)
Definition: sopas_json_test_server.py:82
sopas_json_test_server.SopasTestServer.json_tcp_payloads
json_tcp_payloads
Definition: sopas_json_test_server.py:31
sopas_json_test_server.SopasTestServer.clientaddress
clientaddress
Definition: sopas_json_test_server.py:44


sick_scan_xd
Author(s): Michael Lehning , Jochen Sprickerhof , Martin Günther
autogenerated on Fri Oct 25 2024 02:47:12