sync.cpp
Go to the documentation of this file.
00001 #include <cmath>
00002 #include "sync.h"
00003 
00004 using namespace rsimpl;
00005 
00006 syncronizing_archive::syncronizing_archive(const std::vector<subdevice_mode_selection> & selection,
00007     rs_stream key_stream,
00008     std::atomic<uint32_t>* max_size,
00009     std::atomic<uint32_t>* event_queue_size,
00010     std::atomic<uint32_t>* events_timeout,
00011     std::chrono::high_resolution_clock::time_point capture_started)
00012     : frame_archive(selection, max_size, capture_started), key_stream(key_stream),
00013     ts_corrector(event_queue_size, events_timeout)
00014 {
00015     // Enumerate all streams we need to keep synchronized with the key stream
00016     for(auto s : {RS_STREAM_DEPTH, RS_STREAM_INFRARED, RS_STREAM_INFRARED2, RS_STREAM_COLOR, RS_STREAM_FISHEYE})
00017     {
00018         if(is_stream_enabled(s) && s != key_stream) other_streams.push_back(s);
00019     }
00020 
00021     // Allocate an empty image for each stream, and move it to the frontbuffer
00022     // This allows us to assume that get_frame_data/get_frame_timestamp always return valid data
00023     alloc_frame(key_stream, frame_additional_data(), true);
00024     frontbuffer.place_frame(key_stream, std::move(backbuffer[key_stream]));
00025     for(auto s : other_streams)
00026     {
00027         alloc_frame(s, frame_additional_data(), true);
00028         frontbuffer.place_frame(s, std::move(backbuffer[s]));
00029     }
00030 }
00031 
00032 const byte * syncronizing_archive::get_frame_data(rs_stream stream) const
00033 {
00034     return frontbuffer.get_frame_data(stream);
00035 }
00036 
00037 double syncronizing_archive::get_frame_timestamp(rs_stream stream) const
00038 {
00039     return frontbuffer.get_frame_timestamp(stream);
00040 }
00041 
00042 int syncronizing_archive::get_frame_bpp(rs_stream stream) const
00043 {
00044     return frontbuffer.get_frame_bpp(stream);
00045 }
00046 
00047 frame_archive::frameset* syncronizing_archive::clone_frontbuffer()
00048 {
00049     return clone_frameset(&frontbuffer);
00050 }
00051 
00052 double syncronizing_archive::get_frame_metadata(rs_stream stream, rs_frame_metadata frame_metadata) const
00053 {
00054     return frontbuffer.get_frame_metadata(stream, frame_metadata);
00055 }
00056 
00057 bool syncronizing_archive::supports_frame_metadata(rs_stream stream, rs_frame_metadata frame_metadata) const
00058 {
00059     return frontbuffer.supports_frame_metadata(stream, frame_metadata);
00060 }
00061 
00062 unsigned long long rsimpl::syncronizing_archive::get_frame_number(rs_stream stream) const
00063 {
00064     return frontbuffer.get_frame_number(stream);
00065 }
00066 
00067 long long syncronizing_archive::get_frame_system_time(rs_stream stream) const
00068 {
00069     return frontbuffer.get_frame_system_time(stream);
00070 }
00071 
00072 // Block until the next coherent frameset is available
00073 void syncronizing_archive::wait_for_frames()
00074 {
00075     std::unique_lock<std::recursive_mutex> lock(mutex);
00076     const auto ready = [this]() { return !frames[key_stream].empty(); };
00077     if(!ready() && !cv.wait_for(lock, std::chrono::seconds(5), ready)) throw std::runtime_error("Timeout waiting for frames.");
00078     get_next_frames();
00079 }
00080 
00081 // If a coherent frameset is available, obtain it and return true, otherwise return false immediately
00082 bool syncronizing_archive::poll_for_frames()
00083 {
00084     // TODO: Implement a user-specifiable timeout for how long to wait before returning false?
00085     std::unique_lock<std::recursive_mutex> lock(mutex);
00086     if(frames[key_stream].empty()) return false;
00087     get_next_frames();
00088     return true;
00089 }
00090 
00091 frame_archive::frameset* syncronizing_archive::wait_for_frames_safe()
00092 {
00093     frameset * result = nullptr;
00094     do
00095     {
00096         std::unique_lock<std::recursive_mutex> lock(mutex);
00097         const auto ready = [this]() { return !frames[key_stream].empty(); };
00098         if (!ready() && !cv.wait_for(lock, std::chrono::seconds(5), ready)) throw std::runtime_error("Timeout waiting for frames.");
00099         get_next_frames();
00100         result = clone_frontbuffer();
00101     } 
00102     while (!result);
00103     return result;
00104 }
00105 
00106 bool syncronizing_archive::poll_for_frames_safe(frameset** frameset)
00107 {
00108     // TODO: Implement a user-specifiable timeout for how long to wait before returning false?
00109     std::unique_lock<std::recursive_mutex> lock(mutex);
00110     if (frames[key_stream].empty()) return false;
00111     get_next_frames();
00112     auto result = clone_frontbuffer();
00113     if (result)
00114     {
00115         *frameset = result;
00116         return true;
00117     }
00118     return false;
00119 }
00120 
00121 // Move frames from the queues to the frontbuffers to form the next coherent frameset
00122 void syncronizing_archive::get_next_frames()
00123 {
00124     // Always dequeue a frame from the key stream
00125     dequeue_frame(key_stream);
00126 
00127     // Dequeue from other streams if the new frame is closer to the timestamp of the key stream than the old frame
00128     for(auto s : other_streams)
00129     {
00130         if (frames[s].empty())
00131             continue;
00132 
00133         auto timestamp_of_new_frame = frames[s].front().additional_data.timestamp;
00134         auto timestamp_of_old_frame = frontbuffer.get_frame_timestamp(s);
00135         auto timestamp_of_key_stream = frontbuffer.get_frame_timestamp(key_stream);
00136         if ((timestamp_of_new_frame > timestamp_of_key_stream) ||
00137             (std::fabs(timestamp_of_new_frame - timestamp_of_key_stream) <= std::fabs(timestamp_of_old_frame - timestamp_of_key_stream)))
00138         {
00139             dequeue_frame(s);
00140         }
00141     }
00142 }
00143 
00144 // Move a frame from the backbuffer to the back of the queue
00145 void syncronizing_archive::commit_frame(rs_stream stream)
00146 {
00147     std::unique_lock<std::recursive_mutex> lock(mutex);
00148     frames[stream].push_back(std::move(backbuffer[stream]));
00149     cull_frames();
00150     lock.unlock();
00151     if(!frames[key_stream].empty()) cv.notify_one();
00152 }
00153 
00154 void syncronizing_archive::flush()
00155 {
00156     frontbuffer.cleanup(); // frontbuffer also holds frame references, since its content is publicly available through get_frame_data
00157     frame_archive::flush();
00158 }
00159 
00160 void syncronizing_archive::correct_timestamp(rs_stream stream)
00161 {
00162     if (is_stream_enabled(stream))
00163         {
00164             ts_corrector.correct_timestamp(backbuffer[stream], stream);
00165         }
00166 }
00167 
00168 void syncronizing_archive::on_timestamp(rs_timestamp_data data)
00169 {
00170     ts_corrector.on_timestamp(data);
00171 }
00172 
00173 int syncronizing_archive::get_frame_stride(rs_stream stream) const
00174 {
00175     return frontbuffer.get_frame_stride(stream);
00176 }
00177 
00178 // Discard all frames which are older than the most recent coherent frameset
00179 void syncronizing_archive::cull_frames()
00180 {
00181     // Never keep more than four frames around in any given stream, regardless of timestamps
00182     for(auto s : {RS_STREAM_DEPTH, RS_STREAM_COLOR, RS_STREAM_INFRARED, RS_STREAM_INFRARED2, RS_STREAM_FISHEYE})
00183     {
00184         while(frames[s].size() > 4)
00185         {
00186             discard_frame(s);
00187         }
00188     }
00189 
00190     // Cannot do any culling unless at least one frame is enqueued for each enabled stream    
00191     if(frames[key_stream].empty()) return;
00192     for(auto s : other_streams) if(frames[s].empty()) return;
00193 
00194     // We can discard frames from the key stream if we have at least two and the latter is closer to the most recent frame of all other streams than the former
00195     while(true)
00196     {
00197         if(frames[key_stream].size() < 2) break;
00198         const double t0 = frames[key_stream][0].additional_data.timestamp, t1 = frames[key_stream][1].additional_data.timestamp;
00199 
00200         bool valid_to_skip = true;
00201         for(auto s : other_streams)
00202         {
00203             if (std::fabs(t0 - frames[s].back().additional_data.timestamp) < std::fabs(t1 - frames[s].back().additional_data.timestamp))
00204             {
00205                 valid_to_skip = false;
00206                 break;
00207             }
00208         }
00209         if(!valid_to_skip) break;
00210 
00211         discard_frame(key_stream);
00212     }
00213 
00214     // We can discard frames for other streams if we have at least two and the latter is closer to the next key stream frame than the former
00215     for(auto s : other_streams)
00216     {
00217         while(true)
00218         {
00219             if(frames[s].size() < 2) break;
00220             const double t0 = frames[s][0].additional_data.timestamp, t1 = frames[s][1].additional_data.timestamp;
00221 
00222             if (std::fabs(t0 - frames[key_stream].front().additional_data.timestamp) < std::fabs(t1 - frames[key_stream].front().additional_data.timestamp)) break;
00223             discard_frame(s);
00224         }
00225     }
00226 }
00227 
00228 // Move a single frame from the head of the queue to the front buffer, while recycling the front buffer into the freelist
00229 void syncronizing_archive::dequeue_frame(rs_stream stream)
00230 {
00231     auto & frame = frames[stream].front();
00232     
00233     // Log callback started
00234     auto callback_start_time = std::chrono::high_resolution_clock::now();
00235     frame.update_frame_callback_start_ts(callback_start_time);
00236     auto ts = std::chrono::duration_cast<std::chrono::milliseconds>(callback_start_time - capture_started).count();
00237     LOG_DEBUG("CallbackStarted," << rsimpl::get_string(frame.get_stream_type()) << "," << frame.get_frame_number() << ",DispatchedAt," << ts);
00238 
00239     frontbuffer.place_frame(stream, std::move(frames[stream].front())); // the frame will move to free list once there are no external references to it
00240     frames[stream].erase(begin(frames[stream]));
00241 }
00242 
00243 // Move a single frame from the head of the queue directly to the freelist
00244 void syncronizing_archive::discard_frame(rs_stream stream)
00245 {
00246     std::lock_guard<std::recursive_mutex> guard(mutex);
00247     freelist.push_back(std::move(frames[stream].front()));
00248     frames[stream].erase(begin(frames[stream]));
00249 }


librealsense
Author(s): Sergey Dorodnicov , Mark Horn , Reagan Lopez
autogenerated on Tue Jun 25 2019 19:54:39