bt_zmq_publisher.cpp
Go to the documentation of this file.
3 #include <future>
4 #include <zmq.hpp>
5 
6 namespace BT
7 {
8 std::atomic<bool> PublisherZMQ::ref_count(false);
9 
11 {
12  Pimpl():
13  context(1)
14  , publisher(context, ZMQ_PUB)
15  , server(context, ZMQ_REP)
16  {}
17 
18  zmq::context_t context;
19  zmq::socket_t publisher;
20  zmq::socket_t server;
21 };
22 
23 
24 PublisherZMQ::PublisherZMQ(TreeNode* root_node, int max_msg_per_second)
25  : StatusChangeLogger(root_node)
26  , root_node_(root_node)
27  , min_time_between_msgs_(std::chrono::microseconds(1000 * 1000) / max_msg_per_second)
28  , send_pending_(false)
29  , zmq_(new Pimpl())
30 {
31  static bool first_instance = true;
32  if (first_instance)
33  {
34  first_instance = false;
35  }
36  else
37  {
38  throw std::logic_error("Only one instance of PublisherZMQ shall be created");
39  }
40 
42  CreateFlatbuffersBehaviorTree(builder, root_node);
43 
44  tree_buffer_.resize(builder.GetSize());
45  memcpy(tree_buffer_.data(), builder.GetBufferPointer(), builder.GetSize());
46 
47  zmq_->publisher.bind("tcp://*:1666");
48  zmq_->server.bind("tcp://*:1667");
49 
50  int timeout_ms = 100;
51  zmq_->server.setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(int));
52 
53  active_server_ = true;
54 
55  thread_ = std::thread([this]() {
56  while (active_server_)
57  {
58  zmq::message_t req;
59  try
60  {
61  bool received = zmq_->server.recv(&req);
62  if (received)
63  {
64  zmq::message_t reply(tree_buffer_.size());
65  memcpy(reply.data(), tree_buffer_.data(), tree_buffer_.size());
66  zmq_->server.send(reply);
67  }
68  }
69  catch (zmq::error_t& err)
70  {
71  std::cout << "[PublisherZMQ] just died. Exeption " << err.what() << std::endl;
72  active_server_ = false;
73  }
74  }
75  });
76 
78 }
79 
81 {
82  active_server_ = false;
83  if (thread_.joinable())
84  {
85  thread_.join();
86  }
87  flush();
88  delete zmq_;
89  ref_count = false;
90 }
91 
92 
94 {
95  status_buffer_.clear();
97  size_t index = status_buffer_.size();
98  status_buffer_.resize(index + 3);
99  flatbuffers::WriteScalar<uint16_t>(&status_buffer_[index], node->UID());
100  flatbuffers::WriteScalar<int8_t>(&status_buffer_[index + 2],
101  static_cast<int8_t>(convertToFlatbuffers(node->status())));
102  });
103 }
104 
105 void PublisherZMQ::callback(Duration timestamp, const TreeNode& node, NodeStatus prev_status,
106  NodeStatus status)
107 {
108  using namespace std::chrono;
109 
110  SerializedTransition transition =
111  SerializeTransition(node.UID(), timestamp, prev_status, status);
112  {
113  std::unique_lock<std::mutex> lock(mutex_);
114  transition_buffer_.push_back(transition);
115  }
116 
117  if (!send_pending_)
118  {
119  send_pending_ = true;
120  send_future_ = std::async(std::launch::async, [this]() {
121  std::this_thread::sleep_for(min_time_between_msgs_);
122  flush();
123  });
124  }
125 }
126 
128 {
129  zmq::message_t message;
130  {
131  std::unique_lock<std::mutex> lock(mutex_);
132 
133  const size_t msg_size = status_buffer_.size() + 8 + (transition_buffer_.size() * 12);
134 
135  message.rebuild(msg_size);
136  uint8_t* data_ptr = static_cast<uint8_t*>(message.data());
137 
138  // first 4 bytes are the side of the header
139  flatbuffers::WriteScalar<uint32_t>(data_ptr, status_buffer_.size());
140  data_ptr += sizeof(uint32_t);
141  // copy the header part
142  memcpy(data_ptr, status_buffer_.data(), status_buffer_.size());
143  data_ptr += status_buffer_.size();
144 
145  // first 4 bytes are the side of the transition buffer
146  flatbuffers::WriteScalar<uint32_t>(data_ptr, transition_buffer_.size());
147  data_ptr += sizeof(uint32_t);
148 
149  for (auto& transition : transition_buffer_)
150  {
151  memcpy(data_ptr, transition.data(), transition.size());
152  data_ptr += transition.size();
153  }
154  transition_buffer_.clear();
156  }
157 
158  zmq_->publisher.send(message);
159  send_pending_ = false;
160  // printf("%.3f zmq send\n", std::chrono::duration<double>( std::chrono::high_resolution_clock::now().time_since_epoch() ).count());
161 }
162 }
std::vector< uint8_t > status_buffer_
std::array< uint8_t, 12 > SerializedTransition
PublisherZMQ(TreeNode *root_node, int max_msg_per_second=25)
std::vector< uint8_t > tree_buffer_
TreeNode * root_node_
std::atomic_bool active_server_
Definition: any.hpp:455
static std::atomic< bool > ref_count
BT_Serialization::Type convertToFlatbuffers(NodeType type)
virtual void flush() override
uint16_t UID() const
Definition: tree_node.cpp:108
Helper class to hold data needed in creation of a FlatBuffer. To serialize data, you typically call o...
Definition: flatbuffers.h:727
std::future< void > send_future_
virtual void callback(Duration timestamp, const TreeNode &node, NodeStatus prev_status, NodeStatus status) override
builder
Definition: build.py:70
uoffset_t GetSize() const
The current size of the serialized buffer, counting from the end.
Definition: flatbuffers.h:779
std::chrono::microseconds min_time_between_msgs_
NodeStatus status() const
Definition: tree_node.cpp:75
std::thread thread_
void CreateFlatbuffersBehaviorTree(flatbuffers::FlatBufferBuilder &builder, BT::TreeNode *root_node)
NodeStatus
Definition: basic_types.h:28
std::vector< SerializedTransition > transition_buffer_
std::chrono::high_resolution_clock::duration Duration
Definition: tree_node.h:36
std::atomic_bool send_pending_
void applyRecursiveVisitor(const TreeNode *root_node, const std::function< void(const TreeNode *)> &visitor)
uint8_t * GetBufferPointer() const
Get the serialized buffer (after you call Finish()).
Definition: flatbuffers.h:784
SerializedTransition SerializeTransition(uint16_t UID, Duration timestamp, NodeStatus prev_status, NodeStatus status)


behaviortree_cpp
Author(s): Michele Colledanchise, Davide Faconti
autogenerated on Sat Feb 2 2019 04:01:53