uvc-streamer.cpp
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2015 Intel Corporation. All Rights Reserved.
3 
4 #include "uvc-streamer.h"
5 #include "../backend.h"
6 
10 
12  if (ptr) ptr->owner->deallocate(ptr);
13 }
14 
15 namespace librealsense
16 {
17  namespace platform
18  {
20  _context(context), _action_dispatcher(10)
21  {
22  auto inf = context.usb_device->get_interface(context.control->bInterfaceNumber);
23  if (inf == nullptr)
24  throw std::runtime_error("can't find UVC streaming interface of device: " + context.usb_device->get_info().id);
26 
28  LOG_INFO("endpoint " << (int)_read_endpoint->get_address() << " read buffer size: " << std::dec <<_read_buff_length);
29 
31 
32  _watchdog_timeout = (1000.0 / _context.profile.fps) * 10;
33 
34  init();
35  }
36 
38  {
39  flush();
40  }
41 
42  void uvc_process_bulk_payload(backend_frame_ptr fp, size_t payload_len, backend_frames_queue& queue) {
43 
44  /* ignore empty payload transfers */
45  if (!fp || payload_len < 2)
46  return;
47 
48  uint8_t header_len = fp->pixels[0];
49  uint8_t header_info = fp->pixels[1];
50 
51  size_t data_len = payload_len - header_len;
52 
53  if (header_info & 0x40)
54  {
55  LOG_ERROR("bad packet: error bit set");
56  return;
57  }
58  if (header_len > payload_len)
59  {
60  LOG_ERROR("bogus packet: actual_len=" << payload_len << ", header_len=" << header_len);
61  return;
62  }
63 
64 
65  LOG_DEBUG("Passing packet to user CB with size " << (data_len + header_len));
66  librealsense::platform::frame_object fo{ data_len, header_len,
67  fp->pixels.data() + header_len , fp->pixels.data() };
68  fp->fo = fo;
69 
70  queue.enqueue(std::move(fp));
71  }
72 
74  {
75  _frames_archive = std::make_shared<backend_frames_archive>();
76  // Get all pointers from archive and initialize their content
77  std::vector<backend_frame *> frames;
78  for (auto i = 0; i < _frames_archive->CAPACITY; i++) {
79  auto ptr = _frames_archive->allocate();
80  ptr->pixels.resize(_read_buff_length, 0);
81  ptr->owner = _frames_archive.get();
82  frames.push_back(ptr);
83  }
84 
85  for (auto ptr : frames) {
86  _frames_archive->deallocate(ptr);
87  }
88 
89  _publish_frame_thread = std::make_shared<active_object<>>([this](dispatcher::cancellable_timer cancellable_timer)
90  {
91  backend_frame_ptr fp(nullptr, [](backend_frame *) {});
93  {
94  if(_publish_frames && running())
95  _context.user_cb(_context.profile, fp->fo, []() mutable {});
96  }
97  });
98 
99  _watchdog = std::make_shared<watchdog>([this]()
100  {
102  {
103  if(!_running || !_frame_arrived)
104  return;
105 
106  LOG_ERROR("uvc streamer watchdog triggered on endpoint: " << (int)_read_endpoint->get_address());
108  _frame_arrived = false;
109  });
110  }, _watchdog_timeout);
111 
112  _watchdog->start();
113 
114  _request_callback = std::make_shared<usb_request_callback>([this](platform::rs_usb_request r)
115  {
117  {
118  if(!_running)
119  return;
120 
121  auto al = r->get_actual_length();
122  // Relax the frame size constrain for compressed streams
123  bool is_compressed = val_in_range(_context.profile.format, { 0x4d4a5047U , 0x5a313648U}); // MJPEG, Z16H
124  if(al > 0L && ((al == r->get_buffer().data()[0] + _context.control->dwMaxVideoFrameSize) || is_compressed ))
125  {
126  auto f = backend_frame_ptr(_frames_archive->allocate(), &cleanup_frame);
127  if(f)
128  {
129  _frame_arrived = true;
130  _watchdog->kick();
131  memcpy(f->pixels.data(), r->get_buffer().data(), r->get_buffer().size());
132  uvc_process_bulk_payload(std::move(f), r->get_actual_length(), _queue);
133  }
134  }
135 
136  auto sts = _context.messenger->submit_request(r);
138  LOG_ERROR("failed to submit UVC request, error: " << sts);
139  });
140  });
141 
142  _requests = std::vector<rs_usb_request>(_context.request_count);
143  for(auto&& r : _requests)
144  {
145  r = _context.messenger->create_request(_read_endpoint);
146  r->set_buffer(std::vector<uint8_t>(_read_buff_length));
147  r->set_callback(_request_callback);
148  }
149  }
150 
152  {
154  {
155  if(_running)
156  return;
157 
159 
160  {
161  std::lock_guard<std::mutex> lock(_running_mutex);
162  _running = true;
163  }
164 
165  for(auto&& r : _requests)
166  {
167  auto sts = _context.messenger->submit_request(r);
169  throw std::runtime_error("failed to submit UVC request while start streaming");
170  }
171 
172  _publish_frame_thread->start();
173 
174  }, [this](){ return _running; });
175  }
176 
178  {
180  {
181  if(!_running)
182  return;
183 
184  _request_callback->cancel();
185 
186  _watchdog->stop();
187 
188  _frames_archive->stop_allocation();
189 
190  _queue.clear();
191 
192  for(auto&& r : _requests)
193  _context.messenger->cancel_request(r);
194 
195  _requests.clear();
196 
197  _frames_archive->wait_until_empty();
198 
200 
201  _publish_frame_thread->stop();
202 
203  {
204  std::lock_guard<std::mutex> lock(_running_mutex);
205  _running = false;
206  _stopped_cv.notify_one();
207  }
208 
209  }, [this](){ return !_running; });
210  }
211 
213  {
214  if(_running)
215  stop();
216 
217  // synchronized so do not destroy shared pointers while it's still being running
218  {
219  std::unique_lock<std::mutex> lock(_running_mutex);
220  _stopped_cv.wait_for(lock, std::chrono::seconds(1), [&]() { return !_running; });
221  }
222 
223  _read_endpoint.reset();
224 
225  _watchdog.reset();
226  _publish_frame_thread.reset();
227  _request_callback.reset();
228 
229  _frames_archive.reset();
230 
232  }
233 
235  {
237  while(!_frame_arrived)
238  {
240  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
241  if(duration > timeout_ms)
242  break;
243  }
244  return _frame_arrived;
245  }
246  }
247 }
void enqueue(T &&item)
Definition: concurrency.h:35
static const textual_icon lock
Definition: model-views.h:218
GLuint GLuint end
void deallocate(T *item)
Definition: src/types.h:1201
std::vector< rs_usb_request > _requests
Definition: uvc-streamer.h:62
std::unique_ptr< backend_frame, cleanup_ptr > backend_frame_ptr
Definition: uvc-types.h:440
std::shared_ptr< usb_request > rs_usb_request
Definition: usb-request.h:41
std::shared_ptr< platform::usb_request_callback > _request_callback
Definition: uvc-streamer.h:65
void start()
Definition: concurrency.h:286
bool val_in_range(const T &val, const std::initializer_list< T > &list)
Definition: src/types.h:174
bool dequeue(T *item, unsigned int timeout_ms)
Definition: concurrency.h:69
bool wait_for_first_frame(uint32_t timeout_ms)
unsigned char uint8_t
Definition: stdint.h:78
std::shared_ptr< active_object<> > _publish_frame_thread
Definition: uvc-streamer.h:64
GLdouble f
void stop()
Definition: concurrency.h:294
std::shared_ptr< uvc_stream_ctrl_t > control
Definition: uvc-streamer.h:25
const GLubyte * c
Definition: glext.h:12690
GLdouble GLdouble r
unsigned int uint32_t
Definition: stdint.h:80
std::shared_ptr< backend_frames_archive > _frames_archive
Definition: uvc-streamer.h:63
GLuint start
std::shared_ptr< watchdog > _watchdog
Definition: uvc-streamer.h:58
#define LOG_ERROR(...)
Definition: src/types.h:242
backend_frames_archive * owner
Definition: uvc-types.h:434
uvc_streamer(uvc_streamer_context context)
LOG_INFO("Log message using LOG_INFO()")
void uvc_process_bulk_payload(backend_frame_ptr fp, size_t payload_len, backend_frames_queue &queue)
void cleanup_frame(backend_frame *ptr)
const int DEQUEUE_MILLISECONDS_TIMEOUT
Definition: uvc-streamer.cpp:8
GLint GLsizei count
void invoke(T item, bool is_blocking=false)
Definition: concurrency.h:254
typename::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
const int UVC_PAYLOAD_MAX_HEADER_LENGTH
Definition: uvc-streamer.cpp:7
std::condition_variable _stopped_cv
Definition: uvc-streamer.h:48
int i
#define LOG_DEBUG(...)
Definition: src/types.h:239
const int ENDPOINT_RESET_MILLISECONDS_TIMEOUT
Definition: uvc-streamer.cpp:9
void invoke_and_wait(T item, std::function< bool()> exit_condition, bool is_blocking=false)
Definition: concurrency.h:266


librealsense2
Author(s): Sergey Dorodnicov , Doron Hirshberg , Mark Horn , Reagan Lopez , Itay Carpis
autogenerated on Mon May 3 2021 02:50:13