00001 #!/usr/bin/python
00002 #
00003 # This script stores All actionlib goals and results by PostgreSQL
00004 # And store the current robot joints when recieve result.
00005 #
00006 # table configuration is bottom of this script
00007 #
00009 # parameters:
00010 #    dbname,hostname,port,username,passwd: for DB connection
00011 #    except action type:
00012 #    joint_tolerance: store the past joint_states (sec)
00014 import roslib; roslib.load_manifest('jsk_pr2_startup')
00015 import rospy
00016 import pgdb
00017 import tf
00018 import cStringIO
00019 import thread
00021 import std_msgs.msg
00022 import actionlib_msgs.msg
00023 import sensor_msgs.msg
00024 from sensor_msgs.msg import JointState
00026 def message_serialize(msg):
00027     buf = cStringIO.StringIO()
00028     msg.serialize(buf)
00029     return buf.getvalue().encode('hex_codec')
00031 class ActionResultDB:
00032     loaded_types = []
00033     useless_types = ['roslib.msg.Header'] # message types not but action (goal|result)
00034     subscribers = {} # topicname:subscriber
00036     def __init__(self,connection=None,db_lock=None): # TODO
00037         # args dbname, host, port, opt, tty, user, passwd
00038         db_name = rospy.get_param('~db_name','pr2db')
00039         host = rospy.get_param('~host_name','c1')
00040         port = rospy.get_param('~port',5432)
00041         username = rospy.get_param('~user_name','pr2admin')
00042         passwd = rospy.get_param('~passwd','')
00043         self.con = pgdb.connect(database=db_name, host=host, user=username, password=passwd)
00044         self.lockobj = thread.allocate_lock()
00045         # initialize
00046         self.joint_tolerance = 1.0
00047         self.joint_states = []
00048         self.joint_states_inserted = [] # list of time stamp
00049         self.joint_sub = rospy.Subscriber('/joint_states', JointState, self.joint_states_cb)
00050         return
00052     # callback functions
00053     def action_goal_cb(self, topic, type_name, msg):
00054         self.lockobj.acquire()
00055         self.insert_action_goal(topic,type_name,msg)
00056         self.lockobj.release()
00057         return
00059     def action_result_cb(self, topic, type_name, msg):
00060         self.lockobj.acquire()
00061         self.insert_action_result(topic,type_name,msg)
00062         key_func = (lambda js: abs(js.header.stamp-msg.header.stamp))
00063         nearest_state = min(self.joint_states,key=key_func)
00064         self.insert_joint_states(nearest_state)
00065         self.lockobj.release()
00066         return
00068     def joint_states_cb(self, msg):
00069         self.lockobj.acquire()
00070         self.joint_states = [m for m in self.joint_states if (msg.header.stamp - m.header.stamp).to_sec() < self.joint_tolerance] + [msg]
00071         self.joint_states_inserted = [s for s in self.joint_states_inserted if (msg.header.stamp - s).to_sec() < self.joint_tolerance]
00072         self.lockobj.release()
00073         return
00075     # insert functions
00076     def insert_action_goal(self,topic,type_name,msg):
00077         rospy.loginfo("insert action_goal message");
00078         goal_string = message_serialize(msg.goal)
00079         cursor = self.con.cursor()
00080         cursor.execute("INSERT INTO action_goal (topic,type,header_stamp, header_frame_id, goal_id_stamp, goal_id_id, goal) VALUES ('%s','%s',%d,'%s',%d,'%s','%s');" % (topic,type_name,msg.header.stamp.to_nsec(), msg.header.frame_id, msg.goal_id.stamp.to_nsec(),, goal_string))
00081         cursor.close()
00082         self.con.commit()
00083         return
00085     def insert_action_result(self,topic,type_name,msg):
00086         rospy.loginfo("insert action_result message");
00087         result_string = message_serialize(msg.result)
00088         cursor = self.con.cursor()
00089         cursor.execute("INSERT INTO action_result (topic,type,header_stamp, header_frame_id, status_goal_id_stamp, status_goal_id_id, status_status, status_text, result) VALUES ('%s','%s',%d,'%s',%d,'%s',%d,'%s','%s');" % (topic,type_name, msg.header.stamp.to_nsec(), msg.header.frame_id, msg.status.goal_id.stamp.to_nsec(),, msg.status.status, msg.status.text, result_string))
00090         cursor.close()
00091         self.con.commit()
00092         return
00094     def insert_joint_states(self,msg):
00095         if msg.header.stamp in self.joint_states_inserted:
00096             return
00097         rospy.loginfo("insert joint_states message");
00098         msg_string = message_serialize(msg)
00099         cursor = self.con.cursor()
00100         cursor.execute("INSERT INTO joint_states (header_stamp, header_frame_id, msg) VALUES (%d,'%s','%s');" % (msg.header.stamp.to_nsec(), msg.header.frame_id, msg_string))
00101         cursor.close()
00102         self.con.commit()
00103         return
00105     # if the message type is goal or result, return the callback
00106     def message_callback_type(self,name,type_name,type_obj):
00107         if not hasattr(type_obj,'header'): return None
00108         if type(type_obj.header) != std_msgs.msg.Header: return None
00109         if type_name in ['JointTrajectoryActionGoal',
00110                          'JointTrajectoryActionResult',
00111                          'PointHeadActionGoal',
00112                          'PointHeadActionResult',
00113                          'SingleJointPositionActionGoal',
00114                          'SingleJointPositionActionResult',
00115                          'Pr2GripperCommandActionGoal',
00116                          'Pr2GripperCommandActionResult']:
00117             return None
00118         if hasattr(type_obj,'goal_id') and hasattr(type_obj,'goal') and type(type_obj.goal_id) == actionlib_msgs.msg.GoalID:
00119             return (lambda msg: self.action_goal_cb(name,type_name,msg))
00120         if hasattr(type_obj,'status') and hasattr(type_obj,'result') and type(type_obj.status) == actionlib_msgs.msg.GoalStatus:
00121             return (lambda msg: self.action_result_cb(name,type_name,msg))
00122         return None
00124     # subscriber updater
00125     def update_subscribers(self):
00127         # check connections
00128 #        for name in self.subscribers.keys():
00129 #            sub = self.subscribers[name]
00130 #            if sub.get_num_connections() == 0:
00131 #                sub.unregister()
00132 #                self.subscribers.pop(name)
00133 #                rospy.loginfo('unsubscribe (%s)',name)
00135         # check new publishers
00136         current_subscribers = rospy.client.get_published_topics()
00137         for topic_info in current_subscribers:
00138             name = topic_info[0]
00139             typ_tuple  = tuple(topic_info[1].split('/'))
00140             typ = '%s.msg.%s'%typ_tuple
00141             if typ in self.useless_types:
00142                 continue
00143             if name in self.subscribers.keys():
00144                 continue
00146             # Import and Check
00147             try:
00148                 type_obj = eval(typ)()
00149             except (AttributeError, NameError), e:
00150                 try:
00151                     rospy.loginfo("try to load [%s]", typ_tuple[0]);
00152                     roslib.load_manifest(typ_tuple[0])
00153                     exec('import ' + typ_tuple[0] + '.msg')
00154                 except SyntaxError, e:
00155                     rospy.loginfo('please rosmake %s', typ_tuple[0])
00157             try:
00158                 type_class = eval(typ)
00159                 cb_obj = self.message_callback_type(name,typ_tuple[1],type_class())
00160                 if cb_obj == None:
00161                     self.useless_types += [typ]
00162                     continue
00163                 self.subscribers[name] = rospy.Subscriber(name,type_class,cb_obj)
00164                 rospy.loginfo("start subscribe (topic=%s type=%s)", name, typ);
00165             except Exception, e:
00166                 self.useless_types += [typ]
00167                 rospy.loginfo('error in checking '+typ_tuple+e)
00168                 continue
00169         return
00171 if __name__ == "__main__":
00172     rospy.init_node('action_result_db')
00173     obj = ActionResultDB()
00174     looprate = rospy.Rate(1.0)
00175     while not rospy.is_shutdown():
00176         obj.update_subscribers()
00177         looprate.sleep()
00179 '''
00180 CREATE TABLE action_goal(
00181   id               SERIAL PRIMARY KEY UNIQUE,
00182   topic            TEXT,
00183   type             TEXT,
00184   header_stamp     BIGINT,
00185   header_frame_id  TEXT,
00186   goal_id_stamp    BIGINT,
00187   goal_id_id       TEXT,
00188   goal             TEXT);
00189 '''
00190 '''
00191 CREATE TABLE action_result(
00192   id                    SERIAL PRIMARY KEY UNIQUE,
00193   topic                 TEXT,
00194   type                  TEXT,
00195   header_stamp          BIGINT,
00196   header_frame_id       TEXT,
00197   status_goal_id_stamp  BIGINT,
00198   status_goal_id_id     TEXT,
00199   status_status         INTEGER,
00200   status_text           TEXT,
00201   result                TEXT);
00202 '''
00203 '''
00204 CREATE TABLE joint_states(
00205   id               SERIAL PRIMARY KEY UNIQUE,
00206   header_stamp     BIGINT,
00207   header_frame_id  TEXT,
00208   msg              TEXT);
00209 '''
00210 # goal,result,msg is a serialized message object and coded in hex
