multiscan_sopas_test_server.py
Go to the documentation of this file.
1 """
2  A simple sopas test server. It opens a listening tcp socket, accepts one incoming connection and answers client requests with some basic predefined cola telegrams.
3  Note: This is just a simple test server for basic unittests of sick_scansegment_xd cola commands. It does not emulate any device.
4 
5  Usage:
6  python multiscan_sopas_test_server.py --tcp_port=<int> --cola_binary=<int> --FREchoFilter=<int>
7 
8  Example:
9  python ../test/python/multiscan_sopas_test_server.py --tcp_port=2111 --cola_binary=0
10 
11 """
12 
13 import argparse
14 import datetime
15 import select
16 import socket
17 import time
18 
# Decodes and returns the payload length of a cola message, i.e. returns message_payload_length in a message := { 4 byte STX 0x02020202 } + { 4 byte message_payload_length } + { message_payload } + { 1 byte CRC }
def parseBinaryColaPayloadLength(payload):
    """
    Return the payload length encoded in a binary cola telegram header.

    payload: bytes or bytearray starting with the 8 byte header
             (4 byte STX 0x02020202 followed by a 4 byte big endian length).
    Returns the decoded length, or 0 if fewer than 8 bytes are given or the
    data does not start with the 4 byte STX.
    """
    length = 0
    if len(payload) > 7 and payload.startswith(b"\x02\x02\x02\x02"):
        # Bytes 4..7 hold the payload length, most significant byte first
        length = int.from_bytes(payload[4:8], byteorder="big")
    return length
25 
# Returns a string as bytearray
def stringToByteArray(message):
    """
    Encode a string with the default str.encode() codec (utf-8) and return it as a mutable bytearray.
    """
    return bytearray(message.encode())
29 
# Removes optional ":" from message and returns hex values as bytearray
def hexStringToByteArray(message):
    """
    Convert a hex string like "02:02:02:02:00" (the ":" separators are optional) to a bytearray.

    Raises ValueError (from bytearray.fromhex) if the remaining text is not valid hex.
    """
    message_hex = message.replace(":", "")
    return bytearray.fromhex(message_hex)
