defaultCallbacks.py
Go to the documentation of this file.
1 # Import config
2 from config import *
3 
4 # Other imports
5 import os, bson, datetime
6 from tf.transformations import euler_from_quaternion
7 
8 # Default callback function to topics
9 
10 def debug(data, topic) -> None:
11  print(data)
12 
13 # Store data just if is difference
14 def diffStore(data, topic) -> None:
15  # Compare dictionaries
16  def compare_dict(dict1, dict2)->bool:
17  # Check size of the dicts
18  if len(dict1) != len(dict2):
19  return False
20  # Check keys and values
21  for key, value in dict1.items():
22  if key not in dict2:
23  return False
24  # If is nested
25  if isinstance(value, dict) and isinstance(dict2[key], dict):
26  if not compare_dict(value, dict2[key]):
27  return False
28  elif dict2[key] != value:
29  return False
30  return True
31  # Write a BSON in a binary file
32  def writeBfile(data, path):
33  file = open(file=path, mode='bw+')
34  file.write(bson.encode(document=data))
35  file.close()
36 
37  # Create a local data
38  _data = data.copy()
39  _data.pop('dateTime')
40  # Set file path and name, extension temporary JSON (.tjson)
41  filePath = PATH + str(topic['topic'].replace('/', '') + '.tjson')
42  # Check if path exists
43  if not os.path.exists(path=PATH):
44  os.chmod
45  os.makedirs(name=PATH)
46  # Open file
47  if not os.path.exists(path=filePath):
48  file = open(file=filePath, mode='bw+')
49  else:
50  file = open(file=filePath, mode='br+')
51  # Read the file
52  _file = file.read()
53  file.close()
54  # Compare 'data' with the data in file
55  if not _file == b'':
56  # Decode the bson
57  try:
58  _file = bson.BSON.decode(_file)
59  except bson.errors.InvalidBSON:
60  writeBfile(data=_data, path=filePath)
61  return None
62  # Compare the dictionaries
63  if compare_dict(_data, _file):
64  data.clear()
65  else:
66  writeBfile(data=_data, path=filePath)
67  # The file is void
68  else:
69  writeBfile(data=_data, path=filePath)
70  # Close file
71  file.close()
72 
73 # Quaternion to euler callback
74 def q2e(data, topic) -> None:
75  # Get orientation
76  orientation = data['pose']['pose']['orientation']
77  # Convert
78  (raw, pitch, yaw) = euler_from_quaternion([orientation['x'], orientation['y'], orientation['z'], orientation['w']])
79  # Add in a dictionary
80  orientation = {
81  'raw' : raw,
82  'pitch' : pitch,
83  'yaw' : yaw,
84  }
85  # Update the data to storage
86  data.update({'pose': {'pose': {'position': data['pose']['pose']['position'], 'orientation': orientation}}})
defaultCallbacks.debug
None debug(data, topic)
Debug function.
Definition: defaultCallbacks.py:10
defaultCallbacks.q2e
None q2e(data, topic)
Definition: defaultCallbacks.py:74
defaultCallbacks.diffStore
None diffStore(data, topic)
Definition: defaultCallbacks.py:14


cedri_node_monitoring
Author(s): Andre Luis Frana
autogenerated on Sat Jul 1 2023 02:34:25