Source code for rqt_ez_publisher.ez_publisher_plugin
import os
import rospy
import yaml
from . import ez_publisher_widget
from . import publisher
from . import config_dialog
from . import quaternion_module
from rqt_py_common.plugin_container_widget import PluginContainerWidget
from qt_gui.plugin import Plugin
[docs]class EzPublisherPlugin(Plugin):
'''Plugin top class for rqt_ez_publisher'''
def __init__(self, context):
super(EzPublisherPlugin, self).__init__(context)
self.setObjectName('EzPublisher')
modules = [quaternion_module.QuaternionModule()]
self._widget = ez_publisher_widget.EzPublisherWidget(modules=modules)
self._widget.setObjectName('EzPublisherPluginUi')
self.mainwidget = PluginContainerWidget(self._widget, True, False)
if context.serial_number() > 1:
self._widget.setWindowTitle(self._widget.windowTitle() +
(' (%d)' % context.serial_number()))
context.add_widget(self._widget)
from argparse import ArgumentParser
parser = ArgumentParser(prog='rqt_ez_publisher')
EzPublisherPlugin.add_arguments(parser)
args, unknowns = parser.parse_known_args(context.argv())
self._loaded_settings = None
self.configurable = True
if args.slider_file is not None:
self.load_from_file(args.slider_file)
[docs] def shutdown_plugin(self):
pass
[docs] def save_to_file(self, file_path):
try:
f = open(file_path, 'w')
f.write(yaml.safe_dump(self.save_to_dict(),
encoding='utf-8', allow_unicode=True))
f.close()
rospy.loginfo('saved as %s' % file_path)
except IOError as e:
rospy.logerr('%s: Failed to save as %s' % (e, file_path))
[docs] def load_from_file(self, file_path):
if os.path.exists(file_path):
self._widget.clear_sliders()
self._loaded_settings = yaml.load(open(file_path).read())
self.restore_from_dict(self._loaded_settings)
else:
rospy.logerr('%s is not found' % file_path)
[docs] def save_settings(self, plugin_settings, instance_settings):
instance_settings.set_value(
'texts', [x.get_text() for x in self._widget.get_sliders()])
instance_settings.set_value(
'publish_interval', publisher.TopicPublisherWithTimer.publish_interval)
for slider in self._widget.get_sliders():
instance_settings.set_value(
slider.get_text() + '_range', slider.get_range())
instance_settings.set_value(
slider.get_text() + '_is_repeat', slider.is_repeat())
instance_settings.set_value('configurable', self.configurable)
[docs] def restore_settings(self, plugin_settings, instance_settings):
if self._loaded_settings is not None:
return
texts = instance_settings.value('texts')
if texts:
for text in texts:
self._widget.add_slider_by_text(text)
for slider in self._widget.get_sliders():
r = instance_settings.value(slider.get_text() + '_range')
slider.set_range(r)
is_repeat = instance_settings.value(
slider.get_text() + '_is_repeat')
slider.set_is_repeat(is_repeat == 'true')
interval = instance_settings.value('publish_interval')
if interval:
publisher.TopicPublisherWithTimer.publish_interval = int(interval)
configurable = instance_settings.value('configurable')
if configurable is not None:
self.configurable = bool(configurable)
self._widget.set_configurable(self.configurable)
[docs] def restore_from_dict(self, settings):
for text in settings['texts']:
self._widget.add_slider_by_text(text)
for slider in self._widget.get_sliders():
try:
slider_setting = settings['settings'][slider.get_text()]
slider.set_range(
[slider_setting['min'], slider_setting['max']])
slider.set_is_repeat(slider_setting['is_repeat'])
except KeyError as e:
pass
publisher.TopicPublisherWithTimer.publish_interval = (
settings['publish_interval'])
[docs] def save_to_dict(self):
save_dict = {}
save_dict['texts'] = [x.get_text() for x in self._widget.get_sliders()]
save_dict['publish_interval'] = (
publisher.TopicPublisherWithTimer.publish_interval)
save_dict['settings'] = {}
for slider in self._widget.get_sliders():
save_dict['settings'][slider.get_text()] = {}
range_min, range_max = slider.get_range()
save_dict['settings'][slider.get_text()]['min'] = range_min
save_dict['settings'][slider.get_text()]['max'] = range_max
save_dict['settings'][slider.get_text()]['is_repeat'] = (
slider.is_repeat())
return save_dict
[docs] def trigger_configuration(self):
dialog = config_dialog.ConfigDialog(self)
dialog.exec_()
self.configurable = dialog.configurable_checkbox.isChecked()
self._widget.set_configurable(self.configurable)
@staticmethod
def _isfile(parser, arg):
if os.path.isfile(arg):
return arg
else:
parser.error("Setting file %s does not exist" % arg)
@staticmethod
[docs] def add_arguments(parser):
group = parser.add_argument_group(
'Options for rqt_ez_publisher plugin')
group.add_argument('--slider-file',
type=lambda x: EzPublisherPlugin._isfile(parser, x),
help="YAML setting file")