00001
00002
00003
00004 '''rtctree
00005
00006 Copyright (C) 2009-2014
00007 Geoffrey Biggs
00008 RT-Synthesis Research Group
00009 Intelligent Systems Research Institute,
00010 National Institute of Advanced Industrial Science and Technology (AIST),
00011 Japan
00012 All rights reserved.
00013 Licensed under the Eclipse Public License -v 1.0 (EPL)
00014 http://www.opensource.org/licenses/eclipse-1.0.txt
00015
00016 File: ports.py
00017
00018 Objects representing ports and connections.
00019
00020 Do not create port objects directly. Call the parse_port function, which will
00021 create the correct type of port object automatically.
00022
00023 '''
00024
00025
00026 import RTC
00027 import threading
00028
00029 from rtctree.exceptions import *
00030 from rtctree.utils import build_attr_string, dict_to_nvlist, nvlist_to_dict
00031
00032
00033
00034
00035
def parse_port(port_obj, owner):
    '''Wrap a CORBA port in the matching port class.

    The 'port.port_type' entry of the port profile's properties selects the
    wrapper class. Any value other than 'DataInPort', 'DataOutPort' or
    'CorbaPort' produces a generic Port.

    @param port_obj The CORBA PortService object to wrap.
    @param owner The owner of this port. Should be a Component object or None.
    @return The created port object.

    '''
    properties = nvlist_to_dict(port_obj.get_port_profile().properties)
    declared_type = properties['port.port_type']
    # Ordered (type name, wrapper class) pairs; first match wins.
    known_types = (
        ('DataInPort', DataInPort),
        ('DataOutPort', DataOutPort),
        ('CorbaPort', CorbaPort),
    )
    for type_name, port_class in known_types:
        if declared_type == type_name:
            return port_class(port_obj, owner)
    # Unrecognised port types fall back to the generic wrapper.
    return Port(port_obj, owner)
