AutoTest/rtc_handle.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 import sys
5 from omniORB import CORBA, URI
6 from omniORB import any
7 
8 import RTC
9 
10 
11 from CorbaNaming import *
12 import SDOPackage
13 
14 # class RtmEnv :
15 # rtm environment manager
16 # orb, naming service, rtc proxy list
17 #
class RtmEnv:
    """RTM environment manager: holds the ORB and one NameSpace per server.

    Attributes:
        orb: the CORBA ORB, either passed in or created from ``orb_args``.
        name_space: dict mapping a naming-server name (or ``'default'``
            when an explicit ``naming`` object is given) to its NameSpace.
    """

    def __init__(self, orb_args, nserver_names=("localhost",),
                 orb=None, naming=None):
        """Create the ORB if necessary and build the NameSpace table.

        Args:
            orb_args: argument list handed to ``CORBA.ORB_init`` when no
                ``orb`` is supplied.
            nserver_names: iterable of naming-server names; only used when
                ``naming`` is not given.
            orb: an already initialised ORB to reuse.
            naming: a single naming-service object; when given it is
                registered under the key ``'default'`` and
                ``nserver_names`` is ignored.
        """
        if not orb:
            orb = CORBA.ORB_init(orb_args)
        self.orb = orb
        self.name_space = {}
        if naming:  # naming can specify only one naming service
            self.name_space['default'] = NameSpace(orb, naming=naming)
        else:
            for ns in nserver_names:
                self.name_space[ns] = NameSpace(orb, server_name=ns)

    def __del__(self):
        """Shut down and destroy the ORB held by this environment."""
        # __init__ may have raised before self.orb was assigned; in that
        # case there is nothing to release.
        orb = getattr(self, 'orb', None)
        if orb is None:
            return
        orb.shutdown(wait_for_completion=CORBA.FALSE)
        orb.destroy()
