bt_zmq_publisher.cpp
Go to the documentation of this file.
1 #include <future>
4 #include "cppzmq/zmq.hpp"
5 
6 namespace BT
7 {
8 std::atomic<bool> PublisherZMQ::ref_count(false);
9 
11 {
12  Pimpl():
13  context(1)
14  , publisher(context, ZMQ_PUB)
15  , server(context, ZMQ_REP)
16  {}
17 
21 };
22 
23 
25  unsigned max_msg_per_second,
26  unsigned publisher_port,
27  unsigned server_port)
28  : StatusChangeLogger(tree.rootNode())
29  , tree_(tree)
30  , min_time_between_msgs_(std::chrono::microseconds(1000 * 1000) / max_msg_per_second)
31  , send_pending_(false)
32  , zmq_(new Pimpl())
33 {
34  bool expected = false;
35  if (!ref_count.compare_exchange_strong(expected, true))
36  {
37  throw LogicError("Only one instance of PublisherZMQ shall be created");
38  }
39  if( publisher_port == server_port)
40  {
41  throw LogicError("The TCP ports of the publisher and the server must be different");
42  }
43 
44  flatbuffers::FlatBufferBuilder builder(1024);
45  CreateFlatbuffersBehaviorTree(builder, tree);
46 
47  tree_buffer_.resize(builder.GetSize());
48  memcpy(tree_buffer_.data(), builder.GetBufferPointer(), builder.GetSize());
49 
50  char str[100];
51 
52  sprintf(str, "tcp://*:%d", publisher_port);
53  zmq_->publisher.bind(str);
54  sprintf(str, "tcp://*:%d", server_port);
55  zmq_->server.bind(str);
56 
57  int timeout_ms = 100;
58  zmq_->server.set(zmq::sockopt::rcvtimeo, timeout_ms);
59 
60  active_server_ = true;
61 
62  thread_ = std::thread([this]() {
63  while (active_server_)
64  {
65  zmq::message_t req;
66  try
67  {
68  zmq::recv_result_t received = zmq_->server.recv(req);
69  if (received)
70  {
71  zmq::message_t reply(tree_buffer_.size());
72  memcpy(reply.data(), tree_buffer_.data(), tree_buffer_.size());
73  zmq_->server.send(reply, zmq::send_flags::none);
74  }
75  }
76  catch (zmq::error_t& err)
77  {
78  if (err.num() == ETERM)
79  {
80  std::cout << "[PublisherZMQ] Server quitting." << std::endl;
81  }
82  std::cout << "[PublisherZMQ] just died. Exeption " << err.what() << std::endl;
83  active_server_ = false;
84  }
85  }
86  });
87 
89 }
90 
92 {
93  active_server_ = false;
95  if (thread_.joinable())
96  {
97  thread_.join();
98  }
99  flush();
100  delete zmq_;
101  ref_count = false;
102 }
103 
104 
106 {
107  status_buffer_.clear();
108  applyRecursiveVisitor(tree_.rootNode(), [this](TreeNode* node) {
109  size_t index = status_buffer_.size();
110  status_buffer_.resize(index + 3);
111  flatbuffers::WriteScalar<uint16_t>(&status_buffer_[index], node->UID());
112  flatbuffers::WriteScalar<int8_t>(&status_buffer_[index + 2],
113  static_cast<int8_t>(convertToFlatbuffers(node->status())));
114  });
115 }
116 
117 void PublisherZMQ::callback(Duration timestamp, const TreeNode& node, NodeStatus prev_status,
118  NodeStatus status)
119 {
120  using namespace std::chrono;
121 
122  SerializedTransition transition =
123  SerializeTransition(node.UID(), timestamp, prev_status, status);
124  {
125  std::unique_lock<std::mutex> lock(mutex_);
126  transition_buffer_.push_back(transition);
127  }
128 
129  if (!send_pending_)
130  {
131  send_pending_ = true;
132  send_future_ = std::async(std::launch::async, [this]() {
133  std::this_thread::sleep_for(min_time_between_msgs_);
134  flush();
135  });
136  }
137 }
138 
140 {
141  zmq::message_t message;
142  {
143  std::unique_lock<std::mutex> lock(mutex_);
144 
145  const size_t msg_size = status_buffer_.size() + 8 + (transition_buffer_.size() * 12);
146 
147  message.rebuild(msg_size);
148  uint8_t* data_ptr = static_cast<uint8_t*>(message.data());
149 
150  // first 4 bytes are the side of the header
151  flatbuffers::WriteScalar<uint32_t>(data_ptr, static_cast<uint32_t>(status_buffer_.size()));
152  data_ptr += sizeof(uint32_t);
153  // copy the header part
154  memcpy(data_ptr, status_buffer_.data(), status_buffer_.size());
155  data_ptr += status_buffer_.size();
156 
157  // first 4 bytes are the side of the transition buffer
158  flatbuffers::WriteScalar<uint32_t>(data_ptr, static_cast<uint32_t>(transition_buffer_.size()));
159  data_ptr += sizeof(uint32_t);
160 
161  for (auto& transition : transition_buffer_)
162  {
163  memcpy(data_ptr, transition.data(), transition.size());
164  data_ptr += transition.size();
165  }
166  transition_buffer_.clear();
168  }
169  try
170  {
171  zmq_->publisher.send(message, zmq::send_flags::none);
172  }
173  catch (zmq::error_t& err)
174  {
175  if (err.num() == ETERM)
176  {
177  std::cout << "[PublisherZMQ] Publisher quitting." << std::endl;
178  }
179  std::cout << "[PublisherZMQ] just died. Exeption " << err.what() << std::endl;
180  }
181 
182  send_pending_ = false;
183  // printf("%.3f zmq send\n", std::chrono::duration<double>( std::chrono::high_resolution_clock::now().time_since_epoch() ).count());
184 }
185 }
std::vector< uint8_t > status_buffer_
virtual const char * what() const ZMQ_NOTHROW ZMQ_OVERRIDE
std::vector< uint8_t > tree_buffer_
std::atomic_bool active_server_
Definition: any.hpp:455
size_t send(const void *buf_, size_t len_, int flags_=0)
static std::atomic< bool > ref_count
virtual void flush() override
void CreateFlatbuffersBehaviorTree(flatbuffers::FlatBufferBuilder &builder, const BT::Tree &tree)
std::array< uint8_t, 12 > SerializedTransition
const BT::Tree & tree_
void * data() ZMQ_NOTHROW
uint16_t UID() const
Definition: tree_node.cpp:89
TreeNode * rootNode() const
Definition: bt_factory.h:176
Helper class to hold data needed in creation of a FlatBuffer. To serialize data, you typically call o...
Definition: flatbuffers.h:1132
std::future< void > send_future_
Serialization::NodeType convertToFlatbuffers(BT::NodeType type)
virtual void callback(Duration timestamp, const TreeNode &node, NodeStatus prev_status, NodeStatus status) override
int num() const ZMQ_NOTHROW
uoffset_t GetSize() const
The current size of the serialized buffer, counting from the end.
Definition: flatbuffers.h:1233
Struct used to store a tree. If this object goes out of scope, the tree is destroyed.
Definition: bt_factory.h:130
Abstract base class for Behavior Tree Nodes.
Definition: tree_node.h:53
void shutdown() ZMQ_NOTHROW
std::chrono::microseconds min_time_between_msgs_
void bind(std::string const &addr)
std::thread thread_
PublisherZMQ(const BT::Tree &tree, unsigned max_msg_per_second=25, unsigned publisher_port=1666, unsigned server_port=1667)
NodeStatus
Definition: basic_types.h:35
std::vector< SerializedTransition > transition_buffer_
std::chrono::high_resolution_clock::duration Duration
Definition: basic_types.h:340
std::atomic_bool send_pending_
void applyRecursiveVisitor(const TreeNode *root_node, const std::function< void(const TreeNode *)> &visitor)
uint8_t * GetBufferPointer() const
Get the serialized buffer (after you call Finish()).
Definition: flatbuffers.h:1238
SerializedTransition SerializeTransition(uint16_t UID, Duration timestamp, NodeStatus prev_status, NodeStatus status)


behaviotree_cpp_v3
Author(s): Michele Colledanchise, Davide Faconti
autogenerated on Tue May 4 2021 02:56:24