26 std::shared_ptr<sensor_interface>
get_sensor()
const override {
return _sensor.lock(); }
27 void set_sensor(std::shared_ptr<sensor_interface>
s)
override { _sensor =
s; }
34 std::lock_guard<std::recursive_mutex> guard(mutex);
41 if (
it->data.size() ==
size)
53 if (additional_data.
timestamp >
it->additional_data.timestamp + 1000)
it = freelist.erase(
it);
60 backbuffer.data.resize(size, 0);
62 backbuffer.additional_data = additional_data;
68 std::unique_lock<std::recursive_mutex>
lock(mutex);
70 auto published_frame = f.publish(this->shared_from_this());
73 published_frame->acquire();
74 return published_frame;
87 std::unique_lock<std::recursive_mutex>
lock(mutex);
115 if (published_frames_count >= max_frames
118 LOG_DEBUG(
"User didn't release frame resource.");
121 auto new_frame = (max_frames ? published_frames.
allocate() :
new T());
126 new_frame->mark_fixed();
141 if (frame && frame->get_stream())
143 auto callback_ended = _time_service ? _time_service->get_time() : 0;
144 auto callback_warning_duration = 1000 / (frame->get_stream()->get_framerate() + 1);
145 auto callback_duration = callback_ended - frame->get_frame_callback_start_time_point();
147 LOG_DEBUG(
"CallbackFinished," <<
rs2_stream_to_string(frame->get_stream()->get_stream_type()) <<
"," << std::dec << frame->get_frame_number()
148 <<
",DispatchedAt," << callback_ended);
150 if (callback_duration > callback_warning_duration)
153 <<
"#" << std::dec << frame->additional_data.frame_number
154 <<
"] overdue. (Duration: " << callback_duration
155 <<
"ms, FPS: " << frame->get_stream()->get_framerate() <<
", Max Duration: " << callback_warning_duration <<
"ms)");
166 std::shared_ptr<platform::time_service> ts,
167 std::shared_ptr<metadata_parser_map> parsers)
168 : max_frame_queue_size(in_max_frame_queue_size),
169 recycle_frames(true), mutex(), _time_service(ts),
170 _metadata_parsers(parsers)
172 published_frames_count = 0;
177 return { callback_inflight.
allocate(), &callback_inflight };
195 recycle_frames =
false;
197 auto callbacks_inflight = callback_inflight.
get_size();
198 if (callbacks_inflight > 0)
200 LOG_WARNING(callbacks_inflight <<
" callbacks are still running on some other threads. Waiting until all callbacks return...");
206 std::lock_guard<std::recursive_mutex> guard(mutex);
210 pending_frames = published_frames.
get_size();
211 if (pending_frames > 0)
213 LOG_INFO(
"The user was holding on to " 214 << std::dec << pending_frames <<
" frames after stream 0x" 215 << std::hex <<
this <<
" stopped" << std::dec);
222 if (pending_frames > 0)
225 << std::hex <<
this <<
" are now released by the user" << std::dec);
static const textual_icon lock
std::atomic< uint32_t > * max_frame_queue_size
callback_invocation_holder begin_callback() override
std::shared_ptr< metadata_parser_map > get_md_parsers() const override
void log_frame_callback_end(T *frame) const
small_heap< T, RS2_USER_QUEUE_SIZE > published_frames
void keep_frame(frame_interface *frame) override
void set_sensor(std::shared_ptr< sensor_interface > s) override
frame_interface * publish_frame(frame_interface *frame) override
frame_interface * alloc_and_track(const size_t size, const frame_additional_data &additional_data, bool requires_memory) override
std::recursive_mutex mutex
frame_interface * track_frame(T &f)
T alloc_frame(const size_t size, const frame_additional_data &additional_data, bool requires_memory)
std::weak_ptr< sensor_interface > _sensor
void release_frame_ref(frame_interface *ref)
LOG_INFO("Log message using LOG_INFO()")
frame_archive(std::atomic< uint32_t > *in_max_frame_queue_size, std::shared_ptr< platform::time_service > ts, std::shared_ptr< metadata_parser_map > parsers)
const char * rs2_stream_to_string(rs2_stream stream)
void unpublish_frame(frame_interface *frame) override
typename::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
std::atomic< bool > recycle_frames
std::shared_ptr< metadata_parser_map > _metadata_parsers
std::shared_ptr< platform::time_service > _time_service
std::atomic< uint32_t > published_frames_count
std::shared_ptr< sensor_interface > get_sensor() const override
std::vector< T > freelist
callbacks_heap callback_inflight