35 #
36 # class NameSpace :
37 # rtc_handles and object list in naming service
38 #
class NameSpace:
    """RTC handles and object list for one naming service.

    Attributes:
        orb: the ORB passed to the constructor.
        name: the naming-server name (``None`` when ``naming`` was given).
        naming: the naming-service wrapper used for lookups.
        b_len: batch size used when listing bindings.
        rtc_handles: dict of full binding name -> RtcHandle.
        obj_list: dict of full binding name -> resolved object reference.
    """

    def __init__(self, orb, server_name=None, naming=None):
        """Attach to ``naming`` or create a CorbaNaming for ``server_name``."""
        self.orb = orb
        self.name = server_name
        if naming:
            self.naming = naming
        else:
            self.naming = CorbaNaming(self.orb, server_name)

        self.b_len = 10  # iteration cut off no.
        self.rtc_handles = {}
        self.obj_list = {}

    def get_object_by_name(self, name, cl=RTC.RTObject):
        """Resolve ``name`` and narrow the reference to ``cl``.

        Returns None when the name resolves to None, the un-narrowed
        reference when ``cl`` is falsy, and the narrowed one otherwise.
        """
        ref = self.naming.resolveStr(name)
        if ref is None:
            return None  # return CORBA.nil ?
        if cl:
            return ref._narrow(cl)
        return ref

    def list_obj(self):
        """Rebuild ``obj_list`` and ``rtc_handles`` from the root context.

        Returns:
            list of ``[name, rtc_reference]`` pairs for every RTC found.
        """
        # The original assigned to a misspelled attribute here, so handles
        # from a previous listing were never cleared.
        self.rtc_handles = {}
        self.obj_list = {}
        return self.list_obj1(self.naming._rootContext, "")

    def list_obj1(self, name_context, parent):
        """List ``name_context`` recursively.

        Args:
            name_context: context to list; a falsy value means the root
                context.
            parent: name prefix of this context ("" for the root).

        Returns:
            list of ``[name, rtc_reference]`` pairs.
        """
        if not name_context:
            name_context = self.naming._rootContext
        rslt = []
        bindings, iterator = name_context.list(self.b_len)
        for bd in bindings:
            rslt = rslt + self.proc_bd(bd, name_context, parent)
        if iterator:  # iterator : there exists remaining.
            try:
                more, batch = iterator.next_n(self.b_len)
                while more:
                    for bd in batch:
                        rslt = rslt + self.proc_bd(bd, name_context, parent)
                    more, batch = iterator.next_n(self.b_len)
            finally:
                # Release the server-side iterator; this is best-effort
                # clean-up and must not hide the listing result.
                try:
                    iterator.destroy()
                except Exception:
                    pass
        return rslt

    def proc_bd(self, bd, name_context, parent):
        """Process one binding: record an object or descend into a context.

        Object bindings are stored in ``obj_list``; those that narrow to
        ``RTC.RTObject`` additionally get an RtcHandle in ``rtc_handles``.

        Returns:
            list of ``[name, rtc_reference]`` pairs found under ``bd``.
        """
        rslt = []
        pre = ""
        if parent:
            pre = parent + "/"
        nam = pre + URI.nameToString(bd.binding_name)
        if bd.binding_type == CosNaming.nobject:
            tmp = name_context.resolve(bd.binding_name)
            self.obj_list[nam] = tmp
            print('objcet ' + nam + ' was listed.')
            try:
                tmp = tmp._narrow(RTC.RTObject)
            except Exception:
                print(nam + ' is not RTC.')
                tmp = None
            try:
                if tmp:
                    rslt = [[nam, tmp]]
                    self.rtc_handles[nam] = RtcHandle(nam, self, tmp)
                    print('handle for ' + nam + ' was created.')
            except Exception:
                print(nam + ' is not alive.')
        else:
            tmp = name_context.resolve(bd.binding_name)
            tmp = tmp._narrow(CosNaming.NamingContext)
            # list_obj1 treats a falsy context as "use the root context";
            # skip a context that failed to narrow instead of re-listing
            # the root under this name.
            if tmp:
                rslt = self.list_obj1(tmp, nam)
        return rslt
114 
115 #
116 # data conversion
117 #
def nvlist2dict(nvlist):
    """Convert a sequence of name/value entries into a plain dict.

    Each entry must expose ``name`` and a ``value`` object whose
    ``value()`` method returns the payload (nv.value and any.value()).
    Later entries overwrite earlier ones with the same name.
    """
    return {entry.name: entry.value.value() for entry in nvlist}
def dict2nvlist(dict):
    """Convert a dict into a list of SDOPackage.NameValue entries.

    Every value is wrapped with ``any.to_any`` before being stored.
    """
    return [SDOPackage.NameValue(key, any.to_any(dict[key]))
            for key in dict.keys()]
