RingBuffer.h
Go to the documentation of this file.
00001 // -*- C++ -*-
00020 #ifndef RTC_RINGBUFFER_H
00021 #define RTC_RINGBUFFER_H
00022 
00023 #include <vector>
00024 #include <algorithm>
00025 #include <iostream>
00026 
00027 #include <coil/TimeValue.h>
00028 #include <coil/Mutex.h>
00029 #include <coil/Guard.h>
00030 #include <coil/Condition.h>
00031 #include <coil/stringutil.h>
00032 
00033 #include <rtm/BufferBase.h>
00034 #include <rtm/BufferStatus.h>
00035 
// Number of elements a RingBuffer holds when its constructor is called without an explicit length.
00036 #define RINGBUFFER_DEFAULT_LENGTH 8
00037 
00051 namespace RTC
00052 {
00088   template <class DataType>
00089   class RingBuffer
00090     : public BufferBase<DataType>
00091   {
00092   public:
00093     BUFFERSTATUS_ENUM
00094     typedef coil::Guard<coil::Mutex> Guard;
00118     RingBuffer(long int length = RINGBUFFER_DEFAULT_LENGTH)
00119       : m_overwrite(true), m_readback(true),
00120         m_timedwrite(false), m_timedread(false),
00121         m_wtimeout(1, 0), m_rtimeout(1, 0),
00122         m_length(length),
00123         m_wpos(0), m_rpos(0), m_fillcount(0), m_wcount(0),
00124         m_buffer(m_length)
00125     {
00126       this->reset();
00127     }
00128     
00144     virtual ~RingBuffer(void)
00145     {
00146     }
00147     
00187     virtual void init(const coil::Properties& prop)
00188     {
00189       initLength(prop);
00190       initWritePolicy(prop);
00191       initReadPolicy(prop);
00192     }
00193     
00214     virtual size_t length(void) const
00215     {
00216       Guard guard(m_posmutex);
00217       return m_length;
00218     }
00219     
00242     virtual ReturnCode length(size_t n)
00243     {
00244       m_buffer.resize(n);
00245       m_length = n;
00246       this->reset();
00247       return ::RTC::BufferStatus::BUFFER_OK; //BUFFER_OK;
00248     }
00249     
00272     virtual ReturnCode reset()
00273     {
00274       Guard guard(m_posmutex);
00275       m_fillcount = 0;
00276       m_wcount = 0;
00277       m_wpos = 0;
00278       m_rpos = 0;
00279       return ::RTC::BufferStatus::BUFFER_OK;
00280     }
00281     
00282     
00283     
00284     //----------------------------------------------------------------------
00305     virtual DataType* wptr(long int n = 0) 
00306     {
00307       Guard guard(m_posmutex);
00308       return &m_buffer[(m_wpos + n + m_length) % m_length];
00309     }
00310     
00334     virtual ReturnCode advanceWptr(long int n = 1)
00335     {
00336       // n > 0 :
00337       //     n satisfies n <= writable elements
00338       //                 n <= m_length - m_fillcout
00339       // n < 0 : -n = n'
00340       //     n satisfies n'<= readable elements
00341       //                 n'<= m_fillcount
00342       //                 n >= - m_fillcount
00343       if ((n > 0 && n > static_cast<long int>(m_length - m_fillcount)) ||
00344           (n < 0 && n < static_cast<long int>(-m_fillcount)))
00345         {
00346           return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00347         }
00348 
00349       Guard guard(m_posmutex);
00350       m_wpos = (m_wpos + n + m_length) % m_length;
00351       m_fillcount += n;
00352       m_wcount += n;
00353       return ::RTC::BufferStatus::BUFFER_OK;
00354     }
00382     virtual ReturnCode put(const DataType& value)
00383     {
00384       Guard guard(m_posmutex);
00385       m_buffer[m_wpos] = value;
00386       return ::RTC::BufferStatus::BUFFER_OK;
00387     }
00388     
00430     virtual ReturnCode write(const DataType& value,
00431                              long int sec = -1, long int nsec = 0)
00432     {
00433       {
00434       Guard guard(m_full.mutex);
00435         
00436       if (full())
00437         {
00438           
00439           bool timedwrite(m_timedwrite);
00440           bool overwrite(m_overwrite);
00441 
00442           if (!(sec < 0)) // if second arg is set -> block mode
00443             {
00444               timedwrite = true;
00445               overwrite  = false;
00446             }
00447 
00448           if (overwrite && !timedwrite)       // "overwrite" mode
00449             {
00450               advanceRptr();
00451             }
00452           else if (!overwrite && !timedwrite) // "do_nothing" mode
00453             {
00454               return ::RTC::BufferStatus::BUFFER_FULL;
00455             }
00456           else if (!overwrite && timedwrite)  // "block" mode
00457             {
00458               if (sec < 0)
00459                 {
00460                   sec = m_wtimeout.sec();
00461                   nsec = m_wtimeout.usec() * 1000;
00462                 }
00463               //  true: signaled, false: timeout
00464               if (!m_full.cond.wait(sec, nsec))
00465                 {
00466                   return ::RTC::BufferStatus::TIMEOUT;
00467                 }
00468             }
00469           else                                    // unknown condition
00470             {
00471               return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00472             }
00473         }
00474       }      
00475     
00476       put(value);
00477       
00478       if (empty())
00479         {
00480           Guard eguard(m_empty.mutex);
00481           advanceWptr(1);
00482           m_empty.cond.signal();
00483         }
00484       else
00485         {
00486           advanceWptr(1);
00487         }
00488       return ::RTC::BufferStatus::BUFFER_OK;
00489     }
00490     
00512     virtual size_t writable() const
00513     {
00514       Guard guard(m_posmutex);
00515       return m_length - m_fillcount;
00516     }
00517     
00537     virtual bool full(void) const
00538     {
00539       Guard guard(m_posmutex);
00540       return m_length == m_fillcount;
00541     }
00542     
00543     //----------------------------------------------------------------------
00564     virtual DataType* rptr(long int n = 0)
00565     {
00566       Guard guard(m_posmutex);
00567       return &(m_buffer[(m_rpos + n + m_length) % m_length]);
00568     }
00569     
00591     virtual ReturnCode advanceRptr(long int n = 1)
00592     {
00593       // n > 0 :
00594       //     n satisfies n <= readable elements
00595       //                 n <= m_fillcout 
00596       // n < 0 : -n = n'
00597       //     n satisfies n'<= m_length - m_fillcount
00598       //                 n >= m_fillcount - m_length
00599       if ((n > 0 && n > static_cast<long int>(m_fillcount)) ||
00600           (n < 0 && n < static_cast<long int>(m_fillcount - m_length)))
00601         {
00602           return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00603         }
00604 
00605       Guard guard(m_posmutex);
00606       m_rpos = (m_rpos + n + m_length) % m_length;
00607       m_fillcount -= n;
00608       return ::RTC::BufferStatus::BUFFER_OK;
00609     }
00610     
00635     virtual ReturnCode get(DataType& value)
00636     {
00637       Guard gaurd(m_posmutex);
00638       value = m_buffer[m_rpos];
00639       return ::RTC::BufferStatus::BUFFER_OK;
00640     }
00641     
00642     
00660     virtual DataType& get()
00661     {
00662       Guard gaurd(m_posmutex);
00663       return m_buffer[m_rpos];
00664     }
00665     
00666     
00708     virtual ReturnCode read(DataType& value,
00709                             long int sec = -1, long int nsec = 0)
00710     {
00711       {
00712       Guard gaurd(m_empty.mutex);
00713       
00714       if (empty())
00715         {
00716           bool timedread(m_timedread);
00717           bool readback(m_readback);
00718 
00719           if (!(sec < 0)) // if second arg is set -> block mode
00720             {
00721               timedread = true;
00722               readback  = false;
00723               sec = m_rtimeout.sec();
00724               nsec = m_rtimeout.usec() * 1000;
00725             }
00726 
00727           if (readback && !timedread)       // "readback" mode
00728             {
00729               if (!(m_wcount > 0))
00730                 {
00731                   return ::RTC::BufferStatus::BUFFER_EMPTY;
00732                 }
00733               advanceRptr(-1);
00734             }
00735           else if (!readback && !timedread) // "do_nothing" mode
00736             {
00737               return ::RTC::BufferStatus::BUFFER_EMPTY;
00738             }
00739           else if (!readback && timedread)  // "block" mode
00740             {
00741               if (sec < 0)
00742                 {
00743                   sec = m_rtimeout.sec();
00744                   nsec = m_rtimeout.usec() * 1000;
00745                 }
00746               //  true: signaled, false: timeout
00747               if (!m_empty.cond.wait(sec, nsec))
00748                 {
00749                   return ::RTC::BufferStatus::TIMEOUT;
00750                 }
00751             }
00752           else                                    // unknown condition
00753             {
00754               return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00755             }
00756         }
00757       }
00758       
00759       get(value);
00760 
00761       if (full())
00762         {
00763           Guard fguard(m_full.mutex);
00764           advanceRptr(1);
00765           m_full.cond.signal();
00766         }
00767       else
00768         {
00769           advanceRptr(1);
00770         }
00771       return ::RTC::BufferStatus::BUFFER_OK;
00772     }
00773     
00798     virtual size_t readable() const
00799     {
00800       Guard guard(m_posmutex);
00801       return m_fillcount;
00802     }
00803     
00823     virtual bool empty(void) const
00824     {
00825       Guard guard(m_posmutex);
00826       return m_fillcount == 0;
00827     }
00828     
00829   private:
00830     inline void initLength(const coil::Properties& prop)
00831     {
00832       if (!prop["length"].empty())
00833         {
00834           size_t n;
00835           if (coil::stringTo(n, prop["length"].c_str()))
00836             {
00837               if (n > 0)
00838                 {
00839                   this->length(n);
00840                 }
00841             }
00842         }
00843     }
00844     
00845     inline void initWritePolicy(const coil::Properties& prop)
00846     {
00847       std::string policy(prop["write.full_policy"]);
00848       coil::normalize(policy);
00849       if (policy == "overwrite")
00850         {
00851           m_overwrite = true;
00852           m_timedwrite = false;
00853         }
00854       else if (policy == "do_nothing")
00855         {
00856           m_overwrite = false;
00857           m_timedwrite = false;
00858         }
00859       else if (policy == "block")
00860         {
00861           m_overwrite = false;
00862           m_timedwrite = true;
00863           
00864           double tm;
00865           if (coil::stringTo(tm, prop["write.timeout"].c_str()))
00866             {
00867               if (!(tm < 0))
00868                 {
00869                   m_wtimeout = tm;
00870                 }
00871             }
00872         }
00873     }
00874     
00875     inline void initReadPolicy(const coil::Properties& prop)
00876     {
00877       std::string policy(prop["read.empty_policy"]);
00878       if (policy == "readback")
00879         {
00880           m_readback = true;
00881           m_timedread = false;
00882         }
00883       else if (policy == "do_nothing")
00884         {
00885           m_readback = false;
00886           m_timedread = false;
00887         }
00888       else if (policy == "block")
00889         {
00890           m_readback = false;
00891           m_timedread = true;
00892           double tm;
00893           if (coil::stringTo(tm, prop["read.timeout"].c_str()))
00894             {
00895               m_rtimeout = tm;
00896             }
00897         }
00898     }
00899     
00900   private:
00908     bool m_overwrite;
00909 
00917     bool m_readback;
00918 
00926     bool m_timedwrite;
00934     bool m_timedread;
00935 
00943     coil::TimeValue m_wtimeout;
00944 
00952     coil::TimeValue m_rtimeout;
00953 
00961     size_t m_length;
00962 
00970     size_t m_wpos;
00971 
00979     size_t m_rpos;
00980 
00988     size_t m_fillcount;
00989 
00997     size_t m_wcount;
00998 
01006     std::vector<DataType> m_buffer;
01007     
01015     struct condition
01016     {
01017       condition() : cond(mutex) {}
01018       coil::Condition<coil::Mutex> cond;
01019       coil::Mutex mutex;
01020     };
01021     
01029     mutable coil::Mutex m_posmutex;
01030 
01038     condition m_empty;
01039 
01047     condition m_full;
01048   };
01049 }; // namespace RTC
01050 
01051 #endif // RTC_RINGBUFFER_H
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties Friends Defines


openrtm
Author(s): AIST, ros package is maintained by Kei Okada
autogenerated on Thu Jun 27 2013 14:57:51