Device.cpp
Go to the documentation of this file.
2 
3 // std
4 #include <iostream>
5 
6 // shared
11 
12 // project
13 #include "DeviceLogger.hpp"
17 #include "pipeline/Pipeline.hpp"
19 #include "utility/Resources.hpp"
20 
21 namespace dai {
22 
23 // Common explicit instantiation, to remove the need to define in header
24 constexpr std::size_t Device::EVENT_QUEUE_MAXIMUM_SIZE;
25 
26 Device::Device(const Pipeline& pipeline) : DeviceBase(pipeline.getDeviceConfig()) {
27  tryStartPipeline(pipeline);
28 }
29 
30 template <typename T, std::enable_if_t<std::is_same<T, bool>::value, bool>>
31 Device::Device(const Pipeline& pipeline, T usb2Mode) : DeviceBase(pipeline.getDeviceConfig(), usb2Mode) {
32  tryStartPipeline(pipeline);
33 }
34 template Device::Device(const Pipeline&, bool);
35 
36 Device::Device(const Pipeline& pipeline, UsbSpeed maxUsbSpeed) : DeviceBase(pipeline.getDeviceConfig(), maxUsbSpeed) {
37  tryStartPipeline(pipeline);
38 }
39 
40 Device::Device(const Pipeline& pipeline, const dai::Path& pathToCmd) : DeviceBase(pipeline.getDeviceConfig(), pathToCmd) {
41  tryStartPipeline(pipeline);
42 }
43 
44 Device::Device(const Pipeline& pipeline, const DeviceInfo& devInfo) : DeviceBase(pipeline.getDeviceConfig(), devInfo) {
45  tryStartPipeline(pipeline);
46 }
47 
48 Device::Device(const Pipeline& pipeline, const DeviceInfo& devInfo, const dai::Path& pathToCmd) : DeviceBase(pipeline.getDeviceConfig(), devInfo, pathToCmd) {
49  tryStartPipeline(pipeline);
50 }
51 
52 template <typename T, std::enable_if_t<std::is_same<T, bool>::value, bool>>
53 Device::Device(const Pipeline& pipeline, const DeviceInfo& devInfo, T usb2Mode) : DeviceBase(pipeline.getDeviceConfig(), devInfo, usb2Mode) {
54  tryStartPipeline(pipeline);
55 }
56 template Device::Device(const Pipeline&, const DeviceInfo&, bool);
57 
58 Device::Device(const Pipeline& pipeline, const DeviceInfo& devInfo, UsbSpeed maxUsbSpeed) : DeviceBase(pipeline.getDeviceConfig(), devInfo, maxUsbSpeed) {
59  tryStartPipeline(pipeline);
60 }
61 
63 
66 }
67 
69  // Remove callbacks to this from queues
70  for(const auto& kv : callbackIdMap) {
71  outputQueueMap[kv.first]->removeCallback(kv.second);
72  }
73  // Clear map
74  callbackIdMap.clear();
75 
76  // Close the device before clearing the queues
78 
79  // Close and clear queues
80  for(auto& kv : outputQueueMap) kv.second->close();
81  for(auto& kv : inputQueueMap) kv.second->close();
82  outputQueueMap.clear();
83  inputQueueMap.clear();
84 }
85 
86 std::shared_ptr<DataOutputQueue> Device::getOutputQueue(const std::string& name) {
87  // Throw if queue not created
88  // all queues for xlink streams are created upfront
89  if(outputQueueMap.count(name) == 0) {
90  throw std::runtime_error(fmt::format("Queue for stream name '{}' doesn't exist", name));
91  }
92  // Return pointer to this DataQueue
93  return outputQueueMap.at(name);
94 }
95 
96 std::shared_ptr<DataOutputQueue> Device::getOutputQueue(const std::string& name, unsigned int maxSize, bool blocking) {
97  // Throw if queue not created
98  // all queues for xlink streams are created upfront
99  if(outputQueueMap.count(name) == 0) {
100  throw std::runtime_error(fmt::format("Queue for stream name '{}' doesn't exist", name));
101  }
102 
103  // Modify max size and blocking
104  outputQueueMap.at(name)->setMaxSize(maxSize);
105  outputQueueMap.at(name)->setBlocking(blocking);
106 
107  // Return pointer to this DataQueue
108  return outputQueueMap.at(name);
109 }
110 
111 std::vector<std::string> Device::getOutputQueueNames() const {
112  std::vector<std::string> names;
113  names.reserve(outputQueueMap.size());
114  for(const auto& kv : outputQueueMap) {
115  names.push_back(kv.first);
116  }
117  return names;
118 }
119 
120 std::shared_ptr<DataInputQueue> Device::getInputQueue(const std::string& name) {
121  // Throw if queue not created
122  // all queues for xlink streams are created upfront
123  if(inputQueueMap.count(name) == 0) {
124  throw std::runtime_error(fmt::format("Queue for stream name '{}' doesn't exist", name));
125  }
126  // Return pointer to this DataQueue
127  return inputQueueMap.at(name);
128 }
129 
130 std::shared_ptr<DataInputQueue> Device::getInputQueue(const std::string& name, unsigned int maxSize, bool blocking) {
131  // Throw if queue not created
132  // all queues for xlink streams are created upfront
133  if(inputQueueMap.count(name) == 0) {
134  throw std::runtime_error(fmt::format("Queue for stream name '{}' doesn't exist", name));
135  }
136 
137  // Modify max size and blocking
138  inputQueueMap.at(name)->setMaxSize(maxSize);
139  inputQueueMap.at(name)->setBlocking(blocking);
140 
141  // Return pointer to this DataQueue
142  return inputQueueMap.at(name);
143 }
144 
145 std::vector<std::string> Device::getInputQueueNames() const {
146  std::vector<std::string> names;
147  names.reserve(inputQueueMap.size());
148  for(const auto& kv : inputQueueMap) {
149  names.push_back(kv.first);
150  }
151  return names;
152 }
153 
154 // void Device::setCallback(const std::string& name, std::function<std::shared_ptr<RawBuffer>(std::shared_ptr<RawBuffer>)> cb) {
155 // // creates a CallbackHandler if not yet created
156 // if(callbackMap.count(name) == 0) {
157 // throw std::runtime_error(fmt::format("Queue for stream name '{}' doesn't exist", name));
158 // } else {
159 // // already exists, replace the callback
160 // callbackMap.at(name).setCallback(cb);
161 // }
162 // }
163 
164 std::vector<std::string> Device::getQueueEvents(const std::vector<std::string>& queueNames, std::size_t maxNumEvents, std::chrono::microseconds timeout) {
165  // First check if specified queues names are actually opened
166  auto availableQueueNames = getOutputQueueNames();
167  for(const auto& outputQueue : queueNames) {
168  bool found = false;
169  for(const auto& availableQueueName : availableQueueNames) {
170  if(outputQueue == availableQueueName) {
171  found = true;
172  break;
173  }
174  }
175  if(!found) throw std::runtime_error(fmt::format("Queue with name '{}' doesn't exist", outputQueue));
176  }
177 
178  // Blocking part
179  // lock eventMtx
180  std::unique_lock<std::mutex> lock(eventMtx);
181 
182  // Create temporary string which predicate will fill when it finds the event
183  std::vector<std::string> eventsFromQueue;
184  // wait until predicate
185  auto predicate = [this, &queueNames, &eventsFromQueue, &maxNumEvents]() {
186  for(auto it = eventQueue.begin(); it != eventQueue.end();) {
187  bool wasRemoved = false;
188  for(const auto& name : queueNames) {
189  if(name == *it) {
190  // found one of the events we have specified to wait for
191  eventsFromQueue.push_back(name);
192  // remove element from queue
193  it = eventQueue.erase(it);
194  wasRemoved = true;
195  // return and acknowledge the wait prematurelly, if reached maxnumevents
196  if(eventsFromQueue.size() >= maxNumEvents) {
197  return true;
198  }
199  // breaks as other queue names won't be same as this one
200  break;
201  }
202  }
203  // If element wasn't removed, move iterator forward, else it was already moved by erase call
204  if(!wasRemoved) ++it;
205  }
206  // After search, if no events were found, return false
207  if(eventsFromQueue.empty()) return false;
208  // Otherwise acknowledge the wait and exit
209  return true;
210  };
211 
212  if(timeout < std::chrono::microseconds(0)) {
213  // if timeout < 0, infinite wait time (no timeout)
214  eventCv.wait(lock, predicate);
215  } else {
216  // otherwise respect timeout
217  eventCv.wait_for(lock, timeout, predicate);
218  }
219 
220  // eventFromQueue should now contain the event name
221  return eventsFromQueue;
222 }
223 
224 std::vector<std::string> Device::getQueueEvents(const std::initializer_list<std::string>& queueNames,
225  std::size_t maxNumEvents,
226  std::chrono::microseconds timeout) {
227  return getQueueEvents(std::vector<std::string>(queueNames), maxNumEvents, timeout);
228 }
229 
230 std::vector<std::string> Device::getQueueEvents(std::string queueName, std::size_t maxNumEvents, std::chrono::microseconds timeout) {
231  return getQueueEvents(std::vector<std::string>{queueName}, maxNumEvents, timeout);
232 }
233 
234 std::vector<std::string> Device::getQueueEvents(std::size_t maxNumEvents, std::chrono::microseconds timeout) {
235  return getQueueEvents(getOutputQueueNames(), maxNumEvents, timeout);
236 }
237 
238 std::string Device::getQueueEvent(const std::vector<std::string>& queueNames, std::chrono::microseconds timeout) {
239  auto events = getQueueEvents(queueNames, 1, timeout);
240  if(events.empty()) return "";
241  return events[0];
242 }
243 std::string Device::getQueueEvent(const std::initializer_list<std::string>& queueNames, std::chrono::microseconds timeout) {
244  return getQueueEvent(std::vector<std::string>{queueNames}, timeout);
245 }
246 
247 std::string Device::getQueueEvent(std::string queueName, std::chrono::microseconds timeout) {
248  return getQueueEvent(std::vector<std::string>{queueName}, timeout);
249 }
250 
251 std::string Device::getQueueEvent(std::chrono::microseconds timeout) {
252  return getQueueEvent(getOutputQueueNames(), timeout);
253 }
254 
255 bool Device::startPipelineImpl(const Pipeline& pipeline) {
256  // Open queues upfront, let queues know about data sizes (input queues)
257  // Go through Pipeline and check for 'XLinkIn' and 'XLinkOut' nodes
258  // and create corresponding default queues for them
259  for(const auto& kv : pipeline.getNodeMap()) {
260  const auto& node = kv.second;
261  const auto& xlinkIn = std::dynamic_pointer_cast<const node::XLinkIn>(node);
262  if(xlinkIn == nullptr) {
263  continue;
264  }
265 
266  // Create DataInputQueue's
267  auto streamName = xlinkIn->getStreamName();
268  if(inputQueueMap.count(streamName) != 0) throw std::invalid_argument(fmt::format("Streams have duplicate name '{}'", streamName));
269  // set max data size, for more verbosity
270  inputQueueMap[std::move(streamName)] = std::make_shared<DataInputQueue>(connection, xlinkIn->getStreamName(), 16, true, xlinkIn->getMaxDataSize());
271  }
272  for(const auto& kv : pipeline.getNodeMap()) {
273  const auto& node = kv.second;
274  const auto& xlinkOut = std::dynamic_pointer_cast<const node::XLinkOut>(node);
275  if(xlinkOut == nullptr) {
276  continue;
277  }
278 
279  // Create DataOutputQueue's
280  auto streamName = xlinkOut->getStreamName();
281  if(outputQueueMap.count(streamName) != 0) throw std::invalid_argument(fmt::format("Streams have duplicate name '{}'", streamName));
282  outputQueueMap[streamName] = std::make_shared<DataOutputQueue>(connection, streamName);
283 
284  // Add callback for events
285  callbackIdMap[std::move(streamName)] =
286  outputQueueMap[xlinkOut->getStreamName()]->addCallback([this](std::string queueName, std::shared_ptr<ADatatype>) {
287  {
288  // Lock first
289  std::unique_lock<std::mutex> lock(eventMtx);
290 
291  // Check if size is equal or greater than EVENT_QUEUE_MAXIMUM_SIZE
292  if(eventQueue.size() >= EVENT_QUEUE_MAXIMUM_SIZE) {
293  auto numToRemove = eventQueue.size() - EVENT_QUEUE_MAXIMUM_SIZE + 1;
294  eventQueue.erase(eventQueue.begin(), eventQueue.begin() + numToRemove);
295  }
296 
297  // Add to the end of event queue
298  eventQueue.push_back(std::move(queueName));
299  }
300 
301  // notify the rest
302  eventCv.notify_all();
303  });
304  }
305  return DeviceBase::startPipelineImpl(pipeline);
306 }
307 
308 } // namespace dai
dai::Device::~Device
~Device() override
dtor to close the device
Definition: Device.cpp:64
dai::Device::eventCv
std::condition_variable eventCv
Definition: Device.hpp:222
dai::Pipeline
Represents the pipeline, set of nodes and connections between them.
Definition: Pipeline.hpp:100
Pipeline.hpp
dai::UsbSpeed
UsbSpeed
Definition: shared/depthai-shared/include/depthai-shared/common/UsbSpeed.hpp:12
dai::DeviceBase::close
void close()
Definition: DeviceBase.cpp:516
dai::DeviceInfo
Definition: XLinkConnection.hpp:27
dai::Device::Device
Device()
Definition: Device.cpp:62
dai::Device::callbackIdMap
std::unordered_map< std::string, DataOutputQueue::CallbackId > callbackIdMap
Definition: Device.hpp:218
Resources.hpp
XLinkConstants.hpp
dai::Device::eventQueue
std::deque< std::string > eventQueue
Definition: Device.hpp:223
dai::Device::getInputQueueNames
std::vector< std::string > getInputQueueNames() const
Definition: Device.cpp:145
Bootloader.hpp
Device.hpp
dai::Device::getOutputQueue
std::shared_ptr< DataOutputQueue > getOutputQueue(const std::string &name)
Definition: Device.cpp:86
RawImgFrame.hpp
DeviceBootloader.hpp
dai::Device::getOutputQueueNames
std::vector< std::string > getOutputQueueNames() const
Definition: Device.cpp:111
Initialization.hpp
dai::Device::outputQueueMap
std::unordered_map< std::string, std::shared_ptr< DataOutputQueue > > outputQueueMap
Definition: Device.hpp:216
dai::Device::EVENT_QUEUE_MAXIMUM_SIZE
static constexpr std::size_t EVENT_QUEUE_MAXIMUM_SIZE
Maximum number of elements in event queue.
Definition: Device.hpp:97
dai::Device::eventMtx
std::mutex eventMtx
Definition: Device.hpp:221
dai::DeviceBase::startPipelineImpl
virtual bool startPipelineImpl(const Pipeline &pipeline)
Definition: DeviceBase.cpp:1525
dai::DeviceBase::closeImpl
virtual void closeImpl()
Definition: DeviceBase.cpp:536
dai::Device::inputQueueMap
std::unordered_map< std::string, std::shared_ptr< DataInputQueue > > inputQueueMap
Definition: Device.hpp:217
XLinkIn.hpp
dai::Device::startPipelineImpl
bool startPipelineImpl(const Pipeline &pipeline) override
Definition: Device.cpp:255
dai::DeviceBase::connection
std::shared_ptr< XLinkConnection > connection
Definition: DeviceBase.hpp:924
dai::DeviceBase::tryStartPipeline
void tryStartPipeline(const Pipeline &pipeline)
a safe way to start a pipeline, which is closed if any exception occurs
Definition: DeviceBase.cpp:636
dai::Pipeline::getNodeMap
const NodeMap & getNodeMap() const
Get a reference to internal node map.
Definition: Pipeline.hpp:185
dai::DeviceBase
Definition: DeviceBase.hpp:50
dai::Device::getQueueEvent
std::string getQueueEvent(const std::vector< std::string > &queueNames, std::chrono::microseconds timeout=std::chrono::microseconds(-1))
Definition: Device.cpp:238
dai::Device::getQueueEvents
std::vector< std::string > getQueueEvents(const std::vector< std::string > &queueNames, std::size_t maxNumEvents=std::numeric_limits< std::size_t >::max(), std::chrono::microseconds timeout=std::chrono::microseconds(-1))
Definition: Device.cpp:164
XLinkOut.hpp
dai::Device::closeImpl
void closeImpl() override
Definition: Device.cpp:68
dai::Device::getInputQueue
std::shared_ptr< DataInputQueue > getInputQueue(const std::string &name)
Definition: Device.cpp:120
dai::Path
Represents paths on a filesystem; accepts utf-8, Windows utf-16 wchar_t, or std::filesystem::path.
Definition: Path.hpp:27
dai
Definition: CameraExposureOffset.hpp:6
DeviceLogger.hpp


depthai
Author(s): Martin Peterlin
autogenerated on Sat Mar 22 2025 02:58:19