SimpleIO/Connector.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # -*- Python -*-
4 
5 import sys
6 
7 from omniORB import CORBA
8 from optparse import OptionParser, OptionError
9 
10 import RTC
11 import OpenRTM_aist
12 
13 def usage():
14  print("usage: ConnectorComp [options]")
15  print(" --flush ")
16  print(": Set subscription type Flush")
17  print(" --new ")
18  print(": Set subscription type New")
19  print(" --periodic [Hz] ")
20  print(": Set subscription type Periodic \n")
21  print("exsample:")
22  print(" ConnectorComp --flush")
23  print(" ConnectorComp --new")
24  print(" ConnectorComp --new --policy ALL")
25  print(" ConnectorComp --new --policy SKIP --skip 100")
26  print(" ConnectorComp --periodic 10")
27  print(" ConnectorComp --periodic 10 --policy FIFO")
28  print(" ConnectorComp --periodic 10 --policy NEW \n")
29 
30 def main():
31 
32  # initialization of ORB
33  orb = CORBA.ORB_init(sys.argv)
34 
35  # get NamingService
36  naming = OpenRTM_aist.CorbaNaming(orb, "localhost")
37 
40 
41  # find ConsoleIn0 component
42  conin.setObject(naming.resolve("ConsoleIn0.rtc"))
43 
44  # get ports
45  inobj = conin.getObject()._narrow(RTC.RTObject)
46  pin = inobj.get_ports()
47  pin[0].disconnect_all()
48 
49 
50  # find ConsoleOut0 component
51  conout.setObject(naming.resolve("ConsoleOut0.rtc"))
52 
53  # get ports
54  outobj = conout.getObject()._narrow(RTC.RTObject)
55  pout = outobj.get_ports()
56  pout[0].disconnect_all()
57 
58 
59  # subscription type
60  subs_type = "flush"
61  period = "1.0"
62  push_policy = "new"
63  skip_count = "0"
64 
65  for arg in sys.argv[1:]:
66  if arg == "--flush":
67  subs_type = "flush"
68  break
69 
70  elif arg == "--new":
71  subs_type = "new"
72  if len(sys.argv) > 2:
73  push_policy = OpenRTM_aist.normalize([sys.argv[3]])
74  if push_policy == "skip":
75  skip_count = sys.argv[5]
76  break
77 
78  elif arg == "--periodic":
79  subs_type = "periodic"
80  period = sys.argv[2]
81  if len(sys.argv) > 3:
82  push_policy = OpenRTM_aist.normalize([sys.argv[4]])
83  if push_policy == "skip":
84  skip_count = sys.argv[6]
85  break
86 
87  # elif sbus_type == "periodic" and type(arg) == float:
88  # period = srt(arg)
89  # break
90 
91  else:
92  usage()
93 
94  print("Subscription Type: ", subs_type)
95  print("Period: ", period, " [Hz]")
96  print("push policy: ", push_policy)
97  print("skip count: ", skip_count)
98 
99  # connect ports
100  conprof = RTC.ConnectorProfile("connector0", "", [pin[0],pout[0]], [])
101  OpenRTM_aist.CORBA_SeqUtil.push_back(conprof.properties,
102  OpenRTM_aist.NVUtil.newNV("dataport.interface_type",
103  "corba_cdr"))
104 
105  OpenRTM_aist.CORBA_SeqUtil.push_back(conprof.properties,
106  OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type",
107  "push"))
108 
109  OpenRTM_aist.CORBA_SeqUtil.push_back(conprof.properties,
110  OpenRTM_aist.NVUtil.newNV("dataport.subscription_type",
111  subs_type))
112 
113  if subs_type == "periodic":
114  OpenRTM_aist.CORBA_SeqUtil.push_back(conprof.properties,
115  OpenRTM_aist.NVUtil.newNV("dataport.publisher.push_rate",
116  period))
117  OpenRTM_aist.CORBA_SeqUtil.push_back(conprof.properties,
118  OpenRTM_aist.NVUtil.newNV("dataport.publisher.push_policy",
119  push_policy))
120  if push_policy == "skip":
121  OpenRTM_aist.CORBA_SeqUtil.push_back(conprof.properties,
122  OpenRTM_aist.NVUtil.newNV("dataport.publisher.skip_count",
123  skip_count))
124 
125 
126  ret,conprof = pin[0].connect(conprof)
127 
128  # activate ConsoleIn0
129  eclistin = inobj.get_owned_contexts()
130  eclistin[0].activate_component(inobj)
131 
132  # activate ConsoleOut0
133  eclistout = outobj.get_owned_contexts()
134  eclistout[0].activate_component(outobj)
135 
136 
137 if __name__ == "__main__":
138  main()
OpenRTM_aist.NVUtil.newNV
def newNV(name, value)
Create NameVale.
Definition: NVUtil.py:50
OpenRTM_aist.CORBA_SeqUtil.push_back
def push_back(seq, elem)
Push the new element back to the CORBA sequence.
Definition: CORBA_SeqUtil.py:113
OpenRTM_aist.examples.SimpleIO.Connector.usage
def usage()
Definition: SimpleIO/Connector.py:13
OpenRTM_aist.CorbaNaming.CorbaNaming
CORBA Naming Service helper class.
Definition: CorbaNaming.py:61
OpenRTM_aist.examples.SimpleIO.Connector.main
def main()
Definition: SimpleIO/Connector.py:30
OpenRTM_aist.CorbaConsumer.CorbaConsumer
Definition: CorbaConsumer.py:187


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:06