6 #include <shared_mutex>
10 #include <nlohmann/json.hpp>
11 #include <websocketpp/client.hpp>
12 #include <websocketpp/common/memory.hpp>
13 #include <websocketpp/common/thread.hpp>
// Short alias for websocketpp's frame opcode value type.
30 using OpCode = websocketpp::frame::opcode::value;
35 const std::string& uri, std::function<
void(websocketpp::connection_hdl)> onOpenHandler,
36 std::function<
void(websocketpp::connection_hdl)> onCloseHandler =
nullptr) = 0;
// Pure-virtual connect overload that takes only the target URI and hands the
// caller a std::future<void>. When that future becomes ready is decided by
// the implementing class.
37 virtual std::future<void>
connect(
const std::string& uri) = 0;
// Pure-virtual; takes no arguments and returns nothing.
38 virtual void close() = 0;
41 const std::vector<std::pair<SubscriptionId, ChannelId>>& subscriptions) = 0;
// Pure-virtual; receives the list of subscription ids to act on.
42 virtual void unsubscribe(
const std::vector<SubscriptionId>& subscriptionIds) = 0;
// Pure-virtual; receives the list of client channel advertisements.
43 virtual void advertise(
const std::vector<ClientAdvertisement>& channels) = 0;
// Pure-virtual; receives the list of client channel ids to act on.
44 virtual void unadvertise(
const std::vector<ClientChannelId>& channelIds) = 0;
// Pure-virtual; takes the parameter names plus an optional request id.
// No default is declared for requestId here, so callers going through this
// interface must pass it explicitly (std::nullopt for "no id").
47 virtual void getParameters(
const std::vector<std::string>& parameterNames,
48 const std::optional<std::string>& requestId) = 0;
// Pure-virtual; takes the parameters plus an optional request id.
// No default is declared for requestId here, so callers going through this
// interface must pass it explicitly (std::nullopt for "no id").
49 virtual void setParameters(
const std::vector<Parameter>& parameters,
50 const std::optional<std::string>& requestId) = 0;
// Pure-virtual; takes a string identifying the asset and a numeric request id.
// NOTE(review): the first parameter is called `name` here, while the override
// later in this file calls it `uri` and sends it under the "uri" JSON key.
// Harmless to the compiler, but consider aligning the names.
53 virtual void fetchAsset(
const std::string& name, uint32_t requestId) = 0;
59 template <
typename ClientConfiguration>
62 using ClientType = websocketpp::client<ClientConfiguration>;
67 _endpoint.clear_access_channels(websocketpp::log::alevel::all);
68 _endpoint.clear_error_channels(websocketpp::log::elevel::all);
86 std::function<
void(websocketpp::connection_hdl)> onOpenHandler,
87 std::function<
void(websocketpp::connection_hdl)> onCloseHandler =
nullptr)
override {
// Handler-taking overload (presumably `connect`; the start of the signature
// is not visible in this excerpt). Holds _mutex exclusively from here on.
88 std::unique_lock<std::shared_mutex> lock(
_mutex);
// Error code object; presumably filled in by the connection lookup on a line
// not shown here - TODO confirm.
90 websocketpp::lib::error_code ec;
// Failure to obtain a connection is reported as std::runtime_error carrying
// the offending URI.
94 throw std::runtime_error(
"Failed to get connection from URI " + uri);
// The open handler is installed unconditionally.
98 _con->set_open_handler(onOpenHandler);
// The close handler is optional: it is installed only when the caller
// supplied a non-empty std::function.
100 if (onCloseHandler) {
101 _con->set_close_handler(onCloseHandler);
// Future-returning connect overload. Creates a shared std::promise<void>,
// takes its future, and delegates to the handler-taking connect overload with
// a lambda that owns the promise. The lambda body and the return statement are
// not visible in this excerpt; presumably the promise is fulfilled when the
// connection opens and `future` is returned - TODO confirm.
108 std::future<void>
connect(
const std::string& uri)
override {
109 auto promise = std::make_shared<std::promise<void>>();
// The future must be obtained before the promise is moved into the lambda.
110 auto future = promise->get_future();
// The lambda ignores the connection handle it is given (unnamed parameter).
112 connect(uri, [p = std::move(promise)](websocketpp::connection_hdl)
mutable {
// Presumably the body of close() (its signature line is not visible here).
// Holds _mutex exclusively while the connection is shut down.
120 std::unique_lock<std::shared_mutex> lock(
_mutex);
// Closes _con through the endpoint with status `going_away` and an empty
// reason string.
125 _endpoint.close(
_con, websocketpp::close::status::going_away,
"");
131 const OpCode op = msg->get_opcode();
135 std::shared_lock<std::shared_mutex> lock(
_mutex);
140 case OpCode::BINARY: {
141 std::shared_lock<std::shared_mutex> lock(
_mutex);
142 const auto& payload = msg->get_payload();
// Builds the JSON text for a "subscribe" operation from the given
// (subscription id, channel id) pairs. What is done with `payload` afterwards
// is not visible in this excerpt (presumably it is sent as a text frame).
152 void subscribe(
const std::vector<std::pair<SubscriptionId, ChannelId>>& subscriptions)
override {
// One {"id": ..., "channelId": ...} object is appended per subscription.
154 for (
const auto& [subId, channelId] : subscriptions) {
155 subscriptionsJson.push_back({{
"id", subId}, {
"channelId", channelId}});
// Wrap the collected entries as {"op": "subscribe", "subscriptions": [...]}
// and serialize to a string.
158 const std::string payload =
159 nlohmann::json{{
"op",
"subscribe"}, {
"subscriptions", std::move(subscriptionsJson)}}.dump();
// Serializes {"op": "unsubscribe", "subscriptionIds": [...]} to a string.
// What is done with `payload` afterwards is not visible in this excerpt.
163 void unsubscribe(
const std::vector<SubscriptionId>& subscriptionIds)
override {
164 const std::string payload =
165 nlohmann::json{{
"op",
"unsubscribe"}, {
"subscriptionIds", subscriptionIds}}.dump();
// Serializes {"op": "advertise", "channels": [...]} to a string.
// Relies on a JSON conversion for ClientAdvertisement that is defined outside
// this excerpt. What is done with `payload` afterwards is not visible here.
169 void advertise(
const std::vector<ClientAdvertisement>& channels)
override {
170 const std::string payload =
nlohmann::json{{
"op",
"advertise"}, {
"channels", channels}}.dump();
// Serializes {"op": "unadvertise", "channelIds": [...]} to a string.
// What is done with `payload` afterwards is not visible in this excerpt.
174 void unadvertise(
const std::vector<ClientChannelId>& channelIds)
override {
175 const std::string payload =
176 nlohmann::json{{
"op",
"unadvertise"}, {
"channelIds", channelIds}}.dump();
181 std::vector<uint8_t> payload(1 + 4 + size);
184 std::memcpy(payload.data() + 1 + 4, buffer, size);
189 std::vector<uint8_t> payload(1 + request.
size());
191 request.
write(payload.data() + 1);
196 const std::optional<std::string>& requestId = std::nullopt)
override {
// Builds the "getParameters" request object from the requested names.
// NOTE(review): the std::nullopt default is declared only on this override;
// the pure-virtual declaration earlier in the file has no default, so it
// applies only to calls made through the concrete type.
197 nlohmann::json jsonPayload{{
"op",
"getParameters"}, {
"parameterNames", parameterNames}};
// Attaches the request id under "id". std::optional::value() throws
// std::bad_optional_access on an empty optional, so this is presumably
// guarded by a has-value check on a line not shown here - TODO confirm.
199 jsonPayload[
"id"] = requestId.value();
205 const std::optional<std::string>& requestId = std::nullopt)
override {
// Builds the "setParameters" request object from the given parameters.
// Relies on a JSON conversion for Parameter that is defined outside this
// excerpt.
// NOTE(review): the std::nullopt default is declared only on this override;
// the pure-virtual declaration earlier in the file has no default, so it
// applies only to calls made through the concrete type.
206 nlohmann::json jsonPayload{{
"op",
"setParameters"}, {
"parameters", parameters}};
// Attaches the request id under "id". std::optional::value() throws
// std::bad_optional_access on an empty optional, so this is presumably
// guarded by a has-value check on a line not shown here - TODO confirm.
208 jsonPayload[
"id"] = requestId.value();
215 {
"parameterNames", parameterNames}};
221 {
"parameterNames", parameterNames}};
// Builds the request object {"op": "fetchAsset", "uri": ..., "requestId": ...}.
// What is done with `jsonPayload` afterwards is not visible in this excerpt.
225 void fetchAsset(
const std::string& uri, uint32_t requestId)
override {
226 nlohmann::json jsonPayload{{
"op",
"fetchAsset"}, {
"uri", uri}, {
"requestId", requestId}};
231 std::unique_lock<std::shared_mutex> lock(
_mutex);
236 std::unique_lock<std::shared_mutex> lock(
_mutex);
241 std::shared_lock<std::shared_mutex> lock(
_mutex);
246 std::shared_lock<std::shared_mutex> lock(
_mutex);
// Shared pointer to a websocketpp thread object. What the thread runs and
// where it is joined is not visible in this excerpt.
252 websocketpp::lib::shared_ptr<websocketpp::lib::thread>
_thread;