5 from omniORB
import CORBA, URI
6 from omniORB
import any
12 from CorbaNaming
import *
21 def __init__(self, orb_args, nserver_names=["localhost"],
22 orb=
None, naming=
None):
24 orb = CORBA.ORB_init(orb_args)
30 for ns
in nserver_names :
34 self.orb.shutdown(wait_for_completion=CORBA.FALSE)
41 def __init__(self, orb, server_name=None, naming=None):
43 self.
name = server_name
54 ref = self.naming.resolveStr(name)
55 if ref
is None:
return None 57 return ref._narrow(cl)
64 return self.
list_obj1(self.naming._rootContext,
"")
68 name_context = self.naming._rootContext
70 b_list = name_context.list(self.
b_len)
72 rslt = rslt + self.
proc_bd(bd, name_context, parent)
74 t_list = b_list[1].next_n(self.
b_len)
77 rslt = rslt + self.
proc_bd(bd, name_context, parent)
78 t_list = b_list[1].next_n(self.
b_len)
81 def proc_bd(self, bd, name_context, parent) :
90 nam = pre + URI.nameToString(bd.binding_name)
91 if bd.binding_type == CosNaming.nobject :
92 tmp = name_context.resolve(bd.binding_name)
94 print 'objcet '+nam+
' was listed.' 96 tmp = tmp._narrow(RTC.RTObject)
98 print nam+
' is not RTC.' 104 print 'handle for '+nam+
' was created.' 108 print nam+
' is not alive.' 111 tmp = name_context.resolve(bd.binding_name)
112 tmp = tmp._narrow(CosNaming.NamingContext)
122 rslt[tmp.name]=tmp.value.value()
126 for tmp
in dict.keys() :
127 rslt.append(SDOPackage.NameValue(tmp, any.to_any(dict[tmp])))
134 def __init__(self, plist, name = None, id="", prop_dict={}) :
136 self.
port_reflist = [tmp.port_profile.port_ref
for tmp
in plist]
140 self.
name = string.join([tmp.name
for tmp
in plist],
'_')
148 for kk
in self.def_prop :
154 for pp
in self.
plist :
155 if not ((self.
prop_dict[kk]
in pp.prop[kk])
or 156 (
'Any' in pp.prop[kk])) :
177 def __init__(self, plist, name = None, id="", prop_dict={}) :
181 self.
def_prop = {
'dataport.dataflow_type':
'push' ,
182 'dataport.interface_type':
'corba_cdr' ,
183 'dataport.subscription_type':
'flush'}
184 Connector.__init__(self, plist, name, id, prop_dict)
187 def __init__(self, plist, name = None, id="", prop_dict={}) :
188 self.
def_prop = {
'port.port_type':
'CorbaPort' }
189 Connector.__init__(self, plist, name, id, prop_dict)
194 self.
name=profile.name
203 tmp2 = [pp.connector_id
for pp
in tmp1]
204 if self.con.profile.connector_id
in tmp2 :
205 self.con.disconnect()
208 return self.port_profile.port_ref.get_connector_profiles()
214 self.
name = profile.instance_name
215 self.
type = profile.type_name
217 ref_key =
'port.' + self.
type +
'.' + self.
name 218 self.
ref=self.port.con.prop_dict[ref_key]
225 if self.type.find(
'::') == -1 :
228 self.
narrow_sym = eval(self.type.replace(
'::',
'.'), gls)
234 self.
name = profile.instance_name
235 self.
type = profile.type_name
244 Port.__init__(self, profile, nv_dict)
249 tmp = self.port_profile.interfaces
251 if itf.polarity == RTC.PROVIDED :
253 elif itf.polarity == RTC.REQUIRED :
258 Port.__init__(self, profile, nv_dict)
262 self.
ref = self.con.prop_dict[
'dataport.corba_cdr.inport_ref']
264 self.
data_tc = eval(
'RTC._tc_' + self.
prop[
'dataport.data_type'])
266 self.ref.put(CORBA.Any(self.
data_tc,
271 Port.__init__(self, profile, nv_dict)
274 if 'dataport.corba_any.outport_ref' in self.con.prop_dict :
275 self.
ref = self.con.prop_dict[
'dataport.corba_cdr.outport_ref']
280 self.
data_tc = eval(
'RTC._tc_' + self.
prop[
'dataport.data_type'])
284 return self.ref.get().value()
286 print "not supported" 298 self.
rtc_ref = env.naming.resolve(name)._narrow(RTC.RTObject)
308 self.
conf_ref = self.rtc_ref.get_configuration()
309 conf_set = self.conf_ref.get_configuration_sets()
313 self.
profile = self.rtc_ref.get_component_profile()
317 self.
port_refs = self.rtc_ref.get_ports()
324 tmp = pp.get_port_profile()
327 if tmp_prop[
'port.port_type']==
'DataInPort' :
330 elif tmp_prop[
'port.port_type']==
'DataOutPort' :
333 elif tmp_prop[
'port.port_type']==
'CorbaPort' :
337 def set_conf(self,conf_set_name,param_name,value) :
338 conf_set=self.
conf_set[conf_set_name]
340 conf_set_data[param_name]=value
341 conf_set.configuration_data=
dict2nvlist(conf_set_data)
342 self.conf_ref.set_configuration_set_values(conf_set_name,conf_set)
344 self.
set_conf(conf_set_name,param_name,value)
345 self.conf_ref.activate_configuration_set(conf_set_name)
364 self.OpenRTM.InPort(name,data_buf,OpenRTM.RingBuffer(size))
def __init__(self, profile, nv_dict=None)
def __init__(self, profile)
def __init__(self, plist, name=None, id="", prop_dict={})
def narrow_ref(self, gls)
def list_obj1(self, name_context, parent)
def __init__(self, name, data_buf, size=8)
def __init__(self, profile, nv_dict=None)
def proc_bd(self, bd, name_context, parent)
def set_conf_activate(self, conf_set_name, param_name, value)
def __init__(self, profile, port)
def set_conf(self, conf_set_name, param_name, value)
def __init__(self, plist, name=None, id="", prop_dict={})
def __init__(self, name, env, ref=None)
def __init__(self, plist, name=None, id="", prop_dict={})
def __init__(self, profile, nv_dict=None)
def __init__(self, profile, nv_dict=None)
def __init__(self, orb, server_name=None, naming=None)
def get_connections(self)
def get_object_by_name(self, name, cl=RTC.RTObject)