quick_call_server.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #       
00003 # License: BSD
00004 #   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
00005 #
00006 
00007 ##############################################################################
00008 # Imports
00009 ##############################################################################
00010 
00011 import rospy
00012 import unique_id
00013 import rocon_service_pair_msgs.msg as rocon_service_pair_msgs 
00014 import rocon_python_comms
00015 import threading
00016 
00017 ##############################################################################
00018 # Classes
00019 ##############################################################################
00020 
00021 class Jagi(object):
00022     __slots__ = [
00023         'server',
00024         'threaded_server',
00025         'requests',
00026         'lock',
00027     ]
00028 
00029     def __init__(self):
00030         self.server = rocon_python_comms.ServicePairServer('testies', self.callback, rocon_service_pair_msgs.TestiesPair)
00031         self.threaded_server = rocon_python_comms.ServicePairServer('threaded_testies', self.threaded_callback, rocon_service_pair_msgs.TestiesPair, use_threads=True)
00032         self.requests = []
00033 
00034     def callback(self, request_id, msg):
00035         '''
00036           @param request_id
00037           @type uuid_msgs/UniqueID
00038           @param msg
00039           @type ServiceRequest
00040         '''
00041         # need warning so that the test program visualises it when run with --text
00042         rospy.logwarn("QuickCallServer : non threaded received [%s][%s]" % (msg.data, unique_id.toHexString(request_id)))
00043         response = rocon_service_pair_msgs.TestiesResponse()
00044         response.data = "I heard ya dude" 
00045         rospy.sleep(2.0)
00046         self.server.reply(request_id, response)
00047     
00048     def threaded_callback(self, request_id, msg):
00049         '''
00050           @param request_id
00051           @type uuid_msgs/UniqueID
00052           @param msg
00053           @type ServiceRequest
00054         '''
00055         # need warning so that the test program visualises it when run with --text
00056         rospy.logwarn("QuickCallServer : threaded received [%s][%s]" % (msg.data, unique_id.toHexString(request_id)))
00057         response = rocon_service_pair_msgs.TestiesResponse()
00058         response.data = "I heard ya dude" 
00059         rospy.sleep(2.0)
00060         self.threaded_server.reply(request_id, response)
00061 
00062 ##############################################################################
00063 # Main
00064 ##############################################################################
00065     
00066 
00067 if __name__ == '__main__':
00068     
00069     rospy.init_node('example_rocon_pair_server')
00070     jagi = Jagi()
00071     rospy.spin()


rocon_python_comms
Author(s): Daniel Stonier
autogenerated on Fri May 2 2014 10:35:42