tests/rtc_handle.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 import sys
5 from omniORB import CORBA, URI
6 from omniORB import any
7 
8 import OpenRTM
9 import RTC
10 
11 
12 from CorbaNaming import *
13 import SDOPackage
14 
15 # class RtmEnv :
16 # rtm environment manager
17 # orb, naming service, rtc proxy list
18 #
class RtmEnv:
    """RTM environment: one ORB plus the naming-service proxies.

    Attributes:
        orb: the CORBA ORB, either handed in by the caller or created
            with ``CORBA.ORB_init(orb_args)``.
        name_space: dict mapping a naming-server name (or 'default')
            to the NameSpace built for it.
    """

    def __init__(self, orb_args, nserver_names=["localhost"],
                 orb=None, naming=None):
        """Prepare the ORB and build one NameSpace per naming service.

        orb_args      -- arguments for CORBA.ORB_init; only used when no
                         orb is supplied.
        nserver_names -- naming-server names; ignored when naming is given.
        orb           -- an existing ORB to reuse (optional).
        naming        -- a ready-made naming object; only one naming
                         service can be specified this way and it is
                         registered under the key 'default'.
        """
        self.orb = orb if orb else CORBA.ORB_init(orb_args)
        self.name_space = {}
        if naming:
            self.name_space['default'] = NameSpace(self.orb, naming=naming)
            return
        for server in nserver_names:
            self.name_space[server] = NameSpace(self.orb, server_name=server)

    def __del__(self):
        """Shut the ORB down without waiting for completion, then destroy it."""
        the_orb = self.orb
        the_orb.shutdown(wait_for_completion=CORBA.FALSE)
        the_orb.destroy()
36 #
37 # class NameSpace :
38 # rtc_handles and object list in naming service
39 #
class NameSpace:
    """Objects and RTC handles found in one naming service.

    Attributes:
        orb: the ORB passed in by the caller.
        name: the naming-server name (None when a naming object was given).
        naming: the naming object used for every lookup.
        b_len: number of bindings requested per list()/next_n() call.
        rtc_handles: dict of full name -> RtcHandle, filled by list_obj().
        obj_list: dict of full name -> resolved object reference.
    """

    def __init__(self, orb, server_name=None, naming=None):
        """Use `naming` if given, otherwise create a CorbaNaming for
        `server_name` on `orb`."""
        self.orb = orb
        self.name = server_name
        if naming:
            self.naming = naming
        else:
            self.naming = CorbaNaming(self.orb, server_name)

        self.b_len = 10  # iteration cut off no.
        self.rtc_handles = {}
        self.obj_list = {}

    def get_object_by_name(self, name, cl=RTC.RTObject):
        """Resolve `name` and narrow the reference to `cl`.

        Returns None when the name resolves to None, the un-narrowed
        reference when `cl` is falsy, and ``ref._narrow(cl)`` otherwise.
        """
        ref = self.naming.resolveStr(name)
        if ref is None:
            return None  # return CORBA.nil ?
        if cl:
            return ref._narrow(cl)
        return ref

    def list_obj(self):
        """Rebuild obj_list and rtc_handles by walking the root context.

        Returns the list of [name, rtc_ref] pairs gathered by list_obj1().
        """
        # The original assigned to the misspelled attribute "rtc_handes",
        # so handles from an earlier listing were never cleared.
        self.rtc_handles = {}
        self.obj_list = {}
        return self.list_obj1(self.naming._rootContext, "")

    def list_obj1(self, name_context, parent):
        """List every binding below `name_context`, recursing into
        sub-contexts.

        name_context -- context to list; a falsy value means the root
                        context of self.naming.
        parent       -- name prefix of this context ("" at the root).

        Returns the concatenated results of proc_bd() for each binding.
        """
        if not name_context:
            name_context = self.naming._rootContext
        rslt = []
        # list() hands back the first chunk of bindings plus an iterator
        # for whatever did not fit into that chunk.
        bindings, iterator = name_context.list(self.b_len)
        for bd in bindings:
            rslt = rslt + self.proc_bd(bd, name_context, parent)
        if iterator:  # iterator : there exists remaining.
            try:
                more, chunk = iterator.next_n(self.b_len)
                while more:
                    for bd in chunk:
                        rslt = rslt + self.proc_bd(bd, name_context, parent)
                    more, chunk = iterator.next_n(self.b_len)
            finally:
                # Release the iterator object held by the naming server;
                # the original never destroyed it.
                iterator.destroy()
        return rslt

    def proc_bd(self, bd, name_context, parent):
        """Process one binding `bd` found in `name_context`.

        An object binding is stored in obj_list and, when it narrows to
        RTC.RTObject, wrapped in an RtcHandle stored in rtc_handles.
        Any other binding is narrowed to a naming context and listed
        recursively.

        Returns a list of [name, rtc_ref] pairs (possibly empty).
        """
        rslt = []
        pre = ""
        if parent:
            pre = parent + "/"
        nam = pre + URI.nameToString(bd.binding_name)
        if bd.binding_type == CosNaming.nobject:
            tmp = name_context.resolve(bd.binding_name)
            self.obj_list[nam] = tmp
            print('objcet ' + nam + ' was listed.')
            try:
                tmp = tmp._narrow(RTC.RTObject)
            except Exception:
                print(nam + ' is not RTC.')
                tmp = None
            try:
                if tmp:
                    rslt = [[nam, tmp]]
                    self.rtc_handles[nam] = RtcHandle(nam, self, tmp)
                    print('handle for ' + nam + ' was created.')
            except Exception:
                # Building the handle talks to the component; a failure
                # here is reported and the listing continues (best effort).
                print(nam + ' is not alive.')
        else:
            tmp = name_context.resolve(bd.binding_name)
            tmp = tmp._narrow(CosNaming.NamingContext)
            rslt = self.list_obj1(tmp, nam)
        return rslt
