SeqOut.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: utf-8 -*-
00003 # -*- Python -*-
00004 
00005 import sys
00006 
00007 import RTC
00008 import OpenRTM_aist
00009 
00010 import random
00011 import time
00012 
# Component profile for SeqOut.  SeqOutInit() hands this list to
# OpenRTM_aist.Properties(defaults_str=...), so it is kept as a flat
# sequence of alternating keys and values closed by an empty string.
_SEQOUT_PROFILE = (
  ("implementation_id", "SeqOut"),
  ("type_name", "SequenceOutComponent"),
  ("description", "Sequence OutPort component"),
  ("version", "1.0"),
  ("vendor", "Shinji Kurihara"),
  ("category", "example"),
  ("activity_type", "DataFlowComponent"),
  ("max_instance", "10"),
  ("language", "Python"),
  ("lang_type", "script"),
)
seqout_spec = [_field for _entry in _SEQOUT_PROFILE for _field in _entry]
seqout_spec.append("")
00024 
00025 
00026 
class SeqOut(OpenRTM_aist.DataFlowComponentBase):
  """Example component that publishes random values on ten OutPorts.

  Five ports carry one value each (octet, short, long, float, double) and
  five carry sequences of the same element types.  Every onExecute() call
  fills all ten data objects with fresh random values, prints them as a
  table on standard output and writes them to the ports.
  """

  # Number of elements generated for every sequence port per cycle.
  _SEQ_LENGTH = 10

  # Lines the cursor has to travel back up after one table was printed:
  # two scalar lines, three banner lines, one line per sequence element,
  # plus one to cancel the newline emitted by the cursor-reset print itself.
  _TABLE_LINES = 2 + 3 + _SEQ_LENGTH + 1

  def __init__(self, manager):
    """Initialise the base class with the given manager."""
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

  def onInitialize(self):
    """Create the data objects and OutPorts and register the ports.

    Returns:
      RTC.RTC_OK
    """
    # Data objects bound to the ports; onExecute() updates their .data.
    self._octet     = RTC.TimedOctet(RTC.Time(0,0),0)
    self._short     = RTC.TimedShort(RTC.Time(0,0),0)
    self._long      = RTC.TimedLong(RTC.Time(0,0),0)
    self._float     = RTC.TimedFloat(RTC.Time(0,0),0)
    self._double    = RTC.TimedDouble(RTC.Time(0,0),0)
    self._octetSeq  = RTC.TimedOctetSeq(RTC.Time(0,0),[])
    self._shortSeq  = RTC.TimedShortSeq(RTC.Time(0,0),[])
    self._longSeq   = RTC.TimedLongSeq(RTC.Time(0,0),[])
    self._floatSeq  = RTC.TimedFloatSeq(RTC.Time(0,0),[])
    self._doubleSeq = RTC.TimedDoubleSeq(RTC.Time(0,0),[])

    # One OutPort per data object.
    self._octetOut     = OpenRTM_aist.OutPort("Octet", self._octet)
    self._shortOut     = OpenRTM_aist.OutPort("Short", self._short)
    self._longOut      = OpenRTM_aist.OutPort("Long", self._long)
    self._floatOut     = OpenRTM_aist.OutPort("Float", self._float)
    self._doubleOut    = OpenRTM_aist.OutPort("Double", self._double)
    self._octetSeqOut  = OpenRTM_aist.OutPort("OctetSeq", self._octetSeq)
    self._shortSeqOut  = OpenRTM_aist.OutPort("ShortSeq", self._shortSeq)
    self._longSeqOut   = OpenRTM_aist.OutPort("LongSeq", self._longSeq)
    self._floatSeqOut  = OpenRTM_aist.OutPort("FloatSeq", self._floatSeq)
    self._doubleSeqOut = OpenRTM_aist.OutPort("DoubleSeq", self._doubleSeq)

    # Register the ports with the component under the same names.
    self.addOutPort("Octet", self._octetOut)
    self.addOutPort("Short", self._shortOut)
    self.addOutPort("Long", self._longOut)
    self.addOutPort("Float", self._floatOut)
    self.addOutPort("Double", self._doubleOut)

    self.addOutPort("OctetSeq", self._octetSeqOut)
    self.addOutPort("ShortSeq", self._shortSeqOut)
    self.addOutPort("LongSeq", self._longSeqOut)
    self.addOutPort("FloatSeq", self._floatSeqOut)
    self.addOutPort("DoubleSeq", self._doubleSeqOut)
    return RTC.RTC_OK

  def onExecute(self, ec_id):
    """Generate, display and publish one set of random values.

    Args:
      ec_id: not used by this method.

    Returns:
      RTC.RTC_OK
    """
    # int() replaces the Python-2-only long(); the values lie in 0..10.
    self._octet.data = int(random.uniform(0x41, 0x4a))
    self._short.data = int(random.uniform(0, 10))
    self._long.data = int(random.uniform(0, 10))
    self._float.data = float(random.uniform(0.0, 10.0))
    self._double.data = float(random.uniform(0.0, 10.0))

    # Every print() below receives exactly one pre-formatted string, so the
    # calls behave the same as a print statement on Python 2 and as the
    # print function on Python 3.
    print('%3.2s   %10.8s %10.8s %10.8s %10.8s %10.8s'
          % (' ', 'octet', 'short', 'long', 'float', 'double'))
    print('%3.2s   %7s[%s] %10.8s %10.8s %10.8s %10.8s'
          % (' ', self._octet.data, chr(self._octet.data),
             self._short.data, self._long.data,
             self._float.data, self._double.data))
    print("-------------------------------------------------------------")
    print("                 Sequence Data                     ")
    print("-------------------------------------------------------------")

    octets = []
    shorts = []
    longs = []
    floats = []
    doubles = []
    for i in range(self._SEQ_LENGTH):
      octets.append(int(random.uniform(0x41, 0x4a)))
      shorts.append(int(random.uniform(0, 10)))
      longs.append(int(random.uniform(0, 10)))
      floats.append(float(random.uniform(0.0, 10.0)))
      doubles.append(float(random.uniform(0.0, 10.0)))
      print('%3.2s : %7s[%s] %10.8s %10.8s %10.8s %10.8s'
            % (str(i), octets[i], chr(octets[i]),
               shorts[i], longs[i], floats[i], doubles[i]))

    # Move the cursor back to the first line of the table so that the next
    # cycle overwrites it in place ("ESC [ A" = one line up, "\r" = column 0).
    print("\033[A\r" * self._TABLE_LINES)

    # Pack the octet values into a byte string in one step: this yields a
    # str on Python 2 (as before) and bytes on Python 3.
    # NOTE(review): assumes the ORB accepts bytes for an octet sequence on
    # Python 3 -- confirm against the omniORBpy mapping in use.
    self._octetSeq.data = bytes(bytearray(octets))
    self._shortSeq.data = shorts
    self._longSeq.data = longs
    self._floatSeq.data = floats
    self._doubleSeq.data = doubles

    self._octetOut.write()
    self._shortOut.write()
    self._longOut.write()
    self._floatOut.write()
    self._doubleOut.write()
    self._octetSeqOut.write()
    self._shortSeqOut.write()
    self._longSeqOut.write()
    self._floatSeqOut.write()
    self._doubleSeqOut.write()

    # Pace the demo output: one table per second.
    time.sleep(1)

    return RTC.RTC_OK
