bt_zmq_publisher.cpp
Go to the documentation of this file.
00001 #include "behaviortree_cpp/loggers/bt_zmq_publisher.h"
00002 #include "behaviortree_cpp/loggers/bt_flatbuffer_helper.h"
00003 #include <future>
00004 #include <zmq.hpp>
00005 
00006 namespace BT
00007 {
00008 std::atomic<bool> PublisherZMQ::ref_count(false);
00009 
00010 struct PublisherZMQ::Pimpl
00011 {
00012     Pimpl():
00013         context(1)
00014       , publisher(context, ZMQ_PUB)
00015       , server(context, ZMQ_REP)
00016     {}
00017 
00018     zmq::context_t context;
00019     zmq::socket_t publisher;
00020     zmq::socket_t server;
00021 };
00022 
00023 
00024 PublisherZMQ::PublisherZMQ(TreeNode* root_node, int max_msg_per_second)
00025   : StatusChangeLogger(root_node)
00026   , root_node_(root_node)
00027   , min_time_between_msgs_(std::chrono::microseconds(1000 * 1000) / max_msg_per_second)
00028   , send_pending_(false)
00029   , zmq_(new Pimpl())
00030 {
00031     static bool first_instance = true;
00032     if (first_instance)
00033     {
00034         first_instance = false;
00035     }
00036     else
00037     {
00038         throw std::logic_error("Only one instance of PublisherZMQ shall be created");
00039     }
00040 
00041     flatbuffers::FlatBufferBuilder builder(1024);
00042     CreateFlatbuffersBehaviorTree(builder, root_node);
00043 
00044     tree_buffer_.resize(builder.GetSize());
00045     memcpy(tree_buffer_.data(), builder.GetBufferPointer(), builder.GetSize());
00046 
00047     zmq_->publisher.bind("tcp://*:1666");
00048     zmq_->server.bind("tcp://*:1667");
00049 
00050     int timeout_ms = 100;
00051     zmq_->server.setsockopt(ZMQ_RCVTIMEO, &timeout_ms, sizeof(int));
00052 
00053     active_server_ = true;
00054 
00055     thread_ = std::thread([this]() {
00056         while (active_server_)
00057         {
00058             zmq::message_t req;
00059             try
00060             {
00061                 bool received = zmq_->server.recv(&req);
00062                 if (received)
00063                 {
00064                     zmq::message_t reply(tree_buffer_.size());
00065                     memcpy(reply.data(), tree_buffer_.data(), tree_buffer_.size());
00066                     zmq_->server.send(reply);
00067                 }
00068             }
00069             catch (zmq::error_t& err)
00070             {
00071                 std::cout << "[PublisherZMQ] just died. Exeption " << err.what() << std::endl;
00072                 active_server_ = false;
00073             }
00074         }
00075     });
00076 
00077     createStatusBuffer();
00078 }
00079 
00080 PublisherZMQ::~PublisherZMQ()
00081 {
00082     active_server_ = false;
00083     if (thread_.joinable())
00084     {
00085         thread_.join();
00086     }
00087     flush();
00088     delete zmq_;
00089         ref_count = false;
00090 }
00091 
00092 
00093 void PublisherZMQ::createStatusBuffer()
00094 {
00095     status_buffer_.clear();
00096     applyRecursiveVisitor(root_node_, [this](TreeNode* node) {
00097         size_t index = status_buffer_.size();
00098         status_buffer_.resize(index + 3);
00099         flatbuffers::WriteScalar<uint16_t>(&status_buffer_[index], node->UID());
00100         flatbuffers::WriteScalar<int8_t>(&status_buffer_[index + 2],
00101                                          static_cast<int8_t>(convertToFlatbuffers(node->status())));
00102     });
00103 }
00104 
00105 void PublisherZMQ::callback(Duration timestamp, const TreeNode& node, NodeStatus prev_status,
00106                             NodeStatus status)
00107 {
00108     using namespace std::chrono;
00109 
00110     SerializedTransition transition =
00111         SerializeTransition(node.UID(), timestamp, prev_status, status);
00112     {
00113         std::unique_lock<std::mutex> lock(mutex_);
00114         transition_buffer_.push_back(transition);
00115     }
00116 
00117     if (!send_pending_)
00118     {
00119         send_pending_ = true;
00120         send_future_ = std::async(std::launch::async, [this]() {
00121             std::this_thread::sleep_for(min_time_between_msgs_);
00122             flush();
00123         });
00124     }
00125 }
00126 
00127 void PublisherZMQ::flush()
00128 {
00129     zmq::message_t message;
00130     {
00131         std::unique_lock<std::mutex> lock(mutex_);
00132 
00133         const size_t msg_size = status_buffer_.size() + 8 + (transition_buffer_.size() * 12);
00134 
00135         message.rebuild(msg_size);
00136         uint8_t* data_ptr = static_cast<uint8_t*>(message.data());
00137 
00138         // first 4 bytes are the side of the header
00139         flatbuffers::WriteScalar<uint32_t>(data_ptr, status_buffer_.size());
00140         data_ptr += sizeof(uint32_t);
00141         // copy the header part
00142         memcpy(data_ptr, status_buffer_.data(), status_buffer_.size());
00143         data_ptr += status_buffer_.size();
00144 
00145         // first 4 bytes are the side of the transition buffer
00146         flatbuffers::WriteScalar<uint32_t>(data_ptr, transition_buffer_.size());
00147         data_ptr += sizeof(uint32_t);
00148 
00149         for (auto& transition : transition_buffer_)
00150         {
00151             memcpy(data_ptr, transition.data(), transition.size());
00152             data_ptr += transition.size();
00153         }
00154         transition_buffer_.clear();
00155         createStatusBuffer();
00156     }
00157 
00158     zmq_->publisher.send(message);
00159     send_pending_ = false;
00160     // printf("%.3f zmq send\n", std::chrono::duration<double>( std::chrono::high_resolution_clock::now().time_since_epoch() ).count());
00161 }
00162 }


behaviortree_cpp
Author(s): Michele Colledanchise, Davide Faconti
autogenerated on Sat Feb 2 2019 03:50:10