115 
116 #
117 # data conversion
118 #
def nvlist2dict(nvlist):
    """Turn a sequence of name/value entries into a plain dict.

    Every entry must expose ``name`` and a ``value`` object whose
    ``value()`` method returns the payload (nv.value and any.value()).
    When a name occurs more than once the last entry wins.
    """
    return dict((entry.name, entry.value.value()) for entry in nvlist)
def dict2nvlist(dict):
    """Build a list of SDOPackage.NameValue entries from a mapping.

    One entry is produced per key; each value is wrapped with
    ``any.to_any`` before it is stored.
    """
    return [SDOPackage.NameValue(key, any.to_any(dict[key]))
            for key in dict.keys()]
129 #
130 # connector, port, inport, outport, service
131 #
132 
class Connector:
    """Connection between the ports listed in `plist`.

    Subclasses must set ``self.def_prop`` (dict of default connector
    properties) before calling ``Connector.__init__``, because the
    constructor ends by calling nego_prop(), which reads it.

    Attributes:
        plist: the port wrappers being connected.
        port_reflist: ``port_profile.port_ref`` of every entry in plist.
        name: connector name (given, or the port names joined by '_').
        prop_dict / prop_nvlist: connector properties as dict / NameValue list.
        profile: the RTC.ConnectorProfile sent to the ports.
        possible: result of the last nego_prop() call.
    """

    def __init__(self, plist, name=None, id="", prop_dict=None):
        """Create the connector profile and negotiate its properties.

        plist     -- port wrappers exposing ``name``, ``prop`` and
                     ``port_profile.port_ref``.
        name      -- connector name; built from the port names if falsy.
        id        -- connector id stored in the profile.
        prop_dict -- initial properties; copied, never modified in place.
        """
        self.plist = plist
        self.port_reflist = [port.port_profile.port_ref for port in plist]
        if name:
            self.name = name
        else:
            # str.join replaces string.join: the string module is not
            # imported by this file.
            self.name = '_'.join([port.name for port in plist])
        # Work on a private copy. The original stored the argument itself,
        # so the shared default dict was mutated by nego_prop() and leaked
        # settings from one connector into the next.
        self.prop_dict = dict(prop_dict) if prop_dict else {}
        self.prop_nvlist = dict2nvlist(self.prop_dict)
        self.profile = RTC.ConnectorProfile(self.name, id,
                                            self.port_reflist,
                                            self.prop_nvlist)
        self.nego_prop()

    def nego_prop(self):
        """Fill in default properties and check them against every port.

        A property that is missing or empty takes its value from
        def_prop. If some port neither lists the chosen value nor 'Any'
        for that property, the property is blanked and the connection is
        marked impossible.

        Returns True when every property is acceptable to every port.
        """
        self.possible = True
        for kk in self.def_prop:
            if not self.prop_dict.get(kk):
                self.prop_dict[kk] = self.def_prop[kk]
            for pp in self.plist:
                if not ((self.prop_dict[kk] in pp.prop[kk]) or
                        ('Any' in pp.prop[kk])):
                    self.prop_dict[kk] = ""
                    self.possible = False
        self.prop_nvlist = dict2nvlist(self.prop_dict)
        self.profile.properties = self.prop_nvlist
        return self.possible

    def connect(self):
        """Connect through the first port and refresh the stored profile.

        Returns the return code of the remote connect() call.
        """
        # out and inout parameters are returned as a tuple
        ret, self.profile = self.port_reflist[0].connect(self.profile)
        self.prop_nvlist = self.profile.properties
        self.prop_dict = nvlist2dict(self.prop_nvlist)
        return ret

    def disconnect(self):
        """Disconnect via the first port, using the profile's connector_id.

        Returns the return code of the remote disconnect() call.
        """
        ret = self.port_reflist[0].disconnect(self.profile.connector_id)
        return ret
