protobuf2ros_stream.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Roboception GmbH
3  * All rights reserved
4  *
5  * Author: Christian Emmerich
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  *
13  * 2. Redistributions in binary form must reproduce the above copyright notice,
14  * this list of conditions and the following disclaimer in the documentation
15  * and/or other materials provided with the distribution.
16  *
17  * 3. Neither the name of the copyright holder nor the names of its contributors
18  * may be used to endorse or promote products derived from this software without
19  * specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
25  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "protobuf2ros_stream.h"
35 
38 
39 #include <ros/ros.h>
41 #include <visualization_msgs/Marker.h>
42 #include <nav_msgs/Odometry.h>
43 
45 
46 using namespace std;
47 
48 namespace rc
49 {
50 namespace rcd = dynamics;
51 
52 
53 bool Protobuf2RosStream::startReceivingAndPublishingAsRos()
54 {
55  unsigned int timeoutMillis = 500;
56 
57  initRosPublishers();
58 
59  unsigned int cntNoListener = 0;
60  bool failed = false;
61 
62  while (!_stop && !failed)
63  {
64 
65  // start streaming only if someone is listening
66 
67  bool someoneListens = checkRosPublishersUsed();
68  if (!someoneListens)
69  {
70  // ros getNumSubscribers usually takes a while to register the subscribers
71  if (++cntNoListener > 200)
72  {
73  _requested = false;
74  }
75  usleep(1000 * 10);
76  continue;
77  }
78  cntNoListener = 0;
79  _requested = true;
80  _success = false;
81  ROS_INFO_STREAM("rc_visard_driver: Enabled rc-dynamics stream: " << _stream);
82 
83  // initialize data stream to this host
84 
85  rcd::DataReceiver::Ptr receiver;
86  while (!_stop && someoneListens && !receiver)
87  {
88  someoneListens = checkRosPublishersUsed();
89  try
90  {
91  receiver = _rcdyn->createReceiverForStream(_stream);
92  receiver->setTimeout(timeoutMillis);
93  ROS_INFO_STREAM("rc_visard_driver: rc-dynamics stream ready: " << _stream);
94  }
95  catch (rcd::RemoteInterface::DynamicsNotRunning& e)
96  {
97  stringstream msg;
98  msg << "Could not initialize rc-dynamics stream: " << _stream << ":" << endl << e.what();
99  msg << "\nWaiting until rc_dynamics node is running...." << endl;
100  msg << "\(Hint: Use one of this node's /dynamics_start* services.)" << endl;
101  ROS_WARN_STREAM_THROTTLE(30, msg.str());
102  usleep(1000 * 1000);
103  continue;
104  }
105  catch (exception& e)
106  {
107  ROS_ERROR_STREAM(std::string("Could not initialize rc-dynamics stream: ") + _stream + ": " + e.what());
108  failed = true;
109  _stop = true;
110  break;
111  }
112  }
113 
114  // main loop for listening, receiving and republishing data to ROS
115 
116  string pbMsgType = _rcdyn->getPbMsgTypeOfStream(_stream);
117  shared_ptr<::google::protobuf::Message> pbMsg;
118  unsigned int consecutive_timeouts = 0;
119  while (!_stop && someoneListens)
120  {
121  someoneListens = checkRosPublishersUsed();
122  // try receive msg; blocking call (timeout)
123  try
124  {
125  pbMsg = receiver->receive(pbMsgType);
126  _success = true;
127  }
128  catch (std::exception& e)
129  {
130  ROS_ERROR_STREAM("Caught exception during receiving " << _stream << ": " << e.what());
131  failed = true;
132  break; // stop receiving loop
133  }
134 
135  // timeout happened
136  if (!pbMsg)
137  {
138  // check if dynamics node is still running
139 
140  try
141  {
142  string state = _rcdyn->getDynamicsState();
143  if (state != rcd::RemoteInterface::State::RUNNING &&
144  state != rcd::RemoteInterface::State::RUNNING_WITH_SLAM)
145  {
146  ROS_WARN_STREAM_THROTTLE(30, "Not receiving any " << _stream << " messages because rc_visard's dynamics node is in state " << state << "!");
147  continue; // still keep stream alive and retry until dynamics is turned on
148  }
149  } catch (exception& e)
150  {
151  ROS_ERROR_STREAM("Did not receive any " << _stream << " message within the last " << timeoutMillis << " ms. "
152  << "Could not access state of rc_visard's rc_dynamics module: " << e.what());
153  break; // stop receiving loop
154  }
155 
156  // check if stream is still active on rc_visard
157 
158  string stream_dest = receiver->getIpAddress() + ":" + std::to_string(receiver->getPort());
159  list<string> active_destinations = _rcdyn->getDestinationsOfStream(_stream);
160  if (std::find(active_destinations.begin(), active_destinations.end(), stream_dest) == active_destinations.end())
161  {
162  ROS_WARN_STREAM("Not receiving any " << _stream << " messages because rc_visard stopped streaming to destination "
163  << stream_dest << "! (Did someone delete this stream destination?) Re-creating stream... ");
164  break; // stop receiving loop and re-create receiver
165  }
166 
167 
168  if (++consecutive_timeouts>3) {
169  // dynamics state takes a while to change to a non-running state; give warning only if we are sure to still be in running-state
170  ROS_WARN_STREAM_THROTTLE(1, "Did not receive any " << _stream << " message within the last " << timeoutMillis << " ms. "
171  << "There seems to problems with your network or connection to rc_visard! Resetting stream..."
172  << "(Please check https://tutorials.roboception.de/rc_visard_general/network_setup for proper network setup!)");
173  break; // stop receiving loop
174  }
175  continue; // wait for next packet
176  }
177 
178  //ROS_DEBUG_STREAM_THROTTLE(1, "Received protobuf message: " << pbMsg->DebugString());
179 
180  // convert to ROS message(s) and publish
181  publishToRos(pbMsg);
182 
183  } // receiving loop
184 
185  ROS_INFO_STREAM("rc_visard_driver: Disabled rc-dynamics stream: " << _stream);
186 
187  } // main thread loop
188 
189  // return info about stream being stopped externally or failed internally
190  return !failed;
191 }
192 
193 void Protobuf2RosStream::initRosPublishers()
194 {
195  string pbMsgType = _rcdyn->getPbMsgTypeOfStream(_stream);
196  _rosPublisher = shared_ptr<Protobuf2RosPublisher>(new Protobuf2RosPublisher(_nh, _stream, pbMsgType, _tfPrefix));
197 }
198 
199 bool Protobuf2RosStream::checkRosPublishersUsed()
200 {
201  return _rosPublisher->used();
202 }
203 
204 void Protobuf2RosStream::publishToRos(shared_ptr<::google::protobuf::Message> pbMsg)
205 {
206  _rosPublisher->publish(pbMsg);
207 }
208 
209 void PoseAndTFStream::initRosPublishers()
210 {
211  Protobuf2RosStream::initRosPublishers();
212  _tf_pub = shared_ptr<tf::TransformBroadcaster>(new tf::TransformBroadcaster());
213 }
214 
215 bool PoseAndTFStream::checkRosPublishersUsed()
216 {
217  return Protobuf2RosStream::checkRosPublishersUsed() || _tfEnabled ;
218 }
219 
220 void PoseAndTFStream::publishToRos(shared_ptr<::google::protobuf::Message> pbMsg)
221 {
222  Protobuf2RosStream::publishToRos(pbMsg);
223 
224  shared_ptr<roboception::msgs::Frame> protoFrame = dynamic_pointer_cast<roboception::msgs::Frame>(pbMsg);
225 
226  // overwrite frame name/ids
227  protoFrame->set_parent(_tfPrefix + protoFrame->parent());
228  protoFrame->set_name(_tfPrefix + protoFrame->name());
229 
230  // convert to tf and publish
231  if (_tfEnabled)
232  {
233  tf::StampedTransform transform = toRosTfStampedTransform(*protoFrame);
234  _tf_pub->sendTransform(transform);
235  }
236 
237 }
238 
239 void DynamicsStream::initRosPublishers()
240 {
241  // publish visualization markers only if enabled by ros param and only for 'dynamics', not for 'dynamics_ins'
242 
243  _publishVisualizationMarkers = false;
244  ros::NodeHandle("~").param("enable_visualization_markers", _publishVisualizationMarkers, _publishVisualizationMarkers);
245  _publishVisualizationMarkers = _publishVisualizationMarkers && (_stream == "dynamics");
246 
247  // create publishers
248 
249  _pub_odom = shared_ptr<ros::Publisher>(new ros::Publisher(_nh.advertise<nav_msgs::Odometry>(_stream, 1000)));
250  if (_publishVisualizationMarkers)
251  {
252  _pub_markers = shared_ptr<ros::Publisher>(new ros::Publisher(
253  _nh.advertise<visualization_msgs::Marker>("dynamics_visualization_markers", 1000)));
254  }
255  if (_publishImu2CamAsTf)
256  {
257  _tf_pub = shared_ptr<tf::TransformBroadcaster>(new tf::TransformBroadcaster());
258  }
259 }
260 
261 bool DynamicsStream::checkRosPublishersUsed()
262 {
263  return _pub_odom->getNumSubscribers() > 0 || (_publishVisualizationMarkers && _pub_markers->getNumSubscribers() > 0);
264 }
265 
266 void DynamicsStream::publishToRos(shared_ptr<::google::protobuf::Message> pbMsg)
267 {
268  shared_ptr<roboception::msgs::Dynamics> protoMsg = dynamic_pointer_cast<roboception::msgs::Dynamics>(pbMsg);
269 
270  // prefix all frame ids before
271  protoMsg->set_pose_frame(_tfPrefix + protoMsg->pose_frame());
272  protoMsg->set_linear_velocity_frame(_tfPrefix + protoMsg->linear_velocity_frame());
273  protoMsg->set_angular_velocity_frame(_tfPrefix + protoMsg->angular_velocity_frame());
274  protoMsg->set_linear_acceleration_frame(_tfPrefix + protoMsg->linear_acceleration_frame());
275  protoMsg->mutable_cam2imu_transform()->set_name(_tfPrefix + protoMsg->cam2imu_transform().name());
276  protoMsg->mutable_cam2imu_transform()->set_parent(_tfPrefix + protoMsg->cam2imu_transform().parent());
277 
278  // time stamp of this message
279  ros::Time msgStamp = toRosTime(protoMsg->timestamp());
280 
281  // if desired, invert and convert cam2imu to tf and publish it
282  if (_publishImu2CamAsTf)
283  {
284  auto cam2imu = toRosTfStampedTransform(protoMsg->cam2imu_transform());
285  // overwriting cam2imu timestamp as this would be too old for normal use with tf
286  auto imu2cam = tf::StampedTransform(cam2imu.inverse(), msgStamp, cam2imu.child_frame_id_, cam2imu.frame_id_);
287  _tf_pub->sendTransform(imu2cam);
288  }
289 
290  // convert pose to transform to use it for transformations
291  auto imu2world_rot_only = toRosTfTransform(protoMsg->pose());
292  imu2world_rot_only.setOrigin(tf::Vector3(0.0, 0.0, 0.0));
293 
294  // publish rc dynamics message as ros odometry
295  auto odom = boost::make_shared<nav_msgs::Odometry>();
296  odom->header.frame_id = protoMsg->pose_frame(); // "world"
297  odom->header.stamp = msgStamp;
298  odom->child_frame_id = protoMsg->linear_acceleration_frame(); //"imu"
299  odom->pose.pose = *toRosPose(protoMsg->pose());
300  // for odom twist, we need to transform lin_velocity from world to imu
301  auto lin_vel = protoMsg->linear_velocity();
302  auto lin_vel_transformed = imu2world_rot_only.inverse() * tf::Vector3(lin_vel.x(), lin_vel.y(), lin_vel.z());
303  odom->twist.twist.linear.x = lin_vel_transformed.x();
304  odom->twist.twist.linear.y = lin_vel_transformed.y();
305  odom->twist.twist.linear.z = lin_vel_transformed.z();
306  auto ang_vel = protoMsg->angular_velocity(); // ang_velocity already is in imu
307  odom->twist.twist.angular.x = ang_vel.x();
308  odom->twist.twist.angular.y = ang_vel.y();
309  odom->twist.twist.angular.z = ang_vel.z();
310  _pub_odom->publish(odom);
311 
312  // convert velocities and accelerations to visualization Markers
313  if (_publishVisualizationMarkers)
314  {
315  geometry_msgs::Point start, end;
316  auto protoPosePosition = protoMsg->pose().position();
317  start.x = protoPosePosition.x();
318  start.y = protoPosePosition.y();
319  start.z = protoPosePosition.z();
320 
321  visualization_msgs::Marker lin_vel_marker; // single marker for linear velocity
322  lin_vel_marker.header.stamp = msgStamp;
323  lin_vel_marker.header.frame_id = protoMsg->linear_velocity_frame();
324  lin_vel_marker.ns = _tfPrefix;
325  lin_vel_marker.id = 0;
326  lin_vel_marker.type = visualization_msgs::Marker::ARROW;
327  lin_vel_marker.action = visualization_msgs::Marker::MODIFY;
328  lin_vel_marker.frame_locked = true;
329  end.x = start.x + lin_vel.x();
330  end.y = start.y + lin_vel.y();
331  end.z = start.z + lin_vel.z();
332  lin_vel_marker.points.push_back(start);
333  lin_vel_marker.points.push_back(end);
334  lin_vel_marker.scale.x = 0.005;
335  lin_vel_marker.scale.y = 0.01;
336  lin_vel_marker.color.a = 1;
337  lin_vel_marker.color.g = lin_vel_marker.color.b = 1.0; // cyan
338  _pub_markers->publish(lin_vel_marker);
339 
340  visualization_msgs::Marker ang_vel_marker; // single marker for angular velocity
341  ang_vel_marker.header.stamp = msgStamp;
342  ang_vel_marker.header.frame_id = protoMsg->pose_frame(); // "world"
343  ang_vel_marker.ns = _tfPrefix;
344  ang_vel_marker.id = 1;
345  ang_vel_marker.type = visualization_msgs::Marker::ARROW;
346  ang_vel_marker.action = visualization_msgs::Marker::MODIFY;
347  ang_vel_marker.frame_locked = true;
348  auto ang_vel_transformed = imu2world_rot_only * tf::Vector3(ang_vel.x(), ang_vel.y(), ang_vel.z());
349  end.x = start.x + ang_vel_transformed.x();
350  end.y = start.y + ang_vel_transformed.y();
351  end.z = start.z + ang_vel_transformed.z();
352  ang_vel_marker.points.push_back(start);
353  ang_vel_marker.points.push_back(end);
354  ang_vel_marker.scale.x = 0.005;
355  ang_vel_marker.scale.y = 0.01;
356  ang_vel_marker.color.a = 1;
357  ang_vel_marker.color.r = ang_vel_marker.color.b = 1.0; // mangenta
358  _pub_markers->publish(ang_vel_marker);
359 
360  visualization_msgs::Marker lin_accel_marker; // single marker for linear acceleration
361  lin_accel_marker.header.stamp = msgStamp;
362  lin_accel_marker.header.frame_id = protoMsg->pose_frame(); // "world"
363  lin_accel_marker.ns = _tfPrefix;
364  lin_accel_marker.id = 2;
365  lin_accel_marker.type = visualization_msgs::Marker::ARROW;
366  lin_accel_marker.action = visualization_msgs::Marker::MODIFY;
367  lin_accel_marker.frame_locked = true;
368  auto lin_accel = protoMsg->linear_acceleration();
369  auto lin_accel_transformed = imu2world_rot_only * tf::Vector3(lin_accel.x(), lin_accel.y(), lin_accel.z());
370  end.x = start.x + lin_accel_transformed.x();
371  end.y = start.y + lin_accel_transformed.y();
372  end.z = start.z + lin_accel_transformed.z();
373  lin_accel_marker.points.push_back(start);
374  lin_accel_marker.points.push_back(end);
375  lin_accel_marker.scale.x = 0.005;
376  lin_accel_marker.scale.y = 0.01;
377  lin_accel_marker.color.a = 1;
378  lin_accel_marker.color.r = lin_accel_marker.color.g = 1.0; // yellow
379 
380  _pub_markers->publish(lin_accel_marker);
381  }
382 
383 }
384 
385 }
msg
msg
ros::Publisher
ROS_ERROR_STREAM
#define ROS_ERROR_STREAM(args)
ros.h
tf::StampedTransform
transform_broadcaster.h
rc
ROS_WARN_STREAM
#define ROS_WARN_STREAM(args)
ROS_WARN_STREAM_THROTTLE
#define ROS_WARN_STREAM_THROTTLE(period, args)
rc::toRosTime
ros::Time toRosTime(const roboception::msgs::Time &time)
Definition: protobuf2ros_conversions.cc:18
tf::TransformBroadcaster
unexpected_receive_timeout.h
ROS_INFO_STREAM
#define ROS_INFO_STREAM(args)
rc::toRosTfStampedTransform
tf::StampedTransform toRosTfStampedTransform(const roboception::msgs::Frame &frame)
Definition: protobuf2ros_conversions.cc:118
start
ROSCPP_DECL void start()
ros::Time
rc::toRosPose
geometry_msgs::PosePtr toRosPose(const roboception::msgs::Pose &pose)
Definition: protobuf2ros_conversions.cc:39
std
protobuf2ros_publisher.h
protobuf2ros_conversions.h
rc::toRosTfTransform
tf::Transform toRosTfTransform(const roboception::msgs::Pose &pose)
Definition: protobuf2ros_conversions.cc:109
protobuf2ros_stream.h
ros::NodeHandle
rc::Protobuf2RosPublisher
Generic implementation for publishing protobuf messages to ros.
Definition: protobuf2ros_publisher.h:45


rc_visard_driver
Author(s): Heiko Hirschmueller , Christian Emmerich , Felix Ruess
autogenerated on Sun May 15 2022 02:25:43