00123 
00124 
def SeqOutInit(manager):
  """Register the SeqOut component factory with *manager*.

  The factory profile is an OpenRTM_aist.Properties object built from the
  module-level seqout_spec list; the SeqOut class and OpenRTM_aist.Delete
  are passed along as the second and third registerFactory() arguments.
  """
  manager.registerFactory(OpenRTM_aist.Properties(defaults_str=seqout_spec),
                          SeqOut,
                          OpenRTM_aist.Delete)
00131 
00132 
def MyModuleInit(manager):
  """Register the SeqOut factory and create one SeqOut component.

  Installed by main() through Manager.setModuleInitProc().

  Args:
    manager: the manager on which the factory is registered and the
      component is created.
  """
  SeqOutInit(manager)

  # Create a component
  comp = manager.createComponent("SeqOut")

  # Do not announce success when nothing was created.
  # NOTE(review): assumes createComponent() returns None on failure --
  # confirm against the OpenRTM-aist Manager API.
  if comp is None:
    sys.stderr.write("Component creation failed\n")
    return

  print("Component created")
00140 
00141 
def main():
  """Initialise the manager, install MyModuleInit and run the manager.

  The manager is initialised from sys.argv.  MyModuleInit is set as the
  module initialization procedure, which will be invoked in
  activateManager(); that call activates the manager and registers it to
  the naming service.  runManager() is then called without arguments, i.e.
  in blocking mode (runManager(False) is the default).  To run the manager
  in non-blocking mode, call runManager(True) instead.
  """
  manager = OpenRTM_aist.Manager.init(sys.argv)
  manager.setModuleInitProc(MyModuleInit)
  manager.activateManager()
  manager.runManager()
00156 
00157   # If you want to run the manager in non-blocking mode, do like this
00158   # mgr.runManager(True)
00159 
# Start the manager only when this file is executed as a script.
if __name__ == "__main__":
  main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28