PublisherPeriodic.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: euc-jp -*-
00003 
00004 ##
00005 # @file  PublisherPeriodic.py
00006 # @brief PublisherPeriodic class
00007 # @date  $Date: 2007/09/28 $
00008 # @author Noriaki Ando <n-ando@aist.go.jp> and Shinji Kurihara
00009 #
00010 # Copyright (C) 2006-2008
00011 #     Noriaki Ando
00012 #     Task-intelligence Research Group,
00013 #     Intelligent Systems Research Institute,
00014 #     National Institute of
00015 #         Advanced Industrial Science and Technology (AIST), Japan
00016 #     All rights reserved.
00017 
00018 import threading
00019 from omniORB import any
00020 
00021 import OpenRTM_aist
00022 
00023 
00024 ##
00025 # @if jp
00026 # @class PublisherPeriodic
00027 # @brief PublisherPeriodic クラス
00028 #
00029 # 周期的にデータを送信するための Publisher クラス。このクラスは、通
00030 # 常 Connector 内にあって、バッファおよびコンシューマに関連付けられ
00031 # る。一定周期ごとにバッファからデータを取り出しコンシューマに対して
00032 # データを送出する。
00033 #
00034 # @else
00035 # @class PublisherPeriodic
00036 # @brief PublisherPeriodic class
00037 #
00038 # Publisher for periodic data transmitting. Usually this class
00039 # object exists in a Connector object, and it is associated with a
00040 # buffer and a consumer. This publisher periodically gets data from
00041 # the buffer and publish it into the consumer.
00042 #
00043 # @endif
00044 #
00045 class PublisherPeriodic(OpenRTM_aist.PublisherBase):
00046   """
00047   """
00048 
00049   # Policy
00050   ALL  = 0
00051   FIFO = 1
00052   SKIP = 2
00053   NEW  = 3
00054 
00055   ##
00056   # @if jp
00057   # @brief コンストラクタ
00058   #
00059   # コンストラクタ
00060   # 送出処理の呼び出し間隔を、Propertyオブジェクトのdataport.push_rateメンバ
00061   # に設定しておく必要がある。送出間隔は、Hz単位の浮動小数文字列で指定。
00062   # たとえば、1000.0Hzの場合は、「1000.0」を設定。
00063   # 上記プロパティが未設定の場合は、「1000Hz」を設定。
00064   #
00065   # @param self
00066   # @param consumer データ送出を待つコンシューマ
00067   # @param property 本Publisherの駆動制御情報を設定したPropertyオブジェクト
00068   #
00069   # @else
00070   # @brief Constructor
00071   # @endif
00072   def __init__(self):
00073     self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("PublisherPeriodic")
00074     self._consumer   = None
00075     self._buffer     = None
00076     self._task       = None
00077     self._retcode    = self.PORT_OK
00078     self._retmutex   = threading.RLock()
00079     self._pushPolicy = self.NEW
00080     self._skipn      = 0
00081     self._active     = False
00082     self._readback   = False
00083     self._leftskip   = 0
00084     self._profile    = None
00085     self._listeners  = None
00086 
00087     return
00088 
00089   ##
00090   # @if jp
00091   # @brief デストラクタ
00092   #
00093   # デストラクタ
00094   #
00095   # @param self
00096   #
00097   # @else
00098   # @brief Destructor
00099   # @endif
00100   def __del__(self):
00101     self._rtcout.RTC_TRACE("~PublisherPeriodic()")
00102     if self._task:
00103       self._task.resume()
00104       self._task.finalize()
00105       self._rtcout.RTC_PARANOID("task finalized.")
00106 
00107       OpenRTM_aist.PeriodicTaskFactory.instance().deleteObject(self._task)
00108       del self._task
00109       self._rtcout.RTC_PARANOID("task deleted.")
00110 
00111     # "consumer" should be deleted in the Connector
00112     self._consumer = None
00113     # "buffer"   should be deleted in the Connector
00114     self._buffer = None
00115     return
00116 
00117   ##
00118   # @if jp
00119   # @brief PushPolicy の設定
00120   # @else
00121   # @brief Setting PushPolicy
00122   # @endif
00123   #
00124   #void PublisherNew::setPushPolicy(const coil::Properties& prop)
00125   def setPushPolicy(self, prop):
00126     push_policy = prop.getProperty("publisher.push_policy","new")
00127     self._rtcout.RTC_DEBUG("push_policy: %s", push_policy)
00128 
00129     push_policy = OpenRTM_aist.normalize([push_policy])
00130 
00131     if push_policy == "all":
00132       self._pushPolicy = self.ALL
00133 
00134     elif push_policy == "fifo":
00135       self._pushPolicy = self.FIFO
00136 
00137     elif push_policy == "skip":
00138       self._pushPolicy = self.SKIP
00139 
00140     elif push_policy == "new":
00141       self._pushPolicy = self.NEW
00142 
00143     else:
00144       self._rtcout.RTC_ERROR("invalid push_policy value: %s", push_policy)
00145       self._pushPolicy = self.NEW
00146   
00147     skip_count = prop.getProperty("publisher.skip_count","0")
00148     self._rtcout.RTC_DEBUG("skip_count: %s", skip_count)
00149 
00150     skipn = [self._skipn]
00151     ret = OpenRTM_aist.stringTo(skipn, skip_count)
00152     if ret:
00153       self._skipn = skipn[0]
00154     else:
00155       self._rtcout.RTC_ERROR("invalid skip_count value: %s", skip_count)
00156       self._skipn = 0
00157 
00158     if self._skipn < 0:
00159       self._rtcout.RTC_ERROR("invalid skip_count value: %d", self._skipn)
00160       self._skipn = 0
00161 
00162     return
00163 
00164   ##
00165   # @if jp
00166   # @brief Task の設定
00167   # @else
00168   # @brief Setting Task
00169   # @endif
00170   #
00171   #bool PublisherNew::createTask(const coil::Properties& prop)
00172   def createTask(self, prop):
00173     factory = OpenRTM_aist.PeriodicTaskFactory.instance()
00174 
00175     th = factory.getIdentifiers()
00176     self._rtcout.RTC_DEBUG("available task types: %s", OpenRTM_aist.flatten(th))
00177 
00178     self._task = factory.createObject(prop.getProperty("thread_type", "default"))
00179     if not self._task:
00180       self._rtcout.RTC_ERROR("Task creation failed: %s",
00181                              prop.getProperty("thread_type", "default"))
00182       return self.INVALID_ARGS
00183 
00184     self._rtcout.RTC_PARANOID("Task creation succeeded.")
00185 
00186     # setting task function
00187     self._task.setTask(self.svc)
00188 
00189     # Task execution rate
00190     rate = prop.getProperty("publisher.push_rate")
00191 
00192     if rate != "":
00193       hz = float(rate)
00194       if hz == 0:
00195         hz = 1000.0
00196       self._rtcout.RTC_DEBUG("Task period %f [Hz]", hz)
00197     else:
00198       hz = 1000.0
00199 
00200     self._task.setPeriod(1.0/hz)
00201     
00202     # Measurement setting
00203     mprop = prop.getNode("measurement")
00204 
00205     self._task.executionMeasure(OpenRTM_aist.toBool(mprop.getProperty("exec_time"),
00206                                                     "enable", "disable", True))
00207     
00208     ecount = [0]
00209     if OpenRTM_aist.stringTo(ecount, mprop.getProperty("exec_count")):
00210       self._task.executionMeasureCount(ecount[0])
00211 
00212     self._task.periodicMeasure(OpenRTM_aist.toBool(mprop.getProperty("period_time"),
00213                                                    "enable", "disable", True))
00214 
00215     pcount = [0]
00216     if OpenRTM_aist.stringTo(pcount, mprop.getProperty("period_count")):
00217       self._task.periodicMeasureCount(pcount[0])
00218 
00219     # Start task in suspended mode
00220     self._task.suspend()
00221     self._task.activate()
00222     self._task.suspend()
00223 
00224     return self.PORT_OK
00225 
00226   ##
00227   # @if jp
00228   # @brief 初期化
00229   #
00230   # このクラスのオブジェクトを使用するのに先立ち、必ずこの関数を呼び
00231   # 出す必要がある。引数には、このオブジェクトの各種設定情報を含む
00232   # Properties を与える。少なくとも、送出処理の呼び出し周期を単位
00233   # Hz の数値として Propertyオブジェクトの publisher.push_rate をキー
00234   # とする要素に設定する必要がある。周期 5ms すなわち、200Hzの場合、
00235   # 200.0 を設定する。 dataport.publisher.push_rate が未設定の場合、
00236   # false が返される。データをプッシュする際のポリシーとして
00237   # publisher.push_policy をキーとする値に、all, fifo, skip, new の
00238   # いずれかを与えることができる。
00239   # 
00240   # 以下のオプションを与えることができる。
00241   # 
00242   # - publisher.thread_type: スレッドのタイプ (文字列、デフォルト: default)
00243   # - publisher.push_rate: Publisherの送信周期 (数値)
00244   # - publisher.push_policy: Pushポリシー (all, fifo, skip, new)
00245   # - publisher.skip_count: 上記ポリシが skip のときのスキップ数
00246   # - measurement.exec_time: タスク実行時間計測 (enable/disable)
00247   # - measurement.exec_count: タスク関数実行時間計測周期 (数値, 回数)
00248   # - measurement.period_time: タスク周期時間計測 (enable/disable)
00249   # - measurement.period_count: タスク周期時間計測周期 (数値, 回数)
00250   #
00251   # @param property 本Publisherの駆動制御情報を設定したPropertyオブジェクト
00252   # @return ReturnCode PORT_OK 正常終了
00253   #                    INVALID_ARGS Properties が不正な値を含む
00254   #
00255   # @else
00256   # @brief Initialization
00257   #
00258   # This function have to be called before using this class object.
00259   # Properties object that includes certain configuration
00260   # information should be given as an argument.  At least, a
00261   # numerical value of unit of Hz with the key of
00262   # "dataport.publisher.push_rate" has to be set to the Properties
00263   # object of argument.  The value is the invocation cycle of data
00264   # sending process.  In case of 5 ms period or 200 Hz, the value
00265   # should be set as 200.0. False will be returned, if there is no
00266   # value with the key of "dataport.publisher.push_rate".
00267   #
00268   # The following options are available.
00269   # 
00270   # - publisher.thread_type: Thread type (string, default: default)
00271   # - publisher.push_rate: Publisher sending period (numberical)
00272   # - publisher.push_policy: Push policy (all, fifo, skip, new)
00273   # - publisher.skip_count: The number of skip count in the "skip" policy
00274   # - measurement.exec_time: Task execution time measurement (enable/disable)
00275   # - measurement.exec_count: Task execution time measurement count
00276   #                           (numerical, number of times)
00277   # - measurement.period_time: Task period time measurement (enable/disable)
00278   # - measurement.period_count: Task period time measurement count 
00279   #                             (number, count)
00280   #
00281   # @param property Property objects that includes the control information
00282   #                 of this Publisher
00283   # @return ReturnCode PORT_OK normal return
00284   #                    INVALID_ARGS Properties with invalid values.
00285   # @endif
00286   #
00287   # PublisherBase::ReturnCode PublisherPeriodic::init(coil::Properties& prop)
00288   def init(self, prop):
00289     self._rtcout.RTC_TRACE("init()")
00290     self.setPushPolicy(prop)
00291     return self.createTask(prop)
00292   
00293   ##
00294   # @if jp
00295   # @brief InPortコンシューマのセット
00296   #
00297   # この関数では、この Publisher に関連付けられるコンシューマをセットする。
00298   # コンシューマオブジェクトがヌルポインタの場合、INVALID_ARGSが返される。
00299   # それ以外の場合は、PORT_OK が返される。
00300   #
00301   # @param consumer Consumer へのポインタ
00302   # @return ReturnCode PORT_OK 正常終了
00303   #                    INVALID_ARGS 引数に不正な値が含まれている
00304   #
00305   # @else
00306   # @brief Store InPort consumer
00307   #
00308   # This operation sets a consumer that is associated with this
00309   # object. If the consumer object is NULL, INVALID_ARGS will be
00310   # returned.
00311   #
00312   # @param consumer A pointer to a consumer object.
00313   # @return ReturnCode PORT_OK normal return
00314   #                    INVALID_ARGS given argument has invalid value
00315   #
00316   # @endif
00317   #
00318   # PublisherBase::ReturnCode
00319   # PublisherPeriodic::setConsumer(InPortConsumer* consumer)
00320   def setConsumer(self, consumer):
00321     self._rtcout.RTC_TRACE("setConsumer()")
00322 
00323     if not consumer:
00324       self._rtcout.RTC_ERROR("setConsumer(consumer = 0): invalid argument.")
00325       return self.INVALID_ARGS
00326 
00327     self._consumer = consumer
00328     return self.PORT_OK
00329   
00330   ##
00331   # @if jp
00332   # @brief バッファのセット
00333   #
00334   # この関数では、この Publisher に関連付けられるバッファをセットする。
00335   # バッファオブジェクトがヌルポインタの場合、INVALID_ARGSが返される。
00336   # それ以外の場合は、PORT_OK が返される。
00337   #
00338   # @param buffer CDR buffer へのポインタ
00339   # @return ReturnCode PORT_OK 正常終了
00340   #                    INVALID_ARGS 引数に不正な値が含まれている
00341   #
00342   # @else
00343   # @brief Setting buffer pointer
00344   #
00345   # This operation sets a buffer that is associated with this
00346   # object. If the buffer object is NULL, INVALID_ARGS will be
00347   # returned.
00348   #
00349   # @param buffer A pointer to a CDR buffer object.
00350   # @return ReturnCode PORT_OK normal return
00351   #                    INVALID_ARGS given argument has invalid value
00352   #
00353   # @endif
00354   #
00355   # PublisherBase::ReturnCode PublisherPeriodic::setBuffer(CdrBufferBase* buffer)
00356   def setBuffer(self, buffer):
00357     self._rtcout.RTC_TRACE("setBuffer()")
00358     
00359     if not buffer:
00360       self._rtcout.RTC_ERROR("setBuffer(buffer == 0): invalid argument")
00361       return self.INVALID_ARGS
00362 
00363     self._buffer = buffer
00364     return self.PORT_OK
00365 
00366   ##
00367   # @if jp
00368   # @brief リスナを設定する。
00369   #
00370   # Publisher に対してリスナオブジェクト ConnectorListeners を設定する。
00371   # 各種リスナオブジェクトを含む ConnectorListeners をセットすることで、
00372   # バッファの読み書き、データの送信時等にこれらのリスナをコールする。
00373   # ConnectorListeners オブジェクトの所有権はポートまたは RTObject が持ち
00374   # Publisher 削除時に ConnectorListeners は削除されることはない。
00375   # ConnectorListeners がヌルポインタの場合 INVALID_ARGS を返す。
00376   #
00377   # @param info ConnectorProfile をローカル化したオブジェクト ConnectorInfo
00378   # @param listeners リスナを多数保持する ConnectorListeners オブジェクト
00379   # @return PORT_OK      正常終了
00380   #         INVALID_ARGS 不正な引数
00381   # @else
00382   # @brief Set the listener. 
00383   #
00384   # This function sets ConnectorListeners listener object to the
00385   # Publisher. By setting ConnectorListeners containing various
00386   # listeners objects, these listeners are called at the time of
00387   # reading and writing of a buffer, and transmission of data
00388   # etc. Since the ownership of the ConnectorListeners object is
00389   # owned by Port or RTObject, the Publisher never deletes the
00390   # ConnectorListeners object. If the given ConnectorListeners'
00391   # pointer is NULL, this function returns INVALID_ARGS.
00392   #
00393   # @param info ConnectorInfo that is localized object of ConnectorProfile
00394   # @param listeners ConnectorListeners that holds various listeners
00395   # @return PORT_OK      Normal return
00396   #         INVALID_ARGS Invalid arguments
00397   # @endif
00398   #
00399   #PublisherBase::ReturnCode
00400   #PublisherPeriodic::setListener(ConnectorInfo& info,
00401   #                               ConnectorListeners* listeners)
00402   def setListener(self, info, listeners):
00403     self._rtcout.RTC_TRACE("setListeners()")
00404 
00405     if not listeners:
00406       self._rtcout.RTC_ERROR("setListeners(listeners == 0): invalid argument")
00407       return self.INVALID_ARGS
00408 
00409     self._profile = info
00410     self._listeners = listeners
00411     return self.PORT_OK
00412 
00413   ##
00414   # @if jp
00415   # @brief データを書き込む
00416   #
00417   # Publisher が保持するバッファに対してデータを書き込む。コンシュー
00418   # マ、バッファ、リスナ等が適切に設定されていない等、Publisher オブ
00419   # ジェクトが正しく初期化されていない場合、この関数を呼び出すとエラー
00420   # コード PRECONDITION_NOT_MET が返され、バッファへの書き込み等の操
00421   # 作は一切行われない。
00422   #
00423   # バッファへの書き込みと、InPortへのデータの送信は非同期的に行われ
00424   # るため、この関数は、InPortへのデータ送信の結果を示す、
00425   # CONNECTION_LOST, BUFFER_FULL などのリターンコードを返すことがあ
00426   # る。この場合、データのバッファへの書き込みは行われない。
00427   #
00428   # バッファへの書き込みに対して、バッファがフル状態、バッファのエ
00429   # ラー、バッファへの書き込みがタイムアウトした場合、バッファの事前
00430   # 条件が満たされない場合にはそれぞれ、エラーコード BUFFER_FULL,
00431   # BUFFER_ERROR, BUFFER_TIMEOUT, PRECONDITION_NOT_MET が返される。
00432   #
00433   # これら以外のエラーの場合、PORT_ERROR が返される。
00434   # 
00435   #
00436   # @param data 書き込むデータ 
00437   # @param sec タイムアウト時間
00438   # @param nsec タイムアウト時間
00439   #
00440   # @return PORT_OK             正常終了
00441   #         PRECONDITION_NO_MET consumer, buffer, listener等が適切に設定
00442   #                             されていない等、このオブジェクトの事前条件
00443   #                             を満たさない場合。
00444   #         CONNECTION_LOST     接続が切断されたことを検知した。
00445   #         BUFFER_FULL         バッファがフル状態である。
00446   #         BUFFER_ERROR        バッファに何らかのエラーが生じた場合。
00447   #         NOT_SUPPORTED       サポートされない操作が行われた。
00448   #         TIMEOUT             タイムアウトした。
00449   #
00450   # @else
00451   # @brief Write data 
00452   #
00453   # This function writes data into the buffer associated with this
00454   # Publisher.  If a Publisher object calls this function, without
00455   # initializing correctly such as a consumer, a buffer, listeners,
00456   # etc., error code PRECONDITION_NOT_MET will be returned and no
00457   # operation of the writing to a buffer etc. will be performed.
00458   #
00459   # Since writing into the buffer and sending data to InPort are
00460   # performed asynchronously, occasionally this function returns
00461   # return-codes such as CONNECTION_LOST and BUFFER_FULL that
00462   # indicate the result of sending data to InPort. In this case,
00463   # writing data into buffer will not be performed.
00464   #
00465   # When publisher writes data to the buffer, if the buffer is
00466   # filled, returns error, is returned with timeout and returns
00467   # precondition error, error codes BUFFER_FULL, BUFFER_ERROR,
00468   # BUFFER_TIMEOUT and PRECONDITION_NOT_MET will be returned
00469   # respectively.
00470   #
00471   # In other cases, PROT_ERROR will be returned.
00472   #
00473   # @param data Data to be wrote to the buffer
00474   # @param sec Timeout time in unit seconds
00475   # @param nsec Timeout time in unit nano-seconds
00476   # @return PORT_OK             Normal return
00477   #         PRECONDITION_NO_MET Precondition does not met. A consumer,
00478   #                             a buffer, listenes are not set properly.
00479   #         CONNECTION_LOST     detected that the connection has been lost
00480   #         BUFFER_FULL         The buffer is full status.
00481   #         BUFFER_ERROR        Some kind of error occurred in the buffer.
00482   #         NOT_SUPPORTED       Some kind of operation that is not supported
00483   #                             has been performed.
00484   #         TIMEOUT             Timeout occurred when writing to the buffer.
00485   #
00486   # @endif
00487   # 
00488   # PublisherBase::ReturnCode
00489   # PublisherPeriodic::write(const cdrMemoryStream& data,
00490   #                          unsigned long sec,
00491   #                          unsigned long usec)
00492   def write(self, data, sec, usec):
00493     self._rtcout.RTC_PARANOID("write()")
00494 
00495     if not self._consumer or not self._buffer or not self._listeners:
00496       return self.PRECONDITION_NOT_MET
00497 
00498     if self._retcode == self.CONNECTION_LOST:
00499       self._rtcout.RTC_DEBUG("write(): connection lost.")
00500       return self._retcode
00501 
00502     if self._retcode == self.SEND_FULL:
00503       self._rtcout.RTC_DEBUG("write(): InPort buffer is full.")
00504       self._buffer.write(data,sec,usec)
00505       return self.BUFFER_FULL
00506 
00507     self.onBufferWrite(data)
00508     ret = self._buffer.write(data, sec, usec)
00509     self._rtcout.RTC_DEBUG("%s = write()", OpenRTM_aist.DataPortStatus.toString(ret))
00510     self._task.resume()
00511     return self.convertReturn(ret, data)
00512 
00513   ##
00514   # @if jp
00515   #
00516   # @brief アクティブ化確認
00517   # 
00518   # Publisher はデータポートと同期して activate/deactivate される。
00519   # activate() / deactivate() 関数によって、アクティブ状態と非アクティ
00520   # ブ状態が切り替わる。この関数により、現在アクティブ状態か、非アク
00521   # ティブ状態かを確認することができる。
00522   #
00523   # @return 状態確認結果(アクティブ状態:true、非アクティブ状態:false)
00524   #
00525   # @else
00526   #
00527   # @brief If publisher is active state
00528   # 
00529   # A Publisher can be activated/deactivated synchronized with the
00530   # data port.  The active state and the non-active state are made
00531   # transition by the "activate()" and the "deactivate()" functions
00532   # respectively. This function confirms if the publisher is in
00533   # active state.
00534   #
00535   # @return Result of state confirmation
00536   #         (Active state:true, Inactive state:false)
00537   #
00538   # @endif
00539   #
00540   # bool PublisherPeriodic::isActive()
00541   def isActive(self):
00542     return self._active
00543 
00544   ##
00545   # @if jp
00546   # @brief アクティブ化する
00547   #
00548   # Publisher をアクティブ化する。この関数を呼び出すことにより、
00549   # Publisherが持つ、データを送信するスレッドが動作を開始する。初期
00550   # 化が行われていないなどにより、事前条件を満たさない場合、エラーコー
00551   # ド PRECONDITION_NOT_MET を返す。
00552   #
00553   # @return PORT_OK 正常終了
00554   #         PRECONDITION_NOT_MET 事前条件を満たさない
00555   #
00556   # @else
00557   # @brief activation
00558   #
00559   # This function activates the publisher. By calling this
00560   # function, this publisher starts the thread that pushes data to
00561   # InPort. If precondition such as initialization process and so
00562   # on is not met, the error code PRECONDITION_NOT_MET is returned.
00563   #
00564   # @return PORT_OK normal return
00565   #         PRECONDITION_NOT_MET precondition is not met
00566   #
00567   # @endif
00568   #
00569   # PublisherBase::ReturnCode PublisherPeriodic::activate()
00570   def activate(self):
00571     if not self._task or not self._buffer:
00572       return self.PRECONDITION_NOT_MET
00573     self._active = True
00574     self._task.resume()
00575     return self.PORT_OK
00576 
00577   ##
00578   # @if jp
00579   # @brief 非アクティブ化する
00580   #
00581   # Publisher を非アクティブ化する。この関数を呼び出すことにより、
00582   # Publisherが持つ、データを送信するスレッドが動作を停止する。初期
00583   # 化が行われていないなどにより、事前条件を満たさない場合、エラーコー
00584   # ド PRECONDITION_NOT_MET を返す。
00585   #
00586   # @return PORT_OK 正常終了
00587   #         PRECONDITION_NOT_MET 事前条件を満たさない
00588   #
00589   # @else
00590   # @brief deactivation
00591   #
00592   # This function deactivates the publisher. By calling this
00593   # function, this publisher stops the thread that pushes data to
00594   # InPort. If precondition such as initialization process and so
00595   # on is not met, the error code PRECONDITION_NOT_MET is returned.
00596   #
00597   # @return PORT_OK normal return
00598   #         PRECONDITION_NOT_MET precondition is not met
00599   #
00600   # @endif
00601   #
00602   # PublisherBase::ReturnCode PublisherPeriodic::deactivate()
00603   def deactivate(self):
00604     if not self._task:
00605       return self.PRECONDITION_NOT_MET
00606     self._active = False
00607     self._task.suspend()
00608     return self.PORT_OK
00609 
00610   ##
00611   # @if jp
00612   # @brief スレッド実行関数
00613   # @else
00614   # @brief Thread execution function
00615   # A task execution function to be executed by coil::PeriodicTask.
00616   # @endif
00617   #
00618   # int PublisherPeriodic::svc(void)
00619   def svc(self):
00620     guard = OpenRTM_aist.ScopedLock(self._retmutex)
00621 
00622     if self._pushPolicy == self.ALL:
00623       self._retcode = self.pushAll()
00624       return 0
00625 
00626     elif self._pushPolicy == self.FIFO:
00627       self._retcode = self.pushFifo()
00628       return 0
00629 
00630     elif self._pushPolicy == self.SKIP:
00631       self._retcode = self.pushSkip()
00632       return 0
00633 
00634     elif self._pushPolicy == self.NEW:
00635       self._retcode = self.pushNew()
00636       return 0
00637 
00638     else:
00639       self._retcode = self.pushNew()
00640 
00641     return 0
00642   
00643   ##
00644   # @brief push all policy
00645   #
00646   # PublisherBase::ReturnCode PublisherPeriodic::pushAll()
00647   def pushAll(self):
00648     self._rtcout.RTC_TRACE("pushAll()")
00649 
00650     if not self._buffer:
00651       return self.PRECONDITION_NOT_MET      
00652 
00653     if self.bufferIsEmpty():
00654       return self.BUFFER_EMPTY
00655 
00656     while self._buffer.readable() > 0:
00657       cdr = self._buffer.get()
00658       self.onBufferRead(cdr)
00659 
00660       self.onSend(cdr)
00661       ret = self._consumer.put(cdr)
00662 
00663       if ret != self.PORT_OK:
00664         self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
00665         return self.invokeListener(ret, cdr)
00666 
00667       self.onReceived(cdr)
00668       self._buffer.advanceRptr()
00669 
00670     return self.PORT_OK
00671 
00672 
00673   ##
00674   # @brief push "fifo" policy
00675   #
00676   # PublisherBase::ReturnCode PublisherPeriodic::pushFifo()
00677   def pushFifo(self):
00678     self._rtcout.RTC_TRACE("pushFifo()")
00679     if not self._buffer:
00680       return self.PRECONDITION_NOT_MET      
00681 
00682     if self.bufferIsEmpty():
00683       return self.BUFFER_EMPTY
00684 
00685     cdr = self._buffer.get()
00686     self.onBufferRead(cdr)
00687 
00688     self.onSend(cdr)
00689     ret = self._consumer.put(cdr)
00690 
00691     if ret != self.PORT_OK:
00692       self._rtcout.RTC_DEBUG("%s = consumer.put()",OpenRTM_aist.DataPortStatus.toString(ret))
00693       return self.invokeListener(ret, cdr)
00694 
00695     self.onReceived(cdr)
00696     self._buffer.advanceRptr()
00697     
00698     return self.PORT_OK
00699 
00700 
00701   ##
00702   # @brief push "skip" policy
00703   #
00704   # PublisherBase::ReturnCode PublisherPeriodic::pushSkip()
00705   def pushSkip(self):
00706     self._rtcout.RTC_TRACE("pushSkip()")
00707     if not self._buffer:
00708       return self.PRECONDITION_NOT_MET      
00709 
00710     if self.bufferIsEmpty():
00711       return self.BUFFER_EMPTY
00712 
00713     ret = self.PORT_OK
00714     preskip  = self._buffer.readable() + self._leftskip
00715     loopcnt  = preskip / (self._skipn + 1)
00716     postskip = self._skipn - self._leftskip
00717     for i in range(loopcnt):
00718       self._buffer.advanceRptr(postskip)
00719       cdr = self._buffer.get()
00720       self.onBufferRead(cdr)
00721 
00722       self.onSend(cdr)
00723       ret = self._consumer.put(cdr)
00724       if ret != self.PORT_OK:
00725         self._buffer.advanceRptr(-postskip)
00726         self._rtcout.RTC_DEBUG("%s = consumer.put()",OpenRTM_aist.DataPortStatus.toString(ret))
00727         return self.invokeListener(ret, cdr)
00728       self.onReceived(cdr)
00729       postskip = self._skipn + 1
00730 
00731     self._buffer.advanceRptr(self._buffer.readable())
00732     self._leftskip = preskip % (self._skipn + 1)
00733     
00734     return ret
00735 
00736 
00737   ##
00738   # @brief push "new" policy
00739   #
00740   # PublisherBase::ReturnCode PublisherPeriodic::pushNew()
00741   def pushNew(self):
00742     self._rtcout.RTC_TRACE("pushNew()")
00743     if not self._buffer:
00744       return self.PRECONDITION_NOT_MET      
00745 
00746     if self.bufferIsEmpty():
00747       return self.BUFFER_EMPTY
00748 
00749     # In case of the periodic/push_new policy, the buffer should
00750     # allow readback. But, readback flag should be set as "true"
00751     # after written at least one datum into the buffer.
00752     self._readback = True
00753 
00754     self._buffer.advanceRptr(self._buffer.readable() - 1)
00755     
00756     cdr = self._buffer.get()
00757     self.onBufferRead(cdr)
00758 
00759     self.onSend(cdr)
00760     ret = self._consumer.put(cdr)
00761     
00762     if ret != self.PORT_OK:
00763       self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
00764       return self.invokeListener(ret, cdr)
00765 
00766     self.onReceived(cdr)
00767 
00768     self._buffer.advanceRptr()
00769     return self.PORT_OK
00770 
00771   ##
00772   # @if jp
00773   # @brief BufferStatus から DataPortStatus への変換
00774   #
00775   # バッファからの戻り値を DataPortStatus::Enum 型へ変換する関数。そ
00776   # れぞれ、以下のように変換される。変換時にコールバックを呼ぶ場合、
00777   # コールバク関数も付記する。
00778   # 
00779   # - BUFFER_OK: PORT_OK
00780   #  - None
00781   # - BUFFER_ERROR: BUFFER_ERROR
00782   #  - None
00783   # - BUFFER_FULL: BUFFER_FULL
00784   #  - onBufferFull()
00785   # - NOT_SUPPORTED: PORT_ERROR
00786   #  - None
00787   # - TIMEOUT: BUFFER_TIMEOUT
00788   #  - onBufferWriteTimeout()
00789   # - PRECONDITION_NOT_MET: PRECONDITION_NOT_MET
00790   #  - None
00791   # - other: PORT_ERROR
00792   #  - None
00793   #
00794   # @param status BufferStatus
00795   # @param data cdrMemoryStream
00796   # @return DataPortStatu 型のリターンコード
00797   #
00798   # @else
00799   # @brief Convertion from BufferStatus to DataPortStatus
00800   # 
00801   # This function converts return value from the buffer to
00802   # DataPortStatus::Enum typed return value. The conversion rule is
00803   # as follows. Callback functions are also shown, if it exists.
00804   # 
00805   # - BUFFER_OK: PORT_OK
00806   #  - None
00807   # - BUFFER_ERROR: BUFFER_ERROR
00808   #  - None
00809   # - BUFFER_FULL: BUFFER_FULL
00810   #  - onBufferFull()
00811   # - NOT_SUPPORTED: PORT_ERROR
00812   #  - None
00813   # - TIMEOUT: BUFFER_TIMEOUT
00814   #  - onBufferWriteTimeout()
00815   # - PRECONDITION_NOT_MET: PRECONDITION_NOT_MET
00816   #  - None
00817   # - other: PORT_ERROR
00818   #  - None
00819   #
00820   # @param status BufferStatus
00821   # @param data cdrMemoryStream
00822   # @return DataPortStatus typed return code
00823   #
00824   # @endif
00825   #
00826   # PublisherBase::ReturnCodea
00827   # PublisherPeriodic::convertReturn(BufferStatus::Enum status,
00828   #                                  const cdrMemoryStream& data)
00829   def convertReturn(self, status, data):
00830     if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
00831       return self.PORT_OK
00832 
00833     elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
00834       return self.BUFFER_ERROR
00835 
00836     elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
00837       self.onBufferFull(data)
00838       return self.BUFFER_FULL
00839 
00840     elif status == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
00841       return self.PORT_ERROR
00842 
00843     elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
00844       self.onBufferWriteTimeout(data)
00845       return self.BUFFER_TIMEOUT
00846 
00847     elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
00848       return self.PRECONDITION_NOT_MET
00849 
00850     else:
00851       return self.PORT_ERROR
00852     
00853     return self.PORT_ERROR
00854 
00855   ##
00856   # @if jp
00857   # @brief DataPortStatusに従ってリスナへ通知する関数を呼び出す。
00858   #
00859   # @param status DataPortStatus
00860   # @param data cdrMemoryStream
00861   # @return リターンコード
00862   #
00863   # @else
00864   # @brief Call listeners according to the DataPortStatus
00865   #
00866   # @param status DataPortStatus
00867   # @param data cdrMemoryStream
00868   # @return Return code
00869   #
00870   # @endif
00871   #
00872   # PublisherPeriodic::ReturnCode
00873   # PublisherPeriodic::invokeListener(DataPortStatus::Enum status,
00874   #                                   const cdrMemoryStream& data)
00875   def invokeListener(self, status, data):
00876     # ret:
00877     # PORT_OK, PORT_ERROR, SEND_FULL, SEND_TIMEOUT, CONNECTION_LOST,
00878     # UNKNOWN_ERROR
00879     if status == self.PORT_ERROR:
00880       self.onReceiverError(data)
00881       return self.PORT_ERROR
00882         
00883     elif status == self.SEND_FULL:
00884       self.onReceiverFull(data)
00885       return self.SEND_FULL
00886         
00887     elif status == self.SEND_TIMEOUT:
00888       self.onReceiverTimeout(data)
00889       return self.SEND_TIMEOUT
00890         
00891     elif status == self.CONNECTION_LOST:
00892       self.onReceiverError(data)
00893       return self.CONNECTION_LOST
00894         
00895     elif status == self.UNKNOWN_ERROR:
00896       self.onReceiverError(data)
00897       return self.UNKNOWN_ERROR
00898         
00899     else:
00900       self.onReceiverError(data)
00901       return self.PORT_ERROR
00902 
00903   ##
00904   # @if jp
00905   # @brief ON_BUFFER_WRITEのリスナへ通知する。 
00906   # @param data cdrMemoryStream
00907   # @else
00908   # @brief Notify an ON_BUFFER_WRITE event to listeners
00909   # @param data cdrMemoryStream
00910   # @endif
00911   #
00912   # inline void onBufferWrite(const cdrMemoryStream& data)
00913   def onBufferWrite(self, data):
00914     if self._listeners is not None and self._profile is not None:
00915       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self._profile, data)
00916     return
00917 
00918   ##
00919   # @if jp
00920   # @brief ON_BUFFER_FULLリスナへイベントを通知する。 
00921   # @param data cdrMemoryStream
00922   # @else
00923   # @brief Notify an ON_BUFFER_FULL event to listeners
00924   # @param data cdrMemoryStream
00925   # @endif
00926   #
00927   # inline void onBufferFull(const cdrMemoryStream& data)
00928   def onBufferFull(self, data):
00929     if self._listeners is not None and self._profile is not None:
00930       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self._profile, data)
00931     return
00932 
00933   ##
00934   # @if jp
00935   # @brief ON_BUFFER_WRITE_TIMEOUTのリスナへ通知する。 
00936   # @param data cdrMemoryStream
00937   # @else
00938   # @brief Notify an ON_BUFFER_WRITE_TIMEOUT event to listeners
00939   # @param data cdrMemoryStream
00940   # @endif
00941   #
00942   # inline void onBufferWriteTimeout(const cdrMemoryStream& data)
00943   def onBufferWriteTimeout(self, data):
00944     if self._listeners is not None and self._profile is not None:
00945       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self._profile, data)
00946     return
00947 
00948   ##
00949   # @if jp
00950   # @brief ON_BUFFER_READのリスナへ通知する。 
00951   # @param data cdrMemoryStream
00952   # @else
00953   # @brief Notify an ON_BUFFER_READ event to listeners
00954   # @param data cdrMemoryStream
00955   # @endif
00956   #
00957   #  inline void onBufferRead(const cdrMemoryStream& data)
00958   def onBufferRead(self, data):
00959     if self._listeners is not None and self._profile is not None:
00960       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self._profile, data)
00961     return
00962 
00963   ##
00964   # @if jp
00965   # @brief ON_SENDのリスナへ通知する。 
00966   # @param data cdrMemoryStream
00967   # @else
00968   # @brief Notify an ON_SEND event to listners
00969   # @param data cdrMemoryStream
00970   # @endif
00971   #
00972   # inline void onSend(const cdrMemoryStream& data)
00973   def onSend(self, data):
00974     if self._listeners is not None and self._profile is not None:
00975       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self._profile, data)
00976     return
00977 
00978   ##
00979   # @if jp
00980   # @brief ON_RECEIVEDのリスナへ通知する。 
00981   # @param data cdrMemoryStream
00982   # @else
00983   # @brief Notify an ON_RECEIVED event to listeners
00984   # @param data cdrMemoryStream
00985   # @endif
00986   #
00987   # inline void onReceived(const cdrMemoryStream& data)
00988   def onReceived(self, data):
00989     if self._listeners is not None and self._profile is not None:
00990       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self._profile, data)
00991     return
00992 
00993   ##
00994   # @if jp
00995   # @brief ON_RECEIVER_FULLのリスナへ通知する。 
00996   # @param data cdrMemoryStream
00997   # @else
00998   # @brief Notify an ON_RECEIVER_FULL event to listeners
00999   # @param data cdrMemoryStream
01000   # @endif
01001   #
01002   # inline void onReceiverFull(const cdrMemoryStream& data)
01003   def onReceiverFull(self, data):
01004     if self._listeners is not None and self._profile is not None:
01005       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self._profile, data)
01006     return
01007 
01008   ##
01009   # @if jp
01010   # @brief ON_RECEIVER_TIMEOUTのリスナへ通知する。 
01011   # @param data cdrMemoryStream
01012   # @else
01013   # @brief Notify an ON_RECEIVER_TIMEOUT event to listeners
01014   # @param data cdrMemoryStream
01015   # @endif
01016   #
01017   # inline void onReceiverTimeout(const cdrMemoryStream& data)
01018   def onReceiverTimeout(self, data):
01019     if self._listeners is not None and self._profile is not None:
01020       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self._profile, data)
01021     return
01022 
01023   ##
01024   # @if jp
01025   # @brief ON_RECEIVER_ERRORのリスナへ通知する。 
01026   # @param data cdrMemoryStream
01027   # @else
01028   # @brief Notify an ON_RECEIVER_ERROR event to listeners
01029   # @param data cdrMemoryStream
01030   # @endif
01031   #
01032   # inline void onReceiverError(const cdrMemoryStream& data)
01033   def onReceiverError(self, data):
01034     if self._listeners is not None and self._profile is not None:
01035       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self._profile, data)
01036     return
01037 
01038 
01039   ##
01040   # @if jp
01041   # @brief ON_BUFFER_EMPTYのリスナへ通知する。 
01042   # @else
01043   # @brief Notify an ON_BUFFER_EMPTY event to listeners
01044   # @endif
01045   #
01046   #inline void onBufferEmpty()
01047   def onBufferEmpty(self):
01048     if self._listeners is not None and self._profile is not None:
01049       self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY].notify(self._profile)
01050     return
01051 
01052   ##
01053   # @if jp
01054   # @brief ON_SENDER_EMPTYのリスナへ通知する。 
01055   # @else
01056   # @brief Notify an ON_SENDER_EMPTY event to listeners
01057   # @endif
01058   #
01059   # inline void onSenderEmpty()
01060   def onSenderEmpty(self):
01061     if self._listeners is not None and self._profile is not None:
01062       self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY].notify(self._profile)
01063     return
01064 
01065   ##
01066   # @if jp
01067   # @brief ON_SENDER_ERRORのリスナへ通知する。 
01068   # @else
01069   # @brief Notify an ON_SENDER_ERROR event to listeners
01070   # @endif
01071   #
01072   # inline void onSenderError()
01073   def onSenderError(self):
01074     if self._listeners is not None and self._profile is not None:
01075       self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self._profile)
01076     return
01077 
01078 
01079   ##
01080   # @if jp
01081   # @brief バッファが空かどうかをチェックする。x
01082   # @else
01083   # @brief Whether a buffer is empty.
01084   # @endif
01085   #
01086   # bool bufferIsEmpty()
01087   def bufferIsEmpty(self):
01088     if self._buffer and self._buffer.empty() and  not self._readback:
01089       self._rtcout.RTC_DEBUG("buffer empty")
01090       self.onBufferEmpty()
01091       self.onSenderEmpty()
01092       return True
01093 
01094     return False
01095 
01096 
01097 
01098 def PublisherPeriodicInit():
01099   OpenRTM_aist.PublisherFactory.instance().addFactory("periodic",
01100                                                       OpenRTM_aist.PublisherPeriodic,
01101                                                       OpenRTM_aist.Delete)


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28