scripts/mycroft/identity/__init__.py
Go to the documentation of this file.
1 # Copyright 2017 Mycroft AI Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 import json
16 import time
17 import os
18 
19 from mycroft.filesystem import FileSystemAccess
20 from mycroft.util.log import LOG
21 from mycroft.util.combo_lock import ComboLock
# Lock held around identity load/save/update operations in this module.
# NOTE(review): presumably ComboLock uses the given /tmp path to coordinate
# across processes as well as threads - confirm in mycroft.util.combo_lock.
identity_lock = ComboLock('/tmp/identity-lock')
23 
24 
26  def __init__(self, **kwargs):
27  self.uuid = kwargs.get("uuid", "")
28  self.access = kwargs.get("access", "")
29  self.refresh = kwargs.get("refresh", "")
30  self.expires_at = kwargs.get("expires_at", 0)
31 
32  def is_expired(self):
33  return self.refresh and 0 < self.expires_at <= time.time()
34 
35  def has_refresh(self):
36  return self.refresh != ""
37 
38 
    # Cached identity shared by the IdentityManager static methods; stays
    # None until _load() assigns a DeviceIdentity.
    __identity = None
41 
42  @staticmethod
43  def _load():
44  LOG.debug('Loading identity')
45  try:
46  with FileSystemAccess('identity').open('identity2.json', 'r') as f:
47  IdentityManager.__identity = DeviceIdentity(**json.load(f))
48  except Exception:
49  IdentityManager.__identity = DeviceIdentity()
50 
51  @staticmethod
52  def load(lock=True):
53  try:
54  if lock:
55  identity_lock.acquire()
56  IdentityManager._load()
57  finally:
58  if lock:
59  identity_lock.release()
60  return IdentityManager.__identity
61 
62  @staticmethod
63  def save(login=None, lock=True):
64  LOG.debug('Saving identity')
65  if lock:
66  identity_lock.acquire()
67  try:
68  if login:
69  IdentityManager._update(login)
70  with FileSystemAccess('identity').open('identity2.json', 'w') as f:
71  json.dump(IdentityManager.__identity.__dict__, f)
72  f.flush()
73  os.fsync(f.fileno())
74  finally:
75  if lock:
76  identity_lock.release()
77 
78  @staticmethod
79  def _update(login=None):
80  LOG.debug('Updaing identity')
81  login = login or {}
82  expiration = login.get("expiration", 0)
83  IdentityManager.__identity.uuid = login.get("uuid", "")
84  IdentityManager.__identity.access = login.get("accessToken", "")
85  IdentityManager.__identity.refresh = login.get("refreshToken", "")
86  IdentityManager.__identity.expires_at = time.time() + expiration
87 
88  @staticmethod
89  def update(login=None, lock=True):
90  if lock:
91  identity_lock.acquire()
92  try:
93  IdentityManager._update()
94  finally:
95  if lock:
96  identity_lock.release()
97 
98  @staticmethod
99  def get():
100  if not IdentityManager.__identity:
101  IdentityManager.load()
102  return IdentityManager.__identity


mycroft_ros
Author(s):
autogenerated on Mon Apr 26 2021 02:35:40