12 #define LOG_IF_ENABLE( OSTREAM, ENV ) \ 15 LOG_DEBUG( OSTREAM ); \ 21 : _streams_id(streams_id){}
69 if (callbacks_inflight > 0)
71 LOG_WARNING(callbacks_inflight <<
" callbacks are still running on some other threads. Waiting until all callbacks return...");
106 for (
auto&&
matcher : matchers)
130 clean_inactive_streams(f);
131 auto matcher = find_matcher(f);
135 update_last_arrived(f,
matcher.get());
140 LOG_ERROR(
"didn't find any matcher for " << *f.
frame <<
" will not be synchronized");
148 std::shared_ptr<matcher>
matcher;
154 auto dev_exist =
false;
162 dev =
sensor->get_device().shared_from_this().get();
164 catch (
const std::bad_weak_ptr&)
174 std::ostringstream ss;
175 for (
auto const &
it : _matchers)
176 ss <<
' ' <<
it.first;
177 LOG_DEBUG(
"stream id " <<
stream_id <<
" was not found; trying to create, existing streams=" << ss.str());
180 matcher->set_callback(
186 for (
auto stream : matcher->get_streams())
190 _frames_queue.erase(_matchers[stream].
get());
195 for (
auto stream : matcher->get_streams_types())
205 else if (!matcher->get_active())
207 matcher->set_active(
true);
208 _frames_queue[matcher.get()].start();
225 _frames_queue.erase(_matchers[stream_id].
get());
243 for (
auto& fq : _frames_queue)
252 for (
auto m : matchers)
255 if(_frames_queue[
m].peek(&f))
265 update_next_expected(f);
266 auto matcher = find_matcher(f);
276 std::vector<frame_holder*> frames_arrived;
277 std::vector<librealsense::matcher*> frames_arrived_matchers;
278 std::vector<librealsense::matcher*> synced_frames;
279 std::vector<librealsense::matcher*> missing_streams;
284 auto old_frames =
false;
286 synced_frames.clear();
287 missing_streams.clear();
288 frames_arrived_matchers.clear();
289 frames_arrived.clear();
292 for (
auto s = _frames_queue.begin(); s != _frames_queue.end(); s++)
295 if (s->second.peek(&f))
297 frames_arrived.push_back(f);
298 frames_arrived_matchers.push_back(s->first);
302 missing_streams.push_back(s->first);
306 if (frames_arrived.size() == 0)
310 if (frames_arrived.size() > 0)
312 curr_sync = frames_arrived[0];
313 synced_frames.push_back(frames_arrived_matchers[0]);
316 for (
auto i = 1;
i < frames_arrived.size();
i++)
318 if (are_equivalent(*curr_sync, *frames_arrived[
i]))
320 synced_frames.push_back(frames_arrived_matchers[i]);
322 else if (is_smaller_than(*frames_arrived[i], *curr_sync))
325 synced_frames.clear();
326 synced_frames.push_back(frames_arrived_matchers[i]);
327 curr_sync = frames_arrived[
i];
337 for (
auto i : missing_streams)
339 if (!skip_missing_stream(synced_frames,
i, env))
341 LOG_IF_ENABLE(
" "<<frames_to_string(synced_frames )<<
" Wait for missing stream: ", env);
343 for (
auto&&
stream :
i->get_streams())
345 synced_frames.clear();
351 LOG_IF_ENABLE(
_name <<
" " << frames_to_string(synced_frames) <<
" Skipped missing stream: ", env);
352 for (
auto&&
stream :
i->get_streams())
363 if (synced_frames.size())
365 std::vector<frame_holder> match;
366 match.reserve(synced_frames.size());
368 for (
auto index : synced_frames)
371 int timeout_ms = 5000;
372 _frames_queue[
index].dequeue(&frame, timeout_ms);
393 }
while (synced_frames.size() > 0);
416 std::vector<stream_id> inactive_matchers;
422 s <<
"clean inactive stream in "<<
_name;
423 for (
auto stream :
m.second->get_streams_types())
429 inactive_matchers.push_back(
m.first);
430 m.second->set_active(
false);
434 for(
auto id: inactive_matchers)
451 if((*synced_frame)->get_frame_number() - next_expected > 4 || (*synced_frame)->get_frame_number() < next_expected)
490 auto min_fps =
std::min(a_fps, b_fps);
506 return ts.first < ts.second;
534 auto gap = 1000.f / (float)
fps;
552 std::vector<stream_id> dead_matchers;
561 s <<
"clean inactive stream in "<<
_name;
562 for (
auto stream :
m.second->get_streams_types())
568 dead_matchers.push_back(
m.first);
569 m.second->set_active(
false);
573 for(
auto id: dead_matchers)
594 if (
it->second != (*synced_frame)->get_frame_timestamp_domain())
599 auto gap = 1000.f/ (float)
get_fps(*synced_frame);
601 if((*synced_frame)->get_frame_timestamp() > next_expected && abs((*synced_frame)->get_frame_timestamp()- next_expected)<gap*10)
603 LOG_IF_ENABLE(
"next expected of the missing stream didn't updated yet", env);
607 return !
are_equivalent((*synced_frame)->get_frame_timestamp(), next_expected,
get_fps(*synced_frame));
612 auto gap = 1000.f / (float)fps;
613 return abs(a - b) < ((float)gap / (
float)2) ;
627 std::vector<frame_holder> match;
virtual rs2_metadata_type get_frame_metadata(const rs2_frame_metadata_value &frame_metadata) const =0
timestamp_composite_matcher(std::vector< std::shared_ptr< matcher >> matchers)
virtual rs2_timestamp_domain get_frame_timestamp_domain() const =0
GLboolean GLboolean GLboolean b
std::map< matcher *, double > _next_expected
GLuint const GLchar * name
void clean_inactive_streams(frame_holder &f) override
std::shared_ptr< platform::time_service > get_time_service()
const std::vector< stream_id > & get_streams() const override
virtual void dispatch(frame_holder f, const syncronization_environment &env)=0
void sync(frame_holder f, const syncronization_environment &env) override
bool is_smaller_than(frame_holder &a, frame_holder &b) override
std::function< void(frame_holder, const syncronization_environment &)> sync_callback
std::pair< double, double > extract_timestamps(frame_holder &a, frame_holder &b)
std::vector< rs2_stream > _streams_type
GLsizei const GLchar *const * string
identity_matcher(stream_id stream, rs2_stream streams_type)
void sort(sort_type m_sort_type, const std::string &in, const std::string &out)
matcher(std::vector< stream_id > streams_id={})
void dispatch(frame_holder f, const syncronization_environment &env) override
GLboolean GLboolean GLboolean GLboolean a
const std::vector< rs2_stream > & get_streams_types() const override
callback_invocation_holder begin_callback()
std::string frames_to_string(std::vector< librealsense::matcher * > matchers)
std::map< stream_id, std::shared_ptr< matcher > > _matchers
callbacks_heap _callback_inflight
std::shared_ptr< matcher > find_matcher(const frame_holder &f)
bool skip_missing_stream(std::vector< matcher * > synced, matcher *missing, const syncronization_environment &env) override
unsigned int get_fps(const frame_holder &f)
bool is_smaller_than(frame_holder &a, frame_holder &b) override
bool are_equivalent(frame_holder &a, frame_holder &b) override
void dispatch(frame_holder f, const syncronization_environment &env) override
std::string frame_to_string(const frame_interface &f)
composite_identity_matcher(std::vector< std::shared_ptr< matcher >> matchers)
virtual rs2_time_t get_frame_timestamp() const =0
std::string create_composite_name(const std::vector< std::shared_ptr< matcher >> &matchers, const std::string &name)
bool are_equivalent(frame_holder &a, frame_holder &b) override
synthetic_source_interface * source
std::map< matcher *, single_consumer_frame_queue< frame_holder > > _frames_queue
rs2_stream
Streams are different types of data provided by RealSense devices.
virtual std::shared_ptr< sensor_interface > get_sensor() const =0
static environment & get_instance()
void update_next_expected(const frame_holder &f) override
void update_next_expected(const frame_holder &f) override
virtual std::shared_ptr< stream_profile_interface > get_stream() const =0
void set_active(const bool active)
std::map< matcher *, rs2_timestamp_domain > _next_expected_domain
std::map< matcher *, unsigned long long > _last_arrived
frame_number_composite_matcher(std::vector< std::shared_ptr< matcher >> matchers)
virtual void update_last_arrived(frame_holder &f, matcher *m) override
const char * rs2_stream_to_string(rs2_stream stream)
std::map< matcher *, double > _last_arrived
virtual std::string get_name() const override
composite_matcher(std::vector< std::shared_ptr< matcher >> matchers, std::string name)
typename::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
void sync(frame_holder f, const syncronization_environment &env) override
#define LOG_IF_ENABLE(OSTREAM, ENV)
std::string frame_holder_to_string(const frame_holder &f)
virtual void update_last_arrived(frame_holder &f, matcher *m) override
virtual void set_callback(sync_callback f) override
bool skip_missing_stream(std::vector< matcher * > synced, matcher *missing, const syncronization_environment &env) override
virtual void stop() override
void clean_inactive_streams(frame_holder &f) override
virtual std::shared_ptr< matcher > create_matcher(const frame_holder &frame) const =0
std::map< matcher *, unsigned int > _fps
virtual frame_interface * allocate_composite_frame(std::vector< frame_holder > frames)=0
virtual unsigned long long get_frame_number() const =0
virtual void sync(frame_holder f, const syncronization_environment &env) override
std::vector< stream_id > _streams_id
virtual bool supports_frame_metadata(const rs2_frame_metadata_value &frame_metadata) const =0