SmachRGOAPTest.py
Go to the documentation of this file.
00001 # Copyright (c) 2013, Felix Kolbe
00002 # All rights reserved. BSD License
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions
00006 # are met:
00007 #
00008 # * Redistributions of source code must retain the above copyright
00009 #   notice, this list of conditions and the following disclaimer.
00010 #
00011 # * Redistributions in binary form must reproduce the above copyright
00012 #   notice, this list of conditions and the following disclaimer in the
00013 #   documentation and/or other materials provided with the distribution.
00014 #
00015 # * Neither the name of the {organization} nor the names of its
00016 #   contributors may be used to endorse or promote products derived
00017 #   from this software without specific prior written permission.
00018 #
00019 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00020 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00021 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00022 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
00023 # HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00024 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00025 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00026 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00027 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00028 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00029 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 
00031 
00032 import unittest
00033 
00034 from smach import Sequence, State
00035 
00036 from rgoap import Condition, Precondition, Effect, VariableEffect, Goal
00037 from rgoap import MemoryCondition
00038 from rgoap import Runner
00039 
00040 from rgoap_smach import SMACHStateWrapperAction
00041 
00042 
00043 
00044 def get_lookaround_smach_mock():
00045     sq = Sequence(outcomes=['succeeded', 'aborted', 'preempted'],
00046                   connector_outcome='succeeded')
00047     return sq
00048 
00049 
00050 class LookAroundAction(SMACHStateWrapperAction):
00051 
00052     def __init__(self):
00053         SMACHStateWrapperAction.__init__(self, get_lookaround_smach_mock(),
00054                                   [Precondition(Condition.get('arm_can_move'), True)],
00055                                   [VariableEffect(Condition.get('awareness'))])
00056 
00057     def _generate_variable_preconditions(self, var_effects, worldstate, start_worldstate):
00058         effect = var_effects.pop()  # this action has one variable effect
00059         assert effect is self._effects[0]
00060         # increase awareness by one
00061         precond_value = worldstate.get_condition_value(effect._condition) - 1
00062         return [Precondition(effect._condition, precond_value, None)]
00063 
00064 
00065 
00066 class Test(unittest.TestCase):
00067 
00068     @classmethod
00069     def setUpClass(cls):
00070         # rospy.init_node('smach_rgoap_test')
00071         pass
00072 
00073     def setUp(self):
00074         Condition._conditions_dict.clear() # start every test without previous conditions
00075         self.runner = Runner()
00076 
00077     def tearDown(self):
00078         pass
00079 
00080     @classmethod
00081     def tearDownClass(cls):
00082         pass
00083 
00084 
00085     def testRunner(self):
00086         memory = self.runner.memory
00087         Condition.add(MemoryCondition(memory, 'awareness', 0))
00088         Condition.add(MemoryCondition(memory, 'arm_can_move', True))
00089 
00090         self.runner.actions.add(LookAroundAction())
00091         print self.runner.actions
00092 
00093         goal = Goal([Precondition(Condition.get('awareness'), 2)])
00094         self.runner.update_and_plan_and_execute(goal)
00095 
00096 
00097     def testStateWrapperAction(self):
00098         # The in-value is translated into the SMACHStateWrapperAction
00099         # and checked inside the wrapped state to check data translation
00100         # to the wrapped state.
00101         NUMBER_IN = 123
00102         # The out-value is translated into the SMACHStateWrapperAction
00103         # and out of it again to check data translation from the wrapped
00104         # state and also the next_worldstate parameter.
00105         NUMBER_OUT = 456
00106 
00107         Condition.add(MemoryCondition(self.runner.memory, 'memory.in', NUMBER_IN))
00108         Condition.add(MemoryCondition(self.runner.memory, 'memory.out', -1))
00109 
00110         class ValueLooperState(State):
00111             def __init__(self):
00112                 State.__init__(self, ['succeeded', 'aborted'],
00113                                input_keys=['i', 'to_loop'], output_keys=['o'])
00114 
00115             def execute(self, userdata):
00116                 print "%s found 'i' in userdata to be %s.." % (self.__class__.__name__, userdata.i)
00117                 if userdata.i == NUMBER_IN:
00118                     print "..which is correct"
00119                     userdata.o = userdata.to_loop
00120                     #return 'succeeded' # will only work when using rgoap_ros.SMACHRunner
00121                 else:
00122                     print "..which is not correct!"
00123                     #return 'aborted' # see above
00124 
00125         class TranslateAction(SMACHStateWrapperAction):
00126             def __init__(self):
00127                 SMACHStateWrapperAction.__init__(self, ValueLooperState(),
00128                                                  [Precondition(Condition.get('memory.in'), NUMBER_IN),
00129                                                   Precondition(Condition.get('memory.out'), -1)],
00130                                                  [Effect(Condition.get('memory.out'), NUMBER_OUT)])
00131 
00132             def translate_worldstate_to_userdata(self, next_worldstate, userdata):
00133                 userdata.i = next_worldstate.get_condition_value(Condition.get('memory.in'))
00134                 userdata.to_loop = next_worldstate.get_condition_value(Condition.get('memory.out'))
00135 
00136 
00137             def translate_userdata_to_worldstate(self, userdata, next_worldstate):
00138                 print "FIXME: translation from userdata does not work"
00139                 # FIXME: translation from userdata does not work
00140 #                # memory.set_value('memory.out', userdata.o)
00141 #                next_worldstate.set_condition_value(Condition.get('memory.out'), userdata.o)
00142 
00143 
00144         self.runner.actions.add(TranslateAction())
00145 
00146         goal = Goal([Precondition(Condition.get('memory.out'), NUMBER_OUT),
00147                      # memory.in is added to goal to be available in goal/next_worldstate
00148                      Precondition(Condition.get('memory.in'), NUMBER_IN)])
00149 
00150         plan = self.runner.update_and_plan(goal)
00151         print "plan:", plan
00152         self.assertIsNotNone(plan, "plan should not be None")
00153 
00154         success = self.runner.execute(plan)
00155         print "success: ", success
00156         self.assertTrue(success, "executing plan was not successful")
00157         # self.assertEqual(outcome, 'succeeded', "executing plan did not return succeeded")
00158 
00159         mem_out = self.runner.memory.get_value('memory.out')
00160         print "memory.out = %s" % mem_out
00161 
00162         self.assertEqual(mem_out, NUMBER_OUT,
00163                          "out variable in memory is not changed successfully")
00164 
00165 
00166 
00167 
00168 
00169 
00170 if __name__ == "__main__":
00171     #import sys;sys.argv = ['', 'Test.testName']
00172     unittest.main()


rgoap_smach
Author(s): Felix Kolbe
autogenerated on Sun Oct 5 2014 23:53:05