Go to the documentation of this file.00001
00002
00003
00004 """
00005 Provides a service to store ROS message objects in a mongodb database in JSON.
00006 """
00007
00008 import rospy
00009 import actionlib
00010 from mongodb_store_msgs.msg import MoveEntriesAction, MoveEntriesGoal, StringList
00011 import datetime as dt
00012 from threading import Thread, Event
00013 import sys
00014 import signal
00015 import time
00016
00017 from mongodb_store.message_store import MessageStoreProxy
00018
00019 class PeriodicReplicatorClient(Thread):
00020 def __init__(self):
00021 Thread.__init__(self)
00022 self.dead = Event()
00023 self.interval = rospy.get_param("mongodb_replication_interval", 60 * 60 * 24)
00024 self.delete_after_move = rospy.get_param("mongodb_replication_delete_after_move", False)
00025 self.replicate_interval = rospy.Duration(self.interval)
00026 self.database = rospy.get_param("robot/database")
00027 self.collections = rospy.myargv()[1:]
00028 try:
00029 self.collections.append(rospy.get_param("robot/name"))
00030 except KeyError as e:
00031 rospy.logerr("specify param \"robot/name\" (e.g. pr1012, olive)")
00032 exit(1)
00033 self.periodic = rospy.get_param("~periodic", True)
00034 self.date_msg_store = MessageStoreProxy(database=self.database,
00035 collection="replication")
00036 rospy.loginfo("replication enabled: db: %s, collection: %s, periodic: %s",
00037 self.database, self.collections, self.periodic)
00038 if self.periodic:
00039 rospy.loginfo("periodic replication interval: %d [sec]", self.interval)
00040
00041
00042 def run(self):
00043 while not self.dead.wait(self.interval):
00044 move_before = self.time_after_last_replicate_date()
00045 self.move_entries(move_before)
00046 self.insert_replicate_date()
00047
00048 def time_after_last_replicate_date(self):
00049 delta = 0
00050 try:
00051 last_replicated = self.date_msg_store.query(StringList._type, single=True, sort_query=[("$natural",-1)])
00052 date = last_replicated[1]["inserted_at"]
00053 rospy.loginfo("last replicated at %s", date)
00054 except Exception as e:
00055 rospy.logwarn("failed to search last replicated date from database: %s", e)
00056 finally:
00057 return rospy.Duration(delta)
00058
00059 def insert_replicate_date(self):
00060 try:
00061 self.date_msg_store.insert(StringList(self.collections))
00062 except Exception as e:
00063 rospy.logwarn("failed to insert last replicate date to database: %s", e)
00064
00065 def move_entries(self, move_before):
00066 client = actionlib.SimpleActionClient('move_mongodb_entries', MoveEntriesAction)
00067 client.wait_for_server()
00068 goal = MoveEntriesGoal(database=self.database,
00069 collections=StringList(self.collections),
00070 move_before=move_before,
00071 delete_after_move=self.delete_after_move)
00072 client.send_goal(goal, feedback_cb=self.feedback_cb)
00073 client.wait_for_result()
00074
00075 def feedback_cb(self, feedback):
00076 rospy.loginfo(feedback)
00077
00078 def cancel(self):
00079 self.dead.set()
00080
00081 if __name__ == '__main__':
00082 rospy.init_node("mongodb_replicator_client")
00083 r = PeriodicReplicatorClient()
00084 rospy.on_shutdown(r.cancel)
00085 r.start()