RingBuffer.h
Go to the documentation of this file.
00001 // -*- C++ -*-
00020 #ifndef RTC_RINGBUFFER_H
00021 #define RTC_RINGBUFFER_H
00022 
00023 #include <vector>
00024 #include <algorithm>
00025 #include <iostream>
00026 
00027 #include <coil/TimeValue.h>
00028 #include <coil/Mutex.h>
00029 #include <coil/Guard.h>
00030 #include <coil/Condition.h>
00031 #include <coil/stringutil.h>
00032 
00033 #include <rtm/BufferBase.h>
00034 #include <rtm/BufferStatus.h>
00035 
00036 #define RINGBUFFER_DEFAULT_LENGTH 8
00037 
00051 namespace RTC
00052 {
00088   template <class DataType>
00089   class RingBuffer
00090     : public BufferBase<DataType>
00091   {
00092   public:
00093     BUFFERSTATUS_ENUM
00094     typedef coil::Guard<coil::Mutex> Guard;
00118     RingBuffer(long int length = RINGBUFFER_DEFAULT_LENGTH)
00119       : m_overwrite(true), m_readback(true),
00120         m_timedwrite(false), m_timedread(false),
00121         m_wtimeout(1, 0), m_rtimeout(1, 0),
00122         m_length(length),
00123         m_wpos(0), m_rpos(0), m_fillcount(0), m_wcount(0),
00124         m_buffer(m_length)
00125     {
00126       this->reset();
00127     }
00128     
00144     virtual ~RingBuffer(void)
00145     {
00146     }
00147     
00187     virtual void init(const coil::Properties& prop)
00188     {
00189       initLength(prop);
00190       initWritePolicy(prop);
00191       initReadPolicy(prop);
00192     }
00193     
00214     virtual size_t length(void) const
00215     {
00216       Guard guard(m_posmutex);
00217       return m_length;
00218     }
00219     
00242     virtual ReturnCode length(size_t n)
00243     {
00244       m_buffer.resize(n);
00245       m_length = n;
00246       this->reset();
00247       return ::RTC::BufferStatus::BUFFER_OK; //BUFFER_OK;
00248     }
00249     
00272     virtual ReturnCode reset()
00273     {
00274       Guard guard(m_posmutex);
00275       m_fillcount = 0;
00276       m_wcount = 0;
00277       m_wpos = 0;
00278       m_rpos = 0;
00279       return ::RTC::BufferStatus::BUFFER_OK;
00280     }
00281     
00282     
00283     
00284     //----------------------------------------------------------------------
00305     virtual DataType* wptr(long int n = 0) 
00306     {
00307       Guard guard(m_posmutex);
00308       return &m_buffer[(m_wpos + n + m_length) % m_length];
00309     }
00310     
00334     virtual ReturnCode advanceWptr(long int n = 1)
00335     {
00336       // n > 0 :
00337       //     n satisfies n <= writable elements
00338       //                 n <= m_length - m_fillcout
00339       // n < 0 : -n = n'
00340       //     n satisfies n'<= readable elements
00341       //                 n'<= m_fillcount
00342       //                 n >= - m_fillcount
00343       Guard guard(m_posmutex);
00344       if ((n > 0 && n > static_cast<long int>(m_length - m_fillcount)) ||
00345           (n < 0 && n < static_cast<long int>(-m_fillcount)))
00346         {
00347           return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00348         }
00349 
00350       m_wpos = (m_wpos + n + m_length) % m_length;
00351       m_fillcount += n;
00352       m_wcount += n;
00353       return ::RTC::BufferStatus::BUFFER_OK;
00354     }
00382     virtual ReturnCode put(const DataType& value)
00383     {
00384       Guard guard(m_posmutex);
00385       m_buffer[m_wpos] = value;
00386       return ::RTC::BufferStatus::BUFFER_OK;
00387     }
00388     
00430     virtual ReturnCode write(const DataType& value,
00431                              long int sec = -1, long int nsec = 0)
00432     {
00433       {
00434       Guard guard(m_full.mutex);
00435         
00436       if (full())
00437         {
00438           
00439           bool timedwrite(m_timedwrite);
00440           bool overwrite(m_overwrite);
00441 
00442           if (!(sec < 0)) // if second arg is set -> block mode
00443             {
00444               timedwrite = true;
00445               overwrite  = false;
00446             }
00447 
00448           if (overwrite && !timedwrite)       // "overwrite" mode
00449             {
00450               advanceRptr();
00451             }
00452           else if (!overwrite && !timedwrite) // "do_nothing" mode
00453             {
00454               return ::RTC::BufferStatus::BUFFER_FULL;
00455             }
00456           else if (!overwrite && timedwrite)  // "block" mode
00457             {
00458               if (sec < 0)
00459                 {
00460                   sec = m_wtimeout.sec();
00461                   nsec = m_wtimeout.usec() * 1000;
00462                 }
00463               //  true: signaled, false: timeout
00464               if (!m_full.cond.wait(sec, nsec))
00465                 {
00466                   return ::RTC::BufferStatus::TIMEOUT;
00467                 }
00468             }
00469           else                                    // unknown condition
00470             {
00471               return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00472             }
00473         }
00474       }      
00475     
00476       put(value);
00477 
00478           {
00479                 Guard eguard(m_empty.mutex);
00480                 if (empty())
00481                   {
00482                         // Guard eguard(m_empty.mutex);
00483                         advanceWptr(1);
00484                         m_empty.cond.signal();
00485                   }
00486                 else
00487                   {
00488                         advanceWptr(1);
00489                   }
00490           }
00491       return ::RTC::BufferStatus::BUFFER_OK;
00492     }
00493     
00515     virtual size_t writable() const
00516     {
00517       Guard guard(m_posmutex);
00518       return m_length - m_fillcount;
00519     }
00520     
00540     virtual bool full(void) const
00541     {
00542       Guard guard(m_posmutex);
00543       return m_length == m_fillcount;
00544     }
00545     
00546     //----------------------------------------------------------------------
00567     virtual DataType* rptr(long int n = 0)
00568     {
00569       Guard guard(m_posmutex);
00570       return &(m_buffer[(m_rpos + n + m_length) % m_length]);
00571     }
00572     
00594     virtual ReturnCode advanceRptr(long int n = 1)
00595     {
00596       // n > 0 :
00597       //     n satisfies n <= readable elements
00598       //                 n <= m_fillcout 
00599       // n < 0 : -n = n'
00600       //     n satisfies n'<= m_length - m_fillcount
00601       //                 n >= m_fillcount - m_length
00602       Guard guard(m_posmutex);
00603       if ((n > 0 && n > static_cast<long int>(m_fillcount)) ||
00604           (n < 0 && n < static_cast<long int>(m_fillcount - m_length)))
00605         {
00606           return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00607         }
00608 
00609       m_rpos = (m_rpos + n + m_length) % m_length;
00610       m_fillcount -= n;
00611       return ::RTC::BufferStatus::BUFFER_OK;
00612     }
00613     
00638     virtual ReturnCode get(DataType& value)
00639     {
00640       Guard gaurd(m_posmutex);
00641       value = m_buffer[m_rpos];
00642       return ::RTC::BufferStatus::BUFFER_OK;
00643     }
00644     
00645     
00663     virtual DataType& get()
00664     {
00665       Guard gaurd(m_posmutex);
00666       return m_buffer[m_rpos];
00667     }
00668     
00669     
00711     virtual ReturnCode read(DataType& value,
00712                             long int sec = -1, long int nsec = 0)
00713     {
00714       {
00715       Guard gaurd(m_empty.mutex);
00716       
00717       if (empty())
00718         {
00719           bool timedread(m_timedread);
00720           bool readback(m_readback);
00721 
00722           if (!(sec < 0)) // if second arg is set -> block mode
00723             {
00724               timedread = true;
00725               readback  = false;
00726               sec = m_rtimeout.sec();
00727               nsec = m_rtimeout.usec() * 1000;
00728             }
00729 
00730           if (readback && !timedread)       // "readback" mode
00731             {
00732               if (!(m_wcount > 0))
00733                 {
00734                   return ::RTC::BufferStatus::BUFFER_EMPTY;
00735                 }
00736               advanceRptr(-1);
00737             }
00738           else if (!readback && !timedread) // "do_nothing" mode
00739             {
00740               return ::RTC::BufferStatus::BUFFER_EMPTY;
00741             }
00742           else if (!readback && timedread)  // "block" mode
00743             {
00744               if (sec < 0)
00745                 {
00746                   sec = m_rtimeout.sec();
00747                   nsec = m_rtimeout.usec() * 1000;
00748                 }
00749               //  true: signaled, false: timeout
00750               if (!m_empty.cond.wait(sec, nsec))
00751                 {
00752                   return ::RTC::BufferStatus::TIMEOUT;
00753                 }
00754             }
00755           else                                    // unknown condition
00756             {
00757               return ::RTC::BufferStatus::PRECONDITION_NOT_MET;
00758             }
00759         }
00760       }
00761       
00762       get(value);
00763 
00764           {
00765                 Guard fguard(m_full.mutex);
00766                 if (full())
00767                   {
00768                         // Guard fguard(m_full.mutex);
00769                         advanceRptr(1);
00770                         m_full.cond.signal();
00771                   }
00772                 else
00773                   {
00774                         advanceRptr(1);
00775                   }
00776           }
00777       return ::RTC::BufferStatus::BUFFER_OK;
00778     }
00779     
00804     virtual size_t readable() const
00805     {
00806       Guard guard(m_posmutex);
00807       return m_fillcount;
00808     }
00809     
00829     virtual bool empty(void) const
00830     {
00831       Guard guard(m_posmutex);
00832       return m_fillcount == 0;
00833     }
00834     
00835   private:
00836     inline void initLength(const coil::Properties& prop)
00837     {
00838       if (!prop["length"].empty())
00839         {
00840           size_t n;
00841           if (coil::stringTo(n, prop["length"].c_str()))
00842             {
00843               if (n > 0)
00844                 {
00845                   this->length(n);
00846                 }
00847             }
00848         }
00849     }
00850     
00851     inline void initWritePolicy(const coil::Properties& prop)
00852     {
00853       std::string policy(prop["write.full_policy"]);
00854       coil::normalize(policy);
00855       if (policy == "overwrite")
00856         {
00857           m_overwrite = true;
00858           m_timedwrite = false;
00859         }
00860       else if (policy == "do_nothing")
00861         {
00862           m_overwrite = false;
00863           m_timedwrite = false;
00864         }
00865       else if (policy == "block")
00866         {
00867           m_overwrite = false;
00868           m_timedwrite = true;
00869           
00870           double tm;
00871           if (coil::stringTo(tm, prop["write.timeout"].c_str()))
00872             {
00873               if (!(tm < 0))
00874                 {
00875                   m_wtimeout = tm;
00876                 }
00877             }
00878         }
00879     }
00880     
00881     inline void initReadPolicy(const coil::Properties& prop)
00882     {
00883       std::string policy(prop["read.empty_policy"]);
00884       if (policy == "readback")
00885         {
00886           m_readback = true;
00887           m_timedread = false;
00888         }
00889       else if (policy == "do_nothing")
00890         {
00891           m_readback = false;
00892           m_timedread = false;
00893         }
00894       else if (policy == "block")
00895         {
00896           m_readback = false;
00897           m_timedread = true;
00898           double tm;
00899           if (coil::stringTo(tm, prop["read.timeout"].c_str()))
00900             {
00901               m_rtimeout = tm;
00902             }
00903         }
00904     }
00905     
00906   private:
00914     bool m_overwrite;
00915 
00923     bool m_readback;
00924 
00932     bool m_timedwrite;
00940     bool m_timedread;
00941 
00949     coil::TimeValue m_wtimeout;
00950 
00958     coil::TimeValue m_rtimeout;
00959 
00967     size_t m_length;
00968 
00976     size_t m_wpos;
00977 
00985     size_t m_rpos;
00986 
00994     size_t m_fillcount;
00995 
01003     size_t m_wcount;
01004 
01012     std::vector<DataType> m_buffer;
01013     
01021     struct condition
01022     {
01023       condition() : cond(mutex) {}
01024       coil::Condition<coil::Mutex> cond;
01025       coil::Mutex mutex;
01026     };
01027     
01035     mutable coil::Mutex m_posmutex;
01036 
01044     condition m_empty;
01045 
01053     condition m_full;
01054   };
01055 }; // namespace RTC
01056 
01057 #endif // RTC_RINGBUFFER_H


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Thu Aug 27 2015 14:16:38