Module API

rocon_python_comms

This is the top-level namespace of the rocon_python_comms ROS package. It provides additional python modules beyond rospy for working with ros python communications.

rocon_python_comms.exceptions

This module defines exceptions raised by the rocon_python_comms package. These exception names are all included in the main rocon_python_comms namespace. To catch one, import it this way:

from rocon_python_comms import ServicePairException

exception rocon_python_comms.exceptions.MultipleFoundException[source]

Bases: exceptions.IOError

Raised when a requested entity found multiple services, or didn’t return the correct result.

exception rocon_python_comms.exceptions.NotFoundException[source]

Bases: exceptions.IOError

Raised when a requested entity cannot be found, or didn’t return the correct result.

rocon_python_comms.nodes

This module contains anything relating to introspection or manipulation of ros nodes.


rocon_python_comms.nodes.find_node(wanted_node_name, unique=False)[source]

Do a lookup to find a node with the given name. The given name is treated as the unresolved node name. Hence this lookup will find nodes with the same name, but different namespaces.

This will raise exceptions, if the node couldn’t be found or in case unique is set multiple nodes with the same name are found.

Parameters:wanted_node_name (str) – unresolved name of the node looked for (e.g. ‘gateway’, not ‘/concert/gateway’)
Returns:the fully resolved name of the node (unique) or list of fully resolved names (non-unique)
Return type:str
Raises:NotFoundException
Todo:accept resolved names -> https://github.com/robotics-in-concert/rocon_tools/issues/30

rocon_python_comms.service_pair_client

This module contains the client side api for communicating across a rocon service pair. A facade pattern is used here to simplify the interaction with the client side publisher and subscriber.


class rocon_python_comms.service_pair_client.ServicePairClient(name, ServicePairSpec)[source]

Bases: object

The client side of a pubsub service pair.This class provides a simplified api for handling requests/responses on the pubsub pair (client side). Although the development of this class was for non-blocking request-response behaviour, it does support legacy style blocking behavious as well.

Usage (Non-Blocking):

#!/usr/bin/env python

import rospy
from chatter.msg import ChatterRequest, ChatterResponse, ChatterPair
from rocon_python_comms import ServicePairClient
import unique_id

class ChatterClient(object):

    def __init__(self):
        self.response = None
        self.client = ServicePairClient('chatter', ChatterPair)
        self.client.wait_for_service(rospy.Duration(3.0)) # should catch some exceptions here in case of timeouts
        self.request_id = self.client(ChatterRequest("Hello dude"), timeout=rospy.Duration(3.0), callback=self.callback)
        rospy.loginfo("Request id %s" % unique_id.toHexString(self.request_id))
        if self.response is not None:
            print("Response %s" % self.response)

    def callback(self, request_id, msg):
        # ideally you'd check request_id against self.request_id
        self.response = msg
        rospy.loginfo("Got the response: %s" % msg)

if __name__ == '__main__':
    rospy.init_node('chatter_client', anonymous=True)
    chatter_client = ChatterClient()
    rospy.spin()

Usage (Blocking):

#!/usr/bin/env python

import rospy
from chatter.msg import ChatterRequest, ChatterResponse, ChatterPair
from rocon_python_comms import ServicePairClient
import unique_id

class ChatterClient(object):

    def __init__(self):
        self.response = None
        self.client = ServicePairClient('chatter', ChatterPair)
        self.client.wait_for_service(rospy.Duration(3.0)) # should catch some exceptions here in case of timeouts
        self.response = self.client(ChatterRequest("Hello dude"), timeout=rospy.Duration(3.0))
        if self.response is not None:
            print("Response %s" % self.response)

if __name__ == '__main__':
    rospy.init_node('chatter_client', anonymous=True)
    chatter_client = ChatterClient()
    rospy.spin()
ServicePairRequest

Request msg type for this pair <ServicePairSpec>Request.

ServicePairResponse

Response msg type for this pair <ServicePairSpec>Response.

ServicePairSpec

Base message type for this pair.

__call__(msg, timeout=None, callback=None, error_callback=None)[source]

Initiates and executes the client request to the server. The type of arguments supplied determines whether to apply blocking or non-blocking behaviour.

If no callback is supplied, the mode is blocking, otherwise non-blocking. If no timeout is specified, then a return of None indicates that the operation timed out.

