sync.cpp
Go to the documentation of this file.
1 #include <cmath>
2 #include "sync.h"
3 
4 using namespace rsimpl;
5 
// Constructs the archive around a key stream.
//   selection, max_size, capture_started -> forwarded to the frame_archive base constructor
//   key_stream                           -> stored in the key_stream member
//   event_queue_size, events_timeout     -> forwarded to the ts_corrector member
// The body fills other_streams with every enabled stream other than key_stream and
// moves one frame per stream from the backbuffer into the frontbuffer.
6 syncronizing_archive::syncronizing_archive(const std::vector<subdevice_mode_selection> & selection,
7  rs_stream key_stream,
8  std::atomic<uint32_t>* max_size,
9  std::atomic<uint32_t>* event_queue_size,
10  std::atomic<uint32_t>* events_timeout,
11  std::chrono::high_resolution_clock::time_point capture_started)
12  : frame_archive(selection, max_size, capture_started), key_stream(key_stream),
13  ts_corrector(event_queue_size, events_timeout)
14 {
15  // Enumerate all streams we need to keep synchronized with the key stream
    // NOTE(review): the loop header (listing line 16) is missing from this extract, so the
    // set of values `s` iterates over is not visible here.
17  {
18  if(is_stream_enabled(s) && s != key_stream) other_streams.push_back(s);
19  }
20 
21  // Allocate an empty image for each stream, and move it to the frontbuffer
22  // This allows us to assume that get_frame_data/get_frame_timestamp always return valid data
23  alloc_frame(key_stream, frame_additional_data(), true);
24  frontbuffer.place_frame(key_stream, std::move(backbuffer[key_stream]));
25  for(auto s : other_streams)
26  {
    // NOTE(review): listing line 27 is missing from this extract; only the move of
    // backbuffer[s] into the frontbuffer is visible for the non-key streams.
28  frontbuffer.place_frame(s, std::move(backbuffer[s]));
29  }
30 }
31 
33 {
    // Forwards the query to the frontbuffer and returns its result for `stream` unchanged.
    // NOTE(review): the signature (listing line 32) is missing from this extract.
34  return frontbuffer.get_frame_data(stream);
35 }
36 
38 {
    // Forwards the query to the frontbuffer and returns its timestamp for `stream` unchanged.
    // NOTE(review): the signature (listing line 37) is missing from this extract.
39  return frontbuffer.get_frame_timestamp(stream);
40 }
41 
43 {
    // Forwards the query to the frontbuffer and returns its get_frame_bpp() result for `stream`.
    // NOTE(review): the signature (listing line 42) is missing from this extract.
44  return frontbuffer.get_frame_bpp(stream);
45 }
46 
48 {
    // Returns whatever clone_frameset() produces when given the address of the frontbuffer.
    // NOTE(review): the signature (listing line 47) is missing from this extract.
49  return clone_frameset(&frontbuffer);
50 }
51 
53 {
    // Forwards (stream, frame_metadata) to the frontbuffer and returns its answer unchanged.
    // NOTE(review): the signature (listing line 52) is missing from this extract.
54  return frontbuffer.get_frame_metadata(stream, frame_metadata);
55 }
56 
58 {
    // Forwards (stream, frame_metadata) to the frontbuffer and returns its answer unchanged.
    // NOTE(review): the signature (listing line 57) is missing from this extract.
59  return frontbuffer.supports_frame_metadata(stream, frame_metadata);
60 }
61 
63 {
    // Forwards the query to the frontbuffer and returns its frame number for `stream` unchanged.
    // NOTE(review): the signature (listing line 62) is missing from this extract.
64  return frontbuffer.get_frame_number(stream);
65 }
66 
68 {
    // Forwards the query to the frontbuffer and returns its system time for `stream` unchanged.
    // NOTE(review): the signature (listing line 67) is missing from this extract.
69  return frontbuffer.get_frame_system_time(stream);
70 }
71 
72 // Block until the next coherent frameset is available
74 {
    // Holds the archive mutex for the whole body.
75  std::unique_lock<std::recursive_mutex> lock(mutex);
    // `ready` is true once at least one frame is queued for the key stream.
76  const auto ready = [this]() { return !frames[key_stream].empty(); };
    // If not ready yet, wait on cv with that predicate for at most 5 seconds;
    // a timeout is reported by throwing std::runtime_error.
77  if(!ready() && !cv.wait_for(lock, std::chrono::seconds(5), ready)) throw std::runtime_error("Timeout waiting for frames.");
    // NOTE(review): the signature (listing line 73) and the statement at listing line 78 are
    // missing from this extract, so what happens after a successful wait is not visible here.
79 }
80 
81 // If a coherent frameset is available, obtain it and return true, otherwise return false immediately
83 {
84  // TODO: Implement a user-specifiable timeout for how long to wait before returning false?
    // Holds the archive mutex for the whole body.
85  std::unique_lock<std::recursive_mutex> lock(mutex);
    // Nothing queued for the key stream: report "no frameset" without waiting.
86  if(frames[key_stream].empty()) return false;
    // NOTE(review): the signature (listing line 82) and the statement at listing line 87 are
    // missing from this extract; only the final `return true` is visible.
88  return true;
89 }
90 
92 {
    // Repeats the wait until clone_frontbuffer() yields a non-null frameset, then returns it.
    // NOTE(review): the signature (listing line 91) is missing from this extract.
93  frameset * result = nullptr;
94  do
95  {
    // The lock is declared inside the loop body, so it is released at the end of each iteration.
96  std::unique_lock<std::recursive_mutex> lock(mutex);
    // `ready` is true once at least one frame is queued for the key stream.
97  const auto ready = [this]() { return !frames[key_stream].empty(); };
    // Wait on cv for at most 5 seconds; a timeout throws std::runtime_error.
98  if (!ready() && !cv.wait_for(lock, std::chrono::seconds(5), ready)) throw std::runtime_error("Timeout waiting for frames.");
    // NOTE(review): the statement at listing line 99 is missing from this extract.
100  result = clone_frontbuffer();
101  }
102  while (!result);
103  return result;
104 }
105 
107 {
     // Non-blocking variant: returns false when no key-stream frame is queued or when
     // clone_frontbuffer() returns null; otherwise stores the clone through `frameset`
     // and returns true.
     // NOTE(review): the signature (listing line 106) is missing from this extract.
108  // TODO: Implement a user-specifiable timeout for how long to wait before returning false?
109  std::unique_lock<std::recursive_mutex> lock(mutex);
110  if (frames[key_stream].empty()) return false;
     // Advance the frontbuffer before cloning it.
111  get_next_frames();
112  auto result = clone_frontbuffer();
113  if (result)
114  {
115  *frameset = result;
116  return true;
117  }
118  return false;
119 }
120 
121 // Move frames from the queues to the frontbuffers to form the next coherent frameset
123 {
124  // Always dequeue a frame from the key stream
     // NOTE(review): the signature (listing line 122) and the statement at listing line 125
     // that the comment above refers to are missing from this extract.
126 
127  // Dequeue from other streams if the new frame is closer to the timestamp of the key stream than the old frame
128  for(auto s : other_streams)
129  {
     // A stream with nothing queued keeps whatever frame is already in the frontbuffer.
130  if (frames[s].empty())
131  continue;
132 
     // "new" = head of the stream's queue, "old" = frame currently in the frontbuffer,
     // "key" = the key stream's frame currently in the frontbuffer.
133  auto timestamp_of_new_frame = frames[s].front().additional_data.timestamp;
134  auto timestamp_of_old_frame = frontbuffer.get_frame_timestamp(s);
135  auto timestamp_of_key_stream = frontbuffer.get_frame_timestamp(key_stream);
     // Take the queued frame when it is strictly later than the key frame, or when it is
     // at least as close to the key frame as the old one (ties favor the queued frame).
136  if ((timestamp_of_new_frame > timestamp_of_key_stream) ||
137  (std::fabs(timestamp_of_new_frame - timestamp_of_key_stream) <= std::fabs(timestamp_of_old_frame - timestamp_of_key_stream)))
138  {
139  dequeue_frame(s);
140  }
141  }
142 }
143 
144 // Move a frame from the backbuffer to the back of the queue
146 {
     // NOTE(review): the signature (listing line 145) is missing from this extract.
147  std::unique_lock<std::recursive_mutex> lock(mutex);
     // Move the stream's backbuffer frame onto the end of its queue, then prune the queues.
148  frames[stream].push_back(std::move(backbuffer[stream]));
149  cull_frames();
     // The mutex is released explicitly before any waiter is notified.
150  lock.unlock();
     // NOTE(review): frames[key_stream].empty() is evaluated here after lock.unlock(), i.e.
     // without holding the mutex that guards the queue in the lines above - confirm this
     // unsynchronized read is intended.
151  if(!frames[key_stream].empty()) cv.notify_one();
152 }
153 
155 {
156  frontbuffer.cleanup(); // frontbuffer also holds frame references, since its content is publicly available through get_frame_data
     // NOTE(review): the signature (listing line 154) and the statement at listing line 157
     // are missing from this extract; only the frontbuffer cleanup is visible.
158 }
159 
161 {
     // Hands the stream's backbuffer frame to ts_corrector, but only for an enabled stream;
     // for a disabled stream this function does nothing.
     // NOTE(review): the signature (listing line 160) is missing from this extract.
162  if (is_stream_enabled(stream))
163  {
164  ts_corrector.correct_timestamp(backbuffer[stream], stream);
165  }
166 }
167 
169 {
     // NOTE(review): the signature (listing line 168) and the only body statement
     // (listing line 170) are missing from this extract, so the body appears empty here.
171 }
172 
174 {
     // Forwards the query to the frontbuffer and returns its stride for `stream` unchanged.
     // NOTE(review): the signature (listing line 173) is missing from this extract.
175  return frontbuffer.get_frame_stride(stream);
176 }
177 
178 // Discard all frames which are older than the most recent coherent frameset
180 {
     // NOTE(review): the signature (listing line 179) is missing from this extract.
181  // Never keep more than four frames around in any given stream, regardless of timestamps
     // NOTE(review): the loop header (listing line 182) is missing from this extract, so the
     // set of streams `s` ranges over is not visible here.
183  {
184  while(frames[s].size() > 4)
185  {
186  discard_frame(s);
187  }
188  }
189 
190  // Cannot do any culling unless at least one frame is enqueued for each enabled stream
191  if(frames[key_stream].empty()) return;
192  for(auto s : other_streams) if(frames[s].empty()) return;
193 
194  // We can discard frames from the key stream if we have at least two and the latter is closer to the most recent frame of all other streams than the former
195  while(true)
196  {
197  if(frames[key_stream].size() < 2) break;
     // t0 / t1: timestamps of the oldest and second-oldest queued key-stream frames.
198  const double t0 = frames[key_stream][0].additional_data.timestamp, t1 = frames[key_stream][1].additional_data.timestamp;
199 
200  bool valid_to_skip = true;
201  for(auto s : other_streams)
202  {
     // Keep the oldest key frame if it is strictly closer than the next one to the
     // newest queued frame (back()) of any other stream.
203  if (std::fabs(t0 - frames[s].back().additional_data.timestamp) < std::fabs(t1 - frames[s].back().additional_data.timestamp))
204  {
205  valid_to_skip = false;
206  break;
207  }
208  }
209  if(!valid_to_skip) break;
210 
     // NOTE(review): the statement at listing line 211 is missing from this extract; as
     // shown, nothing in this loop shrinks frames[key_stream] when valid_to_skip stays true.
212  }
213 
214  // We can discard frames for other streams if we have at least two and the latter is closer to the next key stream frame than the former
215  for(auto s : other_streams)
216  {
217  while(true)
218  {
219  if(frames[s].size() < 2) break;
     // t0 / t1: timestamps of the oldest and second-oldest queued frames of stream s.
220  const double t0 = frames[s][0].additional_data.timestamp, t1 = frames[s][1].additional_data.timestamp;
221 
     // Stop once the oldest frame is strictly closer to the head key-stream frame than the
     // next one; otherwise (including ties) drop the oldest frame and re-check.
222  if (std::fabs(t0 - frames[key_stream].front().additional_data.timestamp) < std::fabs(t1 - frames[key_stream].front().additional_data.timestamp)) break;
223  discard_frame(s);
224  }
225  }
226 }
227 
228 // Move a single frame from the head of the queue to the front buffer, while recycling the front buffer into the freelist
230 {
     // NOTE(review): the signature (listing line 229) is missing from this extract.
     // NOTE(review): unlike discard_frame, no mutex is acquired in this body, and front() is
     // used without an emptiness check - presumably callers hold the lock and guarantee a
     // queued frame; confirm against the call sites.
231  auto & frame = frames[stream].front();
232 
233  // Log callback started
234  auto callback_start_time = std::chrono::high_resolution_clock::now();
235  frame.update_frame_callback_start_ts(callback_start_time);
     // ts: milliseconds elapsed between capture_started and the callback start time.
236  auto ts = std::chrono::duration_cast<std::chrono::milliseconds>(callback_start_time - capture_started).count();
237  LOG_DEBUG("CallbackStarted," << rsimpl::get_string(frame.get_stream_type()) << "," << frame.get_frame_number() << ",DispatchedAt," << ts);
238 
239  frontbuffer.place_frame(stream, std::move(frames[stream].front())); // the frame will move to free list once there are no external references to it
     // Remove the moved-from element from the head of the queue.
240  frames[stream].erase(begin(frames[stream]));
241 }
242 
243 // Move a single frame from the head of the queue directly to the freelist
245 {
     // NOTE(review): the signature (listing line 244) is missing from this extract.
     // Takes the (recursive) archive mutex for the duration of the body.
246  std::lock_guard<std::recursive_mutex> guard(mutex);
     // Move the head of the stream's queue onto the freelist, then remove the moved-from
     // element from the queue. front() is used without an emptiness check.
247  freelist.push_back(std::move(frames[stream].front()));
248  frames[stream].erase(begin(frames[stream]));
249 }
double get_frame_timestamp(rs_stream stream) const
Definition: archive.h:201
syncronizing_archive(const std::vector< subdevice_mode_selection > &selection, rs_stream key_stream, std::atomic< uint32_t > *max_size, std::atomic< uint32_t > *event_queue_size, std::atomic< uint32_t > *events_timeout, std::chrono::high_resolution_clock::time_point capture_started=std::chrono::high_resolution_clock::now())
Definition: sync.cpp:6
void on_timestamp(rs_timestamp_data data)
Definition: sync.cpp:168
frame backbuffer[RS_STREAM_NATIVE_COUNT]
Definition: archive.h:223
unsigned long long get_frame_number(rs_stream stream) const
Definition: archive.h:202
double get_frame_metadata(rs_stream stream, rs_frame_metadata frame_metadata) const
Definition: sync.cpp:52
long long get_frame_system_time(rs_stream stream) const
Definition: sync.cpp:67
std::vector< frame > frames[RS_STREAM_NATIVE_COUNT]
Definition: sync.h:63
frame_metadata
Types of value provided from the device with each frame.
Definition: rs.hpp:160
virtual void flush()
Definition: archive.cpp:144
GLuint GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat t0
Definition: glext.h:8966
void commit_frame(rs_stream stream)
Definition: sync.cpp:145
byte * alloc_frame(rs_stream stream, const frame_additional_data &additional_data, bool requires_memory)
Definition: archive.cpp:88
int get_frame_stride(rs_stream stream) const
Definition: archive.h:204
GLuint GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat t1
Definition: glext.h:8966
Definition: archive.h:12
void correct_timestamp(frame_interface &frame, rs_stream stream) override
Definition: timestamps.cpp:110
unsigned long long get_frame_number() const override
Definition: archive.cpp:327
GLuint GLuint stream
Definition: glext.h:1774
std::condition_variable_any cv
Definition: sync.h:64
bool supports_frame_metadata(rs_stream stream, rs_frame_metadata frame_metadata) const
Definition: archive.h:199
#define LOG_DEBUG(...)
Definition: types.h:77
bool poll_for_frames_safe(frameset **frames)
Definition: sync.cpp:106
GLuint GLuint GLsizei count
Definition: glext.h:111
int get_frame_bpp(rs_stream stream) const
Definition: archive.h:205
void place_frame(rs_stream stream, frame &&new_frame)
Definition: archive.cpp:176
void update_frame_callback_start_ts(std::chrono::high_resolution_clock::time_point ts)
Definition: archive.cpp:363
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const void * data
Definition: glext.h:223
void discard_frame(rs_stream stream)
Definition: sync.cpp:244
frameset * wait_for_frames_safe()
Definition: sync.cpp:91
std::recursive_mutex mutex
Definition: archive.h:225
Timestamp data from the motion microcontroller.
Definition: rs.h:339
const byte * get_frame_data(rs_stream stream) const
Definition: archive.h:200
GLfloat seconds
Definition: wglext.h:657
frameset * clone_frameset(frameset *frameset)
Definition: archive.cpp:25
std::chrono::high_resolution_clock::time_point capture_started
Definition: archive.h:226
const char * get_string(rs_stream value)
Definition: types.cpp:19
unsigned long long get_frame_number(rs_stream stream) const
Definition: sync.cpp:62
uint8_t byte
Definition: types.h:42
GLdouble s
Definition: glext.h:231
bool supports_frame_metadata(rs_stream stream, rs_frame_metadata frame_metadata) const
Definition: sync.cpp:57
rs_stream
Streams are different types of data provided by RealSense devices.
Definition: rs.h:33
long long get_frame_system_time(rs_stream stream) const
Definition: archive.h:203
GLsizeiptr size
Definition: glext.h:532
void correct_timestamp(rs_stream stream)
Definition: sync.cpp:160
int get_frame_bpp(rs_stream stream) const
Definition: sync.cpp:42
void dequeue_frame(rs_stream stream)
Definition: sync.cpp:229
const byte * get_frame_data(rs_stream stream) const
Definition: sync.cpp:32
std::vector< rs_stream > other_streams
Definition: sync.h:57
rs_frame_metadata
Types of value provided from the device with each frame.
Definition: rs.h:203
std::vector< frame > freelist
Definition: archive.h:224
rs_stream get_stream_type() const override
Definition: archive.cpp:373
int get_frame_stride(rs_stream stream) const
Definition: sync.cpp:173
frameset * clone_frontbuffer()
Definition: sync.cpp:47
void on_timestamp(rs_timestamp_data data) override
Definition: timestamps.cpp:80
bool is_stream_enabled(rs_stream stream) const
Definition: archive.h:232
GLuint64EXT * result
Definition: glext.h:9881
double get_frame_metadata(rs_stream stream, rs_frame_metadata frame_metadata) const
Definition: archive.h:198
double get_frame_timestamp(rs_stream stream) const
Definition: sync.cpp:37
timestamp_corrector ts_corrector
Definition: sync.h:71
void flush() override
Definition: sync.cpp:154


librealsense
Author(s): Sergey Dorodnicov, Mark Horn, Reagan Lopez
autogenerated on Fri Mar 13 2020 03:16:17