Source code for feed_the_troll.feeders
#!/usr/bin/env python
#
# License: MIT
# https://raw.githubusercontent.com/stonier/feed_the_troll/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""
.. module:: loaders
:platform: Unix
:synopsis: Various loaders
"""
##############################################################################
# Imports
##############################################################################
import feed_the_troll_msgs.srv as feed_the_troll_srvs
import rosgraph
import rospy
import unique_id
##############################################################################
# Classes
##############################################################################
class Base(object):
def __init__(self, server_namespace="~"):
"""
@param str server_namespace:
"""
self._server_namespace = rospy.resolve_name(server_namespace) # converts relateive, '~' -> global appropriately
self._unload_service_name = rosgraph.names.ns_join(self._server_namespace, 'unload')
self._unload_configuration = rospy.ServiceProxy(self._unload_service_name, feed_the_troll_srvs.Unload)
self._unique_identifier = unique_id.toMsg(unique_id.fromRandom())
self.loaded = False
[docs]class ROSParameters(Base):
"""
The generic feeder script should lookup parameters, enable remappings to handle
setting of the constructor arguments. This interface is a programming interface
only.
.. warning:: this blocks indefinitely if it can't find the configuration server
.. todo:: a means of injecting a timeout by a user
.. todo:: bond construction/destruction on loading/unloading, just like nodelets do
"""
def __init__(self, server_namespace="",
configuration_namespace="~parameters",
add_shutdown_hook=True):
"""
The service namespace should be cast at the root of where the troll's services
can be found (load/param || load/yaml, unload). This is a means for programmatically
handling the connections. You can of course, also just remap in a roslaunch.
:param str server_namespace: where to find the troll services for loading/unloading
:param str configuration_namespace: where to find parameter configuration to send
:param str add_shutdown_hook: not intended for public use (testing only).
"""
super(ROSParameters, self).__init__(server_namespace)
#############################
# Parameterisation
#############################
self._configuration_namespace = rospy.resolve_name(configuration_namespace)
load_service_name = rosgraph.names.ns_join(rosgraph.names.ns_join(self._server_namespace, 'load'), 'param')
while not rospy.is_shutdown():
try:
rospy.wait_for_service(load_service_name, timeout=3.0)
break
except rospy.ROSException: # timeout
rospy.logwarn("Feeder: cannot find the troll to load configuration [{0}][{1}]".format(self._configuration_namespace,
load_service_name))
continue
except rospy.ROSInterruptException:
# may not actually reach here because time, subsequently timeouts may be awry, so check rospy.is_shutdown always!
return
if rospy.is_shutdown():
return
try:
load_configuration = rospy.ServiceProxy(load_service_name, feed_the_troll_srvs.LoadFromRosParam)
response = load_configuration(unique_identifier=self._unique_identifier,
namespace=self._configuration_namespace)
if not response.result:
rospy.logerr("Feeder: failed to load configuration [{0}]".format(response.message))
return
except rospy.ServiceException as e:
rospy.logerr("Feeder: failed to contact the configuration server [{0}][{1}]".format(load_service_name, str(e)))
return
self.loaded = True
if add_shutdown_hook:
rospy.on_shutdown(self.shutdown)
def shutdown(self):
try:
response = self._unload_configuration(unique_identifier=self._unique_identifier)
if not response.result:
rospy.logerr("Feeder: failed to unload configuration [{0}]".format(response.message))
except rospy.ServiceException as e:
rospy.logerr("Feeder: failed to contact the configuration server [{0}][{1}]".format(self._unload_service_name, str(e)))
rospy.delete_param(rosgraph.names.ns_join(rospy.get_namespace(), rospy.get_name()))