pipeline/pipeline.cpp
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2015 Intel Corporation. All Rights Reserved.
3 
4 #include <algorithm>
5 #include "pipeline.h"
6 #include "stream.h"
8 #include "media/ros/ros_writer.h"
9 
10 namespace librealsense
11 {
12  namespace pipeline
13  {
// Constructs a pipeline bound to the given context.
// Stores ctx in _ctx, constructs _dispatcher with the argument 10, and constructs
// _hub from the same context with RS2_PRODUCT_LINE_ANY_INTEL.
// NOTE(review): the initializer list ends with a trailing comma and listing line 18
// is absent here - one more member initializer presumably belongs before the
// empty body; confirm against the original file.
14  pipeline::pipeline(std::shared_ptr<librealsense::context> ctx) :
15  _ctx(ctx),
16  _dispatcher(10),
17  _hub(ctx, RS2_PRODUCT_LINE_ANY_INTEL),
19  {}
20 
22  {
23  if (_active_profile) {
24  try {
25  unsafe_stop();
26  }
27  catch (...) {}
28  }
29  }
30 
31  std::shared_ptr<profile> pipeline::start(std::shared_ptr<config> conf, frame_callback_ptr callback)
32  {
33  std::lock_guard<std::mutex> lock(_mtx);
34  if (_active_profile)
35  {
36  throw librealsense::wrong_api_call_sequence_exception("start() cannot be called before stop()");
37  }
39  unsafe_start(conf);
41  }
42 
43  std::shared_ptr<profile> pipeline::get_active_profile() const
44  {
45  std::lock_guard<std::mutex> lock(_mtx);
47  }
48 
49  std::shared_ptr<profile> pipeline::unsafe_get_active_profile() const
50  {
51  if (!_active_profile)
52  throw librealsense::wrong_api_call_sequence_exception("get_active_profile() can only be called between a start() and a following stop()");
53 
54  return _active_profile;
55  }
56 
// Resolves a profile from conf and starts streaming on it. Takes no lock itself;
// the callers visible in this file (start, wait_for_frames, try_wait_for_frames)
// hold _mtx while calling.
//
// Steps: pick the cached resolved profile or resolve a new one (up to 3 attempts),
// reject a profile with no stream profiles, build the syncer/aggregator via on_start(),
// obtain the frame callback via get_callback(), subscribe to playback status changes when
// the device is a playback device, then start the dispatcher and open/start the streams.
// A copy of conf is kept in _prev_conf.
//
// Throws wrong_api_call_sequence_exception when the resolved profile has no streams;
// rethrows whatever conf->resolve() threw on the last attempt.
// NOTE(review): listing lines 97, 99 and 116 are absent from this view (see notes below).
57  void pipeline::unsafe_start(std::shared_ptr<config> conf)
58  {
59  std::shared_ptr<profile> profile = nullptr;
60  //first try to get the previously resolved profile (if exists)
61  auto cached_profile = conf->get_cached_resolved_profile();
62  if (cached_profile)
63  {
64  profile = cached_profile;
65  }
66  else
67  {
// No cached profile: attempt to resolve, swallowing failures until the final attempt.
68  const int NUM_TIMES_TO_RETRY = 3;
69  for (int i = 1; i <= NUM_TIMES_TO_RETRY; i++)
70  {
71  try
72  {
// The 5-second value is presumably a resolve timeout - TODO confirm against config::resolve.
73  profile = conf->resolve(shared_from_this(), std::chrono::seconds(5));
74  break;
75  }
76  catch (...)
77  {
// Only the last failed attempt propagates its exception to the caller.
78  if (i == NUM_TIMES_TO_RETRY)
79  throw;
80  }
81  }
82  }
83 
84  assert(profile);
85  if (!profile->_multistream.get_profiles().size())
86  throw librealsense::wrong_api_call_sequence_exception("No streams are selected!");
87 
// on_start() creates _syncer and _aggregator and returns the ids of the streams to sync.
88  auto synced_streams_ids = on_start(profile);
89 
90  frame_callback_ptr callbacks = get_callback(synced_streams_ids);
91 
92  auto dev = profile->get_device();
93  if (auto playback = As<librealsense::playback_device>(dev))
94  {
// The token is kept so unsafe_stop() can unsubscribe this handler.
95  _playback_stopped_token = playback->playback_status_changed += [this, callbacks](rs2_playback_status status)
96  {
// NOTE(review): listing line 97 is missing; presumably a check on `status` opens this brace - confirm.
98  {
// NOTE(review): listing line 99 is missing; the closing "});" below implies a call taking a
// lambda (possibly a _dispatcher invocation) opens this brace - confirm.
100  {
101  //If the pipeline holds a playback device, and it reached the end of file (stopped)
102  //Then we restart it
103  if (_active_profile && _prev_conf->get_repeat_playback())
104  {
105  _active_profile->_multistream.open();
106  _active_profile->_multistream.start(callbacks);
107  }
108  });
109  }
110  };
111  }
112 
113  _dispatcher.start();
114  profile->_multistream.open();
115  profile->_multistream.start(callbacks);
// NOTE(review): listing line 116 is missing; nothing visible here assigns _active_profile,
// although start() and the wait/poll functions rely on it being set - confirm.
// The config is copied (not shared) into _prev_conf; wait_for_frames() reuses it to restart.
117  _prev_conf = std::make_shared<config>(*conf);
118  }
119 
121  {
122  std::lock_guard<std::mutex> lock(_mtx);
123  if (!_active_profile)
124  {
125  throw librealsense::wrong_api_call_sequence_exception("stop() cannot be called before start()");
126  }
127  unsafe_stop();
128  }
129 
131  {
132  if (_active_profile)
133  {
134  try
135  {
136  _syncer->stop();
137  _aggregator->stop();
138  auto dev = _active_profile->get_device();
139  if (auto playback = As<librealsense::playback_device>(dev))
140  {
141  playback->playback_status_changed -= _playback_stopped_token;
142  }
143  _active_profile->_multistream.stop();
144  _active_profile->_multistream.close();
145  _dispatcher.stop();
146  }
147  catch (...)
148  {
149  } // Stop will throw if device was disconnected. TODO - refactoring anticipated
150 
151  // shared pointers initialized when pipeline running with _active_profile
152  // should be reset with _active_profile too
153  _active_profile.reset();
154  _prev_conf.reset();
155  _streams_callback.reset();
156  }
157  }
158 
159  std::shared_ptr<device_interface> pipeline::wait_for_device(const std::chrono::milliseconds& timeout, const std::string& serial)
160  {
161  // pipeline's device selection shall be deterministic
162  return _hub.wait_for_device(timeout, false, serial);
163  }
164 
165  std::shared_ptr<librealsense::context> pipeline::get_context() const
166  {
167  return _ctx;
168  }
169 
// Prepares the frame-processing objects for a profile that is about to start.
// Collects the unique id of every active stream for aggregation, decides which of
// them are also synced, creates a fresh _syncer and _aggregator, and attaches
// _streams_callback to the aggregator when one is set.
// Returns the unique ids of the streams selected for syncing.
170  std::vector<int> pipeline::on_start(std::shared_ptr<profile> profile)
171  {
172  std::vector<int> _streams_to_aggregate_ids;
173  std::vector<int> _streams_to_sync_ids;
174  bool sync_any = false;
// NOTE(review): listing line 175 is missing. As shown, sync_any is set unconditionally,
// which makes the "!sync_any" test below dead code; presumably a condition guarded
// this assignment - confirm against the original file.
176  sync_any = true;
177  // check which of the active profiles should be synced and update the sync list accordingly
178  for (auto&& s : profile->get_active_streams())
179  {
180  _streams_to_aggregate_ids.push_back(s->get_unique_id());
181  bool sync_current = sync_any;
// A stream is synced when sync_any is set or its stream type is listed in _synced_streams.
182  if (!sync_any && std::find(_synced_streams.begin(), _synced_streams.end(), s->get_stream_type()) != _synced_streams.end())
183  sync_current = true;
184  if(sync_current)
185  _streams_to_sync_ids.push_back(s->get_unique_id());
186  }
187 
// Replaces any previous syncer/aggregator with new instances for this run.
188  _syncer = std::unique_ptr<syncer_process_unit>(new syncer_process_unit());
189  _aggregator = std::unique_ptr<aggregator>(new aggregator(_streams_to_aggregate_ids, _streams_to_sync_ids));
190 
191  if (_streams_callback)
192  _aggregator->set_output_callback(_streams_callback);
193 
194  return _streams_to_sync_ids;
195  }
196 
// Builds the frame callback handed to the streams when they are started.
// Frames whose stream unique id is in synced_streams_ids go to _syncer; all other
// frames go straight to _aggregator. The syncer's output is routed to _aggregator too.
// Both lambdas use a [&] default capture, so _syncer/_aggregator are reached through
// the pipeline object itself.
// NOTE(review): the callbacks therefore presumably must not outlive the pipeline - confirm.
197  frame_callback_ptr pipeline::get_callback(std::vector<int> synced_streams_ids)
198  {
// Output stage of the syncer: forwards every frame to the aggregator.
199  auto pipeline_process_callback = [&](frame_holder fref)
200  {
201  _aggregator->invoke(std::move(fref));
202  };
203 
204  frame_callback_ptr to_pipeline_process = {
// NOTE(review): listing line 205 is missing; presumably the callback object wrapping
// pipeline_process_callback is created there - confirm.
// The custom deleter calls release() on the callback rather than deleting it.
206  [](rs2_frame_callback* p) { p->release(); }
207  };
208 
209  _syncer->set_output_callback(to_pipeline_process);
210 
// synced_streams_ids is captured by value, so the id list stays valid after this function returns.
211  auto to_syncer = [&, synced_streams_ids](frame_holder fref)
212  {
213  // if the user requested to sync the frame push it to the syncer, otherwise push it to the aggregator
214  if (std::find(synced_streams_ids.begin(), synced_streams_ids.end(), fref->get_stream()->get_unique_id()) != synced_streams_ids.end())
215  _syncer->invoke(std::move(fref));
216  else
217  _aggregator->invoke(std::move(fref));
218  };
219 
220  frame_callback_ptr rv = {
// NOTE(review): listing line 221 is missing; presumably the callback object wrapping
// to_syncer is created there - confirm.
222  [](rs2_frame_callback* p) { p->release(); }
223  };
224 
225  return rv;
226  }
227 
228  frame_holder pipeline::wait_for_frames(unsigned int timeout_ms)
229  {
230  std::lock_guard<std::mutex> lock(_mtx);
231  if (!_active_profile)
232  {
233  throw librealsense::wrong_api_call_sequence_exception("wait_for_frames cannot be called before start()");
234  }
235  if (_streams_callback)
236  {
237  throw librealsense::wrong_api_call_sequence_exception("wait_for_frames cannot be called if a callback was provided");
238  }
239 
240  frame_holder f;
241  if (_aggregator->dequeue(&f, timeout_ms))
242  {
243  return f;
244  }
245 
246  //hub returns true even if device already reconnected
247  if (!_hub.is_connected(*_active_profile->get_device()))
248  {
249  try
250  {
251  auto prev_conf = _prev_conf;
252  unsafe_stop();
253  unsafe_start(prev_conf);
254 
255  if (_aggregator->dequeue(&f, timeout_ms))
256  {
257  return f;
258  }
259 
260  }
261  catch (const std::exception& e)
262  {
263  throw std::runtime_error(to_string() << "Device disconnected. Failed to recconect: " << e.what() << timeout_ms);
264  }
265  }
266  throw std::runtime_error(to_string() << "Frame didn't arrive within " << timeout_ms);
267  }
268 
270  {
271  std::lock_guard<std::mutex> lock(_mtx);
272 
273  if (!_active_profile)
274  {
275  throw librealsense::wrong_api_call_sequence_exception("poll_for_frames cannot be called before start()");
276  }
277  if (_streams_callback)
278  {
279  throw librealsense::wrong_api_call_sequence_exception("poll_for_frames cannot be called if a callback was provided");
280  }
281 
282  if (_aggregator->try_dequeue(frame))
283  {
284  return true;
285  }
286  return false;
287  }
288 
289  bool pipeline::try_wait_for_frames(frame_holder* frame, unsigned int timeout_ms)
290  {
291  std::lock_guard<std::mutex> lock(_mtx);
292  if (!_active_profile)
293  {
294  throw librealsense::wrong_api_call_sequence_exception("try_wait_for_frames cannot be called before start()");
295  }
296  if (_streams_callback)
297  {
298  throw librealsense::wrong_api_call_sequence_exception("try_wait_for_frames cannot be called if a callback was provided");
299  }
300 
301  if (_aggregator->dequeue(frame, timeout_ms))
302  {
303  return true;
304  }
305 
306  //hub returns true even if device already reconnected
307  if (!_hub.is_connected(*_active_profile->get_device()))
308  {
309  try
310  {
311  auto prev_conf = _prev_conf;
312  unsafe_stop();
313  unsafe_start(prev_conf);
314  return _aggregator->dequeue(frame, timeout_ms);
315  }
316  catch (const std::exception& e)
317  {
318  LOG_INFO(e.what());
319  return false;
320  }
321  }
322  return false;
323  }
324  }
325 }
static const textual_icon lock
Definition: model-views.h:218
bool is_connected(const device_interface &dev)
Definition: device_hub.cpp:153
std::unique_ptr< aggregator > _aggregator
Definition: pipeline.h:58
GLdouble s
std::shared_ptr< rs2_frame_callback > frame_callback_ptr
Definition: src/types.h:1071
void start()
Definition: concurrency.h:286
GLfloat GLfloat p
Definition: glext.h:12687
bool poll_for_frames(frame_holder *frame)
frame_holder wait_for_frames(unsigned int timeout_ms)
GLsizei const GLchar *const * string
e
Definition: rmse.py:177
std::vector< rs2_stream > _synced_streams
Definition: pipeline.h:61
status
Defines return codes that SDK interfaces use. Negative values indicate errors, a zero value indicates...
virtual void release()=0
GLdouble t
bool try_wait_for_frames(frame_holder *frame, unsigned int timeout_ms)
GLdouble f
std::unique_ptr< syncer_process_unit > _syncer
Definition: pipeline.h:57
void unsafe_start(std::shared_ptr< config > conf)
void stop()
Definition: concurrency.h:294
std::vector< int > on_start(std::shared_ptr< profile > profile)
rs2_playback_status
std::shared_ptr< profile > start(std::shared_ptr< config > conf, frame_callback_ptr callback=nullptr)
def find(dir, mask)
Definition: file.py:25
def callback(frame)
Definition: t265_stereo.py:91
frame_callback_ptr get_callback(std::vector< int > unique_ids)
std::shared_ptr< device_interface > wait_for_device(const std::chrono::milliseconds &timeout=std::chrono::milliseconds(std::chrono::hours(1)), bool loop_through_devices=true, const std::string &serial="")
Definition: device_hub.cpp:120
#define RS2_PRODUCT_LINE_ANY_INTEL
Definition: rs_context.h:92
frame_callback_ptr _streams_callback
Definition: pipeline.h:60
LOG_INFO("Log message using LOG_INFO()")
std::shared_ptr< profile > _active_profile
Definition: pipeline.h:46
std::shared_ptr< config > _prev_conf
Definition: pipeline.h:48
GLbitfield GLuint64 timeout
std::shared_ptr< profile > unsafe_get_active_profile() const
std::shared_ptr< librealsense::context > get_context() const
void invoke(T item, bool is_blocking=false)
Definition: concurrency.h:254
typename::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
int i
std::shared_ptr< device_interface > wait_for_device(const std::chrono::milliseconds &timeout=std::chrono::hours::max(), const std::string &serial="")
std::shared_ptr< profile > get_active_profile() const
std::shared_ptr< librealsense::context > _ctx
Definition: pipeline.h:53
std::string to_string(T value)


librealsense2
Author(s): Sergey Dorodnicov , Doron Hirshberg , Mark Horn , Reagan Lopez , Itay Carpis
autogenerated on Mon May 3 2021 02:47:39