relay.cpp
Go to the documentation of this file.
1 // relay just passes messages on. it can be useful if you're trying to ensure
3 // that a message doesn't get sent twice over a wireless link, by having the
4 // relay catch the message and then do the fanout on the far side of the
5 // wireless link.
6 //
7 // Copyright (C) 2009, Morgan Quigley
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are met:
11 // * Redistributions of source code must retain the above copyright notice,
12 // this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above copyright
14 // notice, this list of conditions and the following disclaimer in the
15 // documentation and/or other materials provided with the distribution.
16 // * Neither the name of Stanford University nor the names of its
17 // contributors may be used to endorse or promote products derived from
18 // this software without specific prior written permission.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 // POSSIBILITY OF SUCH DAMAGE.
32 
33 
34 #include <cstdio>
36 #include "topic_tools/parse.h"
37 
38 using std::string;
39 using std::vector;
40 using namespace topic_tools;
41 
43 bool g_advertised = false;
49 bool g_lazy;
50 bool g_stealth;
52 
54 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event);
55 
56 void subscribe()
57 {
58  g_sub = new ros::Subscriber(g_node->subscribe(g_input_topic, 10, &in_cb, g_th));
59 }
60 
62 {
63  if (g_sub)
64  {
65  delete g_sub;
66  g_sub = NULL;
67  }
68 }
69 
71 {
72  // If we're in lazy subscribe mode, and the first subscriber just
73  // connected, then subscribe, #3389.
74  if(g_lazy && !g_stealth && !g_sub)
75  {
76  ROS_DEBUG("lazy mode; resubscribing");
77  subscribe();
78  }
79 }
80 
81 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event)
82 {
84  boost::shared_ptr<const ros::M_string> const& connection_header = msg_event.getConnectionHeaderPtr();
85 
86  if (!g_advertised)
87  {
88  // If the input topic is latched, make the output topic latched, #3385.
89  bool latch = false;
90  if (connection_header)
91  {
92  ros::M_string::const_iterator it = connection_header->find("latching");
93  if((it != connection_header->end()) && (it->second == "1"))
94  {
95  ROS_DEBUG("input topic is latched; latching output topic to match");
96  latch = true;
97  }
98  }
99  g_pub = msg->advertise(*g_node, g_output_topic, 10, latch, conn_cb);
100  g_advertised = true;
101  ROS_INFO("advertised as %s\n", g_output_topic.c_str());
102  }
103  // If we're in lazy subscribe mode, and nobody's listening,
104  // then unsubscribe, #3389.
105  if((g_lazy || g_stealth) && !g_pub.getNumSubscribers())
106  {
107  ROS_DEBUG("lazy mode; unsubscribing");
108  unsubscribe();
109  }
110  else
111  g_pub.publish(msg);
112 }
113 
115 {
116  if (!g_advertised) return;
117 
118  // get subscriber num of ~monitor_topic
120  if (!ros::master::execute("getSystemState", req, res, data, false))
121  {
122  ROS_ERROR("Failed to communicate with rosmaster");
123  return;
124  }
125 
126  int subscriber_num = 0;
127  XmlRpc::XmlRpcValue sub_info = data[1];
128  for (int i = 0; i < sub_info.size(); ++i)
129  {
130  string topic_name = sub_info[i][0];
131  if (topic_name != g_monitor_topic) continue;
132  XmlRpc::XmlRpcValue& subscribers = sub_info[i][1];
133  for (int j = 0; j < subscribers.size(); ++j)
134  {
135  if (subscribers[j] != ros::this_node::getName()) ++subscriber_num;
136  }
137  break;
138  }
139 
140  // if no node subscribes to ~monitor, do unsubscribe
141  if (g_sub && subscriber_num == 0) unsubscribe();
142  // if any other nodes subscribe ~monitor, do subscribe
143  else if (!g_sub && subscriber_num > 0) subscribe();
144 }
145 
146 int main(int argc, char **argv)
147 {
148  if (argc < 2)
149  {
150  printf("\nusage: relay IN_TOPIC [OUT_TOPIC]\n\n");
151  return 1;
152  }
153  std::string topic_name;
154  if(!getBaseName(string(argv[1]), topic_name))
155  return 1;
156  ros::init(argc, argv, topic_name + string("_relay"),
158  if (argc == 2)
159  g_output_topic = string(argv[1]) + string("_relay");
160  else // argc == 3
161  g_output_topic = string(argv[2]);
162  g_input_topic = string(argv[1]);
163  ros::NodeHandle n;
164  g_node = &n;
165 
166  ros::NodeHandle pnh("~");
167  bool unreliable = false;
168  pnh.getParam("unreliable", unreliable);
169  pnh.getParam("lazy", g_lazy);
170  if (unreliable)
171  g_th.unreliable().reliable(); // Prefers unreliable, but will accept reliable.
172 
173  pnh.param<bool>("stealth", g_stealth, false);
174  ros::Timer monitor_timer;
175  if (g_stealth)
176  {
177  double monitor_rate;
178  pnh.param<string>("monitor_topic", g_monitor_topic, g_input_topic);
179  pnh.param<double>("monitor_rate", monitor_rate, 1.0);
180  monitor_timer = n.createTimer(ros::Duration(monitor_rate), &timer_cb);
181  }
182 
183  subscribe();
184  ros::spin();
185  return 0;
186 }
187 
const boost::shared_ptr< ConstMessage > & getConstMessage() const
void in_cb(const ros::MessageEvent< ShapeShifter > &msg_event)
Definition: relay.cpp:81
ros::NodeHandle * g_node
Definition: relay.cpp:42
TransportHints & reliable()
bool g_advertised
Definition: relay.cpp:43
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
bool g_stealth
Definition: relay.cpp:50
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
string g_output_topic
Definition: relay.cpp:45
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
ROSCPP_DECL const std::string & getName()
void timer_cb(const ros::TimerEvent &)
Definition: relay.cpp:114
void subscribe()
Definition: relay.cpp:56
string g_input_topic
Definition: relay.cpp:44
ros::TransportHints g_th
Definition: relay.cpp:51
bool g_lazy
Definition: relay.cpp:49
bool param(const std::string &param_name, T &param_val, const T &default_val) const
void publish(const boost::shared_ptr< M > &message) const
TransportHints & unreliable()
TOPIC_TOOLS_DECL bool getBaseName(const std::string &full_name, std::string &base_name)
Definition: parse.cpp:40
string g_monitor_topic
Definition: relay.cpp:46
#define ROS_INFO(...)
bool getParam(const std::string &key, std::string &s) const
ros::Publisher g_pub
Definition: relay.cpp:47
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
ROSCPP_DECL void spin()
int main(int argc, char **argv)
Definition: relay.cpp:146
void conn_cb(const ros::SingleSubscriberPublisher &)
Definition: relay.cpp:70
void unsubscribe()
Definition: relay.cpp:61
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
uint32_t getNumSubscribers() const
#define ROS_ERROR(...)
ros::Subscriber * g_sub
Definition: relay.cpp:48
#define ROS_DEBUG(...)


topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:34:17