sick_generic_device_finder.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import socket
4 import random
5 import xml.etree.ElementTree as ET
6 from threading import Thread, Event
7 import sys
# NOTE(review): stderr is replaced by the ``object`` type, which has no
# write() method. Presumably this hides the traceback of the receive timeout
# raised in the scanner thread -- confirm before removing it.
sys.stderr = object

# Usage hint shown on every start. Each continuation literal carries its own
# separating space so the sentences do not run together.
print('If you have more than one network interface it may happen that no scanner is found'
      ' because the broadcast ip address does not match and the discovery packets are sent to the wrong interface.'
      ' To fix this problem change the parameter <UDP_IP = "192.168.0.255"> '
      'to the broadcast address that ifconfig returns for your network interface e.g. "192.168.178.255".')

# Broadcast address and UDP port the discovery request is sent to.
UDP_IP = "192.168.0.255"
UDP_PORT = 30718

# Random 32-bit value embedded in the request (bytes 10..13 of MESSAGE).
RANDOM_KEY = random.randrange(4294967295)

# Discovery request: fixed 10-byte prefix, the 4-byte big-endian key, and a
# fixed 10-byte suffix.
MESSAGE = (bytes.fromhex('10000008ffffffffffff')
           + RANDOM_KEY.to_bytes(4, byteorder='big', signed=False)
           + bytes.fromhex('0102c0a8007effffff00'))

HOST = ''  # Symbolic name meaning all available interfaces
DEBUGMSGENABLED = False
ANSWERTIMEOUT = 10  # timeout in seconds

print("UDP target IP: {ip}".format(ip=UDP_IP))
print("UDP target port: {port}".format(port=UDP_PORT))
if DEBUGMSGENABLED:
    print("Message: {message}".format(message=MESSAGE))
print("The scan result is available in " + str(ANSWERTIMEOUT) + " seconds.")

# Event object used to send signals from one thread to another
stop_event = Event()

# Raw UDP answers collected by the scanner thread.
rawData = []
36 
def uniq(input):
    """Return the items of *input* as a list without duplicates.

    The first occurrence of every item is kept and the original order is
    preserved. Items are compared with ``in`` on a list, so they do not have
    to be hashable.
    """
    distinct = []
    for item in input:
        if item in distinct:
            continue
        distinct.append(item)
    return distinct
43 
def getScannerXML():
    """Broadcast the discovery request and collect the raw UDP answers.

    MESSAGE is sent to (UDP_IP, UDP_PORT) and every datagram received
    afterwards is appended to the module-level list ``rawData``. A recorded
    sample answer consists of a short binary header followed by an XML
    document of the form
    ``<?xml ...?><NetScanResult MACAddr="..."><Item key="IPAddress"
    value="192.168.0.51" readonly="FALSE"></Item>...</NetScanResult>``.

    The function returns when no datagram arrives within
    ``ANSWERTIMEOUT - 5`` seconds, or when ``stop_event`` is found set after
    a datagram has been stored. The socket is closed on every exit path.
    """
    with socket.socket(socket.AF_INET,  # Internet
                       socket.SOCK_DGRAM) as sock:  # UDP
        sock.settimeout(ANSWERTIMEOUT - 5)
        sock.bind((HOST, UDP_PORT))
        # Acquire permission to send broadcast datagrams.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
        while True:
            try:
                data, _addr = sock.recvfrom(2048)  # buffer size is 2048 bytes
            except socket.timeout:
                # No further answer within the receive window: end the
                # thread quietly instead of dying with an exception.
                return
            if DEBUGMSGENABLED:
                print("received message: {data}".format(data=data))
            rawData.append(data)
            if stop_event.is_set():
                print("Wait time out")
                if DEBUGMSGENABLED:
                    print(rawData)
                break
    print("Wait time out 2")
66 
def getIpFromXmlData(DataListInput):
    """Print one summary line for every scanner answer in *DataListInput*.

    Args:
        DataListInput: iterable of raw datagrams (bytes). Duplicates are
            dropped with uniq(). Datagrams without a ``<?xml`` marker are
            ignored.

    For each remaining datagram the XML part is parsed and the ``value`` of
    the items with the keys IPAddress, IPMask, IPGateway, DeviceType,
    SerialNumber, DHCPClientEnabled and FirmwareVersion is printed. Items
    that are absent keep their question-mark placeholder. A datagram whose
    XML cannot be parsed is reported and skipped, so the remaining answers
    are still listed.
    """
    for Data in uniq(DataListInput):
        xml_index = Data.find(b'<?xml')
        if xml_index < 0:
            continue
        if DEBUGMSGENABLED:
            print('Scanner found:')
        xml_data = Data[xml_index:]
        if DEBUGMSGENABLED:
            print(xml_data)
        try:
            root = ET.fromstring(xml_data)
        except ET.ParseError as err:
            print("Skipping malformed scanner answer: {error}".format(error=err))
            continue

        # Placeholders shown for items the answer does not contain.
        fields = {
            'IPAddress': "???",
            'IPMask': "???",
            'IPGateway': "???",
            'DeviceType': "???",
            'SerialNumber': "???",
            'DHCPClientEnabled': "???",
            'FirmwareVersion': "????",
        }
        for child in root:
            key = child.attrib.get('key')
            if key in fields:
                # Keep the current entry when the item has no value attribute.
                fields[key] = child.attrib.get('value', fields[key])

        print("Device type = " + fields['DeviceType']
              + " SN = " + fields['SerialNumber']
              + " IP = " + fields['IPAddress']
              + " IPMask = " + fields['IPMask']
              + " Gatway = " + fields['IPGateway']
              + " DHCPEnable = " + fields['DHCPClientEnabled']
              + " Firmwarevers.= " + fields['FirmwareVersion'])
104 
105 
106 
if __name__ == '__main__':
    # Run the discovery in a separate thread so the wait can be bounded.
    worker = Thread(target=getScannerXML)
    worker.start()

    # Wait at most ANSWERTIMEOUT + 1 seconds for the thread to finish.
    worker.join(ANSWERTIMEOUT + 1)

    # Signal the thread to stop, then report the answers collected so far.
    stop_event.set()
    getIpFromXmlData(rawData)
    sys.exit()
119 
120 
121 


sick_scan
Author(s): Michael Lehning, Jochen Sprickerhof, Martin Günther
autogenerated on Wed May 5 2021 03:05:48