aggregator.cpp
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2015 Intel Corporation. All Rights Reserved.
3 
4 #include <algorithm>
5 #include "stream.h"
6 #include "aggregator.h"
7 
8 namespace librealsense
9 {
10  namespace pipeline
11  {
// Constructs the aggregator processing block.
//
// streams_to_aggregate: stream ids copied into _streams_to_aggregate_ids.
// streams_to_sync:      stream ids copied into _streams_to_sync_ids.
//
// The block is registered under the processing_block name aggregator, starts
// with _accepting set to true, and installs a callback that forwards every
// incoming frame to handle_frame.
12  aggregator::aggregator(const std::vector<int>& streams_to_aggregate, const std::vector<int>& streams_to_sync) :
13  processing_block("aggregator"),
// NOTE(review): listing line 14 is absent from this extract (numbering jumps
// from 13 to 15). _queue is used by other methods but is not initialized in
// any visible line, so the dropped line presumably initializes _queue.
// Confirm against the original source before relying on this listing.
15  _streams_to_aggregate_ids(streams_to_aggregate),
16  _streams_to_sync_ids(streams_to_sync),
17  _accepting(true)
18  {
// The lambda uses a by-reference default capture and calls the member
// handle_frame, so it refers back to this aggregator instance.
// NOTE(review): the callback must therefore not run after this object is
// destroyed; lifetime handling is not visible here.
19  auto processing_callback = [&](frame_holder frame, synthetic_source_interface* source)
20  {
21  handle_frame(std::move(frame), source);
22  };
23 
// Wrap the lambda in an internal_frame_processor_callback owned by a
// shared_ptr and hand it to the processing block.
24  set_processing_callback(std::shared_ptr<rs2_frame_processor_callback>(
25  new internal_frame_processor_callback<decltype(processing_callback)>(processing_callback)));
26  }
27 
// Body of handle_frame(frame_holder frame, synthetic_source_interface* source).
// NOTE(review): the signature line (listing line 28) is absent from this
// extract; the name and parameters are taken from the cross-reference index
// further down in this same page.
//
// Behaviour visible below:
//  - Returns immediately when _accepting is false.
//  - Composite input: records every embedded frame in _last_set, and once all
//    ids in _streams_to_aggregate_ids have a frame, publishes one composite
//    through source->frame_ready and another through _queue->enqueue.
//  - Non-composite input: forwards the frame through source->frame_ready,
//    records it, and enqueues an aggregated composite only when no sync
//    streams are configured and _last_set has as many entries as there are
//    streams to aggregate.
29  {
// Checked before taking the mutex, so a non-accepting aggregator returns
// without locking.
30  if (!_accepting) {
31  // If this causes stopping a pipeline with realtime=false playback device to
32  // generate high CPU utilization for a significant length of time, adding a
33  // short sleep here should mitigate it.
34 // std::this_thread::sleep_for(std::chrono::milliseconds(10));
35  return;
36  }
// Everything from here to the end of the function runs under _mutex.
37  std::lock_guard<std::mutex> lock(_mutex);
// A successful cast means the input is a composite frame (a frame set).
38  auto comp = dynamic_cast<composite_frame*>(frame.frame);
39  if (comp)
40  {
// NOTE(review): auto i = 0 deduces int; if get_embedded_frames_count returns
// an unsigned type this is a signed/unsigned comparison. Return type is not
// visible here; confirm.
41  for (auto i = 0; i < comp->get_embedded_frames_count(); i++)
42  {
43  auto f = comp->get_frame(i);
// acquire is called before the raw pointer is stored in _last_set.
// Presumably this adds the reference that the stored holder will own; confirm
// against frame_interface.
44  f->acquire();
45  _last_set[f->get_stream()->get_unique_id()] = f;
46  }
47 
48  // in case not all required streams were aggregated don't publish the frame set
49  for (int s : _streams_to_aggregate_ids)
50  {
// NOTE(review): _last_set is a std::map, and operator[] inserts an empty
// entry for an id that is not present yet. Those placeholder entries count
// toward _last_set.size(), which the non-composite branch compares against
// _streams_to_aggregate_ids.size(). Verify this is intended.
51  if (!_last_set[s])
52  return;
53  }
54 
55  // prepare the output frame set for wait_for_frames/poll_frames calls
56  std::vector<frame_holder> sync_set;
57  // prepare the output frame set for the callbacks
58  std::vector<frame_holder> async_set;
59  for (auto&& s : _last_set)
60  {
// Every stored frame goes into the set destined for the output queue.
61  sync_set.push_back(s.second.clone());
62  // send only the synchronized frames to the user callback
// NOTE(review): listing line 63, the start of this condition, is absent from
// this extract. The surviving tail compares a result against
// _streams_to_sync_ids.end(), so the missing part is presumably a lookup of
// the stream unique id in _streams_to_sync_ids; confirm against the original.
64  s.second->get_stream()->get_unique_id()) != _streams_to_sync_ids.end())
65  async_set.push_back(s.second.clone());
66  }
67 
// Both vectors are moved into the source, which returns the composite frames.
68  frame_holder sync_fref = source->allocate_composite_frame(std::move(sync_set));
69  frame_holder async_fref = source->allocate_composite_frame(std::move(async_set));
70 
// If either allocation failed, log and publish nothing.
71  if (!sync_fref || !async_fref)
72  {
73  LOG_ERROR("Failed to allocate composite frame");
74  return;
75  }
76  // for async pipeline usage - provide only the synchronized frames to the user via callback
77  source->frame_ready(async_fref.clone());
78 
79  // for sync pipeline usage - push the aggregated to the output queue
80  _queue->enqueue(sync_fref.clone());
81  }
82  else
83  {
// Non-composite frame: pass a clone straight to the source and remember
// another clone as the latest frame for its stream id.
84  source->frame_ready(frame.clone());
85  _last_set[frame->get_stream()->get_unique_id()] = frame.clone();
// Aggregate here only when no streams are configured for syncing and the
// number of remembered entries equals the number of streams to aggregate.
// Note this compares sizes only, not the actual ids.
86  if (_streams_to_sync_ids.empty() && _last_set.size() == _streams_to_aggregate_ids.size())
87  {
88  // prepare the output frame set for wait_for_frames/poll_frames calls
89  std::vector<frame_holder> sync_set;
90  for (auto&& s : _last_set)
91  sync_set.push_back(s.second.clone());
92 
93  frame_holder sync_fref = source->allocate_composite_frame(std::move(sync_set));
94  if (!sync_fref)
95  {
96  LOG_ERROR("Failed to allocate composite frame");
97  return;
98  }
99  // for sync pipeline usage - push the aggregated to the output queue
100  _queue->enqueue(sync_fref.clone());
101  }
102  }
103  }
104 
// Forwards to _queue->dequeue with the same item and timeout_ms and returns
// its result unchanged.
// item:       output location passed through to the queue.
// timeout_ms: passed through as is; presumably a wait limit in milliseconds,
//             going by the name. The queue semantics are not visible here.
105  bool aggregator::dequeue(frame_holder* item, unsigned int timeout_ms)
106  {
107  return _queue->dequeue(item, timeout_ms);
108  }
109 
// Body of try_dequeue(frame_holder* item).
// NOTE(review): the signature line (listing line 110) is absent from this
// extract; the name, parameter and bool return type are taken from the
// cross-reference index further down in this same page.
// Forwards to _queue->try_dequeue and returns its result unchanged.
111  {
112  return _queue->try_dequeue(item);
113  }
114 
// NOTE(review): the signature line (listing line 115) is absent from this
// extract, so the function name is not visible; presumably this is the
// aggregator start method. Confirm against aggregator.h.
// Sets _accepting to true, so handle_frame stops discarding incoming frames.
116  {
117  _accepting = true;
118  }
119 
// NOTE(review): the signature line (listing line 120) is absent from this
// extract, so the function name is not visible; presumably this is the
// aggregator stop method. Confirm against aggregator.h.
// Clears _accepting first, so handle_frame discards new frames, and then
// calls stop on the output queue.
121  {
122  _accepting = false;
123  _queue->stop();
124  }
125  }
126 }
static const textual_icon lock
Definition: model-views.h:219
GLsizei GLsizei GLchar * source
#define LOG_ERROR(...)
Definition: easyloggingpp.h:58
virtual void frame_ready(frame_holder result)=0
GLdouble s
bool dequeue(frame_holder *item, unsigned int timeout_ms)
Definition: aggregator.cpp:105
std::vector< int > _streams_to_aggregate_ids
Definition: aggregator.h:19
frame_holder clone() const
Definition: frame.h:160
bool try_dequeue(frame_holder *item)
Definition: aggregator.cpp:110
std::map< stream_id, frame_holder > _last_set
Definition: aggregator.h:17
aggregator(const std::vector< int > &streams_to_aggregate, const std::vector< int > &streams_to_sync)
Definition: aggregator.cpp:12
std::unique_ptr< single_consumer_frame_queue< frame_holder > > _queue
Definition: aggregator.h:18
std::atomic< bool > _accepting
Definition: aggregator.h:21
def find(dir, mask)
Definition: file.py:45
void set_processing_callback(frame_processor_callback_ptr callback) override
void handle_frame(frame_holder frame, synthetic_source_interface *source)
Definition: aggregator.cpp:28
frame_interface * frame
Definition: frame.h:150
GLdouble f
typename ::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
frame_interface * get_frame(int i) const
int i
virtual frame_interface * allocate_composite_frame(std::vector< frame_holder > frames)=0
virtual std::shared_ptr< stream_profile_interface > get_stream() const =0
std::vector< int > _streams_to_sync_ids
Definition: aggregator.h:20


librealsense2
Author(s): LibRealSense ROS Team
autogenerated on Thu Dec 22 2022 03:41:41