64 template <
typename T,
int i>
75 size_t data_len = ser::serializationLength(msg);
76 std::unique_ptr<uint8_t> send_buffer(
new uint8_t[data_len]);
77 ser::OStream stream(send_buffer.get(), data_len);
78 ser::serialize(stream, msg);
81 zmqpp::message send_array;
82 send_array << data_len;
86 send_array.add_raw(reinterpret_cast<void const *>(send_buffer.get()), data_len);
89 bool dont_block =
false;
90 senders[i]->send(send_array, dont_block);
105 ser::IStream stream(buffer_ptr, msg_size);
106 ser::deserialize(stream, msg);
118 zmqpp::message recv_array;
122 bool dont_block =
false;
123 if (recv_flag =
receivers[i]->receive(recv_array, dont_block))
127 recv_array >> data_len;
135 std::unique_ptr<uint8_t> recv_buffer(
new uint8_t[data_len]);
137 memcpy(recv_buffer.get(),
static_cast<const uint8_t *
>(recv_array.raw_data(recv_array.read_cursor())), data_len);
146 std::this_thread::sleep_for(std::chrono::microseconds(1000));
152 if (topicName.at(0) !=
'/') {
153 if (
ns ==
"/") {topicName =
"/" + topicName;}
154 else {topicName =
ns +
"/" + topicName;}
156 ROS_INFO(
"[bridge node] \"%s\" received!", topicName.c_str());
183 int main(
int argc,
char **argv)
190 std::cout <<
"--------[bridge_node]-------" << std::endl;
191 std::cout <<
"namespaces=" <<
ns << std::endl;
196 ROS_ERROR(
"[bridge node] No IP found in the configuration!");
205 ROS_WARN(
"[bridge node] No send_topics found in the configuration!");
214 ROS_WARN(
"[bridge node] No recv_topics found in the configuration!");
220 ROS_FATAL(
"[bridge_node] The number of send topics in configuration exceeds the limit %d!",
SUB_MAX);
224 std::cout <<
"-------------IP------------" << std::endl;
227 std::string host_name = iter->first;
228 std::string host_ip = iter->second;
229 std::cout << host_name <<
" : " << host_ip << std::endl;
232 ROS_WARN(
"[bridge node] IPs with the same name in configuration %s!", host_name.c_str());
234 ip_map[host_name] = host_ip;
237 std::cout <<
"--------send topics--------" << std::endl;
238 std::set<int> srcPorts;
239 for (int32_t i=0; i <
len_send; ++i)
243 std::string topic_name = send_topic_xml[
"topic_name"];
244 std::string
msg_type = send_topic_xml[
"msg_type"];
245 int max_freq = send_topic_xml[
"max_freq"];
246 std::string srcIP =
ip_map[send_topic_xml[
"srcIP"]];
247 int srcPort = send_topic_xml[
"srcPort"];
248 TopicInfo topic = {.
name=topic_name, .type=msg_type, .max_freq=max_freq, .ip=srcIP, .port=srcPort};
251 if (srcPorts.find(srcPort) != srcPorts.end()) {
252 ROS_FATAL(
"[bridge_node] Send topics with the same srcPort %d in configuration!", srcPort);
255 srcPorts.insert(srcPort);
256 if (topic.
name.at(0) !=
'/') {
258 if (ns !=
"/") {std::cout <<
"/";}
260 std::cout << topic.
name <<
" " << topic.
max_freq <<
"Hz(max)" << std::endl;
263 std::cout <<
"-------receive topics------" << std::endl;
264 for (int32_t i=0; i <
len_recv; ++i)
268 std::string topic_name = recv_topic_xml[
"topic_name"];
269 std::string
msg_type = recv_topic_xml[
"msg_type"];
270 int max_freq = recv_topic_xml[
"max_freq"];
271 std::string srcIP =
ip_map[recv_topic_xml[
"srcIP"]];
272 int srcPort = recv_topic_xml[
"srcPort"];
273 TopicInfo topic = {.
name=topic_name, .type=msg_type, .max_freq=max_freq, .ip=srcIP, .port=srcPort};
275 if (topic.
name.at(0) !=
'/') {
277 if (ns !=
"/") {std::cout <<
"/";}
279 std::cout << topic.
name <<
" (from " << recv_topic_xml[
"srcIP"] <<
")" << std::endl;
284 for (int32_t i=0; i <
len_send; ++i)
287 std::unique_ptr<zmqpp::socket> sender(
new zmqpp::socket(
context, zmqpp::socket_type::pub));
289 senders.emplace_back(std::move(sender));
293 for (int32_t i=0; i <
len_recv; ++i)
296 std::string
const zmq_topic =
"";
297 std::unique_ptr<zmqpp::socket> receiver(
new zmqpp::socket(
context, zmqpp::socket_type::sub));
298 receiver->subscribe(zmq_topic);
299 receiver->connect(url);
300 receivers.emplace_back(std::move(receiver));
306 for (int32_t i=0; i <
len_send; ++i)
318 for (int32_t i=0; i <
len_recv; ++i)
326 for (int32_t i=0; i <
len_recv; ++i)
336 for (int32_t i=0; i <
len_send; ++i){
340 for (int32_t i=0; i <
len_recv; ++i){
std::vector< std::unique_ptr< zmqpp::socket > > senders
std::vector< TopicInfo > recvTopics
void deserialize_publish(uint8_t *buffer_ptr, size_t msg_size, std::string msg_type, int i)
std::vector< ros::Subscriber > topic_subs
std::vector< std::unique_ptr< zmqpp::socket > > receivers
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
ros::Subscriber topic_subscriber(std::string topic_name, std::string msg_type, ros::NodeHandle nh, int i)
std::vector< int > send_num
XmlRpc::XmlRpcValue recv_topics_xml
Header file of bridge_node.cpp.
XmlRpc::XmlRpcValue ip_xml
std::vector< ros::Publisher > topic_pubs
Type const & getType() const
XmlRpc::XmlRpcValue send_topics_xml
void deserialize_pub(uint8_t *buffer_ptr, size_t msg_size, int i)
ROSCPP_DECL const std::string & getNamespace()
bool getParam(const std::string &key, std::string &s) const
std::vector< ros::Time > sub_t_last
std::vector< bool > recv_flags_last
int main(int argc, char **argv)
std::map< std::string, std::string > ip_map
ros::Publisher topic_publisher(std::string topic_name, std::string msg_type, ros::NodeHandle nh)
std::vector< bool > recv_thread_flags
void sub_cb(const T &msg)
std::vector< TopicInfo > sendTopics
bool send_freq_control(int i)
std::vector< std::thread > recv_threads