OVR_ThreadCommandQueue.cpp
Go to the documentation of this file.
00001 /************************************************************************************
00002 
00003 PublicHeader:   None
00004 Filename    :   OVR_ThreadCommandQueue.cpp
00005 Content     :   Command queue for operations executed on a thread
00006 Created     :   October 29, 2012
00007 
00008 Copyright   :   Copyright 2012 Oculus VR, Inc. All Rights reserved.
00009 
00010 Use of this software is subject to the terms of the Oculus license
00011 agreement provided at the time of installation or download, or which
00012 otherwise accompanies this software in either electronic or hard copy form.
00013 
00014 ************************************************************************************/
00015 
00016 #include "OVR_ThreadCommandQueue.h"
00017 
00018 namespace OVR {
00019 
00020 
00021 //------------------------------------------------------------------------
00022 // ***** CircularBuffer
00023 
00024 // CircularBuffer is a FIFO buffer implemented in a single block of memory,
00025 // which allows writing and reading variable-size data chucks. Write fails
00026 // if buffer is full.
00027 
00028 class CircularBuffer
00029 {
00030     enum {
00031         AlignSize = 16,
00032         AlignMask = AlignSize - 1
00033     };
00034 
00035     UByte*  pBuffer;
00036     UPInt   Size;
00037     UPInt   Tail;   // Byte offset of next item to be popped.
00038     UPInt   Head;   // Byte offset of where next push will take place.
00039     UPInt   End;    // When Head < Tail, this is used instead of Size.    
00040 
00041     inline UPInt roundUpSize(UPInt size)
00042     { return (size + AlignMask) & ~(UPInt)AlignMask; }
00043 
00044 public:
00045 
00046     CircularBuffer(UPInt size)
00047         : Size(size), Tail(0), Head(0), End(0)
00048     {
00049         pBuffer = (UByte*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
00050     }
00051     ~CircularBuffer()
00052     {
00053         // For ThreadCommands, we must consume everything before shutdown.
00054         OVR_ASSERT(IsEmpty());
00055         OVR_FREE_ALIGNED(pBuffer);
00056     }
00057 
00058     bool    IsEmpty() const { return (Head == Tail); }
00059 
00060     // Allocates a state block of specified size and advances pointers,
00061     // returning 0 if buffer is full.
00062     UByte*  Write(UPInt size);
00063 
00064     // Returns a pointer to next available data block; 0 if none available.
00065     UByte*  ReadBegin()
00066     { return (Head != Tail) ? (pBuffer + Tail) : 0; }
00067     // Consumes data of specified size; this must match size passed to Write.
00068     void    ReadEnd(UPInt size);
00069 };
00070 
00071 
00072 // Allocates a state block of specified size and advances pointers,
00073 // returning 0 if buffer is full.
00074 UByte* CircularBuffer::Write(UPInt size)
00075 {
00076     UByte* p = 0;
00077 
00078     size = roundUpSize(size);
00079     // Since this is circular buffer, always allow at least one item.
00080     OVR_ASSERT(size < Size/2);
00081 
00082     if (Head >= Tail)
00083     {
00084         OVR_ASSERT(End == 0);
00085         
00086         if (size <= (Size - Head))
00087         {
00088             p    = pBuffer + Head;
00089             Head += size;
00090         }
00091         else if (size < Tail)
00092         {
00093             p    = pBuffer;
00094             End  = Head;
00095             Head = size;
00096             OVR_ASSERT(Head != Tail);
00097         }
00098     }
00099     else
00100     {
00101         OVR_ASSERT(End != 0);
00102 
00103         if ((Tail - Head) > size)
00104         {
00105             p    = pBuffer + Head;
00106             Head += size;
00107             OVR_ASSERT(Head != Tail);
00108         }
00109     }
00110 
00111     return p;
00112 }
00113 
00114 void CircularBuffer::ReadEnd(UPInt size)
00115 {
00116     OVR_ASSERT(Head != Tail);
00117     size = roundUpSize(size);
00118     
00119     Tail += size;        
00120     if (Tail == End)
00121     {
00122         Tail = End = 0;
00123     }
00124     else if (Tail == Head)
00125     {        
00126         OVR_ASSERT(End == 0);
00127         Tail = Head = 0;
00128     }
00129 }
00130 
00131 
00132 //-------------------------------------------------------------------------------------
00133 // ***** ThreadCommand
00134 
00135 ThreadCommand::PopBuffer::~PopBuffer()
00136 {
00137     if (Size)
00138         Destruct<ThreadCommand>(toCommand());
00139 }
00140 
00141 void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
00142 {
00143     ThreadCommand* cmd = (ThreadCommand*)data;
00144     OVR_ASSERT(cmd->Size <= MaxSize);
00145 
00146     if (Size)
00147         Destruct<ThreadCommand>(toCommand());    
00148     Size = cmd->Size;    
00149     memcpy(Buffer, (void*)cmd, Size);
00150 }
00151 
00152 void ThreadCommand::PopBuffer::Execute()
00153 {
00154     ThreadCommand* command = toCommand();
00155     OVR_ASSERT(command);
00156     command->Execute();
00157     if (NeedsWait())
00158         GetEvent()->PulseEvent();
00159 }
00160 
00161 //-------------------------------------------------------------------------------------
00162 
00163 class ThreadCommandQueueImpl : public NewOverrideBase
00164 {
00165     typedef ThreadCommand::NotifyEvent NotifyEvent;
00166     friend class ThreadCommandQueue;
00167     
00168 public:
00169 
00170     ThreadCommandQueueImpl(ThreadCommandQueue* queue)
00171         : pQueue(queue), CommandBuffer(2048),
00172           ExitEnqueued(false), ExitProcessed(false)
00173     {
00174     }
00175     ~ThreadCommandQueueImpl();
00176 
00177 
00178     bool PushCommand(const ThreadCommand& command);
00179     bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
00180 
00181 
00182     // ExitCommand is used by notify us that Thread is shutting down.
00183     struct ExitCommand : public ThreadCommand
00184     {
00185         ThreadCommandQueueImpl* pImpl;
00186         
00187         ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
00188             : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
00189 
00190         virtual void Execute() const
00191         {
00192             Lock::Locker lock(&pImpl->QueueLock);
00193             pImpl->ExitProcessed = true;
00194         }
00195         virtual ThreadCommand* CopyConstruct(void* p) const 
00196         { return Construct<ExitCommand>(p, *this); }
00197     };
00198 
00199 
00200     NotifyEvent* AllocNotifyEvent_NTS()
00201     {
00202         NotifyEvent* p = AvailableEvents.GetFirst();
00203 
00204         if (!AvailableEvents.IsNull(p))
00205             p->RemoveNode();        
00206         else
00207             p = new NotifyEvent;
00208         return p;
00209     }
00210 
00211     void         FreeNotifyEvent_NTS(NotifyEvent* p)
00212     {
00213         AvailableEvents.PushBack(p);
00214     }
00215 
00216     void        FreeNotifyEvents_NTS()
00217     {
00218         while(!AvailableEvents.IsEmpty())
00219         {
00220             NotifyEvent* p = AvailableEvents.GetFirst();
00221             p->RemoveNode();
00222             delete p;
00223         }
00224     }
00225 
00226     ThreadCommandQueue* pQueue;
00227     Lock                QueueLock;
00228     volatile bool       ExitEnqueued;
00229     volatile bool       ExitProcessed;
00230     List<NotifyEvent>   AvailableEvents;
00231     List<NotifyEvent>   BlockedProducers;
00232     CircularBuffer      CommandBuffer;
00233 };
00234 
00235 
00236 
00237 ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
00238 {
00239     Lock::Locker lock(&QueueLock);
00240     OVR_ASSERT(BlockedProducers.IsEmpty());
00241     FreeNotifyEvents_NTS();
00242 }
00243 
00244 bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
00245 {
00246     ThreadCommand::NotifyEvent* completeEvent = 0;
00247     ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
00248 
00249     // Repeat  writing command into buffer until it is available.    
00250     do {
00251 
00252         { // Lock Scope
00253             Lock::Locker lock(&QueueLock);
00254 
00255             if (queueAvailableEvent)
00256             {
00257                 FreeNotifyEvent_NTS(queueAvailableEvent);
00258                 queueAvailableEvent = 0;
00259             }
00260 
00261             // Don't allow any commands after PushExitCommand() is called.
00262             if (ExitEnqueued && !command.ExitFlag)
00263                 return false;
00264 
00265 
00266             bool   bufferWasEmpty = CommandBuffer.IsEmpty();
00267             UByte* buffer = CommandBuffer.Write(command.GetSize());
00268             if  (buffer)
00269             {
00270                 ThreadCommand* c = command.CopyConstruct(buffer);
00271                 if (c->NeedsWait())
00272                     completeEvent = c->pEvent = AllocNotifyEvent_NTS();
00273                 // Signal-waker consumer when we add data to buffer.
00274                 if (bufferWasEmpty)
00275                     pQueue->OnPushNonEmpty_Locked();
00276                 break;
00277             }
00278 
00279             queueAvailableEvent = AllocNotifyEvent_NTS();
00280             BlockedProducers.PushBack(queueAvailableEvent);
00281         } // Lock Scope
00282 
00283         queueAvailableEvent->Wait();
00284 
00285     } while(1);
00286 
00287     // Command was enqueued, wait if necessary.
00288     if (completeEvent)
00289     {
00290         completeEvent->Wait();
00291         Lock::Locker lock(&QueueLock);
00292         FreeNotifyEvent_NTS(completeEvent);
00293     }
00294 
00295     return true;
00296 }
00297 
00298 
00299 // Pops the next command from the thread queue, if any is available.
00300 bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
00301 {    
00302     Lock::Locker lock(&QueueLock);
00303 
00304     UByte* buffer = CommandBuffer.ReadBegin();
00305     if (!buffer)
00306     {
00307         // Notify thread while in lock scope, enabling initialization of wait.
00308         pQueue->OnPopEmpty_Locked();
00309         return false;
00310     }
00311 
00312     popBuffer->InitFromBuffer(buffer);
00313     CommandBuffer.ReadEnd(popBuffer->GetSize());
00314 
00315     if (!BlockedProducers.IsEmpty())
00316     {
00317         ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
00318         queueAvailableEvent->RemoveNode();
00319         queueAvailableEvent->PulseEvent();
00320         // Event is freed later by waiter.
00321     }    
00322     return true;
00323 }
00324 
00325 
00326 //-------------------------------------------------------------------------------------
00327 
00328 ThreadCommandQueue::ThreadCommandQueue()
00329 {
00330     pImpl = new ThreadCommandQueueImpl(this);
00331 }
00332 ThreadCommandQueue::~ThreadCommandQueue()
00333 {
00334     delete pImpl;
00335 }
00336 
00337 bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
00338 {
00339     return pImpl->PushCommand(command);
00340 }
00341 
00342 bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
00343 {    
00344     return pImpl->PopCommand(popBuffer);
00345 }
00346 
00347 void ThreadCommandQueue::PushExitCommand(bool wait)
00348 {
00349     // Exit is processed in two stages:
00350     //  - First, ExitEnqueued flag is set to block further commands from queuing up.
00351     //  - Second, the actual exit call is processed on the consumer thread, flushing
00352     //    any prior commands.
00353     //    IsExiting() only returns true after exit has flushed.
00354     {
00355         Lock::Locker lock(&pImpl->QueueLock);
00356         if (pImpl->ExitEnqueued)
00357             return;
00358         pImpl->ExitEnqueued = true;
00359     }
00360 
00361     PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
00362 }
00363 
00364 bool ThreadCommandQueue::IsExiting() const
00365 {
00366     return pImpl->ExitProcessed;
00367 }
00368 
00369 
00370 } // namespace OVR


oculus_sdk
Author(s):
autogenerated on Mon Oct 6 2014 03:01:19