00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 import roslib; roslib.load_manifest('jsk_pr2_startup')
00015 import rospy
00016 import pgdb
00017 import tf
00018 import cStringIO
00019 import thread
00020
00021 import std_msgs.msg
00022 import actionlib_msgs.msg
00023 import sensor_msgs.msg
00024 from sensor_msgs.msg import JointState
00025
00026 def message_serialize(msg):
00027 buf = cStringIO.StringIO()
00028 msg.serialize(buf)
00029 return buf.getvalue().encode('hex_codec')
00030
00031 class ActionResultDB:
00032 loaded_types = []
00033 useless_types = ['roslib.msg.Header']
00034 subscribers = {}
00035
00036 def __init__(self,connection=None,db_lock=None):
00037
00038 db_name = rospy.get_param('~db_name','pr2db')
00039 host = rospy.get_param('~host_name','c1')
00040 port = rospy.get_param('~port',5432)
00041 username = rospy.get_param('~user_name','pr2admin')
00042 passwd = rospy.get_param('~passwd','')
00043 self.con = pgdb.connect(database=db_name, host=host, user=username, password=passwd)
00044 self.lockobj = thread.allocate_lock()
00045
00046 self.joint_tolerance = 1.0
00047 self.joint_states = []
00048 self.joint_states_inserted = []
00049 self.joint_sub = rospy.Subscriber('/joint_states', JointState, self.joint_states_cb)
00050 return
00051
00052
00053 def action_goal_cb(self, topic, type_name, msg):
00054 self.lockobj.acquire()
00055 self.insert_action_goal(topic,type_name,msg)
00056 self.lockobj.release()
00057 return
00058
00059 def action_result_cb(self, topic, type_name, msg):
00060 self.lockobj.acquire()
00061 self.insert_action_result(topic,type_name,msg)
00062 key_func = (lambda js: abs(js.header.stamp-msg.header.stamp))
00063 nearest_state = min(self.joint_states,key=key_func)
00064 self.insert_joint_states(nearest_state)
00065 self.lockobj.release()
00066 return
00067
00068 def joint_states_cb(self, msg):
00069 self.lockobj.acquire()
00070 self.joint_states = [m for m in self.joint_states if (msg.header.stamp - m.header.stamp).to_sec() < self.joint_tolerance] + [msg]
00071 self.joint_states_inserted = [s for s in self.joint_states_inserted if (msg.header.stamp - s).to_sec() < self.joint_tolerance]
00072 self.lockobj.release()
00073 return
00074
00075
00076 def insert_action_goal(self,topic,type_name,msg):
00077 rospy.loginfo("insert action_goal message");
00078 goal_string = message_serialize(msg.goal)
00079 cursor = self.con.cursor()
00080 cursor.execute("INSERT INTO action_goal (topic,type,header_stamp, header_frame_id, goal_id_stamp, goal_id_id, goal) VALUES ('%s','%s',%d,'%s',%d,'%s','%s');" % (topic,type_name,msg.header.stamp.to_nsec(), msg.header.frame_id, msg.goal_id.stamp.to_nsec(), msg.goal_id.id, goal_string))
00081 cursor.close()
00082 self.con.commit()
00083 return
00084
00085 def insert_action_result(self,topic,type_name,msg):
00086 rospy.loginfo("insert action_result message");
00087 result_string = message_serialize(msg.result)
00088 cursor = self.con.cursor()
00089 cursor.execute("INSERT INTO action_result (topic,type,header_stamp, header_frame_id, status_goal_id_stamp, status_goal_id_id, status_status, status_text, result) VALUES ('%s','%s',%d,'%s',%d,'%s',%d,'%s','%s');" % (topic,type_name, msg.header.stamp.to_nsec(), msg.header.frame_id, msg.status.goal_id.stamp.to_nsec(), msg.status.goal_id.id, msg.status.status, msg.status.text, result_string))
00090 cursor.close()
00091 self.con.commit()
00092 return
00093
00094 def insert_joint_states(self,msg):
00095 if msg.header.stamp in self.joint_states_inserted:
00096 return
00097 rospy.loginfo("insert joint_states message");
00098 msg_string = message_serialize(msg)
00099 cursor = self.con.cursor()
00100 cursor.execute("INSERT INTO joint_states (header_stamp, header_frame_id, msg) VALUES (%d,'%s','%s');" % (msg.header.stamp.to_nsec(), msg.header.frame_id, msg_string))
00101 cursor.close()
00102 self.con.commit()
00103 return
00104
00105
00106 def message_callback_type(self,name,type_name,type_obj):
00107 if not hasattr(type_obj,'header'): return None
00108 if type(type_obj.header) != std_msgs.msg.Header: return None
00109 if type_name in ['JointTrajectoryActionGoal',
00110 'JointTrajectoryActionResult',
00111 'PointHeadActionGoal',
00112 'PointHeadActionResult',
00113 'SingleJointPositionActionGoal',
00114 'SingleJointPositionActionResult',
00115 'Pr2GripperCommandActionGoal',
00116 'Pr2GripperCommandActionResult']:
00117 return None
00118 if hasattr(type_obj,'goal_id') and hasattr(type_obj,'goal') and type(type_obj.goal_id) == actionlib_msgs.msg.GoalID:
00119 return (lambda msg: self.action_goal_cb(name,type_name,msg))
00120 if hasattr(type_obj,'status') and hasattr(type_obj,'result') and type(type_obj.status) == actionlib_msgs.msg.GoalStatus:
00121 return (lambda msg: self.action_result_cb(name,type_name,msg))
00122 return None
00123
00124
00125 def update_subscribers(self):
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136 current_subscribers = rospy.client.get_published_topics()
00137 for topic_info in current_subscribers:
00138 name = topic_info[0]
00139 typ_tuple = tuple(topic_info[1].split('/'))
00140 typ = '%s.msg.%s'%typ_tuple
00141 if typ in self.useless_types:
00142 continue
00143 if name in self.subscribers.keys():
00144 continue
00145
00146
00147 try:
00148 type_obj = eval(typ)()
00149 except (AttributeError, NameError), e:
00150 try:
00151 rospy.loginfo("try to load [%s]", typ_tuple[0]);
00152 roslib.load_manifest(typ_tuple[0])
00153 exec('import ' + typ_tuple[0] + '.msg')
00154 except SyntaxError, e:
00155 rospy.loginfo('please rosmake %s', typ_tuple[0])
00156
00157 try:
00158 type_class = eval(typ)
00159 cb_obj = self.message_callback_type(name,typ_tuple[1],type_class())
00160 if cb_obj == None:
00161 self.useless_types += [typ]
00162 continue
00163 self.subscribers[name] = rospy.Subscriber(name,type_class,cb_obj)
00164 rospy.loginfo("start subscribe (topic=%s type=%s)", name, typ);
00165 except Exception, e:
00166 self.useless_types += [typ]
00167 rospy.loginfo('error in checking '+typ_tuple+e)
00168 continue
00169 return
00170
00171 if __name__ == "__main__":
00172 rospy.init_node('action_result_db')
00173 obj = ActionResultDB()
00174 looprate = rospy.Rate(1.0)
00175 while not rospy.is_shutdown():
00176 obj.update_subscribers()
00177 looprate.sleep()
00178
00179 '''
00180 CREATE TABLE action_goal(
00181 id SERIAL PRIMARY KEY UNIQUE,
00182 topic TEXT,
00183 type TEXT,
00184 header_stamp BIGINT,
00185 header_frame_id TEXT,
00186 goal_id_stamp BIGINT,
00187 goal_id_id TEXT,
00188 goal TEXT);
00189 '''
00190 '''
00191 CREATE TABLE action_result(
00192 id SERIAL PRIMARY KEY UNIQUE,
00193 topic TEXT,
00194 type TEXT,
00195 header_stamp BIGINT,
00196 header_frame_id TEXT,
00197 status_goal_id_stamp BIGINT,
00198 status_goal_id_id TEXT,
00199 status_status INTEGER,
00200 status_text TEXT,
00201 result TEXT);
00202 '''
00203 '''
00204 CREATE TABLE joint_states(
00205 id SERIAL PRIMARY KEY UNIQUE,
00206 header_stamp BIGINT,
00207 header_frame_id TEXT,
00208 msg TEXT);
00209 '''
00210