oem7_message_nodelet.cpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2020 NovAtel Inc.
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in all
13 // copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 // SOFTWARE.
22 //
24 
25 #include <ros/ros.h>
26 #include <nodelet/nodelet.h>
27 #include <ros/callback_queue.h>
28 
29 #include <mutex>
30 #include <condition_variable>
31 #include <map>
32 
33 #include <boost/asio.hpp>
34 
35 #include "novatel_oem7_msgs/Oem7AbasciiCmd.h"
36 #include "novatel_oem7_msgs/Oem7RawMsg.h"
37 
38 #include <pluginlib/class_loader.h>
39 
42 
45 #include <oem7_ros_publisher.hpp>
46 
47 #include <message_handler.hpp>
48 
49 
50 namespace novatel_oem7_driver
51 {
59  public nodelet::Nodelet
60  {
61  std::mutex nodelet_mtx_;
62 
64 
67 
70 
71  // Command service
72  std::condition_variable rsp_ready_cond_;
73  std::mutex rsp_ready_mtx_;
74  std::string rsp_;
75  ros::CallbackQueue queue_; //< Dedicated queue for command requests.
78 
79 
81 
84 
85  std::set<int> raw_msg_pub_;
86 
88 
89  // Log statistics
91 
92  typedef std::map<int, long> log_count_map_t;
93  log_count_map_t log_counts_;
94 
97 
98 
101 
102 
103 
107  void onInit()
108  {
109  try
110  {
111  onInitImpl();
112  }
113  catch(std::exception const& ex)
114  {
115  NODELET_FATAL_STREAM("Fatal: " << ex.what());
116 
117  ros::shutdown();
118  }
119  }
120 
124  void onInitImpl()
125  {
126  NODELET_INFO_STREAM(getName() << ": Oem7MessageNodelet v." << novatel_oem7_driver_VERSION << "; "
127  << __DATE__ << " " << __TIME__);
128 
129  std::lock_guard<std::mutex> guard(nodelet_mtx_);
130 
131 
133 
134  getNodeHandle().setCallbackQueue(&timer_queue_);
135 
136  getPrivateNodeHandle().getParam("oem7_publish_unknown_oem7raw", publish_unknown_oem7raw_);
137 
138  getPrivateNodeHandle().getParam("oem7_publish_delay", publish_delay_sec_);
139  if(publish_delay_sec_ > 0)
140  {
141  NODELET_WARN_STREAM("Publish Delay: " << publish_delay_sec_ << " seconds. Is this is a test?");
142  }
143  // Load plugins
144 
145  // Load Oem7Receiver
146  std::string oem7_if_name;
147  getPrivateNodeHandle().getParam("oem7_if", oem7_if_name);
148  recvr_ = recvr_loader_.createInstance(oem7_if_name);
149  recvr_->initialize(getPrivateNodeHandle());
150 
151  // Load Oem7 Message Decoder
152  std::string msg_decoder_name;
153  getPrivateNodeHandle().getParam("oem7_msg_decoder", msg_decoder_name);
154  msg_decoder = oem7_msg_decoder_loader.createInstance(msg_decoder_name);
155  msg_decoder->initialize(getPrivateNodeHandle(), recvr_.get(), this);
156 
157 
158  msg_handler_.reset(new MessageHandler(getPrivateNodeHandle()));
159 
160  // Oem7 raw messages to publish.
161  std::vector<std::string> oem7_raw_msgs;
162  bool ok = getPrivateNodeHandle().getParam("oem7_raw_msgs", oem7_raw_msgs);
163  for(const auto& msg : oem7_raw_msgs)
164  {
165  int raw_msg_id = getOem7MessageId(msg);
166  if(raw_msg_id == 0)
167  {
168  NODELET_ERROR_STREAM("Unknown Oem7 message '" << msg );
169  }
170  else
171  {
172  NODELET_INFO_STREAM("Oem7 Raw message '" << msg << "' will be published." );
173  raw_msg_pub_.insert(raw_msg_id);
174  }
175  }
176 
177  oem7rawmsg_pub_.setup<novatel_oem7_msgs::Oem7RawMsg>("Oem7RawMsg", getPrivateNodeHandle());
178 
179  timer_spinner_.reset(new ros::AsyncSpinner(1, &timer_queue_)); //< 1 thread servicing the command queue.
180  timer_spinner_->start();
181 
183 
184  aspinner_.reset(new ros::AsyncSpinner(1, &queue_));
185  aspinner_->start();
186 
187  sleep(0.5); // Allow all threads to start
188 
189  ros::AdvertiseServiceOptions ops = ros::AdvertiseServiceOptions::create<novatel_oem7_msgs::Oem7AbasciiCmd>(
190  "Oem7Cmd",
191  boost::bind(&Oem7MessageNodelet::serviceOem7AbasciiCb, this, _1, _2),
193  &queue_);
194  oem7_cmd_srv_ = getPrivateNodeHandle().advertiseService(ops);
195  }
196 
197 
201  bool serviceOem7AbasciiCb(novatel_oem7_msgs::Oem7AbasciiCmd::Request& req, novatel_oem7_msgs::Oem7AbasciiCmd::Response& rsp)
202  {
203  NODELET_DEBUG_STREAM("AACmd: cmd '" << req.cmd << "'");
204 
205  // Retry sending the commands. For configuration commands, there is no harm in duplicates.
206  for(int attempt = 0;
207  attempt < 10;
208  attempt++)
209  {
210  {
211  std::lock_guard<std::mutex> lk(rsp_ready_mtx_);
212  rsp_.clear();
213  }
214 
215  recvr_->write(boost::asio::buffer(req.cmd));
216  static const std::string NEWLINE("\n");
217  recvr_->write(boost::asio::buffer(NEWLINE));
218 
219  std::unique_lock<std::mutex> lk(rsp_ready_mtx_);
220  if(rsp_ready_cond_.wait_until(lk,
221  std::chrono::steady_clock::now() + std::chrono::seconds(3)
222  ) == std::cv_status::no_timeout)
223  {
224  rsp.rsp = rsp_;
225  break;
226  }
227 
228  NODELET_ERROR_STREAM("Attempt " << attempt << ": timed out waiting for response.");
229  }
230 
231  if(rsp.rsp == "OK")
232  {
233  NODELET_INFO_STREAM("AACmd '" << req.cmd << "' : " << "'" << rsp.rsp << "'");
234  }
235  else
236  {
237  NODELET_ERROR_STREAM("AACmd '" << req.cmd << "' : " << "'" << rsp.rsp << "'");
238  }
239 
240  return true;
241  }
242 
247  {
248  NODELET_INFO("Log Statistics:");
249  NODELET_INFO_STREAM("Logs: " << total_log_count_ << "; unknown: " << unknown_msg_num_
250  << "; discarded: " << discarded_msg_num_);
251 
252  for(log_count_map_t::iterator itr = log_counts_.begin();
253  itr != log_counts_.end();
254  itr++)
255  {
256  int id = itr->first;
257  long count = itr->second;
258 
259  NODELET_INFO_STREAM("Log[" << getOem7MessageName(id) << "](" << id << "):" << count);
260  }
261  }
262 
263  /*
264  * Update Log statistics for a particular message
265  */
266  void updateLogStatistics(const Oem7RawMessageIf::ConstPtr& raw_msg)
267  {
268  total_log_count_++;
269 
270  if(log_counts_.find(raw_msg->getMessageId()) == log_counts_.end())
271  {
272  log_counts_[raw_msg->getMessageId()] = 0;
273  }
274  log_counts_[raw_msg->getMessageId()]++;
275 
276 
277  if((total_log_count_ % 10000) == 0)
278  {
280  }
281  }
282 
283  void publishOem7RawMsg(Oem7RawMessageIf::ConstPtr raw_msg)
284  {
285  novatel_oem7_msgs::Oem7RawMsg::Ptr oem7_raw_msg(new novatel_oem7_msgs::Oem7RawMsg);
286  oem7_raw_msg->message_data.insert(
287  oem7_raw_msg->message_data.end(),
288  raw_msg->getMessageData(0),
289  raw_msg->getMessageData(raw_msg->getMessageDataLength()));
290 
291  assert(oem7_raw_msg->message_data.size() == raw_msg->getMessageDataLength());
292 
293  oem7rawmsg_pub_.publish(oem7_raw_msg);
294  }
295 
296 
300  void onNewMessage(Oem7RawMessageIf::ConstPtr raw_msg)
301  {
302  NODELET_DEBUG_STREAM("onNewMsg: fmt= " << raw_msg->getMessageFormat()
303  << " type= " << raw_msg->getMessageType());
304 
305  // Discard all unknown messages; this is normally when dealing with responses to ASCII commands.
306  if(raw_msg->getMessageFormat() == Oem7RawMessageIf::OEM7MSGFMT_UNKNOWN)
307  {
309  NODELET_DEBUG_STREAM("Unknown: [ID: " << raw_msg->getMessageId() <<
310  "Type: " << raw_msg->getMessageType() <<
311  "Fmt: " << raw_msg->getMessageFormat() <<
312  "Len: " << raw_msg->getMessageDataLength() <<
313  "]");
314  if(publish_unknown_oem7raw_)
315  {
316  publishOem7RawMsg(raw_msg);
317  }
318  else
319  {
321  }
322  }
323  else
324  {
325  if(raw_msg->getMessageFormat() == Oem7RawMessageIf::OEM7MSGFMT_ASCII ||
326  raw_msg->getMessageFormat() == Oem7RawMessageIf::OEM7MSGFMT_ABASCII)
327  {
328  std::string msg(raw_msg->getMessageData(0), raw_msg->getMessageData(raw_msg->getMessageDataLength()));
329 
330  NODELET_DEBUG_STREAM(">---------------------------" << std::endl
331  << msg << std::endl
332  << "<---------------------------");
333  }
334 
335  if(raw_msg->getMessageType() == Oem7RawMessageIf::OEM7MSGTYPE_RSP) // Response
336  {
337  std::string rsp(raw_msg->getMessageData(0), raw_msg->getMessageData(raw_msg->getMessageDataLength()));
338  if(rsp.find_first_not_of(" /t/r/n") != std::string::npos && // ignore all-whitespace responses
339  std::all_of(rsp.begin(), rsp.end(), [](char c){return std::isprint(c);})) // Ignore any corrupt responses
340  {
341  {
342  std::lock_guard<std::mutex> lk(rsp_ready_mtx_);
343  rsp_ = rsp;
344  }
345  rsp_ready_cond_.notify_one();
346  }
347  else
348  {
349  NODELET_ERROR_STREAM("Discarded corrupt ASCII response: '" << rsp_ << "'");
350  }
351  }
352  else // Log
353  {
354  if( raw_msg->getMessageFormat() == Oem7RawMessageIf::OEM7MSGFMT_BINARY || // binary
355  (raw_msg->getMessageFormat() == Oem7RawMessageIf::OEM7MSGFMT_ASCII && isNMEAMessage(raw_msg)))
356  {
357  updateLogStatistics(raw_msg);
358 
359  msg_handler_->handleMessage(raw_msg);
360 
361  // Publish Oem7RawMsg if specified
362  if(raw_msg_pub_.find(raw_msg->getMessageId()) != raw_msg_pub_.end())
363  {
364  publishOem7RawMsg(raw_msg);
365  }
366 
367  if(publish_delay_sec_ > 0)
368  {
369  sleep(publish_delay_sec_);
370  }
371  }
372  }
373  }
374  }
375 
376 
380  void serviceLoopCb(const ros::TimerEvent& event)
381  {
382  msg_decoder->service();
383 
385 
386  NODELET_WARN("No more input from Decoder; Oem7MessageNodelet finished.");
387  }
388 
389 
390  public:
392  recvr_loader_( "novatel_oem7_driver", "novatel_oem7_driver::Oem7ReceiverIf"),
393  oem7_msg_decoder_loader("novatel_oem7_driver", "novatel_oem7_driver::Oem7MessageDecoderIf"),
394  total_log_count_(0),
395  unknown_msg_num_(0),
396  discarded_msg_num_(0),
397  publish_delay_sec_(0),
398  publish_unknown_oem7raw_(false)
399  {
400  }
401 
403  {
404  NODELET_DEBUG("~Oem7MessageNodelet");
405  }
406  };
407 }
408 
409 
void onNewMessage(Oem7RawMessageIf::ConstPtr raw_msg)
#define NODELET_INFO_STREAM(...)
std::mutex nodelet_mtx_
Protects nodelet internal state.
boost::shared_ptr< void const > VoidConstPtr
pluginlib::ClassLoader< novatel_oem7_driver::Oem7MessageDecoderIf > oem7_msg_decoder_loader
#define NODELET_WARN(...)
boost::shared_ptr< ros::AsyncSpinner > aspinner_
1 thread servicing the command queue.
bool isNMEAMessage(const Oem7RawMessageIf::ConstPtr &raw_msg)
std::set< int > raw_msg_pub_
Set of raw messages to publish.
const std::string & getName() const
long total_log_count_
Total number of logs received.
log_count_map_t log_counts_
Indidividual log counts.
bool publish_unknown_oem7raw_
Publish all unknown messages to &#39;Oem7Raw&#39;.
#define NODELET_WARN_STREAM(...)
void publishOem7RawMsg(Oem7RawMessageIf::ConstPtr raw_msg)
std::condition_variable rsp_ready_cond_
Response ready, signaled from response handler to Oem7 Cmd Handler.
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
void serviceLoopCb(const ros::TimerEvent &event)
ros::NodeHandle & getPrivateNodeHandle() const
long discarded_msg_num_
Number of messages received and discarded by the driver.
boost::shared_ptr< novatel_oem7_driver::Oem7MessageDecoderIf > msg_decoder
Message Decoder plugin.
int getOem7MessageId(const std::string &msg_name)
#define NODELET_ERROR_STREAM(...)
boost::shared_ptr< novatel_oem7_driver::Oem7ReceiverIf > recvr_
Oem7 Receiver Interface plugin.
Oem7RosPublisher oem7rawmsg_pub_
Publishes raw Oem7 messages.
void setCallbackQueue(CallbackQueueInterface *queue)
ros::ServiceServer oem7_cmd_srv_
Oem7 command service.
std::mutex rsp_ready_mtx_
Response condition guard.
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
boost::shared_ptr< MessageHandler > msg_handler_
Dispatches individual messages for handling.
ros::NodeHandle & getNodeHandle() const
void updateLogStatistics(const Oem7RawMessageIf::ConstPtr &raw_msg)
void publish(boost::shared_ptr< M > &msg)
#define NODELET_DEBUG_STREAM(...)
PLUGINLIB_EXPORT_CLASS(novatel_oem7_driver::Oem7ConfigNodelet, nodelet::Nodelet)
std::string rsp_
The latest response from Oem7 receiver.
#define NODELET_INFO(...)
void setup(const std::string &name, ros::NodeHandle &nh)
void initializeOem7MessageUtil(ros::NodeHandle &nh)
double publish_delay_sec_
Delay after publishing each message; used to throttle output with static data sources.
bool getParam(const std::string &key, std::string &s) const
bool serviceOem7AbasciiCb(novatel_oem7_msgs::Oem7AbasciiCmd::Request &req, novatel_oem7_msgs::Oem7AbasciiCmd::Response &rsp)
#define NODELET_FATAL_STREAM(...)
ros::CallbackQueue timer_queue_
Dedicated queue for command requests.
long unknown_msg_num_
number of messages received that could not be identified.
ROSCPP_DECL void shutdown()
ros::Timer timer_
One time service callback.
const std::string & getOem7MessageName(int msg_id)
pluginlib::ClassLoader< novatel_oem7_driver::Oem7ReceiverIf > recvr_loader_
#define NODELET_DEBUG(...)
boost::shared_ptr< ros::AsyncSpinner > timer_spinner_
1 thread servicing the command queue.


novatel_oem7_driver
Author(s):
autogenerated on Tue Mar 9 2021 03:48:00