Go to the documentation of this file.00001
00002
00003
00004 """
00005 Provides a service to store ROS message objects in a mongodb database in JSON.
00006 """
00007
00008 import rospy
00009 import actionlib
00010 from mongodb_store_msgs.msg import MoveEntriesAction, MoveEntriesGoal, StringList
00011 import datetime as dt
00012 from threading import Thread, Event
00013 import sys
00014 import signal
00015 import time
00016 from std_msgs.msg import Bool
00017
00018 from mongodb_store.message_store import MessageStoreProxy
00019
00020 class PeriodicReplicatorClient(Thread):
00021 def __init__(self):
00022 Thread.__init__(self)
00023 self.dead = Event()
00024 self.replicate_interval = rospy.get_param("replication/interval", 60 * 60 * 24)
00025 self.delete_after_move = rospy.get_param("replication/delete_after_move", False)
00026 self.database = rospy.get_param("robot/database")
00027 self.collections = rospy.myargv()[1:]
00028 try:
00029 self.collections.append(rospy.get_param("robot/name"))
00030 except KeyError as e:
00031 rospy.logerr("specify param \"robot/name\" (e.g. pr1012, olive)")
00032 exit(1)
00033
00034 self.periodic = rospy.get_param("replication/periodic", True)
00035 if self.periodic:
00036 rospy.loginfo("periodic replication interval: %d [sec]", self.replicate_interval)
00037 self.disable_on_wireless_network = rospy.get_param("replication/disable_on_wireless_network", False)
00038 if self.disable_on_wireless_network:
00039 self.network_connected = False
00040 self.net_sub = rospy.Subscriber("/network/connected", Bool, self.network_connected_cb)
00041 else:
00042 self.replicate_interval = 1
00043
00044 self.date_msg_store = MessageStoreProxy(database=self.database,
00045 collection="replication")
00046 self.replicate_ac = actionlib.SimpleActionClient('move_mongodb_entries', MoveEntriesAction)
00047 rospy.loginfo("waiting for service advertise /move_mongodb_entries ...")
00048 self.replicate_ac.wait_for_server()
00049 rospy.loginfo("replication enabled: db: %s, collection: %s, periodic: %s",
00050 self.database, self.collections, self.periodic)
00051
00052 def run(self):
00053 while not self.dead.wait(self.replicate_interval):
00054 if self.disable_on_wireless_network and not self.network_connected:
00055 rospy.loginfo("no wired network connection. skipping replication...")
00056 else:
00057 move_before = self.time_after_last_replicate_date()
00058 self.move_entries(move_before)
00059 self.insert_replicate_date()
00060 if not self.periodic:
00061 self.exit()
00062
00063 def time_after_last_replicate_date(self):
00064 delta = 0
00065 try:
00066 last_replicated = self.date_msg_store.query(StringList._type, single=True, sort_query=[("$natural",-1)])
00067 date = last_replicated[1]["inserted_at"]
00068 rospy.loginfo("last replicated at %s", date)
00069 except Exception as e:
00070 rospy.logwarn("failed to search last replicated date from database: %s", e)
00071 finally:
00072 return rospy.Duration(delta)
00073
00074 def insert_replicate_date(self):
00075 try:
00076 self.date_msg_store.insert(StringList(self.collections))
00077 except Exception as e:
00078 rospy.logwarn("failed to insert last replicate date to database: %s", e)
00079
00080 def move_entries(self, move_before):
00081 goal = MoveEntriesGoal(database=self.database,
00082 collections=StringList(self.collections),
00083 move_before=move_before,
00084 delete_after_move=self.delete_after_move)
00085 self.replicate_ac.send_goal(goal,
00086 active_cb=self.active_cb,
00087 feedback_cb=self.feedback_cb)
00088 while not self.replicate_ac.wait_for_result(timeout=rospy.Duration(5.0)):
00089 if self.disable_on_wireless_network and not self.network_connected:
00090 rospy.loginfo("disconnected wired network connection. canceling replication...")
00091 self.cancel()
00092
00093
00094 def active_cb(self):
00095 if self.disable_on_wireless_network and not self.network_connected:
00096 rospy.loginfo("disconnected wired network connection. canceling replication...")
00097 self.cancel()
00098
00099 def feedback_cb(self, feedback):
00100 rospy.loginfo(feedback)
00101
00102 def cancel(self):
00103 self.replicate_ac.cancel_all_goals()
00104
00105 def exit(self):
00106 self.cancel()
00107 self.dead.set()
00108
00109 def network_connected_cb(self, msg):
00110 self.network_connected = msg.data
00111
00112 if __name__ == '__main__':
00113 rospy.init_node("mongodb_replicator_client")
00114 r = PeriodicReplicatorClient()
00115 rospy.on_shutdown(r.cancel)
00116 r.start()