Go to the documentation of this file.00001 #include "timestamps.h"
00002 #include "sync.h"
00003 #include <algorithm>
00004
00005 using namespace rsimpl;
00006 using namespace std;
00007
00008
00009
00010 void concurrent_queue::push_back_data(rs_timestamp_data data)
00011 {
00012 lock_guard<mutex> lock(mtx);
00013
00014 data_queue.push_back(data);
00015 }
00016
00017 bool concurrent_queue::pop_front_data()
00018 {
00019 lock_guard<mutex> lock(mtx);
00020
00021 if (!data_queue.size())
00022 return false;
00023
00024 data_queue.pop_front();
00025
00026 return true;
00027 }
00028
00029 bool concurrent_queue::erase(rs_timestamp_data data)
00030 {
00031 lock_guard<mutex> lock(mtx);
00032
00033 auto it = find_if(data_queue.begin(), data_queue.end(),
00034 [&](const rs_timestamp_data& element) {
00035 return (data.frame_number == element.frame_number);
00036 });
00037
00038 if (it != data_queue.end())
00039 {
00040 data_queue.erase(it);
00041 return true;
00042 }
00043
00044 return false;
00045 }
00046
00047 size_t concurrent_queue::size()
00048 {
00049 lock_guard<mutex> lock(mtx);
00050
00051 return data_queue.size();
00052 }
00053
00054 bool concurrent_queue::correct( frame_interface& frame)
00055 {
00056 lock_guard<mutex> lock(mtx);
00057
00058 auto it = find_if(data_queue.begin(), data_queue.end(),
00059 [&](const rs_timestamp_data& element) {
00060 return ((frame.get_frame_number() == element.frame_number));
00061 });
00062
00063 if (it != data_queue.end())
00064 {
00065 frame.set_timestamp(it->timestamp);
00066 return true;
00067 }
00068 return false;
00069 }
00070
00071 timestamp_corrector::timestamp_corrector(std::atomic<uint32_t>* queue_size, std::atomic<uint32_t>* timeout)
00072 :event_queue_size(queue_size), events_timeout(timeout)
00073 {
00074 }
00075
00076 timestamp_corrector::~timestamp_corrector()
00077 {
00078 }
00079
00080 void timestamp_corrector::on_timestamp(rs_timestamp_data data)
00081 {
00082 lock_guard<mutex> lock(mtx);
00083
00084 if (data_queue[data.source_id].size() <= *event_queue_size)
00085 data_queue[data.source_id].push_back_data(data);
00086 if (data_queue[data.source_id].size() > *event_queue_size)
00087 data_queue[data.source_id].pop_front_data();
00088
00089 cv.notify_one();
00090 }
00091
00092 void timestamp_corrector::update_source_id(rs_event_source& source_id, const rs_stream stream)
00093 {
00094 switch(stream)
00095 {
00096 case RS_STREAM_DEPTH:
00097 case RS_STREAM_COLOR:
00098 case RS_STREAM_INFRARED:
00099 case RS_STREAM_INFRARED2:
00100 source_id = RS_EVENT_IMU_DEPTH_CAM;
00101 break;
00102 case RS_STREAM_FISHEYE:
00103 source_id = RS_EVENT_IMU_MOTION_CAM;
00104 break;
00105 default:
00106 throw std::runtime_error(to_string() << "Unsupported source stream requested " << rs_stream_to_string(stream));
00107 }
00108 }
00109
00110 void timestamp_corrector::correct_timestamp(frame_interface& frame, rs_stream stream)
00111 {
00112 unique_lock<mutex> lock(mtx);
00113
00114 bool res;
00115 rs_event_source source_id;
00116 update_source_id(source_id, stream);
00117 if (!(res = data_queue[source_id].correct(frame)))
00118 {
00119 const auto ready = [&]() { return data_queue[source_id].correct(frame); };
00120 res = cv.wait_for(lock, std::chrono::milliseconds(*events_timeout), ready);
00121 }
00122
00123 if (res)
00124 {
00125 frame.set_timestamp_domain(RS_TIMESTAMP_DOMAIN_MICROCONTROLLER);
00126 }
00127 lock.unlock();
00128 }