6 Copyright (C) 2009-2014 8 RT-Synthesis Research Group 9 Intelligent Systems Research Institute, 10 National Institute of Advanced Industrial Science and Technology (AIST), 13 Licensed under the Eclipse Public License -v 1.0 (EPL) 14 http://www.opensource.org/licenses/eclipse-1.0.txt 18 Objects representing ports and connections. 20 Do not create port objects directly. Call the parse_port function, which will 21 create the correct type of port object automatically. 30 from rtctree.utils import build_attr_string, dict_to_nvlist, nvlist_to_dict
37 '''Create a port object of the correct type. 39 The correct port object type is chosen based on the port.port_type 42 @param port_obj The CORBA PortService object to wrap. 43 @param owner The owner of this port. Should be a Component object or None. 44 @return The created port object. 47 profile = port_obj.get_port_profile()
49 if props[
'port.port_type'] ==
'DataInPort':
51 elif props[
'port.port_type'] ==
'DataOutPort':
53 elif props[
'port.port_type'] ==
'CorbaPort':
56 return Port(port_obj, owner)
63 '''Base class representing a port of a component. 65 Do not create Port objects directly. Call parse_port(). 68 def __init__(self, port_obj=None, owner=None, *args, **kwargs):
69 '''Base port constructor. 71 @param port_obj The CORBA PortService object to wrap. 72 @param owner The owner of this port. Should be a Component object or 76 super(Port, self).
__init__(*args, **kwargs)
83 def connect(self, dests=[], name=None, id='', props={}):
84 '''Connect this port to other ports. 86 After the connection has been made, a delayed reparse of the 87 connections for this and the destination port will be triggered. 89 @param dests A list of the destination Port objects. Must be provided. 90 @param name The name of the connection. If None, a suitable default 91 will be created based on the names of the two ports. 92 @param id The ID of this connection. If None, one will be generated by 93 the RTC implementation. 94 @param props Properties of the connection. Required values depend on 95 the type of the two ports being connected. 96 @raises IncompatibleDataPortConnectionPropsError, FailedToConnectError 103 if props[prop]
not in self.
properties[prop]
and \
106 raise IncompatibleDataPortConnectionPropsError
108 if prop
in d.properties:
109 if props[prop]
not in d.properties[prop]
and \
110 'Any' not in d.properties[prop]:
112 raise IncompatibleDataPortConnectionPropsError
114 name = self.
name +
'_'.join([d.name
for d
in dests])
116 profile = RTC.ConnectorProfile(name, id,
117 [self.
_obj] + [d._obj
for d
in dests], props)
119 if return_code != RTC.RTC_OK:
123 d.reparse_connections()
126 '''Disconnect all connections to this port.''' 129 self.
object.disconnect(conn.id)
133 '''DEPRECATED. Search for a connection between this and another port.''' 136 if conn.has_port(self)
and conn.has_port(dest):
141 '''Search for all connections between this and another port.''' 145 if c.has_port(self)
and c.has_port(dest):
150 '''Search for all connections involving this and all other ports.''' 154 if not c.has_port(self):
157 if not c.has_port(d):
163 '''Search for a connection on this port by its ID.''' 171 '''Search for a connection to or from this port by name.''' 174 if conn.name == name:
179 '''Reparse the port.''' 184 '''Reparse the connections this port is involved in.''' 190 '''A list of connections to or from this port. 192 This list will be created at the first reference to this property. 193 This means that the first reference may be delayed by CORBA calls, 194 but others will return quickly (unless a delayed reparse has been 201 for cp
in self.
_obj.get_connector_profiles()]
206 '''Check if this port is connected to any other ports.''' 214 '''The name of this port.''' 220 '''The PortService object that represents the port.''' 226 '''This port's owner (usually a Component object).''' 232 '''The type of port this is. 234 Valid values are any class that @ref parse_port can create. 237 return self.__class__.__name__
241 '''Properties of the port.''' 248 profile = self.
_obj.get_port_profile()
252 prefix = self.
owner.instance_name +
'.' 253 if self.
_name.startswith(prefix):
261 '''Specialisation of the Port class for data ports. 263 Do not create DataPort objects directly. Call parse_port(). 266 def __init__(self, port_obj=None, owner=None, *args, **kwargs):
267 '''DataPort constructor. 269 @param port_obj The CORBA PortService object to wrap. 270 @param owner The owner of this port. Should be a Component object or 274 super(DataPort, self).
__init__(port_obj=port_obj, owner=owner, *args,
277 def connect(self, dests=[], name=None, id='', props={}):
278 '''Connect this port to other DataPorts. 280 After the connection has been made, a delayed reparse of the 281 connections for this and the destination port will be triggered. 283 @param dests A list of the destination Port objects. Must be provided. 284 @param name The name of the connection. If None, a suitable default 285 will be created based on the names of the two ports. 286 @param id The ID of this connection. If None, one will be generated by 287 the RTC implementation. 288 @param props Properties of the connection. Suitable defaults will be 289 set for required values if they are not already present. 290 @raises WrongPortTypeError 295 new_props = props.copy()
296 ptypes = [d.porttype
for d
in dests]
298 if 'DataOutPort' not in ptypes:
299 raise WrongPortTypeError
301 if 'DataInPort' not in ptypes:
302 raise WrongPortTypeError
303 if 'dataport.dataflow_type' not in new_props:
304 new_props[
'dataport.dataflow_type'] =
'push' 305 if 'dataport.interface_type' not in new_props:
306 new_props[
'dataport.interface_type'] =
'corba_cdr' 307 if 'dataport.subscription_type' not in new_props:
308 new_props[
'dataport.subscription_type'] =
'new' 309 if 'dataport.data_type' not in new_props:
310 new_props[
'dataport.data_type'] = \
312 super(DataPort, self).
connect(dests=dests, name=name, id=id,
317 '''Specialisation of the DataPort class for input ports. 319 Do not create DataInPort objects directly. Call parse_port(). 325 class DataOutPort(DataPort):
326 '''Specialisation of the DataPort class for output ports. 328 Do not create DataOutPort objects directly. Call parse_port(). 338 '''Specialisation of the Port class for service ports. 340 Do not create CorbaPort objects directly. Call parse_port(). 343 def __init__(self, port_obj=None, owner=None, *args, **kwargs):
344 '''CorbaPort constructor. 346 @param port_obj The CORBA PortService object to wrap. 347 @param owner The owner of this port. Should be a Component object or 351 super(CorbaPort, self).
__init__(port_obj=port_obj, owner=owner,
355 def connect(self, dests=None, name=None, id='', props={}):
356 '''Connect this port to other CorbaPorts. 358 After the connection has been made, a delayed reparse of the 359 connections for this and the destination port will be triggered. 361 @param dests A list of the destination Port objects. Must be provided. 362 @param name The name of the connection. If None, a suitable default 363 will be created based on the names of the two ports. 364 @param id The ID of this connection. If None, one will be generated by 365 the RTC implementation. 366 @param props Properties of the connection. Suitable defaults will be 367 set for required values if they are not already present. 368 @raises WrongPortTypeError, MismatchedInterfacesError, 369 MismatchedPolarityError 376 if not d.porttype ==
'CorbaPort':
377 raise WrongPortTypeError
382 raise MismatchedInterfacesError
385 match = d.get_interface_by_instance_name(
388 raise MismatchedInterfacesError
389 if intf.polarity == match.polarity:
391 raise MismatchedPolarityError
395 raise MismatchedInterfacesError
397 new_props = props.copy()
398 if 'port.port_type' not in new_props:
399 new_props[
'port.port_type'] =
'CorbaPort' 400 super(CorbaPort, self).
connect(dests=dests, name=name, id=id,
404 '''Get an interface of this port by instance name.''' 407 if intf.instance_name == name:
413 '''The list of interfaces this port provides or uses. 415 This list will be created at the first reference to this property. 416 This means that the first reference may be delayed by CORBA calls, 417 but others will return quickly (unless a delayed reparse has been 423 profile = self.
_obj.get_port_profile()
425 for intf
in profile.interfaces]
433 '''Object representing the interface used by a service port.''' 434 def __init__(self, intf_obj=None, *args, **kwargs):
437 @param intf_obj The CORBA PortInterfaceProfile object to wrap. 440 super(SvcInterface, self).
__init__(*args, **kwargs)
446 '''Get the polarity of this interface as a string. 448 @param add_colour If True, ANSI colour codes will be added to the 450 @return A string describing the polarity of this interface. 455 result =
'Provided', [
'reset']
457 result =
'Required', [
'reset']
461 supported=add_colour)
466 '''Reparse the interface object.''' 471 '''Instance name of the interface.''' 477 '''Polarity of this interface.''' 483 '''The polarity of this interface as a coloured string.''' 489 '''Type name of the interface.''' 498 if self.
_obj.polarity == RTC.PROVIDED:
513 '''An object representing a connection between two or more ports.''' 514 def __init__(self, conn_profile_obj=None, owner=None, *args, **kwargs):
517 @param conn_profile_obj The CORBA ConnectorProfile object to wrap. 518 @param owner The owner of this connection. If the creator of this 519 object is not a Port object (or derivative thereof), this 520 value should be set to None. 523 super(Connection, self).
__init__(*args, **kwargs)
530 return 'Connection {0} (ID: {1}), properties {2}, with ports '\
534 '''Disconnect this connection.''' 537 raise NotConnectedError
544 while not p
and ii < len(self.
ports):
545 p = self.
ports[ii][1]
548 raise UnknownConnectionOwnerError
549 p.object.disconnect(self.
id)
552 '''Return True if this connection involves the given Port object. 554 @param port The Port object to search for in this connection's ports. 562 if port.object._is_equivalent(p[1].object):
567 '''Reparse the connection.''' 572 '''The ID of the connection.''' 578 '''The name of the connection.''' 584 '''This connection's owner, if created by a Port object.''' 590 '''The list of ports involved in this connection. 592 The result is a list of tuples, (port name, port object). Each port 593 name is a full path to the port (e.g. /localhost/Comp0.rtc:in) if 594 this Connection object is owned by a Port, which is in turn owned by 595 a Component in the tree. Otherwise, only the port's name will be used 596 (in which case it will be the full port name, which will include the 597 component name, e.g. 'ConsoleIn0.in'). The full path can be used to 598 find ports in the tree. 600 If, for some reason, the owner node of a port cannot be found, that 601 entry in the list will contain ('Unknown', None). This typically means 602 that a component's name has been clobbered on the name server. 604 This list will be created at the first reference to this property. 605 This means that the first reference may be delayed by CORBA calls, 606 but others will return quickly (unless a delayed reparse has been 611 if node.get_port_by_ref(args):
618 for p
in self.
_obj.ports:
621 root = self.
owner.owner.root
622 owner_nodes = [n
for n
in root.iterate(has_port,
623 args=p, filter=[
'is_component'])
if n]
625 self.
_ports.append((
'Unknown',
None))
627 port_owner = owner_nodes[0]
628 port_owner_path = port_owner.full_path_str
629 port_name = p.get_port_profile().name
630 prefix = port_owner.instance_name +
'.' 631 if port_name.startswith(prefix):
632 port_name = port_name[len(prefix):]
633 self.
_ports.append((port_owner_path +
':' + \
636 self.
_ports.append((p.get_port_profile().name,
642 '''The connection's properties dictionary.'''
def __init__(self, intf_obj=None, args, kwargs)
int PROVIDED
Constant for provided interface polarity.
Service port interface object.
def __init__(self, conn_profile_obj=None, owner=None, args, kwargs)
def connect(self, dests=[], name=None, id='', props={})
def __init__(self, port_obj=None, owner=None, args, kwargs)
def __init__(self, port_obj=None, owner=None, args, kwargs)
def get_connections_by_dests(self, dests)
def get_connections_by_dest(self, dest)
def get_connection_by_id(self, id)
def get_connection_by_name(self, name)
def get_connection_by_dest(self, dest)
def polarity_as_string(self, add_colour=True)
def get_interface_by_instance_name(self, name)
def build_attr_string(attrs, supported=True)
def connect(self, dests=None, name=None, id='', props={})
def nvlist_to_dict(nvlist)
def polarity_string(self)
def parse_port(port_obj, owner)
API functions.
def reparse_connections(self)
def __init__(self, port_obj=None, owner=None, args, kwargs)
int REQUIRED
Constant for required interface polarity.
def connect(self, dests=[], name=None, id='', props={})