Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "OVR_ThreadCommandQueue.h"
00017
00018 namespace OVR {
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 class CircularBuffer
00029 {
00030 enum {
00031 AlignSize = 16,
00032 AlignMask = AlignSize - 1
00033 };
00034
00035 UByte* pBuffer;
00036 UPInt Size;
00037 UPInt Tail;
00038 UPInt Head;
00039 UPInt End;
00040
00041 inline UPInt roundUpSize(UPInt size)
00042 { return (size + AlignMask) & ~(UPInt)AlignMask; }
00043
00044 public:
00045
00046 CircularBuffer(UPInt size)
00047 : Size(size), Tail(0), Head(0), End(0)
00048 {
00049 pBuffer = (UByte*)OVR_ALLOC_ALIGNED(roundUpSize(size), AlignSize);
00050 }
00051 ~CircularBuffer()
00052 {
00053
00054 OVR_ASSERT(IsEmpty());
00055 OVR_FREE_ALIGNED(pBuffer);
00056 }
00057
00058 bool IsEmpty() const { return (Head == Tail); }
00059
00060
00061
00062 UByte* Write(UPInt size);
00063
00064
00065 UByte* ReadBegin()
00066 { return (Head != Tail) ? (pBuffer + Tail) : 0; }
00067
00068 void ReadEnd(UPInt size);
00069 };
00070
00071
00072
00073
00074 UByte* CircularBuffer::Write(UPInt size)
00075 {
00076 UByte* p = 0;
00077
00078 size = roundUpSize(size);
00079
00080 OVR_ASSERT(size < Size/2);
00081
00082 if (Head >= Tail)
00083 {
00084 OVR_ASSERT(End == 0);
00085
00086 if (size <= (Size - Head))
00087 {
00088 p = pBuffer + Head;
00089 Head += size;
00090 }
00091 else if (size < Tail)
00092 {
00093 p = pBuffer;
00094 End = Head;
00095 Head = size;
00096 OVR_ASSERT(Head != Tail);
00097 }
00098 }
00099 else
00100 {
00101 OVR_ASSERT(End != 0);
00102
00103 if ((Tail - Head) > size)
00104 {
00105 p = pBuffer + Head;
00106 Head += size;
00107 OVR_ASSERT(Head != Tail);
00108 }
00109 }
00110
00111 return p;
00112 }
00113
00114 void CircularBuffer::ReadEnd(UPInt size)
00115 {
00116 OVR_ASSERT(Head != Tail);
00117 size = roundUpSize(size);
00118
00119 Tail += size;
00120 if (Tail == End)
00121 {
00122 Tail = End = 0;
00123 }
00124 else if (Tail == Head)
00125 {
00126 OVR_ASSERT(End == 0);
00127 Tail = Head = 0;
00128 }
00129 }
00130
00131
00132
00133
00134
00135 ThreadCommand::PopBuffer::~PopBuffer()
00136 {
00137 if (Size)
00138 Destruct<ThreadCommand>(toCommand());
00139 }
00140
00141 void ThreadCommand::PopBuffer::InitFromBuffer(void* data)
00142 {
00143 ThreadCommand* cmd = (ThreadCommand*)data;
00144 OVR_ASSERT(cmd->Size <= MaxSize);
00145
00146 if (Size)
00147 Destruct<ThreadCommand>(toCommand());
00148 Size = cmd->Size;
00149 memcpy(Buffer, (void*)cmd, Size);
00150 }
00151
00152 void ThreadCommand::PopBuffer::Execute()
00153 {
00154 ThreadCommand* command = toCommand();
00155 OVR_ASSERT(command);
00156 command->Execute();
00157 if (NeedsWait())
00158 GetEvent()->PulseEvent();
00159 }
00160
00161
00162
00163 class ThreadCommandQueueImpl : public NewOverrideBase
00164 {
00165 typedef ThreadCommand::NotifyEvent NotifyEvent;
00166 friend class ThreadCommandQueue;
00167
00168 public:
00169
00170 ThreadCommandQueueImpl(ThreadCommandQueue* queue)
00171 : pQueue(queue), CommandBuffer(2048),
00172 ExitEnqueued(false), ExitProcessed(false)
00173 {
00174 }
00175 ~ThreadCommandQueueImpl();
00176
00177
00178 bool PushCommand(const ThreadCommand& command);
00179 bool PopCommand(ThreadCommand::PopBuffer* popBuffer);
00180
00181
00182
00183 struct ExitCommand : public ThreadCommand
00184 {
00185 ThreadCommandQueueImpl* pImpl;
00186
00187 ExitCommand(ThreadCommandQueueImpl* impl, bool wait)
00188 : ThreadCommand(sizeof(ExitCommand), wait, true), pImpl(impl) { }
00189
00190 virtual void Execute() const
00191 {
00192 Lock::Locker lock(&pImpl->QueueLock);
00193 pImpl->ExitProcessed = true;
00194 }
00195 virtual ThreadCommand* CopyConstruct(void* p) const
00196 { return Construct<ExitCommand>(p, *this); }
00197 };
00198
00199
00200 NotifyEvent* AllocNotifyEvent_NTS()
00201 {
00202 NotifyEvent* p = AvailableEvents.GetFirst();
00203
00204 if (!AvailableEvents.IsNull(p))
00205 p->RemoveNode();
00206 else
00207 p = new NotifyEvent;
00208 return p;
00209 }
00210
00211 void FreeNotifyEvent_NTS(NotifyEvent* p)
00212 {
00213 AvailableEvents.PushBack(p);
00214 }
00215
00216 void FreeNotifyEvents_NTS()
00217 {
00218 while(!AvailableEvents.IsEmpty())
00219 {
00220 NotifyEvent* p = AvailableEvents.GetFirst();
00221 p->RemoveNode();
00222 delete p;
00223 }
00224 }
00225
00226 ThreadCommandQueue* pQueue;
00227 Lock QueueLock;
00228 volatile bool ExitEnqueued;
00229 volatile bool ExitProcessed;
00230 List<NotifyEvent> AvailableEvents;
00231 List<NotifyEvent> BlockedProducers;
00232 CircularBuffer CommandBuffer;
00233 };
00234
00235
00236
00237 ThreadCommandQueueImpl::~ThreadCommandQueueImpl()
00238 {
00239 Lock::Locker lock(&QueueLock);
00240 OVR_ASSERT(BlockedProducers.IsEmpty());
00241 FreeNotifyEvents_NTS();
00242 }
00243
00244 bool ThreadCommandQueueImpl::PushCommand(const ThreadCommand& command)
00245 {
00246 ThreadCommand::NotifyEvent* completeEvent = 0;
00247 ThreadCommand::NotifyEvent* queueAvailableEvent = 0;
00248
00249
00250 do {
00251
00252 {
00253 Lock::Locker lock(&QueueLock);
00254
00255 if (queueAvailableEvent)
00256 {
00257 FreeNotifyEvent_NTS(queueAvailableEvent);
00258 queueAvailableEvent = 0;
00259 }
00260
00261
00262 if (ExitEnqueued && !command.ExitFlag)
00263 return false;
00264
00265
00266 bool bufferWasEmpty = CommandBuffer.IsEmpty();
00267 UByte* buffer = CommandBuffer.Write(command.GetSize());
00268 if (buffer)
00269 {
00270 ThreadCommand* c = command.CopyConstruct(buffer);
00271 if (c->NeedsWait())
00272 completeEvent = c->pEvent = AllocNotifyEvent_NTS();
00273
00274 if (bufferWasEmpty)
00275 pQueue->OnPushNonEmpty_Locked();
00276 break;
00277 }
00278
00279 queueAvailableEvent = AllocNotifyEvent_NTS();
00280 BlockedProducers.PushBack(queueAvailableEvent);
00281 }
00282
00283 queueAvailableEvent->Wait();
00284
00285 } while(1);
00286
00287
00288 if (completeEvent)
00289 {
00290 completeEvent->Wait();
00291 Lock::Locker lock(&QueueLock);
00292 FreeNotifyEvent_NTS(completeEvent);
00293 }
00294
00295 return true;
00296 }
00297
00298
00299
00300 bool ThreadCommandQueueImpl::PopCommand(ThreadCommand::PopBuffer* popBuffer)
00301 {
00302 Lock::Locker lock(&QueueLock);
00303
00304 UByte* buffer = CommandBuffer.ReadBegin();
00305 if (!buffer)
00306 {
00307
00308 pQueue->OnPopEmpty_Locked();
00309 return false;
00310 }
00311
00312 popBuffer->InitFromBuffer(buffer);
00313 CommandBuffer.ReadEnd(popBuffer->GetSize());
00314
00315 if (!BlockedProducers.IsEmpty())
00316 {
00317 ThreadCommand::NotifyEvent* queueAvailableEvent = BlockedProducers.GetFirst();
00318 queueAvailableEvent->RemoveNode();
00319 queueAvailableEvent->PulseEvent();
00320
00321 }
00322 return true;
00323 }
00324
00325
00326
00327
00328 ThreadCommandQueue::ThreadCommandQueue()
00329 {
00330 pImpl = new ThreadCommandQueueImpl(this);
00331 }
00332 ThreadCommandQueue::~ThreadCommandQueue()
00333 {
00334 delete pImpl;
00335 }
00336
00337 bool ThreadCommandQueue::PushCommand(const ThreadCommand& command)
00338 {
00339 return pImpl->PushCommand(command);
00340 }
00341
00342 bool ThreadCommandQueue::PopCommand(ThreadCommand::PopBuffer* popBuffer)
00343 {
00344 return pImpl->PopCommand(popBuffer);
00345 }
00346
00347 void ThreadCommandQueue::PushExitCommand(bool wait)
00348 {
00349
00350
00351
00352
00353
00354 {
00355 Lock::Locker lock(&pImpl->QueueLock);
00356 if (pImpl->ExitEnqueued)
00357 return;
00358 pImpl->ExitEnqueued = true;
00359 }
00360
00361 PushCommand(ThreadCommandQueueImpl::ExitCommand(pImpl, wait));
00362 }
00363
00364 bool ThreadCommandQueue::IsExiting() const
00365 {
00366 return pImpl->ExitProcessed;
00367 }
00368
00369
00370 }