175 
class IOConnector(Connector):
    """Connector for data ports.

    Default properties: dataflow 'push', interface 'corba_cdr',
    subscription 'flush'. (An earlier revision, kept as a comment in the
    original, used 'Push' / 'CORBA_Any' / 'Flush'.)
    """

    def __init__(self, plist, name=None, id="", prop_dict=None):
        """Set the data-port defaults, then run Connector.__init__.

        prop_dict -- initial connector properties; a fresh dict is used
                     when omitted.
        """
        # def_prop has to exist before Connector.__init__ runs, because
        # that constructor calls nego_prop(), which reads it.
        self.def_prop = {'dataport.dataflow_type': 'push',
                         'dataport.interface_type': 'corba_cdr',
                         'dataport.subscription_type': 'flush'}
        # A new dict per call: the previous shared `{}` default was
        # handed to the base class, whose nego_prop() writes into it.
        if prop_dict is None:
            prop_dict = {}
        Connector.__init__(self, plist, name, id, prop_dict)
185 
class ServiceConnector(Connector):
    """Connector for service (CORBA) ports.

    The class header is restored here: it is absent from the listing,
    but RtcService instantiates ``ServiceConnector`` and without the
    header this __init__ would replace IOConnector.__init__.
    """

    def __init__(self, plist, name=None, id="", prop_dict=None):
        """Set the service-port default, then run Connector.__init__.

        prop_dict -- initial connector properties; a fresh dict is used
                     when omitted.
        """
        # def_prop has to exist before Connector.__init__ runs, because
        # that constructor calls nego_prop(), which reads it.
        self.def_prop = {'port.port_type': 'CorbaPort'}
        # A new dict per call: the previous shared `{}` default was
        # handed to the base class, whose nego_prop() writes into it.
        if prop_dict is None:
            prop_dict = {}
        Connector.__init__(self, plist, name, id, prop_dict)
