XLinkStream.cpp
Go to the documentation of this file.
2 
3 // libraries
4 #include "XLink/XLink.h"
5 #include "spdlog/fmt/fmt.h"
6 
7 // project
9 
10 namespace dai {
11 
12 // static
13 constexpr std::chrono::milliseconds XLinkStream::WAIT_FOR_STREAM_RETRY;
15 
16 XLinkStream::XLinkStream(const std::shared_ptr<XLinkConnection> conn, const std::string& name, std::size_t maxWriteSize) : connection(conn), streamName(name) {
17  if(name.empty()) throw std::invalid_argument("Cannot create XLinkStream using empty stream name");
18  if(!connection || connection->getLinkId() == -1) throw std::invalid_argument("Cannot create XLinkStream using unconnected XLinkConnection");
19 
20  streamId = INVALID_STREAM_ID;
21 
22  for(int retryCount = 0; retryCount < STREAM_OPEN_RETRIES; retryCount++) {
23  streamId = XLinkOpenStream(connection->getLinkId(), streamName.c_str(), static_cast<int>(maxWriteSize));
24  if(streamId == INVALID_STREAM_ID) {
25  // Give some time before continuing
26  std::this_thread::sleep_for(WAIT_FOR_STREAM_RETRY);
27  } else {
28  break;
29  }
30  }
31 
32  if(streamId == INVALID_STREAM_ID) throw std::runtime_error("Couldn't open stream");
33 }
34 
35 // Move constructor
37  : connection(std::move(other.connection)), streamName(std::exchange(other.streamName, {})), streamId(std::exchange(other.streamId, INVALID_STREAM_ID)) {
38  // Set other's streamId to INVALID_STREAM_ID to prevent closing
39 }
40 
42  if(this != &other) {
43  connection = std::move(other.connection);
44  streamId = std::exchange(other.streamId, INVALID_STREAM_ID);
45  streamName = std::exchange(other.streamName, {});
46  }
47  return *this;
48 }
49 
51  // If streamId != invalid (eg. wasn't moved to another XLinkStream)
52  if(streamId != INVALID_STREAM_ID) {
53  XLinkCloseStream(streamId);
54  }
55 }
56 
57 StreamPacketDesc::StreamPacketDesc(StreamPacketDesc&& other) noexcept : streamPacketDesc_t{other.data, other.length, other.tRemoteSent, other.tReceived} {
58  other.data = nullptr;
59  other.length = 0;
60 }
61 
63  if(this != &other) {
64  data = std::exchange(other.data, nullptr);
65  length = std::exchange(other.length, 0);
66  tRemoteSent = std::exchange(other.tRemoteSent, {});
67  tReceived = std::exchange(other.tReceived, {});
68  }
69  return *this;
70 }
71 
73  XLinkDeallocateMoveData(data, length);
74 }
75 
77 // BLOCKING VERSIONS
79 
80 void XLinkStream::write(const std::uint8_t* data, std::size_t size) {
81  auto status = XLinkWriteData(streamId, data, static_cast<int>(size));
82  if(status != X_LINK_SUCCESS) {
84  }
85 }
86 void XLinkStream::write(const void* data, std::size_t size) {
87  write(reinterpret_cast<const uint8_t*>(data), size);
88 }
89 
90 void XLinkStream::write(const std::vector<std::uint8_t>& data) {
91  write(data.data(), data.size());
92 }
93 
94 void XLinkStream::read(std::vector<std::uint8_t>& data) {
95  StreamPacketDesc packet;
96  const auto status = XLinkReadMoveData(streamId, &packet);
97  if(status != X_LINK_SUCCESS) {
99  }
100  data = std::vector<std::uint8_t>(packet.data, packet.data + packet.length);
101 }
102 
103 void XLinkStream::read(std::vector<std::uint8_t>& data, XLinkTimespec& timestampReceived) {
104  StreamPacketDesc packet;
105  const auto status = XLinkReadMoveData(streamId, &packet);
106  if(status != X_LINK_SUCCESS) {
108  }
109  data = std::vector<std::uint8_t>(packet.data, packet.data + packet.length);
110  timestampReceived = packet.tReceived;
111 }
112 
113 std::vector<std::uint8_t> XLinkStream::read() {
114  std::vector<std::uint8_t> data;
115  read(data);
116  return data;
117 }
118 
119 std::vector<std::uint8_t> XLinkStream::read(XLinkTimespec& timestampReceived) {
120  std::vector<std::uint8_t> data;
121  read(data, timestampReceived);
122  return data;
123 }
124 
126  StreamPacketDesc packet;
127  const auto status = XLinkReadMoveData(streamId, &packet);
128  if(status != X_LINK_SUCCESS) {
130  }
131  return packet;
132 }
133 
134 // USE ONLY WHEN COPYING DATA AT LATER STAGES
135 streamPacketDesc_t* XLinkStream::readRaw() {
136  streamPacketDesc_t* pPacket = nullptr;
137  auto status = XLinkReadData(streamId, &pPacket);
138  if(status != X_LINK_SUCCESS) {
140  }
141  return pPacket;
142 }
143 
144 // USE ONLY WHEN COPYING DATA AT LATER STAGES
146  XLinkError_t status;
147  if((status = XLinkReleaseData(streamId)) != X_LINK_SUCCESS) throw XLinkReadError(status, streamName);
148 }
149 
150 // SPLIT HELPER
151 void XLinkStream::writeSplit(const void* d, std::size_t size, std::size_t split) {
152  const uint8_t* data = (const uint8_t*)d;
153  std::size_t currentOffset = 0;
154  std::size_t remaining = size;
155  std::size_t sizeToTransmit = 0;
156  XLinkError_t ret = X_LINK_SUCCESS;
157  while(remaining > 0) {
158  sizeToTransmit = remaining > split ? split : remaining;
159  ret = XLinkWriteData(streamId, data + currentOffset, static_cast<int>(sizeToTransmit));
160  if(ret != X_LINK_SUCCESS) {
161  throw XLinkWriteError(ret, streamName);
162  }
163  currentOffset += sizeToTransmit;
164  remaining = size - currentOffset;
165  }
166 }
167 
168 void XLinkStream::writeSplit(const std::vector<uint8_t>& data, std::size_t split) {
169  writeSplit(data.data(), data.size(), split);
170 }
171 
173 // Timeout versions //
175 
176 bool XLinkStream::write(const std::uint8_t* data, std::size_t size, std::chrono::milliseconds timeout) {
177  auto status = XLinkWriteDataWithTimeout(streamId, data, static_cast<int>(size), static_cast<unsigned int>(timeout.count()));
178  if(status == X_LINK_SUCCESS) {
179  return true;
180  } else if(status == X_LINK_TIMEOUT) {
181  return false;
182  } else {
184  }
185 }
186 
187 bool XLinkStream::write(const void* data, std::size_t size, std::chrono::milliseconds timeout) {
188  return write(reinterpret_cast<const std::uint8_t*>(data), size, timeout);
189 }
190 
191 bool XLinkStream::write(const std::vector<std::uint8_t>& data, std::chrono::milliseconds timeout) {
192  return write(data.data(), data.size(), timeout);
193 }
194 
195 bool XLinkStream::read(std::vector<std::uint8_t>& data, std::chrono::milliseconds timeout) {
196  StreamPacketDesc packet;
197  const auto status = XLinkReadMoveDataWithTimeout(streamId, &packet, static_cast<unsigned int>(timeout.count()));
198  if(status == X_LINK_SUCCESS) {
199  data = std::vector<std::uint8_t>(packet.data, packet.data + packet.length);
200  return true;
201  } else if(status == X_LINK_TIMEOUT) {
202  return false;
203  } else {
205  }
206 }
207 
208 bool XLinkStream::readMove(StreamPacketDesc& packet, const std::chrono::milliseconds timeout) {
209  const auto status = XLinkReadMoveDataWithTimeout(streamId, &packet, static_cast<unsigned int>(timeout.count()));
210  if(status == X_LINK_SUCCESS) {
211  return true;
212  } else if(status == X_LINK_TIMEOUT) {
213  return false;
214  } else {
216  }
217 }
218 
219 bool XLinkStream::readRaw(streamPacketDesc_t*& pPacket, std::chrono::milliseconds timeout) {
220  auto status = XLinkReadDataWithTimeout(streamId, &pPacket, static_cast<unsigned int>(timeout.count()));
221  if(status == X_LINK_SUCCESS) {
222  return true;
223  } else if(status == X_LINK_TIMEOUT) {
224  return false;
225  } else {
227  }
228 }
229 
230 streamId_t XLinkStream::getStreamId() const {
231  return streamId;
232 }
233 
234 XLinkReadError::XLinkReadError(XLinkError_t status, const std::string& streamName)
235  : XLinkError(status, streamName, fmt::format("Couldn't read data from stream: '{}' ({})", streamName, XLinkConnection::convertErrorCodeToString(status))) {}
236 
237 XLinkWriteError::XLinkWriteError(XLinkError_t status, const std::string& streamName)
238  : XLinkError(status, streamName, fmt::format("Couldn't write data to stream: '{}' ({})", streamName, XLinkConnection::convertErrorCodeToString(status))) {}
239 
240 } // namespace dai
dai::XLinkStream::read
std::vector< std::uint8_t > read()
Definition: XLinkStream.cpp:113
dai::XLinkReadError::XLinkError
XLinkError XLinkError
Definition: XLinkStream.hpp:95
dai::XLinkStream::getStreamId
streamId_t getStreamId() const
Definition: XLinkStream.cpp:230
dai::XLinkWriteError::XLinkWriteError
XLinkWriteError(XLinkError_t status, const std::string &stream)
Definition: XLinkStream.cpp:237
dai::StreamPacketDesc::StreamPacketDesc
StreamPacketDesc() noexcept
Definition: XLinkStream.hpp:29
dai::StreamPacketDesc
Definition: XLinkStream.hpp:27
dai::XLinkStream::~XLinkStream
~XLinkStream()
Definition: XLinkStream.cpp:50
dai::XLinkStream::writeSplit
void writeSplit(const void *data, std::size_t size, std::size_t split)
Definition: XLinkStream.cpp:151
dai::XLinkStream
Definition: XLinkStream.hpp:37
dai::XLinkReadError
Definition: XLinkStream.hpp:94
split
std::vector< int > split(std::string s, std::string delimiter)
Definition: poe_set_ip.cpp:9
dai::XLinkWriteError::XLinkError
XLinkError XLinkError
Definition: XLinkStream.hpp:99
dai::XLinkStream::connection
std::shared_ptr< XLinkConnection > connection
Definition: XLinkStream.hpp:42
DAI_SPAN_NAMESPACE_NAME::detail::data
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.hpp:177
dai::XLinkWriteError
Definition: XLinkStream.hpp:98
DAI_SPAN_NAMESPACE_NAME::detail::size
constexpr auto size(const C &c) -> decltype(c.size())
Definition: span.hpp:167
dai::XLinkStream::readMove
StreamPacketDesc readMove()
Definition: XLinkStream.cpp:125
XLinkConnection.hpp
dai::XLinkStream::readRaw
streamPacketDesc_t * readRaw()
Definition: XLinkStream.cpp:135
dai::StreamPacketDesc::~StreamPacketDesc
~StreamPacketDesc() noexcept
Definition: XLinkStream.cpp:72
XLinkStream.hpp
dai::XLinkStream::operator=
XLinkStream & operator=(const XLinkStream &)=delete
dai::StreamPacketDesc::operator=
StreamPacketDesc & operator=(const StreamPacketDesc &)=delete
dai::XLinkReadError::XLinkReadError
XLinkReadError(XLinkError_t status, const std::string &stream)
Definition: XLinkStream.cpp:234
dai::XLinkStream::WAIT_FOR_STREAM_RETRY
constexpr static std::chrono::milliseconds WAIT_FOR_STREAM_RETRY
Definition: XLinkStream.hpp:40
dai::XLinkStream::write
void write(const void *data, std::size_t size)
Definition: XLinkStream.cpp:86
nanorpc::core::detail::pack::meta::status
status
Definition: pack_meta.h:33
std
Definition: Node.hpp:366
fmt
dai::XLinkStream::streamId
streamId_t streamId
Definition: XLinkStream.hpp:44
dai::XLinkConnection
Definition: XLinkConnection.hpp:51
dai::XLinkStream::XLinkStream
XLinkStream(const std::shared_ptr< XLinkConnection > conn, const std::string &name, std::size_t maxWriteSize)
Definition: XLinkStream.cpp:16
dai
Definition: CameraExposureOffset.hpp:6
dai::XLinkStream::readRawRelease
void readRawRelease()
Definition: XLinkStream.cpp:145
dai::XLinkStream::streamName
std::string streamName
Definition: XLinkStream.hpp:43
dai::XLinkStream::STREAM_OPEN_RETRIES
constexpr static int STREAM_OPEN_RETRIES
Definition: XLinkStream.hpp:39


depthai
Author(s): Martin Peterlin
autogenerated on Sat Mar 22 2025 02:58:19