30 template <typename T, std::enable_if_t<std::is_same<T, bool>::value,
bool>>
52 template <typename T, std::enable_if_t<std::is_same<T, bool>::value,
bool>>
90 throw std::runtime_error(fmt::format(
"Queue for stream name '{}' doesn't exist", name));
96 std::shared_ptr<DataOutputQueue>
Device::getOutputQueue(
const std::string& name,
unsigned int maxSize,
bool blocking) {
100 throw std::runtime_error(fmt::format(
"Queue for stream name '{}' doesn't exist", name));
112 std::vector<std::string> names;
115 names.push_back(kv.first);
124 throw std::runtime_error(fmt::format(
"Queue for stream name '{}' doesn't exist", name));
130 std::shared_ptr<DataInputQueue>
Device::getInputQueue(
const std::string& name,
unsigned int maxSize,
bool blocking) {
134 throw std::runtime_error(fmt::format(
"Queue for stream name '{}' doesn't exist", name));
146 std::vector<std::string> names;
149 names.push_back(kv.first);
164 std::vector<std::string>
Device::getQueueEvents(
const std::vector<std::string>& queueNames, std::size_t maxNumEvents, std::chrono::microseconds timeout) {
167 for(
const auto& outputQueue : queueNames) {
169 for(
const auto& availableQueueName : availableQueueNames) {
170 if(outputQueue == availableQueueName) {
175 if(!found)
throw std::runtime_error(fmt::format(
"Queue with name '{}' doesn't exist", outputQueue));
180 std::unique_lock<std::mutex> lock(
eventMtx);
183 std::vector<std::string> eventsFromQueue;
185 auto predicate = [
this, &queueNames, &eventsFromQueue, &maxNumEvents]() {
187 bool wasRemoved =
false;
188 for(
const auto& name : queueNames) {
191 eventsFromQueue.push_back(name);
196 if(eventsFromQueue.size() >= maxNumEvents) {
204 if(!wasRemoved) ++it;
207 if(eventsFromQueue.empty())
return false;
212 if(timeout < std::chrono::microseconds(0)) {
217 eventCv.wait_for(lock, timeout, predicate);
221 return eventsFromQueue;
225 std::size_t maxNumEvents,
226 std::chrono::microseconds timeout) {
227 return getQueueEvents(std::vector<std::string>(queueNames), maxNumEvents, timeout);
230 std::vector<std::string>
Device::getQueueEvents(std::string queueName, std::size_t maxNumEvents, std::chrono::microseconds timeout) {
231 return getQueueEvents(std::vector<std::string>{queueName}, maxNumEvents, timeout);
240 if(events.empty())
return "";
243 std::string
Device::getQueueEvent(
const std::initializer_list<std::string>& queueNames, std::chrono::microseconds timeout) {
244 return getQueueEvent(std::vector<std::string>{queueNames}, timeout);
248 return getQueueEvent(std::vector<std::string>{queueName}, timeout);
260 const auto& node = kv.second;
261 const auto& xlinkIn = std::dynamic_pointer_cast<const node::XLinkIn>(node);
262 if(xlinkIn ==
nullptr) {
267 auto streamName = xlinkIn->getStreamName();
268 if(
inputQueueMap.count(streamName) != 0)
throw std::invalid_argument(fmt::format(
"Streams have duplicate name '{}'", streamName));
270 inputQueueMap[std::move(streamName)] = std::make_shared<DataInputQueue>(
connection, xlinkIn->getStreamName(), 16,
true, xlinkIn->getMaxDataSize());
273 const auto& node = kv.second;
274 const auto& xlinkOut = std::dynamic_pointer_cast<const node::XLinkOut>(node);
275 if(xlinkOut ==
nullptr) {
280 auto streamName = xlinkOut->getStreamName();
281 if(
outputQueueMap.count(streamName) != 0)
throw std::invalid_argument(fmt::format(
"Streams have duplicate name '{}'", streamName));
286 outputQueueMap[xlinkOut->getStreamName()]->addCallback([
this](std::string queueName, std::shared_ptr<ADatatype>) {
289 std::unique_lock<std::mutex> lock(
eventMtx);