ip_device.cpp
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2020 Intel Corporation. All Rights Reserved.
3 
4 #include "ip_device.hh"
6 
7 #include "api.h"
9 
10 #include <chrono>
11 #include <list>
12 #include <thread>
13 #include <iostream>
14 #include <string>
15 
16 #include <NetdevLog.h>
17 
18 extern std::map<std::pair<int, int>, rs2_extrinsics> minimal_extrinsics_map;
19 
21 
22 //WA for stop
23 void ip_device::recover_rtsp_client(int sensor_index)
24 {
25  remote_sensors[sensor_index]->rtsp_client = RsRTSPClient::createNew(std::string("rtsp://" + ip_address + ":" + std::to_string(ip_port) + "/" + sensors_str[sensor_index]).c_str(), "rs_network_device", 0, sensor_index);
26 
27  ((RsRTSPClient*)remote_sensors[sensor_index]->rtsp_client)->initFunc(&rs_rtp_stream::get_memory_pool());
28  ((RsRTSPClient*)remote_sensors[sensor_index]->rtsp_client)->getStreams();
29 }
30 
32 {
33  DBG << "Destroying ip_device";
34 
35  try
36  {
37  is_device_alive = false;
38 
39  if (sw_device_status_check.joinable())
40  {
42  }
43 
44  for (int remote_sensor_index = 0; remote_sensor_index < NUM_OF_SENSORS; remote_sensor_index++)
45  {
46  update_sensor_state(remote_sensor_index, {}, false);
47  delete (remote_sensors[remote_sensor_index]);
48  }
49  }
50  catch (const std::exception &e)
51  {
52  ERR << e.what();
53  }
54  DBG << "Destroying ip_device completed";
55 }
56 
57 void ip_device::stop_sensor_streams(int sensor_index)
58 {
59  for(long long int key : remote_sensors[sensor_index]->active_streams_keys)
60  {
61  DBG << "Stopping stream [uid:key] " << streams_collection[key].get()->m_rs_stream.uid << ":" << key << "]";
62  streams_collection[key].get()->is_enabled = false;
63  if(inject_frames_thread[key].joinable())
64  inject_frames_thread[key].join();
65  }
66  remote_sensors[sensor_index]->active_streams_keys.clear();
67 }
68 
70 {
71  int colon = ip_address.find(":");
72  this->ip_address = ip_address.substr(0, colon); // 10.10.10.10:8554 => 10.10.10.10
73  this->ip_port = 8554; // default RTSP port
74  if (colon != -1)
75  try
76  {
77  this->ip_port = std::stoi(ip_address.substr(colon + 1)); // 10.10.10.10:8554 => 8554
78  }
79  catch(...) {}
80 
81  this->is_device_alive = true;
82 
83  //init device data
84  init_device_data(sw_device);
85 }
86 
87 std::vector<rs2_video_stream> ip_device::query_streams(int sensor_id)
88 {
89  DBG << "query_streams";
90  std::vector<rs2_video_stream> streams;
91 
92  if(remote_sensors[sensor_id]->rtsp_client == NULL)
93  return streams;
94 
95  //workaround
96  if(remote_sensors[sensor_id]->rtsp_client == nullptr)
97  recover_rtsp_client(sensor_id);
98 
99  streams = remote_sensors[sensor_id]->rtsp_client->getStreams();
100  return streams;
101 }
102 std::vector<IpDeviceControlData> ip_device::get_controls(int sensor_id)
103 {
104  DBG << "get_controls";
105  std::vector<IpDeviceControlData> controls;
106  controls = remote_sensors[sensor_id]->rtsp_client->getControls();
107 
108  return controls;
109 }
110 
112 {
113  std::vector<rs2::stream_profile> device_streams;
114  std::string url, sensor_name = "";
115  for(int sensor_id = 0; sensor_id < NUM_OF_SENSORS; sensor_id++)
116  {
117 
118  url = std::string("rtsp://" + ip_address + ":" + std::to_string(ip_port) + "/" + sensors_str[sensor_id]);
119  sensor_name = sensors_str[sensor_id];
120 
121  remote_sensors[sensor_id] = new ip_sensor();
122 
123  remote_sensors[sensor_id]->rtsp_client = RsRTSPClient::createNew(url.c_str(), "rs_network_device", 0, sensor_id);
124  ((RsRTSPClient*)remote_sensors[sensor_id]->rtsp_client)->initFunc(&rs_rtp_stream::get_memory_pool());
125 
126  rs2::software_sensor tmp_sensor = sw_device.add_sensor(sensor_name);
127 
128  remote_sensors[sensor_id]->sw_sensor = std::make_shared<rs2::software_sensor>(tmp_sensor);
129 
130  if(sensor_id == 1) //todo: remove hard coded
131  {
132  std::vector<IpDeviceControlData> controls = get_controls(sensor_id);
133  for(auto& control : controls)
134  {
135 
136  float val = NAN;
137 
138  INF << "Init sensor " << control.sensorId << ", option '" << control.option << "', value " << control.range.def;
139 
140  if(control.range.min == control.range.max)
141  {
142  remote_sensors[control.sensorId]->sw_sensor->add_read_only_option(control.option, control.range.def);
143  }
144  else
145  {
146  remote_sensors[control.sensorId]->sw_sensor->add_option(control.option, {control.range.min, control.range.max, control.range.def, control.range.step});
147  }
148  remote_sensors[control.sensorId]->sensors_option[control.option] = control.range.def;
149  try
150  {
151  get_option_value(control.sensorId, control.option, val);
152  if(val != control.range.def && val >= control.range.min && val <= control.range.max)
153  {
154  remote_sensors[control.sensorId]->sw_sensor->set_option(control.option, val);
155  }
156  }
157  catch(const std::exception& e)
158  {
159  ERR << e.what();
160  }
161  }
162  }
163 
164  auto streams = query_streams(sensor_id);
165 
166  DBG << "Init got " << streams.size() << " streams per sensor " << sensor_id;
167 
168  for(int stream_index = 0; stream_index < streams.size(); stream_index++)
169  {
170  bool is_default=false;
171  // just for readable code
172  rs2_video_stream st = streams[stream_index];
173  long long int stream_key = RsRTSPClient::getStreamProfileUniqueKey(st);
174 
175  //check if default value per this stream type were picked
176  if(default_streams[std::make_pair(st.type, st.index)] == -1)
177  {
180  {
181  default_streams[std::make_pair(st.type, st.index)] = stream_index;
182  is_default=true;
183  }
184  }
185 
186  auto stream_profile = remote_sensors[sensor_id]->sw_sensor->add_video_stream(st, is_default);
187  device_streams.push_back(stream_profile);
188  streams_collection[stream_key] = std::make_shared<rs_rtp_stream>(st, stream_profile);
190  }
191  DBG << "Init done adding streams for sensor ID: " << sensor_id;
192  }
193 
194  for(auto stream_profile_from : device_streams)
195  {
196  for(auto stream_profile_to : device_streams)
197  {
198  int from_key = RsRTSPClient::getPhysicalSensorUniqueKey(stream_profile_from.stream_type(), stream_profile_from.stream_index());
199  int to_key = RsRTSPClient::getPhysicalSensorUniqueKey(stream_profile_from.stream_type(), stream_profile_from.stream_index());
200 
201  if(minimal_extrinsics_map.find(std::make_pair(from_key, to_key)) == minimal_extrinsics_map.end())
202  {
203  ERR << "Extrinsics data is missing.";
204  }
205  rs2_extrinsics extrinisics = minimal_extrinsics_map[std::make_pair(from_key, to_key)];
206 
207  stream_profile_from.register_extrinsics_to(stream_profile_to, extrinisics);
208  }
209  }
210 
211  //poll sw device streaming state
212  this->sw_device_status_check = std::thread(&ip_device::polling_state_loop, this);
213  return true;
214 }
215 
217 {
218  while(this->is_device_alive)
219  {
220  try
221  {
222  bool enabled;
223  for(int i = 0; i < NUM_OF_SENSORS; i++)
224  {
225  //poll start/stop events
226  auto sw_sensor = remote_sensors[i]->sw_sensor.get();
227 
228  enabled = (sw_sensor->get_active_streams().size() > 0);
229 
230  if (remote_sensors[i]->is_enabled != enabled)
231  {
232  try
233  {
234  //the state flag is togled before the actual updatee to avoid endless re-tries on case of failure.
236  update_sensor_state(i, sw_sensor->get_active_streams(), true);
237  }
238  catch(const std::exception& e)
239  {
240  ERR << e.what();
241  update_sensor_state(i, {}, true);
242  rs2_software_notification notification;
243  notification.description = e.what();
244  notification.severity = RS2_LOG_SEVERITY_ERROR;
245  notification.type = RS2_EXCEPTION_TYPE_UNKNOWN;
246  notification.serialized_data = e.what();
247  remote_sensors[i]->sw_sensor.get()->on_notification(notification);
248  continue;
249  }
250  }
251  auto sensor_supported_option = sw_sensor->get_supported_options();
252  for(rs2_option opt : sensor_supported_option)
253  {
254  if(remote_sensors[i]->sensors_option[opt] != (float)sw_sensor->get_option(opt))
255  {
256  //TODO: get from map once to reduce logarithmic complexity
257  remote_sensors[i]->sensors_option[opt] = (float)sw_sensor->get_option(opt);
258  INF << "Option '" << opt << "' has changed to: " << remote_sensors[i]->sensors_option[opt];
259  update_option_value(i, opt, remote_sensors[i]->sensors_option[opt]);
260  }
261  }
262  }
263  }
264  catch(const std::exception& e)
265  {
266  ERR << e.what();
267  }
268  std::this_thread::sleep_for(std::chrono::milliseconds(POLLING_SW_DEVICE_STATE_INTERVAL));
269  }
270 }
271 
272 void ip_device::update_option_value(int sensor_index, rs2_option opt, float val)
273 {
274  float updated_value = 0;
275  set_option_value(sensor_index, opt, val);
276  get_option_value(sensor_index, opt, updated_value);
277  if(val != updated_value)
278  {
279  //TODO:: to uncomment after adding exception handling
280  //throw std::runtime_error("[update_option_value] error");
281  ERR << "Cannot update option value.";
282  }
283 }
284 
285 void ip_device::set_option_value(int sensor_index, rs2_option opt, float val)
286 {
287  if(sensor_index < (sizeof(remote_sensors) / sizeof(remote_sensors[0])) && remote_sensors[sensor_index] != nullptr)
288  {
289  remote_sensors[sensor_index]->rtsp_client->setOption(std::string(sensors_str[sensor_index]), opt, val);
290  }
291 }
292 
293 void ip_device::get_option_value(int sensor_index, rs2_option opt, float& val)
294 {
295  if(sensor_index < sizeof(remote_sensors) && remote_sensors[sensor_index] != nullptr)
296  {
297  remote_sensors[sensor_index]->rtsp_client->getOption(std::string(sensors_str[sensor_index]), opt, val);
298  }
299 }
300 
302 {
303  rs2_video_stream retVal;
304  retVal.fmt = sp.format();
305  retVal.type = sp.stream_type();
306  retVal.fps = sp.fps();
307  retVal.width = sp.width();
308  retVal.height = sp.height();
309  retVal.index = sp.stream_index();
310 
311  return retVal;
312 }
313 
314 void ip_device::update_sensor_state(int sensor_index, std::vector<rs2::stream_profile> updated_streams, bool recover)
315 {
316  //check if need to close all
317  if(updated_streams.size() == 0)
318  {
319  remote_sensors[sensor_index]->rtsp_client->stop();
320  remote_sensors[sensor_index]->rtsp_client->close();
321  remote_sensors[sensor_index]->rtsp_client = nullptr;
322  stop_sensor_streams(sensor_index);
323  if(recover)
324  {
325  recover_rtsp_client(sensor_index);
326  }
327  return;
328  }
329  for(size_t i = 0; i < updated_streams.size(); i++)
330  {
331  rs2::video_stream_profile vst(updated_streams[i]);
332 
333  long long int requested_stream_key = RsRTSPClient::getStreamProfileUniqueKey(convert_stream_object(vst));
334 
335  if(streams_collection.find(requested_stream_key) == streams_collection.end())
336  {
337  throw std::runtime_error("[update_sensor_state] stream key: " + std::to_string(requested_stream_key) + " is not found. closing device.");
338  }
339 
340  rtp_callbacks[requested_stream_key] = new rs_rtp_callback(streams_collection[requested_stream_key]);
341  remote_sensors[sensor_index]->rtsp_client->addStream(streams_collection[requested_stream_key].get()->m_rs_stream, rtp_callbacks[requested_stream_key]);
342  inject_frames_thread[requested_stream_key] = std::thread(&ip_device::inject_frames_loop, this, streams_collection[requested_stream_key]);
343  remote_sensors[sensor_index]->active_streams_keys.push_front(requested_stream_key);
344  }
345 
346  remote_sensors[sensor_index]->rtsp_client->start();
347  INF << "Stream started for sensor " << sensor_index;
348 }
349 
351 {
352  if(type == RS2_STREAM_INFRARED || type == RS2_STREAM_DEPTH)
353  return 0;
354  return 1;
355 }
356 
357 void ip_device::inject_frames_loop(std::shared_ptr<rs_rtp_stream> rtp_stream)
358 {
359  try
360  {
361  rtp_stream.get()->is_enabled = true;
362 
363  rtp_stream.get()->frame_data_buff.frame_number = 0;
364 
365  rtp_stream->frame_data_buff.bpp = getStreamProfileBpp(rtp_stream.get()->get_stream_profile().format());
366  rtp_stream->frame_data_buff.stride = rtp_stream->frame_data_buff.bpp * rtp_stream->m_rs_stream.width;
367 
368  int uid = rtp_stream.get()->m_rs_stream.uid;
369  rs2_stream type = rtp_stream.get()->m_rs_stream.type;
370  int sensor_id = stream_type_to_sensor_id(type);
371 
372  while(rtp_stream.get()->is_enabled == true)
373  {
374  if(rtp_stream.get()->queue_size() != 0)
375  {
376  Raw_Frame* frame = rtp_stream.get()->extract_frame();
377  rtp_stream.get()->frame_data_buff.pixels = frame->m_buffer;
378 
379  rtp_stream.get()->frame_data_buff.timestamp = frame->m_metadata->data.timestamp;
380 
381  rtp_stream.get()->frame_data_buff.frame_number++;
382  rtp_stream.get()->frame_data_buff.domain = frame->m_metadata->data.timestampDomain;
383 
384  remote_sensors[sensor_id]->sw_sensor->set_metadata(RS2_FRAME_METADATA_FRAME_TIMESTAMP, rtp_stream.get()->frame_data_buff.timestamp);
386  remote_sensors[sensor_id]->sw_sensor->set_metadata(RS2_FRAME_METADATA_FRAME_COUNTER, rtp_stream.get()->frame_data_buff.frame_number);
387  remote_sensors[sensor_id]->sw_sensor->set_metadata(RS2_FRAME_METADATA_FRAME_EMITTER_MODE, 1);
388 
389  remote_sensors[sensor_id]->sw_sensor->set_metadata(RS2_FRAME_METADATA_TIME_OF_ARRIVAL, std::chrono::duration<double, std::milli>(std::chrono::system_clock::now().time_since_epoch()).count());
390  remote_sensors[sensor_id]->sw_sensor->on_video_frame(rtp_stream.get()->frame_data_buff);
391  }
392  }
393 
394  rtp_stream.get()->reset_queue();
395  DBG << "Polling data at stream " << rtp_stream.get()->m_rs_stream.uid << " completed";
396  }
397  catch(const std::exception& ex)
398  {
399  ERR << ex.what();
400  }
401 }
402 
404 {
405  verify_version_compatibility(api_version);
407 
409 
410  // create sw device
412  // create IP instance
413  ip_device* ip_dev = new ip_device(sw_dev, addr);
414  // set client destruction functioun
415  sw_dev.set_destruction_callback([ip_dev] { delete ip_dev; });
416  // register device info to sw device
417  DeviceData data = ip_dev->remote_sensors[0]->rtsp_client->getDeviceData();
418  sw_dev.update_info(RS2_CAMERA_INFO_NAME, data.name + " IP Device");
422 
423  return sw_dev.get().get();
424 }
425 HANDLE_EXCEPTIONS_AND_RETURN(nullptr, api_version, address)
static int getPhysicalSensorUniqueKey(rs2_stream stream_type, int sensors_index)
rs2_format fmt
Definition: rs_internal.h:50
#define DEFAULT_PROFILE_HIGHT
Definition: ip_device.hh:27
std::map< rs2_option, float > sensors_option
Definition: ip_sensor.hh:24
std::string serialNum
Definition: IRsRtsp.h:16
bool is_device_alive
Definition: ip_device.hh:41
void get_option_value(int sensor_index, rs2_option opt, float &val)
Definition: ip_device.cpp:293
const std::shared_ptr< rs2_device > & get() const
Definition: rs_device.hpp:116
GLenum const void * addr
Definition: glext.h:8864
unsigned int ip_port
Definition: ip_device.hh:47
rs2_option
Defines general configuration controls. These can generally be mapped to camera UVC controls...
Definition: rs_option.h:22
rs2_video_stream convert_stream_object(rs2::video_stream_profile sp)
Definition: ip_device.cpp:301
#define ERR
Definition: NetdevLog.h:9
void register_info(rs2_camera_info info, const std::string &val)
std::map< std::pair< int, int >, rs2_extrinsics > minimal_extrinsics_map
std::map< long long int, std::shared_ptr< rs_rtp_stream > > streams_collection
Definition: ip_device.hh:50
static long long int getStreamProfileUniqueKey(rs2_video_stream t_profile)
#define INF
Definition: NetdevLog.h:11
const std::string RGB_SENSOR_NAME("RGB Camera")
RsMetadataHeader * m_metadata
All the parameters required to define a video stream.
Definition: rs_internal.h:41
std::thread sw_device_status_check
Definition: ip_device.hh:56
virtual int start()=0
rs2_stream type
Definition: rs_internal.h:43
void update_option_value(int sensor_index, rs2_option opt, float val)
Definition: ip_device.cpp:272
void verify_version_compatibility(int api_version)
Definition: api.h:463
struct RsMetadataHeader::@5 data
int stream_type_to_sensor_id(rs2_stream type)
Definition: ip_device.cpp:350
GLsizei const GLchar *const * string
software_sensor add_sensor(std::string name)
void update_sensor_state(int sensor_index, std::vector< rs2::stream_profile > updated_streams, bool recover)
Definition: ip_device.cpp:314
e
Definition: rmse.py:177
virtual int stop()=0
IRsRtsp * rtsp_client
Definition: ip_sensor.hh:29
GLuint GLfloat * val
GLuint64 key
Definition: glext.h:8966
std::string name
Definition: IRsRtsp.h:17
dictionary streams
Definition: t265_stereo.py:140
rs2_timestamp_domain timestampDomain
Definition: RsCommon.h:26
#define HANDLE_EXCEPTIONS_AND_RETURN(R,...)
Definition: api.h:399
std::string sensors_str[]
Definition: ip_device.cpp:20
void polling_state_loop()
Definition: ip_device.cpp:216
const std::string STEREO_SENSOR_NAME("Stereo Module")
Definition: api.h:28
#define DEFAULT_PROFILE_COLOR_FORMAT
Definition: ip_device.hh:29
GLenum GLenum GLsizei const GLuint GLboolean enabled
virtual std::vector< rs2_video_stream > getStreams()=0
virtual int getOption(const std::string &t_sensorName, rs2_option t_option, float &t_value)=0
int stream_index() const
Definition: rs_frame.hpp:34
std::vector< IpDeviceControlData > get_controls(int sensor_id)
Definition: ip_device.cpp:102
void update_info(rs2_camera_info info, const std::string &val)
std::vector< IpDeviceControlData > controls
static MemoryPool & get_memory_pool()
#define NUM_OF_SENSORS
Definition: ip_device.hh:19
bool is_enabled
Definition: ip_sensor.hh:26
#define DBG
Definition: NetdevLog.h:8
#define POLLING_SW_DEVICE_STATE_INTERVAL
Definition: ip_device.hh:21
rs2_stream
Streams are different types of data provided by RealSense devices.
Definition: rs_sensor.h:42
virtual std::vector< IpDeviceControlData > getControls()=0
virtual int setOption(const std::string &t_sensorName, rs2_option t_option, float t_value)=0
#define BEGIN_API_CALL
Definition: api.h:397
void set_destruction_callback(T callback) const
Cross-stream extrinsics: encodes the topology describing how the different devices are oriented...
Definition: rs_sensor.h:96
std::map< long long int, rs_rtp_callback * > rtp_callbacks
Definition: ip_device.hh:54
virtual int close()=0
bool init_device_data(rs2::software_device sw_device)
Definition: ip_device.cpp:111
void inject_frames_loop(std::shared_ptr< rs_rtp_stream > rtp_stream)
Definition: ip_device.cpp:357
std::vector< rs2_video_stream > query_streams(int sensor_id)
Definition: ip_device.cpp:87
ip_device(rs2::software_device sw_device, std::string ip_address)
Definition: ip_device.cpp:69
rs2_format format() const
Definition: rs_frame.hpp:44
void set_option_value(int sensor_index, rs2_option opt, float val)
Definition: ip_device.cpp:285
#define DEFAULT_PROFILE_FPS
Definition: ip_device.hh:23
rs2_device * rs2_create_net_device(int api_version, const char *address, rs2_error **error) BEGIN_API_CALL
Definition: ip_device.cpp:403
All the parameters required to define a sensor notification.
Definition: rs_internal.h:122
void recover_rtsp_client(int sensor_index)
Definition: ip_device.cpp:23
double timestamp
Definition: RsCommon.h:23
static IRsRtsp * createNew(char const *t_rtspURL, char const *t_applicationName, portNumBits t_tunnelOverHTTPPortNum, int idx)
int getStreamProfileBpp(rs2_format t_format)
Definition: RsCommon.cpp:6
GLenum type
std::list< long long int > active_streams_keys
Definition: ip_sensor.hh:22
std::map< long long int, std::thread > inject_frames_thread
Definition: ip_device.hh:52
std::map< std::pair< rs2_stream, int >, int > default_streams
Definition: ip_device.hh:80
GLint GLsizei count
#define DEFAULT_PROFILE_WIDTH
Definition: ip_device.hh:25
GLuint GLuint64EXT address
Definition: glext.h:11417
int stoi(const std::string &value)
#define NULL
Definition: tinycthread.c:47
std::shared_ptr< rs2::software_sensor > sw_sensor
Definition: ip_sensor.hh:20
int i
std::string ip_address
Definition: ip_device.hh:46
void stop_sensor_streams(int sensor_id)
Definition: ip_device.cpp:57
virtual int addStream(rs2_video_stream t_stream, rtp_callback *t_frameCallBack)=0
rs2_stream stream_type() const
Definition: rs_frame.hpp:39
char * m_buffer
ip_sensor * remote_sensors[NUM_OF_SENSORS]
Definition: ip_device.hh:38
int fps() const
Definition: rs_frame.hpp:49
std::string usbType
Definition: IRsRtsp.h:18
Definition: parser.hpp:150
#define VALIDATE_NOT_NULL(ARG)
Definition: api.h:406
MemoryPool * memory_pool
Definition: ip_device.hh:44
std::string to_string(T value)


librealsense2
Author(s): Sergey Dorodnicov , Doron Hirshberg , Mark Horn , Reagan Lopez , Itay Carpis
autogenerated on Mon May 3 2021 02:47:17