Go to the documentation of this file.00001
00020 #ifndef RTC_RINGBUFFER_H
00021 #define RTC_RINGBUFFER_H
00022
00023 #include <vector>
00024 #include <algorithm>
00025 #include <iostream>
00026
00027 #include <coil/TimeValue.h>
00028 #include <coil/Mutex.h>
00029 #include <coil/Guard.h>
00030 #include <coil/Condition.h>
00031 #include <coil/stringutil.h>
00032
00033 #include <rtm/BufferBase.h>
00034 #include <rtm/BufferStatus.h>
00035
00036 #define RINGBUFFER_DEFAULT_LENGTH 8
00037
00051 namespace RTC
00052 {
00088 template <class DataType>
00089 class RingBuffer
00090 : public BufferBase<DataType>
00091 {
00092 public:
00093 BUFFERSTATUS_ENUM
00094 typedef coil::Guard<coil::Mutex> Guard;
00118 RingBuffer(long int length = RINGBUFFER_DEFAULT_LENGTH)
00119 : m_overwrite(true), m_readback(true),
00120 m_timedwrite(false), m_timedread(false),
00121 m_wtimeout(1, 0), m_rtimeout(1, 0),
00122 m_length(length),
00123 m_wpos(0), m_rpos(0), m_fillcount(0), m_wcount(0),
00124 m_buffer(m_length)
00125 {
00126 this->reset();
00127 }
00128
00144 virtual ~RingBuffer(void)
00145 {
00146 }
00147
00187 virtual void init(const coil::Properties& prop)
00188 {
00189 initLength(prop);
00190 initWritePolicy(prop);
00191 initReadPolicy(prop);
00192 }
00193
00214 virtual size_t length(void) const
00215 {
00216 Guard guard(m_posmutex);
00217 return m_length;
00218 }
00219
00242 virtual ReturnCode length(size_t n)
00243 {
00244 m_buffer.resize(n);
00245 m_length = n;
00246 this->reset();
00247 return ::RTC::BufferStatus::BUFFER_OK;
00248 }
00249
00272 virtual ReturnCode reset()
00273 {
00274 Guard guard(m_posmutex);
00275 m_fillcount = 0;
00276 m_wcount = 0;
00277 m_wpos = 0;
00278 m_rpos = 0;
00279 return ::RTC::BufferStatus::BUFFER_OK;
00280 }
00281
00282
00283
00284
00305 virtual DataType* wptr(long int n = 0)
00306 {
00307 Guard guard(m_posmutex);
00308 return &m_buffer[(m_wpos + n + m_length) % m_length];
00309 }
00310
00334 virtual ReturnCode advanceWptr(long int n = 1)
00335 {
00336
00337
00338
00339
00340
00341
00342
00343 if ((n > 0 && n > static_cast<long int>(m_length - m_fillcount)) ||
00344 (n < 0 && n < static_cast<long int>(-m_fillcount)))
00345 {
00346 return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00347 }
00348
00349 Guard guard(m_posmutex);
00350 m_wpos = (m_wpos + n + m_length) % m_length;
00351 m_fillcount += n;
00352 m_wcount += n;
00353 return ::RTC::BufferStatus::BUFFER_OK;
00354 }
00382 virtual ReturnCode put(const DataType& value)
00383 {
00384 Guard guard(m_posmutex);
00385 m_buffer[m_wpos] = value;
00386 return ::RTC::BufferStatus::BUFFER_OK;
00387 }
00388
00430 virtual ReturnCode write(const DataType& value,
00431 long int sec = -1, long int nsec = 0)
00432 {
00433 {
00434 Guard guard(m_full.mutex);
00435
00436 if (full())
00437 {
00438
00439 bool timedwrite(m_timedwrite);
00440 bool overwrite(m_overwrite);
00441
00442 if (!(sec < 0))
00443 {
00444 timedwrite = true;
00445 overwrite = false;
00446 }
00447
00448 if (overwrite && !timedwrite)
00449 {
00450 advanceRptr();
00451 }
00452 else if (!overwrite && !timedwrite)
00453 {
00454 return ::RTC::BufferStatus::BUFFER_FULL;
00455 }
00456 else if (!overwrite && timedwrite)
00457 {
00458 if (sec < 0)
00459 {
00460 sec = m_wtimeout.sec();
00461 nsec = m_wtimeout.usec() * 1000;
00462 }
00463
00464 if (!m_full.cond.wait(sec, nsec))
00465 {
00466 return ::RTC::BufferStatus::TIMEOUT;
00467 }
00468 }
00469 else
00470 {
00471 return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00472 }
00473 }
00474 }
00475
00476 put(value);
00477
00478 if (empty())
00479 {
00480 Guard eguard(m_empty.mutex);
00481 advanceWptr(1);
00482 m_empty.cond.signal();
00483 }
00484 else
00485 {
00486 advanceWptr(1);
00487 }
00488 return ::RTC::BufferStatus::BUFFER_OK;
00489 }
00490
00512 virtual size_t writable() const
00513 {
00514 Guard guard(m_posmutex);
00515 return m_length - m_fillcount;
00516 }
00517
00537 virtual bool full(void) const
00538 {
00539 Guard guard(m_posmutex);
00540 return m_length == m_fillcount;
00541 }
00542
00543
00564 virtual DataType* rptr(long int n = 0)
00565 {
00566 Guard guard(m_posmutex);
00567 return &(m_buffer[(m_rpos + n + m_length) % m_length]);
00568 }
00569
00591 virtual ReturnCode advanceRptr(long int n = 1)
00592 {
00593
00594
00595
00596
00597
00598
00599 if ((n > 0 && n > static_cast<long int>(m_fillcount)) ||
00600 (n < 0 && n < static_cast<long int>(m_fillcount - m_length)))
00601 {
00602 return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00603 }
00604
00605 Guard guard(m_posmutex);
00606 m_rpos = (m_rpos + n + m_length) % m_length;
00607 m_fillcount -= n;
00608 return ::RTC::BufferStatus::BUFFER_OK;
00609 }
00610
00635 virtual ReturnCode get(DataType& value)
00636 {
00637 Guard gaurd(m_posmutex);
00638 value = m_buffer[m_rpos];
00639 return ::RTC::BufferStatus::BUFFER_OK;
00640 }
00641
00642
00660 virtual DataType& get()
00661 {
00662 Guard gaurd(m_posmutex);
00663 return m_buffer[m_rpos];
00664 }
00665
00666
00708 virtual ReturnCode read(DataType& value,
00709 long int sec = -1, long int nsec = 0)
00710 {
00711 {
00712 Guard gaurd(m_empty.mutex);
00713
00714 if (empty())
00715 {
00716 bool timedread(m_timedread);
00717 bool readback(m_readback);
00718
00719 if (!(sec < 0))
00720 {
00721 timedread = true;
00722 readback = false;
00723 sec = m_rtimeout.sec();
00724 nsec = m_rtimeout.usec() * 1000;
00725 }
00726
00727 if (readback && !timedread)
00728 {
00729 if (!(m_wcount > 0))
00730 {
00731 return ::RTC::BufferStatus::BUFFER_EMPTY;
00732 }
00733 advanceRptr(-1);
00734 }
00735 else if (!readback && !timedread)
00736 {
00737 return ::RTC::BufferStatus::BUFFER_EMPTY;
00738 }
00739 else if (!readback && timedread)
00740 {
00741 if (sec < 0)
00742 {
00743 sec = m_rtimeout.sec();
00744 nsec = m_rtimeout.usec() * 1000;
00745 }
00746
00747 if (!m_empty.cond.wait(sec, nsec))
00748 {
00749 return ::RTC::BufferStatus::TIMEOUT;
00750 }
00751 }
00752 else
00753 {
00754 return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00755 }
00756 }
00757 }
00758
00759 get(value);
00760
00761 if (full())
00762 {
00763 Guard fguard(m_full.mutex);
00764 advanceRptr(1);
00765 m_full.cond.signal();
00766 }
00767 else
00768 {
00769 advanceRptr(1);
00770 }
00771 return ::RTC::BufferStatus::BUFFER_OK;
00772 }
00773
00798 virtual size_t readable() const
00799 {
00800 Guard guard(m_posmutex);
00801 return m_fillcount;
00802 }
00803
00823 virtual bool empty(void) const
00824 {
00825 Guard guard(m_posmutex);
00826 return m_fillcount == 0;
00827 }
00828
00829 private:
00830 inline void initLength(const coil::Properties& prop)
00831 {
00832 if (!prop["length"].empty())
00833 {
00834 size_t n;
00835 if (coil::stringTo(n, prop["length"].c_str()))
00836 {
00837 if (n > 0)
00838 {
00839 this->length(n);
00840 }
00841 }
00842 }
00843 }
00844
00845 inline void initWritePolicy(const coil::Properties& prop)
00846 {
00847 std::string policy(prop["write.full_policy"]);
00848 coil::normalize(policy);
00849 if (policy == "overwrite")
00850 {
00851 m_overwrite = true;
00852 m_timedwrite = false;
00853 }
00854 else if (policy == "do_nothing")
00855 {
00856 m_overwrite = false;
00857 m_timedwrite = false;
00858 }
00859 else if (policy == "block")
00860 {
00861 m_overwrite = false;
00862 m_timedwrite = true;
00863
00864 double tm;
00865 if (coil::stringTo(tm, prop["write.timeout"].c_str()))
00866 {
00867 if (!(tm < 0))
00868 {
00869 m_wtimeout = tm;
00870 }
00871 }
00872 }
00873 }
00874
00875 inline void initReadPolicy(const coil::Properties& prop)
00876 {
00877 std::string policy(prop["read.empty_policy"]);
00878 if (policy == "readback")
00879 {
00880 m_readback = true;
00881 m_timedread = false;
00882 }
00883 else if (policy == "do_nothing")
00884 {
00885 m_readback = false;
00886 m_timedread = false;
00887 }
00888 else if (policy == "block")
00889 {
00890 m_readback = false;
00891 m_timedread = true;
00892 double tm;
00893 if (coil::stringTo(tm, prop["read.timeout"].c_str()))
00894 {
00895 m_rtimeout = tm;
00896 }
00897 }
00898 }
00899
00900 private:
00908 bool m_overwrite;
00909
00917 bool m_readback;
00918
00926 bool m_timedwrite;
00934 bool m_timedread;
00935
00943 coil::TimeValue m_wtimeout;
00944
00952 coil::TimeValue m_rtimeout;
00953
00961 size_t m_length;
00962
00970 size_t m_wpos;
00971
00979 size_t m_rpos;
00980
00988 size_t m_fillcount;
00989
00997 size_t m_wcount;
00998
01006 std::vector<DataType> m_buffer;
01007
01015 struct condition
01016 {
01017 condition() : cond(mutex) {}
01018 coil::Condition<coil::Mutex> cond;
01019 coil::Mutex mutex;
01020 };
01021
01029 mutable coil::Mutex m_posmutex;
01030
01038 condition m_empty;
01039
01047 condition m_full;
01048 };
01049 };
01050
01051 #endif // RTC_RINGBUFFER_H