00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 import unittest
00033
00034 from rgoap import Condition, Precondition, Effect, Memory, MemoryCondition
00035 from rgoap_ros import common_ros
00036
00037 from std_msgs.msg import Bool
00038 import rospy
00039
00040
00041
00042 class Test(unittest.TestCase):
00043
00044 topic = '/testbool'
00045
00046 def setUp(self):
00047 rospy.init_node(self.__class__.__name__)
00048
00049 Condition._conditions_dict.clear()
00050 memory = Memory()
00051
00052 self.rtc = common_ros.ROSTopicCondition('topic.testbool', Test.topic, Bool, '/data')
00053 Condition.add(self.rtc)
00054 Condition.add(MemoryCondition(memory, 'memory.anyvar'))
00055 print self.rtc
00056
00057
00058
00059 self.rta_true = common_ros.ROSTopicAction(Test.topic, Bool,
00060 [Precondition(Condition.get('memory.anyvar'), 42)],
00061 [Effect('topic.testbool', True)],
00062 msg_args=[True])
00063 self.rta_false = common_ros.ROSTopicAction(Test.topic, Bool,
00064 [Precondition(Condition.get('memory.anyvar'), 42)],
00065 [Effect('topic.testbool', False)],
00066 msg_args=[False])
00067
00068 def msg_cb(message, next_worldstate, value):
00069 message.data = value
00070 return message
00071 self.rta_cb_true = common_ros.ROSTopicAction(Test.topic, Bool,
00072 [Precondition(Condition.get('memory.anyvar'), 42)],
00073 [Effect('topic.testbool', True)],
00074 msg_cb=lambda msg, ws: msg_cb(msg, ws, True))
00075 self.rta_cb_false = common_ros.ROSTopicAction(Test.topic, Bool,
00076 [Precondition(Condition.get('memory.anyvar'), 42)],
00077 [Effect('topic.testbool', False)],
00078 msg_cb=lambda msg, ws: msg_cb(msg, ws, False))
00079
00080 def tearDown(self):
00081 pass
00082
00083
00084 def testRTC(self):
00085 publisher = rospy.Publisher(Test.topic, Bool, latch=True)
00086 rospy.sleep(2)
00087 self.assertIsNone(self.rtc.get_value(), 'New topic cond should have value None.')
00088
00089 print 'publishing..'
00090
00091 publisher.publish(True)
00092 rospy.sleep(2)
00093 self.assertTrue(self.rtc.get_value(), 'Now topic cond should have value True.')
00094
00095 publisher.publish(False)
00096 rospy.sleep(2)
00097 self.assertFalse(self.rtc.get_value(), 'Now topic cond should have value False.')
00098
00099
00100 def testRTA_msg_args(self):
00101 print self.rta_true
00102 print self.rta_false
00103
00104 rospy.sleep(2)
00105 self.assertIsNone(self.rtc.get_value(), 'New topic cond should have value None.')
00106
00107 print 'publishing via ROSTopicAction..'
00108
00109 self.rta_true.run(None)
00110 rospy.sleep(2)
00111 self.assertTrue(self.rtc.get_value(), 'Now topic cond should have value True.')
00112
00113 self.rta_false.run(None)
00114 rospy.sleep(2)
00115 self.assertFalse(self.rtc.get_value(), 'Now topic cond should have value False.')
00116
00117 def testRTA_msg_cb(self):
00118 print self.rta_cb_true
00119 print self.rta_cb_false
00120
00121 rospy.sleep(2)
00122 self.assertIsNone(self.rtc.get_value(), 'New topic cond should have value None.')
00123
00124 print 'publishing via ROSTopicAction..'
00125
00126 self.rta_cb_true.run(None)
00127 rospy.sleep(2)
00128 self.assertTrue(self.rtc.get_value(), 'Now topic cond should have value True.')
00129
00130 self.rta_cb_false.run(None)
00131 rospy.sleep(2)
00132 self.assertFalse(self.rtc.get_value(), 'Now topic cond should have value False.')
00133
00134
00135
00136 if __name__ == "__main__":
00137
00138 unittest.main()