PublisherNew.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: euc-jp -*-
00003 
00004 ##
00005 # @file  PublisherNew.py
00006 # @brief PublisherNew class
00007 # @date  $Date: 2007/09/27 $
00008 # @author Noriaki Ando <n-ando@aist.go.jp> and Shinji Kurihara
00009 #
00010 # Copyright (C) 2006-2008
00011 #     Noriaki Ando
00012 #     Task-intelligence Research Group,
00013 #     Intelligent Systems Research Institute,
00014 #     National Institute of
00015 #         Advanced Industrial Science and Technology (AIST), Japan
00016 #     All rights reserved.
00017  
00018 import threading
00019 
00020 import OpenRTM_aist
00021 
00022 
00023 ##
00024 # @if jp
00025 # @class PublisherNew
00026 # @brief PublisherNew クラス
00027 #
00028 # バッファ内に新規データが格納されたタイミングで、その新規データを送信する。
00029 # データ送出タイミングを待つコンシューマを、送出する側とは異なるスレッドで
00030 # 動作させる場合に使用。
00031 # Publisherの駆動は、データ送出のタイミングになるまでブロックされ、
00032 # 送出タイミングの通知を受けると、即座にコンシューマの送出処理を呼び出す。
00033 #
00034 # @else
00035 # @class PublisherNew
00036 # @brief PublisherNew class
00037 #
00038 # Send new data at timing of when it is stored into the buffer.
00039 # This class is used when operating Consumer that waits for the data send
00040 # timing in different thread from one of the send side.
00041 # Publisher's driven is blocked until the data send timing reaches, if the
00042 # send timing notification is received, the Consumer's send processing will
00043 # be invoked immediately.
00044 #
00045 # @endif
00046 class PublisherNew(OpenRTM_aist.PublisherBase):
00047   """
00048   """
00049 
00050   # Policy
00051   ALL  = 0
00052   FIFO = 1
00053   SKIP = 2
00054   NEW  = 3
00055 
00056   ##
00057   # @if jp
00058   # @brief コンストラクタ
00059   #
00060   # コンストラクタ
00061   # 本 Publisher 用新規スレッドを生成する。
00062   #
00063   # @param self
00064   # @param consumer データ送出を待つコンシューマ
00065   # @param property 本Publisherの駆動制御情報を設定したPropertyオブジェクト
00066   #                 (本Publisherでは未使用)
00067   # @else
00068   # @brief Constructor
00069   # @endif
00070   def __init__(self):
00071     self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("PublisherNew")
00072     self._consumer = None
00073     self._buffer   = None
00074     self._task     = None
00075     self._retcode  = self.PORT_OK
00076     self._retmutex = threading.RLock()
00077     self._pushPolicy = self.NEW
00078     self._skipn      = 0
00079     self._active     = False
00080     self._leftskip   = 0
00081     self._profile    = None
00082     self._listeners  = None
00083 
00084   ##
00085   # @if jp
00086   # @brief デストラクタ
00087   #
00088   # デストラクタ
00089   #
00090   # @param self
00091   #
00092   # @else
00093   # @brief Destructor
00094   #
00095   # @endif
00096   def __del__(self):
00097     self._rtcout.RTC_TRACE("~PublisherNew()")
00098     if self._task:
00099       self._task.resume()
00100       self._task.finalize()
00101 
00102       OpenRTM_aist.PeriodicTaskFactory.instance().deleteObject(self._task)
00103       del self._task
00104       self._rtcout.RTC_PARANOID("task deleted.")
00105 
00106     # "consumer" should be deleted in the Connector
00107     self._consumer = 0
00108     # "buffer"   should be deleted in the Connector
00109     self._buffer = 0
00110     return
00111 
00112   ##
00113   # @if jp
00114   # @brief PushPolicy の設定
00115   # @else
00116   # @brief Setting PushPolicy
00117   # @endif
00118   #
00119   #void PublisherNew::setPushPolicy(const coil::Properties& prop)
00120   def setPushPolicy(self, prop):
00121     push_policy = prop.getProperty("publisher.push_policy","new")
00122     self._rtcout.RTC_DEBUG("push_policy: %s", push_policy)
00123 
00124     push_policy = OpenRTM_aist.normalize([push_policy])
00125 
00126     if push_policy == "all":
00127       self._pushPolicy = self.ALL
00128 
00129     elif push_policy == "fifo":
00130       self._pushPolicy = self.FIFO
00131 
00132     elif push_policy == "skip":
00133       self._pushPolicy = self.SKIP
00134 
00135     elif push_policy == "new":
00136       self._pushPolicy = self.NEW
00137 
00138     else:
00139       self._rtcout.RTC_ERROR("invalid push_policy value: %s", push_policy)
00140       self._pushPolicy = self.NEW
00141   
00142     skip_count = prop.getProperty("publisher.skip_count","0")
00143     self._rtcout.RTC_DEBUG("skip_count: %s", skip_count)
00144 
00145     skipn = [self._skipn]
00146     ret = OpenRTM_aist.stringTo(skipn, skip_count)
00147     if ret:
00148       self._skipn = skipn[0]
00149     else:
00150       self._rtcout.RTC_ERROR("invalid skip_count value: %s", skip_count)
00151       self._skipn = 0
00152 
00153     if self._skipn < 0:
00154       self._rtcout.RTC_ERROR("invalid skip_count value: %d", self._skipn)
00155       self._skipn = 0
00156 
00157     return
00158 
00159   ##
00160   # @if jp
00161   # @brief Task の設定
00162   # @else
00163   # @brief Setting Task
00164   # @endif
00165   #
00166   #bool PublisherNew::createTask(const coil::Properties& prop)
00167   def createTask(self, prop):
00168     factory = OpenRTM_aist.PeriodicTaskFactory.instance()
00169 
00170     th = factory.getIdentifiers()
00171     self._rtcout.RTC_DEBUG("available task types: %s", OpenRTM_aist.flatten(th))
00172 
00173     self._task = factory.createObject(prop.getProperty("thread_type", "default"))
00174 
00175     if not self._task:
00176       self._rtcout.RTC_ERROR("Task creation failed: %s",
00177                              prop.getProperty("thread_type", "default"))
00178       return self.INVALID_ARGS
00179 
00180     self._rtcout.RTC_PARANOID("Task creation succeeded.")
00181 
00182     mprop = prop.getNode("measurement")
00183 
00184     # setting task function
00185     self._task.setTask(self.svc)
00186     self._task.setPeriod(0.0)
00187     self._task.executionMeasure(OpenRTM_aist.toBool(mprop.getProperty("exec_time"),
00188                                                     "enable", "disable", True))
00189     ecount = [0]
00190     if OpenRTM_aist.stringTo(ecount, mprop.getProperty("exec_count")):
00191       self._task.executionMeasureCount(ecount[0])
00192 
00193     self._task.periodicMeasure(OpenRTM_aist.toBool(mprop.getProperty("period_time"),
00194                                                    "enable", "disable", True))
00195     pcount = [0]
00196     if OpenRTM_aist.stringTo(pcount, mprop.getProperty("period_count")):
00197       self._task.periodicMeasureCount(pcount[0])
00198 
00199     self._task.suspend()
00200     self._task.activate()
00201     self._task.suspend()
00202 
00203     return self.PORT_OK
00204 
00205   ##
00206   # @if jp
00207   # @brief 初期化
00208   #
00209   # このクラスのオブジェクトを使用するのに先立ち、必ずこの関数を呼び
00210   # 出す必要がある。引数には、このオブジェクトの各種設定情報を含む
00211   # Properties を与える。データをプッシュする際のポリシーとして
00212   # publisher.push_policy をキーとする値に、all, fifo, skip, new の
00213   # いずれかを与えることができる。
00214   # 
00215   # 以下のオプションを与えることができる。
00216   # 
00217   # - thread_type: スレッドのタイプ (文字列、デフォルト: default)
00218   # - publisher.push_policy: Pushポリシー (all, fifo, skip, new)
00219   # - publisher.skip_count: 上記ポリシが skip のときのスキップ数
00220   # - measurement.exec_time: タスク実行時間計測 (enable/disable)
00221   # - measurement.exec_count: タスク関数実行時間計測周期 (数値, 回数)
00222   # - measurement.period_time: タスク周期時間計測 (enable/disable)
00223   # - measurement.period_count: タスク周期時間計測周期 (数値, 回数)
00224   #
00225   # @param property 本Publisherの駆動制御情報を設定したPropertyオブジェクト
00226   # @return ReturnCode PORT_OK 正常終了
00227   #                    INVALID_ARGS Properties が不正な値を含む
00228   #
00229   # @else
00230   # @brief Initialization
00231   #
00232   # This function have to be called before using this class object.
00233   # Properties object that includes certain configuration
00234   # information should be given as an argument.  all, fifo, skip,
00235   # new can be given as a data push policy in a value of the key
00236   # "publisher.push_policy."
00237   #
00238   # The following options are available.
00239   # 
00240   # - thread_type: Thread type (string, default: default)
00241   # - publisher.push_policy: Push policy (all, fifo, skip, new)
00242   # - publisher.skip_count: The number of skip count in the "skip" policy
00243   # - measurement.exec_time: Task execution time measurement (enable/disable)
00244   # - measurement.exec_count: Task execution time measurement count
00245   #                           (numerical, number of times)
00246   # - measurement.period_time: Task period time measurement (enable/disable)
00247   # - measurement.period_count: Task period time measurement count 
00248   #                             (number, count)
00249   #
00250   # @param property Property objects that includes the control information
00251   #                 of this Publisher
00252   # @return ReturnCode PORT_OK normal return
00253   #                    INVALID_ARGS Properties with invalid values.
00254   # @endif
00255   #
00256   # PublisherBase::ReturnCode PublisherNew::init(coil::Properties& prop)
00257   def init(self, prop):
00258     self._rtcout.RTC_TRACE("init()")
00259     self.setPushPolicy(prop)
00260     return self.createTask(prop)
00261 
00262   ##
00263   # @if jp
00264   # @brief InPortコンシューマのセット
00265   #
00266   # この関数では、この Publisher に関連付けられるコンシューマをセットする。
00267   # コンシューマオブジェクトがヌルポインタの場合、INVALID_ARGSが返される。
00268   # それ以外の場合は、PORT_OK が返される。
00269   #
00270   # @param consumer Consumer へのポインタ
00271   # @return ReturnCode PORT_OK 正常終了
00272   #                    INVALID_ARGS 引数に不正な値が含まれている
00273   #
00274   # @else
00275   # @brief Store InPort consumer
00276   #
00277   # This operation sets a consumer that is associated with this
00278   # object. If the consumer object is NULL, INVALID_ARGS will be
00279   # returned.
00280   #
00281   # @param consumer A pointer to a consumer object.
00282   # @return ReturnCode PORT_OK normal return
00283   #                    INVALID_ARGS given argument has invalid value
00284   #
00285   # @endif
00286   #
00287   # PublisherBase::ReturnCode PublisherNew::setConsumer(InPortConsumer* consumer)
00288   def setConsumer(self, consumer):
00289     self._rtcout.RTC_TRACE("setConsumer()")
00290     
00291     if not consumer:
00292       self._rtcout.RTC_ERROR("setConsumer(consumer = 0): invalid argument.")
00293       return self.INVALID_ARGS
00294 
00295     self._consumer = consumer
00296     return self.PORT_OK
00297 
00298   ##
00299   # @if jp
00300   # @brief バッファのセット
00301   #
00302   # この関数では、この Publisher に関連付けられるバッファをセットする。
00303   # バッファオブジェクトがヌルポインタの場合、INVALID_ARGSが返される。
00304   # それ以外の場合は、PORT_OK が返される。
00305   #
00306   # @param buffer CDR buffer へのポインタ
00307   # @return ReturnCode PORT_OK 正常終了
00308   #                    INVALID_ARGS 引数に不正な値が含まれている
00309   #
00310   # @else
00311   # @brief Setting buffer pointer
00312   #
00313   # This operation sets a buffer that is associated with this
00314   # object. If the buffer object is NULL, INVALID_ARGS will be
00315   # returned.
00316   #
00317   # @param buffer A pointer to a CDR buffer object.
00318   # @return ReturnCode PORT_OK normal return
00319   #                    INVALID_ARGS given argument has invalid value
00320   #
00321   # @endif
00322   #
00323   # PublisherBase::ReturnCode PublisherNew::setBuffer(CdrBufferBase* buffer)
00324   def setBuffer(self, buffer):
00325     self._rtcout.RTC_TRACE("setBuffer()")
00326 
00327     if not buffer:
00328       self._rtcout.RTC_ERROR("setBuffer(buffer == 0): invalid argument")
00329       return self.INVALID_ARGS
00330 
00331     self._buffer = buffer
00332     return self.PORT_OK
00333 
00334   ##
00335   # @if jp
00336   # @brief リスナを設定する。
00337   #
00338   # Publisher に対してリスナオブジェクト ConnectorListeners を設定する。
00339   # 各種リスナオブジェクトを含む ConnectorListeners をセットすることで、
00340   # バッファの読み書き、データの送信時等にこれらのリスナをコールする。
00341   # ConnectorListeners オブジェクトの所有権はポートまたは RTObject が持ち
00342   # Publisher 削除時に ConnectorListeners は削除されることはない。
00343   # ConnectorListeners がヌルポインタの場合 INVALID_ARGS を返す。
00344   #
00345   # @param info ConnectorProfile をローカル化したオブジェクト ConnectorInfo
00346   # @param listeners リスナを多数保持する ConnectorListeners オブジェクト
00347   # @return PORT_OK      正常終了
00348   #         INVALID_ARGS 不正な引数
00349   # @else
00350   # @brief Set the listener. 
00351   #
00352   # This function sets ConnectorListeners listener object to the
00353   # Publisher. By setting ConnectorListeners containing various
00354   # listeners objects, these listeners are called at the time of
00355   # reading and writing of a buffer, and transmission of data
00356   # etc. Since the ownership of the ConnectorListeners object is
00357   # owned by Port or RTObject, the Publisher never deletes the
00358   # ConnectorListeners object. If the given ConnectorListeners'
00359   # pointer is NULL, this function returns INVALID_ARGS.
00360   #
00361   # @param info ConnectorInfo that is localized object of ConnectorProfile
00362   # @param listeners ConnectorListeners that holds various listeners
00363   # @return PORT_OK      Normal return
00364   #         INVALID_ARGS Invalid arguments
00365   # @endif
00366   #
00367   # virtual ReturnCode setListener(ConnectorInfo& info,
00368   #                                ConnectorListeners* listeners);
00369   def setListener(self, info, listeners):
00370     self._rtcout.RTC_TRACE("setListener()")
00371     
00372     if not listeners:
00373       self._rtcout.RTC_ERROR("setListeners(listeners == 0): invalid argument")
00374       return self.INVALID_ARGS
00375 
00376     self._profile = info
00377     self._listeners = listeners
00378 
00379     return self.PORT_OK
00380 
00381   ##
00382   # @if jp
00383   # @brief データを書き込む
00384   #
00385   # Publisher が保持するバッファに対してデータを書き込む。コンシュー
00386   # マ、バッファ、リスナ等が適切に設定されていない等、Publisher オブ
00387   # ジェクトが正しく初期化されていない場合、この関数を呼び出すとエラー
00388   # コード PRECONDITION_NOT_MET が返され、バッファへの書き込み等の操
00389   # 作は一切行われない。
00390   #
00391   # バッファへの書き込みと、InPortへのデータの送信は非同期的に行われ
00392   # るため、この関数は、InPortへのデータ送信の結果を示す、
00393   # CONNECTION_LOST, BUFFER_FULL などのリターンコードを返すことがあ
00394   # る。この場合、データのバッファへの書き込みは行われない。
00395   #
00396   # バッファへの書き込みに対して、バッファがフル状態、バッファのエ
00397   # ラー、バッファへの書き込みがタイムアウトした場合、バッファの事前
00398   # 条件が満たされない場合にはそれぞれ、エラーコード BUFFER_FULL,
00399   # BUFFER_ERROR, BUFFER_TIMEOUT, PRECONDITION_NOT_MET が返される。
00400   #
00401   # これら以外のエラーの場合、PORT_ERROR が返される。
00402   # 
00403   #
00404   # @param data 書き込むデータ 
00405   # @param sec タイムアウト時間
00406   # @param nsec タイムアウト時間
00407   #
00408   # @return PORT_OK             正常終了
00409   #         PRECONDITION_NO_MET consumer, buffer, listener等が適切に設定
00410   #                             されていない等、このオブジェクトの事前条件
00411   #                             を満たさない場合。
00412   #         CONNECTION_LOST     接続が切断されたことを検知した。
00413   #         BUFFER_FULL         バッファがフル状態である。
00414   #         BUFFER_ERROR        バッファに何らかのエラーが生じた場合。
00415   #         NOT_SUPPORTED       サポートされない操作が行われた。
00416   #         TIMEOUT             タイムアウトした。
00417   #
00418   # @else
00419   # @brief Write data 
00420   #
00421   # This function writes data into the buffer associated with this
00422   # Publisher.  If a Publisher object calls this function, without
00423   # initializing correctly such as a consumer, a buffer, listeners,
00424   # etc., error code PRECONDITION_NOT_MET will be returned and no
00425   # operation of the writing to a buffer etc. will be performed.
00426   #
00427   # Since writing into the buffer and sending data to InPort are
00428   # performed asynchronously, occasionally this function returns
00429   # return-codes such as CONNECTION_LOST and BUFFER_FULL that
00430   # indicate the result of sending data to InPort. In this case,
00431   # writing data into buffer will not be performed.
00432   #
00433   # When publisher writes data to the buffer, if the buffer is
00434   # filled, returns error, is returned with timeout and returns
00435   # precondition error, error codes BUFFER_FULL, BUFFER_ERROR,
00436   # BUFFER_TIMEOUT and PRECONDITION_NOT_MET will be returned
00437   # respectively.
00438   #
00439   # In other cases, PROT_ERROR will be returned.
00440   #
00441   # @param data Data to be wrote to the buffer
00442   # @param sec Timeout time in unit seconds
00443   # @param nsec Timeout time in unit nano-seconds
00444   # @return PORT_OK             Normal return
00445   #         PRECONDITION_NO_MET Precondition does not met. A consumer,
00446   #                             a buffer, listenes are not set properly.
00447   #         CONNECTION_LOST     detected that the connection has been lost
00448   #         BUFFER_FULL         The buffer is full status.
00449   #         BUFFER_ERROR        Some kind of error occurred in the buffer.
00450   #         NOT_SUPPORTED       Some kind of operation that is not supported
00451   #                             has been performed.
00452   #         TIMEOUT             Timeout occurred when writing to the buffer.
00453   #
00454   # @endif
00455   #
00456   # PublisherBase::ReturnCode PublisherNew::write(const cdrMemoryStream& data,
00457   #                                               unsigned long sec,
00458   #                                               unsigned long usec)
00459   def write(self, data, sec, usec):
00460     self._rtcout.RTC_PARANOID("write()")
00461     
00462     if not self._consumer or not self._buffer or not self._listeners:
00463       return self.PRECONDITION_NOT_MET
00464 
00465     if self._retcode == self.CONNECTION_LOST:
00466       self._rtcout.RTC_DEBUG("write(): connection lost.")
00467       return self._retcode
00468 
00469     if self._retcode == self.SEND_FULL:
00470       self._rtcout.RTC_DEBUG("write(): InPort buffer is full.")
00471       ret = self._buffer.write(data, sec, usec)
00472       self._task.signal()
00473       return self.BUFFER_FULL
00474 
00475     # why?
00476     assert(self._buffer != 0)
00477 
00478     self.onBufferWrite(data)
00479     ret = self._buffer.write(data, sec, usec)
00480 
00481     self._task.signal()
00482     self._rtcout.RTC_DEBUG("%s = write()", OpenRTM_aist.DataPortStatus.toString(ret))
00483 
00484     return self.convertReturn(ret, data)
00485 
00486   ##
00487   # @if jp
00488   #
00489   # @brief アクティブ化確認
00490   # 
00491   # Publisher はデータポートと同期して activate/deactivate される。
00492   # activate() / deactivate() 関数によって、アクティブ状態と非アクティ
00493   # ブ状態が切り替わる。この関数により、現在アクティブ状態か、非アク
00494   # ティブ状態かを確認することができる。
00495   #
00496   # @return 状態確認結果(アクティブ状態:true、非アクティブ状態:false)
00497   #
00498   # @else
00499   #
00500   # @brief If publisher is active state
00501   # 
00502   # A Publisher can be activated/deactivated synchronized with the
00503   # data port.  The active state and the non-active state are made
00504   # transition by the "activate()" and the "deactivate()" functions
00505   # respectively. This function confirms if the publisher is in
00506   # active state.
00507   #
00508   # @return Result of state confirmation
00509   #         (Active state:true, Inactive state:false)
00510   #
00511   # @endif
00512   #
00513   # bool PublisherNew::isActive()
00514   def isActive(self):
00515     return self._active
00516 
00517   ##
00518   # @if jp
00519   # @brief アクティブ化する
00520   #
00521   # Publisher をアクティブ化する。この関数を呼び出すことにより、
00522   # Publisherが持つ、データを送信するスレッドが動作を開始する。初期
00523   # 化が行われていないなどにより、事前条件を満たさない場合、エラーコー
00524   # ド PRECONDITION_NOT_MET を返す。
00525   #
00526   # @return PORT_OK 正常終了
00527   #         PRECONDITION_NOT_MET 事前条件を満たさない
00528   #
00529   # @else
00530   # @brief activation
00531   #
00532   # This function activates the publisher. By calling this
00533   # function, this publisher starts the thread that pushes data to
00534   # InPort. If precondition such as initialization process and so
00535   # on is not met, the error code PRECONDITION_NOT_MET is returned.
00536   #
00537   # @return PORT_OK normal return
00538   #         PRECONDITION_NOT_MET precondition is not met
00539   #
00540   # @endif
00541   #
00542   # PublisherBase::ReturnCode PublisherNew::activate()
00543   def activate(self):
00544     self._active = True
00545     return self.PORT_OK
00546 
00547   ##
00548   # @if jp
00549   # @brief 非アクティブ化する
00550   #
00551   # Publisher を非アクティブ化する。この関数を呼び出すことにより、
00552   # Publisherが持つ、データを送信するスレッドが動作を停止する。初期
00553   # 化が行われていないなどにより、事前条件を満たさない場合、エラーコー
00554   # ド PRECONDITION_NOT_MET を返す。
00555   #
00556   # @return PORT_OK 正常終了
00557   #         PRECONDITION_NOT_MET 事前条件を満たさない
00558   #
00559   # @else
00560   # @brief deactivation
00561   #
00562   # This function deactivates the publisher. By calling this
00563   # function, this publisher stops the thread that pushes data to
00564   # InPort. If precondition such as initialization process and so
00565   # on is not met, the error code PRECONDITION_NOT_MET is returned.
00566   #
00567   # @return PORT_OK normal return
00568   #         PRECONDITION_NOT_MET precondition is not met
00569   #
00570   # @endif
00571   #
00572   # PublisherBase::ReturnCode PublisherNew::deactivate()
00573   def deactivate(self):
00574     self._active = False;
00575     return self.PORT_OK
00576 
00577   ##
00578   # @if jp
00579   # @brief スレッド実行関数
00580   #
00581   # coil::PeriodicTask により周期実行されるタスク実行関数。
00582   #
00583   # @else
00584   # @brief Thread execution function
00585   #
00586   # A task execution function to be executed by coil::PeriodicTask.
00587   #
00588   # @endif
00589   #
00590   # int PublisherNew::svc(void)
00591   def svc(self):
00592     guard = OpenRTM_aist.ScopedLock(self._retmutex)
00593 
00594     if self._pushPolicy == self.ALL:
00595       self._retcode = self.pushAll()
00596       return 0
00597     elif self._pushPolicy == self.FIFO:
00598       self._retcode = self.pushFifo()
00599       return 0
00600     elif self._pushPolicy == self.SKIP:
00601       self._retcode = self.pushSkip()
00602       return 0
00603     elif self._pushPolicy == self.NEW:
00604       self._retcode = self.pushNew()
00605       return 0
00606     else:
00607       self._retcode = self.pushNew()
00608 
00609     return 0
00610 
00611   ##
00612   # @brief push all policy
00613   #
00614   # PublisherNew::ReturnCode PublisherNew::pushAll()
00615   def pushAll(self):
00616     self._rtcout.RTC_TRACE("pushAll()")
00617     try:
00618 
00619       while self._buffer.readable() > 0:
00620         cdr = self._buffer.get()
00621         self.onBufferRead(cdr)
00622 
00623         self.onSend(cdr)
00624         ret = self._consumer.put(cdr)
00625             
00626         if ret != self.PORT_OK:
00627           self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
00628           return self.invokeListener(ret, cdr)
00629         self.onReceived(cdr)
00630 
00631         self._buffer.advanceRptr()
00632 
00633       return self.PORT_OK
00634     except:
00635       self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
00636       return self.CONNECTION_LOST
00637 
00638     return self.PORT_ERROR
00639 
00640   ##
00641   # @brief push "fifo" policy
00642   #
00643   # PublisherNew::ReturnCode PublisherNew::pushFifo()
00644   def pushFifo(self):
00645     self._rtcout.RTC_TRACE("pushFifo()")
00646 
00647     try:
00648       cdr = self._buffer.get()
00649       self.onBufferRead(cdr)
00650 
00651       self.onSend(cdr)
00652       ret = self._consumer.put(cdr)
00653 
00654       if ret != self.PORT_OK:
00655         self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
00656         return self.invokeListener(ret, cdr)
00657       self.onReceived(cdr)
00658         
00659       self._buffer.advanceRptr()
00660         
00661       return self.PORT_OK
00662     except:
00663       self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
00664       return self.CONNECTION_LOST
00665 
00666     return self.PORT_ERROR
00667 
00668   ##
00669   # @brief push "skip" policy
00670   #
00671   # PublisherNew::ReturnCode PublisherNew::pushSkip()
00672   def pushSkip(self):
00673     self._rtcout.RTC_TRACE("pushSkip()")
00674     try:
00675       ret = self.PORT_OK
00676       preskip = self._buffer.readable() + self._leftskip
00677       loopcnt = preskip/(self._skipn+1)
00678       postskip = self._skipn - self._leftskip
00679 
00680       for i in range(loopcnt):
00681         self._buffer.advanceRptr(postskip)
00682         cdr = self._buffer.get()
00683         self.onBufferRead(cdr)
00684 
00685         self.onSend(cdr)
00686         ret = self._consumer.put(cdr)
00687         if ret != self.PORT_OK:
00688           self._buffer.advanceRptr(-postskip)
00689           self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
00690           return self.invokeListener(ret, cdr)
00691 
00692         self.onReceived(cdr)
00693         postskip = self._skipn + 1
00694 
00695       self._buffer.advanceRptr(self._buffer.readable())
00696 
00697       if loopcnt == 0:
00698         # Not put
00699         self._leftskip = preskip % (self._skipn + 1)
00700       else:
00701         if self._retcode != self.PORT_OK:
00702           # put Error after
00703           self._leftskip = 0
00704         else:
00705           # put OK after
00706           self._leftskip = preskip % (self._skipn + 1)
00707 
00708       return ret
00709 
00710     except:
00711       self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
00712       return self.CONNECTION_LOST
00713 
00714     return self.PORT_ERROR
00715 
00716   ##
00717   # @brief push "new" policy
00718   #
00719   # PublisherNew::ReturnCode PublisherNew::pushNew()
00720   def pushNew(self):
00721     self._rtcout.RTC_TRACE("pushNew()")
00722     try:
00723       self._buffer.advanceRptr(self._buffer.readable() - 1)
00724         
00725       cdr = self._buffer.get()
00726       self.onBufferRead(cdr)
00727 
00728       self.onSend(cdr)
00729       ret = self._consumer.put(cdr)
00730 
00731       if ret != self.PORT_OK:
00732         self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
00733         return self.invokeListener(ret, cdr)
00734 
00735       self.onReceived(cdr)
00736       self._buffer.advanceRptr()
00737 
00738       return self.PORT_OK
00739 
00740     except:
00741       self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
00742       return self.CONNECTION_LOST
00743 
00744     return self.PORT_ERROR
00745 
00746   ##
00747   # @if jp
00748   # @brief BufferStatus から DataPortStatus への変換
00749   #
00750   # バッファからの戻り値を DataPortStatus::Enum 型へ変換する関数。そ
00751   # れぞれ、以下のように変換される。変換時にコールバックを呼ぶ場合、
00752   # コールバク関数も付記する。
00753   # 
00754   # - BUFFER_OK: PORT_OK
00755   #  - None
00756   # - BUFFER_ERROR: BUFFER_ERROR
00757   #  - None
00758   # - BUFFER_FULL: BUFFER_FULL
00759   #  - onBufferFull()
00760   # - NOT_SUPPORTED: PORT_ERROR
00761   #  - None
00762   # - TIMEOUT: BUFFER_TIMEOUT
00763   #  - onBufferWriteTimeout()
00764   # - PRECONDITION_NOT_MET: PRECONDITION_NOT_MET
00765   #  - None
00766   # - other: PORT_ERROR
00767   #  - None
00768   #
00769   # @param status BufferStatus
00770   # @param data cdrMemoryStream
00771   # @return DataPortStatu 型のリターンコード
00772   #
00773   # @else
00774   # @brief Convertion from BufferStatus to DataPortStatus
00775   # 
00776   # This function converts return value from the buffer to
00777   # DataPortStatus::Enum typed return value. The conversion rule is
00778   # as follows. Callback functions are also shown, if it exists.
00779   # 
00780   # - BUFFER_OK: PORT_OK
00781   #  - None
00782   # - BUFFER_ERROR: BUFFER_ERROR
00783   #  - None
00784   # - BUFFER_FULL: BUFFER_FULL
00785   #  - onBufferFull()
00786   # - NOT_SUPPORTED: PORT_ERROR
00787   #  - None
00788   # - TIMEOUT: BUFFER_TIMEOUT
00789   #  - onBufferWriteTimeout()
00790   # - PRECONDITION_NOT_MET: PRECONDITION_NOT_MET
00791   #  - None
00792   # - other: PORT_ERROR
00793   #  - None
00794   #
00795   # @param status BufferStatus
00796   # @param data cdrMemoryStream
00797   # @return DataPortStatus typed return code
00798   #
00799   # @endif
00800   #
00801   # PublisherBase::ReturnCode
00802   # PublisherNew::convertReturn(BufferStatus::Enum status,
00803   #                             const cdrMemoryStream& data)
00804   def convertReturn(self, status, data):
00805     ##
00806     # BufferStatus -> DataPortStatus
00807     #
00808     # BUFFER_OK     -> PORT_OK
00809     # BUFFER_ERROR  -> BUFFER_ERROR
00810     # BUFFER_FULL   -> BUFFER_FULL
00811     # NOT_SUPPORTED -> PORT_ERROR
00812     # TIMEOUT       -> BUFFER_TIMEOUT
00813     # PRECONDITION_NOT_MET -> PRECONDITION_NOT_MET
00814     ##
00815     if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
00816       return self.PORT_OK
00817     
00818     elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
00819       return self.BUFFER_ERROR
00820 
00821     elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
00822       self.onBufferFull(data)
00823       return self.BUFFER_FULL
00824 
00825     elif status == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
00826       return self.PORT_ERROR
00827 
00828     elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
00829       self.onBufferWriteTimeout(data)
00830       return self.BUFFER_TIMEOUT
00831 
00832     elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
00833       return self.PRECONDITION_NOT_MET
00834 
00835     else:
00836       return self.PORT_ERROR
00837 
00838   ##
00839   # @if jp
00840   # @brief DataPortStatusに従ってリスナへ通知する関数を呼び出す。
00841   #
00842   # @param status DataPortStatus
00843   # @param data cdrMemoryStream
00844   # @return リターンコード
00845   #
00846   # @else
00847   # @brief Call listeners according to the DataPortStatus
00848   #
00849   # @param status DataPortStatus
00850   # @param data cdrMemoryStream
00851   # @return Return code
00852   #
00853   # @endif
00854   #
00855   # PublisherNew::ReturnCode
00856   # PublisherNew::invokeListener(DataPortStatus::Enum status,
00857   #                              const cdrMemoryStream& data)
00858   def invokeListener(self, status, data):
00859     # ret:
00860     # PORT_OK, PORT_ERROR, SEND_FULL, SEND_TIMEOUT, CONNECTION_LOST,
00861     # UNKNOWN_ERROR
00862     if status == self.PORT_ERROR:
00863       self.onReceiverError(data)
00864       return self.PORT_ERROR
00865         
00866     elif status == self.SEND_FULL:
00867       self.onReceiverFull(data)
00868       return self.SEND_FULL
00869         
00870     elif status == self.SEND_TIMEOUT:
00871       self.onReceiverTimeout(data)
00872       return self.SEND_TIMEOUT
00873         
00874     elif status == self.CONNECTION_LOST:
00875       self.onReceiverError(data)
00876       return self.CONNECTION_LOST
00877         
00878     elif status == self.UNKNOWN_ERROR:
00879       self.onReceiverError(data)
00880       return self.UNKNOWN_ERROR
00881         
00882     else:
00883       self.onReceiverError(data)
00884       return self.PORT_ERROR
00885 
00886   ##
00887   # @if jp
00888   # @brief ON_BUFFER_WRITEのリスナへ通知する。 
00889   # @param data cdrMemoryStream
00890   # @else
00891   # @brief Notify an ON_BUFFER_WRITE event to listeners
00892   # @param data cdrMemoryStream
00893   # @endif
00894   #
00895   # inline void onBufferWrite(const cdrMemoryStream& data)
00896   def onBufferWrite(self, data):
00897     if self._listeners is not None and self._profile is not None:
00898       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self._profile, data)
00899     return
00900 
00901   ##
00902   # @if jp
00903   # @brief ON_BUFFER_FULLリスナへイベントを通知する。 
00904   # @param data cdrMemoryStream
00905   # @else
00906   # @brief Notify an ON_BUFFER_FULL event to listeners
00907   # @param data cdrMemoryStream
00908   # @endif
00909   #
00910   # inline void onBufferFull(const cdrMemoryStream& data)
00911   def onBufferFull(self, data):
00912     if self._listeners is not None and self._profile is not None:
00913       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self._profile, data)
00914     return
00915 
00916   ##
00917   # @if jp
00918   # @brief ON_BUFFER_WRITE_TIMEOUTのリスナへ通知する。 
00919   # @param data cdrMemoryStream
00920   # @else
00921   # @brief Notify an ON_BUFFER_WRITE_TIMEOUT event to listeners
00922   # @param data cdrMemoryStream
00923   # @endif
00924   #
00925   # inline void onBufferWriteTimeout(const cdrMemoryStream& data)
00926   def onBufferWriteTimeout(self, data):
00927     if self._listeners is not None and self._profile is not None:
00928       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self._profile, data)
00929     return
00930 
00931   ##
00932   # @if jp
00933   # @brief ON_BUFFER_OVERWRITEのリスナへ通知する。 
00934   # @param data cdrMemoryStream
00935   # @else
00936   # @brief Notify an ON_BUFFER_OVERWRITE event to listeners
00937   # @param data cdrMemoryStream
00938   # @endif
00939   #
00940   # inline void onBufferWriteOverwrite(const cdrMemoryStream& data)
00941   def onBufferWriteOverwrite(self, data):
00942     if self._listeners is not None and self._profile is not None:
00943       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE].notify(self._profile, data)
00944     return
00945 
00946   ##
00947   # @if jp
00948   # @brief ON_BUFFER_READのリスナへ通知する。 
00949   # @param data cdrMemoryStream
00950   # @else
00951   # @brief Notify an ON_BUFFER_READ event to listeners
00952   # @param data cdrMemoryStream
00953   # @endif
00954   #
00955   # inline void onBufferRead(const cdrMemoryStream& data)
00956   def onBufferRead(self, data):
00957     if self._listeners is not None and self._profile is not None:
00958       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self._profile, data)
00959     return
00960 
00961   ##
00962   # @if jp
00963   # @brief ON_SENDのリスナへ通知する。 
00964   # @param data cdrMemoryStream
00965   # @else
00966   # @brief Notify an ON_SEND event to listners
00967   # @param data cdrMemoryStream
00968   # @endif
00969   #
00970   #inline void onSend(const cdrMemoryStream& data)
00971   def onSend(self, data):
00972     if self._listeners is not None and self._profile is not None:
00973       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self._profile, data)
00974     return
00975 
00976   ##
00977   # @if jp
00978   # @brief ON_RECEIVEDのリスナへ通知する。 
00979   # @param data cdrMemoryStream
00980   # @else
00981   # @brief Notify an ON_RECEIVED event to listeners
00982   # @param data cdrMemoryStream
00983   # @endif
00984   #
00985   # inline void onReceived(const cdrMemoryStream& data)
00986   def onReceived(self, data):
00987     if self._listeners is not None and self._profile is not None:
00988       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self._profile, data)
00989     return
00990 
00991   ##
00992   # @if jp
00993   # @brief ON_RECEIVER_FULLのリスナへ通知する。 
00994   # @param data cdrMemoryStream
00995   # @else
00996   # @brief Notify an ON_RECEIVER_FULL event to listeners
00997   # @param data cdrMemoryStream
00998   # @endif
00999   #
01000   # inline void onReceiverFull(const cdrMemoryStream& data)
01001   def onReceiverFull(self, data):
01002     if self._listeners is not None and self._profile is not None:
01003       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self._profile, data)
01004     return
01005 
01006   ##
01007   # @if jp
01008   # @brief ON_RECEIVER_TIMEOUTのリスナへ通知する。 
01009   # @param data cdrMemoryStream
01010   # @else
01011   # @brief Notify an ON_RECEIVER_TIMEOUT event to listeners
01012   # @param data cdrMemoryStream
01013   # @endif
01014   #
01015   # inline void onReceiverTimeout(const cdrMemoryStream& data)
01016   def onReceiverTimeout(self, data):
01017     if self._listeners is not None and self._profile is not None:
01018       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self._profile, data)
01019     return
01020 
01021   ##
01022   # @if jp
01023   # @brief ON_RECEIVER_ERRORのリスナへ通知する。 
01024   # @param data cdrMemoryStream
01025   # @else
01026   # @brief Notify an ON_RECEIVER_ERROR event to listeners
01027   # @param data cdrMemoryStream
01028   # @endif
01029   #
01030   # inline void onReceiverError(const cdrMemoryStream& data)
01031   def onReceiverError(self, data):
01032     if self._listeners is not None and self._profile is not None:
01033       self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self._profile, data)
01034     return
01035 
01036   ##
01037   # @if jp
01038   # @brief ON_SENDER_ERRORのリスナへ通知する。 
01039   # @param data cdrMemoryStream
01040   # @else
01041   # @brief Notify an ON_SENDER_ERROR event to listeners
01042   # @param data cdrMemoryStream
01043   # @endif
01044   #
01045   # inline void onSenderError()
01046   def onSenderError(self):
01047     if self._listeners is not None and self._profile is not None:
01048       self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self._profile)
01049     return
01050 
01051 
01052 
01053 def PublisherNewInit():
01054   OpenRTM_aist.PublisherFactory.instance().addFactory("new",
01055                                                       OpenRTM_aist.PublisherNew,
01056                                                       OpenRTM_aist.Delete)


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28