190 
191 
class Port:
    """Base wrapper around a port profile.

    Attributes:
        name: ``profile.name``.
        port_profile: the profile passed to the constructor.
        prop: port properties as a dict.
        con: connector used by get_info(); None here, each subclass
            must set it.
    """

    def __init__(self, profile, nv_dict=None):
        """Store the profile and its properties.

        nv_dict -- properties already converted to a dict; when falsy
                   they are derived from ``profile.properties``.
        """
        self.name = profile.name
        self.port_profile = profile
        self.prop = nv_dict if nv_dict else nvlist2dict(profile.properties)
        self.con = None  # this must be set in each subclass

    def get_info(self):
        """Connect through self.con, then disconnect again if that
        connector's id shows up among the port's connections."""
        connector = self.con
        connector.connect()
        live_ids = [prof.connector_id for prof in self.get_connections()]
        if connector.profile.connector_id in live_ids:
            connector.disconnect()

    def get_connections(self):
        """Return the connector profiles reported by the port reference."""
        port_ref = self.port_profile.port_ref
        return port_ref.get_connector_profiles()
209 
class CorbaServer:
    """A provided service interface of a port.

    Attributes:
        profile: the interface profile passed in.
        port: the owning port wrapper.
        name / type: ``profile.instance_name`` / ``profile.type_name``.
        ref: object reference taken from the port connector's properties
            under the key 'port.<type>.<name>'.
    """

    def __init__(self, profile, port):
        self.profile = profile
        self.port = port
        self.name = profile.instance_name
        self.type = profile.type_name
        self.ref = None
        lookup_key = '.'.join(['port', self.type, self.name])
        self.ref = port.con.prop_dict[lookup_key]

    def narrow_ref(self, gls):
        """Narrow self.ref to the class named by self.type.

        If stubs are imported before instances are created, narrowing is
        rarely needed. `gls` is the global symbol table in which the
        class symbol is evaluated: a type without '::' is looked up
        under ``_GlobalIDL``, otherwise '::' is replaced by '.'.
        """
        if '::' in self.type:
            expression = self.type.replace('::', '.')
        else:
            expression = '_GlobalIDL.' + self.type
        # NOTE(review): eval() runs on a type name taken from the interface
        # profile; confirm that profile only ever comes from a trusted
        # component.
        self.narrow_sym = eval(expression, gls)
        self.ref = self.ref._narrow(self.narrow_sym)
230 
class CorbaClient:
    """A required service interface of a port.

    Keeps the interface profile together with its instance name and
    type name; no object reference is held.
    """

    def __init__(self, profile):
        self.profile = profile
        self.name, self.type = profile.instance_name, profile.type_name
236 #
237 # to connect to an outside corba client,
238 # we need an implementation of the corresponding corba server.
239 # but ....
240 #
241 
class RtcService(Port):
    """Service port wrapper.

    Attributes:
        provided: dict of instance name -> CorbaServer for every
            interface whose polarity is RTC.PROVIDED.
        required: dict of instance name -> CorbaClient for every
            interface whose polarity is RTC.REQUIRED.
    """

    def __init__(self, profile, nv_dict=None):
        Port.__init__(self, profile, nv_dict)
        self.con = ServiceConnector([self])
        # get_info() connects (and possibly disconnects) through self.con;
        # CorbaServer reads the connector's prop_dict afterwards.
        self.get_info()
        self.provided = {}
        self.required = {}
        for iface in self.port_profile.interfaces:
            polarity = iface.polarity
            if polarity == RTC.PROVIDED:
                self.provided[iface.instance_name] = CorbaServer(iface, self)
            elif polarity == RTC.REQUIRED:
                self.required[iface.instance_name] = CorbaClient(iface)
255 
class RtcInport(Port):
    """Data in-port wrapper.

    Attributes:
        ref: the connector property 'dataport.corba_cdr.inport_ref'
            (an earlier revision read 'dataport.corba_any.inport_ref').
        data_class / data_tc: the RTC class and its ``_tc_`` counterpart
            named by the port property 'dataport.data_type'.
    """

    def __init__(self, profile, nv_dict=None):
        Port.__init__(self, profile, nv_dict)
        self.con = IOConnector([self])
        self.get_info()
        self.ref = self.con.prop_dict['dataport.corba_cdr.inport_ref']
        type_name = self.prop['dataport.data_type']
        # NOTE(review): eval() runs on a type name taken from the port
        # properties; confirm those only ever come from a trusted component.
        self.data_class = eval('RTC.' + type_name)
        self.data_tc = eval('RTC._tc_' + type_name)

    def write(self, data):
        """Wrap `data` in data_class with an RTC.Time(0,0) stamp and put
        it to the port reference as a CORBA.Any."""
        sample = self.data_class(RTC.Time(0, 0), data)
        self.ref.put(CORBA.Any(self.data_tc, sample))
