threaded_event_emitter.py
Go to the documentation of this file.
1 # Copyright 2019 Mycroft AI Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 from pyee import EventEmitter
16 from multiprocessing.pool import ThreadPool
17 from collections import defaultdict
18 
19 
20 class ThreadedEventEmitter(EventEmitter):
21  """ Event Emitter using the threadpool to run event functions in
22  separate threads.
23  """
24  def __init__(self, threads=10):
25  super().__init__()
26  self.pool = ThreadPool(threads)
27  self.wrappers = defaultdict(list)
28 
29  def on(self, event, f=None):
30  """ Wrap on with a threaded launcher. """
31  def wrapped(*args, **kwargs):
32  return self.pool.apply_async(f, args, kwargs)
33 
34  w = super().on(event, wrapped)
35  # Store mapping from function to wrapped function
36  self.wrappers[event].append((f, wrapped))
37  return w
38 
39  def once(self, event, f=None):
40  """ Wrap once with a threaded launcher. """
41  def wrapped(*args, **kwargs):
42  return self.pool.apply_async(f, args, kwargs)
43 
44  wrapped = super().once(event, wrapped)
45  self.wrappers[event].append((f, wrapped))
46  return wrapped
47 
48  def remove_listener(self, event_name, func):
49  """ Wrap the remove to translate from function to wrapped
50  function.
51  """
52  for w in self.wrappers[event_name]:
53  if w[0] == func:
54  self.wrappers[event_name].remove(w)
55  return super().remove_listener(event_name, w[1])
56  # if no wrapper exists try removing the function
57  return super().remove_listener(event_name, func)


mycroft_ros
Author(s):
autogenerated on Mon Apr 26 2021 02:35:40