Source code for rocon_python_comms.service_pair_server

# License: BSD
# Description

.. module:: service_pair_server
   :platform: Unix
   :synopsis: Server side api for communicating across a rocon service pair.

This module contains the server side api for communicating across a rocon
service pair. A `facade pattern`_ is used here to simplify the interaction with
the server side publisher and subscriber.

.. include:: weblinks.rst


# Imports

import rospy
import threading

# Local imports
from .exceptions import ServicePairException

# Server Class

[docs]class ServicePairServer(object): ''' The server side of a pubsub service pair. This class provides a simplified api for handling requests/responses on the pubsub pair. There are two modes of operation - 1) blocking and 2) threaded. **Non-Threaded** In the first, the users' callback function directly runs whenever an incoming request is received. In this case, your callbacks should be very minimal so that incoming requests don't get blocked and queued up. .. code-block:: python #!/usr/bin/env python import rospy from chatter.msg import ChatterRequest, ChatterResponse, ChatterPair from rocon_python_comms import ServicePairServer class ChatterServer(object): def __init__(self): self.server = ServicePairServer('chatter', self.callback, ChatterPair) def callback(self, request_id, msg): rospy.loginfo("Server : I heard %s" % msg.babble) response = ChatterResponse() response.reply = "I heard %s" % msg.babble self.server.reply(request_id, response) if __name__ == '__main__': rospy.init_node('chatter_server', anonymous=True) chatter_server = ChatterServer() rospy.spin() **Threaded** In the second, we spawn a background thread and shunt the callback into this thread. Just toggle the ``use_threads`` flag when constructing the server: .. code-block:: python self.server = ServicePairServer('chatter', self.callback, ChatterPair, use_threads=True) ''' __slots__ = [ '_publisher', '_subscriber', '_callback', '_use_threads', #'_request_handlers', # initiate, track and execute requests with these { hex string ids : dic of RequestHandler objects (Blocking/NonBlocking) } 'ServicePairSpec', 'ServicePairRequest', 'ServicePairResponse', ] ########################################################################## # Initialisation ##########################################################################
[docs] def __init__(self, name, callback, ServicePairSpec, use_threads=False, queue_size=5): ''' :param str name: resource name of service pair (e.g. testies for pair topics testies/request, testies/response) :param callback: function invoked when a request arrives :param ServicePairSpec: the pair type (e.g. rocon_service_pair_msgs.msg.TestiesPair) :param bool use_threads: put the callback function into a fresh background thread when a request arrives. :param int queue_size: size of the queue to configure the publisher with. ''' self._callback = callback self._use_threads = use_threads try: p = ServicePairSpec() self.ServicePairSpec = ServicePairSpec """Base message type for this pair.""" self.ServicePairRequest = type(p.pair_request) """Request msg type for this pair <ServicePairSpec>Request.""" self.ServicePairResponse = type(p.pair_response) """Response msg type for this pair <ServicePairSpec>Response.""" except AttributeError: raise ServicePairException("Type is not an pair spec: %s" % str(ServicePairSpec)) self._subscriber = rospy.Subscriber(name + "/request", self.ServicePairRequest, self._internal_callback) self._publisher = rospy.Publisher(name + "/response", self.ServicePairResponse, queue_size=queue_size) ########################################################################## # Public Methods ##########################################################################
[docs] def reply(self, request_id, msg): ''' Send a reply to a previously received request (identified by request_id). Use this instead of writing directly to the publisher - just pass the content of the response data and the id that was issued with the request. :param uuid_msgs.UniqueID request_id: the request id to associate with this response. :param ServiceResponse msg: the response ''' pair_response = self.ServicePairResponse() = request_id pair_response.response = msg self._publisher.publish(pair_response) ########################################################################## # Callbacks ##########################################################################
def _internal_callback(self, msg): ''' :param ServicePairRequest msg: message returned from the server (with pair id etc) ''' # Check if it is a blocking call that has requested it. if self._use_threads: thread = threading.Thread(target=self._callback, args=(, msg.request)) thread.start() else: self._callback(, msg.request)