# Source code for tests.test_node_reactive
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# License: MIT
#
from __future__ import absolute_import, division, print_function
import time
"""
Testing the node
This module can however be run directly from a python virtual env
where pyros_setup and pyros_utils have been installed and configured
pyros_setup will take care of setting up this process environment for ROS.
"""
##############################################################################
# Imports
##############################################################################
import sys
import unittest
import tempfile
# Doing ROS setup from code if required
try:
import rospy
except ImportError:
import pyros_setup
pyros_setup.configurable_import().configure().activate()
import rospy
import rospy
import rostest
import roslaunch
import ros1_template_msgs.msg as ros1_template_msgs
import ros1_template_msgs.srv as ros1_template_srvs
from pyros_utils import rostest_nose
##############################################################################
# Test Class
##############################################################################
# Note: here we want to test the logger,
# not the terminal output (which depends on the top-level launcher).
# Module-level handle on the httpbin node process, initially None;
# teardown_module() reads it to decide whether a process needs shutting down.
httpbin_proc = None
# This should have the same effect as the <name>.test file for rostest.
# Should be used only by nose ( or other python test tool )
def setup_module():
    """Prepare the test environment when the module is NOT run by rostest.

    When rostest is not enabled this delegates to
    ``rostest_nose.rostest_nose_setup_module()``, starts a ``ROSLaunch``
    instance, sets the ``/httpbin/base_url`` parameter and launches the
    ``node_reactive.py`` node under the name ``httpbin``.  The resulting
    process handle is stored in the module-level ``httpbin_proc`` so that
    ``teardown_module()`` can shut it down.

    Under rostest nothing is done here: the launch file is expected to
    provide the same setup.
    """
    # Without this declaration the assignment below would only bind a
    # function-local name, leaving the module-level handle at None and the
    # launched node running after the tests finish.
    global httpbin_proc

    if not rostest_nose.is_rostest_enabled():
        rostest_nose.rostest_nose_setup_module()

        # rostest does this for you
        launch = roslaunch.scriptapi.ROSLaunch()
        launch.start()

        # Same as <node pkg="ros1_pip_pytemplate" type="node_reactive.py" name="httpbin">
        #           <param name="base_url" value="http://httpbin.org"/>
        #         </node>
        # Ref : http://docs.ros.org/indigo/api/roslaunch/html/index.html
        rospy.set_param('/httpbin/base_url', "http://httpbin.org")
        httpbin = roslaunch.core.Node('ros1_pip_pytemplate', 'node_reactive.py', name='httpbin')
        httpbin_proc = launch.launch(httpbin)
        assert httpbin_proc.is_alive()


def teardown_module():
    """Undo ``setup_module()`` when the module is NOT run by rostest.

    Stops the httpbin node process if one was launched and is still alive,
    then delegates to ``rostest_nose.rostest_nose_teardown_module()``.
    Under rostest nothing is done here.
    """
    if not rostest_nose.is_rostest_enabled():
        # Clean up in reverse order of setup: first the node we launched,
        # then the environment it was running in.
        if httpbin_proc is not None and httpbin_proc.is_alive():
            # roslaunch process objects are shut down through stop().
            httpbin_proc.stop()

        rostest_nose.rostest_nose_teardown_module()
class TestHttpbinProxy(unittest.TestCase):
    """Tests calling the services exposed by the ``httpbin`` node."""

    @classmethod
    def setUpClass(cls):
        """Class-level setup hook; intentionally does nothing."""
        # WE DO NOT NEED A NODE for sending requests to services
        # But this is where we would do it...
        # rospy.init_node('test_httpbin')
        # CAREFUL : this should be done only once per PROCESS
        # Here we enforce TEST RUN 1<->1 MODULE 1<->1 PROCESS. ROStest style.
        pass

    def test_httpbin_get(self):
        """Call ``/httpbin/get`` with no arguments and check the response.

        The response is expected to carry the url ``http://httpbin.org/get``
        and an empty ``args`` list.  A failed service call fails the test.
        """
        rospy.wait_for_service('/httpbin/get')
        get_proxy = rospy.ServiceProxy('/httpbin/get', ros1_template_srvs.Get)
        req = ros1_template_srvs.GetRequest(args=[])

        # Only the call itself is guarded, so that assertion failures below
        # are never confused with transport errors.
        try:
            resp = get_proxy(req)
        except rospy.ServiceException as exc:
            # Merely printing here would let the test pass without having
            # verified anything; report the failure to the test runner.
            self.fail("service call failed: {0}".format(exc))

        self.assertEqual(resp.url, "http://httpbin.org/get")
        self.assertEqual(resp.args, [])
if __name__ == '__main__':
    # Echo the command line this script was started with.
    argv = sys.argv
    print("ARGV : {0}".format(argv))

    # Note : Tests should be able to run with nosetests, or rostest ( which will launch nosetest here )
    rostest_nose.rostest_or_nose_main(
        'ros1_pip_pytemplate',
        'test_httpbin',
        TestHttpbinProxy,
        argv,
    )