2 A simple sopas test server. A listening tcp socket is opened, incoming connections are accepted, and basic cola telegrams are sent in response to client requests.
3 Note: This is just a simple test server for basic unit tests of sick_scansegment_xd cola commands. It does not emulate any device.
6 python sopas_test_server.py --tcp_port=<int> --cola_binary=<int>
9 python ../test/python/sopas_test_server.py --tcp_port=2111 --cola_binary=0
22 if len(payload) > 7
and payload.startswith(b
"\x02\x02\x02\x02"):
23 length = (payload[4] << 24) + (payload[5] << 16) + (payload[6] << 8) + (payload[7] << 0)
28 return bytearray(message.encode())
32 message_hex = message.replace(
":",
"")
33 return bytearray.fromhex(message_hex)
36 ColaResponseMap maps a cola request like "sMN SetAccessMode" to a binary or ascii cola response.
41 def __init__(self, cola_binary = 0, val_FREchoFilter = 0):
44 "sMN SetAccessMode":
"02:02:02:02:00:00:00:13:73:41:4e:20:53:65:74:41:63:63:65:73:73:4d:6f:64:65:20:01:38" ,
45 "sWN EIHstCola":
"02:02:02:02:00:00:00:0e:73:57:41:20:45:49:48:73:74:43:6f:6c:61:20:07",
46 "sRN FirmwareVersion":
"02:02:02:02:00:00:00:1f:73:52:41:20:46:69:72:6d:77:61:72:65:56:65:72:73:69:6f:6e:20:00:09:56:31:2e:38:30:20:20:20:20:43",
47 "sRN SCdevicestate":
"02:02:02:02:00:00:00:13:73:52:41:20:53:43:64:65:76:69:63:65:73:74:61:74:65:20:00:1f",
51 "sRN SCdevicestate":
"\x02sRA SCdevicestate 1\x03",
52 "sMN IsSystemReady":
"\x02sAN IsSystemReady 1\x03",
53 "sMN SetAccessMode":
"\x02sAN SetAccessMode 1\x03",
54 "sMN Run":
"\x02sAN Run 1\x03",
55 "sMN LMCstartmeas":
"\x02sAN LMCstartmeas\x03",
56 "sWN ScanDataEnable":
"\x02sWA ScanDataEnable\x03",
57 "sWN ScanDataFormatSettings":
"\x02sWA ScanDataFormatSettings\x03",
58 "sWN ScanDataPreformattingSettings":
"\x02sWA ScanDataPreformattingSettings\x03",
59 "sWN ScanDataFormat":
"\x02sWA ScanDataFormat\x03",
60 "sWN ScanDataPreformatting":
"\x02sWA ScanDataPreformatting\x03",
61 "sWN ScanDataEthSettings":
"\x02sWA ScanDataEthSettings\x03",
62 "sRN FREchoFilter":
"\x02sRA FREchoFilter {}\x03".format(val_FREchoFilter),
63 "sRN LFPangleRangeFilter":
"\x02sRA LFPangleRangeFilter 0 C0490FF9 40490FF9 BFC90FF9 3FC90FF9 1\x03",
64 "sRN LFPlayerFilter":
"\x02sRA LFPlayerFilter 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1\x03",
65 "sWN FREchoFilter":
"\x02sWA FREchoFilter\x03",
66 "sWN LFPangleRangeFilter":
"\x02sWA LFPangleRangeFilter\x03",
67 "sWN LFPlayerFilter":
"\x02sWA LFPlayerFilter\x03",
68 "sWN LFPintervalFilter":
"\x02sWA LFPintervalFilter\x03",
69 "sRN ContaminationResult":
"\x02sRA ContaminationResult 0 0\x03",
70 "sEN InertialMeasurementUnit":
"\x02sEA InertialMeasurementUnit\x03",
71 "sWN ImuDataEnable":
"\x02sWA ImuDataEnable 1\x03",
72 "sWN ImuDataEthSettings":
"\x02sWA ImuDataEthSettings\x03",
86 SopasTestServer accepts a connection from a tcp client, receives cola telegrams and sends a response to the client.
91 def __init__(self, tcp_port = 2111, cola_binary = 0, val_FREchoFilter = 0):
98 self.
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
99 self.
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
101 print(
"SopasTestServer: listening on tcp port {}".format(self.
tcp_port))
103 (clientsocket, clientaddress) = self.
serversocket.accept()
111 ready_to_recv = select.select([self.
clientsocket], [], [], recv_timeout_sec)
112 if not ready_to_recv[0]:
114 payload = bytearray(b
"")
118 if payload_length <= 1:
119 print(
"## ERROR SopasTestServer.receiveTelegram(): unexpected binary payload_length {} in received header {}".format(payload_length, header))
121 while len(payload) < payload_length:
122 chunk = self.
clientsocket.recv(payload_length - len(payload))
123 payload = payload + chunk
129 while header != b
"\x02":
133 if byte_recv == b
"\x03":
135 payload = payload + byte_recv
136 print(
"SopasTestServer.receiveTelegram(): received {} byte telegram {}".format((len(header) + len(payload) + 1), payload))
142 print(
"SopasTestServer.sendTelegram(): sending {} byte telegram {}".format((len(telegram)), telegram))
144 print(
"SopasTestServer.sendTelegram(): sending {} byte telegram".format(len(telegram)))
150 print(
"SopasTestServer: running main loop...")
154 if len(received_telegram) <= 0:
157 request_key, response_str = response_map.findResponse(received_telegram)
158 if len(request_key) > 0
and len(response_str) > 0:
159 print(
"SopasTestServer: received {}: {}".format(request_key, received_telegram))
164 elif len(received_telegram) > 0:
165 print(
"## ERROR SopasTestServer: received unsupported telegram {}".format(received_telegram))
169 if __name__ ==
"__main__":
177 arg_parser = argparse.ArgumentParser()
178 arg_parser.add_argument(
"--tcp_port", help=
"tcp port to listen for tcp connections", default=tcp_port, type=int)
179 arg_parser.add_argument(
"--cola_binary", help=
"cola ascii (0) or binary (1)", default=cola_binary, type=int)
180 arg_parser.add_argument(
"--FREchoFilter", help=
"FREchoFilter, 0 (first echo, default), 1 (all echos) or 2 (last echo)", default=val_FREchoFilter, type=int)
181 cli_args = arg_parser.parse_args()
182 tcp_port = cli_args.tcp_port
183 cola_binary = cli_args.cola_binary
184 val_FREchoFilter = cli_args.FREchoFilter