128 #
129 # connector, port, inport, outport, service
130 #
131 
class Connector:
    """Connection between the ports in ``plist``.

    Subclasses must set ``self.def_prop`` (default connector properties)
    before calling ``Connector.__init__``; ``nego_prop`` reads it.

    Attributes:
        plist: the port wrappers taking part in the connection.
        port_reflist: ``port_profile.port_ref`` of every entry in plist.
        name: connector name (given, or the port names joined by '_').
        prop_dict: connector properties as a dict.
        prop_nvlist: the same properties as a name/value list.
        profile: the RTC.ConnectorProfile sent to the ports.
        possible: result of the last ``nego_prop`` call.
    """

    def __init__(self, plist, name=None, id="", prop_dict=None):
        """Build the connector profile and negotiate its properties.

        Args:
            plist: list of port wrappers to connect.
            name: connector name; defaults to the port names joined by '_'.
            id: connector id placed in the profile.
            prop_dict: initial properties; copied, never modified in place.
        """
        self.plist = plist
        self.port_reflist = [tmp.port_profile.port_ref for tmp in plist]
        if name:
            self.name = name
        else:
            # str.join avoids depending on a 'string' module that this
            # file never imports itself.
            self.name = '_'.join([tmp.name for tmp in plist])
        # nego_prop() writes into prop_dict, so work on a private copy; a
        # shared default dict would leak settings between connectors.
        self.prop_dict = dict(prop_dict) if prop_dict else {}
        # Must exist before the profile below is built from it.
        self.prop_nvlist = dict2nvlist(self.prop_dict)
        self.profile = RTC.ConnectorProfile(self.name, id,
                                            self.port_reflist,
                                            self.prop_nvlist)
        self.nego_prop()

    def nego_prop(self):
        """Fill in default properties and check them against every port.

        For each key of ``def_prop`` a missing or empty entry in
        ``prop_dict`` receives the default.  If a port's own property
        contains neither the chosen value nor 'Any', the entry is reset to
        "" and the connection is marked as not possible.

        Returns:
            True when every port accepts every property, else False.
        """
        self.possible = True
        for kk in self.def_prop:
            if not self.prop_dict.get(kk):
                self.prop_dict[kk] = self.def_prop[kk]
            for pp in self.plist:
                if not ((self.prop_dict[kk] in pp.prop[kk]) or
                        ('Any' in pp.prop[kk])):
                    self.prop_dict[kk] = ""
                    self.possible = False
        self.prop_nvlist = dict2nvlist(self.prop_dict)
        self.profile.properties = self.prop_nvlist
        return self.possible

    def connect(self):
        """Connect through the first port and refresh the local profile.

        Returns:
            the return code of the port's ``connect`` call.
        """
        # out and inout parameters are returned as a tuple
        ret, self.profile = self.port_reflist[0].connect(self.profile)
        self.prop_nvlist = self.profile.properties
        self.prop_dict = nvlist2dict(self.prop_nvlist)
        return ret

    def disconnect(self):
        """Disconnect this connector through the first port.

        Returns:
            the return code of the port's ``disconnect`` call.
        """
        ret = self.port_reflist[0].disconnect(self.profile.connector_id)
        return ret
174 
class IOConnector(Connector):
    """Connector for data ports (used by RtcInport and RtcOutport)."""

    def __init__(self, plist, name=None, id="", prop_dict=None):
        """Set the data-port default properties and build the connector.

        Args:
            plist: list of port wrappers to connect.
            name: connector name; see Connector.
            id: connector id placed in the profile.
            prop_dict: initial properties; a fresh dict is used when
                omitted so that instances never share one.
        """
        # Earlier defaults were 'Push' / 'CORBA_Any' / 'Flush'.
        self.def_prop = {'dataport.dataflow_type': 'push',
                         'dataport.interface_type': 'corba_cdr',
                         'dataport.subscription_type': 'flush'}
        if prop_dict is None:
            prop_dict = {}
        Connector.__init__(self, plist, name, id, prop_dict)
184 
class ServiceConnector(Connector):
    """Connector for service (CorbaPort) ports (used by RtcService)."""

    def __init__(self, plist, name=None, id="", prop_dict=None):
        """Set the service-port default properties and build the connector.

        Args:
            plist: list of port wrappers to connect.
            name: connector name; see Connector.
            id: connector id placed in the profile.
            prop_dict: initial properties; a fresh dict is used when
                omitted so that instances never share one.
        """
        self.def_prop = {'port.port_type': 'CorbaPort'}
        if prop_dict is None:
            prop_dict = {}
        Connector.__init__(self, plist, name, id, prop_dict)
