queueActions.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 
4 # Import config
5 from config import *
6 
7 # Import librarys
8 import pymongo, bson, datetime, rospy, os, json, subprocess, fractions
9 from pymongo import errors as pymongo_erros
10 
11 # Messages
12 from std_msgs.msg import String
13 
14 # Define default action collection
15 COLL = '/actions'
16 
17 # Queue action class
19 
20  def __init__(self) -> None:
21  # Start the node
22  rospy.init_node('queueActions', anonymous=False)
23  rospy.loginfo("Queue of actions topics started")
24  # Create a subscriber to get action from another ROS topics
25  rospy.Subscriber("addNewActions", String, self.callback_addNewActions)
26  rospy.loginfo("Create the subscriber /addNewActions")
27  # Define a global queue empty
28  self.LocalQueue = []
29  # Do the initial query
30  rospy.loginfo("Starting synchronisation")
31  while not self.initialQuery():
32  rospy.sleep(5)
33  None
34  # Awaiting instructions
35  rospy.loginfo("Awaiting actions")
36  while not rospy.is_shutdown():
37  # Get action from cloud
38  self.getCloudActions()
39  # Try to execute the actions in the local queue
40  self.runLocalQueue()
41  # Sync with cloud
42  self.send2cloud(content=self.LocalQueue)
43  # Wait for 1 second
44  rospy.sleep(1)
45 
46  # Get the list of action available on cloud, or run a pipeline
47  def queryCloud(self, pipeline=None):
48  # Get the list from cloud
49  try:
50  result = list(CLIENT[DATALAKE][COLL].aggregate(pipeline=pipeline))
51  return result
52  except Exception as e:
53  rospy.logerr("Error on query the cloud")
54  rospy.logerr("An exception occurred:", type(e).__name__,e.args)
55  return None
56 
57  # Local action send callback
58  def callback_addNewActions(self, msg):
59  # Get the value of msg
60  try:
61  data = msg.data
62  data = json.loads(data)
63  except Exception as e:
64  rospy.logerr("Error on the convert the mesage: " + str(msg))
65  rospy.logerr("An exception occurred:", type(e).__name__,e.args)
66  # Get the values of command and priority
67  try:
68  _data = {
69  "_id": bson.ObjectId(),
70  "command": data['command'],
71  "dateTime": datetime.datetime.now(),
72  "source": 'robot',
73  "status": 'wait'
74  }
75  except Exception as e:
76  rospy.logerr("Error on parse a new action")
77  rospy.logerr("An exception occurred:", type(e).__name__,e.args)
78  # Add the action to the local queue
79  try:
80  self.add2LocalQueue(_data)
81  except Exception as e:
82  rospy.logerr("Error on add to local queue")
83  rospy.logerr("An exception occurred:", type(e).__name__,e.args)
84 
85  # Run action
86  def runAction(self, action):
87  # Try to execute the action
88  try:
89  _command = action['command']
90  rospy.loginfo("Run command " + _command)
91  result = subprocess.Popen(_command, shell=True)
92  rospy.loginfo("Send")
93  action['status'] = 'runned'
94  except Exception as e:
95  rospy.logerr("Error in the execution of " + str(action))
96  rospy.logerr("An exception occurred:", type(e).__name__,e.args)
97  action['status'] = 'error'
98 
99  # Add action to the queue
100  def add2LocalQueue(self, data):
101  # Check local queue
102  try:
103  rospy.loginfo("Add action")
104  for action in self.LocalQueue:
105  if data['source'] == 'admin':
106  action['status'] = 'canceled'
107  # Action is not on list and is wait
108  elif action['command'] == data['command'] and action['status'] == 'wait':
109  data['status'] = 'canceled'
110  except Exception as e:
111  rospy.logerr("Error in the check local queue for the action: " + str(data))
112  rospy.logerr("An exception occurred:", type(e).__name__,e.args)
113  # Add the action to the local queue
114  try:
115  self.LocalQueue.append(data)
116  except Exception as e:
117  rospy.logerr("Error add the action: " + str(data))
118  rospy.logerr("An exception occurred:", type(e).__name__,e.args)
119 
120  # Add|update documents to the cloud
121  def send2cloud(self, content) -> bool:
122  # Check if content is a list or just one document
123  if not isinstance(content, list):
124  content = [content]
125  # Send or uupdate all documents in content
126  for document in content:
127  # Try send to the cloud
128  try:
129  _result = CLIENT[DATALAKE][COLL].find_one_and_update(filter={'_id': document['_id']},update={'$set':document}, upsert=True)
130  # If document is not in 'wait' delete
131  if not _result['status'] == 'wait':
132  content.remove(document)
133  rospy.loginfo("Remove action from queue: " + str(document))
134  except Exception as e:
135  rospy.logerr("Error on send document to cloud: " + str(document) )
136  rospy.logerr("An exception occurred:", type(e).__name__,e.args)
137  return False
138  return True
139 
140  # The initial query
141  def initialQuery(self) -> bool:
142  PIPELINE_CHANGE_WAIT = [
143  {
144  '$match': {
145  'status': 'wait'
146  }
147  }, {
148  '$addFields': {
149  'status': 'canceled'
150  }
151  }
152  ]
153  rospy.loginfo("Run the initial query againts " + COLL)
154  # Get 'wait' actions in cloud
155  initial = self.queryCloud(pipeline=PIPELINE_CHANGE_WAIT)
156  # Send documents to cloud
157  return self.send2cloud(initial)
158 
159  # Get actions from cloud
160  def getCloudActions(self):
161  # Get just wait items aggregations
162  PIPELINE_GET_WAIT = [{
163  '$match': {
164  'status': 'wait'
165  }
166  },]
167  _result = self.queryCloud(pipeline=PIPELINE_GET_WAIT)
168  # Add actions to the local queu
169  for action in _result:
170  self.add2LocalQueue(action)
171 
172  # Run the actions in the local queue
173  def runLocalQueue(self):
174  # Run all actions in localqueue
175  for action in self.LocalQueue:
176  try:
177  # Check if the action is waiting
178  if action['status'] == 'wait':
179  self.runAction(action=action)
180  except Exception as e:
181  rospy.logerr("Error action: " + str(action) )
182  rospy.logerr("An exception occurred:", type(e).__name__,e.args)
183 
184 
185 if __name__ == '__main__':
186  try:
187  # Run action
188  queueActions()
189  except rospy.ROSInterruptException:
190  pass
queueActions.queueActions.add2LocalQueue
def add2LocalQueue(self, data)
Definition: queueActions.py:100
queueActions.queueActions.getCloudActions
def getCloudActions(self)
Definition: queueActions.py:160
queueActions.queueActions.__init__
None __init__(self)
Definition: queueActions.py:20
queueActions.queueActions.runAction
def runAction(self, action)
Definition: queueActions.py:86
queueActions.queueActions.send2cloud
bool send2cloud(self, content)
Definition: queueActions.py:121
queueActions.queueActions.callback_addNewActions
def callback_addNewActions(self, msg)
Definition: queueActions.py:58
queueActions.queueActions.queryCloud
def queryCloud(self, pipeline=None)
Definition: queueActions.py:47
queueActions.queueActions
Definition: queueActions.py:18
queueActions.queueActions.initialQuery
bool initialQuery(self)
Definition: queueActions.py:141
queueActions.queueActions.LocalQueue
LocalQueue
Definition: queueActions.py:28
queueActions.queueActions.runLocalQueue
def runLocalQueue(self)
Definition: queueActions.py:173


cedri_node_monitoring
Author(s): Andre Luis Frana
autogenerated on Sat Jul 1 2023 02:34:25