file_server.py
Go to the documentation of this file.
1 #
2 # Copyright (C) 2014-2015 UAVCAN Development Team <uavcan.org>
3 #
4 # This software is distributed under the terms of the MIT License.
5 #
6 # Author: Ben Dyer <ben_dyer@mac.com>
7 # Pavel Kirienko <pavel.kirienko@zubax.com>
8 #
9 
10 from __future__ import division, absolute_import, print_function, unicode_literals
11 import os
12 from collections import defaultdict
13 from logging import getLogger
14 import pyuavcan_v0
15 import errno
16 
17 
18 logger = getLogger(__name__)
19 
20 
def _try_resolve_relative_path(search_in, rel_path):
    """
    Look up ``rel_path`` among the given search locations.

    Each search location is made absolute and case-normalized, then tried in two ways, in order:

    1. the location itself is returned if it ends with the normalized ``rel_path`` and is an existing file;
    2. otherwise ``rel_path`` is joined onto the location and returned if that is an existing file.

    :param search_in: Iterable of file or directory paths to search, in priority order.
    :param rel_path: Path to look for; normalized with ``os.path.normpath``/``os.path.normcase`` first.
    :return: The normalized path of the first match, or None if nothing matched.
    """
    wanted = os.path.normcase(os.path.normpath(rel_path))
    normalized_locations = (os.path.normcase(os.path.abspath(entry)) for entry in search_in)

    for location in normalized_locations:
        # NOTE(review): this is a plain string suffix test, so it is not anchored on a path
        # component boundary, and the join below is not checked for staying inside the
        # location - confirm both are intended.
        if location.endswith(wanted) and os.path.isfile(location):
            return location

        candidate = os.path.join(location, wanted)
        if os.path.isfile(candidate):
            return candidate

    return None