189 
190 
class Port:
    """Base wrapper around a port profile.

    Attributes:
        name: the profile's name.
        port_profile: the profile object passed in.
        prop: the port properties as a dict.
        con: connector for this port; this must be set in each subclass.
    """

    def __init__(self, profile, nv_dict=None):
        """Store the profile; derive ``prop`` from it when not supplied."""
        self.name = profile.name
        self.port_profile = profile
        self.prop = nv_dict if nv_dict else nvlist2dict(profile.properties)
        self.con = None  # this must be set in each subclass

    def get_info(self):
        """Connect via ``self.con``, then disconnect if the link exists.

        The disconnect only happens when the connector's id shows up among
        the port's current connector profiles.
        """
        self.con.connect()
        active_ids = [cp.connector_id for cp in self.get_connections()]
        if self.con.profile.connector_id in active_ids:
            self.con.disconnect()

    def get_connections(self):
        """Return the connector profiles reported by the port reference."""
        port_ref = self.port_profile.port_ref
        return port_ref.get_connector_profiles()
208 
class CorbaServer:
    """Provided service interface of a service port.

    Attributes:
        profile: the interface profile passed in.
        port: the owning port wrapper.
        name: the profile's ``instance_name``.
        type: the profile's ``type_name``.
        ref: object reference taken from the port connector's properties
            under the key ``port.<type>.<name>``.
    """

    def __init__(self, profile, port):
        """Look up this interface's reference in ``port.con.prop_dict``."""
        self.profile = profile
        self.port = port
        self.name = profile.instance_name
        self.type = profile.type_name
        self.ref = None
        self.ref = port.con.prop_dict['.'.join(['port', self.type, self.name])]

    #
    # if we import stubs before we create instances,
    # we rarely need to narrow the object references.
    # we need to specify global symbol table to evaluate class symbols.
    #
    def narrow_ref(self, gls):
        """Narrow ``self.ref`` to the class named by ``self.type``.

        Args:
            gls: global symbol table in which the class name is evaluated.
                Unscoped names are looked up under ``_GlobalIDL``; in
                scoped names ``::`` is replaced by ``.``.
        """
        if '::' in self.type:
            expr = self.type.replace('::', '.')
        else:
            expr = '_GlobalIDL.' + self.type
        # NOTE(review): eval() runs on the interface type name taken from
        # the profile; confirm that profiles only come from trusted peers.
        self.narrow_sym = eval(expr, gls)
        self.ref = self.ref._narrow(self.narrow_sym)
229 
class CorbaClient:
    """Required service interface of a service port (profile data only)."""

    def __init__(self, profile):
        """Keep the profile plus its instance and type names."""
        self.profile = profile
        self.name, self.type = profile.instance_name, profile.type_name
235 #
236 # to connect to an outside corba client,
237 # we need an implementation of the corresponding corba server.
238 # but ....
239 #
240 
class RtcService(Port):
    """Service port wrapper.

    Attributes:
        provided: dict of instance name -> CorbaServer for every
            interface with polarity RTC.PROVIDED.
        required: dict of instance name -> CorbaClient for every
            interface with polarity RTC.REQUIRED.
    """

    def __init__(self, profile, nv_dict=None):
        """Probe the port through a ServiceConnector and sort interfaces."""
        Port.__init__(self, profile, nv_dict)
        self.con = ServiceConnector([self])
        self.get_info()
        self.provided = {}
        self.required = {}
        for iface in self.port_profile.interfaces:
            if iface.polarity == RTC.PROVIDED:
                self.provided[iface.instance_name] = CorbaServer(iface, self)
                continue
            if iface.polarity == RTC.REQUIRED:
                self.required[iface.instance_name] = CorbaClient(iface)