00057
00058
00059
00060
00061
class Port(object):
    '''Base class representing a port of a component.

    Wraps a CORBA PortService object and caches its name, properties and
    (lazily) its connections. Cached state is accessed while holding a
    re-entrant lock (self._mutex).

    Do not create Port objects directly. Call parse_port().

    '''
    def __init__(self, port_obj=None, owner=None, *args, **kwargs):
        '''Base port constructor.

        The port profile is parsed immediately, so port_obj must be usable
        at construction time.

        @param port_obj The CORBA PortService object to wrap.
        @param owner The owner of this port. Should be a Component object or
                     None.

        '''
        super(Port, self).__init__(*args, **kwargs)
        self._obj = port_obj
        self._connections = None
        self._owner = owner
        self._mutex = threading.RLock()
        self._parse()

    def connect(self, dests=None, name=None, id='', props=None):
        '''Connect this port to other ports.

        After the connection has been made, a delayed reparse of the
        connections for this and the destination ports will be triggered.

        @param dests A list of the destination Port objects. Must be provided;
                     None is treated as an empty list.
        @param name The name of the connection. If None or empty, a default
                    is built from the names of the ports involved.
        @param id The ID of this connection. Passed through unchanged to the
                  connector profile.
        @param props Properties of the connection as a dictionary. Required
                     values depend on the type of the ports being connected.
                     None is treated as an empty dictionary.
        @raises IncompatibleDataPortConnectionPropsError, FailedToConnectError

        '''
        with self._mutex:
            # Normalise here rather than using mutable default arguments.
            dests = [] if dests is None else dests
            props = {} if props is None else props
            if self.porttype in ('DataInPort', 'DataOutPort'):
                # Every requested property that a port advertises must be
                # listed by that port, unless the port accepts 'Any'.
                for prop in props:
                    for port in [self] + list(dests):
                        advertised = port.properties
                        if prop not in advertised:
                            continue
                        if props[prop] not in advertised[prop] and \
                                'Any' not in advertised[prop]:
                            raise IncompatibleDataPortConnectionPropsError
            if not name:
                # NOTE(review): no separator is placed between this port's
                # name and the first destination name. Kept as-is because
                # callers may rely on the generated names.
                name = self.name + '_'.join([d.name for d in dests])
            props = dict_to_nvlist(props)
            profile = RTC.ConnectorProfile(name, id,
                    [self._obj] + [d._obj for d in dests], props)
            return_code, profile = self._obj.connect(profile)
            if return_code != RTC.RTC_OK:
                raise FailedToConnectError(return_code)
            # Drop cached connection lists so they are rebuilt on next use.
            self.reparse_connections()
            for d in dests:
                d.reparse_connections()

    def disconnect_all(self):
        '''Disconnect all connections to this port.'''
        with self._mutex:
            for conn in self.connections:
                self.object.disconnect(conn.id)
            self.reparse_connections()

    def get_connection_by_dest(self, dest):
        '''DEPRECATED. Search for a connection between this and another port.

        @param dest The other Port object.
        @return The first matching Connection, or None if there is none.

        '''
        with self._mutex:
            for conn in self.connections:
                if conn.has_port(self) and conn.has_port(dest):
                    return conn
            return None

    def get_connections_by_dest(self, dest):
        '''Search for all connections between this and another port.

        @param dest The other Port object.
        @return A list of matching Connection objects (possibly empty).

        '''
        with self._mutex:
            return [c for c in self.connections
                    if c.has_port(self) and c.has_port(dest)]

    def get_connections_by_dests(self, dests):
        '''Search for all connections involving this and all other ports.

        A connection is returned only if it involves this port and every
        port in dests. With an empty dests, every connection involving this
        port is returned.

        @param dests An iterable of Port objects that must all take part.
        @return A list of matching Connection objects (possibly empty).

        '''
        with self._mutex:
            res = []
            for c in self.connections:
                if not c.has_port(self):
                    continue
                # Every destination must be part of the connection; a single
                # missing destination disqualifies it.
                if all(c.has_port(d) for d in dests):
                    res.append(c)
            return res

    def get_connection_by_id(self, id):
        '''Search for a connection on this port by its ID.

        @param id The connection ID to look for.
        @return The matching Connection, or None if there is none.

        '''
        with self._mutex:
            for conn in self.connections:
                if conn.id == id:
                    return conn
            return None

    def get_connection_by_name(self, name):
        '''Search for a connection to or from this port by name.

        @param name The connection name to look for.
        @return The first matching Connection, or None if there is none.

        '''
        with self._mutex:
            for conn in self.connections:
                if conn.name == name:
                    return conn
            return None

    def reparse(self):
        '''Reparse the port.'''
        self._parse()
        self.reparse_connections()

    def reparse_connections(self):
        '''Reparse the connections this port is involved in.

        Only the cache is cleared here; the list is rebuilt on the next
        access to the connections property.

        '''
        with self._mutex:
            self._connections = None

    @property
    def connections(self):
        '''A list of connections to or from this port.

        This list will be created at the first reference to this property.
        This means that the first reference may be delayed by CORBA calls,
        but others will return quickly (unless a delayed reparse has been
        triggered).

        '''
        with self._mutex:
            # An empty cached list is falsy too, so a port with no
            # connections is queried again on every access.
            if not self._connections:
                self._connections = [Connection(cp, self) \
                        for cp in self._obj.get_connector_profiles()]
            return self._connections

    @property
    def is_connected(self):
        '''Check if this port is connected to any other ports.'''
        with self._mutex:
            return bool(self.connections)

    @property
    def name(self):
        '''The name of this port.'''
        with self._mutex:
            return self._name

    @property
    def object(self):
        '''The PortService object that represents the port.'''
        with self._mutex:
            return self._obj

    @property
    def owner(self):
        '''This port's owner (usually a Component object).'''
        with self._mutex:
            return self._owner

    @property
    def porttype(self):
        '''The type of port this is.

        Valid values are any class that @ref parse_port can create.

        '''
        return self.__class__.__name__

    @property
    def properties(self):
        '''Properties of the port.'''
        with self._mutex:
            return self._properties

    def _parse(self):
        '''Read the name and properties from the port profile.'''
        with self._mutex:
            profile = self._obj.get_port_profile()
            self._name = profile.name
            self._properties = nvlist_to_dict(profile.properties)
            if self.owner:
                # Strip the owner's "<instance name>." prefix from the
                # profile name when it is present.
                prefix = self.owner.instance_name + '.'
                if self._name.startswith(prefix):
                    self._name = self._name[len(prefix):]
