periodic_replicator_client.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 
00004 """
00005 Provides a service to store ROS message objects in a mongodb database in JSON.
00006 """
00007 
00008 import rospy
00009 import actionlib
00010 from mongodb_store_msgs.msg import  MoveEntriesAction, MoveEntriesGoal, StringList
00011 import datetime as dt
00012 from threading import Thread, Event
00013 import sys
00014 import signal
00015 import time
00016 
00017 from mongodb_store.message_store import MessageStoreProxy
00018 
00019 class PeriodicReplicatorClient(Thread):
00020     def __init__(self):
00021         Thread.__init__(self)
00022         self.dead = Event()
00023         self.interval = rospy.get_param("mongodb_replication_interval", 60 * 60 * 24) # default: 1 day
00024         self.delete_after_move = rospy.get_param("mongodb_replication_delete_after_move", False)
00025         self.replicate_interval = rospy.Duration(self.interval)
00026         self.database = rospy.get_param("robot/database")
00027         self.collections = rospy.myargv()[1:]
00028         try:
00029             self.collections.append(rospy.get_param("robot/name"))
00030         except KeyError as e:
00031             rospy.logerr("specify param \"robot/name\" (e.g. pr1012, olive)")
00032             exit(1)
00033         self.periodic = rospy.get_param("~periodic", True)
00034         self.date_msg_store = MessageStoreProxy(database=self.database,
00035                                                 collection="replication")
00036         rospy.loginfo("replication enabled: db: %s, collection: %s, periodic: %s",
00037                       self.database, self.collections, self.periodic)
00038         if self.periodic:
00039             rospy.loginfo("periodic replication interval: %d [sec]", self.interval)
00040         
00041 
00042     def run(self):
00043         while not self.dead.wait(self.interval):
00044             move_before = self.time_after_last_replicate_date()
00045             self.move_entries(move_before)
00046             self.insert_replicate_date()
00047 
00048     def time_after_last_replicate_date(self):
00049         delta = 0
00050         try:
00051             last_replicated = self.date_msg_store.query(StringList._type, single=True, sort_query=[("$natural",-1)])
00052             date = last_replicated[1]["inserted_at"]
00053             rospy.loginfo("last replicated at %s", date)
00054         except Exception as e:
00055             rospy.logwarn("failed to search last replicated date from database: %s", e)
00056         finally:
00057             return rospy.Duration(delta)
00058 
00059     def insert_replicate_date(self):
00060         try:
00061             self.date_msg_store.insert(StringList(self.collections))
00062         except Exception as e:
00063             rospy.logwarn("failed to insert last replicate date to database: %s", e)
00064 
00065     def move_entries(self, move_before):
00066         client = actionlib.SimpleActionClient('move_mongodb_entries', MoveEntriesAction)
00067         client.wait_for_server()
00068         goal = MoveEntriesGoal(database=self.database,
00069                                collections=StringList(self.collections),
00070                                move_before=move_before,
00071                                delete_after_move=self.delete_after_move)
00072         client.send_goal(goal, feedback_cb=self.feedback_cb)
00073         client.wait_for_result()
00074 
00075     def feedback_cb(self, feedback):
00076         rospy.loginfo(feedback)
00077 
00078     def cancel(self):
00079         self.dead.set()
00080 
00081 if __name__ == '__main__':
00082     rospy.init_node("mongodb_replicator_client")
00083     r = PeriodicReplicatorClient()
00084     rospy.on_shutdown(r.cancel)
00085     r.start()


jsk_robot_startup
Author(s):
autogenerated on Wed Sep 16 2015 10:31:26