periodic_replicator_client.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 
00004 """
00005 Provides a service to store ROS message objects in a mongodb database in JSON.
00006 """
00007 
00008 import rospy
00009 import actionlib
00010 from mongodb_store_msgs.msg import  MoveEntriesAction, MoveEntriesGoal, StringList
00011 import datetime as dt
00012 from threading import Thread, Event
00013 import sys
00014 import signal
00015 import time
00016 from std_msgs.msg import Bool
00017 
00018 from mongodb_store.message_store import MessageStoreProxy
00019 
00020 class PeriodicReplicatorClient(Thread):
00021     def __init__(self):
00022         Thread.__init__(self)
00023         self.dead = Event()
00024         self.replicate_interval = rospy.get_param("replication/interval", 60 * 60 * 24) # default: 1 day
00025         self.delete_after_move = rospy.get_param("replication/delete_after_move", False)
00026         self.database = rospy.get_param("robot/database")
00027         self.collections = rospy.myargv()[1:]
00028         try:
00029             self.collections.append(rospy.get_param("robot/name"))
00030         except KeyError as e:
00031             rospy.logerr("specify param \"robot/name\" (e.g. pr1012, olive)")
00032             exit(1)
00033 
00034         self.periodic = rospy.get_param("replication/periodic", True)
00035         if self.periodic:
00036             rospy.loginfo("periodic replication interval: %d [sec]", self.replicate_interval)
00037             self.disable_on_wireless_network = rospy.get_param("replication/disable_on_wireless_network", False)
00038             if self.disable_on_wireless_network:
00039                 self.network_connected = False
00040                 self.net_sub = rospy.Subscriber("/network/connected", Bool, self.network_connected_cb)
00041         else:
00042             self.replicate_interval = 1
00043 
00044         self.date_msg_store = MessageStoreProxy(database=self.database,
00045                                                 collection="replication")
00046         self.replicate_ac = actionlib.SimpleActionClient('move_mongodb_entries', MoveEntriesAction)
00047         rospy.loginfo("waiting for service advertise /move_mongodb_entries ...")
00048         self.replicate_ac.wait_for_server()
00049         rospy.loginfo("replication enabled: db: %s, collection: %s, periodic: %s",
00050                       self.database, self.collections, self.periodic)
00051 
00052     def run(self):
00053         while not self.dead.wait(self.replicate_interval):
00054             if self.disable_on_wireless_network and not self.network_connected:
00055                 rospy.loginfo("no wired network connection. skipping replication...")
00056             else:
00057                 move_before = self.time_after_last_replicate_date()
00058                 self.move_entries(move_before)
00059                 self.insert_replicate_date()
00060                 if not self.periodic:
00061                     self.exit()
00062 
00063     def time_after_last_replicate_date(self):
00064         delta = 0
00065         try:
00066             last_replicated = self.date_msg_store.query(StringList._type, single=True, sort_query=[("$natural",-1)])
00067             date = last_replicated[1]["inserted_at"]
00068             rospy.loginfo("last replicated at %s", date)
00069         except Exception as e:
00070             rospy.logwarn("failed to search last replicated date from database: %s", e)
00071         finally:
00072             return rospy.Duration(delta)
00073 
00074     def insert_replicate_date(self):
00075         try:
00076             self.date_msg_store.insert(StringList(self.collections))
00077         except Exception as e:
00078             rospy.logwarn("failed to insert last replicate date to database: %s", e)
00079 
00080     def move_entries(self, move_before):
00081         goal = MoveEntriesGoal(database=self.database,
00082                                collections=StringList(self.collections),
00083                                move_before=move_before,
00084                                delete_after_move=self.delete_after_move)
00085         self.replicate_ac.send_goal(goal,
00086                                     active_cb=self.active_cb,
00087                                     feedback_cb=self.feedback_cb)
00088         while not self.replicate_ac.wait_for_result(timeout=rospy.Duration(5.0)):
00089             if self.disable_on_wireless_network and not self.network_connected:
00090                 rospy.loginfo("disconnected wired network connection. canceling replication...")
00091                 self.cancel()
00092 
00093 
00094     def active_cb(self):
00095         if self.disable_on_wireless_network and not self.network_connected:
00096                 rospy.loginfo("disconnected wired network connection. canceling replication...")
00097                 self.cancel()
00098 
00099     def feedback_cb(self, feedback):
00100         rospy.loginfo(feedback)
00101 
00102     def cancel(self):
00103         self.replicate_ac.cancel_all_goals()
00104 
00105     def exit(self):
00106         self.cancel()
00107         self.dead.set()
00108 
00109     def network_connected_cb(self, msg):
00110         self.network_connected = msg.data
00111 
00112 if __name__ == '__main__':
00113     rospy.init_node("mongodb_replicator_client")
00114     r = PeriodicReplicatorClient()
00115     rospy.on_shutdown(r.cancel)
00116     r.start()


jsk_robot_startup
Author(s):
autogenerated on Sat Jul 1 2017 02:42:18