00255
00256
00257
00258
00259
class DataPort(Port):
    '''Port specialisation shared by data input and output ports.

    Do not create DataPort objects directly. Call parse_port().

    '''
    def __init__(self, port_obj=None, owner=None, *args, **kwargs):
        '''DataPort constructor.

        @param port_obj The CORBA PortService object to wrap.
        @param owner The owner of this port. Should be a Component object or
                     None.

        '''
        super(DataPort, self).__init__(port_obj=port_obj, owner=owner, *args,
                **kwargs)

    def connect(self, dests=[], name=None, id='', props={}):
        '''Connect this port to other DataPorts.

        An input port needs at least one output port among the destinations
        and vice versa. Missing data port properties are filled in on a copy
        of props before the base class makes the connection.

        @param dests A list of the destination Port objects. Must be provided.
        @param name The name of the connection. If None, a suitable default
                    will be created by the base class.
        @param id The ID of this connection.
        @param props Properties of the connection. Suitable defaults will be
                     set for required values if they are not already present.
        @raises WrongPortTypeError

        '''
        with self._mutex:
            # Work on a copy so the caller's dictionary is left untouched.
            merged = props.copy()
            dest_types = [d.porttype for d in dests]
            counterpart = {'DataInPort': 'DataOutPort',
                           'DataOutPort': 'DataInPort'}
            needed = counterpart.get(self.porttype)
            if needed is not None and needed not in dest_types:
                raise WrongPortTypeError
            merged.setdefault('dataport.dataflow_type', 'push')
            merged.setdefault('dataport.interface_type', 'corba_cdr')
            merged.setdefault('dataport.subscription_type', 'new')
            # Looked up only when missing, so a port without this property
            # is still usable when the caller supplies the data type.
            if 'dataport.data_type' not in merged:
                merged['dataport.data_type'] = \
                        self.properties['dataport.data_type']
            super(DataPort, self).connect(dests=dests, name=name, id=id,
                    props=merged)
00314
00315
class DataInPort(DataPort):
    '''Data port that receives data (an input port).

    Adds nothing to DataPort beyond its class name, which is what the
    porttype property reports.

    Do not create DataInPort objects directly. Call parse_port().

    '''
00323
00324
class DataOutPort(DataPort):
    '''Data port that sends data (an output port).

    Adds nothing to DataPort beyond its class name, which is what the
    porttype property reports.

    Do not create DataOutPort objects directly. Call parse_port().

    '''
00332
00333
00334
00335
00336
class CorbaPort(Port):
    '''Specialisation of the Port class for service ports.

    Adds a lazily built list of the service interfaces exposed by the port.

    Do not create CorbaPort objects directly. Call parse_port().

    '''
    def __init__(self, port_obj=None, owner=None, *args, **kwargs):
        '''CorbaPort constructor.

        @param port_obj The CORBA PortService object to wrap.
        @param owner The owner of this port. Should be a Component object or
                     None.

        '''
        super(CorbaPort, self).__init__(port_obj=port_obj, owner=owner,
                *args, **kwargs)
        self._interfaces = None

    def connect(self, dests=None, name=None, id='', props=None):
        '''Connect this port to other CorbaPorts.

        After the connection has been made, a delayed reparse of the
        connections for this and the destination ports will be triggered.

        @param dests A list of the destination Port objects. Must be provided;
                     None is treated as an empty list.
        @param name The name of the connection. If None, a suitable default
                    will be created by the base class.
        @param id The ID of this connection.
        @param props Properties of the connection. 'port.port_type' is set to
                     'CorbaPort' on a copy if it is not already present. None
                     is treated as an empty dictionary.
        @raises WrongPortTypeError, MismatchedInterfacesError,
                MismatchedPolarityError

        '''
        with self._mutex:
            # The default used to be iterated directly, which raised a
            # TypeError when dests was left as None.
            dests = [] if dests is None else dests

            # Only service ports may be connected to a service port.
            for d in dests:
                if not d.porttype == 'CorbaPort':
                    raise WrongPortTypeError

            if self.interfaces:
                for d in dests:
                    if not d.interfaces:
                        raise MismatchedInterfacesError
                # Each of this port's interfaces needs a same-named
                # interface of different polarity on every destination.
                for intf in self.interfaces:
                    for d in dests:
                        match = d.get_interface_by_instance_name(
                                intf.instance_name)
                        if not match:
                            raise MismatchedInterfacesError
                        if intf.polarity == match.polarity:
                            raise MismatchedPolarityError
            else:
                # A port without interfaces only pairs with other ports
                # that have none.
                for d in dests:
                    if d.interfaces:
                        raise MismatchedInterfacesError

            # Copy so the caller's dictionary is not modified.
            new_props = {} if props is None else props.copy()
            if 'port.port_type' not in new_props:
                new_props['port.port_type'] = 'CorbaPort'
            super(CorbaPort, self).connect(dests=dests, name=name, id=id,
                    props=new_props)

    def get_interface_by_instance_name(self, name):
        '''Get an interface of this port by instance name.

        @param name The instance name to look for.
        @return The matching SvcInterface, or None if there is none.

        '''
        with self._mutex:
            for intf in self.interfaces:
                if intf.instance_name == name:
                    return intf
            return None

    def reparse(self):
        '''Reparse the port, discarding the cached interface list.

        The interface list is rebuilt on the next access to the interfaces
        property.

        '''
        with self._mutex:
            self._interfaces = None
        super(CorbaPort, self).reparse()

    @property
    def interfaces(self):
        '''The list of interfaces this port provides or uses.

        This list will be created at the first reference to this property.
        This means that the first reference may be delayed by CORBA calls,
        but others will return quickly (unless a reparse has been
        triggered).

        '''
        with self._mutex:
            # An empty cached list is falsy too, so a port without
            # interfaces fetches its profile again on every access.
            if not self._interfaces:
                profile = self._obj.get_port_profile()
                self._interfaces = [SvcInterface(intf) \
                        for intf in profile.interfaces]
            return self._interfaces
