event_scheduler.py
Go to the documentation of this file.
1 # Copyright 2017 Mycroft AI Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 import json
16 import time
17 from threading import Thread, Lock
18 
19 from os.path import isfile, join, expanduser
20 
21 from mycroft.configuration import Configuration
22 from mycroft.messagebus.message import Message
23 from mycroft.util.log import LOG
24 
25 
def repeat_time(sched_time, repeat):
    """Compute when a repeating event should fire next.

    The result is never in the past. If one or more periods have already
    elapsed, the interim occurrences are skipped and the event is pushed
    out relative to the current time instead.

    Args:
        sched_time (float): Unix time the event was scheduled for
        repeat (float): Repeat period in seconds

    Returns: (float) Unix time of the next occurrence
    """
    upcoming = sched_time + repeat
    while True:
        if upcoming >= time.time():
            return upcoming
        # Already overdue: reschedule one period from "now" so the event
        # is not emitted twice in quick succession.
        upcoming = time.time() + abs(repeat)
41 
42 
class EventScheduler(Thread):
    """Thread that emits scheduled messages on the bus once they are due.

    Pending events live in ``self.events``, a dict mapping an event name to
    a list of ``(sched_time, repeat, data)`` tuples. Every access to that
    dict is guarded by ``self.event_lock``, which is a plain (non-reentrant)
    ``threading.Lock``: methods that take the lock must never be called
    while it is already held by the same thread.
    """

    def __init__(self, bus, schedule_file='schedule.json'):
        """
            Create an event scheduler thread. Will send messages at a
            predetermined time to the registered targets.

            The thread is started at the end of construction.

            Args:
                bus:            Mycroft messagebus (mycroft.messagebus)
                schedule_file:  File to store pending events to on shutdown
        """
        super(EventScheduler, self).__init__()
        data_dir = expanduser(Configuration.get()['data_dir'])

        self.events = {}
        self.event_lock = Lock()

        self.bus = bus
        self.isRunning = True
        self.schedule_file = join(data_dir, schedule_file)
        if self.schedule_file:
            self.load()

        # Messagebus interface: one handler per scheduler message type.
        self.bus.on('mycroft.scheduler.schedule_event',
                    self.schedule_event_handler)
        self.bus.on('mycroft.scheduler.remove_event',
                    self.remove_event_handler)
        self.bus.on('mycroft.scheduler.update_event',
                    self.update_event_handler)
        self.bus.on('mycroft.scheduler.get_event',
                    self.get_event_handler)
        self.start()

    def load(self):
        """
            Load json data with active events from json file.

            Does nothing if the schedule file does not exist. A file that
            cannot be parsed is logged and treated as empty.
        """
        if not isfile(self.schedule_file):
            return

        json_data = {}
        with open(self.schedule_file) as f:
            try:
                json_data = json.load(f)
            except Exception as e:
                LOG.error(e)

        current_time = time.time()
        with self.event_lock:
            for key in json_data:
                event_list = json_data[key]
                # Discard non-repeating events that have already happened.
                # JSON stores the entries as lists; convert back to tuples.
                self.events[key] = [tuple(e) for e in event_list
                                    if e[0] > current_time or e[1]]

    def run(self):
        """ Poll the schedule twice a second until shutdown is requested. """
        while self.isRunning:
            self.check_state()
            time.sleep(0.5)

    def check_state(self):
        """
            Check if an event should be triggered.

            Due entries are collected under the lock; the messages are
            emitted only after the lock has been released.
        """
        pending_messages = []
        with self.event_lock:
            # Check all events
            for event in self.events:
                current_time = time.time()
                entries = self.events[event]
                # Scheduled times that have passed
                passed = [(t, r, d) for (t, r, d) in entries
                          if t <= current_time]
                # and remaining times that we're still waiting for
                remaining = [(t, r, d) for (t, r, d) in entries
                             if t > current_time]
                # Queue a message for every entry that is due
                for sched_time, repeat, data in passed:
                    pending_messages.append(Message(event, data))
                    # If this is a repeated event add a new trigger time
                    if repeat:
                        next_time = repeat_time(sched_time, repeat)
                        remaining.append((next_time, repeat, data))
                # Update list of events (existing key, so the dict size
                # does not change while iterating)
                self.events[event] = remaining

        # Remove events that are now completed. clear_empty() acquires
        # event_lock itself, so it must run after the lock is released.
        self.clear_empty()

        # Finally, emit the queued up events that triggered
        for msg in pending_messages:
            self.bus.emit(msg)

    def schedule_event(self, event, sched_time, repeat=None, data=None):
        """ Add event to the pending event schedule (thread safe).

        A repeating event is discarded if an event with the same name is
        already scheduled.

        Args:
            event (str): Name of the message to emit when the event is due
            sched_time (float): Unix time at which the event should trigger
            repeat (float, optional): Repeat period in seconds. A falsy
                                      value (default None) means the event
                                      triggers only once.
            data (dict, optional): Data to send along with the message.
                                   Defaults to an empty dict.
        """
        data = data or {}
        with self.event_lock:
            # Don't schedule if the event is repeating and already scheduled
            if repeat and event in self.events:
                LOG.debug('Repeating event {} is already scheduled, discarding'
                          .format(event))
                return
            # Add received event and time, creating the list if missing
            self.events.setdefault(event, []).append(
                (sched_time, repeat, data))

    def schedule_event_handler(self, message):
        """
            Messagebus interface to the schedule_event method.
            Required data in the message envelope is
                event: event to emit
                time:  time to emit the event

            optional data is
                repeat: repeat interval
                data:   data to send along with the event

            Logs an error and schedules nothing if event or time is
            missing.
        """
        event = message.data.get('event')
        sched_time = message.data.get('time')
        repeat = message.data.get('repeat')
        data = message.data.get('data')
        if event and sched_time:
            self.schedule_event(event, sched_time, repeat, data)
        elif not event:
            LOG.error('Scheduled event name not provided')
        else:
            LOG.error('Scheduled event time not provided')

    def remove_event(self, event):
        """ Remove all scheduled entries for an event, if there are any.

        Args:
            event (str): Name of the event to remove
        """
        with self.event_lock:
            self.events.pop(event, None)

    def remove_event_handler(self, message):
        """ Messagebus interface to the remove_event method. """
        event = message.data.get('event')
        self.remove_event(event)

    def update_event(self, event, data):
        """ Replace the data of the first scheduled entry for an event.

        Does nothing if no entry is scheduled under that name.

        Args:
            event (str): Name of the event to update
            data (dict): New data to send along with the event
        """
        with self.event_lock:
            # If there is an active event with this name
            entries = self.events.get(event, [])
            if len(entries) > 0:
                sched_time, repeat, _ = entries[0]
                entries[0] = (sched_time, repeat, data)

    def update_event_handler(self, message):
        """ Messagebus interface to the update_event method. """
        event = message.data.get('event')
        data = message.data.get('data')
        self.update_event(event, data)

    def get_event_handler(self, message):
        """
            Messagebus interface to get_event.
            Emits another event sending event status: the list of entries
            scheduled under the requested name, or None if there are none.
        """
        event_name = message.data.get("name")
        with self.event_lock:
            event = self.events.get(event_name)
        emitter_name = 'mycroft.event_status.callback.{}'.format(event_name)
        self.bus.emit(message.reply(emitter_name, data=event))

    def store(self):
        """
            Write current schedule to disk.
        """
        with self.event_lock:
            with open(self.schedule_file, 'w') as f:
                json.dump(self.events, f)

    def clear_repeating(self):
        """
            Remove repeating events from events dict.

            An entry counts as repeating when its repeat value is truthy,
            which is the same test check_state() and load() apply. Entries
            with a repeat of None, 0 or False are one-shot and are kept.
        """
        with self.event_lock:
            for name in self.events:
                self.events[name] = [entry for entry in self.events[name]
                                     if not entry[1]]

    def clear_empty(self):
        """
            Remove empty event entries from events dict
        """
        with self.event_lock:
            self.events = {k: self.events[k] for k in self.events
                           if self.events[k] != []}

    def shutdown(self):
        """ Stop the running thread and save the pending one-shot events. """
        self.isRunning = False
        # Remove every listener that __init__ registered
        self.bus.remove_all_listeners('mycroft.scheduler.schedule_event')
        self.bus.remove_all_listeners('mycroft.scheduler.remove_event')
        self.bus.remove_all_listeners('mycroft.scheduler.update_event')
        self.bus.remove_all_listeners('mycroft.scheduler.get_event')
        # Wait for thread to finish
        self.join()
        # Prune event list in preparation for saving
        self.clear_repeating()
        self.clear_empty()
        # Store all pending scheduled events
        self.store()
def __init__(self, bus, schedule_file='schedule.json')
def schedule_event(self, event, sched_time, repeat=None, data=None)
def repeat_time(sched_time, repeat)


mycroft_ros
Author(s):
autogenerated on Mon Apr 26 2021 02:35:40