bt_zmq_publisher.cpp
Go to the documentation of this file.
1 #include <future>
4 #include "cppzmq/zmq.hpp"
5 
6 namespace BT
7 {
8 std::atomic<bool> PublisherZMQ::ref_count(false);
9 
11 {
12  Pimpl() : context(1), publisher(context, ZMQ_PUB), server(context, ZMQ_REP)
13  {}
14 
18 };
19 
20 PublisherZMQ::PublisherZMQ(const BT::Tree& tree, unsigned max_msg_per_second,
21  unsigned publisher_port, unsigned server_port) :
22  StatusChangeLogger(tree.rootNode()),
23  tree_(tree),
24  min_time_between_msgs_(std::chrono::microseconds(1000 * 1000) / max_msg_per_second),
25  send_pending_(false),
26  zmq_(new Pimpl())
27 {
28  bool expected = false;
29  if (!ref_count.compare_exchange_strong(expected, true))
30  {
31  throw LogicError("Only one instance of PublisherZMQ shall be created");
32  }
33  if (publisher_port == server_port)
34  {
35  throw LogicError("The TCP ports of the publisher and the server must be "
36  "different");
37  }
38 
39  flatbuffers::FlatBufferBuilder builder(1024);
40  CreateFlatbuffersBehaviorTree(builder, tree);
41 
42  tree_buffer_.resize(builder.GetSize());
43  memcpy(tree_buffer_.data(), builder.GetBufferPointer(), builder.GetSize());
44 
45  char str[100];
46 
47  sprintf(str, "tcp://*:%d", publisher_port);
48  zmq_->publisher.bind(str);
49  sprintf(str, "tcp://*:%d", server_port);
50  zmq_->server.bind(str);
51 
52  int timeout_ms = 100;
53  zmq_->server.set(zmq::sockopt::rcvtimeo, timeout_ms);
54 
55  active_server_ = true;
56 
57  thread_ = std::thread([this]() {
58  while (active_server_)
59  {
60  zmq::message_t req;
61  try
62  {
63  zmq::recv_result_t received = zmq_->server.recv(req);
64  if (received)
65  {
66  zmq::message_t reply(tree_buffer_.size());
67  memcpy(reply.data(), tree_buffer_.data(), tree_buffer_.size());
68  zmq_->server.send(reply, zmq::send_flags::none);
69  }
70  }
71  catch (zmq::error_t& err)
72  {
73  if (err.num() == ETERM)
74  {
75  std::cout << "[PublisherZMQ] Server quitting." << std::endl;
76  }
77  std::cout << "[PublisherZMQ] just died. Exception " << err.what() << std::endl;
78  active_server_ = false;
79  }
80  }
81  });
82 
84 }
85 
87 {
88  active_server_ = false;
89  if (thread_.joinable())
90  {
91  thread_.join();
92  }
93  if (send_pending_)
94  {
95  send_condition_variable_.notify_all();
96  send_future_.get();
97  }
98  flush();
100  delete zmq_;
101  ref_count = false;
102 }
103 
105 {
106  status_buffer_.clear();
107  applyRecursiveVisitor(tree_.rootNode(), [this](TreeNode* node) {
108  size_t index = status_buffer_.size();
109  status_buffer_.resize(index + 3);
110  flatbuffers::WriteScalar<uint16_t>(&status_buffer_[index], node->UID());
111  flatbuffers::WriteScalar<int8_t>(
112  &status_buffer_[index + 2],
113  static_cast<int8_t>(convertToFlatbuffers(node->status())));
114  });
115 }
116 
117 void PublisherZMQ::callback(Duration timestamp, const TreeNode& node,
118  NodeStatus prev_status, NodeStatus status)
119 {
120  SerializedTransition transition =
121  SerializeTransition(node.UID(), timestamp, prev_status, status);
122  {
123  std::unique_lock<std::mutex> lock(mutex_);
124  transition_buffer_.push_back(transition);
125  }
126 
127  if (!send_pending_.exchange(true))
128  {
129  send_future_ = std::async(std::launch::async, [this]() {
130  std::unique_lock<std::mutex> lock(mutex_);
131  const bool is_server_inactive = send_condition_variable_.wait_for(
132  lock, min_time_between_msgs_, [this]() { return !active_server_; });
133  lock.unlock();
134  if (!is_server_inactive)
135  {
136  flush();
137  }
138  });
139  }
140 }
141 
143 {
144  zmq::message_t message;
145  {
146  std::unique_lock<std::mutex> lock(mutex_);
147 
148  const size_t msg_size = status_buffer_.size() + 8 + (transition_buffer_.size() * 12);
149 
150  message.rebuild(msg_size);
151  uint8_t* data_ptr = static_cast<uint8_t*>(message.data());
152 
153  // first 4 bytes are the side of the header
154  flatbuffers::WriteScalar<uint32_t>(data_ptr,
155  static_cast<uint32_t>(status_buffer_.size()));
156  data_ptr += sizeof(uint32_t);
157  // copy the header part
158  memcpy(data_ptr, status_buffer_.data(), status_buffer_.size());
159  data_ptr += status_buffer_.size();
160 
161  // first 4 bytes are the side of the transition buffer
162  flatbuffers::WriteScalar<uint32_t>(data_ptr,
163  static_cast<uint32_t>(transition_buffer_.size()));
164  data_ptr += sizeof(uint32_t);
165 
166  for (auto& transition : transition_buffer_)
167  {
168  memcpy(data_ptr, transition.data(), transition.size());
169  data_ptr += transition.size();
170  }
171  transition_buffer_.clear();
173  }
174  try
175  {
176  zmq_->publisher.send(message, zmq::send_flags::none);
177  }
178  catch (zmq::error_t& err)
179  {
180  if (err.num() == ETERM)
181  {
182  std::cout << "[PublisherZMQ] Publisher quitting." << std::endl;
183  }
184  std::cout << "[PublisherZMQ] just died. Exception " << err.what() << std::endl;
185  }
186 
187  send_pending_ = false;
188  // printf("%.3f zmq send\n", std::chrono::duration<double>( std::chrono::high_resolution_clock::now().time_since_epoch() ).count());
189 }
190 } // namespace BT
uint16_t UID() const
Definition: tree_node.cpp:117
std::vector< uint8_t > status_buffer_
virtual const char * what() const ZMQ_NOTHROW ZMQ_OVERRIDE
std::vector< uint8_t > tree_buffer_
uint8_t * GetBufferPointer() const
Get the serialized buffer (after you call Finish()).
Definition: flatbuffers.h:1559
std::atomic_bool active_server_
Definition: any.hpp:455
size_t send(const void *buf_, size_t len_, int flags_=0)
static std::atomic< bool > ref_count
virtual void flush() override
void CreateFlatbuffersBehaviorTree(flatbuffers::FlatBufferBuilder &builder, const BT::Tree &tree)
std::array< uint8_t, 12 > SerializedTransition
const BT::Tree & tree_
TreeNode * rootNode() const
Definition: bt_factory.h:181
void * data() ZMQ_NOTHROW
Helper class to hold data needed in creation of a FlatBuffer. To serialize data, you typically call o...
Definition: flatbuffers.h:1442
std::future< void > send_future_
Serialization::NodeType convertToFlatbuffers(BT::NodeType type)
virtual void callback(Duration timestamp, const TreeNode &node, NodeStatus prev_status, NodeStatus status) override
int num() const ZMQ_NOTHROW
Struct used to store a tree. If this object goes out of scope, the tree is destroyed.
Definition: bt_factory.h:125
Abstract base class for Behavior Tree Nodes.
Definition: tree_node.h:55
void shutdown() ZMQ_NOTHROW
std::chrono::microseconds min_time_between_msgs_
void bind(std::string const &addr)
uoffset_t GetSize() const
The current size of the serialized buffer, counting from the end.
Definition: flatbuffers.h:1551
std::thread thread_
PublisherZMQ(const BT::Tree &tree, unsigned max_msg_per_second=25, unsigned publisher_port=1666, unsigned server_port=1667)
std::condition_variable send_condition_variable_
NodeStatus
Definition: basic_types.h:35
std::vector< SerializedTransition > transition_buffer_
std::chrono::high_resolution_clock::duration Duration
Definition: basic_types.h:362
std::atomic_bool send_pending_
void applyRecursiveVisitor(const TreeNode *root_node, const std::function< void(const TreeNode *)> &visitor)
SerializedTransition SerializeTransition(uint16_t UID, Duration timestamp, NodeStatus prev_status, NodeStatus status)


behaviortree_cpp_v3
Author(s): Michele Colledanchise, Davide Faconti
autogenerated on Mon Jul 3 2023 02:50:14