Source code for concert_software_farmer.instance
#
# License: BSD
# https://raw.github.com/robotics-in-concert/rocon_concert/license/LICENSE
#
##############################################################################
# Imports
##############################################################################
import rospy
import os
import subprocess
import tempfile
import roslaunch
import rocon_python_utils.ros
import concert_msgs.msg as concert_msgs
from .exceptions import SoftwareInstanceException
[docs]class SoftwareInstance(object):
'''
Maintains runtime information of software. Such as users, parameters
'''
shutdown_timeout = 5
kill_timeout = 10
def __init__(self, profile, parameters):
self._profile = profile
success, params, msg = self._profile.validate_parameters(parameters)
if success:
self._parameters = params
else:
raise SoftwareInstanceException(msg)
self._namespace = concert_msgs.Strings.SOFTWARE_NAMESPACE + '/' + str(self._profile.name)
self._users = []
[docs] def to_msg(self):
'''
returns data as message format
:returns: Software Instance data as message format
:rtype: :exc:`concert_msgs.msg.SoftwareInstance`
'''
msg = concert_msgs.SoftwareInstance()
msg.name = self._profile.msg.name
msg.resource_name = self._profile.msg.resource_name
msg.max_count = self._profile.msg.max_count
msg.namespace = self._namespace
msg.users = self._users
msg.parameters = self._parameters
return msg
[docs] def start(self, user):
'''
Starts software for given user
:param user str: user name
:returns: whether it is success of not
:rtype: bool
'''
success = False
try:
force_screen = rospy.get_param(concert_msgs.Strings.PARAM_ROCON_SCREEN, True)
roslaunch_file_path = rocon_python_utils.ros.find_resource_from_string(self._profile.msg.launch, extension='launch')
temp = tempfile.NamedTemporaryFile(mode='w+t', delete=False)
launch_text =self._prepare_launch_text(roslaunch_file_path, self._namespace, self._parameters)
temp.write(launch_text)
temp.close()
self._roslaunch = roslaunch.parent.ROSLaunchParent(rospy.get_param('/run_id'), [temp.name], is_core=False, process_listeners=[], force_screen=force_screen)
self._roslaunch._load_config()
self._roslaunch.start()
finally:
if temp:
os.unlink(temp.name)
self.add_user(user)
return success
[docs] def stop(self):
'''
stops the software instance
:returns: whether it is success of not
:rtype: bool
'''
count = 0
while self._roslaunch.pm and not self._roslaunch.pm.done:
if count == 2 * SoftwareInstance.shutdown_timeout:
self._roslaunch.shutdown()
rospy.rostime.wallsleep(0.5)
count = count + 1
self._users = []
return True
def _prepare_launch_text(self, roslaunch_filepath, namespace, parameters):
launch_text = '<launch>\n <include ns="%s" file="%s">\n' % (namespace, roslaunch_filepath)
for param in parameters:
launch_text += '<arg name="%s" value="%s"/>'%(param.key, param.value)
launch_text +='</include></launch>'
return launch_text
[docs] def add_user(self, user):
'''
Add user to already running software instance
'''
if user in self._users:
return False, len(self._users)
else:
self._users.append(user)
return True, len(self._users)
[docs] def remove_user(self, user):
'''
remove user from running software instance
'''
if user in self._users:
self._users.remove(user)
return True, len(self._users)
else:
return False, len(self._users)
[docs] def is_max_capacity(self):
'''
Check if the software instance has reached it's max capacity
'''
return len(self._users) == self._profile.msg.max_count
[docs] def get_namespace(self):
'''
Returns the software instance namespace
'''
return self._namespace
[docs] def get_parameters(self):
'''
Returns the software instance parameters
'''
return self._parameters