8 import pymongo, bson, datetime, rospy, os, json, subprocess, fractions
9 from pymongo
import errors
as pymongo_erros
12 from std_msgs.msg
import String
22 rospy.init_node(
'queueActions', anonymous=
False)
23 rospy.loginfo(
"Queue of actions topics started")
26 rospy.loginfo(
"Create the subscriber /addNewActions")
30 rospy.loginfo(
"Starting synchronisation")
35 rospy.loginfo(
"Awaiting actions")
36 while not rospy.is_shutdown():
50 result = list(CLIENT[DATALAKE][COLL].aggregate(pipeline=pipeline))
52 except Exception
as e:
53 rospy.logerr(
"Error on query the cloud")
54 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
62 data = json.loads(data)
63 except Exception
as e:
64 rospy.logerr(
"Error on the convert the mesage: " + str(msg))
65 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
69 "_id": bson.ObjectId(),
70 "command": data[
'command'],
71 "dateTime": datetime.datetime.now(),
75 except Exception
as e:
76 rospy.logerr(
"Error on parse a new action")
77 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
81 except Exception
as e:
82 rospy.logerr(
"Error on add to local queue")
83 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
89 _command = action[
'command']
90 rospy.loginfo(
"Run command " + _command)
91 result = subprocess.Popen(_command, shell=
True)
93 action[
'status'] =
'runned'
94 except Exception
as e:
95 rospy.logerr(
"Error in the execution of " + str(action))
96 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
97 action[
'status'] =
'error'
103 rospy.loginfo(
"Add action")
105 if data[
'source'] ==
'admin':
106 action[
'status'] =
'canceled'
108 elif action[
'command'] == data[
'command']
and action[
'status'] ==
'wait':
109 data[
'status'] =
'canceled'
110 except Exception
as e:
111 rospy.logerr(
"Error in the check local queue for the action: " + str(data))
112 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
116 except Exception
as e:
117 rospy.logerr(
"Error add the action: " + str(data))
118 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
123 if not isinstance(content, list):
126 for document
in content:
129 _result = CLIENT[DATALAKE][COLL].find_one_and_update(filter={
'_id': document[
'_id']},update={
'$set':document}, upsert=
True)
131 if not _result[
'status'] ==
'wait':
132 content.remove(document)
133 rospy.loginfo(
"Remove action from queue: " + str(document))
134 except Exception
as e:
135 rospy.logerr(
"Error on send document to cloud: " + str(document) )
136 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
142 PIPELINE_CHANGE_WAIT = [
153 rospy.loginfo(
"Run the initial query againts " + COLL)
155 initial = self.
queryCloud(pipeline=PIPELINE_CHANGE_WAIT)
162 PIPELINE_GET_WAIT = [{
167 _result = self.
queryCloud(pipeline=PIPELINE_GET_WAIT)
169 for action
in _result:
178 if action[
'status'] ==
'wait':
180 except Exception
as e:
181 rospy.logerr(
"Error action: " + str(action) )
182 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
185 if __name__ ==
'__main__':
189 except rospy.ROSInterruptException: