Program Listing for File dji_mop_handler.hpp
↰ Return to documentation for file (/tmp/ws/src/aerostack2/as2_aerial_platforms/as2_platform_dji_osdk/include/dji_mop_handler.hpp
)
#ifndef DJI_MOP_HANDLER_HPP_
#define DJI_MOP_HANDLER_HPP_
#include <mutex> // std::mutex
#include <queue> // std::queue
#include <rclcpp/rclcpp.hpp>
#include <thread>
#include "as2_core/names/topics.hpp"
#include "as2_core/node.hpp"
#include "dji_vehicle.hpp"
#include "std_msgs/msg/string.hpp"
#include "dji_linux_helpers.hpp"
#include "dji_vehicle.hpp"
#include "osdk_platform.h"
#include "osdkhal_linux.h"
#define RELIABLE_RECV_ONCE_BUFFER_SIZE (1024)
#define RELIABLE_SEND_ONCE_BUFFER_SIZE (1024)
#define MSG_DELIMITER '\r'
class DJIMopHandler {
DJI::OSDK::Vehicle* vehicle_ptr_;
as2::Node* node_ptr_;
DJI::OSDK::MopPipeline* pipeline_ = NULL;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr uplink_pub_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr downlink_sub_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr keep_alive_sub_;
public:
DJIMopHandler(DJI::OSDK::Vehicle* vehicle, as2::Node* node)
: vehicle_ptr_(vehicle), node_ptr_(node) {
node_ptr_->declare_parameter("channel_id", 49152);
mop_channel_id_ = node_ptr_->get_parameter("channel_id").as_int();
node_ptr_->declare_parameter("sending_retries", 3);
mop_sending_retries_ = node_ptr_->get_parameter("sending_retries").as_int();
node_ptr_->declare_parameter("read_rate", 10);
mop_read_rate_ = node_ptr_->get_parameter("read_rate").as_int();
node_ptr_->declare_parameter("write_rate", 500);
mop_write_rate_ = node_ptr_->get_parameter("write_rate").as_int();
node_ptr_->declare_parameter("reconnection_rate", 5000);
mop_reconnection_rate_ =
node_ptr_->get_parameter("reconnection_rate").as_int();
uplink_pub_ = node_ptr_->create_publisher<std_msgs::msg::String>(
"/uplink", as2_names::topics::global::qos);
// Create a custom QoS profile with the desired settings
rclcpp::QoS custom_qos_profile =
rclcpp::QoS(rclcpp::KeepLast(10)).reliable();
downlink_sub_ = node_ptr_->create_subscription<std_msgs::msg::String>(
"/downlink", custom_qos_profile,
std::bind(&DJIMopHandler::downlinkCB, this, std::placeholders::_1));
keep_alive_sub_ = node_ptr_->create_subscription<std_msgs::msg::String>(
"/keep_alive", custom_qos_profile,
std::bind(&DJIMopHandler::keepAliveCB, this, std::placeholders::_1));
static auto timer_ = node_ptr_->create_timer(
std::chrono::milliseconds(mop_reconnection_rate_), [this]() {
// Check if thread is already running to launch a new mopServer
if (mop_communication_th_.get_id() == std::thread::id()) {
RCLCPP_INFO(node_ptr_->get_logger(), "CREATING NEW MOP CHANNEL");
mop_communication_th_ = std::thread(
&DJIMopHandler::mopCommunicationFnc, this, mop_channel_id_);
}
if (mop_send_th_.get_id() == std::thread::id()) {
// RCLCPP_INFO(node_ptr_->get_logger(), "NEW SEND THREAD");
mop_send_th_ =
std::thread(&DJIMopHandler::mopSendFnc, this, mop_channel_id_);
}
// If connection closed, wait to join thread before launching a new
// one
if (closed_) {
mop_communication_th_.join();
mop_send_th_.join();
closed_ = false;
}
});
};
~DJIMopHandler() {
mop_communication_th_.join();
mop_send_th_.join();
OsdkOsal_Free(recvBuf_);
OsdkOsal_Free(sendBuf_);
pipeline_->~MopPipeline();
vehicle_ptr_->mopServer->~MopServer();
};
void downlinkCB(const std_msgs::msg::String::SharedPtr msg);
void keepAliveCB(const std_msgs::msg::String::SharedPtr msg);
void mopCommunicationFnc(int id);
void mopSendFnc(int id);
private:
bool getReady();
bool send();
void parseData(MopPipeline::DataPackType data);
std::string bytesToString(const uint8_t* data, size_t len);
std::tuple<std::vector<std::string>, std::string> checkString(
const std::string& input, char delimiter);
void publishUplink(const MopPipeline::DataPackType* dataPack);
void close();
private:
// MOP configuration parameters
int mop_channel_id_;
int mop_sending_retries_;
int mop_read_rate_;
int mop_write_rate_;
int mop_reconnection_rate_;
private:
std::queue<std::string> msg_queue_;
std::mutex queue_mtx_;
std::atomic<bool> connected_ = false; // when read msg from downlink
std::atomic<bool> closed_ = false; // when open new MOP Server
std::string status_ = "{}\r";
std::string missed_msg_ = "";
std::thread mop_communication_th_;
std::thread mop_send_th_;
uint8_t* recvBuf_;
uint8_t* sendBuf_;
MopPipeline::DataPackType readPack_;
MopPipeline::DataPackType writePack_;
};
#endif // DJI_MOP_HANDLER_HPP_