254 
class RtcInport(Port):
    """Data in-port wrapper.

    Attributes:
        ref: value of the connector property
            'dataport.corba_cdr.inport_ref' after probing the port.
        data_class: class named by the port's 'dataport.data_type'
            property, looked up in the RTC module.
        data_tc: the matching ``RTC._tc_<data_type>`` object.
    """

    def __init__(self, profile, nv_dict=None):
        """Probe the port through an IOConnector and resolve its data type."""
        Port.__init__(self, profile, nv_dict)
        self.con = IOConnector([self])
        self.get_info()
        # the key was 'dataport.corba_any.inport_ref' in an earlier version
        self.ref = self.con.prop_dict['dataport.corba_cdr.inport_ref']
        # NOTE(review): eval() runs on the data type name reported by the
        # port; confirm that port properties only come from trusted peers.
        self.data_class = eval('RTC.' + self.prop['dataport.data_type'])
        self.data_tc = eval('RTC._tc_' + self.prop['dataport.data_type'])

    def write(self, data):
        """Wrap ``data`` with a zero RTC.Time and put it to the port."""
        # NOTE(review): a CORBA.Any is sent although the connector asks
        # for the 'corba_cdr' interface type - confirm this is accepted.
        stamped = self.data_class(RTC.Time(0, 0), data)
        payload = CORBA.Any(self.data_tc, stamped)
        self.ref.put(payload)
267 
class RtcOutport(Port):
    """Data out-port wrapper.

    Attributes:
        ref: value of the connector property
            'dataport.corba_cdr.outport_ref', or None when the connector
            did not report one.
        data_class: class named by the port's 'dataport.data_type'
            property, looked up in the RTC module.
        data_tc: the matching ``RTC._tc_<data_type>`` object.
    """

    def __init__(self, profile, nv_dict=None):
        """Probe the port through an IOConnector and resolve its data type."""
        Port.__init__(self, profile, nv_dict)
        self.con = IOConnector([self])
        self.get_info()
        # The original tested for the 'corba_any' key but read the
        # 'corba_cdr' one; test and read the same key.
        self.ref = self.con.prop_dict.get('dataport.corba_cdr.outport_ref')
        # NOTE(review): eval() runs on the data type name reported by the
        # port; confirm that port properties only come from trusted peers.
        self.data_class = eval('RTC.' + self.prop['dataport.data_type'])
        self.data_tc = eval('RTC._tc_' + self.prop['dataport.data_type'])

    def read(self):
        """Return the value fetched from the port, or None if unsupported.

        Prints "not supported" when no out-port reference is available.
        """
        if self.ref:
            # NOTE(review): assumes get() returns an object with value();
            # confirm for the 'corba_cdr' interface type.
            return self.ref.get().value()
        print("not supported")
        return None
