frame-archive.h
Go to the documentation of this file.
1 /* License: Apache 2.0. See LICENSE file in root directory. */
2 /* Copyright(c) 2019 Intel Corporation. All Rights Reserved. */
3 #pragma once
4 
5 #include "archive.h"
6 
7 namespace librealsense
8 {
9  // Defines general frames storage model
10  template<class T>
11  class frame_archive : public std::enable_shared_from_this<frame_archive<T>>, public archive_interface
12  {
13  std::atomic<uint32_t>* max_frame_queue_size;
14  std::atomic<uint32_t> published_frames_count;
16  std::shared_ptr<metadata_parser_map> _metadata_parsers = nullptr;
18 
19  std::vector<T> freelist; // return frames here
20  std::atomic<bool> recycle_frames;
21  int pending_frames = 0;
22  std::recursive_mutex mutex;
23  std::shared_ptr<platform::time_service> _time_service;
24 
25  std::weak_ptr<sensor_interface> _sensor;
26  std::shared_ptr<sensor_interface> get_sensor() const override { return _sensor.lock(); }
27  void set_sensor(std::shared_ptr<sensor_interface> s) override { _sensor = s; }
28 
29  T alloc_frame(const size_t size, const frame_additional_data& additional_data, bool requires_memory)
30  {
31  T backbuffer;
32  //const size_t size = modes[stream].get_image_size(stream);
33  {
34  std::lock_guard<std::recursive_mutex> guard(mutex);
35 
36  if (requires_memory)
37  {
38  // Attempt to obtain a buffer of the appropriate size from the freelist
39  for (auto it = begin(freelist); it != end(freelist); ++it)
40  {
41  if (it->data.size() == size)
42  {
43  backbuffer = std::move(*it);
44  freelist.erase(it);
45  break;
46  }
47  }
48  }
49 
50  // Discard buffers that have been in the freelist for longer than 1s
51  for (auto it = begin(freelist); it != end(freelist);)
52  {
53  if (additional_data.timestamp > it->additional_data.timestamp + 1000) it = freelist.erase(it);
54  else ++it;
55  }
56  }
57 
58  if (requires_memory)
59  {
60  backbuffer.data.resize(size, 0); // TODO: Allow users to provide a custom allocator for frame buffers
61  }
62  backbuffer.additional_data = additional_data;
63  return backbuffer;
64  }
65 
67  {
68  std::unique_lock<std::recursive_mutex> lock(mutex);
69 
70  auto published_frame = f.publish(this->shared_from_this());
71  if (published_frame)
72  {
73  published_frame->acquire();
74  return published_frame;
75  }
76 
77  LOG_DEBUG("publish(...) failed");
78  return nullptr;
79  }
80 
82  {
83  if (frame)
84  {
85  auto f = (T*)frame;
87  std::unique_lock<std::recursive_mutex> lock(mutex);
88 
89  frame->keep();
90 
91  if (recycle_frames)
92  {
93  freelist.push_back(std::move(*f));
94  }
95  lock.unlock();
96 
97  if (f->is_fixed())
98  published_frames.deallocate(f);
99  else
100  delete f;
101  }
102  }
103 
105  {
107  }
108 
110  {
111  auto f = (T*)frame;
112 
113  unsigned int max_frames = *max_frame_queue_size;
114 
115  if (published_frames_count >= max_frames
116  && max_frames)
117  {
118  LOG_DEBUG("User didn't release frame resource.");
119  return nullptr;
120  }
121  auto new_frame = (max_frames ? published_frames.allocate() : new T());
122 
123  if (new_frame)
124  {
125  if (max_frames)
126  new_frame->mark_fixed();
127  }
128  else
129  {
130  new_frame = new T();
131  }
132 
134  *new_frame = std::move(*f);
135 
136  return new_frame;
137  }
138 
140  {
141  if (frame && frame->get_stream())
142  {
143  auto callback_ended = _time_service ? _time_service->get_time() : 0;
144  auto callback_warning_duration = 1000 / (frame->get_stream()->get_framerate() + 1);
145  auto callback_duration = callback_ended - frame->get_frame_callback_start_time_point();
146 
147  LOG_DEBUG("CallbackFinished," << rs2_stream_to_string(frame->get_stream()->get_stream_type()) << "," << std::dec << frame->get_frame_number()
148  << ",DispatchedAt," << callback_ended);
149 
150  if (callback_duration > callback_warning_duration)
151  {
152  LOG_DEBUG("Frame Callback [" << rs2_stream_to_string(frame->get_stream()->get_stream_type())
153  << "#" << std::dec << frame->additional_data.frame_number
154  << "] overdue. (Duration: " << callback_duration
155  << "ms, FPS: " << frame->get_stream()->get_framerate() << ", Max Duration: " << callback_warning_duration << "ms)");
156  }
157  }
158  }
159 
160  std::shared_ptr<metadata_parser_map> get_md_parsers() const override { return _metadata_parsers; };
161 
162  friend class frame;
163 
164  public:
165  explicit frame_archive(std::atomic<uint32_t>* in_max_frame_queue_size,
166  std::shared_ptr<platform::time_service> ts,
167  std::shared_ptr<metadata_parser_map> parsers)
168  : max_frame_queue_size(in_max_frame_queue_size),
169  recycle_frames(true), mutex(), _time_service(ts),
170  _metadata_parsers(parsers)
171  {
172  published_frames_count = 0;
173  }
174 
176  {
177  return { callback_inflight.allocate(), &callback_inflight };
178  }
179 
181  {
182  ref->release();
183  }
184 
185  frame_interface* alloc_and_track(const size_t size, const frame_additional_data& additional_data, bool requires_memory) override
186  {
187  auto frame = alloc_frame(size, additional_data, requires_memory);
188  return track_frame(frame);
189  }
190 
191  void flush() override
192  {
193  published_frames.stop_allocation();
194  callback_inflight.stop_allocation();
195  recycle_frames = false;
196 
197  auto callbacks_inflight = callback_inflight.get_size();
198  if (callbacks_inflight > 0)
199  {
200  LOG_WARNING(callbacks_inflight << " callbacks are still running on some other threads. Waiting until all callbacks return...");
201  }
202  // wait until user is done with all the stuff he chose to borrow
203  callback_inflight.wait_until_empty();
204 
205  {
206  std::lock_guard<std::recursive_mutex> guard(mutex);
207  freelist.clear();
208  }
209 
210  pending_frames = published_frames.get_size();
211  if (pending_frames > 0)
212  {
213  LOG_INFO("The user was holding on to "
214  << std::dec << pending_frames << " frames after stream 0x"
215  << std::hex << this << " stopped" << std::dec);
216  }
217  // frames and their frame refs are not flushed, by design
218  }
219 
221  {
222  if (pending_frames > 0)
223  {
224  LOG_DEBUG("All frames from stream 0x"
225  << std::hex << this << " are now released by the user" << std::dec);
226  }
227  }
228 
229  };
230 
231 }
static const textual_icon lock
Definition: model-views.h:218
GLuint GLuint end
void deallocate(T *item)
Definition: src/types.h:1201
std::atomic< uint32_t > * max_frame_queue_size
Definition: frame-archive.h:13
callback_invocation_holder begin_callback() override
GLdouble s
std::shared_ptr< metadata_parser_map > get_md_parsers() const override
#define LOG_WARNING(...)
Definition: src/types.h:241
void log_frame_callback_end(T *frame) const
small_heap< T, RS2_USER_QUEUE_SIZE > published_frames
Definition: frame-archive.h:15
void keep_frame(frame_interface *frame) override
void set_sensor(std::shared_ptr< sensor_interface > s) override
Definition: frame-archive.h:27
frame_interface * publish_frame(frame_interface *frame) override
frame_interface * alloc_and_track(const size_t size, const frame_additional_data &additional_data, bool requires_memory) override
not_this_one begin(...)
std::recursive_mutex mutex
Definition: frame-archive.h:22
GLdouble f
frame_interface * track_frame(T &f)
Definition: frame-archive.h:66
GLsizeiptr size
T alloc_frame(const size_t size, const frame_additional_data &additional_data, bool requires_memory)
Definition: frame-archive.h:29
std::weak_ptr< sensor_interface > _sensor
Definition: frame-archive.h:25
void release_frame_ref(frame_interface *ref)
LOG_INFO("Log message using LOG_INFO()")
frame_archive(std::atomic< uint32_t > *in_max_frame_queue_size, std::shared_ptr< platform::time_service > ts, std::shared_ptr< metadata_parser_map > parsers)
const char * rs2_stream_to_string(rs2_stream stream)
Definition: rs.cpp:1262
static auto it
GLint ref
void unpublish_frame(frame_interface *frame) override
Definition: frame-archive.h:81
typename::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
std::atomic< bool > recycle_frames
Definition: frame-archive.h:20
std::shared_ptr< metadata_parser_map > _metadata_parsers
Definition: frame-archive.h:16
#define LOG_DEBUG(...)
Definition: src/types.h:239
std::shared_ptr< platform::time_service > _time_service
Definition: frame-archive.h:23
std::atomic< uint32_t > published_frames_count
Definition: frame-archive.h:14
std::shared_ptr< sensor_interface > get_sensor() const override
Definition: frame-archive.h:26
std::vector< T > freelist
Definition: frame-archive.h:19
callbacks_heap callback_inflight
Definition: frame-archive.h:17


librealsense2
Author(s): Sergey Dorodnicov, Doron Hirshberg, Mark Horn, Reagan Lopez, Itay Carpis
autogenerated on Mon May 3 2021 02:47:14