36 std::uniform_int_distribution<uint32_t> dist;
37 std::array<char, 16> out;
38 char*
bytes = out.data();
39 for(
int i = 0; i < 16; i += 4)
41 *
reinterpret_cast<uint32_t*
>(
bytes + i) = dist(gen);
58 server.set(zmq::sockopt::linger, 0);
61 int timeout_rcv = 100;
62 server.set(zmq::sockopt::rcvtimeo, timeout_rcv);
63 publisher.set(zmq::sockopt::rcvtimeo, timeout_rcv);
65 int timeout_ms = 1000;
66 server.set(zmq::sockopt::sndtimeo, timeout_ms);
67 publisher.set(zmq::sockopt::sndtimeo, timeout_ms);
86 std::unordered_map<std::string, std::weak_ptr<BT::Tree::Subtree>>
subtrees;
87 std::unordered_map<uint16_t, std::weak_ptr<BT::TreeNode>>
nodes_by_uid;
90 std::unordered_map<uint16_t, Monitor::Hook::Ptr>
pre_hooks;
91 std::unordered_map<uint16_t, Monitor::Hook::Ptr>
post_hooks;
110 _p->server_port = server_port;
117 auto msg =
StrCat(
"Another instance of Groot2Publisher is using port ",
129 size_t node_count = 0;
130 for(
const auto& subtree : tree.
subtrees)
132 node_count += subtree->nodes.size();
134 _p->status_buffer.resize(3 * node_count);
136 unsigned ptr_offset = 0;
137 char* buffer_ptr =
_p->status_buffer.data();
139 for(
const auto& subtree : tree.
subtrees)
142 subtree->instance_name.empty() ? subtree->tree_ID : subtree->instance_name;
143 _p->subtrees.insert({ name, subtree });
145 for(
const auto& node : subtree->nodes)
147 _p->nodes_by_uid.insert({ node->UID(), node });
150 _p->status_buffermap.insert({ node->UID(), buffer_ptr + ptr_offset });
158 _p->server.bind(
_p->server_address.c_str());
159 _p->publisher.bind(
_p->publisher_address.c_str());
167 _p->max_heartbeat_delay = delay;
172 return _p->max_heartbeat_delay;
179 _p->active_server =
false;
180 if(
_p->server_thread.joinable())
182 _p->server_thread.join();
185 if(
_p->heartbeat_thread.joinable())
187 _p->heartbeat_thread.join();
202 std::unique_lock<std::mutex> lk(
_p->status_mutex);
203 auto status =
static_cast<char>(new_status);
207 status = 10 +
static_cast<char>(prev_status);
209 *(
_p->status_buffermap.at(node.
UID())) = status;
215 trans.
status =
static_cast<uint8_t
>(new_status);
216 auto timestamp = ts -
_p->recording_fist_time;
218 std::chrono::duration_cast<std::chrono::microseconds>(timestamp).count();
219 _p->transitions_buffer.push_back(trans);
220 while(
_p->transitions_buffer.size() > 1000)
222 _p->transitions_buffer.pop_front();
236 _p->active_server =
true;
237 auto& socket =
_p->server;
239 auto sendErrorReply = [&socket](
const std::string& msg) {
240 zmq::multipart_t error_msg;
241 error_msg.addstr(
"error");
242 error_msg.addstr(msg);
243 error_msg.send(socket);
247 _p->last_heartbeat = std::chrono::system_clock::now();
249 while(
_p->active_server)
251 zmq::multipart_t requestMsg;
252 if(!requestMsg.recv(socket) || requestMsg.size() == 0)
257 _p->last_heartbeat = std::chrono::system_clock::now();
259 std::string
const request_str = requestMsg[0].to_string();
262 sendErrorReply(
"wrong request header");
269 reply_header.
request = request_header;
271 reply_header.
tree_id = serialized_uuid;
273 zmq::multipart_t reply_msg;
276 switch(request_header.type)
279 reply_msg.addstr(
_p->tree_xml);
284 std::unique_lock<std::mutex> lk(
_p->status_mutex);
285 reply_msg.addstr(
_p->status_buffer);
290 if(requestMsg.size() != 2)
292 sendErrorReply(
"must be 2 parts message");
295 std::string
const bb_names_str = requestMsg[1].to_string();
297 reply_msg.addmem(msg.data(), msg.size());
302 if(requestMsg.size() != 2)
304 sendErrorReply(
"must be 2 parts message");
309 uint16_t
const node_uid =
json[
"uid"].
get<uint16_t>();
312 if(
auto hook =
getHook(pos, node_uid))
314 std::unique_lock<std::mutex> lk(hook->mutex);
323 hook->wakeup.notify_all();
328 auto new_hook = std::make_shared<Monitor::Hook>();
337 if(received_json.is_array())
339 for(
auto const&
json : received_json)
346 InsertHook(received_json);
352 if(requestMsg.size() != 2)
354 sendErrorReply(
"must be 2 parts message");
359 uint16_t node_uid =
json.
at(
"uid").
get<uint16_t>();
360 std::string status_str =
json.
at(
"desired_status").
get<std::string>();
362 bool remove =
json.
at(
"remove_when_done").
get<
bool>();
365 if(status_str ==
"SUCCESS")
369 else if(status_str ==
"FAILURE")
376 sendErrorReply(
"Node ID not found");
393 if(requestMsg.size() != 2)
395 sendErrorReply(
"must be 2 parts message");
400 uint16_t node_uid =
json.
at(
"uid").
get<uint16_t>();
405 sendErrorReply(
"Node ID not found");
412 std::unique_lock<std::mutex> lk(
_p->hooks_map_mutex);
413 auto json_out = nlohmann::json::array();
414 for(
auto [node_uid, breakpoint] :
_p->pre_hooks)
416 json_out.push_back(*breakpoint);
418 reply_msg.addstr(json_out.dump());
423 if(requestMsg.size() != 2)
425 sendErrorReply(
"must be 2 parts message");
429 auto const cmd = (requestMsg[1].to_string());
432 _p->recording =
true;
434 _p->recording_fist_time = std::chrono::duration_cast<std::chrono::microseconds>(
437 auto now = std::chrono::duration_cast<std::chrono::microseconds>(
440 std::unique_lock<std::mutex> lk(
_p->status_mutex);
441 _p->transitions_buffer.clear();
443 else if(cmd ==
"stop")
445 _p->recording =
false;
451 thread_local std::string trans_buffer;
452 trans_buffer.resize(9 *
_p->transitions_buffer.size());
454 std::unique_lock<std::mutex> lk(
_p->status_mutex);
456 for(
const auto& trans :
_p->transitions_buffer)
458 std::memcpy(&trans_buffer[offset], &trans.timestamp_usec, 6);
460 std::memcpy(&trans_buffer[offset], &trans.node_uid, 2);
462 std::memcpy(&trans_buffer[offset], &trans.status, 1);
465 _p->transitions_buffer.clear();
466 trans_buffer.resize(offset);
467 reply_msg.addstr(trans_buffer);
472 sendErrorReply(
"Request not recognized");
477 reply_msg.send(socket);
483 std::unique_lock<std::mutex> lk(_p->hooks_map_mutex);
484 for(
auto& [node_uid, hook] : _p->pre_hooks)
486 std::unique_lock<std::mutex> lk(hook->mutex);
487 hook->enabled = enable;
492 hook->wakeup.notify_all();
499 bool has_heartbeat =
true;
501 while(
_p->active_server)
503 std::this_thread::sleep_for(std::chrono::milliseconds(10));
505 auto now = std::chrono::system_clock::now();
506 bool prev_heartbeat = has_heartbeat;
508 has_heartbeat = (now -
_p->last_heartbeat <
_p->max_heartbeat_delay);
511 if(has_heartbeat != prev_heartbeat)
522 for(
auto name : bb_names)
524 std::string
const bb_name(name);
525 auto it =
_p->subtrees.find(bb_name);
527 if(it !=
_p->subtrees.end())
530 if(
auto subtree = it->second.lock())
536 return nlohmann::json::to_msgpack(
json);
541 auto const node_uid = hook->node_uid;
542 auto it =
_p->nodes_by_uid.find(node_uid);
543 if(it ==
_p->nodes_by_uid.end())
554 std::unique_lock<std::mutex> lk(hook->mutex);
562 zmq::multipart_t request_msg;
565 request_msg.send(
_p->publisher);
570 hook->wakeup.wait(lk, [hook]() {
return hook->ready || !hook->enabled; });
581 if(hook->remove_when_done)
584 std::unique_lock<std::mutex> lk(
_p->hooks_map_mutex);
585 _p->pre_hooks.erase(hook->node_uid);
586 node.setPreTickFunction({});
588 return hook->desired_status;
591 std::unique_lock<std::mutex> lk(
_p->hooks_map_mutex);
592 _p->pre_hooks[node_uid] = hook;
593 node->setPreTickFunction(injectedCallback);
601 auto it =
_p->nodes_by_uid.find(node_uid);
602 if(it ==
_p->nodes_by_uid.end())
612 auto hook =
getHook(pos, node_uid);
619 std::unique_lock<std::mutex> lk(hook->mutex);
620 hook->desired_status = result;
621 hook->remove_when_done |= remove;
626 hook->wakeup.notify_all();
634 auto it =
_p->nodes_by_uid.find(node_uid);
635 if(it ==
_p->nodes_by_uid.end())
645 auto hook =
getHook(pos, node_uid);
652 std::unique_lock<std::mutex> lk(
_p->hooks_map_mutex);
653 _p->pre_hooks.erase(node_uid);
655 node->setPreTickFunction({});
659 std::unique_lock<std::mutex> lk(hook->mutex);
662 hook->enabled =
false;
664 hook->wakeup.notify_all();
672 std::vector<uint16_t> uids;
678 std::unique_lock<std::mutex> lk(
_p->hooks_map_mutex);
681 uids.reserve(hooks->size());
682 for(
auto [node_uid,
_] : *hooks)
684 uids.push_back(node_uid);
688 for(
auto node_uid : uids)
699 std::unique_lock<std::mutex> lk(
_p->hooks_map_mutex);
700 auto bk_it = hooks->find(node_uid);
701 if(bk_it == hooks->end())
705 return bk_it->second;