34 
class ColaResponseMap:
    """
    ColaResponseMap maps a cola request like "sMN SetAccessMode" to a binary or ascii cola response.

    In binary mode (cola_binary > 0) the responses are stored as ":"-separated hex strings of the
    complete telegram, in ascii mode as strings framed by <STX> (0x02) and <ETX> (0x03).
    """

    # Constructor
    # cola_binary: cola ascii (0) or binary (> 0) responses
    # val_FREchoFilter: value reported in the ascii response to "sRN FREchoFilter"
    def __init__(self, cola_binary = 0, val_FREchoFilter = 0):
        if cola_binary > 0:
            self.mapped_response = {
                "sMN SetAccessMode": "02:02:02:02:00:00:00:13:73:41:4e:20:53:65:74:41:63:63:65:73:73:4d:6f:64:65:20:01:38",
                "sWN EIHstCola": "02:02:02:02:00:00:00:0e:73:57:41:20:45:49:48:73:74:43:6f:6c:61:20:07",
                "sRN FirmwareVersion": "02:02:02:02:00:00:00:1f:73:52:41:20:46:69:72:6d:77:61:72:65:56:65:72:73:69:6f:6e:20:00:09:56:31:2e:38:30:20:20:20:20:43",
                "sRN SCdevicestate": "02:02:02:02:00:00:00:13:73:52:41:20:53:43:64:65:76:69:63:65:73:74:61:74:65:20:00:1f",
            }
        else:
            self.mapped_response = {
                "sRN SCdevicestate": "\x02sRA SCdevicestate 1\x03", # "sRN SCdevicestate" -> "sRA SCdevicestate 1"
                "sMN IsSystemReady": "\x02sAN IsSystemReady 1\x03", # "sMN IsSystemReady" -> "sAN IsSystemReady 1"
                "sMN SetAccessMode": "\x02sAN SetAccessMode 1\x03", # "sMN SetAccessMode 3 F4724744" -> "sAN SetAccessMode 1"
                "sMN Run": "\x02sAN Run 1\x03", # "sMN Run" -> "sAN Run 1"
                "sMN LMCstartmeas": "\x02sAN LMCstartmeas\x03", # "sMN LMCstartmeas" -> "sAN LMCstartmeas"
                "sWN ScanDataEnable": "\x02sWA ScanDataEnable\x03", # "sWN ScanDataEnable 1" -> "sWA ScanDataEnable"
                "sWN ScanDataFormatSettings": "\x02sWA ScanDataFormatSettings\x03", # "sWN ScanDataFormatSettings 1" -> "sWA ScanDataFormatSettings"
                "sWN ScanDataPreformattingSettings": "\x02sWA ScanDataPreformattingSettings\x03", # "sWN ScanDataPreformattingSettings 1" -> "sWA ScanDataPreformattingSettings"
                "sWN ScanDataFormat": "\x02sWA ScanDataFormat\x03", # "sWN ScanDataFormat 1" -> "sWA ScanDataFormat"
                "sWN ScanDataPreformatting": "\x02sWA ScanDataPreformatting\x03", # "sWN ScanDataPreformatting 1" -> "sWA ScanDataPreformatting"
                "sWN ScanDataEthSettings": "\x02sWA ScanDataEthSettings\x03", # "sWN ScanDataEthSettings 1 +127 +0 +0 +1 +2115" -> "sWA ScanDataEthSettings"
                "sRN FREchoFilter": "\x02sRA FREchoFilter {}\x03".format(val_FREchoFilter), # "sRN FREchoFilter" -> "sRA FREchoFilter 0" (default: 0, i.e. first echo only, echo_count = 1)
                "sRN LFPangleRangeFilter": "\x02sRA LFPangleRangeFilter 0 C0490FF9 40490FF9 BFC90FF9 3FC90FF9 1\x03", # "sRN LFPangleRangeFilter" -> "sRA LFPangleRangeFilter 0 C0490FF9 40490FF9 BFC90FF9 3FC90FF9 1"
                "sRN LFPlayerFilter": "\x02sRA LFPlayerFilter 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1\x03", # "sRN LFPlayerFilter" -> "sRA LFPlayerFilter 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1"
                "sWN FREchoFilter": "\x02sWA FREchoFilter\x03", # "sWN FREchoFilter 1" -> "sWA FREchoFilter"
                "sWN LFPangleRangeFilter": "\x02sWA LFPangleRangeFilter\x03", # "sWN LFPangleRangeFilter 0 C0490FF9 40490FF9 BFC90FF9 3FC90FF9 1" -> "sWA LFPangleRangeFilter"
                "sWN LFPlayerFilter": "\x02sWA LFPlayerFilter\x03", # "sWN LFPlayerFilter 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1" -> "sWA LFPlayerFilter"
                "sWN LFPintervalFilter": "\x02sWA LFPintervalFilter\x03", # "sWN LFPintervalFilter 0 1" -> "sWA LFPintervalFilter"
                "sRN ContaminationResult": "\x02sRA ContaminationResult 0 0\x03", # "sRN ContaminationResult" -> "sRA ContaminationResult 0 0"
                "sEN InertialMeasurementUnit": "\x02sEA InertialMeasurementUnit\x03", # "sEN InertialMeasurementUnit" -> "sEA InertialMeasurementUnit"
                "sWN ImuDataEnable": "\x02sWA ImuDataEnable 1\x03", # "sWN ImuDataEnable" -> "sWA ImuDataEnable 1"
                "sWN ImuDataEthSettings": "\x02sWA ImuDataEthSettings\x03", # "sWN ImuDataEthSettings" -> "sWA ImuDataEthSettings"
                # Simulate picoScan150 w/o addons (no IMU available): "sWN ImuDataEthSettings" -> "sFA 3" (unknown sopas index, no IMU or IMU license error)
                # "sWN ImuDataEnable": "\x02sFA 3\x03", # "sWN ImuDataEnable" -> "sFA 3"
                # "sWN ImuDataEthSettings": "\x02sFA 3\x03", # "sWN ImuDataEthSettings" -> "sFA 3"
            }

    # Search for a mapped response given a cola request and returns key and response as strings
    # request: received telegram as bytes, bytearray or str
    # Returns the tuple (key, response), or ("", "") if no key is contained in the request
    def findResponse(self, request):
        if isinstance(request, str):
            # An empty str is used elsewhere in this file to signal "nothing received"; searching bytes in a str would raise
            request = request.encode()
        # Some keys are prefixes of others (e.g. "sWN ScanDataFormat" and "sWN ScanDataFormatSettings").
        # Prefer the longest matching key, so that the result does not depend on the order of the map.
        best_key = ""
        for key in self.mapped_response:
            if len(key) > len(best_key) and key.encode() in request:
                best_key = key
        if not best_key:
            return "", ""
        return best_key, self.mapped_response[best_key]
