client_node.cpp
Go to the documentation of this file.
1 /****************************************************************
2  ** **
3  ** Copyright(C) 2020 Quanergy Systems. All Rights Reserved. **
4  ** Contact: http://www.quanergy.com **
5  ** **
6  ****************************************************************/
7 
8 // publisher module
10 
11 #include <quanergy/client/version.h>
12 
13 #if (QUANERGY_CLIENT_VERSION/100000 != 5)
14  #error Incompatible Quanergy Client Version; looking for v5.x.x
15 #endif
16 
17 // console parser
18 #include <boost/program_options.hpp>
19 
20 // TCP client for sensor
21 #include <quanergy/client/sensor_client.h>
22 
23 // sensor pipeline
24 #include <quanergy/pipelines/sensor_pipeline.h>
25 
26 // async module for multithreading
27 #include <quanergy/pipelines/async.h>
28 
29 // settings specific to ROS
31 {
32  // Whether or not to publish each return on a separate topic
33  // This reuses SensorPipeline's return_selection with an additional option
34  bool separate_return_topics = false;
35 
36  // ROS topic to publish on
37  std::string topic = "points";
38 
39  // determines whether to use ROS Time in the ROS Msg.
40  bool use_ros_time = false;
41 
43  void load(const quanergy::pipeline::SettingsFileLoader& settings);
44 };
45 
46 int main(int argc, char** argv)
47 {
48  ros::init(argc, argv, "quanergy_client_ros");
49  ros::NodeHandle nh;
50 
51  namespace po = boost::program_options;
52 
53  po::options_description description("Quanergy Client ROS Node");
54  const po::positional_options_description p; // empty positional options
55 
56  quanergy::pipeline::SensorPipelineSettings pipeline_settings;
57  RosNodeSettings ros_node_settings;
58  std::string return_string;
59  std::vector<float> correct_params;
60 
61  // port
62  std::string port = "4141";
63 
64  description.add_options()
65  ("help,h", "Display this help message.")
66  ("settings-file,s", po::value<std::string>(),
67  "Settings file. Setting file values override defaults and command line arguments override the settings file.")
68  ("topic,t", po::value<std::string>(&ros_node_settings.topic)->
69  default_value(ros_node_settings.topic),
70  "ROS topic for publishing the point cloud. If 'all_separate_topics' is chosen for --return, "
71  "a topic will be created for each return number with the return number appended to the topic name.")
72  ("use-ros-time", po::bool_switch(&ros_node_settings.use_ros_time),
73  "flag determining whether to use ROS time in message; uses sensor time otherwise,")
74  ("host", po::value<std::string>(&pipeline_settings.host),
75  "Host name or IP of the sensor.")
76  ("frame,f", po::value<std::string>(&pipeline_settings.frame)->
77  default_value(pipeline_settings.frame),
78  "Frame name inserted in the point cloud.")
79  ("return,r", po::value<std::string>(&return_string),
80  "Return selection (M-series only) - "
81  "Options are 0, 1, 2, all, or all_separate_topics. For 3 return packets, 'all' creates an unorganized point cloud "
82  "and 'all_separate_topics' creates 3 separate organized point clouds on their own topics. "
83  "For single return, explicitly setting a value produces an error if the selection doesn't match the packet.")
84  ("calibrate", po::bool_switch(&pipeline_settings.calibrate),
85  "Flag indicating encoder calibration should be performed and applied to outgoing points; M-series only.")
86  ("frame-rate", po::value<double>(&pipeline_settings.frame_rate)->
87  default_value(pipeline_settings.frame_rate),
88  "Frame rate used when peforming encoder calibration; M-series only.")
89  ("manual-correct", po::value<std::vector<float>>(&correct_params)->multitoken()->value_name("amplitude phase"),
90  "Correct encoder error with user defined values. Both amplitude and phase are in radians; M-series only.")
91  ("min-distance", po::value<float>(&pipeline_settings.min_distance)->
92  default_value(pipeline_settings.min_distance),
93  "minimum distance (inclusive) for distance filtering.")
94  ("max-distance", po::value<float>(&pipeline_settings.max_distance)->
95  default_value(pipeline_settings.max_distance),
96  "maximum distance (inclusive) for distance filtering.")
97  ("min-cloud-size", po::value<std::int32_t>(&pipeline_settings.min_cloud_size)->
98  default_value(pipeline_settings.min_cloud_size),
99  "minimum cloud size; produces an error and ignores clouds smaller than this.")
100  ("max-cloud-size", po::value<std::int32_t>(&pipeline_settings.max_cloud_size)->
101  default_value(pipeline_settings.max_cloud_size),
102  "maximum cloud size; produces an error and ignores clouds larger than this.");
103 
104  try
105  {
106  // load the command line options into the variables map
107  po::variables_map vm;
108  po::store(po::command_line_parser(argc, argv).options(description).positional(p).run(), vm);
109 
110  if (vm.count("help"))
111  {
112  std::cout << description << std::endl;
113  return 0;
114  }
115 
116  // if there is a settings file, load that before notifying (which fills the variables)
117  if (vm.count("settings-file"))
118  {
119  std::string settings_file = vm["settings-file"].as<std::string>();
120  quanergy::pipeline::SettingsFileLoader file_loader;
121  file_loader.loadXML(settings_file);
122  ros_node_settings.load(file_loader);
123 
124  if (ros_node_settings.separate_return_topics)
125  {
126  // we'll deal with this later but we need to change the value in settings
127  // since pipeline_settings won't know how to handle it
128  file_loader.put("Settings.return", pipeline_settings.stringFromReturn(0));
129  }
130 
131  pipeline_settings.load(file_loader);
132  }
133 
134  // notify; this stores command line options in associated variables
135  po::notify(vm);
136 
137  // let the user know if there is no host value
138  if (pipeline_settings.host.empty())
139  {
140  std::cout << "No host provided" << std::endl;
141  std::cout << description << std::endl;
142  return -1;
143  }
144 
145  // handle return selection
146  if (!return_string.empty())
147  {
148  pipeline_settings.return_selection_set = true;
149  if (return_string == "all_separate_topics")
150  {
151  ros_node_settings.separate_return_topics = true;
152  pipeline_settings.return_selection = 0;
153  }
154  else
155  {
156  pipeline_settings.return_selection = pipeline_settings.returnFromString(return_string);
157  }
158  }
159 
160  // handle encoder correction parameters
161  if (!correct_params.empty())
162  {
163  if (correct_params.size() == 2)
164  {
165  pipeline_settings.override_encoder_params = true;
166  pipeline_settings.amplitude = correct_params[0];
167  pipeline_settings.phase = correct_params[1];
168  }
169  else
170  {
171  std::cout << "Manual encoder correction expects exactly 2 paramters: amplitude and phase" << std::endl;
172  std::cout << description << std::endl;
173  return -1;
174  }
175  }
176  }
177  catch (po::error& e)
178  {
179  std::cout << "Boost Program Options Error: " << e.what() << std::endl << std::endl;
180  std::cout << description << std::endl;
181  return -1;
182  }
183  catch (std::exception& e)
184  {
185  std::cout << "Error: " << e.what() << std::endl;
186  return -2;
187  }
188 
189 
190  // create client to get raw packets from the sensor
191  quanergy::client::SensorClient client(pipeline_settings.host, port, 100);
192 
193  // create list (because it is noncopyable) of asyncs that will use if all_separate_topics so processing can happen on separate threads
194  using Async = quanergy::pipeline::AsyncModule<std::shared_ptr<std::vector<char>>>;
195  std::list<Async> asyncs;
196 
197  // create list (because it is noncopyable) of pipelines to produce point cloud from raw packets
198  // we may need multiple for separate_return_topics
199  std::list<quanergy::pipeline::SensorPipeline> pipelines;
200 
201  // create list (because it is noncopyable) of ROS publishers to consume point clouds and publish them
202  // we may need multiple for separate_return_topics
203  std::list<SimplePublisher<quanergy::PointXYZIR>> publishers;
204 
205  // create vector of threads for publisher(s)
206  // we may need multiple for separate_return_topics
207  std::vector<std::thread> publisher_threads;
208 
209  // store connections for cleaner shutdown
210  std::vector<boost::signals2::connection> connections;
211 
212  // Some synchronization stuff
213  std::mutex client_mutex;
214  std::atomic<int> publishers_count = {0};
215  std::condition_variable publisher_cv;
216 
217  // connect things and run publisher(s) on their own thread(s)
218  int pipeline_count = 1;
219  if (ros_node_settings.separate_return_topics)
220  {
221  pipeline_count = quanergy::client::M_SERIES_NUM_RETURNS;
222  }
223 
224  for (int i = 0; i < pipeline_count; ++i)
225  {
226  // set return for pipeline and define topic, if needed
227  std::string topic = ros_node_settings.topic;
228  if (ros_node_settings.separate_return_topics)
229  {
230  pipeline_settings.return_selection = i;
231  topic += std::to_string(i);
232  }
233 
234  // create pipeline and get reference to it
235  pipelines.emplace_back(pipeline_settings);
236  auto& pipeline = pipelines.back();
237 
238  // create publisher and get reference to it
239  publishers.emplace_back(nh, topic, ros_node_settings.use_ros_time);
240  auto& publisher = publishers.back();
241 
242  if (ros_node_settings.separate_return_topics)
243  {
244  // create async to put pipelines on separate threads
245  // need larger queue because the parser collects ~100 packets to assemble into point cloud
246  asyncs.emplace_back(100);
247  auto& async = asyncs.back();
248 
249  // connect client to async
250  connections.push_back(client.connect(
251  [&async](const std::shared_ptr<std::vector<char>>& packet){ async.slot(packet); }
252  ));
253 
254  // connect async to pipeline
255  connections.push_back(async.connect(
256  [&pipeline](const std::shared_ptr<std::vector<char>>& packet){ pipeline.slot(packet); }
257  ));
258  }
259  else
260  {
261  // connect client to pipeline
262  connections.push_back(client.connect(
263  [&pipeline](const std::shared_ptr<std::vector<char>>& packet){ pipeline.slot(packet); }
264  ));
265  }
266 
267  // connect the pipeline to the publisher
268  connections.push_back(pipeline.connect(
269  [&publisher](const boost::shared_ptr<pcl::PointCloud<quanergy::PointXYZIR>>& pc){ publisher.slot(pc); }
270  ));
271 
272  // create publisher thread
273  publisher_threads.emplace_back(
274  [&publisher, &client_mutex, &client, &publishers_count, &publisher_cv]
275  {
276  std::unique_lock<std::mutex> lock(client_mutex);
277  ++publishers_count;
278  lock.unlock();
279  publisher_cv.notify_one();
280 
281  publisher.run();
282  lock.lock();
283  client.stop();
284  }
285  );
286  }
287 
288  // this makes sure the publishers have started; without it, sometimes the client starts first (due to OS scheduling)
289  // and then we get a bunch of warning messages related to data coming in without being consumed
290  {
291  std::unique_lock<std::mutex> lock(client_mutex);
292  publisher_cv.wait(lock, [&publishers_count, pipeline_count]{ return publishers_count == pipeline_count; });
293  }
294 
295  // start client
296  try
297  {
298  client.run();
299  }
300  catch (std::exception& e)
301  {
302  std::cerr << "Terminating after catching exception: "
303  << e.what()
304  << std::endl;
305  }
306 
307 
308  // Clean up
309  ros::shutdown();
310 
311  connections.clear();
312 
313  for (auto &thread : publisher_threads)
314  {
315  thread.join();
316  }
317 
318  publisher_threads.clear();
319 
320  return (0);
321 }
322 
323 void RosNodeSettings::load(const quanergy::pipeline::SettingsFileLoader& settings)
324 {
325  auto r = settings.get_optional<std::string>("Settings.return");
326  if (r && *r == "all_separate_topics")
327  {
328  separate_return_topics = true;
329  }
330 
331  topic = settings.get("Settings.RosNode.topic", topic);
332 
333  use_ros_time = settings.get("Settings.RosNode.useRosTime", use_ros_time);
334 }
run
void run(class_loader::ClassLoader *loader)
boost::shared_ptr
ros::init
ROSCPP_DECL void init(const M_string &remappings, const std::string &name, uint32_t options=0)
ros::shutdown
ROSCPP_DECL void shutdown()
main
int main(int argc, char **argv)
Definition: client_node.cpp:46
simple_publisher.h
RosNodeSettings::topic
std::string topic
Definition: client_node.cpp:42
RosNodeSettings::load
void load(const quanergy::pipeline::SettingsFileLoader &settings)
loads settings from SettingsFileLoader
Definition: client_node.cpp:323
default_value
def default_value(type_)
RosNodeSettings::use_ros_time
bool use_ros_time
Definition: client_node.cpp:45
RosNodeSettings
Definition: client_node.cpp:30
RosNodeSettings::separate_return_topics
bool separate_return_topics
Definition: client_node.cpp:39
ros::NodeHandle


quanergy_client_ros
Author(s):
autogenerated on Thu May 5 2022 02:07:29