6 #include <shared_mutex> 10 #include <nlohmann/json.hpp> 11 #include <websocketpp/client.hpp> 12 #include <websocketpp/common/memory.hpp> 13 #include <websocketpp/common/thread.hpp> 30 using OpCode = websocketpp::frame::opcode::value;
35 const std::string& uri, std::function<
void(websocketpp::connection_hdl)> onOpenHandler,
36 std::function<
void(websocketpp::connection_hdl)> onCloseHandler =
nullptr) = 0;
37 virtual std::future<void>
connect(
const std::string& uri) = 0;
38 virtual void close() = 0;
41 const std::vector<std::pair<SubscriptionId, ChannelId>>& subscriptions) = 0;
42 virtual void unsubscribe(
const std::vector<SubscriptionId>& subscriptionIds) = 0;
43 virtual void advertise(
const std::vector<ClientAdvertisement>& channels) = 0;
44 virtual void unadvertise(
const std::vector<ClientChannelId>& channelIds) = 0;
47 virtual void getParameters(
const std::vector<std::string>& parameterNames,
48 const std::optional<std::string>& requestId) = 0;
49 virtual void setParameters(
const std::vector<Parameter>& parameters,
50 const std::optional<std::string>& requestId) = 0;
58 template <
typename ClientConfiguration>
61 using ClientType = websocketpp::client<ClientConfiguration>;
66 _endpoint.clear_access_channels(websocketpp::log::alevel::all);
67 _endpoint.clear_error_channels(websocketpp::log::elevel::all);
69 _endpoint.init_asio();
70 _endpoint.start_perpetual();
72 _endpoint.set_message_handler(
75 _thread.reset(
new websocketpp::lib::thread(&ClientType::run, &_endpoint));
80 _endpoint.stop_perpetual();
85 std::function<
void(websocketpp::connection_hdl)> onOpenHandler,
86 std::function<
void(websocketpp::connection_hdl)> onCloseHandler =
nullptr)
override {
87 std::unique_lock<std::shared_mutex> lock(_mutex);
89 websocketpp::lib::error_code ec;
90 _con = _endpoint.get_connection(uri, ec);
93 throw std::runtime_error(
"Failed to get connection from URI " + uri);
97 _con->set_open_handler(onOpenHandler);
100 _con->set_close_handler(onCloseHandler);
104 _endpoint.connect(_con);
107 std::future<void>
connect(
const std::string& uri)
override {
108 auto promise = std::make_shared<std::promise<void>>();
109 auto future = promise->get_future();
111 connect(uri, [p = std::move(promise)](websocketpp::connection_hdl)
mutable {
119 std::unique_lock<std::shared_mutex> lock(_mutex);
124 _endpoint.close(_con, websocketpp::close::status::going_away,
"");
130 const OpCode op = msg->get_opcode();
134 std::shared_lock<std::shared_mutex> lock(_mutex);
135 if (_textMessageHandler) {
136 _textMessageHandler(msg->get_payload());
139 case OpCode::BINARY: {
140 std::shared_lock<std::shared_mutex> lock(_mutex);
141 const auto& payload = msg->get_payload();
142 if (_binaryMessageHandler) {
143 _binaryMessageHandler(reinterpret_cast<const uint8_t*>(payload.data()), payload.size());
151 void subscribe(
const std::vector<std::pair<SubscriptionId, ChannelId>>& subscriptions)
override {
153 for (
const auto& [subId, channelId] : subscriptions) {
154 subscriptionsJson.push_back({{
"id", subId}, {
"channelId", channelId}});
157 const std::string payload =
158 nlohmann::json{{
"op",
"subscribe"}, {
"subscriptions", std::move(subscriptionsJson)}}.dump();
162 void unsubscribe(
const std::vector<SubscriptionId>& subscriptionIds)
override {
163 const std::string payload =
164 nlohmann::json{{
"op",
"unsubscribe"}, {
"subscriptionIds", subscriptionIds}}.dump();
168 void advertise(
const std::vector<ClientAdvertisement>& channels)
override {
169 const std::string payload =
nlohmann::json{{
"op",
"advertise"}, {
"channels", channels}}.dump();
173 void unadvertise(
const std::vector<ClientChannelId>& channelIds)
override {
174 const std::string payload =
175 nlohmann::json{{
"op",
"unadvertise"}, {
"channelIds", channelIds}}.dump();
180 std::vector<uint8_t> payload(1 + 4 + size);
183 std::memcpy(payload.data() + 1 + 4, buffer, size);
184 sendBinary(payload.data(), payload.size());
188 std::vector<uint8_t> payload(1 + request.
size());
190 request.
write(payload.data() + 1);
191 sendBinary(payload.data(), payload.size());
195 const std::optional<std::string>& requestId = std::nullopt)
override {
196 nlohmann::json jsonPayload{{
"op",
"getParameters"}, {
"parameterNames", parameterNames}};
198 jsonPayload[
"id"] = requestId.value();
200 sendText(jsonPayload.dump());
204 const std::optional<std::string>& requestId = std::nullopt)
override {
205 nlohmann::json jsonPayload{{
"op",
"setParameters"}, {
"parameters", parameters}};
207 jsonPayload[
"id"] = requestId.value();
209 sendText(jsonPayload.dump());
214 {
"parameterNames", parameterNames}};
215 sendText(jsonPayload.dump());
220 {
"parameterNames", parameterNames}};
221 sendText(jsonPayload.dump());
225 std::unique_lock<std::shared_mutex> lock(_mutex);
226 _textMessageHandler = std::move(handler);
230 std::unique_lock<std::shared_mutex> lock(_mutex);
231 _binaryMessageHandler = std::move(handler);
235 std::shared_lock<std::shared_mutex> lock(_mutex);
236 _endpoint.send(_con, payload, OpCode::TEXT);
240 std::shared_lock<std::shared_mutex> lock(_mutex);
241 _endpoint.send(_con, data, dataLength, OpCode::BINARY);
246 websocketpp::lib::shared_ptr<websocketpp::lib::thread>
_thread;
void subscribe(const std::vector< std::pair< SubscriptionId, ChannelId >> &subscriptions) override
virtual void subscribeParameterUpdates(const std::vector< std::string > ¶meterNames)=0
typename ClientType::connection_ptr ConnectionPtr
virtual void setParameters(const std::vector< Parameter > ¶meters, const std::optional< std::string > &requestId)=0
void setBinaryMessageHandler(BinaryMessageHandler handler) override
constexpr char SUPPORTED_SUBPROTOCOL[]
TextMessageHandler _textMessageHandler
void write(uint8_t *data) const
void to_json(nlohmann::json &j, const Channel &c)
void subscribeParameterUpdates(const std::vector< std::string > ¶meterNames) override
virtual void publish(ClientChannelId channelId, const uint8_t *buffer, size_t size)=0
void advertise(const std::vector< ClientAdvertisement > &channels) override
virtual void unsubscribe(const std::vector< SubscriptionId > &subscriptionIds)=0
virtual void sendServiceRequest(const ServiceRequest &request)=0
std::future< void > connect(const std::string &uri) override
websocketpp::lib::shared_ptr< websocketpp::lib::thread > _thread
websocketpp::client< ClientConfiguration > ClientType
virtual void getParameters(const std::vector< std::string > ¶meterNames, const std::optional< std::string > &requestId)=0
websocketpp::frame::opcode::value OpCode
virtual void connect(const std::string &uri, std::function< void(websocketpp::connection_hdl)> onOpenHandler, std::function< void(websocketpp::connection_hdl)> onCloseHandler=nullptr)=0
void connect(const std::string &uri, std::function< void(websocketpp::connection_hdl)> onOpenHandler, std::function< void(websocketpp::connection_hdl)> onCloseHandler=nullptr) override
virtual void setBinaryMessageHandler(BinaryMessageHandler handler)=0
virtual void setTextMessageHandler(TextMessageHandler handler)=0
void publish(ClientChannelId channelId, const uint8_t *buffer, size_t size) override
void unsubscribeParameterUpdates(const std::vector< std::string > ¶meterNames) override
void sendBinary(const uint8_t *data, size_t dataLength)
std::function< void(const std::string &)> TextMessageHandler
std::function< void(const uint8_t *, size_t)> BinaryMessageHandler
void unadvertise(const std::vector< ClientChannelId > &channelIds) override
void sendServiceRequest(const ServiceRequest &request) override
void getParameters(const std::vector< std::string > ¶meterNames, const std::optional< std::string > &requestId=std::nullopt) override
virtual void subscribe(const std::vector< std::pair< SubscriptionId, ChannelId >> &subscriptions)=0
virtual void advertise(const std::vector< ClientAdvertisement > &channels)=0
virtual void unadvertise(const std::vector< ClientChannelId > &channelIds)=0
void unsubscribe(const std::vector< SubscriptionId > &subscriptionIds) override
void setTextMessageHandler(TextMessageHandler handler) override
BinaryMessageHandler _binaryMessageHandler
void sendText(const std::string &payload)
void setParameters(const std::vector< Parameter > ¶meters, const std::optional< std::string > &requestId=std::nullopt) override
typename ClientType::message_ptr MessagePtr
ClientChannelId channelId
void messageHandler(websocketpp::connection_hdl hdl, MessagePtr msg)
virtual void unsubscribeParameterUpdates(const std::vector< std::string > ¶meterNames)=0
void WriteUint32LE(uint8_t *buf, uint32_t val)