2 multiScan test emulator that parses pcapng files recorded from a multiScan and replays the UDP packets
3 to emulate a local multiScan lidar.
5 The UDP sender sends the packets via UDP over localhost, port 2115.
10 pip install pypcapfile
11 pip install python-pcapng
21 from pcapng
import FileScanner
22 from pcapng.blocks
import EnhancedPacket
26 from scapy.layers.l2
import Ether
30 timestamp_end = time.perf_counter() + seconds
31 while time.perf_counter() < timestamp_end:
36 stx_index = payload.find(b
'\x02\x02\x02\x02')
38 payload = payload[stx_index:]
43 def __init__(self, src_ip = [], dst_ip = [], port = [], proto = []):
51 def __init__(self, rawblock = None, timestamp = 0.0):
58 if rawblock
is not None:
60 self.
dst_ip = rawblock.underlayer.dst
61 self.
src_ip = rawblock.underlayer.src
62 self.
proto = rawblock.name
63 if "dport" in rawblock.underlayer.payload.fields:
64 self.
dst_port = rawblock.underlayer.payload.fields[
"dport"]
65 if rawblock.name ==
"IP":
69 payload_start_hex_str =
"".join(
"\\x{:02x}".format(payload_byte)
for payload_byte
in self.
payload[:4])
70 return "{} byte payload {}..., timestamp={}, src_ip={}, dst_ip={}, port={}".format(len(self.
payload), payload_start_hex_str, self.
timestamp, self.
src_ip, self.
dst_ip, self.
dst_port)
73 pcap_decoded_blocks = []
74 payload_length_accumulated_since_stx = 0
75 with open(pcap_filename,
'rb')
as pcap_file:
76 pcap_scanner = FileScanner(pcap_file)
77 for block_cnt, block
in enumerate(pcap_scanner):
78 if isinstance(block, EnhancedPacket):
80 if block.captured_len != block.packet_len:
81 print(
"## multiscan_pcap_player block {}: {} byte block truncated to {} bytes".format(block_cnt, block.packet_len, block.captured_len))
82 block_data = Ether(block.packet_data)
83 block_decoded = block_data
85 if isinstance(block_decoded.payload, scapy.packet.Raw):
87 elif isinstance(block_decoded.payload, scapy.packet.Packet):
88 block_decoded = block_decoded.payload
100 if isinstance(block_decoded.payload, scapy.packet.Raw)
and len(block_decoded.payload) > 0:
102 if len(pcap_filter.src_ip) > 0
and pcap_decoded_block.src_ip
not in pcap_filter.src_ip:
104 if len(pcap_filter.dst_ip) > 0
and pcap_decoded_block.dst_ip
not in pcap_filter.dst_ip:
106 if pcap_decoded_block.dst_port > 0
and len(pcap_filter.port) > 0
and pcap_decoded_block.dst_port
not in pcap_filter.port:
108 if len(pcap_filter.proto) > 0
and pcap_decoded_block.proto
not in pcap_filter.proto:
110 if pcap_decoded_block.payload.find(b
'\x02\x02\x02\x02') >= 0:
111 payload_length_accumulated_since_stx = 0
112 payload_length_accumulated_since_stx = payload_length_accumulated_since_stx + len(pcap_decoded_block.payload)
114 print(
"pcap block {}: {}, {} byte since stx".format(block_cnt, pcap_decoded_block.print(), payload_length_accumulated_since_stx))
115 if len(pcap_decoded_block.payload) < 64000:
117 pcap_decoded_blocks.append(pcap_decoded_block)
118 return pcap_decoded_blocks
122 with open(json_filename,
"r")
as file_stream:
123 json_blocks = json.load(file_stream)
124 for json_block
in json_blocks:
125 if len(json_block) == 2:
127 pcap_block.timestamp = json_block[0]
128 pcap_block.payload = bytes.fromhex(json_block[1])
129 pcap_blocks.append(pcap_block)
132 if __name__ ==
"__main__":
136 save_udp_jsonfile =
""
140 udp_dst_ip =
"<broadcast>"
145 arg_parser = argparse.ArgumentParser()
146 arg_parser.add_argument(
"--pcap_filename", help=
"read upd packets rom pcapng-file", default=pcap_filename, type=str)
147 arg_parser.add_argument(
"--json_filename", help=
"read upd packets from json-file", default=json_filename, type=str)
148 arg_parser.add_argument(
"--save_udp_jsonfile", help=
"save upd packets to json-file", default=save_udp_jsonfile, type=str)
149 arg_parser.add_argument(
"--udp_port", help=
"dst udp port, or -1 for udp port from pcapng file)", default=udp_port, type=int)
150 arg_parser.add_argument(
"--send_rate", help=
"udp send rate in msgpacks per second, 240 for multiScan, or 0 to send by pcap-timestamps, or > 10000 for max. rate", default=udp_send_rate, type=int)
151 arg_parser.add_argument(
"--prompt", help=
"prompt for key after sending each udp packet (debugging only)", default=udp_prompt, type=int)
152 arg_parser.add_argument(
"--dst_ip", help=
"udp destination ip, e.g. 127.0.0.1 or <broadcast>", default=udp_dst_ip, type=str)
153 arg_parser.add_argument(
"--repeat", help=
"number of repetitions", default=num_repetitions, type=int)
154 arg_parser.add_argument(
"--verbose", help=
"print verbose messages", default=verbose, type=int)
155 arg_parser.add_argument(
"--filter", help=
"enable pcap filter by name, e.g. pcap_filter_multiscan_hildesheim for src_ip=192.168.0.1, dst_ip=192.168.0.100", default=
"", type=str)
156 arg_parser.add_argument(
"--max_seconds", help=
"max seconds to play", default=max_seconds, type=float)
158 cli_args = arg_parser.parse_args()
159 pcap_filename = cli_args.pcap_filename
160 json_filename = cli_args.json_filename
161 save_udp_jsonfile = cli_args.save_udp_jsonfile
162 udp_port = cli_args.udp_port
163 udp_send_rate = cli_args.send_rate
164 udp_prompt = cli_args.prompt
165 udp_dst_ip = cli_args.dst_ip
166 num_repetitions = cli_args.repeat
167 verbose = cli_args.verbose
168 max_seconds = cli_args.max_seconds
172 if cli_args.filter ==
"pcap_filter_multiscan_hildesheim":
173 pcap_filter =
PcapFilter([
"192.168.0.1" ], [
"192.168.0.100" ], [ 2115, 7503 ], [
"UDP",
"IP" ] )
177 if pcap_filename !=
"":
178 print(
"multiscan_pcap_player: reading pcapfile \"{}\" ...".format(pcap_filename))
180 elif json_filename !=
"":
183 print(
"## ERROR multiscan_pcap_player: neither pcapng of json file specified, use option --pcap_filename or --json_filename to configure inputfile with upd packets")
184 if len(pcap_blocks) == 0:
185 print(
"## ERROR multiscan_pcap_player: no udp packets found, aborting.")
187 print(
"multiscan_pcap_player: sending {} udp packets ...".format(len(pcap_blocks)))
190 udp_sender_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
191 udp_sender_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
192 print(
"multiscan_pcap_player: sending on udp port {}, send_rate={}".format(udp_port, udp_send_rate))
195 save_udp_json_blocks = []
196 timestamp_end = time.perf_counter() + max_seconds
197 udp_port_last = udp_port
198 for repeat_cnt
in range(num_repetitions):
199 if time.perf_counter() >= timestamp_end:
202 for block_cnt, pcap_block
in enumerate(pcap_blocks):
203 block_payload = pcap_block.payload
204 block_timestamp = pcap_block.timestamp
207 dst_udp_port = udp_port
208 elif pcap_block.dst_port > 0:
209 dst_udp_port = pcap_block.dst_port
210 elif udp_port_last > 0:
211 dst_udp_port = udp_port_last
214 udp_port_last = dst_udp_port
216 if verbose > 0
or udp_prompt > 0:
217 payload_hex_str =
"".join(
"\\x{:02x}".format(payload_byte)
for payload_byte
in block_payload[:4])
218 print(
"pcap message {}: sending {} byte {}... (udp {}:{})".format(block_cnt, len(block_payload), payload_hex_str, udp_dst_ip, dst_udp_port))
220 payload_hex_str =
"".join(
"{:02x}".format(payload_byte)
for payload_byte
in block_payload)
221 if len(payload_hex_str) > 32:
222 payload_hex_str = payload_hex_str[0:32] +
"..."
223 if block_payload.find(b
'\x02\x02\x02\x02') >= 0:
225 input(
"pcap message {}: press ENTER to send {} byte {} >".format(block_cnt, len(block_payload), payload_hex_str))
227 print(
"pcap message {}: sending {} byte {} ...".format(block_cnt, len(block_payload), payload_hex_str))
230 udp_sender_socket.sendto(block_payload, (udp_dst_ip, dst_udp_port))
231 if save_udp_jsonfile:
232 payload_hex_str =
"".join(
"{:02x}".format(payload_byte)
for payload_byte
in block_payload)
233 save_udp_json_blocks.append((block_timestamp, payload_hex_str))
235 print(
"pcap message {}: {} byte sent".format(block_cnt, len(block_payload), udp_dst_ip, dst_udp_port))
239 if time.perf_counter() >= timestamp_end:
241 if send_timestamp > 0
and block_timestamp > send_timestamp
and udp_send_rate < 10000:
242 if udp_send_rate <= 0:
243 delay = block_timestamp - send_timestamp
245 delay = 1.0 / udp_send_rate
249 send_timestamp = block_timestamp
250 if save_udp_jsonfile:
251 with open(save_udp_jsonfile,
"w")
as file_stream:
252 json.dump(save_udp_json_blocks, file_stream, indent=2)
253 print(f
"multiscan_pcap_player: udp packets saved to file {save_udp_jsonfile}")
255 print(
"multiscan_pcap_player finished.")