data_buffer.py
Go to the documentation of this file.
1 from __future__ import division
2 
3 import array
4 from threading import Lock
5 
6 import numpy as np
7 import rospy
8 import rostopic
9 
10 
11 class DataBuffer(object):
12 
13  def __init__(self, topic_name='~input',
14  expr_to_get_data='m.data',
15  input_sample_rate=500,
16  window_size=10.0,
17  is_integer=False,
18  is_signed=True,
19  bitdepth=64,
20  n_channel=1, target_channel=0,
21  get_latest_data=False,
22  discard_data=False,
23  auto_start=False):
24  self.is_subscribing = True
25  self.get_latest_data = get_latest_data
26  self.discard_data = discard_data
27  self._window_size = window_size
28  self.data_buffer_len = int(self._window_size * input_sample_rate)
29  self.lock = Lock()
30  self.bitdepth = bitdepth
31  self.n_channel = n_channel
32  self.target_channel = min(self.n_channel - 1, max(0, target_channel))
33  self.input_sample_rate = input_sample_rate
34  self.type_code = {}
35  if is_integer:
36  if is_signed:
37  codes = ['b', 'h', 'i', 'l']
38  else:
39  codes = ['B', 'H', 'I', 'L']
40  else:
41  codes = ['f', 'd']
42  for code in codes:
43  self.type_code[array.array(code).itemsize] = code
44 
45  self.dtype = self.type_code[self.bitdepth / 8]
46  self.data_buffer = np.array([], dtype=self.dtype)
47 
48  if is_integer:
49  self.max_value = 2 ** (self.bitdepth - 1) - 1
50  else:
51  self.max_value = None
52 
53  self.topic_name = topic_name
54  self.expr_to_get_data = expr_to_get_data
55 
56  if auto_start:
57  self.subscribe()
58 
59  def __len__(self):
60  return len(self.data_buffer)
61 
62  @property
63  def window_size(self):
64  return self._window_size
65 
66  @window_size.setter
67  def window_size(self, size):
68  with self.lock:
69  self._window_size = size
70  self.data_buffer_len = int(self._window_size
71  * self.input_sample_rate)
72  self.data_buffer = np.array([], dtype=self.dtype)
73 
74  @staticmethod
75  def from_rosparam(**kwargs):
76  expr_to_get_data = rospy.get_param(
77  '~expression_to_get_data', 'm.data')
78  n_channel = rospy.get_param('~n_channel', 1)
79  target_channel = rospy.get_param('~target_channel', 0)
80  data_sampling_rate = rospy.get_param('~data_sampling_rate', 500)
81  is_integer = rospy.get_param('~is_integer', False)
82  is_signed = rospy.get_param('~is_signed', True)
83  bitdepth = rospy.get_param('~bitdepth', 64)
84  return DataBuffer(expr_to_get_data=expr_to_get_data,
85  input_sample_rate=data_sampling_rate,
86  is_integer=is_integer,
87  is_signed=is_signed,
88  bitdepth=bitdepth,
89  n_channel=n_channel,
90  target_channel=target_channel,
91  **kwargs)
92 
93  def subscribe(self):
94  self.data_buffer = np.array([], dtype=self.dtype)
95  # https://github.com/ros/ros_comm/blob/1.14.13/tools/topic_tools/scripts/transform#L86-L87
96  input_class, input_topic, self.input_fn = rostopic.get_topic_class(
97  rospy.resolve_name(self.topic_name), blocking=True)
98  # https://github.com/ros/ros_comm/blob/1.14.13/tools/topic_tools/scripts/transform#L95
99  self.sub = rospy.Subscriber(
100  input_topic, input_class, self.input_cb)
101 
102  def unsubscribe(self):
103  self.sub.unregister()
104 
105  def _read(self, size, normalize=False):
106  with self.lock:
107  if self.get_latest_data:
108  data_buffer = self.data_buffer[-size:]
109  else:
110  data_buffer = self.data_buffer[:size]
111  if self.discard_data:
112  self.data_buffer = self.data_buffer[size:]
113  if normalize is True:
114  if self.max_value is not None:
115  data_buffer = data_buffer / self.max_value
116  else:
117  rospy.logerr('normalize is disabled with current settings')
118  return data_buffer
119 
120  def sufficient_data(self, size):
121  return len(self.data_buffer) < size
122 
123  def read(self, size=None, wait=False, normalize=False):
124  if size is None:
125  size = self.data_buffer_len
126  size = int(size * self.input_sample_rate)
127  while wait is True \
128  and not rospy.is_shutdown() and len(self.data_buffer) < size:
129  rospy.sleep(0.001)
130  return self._read(size, normalize=normalize)
131 
132  def close(self):
133  try:
134  self.sub.unregister()
135  except Exception:
136  pass
137  self.data_buffer = np.array([], dtype=self.dtype)
138 
139  def input_cb(self, m):
140  # https://github.com/ros/ros_comm/blob/1.14.13/tools/topic_tools/scripts/transform#L98-L99
141  if self.input_fn is not None:
142  m = self.input_fn(m)
143  data = eval(self.expr_to_get_data)
144  data_type = type(data)
145  if data_type is bytes:
146  data_buffer = np.frombuffer(data, dtype=self.dtype)
147  elif data_type in [list, tuple]:
148  data_buffer = np.fromiter(data, dtype=self.dtype)
149  elif data_type in [int, float]:
150  data_buffer = np.array([data], dtype=self.dtype)
151  else:
152  rospy.logerr('Unsupported data type: {}'.format(data_type))
153  return
154  # This division into cases assumes
155  # expr_to_get_data returns a Python built-in numeric or sequence type
156  # deserialized from a ROS msg by scripts generated by genpy.
157  # Other than uint8[], this deserialization is executed with
158  # struct.Struct.unpack() and it returns int or float (or its tuple).
159  # Against uint8[], this deserialization returns bytes.
160  # /opt/ros/melodic/lib/python2.7/dist-packages/std_msgs/msg/_Float64.py
161  # /opt/ros/melodic/lib/python2.7/dist-packages/audio_common_msgs/msg/_AudioData.py
162  # https://docs.python.org/2.7/library/struct.html#format-characters
163  # https://github.com/ros/genpy/blob/0.6.16/src/genpy/generate_struct.py#L165
164 
165  data_buffer = data_buffer[self.target_channel::self.n_channel]
166  with self.lock:
167  self.data_buffer = np.append(
168  self.data_buffer, data_buffer)
169  self.data_buffer = self.data_buffer[
170  -self.data_buffer_len:]
audio_to_spectrogram.data_buffer.DataBuffer.input_cb
def input_cb(self, m)
Definition: data_buffer.py:139
audio_to_spectrogram.data_buffer.DataBuffer.type_code
type_code
Definition: data_buffer.py:24
audio_to_spectrogram.data_buffer.DataBuffer.lock
lock
Definition: data_buffer.py:19
audio_to_spectrogram.data_buffer.DataBuffer.read
def read(self, size=None, wait=False, normalize=False)
Definition: data_buffer.py:123
audio_to_spectrogram.data_buffer.DataBuffer.input_sample_rate
input_sample_rate
Definition: data_buffer.py:23
audio_to_spectrogram.data_buffer.DataBuffer.sufficient_data
def sufficient_data(self, size)
Definition: data_buffer.py:120
audio_to_spectrogram.data_buffer.DataBuffer.subscribe
def subscribe(self)
Definition: data_buffer.py:93
audio_to_spectrogram.data_buffer.DataBuffer.bitdepth
bitdepth
Definition: data_buffer.py:20
audio_to_spectrogram.data_buffer.DataBuffer.unsubscribe
def unsubscribe(self)
Definition: data_buffer.py:102
audio_to_spectrogram.data_buffer.DataBuffer.data_buffer_len
data_buffer_len
Definition: data_buffer.py:18
audio_to_spectrogram.data_buffer.DataBuffer.get_latest_data
get_latest_data
Definition: data_buffer.py:15
audio_to_spectrogram.data_buffer.DataBuffer._window_size
_window_size
Definition: data_buffer.py:17
audio_to_spectrogram.data_buffer.DataBuffer.topic_name
topic_name
Definition: data_buffer.py:43
audio_to_spectrogram.data_buffer.DataBuffer.max_value
max_value
Definition: data_buffer.py:39
audio_to_spectrogram.data_buffer.DataBuffer.__init__
def __init__(self, topic_name='~input', expr_to_get_data='m.data', input_sample_rate=500, window_size=10.0, is_integer=False, is_signed=True, bitdepth=64, n_channel=1, target_channel=0, get_latest_data=False, discard_data=False, auto_start=False)
Definition: data_buffer.py:13
audio_to_spectrogram.data_buffer.DataBuffer.discard_data
discard_data
Definition: data_buffer.py:16
audio_to_spectrogram.data_buffer.DataBuffer.data_buffer
data_buffer
Definition: data_buffer.py:36
audio_to_spectrogram.data_buffer.DataBuffer.sub
sub
Definition: data_buffer.py:99
audio_to_spectrogram.data_buffer.DataBuffer.close
def close(self)
Definition: data_buffer.py:132
audio_to_spectrogram.data_buffer.DataBuffer.target_channel
target_channel
Definition: data_buffer.py:22
audio_to_spectrogram.data_buffer.DataBuffer.window_size
def window_size(self)
Definition: data_buffer.py:63
audio_to_spectrogram.data_buffer.DataBuffer.input_fn
input_fn
Definition: data_buffer.py:96
audio_to_spectrogram.data_buffer.DataBuffer.__len__
def __len__(self)
Definition: data_buffer.py:59
audio_to_spectrogram.data_buffer.DataBuffer._read
def _read(self, size, normalize=False)
Definition: data_buffer.py:105
audio_to_spectrogram.data_buffer.DataBuffer.dtype
dtype
Definition: data_buffer.py:35
audio_to_spectrogram.data_buffer.DataBuffer.expr_to_get_data
expr_to_get_data
Definition: data_buffer.py:44
audio_to_spectrogram.data_buffer.DataBuffer.from_rosparam
def from_rosparam(**kwargs)
Definition: data_buffer.py:75
audio_to_spectrogram.data_buffer.DataBuffer.n_channel
n_channel
Definition: data_buffer.py:21
audio_to_spectrogram.data_buffer.DataBuffer.is_subscribing
is_subscribing
Definition: data_buffer.py:14
audio_to_spectrogram.data_buffer.DataBuffer
Definition: data_buffer.py:11


audio_to_spectrogram
Author(s):
autogenerated on Sat Feb 22 2025 04:03:22