00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import threading
00019 from omniORB import any
00020
00021 import OpenRTM_aist
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 class PublisherPeriodic(OpenRTM_aist.PublisherBase):
00046 """
00047 """
00048
00049
00050 ALL = 0
00051 FIFO = 1
00052 SKIP = 2
00053 NEW = 3
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072 def __init__(self):
00073 self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("PublisherPeriodic")
00074 self._consumer = None
00075 self._buffer = None
00076 self._task = None
00077 self._retcode = self.PORT_OK
00078 self._retmutex = threading.RLock()
00079 self._pushPolicy = self.NEW
00080 self._skipn = 0
00081 self._active = False
00082 self._readback = False
00083 self._leftskip = 0
00084 self._profile = None
00085 self._listeners = None
00086
00087 return
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100 def __del__(self):
00101 self._rtcout.RTC_TRACE("~PublisherPeriodic()")
00102 if self._task:
00103 self._task.resume()
00104 self._task.finalize()
00105 self._rtcout.RTC_PARANOID("task finalized.")
00106
00107 OpenRTM_aist.PeriodicTaskFactory.instance().deleteObject(self._task)
00108 del self._task
00109 self._rtcout.RTC_PARANOID("task deleted.")
00110
00111
00112 self._consumer = None
00113
00114 self._buffer = None
00115 return
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125 def setPushPolicy(self, prop):
00126 push_policy = prop.getProperty("publisher.push_policy","new")
00127 self._rtcout.RTC_DEBUG("push_policy: %s", push_policy)
00128
00129 push_policy = OpenRTM_aist.normalize([push_policy])
00130
00131 if push_policy == "all":
00132 self._pushPolicy = self.ALL
00133
00134 elif push_policy == "fifo":
00135 self._pushPolicy = self.FIFO
00136
00137 elif push_policy == "skip":
00138 self._pushPolicy = self.SKIP
00139
00140 elif push_policy == "new":
00141 self._pushPolicy = self.NEW
00142
00143 else:
00144 self._rtcout.RTC_ERROR("invalid push_policy value: %s", push_policy)
00145 self._pushPolicy = self.NEW
00146
00147 skip_count = prop.getProperty("publisher.skip_count","0")
00148 self._rtcout.RTC_DEBUG("skip_count: %s", skip_count)
00149
00150 skipn = [self._skipn]
00151 ret = OpenRTM_aist.stringTo(skipn, skip_count)
00152 if ret:
00153 self._skipn = skipn[0]
00154 else:
00155 self._rtcout.RTC_ERROR("invalid skip_count value: %s", skip_count)
00156 self._skipn = 0
00157
00158 if self._skipn < 0:
00159 self._rtcout.RTC_ERROR("invalid skip_count value: %d", self._skipn)
00160 self._skipn = 0
00161
00162 return
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172 def createTask(self, prop):
00173 factory = OpenRTM_aist.PeriodicTaskFactory.instance()
00174
00175 th = factory.getIdentifiers()
00176 self._rtcout.RTC_DEBUG("available task types: %s", OpenRTM_aist.flatten(th))
00177
00178 self._task = factory.createObject(prop.getProperty("thread_type", "default"))
00179 if not self._task:
00180 self._rtcout.RTC_ERROR("Task creation failed: %s",
00181 prop.getProperty("thread_type", "default"))
00182 return self.INVALID_ARGS
00183
00184 self._rtcout.RTC_PARANOID("Task creation succeeded.")
00185
00186
00187 self._task.setTask(self.svc)
00188
00189
00190 rate = prop.getProperty("publisher.push_rate")
00191
00192 if rate != "":
00193 hz = float(rate)
00194 if hz == 0:
00195 hz = 1000.0
00196 self._rtcout.RTC_DEBUG("Task period %f [Hz]", hz)
00197 else:
00198 hz = 1000.0
00199
00200 self._task.setPeriod(1.0/hz)
00201
00202
00203 mprop = prop.getNode("measurement")
00204
00205 self._task.executionMeasure(OpenRTM_aist.toBool(mprop.getProperty("exec_time"),
00206 "enable", "disable", True))
00207
00208 ecount = [0]
00209 if OpenRTM_aist.stringTo(ecount, mprop.getProperty("exec_count")):
00210 self._task.executionMeasureCount(ecount[0])
00211
00212 self._task.periodicMeasure(OpenRTM_aist.toBool(mprop.getProperty("period_time"),
00213 "enable", "disable", True))
00214
00215 pcount = [0]
00216 if OpenRTM_aist.stringTo(pcount, mprop.getProperty("period_count")):
00217 self._task.periodicMeasureCount(pcount[0])
00218
00219
00220 self._task.suspend()
00221 self._task.activate()
00222 self._task.suspend()
00223
00224 return self.PORT_OK
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288 def init(self, prop):
00289 self._rtcout.RTC_TRACE("init()")
00290 self.setPushPolicy(prop)
00291 return self.createTask(prop)
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320 def setConsumer(self, consumer):
00321 self._rtcout.RTC_TRACE("setConsumer()")
00322
00323 if not consumer:
00324 self._rtcout.RTC_ERROR("setConsumer(consumer = 0): invalid argument.")
00325 return self.INVALID_ARGS
00326
00327 self._consumer = consumer
00328 return self.PORT_OK
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356 def setBuffer(self, buffer):
00357 self._rtcout.RTC_TRACE("setBuffer()")
00358
00359 if not buffer:
00360 self._rtcout.RTC_ERROR("setBuffer(buffer == 0): invalid argument")
00361 return self.INVALID_ARGS
00362
00363 self._buffer = buffer
00364 return self.PORT_OK
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402 def setListener(self, info, listeners):
00403 self._rtcout.RTC_TRACE("setListeners()")
00404
00405 if not listeners:
00406 self._rtcout.RTC_ERROR("setListeners(listeners == 0): invalid argument")
00407 return self.INVALID_ARGS
00408
00409 self._profile = info
00410 self._listeners = listeners
00411 return self.PORT_OK
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492 def write(self, data, sec, usec):
00493 self._rtcout.RTC_PARANOID("write()")
00494
00495 if not self._consumer or not self._buffer or not self._listeners:
00496 return self.PRECONDITION_NOT_MET
00497
00498 if self._retcode == self.CONNECTION_LOST:
00499 self._rtcout.RTC_DEBUG("write(): connection lost.")
00500 return self._retcode
00501
00502 if self._retcode == self.SEND_FULL:
00503 self._rtcout.RTC_DEBUG("write(): InPort buffer is full.")
00504 self._buffer.write(data,sec,usec)
00505 return self.BUFFER_FULL
00506
00507 self.onBufferWrite(data)
00508 ret = self._buffer.write(data, sec, usec)
00509 self._rtcout.RTC_DEBUG("%s = write()", OpenRTM_aist.DataPortStatus.toString(ret))
00510 self._task.resume()
00511 return self.convertReturn(ret, data)
00512
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541 def isActive(self):
00542 return self._active
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570 def activate(self):
00571 if not self._task or not self._buffer:
00572 return self.PRECONDITION_NOT_MET
00573 self._active = True
00574 self._task.resume()
00575 return self.PORT_OK
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603 def deactivate(self):
00604 if not self._task:
00605 return self.PRECONDITION_NOT_MET
00606 self._active = False
00607 self._task.suspend()
00608 return self.PORT_OK
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619 def svc(self):
00620 guard = OpenRTM_aist.ScopedLock(self._retmutex)
00621
00622 if self._pushPolicy == self.ALL:
00623 self._retcode = self.pushAll()
00624 return 0
00625
00626 elif self._pushPolicy == self.FIFO:
00627 self._retcode = self.pushFifo()
00628 return 0
00629
00630 elif self._pushPolicy == self.SKIP:
00631 self._retcode = self.pushSkip()
00632 return 0
00633
00634 elif self._pushPolicy == self.NEW:
00635 self._retcode = self.pushNew()
00636 return 0
00637
00638 else:
00639 self._retcode = self.pushNew()
00640
00641 return 0
00642
00643
00644
00645
00646
00647 def pushAll(self):
00648 self._rtcout.RTC_TRACE("pushAll()")
00649
00650 if not self._buffer:
00651 return self.PRECONDITION_NOT_MET
00652
00653 if self.bufferIsEmpty():
00654 return self.BUFFER_EMPTY
00655
00656 while self._buffer.readable() > 0:
00657 cdr = self._buffer.get()
00658 self.onBufferRead(cdr)
00659
00660 self.onSend(cdr)
00661 ret = self._consumer.put(cdr)
00662
00663 if ret != self.PORT_OK:
00664 self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
00665 return self.invokeListener(ret, cdr)
00666
00667 self.onReceived(cdr)
00668 self._buffer.advanceRptr()
00669
00670 return self.PORT_OK
00671
00672
00673
00674
00675
00676
00677 def pushFifo(self):
00678 self._rtcout.RTC_TRACE("pushFifo()")
00679 if not self._buffer:
00680 return self.PRECONDITION_NOT_MET
00681
00682 if self.bufferIsEmpty():
00683 return self.BUFFER_EMPTY
00684
00685 cdr = self._buffer.get()
00686 self.onBufferRead(cdr)
00687
00688 self.onSend(cdr)
00689 ret = self._consumer.put(cdr)
00690
00691 if ret != self.PORT_OK:
00692 self._rtcout.RTC_DEBUG("%s = consumer.put()",OpenRTM_aist.DataPortStatus.toString(ret))
00693 return self.invokeListener(ret, cdr)
00694
00695 self.onReceived(cdr)
00696 self._buffer.advanceRptr()
00697
00698 return self.PORT_OK
00699
00700
00701
00702
00703
00704
00705 def pushSkip(self):
00706 self._rtcout.RTC_TRACE("pushSkip()")
00707 if not self._buffer:
00708 return self.PRECONDITION_NOT_MET
00709
00710 if self.bufferIsEmpty():
00711 return self.BUFFER_EMPTY
00712
00713 ret = self.PORT_OK
00714 preskip = self._buffer.readable() + self._leftskip
00715 loopcnt = preskip / (self._skipn + 1)
00716 postskip = self._skipn - self._leftskip
00717 for i in range(loopcnt):
00718 self._buffer.advanceRptr(postskip)
00719 cdr = self._buffer.get()
00720 self.onBufferRead(cdr)
00721
00722 self.onSend(cdr)
00723 ret = self._consumer.put(cdr)
00724 if ret != self.PORT_OK:
00725 self._buffer.advanceRptr(-postskip)
00726 self._rtcout.RTC_DEBUG("%s = consumer.put()",OpenRTM_aist.DataPortStatus.toString(ret))
00727 return self.invokeListener(ret, cdr)
00728 self.onReceived(cdr)
00729 postskip = self._skipn + 1
00730
00731 self._buffer.advanceRptr(self._buffer.readable())
00732 self._leftskip = preskip % (self._skipn + 1)
00733
00734 return ret
00735
00736
00737
00738
00739
00740
00741 def pushNew(self):
00742 self._rtcout.RTC_TRACE("pushNew()")
00743 if not self._buffer:
00744 return self.PRECONDITION_NOT_MET
00745
00746 if self.bufferIsEmpty():
00747 return self.BUFFER_EMPTY
00748
00749
00750
00751
00752 self._readback = True
00753
00754 self._buffer.advanceRptr(self._buffer.readable() - 1)
00755
00756 cdr = self._buffer.get()
00757 self.onBufferRead(cdr)
00758
00759 self.onSend(cdr)
00760 ret = self._consumer.put(cdr)
00761
00762 if ret != self.PORT_OK:
00763 self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
00764 return self.invokeListener(ret, cdr)
00765
00766 self.onReceived(cdr)
00767
00768 self._buffer.advanceRptr()
00769 return self.PORT_OK
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791
00792
00793
00794
00795
00796
00797
00798
00799
00800
00801
00802
00803
00804
00805
00806
00807
00808
00809
00810
00811
00812
00813
00814
00815
00816
00817
00818
00819
00820
00821
00822
00823
00824
00825
00826
00827
00828
00829 def convertReturn(self, status, data):
00830 if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
00831 return self.PORT_OK
00832
00833 elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
00834 return self.BUFFER_ERROR
00835
00836 elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
00837 self.onBufferFull(data)
00838 return self.BUFFER_FULL
00839
00840 elif status == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
00841 return self.PORT_ERROR
00842
00843 elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
00844 self.onBufferWriteTimeout(data)
00845 return self.BUFFER_TIMEOUT
00846
00847 elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
00848 return self.PRECONDITION_NOT_MET
00849
00850 else:
00851 return self.PORT_ERROR
00852
00853 return self.PORT_ERROR
00854
00855
00856
00857
00858
00859
00860
00861
00862
00863
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875 def invokeListener(self, status, data):
00876
00877
00878
00879 if status == self.PORT_ERROR:
00880 self.onReceiverError(data)
00881 return self.PORT_ERROR
00882
00883 elif status == self.SEND_FULL:
00884 self.onReceiverFull(data)
00885 return self.SEND_FULL
00886
00887 elif status == self.SEND_TIMEOUT:
00888 self.onReceiverTimeout(data)
00889 return self.SEND_TIMEOUT
00890
00891 elif status == self.CONNECTION_LOST:
00892 self.onReceiverError(data)
00893 return self.CONNECTION_LOST
00894
00895 elif status == self.UNKNOWN_ERROR:
00896 self.onReceiverError(data)
00897 return self.UNKNOWN_ERROR
00898
00899 else:
00900 self.onReceiverError(data)
00901 return self.PORT_ERROR
00902
00903
00904
00905
00906
00907
00908
00909
00910
00911
00912
00913 def onBufferWrite(self, data):
00914 if self._listeners is not None and self._profile is not None:
00915 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self._profile, data)
00916 return
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926
00927
00928 def onBufferFull(self, data):
00929 if self._listeners is not None and self._profile is not None:
00930 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self._profile, data)
00931 return
00932
00933
00934
00935
00936
00937
00938
00939
00940
00941
00942
00943 def onBufferWriteTimeout(self, data):
00944 if self._listeners is not None and self._profile is not None:
00945 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self._profile, data)
00946 return
00947
00948
00949
00950
00951
00952
00953
00954
00955
00956
00957
00958 def onBufferRead(self, data):
00959 if self._listeners is not None and self._profile is not None:
00960 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self._profile, data)
00961 return
00962
00963
00964
00965
00966
00967
00968
00969
00970
00971
00972
00973 def onSend(self, data):
00974 if self._listeners is not None and self._profile is not None:
00975 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self._profile, data)
00976 return
00977
00978
00979
00980
00981
00982
00983
00984
00985
00986
00987
00988 def onReceived(self, data):
00989 if self._listeners is not None and self._profile is not None:
00990 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self._profile, data)
00991 return
00992
00993
00994
00995
00996
00997
00998
00999
01000
01001
01002
01003 def onReceiverFull(self, data):
01004 if self._listeners is not None and self._profile is not None:
01005 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self._profile, data)
01006 return
01007
01008
01009
01010
01011
01012
01013
01014
01015
01016
01017
01018 def onReceiverTimeout(self, data):
01019 if self._listeners is not None and self._profile is not None:
01020 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self._profile, data)
01021 return
01022
01023
01024
01025
01026
01027
01028
01029
01030
01031
01032
01033 def onReceiverError(self, data):
01034 if self._listeners is not None and self._profile is not None:
01035 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self._profile, data)
01036 return
01037
01038
01039
01040
01041
01042
01043
01044
01045
01046
01047 def onBufferEmpty(self):
01048 if self._listeners is not None and self._profile is not None:
01049 self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY].notify(self._profile)
01050 return
01051
01052
01053
01054
01055
01056
01057
01058
01059
01060 def onSenderEmpty(self):
01061 if self._listeners is not None and self._profile is not None:
01062 self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY].notify(self._profile)
01063 return
01064
01065
01066
01067
01068
01069
01070
01071
01072
01073 def onSenderError(self):
01074 if self._listeners is not None and self._profile is not None:
01075 self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self._profile)
01076 return
01077
01078
01079
01080
01081
01082
01083
01084
01085
01086
01087 def bufferIsEmpty(self):
01088 if self._buffer and self._buffer.empty() and not self._readback:
01089 self._rtcout.RTC_DEBUG("buffer empty")
01090 self.onBufferEmpty()
01091 self.onSenderEmpty()
01092 return True
01093
01094 return False
01095
01096
01097
01098 def PublisherPeriodicInit():
01099 OpenRTM_aist.PublisherFactory.instance().addFactory("periodic",
01100 OpenRTM_aist.PublisherPeriodic,
01101 OpenRTM_aist.Delete)