84 
class SopasTestServer:
    """
    SopasTestServer connects to a tcp client, receives cola telegrams and sends a response to the client.

    Typical use: create an instance, call connect() to wait for one client and then run() to
    answer its requests until the client closes the connection.
    """

    # Constructor
    # tcp_port: tcp port to listen for tcp connections
    # cola_binary: cola ascii (0) or binary (> 0)
    # val_FREchoFilter: value passed to ColaResponseMap for the "sRN FREchoFilter" response
    def __init__(self, tcp_port = 2111, cola_binary = 0, val_FREchoFilter = 0):
        self.tcp_port = tcp_port
        self.cola_binary = cola_binary
        self.val_FREchoFilter = val_FREchoFilter

    # Waits for an incoming tcp connection and connects to the tcp client
    def connect(self):
        self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.serversocket.bind(("", self.tcp_port))
        print("SopasTestServer: listening on tcp port {}".format(self.tcp_port))
        self.serversocket.listen(1)
        (clientsocket, clientaddress) = self.serversocket.accept() # blocks until a client connects
        self.clientsocket = clientsocket
        self.clientaddress = clientaddress
        self.clientsocket.setblocking(0) # receiveTelegram() waits for data using select
        print("SopasTestServer: tcp connection to {} established".format(self.clientaddress))

    # Receives exactly num_bytes from the (non-blocking) client socket and returns them as bytearray.
    # Raises ConnectionError if the client closes the connection before all bytes are received.
    def _recvExact(self, num_bytes):
        data = bytearray()
        while len(data) < num_bytes:
            try:
                chunk = self.clientsocket.recv(num_bytes - len(data))
            except BlockingIOError:
                # The telegram arrived fragmented, wait for the next tcp fragment
                select.select([self.clientsocket], [], [])
                continue
            if not chunk:
                # recv returns an empty bytes object when the peer has closed the connection
                raise ConnectionError("tcp connection closed by client")
            data += chunk
        return data

    # Receives a cola telegram and returns its payload (i.e. the telegram without header and CRC)
    # recv_timeout_sec: max. time in seconds to wait for the start of a telegram
    # Returns the payload as bytearray, an empty string "" in case of a timeout, or the
    # received 8 byte header (bytes) if a binary header could not be decoded.
    # Raises ConnectionError if the client closes the connection.
    def receiveTelegram(self, recv_timeout_sec):
        ready_to_recv = select.select([self.clientsocket], [], [], recv_timeout_sec)
        if not ready_to_recv[0]:
            return ""
        payload = bytearray(b"")
        if self.cola_binary > 0:
            header = bytes(self._recvExact(8))
            payload_length = parseBinaryColaPayloadLength(header) # message := { 4 byte STX 0x02020202 } + { 4 byte message_payload_length } + { message_payload } + { 1 byte CRC }
            if payload_length <= 1:
                print("## ERROR SopasTestServer.receiveTelegram(): unexpected binary payload_length {} in received header {}".format(payload_length, header))
                return header
            payload = self._recvExact(payload_length)
            self._recvExact(1) # 1 byte CRC, received but not verified
        else:
            # Skip all bytes until <STX>
            header = b"\x00"
            while header != b"\x02":
                header = bytes(self._recvExact(1))
            # Collect the payload until <ETX>
            while True:
                byte_recv = self._recvExact(1)
                if byte_recv == b"\x03":
                    break
                payload = payload + byte_recv
        print("SopasTestServer.receiveTelegram(): received {} byte telegram {}".format((len(header) + len(payload) + 1), payload))
        return payload

    # Sends a cola telegram "as is"
    # telegram: complete telegram, either a str (ascii cola) or bytes/bytearray (binary cola)
    # verbosity: 0 (silent), 1 (print telegram length) or > 1 (print length and telegram)
    def sendTelegram(self, telegram, verbosity):
        if verbosity > 1:
            print("SopasTestServer.sendTelegram(): sending {} byte telegram {}".format((len(telegram)), telegram))
        elif verbosity > 0:
            print("SopasTestServer.sendTelegram(): sending {} byte telegram".format(len(telegram)))
        if isinstance(telegram, str):
            # Only ascii telegrams are given as str; binary telegrams are already bytes and have no encode()
            telegram = telegram.encode("utf-8")
        self.clientsocket.sendall(telegram) # send() alone may transmit only a part of the telegram

    # Runs the message loop, i.e. receives cola telegrams and sends a response to the client.
    # Returns when the client closes the tcp connection.
    def run(self):
        response_map = ColaResponseMap(self.cola_binary, self.val_FREchoFilter)
        print("SopasTestServer: running main loop...")
        while True:
            # Receive a cola telegram
            try:
                received_telegram = self.receiveTelegram(1)
            except ConnectionError as exc:
                print("SopasTestServer: {}, leaving main loop".format(exc))
                break
            if len(received_telegram) <= 0: # timeout (no message received)
                continue
            # Lookup for keywords and send a response from predefined (mapped) responses
            request_key, response_str = response_map.findResponse(received_telegram)
            if len(request_key) > 0 and len(response_str) > 0: # request and response found in map, send response to client
                print("SopasTestServer: received {}: {}".format(request_key, received_telegram))
                if self.cola_binary > 0:
                    self.sendTelegram(hexStringToByteArray(response_str), 2)
                else:
                    self.sendTelegram(response_str, 2)
            else: # request not found in map
                print("## ERROR SopasTestServer: received unsupported telegram {}".format(received_telegram))

            time.sleep(0.01)
