aggregator.cpp
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2015 Intel Corporation. All Rights Reserved.
3 
4 #include <algorithm>
5 #include "stream.h"
6 #include "aggregator.h"
7 #include <src/composite-frame.h>
9 
10 namespace librealsense
11 {
12  namespace pipeline
13  {
14  aggregator::aggregator(const std::vector<int>& streams_to_aggregate, const std::vector<int>& streams_to_sync) :
15  processing_block("aggregator"),
17  _streams_to_aggregate_ids(streams_to_aggregate),
18  _streams_to_sync_ids(streams_to_sync),
19  _accepting(true)
20  {
23  { handle_frame( std::move( frame ), source ); } ) );
24  }
25 
27  {
28  if (!_accepting) {
29  // If this causes stopping a pipeline with realtime=false playback device to
30  // generate high CPU utilization for a significant length of time, adding a
31  // short sleep here should mitigate it.
32 // std::this_thread::sleep_for(std::chrono::milliseconds(10));
33  return;
34  }
35  std::lock_guard<std::mutex> lock(_mutex);
36  auto comp = dynamic_cast<composite_frame*>(frame.frame);
37  if (comp)
38  {
39  for (auto i = 0; i < comp->get_embedded_frames_count(); i++)
40  {
41  auto f = comp->get_frame(i);
42  f->acquire();
43  _last_set[f->get_stream()->get_unique_id()] = f;
44  }
45 
46  // in case not all required streams were aggregated don't publish the frame set
47  for (int s : _streams_to_aggregate_ids)
48  {
49  if (!_last_set[s])
50  return;
51  }
52 
53  // prepare the output frame set for wait_for_frames/poll_frames calls
54  std::vector<frame_holder> sync_set;
55  // prepare the output frame set for the callbacks
56  std::vector<frame_holder> async_set;
57  for (auto&& s : _last_set)
58  {
59  sync_set.push_back(s.second.clone());
60  // send only the synchronized frames to the user callback
62  s.second->get_stream()->get_unique_id()) != _streams_to_sync_ids.end())
63  async_set.push_back(s.second.clone());
64  }
65 
66  frame_holder sync_fref = source->allocate_composite_frame(std::move(sync_set));
67  frame_holder async_fref = source->allocate_composite_frame(std::move(async_set));
68 
69  if (!sync_fref || !async_fref)
70  {
71  LOG_ERROR("Failed to allocate composite frame");
72  return;
73  }
74  // for async pipeline usage - provide only the synchronized frames to the user via callback
75  source->frame_ready(async_fref.clone());
76 
77  // for sync pipeline usage - push the aggregated to the output queue
78  _queue->enqueue(sync_fref.clone());
79  }
80  else
81  {
82  source->frame_ready(frame.clone());
83  _last_set[frame->get_stream()->get_unique_id()] = frame.clone();
84  if (_streams_to_sync_ids.empty() && _last_set.size() == _streams_to_aggregate_ids.size())
85  {
86  // prepare the output frame set for wait_for_frames/poll_frames calls
87  std::vector<frame_holder> sync_set;
88  for (auto&& s : _last_set)
89  sync_set.push_back(s.second.clone());
90 
91  frame_holder sync_fref = source->allocate_composite_frame(std::move(sync_set));
92  if (!sync_fref)
93  {
94  LOG_ERROR("Failed to allocate composite frame");
95  return;
96  }
97  // for sync pipeline usage - push the aggregated to the output queue
98  _queue->enqueue(sync_fref.clone());
99  }
100  }
101  }
102 
103  bool aggregator::dequeue(frame_holder* item, unsigned int timeout_ms)
104  {
105  return _queue->dequeue(item, timeout_ms);
106  }
107 
109  {
110  return _queue->try_dequeue(item);
111  }
112 
114  {
115  _accepting = true;
116  }
117 
119  {
120  _accepting = false;
121  _queue->stop();
122  }
123  }
124 }
librealsense
Definition: algo.h:18
librealsense::frame::get_stream
std::shared_ptr< stream_profile_interface > get_stream() const override
Definition: frame.h:58
rspy.file.find
def find(dir, mask)
Definition: file.py:45
librealsense::pipeline::pipeline
Definition: pipeline.h:22
librealsense::processing_block::set_processing_callback
void set_processing_callback(rs2_frame_processor_callback_sptr callback) override
Definition: synthetic-stream.cpp:22
librealsense::pipeline::aggregator::_queue
std::unique_ptr< single_consumer_frame_queue< frame_holder > > _queue
Definition: aggregator.h:25
librealsense::synthetic_source_interface
Definition: synthetic-source-interface.h:29
librealsense::frame_holder::clone
frame_holder clone() const
Definition: frame-holder.h:28
librealsense::pipeline::aggregator::stop
void stop()
Definition: aggregator.cpp:118
aggregator.h
composite-frame.h
f
GLdouble f
Definition: glad/glad/glad.h:1517
i
int i
Definition: rs-pcl-color.cpp:54
librealsense::pipeline::aggregator::try_dequeue
bool try_dequeue(frame_holder *item)
Definition: aggregator.cpp:108
librealsense::pipeline::aggregator::_streams_to_sync_ids
std::vector< int > _streams_to_sync_ids
Definition: aggregator.h:27
frame-processor-callback.h
librealsense::frame_holder
Definition: frame-holder.h:15
librealsense::frame_interface::acquire
virtual void acquire()=0
librealsense::pipeline::aggregator::_last_set
std::map< int, frame_holder > _last_set
Definition: aggregator.h:24
librealsense::composite_frame
Definition: composite-frame.h:9
librealsense::pipeline::aggregator::start
void start()
Definition: aggregator.cpp:113
librealsense::pipeline::aggregator::aggregator
aggregator(const std::vector< int > &streams_to_aggregate, const std::vector< int > &streams_to_sync)
Definition: aggregator.cpp:14
source
GLsizei GLsizei GLchar * source
Definition: glad/glad/glad.h:2828
librealsense::frame
Definition: frame.h:19
single_consumer_frame_queue
Definition: concurrency.h:198
librealsense::pipeline::aggregator::dequeue
bool dequeue(frame_holder *item, unsigned int timeout_ms)
Definition: aggregator.cpp:103
librealsense::pipeline::aggregator::_streams_to_aggregate_ids
std::vector< int > _streams_to_aggregate_ids
Definition: aggregator.h:26
librealsense::composite_frame::get_frame
frame_interface * get_frame(int i) const
Definition: composite-frame.h:17
librealsense::frame::frame
frame()
Definition: frame.h:26
librealsense::make_frame_processor_callback
rs2_frame_processor_callback_sptr make_frame_processor_callback(frame_processor_callback::fn &&callback)
Definition: frame-processor-callback.h:38
librealsense::pipeline::aggregator::_mutex
std::mutex _mutex
Definition: aggregator.h:23
librealsense::processing_block
Definition: synthetic-stream.h:53
rs2::textual_icons::lock
static const textual_icon lock
Definition: device-model.h:186
librealsense::pipeline::aggregator::_accepting
std::atomic< bool > _accepting
Definition: aggregator.h:28
librealsense::pipeline::aggregator::handle_frame
void handle_frame(frame_holder frame, synthetic_source_interface *source)
Definition: aggregator.cpp:26
s
GLdouble s
Definition: glad/glad/glad.h:2441
LOG_ERROR
#define LOG_ERROR(...)
Definition: easyloggingpp.h:73


librealsense2
Author(s): LibRealSense ROS Team
autogenerated on Fri Aug 2 2024 08:30:00