25 unsigned max_msg_per_second,
26 unsigned publisher_port,
34 bool expected =
false;
35 if (!
ref_count.compare_exchange_strong(expected,
true))
37 throw LogicError(
"Only one instance of PublisherZMQ shall be created");
39 if( publisher_port == server_port)
41 throw LogicError(
"The TCP ports of the publisher and the server must be different");
52 sprintf(str,
"tcp://*:%d", publisher_port);
54 sprintf(str,
"tcp://*:%d", server_port);
58 zmq_->
server.set(zmq::sockopt::rcvtimeo, timeout_ms);
62 thread_ = std::thread([
this]() {
68 zmq::recv_result_t received =
zmq_->
server.recv(req);
78 if (err.
num() == ETERM)
80 std::cout <<
"[PublisherZMQ] Server quitting." << std::endl;
82 std::cout <<
"[PublisherZMQ] just died. Exeption " << err.
what() << std::endl;
111 flatbuffers::WriteScalar<uint16_t>(&
status_buffer_[index], node->UID());
112 flatbuffers::WriteScalar<int8_t>(&status_buffer_[index + 2],
125 std::unique_lock<std::mutex> lock(
mutex_);
132 send_future_ = std::async(std::launch::async, [
this]() {
143 std::unique_lock<std::mutex> lock(
mutex_);
148 uint8_t* data_ptr =
static_cast<uint8_t*
>(message.
data());
151 flatbuffers::WriteScalar<uint32_t>(data_ptr,
static_cast<uint32_t
>(
status_buffer_.size()));
152 data_ptr +=
sizeof(uint32_t);
158 flatbuffers::WriteScalar<uint32_t>(data_ptr,
static_cast<uint32_t
>(
transition_buffer_.size()));
159 data_ptr +=
sizeof(uint32_t);
163 memcpy(data_ptr, transition.data(), transition.size());
164 data_ptr += transition.size();
166 transition_buffer_.clear();
175 if (err.
num() == ETERM)
177 std::cout <<
"[PublisherZMQ] Publisher quitting." << std::endl;
179 std::cout <<
"[PublisherZMQ] just died. Exeption " << err.
what() << std::endl;
std::vector< uint8_t > status_buffer_
virtual const char * what() const ZMQ_NOTHROW ZMQ_OVERRIDE
std::vector< uint8_t > tree_buffer_
std::atomic_bool active_server_
size_t send(const void *buf_, size_t len_, int flags_=0)
static std::atomic< bool > ref_count
virtual void flush() override
void createStatusBuffer()
void CreateFlatbuffersBehaviorTree(flatbuffers::FlatBufferBuilder &builder, const BT::Tree &tree)
std::array< uint8_t, 12 > SerializedTransition
void * data() ZMQ_NOTHROW
TreeNode * rootNode() const
Helper class to hold data needed in creation of a FlatBuffer. To serialize data, you typically call o...
std::future< void > send_future_
Serialization::NodeType convertToFlatbuffers(BT::NodeType type)
virtual void callback(Duration timestamp, const TreeNode &node, NodeStatus prev_status, NodeStatus status) override
int num() const ZMQ_NOTHROW
uoffset_t GetSize() const
The current size of the serialized buffer, counting from the end.
Struct used to store a tree. If this object goes out of scope, the tree is destroyed.
Abstract base class for Behavior Tree Nodes.
void shutdown() ZMQ_NOTHROW
std::chrono::microseconds min_time_between_msgs_
void bind(std::string const &addr)
PublisherZMQ(const BT::Tree &tree, unsigned max_msg_per_second=25, unsigned publisher_port=1666, unsigned server_port=1667)
std::vector< SerializedTransition > transition_buffer_
std::chrono::high_resolution_clock::duration Duration
std::atomic_bool send_pending_
void applyRecursiveVisitor(const TreeNode *root_node, const std::function< void(const TreeNode *)> &visitor)
uint8_t * GetBufferPointer() const
Get the serialized buffer (after you call Finish()).
SerializedTransition SerializeTransition(uint16_t UID, Duration timestamp, NodeStatus prev_status, NodeStatus status)