287 #
288 # RtcHandle
289 #
class RtcHandle:
    """Handle to one RT component: configuration, contexts and ports.

    Attributes:
        name: name the component is registered under.
        env: the object whose ``naming`` attribute resolves ``name``.
        rtc_ref: the component reference.
        conf_ref: the component's configuration object (None until
            retrieved).
        conf_set: dict of configuration-set id -> configuration set.
        conf_set_data: dict of configuration-set id -> dict of its data.
        execution_contexts: the component's owned execution contexts.
        port_refs: the component's port references.
        ports, services, inports, outports: dicts keyed by port name.
    """

    def __init__(self, name, env, ref=None):
        """Store or resolve the component reference and load its info.

        Args:
            name: component name in the naming service.
            env: provider of ``naming.resolve``; used when ``ref`` is
                not given.
            ref: an already narrowed component reference.
        """
        self.name = name
        self.env = env
        if ref:
            self.rtc_ref = ref
        else:
            self.rtc_ref = env.naming.resolve(name)._narrow(RTC.RTObject)
        self.conf_ref = None
        self.retrieve_info()

    def retrieve_info(self):
        """(Re)load configuration sets, profile, contexts and ports.

        All containers are reset first, so they exist (empty) even when
        ``rtc_ref`` is falsy.
        """
        self.conf_set = {}
        # conf_set_data is filled in the loop below and must exist first.
        self.conf_set_data = {}
        self.port_refs = []
        self.execution_contexts = []
        # this includes inports, outports and service ports
        self.ports = {}
        self.services = {}
        self.inports = {}
        self.outports = {}
        if not self.rtc_ref:
            return
        self.conf_ref = self.rtc_ref.get_configuration()
        for cc in self.conf_ref.get_configuration_sets():
            self.conf_set[cc.id] = cc
            self.conf_set_data[cc.id] = nvlist2dict(cc.configuration_data)
        self.profile = self.rtc_ref.get_component_profile()
        self.prop = nvlist2dict(self.profile.properties)
        # get_contexts() was used here in an earlier version
        self.execution_contexts = self.rtc_ref.get_owned_contexts()
        self.port_refs = self.rtc_ref.get_ports()
        for pp in self.port_refs:
            tmp = pp.get_port_profile()
            tmp_prop = nvlist2dict(tmp.properties)
            # .get(): a port without 'port.port_type' is skipped instead
            # of aborting the whole handle with a KeyError.
            port_type = tmp_prop.get('port.port_type')
            if port_type == 'DataInPort':
                self.inports[tmp.name] = RtcInport(tmp, tmp_prop)
            elif port_type == 'DataOutPort':
                self.outports[tmp.name] = RtcOutport(tmp, tmp_prop)
            elif port_type == 'CorbaPort':
                self.services[tmp.name] = RtcService(tmp, tmp_prop)

    def set_conf(self, conf_set_name, param_name, value):
        """Set one parameter of a configuration set and send it.

        Raises:
            KeyError: if ``conf_set_name`` is not a known set.
        """
        conf_set = self.conf_set[conf_set_name]
        conf_set_data = self.conf_set_data[conf_set_name]
        conf_set_data[param_name] = value
        conf_set.configuration_data = dict2nvlist(conf_set_data)
        self.conf_ref.set_configuration_set_values(conf_set_name, conf_set)

    def set_conf_activate(self, conf_set_name, param_name, value):
        """Set one parameter, then activate that configuration set."""
        self.set_conf(conf_set_name, param_name, value)
        self.conf_ref.activate_configuration_set(conf_set_name)

    def activate(self):
        """Activate the component in its first execution context."""
        return self.execution_contexts[0].activate_component(self.rtc_ref)

    def deactivate(self):
        """Deactivate the component in its first execution context."""
        return self.execution_contexts[0].deactivate_component(self.rtc_ref)
349 #
350 # pipe
351 # a pipe is a port (interface & implementation)
352 # which corresponds to an outside port interface.
353 # you can subscribe and communicate to the outside port with the pipe.
354 # you need an rtc implementation to use pipes.
355 #
356 # class Pipe :
357 # pass
358 #
359 # class PipeOut(Pipe):
360 # def __init__(self, name, data_buf, size=8) :
361 # self.name = name
362 # self.data = data_buf
363 # self.OpenRTM_aist.InPort(name,data_buf,OpenRTM_aist.RingBuffer(size))
def __init__(self, profile, nv_dict=None)
def __init__(self, profile)
def __init__(self, plist, name=None, id="", prop_dict={})
def list_obj1(self, name_context, parent)
def __init__(self, profile, nv_dict=None)
def dict2nvlist(dict)
def proc_bd(self, bd, name_context, parent)
def __init__(self, orb_args, nserver_names=["localhost"], orb=None, naming=None)
def set_conf_activate(self, conf_set_name, param_name, value)
CORBA::Long find(const CorbaSequence &seq, Functor f)
Return the index of CORBA sequence element that functor matches.
def __init__(self, profile, port)
def set_conf(self, conf_set_name, param_name, value)
def __init__(self, plist, name=None, id="", prop_dict={})
def __init__(self, name, env, ref=None)
def __init__(self, plist, name=None, id="", prop_dict={})
def __init__(self, profile, nv_dict=None)
def nvlist2dict(nvlist)
def __init__(self, profile, nv_dict=None)
def __init__(self, orb, server_name=None, naming=None)
def get_object_by_name(self, name, cl=RTC.RTObject)


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Mon Feb 28 2022 23:00:44