00427
00428
00429
00430
00431
class SvcInterface(object):
    '''Object representing the interface used by a service port.'''

    # Polarity values stored in self._polarity by _parse().
    PROVIDED = 1
    REQUIRED = 2

    def __init__(self, intf_obj=None, *args, **kwargs):
        '''Constructor.

        The wrapped object is parsed immediately.

        @param intf_obj The CORBA PortInterfaceProfile object to wrap.

        '''
        super(SvcInterface, self).__init__(*args, **kwargs)
        self._obj = intf_obj
        self._mutex = threading.RLock()
        self._parse()

    def polarity_as_string(self, add_colour=True):
        '''Get the polarity of this interface as a string.

        @param add_colour If True, ANSI colour codes will be added to the
                          string.
        @return A string describing the polarity of this interface.

        '''
        with self._mutex:
            if self.polarity == self.PROVIDED:
                label, attrs = 'Provided', ['reset']
            elif self.polarity == self.REQUIRED:
                label, attrs = 'Required', ['reset']
            if not add_colour:
                return label
            opening = build_attr_string(attrs, supported=add_colour)
            closing = build_attr_string('reset', supported=add_colour)
            return opening + label + closing

    def reparse(self):
        '''Reparse the interface object.'''
        self._parse()

    @property
    def instance_name(self):
        '''Instance name of the interface.'''
        with self._mutex:
            return self._instance_name

    @property
    def polarity(self):
        '''Polarity of this interface (PROVIDED or REQUIRED).'''
        with self._mutex:
            return self._polarity

    @property
    def polarity_string(self):
        '''The polarity of this interface as a coloured string.'''
        with self._mutex:
            return self.polarity_as_string()

    @property
    def type_name(self):
        '''Type name of the interface.'''
        with self._mutex:
            return self._type_name

    def _parse(self):
        '''Copy the names and polarity out of the wrapped profile object.'''
        with self._mutex:
            source = self._obj
            self._instance_name = source.instance_name
            self._type_name = source.type_name
            # Anything that is not RTC.PROVIDED is recorded as REQUIRED.
            self._polarity = self.PROVIDED \
                    if source.polarity == RTC.PROVIDED else self.REQUIRED
