message_relay_node.cpp
Go to the documentation of this file.
1 
28 
29 #include "ros/ros.h"
30 #include "ros/callback_queue.h"
31 #include "ros/transport_hints.h"
32 
33 #include "multimaster_msgs/GetClockOffset.h"
34 
35 #include <boost/algorithm/string.hpp>
36 
37 #include <vector>
38 #include <string>
39 
40 int main(int argc, char *argv[])
41 {
42  ros::init(argc, argv, "message_relay_node");
43 
44  ros::NodeHandle private_nh("~");
45 
46  // Get private parameters from a different namespace: allows for dynamic node names, but with a stable parameter
47  // namespace (/fancy_name_relay reading parameters from /robot_relay)
48  std::string parameter_namespace;
49  if (private_nh.getParam("parameter_namespace", parameter_namespace))
50  {
51  private_nh = ros::NodeHandle(parameter_namespace);
52  }
53 
54  // Get base origin and source nodehandles for this relay.
55  // Topic, service, and action relays will relay from origin to source.
56  std::string from = "", to = "";
57  if (!private_nh.getParam("from", from) && !private_nh.getParam("to", to))
58  {
59  ROS_FATAL("Must provide either 'from' and 'to' namespace parameters");
60  return 1;
61  }
62 
63  if (from == to)
64  {
65  ROS_FATAL("'from' and 'to' namespaces must not be equal");
66  return 1;
67  }
68 
69  boost::shared_ptr<ros::NodeHandle> origin = boost::make_shared<ros::NodeHandle>(from);
70  boost::shared_ptr<ros::NodeHandle> target = boost::make_shared<ros::NodeHandle>(to);
71 
72  // Determine if a frame_id transform should occur at this relay. The relay can either add or remove a tf_prefix.
73  std::string tf_prefix, prefix_operation;
74  if (private_nh.getParam("tf_prefix", tf_prefix) && !private_nh.getParam("prefix_operation", prefix_operation))
75  {
76  ROS_FATAL_STREAM("If tf_prefix is provided, a prefix_operation must be specified");
77  return 1;
78  }
79 
80  std::vector<std::string> global_frames;
81  private_nh.getParam("global_frames", global_frames);
82 
84  tf_prefix, prefix_operation, boost::unordered_set<std::string>(global_frames.begin(), global_frames.end()));
85 
86  // Determine if a time offset should be applied to all outgoing messages/services.
87  // The offset is read dynamically from a central GetClockOffset service, typically managed by a
88  // specialized clock_relay_node
89  std::string time_offset_operation;
90  private_nh.param<std::string>("time_offset_operation", time_offset_operation, "");
91 
93  if (time_offset_operation != "")
94  {
95  // get time offset from clock_relay
96  ros::ServiceClient clock_offset_client = origin->serviceClient<multimaster_msgs::GetClockOffset>
97  ("/get_clock_offset");
98  while (!clock_offset_client.waitForExistence(ros::Duration(5.0)))
99  {
100  ROS_WARN_STREAM("Waiting for /get_clock_offset service. Is clock_relay_node running?");
101  }
102  multimaster_msgs::GetClockOffset srv;
103  while (ros::ok() && !clock_offset_client.call(srv))
104  {
105  ROS_ERROR_STREAM("Failed to get clock offset from /get_clock_offset");
106  }
107  time_processor = message_relay::TimeProcessor::create(time_offset_operation, srv.response.clock_offset.offset);
108  }
109 
110  // Build topic relays from parameters
111  std::vector<message_relay::TopicRelay::Ptr> topic_relays;
112  XmlRpc::XmlRpcValue topics;
113  if (private_nh.getParam("topics", topics))
114  {
115  for (int i = 0; i < topics.size(); i++)
116  {
118  params.topic = static_cast<std::string>(topics[i]["topic"]);
119  params.type = static_cast<std::string>(topics[i]["type"]);
120  params.origin = origin;
121  params.target = target;
122  params.frame_id_processor = frame_id_processor;
123  params.time_processor = time_processor;
124  params.queue_size = static_cast<int>(topics[i]["queue_size"]);
125  if (params.queue_size == 0)
126  {
127  std::string key;
128  if (private_nh.searchParam("queue_size", key))
129  {
130  private_nh.getParam(key, params.queue_size);
131  }
132  else
133  {
134  // set default
135  params.queue_size = 100;
136  }
137  }
138  params.throttle_frequency = static_cast<double>(topics[i]["throttle_frequency"]);
139  if (params.throttle_frequency == 0.0)
140  {
141  std::string key;
142  if (private_nh.searchParam("throttle_frequency", key))
143  {
144  private_nh.getParam(key, params.throttle_frequency);
145  }
146  }
147  params.latch = static_cast<bool>(topics[i]["latch"]);
148  params.unreliable = static_cast<bool>(topics[i]["unreliable"]);
149  ROS_DEBUG_STREAM("Topic " << params.topic << " type " << params.type << " latch " << params.latch <<
150  " queue_size " << params.queue_size << " throttle " << params.throttle_frequency <<
151  " unreliable " << params.unreliable);
152  topic_relays.push_back(message_relay::createTopicRelay(params));
153  }
154  }
155 
156  // Determine if services should be processed on a separate queue, to prevent them from
157  // accidentally blocking topic relays
158  bool separate_service_queue;
159  private_nh.param("separate_service_queue", separate_service_queue, true);
160 
161  // Build service relays from parameters
163  if (separate_service_queue)
164  {
165  service_queue = boost::make_shared<ros::CallbackQueue>();
166  }
167  std::vector<message_relay::ServiceRelay::Ptr> service_relays;
168  XmlRpc::XmlRpcValue services;
169  if (private_nh.getParam("services", services))
170  {
171  for (int i = 0; i < services.size(); i++)
172  {
174  params.service = static_cast<std::string>(services[i]["service"]);
175  params.type = static_cast<std::string>(services[i]["type"]);
176  params.origin = origin;
177  params.target = target;
178  params.frame_id_processor = frame_id_processor;
179  params.time_processor = time_processor;
180  params.callback_queue = service_queue;
181  ROS_DEBUG_STREAM("Service " << params.service << " type " << params.type << " check_for_server_frequency " <<
183  service_relays.push_back(message_relay::createServiceRelay(params));
184  }
185  }
186 
187  // Build action relays from parameters
188  std::vector<message_relay::ActionRelay::Ptr> action_relays;
189  XmlRpc::XmlRpcValue actions;
190  if (private_nh.getParam("actions", actions))
191  {
192  for (int i = 0; i < actions.size(); i++)
193  {
195  params.action = static_cast<std::string>(actions[i]["action"]);
196  params.type = static_cast<std::string>(actions[i]["type"]);
197  params.origin = origin;
198  params.target = target;
199  params.frame_id_processor = frame_id_processor;
200  params.time_processor = time_processor;
201  params.queue_size = static_cast<int>(actions[i]["queue_size"]);
202  params.feedback_throttle_frequency = static_cast<double>(actions[i]["feedback_throttle_frequency"]);
203  if (params.queue_size == 0)
204  {
205  std::string key;
206  if (private_nh.searchParam("queue_size", key))
207  {
208  private_nh.getParam(key, params.queue_size);
209  }
210  else
211  {
212  // set default
213  params.queue_size = 100;
214  }
215  }
216  ROS_DEBUG_STREAM("Action " << params.action << " type " << params.type << " queue_size " << params.queue_size);
217  action_relays.push_back(message_relay::createActionRelay(params));
218  }
219  }
220 
221  // Process services on an async queue if necessary
222  boost::shared_ptr<ros::AsyncSpinner> service_spinner;
223  if (separate_service_queue)
224  {
225  service_spinner = boost::make_shared<ros::AsyncSpinner>(1, service_queue.get());
226  service_spinner->start();
227  }
228 
229  ros::spin();
230  return 0;
231 }
#define ROS_FATAL(...)
static ConstPtr create(std::string tf_prefix, std::string prefix_operation_string, boost::unordered_set< std::string > global_frame_names=boost::unordered_set< std::string >())
int size() const
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
boost::shared_ptr< ros::NodeHandle > origin
Definition: service_relay.h:41
bool call(MReq &req, MRes &res)
boost::shared_ptr< ros::NodeHandle > target
Definition: topic_relay.h:43
TimeProcessor::ConstPtr time_processor
Definition: topic_relay.h:45
ActionRelay::Ptr createActionRelay(const ActionRelayParams &params)
boost::shared_ptr< ros::NodeHandle > target
Definition: service_relay.h:42
ROSCPP_DECL void spin(Spinner &spinner)
TimeProcessor::ConstPtr time_processor
Definition: action_relay.h:47
FrameIdProcessor::ConstPtr frame_id_processor
Definition: topic_relay.h:44
bool waitForExistence(ros::Duration timeout=ros::Duration(-1))
#define ROS_FATAL_STREAM(args)
TimeProcessor::ConstPtr time_processor
Definition: service_relay.h:44
bool searchParam(const std::string &key, std::string &result) const
bool param(const std::string &param_name, T &param_val, const T &default_val) const
static ConstPtr create(std::string offset_operation_string, ros::Duration offset=ros::Duration(0.0))
ROSCPP_DECL bool ok()
int main(int argc, char *argv[])
boost::shared_ptr< ros::CallbackQueue > callback_queue
Definition: service_relay.h:46
TopicRelay::Ptr createTopicRelay(const TopicRelayParams &params)
#define ROS_WARN_STREAM(args)
#define ROS_DEBUG_STREAM(args)
boost::shared_ptr< ros::NodeHandle > origin
Definition: action_relay.h:44
boost::shared_ptr< ros::NodeHandle > target
Definition: action_relay.h:45
bool getParam(const std::string &key, std::string &s) const
#define ROS_ERROR_STREAM(args)
ServiceRelay::Ptr createServiceRelay(const ServiceRelayParams &params)
FrameIdProcessor::ConstPtr frame_id_processor
Definition: action_relay.h:46
FrameIdProcessor::ConstPtr frame_id_processor
Definition: service_relay.h:43
boost::shared_ptr< ros::NodeHandle > origin
Definition: topic_relay.h:42


message_relay
Author(s):
autogenerated on Wed Jul 17 2019 03:27:53