4 from config
import PATH, CLIENT
7 from pymongo
import errors
as pymongo_erros
8 from bson
import errors
as bson_errors
# Register this process as the ROS node 'bufferManager' (anonymous=False) and
# log a start-up message.
14 rospy.init_node(
'bufferManager', anonymous=
False)
15 rospy.loginfo(
"Buffer manager started")
# Main loop: keeps iterating until rospy reports shutdown.
19 while not rospy.is_shutdown():
# Act only when PATH lists at least one entry and CLIENT.is_primary is truthy.
22 if len(os.listdir(path=PATH)) > 0
and CLIENT.is_primary:
# Pause for ten rate.sleep() periods (the loop index is unused).
27 for i
in range(0,10): rate.sleep()
# Handler for a missing PATH directory: PATH is created with os.makedirs.
28 except FileNotFoundError:
31 os.makedirs(name=PATH)
# Handler for MongoDB connection failure / server-selection timeout:
# pause for ten rate.sleep() periods.
32 except (pymongo_erros.ConnectionFailure, pymongo_erros.ServerSelectionTimeoutError):
34 for i
in range(0,10): rate.sleep()
# Handler for any other exception: log a fixed message, then the exception
# type name and args.
35 except Exception
as e:
36 rospy.logerr(
"Error on file queue")
# NOTE(review): the message below has no %-placeholders for the two extra
# positional arguments -- confirm rospy.logerr actually emits the exception
# type and args here rather than failing to format the record.
37 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
# Build the work list: the entries of PATH in reverse-sorted order ...
44 files = sorted(os.listdir(path=PATH), reverse=
True)
# ... keeping only the names that end in ".cjson".
45 files = list(filter(
lambda file: file.endswith(
".cjson"), files))
# Handler for any exception: log a fixed message, then the exception type
# name and args.
46 except Exception
as e:
47 rospy.logerr(
"Error on get the file list")
# NOTE(review): the message below has no %-placeholders for the two extra
# positional arguments -- confirm rospy.logerr formats them as intended.
48 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
# Open the queued file for binary reading. The path is the plain string
# concatenation PATH+file.
# NOTE(review): presumably PATH ends with a path separator -- verify in config.
# NOTE(review): no close() / `with` for `get` is visible here -- confirm the
# handle is released.
52 get = open(file=PATH+file, mode=
'rb')
# Handler for a failed open: log the file name.
53 except Exception
as e:
54 rospy.logerr(
"Error on file open: " + file)
# Decode the full file contents as a BSON document.
57 data = bson.BSON.decode(get.read())
# Handler for a failed decode: log the file name, then the exception type
# name and args. NOTE(review): "Errosr" in the message below looks like a typo.
58 except Exception
as e:
59 rospy.logerr(
"Errosr on file decode: " + file)
# NOTE(review): the message below has no %-placeholders for the two extra
# positional arguments -- confirm rospy.logerr formats them as intended.
60 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
# Remove the file via self.rm with the message "Error on BSON"
# (presumably still part of the decode-failure handler -- TODO confirm).
62 self.
rm(file=file, msg=
"Error on BSON")
# Pass the decoded 'dataPath' and 'content' entries to self.send2cloud and
# branch on its result.
64 if self.
send2cloud(dataPath=data[
'dataPath'], content=data[
'content']):
# rm(file, msg=None): logs "Delete file <file> | <msg>" and, if an exception
# occurs, logs a fixed message plus the exception type name and args.
# NOTE(review): with the default msg=None the string concatenation in the
# loginfo call raises TypeError -- confirm every caller passes msg, or that
# the removal itself does not depend on this log line succeeding.
69 def rm(self, file, msg = None):
73 rospy.loginfo(
"Delete file " + file +
" | " + msg)
# Handler for any exception raised in the guarded section.
75 except Exception
as e:
76 rospy.logerr(
"Error on the file remove")
# NOTE(review): the message below has no %-placeholders for the two extra
# positional arguments -- confirm rospy.logerr formats them as intended.
77 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
# Upload step (presumably the body of send2cloud -- TODO confirm).
# Probe dataPath for the keys 'dataSource', 'dataBase' and 'collection'.
# The values assigned to `test` are overwritten and not used in the visible
# code, so the lookups appear to serve only to raise when a key is missing.
84 test = dataPath[
'dataSource']
85 test = dataPath[
'dataBase']
86 test = dataPath[
'collection']
# Handler for a failed probe: log a fixed message, then the exception type
# name and args.
87 except Exception
as e:
88 rospy.logerr(
"Error in storage data path")
# NOTE(review): the message below has no %-placeholders for the two extra
# positional arguments -- confirm rospy.logerr formats them as intended.
89 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
# Special-case content that is not already a list.
93 if not isinstance(content, list):
# Insert content with insert_many into CLIENT[<dataBase>][<collection>] and
# return the `acknowledged` attribute of the insert result.
95 return CLIENT[dataPath[
'dataBase']][dataPath[
'collection']].insert_many(content).acknowledged
# NOTE(review): PyMongo's insert_many is documented to report duplicate keys
# via BulkWriteError -- confirm this DuplicateKeyError handler can be reached.
96 except pymongo_erros.DuplicateKeyError:
# Handler for any other exception: log a fixed message, then the exception
# type name and args.
99 except Exception
as e:
100 rospy.logerr(
"Error when update to cloud")
# NOTE(review): the message below has no %-placeholders for the two extra
# positional arguments -- confirm rospy.logerr formats them as intended.
101 rospy.logerr(
"An exception occurred:", type(e).__name__,e.args)
# Script entry point: runs only when the module is executed directly.
105 if __name__ ==
'__main__':
# Handler for rospy.ROSInterruptException raised by the guarded start-up code.
108 except rospy.ROSInterruptException: