client/ws.py
Go to the documentation of this file.
1 # Copyright 2017 Mycroft AI Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 import json
16 import time
17 from threading import Event
18 import traceback
19 
20 from .threaded_event_emitter import ThreadedEventEmitter
21 from websocket import (WebSocketApp, WebSocketConnectionClosedException,
22  WebSocketException)
23 
24 from mycroft.configuration import Configuration
25 from mycroft.messagebus.message import Message
26 from mycroft.util import validate_param, create_echo_function
27 from mycroft.util.log import LOG
28 
29 
31  def __init__(self, host=None, port=None, route=None, ssl=None):
32 
33  config = Configuration.get().get("websocket")
34  host = host or config.get("host")
35  port = port or config.get("port")
36  route = route or config.get("route")
37  ssl = ssl or config.get("ssl")
38  validate_param(host, "websocket.host")
39  validate_param(port, "websocket.port")
40  validate_param(route, "websocket.route")
41 
42  self.url = WebsocketClient.build_url(host, port, route, ssl)
43  self.emitter = ThreadedEventEmitter()
44  self.client = self.create_client()
45  self.retry = 5
46  self.connected_event = Event()
47  self.started_running = False
48 
49  @staticmethod
50  def build_url(host, port, route, ssl):
51  scheme = "wss" if ssl else "ws"
52  return scheme + "://" + host + ":" + str(port) + route
53 
54  def create_client(self):
55  return WebSocketApp(self.url,
56  on_open=self.on_open, on_close=self.on_close,
57  on_error=self.on_error, on_message=self.on_message)
58 
59  def on_open(self):
60  LOG.info("Connected")
61  self.connected_event.set()
62  self.emitter.emit("open")
63  # Restore reconnect timer to 5 seconds on sucessful connect
64  self.retry = 5
65 
66  def on_close(self):
67  self.emitter.emit("close")
68 
    def on_error(self, error):
        """On error start trying to reconnect to the websocket.

        Logs the error, emits 'error' to listeners, closes the current
        client if it is still flagged as running, sleeps ``self.retry``
        seconds, doubles that delay (capped at 60) and finally runs a newly
        created client.

        Args:
            error: the exception reported for the connection
        """
        if isinstance(error, WebSocketConnectionClosedException):
            LOG.warning('Could not send message because connection has closed')
        else:
            LOG.exception('=== ' + repr(error) + ' ===')

        # Best effort: a failing listener or close() must not prevent the
        # reconnect attempt below, so anything raised here is only logged.
        try:
            self.emitter.emit('error', error)
            if self.client.keep_running:
                self.client.close()
        except Exception as e:
            LOG.error('Exception closing websocket: ' + repr(e))

        LOG.warning("WS Client will reconnect in %d seconds." % self.retry)
        time.sleep(self.retry)
        # Exponential backoff: double the delay for the next failure, but
        # never wait longer than 60 seconds.
        self.retry = min(self.retry * 2, 60)
        try:
            self.emitter.emit('reconnecting')
            self.client = self.create_client()
            # NOTE(review): run_forever() is invoked from inside this error
            # callback, so it presumably does not return until the new
            # client stops - confirm that this nesting is intended.
            self.run_forever()
        except WebSocketException:
            # A failure of the replacement client is deliberately ignored.
            pass
92 
93  def on_message(self, message):
94  parsed_message = Message.deserialize(message)
95  self.emitter.emit('message', message)
96  self.emitter.emit(parsed_message.type, parsed_message)
97 
98  def emit(self, message):
99  if not self.connected_event.wait(10):
100  if not self.started_running:
101  raise ValueError('You must execute run_forever() '
102  'before emitting messages')
103  self.connected_event.wait()
104 
105  try:
106  if hasattr(message, 'serialize'):
107  self.client.send(message.serialize())
108  else:
109  self.client.send(json.dumps(message.__dict__))
110  except WebSocketConnectionClosedException:
111  LOG.warning('Could not send {} message because connection '
112  'has been closed'.format(message.type))
113 
114  def wait_for_response(self, message, reply_type=None, timeout=None):
115  """Send a message and wait for a response.
116 
117  Args:
118  message (Message): message to send
119  reply_type (str): the message type of the expected reply.
120  Defaults to "<message.type>.response".
121  timeout: seconds to wait before timeout, defaults to 3
122  Returns:
123  The received message or None if the response timed out
124  """
125  response = []
126 
127  def handler(message):
128  """Receive response data."""
129  response.append(message)
130 
131  # Setup response handler
132  self.once(reply_type or message.type + '.response', handler)
133  # Send request
134  self.emit(message)
135  # Wait for response
136  start_time = time.monotonic()
137  while len(response) == 0:
138  time.sleep(0.2)
139  if time.monotonic() - start_time > (timeout or 3.0):
140  try:
141  self.remove(reply_type, handler)
142  except (ValueError, KeyError):
143  # ValueError occurs on pyee 1.0.1 removing handlers
144  # registered with once.
145  # KeyError may theoretically occur if the event occurs as
146  # the handler is removed
147  pass
148  return None
149  return response[0]
150 
151  def on(self, event_name, func):
152  self.emitter.on(event_name, func)
153 
154  def once(self, event_name, func):
155  self.emitter.once(event_name, func)
156 
157  def remove(self, event_name, func):
158  try:
159  if event_name in self.emitter._events:
160  LOG.debug("Removing found '"+str(event_name)+"'")
161  else:
162  LOG.debug("Not able to find '"+str(event_name)+"'")
163  self.emitter.remove_listener(event_name, func)
164  except ValueError:
165  LOG.warning('Failed to remove event {}: {}'.format(event_name,
166  str(func)))
167  for line in traceback.format_stack():
168  LOG.warning(line.strip())
169 
170  if event_name in self.emitter._events:
171  LOG.debug("Removing found '"+str(event_name)+"'")
172  else:
173  LOG.debug("Not able to find '"+str(event_name)+"'")
174  LOG.warning("Existing events: " + str(self.emitter._events))
175  for evt in self.emitter._events:
176  LOG.warning(" "+str(evt))
177  LOG.warning(" "+str(self.emitter._events[evt]))
178  if event_name in self.emitter._events:
179  LOG.debug("Removing found '"+str(event_name)+"'")
180  else:
181  LOG.debug("Not able to find '"+str(event_name)+"'")
182  LOG.warning('----- End dump -----')
183 
184  def remove_all_listeners(self, event_name):
185  '''
186  Remove all listeners connected to event_name.
187 
188  Args:
189  event_name: event from which to remove listeners
190  '''
191  if event_name is None:
192  raise ValueError
193  self.emitter.remove_all_listeners(event_name)
194 
195  def run_forever(self):
196  self.started_running = True
197  self.client.run_forever()
198 
199  def close(self):
200  self.client.close()
201  self.connected_event.clear()
202 
203 
def echo():
    """Run a client that echoes bus traffic and repeats utterances.

    Every 'message' event goes to the function returned by
    ``create_echo_function(None)``; every 'recognizer_loop:utterance'
    message is re-sent with its type changed to 'speak'. The call then runs
    the client via ``run_forever()``.
    """
    client = WebsocketClient()

    def repeat_utterance(message):
        # Re-send the received utterance as a 'speak' message.
        message.type = 'speak'
        client.emit(message)

    listeners = (
        ('message', create_echo_function(None)),
        ('recognizer_loop:utterance', repeat_utterance),
    )
    for event_name, listener in listeners:
        client.on(event_name, listener)
    client.run_forever()
214 
215 
# Run the echo client only when this file is executed as a script.
if __name__ == "__main__":
    echo()
def wait_for_response(self, message, reply_type=None, timeout=None)
Definition: client/ws.py:114
def once(self, event_name, func)
Definition: client/ws.py:154
def remove(self, event_name, func)
Definition: client/ws.py:157
def validate_param(value, name)
def remove_all_listeners(self, event_name)
Definition: client/ws.py:184
def build_url(host, port, route, ssl)
Definition: client/ws.py:50
def create_echo_function(name, whitelist=None)
def on(self, event_name, func)
Definition: client/ws.py:151
def __init__(self, host=None, port=None, route=None, ssl=None)
Definition: client/ws.py:31
def get(phrase, lang=None, context=None)


mycroft_ros
Author(s):
autogenerated on Mon Apr 26 2021 02:35:40