2 A simple sopas test server using json input. A listening tcp socket is opened, incoming connections are accepted and cola telegrams are responded on client requests.
3 After 10 seconds, datagrams are sent continously.
4 All data (sopas responses and telegrams) are read from json file, which can be created from pcapng-file by pcap_json_converter.py
5 Note: This is just a simple test server for sick_scan_xd unittests. It does not emulate any device.
8 python sopas_json_test_server.py --tcp_port=<int> --json_file=<filepath>
11 python ../test/python/sopas_test_server.py --tcp_port=2111 --json_file=../emulator/scandata/20221018_rms_1xxx_ascii_rawtarget_object.pcapng.json
24 SopasTestServer connects to a tcp client, receives cola telegrams and sends a response to the client.
29 def __init__(self, tcp_port = 2112, json_tcp_payloads = [], verbosity = 0):
37 self.
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
38 self.
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
40 print(
"SopasTestServer: listening on tcp port {}".format(self.
tcp_port))
42 (clientsocket, clientaddress) = self.
serversocket.accept()
51 payload = bytearray(b
"")
54 ready_to_recv = select.select([self.
clientsocket], [], [], recv_timeout_sec)
58 while byte_recv != b
"\x02":
60 payload = payload + byte_recv
63 payload = payload + byte_recv
64 if len(payload) == 8
and payload[0:4] == b
"\x02\x02\x02\x02":
65 payload_len = int.from_bytes(payload[4:8], byteorder=
'big', signed=
False)
70 if payload_len > 0
and len(payload) >= payload_len + 9:
72 except Exception
as exc:
73 print(
"## ERROR SopasTestServer.receiveTelegram(): exception {}".format(exc))
74 print(
"## ERROR SopasTestServer.receiveTelegram(): received {} byte telegram {}".format(len(payload), payload))
76 print(
"SopasTestServer.receiveTelegram(): received {} byte telegram {}".format(len(payload), payload))
78 print(
"SopasTestServer.receiveTelegram(): received {} byte telegram".format(len(payload)))
79 return payload, payload_idx
84 print(
"SopasTestServer.sendTelegram(): sending {} byte telegram {}".format((len(telegram)), telegram))
86 print(
"SopasTestServer.sendTelegram(): sending {} byte telegram".format(len(telegram)))
91 print(
"SopasTestServer: running event loop...")
95 if len(received_telegram) <= 0:
98 if received_telegram[8:25] == b
"sRN SCdevicestate":
99 response_payload = b
"\x02\x02\x02\x02\x00\x00\x00\x13\x73\x52\x41\x20\x53\x43\x64\x65\x76\x69\x63\x65\x73\x74\x61\x74\x65\x20\x01\x1e"
101 print(
"SopasTestServer: request={}, response={}".format(received_telegram, response_payload))
103 elif json_tcp_payload_idx >= 0
and json_tcp_payload_idx + 1 < len(self.
json_tcp_payloads):
106 print(
"SopasTestServer: request={}, response={}".format(received_telegram, response_payload))
109 if received_telegram[8:28] == b
"sMN mNEVAChangeState" or received_telegram[8:26] == b
"sMN mNMAPDoMapping":
113 print(
"SopasTestServer: request={}, response={}".format(received_telegram, response_payload))
115 elif received_telegram[8:28] == b
"sMN mNLAYAddLandmark":
116 response_payload = b
"\x02\x02\x02\x02\x00\x00\x00\x20\x73\x41\x4e\x20\x6d\x4e\x4c\x41\x59\x41\x64\x64\x4c\x61\x6e\x64\x6d\x61\x72\x6b\x20\x00\x00\x04\x00\x00\x00\x01\x00\x02\x00\x03\x7c"
118 print(
"SopasTestServer: request={}, response={}".format(received_telegram, response_payload))
120 elif received_telegram[8:33] == b
"sRN SetActiveApplications":
121 response_payload = b
"\x02\x02\x02\x02\x00\x00\x00\x19sAN SetActiveApplications\x1b"
123 print(
"SopasTestServer: request={}, response={}".format(received_telegram, response_payload))
125 elif received_telegram[8:27] == b
"sWN ScanLayerFilter":
126 response_payload = b
"\x02\x02\x02\x02\x00\x00\x00\x14\x73\x57\x41\x20\x53\x63\x61\x6e\x4c\x61\x79\x65\x72\x46\x69\x6c\x74\x65\x72\x20\x39"
128 print(
"SopasTestServer: request={}, response={}".format(received_telegram, response_payload))
130 elif received_telegram[8:24] == b
"sWN FREchoFilter":
131 response_payload = b
"\x02\x02\x02\x02\x00\x00\x00\x11\x73\x57\x41\x20\x46\x52\x45\x63\x68\x6f\x46\x69\x6c\x74\x65\x72\x20\x70"
133 print(
"SopasTestServer: request={}, response={}".format(received_telegram, response_payload))
136 print(
"## ERROR SopasTestServer: request={} not found in json file".format(received_telegram))
148 thread = threading.Thread(target=self.
run, args=())
157 Run sopas test server using json input for sopas requests and responses
159 if __name__ ==
"__main__":
163 scandata_ids = [
"sSN LMDradardata",
"sSN LMDscandata",
"sSN InertialMeasurementUnit" ]
164 json_file =
"../emulator/scandata/20221018_rms_1xxx_ascii_rawtarget_object.pcapng.json"
167 send_scandata_after = 5
170 arg_parser = argparse.ArgumentParser()
171 arg_parser.add_argument(
"--tcp_port", help=
"tcp port to listen for tcp connections", default=tcp_port, type=int)
172 arg_parser.add_argument(
"--json_file", help=
"input jsonfile with sopas requests, responses and telegrams", default=json_file, type=str)
173 arg_parser.add_argument(
"--scandata_id", help=
"sopas id of scandata telegrams, e.g. \"sSN LMDradardata\"", default=
"", type=str)
174 arg_parser.add_argument(
"--send_rate", help=
"send rate in telegrams per second", default=send_rate, type=float)
175 arg_parser.add_argument(
"--verbosity", help=
"verbosity (0, 1 or 2)", default=verbosity, type=int)
176 arg_parser.add_argument(
"--scandata_after", help=
"start to send scandata after some seconds", default=send_scandata_after, type=float)
177 cli_args = arg_parser.parse_args()
178 tcp_port = cli_args.tcp_port
179 json_file = cli_args.json_file
180 if len(cli_args.scandata_id) > 0:
183 for scandata_id
in cli_args.scandata_id.split(
","):
184 scandata_ids.append(scandata_id)
185 for n, scandata_id
in enumerate(scandata_ids):
186 scandata_ids[n] = scandata_ids[n].replace(
"?",
" ")
187 scandata_ids[n] = scandata_ids[n].replace(
"\"",
"")
188 print(
"sopas_json_test_server: scandata_ids = {}".format(scandata_ids))
189 verbosity = cli_args.verbosity
190 send_rate = cli_args.send_rate
191 send_scandata_after = cli_args.scandata_after
194 print(
"sopas_json_test_server: parsing json file \"{}\":".format(json_file))
195 with open(json_file,
'r')
as file_stream:
196 json_input = json.load(file_stream)
197 json_tcp_payloads = []
198 for json_entry
in json_input:
200 tcp_description = json_entry[
"_source"][
"layers"][
"tcp"][
"tcp.description"]
201 tcp_payload_json = json_entry[
"_source"][
"layers"][
"tcp"][
"tcp.payload"]
202 tcp_payload_hex_str =
"".join(tcp_payload_json.split(
":"))
203 tcp_payload = bytearray.fromhex(tcp_payload_hex_str)
204 json_tcp_payloads.append(tcp_payload)
206 except Exception
as exc:
207 print(
"## ERROR parsing file {}: \"{}\", exception {}".format(json_file, json_entry, exc))
209 for json_tcp_payload
in json_tcp_payloads:
210 print(
"{}".format(json_tcp_payload))
213 print(
"sopas_json_test_server: running event loop ...")
219 time.sleep(send_scandata_after)
220 print(
"sopas_json_test_server: start sending telegrams {} ...".format(
" , ".join(scandata_ids)))
221 for n
in range(len(scandata_ids)):
222 scandata_ids[n] = bytearray(scandata_ids[n].encode())
224 for payload
in json_tcp_payloads:
225 for scandata_id
in scandata_ids:
226 if payload.find(scandata_id,0) >= 0:
228 server.sendTelegram(payload)
229 time.sleep(1.0 / send_rate)