36 #include <glog/logging.h>
// Size constant, in bytes going by the name. No use of it is visible in this
// excerpt. NOTE(review): presumably the capacity of a receive buffer -- confirm
// against the code that references it.
50 #define RX_BUFFER_BYTES (20996420)
// Upper bound on attempts in the command-send retry loop
// (`while (numRetry++ < MAX_RETRY_CNT && Server_Connected[...])`).
// NOTE(review): the receive path also keeps a `numRetry` counter and likely
// uses the same limit -- confirm.
51 #define MAX_RETRY_CNT 3
56 using namespace payload;
155 std::unique_ptr<zmq::context_t> m_context;
156 std::unique_ptr<zmq::socket_t> m_command;
157 std::unique_ptr<zmq::socket_t> m_monitor;
158 m_context = std::make_unique<zmq::context_t>(2);
159 contexts.at(m_connectionId) = std::move(m_context);
161 std::make_unique<zmq::socket_t>(*contexts.at(m_connectionId),
ZMQ_REQ);
163 std::make_unique<zmq::socket_t>(*contexts.at(m_connectionId),
ZMQ_PAIR);
165 if (!m_connectionId) {
167 command_socket.clear();
168 command_socket.emplace_back(std::move(m_command));
170 monitor_sockets.clear();
171 monitor_sockets.emplace_back(std::move(m_monitor));
175 for (
size_t i = 0;
i < command_socket.size();
i++) {
176 if (command_socket.at(
i) ==
nullptr) {
177 command_socket.erase(command_socket.begin() +
i);
181 command_socket.push_back(std::move(m_command));
183 for (
size_t i = 0;
i < monitor_sockets.size();
i++) {
184 if (monitor_sockets.at(
i) ==
nullptr) {
185 monitor_sockets.erase(monitor_sockets.begin() +
i);
189 monitor_sockets.push_back(std::move(m_monitor));
193 "inproc://monitor-client" + std::to_string(m_connectionId);
198 monitor_sockets[m_connectionId]->connect(monitor_endpoint);
201 threadObj[m_connectionId] =
205 threadObj[m_connectionId].detach();
209 if (!Server_Connected[m_connectionId]) {
210 LOG(
INFO) <<
"Attempting to connect server... ";
213 command_socket[m_connectionId]->connect(
"tcp://" + ip +
218 LOG(
ERROR) <<
"Host is unreachable";
227 std::unique_lock<std::recursive_mutex> mlock(m_mutex[m_connectionId]);
230 if (Cond_Var[m_connectionId].wait_for(
231 mlock, std::chrono::seconds(3),
233 Server_Connected[m_connectionId] =
false;
235 }
else if (command_socket.at(m_connectionId) !=
NULL) {
238 send_buff[m_connectionId].set_func_name(
"ServerConnect");
239 send_buff[m_connectionId].set_expect_reply(
true);
240 if (SendCommand() != 0) {
241 LOG(
ERROR) <<
"Send Command Failed";
242 Server_Connected[m_connectionId] =
false;
245 if (recv_server_data() == 0) {
247 if (strcmp(recv_buff[m_connectionId].
message().c_str(),
250 cout <<
"Conn established" << endl;
255 cout <<
"Server Message :: "
256 << recv_buff[m_connectionId].message() << endl;
257 Server_Connected[m_connectionId] =
false;
262 Server_Connected[m_connectionId] =
false;
265 }
else if (command_socket.at(m_connectionId) ==
NULL) {
266 Server_Connected[m_connectionId] =
false;
282 uint8_t numRetry = 0;
283 int siz = send_buff[m_connectionId].ByteSize();
284 unsigned char *pkt =
new unsigned char[siz];
288 send_buff[m_connectionId].SerializeToCodedStream(coded_output);
290 recv_buff[m_connectionId].Clear();
292 while (numRetry++ <
MAX_RETRY_CNT && Server_Connected[m_connectionId]) {
295 if (command_socket[m_connectionId]->
send(request,
296 zmq::send_flags::none)) {
298 Send_Successful[m_connectionId] =
true;
299 Cond_Var[m_connectionId].notify_all();
305 std::cout <<
"Send Timeout error" << std::endl;
310 if (!Server_Connected[m_connectionId]) {
314 m_latestActivityTimestamp = std::chrono::steady_clock::now();
328 uint8_t numRetry = 0;
331 Server_Connected[m_connectionId] !=
false) {
334 std::unique_lock<std::mutex> mlock(mutex_recv[m_connectionId]);
335 if (Cond_Var[m_connectionId].wait_for(
336 mlock, std::chrono::seconds(10),
339 Data_Received[m_connectionId] =
false;
343 if (command_socket[m_connectionId]->recv(
message,
344 zmq::recv_flags::none)) {
348 static_cast<char *
>(
message.data()),
349 static_cast<int>(
message.size()));
351 recv_buff[m_connectionId].ParseFromCodedStream(
355 Data_Received[m_connectionId] =
true;
357 if (recv_buff[m_connectionId].interrupt_occured()) {
358 InterruptDetected[m_connectionId] =
true;
362 Cond_Var[m_connectionId].notify_all();
369 <<
"Data not received from server going to send again";
370 if (SendCommand() != 0) {
377 if (SendCommand() != 0) {
384 if (Server_Connected[m_connectionId] ==
false) {
388 send_buff[m_connectionId].Clear();
391 if (InterruptDetected[m_connectionId]) {
392 InterruptDetected[m_connectionId] =
false;
399 m_latestActivityTimestamp = std::chrono::steady_clock::now();
419 if (monitor_sockets[m_connectionId]) {
420 monitor_sockets[m_connectionId]->recv(msg);
423 monitor_sockets[m_connectionId]->close();
428 callback_function(command_socket.at(m_connectionId),
event);
443 int connectionId = 0;
444 auto status = std::find(command_socket.begin(), command_socket.end(), stx);
445 if (status != command_socket.end()) {
447 static_cast<int>(std::distance(command_socket.begin(), status));
451 switch (reason.
event) {
453 LOG(
INFO) <<
"Connected to server";
454 Server_Connected[connectionId] =
true;
455 Cond_Var[connectionId].notify_all();
458 LOG(
INFO) <<
"Closed connection with connection ID: " << connectionId;
461 std::lock_guard<std::recursive_mutex> guard(m_mutex[connectionId]);
462 Server_Connected[connectionId] =
false;
464 command_socket.at(connectionId)->close();
465 monitor_sockets.at(connectionId)->close();
466 contexts.at(connectionId)->close();
467 command_socket.at(connectionId) =
NULL;
468 monitor_sockets.at(connectionId) =
NULL;
469 contexts.at(connectionId) =
NULL;
473 LOG(
INFO) <<
"Connection retried to with connection ID: "
477 LOG(
INFO) <<
"Disconnected from server at with connection ID: "
481 std::lock_guard<std::recursive_mutex> guard(m_mutex[connectionId]);
482 Server_Connected[connectionId] =
false;
484 command_socket.at(connectionId)->close();
485 monitor_sockets.at(connectionId)->close();
486 contexts.at(connectionId)->close();
487 command_socket.at(connectionId) =
NULL;
488 monitor_sockets.at(connectionId) =
NULL;
489 contexts.at(connectionId) =
NULL;
493 LOG(
INFO) <<
"Event: CONNECT_DELAYED - Connection attempt delayed, "
494 "server might be unavailable.";
496 std::lock_guard<std::recursive_mutex> guard(m_mutex[connectionId]);
497 Server_Connected[connectionId] =
false;
502 LOG(
INFO) <<
"Event: " <<
event.event
503 <<
" on with connection ID: " << connectionId;
516 return m_latestActivityTimestamp;
525 : m_intNotifCb(nullptr), m_latestActivityTimestamp{} {
536 m_connectionId = connectionId;
537 while (contexts.size() <= m_connectionId)
538 contexts.emplace_back(
nullptr);
554 std::lock_guard<std::recursive_mutex> guard(
574 if (buf_size ==
message.size()) {
578 <<
" bytes . Expected message size " << buf_size
579 <<
" bytes , dropping the frame.";
596 LOG(
INFO) <<
"Frame socket connection closed.";
604 std::make_unique<zmq::socket_t>(*
frame_context, zmq::socket_type::pull);
611 LOG(
INFO) <<
"Frame Client Connection established.";