bt_zmq_publisher.cpp
Go to the documentation of this file.
1 #include <future>
4 #include "cppzmq/zmq.hpp"
5 
6 namespace BT
7 {
8 std::atomic<bool> PublisherZMQ::ref_count(false);
9 
11 {
12  Pimpl() : context(1), publisher(context, ZMQ_PUB), server(context, ZMQ_REP)
13  {}
14 
18 };
19 
20 PublisherZMQ::PublisherZMQ(const BT::Tree& tree, unsigned max_msg_per_second,
21  unsigned publisher_port, unsigned server_port) :
22  StatusChangeLogger(tree.rootNode()),
23  tree_(tree),
24  min_time_between_msgs_(std::chrono::microseconds(1000 * 1000) / max_msg_per_second),
25  send_pending_(false),
26  zmq_(new Pimpl())
27 {
28  bool expected = false;
29  if (!ref_count.compare_exchange_strong(expected, true))
30  {
31  throw LogicError("Only one instance of PublisherZMQ shall be created");
32  }
33  if (publisher_port == server_port)
34  {
35  throw LogicError("The TCP ports of the publisher and the server must be "
36  "different");
37  }
38 
39  flatbuffers::FlatBufferBuilder builder(1024);
40  CreateFlatbuffersBehaviorTree(builder, tree);
41 
42  tree_buffer_.resize(builder.GetSize());
43  memcpy(tree_buffer_.data(), builder.GetBufferPointer(), builder.GetSize());
44 
45  char str[100];
46 
47  sprintf(str, "tcp://*:%d", publisher_port);
48  zmq_->publisher.bind(str);
49  sprintf(str, "tcp://*:%d", server_port);
50  zmq_->server.bind(str);
51 
52  int timeout_ms = 100;
53  zmq_->server.set(zmq::sockopt::rcvtimeo, timeout_ms);
54 
55  active_server_ = true;
56 
57  thread_ = std::thread([this]() {
58  while (active_server_)
59  {
60  zmq::message_t req;
61  try
62  {
63  zmq::recv_result_t received = zmq_->server.recv(req);
64  if (received)
65  {
66  zmq::message_t reply(tree_buffer_.size());
67  memcpy(reply.data(), tree_buffer_.data(), tree_buffer_.size());
68  zmq_->server.send(reply, zmq::send_flags::none);
69  }
70  }
71  catch (zmq::error_t& err)
72  {
73  if (err.num() == ETERM)
74  {
75  std::cout << "[PublisherZMQ] Server quitting." << std::endl;
76  }
77  std::cout << "[PublisherZMQ] just died. Exception " << err.what() << std::endl;
78  active_server_ = false;
79  }
80  }
81  });
82 
84 }
85 
87 {
88  active_server_ = false;
89  if (thread_.joinable())
90  {
91  thread_.join();
92  }
93  if (send_pending_)
94  {
95  send_condition_variable_.notify_all();
96  send_future_.get();
97  }
98  flush();
100  delete zmq_;
101  ref_count = false;
102 }
103 
105 {
106  status_buffer_.clear();
107  applyRecursiveVisitor(tree_.rootNode(), [this](TreeNode* node) {
108  size_t index = status_buffer_.size();
109  status_buffer_.resize(index + 3);
110  flatbuffers::WriteScalar<uint16_t>(&status_buffer_[index], node->UID());
111  flatbuffers::WriteScalar<int8_t>(
112  &status_buffer_[index + 2],
113  static_cast<int8_t>(convertToFlatbuffers(node->status())));
114  });
115 }
116 
117 void PublisherZMQ::callback(Duration timestamp, const TreeNode& node,
118  NodeStatus prev_status, NodeStatus status)
119 {
120  SerializedTransition transition =
121  SerializeTransition(node.UID(), timestamp, prev_status, status);
122  {
123  std::unique_lock<std::mutex> lock(mutex_);
124  transition_buffer_.push_back(transition);
125  }
126 
127  if (!send_pending_.exchange(true))
128  {
129  send_future_ = std::async(std::launch::async, [this]() {
130  std::unique_lock<std::mutex> lock(mutex_);
131  const bool is_server_inactive = send_condition_variable_.wait_for(
132  lock, min_time_between_msgs_, [this]() { return !active_server_; });
133  lock.unlock();
134  if (!is_server_inactive)
135  {
136  flush();
137  }
138  });
139  }
140 }
141 
143 {
144  zmq::message_t message;
145  {
146  std::unique_lock<std::mutex> lock(mutex_);
147 
148  const size_t msg_size = status_buffer_.size() + 8 + (transition_buffer_.size() * 12);
149 
150  message.rebuild(msg_size);
151  uint8_t* data_ptr = static_cast<uint8_t*>(message.data());
152 
153  // first 4 bytes are the side of the header
154  flatbuffers::WriteScalar<uint32_t>(data_ptr,
155  static_cast<uint32_t>(status_buffer_.size()));
156  data_ptr += sizeof(uint32_t);
157  // copy the header part
158  memcpy(data_ptr, status_buffer_.data(), status_buffer_.size());
159  data_ptr += status_buffer_.size();
160 
161  // first 4 bytes are the side of the transition buffer
162  flatbuffers::WriteScalar<uint32_t>(data_ptr,
163  static_cast<uint32_t>(transition_buffer_.size()));
164  data_ptr += sizeof(uint32_t);
165 
166  for (auto& transition : transition_buffer_)
167  {
168  memcpy(data_ptr, transition.data(), transition.size());
169  data_ptr += transition.size();
170  }
171  transition_buffer_.clear();
173  }
174  try
175  {
176  zmq_->publisher.send(message, zmq::send_flags::none);
177  }
178  catch (zmq::error_t& err)
179  {
180  if (err.num() == ETERM)
181  {
182  std::cout << "[PublisherZMQ] Publisher quitting." << std::endl;
183  }
184  std::cout << "[PublisherZMQ] just died. Exception " << err.what() << std::endl;
185  }
186 
187  send_pending_ = false;
188  // printf("%.3f zmq send\n", std::chrono::duration<double>( std::chrono::high_resolution_clock::now().time_since_epoch() ).count());
189 }
190 } // namespace BT
BT
Definition: ex01_wrap_legacy.cpp:29
BT::PublisherZMQ::active_server_
std::atomic_bool active_server_
Definition: bt_zmq_publisher.h:32
bt_zmq_publisher.h
BT::TreeNode::UID
uint16_t UID() const
Definition: tree_node.cpp:117
zmq::message_t
Definition: 3rdparty/cppzmq/zmq.hpp:394
BT::PublisherZMQ::~PublisherZMQ
virtual ~PublisherZMQ()
Definition: bt_zmq_publisher.cpp:86
BT::PublisherZMQ::flush
virtual void flush() override
Definition: bt_zmq_publisher.cpp:142
flatbuffers::FlatBufferBuilder
Helper class to hold data needed in creation of a FlatBuffer. To serialize data, you typically call o...
Definition: flatbuffers.h:1442
zmq::socket_t
Definition: 3rdparty/cppzmq/zmq.hpp:2128
BT::PublisherZMQ::Pimpl::publisher
zmq::socket_t publisher
Definition: bt_zmq_publisher.cpp:16
zmq::message_t::data
void * data() ZMQ_NOTHROW
Definition: 3rdparty/cppzmq/zmq.hpp:574
BT::TreeNode
Abstract base class for Behavior Tree Nodes.
Definition: tree_node.h:55
flatbuffers::FlatBufferBuilder::GetBufferPointer
uint8_t * GetBufferPointer() const
Get the serialized buffer (after you call Finish()).
Definition: flatbuffers.h:1559
BT::PublisherZMQ::Pimpl::server
zmq::socket_t server
Definition: bt_zmq_publisher.cpp:17
BT::Tree
Struct used to store a tree. If this object goes out of scope, the tree is destroyed.
Definition: bt_factory.h:125
BT::PublisherZMQ::thread_
std::thread thread_
Definition: bt_zmq_publisher.h:33
BT::PublisherZMQ::zmq_
Pimpl * zmq_
Definition: bt_zmq_publisher.h:43
zmq::detail::socket_base::send
size_t send(const void *buf_, size_t len_, int flags_=0)
Definition: 3rdparty/cppzmq/zmq.hpp:1837
BT::LogicError
Definition: exceptions.h:45
BT::StatusChangeLogger
Definition: abstract_logger.h:17
BT::PublisherZMQ::Pimpl
Definition: bt_zmq_publisher.cpp:10
BT::PublisherZMQ::min_time_between_msgs_
std::chrono::microseconds min_time_between_msgs_
Definition: bt_zmq_publisher.h:30
BT::PublisherZMQ::PublisherZMQ
PublisherZMQ(const BT::Tree &tree, unsigned max_msg_per_second=25, unsigned publisher_port=1666, unsigned server_port=1667)
Definition: bt_zmq_publisher.cpp:20
BT::SerializeTransition
SerializedTransition SerializeTransition(uint16_t UID, Duration timestamp, NodeStatus prev_status, NodeStatus status)
Definition: bt_flatbuffer_helper.h:139
BT::PublisherZMQ::transition_buffer_
std::vector< SerializedTransition > transition_buffer_
Definition: bt_zmq_publisher.h:29
zmq::context_t
Definition: 3rdparty/cppzmq/zmq.hpp:779
BT::PublisherZMQ::Pimpl::context
zmq::context_t context
Definition: bt_zmq_publisher.cpp:15
BT::PublisherZMQ::send_pending_
std::atomic_bool send_pending_
Definition: bt_zmq_publisher.h:39
BT::SerializedTransition
std::array< uint8_t, 12 > SerializedTransition
Definition: bt_flatbuffer_helper.h:9
BT::PublisherZMQ::send_condition_variable_
std::condition_variable send_condition_variable_
Definition: bt_zmq_publisher.h:40
BT::PublisherZMQ::createStatusBuffer
void createStatusBuffer()
Definition: bt_zmq_publisher.cpp:104
BT::CreateFlatbuffersBehaviorTree
void CreateFlatbuffersBehaviorTree(flatbuffers::FlatBufferBuilder &builder, const BT::Tree &tree)
Definition: bt_flatbuffer_helper.h:61
BT::Duration
std::chrono::high_resolution_clock::duration Duration
Definition: basic_types.h:371
BT::PublisherZMQ::Pimpl::Pimpl
Pimpl()
Definition: bt_zmq_publisher.cpp:12
zmq::error_t::num
int num() const ZMQ_NOTHROW
Definition: 3rdparty/cppzmq/zmq.hpp:296
flatbuffers::FlatBufferBuilder::GetSize
uoffset_t GetSize() const
The current size of the serialized buffer, counting from the end.
Definition: flatbuffers.h:1551
BT::PublisherZMQ::tree_
const BT::Tree & tree_
Definition: bt_zmq_publisher.h:26
zmq::context_t::shutdown
void shutdown() ZMQ_NOTHROW
Definition: 3rdparty/cppzmq/zmq.hpp:864
zmq::message_t::rebuild
void rebuild()
Definition: 3rdparty/cppzmq/zmq.hpp:498
BT::applyRecursiveVisitor
void applyRecursiveVisitor(const TreeNode *root_node, const std::function< void(const TreeNode *)> &visitor)
Definition: behavior_tree.cpp:18
bt_flatbuffer_helper.h
zmq.hpp
std
Definition: any.hpp:455
BT::Tree::rootNode
TreeNode * rootNode() const
Definition: bt_factory.h:181
BT::PublisherZMQ::ref_count
static std::atomic< bool > ref_count
Definition: bt_zmq_publisher.h:12
zmq::detail::socket_base::bind
void bind(std::string const &addr)
Definition: 3rdparty/cppzmq/zmq.hpp:1797
BT::PublisherZMQ::send_future_
std::future< void > send_future_
Definition: bt_zmq_publisher.h:41
BT::PublisherZMQ::status_buffer_
std::vector< uint8_t > status_buffer_
Definition: bt_zmq_publisher.h:28
BT::PublisherZMQ::mutex_
std::mutex mutex_
Definition: bt_zmq_publisher.h:38
zmq::error_t::what
virtual const char * what() const ZMQ_NOTHROW ZMQ_OVERRIDE
Definition: 3rdparty/cppzmq/zmq.hpp:292
BT::NodeStatus
NodeStatus
Definition: basic_types.h:35
BT::PublisherZMQ::tree_buffer_
std::vector< uint8_t > tree_buffer_
Definition: bt_zmq_publisher.h:27
zmq::error_t
Definition: 3rdparty/cppzmq/zmq.hpp:287
BT::PublisherZMQ::callback
virtual void callback(Duration timestamp, const TreeNode &node, NodeStatus prev_status, NodeStatus status) override
Definition: bt_zmq_publisher.cpp:117


behaviortree_cpp_v3
Author(s): Michele Colledanchise, Davide Faconti
autogenerated on Wed Jun 26 2024 02:51:19