268 
class RtcOutport(Port):
    """Data out-port wrapper.

    Attributes:
        ref: the connector property 'dataport.corba_cdr.outport_ref',
            or None when the connector does not provide it.
        data_class / data_tc: the RTC class and its ``_tc_`` counterpart
            named by the port property 'dataport.data_type'.
    """

    def __init__(self, profile, nv_dict=None):
        Port.__init__(self, profile, nv_dict)
        self.con = IOConnector([self])
        self.get_info()
        # The original tested for 'dataport.corba_any.outport_ref' but then
        # read 'dataport.corba_cdr.outport_ref'. Depending on which key was
        # present, ref was either always None or the lookup raised KeyError.
        # Test and read the same corba_cdr key, matching the interface type
        # requested by IOConnector and the key used by RtcInport.
        self.ref = self.con.prop_dict.get('dataport.corba_cdr.outport_ref')
        # NOTE(review): eval() runs on a type name taken from the port
        # properties; confirm those only ever come from a trusted component.
        self.data_class = eval('RTC.' + self.prop['dataport.data_type'])
        self.data_tc = eval('RTC._tc_' + self.prop['dataport.data_type'])

    def read(self):
        """Return ``self.ref.get().value()``.

        Prints "not supported" and returns None when there is no
        reference to read from.
        """
        if self.ref:
            return self.ref.get().value()
        print("not supported")
        return None
288 #
289 # RtcHandle
290 #
class RtcHandle:
    """Proxy for one RT component.

    Attributes:
        name: name given to the constructor.
        env: object whose ``naming`` attribute is used to resolve `name`
            when no reference is supplied.
        rtc_ref: the component reference.
        conf_ref: the component's configuration object, or None.
        conf_set / conf_set_data: configuration sets by id, as returned
            objects and as plain dicts.
        profile / prop: component profile and its properties as a dict.
        execution_contexts: the component's owned execution contexts.
        port_refs: raw port references (inports, outports and services).
        inports / outports / services: port wrappers keyed by port name.
        ports: always left empty by retrieve_info().
    """

    def __init__(self, name, env, ref=None):
        """Bind to the component and load its information.

        ref -- component reference; when falsy, `name` is resolved
               through ``env.naming`` and narrowed to RTC.RTObject.
        """
        self.name = name
        self.env = env
        if ref:
            self.rtc_ref = ref
        else:
            self.rtc_ref = env.naming.resolve(name)._narrow(RTC.RTObject)
        self.conf_ref = None
        self.retrieve_info()

    def retrieve_info(self):
        """(Re)load configuration sets, profile, contexts and ports.

        Every attribute is reset first, so the handle stays usable (with
        empty containers) when rtc_ref is nil. The original created
        ports/services/inports/outports/profile/prop only for a live
        reference, leaving them undefined otherwise.
        """
        self.conf_set = {}
        self.conf_set_data = {}
        self.port_refs = []
        self.execution_contexts = []
        self.profile = None
        self.prop = {}
        self.ports = {}
        self.services = {}
        self.inports = {}
        self.outports = {}
        if not self.rtc_ref:
            return

        self.conf_ref = self.rtc_ref.get_configuration()
        for cc in self.conf_ref.get_configuration_sets():
            self.conf_set[cc.id] = cc
            self.conf_set_data[cc.id] = nvlist2dict(cc.configuration_data)
        self.profile = self.rtc_ref.get_component_profile()
        self.prop = nvlist2dict(self.profile.properties)
        self.execution_contexts = self.rtc_ref.get_owned_contexts()
        # this includes inports, outports and service ports
        self.port_refs = self.rtc_ref.get_ports()
        for pp in self.port_refs:
            tmp = pp.get_port_profile()
            tmp_prop = nvlist2dict(tmp.properties)
            # .get(): a port without a 'port.port_type' property is
            # skipped instead of aborting the whole scan with KeyError.
            port_type = tmp_prop.get('port.port_type')
            if port_type == 'DataInPort':
                self.inports[tmp.name] = RtcInport(tmp, tmp_prop)
            elif port_type == 'DataOutPort':
                self.outports[tmp.name] = RtcOutport(tmp, tmp_prop)
            elif port_type == 'CorbaPort':
                self.services[tmp.name] = RtcService(tmp, tmp_prop)

    def set_conf(self, conf_set_name, param_name, value):
        """Set one parameter of a configuration set and send the set to
        the component. Raises KeyError for an unknown conf_set_name."""
        conf_set = self.conf_set[conf_set_name]
        conf_set_data = self.conf_set_data[conf_set_name]
        conf_set_data[param_name] = value
        conf_set.configuration_data = dict2nvlist(conf_set_data)
        self.conf_ref.set_configuration_set_values(conf_set_name, conf_set)

    def set_conf_activate(self, conf_set_name, param_name, value):
        """set_conf(), then activate that configuration set."""
        self.set_conf(conf_set_name, param_name, value)
        self.conf_ref.activate_configuration_set(conf_set_name)

    def activate(self):
        """Activate the component in its first owned execution context.

        Raises IndexError when no execution context was retrieved.
        """
        return self.execution_contexts[0].activate_component(self.rtc_ref)

    def deactivate(self):
        """Deactivate the component in its first owned execution context.

        Raises IndexError when no execution context was retrieved.
        """
        return self.execution_contexts[0].deactivate_component(self.rtc_ref)
