21 unsigned publisher_port,
unsigned server_port) :
28 bool expected =
false;
29 if (!
ref_count.compare_exchange_strong(expected,
true))
31 throw LogicError(
"Only one instance of PublisherZMQ shall be created");
33 if (publisher_port == server_port)
35 throw LogicError(
"The TCP ports of the publisher and the server must be " 47 sprintf(str,
"tcp://*:%d", publisher_port);
49 sprintf(str,
"tcp://*:%d", server_port);
53 zmq_->
server.set(zmq::sockopt::rcvtimeo, timeout_ms);
57 thread_ = std::thread([
this]() {
63 zmq::recv_result_t received =
zmq_->
server.recv(req);
73 if (err.
num() == ETERM)
75 std::cout <<
"[PublisherZMQ] Server quitting." << std::endl;
77 std::cout <<
"[PublisherZMQ] just died. Exception " << err.
what() << std::endl;
110 flatbuffers::WriteScalar<uint16_t>(&
status_buffer_[index], node->UID());
111 flatbuffers::WriteScalar<int8_t>(
112 &status_buffer_[index + 2],
123 std::unique_lock<std::mutex> lock(
mutex_);
129 send_future_ = std::async(std::launch::async, [
this]() {
130 std::unique_lock<std::mutex> lock(
mutex_);
134 if (!is_server_inactive)
146 std::unique_lock<std::mutex> lock(
mutex_);
151 uint8_t* data_ptr =
static_cast<uint8_t*
>(message.
data());
154 flatbuffers::WriteScalar<uint32_t>(data_ptr,
156 data_ptr +=
sizeof(uint32_t);
162 flatbuffers::WriteScalar<uint32_t>(data_ptr,
164 data_ptr +=
sizeof(uint32_t);
168 memcpy(data_ptr, transition.data(), transition.size());
169 data_ptr += transition.size();
171 transition_buffer_.clear();
180 if (err.
num() == ETERM)
182 std::cout <<
"[PublisherZMQ] Publisher quitting." << std::endl;
184 std::cout <<
"[PublisherZMQ] just died. Exception " << err.
what() << std::endl;
std::vector< uint8_t > status_buffer_
virtual const char * what() const ZMQ_NOTHROW ZMQ_OVERRIDE
std::vector< uint8_t > tree_buffer_
uint8_t * GetBufferPointer() const
Get the serialized buffer (after you call Finish()).
std::atomic_bool active_server_
size_t send(const void *buf_, size_t len_, int flags_=0)
static std::atomic< bool > ref_count
virtual void flush() override
void createStatusBuffer()
void CreateFlatbuffersBehaviorTree(flatbuffers::FlatBufferBuilder &builder, const BT::Tree &tree)
std::array< uint8_t, 12 > SerializedTransition
TreeNode * rootNode() const
void * data() ZMQ_NOTHROW
Helper class to hold data needed in creation of a FlatBuffer. To serialize data, you typically call o...
std::future< void > send_future_
Serialization::NodeType convertToFlatbuffers(BT::NodeType type)
virtual void callback(Duration timestamp, const TreeNode &node, NodeStatus prev_status, NodeStatus status) override
int num() const ZMQ_NOTHROW
Struct used to store a tree. If this object goes out of scope, the tree is destroyed.
Abstract base class for Behavior Tree Nodes.
void shutdown() ZMQ_NOTHROW
std::chrono::microseconds min_time_between_msgs_
void bind(std::string const &addr)
uoffset_t GetSize() const
The current size of the serialized buffer, counting from the end.
PublisherZMQ(const BT::Tree &tree, unsigned max_msg_per_second=25, unsigned publisher_port=1666, unsigned server_port=1667)
std::condition_variable send_condition_variable_
std::vector< SerializedTransition > transition_buffer_
std::chrono::high_resolution_clock::duration Duration
std::atomic_bool send_pending_
void applyRecursiveVisitor(const TreeNode *root_node, const std::function< void(const TreeNode *)> &visitor)
SerializedTransition SerializeTransition(uint16_t UID, Duration timestamp, NodeStatus prev_status, NodeStatus status)