Go to the documentation of this file.00001 #include <cmath>
00002 #include "sync.h"
00003
00004 using namespace rsimpl;
00005
00006 syncronizing_archive::syncronizing_archive(const std::vector<subdevice_mode_selection> & selection,
00007 rs_stream key_stream,
00008 std::atomic<uint32_t>* max_size,
00009 std::atomic<uint32_t>* event_queue_size,
00010 std::atomic<uint32_t>* events_timeout,
00011 std::chrono::high_resolution_clock::time_point capture_started)
00012 : frame_archive(selection, max_size, capture_started), key_stream(key_stream),
00013 ts_corrector(event_queue_size, events_timeout)
00014 {
00015
00016 for(auto s : {RS_STREAM_DEPTH, RS_STREAM_INFRARED, RS_STREAM_INFRARED2, RS_STREAM_COLOR, RS_STREAM_FISHEYE})
00017 {
00018 if(is_stream_enabled(s) && s != key_stream) other_streams.push_back(s);
00019 }
00020
00021
00022
00023 alloc_frame(key_stream, frame_additional_data(), true);
00024 frontbuffer.place_frame(key_stream, std::move(backbuffer[key_stream]));
00025 for(auto s : other_streams)
00026 {
00027 alloc_frame(s, frame_additional_data(), true);
00028 frontbuffer.place_frame(s, std::move(backbuffer[s]));
00029 }
00030 }
00031
00032 const byte * syncronizing_archive::get_frame_data(rs_stream stream) const
00033 {
00034 return frontbuffer.get_frame_data(stream);
00035 }
00036
00037 double syncronizing_archive::get_frame_timestamp(rs_stream stream) const
00038 {
00039 return frontbuffer.get_frame_timestamp(stream);
00040 }
00041
00042 int syncronizing_archive::get_frame_bpp(rs_stream stream) const
00043 {
00044 return frontbuffer.get_frame_bpp(stream);
00045 }
00046
00047 frame_archive::frameset* syncronizing_archive::clone_frontbuffer()
00048 {
00049 return clone_frameset(&frontbuffer);
00050 }
00051
00052 double syncronizing_archive::get_frame_metadata(rs_stream stream, rs_frame_metadata frame_metadata) const
00053 {
00054 return frontbuffer.get_frame_metadata(stream, frame_metadata);
00055 }
00056
00057 bool syncronizing_archive::supports_frame_metadata(rs_stream stream, rs_frame_metadata frame_metadata) const
00058 {
00059 return frontbuffer.supports_frame_metadata(stream, frame_metadata);
00060 }
00061
00062 unsigned long long rsimpl::syncronizing_archive::get_frame_number(rs_stream stream) const
00063 {
00064 return frontbuffer.get_frame_number(stream);
00065 }
00066
00067 long long syncronizing_archive::get_frame_system_time(rs_stream stream) const
00068 {
00069 return frontbuffer.get_frame_system_time(stream);
00070 }
00071
00072
00073 void syncronizing_archive::wait_for_frames()
00074 {
00075 std::unique_lock<std::recursive_mutex> lock(mutex);
00076 const auto ready = [this]() { return !frames[key_stream].empty(); };
00077 if(!ready() && !cv.wait_for(lock, std::chrono::seconds(5), ready)) throw std::runtime_error("Timeout waiting for frames.");
00078 get_next_frames();
00079 }
00080
00081
00082 bool syncronizing_archive::poll_for_frames()
00083 {
00084
00085 std::unique_lock<std::recursive_mutex> lock(mutex);
00086 if(frames[key_stream].empty()) return false;
00087 get_next_frames();
00088 return true;
00089 }
00090
00091 frame_archive::frameset* syncronizing_archive::wait_for_frames_safe()
00092 {
00093 frameset * result = nullptr;
00094 do
00095 {
00096 std::unique_lock<std::recursive_mutex> lock(mutex);
00097 const auto ready = [this]() { return !frames[key_stream].empty(); };
00098 if (!ready() && !cv.wait_for(lock, std::chrono::seconds(5), ready)) throw std::runtime_error("Timeout waiting for frames.");
00099 get_next_frames();
00100 result = clone_frontbuffer();
00101 }
00102 while (!result);
00103 return result;
00104 }
00105
00106 bool syncronizing_archive::poll_for_frames_safe(frameset** frameset)
00107 {
00108
00109 std::unique_lock<std::recursive_mutex> lock(mutex);
00110 if (frames[key_stream].empty()) return false;
00111 get_next_frames();
00112 auto result = clone_frontbuffer();
00113 if (result)
00114 {
00115 *frameset = result;
00116 return true;
00117 }
00118 return false;
00119 }
00120
00121
00122 void syncronizing_archive::get_next_frames()
00123 {
00124
00125 dequeue_frame(key_stream);
00126
00127
00128 for(auto s : other_streams)
00129 {
00130 if (frames[s].empty())
00131 continue;
00132
00133 auto timestamp_of_new_frame = frames[s].front().additional_data.timestamp;
00134 auto timestamp_of_old_frame = frontbuffer.get_frame_timestamp(s);
00135 auto timestamp_of_key_stream = frontbuffer.get_frame_timestamp(key_stream);
00136 if ((timestamp_of_new_frame > timestamp_of_key_stream) ||
00137 (std::fabs(timestamp_of_new_frame - timestamp_of_key_stream) <= std::fabs(timestamp_of_old_frame - timestamp_of_key_stream)))
00138 {
00139 dequeue_frame(s);
00140 }
00141 }
00142 }
00143
00144
00145 void syncronizing_archive::commit_frame(rs_stream stream)
00146 {
00147 std::unique_lock<std::recursive_mutex> lock(mutex);
00148 frames[stream].push_back(std::move(backbuffer[stream]));
00149 cull_frames();
00150 lock.unlock();
00151 if(!frames[key_stream].empty()) cv.notify_one();
00152 }
00153
00154 void syncronizing_archive::flush()
00155 {
00156 frontbuffer.cleanup();
00157 frame_archive::flush();
00158 }
00159
00160 void syncronizing_archive::correct_timestamp(rs_stream stream)
00161 {
00162 if (is_stream_enabled(stream))
00163 {
00164 ts_corrector.correct_timestamp(backbuffer[stream], stream);
00165 }
00166 }
00167
00168 void syncronizing_archive::on_timestamp(rs_timestamp_data data)
00169 {
00170 ts_corrector.on_timestamp(data);
00171 }
00172
00173 int syncronizing_archive::get_frame_stride(rs_stream stream) const
00174 {
00175 return frontbuffer.get_frame_stride(stream);
00176 }
00177
00178
00179 void syncronizing_archive::cull_frames()
00180 {
00181
00182 for(auto s : {RS_STREAM_DEPTH, RS_STREAM_COLOR, RS_STREAM_INFRARED, RS_STREAM_INFRARED2, RS_STREAM_FISHEYE})
00183 {
00184 while(frames[s].size() > 4)
00185 {
00186 discard_frame(s);
00187 }
00188 }
00189
00190
00191 if(frames[key_stream].empty()) return;
00192 for(auto s : other_streams) if(frames[s].empty()) return;
00193
00194
00195 while(true)
00196 {
00197 if(frames[key_stream].size() < 2) break;
00198 const double t0 = frames[key_stream][0].additional_data.timestamp, t1 = frames[key_stream][1].additional_data.timestamp;
00199
00200 bool valid_to_skip = true;
00201 for(auto s : other_streams)
00202 {
00203 if (std::fabs(t0 - frames[s].back().additional_data.timestamp) < std::fabs(t1 - frames[s].back().additional_data.timestamp))
00204 {
00205 valid_to_skip = false;
00206 break;
00207 }
00208 }
00209 if(!valid_to_skip) break;
00210
00211 discard_frame(key_stream);
00212 }
00213
00214
00215 for(auto s : other_streams)
00216 {
00217 while(true)
00218 {
00219 if(frames[s].size() < 2) break;
00220 const double t0 = frames[s][0].additional_data.timestamp, t1 = frames[s][1].additional_data.timestamp;
00221
00222 if (std::fabs(t0 - frames[key_stream].front().additional_data.timestamp) < std::fabs(t1 - frames[key_stream].front().additional_data.timestamp)) break;
00223 discard_frame(s);
00224 }
00225 }
00226 }
00227
00228
00229 void syncronizing_archive::dequeue_frame(rs_stream stream)
00230 {
00231 auto & frame = frames[stream].front();
00232
00233
00234 auto callback_start_time = std::chrono::high_resolution_clock::now();
00235 frame.update_frame_callback_start_ts(callback_start_time);
00236 auto ts = std::chrono::duration_cast<std::chrono::milliseconds>(callback_start_time - capture_started).count();
00237 LOG_DEBUG("CallbackStarted," << rsimpl::get_string(frame.get_stream_type()) << "," << frame.get_frame_number() << ",DispatchedAt," << ts);
00238
00239 frontbuffer.place_frame(stream, std::move(frames[stream].front()));
00240 frames[stream].erase(begin(frames[stream]));
00241 }
00242
00243
00244 void syncronizing_archive::discard_frame(rs_stream stream)
00245 {
00246 std::lock_guard<std::recursive_mutex> guard(mutex);
00247 freelist.push_back(std::move(frames[stream].front()));
00248 frames[stream].erase(begin(frames[stream]));
00249 }