00507
00508
00509
00510
00511
class Connection(object):
    '''An object representing a connection between two or more ports.'''
    def __init__(self, conn_profile_obj=None, owner=None, *args, **kwargs):
        '''Constructor.

        The connector profile is parsed immediately; the list of ports is
        resolved later, on first access to the ports property.

        @param conn_profile_obj The CORBA ConnectorProfile object to wrap.
        @param owner The owner of this connection. If the creator of this
                     object is not a Port object (or derivative thereof), this
                     value should be set to None.

        '''
        super(Connection, self).__init__(*args, **kwargs)
        self._obj = conn_profile_obj
        self._owner = owner
        self._mutex = threading.RLock()
        self._parse()

    def __str__(self):
        template = 'Connection {0} (ID: {1}), properties {2}, with ports {3}'
        return template.format(self._name, self._id, self._properties,
                self._ports)

    def disconnect(self):
        '''Disconnect this connection.

        The disconnect request is sent through the first port in the list
        whose port object could be resolved.

        @raises NotConnectedError, UnknownConnectionOwnerError

        '''
        with self._mutex:
            involved = self.ports
            if not involved:
                raise NotConnectedError
            # Entries for unresolved ports carry None in place of a port.
            usable = next((port for _, port in involved if port), None)
            if not usable:
                raise UnknownConnectionOwnerError
            usable.object.disconnect(self.id)

    def has_port(self, port):
        '''Return True if this connection involves the given Port object.

        Ports are compared through the _is_equivalent method of their CORBA
        objects. Entries without a resolved port object are skipped.

        @param port The Port object to search for in this connection's ports.

        '''
        with self._mutex:
            return any(port.object._is_equivalent(member.object)
                       for _, member in self.ports if member)

    def reparse(self):
        '''Reparse the connection.'''
        self._parse()

    @property
    def id(self):
        '''The ID of the connection.'''
        with self._mutex:
            return self._id

    @property
    def name(self):
        '''The name of the connection.'''
        with self._mutex:
            return self._name

    @property
    def owner(self):
        '''This connection's owner, if created by a Port object.'''
        with self._mutex:
            return self._owner

    @property
    def ports(self):
        '''The list of ports involved in this connection.

        The result is a list of tuples, (port name, port object). Each port
        name is a full path to the port (e.g. /localhost/Comp0.rtc:in) if
        this Connection object is owned by a Port, which is in turn owned by
        a Component in the tree. Otherwise, only the port's name will be used
        (in which case it will be the full port name, which will include the
        component name, e.g. 'ConsoleIn0.in'). The full path can be used to
        find ports in the tree.

        If, for some reason, the owner node of a port cannot be found, that
        entry in the list will contain ('Unknown', None). This typically means
        that a component's name has been clobbered on the name server.

        This list will be created at the first reference to this property.
        This means that the first reference may be delayed by CORBA calls,
        but others will return quickly (unless a delayed reparse has been
        triggered).

        '''
        with self._mutex:
            if self._ports:
                return self._ports
            # Entries are appended one at a time to the stored list.
            self._ports = []
            for port_ref in self._obj.ports:
                self._ports.append(self._describe_port(port_ref))
            return self._ports

    @property
    def properties(self):
        '''The connection's properties dictionary.'''
        with self._mutex:
            return self._properties

    def _describe_port(self, port_ref):
        '''Build the (name, port object) entry for one CORBA port reference.'''
        def has_port(node, args):
            return node if node.get_port_by_ref(args) else None

        if not (self.owner and self.owner.owner):
            # Not attached to a tree: use the raw profile name.
            return (port_ref.get_port_profile().name,
                    parse_port(port_ref, None))

        tree_root = self.owner.owner.root
        candidates = [n for n in tree_root.iterate(has_port,
                args=port_ref, filter=['is_component']) if n]
        if not candidates:
            return ('Unknown', None)

        port_owner = candidates[0]
        owner_path = port_owner.full_path_str
        short_name = port_ref.get_port_profile().name
        prefix = port_owner.instance_name + '.'
        if short_name.startswith(prefix):
            short_name = short_name[len(prefix):]
        # NOTE(review): the port object is given the owner of this
        # connection's port rather than port_owner found above; confirm
        # whether that is intended.
        return (owner_path + ':' + short_name,
                parse_port(port_ref, self.owner.owner))

    def _parse(self):
        '''Read name, ID and properties from the connector profile.'''
        with self._mutex:
            self._name = self._obj.name
            self._id = self._obj.connector_id
            # Cleared so the port list is resolved again on next access.
            self._ports = None
            self._properties = nvlist_to_dict(self._obj.properties)
00653
00654
00655
00656