00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include "stream.h"
00037
00038 #include "gentl_wrapper.h"
00039 #include "exception.h"
00040 #include "cport.h"
00041
00042 #include <iostream>
00043 #include <algorithm>
00044
00045 #ifdef _WIN32
00046 #undef min
00047 #undef max
00048 #endif
00049
00050 namespace rcg
00051 {
00052
00053 Stream::Stream(const std::shared_ptr<Device> &_parent,
00054 const std::shared_ptr<const GenTLWrapper> &_gentl, const char *_id) :
00055 buffer(_gentl, this)
00056 {
00057 parent=_parent;
00058 gentl=_gentl;
00059 id=_id;
00060
00061 n_open=0;
00062 stream=0;
00063 event=0;
00064 bn=0;
00065 }
00066
00067 Stream::~Stream()
00068 {
00069 try
00070 {
00071 stopStreaming();
00072
00073 if (stream != 0)
00074 {
00075 gentl->DSClose(stream);
00076 }
00077 }
00078 catch (...)
00079 { }
00080 }
00081
00082 std::shared_ptr<Device> Stream::getParent() const
00083 {
00084 return parent;
00085 }
00086
00087 const std::string &Stream::getID() const
00088 {
00089 return id;
00090 }
00091
00092 void Stream::open()
00093 {
00094 std::lock_guard<std::recursive_mutex> lock(mtx);
00095
00096 if (n_open == 0)
00097 {
00098 if (parent->getHandle() != 0)
00099 {
00100 if (gentl->DevOpenDataStream(parent->getHandle(), id.c_str(), &stream) !=
00101 GenTL::GC_ERR_SUCCESS)
00102 {
00103 throw GenTLException("Stream::open()", gentl);
00104 }
00105 }
00106 else
00107 {
00108 throw GenTLException("Stream::open(): Device must be opened before open before opening a stream");
00109 }
00110 }
00111
00112 n_open++;
00113 }
00114
00115 void Stream::close()
00116 {
00117 std::lock_guard<std::recursive_mutex> lock(mtx);
00118
00119 if (n_open > 0)
00120 {
00121 n_open--;
00122 }
00123
00124 if (n_open == 0)
00125 {
00126 stopStreaming();
00127 gentl->DSClose(stream);
00128 stream=0;
00129
00130 nodemap=0;
00131 cport=0;
00132 }
00133 }
00134
00135 void Stream::startStreaming(int na)
00136 {
00137 std::lock_guard<std::recursive_mutex> lock(mtx);
00138
00139 buffer.setHandle(0);
00140
00141 if (stream == 0)
00142 {
00143 throw GenTLException("Stream::startStreaming(): Stream is not open");
00144 }
00145
00146
00147
00148 if (bn > 0)
00149 {
00150 stopStreaming();
00151 }
00152
00153
00154
00155 size_t size=0;
00156 if (getDefinesPayloadsize())
00157 {
00158 size=getPayloadSize();
00159 }
00160 else
00161 {
00162 std::shared_ptr<GenApi::CNodeMapRef> nmap=parent->getRemoteNodeMap();
00163 GenApi::IInteger *p=dynamic_cast<GenApi::IInteger *>(nmap->_GetNode("PayloadSize"));
00164
00165 if (GenApi::IsReadable(p))
00166 {
00167 size=static_cast<size_t>(p->GetValue());
00168 }
00169 }
00170
00171
00172
00173 bool err=false;
00174
00175 bn=std::max(static_cast<size_t>(8), getBufAnnounceMin());
00176 for (size_t i=0; i<bn; i++)
00177 {
00178 GenTL::BUFFER_HANDLE p=0;
00179
00180 if (gentl->DSAllocAndAnnounceBuffer(stream, size, 0, &p) != GenTL::GC_ERR_SUCCESS)
00181 {
00182 err=true;
00183 break;
00184 }
00185
00186 if (!err && gentl->DSQueueBuffer(stream, p) != GenTL::GC_ERR_SUCCESS)
00187 {
00188 err=true;
00189 break;
00190 }
00191 }
00192
00193
00194
00195 if (!err && gentl->GCRegisterEvent(stream, GenTL::EVENT_NEW_BUFFER, &event) !=
00196 GenTL::GC_ERR_SUCCESS)
00197 {
00198 err=true;
00199 }
00200
00201
00202
00203 uint64_t n=GENTL_INFINITE;
00204
00205 if (na > 0)
00206 {
00207 n=static_cast<uint64_t>(na);
00208 }
00209
00210 if (!err && gentl->DSStartAcquisition(stream, GenTL::ACQ_START_FLAGS_DEFAULT, n) !=
00211 GenTL::GC_ERR_SUCCESS)
00212 {
00213 gentl->GCUnregisterEvent(stream, GenTL::EVENT_NEW_BUFFER);
00214 err=true;
00215 }
00216
00217 if (!err)
00218 {
00219 GenApi::CCommandPtr start=parent->getRemoteNodeMap()->_GetNode("AcquisitionStart");
00220 start->Execute();
00221 }
00222
00223
00224
00225 if (err)
00226 {
00227 gentl->DSFlushQueue(stream, GenTL::ACQ_QUEUE_ALL_DISCARD);
00228
00229 GenTL::BUFFER_HANDLE p=0;
00230 while (gentl->DSGetBufferID(stream, 0, &p) == GenTL::GC_ERR_SUCCESS)
00231 {
00232 gentl->DSRevokeBuffer(stream, p, 0, 0);
00233 }
00234
00235 throw GenTLException("Stream::startStreaming()", gentl);
00236 }
00237 }
00238
00239 void Stream::stopStreaming()
00240 {
00241 std::lock_guard<std::recursive_mutex> lock(mtx);
00242
00243 if (bn > 0)
00244 {
00245 buffer.setHandle(0);
00246
00247
00248
00249 GenApi::CCommandPtr stop=parent->getRemoteNodeMap()->_GetNode("AcquisitionStop");
00250 stop->Execute();
00251
00252 gentl->DSStopAcquisition(stream, GenTL::ACQ_STOP_FLAGS_DEFAULT);
00253 gentl->GCUnregisterEvent(stream, GenTL::EVENT_NEW_BUFFER);
00254 gentl->DSFlushQueue(stream, GenTL::ACQ_QUEUE_ALL_DISCARD);
00255
00256
00257
00258 GenTL::BUFFER_HANDLE p=0;
00259 while (gentl->DSGetBufferID(stream, 0, &p) == GenTL::GC_ERR_SUCCESS)
00260 {
00261 gentl->DSRevokeBuffer(stream, p, 0, 0);
00262 }
00263
00264 event=0;
00265 bn=0;
00266 }
00267 }
00268
00269 const Buffer *Stream::grab(int64_t _timeout)
00270 {
00271 std::lock_guard<std::recursive_mutex> lock(mtx);
00272
00273 uint64_t timeout=GENTL_INFINITE;
00274 if (_timeout >= 0)
00275 {
00276 timeout=static_cast<uint64_t>(_timeout);
00277 }
00278
00279
00280
00281 if (bn == 0 && event == 0)
00282 {
00283 throw GenTLException("Streaming::grab(): Streaming not started");
00284 }
00285
00286
00287
00288 if (buffer.getHandle() != 0)
00289 {
00290 if (gentl->DSQueueBuffer(stream, buffer.getHandle()) != GenTL::GC_ERR_SUCCESS)
00291 {
00292 buffer.setHandle(0);
00293 throw GenTLException("Stream::grab()", gentl);
00294 }
00295
00296 buffer.setHandle(0);
00297 }
00298
00299
00300
00301 GenTL::EVENT_NEW_BUFFER_DATA data;
00302 size_t size=sizeof(GenTL::EVENT_NEW_BUFFER_DATA);
00303 memset(&data, 0, size);
00304
00305 GenTL::GC_ERROR err=gentl->EventGetData(event, &data, &size, timeout);
00306
00307
00308
00309
00310 if (err == GenTL::GC_ERR_ABORT || err == GenTL::GC_ERR_TIMEOUT)
00311 {
00312 return 0;
00313 }
00314 else if (err != GenTL::GC_ERR_SUCCESS)
00315 {
00316 throw GenTLException("Stream::grab()", gentl);
00317 }
00318
00319
00320
00321 buffer.setHandle(data.BufferHandle);
00322
00323 return &buffer;
00324 }
00325
00326 namespace
00327 {
00328
00329 template<class T> inline T getStreamValue(const std::shared_ptr<const GenTLWrapper> &gentl,
00330 void *stream, GenTL::STREAM_INFO_CMD cmd)
00331 {
00332 T ret=0;
00333
00334 GenTL::INFO_DATATYPE type;
00335 size_t size=sizeof(T);
00336
00337 if (stream != 0)
00338 {
00339 gentl->DSGetInfo(stream, cmd, &type, &ret, &size);
00340 }
00341
00342 return ret;
00343 }
00344
00345 inline bool getStreamBool(const std::shared_ptr<const GenTLWrapper> &gentl,
00346 void *stream, GenTL::STREAM_INFO_CMD cmd)
00347 {
00348 bool8_t ret=0;
00349
00350 GenTL::INFO_DATATYPE type;
00351 size_t size=sizeof(ret);
00352
00353 if (stream != 0)
00354 {
00355 gentl->DSGetInfo(stream, cmd, &type, &ret, &size);
00356 }
00357
00358 return ret != 0;
00359 }
00360
00361 }
00362
00363 uint64_t Stream::getNumDelivered()
00364 {
00365 std::lock_guard<std::recursive_mutex> lock(mtx);
00366 return getStreamValue<uint64_t>(gentl, stream, GenTL::STREAM_INFO_NUM_DELIVERED);
00367 }
00368
00369 uint64_t Stream::getNumUnderrun()
00370 {
00371 std::lock_guard<std::recursive_mutex> lock(mtx);
00372 return getStreamValue<uint64_t>(gentl, stream, GenTL::STREAM_INFO_NUM_UNDERRUN);
00373 }
00374
00375 size_t Stream::getNumAnnounced()
00376 {
00377 std::lock_guard<std::recursive_mutex> lock(mtx);
00378 return getStreamValue<size_t>(gentl, stream, GenTL::STREAM_INFO_NUM_ANNOUNCED);
00379 }
00380
00381 size_t Stream::getNumQueued()
00382 {
00383 std::lock_guard<std::recursive_mutex> lock(mtx);
00384 return getStreamValue<size_t>(gentl, stream, GenTL::STREAM_INFO_NUM_QUEUED);
00385 }
00386
00387 size_t Stream::getNumAwaitDelivery()
00388 {
00389 std::lock_guard<std::recursive_mutex> lock(mtx);
00390 return getStreamValue<size_t>(gentl, stream, GenTL::STREAM_INFO_NUM_AWAIT_DELIVERY);
00391 }
00392
00393 uint64_t Stream::getNumStarted()
00394 {
00395 std::lock_guard<std::recursive_mutex> lock(mtx);
00396 return getStreamValue<uint64_t>(gentl, stream, GenTL::STREAM_INFO_NUM_STARTED);
00397 }
00398
00399 size_t Stream::getPayloadSize()
00400 {
00401 std::lock_guard<std::recursive_mutex> lock(mtx);
00402 return getStreamValue<size_t>(gentl, stream, GenTL::STREAM_INFO_PAYLOAD_SIZE);
00403 }
00404
00405 bool Stream::getIsGrabbing()
00406 {
00407 std::lock_guard<std::recursive_mutex> lock(mtx);
00408 return getStreamBool(gentl, stream, GenTL::STREAM_INFO_IS_GRABBING);
00409 }
00410
00411 bool Stream::getDefinesPayloadsize()
00412 {
00413 std::lock_guard<std::recursive_mutex> lock(mtx);
00414 return getStreamBool(gentl, stream, GenTL::STREAM_INFO_DEFINES_PAYLOADSIZE);
00415 }
00416
00417 std::string Stream::getTLType()
00418 {
00419 std::lock_guard<std::recursive_mutex> lock(mtx);
00420 std::string ret;
00421
00422 GenTL::INFO_DATATYPE type;
00423 char tmp[1024]="";
00424 size_t size=sizeof(tmp);
00425
00426 if (stream != 0)
00427 {
00428 if (gentl->DSGetInfo(stream, GenTL::STREAM_INFO_TLTYPE, &type, &ret, &size) ==
00429 GenTL::GC_ERR_SUCCESS)
00430 {
00431 if (type == GenTL::INFO_DATATYPE_STRING)
00432 {
00433 ret=tmp;
00434 }
00435 }
00436 }
00437
00438 return ret;
00439 }
00440
00441 size_t Stream::getNumChunksMax()
00442 {
00443 std::lock_guard<std::recursive_mutex> lock(mtx);
00444 return getStreamValue<size_t>(gentl, stream, GenTL::STREAM_INFO_NUM_CHUNKS_MAX);
00445 }
00446
00447 size_t Stream::getBufAnnounceMin()
00448 {
00449 std::lock_guard<std::recursive_mutex> lock(mtx);
00450 return getStreamValue<size_t>(gentl, stream, GenTL::STREAM_INFO_BUF_ANNOUNCE_MIN);
00451 }
00452
00453 size_t Stream::getBufAlignment()
00454 {
00455 std::lock_guard<std::recursive_mutex> lock(mtx);
00456 return getStreamValue<size_t>(gentl, stream, GenTL::STREAM_INFO_BUF_ALIGNMENT);
00457 }
00458
00459 std::shared_ptr<GenApi::CNodeMapRef> Stream::getNodeMap()
00460 {
00461 std::lock_guard<std::recursive_mutex> lock(mtx);
00462 if (stream != 0 && !nodemap)
00463 {
00464 cport=std::shared_ptr<CPort>(new CPort(gentl, &stream));
00465 nodemap=allocNodeMap(gentl, stream, cport.get());
00466 }
00467
00468 return nodemap;
00469 }
00470
00471 void *Stream::getHandle() const
00472 {
00473 return stream;
00474 }
00475
00476 }