30 
31 
32 # noinspection PyBroadException
class FileServer(object):
    """
    Answers ``pyuavcan_v0.protocol.file.GetInfo`` and ``pyuavcan_v0.protocol.file.Read`` requests
    by resolving the requested path against a list of local lookup paths.

    Any failure while serving a request is logged and reported to the requester as ``UNKNOWN_ERROR``.
    """

    def __init__(self, node, lookup_paths=None):
        """
        Register the file service handlers on ``node``.

        :param node: Node to serve on; must not be anonymous.
        :param lookup_paths: Optional list of local paths that requested paths are resolved against.
        :raises pyuavcan_v0.UAVCANException: If ``node`` is anonymous.
        """
        if node.is_anonymous:
            raise pyuavcan_v0.UAVCANException('File server cannot be launched on an anonymous node')

        self.lookup_paths = lookup_paths or []

        # Resolved local path -> number of times it was successfully resolved.
        self._path_hit_counters = defaultdict(int)
        # Handler registrations returned by node.add_handler(); released in close().
        self._handles = []

        def add_handler(datatype, callback):
            self._handles.append(node.add_handler(datatype, callback))

        add_handler(pyuavcan_v0.protocol.file.GetInfo, self._get_info)
        add_handler(pyuavcan_v0.protocol.file.Read, self._read)
        # TODO: support all file services

    def close(self):
        """Remove every handler registered by this server; calling it again is a no-op."""
        # Detach the list first so that a repeated close() does not remove the same handles twice.
        handles, self._handles = self._handles, []
        for x in handles:
            x.remove()

    @property
    def path_hit_counters(self):
        """Return a plain ``dict`` copy mapping each resolved local path to its hit count."""
        return dict(self._path_hit_counters)

    def _resolve_path(self, relative):
        """
        Map a requested protocol path onto an existing local file and count the hit.

        :param relative: Path object from the request; ``relative.path`` holds the encoded path and
                         ``relative.SEPARATOR`` the character code of its separator.
        :return: The resolved local file path.
        :raises OSError: With ``errno.ENOENT`` if no lookup path yields an existing file.
        """
        # Translate the protocol separator into the local one before touching the file system.
        rel = relative.path.decode().replace(chr(relative.SEPARATOR), os.path.sep)

        # NOTE(review): the requested path comes from a remote node and is joined onto the lookup
        # paths without a containment check, so ".." segments or an absolute path may resolve
        # outside of them - confirm whether this needs to be restricted.
        out = _try_resolve_relative_path(self.lookup_paths, rel)
        if not out:
            # Pass errno, message and filename so that the exception carries a usable .errno.
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), rel)

        self._path_hit_counters[out] += 1
        return out

    def _get_info(self, e):
        """
        Handle a ``GetInfo`` request.

        :param e: Request event; reads ``e.transfer.source_node_id`` and ``e.request.path``.
        :return: A ``GetInfo.Response`` with the file size and the FILE | READABLE flags on success,
                 or with ``UNKNOWN_ERROR`` if anything failed.
        """
        logger.debug("[#{0:03d}:pyuavcan_v0.protocol.file.GetInfo] {1!r}"
                     .format(e.transfer.source_node_id, e.request.path.path.decode()))
        try:
            # Opening the file (rather than only stat-ing the path) verifies that it is readable;
            # the size is taken from the open handle so the contents are never loaded into memory.
            with open(self._resolve_path(e.request.path), "rb") as f:
                size = os.fstat(f.fileno()).st_size
            resp = pyuavcan_v0.protocol.file.GetInfo.Response()
            resp.error.value = resp.error.OK
            resp.size = size
            resp.entry_type.flags = resp.entry_type.FLAG_FILE | resp.entry_type.FLAG_READABLE
        except Exception:
            # TODO: Convert OSError codes to the error codes defined in DSDL
            logger.exception("[#{0:03d}:pyuavcan_v0.protocol.file.GetInfo] error"
                             .format(e.transfer.source_node_id))
            resp = pyuavcan_v0.protocol.file.GetInfo.Response()
            resp.error.value = resp.error.UNKNOWN_ERROR

        return resp

    def _read(self, e):
        """
        Handle a ``Read`` request.

        :param e: Request event; reads ``e.transfer.source_node_id``, ``e.request.path`` and
                  ``e.request.offset``.
        :return: A ``Read.Response`` holding the data found at the requested offset, limited to the
                 maximum size of the response's ``data`` field, or ``UNKNOWN_ERROR`` if anything failed.
        """
        logger.debug("[#{0:03d}:pyuavcan_v0.protocol.file.Read] {1!r} @ offset {2:d}"
                     .format(e.transfer.source_node_id, e.request.path.path.decode(), e.request.offset))
        try:
            with open(self._resolve_path(e.request.path), "rb") as f:
                f.seek(e.request.offset)
                resp = pyuavcan_v0.protocol.file.Read.Response()
                # Never read more than the response's 'data' field can carry.
                read_size = pyuavcan_v0.get_uavcan_data_type(pyuavcan_v0.get_fields(resp)['data']).max_size
                resp.data = bytearray(f.read(read_size))
                resp.error.value = resp.error.OK
        except Exception:
            logger.exception("[#{0:03d}:pyuavcan_v0.protocol.file.Read] error"
                             .format(e.transfer.source_node_id))
            resp = pyuavcan_v0.protocol.file.Read.Response()
            resp.error.value = resp.error.UNKNOWN_ERROR

        return resp
pyuavcan_v0.app.file_server.FileServer.lookup_paths
lookup_paths
Definition: file_server.py:38
pyuavcan_v0.app.file_server.FileServer._get_info
def _get_info(self, e)
Definition: file_server.py:67
pyuavcan_v0.app.file_server.FileServer._read
def _read(self, e)
Definition: file_server.py:85
pyuavcan_v0.UAVCANException
Definition: pyuavcan/pyuavcan_v0/__init__.py:45
pyuavcan_v0.app.file_server.FileServer._resolve_path
def _resolve_path(self, relative)
Definition: file_server.py:58
pyuavcan_v0.app.file_server.FileServer.close
def close(self)
Definition: file_server.py:50
pyuavcan_v0.app.file_server.FileServer
Definition: file_server.py:33
pyuavcan_v0.app.file_server.FileServer.__init__
def __init__(self, node, lookup_paths=None)
Definition: file_server.py:34
pyuavcan_v0.app.file_server._try_resolve_relative_path
def _try_resolve_relative_path(search_in, rel_path)
Definition: file_server.py:21
pyuavcan_v0.app.file_server.FileServer.path_hit_counters
def path_hit_counters(self)
Definition: file_server.py:55
test.format
format
Definition: dsdl/test.py:6
pyuavcan_v0.app.file_server.FileServer._path_hit_counters
_path_hit_counters
Definition: file_server.py:40
pyuavcan_v0.app.file_server.FileServer._handles
_handles
Definition: file_server.py:41


uavcan_communicator
Author(s):
autogenerated on Fri Dec 13 2024 03:10:02