5 from omniORB
import CORBA, URI
6 from omniORB
import any
11 from CorbaNaming
import *
20 def __init__(self, orb_args, nserver_names=["localhost"],
21 orb=
None, naming=
None):
23 orb = CORBA.ORB_init(orb_args)
29 for ns
in nserver_names :
33 self.orb.shutdown(wait_for_completion=CORBA.FALSE)
40 def __init__(self, orb, server_name=None, naming=None):
53 ref = self.naming.resolveStr(name)
54 if ref
is None:
return None 56 return ref._narrow(cl)
63 return self.
list_obj1(self.naming._rootContext,
"")
67 name_context = self.naming._rootContext
69 b_list = name_context.list(self.
b_len)
71 rslt = rslt + self.
proc_bd(bd, name_context, parent)
73 t_list = b_list[1].next_n(self.
b_len)
76 rslt = rslt + self.
proc_bd(bd, name_context, parent)
77 t_list = b_list[1].next_n(self.
b_len)
80 def proc_bd(self, bd, name_context, parent) :
89 nam = pre + URI.nameToString(bd.binding_name)
90 if bd.binding_type == CosNaming.nobject :
91 tmp = name_context.resolve(bd.binding_name)
93 print 'objcet '+nam+
' was listed.' 95 tmp = tmp._narrow(RTC.RTObject)
97 print nam+
' is not RTC.' 103 print 'handle for '+nam+
' was created.' 107 print nam+
' is not alive.' 110 tmp = name_context.resolve(bd.binding_name)
111 tmp = tmp._narrow(CosNaming.NamingContext)
121 rslt[tmp.name]=tmp.value.value()
125 for tmp
in dict.keys() :
126 rslt.append(SDOPackage.NameValue(tmp, any.to_any(dict[tmp])))
133 def __init__(self, plist, name = None, id="", prop_dict={}) :
139 self.
name = string.join([tmp.name
for tmp
in plist],
'_')
147 for kk
in self.def_prop :
153 for pp
in self.
plist :
154 if not ((self.
prop_dict[kk]
in pp.prop[kk])
or 155 (
'Any' in pp.prop[kk])) :
176 def __init__(self, plist, name = None, id="", prop_dict={}) :
181 'dataport.interface_type':
'corba_cdr' ,
182 'dataport.subscription_type':
'flush'}
183 Connector.__init__(self, plist, name, id, prop_dict)
186 def __init__(self, plist, name = None, id="", prop_dict={}) :
188 Connector.__init__(self, plist, name, id, prop_dict)
202 tmp2 = [pp.connector_id
for pp
in tmp1]
203 if self.con.profile.connector_id
in tmp2 :
204 self.con.disconnect()
207 return self.port_profile.port_ref.get_connector_profiles()
213 self.
name = profile.instance_name
216 ref_key =
'port.' + self.
type +
'.' + self.
name 217 self.
ref=self.port.con.prop_dict[ref_key]
224 if self.type.find(
'::') == -1 :
227 self.
narrow_sym = eval(self.type.replace(
'::',
'.'), gls)
233 self.
name = profile.instance_name
243 Port.__init__(self, profile, nv_dict)
248 tmp = self.port_profile.interfaces
250 if itf.polarity == RTC.PROVIDED :
252 elif itf.polarity == RTC.REQUIRED :
257 Port.__init__(self, profile, nv_dict)
261 self.
ref = self.con.prop_dict[
'dataport.corba_cdr.inport_ref']
265 self.ref.put(CORBA.Any(self.
data_tc,
270 Port.__init__(self, profile, nv_dict)
273 if 'dataport.corba_any.outport_ref' in self.con.prop_dict :
274 self.
ref = self.con.prop_dict[
'dataport.corba_cdr.outport_ref']
283 return self.ref.get().value()
285 print "not supported" 297 self.
rtc_ref = env.naming.resolve(name)._narrow(RTC.RTObject)
307 self.
conf_ref = self.rtc_ref.get_configuration()
308 conf_set = self.conf_ref.get_configuration_sets()
312 self.
profile = self.rtc_ref.get_component_profile()
316 self.
port_refs = self.rtc_ref.get_ports()
323 tmp = pp.get_port_profile()
326 if tmp_prop[
'port.port_type']==
'DataInPort' :
329 elif tmp_prop[
'port.port_type']==
'DataOutPort' :
332 elif tmp_prop[
'port.port_type']==
'CorbaPort' :
336 def set_conf(self,conf_set_name,param_name,value) :
337 conf_set=self.
conf_set[conf_set_name]
339 conf_set_data[param_name]=value
340 conf_set.configuration_data=
dict2nvlist(conf_set_data)
341 self.conf_ref.set_configuration_set_values(conf_set_name,conf_set)
343 self.
set_conf(conf_set_name,param_name,value)
344 self.conf_ref.activate_configuration_set(conf_set_name)
def __init__(self, profile, nv_dict=None)
def __init__(self, profile)
def __init__(self, plist, name=None, id="", prop_dict={})
def narrow_ref(self, gls)
def list_obj1(self, name_context, parent)
def __init__(self, profile, nv_dict=None)
def proc_bd(self, bd, name_context, parent)
def __init__(self, orb_args, nserver_names=["localhost"], orb=None, naming=None)
def set_conf_activate(self, conf_set_name, param_name, value)
def __init__(self, profile, port)
def set_conf(self, conf_set_name, param_name, value)
def __init__(self, plist, name=None, id="", prop_dict={})
def __init__(self, name, env, ref=None)
def __init__(self, plist, name=None, id="", prop_dict={})
def __init__(self, profile, nv_dict=None)
def __init__(self, profile, nv_dict=None)
def __init__(self, orb, server_name=None, naming=None)
def get_connections(self)
def get_object_by_name(self, name, cl=RTC.RTObject)