record_device.cpp
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2017 Intel Corporation. All Rights Reserved.
3 
4 #include <core/debug.h>
5 #include <core/motion.h>
6 #include <core/advanced_mode.h>
7 #include "record_device.h"
8 #include "l500/l500-depth.h"
9 
10 using namespace librealsense;
11 
// Builds a record_device around a live device and a serializer.
// Throws invalid_value_exception when `device` or `serializer` is null; otherwise
// stores both in m_device / m_ros_writer and starts the write dispatcher.
// NOTE(review): the embedded listing numbers skip 16 and 31, so one member
// initializer and one statement are not visible here; nothing in the visible
// lines populates m_sensors - confirm against the real file.
12 librealsense::record_device::record_device(std::shared_ptr<librealsense::device_interface> device,
13  std::shared_ptr<librealsense::device_serializer::writer> serializer):
14  m_write_thread([](){return std::make_shared<dispatcher>(std::numeric_limits<unsigned int>::max());}),
15  m_is_recording(true),
17 {
18  if (device == nullptr)
19  {
20  throw invalid_value_exception("device is null");
21  }
22 
23  if (serializer == nullptr)
24  {
25  throw invalid_value_exception("serializer is null");
26  }
27 
28  m_device = device;
29  m_ros_writer = serializer;
30  (*m_write_thread)->start(); //Start thread before creating the sensors (since they might write right away)
32  LOG_DEBUG("Created record_device");
33 }
34 
35 std::vector<std::shared_ptr<librealsense::record_sensor>> librealsense::record_device::create_record_sensors(std::shared_ptr<librealsense::device_interface> device)
36 {
37  std::vector<std::shared_ptr<librealsense::record_sensor>> record_sensors;
38  for (size_t sensor_index = 0; sensor_index < device->get_sensors_count(); sensor_index++)
39  {
40  auto& live_sensor = device->get_sensor(sensor_index);
41  auto recording_sensor = std::make_shared<librealsense::record_sensor>(*this, live_sensor);
42  m_on_notification_token = recording_sensor->on_notification += [this, recording_sensor, sensor_index](const notification& n) { write_notification(sensor_index, n); };
43  auto on_error = [recording_sensor](const std::string& s) {recording_sensor->stop_with_error(s); };
44  m_on_frame_token = recording_sensor->on_frame += [this, recording_sensor, sensor_index, on_error](frame_holder f) {
45  write_data(sensor_index, std::move(f), on_error);
46  };
47  m_on_extension_change_token = recording_sensor->on_extension_change += [this, recording_sensor, sensor_index, on_error](rs2_extension ext, std::shared_ptr<extension_snapshot> snapshot) { write_sensor_extension_snapshot(sensor_index, ext, snapshot, on_error); };
48  recording_sensor->init(); //Calling init AFTER register to the above events
49  record_sensors.emplace_back(recording_sensor);
50  }
51  return record_sensors;
52 }
53 
55 {
 // NOTE(review): the signature line (listing line 54) is missing from this view;
 // this teardown sequence looks like the destructor body - confirm.
 // Detach the stored callback tokens from every sensor and disable its recording.
56  for (auto&& s : m_sensors)
57  {
58  s->on_notification -= m_on_notification_token;
59  s->on_frame -= m_on_frame_token;
60  s->on_extension_change -= m_on_extension_change_token;
61  s->disable_recording();
62  }
 // A failed flush is only logged; the dispatcher is stopped regardless.
63  if ((*m_write_thread)->flush() == false)
64  {
65  LOG_ERROR("Error - timeout waiting for flush, possible deadlock detected");
66  }
67  (*m_write_thread)->stop();
68  //Just in case someone still holds a reference to the sensors,
69  // we make sure that they will not try to record anything
70  m_sensors.clear();
71 }
72 
73 std::shared_ptr<context> librealsense::record_device::get_context() const
74 {
75  return m_device->get_context();
76 }
77 
79 {
 // NOTE(review): signature line (listing line 78) is missing from this view.
 // Dereferences the entry of m_sensors at index i; at() throws for an out-of-range index.
80  return *(m_sensors.at(i));
81 }
82 
84 {
 // NOTE(review): signature line (listing line 83) is missing from this view.
 // Number of entries currently held in m_sensors.
85  return m_sensors.size();
86 }
87 
89 {
90  auto device_extensions_md = get_extensions_snapshots(m_device.get());
91  LOG_DEBUG("Created device snapshot with " << device_extensions_md.get_snapshots().size() << " snapshots");
92 
93  std::vector<device_serializer::sensor_snapshot> sensors_snapshot;
94  for (size_t j = 0; j < m_device->get_sensors_count(); ++j)
95  {
96  auto& sensor = m_device->get_sensor(j);
97  auto sensor_extensions_snapshots = get_extensions_snapshots(&sensor);
98  sensors_snapshot.emplace_back(static_cast<uint32_t>(j), sensor_extensions_snapshots);
99  LOG_DEBUG("Created sensor " << j << " snapshot with " << device_extensions_md.get_snapshots().size() << " snapshots");
100  }
101 
102  m_ros_writer->write_device_description({ device_extensions_md, sensors_snapshot, {/*extrinsics are written by ros_writer*/} });
103 }
104 
105 //Returns the time relative to beginning of the recording
// NOTE(review): the signature line (106) and listing lines 112-113 are missing from
// this view, so the return path for a non-zero m_capture_time_base is not visible.
107 {
 // A zero m_capture_time_base yields a zero offset.
108  if (m_capture_time_base.time_since_epoch() == std::chrono::nanoseconds::zero())
109  {
110  return std::chrono::nanoseconds::zero();
111  }
114 }
115 
// Queues one frame for writing on the write dispatcher.
//   sensor_index - index of the sensor that produced the frame; written as part of the stream id.
//   frame        - the frame to record; moved into a shared holder captured by the queued action.
//   on_error     - called with a message when the frame is dropped because the cache limit is
//                  exceeded, or when writing the header or the frame throws.
// NOTE(review): listing lines 124 and 137 are missing from this view: the body of the
// first call_once is empty here and `capture_time` (captured by the lambda below) has no
// visible declaration - confirm against the real file.
116 void librealsense::record_device::write_data(size_t sensor_index, librealsense::frame_holder frame, std::function<void(std::string const&)> on_error)
117 {
118  //write_data is called from the sensors, when the live sensor raises a frame
119 
120  LOG_DEBUG("write frame " << (frame ? std::to_string(frame.frame->get_frame_number()) : "") << " from sensor " << sensor_index);
121 
122  std::call_once(m_first_call_flag, [this]()
123  {
125  });
126 
 // With the data_size accounting commented out, cached_data_size is just a copy of
 // m_cached_data_size, and the assignment further down stores the same value back.
127  //TODO: restore: uint64_t data_size = frame.frame->get_frame_data_size();
128  uint64_t cached_data_size = m_cached_data_size; //TODO: restore: (+ data_size)
129  if (cached_data_size > MAX_CACHED_DATA_SIZE)
130  {
131  LOG_WARNING("Recorder reached maximum cache size, frame dropped");
132  on_error("Recorder reached maximum cache size, frame dropped");
133  return;
134  }
135 
136  m_cached_data_size = cached_data_size;
138  //TODO: remove usage of shared pointer when frame_holder is copyable
139  auto frame_holder_ptr = std::make_shared<frame_holder>();
140  *frame_holder_ptr = std::move(frame);
141  (*m_write_thread)->invoke([this, frame_holder_ptr, sensor_index, capture_time/*, data_size*/, on_error](dispatcher::cancellable_timer t) {
142  if (m_is_recording == false)
143  {
144  return; //Recording is paused
145  }
 // The header is written once, by the first queued frame that runs while recording is on.
 // A header failure is logged and reported, and the frame write below is still attempted.
146  std::call_once(m_first_frame_flag, [&]()
147  {
148  try
149  {
150  write_header();
151  }
152  catch (const std::exception& e)
153  {
154  LOG_ERROR("Failed to write header. " << e.what());
155  on_error(to_string() << "Failed to write header. " << e.what());
156  }
157  });
158 
159  try
160  {
 // device_index is fixed to 0 here.
161  const uint32_t device_index = 0;
162  auto stream_type = frame_holder_ptr->frame->get_stream()->get_stream_type();
163  auto stream_index = static_cast<uint32_t>(frame_holder_ptr->frame->get_stream()->get_stream_index());
164  m_ros_writer->write_frame({ device_index, static_cast<uint32_t>(sensor_index), stream_type, stream_index }, capture_time, std::move(*frame_holder_ptr));
165  //TODO: restore: std::lock_guard<std::mutex> locker(m_mutex); m_cached_data_size -= data_size;
166  }
167  catch(std::exception& e)
168  {
169  on_error(to_string() << "Failed to write frame. " << e.what());
170  }
171  });
172 }
173 
175 {
 // NOTE(review): signature line (listing line 174) is missing from this view.
 // Forwards the info query to the wrapped device.
176  return m_device->get_info(info);
177 }
178 
180 {
 // NOTE(review): signature line (listing line 179) is missing from this view.
 // Forwards the support query to the wrapped device.
181  return m_device->supports_info(info);
182 }
183 
185 {
 // NOTE(review): signature line (listing line 184) is missing from this view;
 // presumably the const overload of get_sensor - confirm.
 // Dereferences the entry of m_sensors at index i; at() throws for an out-of-range index.
186  return *m_sensors.at(i);
187 }
189 {
 // NOTE(review): signature line (listing line 188) is missing from this view.
 // Forwards the reset request to the wrapped device.
190  m_device->hardware_reset();
191 }
192 
// If `extendable` can be dynamic_cast to recordable<Ext>, asks it to create a snapshot and
// stores that snapshot in `snapshots` under the key TypeToExtension<Ext>::value.
// Nothing happens when the cast fails. A snapshot that cannot be cast to
// extension_snapshot, or a std::exception thrown while creating it, is logged and not propagated.
// NOTE(review): the signature line (listing line 194) is missing from this view.
193 template <typename T, typename Ext>
195 {
196  auto api = dynamic_cast<recordable<Ext>*>(extendable);
197 
198  if (api != nullptr)
199  {
200  std::shared_ptr<Ext> p;
201  try
202  {
203  api->create_snapshot(p);
204  auto snapshot = std::dynamic_pointer_cast<extension_snapshot>(p);
205  if (snapshot != nullptr)
206  {
207  snapshots[TypeToExtension<Ext>::value] = snapshot;
208  LOG_INFO("Added snapshot of type: " << TypeToExtension<Ext>::to_string());
209  }
210  else
211  {
212  LOG_ERROR("Failed to downcast snapshot of type " << TypeToExtension<Ext>::to_string());
213  }
214  }
215  catch (const std::exception& e)
216  {
217  LOG_ERROR("Failed to add snapshot of type " << TypeToExtension<Ext>::to_string() << ". Exception: " << e.what());
218  }
219  }
220 }
221 
// Walks every rs2_extension value below RS2_EXTENSION_COUNT and returns `snapshots`.
// In the lines visible here, the listed frame/record/playback/count/unknown cases do
// nothing and any value without a case logs a warning.
// NOTE(review): the signature (listing line 229), the declaration of `snapshots` (232)
// and several case labels (238-240, 243-248, 250) are missing from this view, so the
// extensions that actually add snapshots cannot be seen here.
228 template<typename T>
230 {
231  //No support for extensions with more than a single type - i.e every extension has exactly one type in rs2_extension
233  for (int i = 0; i < static_cast<int>(RS2_EXTENSION_COUNT ); ++i)
234  {
235  rs2_extension ext = static_cast<rs2_extension>(i);
236  switch (ext)
237  {
241  //case RS2_EXTENSION_VIDEO : try_add_snapshot<T, ExtensionToType<RS2_EXTENSION_VIDEO >::type>(extendable, snapshots); break;
242  //case RS2_EXTENSION_ROI : try_add_snapshot<T, ExtensionToType<RS2_EXTENSION_ROI >::type>(extendable, snapshots); break;
249  //case RS2_EXTENSION_ADVANCED_MODE : try_add_snapshot<T, ExtensionToType<RS2_EXTENSION_ADVANCED_MODE >::type>(extendable, snapshots); break;
251  case RS2_EXTENSION_VIDEO_FRAME : break;
252  case RS2_EXTENSION_MOTION_FRAME : break;
253  case RS2_EXTENSION_COMPOSITE_FRAME : break;
254  case RS2_EXTENSION_POINTS : break;
255  case RS2_EXTENSION_RECORD : break;
256  case RS2_EXTENSION_PLAYBACK : break;
257  case RS2_EXTENSION_COUNT : break;
258  case RS2_EXTENSION_UNKNOWN : break;
259  default:
260  LOG_WARNING("Extensions type is unhandled: " << get_string(ext));
261  }
262  }
263  return snapshots;
264 }
265 
// Takes a snapshot of the extension `ext` and queues a device-level write_snapshot
// on the write dispatcher. Write failures are logged, not propagated.
// NOTE(review): the signature (listing line 267) and line 277 are missing from this
// view; `capture_time` is captured below without a visible declaration.
266 template <typename T>
268 {
269  std::shared_ptr<T> snapshot;
270  ext.create_snapshot(snapshot);
271  auto ext_snapshot = As<extension_snapshot>(snapshot);
 // A snapshot that is not an extension_snapshot asserts, then returns without writing.
272  if (!ext_snapshot)
273  {
274  assert(0);
275  return;
276  }
278  (*m_write_thread)->invoke([this, capture_time, ext_snapshot](dispatcher::cancellable_timer t)
279  {
280  try
281  {
282  const uint32_t device_index = 0;
283  m_ros_writer->write_snapshot(device_index, capture_time, TypeToExtension<T>::value, ext_snapshot);
284  }
285  catch (const std::exception& e)
286  {
287  LOG_ERROR(e.what());
288  }
289  });
290 }
291 
293  rs2_extension ext,
294  std::shared_ptr<extension_snapshot> snapshot,
295  std::function<void(std::string const&)> on_error)
296 {
 // NOTE(review): the first signature line (listing line 292) and line 297 are missing
 // from this view; `capture_time` is captured below without a visible declaration.
 // Queues a sensor-level write_snapshot on the write dispatcher; a std::exception from
 // the writer is passed to on_error as its what() text.
298  (*m_write_thread)->invoke([this, sensor_index, capture_time, ext, snapshot, on_error](dispatcher::cancellable_timer t)
299  {
300  try
301  {
302  const uint32_t device_index = 0;
303  m_ros_writer->write_snapshot({ device_index, static_cast<uint32_t>(sensor_index) }, capture_time, ext, snapshot);
304  }
305  catch (const std::exception& e)
306  {
307  on_error(e.what());
308  }
309  });
310 }
311 
313 {
 // NOTE(review): the signature (listing line 312) and line 314 are missing from this
 // view; `capture_time` is captured below without a visible declaration.
 // Queues a write_notification for the given sensor on the write dispatcher; a
 // std::exception from the writer is logged and not propagated.
315  (*m_write_thread)->invoke([this, sensor_index, capture_time, n](dispatcher::cancellable_timer t)
316  {
317  try
318  {
319  const uint32_t device_index = 0;
320  m_ros_writer->write_notification({ device_index, static_cast<uint32_t>(sensor_index) }, capture_time, n);
321  }
322  catch (const std::exception& e)
323  {
324  LOG_ERROR(e.what());
325  }
326  });
327 
328 }
329 
// Tries to view `p` as the type mapped to extension E (ExtensionToType<E>::type).
// Returns false when As<> yields null; otherwise stores the raw pointer in *ext and returns true.
// NOTE(review): listing lines 338 and 341 are missing from this view: the condition that
// introduces `recordable` and the body of the enable_recording callback are not visible.
330 template <rs2_extension E, typename P>
331 bool librealsense::record_device::extend_to_aux(std::shared_ptr<P> p, void** ext)
332 {
333  using EXT_TYPE = typename ExtensionToType<E>::type;
334  auto ptr = As<EXT_TYPE>(p);
335  if (!ptr)
336  return false;
337 
339  {
340  recordable->enable_recording([this](const EXT_TYPE& ext) {
342  });
343  }
344 
345  *ext = ptr.get();
346  return true;
347 }
348 
350 {
 // NOTE(review): the signature (listing line 349) and line 361 are missing from this
 // view; line 361 presumably holds a second case label that shares the `*ext = this` branch.
351  /**************************************************************************************
352  A record device wraps the live device, and should have the same functionalities.
353  To do that, the record device implements the extendable_interface and when the user tries to
354  treat the device as some extension, this function is called, and we return a pointer to the
355  live device's extension. If that extension is a recordable one, we also enable_recording for it.
356  **************************************************************************************/
357 
358  switch (extension_type)
359  {
 // RS2_EXTENSION_INFO is answered by this object itself.
360  case RS2_EXTENSION_INFO: // [[fallthrough]]
362  *ext = this;
363  return true;
 // These extensions are resolved against the wrapped device through extend_to_aux.
364  case RS2_EXTENSION_OPTIONS : return extend_to_aux<RS2_EXTENSION_OPTIONS >(m_device, ext);
365  case RS2_EXTENSION_ADVANCED_MODE : return extend_to_aux<RS2_EXTENSION_ADVANCED_MODE >(m_device, ext);
366  case RS2_EXTENSION_DEBUG : return extend_to_aux<RS2_EXTENSION_DEBUG >(m_device, ext);
367  //Other cases are not extensions that we expect a device to have.
368  default:
369  LOG_WARNING("Extensions type is unhandled: " << get_string(extension_type));
370  return false;
371  }
372 }
373 
375 {
 // NOTE(review): the signature (listing line 374) and line 386 are missing from this
 // view; nothing visible here assigns m_time_of_pause before it is logged.
 // The pause is queued on the write dispatcher and does nothing if recording is already off.
376  LOG_INFO("Record Pause called");
377 
378  (*m_write_thread)->invoke([this](dispatcher::cancellable_timer c)
379  {
380  LOG_DEBUG("Record pause invoked");
381 
382  if (m_is_recording == false)
383  return;
384 
385  //unregister_callbacks();
387  m_is_recording = false;
388  LOG_DEBUG("Time of pause: " << m_time_of_pause.time_since_epoch().count());
389  });
 // flush() is called before "Record paused" is logged; its return value is ignored here.
390  (*m_write_thread)->flush();
391  LOG_INFO("Record paused");
392 }
394 {
 // NOTE(review): the signature (listing line 393) and line 402 are missing from this
 // view; nothing visible here updates m_record_pause_time before it is logged.
 // The resume is queued on the write dispatcher and does nothing if recording is already on.
 // No flush() is called here, so the queued action may not have run when this function returns.
395  LOG_INFO("Record resume called");
396  (*m_write_thread)->invoke([this](dispatcher::cancellable_timer c)
397  {
398  LOG_DEBUG("Record resume invoked");
399  if (m_is_recording)
400  return;
401 
403  //register_callbacks();
404  m_is_recording = true;
405  LOG_DEBUG("Total pause time: " << m_record_pause_time.count());
406  LOG_INFO("Record resumed");
407  });
408 }
409 
411 {
 // NOTE(review): signature line (listing line 410) is missing from this view.
 // Returns the file name reported by the serializer (m_ros_writer).
412  return m_ros_writer->get_file_name();
413 }
415 {
 // NOTE(review): signature line (listing line 414) is missing from this view.
 // Forwards to the wrapped device.
416  return m_device->get_device_data();
417 }
418 std::shared_ptr<matcher> record_device::create_matcher(const frame_holder& frame) const
419 {
420  return m_device->create_matcher(frame);
421 }
422 
424 {
 // NOTE(review): the signature (listing line 423) and line 426 are missing from this
 // view; only the reset of the cached-size counter is visible.
425  //Expected to be called once when recording to file actually starts
427  m_cached_data_size = 0;
428 }
430 {
 // NOTE(review): signature line (listing line 429) is missing from this view, so any
 // parameter it declares is unused in the visible body.
 // Stops and then closes every sensor in m_sensors, in order.
431  for (auto&& sensor : m_sensors)
432  {
433  sensor->stop();
434  sensor->close();
435  }
436 }
437 
438 std::pair<uint32_t, rs2_extrinsics> record_device::get_extrinsics(const stream_interface& stream) const
439 {
440  return m_device->get_extrinsics(stream);
441 }
442 
444 {
 // NOTE(review): signature line (listing line 443) is missing from this view.
 // Validity is whatever the wrapped device reports.
445  return m_device->is_valid();
446 }
std::shared_ptr< device_serializer::writer > m_ros_writer
Definition: record_device.h:67
rs2_camera_info
Read-only strings that can be queried from the device. Not all information attributes are available o...
Definition: rs_sensor.h:22
const char * get_string(rs2_rs400_visual_preset value)
GLdouble s
std::shared_ptr< T > As(std::shared_ptr< P > ptr)
Definition: extension.h:88
GLfloat GLfloat p
Definition: glext.h:12687
sensor_interface & get_sensor(size_t i) override
device_serializer::snapshot_collection get_extensions_snapshots(T *extendable)
#define LOG_WARNING(...)
Definition: src/types.h:241
void write_data(size_t sensor_index, frame_holder f, std::function< void(std::string const &)> on_error)
std::chrono::high_resolution_clock::time_point m_capture_time_base
Definition: record_device.h:69
std::shared_ptr< device_interface > m_device
Definition: record_device.h:63
GLsizei const GLchar *const * string
virtual std::shared_ptr< matcher > create_matcher(const frame_holder &frame) const override
platform::backend_device_group get_device_data() const override
GLdouble n
Definition: glext.h:1966
void try_add_snapshot(T *extendable, device_serializer::snapshot_collection &snapshots)
e
Definition: rmse.py:177
virtual void enable_recording(std::function< void(const T &)> recording_function)=0
std::chrono::nanoseconds get_capture_time() const
GLdouble t
def info(name, value, persistent=False)
Definition: test.py:301
std::once_flag m_first_frame_flag
Definition: record_device.h:75
bool extend_to(rs2_extension extension_type, void **ext) override
bool is_valid() const override
std::shared_ptr< context > get_context() const override
GLdouble f
bool extend_to_aux(std::shared_ptr< P > p, void **ext)
size_t get_sensors_count() const override
std::once_flag m_first_call_flag
Definition: record_device.h:80
const GLubyte * c
Definition: glext.h:12690
void write_sensor_extension_snapshot(size_t sensor_index, rs2_extension ext, std::shared_ptr< extension_snapshot > snapshot, std::function< void(std::string const &)> on_error)
unsigned int uint32_t
Definition: stdint.h:80
unsigned __int64 uint64_t
Definition: stdint.h:90
GLint j
bool supports_info(rs2_camera_info info) const override
#define LOG_ERROR(...)
Definition: src/types.h:242
GLuint GLuint64EXT * capture_time
Definition: glext.h:11770
std::vector< std::shared_ptr< record_sensor > > create_record_sensors(std::shared_ptr< device_interface > m_device)
std::chrono::duration< uint64_t, std::nano > nanoseconds
LOG_INFO("Log message using LOG_INFO()")
frame_interface * frame
Definition: streaming.h:126
rs2_extension
Specifies advanced interfaces (capabilities) objects may implement.
Definition: rs_types.h:166
GLenum type
record_device(std::shared_ptr< device_interface > device, std::shared_ptr< device_serializer::writer > serializer)
std::chrono::high_resolution_clock::duration m_record_pause_time
Definition: record_device.h:70
void write_notification(size_t sensor_index, const notification &n)
typename::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
std::vector< std::shared_ptr< record_sensor > > m_sensors
Definition: record_device.h:64
std::pair< uint32_t, rs2_extrinsics > get_extrinsics(const stream_interface &stream) const override
int i
#define LOG_DEBUG(...)
Definition: src/types.h:239
std::chrono::high_resolution_clock::time_point m_time_of_pause
Definition: record_device.h:71
virtual unsigned long long get_frame_number() const =0
void stop_gracefully(to_string error_msg)
auto device
Definition: pyrs_net.cpp:17
const std::string & get_filename() const
void write_device_extension_changes(const T &ext)
std::string to_string(T value)
const std::string & get_info(rs2_camera_info info) const override
static const uint64_t MAX_CACHED_DATA_SIZE
Definition: record_device.h:21


librealsense2
Author(s): Sergey Dorodnicov , Doron Hirshberg , Mark Horn , Reagan Lopez , Itay Carpis
autogenerated on Mon May 3 2021 02:47:39