Parameters:
  • msg (<name>Request) – the request message
  • timeout (rospy.Duration) – time to wait for data
  • callback (method with arguments (uuid_msgs.UniqueID, <name>Response)) – user callback invoked for responses of non-blocking calls
Returns:

msg/id for blocking calls it is the response message, for non-blocking it is the unique id

Return type:

<name>Response/uuid_msgs.UniqueID or None (if timed out)

__init__(name, ServicePairSpec)[source]
Parameters:
  • name (str) – resource name of service pair (e.g. testies for pair topics testies/request, testies/response)
  • ServicePairSpec (str) – the pair type (e.g. rocon_service_pair_msgs.msg.TestiesPair)
wait_for_service(timeout)[source]

Waits for the service pair server to appear.

Parameters:timeout (rospy.Duration) – time to wait for data
Raises:rospy.ROSException if specified timeout is exceeded
Raises:rospy.ROSInterruptException if shutdown interrupts wait

rocon_python_comms.service_pair_server

This module contains the server side api for communicating across a rocon service pair. A facade pattern is used here to simplify the interaction with the server side publisher and subscriber.


class rocon_python_comms.service_pair_server.ServicePairServer(name, callback, ServicePairSpec, use_threads=False, queue_size=5)[source]

Bases: object

The server side of a pubsub service pair. This class provides a simplified api for handling requests/responses on the pubsub pair. There are two modes of operation - 1) blocking and 2) threaded.

Non-Threaded

In the first, the users’ callback function directly runs whenever an incoming request is received. In this case, your callbacks should be very minimal so that incoming requests don’t get blocked and queued up.

#!/usr/bin/env python

import rospy
from chatter.msg import ChatterRequest, ChatterResponse, ChatterPair
from rocon_python_comms import ServicePairServer

class ChatterServer(object):

    def __init__(self):
        self.server = ServicePairServer('chatter', self.callback, ChatterPair)

    def callback(self, request_id, msg):
        rospy.loginfo("Server : I heard %s" % msg.babble)
        response = ChatterResponse()
        response.reply = "I heard %s" % msg.babble
        self.server.reply(request_id, response)

if __name__ == '__main__':
    rospy.init_node('chatter_server', anonymous=True)
    chatter_server = ChatterServer()
    rospy.spin()

Threaded

In the second, we spawn a background thread and shunt the callback into this thread. Just toggle the use_threads flag when constructing the server:

self.server = ServicePairServer('chatter', self.callback, ChatterPair, use_threads=True)
ServicePairRequest

Request msg type for this pair <ServicePairSpec>Request.

ServicePairResponse

Response msg type for this pair <ServicePairSpec>Response.

ServicePairSpec

Base message type for this pair.

__init__(name, callback, ServicePairSpec, use_threads=False, queue_size=5)[source]
Parameters:
  • name (str) – resource name of service pair (e.g. testies for pair topics testies/request, testies/response)
  • callback – function invoked when a request arrives
  • ServicePairSpec – the pair type (e.g. rocon_service_pair_msgs.msg.TestiesPair)
  • use_threads (bool) – put the callback function into a fresh background thread when a request arrives.
  • queue_size (int) – size of the queue to configure the publisher with.
reply(request_id, msg)[source]

Send a reply to a previously received request (identified by request_id). Use this instead of writing directly to the publisher - just pass the content of the response data and the id that was issued with the request.

Parameters:
  • request_id (uuid_msgs.UniqueID) – the request id to associate with this response.
  • msg (ServiceResponse) – the response

rocon_python_comms.services

This module contains anything relating to introspection or manipulation of ros services.


rocon_python_comms.services.find_service(service_type, timeout=rospy.Duration[5000000000], unique=False)[source]

Do a lookup to find services of the type specified. This will raise exceptions if it times out or returns multiple values. It can apply the additional logic of whether this should return a single unique result, or a list. Under the hood this calls out to the ros master for a list of registered services and it parses that to determine the result. If nothing is found, it loops around internally on a 10Hz loop until the result is found or the specified timeout is reached.

Usage:

from rocon_python_comms import find_service

try:
    service_name = rocon_python_comms.find_service('rocon_interaction_msgs/SetInteractions',
                                                   timeout=rospy.rostime.Duration(15.0),
                                                   unique=True)
