audio_to_spectrum.py
#!/usr/bin/env python

import numpy as np
import rospy

from audio_common_msgs.msg import AudioData
from jsk_recognition_msgs.msg import Spectrum


# This node executes an FFT on audio data (audio_common_msgs/AudioData).
# The audio topic is assumed to contain raw wave (PCM) samples.
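# Subscribes: ~audio (audio_common_msgs/AudioData)
# Publishes: ~spectrum and ~spectrum_filtered (jsk_recognition_msgs/Spectrum)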

class AudioToSpectrum(object):

    def __init__(self):
        super(AudioToSpectrum, self).__init__()

        # Audio topic config
        # The number of channels in audio data
        self.n_channel = rospy.get_param('~n_channel', 1)
        # Sampling rate of the microphone (namely of the audio topic)
        mic_sampling_rate = rospy.get_param('~mic_sampling_rate', 16000)
        # Period [s] to sample audio data for one FFT
        fft_sampling_period = rospy.get_param('~fft_sampling_period', 0.3)
        # Bits per audio sample
        bitdepth = rospy.get_param('~bitdepth', 16)
        if bitdepth == 16:
            self.dtype = 'int16'
        else:
            rospy.logerr("'~bitdepth' {} is unsupported.".format(bitdepth))
        # Audio topic buffer
        self.audio_buffer = np.array([], dtype=self.dtype)
        # How long audio_buffer should be for one FFT
        self.audio_buffer_len = int(mic_sampling_rate * fft_sampling_period)

        # FFT config
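        # Hamming window (0.54 - 0.46 * cos(2 * pi * n / N)) to reduce spectral leakage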
        window_function = np.arange(
            0.0, 1.0, 1.0 / self.audio_buffer_len)
        self.window_function = 0.54 - 0.46 * np.cos(
            2 * np.pi * window_function)
        high_cut_freq = rospy.get_param('~high_cut_freq', 800)
        if high_cut_freq > mic_sampling_rate / 2:
            rospy.logerr('Set high_cut_freq lower than {} Hz'.format(
                mic_sampling_rate / 2))
        low_cut_freq = rospy.get_param('~low_cut_freq', 1)  # remove 0 Hz
        self.freq = np.fft.fftfreq(
            self.audio_buffer_len, d=1. / mic_sampling_rate)
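        # fftfreq also returns negative frequency bins; the mask below keeps
        # only the [low_cut_freq, high_cut_freq] band.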
        self.cutoff_mask = np.where(
            (low_cut_freq <= self.freq) & (self.freq <= high_cut_freq),
            True, False)
        # How many times the FFT is executed per second.
        # fft_exec_rate is also the publish rate [Hz] of the output spectrum topics.
        self.fft_exec_rate = rospy.get_param('~fft_exec_rate', 50)

        # Publisher and Subscriber
        rospy.Subscriber(
            '~audio', AudioData, self.audio_cb)
        self.pub_spectrum = rospy.Publisher(
            '~spectrum', Spectrum, queue_size=1)
        self.pub_spectrum_filtered = rospy.Publisher(
            '~spectrum_filtered', Spectrum, queue_size=1)
        rospy.Timer(rospy.Duration(1. / self.fft_exec_rate), self.timer_cb)

    def audio_cb(self, msg):
        # Convert audio buffer to int array
        data = msg.data
        audio_buffer = np.frombuffer(data, dtype=self.dtype)
        # Retrieve one channel data
        audio_buffer = audio_buffer[0::self.n_channel]
        # Save audio msg to audio_buffer
        self.audio_buffer = np.append(
            self.audio_buffer, audio_buffer)
        self.audio_buffer = self.audio_buffer[
            -self.audio_buffer_len:]

    def timer_cb(self, timer):
        if len(self.audio_buffer) != self.audio_buffer_len:
            return
        # Calc spectrum by FFT
        amplitude = np.fft.fft(self.audio_buffer * self.window_function)
        amplitude = np.log(np.abs(amplitude))
        spectrum_msg = Spectrum()
        spectrum_msg.header.stamp = rospy.Time.now()
        spectrum_msg.amplitude = amplitude
        spectrum_msg.frequency = self.freq
        self.pub_spectrum.publish(spectrum_msg)
        spectrum_msg.amplitude = amplitude[self.cutoff_mask]
        spectrum_msg.frequency = self.freq[self.cutoff_mask]
        self.pub_spectrum_filtered.publish(spectrum_msg)


if __name__ == '__main__':
    rospy.init_node('audio_to_spectrum')
    AudioToSpectrum()
    rospy.spin()
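
A minimal sketch of a consumer node for the output above is shown below. It assumes this node runs under the name audio_to_spectrum, so the filtered output is published on /audio_to_spectrum/spectrum_filtered; the actual topic name depends on launch-time remapping, and the node name spectrum_peak_printer is illustrative only.

#!/usr/bin/env python
# Example subscriber (sketch): prints the peak frequency of the filtered
# spectrum. The topic name below is an assumption and depends on remapping.

import numpy as np
import rospy

from jsk_recognition_msgs.msg import Spectrum


def spectrum_cb(msg):
    amplitude = np.array(msg.amplitude)
    frequency = np.array(msg.frequency)
    if amplitude.size == 0:
        return
    peak_freq = frequency[np.argmax(amplitude)]
    rospy.loginfo('Peak frequency: {:.1f} Hz'.format(peak_freq))


if __name__ == '__main__':
    rospy.init_node('spectrum_peak_printer')
    rospy.Subscriber('/audio_to_spectrum/spectrum_filtered', Spectrum, spectrum_cb)
    rospy.spin()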

