NiclaReceiverServerMicroPy.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import queue
4 import socket
5 import socketserver
6 from threading import Thread
7 import time
8 import struct
9 
10 IMAGE_TYPE = 0b00
11 AUDIO_TYPE = 0b01
12 RANGE_TYPE = 0b10
13 IMU_TYPE = 0b11
14 
15 
16 class UDPHandlerMicroPy(socketserver.BaseRequestHandler):
17  def handle(self):
18 
19  # with udp, self.request is a pair (data, socket)
20  packet = self.request[0]
21  # socket = self.request[1]
22 
23  size_packet = int.from_bytes(packet[:4], "big")
24 
25  if size_packet == len(packet[4:]):
26 
27  timestamp = time.time() # int.from_bytes(packet[4:8], "big")
28  data_type = packet[8]
29  data = packet[9:]
30 
31  if data_type == RANGE_TYPE:
32  if self.server.enable_range:
33  try:
34  self.server.range_buffer.put_nowait((timestamp, data))
35  except queue.Full:
36  self.server.range_buffer.get()
37  self.server.range_buffer.put_nowait((timestamp, data))
38 
39  else:
40  pass
41 
42  elif data_type == IMAGE_TYPE:
43  if self.server.enable_image:
44  try:
45  self.server.image_buffer.put_nowait((timestamp, data))
46  except queue.Full:
47  self.server.image_buffer.get()
48  self.server.image_buffer.put_nowait((timestamp, data))
49  else:
50  pass
51 
52  elif data_type == AUDIO_TYPE:
53  if self.server.enable_audio:
54  try:
55  self.server.audio_buffer.put_nowait((timestamp, data))
56  except queue.Full:
57  self.server.audio_buffer.get_nowait()
58  self.server.audio_buffer.put_nowait((timestamp, data))
59 
60  else:
61  pass
62 
63  elif data_type == IMU_TYPE:
64  if self.server.enable_imu:
65  try:
66  self.server.imu_buffer.put_nowait((timestamp, data))
67  except queue.Full:
68  self.server.imu_buffer.get_nowait()
69  self.server.imu_buffer.put_nowait((timestamp, data))
70  else:
71  pass
72 
73  else:
74  print(
75  "Warning: received packet of length {}, but expected length was {}!".format(
76  len(packet[4:]), size_packet
77  )
78  )
79 
80 
81 class NiclaReceiverUDPMicroPy(socketserver.UDPServer):
82 
83  def __init__(
84  self,
85  server_ip,
86  server_port,
87  enable_range=False,
88  enable_image=False,
89  enable_audio=False,
90  enable_imu=False,
91  ):
92 
93  super().__init__((server_ip, server_port), UDPHandlerMicroPy)
94 
95  self.enable_range = enable_range
96  self.enable_image = enable_image
97  self.enable_audio = enable_audio
98  self.enable_imu = enable_imu
99 
100  if self.enable_range:
101  self.range_buffer = queue.Queue(maxsize=10)
102  if self.enable_image:
103  self.image_buffer = queue.Queue(maxsize=10)
104  if self.enable_audio:
105  self.audio_buffer = queue.Queue(maxsize=10)
106  if self.enable_imu:
107  self.imu_buffer = queue.Queue(maxsize=10)
108 
109  self.server_thread = None
110 
111  def serve(self):
112  self.server_thread = Thread(target=self.serve_forever)
113  self.server_thread.start()
114 
115  def stop_serve(self):
116  print("stopping")
117  self.shutdown()
118  self.server_thread.join()
119  self.server_close()
120 
121  def get_range(self):
122 
123  if not self.range_buffer.empty():
124  return self.range_buffer.get_nowait()
125  else:
126  return None
127 
128  def get_image(self):
129  if not self.image_buffer.empty():
130  return self.image_buffer.get_nowait()
131  else:
132  return None
133 
134  def get_audio(self):
135  if not self.audio_buffer.empty():
136  return self.audio_buffer.get_nowait()
137  else:
138  return None
139 
140  def get_imu(self):
141  if not self.imu_buffer.empty():
142  return self.imu_buffer.get_nowait()
143  else:
144  return None
145 
146 
147 class TCPHandlerMicroPy(socketserver.BaseRequestHandler):
148  def handle(self):
149  self.request.settimeout(5.0) # Set a timeout
150  self.server.nicla_disconnect = False
151 
152  while True:
153  try:
154  packet = self.request.recv(65000)
155  if not packet:
156  break
157  timestamp = time.time()
158  timestamp = struct.pack(">d", timestamp)
159  packet = timestamp + packet
160  self.server.receiving_buffer.put_nowait(packet)
161  except socket.timeout:
162  print("Warning: Nicla disconnected! Resetting server... ")
163  self.server.receiving_buffer.queue.clear()
164  self.server.nicla_disconnect = True
165  break
166  except Exception as e:
167  print(f"Exception: {e}")
168  self.server.receiving_buffer.queue.clear()
169  self.server.nicla_disconnect = True
170  break # Break the loop on any other exception
171 
172 
173 class NiclaReceiverTCPMicroPy(socketserver.TCPServer):
174 
175  def __init__(
176  self,
177  server_ip,
178  server_port,
179  enable_range=False,
180  enable_image=False,
181  enable_audio=False,
182  enable_imu=False,
183  ):
184 
185  super().__init__((server_ip, server_port), TCPHandlerMicroPy)
186 
187  self.enable_range = enable_range
188  self.enable_image = enable_image
189  self.enable_audio = enable_audio
190  self.enable_imu = enable_imu
191 
192  if self.enable_range:
193  self.range_buffer = queue.Queue(maxsize=100)
194  if self.enable_image:
195  self.image_buffer = queue.Queue(maxsize=100)
196  if self.enable_audio:
197  self.audio_buffer = queue.Queue(maxsize=100)
198  if self.enable_imu:
199  self.imu_buffer = queue.Queue(maxsize=100)
200 
201  if (
203  or self.enable_image
204  or self.enable_audio
205  or self.enable_imu
206  ):
207  self.receiving_buffer = queue.Queue(maxsize=200)
208 
209  self.server_thread = None
210  self.nicla_disconnect = False
211 
212  def serve(self):
213  self.thread_regularizer = True
214  self.sorting_thread = Thread(target=self.sorting)
215  self.server_thread = Thread(target=self.serve_forever)
216  self.sorting_thread.start()
217  self.server_thread.start()
218 
219  def sorting(self):
220 
221  bkp_bytes_packets = bytes([])
222  timestamp = None
223 
224  while self.thread_regularizer:
225 
226  try:
227  bytes_packets = self.receiving_buffer.get_nowait()
228  timestamp = struct.unpack(">d", bytes_packets[:8])[0]
229  bytes_packets = bytes_packets[8:]
230 
231  if self.nicla_disconnect:
232  bytes_packets = bytes([])
233  bkp_bytes_packets = bytes([])
234  except Exception:
235  continue
236 
237  bytes_packets = bkp_bytes_packets + bytes_packets
238  bkp_bytes_packets = bytes([])
239 
240  if len(bytes_packets) < 9:
241  print("Got a packet from receiver less than header size!")
242  continue
243  else:
244  total_length = len(bytes_packets)
245  loop_termination_flag = True
246 
247  while loop_termination_flag:
248  size_packet = int.from_bytes(bytes_packets[:4], "big")
249 
250  if total_length - 4 >= size_packet:
251  packet = bytes_packets[4 : size_packet + 4]
252  bytes_packets = bytes_packets[size_packet + 4 :]
253 
254  # timestamp = int.from_bytes(packet[:4], "big")
255 
256  data_type = packet[4]
257  data = packet[5:]
258 
259  if data_type == RANGE_TYPE:
260  if self.enable_range:
261  try:
262  self.range_buffer.put_nowait(
263  (timestamp, data)
264  )
265  except queue.Full:
266  self.range_buffer.get_nowait()
267  self.range_buffer.put_nowait(
268  (timestamp, data)
269  )
270  else:
271  pass
272 
273  elif data_type == IMAGE_TYPE:
274  if self.enable_image:
275  try:
276  self.image_buffer.put_nowait(
277  (timestamp, data)
278  )
279  except queue.Full:
280  self.image_buffer.get_nowait()
281  self.image_buffer.put_nowait(
282  (timestamp, data)
283  )
284 
285  else:
286  pass
287 
288  elif data_type == AUDIO_TYPE:
289  if self.enable_audio:
290  try:
291  self.audio_buffer.put_nowait(
292  (timestamp, data)
293  )
294  except queue.Full:
295  self.audio_buffer.get_nowait()
296  self.audio_buffer.put_nowait(
297  (timestamp, data)
298  )
299 
300  else:
301  pass
302 
303  elif data_type == IMU_TYPE:
304  if self.enable_imu:
305  try:
306  self.imu_buffer.put_nowait(
307  (timestamp, data)
308  )
309  except queue.Full:
310  self.imu_buffer.get_nowait()
311  self.imu_buffer.put_nowait(
312  (timestamp, data)
313  )
314  else:
315  pass
316 
317  else:
318  bkp_bytes_packets = bytes_packets
319  loop_termination_flag = False
320 
321  total_length = len(bytes_packets)
322  if total_length == 0:
323  loop_termination_flag = False
324 
325  def stop_serve(self):
326  print("stopping")
327  self.thread_regularizer = False
328  self.sorting_thread.join()
329  self.shutdown()
330  self.server_thread.join()
331  self.server_close()
332 
333  def get_range(self):
334 
335  if not self.range_buffer.empty():
336  return self.range_buffer.get_nowait()
337  else:
338  return None
339 
340  def get_image(self):
341  if not self.image_buffer.empty():
342  return self.image_buffer.get_nowait()
343  else:
344  return None
345 
346  def get_audio(self):
347  if not self.audio_buffer.empty():
348  return self.audio_buffer.get_nowait()
349  else:
350  return None
351 
352  def get_imu(self):
353  if not self.imu_buffer.empty():
354  return self.imu_buffer.get_nowait()
355  else:
356  return None
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.sorting_thread
sorting_thread
Definition: NiclaReceiverServerMicroPy.py:214
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.nicla_disconnect
nicla_disconnect
Definition: NiclaReceiverServerMicroPy.py:202
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.enable_range
enable_range
Definition: NiclaReceiverServerMicroPy.py:179
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.enable_image
enable_image
Definition: NiclaReceiverServerMicroPy.py:88
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.receiving_buffer
receiving_buffer
Definition: NiclaReceiverServerMicroPy.py:199
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.server_thread
server_thread
Definition: NiclaReceiverServerMicroPy.py:101
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy
Definition: NiclaReceiverServerMicroPy.py:173
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.stop_serve
def stop_serve(self)
Definition: NiclaReceiverServerMicroPy.py:115
nicla_vision_ros.NiclaReceiverServerMicroPy.UDPHandlerMicroPy
Definition: NiclaReceiverServerMicroPy.py:16
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.image_buffer
image_buffer
Definition: NiclaReceiverServerMicroPy.py:95
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.get_audio
def get_audio(self)
Definition: NiclaReceiverServerMicroPy.py:346
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.imu_buffer
imu_buffer
Definition: NiclaReceiverServerMicroPy.py:191
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.sorting
def sorting(self)
Definition: NiclaReceiverServerMicroPy.py:219
nicla_vision_ros.NiclaReceiverServerMicroPy.TCPHandlerMicroPy.handle
def handle(self)
Definition: NiclaReceiverServerMicroPy.py:148
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.enable_image
enable_image
Definition: NiclaReceiverServerMicroPy.py:180
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.enable_audio
enable_audio
Definition: NiclaReceiverServerMicroPy.py:181
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.range_buffer
range_buffer
Definition: NiclaReceiverServerMicroPy.py:93
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.thread_regularizer
thread_regularizer
Definition: NiclaReceiverServerMicroPy.py:213
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.get_imu
def get_imu(self)
Definition: NiclaReceiverServerMicroPy.py:352
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.stop_serve
def stop_serve(self)
Definition: NiclaReceiverServerMicroPy.py:325
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.enable_range
enable_range
Definition: NiclaReceiverServerMicroPy.py:87
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.enable_imu
enable_imu
Definition: NiclaReceiverServerMicroPy.py:182
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.get_image
def get_image(self)
Definition: NiclaReceiverServerMicroPy.py:340
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.get_audio
def get_audio(self)
Definition: NiclaReceiverServerMicroPy.py:134
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.__init__
def __init__(self, server_ip, server_port, enable_range=False, enable_image=False, enable_audio=False, enable_imu=False)
Definition: NiclaReceiverServerMicroPy.py:83
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.serve
def serve(self)
Definition: NiclaReceiverServerMicroPy.py:111
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.get_range
def get_range(self)
Definition: NiclaReceiverServerMicroPy.py:333
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.get_image
def get_image(self)
Definition: NiclaReceiverServerMicroPy.py:128
nicla_vision_ros.NiclaReceiverServerMicroPy.UDPHandlerMicroPy.handle
def handle(self)
Definition: NiclaReceiverServerMicroPy.py:17
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.imu_buffer
imu_buffer
Definition: NiclaReceiverServerMicroPy.py:99
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.enable_imu
enable_imu
Definition: NiclaReceiverServerMicroPy.py:90
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.enable_audio
enable_audio
Definition: NiclaReceiverServerMicroPy.py:89
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.audio_buffer
audio_buffer
Definition: NiclaReceiverServerMicroPy.py:189
nicla_vision_ros.NiclaReceiverServerMicroPy.TCPHandlerMicroPy
Definition: NiclaReceiverServerMicroPy.py:147
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.__init__
def __init__(self, server_ip, server_port, enable_range=False, enable_image=False, enable_audio=False, enable_imu=False)
Definition: NiclaReceiverServerMicroPy.py:175
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.range_buffer
range_buffer
Definition: NiclaReceiverServerMicroPy.py:185
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy
Definition: NiclaReceiverServerMicroPy.py:81
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.image_buffer
image_buffer
Definition: NiclaReceiverServerMicroPy.py:187
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.audio_buffer
audio_buffer
Definition: NiclaReceiverServerMicroPy.py:97
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.serve
def serve(self)
Definition: NiclaReceiverServerMicroPy.py:212
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverTCPMicroPy.server_thread
server_thread
Definition: NiclaReceiverServerMicroPy.py:201
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.get_imu
def get_imu(self)
Definition: NiclaReceiverServerMicroPy.py:140
nicla_vision_ros.NiclaReceiverServerMicroPy.NiclaReceiverUDPMicroPy.get_range
def get_range(self)
Definition: NiclaReceiverServerMicroPy.py:121


nicla_vision_ros
Author(s): Davide Torielli , Damiano Gasperini , Edoardo Del Bianco , Federico Rollo
autogenerated on Sat Nov 16 2024 03:38:18