bridge_node.cpp
Go to the documentation of this file.
1 
24 #include "bridge_node.hpp"
25 
26 // /* send messages frequency control */
27 // // this is the original freq control func that has been deprecated
28 // bool send_freq_control(int i)
29 // {
30 // ros::Time t_now = ros::Time::now();
31 // bool discard_flag;
32 // if ((t_now - sub_t_last[i]).toSec() * sendTopics[i].max_freq < 1.0) {
33 // discard_flag = true;
34 // }
35 // else {
36 // discard_flag = false;
37 // sub_t_last[i] = t_now;
38 // }
39 // return discard_flag; // flag of discarding this message
40 // }
41 
42 /* send messages frequency control */
43 bool send_freq_control(int i)
44 {
45  bool discard_flag;
46  ros::Time t_now = ros::Time::now();
47  // check whether the send of this message will exceed the freq limit in the last period
48  if ((send_num[i] + 1) / (t_now - sub_t_last[i]).toSec() > sendTopics[i].max_freq) {
49  discard_flag = true;
50  }
51  else {
52  discard_flag = false;
53  send_num[i] ++;
54  }
55  // freq control period (1s)
56  if ((t_now - sub_t_last[i]).toSec() > 1.0){
57  sub_t_last[i] = t_now;
58  send_num[i] = 0;
59  }
60  return discard_flag; // flag of discarding this message
61 }
62 
63 /* uniform callback functions for ROS subscribers */
64 template <typename T, int i>
65 void sub_cb(const T &msg)
66 {
67  /* frequency control */
68  auto ignore_flag = send_freq_control(i);
69  if (ignore_flag){
70  return; // discard this message sending, abort
71  }
72 
73  /* serialize the sending messages into send_buffer */
74  namespace ser = ros::serialization;
75  size_t data_len = ser::serializationLength(msg); // bytes length of msg
76  std::unique_ptr<uint8_t> send_buffer(new uint8_t[data_len]); // create a dynamic length array
77  ser::OStream stream(send_buffer.get(), data_len);
78  ser::serialize(stream, msg);
79 
80  /* zmq send message */
81  zmqpp::message send_array;
82  send_array << data_len;
83  /* equal to:
84  send_array.add_raw(reinterpret_cast<void const*>(&data_len), sizeof(size_t));
85  */
86  send_array.add_raw(reinterpret_cast<void const *>(send_buffer.get()), data_len);
87  // std::cout << "ready send!" << std::endl;
88  // send(&, true) for non-blocking, send(&, false) for blocking
89  bool dont_block = false; // Actually for PUB mode zmq socket, send() will never block
90  senders[i]->send(send_array, dont_block);
91  // std::cout << "send!" << std::endl;
92 
93  // std::cout << msg << std::endl;
94  // std::cout << i << std::endl;
95 }
96 
97 
98 /* uniform deserialize and publish the receiving messages */
99 template<typename T>
100 void deserialize_pub(uint8_t* buffer_ptr, size_t msg_size, int i)
101 {
102  T msg;
103  // deserialize the receiving messages into ROS msg
104  namespace ser = ros::serialization;
105  ser::IStream stream(buffer_ptr, msg_size);
106  ser::deserialize(stream, msg);
107  // publish ROS msg
108  topic_pubs[i].publish(msg);
109 }
110 
111 
112 /* receive thread function to receive messages and publish them */
113 void recv_func(int i)
114 {
115  while(recv_thread_flags[i])
116  {
117  /* receive and process message */
118  zmqpp::message recv_array;
119  bool recv_flag; // receive success flag
120  // std::cout << "ready receive!" << std::endl;
121  // receive(&,true) for non-blocking, receive(&,false) for blocking
122  bool dont_block = false; // 'true' leads to high cpu load
123  if (recv_flag = receivers[i]->receive(recv_array, dont_block))
124  {
125  // std::cout << "receive!" << std::endl;
126  size_t data_len;
127  recv_array >> data_len; // unpack meta data
128  /* equal to:
129  recv_array.get(&data_len, recv_array.read_cursor++);
130  void get(T &value, size_t const cursor){
131  uint8_t const* byte = static_cast<uint8_t const*>(raw_data(cursor));
132  b = *byte;}
133  */
134  // a dynamic length array by unique_ptr
135  std::unique_ptr<uint8_t> recv_buffer(new uint8_t[data_len]);
136  // continue to copy the raw_data of recv_array into buffer
137  memcpy(recv_buffer.get(), static_cast<const uint8_t *>(recv_array.raw_data(recv_array.read_cursor())), data_len);
138  deserialize_publish(recv_buffer.get(), data_len, recvTopics[i].type, i);
139 
140  // std::cout << data_len << std::endl;
141  // std::cout << recv_buffer.get() << std::endl;
142  }
143 
144  /* if receive() does not block, sleep to decrease loop rate */
145  if (dont_block)
146  std::this_thread::sleep_for(std::chrono::microseconds(1000)); // sleep for us
147  else
148  {
149  /* check and report receive state */
150  if (recv_flag != recv_flags_last[i]){
151  std::string topicName = recvTopics[i].name;
152  if (topicName.at(0) != '/') {
153  if (ns == "/") {topicName = "/" + topicName;}
154  else {topicName = ns + "/" + topicName;}
155  } // print namespace prefix if topic name is not global
156  ROS_INFO("[bridge node] \"%s\" received!", topicName.c_str());
157  } // false -> true(first message in)
158  recv_flags_last[i] = recv_flag;
159  }
160  }
161  return;
162 }
163 
164 /* close recv socket, unsubscribe ROS topic */
165 void stop_send(int i)
166 {
167  // senders[i]->unbind(std::string const &endpoint);
168  senders[i]->close(); // close the send socket
169  topic_subs[i].shutdown(); // unsubscribe
170 }
171 
172 /* stop recv thread, close recv socket, unadvertise ROS topic */
173 void stop_recv(int i)
174 {
175  recv_thread_flags[i] = false; // finish recv_func()
176  // receivers[i]->disconnect(std::string &endpoint);
177  receivers[i]->close(); // close the receive socket
178  topic_pubs[i].shutdown(); // unadvertise
179 }
180 
181 //TODO: generate or delete topic message transfers through a remote zmq service.
182 
183 int main(int argc, char **argv)
184 {
185  ros::init(argc, argv, "swarm_bridge");
186  ros::NodeHandle nh("~");
187  ros::NodeHandle nh_public;
188  ns = ros::this_node::getNamespace(); // namespace of this node
189 
190  std::cout << "--------[bridge_node]-------" << std::endl;
191  std::cout << "namespaces=" << ns << std::endl;
192 
193  //************************ Parse configuration file **************************
194  // get hostnames and IPs
195  if (nh.getParam("IP", ip_xml) == false){
196  ROS_ERROR("[bridge node] No IP found in the configuration!");
197  return 1;
198  }
199  // get "send topics" params (topic_name, topic_type, IP, port)
200  if (nh.getParam("send_topics", send_topics_xml)){
203  }
204  else{
205  ROS_WARN("[bridge node] No send_topics found in the configuration!");
206  len_send = 0;
207  }
208  // get "receive topics" params (topic_name, topic_type, IP, port)
209  if (nh.getParam("recv_topics", recv_topics_xml)){
212  }
213  else{
214  ROS_WARN("[bridge node] No recv_topics found in the configuration!");
215  len_recv = 0;
216  }
217 
218  if (len_send > SUB_MAX)
219  {
220  ROS_FATAL("[bridge_node] The number of send topics in configuration exceeds the limit %d!", SUB_MAX);
221  return 2;
222  }
223 
224  std::cout << "-------------IP------------" << std::endl;
225  for (auto iter = ip_xml.begin(); iter != ip_xml.end(); ++iter)
226  {
227  std::string host_name = iter->first;
228  std::string host_ip = iter->second;
229  std::cout << host_name << " : " << host_ip << std::endl;
230  if (ip_map.find(host_name) != ip_map.end())
231  { // ip_xml will never contain same names actually.
232  ROS_WARN("[bridge node] IPs with the same name in configuration %s!", host_name.c_str());
233  }
234  ip_map[host_name] = host_ip;
235  }
236 
237  std::cout << "--------send topics--------" << std::endl;
238  std::set<int> srcPorts; // for duplicate check
239  for (int32_t i=0; i < len_send; ++i)
240  {
242  XmlRpc::XmlRpcValue send_topic_xml = send_topics_xml[i];
243  std::string topic_name = send_topic_xml["topic_name"];
244  std::string msg_type = send_topic_xml["msg_type"];
245  int max_freq = send_topic_xml["max_freq"];
246  std::string srcIP = ip_map[send_topic_xml["srcIP"]];
247  int srcPort = send_topic_xml["srcPort"];
248  TopicInfo topic = {.name=topic_name, .type=msg_type, .max_freq=max_freq, .ip=srcIP, .port=srcPort};
249  sendTopics.emplace_back(topic);
250  // check for duplicate ports:
251  if (srcPorts.find(srcPort) != srcPorts.end()) {
252  ROS_FATAL("[bridge_node] Send topics with the same srcPort %d in configuration!", srcPort);
253  return 3;
254  }
255  srcPorts.insert(srcPort); // for duplicate check
256  if (topic.name.at(0) != '/') {
257  std::cout << ns;
258  if (ns != "/") {std::cout << "/";}
259  } // print namespace prefix if topic.name is not global
260  std::cout << topic.name << " " << topic.max_freq << "Hz(max)" << std::endl;
261  }
262 
263  std::cout << "-------receive topics------" << std::endl;
264  for (int32_t i=0; i < len_recv; ++i)
265  {
267  XmlRpc::XmlRpcValue recv_topic_xml = recv_topics_xml[i];
268  std::string topic_name = recv_topic_xml["topic_name"];
269  std::string msg_type = recv_topic_xml["msg_type"];
270  int max_freq = recv_topic_xml["max_freq"];
271  std::string srcIP = ip_map[recv_topic_xml["srcIP"]];
272  int srcPort = recv_topic_xml["srcPort"];
273  TopicInfo topic = {.name=topic_name, .type=msg_type, .max_freq=max_freq, .ip=srcIP, .port=srcPort};
274  recvTopics.emplace_back(topic);
275  if (topic.name.at(0) != '/') {
276  std::cout << ns;
277  if (ns != "/") {std::cout << "/";}
278  } // print namespace prefix if topic.name is not global
279  std::cout << topic.name << " (from " << recv_topic_xml["srcIP"] << ")" << std::endl;
280  }
281 
282  // ********************* zmq socket initialize ***************************
283  // send sockets (zmq socket PUB mode)
284  for (int32_t i=0; i < len_send; ++i)
285  {
286  const std::string url = "tcp://" + sendTopics[i].ip + ":" + std::to_string(sendTopics[i].port);
287  std::unique_ptr<zmqpp::socket> sender(new zmqpp::socket(context, zmqpp::socket_type::pub));
288  sender->bind(url);
289  senders.emplace_back(std::move(sender)); //sender is now released by std::move
290  }
291 
292  // receive sockets (zmq socket SUB mode)
293  for (int32_t i=0; i < len_recv; ++i)
294  {
295  const std::string url = "tcp://" + recvTopics[i].ip + ":" + std::to_string(recvTopics[i].port);
296  std::string const zmq_topic = ""; // "" means all zmq topic
297  std::unique_ptr<zmqpp::socket> receiver(new zmqpp::socket(context, zmqpp::socket_type::sub));
298  receiver->subscribe(zmq_topic);
299  receiver->connect(url);
300  receivers.emplace_back(std::move(receiver));
301  }
302 
303 
304  // ******************* ROS subscribe and publish *************************
305  //ROS topic subsrcibe and send
306  for (int32_t i=0; i < len_send; ++i)
307  {
308  sub_t_last.emplace_back(ros::Time::now()); // freq control period start time
309  send_num.emplace_back(0); // the send messages number in a period
310  ros::Subscriber subscriber;
311  // The uniform callback function is sub_cb()
312  subscriber = topic_subscriber(sendTopics[i].name, sendTopics[i].type, nh_public, i);
313  topic_subs.emplace_back(subscriber);
314  // use topic_subs[i].shutdown() to unsubscribe
315  }
316 
317  // ROS topic receive and publish
318  for (int32_t i=0; i < len_recv; ++i)
319  {
320  ros::Publisher publisher;
321  publisher = topic_publisher(recvTopics[i].name, recvTopics[i].type, nh_public);
322  topic_pubs.emplace_back(publisher);
323  }
324 
325  // ****************** launch receive threads *****************************
326  for (int32_t i=0; i < len_recv; ++i)
327  {
328  recv_thread_flags.emplace_back(true); // enable receive thread flags
329  recv_flags_last.emplace_back(false); // receive success flag
330  recv_threads.emplace_back(std::thread(&recv_func, i));
331  }
332 
333  ros::spin();
334 
335  // ***************** stop send/receive ******************************
336  for (int32_t i=0; i < len_send; ++i){
337  stop_send(i);
338  }
339 
340  for (int32_t i=0; i < len_recv; ++i){
341  stop_recv(i);
342  }
343 
344  return 0;
345 }
#define ROS_FATAL(...)
std::vector< std::unique_ptr< zmqpp::socket > > senders
Definition: bridge_node.hpp:61
std::vector< TopicInfo > recvTopics
Definition: bridge_node.hpp:57
int len_send
Definition: bridge_node.hpp:51
int len_recv
Definition: bridge_node.hpp:52
void deserialize_publish(uint8_t *buffer_ptr, size_t msg_size, std::string msg_type, int i)
std::vector< ros::Subscriber > topic_subs
Definition: bridge_node.hpp:65
std::vector< std::unique_ptr< zmqpp::socket > > receivers
Definition: bridge_node.hpp:62
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
ros::Subscriber topic_subscriber(std::string topic_name, std::string msg_type, ros::NodeHandle nh, int i)
Definition: ros_sub_pub.hpp:90
std::vector< int > send_num
Definition: bridge_node.hpp:70
void stop_recv(int i)
XmlRpc::XmlRpcValue recv_topics_xml
Definition: bridge_node.hpp:50
#define ROS_WARN(...)
Header file of bridge_node.cpp.
XmlRpc::XmlRpcValue ip_xml
Definition: bridge_node.hpp:48
std::vector< ros::Publisher > topic_pubs
Definition: bridge_node.hpp:66
Type const & getType() const
std::string name
Definition: bridge_node.hpp:39
XmlRpc::XmlRpcValue send_topics_xml
Definition: bridge_node.hpp:49
void deserialize_pub(uint8_t *buffer_ptr, size_t msg_size, int i)
ROSCPP_DECL const std::string & getNamespace()
#define ROS_INFO(...)
bool getParam(const std::string &key, std::string &s) const
#define SUB_MAX
Definition: ros_sub_pub.hpp:48
std::vector< ros::Time > sub_t_last
Definition: bridge_node.hpp:69
ROSCPP_DECL void spin()
std::string ns
Definition: bridge_node.hpp:47
void recv_func(int i)
std::vector< bool > recv_flags_last
Definition: bridge_node.hpp:75
int main(int argc, char **argv)
zmqpp::context_t context
Definition: bridge_node.hpp:60
std::map< std::string, std::string > ip_map
Definition: bridge_node.hpp:54
def msg_type(f)
ros::Publisher topic_publisher(std::string topic_name, std::string msg_type, ros::NodeHandle nh)
std::vector< bool > recv_thread_flags
Definition: bridge_node.hpp:74
void sub_cb(const T &msg)
Definition: bridge_node.cpp:65
static Time now()
std::vector< TopicInfo > sendTopics
Definition: bridge_node.hpp:56
#define ROS_ASSERT(cond)
bool send_freq_control(int i)
Definition: bridge_node.cpp:43
void stop_send(int i)
#define ROS_ERROR(...)
std::vector< std::thread > recv_threads
Definition: bridge_node.hpp:76


swarm_ros_bridge
Author(s):
autogenerated on Tue Feb 21 2023 03:43:52