except rocon_python_comms.NotFoundException as e:
    rospy.logwarn("failed to find the set_interactions service.")
Parameters:
  • service_type (str) – service type specification, e.g. concert_msgs/GetInteractions
  • timeout (rospy.Duration) – raise an exception if nothing is found before this timeout occurs.
  • unique (bool) – flag to select the lookup behaviour (single/multiple results)
Returns:

the fully resolved name of the service (unique) or list of names (non-unique)

Return type:

str

Raises:

NotFoundException

rocon_python_comms.services.service_is_available(service_name)[source]

Check whether the specific service is validated or not as retrieving from master state.

Param:str service_name service name checking validtation
Returns:result of service’s validation
Return type:bool

rocon_python_comms.subscriber_proxy

This module provides a means of interacting with a ros latched publisher in the same style as you would a ros service (request-response).


class rocon_python_comms.subscriber_proxy.SubscriberProxy(topic, msg_type)[source]

Works like a service proxy, but using a latched subscriber instead (regular subscribers will also work, but this is especially useful for latched subscribers since they typically always provide data).

If no timeout is specified when calling, it blocks indefinitely on a 100ms loop until a message arrives. Alternatively it will return with None if a specified timeout is reached.

Usage:

from rocon_python_comms import SubscriberProxy

gateway_info = SubscriberProxy('gateway_info', gateway_msgs.GatewayInfo)(rospy.Duration(0.5))
if gateway_info is not None:
    # do something
Todo:upgrade to make use of python events instead of manual loops
__call__(timeout=None)[source]

Returns immediately with the latest data or blocks indefinitely until the next data arrives.

Parameters:timeout (rospy.Duration) – time to wait for data, polling at 10Hz.
Returns:latest data or None
__init__(topic, msg_type)[source]
Parameters:
  • topic (str) – the topic name to subscriber to
  • msg_type (str) – any ros message type (e.g. std_msgs/String)
  • timeout (rospy.Duration) – timeout on the wait operation (None = /infty)
Returns:

msg type data or None

Return type:

same as the msg type specified in the arg or None

unregister()[source]

Unregister the subscriber so future instantiations of this class can pull a fresh subscriber (important if the data is latched).

wait_for_next(timeout=None)[source]

Makes sure any current data is cleared and waits for new data.

Parameters:timeout (rospy.Duration) – time to wait for data, polling at 10Hz.
Returns:latest data or None
wait_for_publishers()[source]

Blocks until publishers are seen.

Raises:rospy.ROSInterruptException if we are in shutdown.

rocon_python_comms.topics

This module contains anything relating to introspection or manipulation of ros topics.


rocon_python_comms.topics.find_topic(topic_type, timeout=rospy.Duration[5000000000], unique=False)[source]

Do a lookup to find topics of the type specified. It can apply the additional logic of whether this should return a single unique result, or a list. Under the hood this calls out to the ros master for a list of registered topics and it parses that to determine the result. If nothing is found, it loops around internally on a 100ms loop until the result is found or the specified timeout is reached.

This will raise exceptions if it times out or returns multiple values.

Usage:

from rocon_python_comms import find_topic

try:
    pairing_topic_name = find_topic('rocon_interaction_msgs/Pair', timeout=rospy.rostime.Duration(0.5), unique=True)
except rocon_python_comms.NotFoundException as e:
    rospy.logwarn("support for interactions disabled")
Parameters:
  • topic_type (str) – topic type specification, e.g. rocon_std_msgs/MasterInfo
  • timeout (rospy.rostime.Duration) – raise an exception if nothing is found before this timeout occurs.
  • unique (bool) – flag to select the lookup behaviour (single/multiple results)
Returns:

the fully resolved name of the topic (unique) or list of names (non-unique)

Return type:

str

Raises:

NotFoundException if no topic is found within the timeout

rocon_python_comms.wall_rate

This module provides a wall time rate class (missing in rospy).


class rocon_python_comms.wall_rate.WallRate(rate)[source]

A wall time implementation of ros’ rospy.time.Rate class.

Usage:

from rocon_python_comms import WallRate

rate = WallRate(10)  # 10Hz = 0.1s period
while True:
    # do something
    rate.sleep()
__init__(rate)[source]
Parameters:rate (float) – rate in hertz, if rate = 0, then won’t sleep
sleep()[source]

Sleep until the rate period is exceeded.