NiclaReceiverServer.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import queue
4 import socket
5 import socketserver
6 from threading import Thread
7 import time
8 import struct
9 import numpy as np
10 import cv2
11 
12 IMAGE_TYPE = 0b00
13 AUDIO_TYPE = 0b01
14 RANGE_TYPE = 0b10
15 IMU_TYPE = 0b11
16 
17 
18 class UDPHandler(socketserver.BaseRequestHandler):
19  def handle(self):
20 
21  # with udp, self.request is a pair (data, socket)
22  packet = self.request[0]
23  # socket = self.request[1]
24 
25  size_packet = int.from_bytes(packet[:4], "little")
26 
27  data_type = packet[8]
28 
29  if size_packet == len(packet[4:]): # or data_type == IMAGE_TYPE:
30 
31  timestamp = time.time() # int.from_bytes(packet[4:8], "big")
32 
33  data = packet[9:]
34 
35  if data_type == RANGE_TYPE:
36  if self.server.enable_range:
37  try:
38  self.server.range_buffer.put_nowait((timestamp, data))
39  except queue.Full:
40  self.server.range_buffer.get()
41  self.server.range_buffer.put_nowait((timestamp, data))
42 
43  else:
44  pass
45 
46  elif data_type == IMAGE_TYPE:
47  if self.server.enable_image:
48 
49  timestamp_img = int.from_bytes(packet[4:8], "little")
50  idx_img = packet[9]
51 
52  # print("TIMESTAMP: ", timestamp_img)
53  # print("SIZE: ", size_packet)
54  # print("IDX IMG: ", idx_img)
55 
56  if not idx_img: # first half
57  self.server.last_timestamp_img = timestamp_img
58  self.server.last_idx_img = idx_img
59  self.server.last_half_img = packet[10:]
60 
61  else: # second half
62  if timestamp_img == self.server.last_timestamp_img:
63 
64  half_img_bin_0 = self.server.last_half_img
65 
66  half_img_bin_1 = packet[10:]
67 
68  half_img_bin_0 = np.asarray(
69  bytearray(half_img_bin_0), dtype="uint8"
70  )
71  half_img_bin_1 = np.asarray(
72  bytearray(half_img_bin_1), dtype="uint8"
73  )
74 
75  half_img_dec_0 = cv2.imdecode(
76  half_img_bin_0, cv2.IMREAD_UNCHANGED
77  )
78  half_img_dec_0 = np.dstack(
79  (
80  half_img_dec_0[:, :, 2],
81  half_img_dec_0[:, :, 1],
82  half_img_dec_0[:, :, 0],
83  )
84  )
85 
86  half_img_dec_1 = cv2.imdecode(
87  half_img_bin_1, cv2.IMREAD_UNCHANGED
88  )
89  half_img_dec_1 = np.dstack(
90  (
91  half_img_dec_1[:, :, 2],
92  half_img_dec_1[:, :, 1],
93  half_img_dec_1[:, :, 0],
94  )
95  )
96 
97  # Stack the images vertically
98  combined_image = np.vstack(
99  (half_img_dec_1, half_img_dec_0)
100  )
101 
102  try:
103  self.server.image_buffer.put_nowait(
104  (timestamp, combined_image)
105  )
106  except queue.Full:
107  self.server.image_buffer.get()
108  self.server.image_buffer.put_nowait(
109  (timestamp, combined_image)
110  )
111 
112  else:
113  pass
114 
115  elif data_type == AUDIO_TYPE:
116  if self.server.enable_audio:
117  try:
118  self.server.audio_buffer.put_nowait((timestamp, data))
119  except queue.Full:
120  self.server.audio_buffer.get_nowait()
121  self.server.audio_buffer.put_nowait((timestamp, data))
122 
123  else:
124  pass
125 
126  elif data_type == IMU_TYPE:
127  if self.server.enable_imu:
128  try:
129  self.server.imu_buffer.put_nowait((timestamp, data))
130  except queue.Full:
131  self.server.imu_buffer.get_nowait()
132  self.server.imu_buffer.put_nowait((timestamp, data))
133  else:
134  pass
135 
136  else:
137  print(
138  "Warning: received packet of length {}, but expected length was {}!".format(
139  len(packet[4:]), size_packet
140  )
141  )
142 
143 
144 class NiclaReceiverUDP(socketserver.UDPServer):
145 
146  def __init__(
147  self,
148  server_ip,
149  server_port,
150  enable_range=False,
151  enable_image=False,
152  enable_audio=False,
153  enable_imu=False,
154  ):
155 
156  super().__init__((server_ip, server_port), UDPHandler)
157 
158  self.enable_range = enable_range
159  self.enable_image = enable_image
160  self.enable_audio = enable_audio
161  self.enable_imu = enable_imu
162 
163  if self.enable_range:
164  self.range_buffer = queue.Queue(maxsize=10)
165  if self.enable_image:
166  self.image_buffer = queue.Queue(maxsize=10)
167  if self.enable_audio:
168  self.audio_buffer = queue.Queue(maxsize=10)
169  if self.enable_imu:
170  self.imu_buffer = queue.Queue(maxsize=10)
171 
172  self.server_thread = None
173 
174  self.last_timestamp_img = None
175  self.last_idx_img = None
176  self.last_half_img = None
177 
178  def serve(self):
179  self.server_thread = Thread(target=self.serve_forever)
180  self.server_thread.start()
181 
182  def stop_serve(self):
183  print("stopping")
184  self.shutdown()
185  self.server_thread.join()
186  self.server_close()
187 
188  def get_range(self):
189 
190  if not self.range_buffer.empty():
191  return self.range_buffer.get_nowait()
192  else:
193  return None
194 
195  def get_image(self):
196  if not self.image_buffer.empty():
197  return self.image_buffer.get_nowait()
198  else:
199  return None
200 
201  def get_audio(self):
202  if not self.audio_buffer.empty():
203  return self.audio_buffer.get_nowait()
204  else:
205  return None
206 
207  def get_imu(self):
208  if not self.imu_buffer.empty():
209  return self.imu_buffer.get_nowait()
210  else:
211  return None
212 
213 
214 class TCPHandler(socketserver.BaseRequestHandler):
215  def handle(self):
216  self.request.settimeout(5.0) # Set a timeout
217  self.server.nicla_disconnect = False
218 
219  while True:
220  try:
221  packet = self.request.recv(65000)
222  if not packet:
223  break
224  timestamp = time.time()
225  timestamp = struct.pack(">d", timestamp)
226  packet = timestamp + packet
227  self.server.receiving_buffer.put_nowait(packet)
228  except socket.timeout:
229  print("Warning: Nicla disconnected! Resetting server... ")
230  self.server.receiving_buffer.queue.clear()
231  self.server.nicla_disconnect = True
232  break
233  except Exception as e:
234  print(f"Exception: {e}")
235  self.server.receiving_buffer.queue.clear()
236  self.server.nicla_disconnect = True
237  break # Break the loop on any other exception
238 
239 
240 class NiclaReceiverTCP(socketserver.TCPServer):
241 
242  def __init__(
243  self,
244  server_ip,
245  server_port,
246  enable_range=False,
247  enable_image=False,
248  enable_audio=False,
249  enable_imu=False,
250  ):
251 
252  super().__init__((server_ip, server_port), TCPHandler)
253 
254  self.enable_range = enable_range
255  self.enable_image = enable_image
256  self.enable_audio = enable_audio
257  self.enable_imu = enable_imu
258 
259  if self.enable_range:
260  self.range_buffer = queue.Queue(maxsize=100)
261  if self.enable_image:
262  self.image_buffer = queue.Queue(maxsize=100)
263  if self.enable_audio:
264  self.audio_buffer = queue.Queue(maxsize=100)
265  if self.enable_imu:
266  self.imu_buffer = queue.Queue(maxsize=100)
267 
268  if (
270  or self.enable_image
271  or self.enable_audio
272  or self.enable_imu
273  ):
274  self.receiving_buffer = queue.Queue(maxsize=200)
275 
276  self.server_thread = None
277  self.nicla_disconnect = False
278 
279  def serve(self):
280  self.thread_regularizer = True
281  self.sorting_thread = Thread(target=self.sorting)
282  self.server_thread = Thread(target=self.serve_forever)
283  self.sorting_thread.start()
284  self.server_thread.start()
285 
286  def sorting(self):
287 
288  bkp_bytes_packets = bytes([])
289  timestamp = None
290  half_img = None
291 
292  while self.thread_regularizer:
293 
294  try:
295  bytes_packets = self.receiving_buffer.get_nowait()
296  timestamp = struct.unpack(">d", bytes_packets[:8])[0]
297  bytes_packets = bytes_packets[8:]
298 
299  if self.nicla_disconnect:
300  bytes_packets = bytes([])
301  bkp_bytes_packets = bytes([])
302  except:
303  continue
304 
305  bytes_packets = bkp_bytes_packets + bytes_packets
306  bkp_bytes_packets = bytes([])
307 
308  if len(bytes_packets) < 9:
309  print("Got a packet from receiver less than header size!")
310  continue
311  else:
312  total_length = len(bytes_packets)
313  loop_termination_flag = True
314 
315  while loop_termination_flag:
316  size_packet = int.from_bytes(bytes_packets[:4], "little")
317 
318  if total_length - 4 >= size_packet:
319  packet = bytes_packets[4 : size_packet + 4]
320  bytes_packets = bytes_packets[size_packet + 4 :]
321 
322  # timestamp = int.from_bytes(packet[:4], "big")
323 
324  data_type = int.from_bytes(packet[4:5], "little")
325  data = packet[5:]
326 
327  if data_type == RANGE_TYPE:
328  if self.enable_range:
329  try:
330  self.range_buffer.put_nowait(
331  (timestamp, data)
332  )
333  except queue.Full:
334  self.range_buffer.get_nowait()
335  self.range_buffer.put_nowait(
336  (timestamp, data)
337  )
338  else:
339  pass
340 
341  elif data_type == IMAGE_TYPE:
342  if self.enable_image:
343  idx_img = data[0]
344  half_img_bin = np.asarray(
345  bytearray(data[1:]), dtype="uint8"
346  )
347 
348  half_img_dec = cv2.imdecode(
349  half_img_bin, cv2.IMREAD_UNCHANGED
350  )
351 
352  half_img_dec = np.dstack(
353  (
354  half_img_dec[:, :, 2],
355  half_img_dec[:, :, 1],
356  half_img_dec[:, :, 0],
357  )
358  )
359 
360  if not idx_img: # first half
361  half_img = half_img_dec
362 
363  else: # second half
364  # Stack the images vertically
365  combined_image = np.vstack(
366  (half_img_dec, half_img)
367  )
368  try:
369  self.image_buffer.put_nowait(
370  (timestamp, combined_image)
371  )
372  except queue.Full:
373  self.image_buffer.get_nowait()
374  self.image_buffer.put_nowait(
375  (timestamp, combined_image)
376  )
377 
378  half_img = None
379  else:
380  pass
381 
382  elif data_type == AUDIO_TYPE:
383  if self.enable_audio:
384  try:
385  self.audio_buffer.put_nowait(
386  (timestamp, data)
387  )
388  except queue.Full:
389  self.audio_buffer.get_nowait()
390  self.audio_buffer.put_nowait(
391  (timestamp, data)
392  )
393 
394  else:
395  pass
396 
397  elif data_type == IMU_TYPE:
398  if self.enable_imu:
399  try:
400  self.imu_buffer.put_nowait(
401  (timestamp, data)
402  )
403  except queue.Full:
404  self.imu_buffer.get_nowait()
405  self.imu_buffer.put_nowait(
406  (timestamp, data)
407  )
408  else:
409  pass
410 
411  else:
412  bkp_bytes_packets = bytes_packets
413  loop_termination_flag = False
414 
415  total_length = len(bytes_packets)
416  if total_length == 0:
417  loop_termination_flag = False
418 
419  def stop_serve(self):
420  print("stopping")
421  self.thread_regularizer = False
422  self.sorting_thread.join()
423  self.shutdown()
424  self.server_thread.join()
425  self.server_close()
426 
427  def get_range(self):
428 
429  if not self.range_buffer.empty():
430  return self.range_buffer.get_nowait()
431  else:
432  return None
433 
434  def get_image(self):
435  if not self.image_buffer.empty():
436  return self.image_buffer.get_nowait()
437  else:
438  return None
439 
440  def get_audio(self):
441  if not self.audio_buffer.empty():
442  return self.audio_buffer.get_nowait()
443  else:
444  return None
445 
446  def get_imu(self):
447  if not self.imu_buffer.empty():
448  return self.imu_buffer.get_nowait()
449  else:
450  return None
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.audio_buffer
audio_buffer
Definition: NiclaReceiverServer.py:256
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.get_imu
def get_imu(self)
Definition: NiclaReceiverServer.py:207
nicla_vision_ros.NiclaReceiverServer.TCPHandler
Definition: NiclaReceiverServer.py:214
nicla_vision_ros.NiclaReceiverServer.UDPHandler.handle
def handle(self)
Definition: NiclaReceiverServer.py:19
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.last_timestamp_img
last_timestamp_img
Definition: NiclaReceiverServer.py:166
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.enable_range
enable_range
Definition: NiclaReceiverServer.py:246
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.server_thread
server_thread
Definition: NiclaReceiverServer.py:268
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.get_image
def get_image(self)
Definition: NiclaReceiverServer.py:195
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.sorting
def sorting(self)
Definition: NiclaReceiverServer.py:286
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.get_range
def get_range(self)
Definition: NiclaReceiverServer.py:188
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.get_imu
def get_imu(self)
Definition: NiclaReceiverServer.py:446
nicla_vision_ros.NiclaReceiverServer.TCPHandler.handle
def handle(self)
Definition: NiclaReceiverServer.py:215
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.enable_audio
enable_audio
Definition: NiclaReceiverServer.py:248
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.range_buffer
range_buffer
Definition: NiclaReceiverServer.py:252
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.nicla_disconnect
nicla_disconnect
Definition: NiclaReceiverServer.py:269
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.stop_serve
def stop_serve(self)
Definition: NiclaReceiverServer.py:419
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.thread_regularizer
thread_regularizer
Definition: NiclaReceiverServer.py:280
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.last_half_img
last_half_img
Definition: NiclaReceiverServer.py:168
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.audio_buffer
audio_buffer
Definition: NiclaReceiverServer.py:160
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.enable_imu
enable_imu
Definition: NiclaReceiverServer.py:249
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.server_thread
server_thread
Definition: NiclaReceiverServer.py:164
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP
Definition: NiclaReceiverServer.py:240
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.serve
def serve(self)
Definition: NiclaReceiverServer.py:178
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.get_audio
def get_audio(self)
Definition: NiclaReceiverServer.py:440
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.image_buffer
image_buffer
Definition: NiclaReceiverServer.py:158
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.enable_imu
enable_imu
Definition: NiclaReceiverServer.py:153
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.sorting_thread
sorting_thread
Definition: NiclaReceiverServer.py:281
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.enable_audio
enable_audio
Definition: NiclaReceiverServer.py:152
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.imu_buffer
imu_buffer
Definition: NiclaReceiverServer.py:162
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.stop_serve
def stop_serve(self)
Definition: NiclaReceiverServer.py:182
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.enable_image
enable_image
Definition: NiclaReceiverServer.py:247
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.get_audio
def get_audio(self)
Definition: NiclaReceiverServer.py:201
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.get_range
def get_range(self)
Definition: NiclaReceiverServer.py:427
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.receiving_buffer
receiving_buffer
Definition: NiclaReceiverServer.py:266
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.serve
def serve(self)
Definition: NiclaReceiverServer.py:279
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.enable_range
enable_range
Definition: NiclaReceiverServer.py:150
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.last_idx_img
last_idx_img
Definition: NiclaReceiverServer.py:167
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.__init__
def __init__(self, server_ip, server_port, enable_range=False, enable_image=False, enable_audio=False, enable_imu=False)
Definition: NiclaReceiverServer.py:146
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.__init__
def __init__(self, server_ip, server_port, enable_range=False, enable_image=False, enable_audio=False, enable_imu=False)
Definition: NiclaReceiverServer.py:242
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.image_buffer
image_buffer
Definition: NiclaReceiverServer.py:254
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.range_buffer
range_buffer
Definition: NiclaReceiverServer.py:156
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP.enable_image
enable_image
Definition: NiclaReceiverServer.py:151
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverUDP
Definition: NiclaReceiverServer.py:144
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.imu_buffer
imu_buffer
Definition: NiclaReceiverServer.py:258
nicla_vision_ros.NiclaReceiverServer.NiclaReceiverTCP.get_image
def get_image(self)
Definition: NiclaReceiverServer.py:434
nicla_vision_ros.NiclaReceiverServer.UDPHandler
Definition: NiclaReceiverServer.py:18


nicla_vision_ros
Author(s): Davide Torielli , Damiano Gasperini , Edoardo Del Bianco , Federico Rollo
autogenerated on Sat Nov 16 2024 03:38:18