Go to the documentation of this file.00001 #include "behaviortree_cpp/loggers/bt_zmq_publisher.h"
00002 #include "behaviortree_cpp/loggers/bt_flatbuffer_helper.h"
00003 #include <future>
00004 #include <zmq.hpp>
00005
00006 namespace BT
00007 {
00008 std::atomic<bool> PublisherZMQ::ref_count(false);
00009
00010 struct PublisherZMQ::Pimpl
00011 {
00012 Pimpl():
00013 context(1)
00014 , publisher(context, ZMQ_PUB)
00015 , server(context, ZMQ_REP)
00016 {}
00017
00018 zmq::context_t context;
00019 zmq::socket_t publisher;
00020 zmq::socket_t server;
00021 };
00022
00023
00024 PublisherZMQ::PublisherZMQ(TreeNode* root_node, int max_msg_per_second)
00025 : StatusChangeLogger(root_node)
00026 , root_node_(root_node)
00027 , min_time_between_msgs_(std::chrono::microseconds(1000 * 1000) / max_msg_per_second)
00028 , send_pending_(false)
00029 , zmq_(new Pimpl())
00030 {
00031 static bool first_instance = true;
00032 if (first_instance)
00033 {
00034 first_instance = false;
00035 }
00036 else
00037 {
00038 throw std::logic_error("Only one instance of PublisherZMQ shall be created");
00039 }
00040
00041 flatbuffers::FlatBufferBuilder builder(1024);
00042 CreateFlatbuffersBehaviorTree(builder, root_node);
00043
00044 tree_buffer_.resize(builder.GetSize());
00045 memcpy(tree_buffer_.data(), builder.GetBufferPointer(), builder.GetSize());
00046
00047 zmq_->publisher.bind("tcp://*:1666");
00048 zmq_->server.bind("tcp://*:1667");
00049
00050 int timeout_ms = 100;
00051 zmq_->server.setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(int));
00052
00053 active_server_ = true;
00054
00055 thread_ = std::thread([this]() {
00056 while (active_server_)
00057 {
00058 zmq::message_t req;
00059 try
00060 {
00061 bool received = zmq_->server.recv(&req);
00062 if (received)
00063 {
00064 zmq::message_t reply(tree_buffer_.size());
00065 memcpy(reply.data(), tree_buffer_.data(), tree_buffer_.size());
00066 zmq_->server.send(reply);
00067 }
00068 }
00069 catch (zmq::error_t& err)
00070 {
00071 std::cout << "[PublisherZMQ] just died. Exeption " << err.what() << std::endl;
00072 active_server_ = false;
00073 }
00074 }
00075 });
00076
00077 createStatusBuffer();
00078 }
00079
00080 PublisherZMQ::~PublisherZMQ()
00081 {
00082 active_server_ = false;
00083 if (thread_.joinable())
00084 {
00085 thread_.join();
00086 }
00087 flush();
00088 delete zmq_;
00089 ref_count = false;
00090 }
00091
00092
00093 void PublisherZMQ::createStatusBuffer()
00094 {
00095 status_buffer_.clear();
00096 applyRecursiveVisitor(root_node_, [this](TreeNode* node) {
00097 size_t index = status_buffer_.size();
00098 status_buffer_.resize(index + 3);
00099 flatbuffers::WriteScalar<uint16_t>(&status_buffer_[index], node->UID());
00100 flatbuffers::WriteScalar<int8_t>(&status_buffer_[index + 2],
00101 static_cast<int8_t>(convertToFlatbuffers(node->status())));
00102 });
00103 }
00104
00105 void PublisherZMQ::callback(Duration timestamp, const TreeNode& node, NodeStatus prev_status,
00106 NodeStatus status)
00107 {
00108 using namespace std::chrono;
00109
00110 SerializedTransition transition =
00111 SerializeTransition(node.UID(), timestamp, prev_status, status);
00112 {
00113 std::unique_lock<std::mutex> lock(mutex_);
00114 transition_buffer_.push_back(transition);
00115 }
00116
00117 if (!send_pending_)
00118 {
00119 send_pending_ = true;
00120 send_future_ = std::async(std::launch::async, [this]() {
00121 std::this_thread::sleep_for(min_time_between_msgs_);
00122 flush();
00123 });
00124 }
00125 }
00126
00127 void PublisherZMQ::flush()
00128 {
00129 zmq::message_t message;
00130 {
00131 std::unique_lock<std::mutex> lock(mutex_);
00132
00133 const size_t msg_size = status_buffer_.size() + 8 + (transition_buffer_.size() * 12);
00134
00135 message.rebuild(msg_size);
00136 uint8_t* data_ptr = static_cast<uint8_t*>(message.data());
00137
00138
00139 flatbuffers::WriteScalar<uint32_t>(data_ptr, status_buffer_.size());
00140 data_ptr += sizeof(uint32_t);
00141
00142 memcpy(data_ptr, status_buffer_.data(), status_buffer_.size());
00143 data_ptr += status_buffer_.size();
00144
00145
00146 flatbuffers::WriteScalar<uint32_t>(data_ptr, transition_buffer_.size());
00147 data_ptr += sizeof(uint32_t);
00148
00149 for (auto& transition : transition_buffer_)
00150 {
00151 memcpy(data_ptr, transition.data(), transition.size());
00152 data_ptr += transition.size();
00153 }
00154 transition_buffer_.clear();
00155 createStatusBuffer();
00156 }
00157
00158 zmq_->publisher.send(message);
00159 send_pending_ = false;
00160
00161 }
00162 }