168 
# Script entry point: parses the command line, waits for one tcp client and answers its cola requests
if __name__ == "__main__":

    # Configuration
    tcp_port = 2111 # tcp port to listen for tcp connections
    cola_binary = 0 # cola ascii (0) or binary (1)
    val_FREchoFilter = 0 # default configuration: FREchoFilter=0

    # Overwrite with command line arguments
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--tcp_port", help="tcp port to listen for tcp connections", default=tcp_port, type=int)
    arg_parser.add_argument("--cola_binary", help="cola ascii (0) or binary (1)", default=cola_binary, type=int)
    arg_parser.add_argument("--FREchoFilter", help="FREchoFilter, 0 (first echo, default), 1 (all echos) or 2 (last echo)", default=val_FREchoFilter, type=int)
    cli_args = arg_parser.parse_args()
    tcp_port = cli_args.tcp_port
    cola_binary = cli_args.cola_binary
    val_FREchoFilter = cli_args.FREchoFilter

    # Run test server
    server = SopasTestServer(tcp_port, cola_binary, val_FREchoFilter)
    server.connect() # blocks until a tcp client has connected
    server.run()
190 
multiscan_sopas_test_server.hexStringToByteArray
def hexStringToByteArray(message)
Definition: multiscan_sopas_test_server.py:31
multiscan_sopas_test_server.SopasTestServer.serversocket
serversocket
Definition: multiscan_sopas_test_server.py:98
multiscan_sopas_test_server.SopasTestServer.cola_binary
cola_binary
Definition: multiscan_sopas_test_server.py:93
multiscan_sopas_test_server.SopasTestServer.clientsocket
clientsocket
Definition: multiscan_sopas_test_server.py:104
multiscan_sopas_test_server.SopasTestServer.__init__
def __init__(self, tcp_port=2111, cola_binary=0, val_FREchoFilter=0)
Definition: multiscan_sopas_test_server.py:91
multiscan_sopas_test_server.SopasTestServer.connect
def connect(self)
Definition: multiscan_sopas_test_server.py:97
roswrap::console::print
ROSCONSOLE_DECL void print(FilterBase *filter, void *logger, Level level, const char *file, int line, const char *function, const char *fmt,...) ROSCONSOLE_PRINTF_ATTRIBUTE(7
Don't call this directly. Use the ROS_LOG() macro instead.
multiscan_sopas_test_server.SopasTestServer.sendTelegram
def sendTelegram(self, telegram, verbosity)
Definition: multiscan_sopas_test_server.py:140
multiscan_sopas_test_server.parseBinaryColaPayloadLength
def parseBinaryColaPayloadLength(payload)
Definition: multiscan_sopas_test_server.py:20
multiscan_sopas_test_server.SopasTestServer.run
def run(self)
Definition: multiscan_sopas_test_server.py:148
multiscan_sopas_test_server.ColaResponseMap.findResponse
def findResponse(self, request)
Definition: multiscan_sopas_test_server.py:79
multiscan_sopas_test_server.ColaResponseMap.__init__
def __init__(self, cola_binary=0, val_FREchoFilter=0)
Definition: multiscan_sopas_test_server.py:41
multiscan_sopas_test_server.SopasTestServer.val_FREchoFilter
val_FREchoFilter
Definition: multiscan_sopas_test_server.py:94
multiscan_sopas_test_server.stringToByteArray
def stringToByteArray(message)
Definition: multiscan_sopas_test_server.py:27
multiscan_sopas_test_server.SopasTestServer
Definition: multiscan_sopas_test_server.py:88
multiscan_sopas_test_server.SopasTestServer.receiveTelegram
def receiveTelegram(self, recv_timeout_sec)
Definition: multiscan_sopas_test_server.py:110
multiscan_sopas_test_server.SopasTestServer.tcp_port
tcp_port
Definition: multiscan_sopas_test_server.py:92
multiscan_sopas_test_server.SopasTestServer.clientaddress
clientaddress
Definition: multiscan_sopas_test_server.py:105
multiscan_sopas_test_server.ColaResponseMap
Definition: multiscan_sopas_test_server.py:38
multiscan_sopas_test_server.ColaResponseMap.mapped_response
mapped_response
Definition: multiscan_sopas_test_server.py:43


sick_scan_xd
Author(s): Michael Lehning , Jochen Sprickerhof , Martin Günther
autogenerated on Fri Oct 25 2024 02:47:09