350 #
351 # pipe
352 # a pipe is a port (interface & implementation)
353 # which corresponds to an outside port interface.
354 # you can subscribe and communicate to the outside port with the pipe.
355 # you need an rtc implementation to use pipes.
356 #
class Pipe:
    """Common base class for pipes; it defines no members of its own."""
359 
class PipeOut(Pipe):
    """Pipe that owns an OpenRTM.InPort backed by a ring buffer.

    Attributes:
        name: port name.
        data: the data buffer object handed to the in-port.
        port: the OpenRTM.InPort created for this pipe.
    """

    def __init__(self, name, data_buf, size=8):
        """Create the in-port.

        size -- length passed to OpenRTM.RingBuffer.
        """
        self.name = name
        self.data = data_buf
        # The original wrote "self.OpenRTM.InPort(...)", which raises
        # AttributeError (instances have no OpenRTM attribute) and would
        # have discarded the port anyway. Use the module and keep the result.
        # NOTE(review): the attribute name "port" is a choice made here;
        # confirm against the intended users of PipeOut.
        self.port = OpenRTM.InPort(name, data_buf, OpenRTM.RingBuffer(size))
def __init__(self, profile, nv_dict=None)
def __init__(self, profile)
def __init__(self, plist, name=None, id="", prop_dict={})
def list_obj1(self, name_context, parent)
def __init__(self, name, data_buf, size=8)
def __init__(self, profile, nv_dict=None)
def dict2nvlist(dict)
def proc_bd(self, bd, name_context, parent)
def set_conf_activate(self, conf_set_name, param_name, value)
def __init__(self, profile, port)
def set_conf(self, conf_set_name, param_name, value)
def __init__(self, plist, name=None, id="", prop_dict={})
def __init__(self, name, env, ref=None)
def __init__(self, plist, name=None, id="", prop_dict={})
def __init__(self, profile, nv_dict=None)
def nvlist2dict(nvlist)
def __init__(self, profile, nv_dict=None)
def __init__(self, orb, server_name=None, naming=None)
def get_object_by_name(self, name, cl=RTC.RTObject)


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Thu Jun 6 2019 19:26:00