relay.cpp
Go to the documentation of this file.
1 // relay just passes messages on. it can be useful if you're trying to ensure
3 // that a message doesn't get sent twice over a wireless link, by having the
4 // relay catch the message and then do the fanout on the far side of the
5 // wireless link.
6 //
7 // Copyright (C) 2009, Morgan Quigley
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are met:
11 // * Redistributions of source code must retain the above copyright notice,
12 // this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above copyright
14 // notice, this list of conditions and the following disclaimer in the
15 // documentation and/or other materials provided with the distribution.
16 // * Neither the name of Stanford University nor the names of its
17 // contributors may be used to endorse or promote products derived from
18 // this software without specific prior written permission.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 // POSSIBILITY OF SUCH DAMAGE.
32 
33 
34 #include <cstdio>
36 #include "topic_tools/parse.h"
37 
38 using std::string;
39 using std::vector;
40 using namespace topic_tools;
41 
43 bool g_advertised = false;
50 bool g_lazy;
51 bool g_stealth;
53 
55 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event);
56 
57 void subscribe()
58 {
59  g_sub = new ros::Subscriber(g_node->subscribe(g_input_topic, 10, &in_cb, g_th));
60 }
61 
63 {
64  if (g_sub)
65  {
66  delete g_sub;
67  g_sub = NULL;
68  }
69 }
70 
72 {
73  // If we're in lazy subscribe mode, and the first subscriber just
74  // connected, then subscribe, #3389.
75  if(g_lazy && !g_stealth && !g_sub)
76  {
77  ROS_DEBUG("lazy mode; resubscribing");
78  subscribe();
79  }
80 }
81 
82 void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event)
83 {
85  boost::shared_ptr<const ros::M_string> const& connection_header = msg_event.getConnectionHeaderPtr();
86 
87  if (!g_advertised)
88  {
89  // If the input topic is latched, make the output topic latched, #3385.
90  bool latch = false;
91  if (connection_header)
92  {
93  ros::M_string::const_iterator it = connection_header->find("latching");
94  if((it != connection_header->end()) && (it->second == "1"))
95  {
96  ROS_DEBUG("input topic is latched; latching output topic to match");
97  latch = true;
98  }
99  }
100  g_pub = msg->advertise(*g_node, g_output_topic, 10, latch, conn_cb);
101  g_advertised = true;
102  ROS_INFO("advertised as %s\n", g_output_topic.c_str());
103  }
104  // If we're in lazy subscribe mode, and nobody's listening,
105  // then unsubscribe, #3389.
106  if((g_lazy || g_stealth) && !g_pub.getNumSubscribers())
107  {
108  ROS_DEBUG("lazy mode; unsubscribing");
109  unsubscribe();
110  }
111  else
112  g_pub.publish(msg);
113 }
114 
116 {
117  if (!g_advertised) return;
118 
119  // get subscriber num of ~monitor_topic
121  if (!ros::master::execute("getSystemState", req, res, data, false))
122  {
123  ROS_ERROR("Failed to communicate with rosmaster");
124  return;
125  }
126 
127  int subscriber_num = 0;
128  XmlRpc::XmlRpcValue sub_info = data[1];
129  for (int i = 0; i < sub_info.size(); ++i)
130  {
131  string topic_name = sub_info[i][0];
132  if (topic_name != g_monitor_topic) continue;
133  XmlRpc::XmlRpcValue& subscribers = sub_info[i][1];
134  for (int j = 0; j < subscribers.size(); ++j)
135  {
136  if (subscribers[j] != ros::this_node::getName()) ++subscriber_num;
137  }
138  break;
139  }
140 
141  // if no node subscribes to ~monitor, do unsubscribe
142  if (g_sub && subscriber_num == 0) unsubscribe();
143  // if any other nodes subscribe ~monitor, do subscribe
144  else if (!g_sub && subscriber_num > 0) subscribe();
145 }
146 
147 int main(int argc, char **argv)
148 {
149  if (argc < 2)
150  {
151  printf("\nusage: relay IN_TOPIC [OUT_TOPIC]\n\n");
152  return 1;
153  }
154  std::string topic_name;
155  if(!getBaseName(string(argv[1]), topic_name))
156  return 1;
157  ros::init(argc, argv, topic_name + string("_relay"),
159  if (argc == 2)
160  g_output_topic = string(argv[1]) + string("_relay");
161  else // argc == 3
162  g_output_topic = string(argv[2]);
163  g_input_topic = string(argv[1]);
164  ros::NodeHandle n;
165  g_node = &n;
166 
167  ros::NodeHandle pnh("~");
168  bool unreliable = false;
169  pnh.getParam("unreliable", unreliable);
170  pnh.getParam("lazy", g_lazy);
171  if (unreliable)
172  g_th.unreliable().reliable(); // Prefers unreliable, but will accept reliable.
173 
174  pnh.param<bool>("stealth", g_stealth, false);
175  if (g_stealth)
176  {
177  double monitor_rate;
178  pnh.param<string>("monitor_topic", g_monitor_topic, g_input_topic);
179  pnh.param<double>("monitor_rate", monitor_rate, 1.0);
180  g_timer = n.createTimer(ros::Duration(monitor_rate), &timer_cb);
181  }
182 
183  subscribe();
184  ros::spin();
185  return 0;
186 }
187 
void in_cb(const ros::MessageEvent< ShapeShifter > &msg_event)
Definition: relay.cpp:82
ros::NodeHandle * g_node
Definition: relay.cpp:42
TransportHints & reliable()
void publish(const boost::shared_ptr< M > &message) const
bool g_advertised
Definition: relay.cpp:43
bool g_stealth
Definition: relay.cpp:51
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
string g_output_topic
Definition: relay.cpp:45
int size() const
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
ROSCPP_DECL const std::string & getName()
const boost::shared_ptr< M_string > & getConnectionHeaderPtr() const
void timer_cb(const ros::TimerEvent &)
Definition: relay.cpp:115
ros::Timer g_timer
Definition: relay.cpp:49
void subscribe()
Definition: relay.cpp:57
string g_input_topic
Definition: relay.cpp:44
const boost::shared_ptr< ConstMessage > & getConstMessage() const
ros::TransportHints g_th
Definition: relay.cpp:52
bool g_lazy
Definition: relay.cpp:50
TransportHints & unreliable()
TOPIC_TOOLS_DECL bool getBaseName(const std::string &full_name, std::string &base_name)
Definition: parse.cpp:40
string g_monitor_topic
Definition: relay.cpp:46
#define ROS_INFO(...)
bool param(const std::string &param_name, T &param_val, const T &default_val) const
ros::Publisher g_pub
Definition: relay.cpp:47
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
ROSCPP_DECL void spin()
int main(int argc, char **argv)
Definition: relay.cpp:147
void conn_cb(const ros::SingleSubscriberPublisher &)
Definition: relay.cpp:71
uint32_t getNumSubscribers() const
bool getParam(const std::string &key, std::string &s) const
void unsubscribe()
Definition: relay.cpp:62
#define ROS_ERROR(...)
ros::Subscriber * g_sub
Definition: relay.cpp:48
#define ROS_DEBUG(...)


topic_tools
Author(s): Morgan Quigley, Brian Gerkey
autogenerated on Sun Feb 3 2019 03:30:24