websocket_client.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <functional>
4 #include <future>
5 #include <optional>
6 #include <shared_mutex>
7 #include <utility>
8 #include <vector>
9 
10 #include <nlohmann/json.hpp>
11 #include <websocketpp/client.hpp>
12 #include <websocketpp/common/memory.hpp>
13 #include <websocketpp/common/thread.hpp>
14 
15 #include "common.hpp"
16 #include "parameter.hpp"
17 #include "serialization.hpp"
18 
19 namespace foxglove {
20 
21 inline void to_json(nlohmann::json& j, const ClientAdvertisement& p) {
22  j = nlohmann::json{{"id", p.channelId},
23  {"topic", p.topic},
24  {"encoding", p.encoding},
25  {"schemaName", p.schemaName}};
26 }
27 
28 using TextMessageHandler = std::function<void(const std::string&)>;
29 using BinaryMessageHandler = std::function<void(const uint8_t*, size_t)>;
30 using OpCode = websocketpp::frame::opcode::value;
31 
33 public:
34  virtual void connect(
35  const std::string& uri, std::function<void(websocketpp::connection_hdl)> onOpenHandler,
36  std::function<void(websocketpp::connection_hdl)> onCloseHandler = nullptr) = 0;
37  virtual std::future<void> connect(const std::string& uri) = 0;
38  virtual void close() = 0;
39 
40  virtual void subscribe(
41  const std::vector<std::pair<SubscriptionId, ChannelId>>& subscriptions) = 0;
42  virtual void unsubscribe(const std::vector<SubscriptionId>& subscriptionIds) = 0;
43  virtual void advertise(const std::vector<ClientAdvertisement>& channels) = 0;
44  virtual void unadvertise(const std::vector<ClientChannelId>& channelIds) = 0;
45  virtual void publish(ClientChannelId channelId, const uint8_t* buffer, size_t size) = 0;
46  virtual void sendServiceRequest(const ServiceRequest& request) = 0;
47  virtual void getParameters(const std::vector<std::string>& parameterNames,
48  const std::optional<std::string>& requestId) = 0;
49  virtual void setParameters(const std::vector<Parameter>& parameters,
50  const std::optional<std::string>& requestId) = 0;
51  virtual void subscribeParameterUpdates(const std::vector<std::string>& parameterNames) = 0;
52  virtual void unsubscribeParameterUpdates(const std::vector<std::string>& parameterNames) = 0;
53 
54  virtual void setTextMessageHandler(TextMessageHandler handler) = 0;
55  virtual void setBinaryMessageHandler(BinaryMessageHandler handler) = 0;
56 };
57 
58 template <typename ClientConfiguration>
59 class Client : public ClientInterface {
60 public:
61  using ClientType = websocketpp::client<ClientConfiguration>;
62  using MessagePtr = typename ClientType::message_ptr;
63  using ConnectionPtr = typename ClientType::connection_ptr;
64 
65  Client() {
66  _endpoint.clear_access_channels(websocketpp::log::alevel::all);
67  _endpoint.clear_error_channels(websocketpp::log::elevel::all);
68 
69  _endpoint.init_asio();
70  _endpoint.start_perpetual();
71 
72  _endpoint.set_message_handler(
73  bind(&Client::messageHandler, this, std::placeholders::_1, std::placeholders::_2));
74 
75  _thread.reset(new websocketpp::lib::thread(&ClientType::run, &_endpoint));
76  }
77 
78  virtual ~Client() {
79  close();
80  _endpoint.stop_perpetual();
81  _thread->join();
82  }
83 
84  void connect(const std::string& uri,
85  std::function<void(websocketpp::connection_hdl)> onOpenHandler,
86  std::function<void(websocketpp::connection_hdl)> onCloseHandler = nullptr) override {
87  std::unique_lock<std::shared_mutex> lock(_mutex);
88 
89  websocketpp::lib::error_code ec;
90  _con = _endpoint.get_connection(uri, ec);
91 
92  if (ec) {
93  throw std::runtime_error("Failed to get connection from URI " + uri);
94  }
95 
96  if (onOpenHandler) {
97  _con->set_open_handler(onOpenHandler);
98  }
99  if (onCloseHandler) {
100  _con->set_close_handler(onCloseHandler);
101  }
102 
103  _con->add_subprotocol(SUPPORTED_SUBPROTOCOL);
104  _endpoint.connect(_con);
105  }
106 
107  std::future<void> connect(const std::string& uri) override {
108  auto promise = std::make_shared<std::promise<void>>();
109  auto future = promise->get_future();
110 
111  connect(uri, [p = std::move(promise)](websocketpp::connection_hdl) mutable {
112  p->set_value();
113  });
114 
115  return future;
116  }
117 
118  void close() override {
119  std::unique_lock<std::shared_mutex> lock(_mutex);
120  if (!_con) {
121  return; // Already disconnected
122  }
123 
124  _endpoint.close(_con, websocketpp::close::status::going_away, "");
125  _con.reset();
126  }
127 
128  void messageHandler(websocketpp::connection_hdl hdl, MessagePtr msg) {
129  (void)hdl;
130  const OpCode op = msg->get_opcode();
131 
132  switch (op) {
133  case OpCode::TEXT: {
134  std::shared_lock<std::shared_mutex> lock(_mutex);
135  if (_textMessageHandler) {
136  _textMessageHandler(msg->get_payload());
137  }
138  } break;
139  case OpCode::BINARY: {
140  std::shared_lock<std::shared_mutex> lock(_mutex);
141  const auto& payload = msg->get_payload();
142  if (_binaryMessageHandler) {
143  _binaryMessageHandler(reinterpret_cast<const uint8_t*>(payload.data()), payload.size());
144  }
145  } break;
146  default:
147  break;
148  }
149  }
150 
151  void subscribe(const std::vector<std::pair<SubscriptionId, ChannelId>>& subscriptions) override {
152  nlohmann::json subscriptionsJson;
153  for (const auto& [subId, channelId] : subscriptions) {
154  subscriptionsJson.push_back({{"id", subId}, {"channelId", channelId}});
155  }
156 
157  const std::string payload =
158  nlohmann::json{{"op", "subscribe"}, {"subscriptions", std::move(subscriptionsJson)}}.dump();
159  sendText(payload);
160  }
161 
162  void unsubscribe(const std::vector<SubscriptionId>& subscriptionIds) override {
163  const std::string payload =
164  nlohmann::json{{"op", "unsubscribe"}, {"subscriptionIds", subscriptionIds}}.dump();
165  sendText(payload);
166  }
167 
168  void advertise(const std::vector<ClientAdvertisement>& channels) override {
169  const std::string payload = nlohmann::json{{"op", "advertise"}, {"channels", channels}}.dump();
170  sendText(payload);
171  }
172 
173  void unadvertise(const std::vector<ClientChannelId>& channelIds) override {
174  const std::string payload =
175  nlohmann::json{{"op", "unadvertise"}, {"channelIds", channelIds}}.dump();
176  sendText(payload);
177  }
178 
179  void publish(ClientChannelId channelId, const uint8_t* buffer, size_t size) override {
180  std::vector<uint8_t> payload(1 + 4 + size);
181  payload[0] = uint8_t(ClientBinaryOpcode::MESSAGE_DATA);
182  foxglove::WriteUint32LE(payload.data() + 1, channelId);
183  std::memcpy(payload.data() + 1 + 4, buffer, size);
184  sendBinary(payload.data(), payload.size());
185  }
186 
187  void sendServiceRequest(const ServiceRequest& request) override {
188  std::vector<uint8_t> payload(1 + request.size());
189  payload[0] = uint8_t(ClientBinaryOpcode::SERVICE_CALL_REQUEST);
190  request.write(payload.data() + 1);
191  sendBinary(payload.data(), payload.size());
192  }
193 
194  void getParameters(const std::vector<std::string>& parameterNames,
195  const std::optional<std::string>& requestId = std::nullopt) override {
196  nlohmann::json jsonPayload{{"op", "getParameters"}, {"parameterNames", parameterNames}};
197  if (requestId) {
198  jsonPayload["id"] = requestId.value();
199  }
200  sendText(jsonPayload.dump());
201  }
202 
203  void setParameters(const std::vector<Parameter>& parameters,
204  const std::optional<std::string>& requestId = std::nullopt) override {
205  nlohmann::json jsonPayload{{"op", "setParameters"}, {"parameters", parameters}};
206  if (requestId) {
207  jsonPayload["id"] = requestId.value();
208  }
209  sendText(jsonPayload.dump());
210  }
211 
212  void subscribeParameterUpdates(const std::vector<std::string>& parameterNames) override {
213  nlohmann::json jsonPayload{{"op", "subscribeParameterUpdates"},
214  {"parameterNames", parameterNames}};
215  sendText(jsonPayload.dump());
216  }
217 
218  void unsubscribeParameterUpdates(const std::vector<std::string>& parameterNames) override {
219  nlohmann::json jsonPayload{{"op", "unsubscribeParameterUpdates"},
220  {"parameterNames", parameterNames}};
221  sendText(jsonPayload.dump());
222  }
223 
224  void setTextMessageHandler(TextMessageHandler handler) override {
225  std::unique_lock<std::shared_mutex> lock(_mutex);
226  _textMessageHandler = std::move(handler);
227  }
228 
230  std::unique_lock<std::shared_mutex> lock(_mutex);
231  _binaryMessageHandler = std::move(handler);
232  }
233 
234  void sendText(const std::string& payload) {
235  std::shared_lock<std::shared_mutex> lock(_mutex);
236  _endpoint.send(_con, payload, OpCode::TEXT);
237  }
238 
239  void sendBinary(const uint8_t* data, size_t dataLength) {
240  std::shared_lock<std::shared_mutex> lock(_mutex);
241  _endpoint.send(_con, data, dataLength, OpCode::BINARY);
242  }
243 
244 protected:
246  websocketpp::lib::shared_ptr<websocketpp::lib::thread> _thread;
248  std::shared_mutex _mutex;
251 };
252 
253 } // namespace foxglove
size_t size() const
Definition: common.hpp:134
void subscribe(const std::vector< std::pair< SubscriptionId, ChannelId >> &subscriptions) override
virtual void subscribeParameterUpdates(const std::vector< std::string > &parameterNames)=0
typename ClientType::connection_ptr ConnectionPtr
virtual void setParameters(const std::vector< Parameter > &parameters, const std::optional< std::string > &requestId)=0
std::shared_mutex _mutex
void setBinaryMessageHandler(BinaryMessageHandler handler) override
constexpr char SUPPORTED_SUBPROTOCOL[]
Definition: common.hpp:12
virtual void close()=0
TextMessageHandler _textMessageHandler
void write(uint8_t *data) const
void to_json(nlohmann::json &j, const Channel &c)
nlohmann::json json
void subscribeParameterUpdates(const std::vector< std::string > &parameterNames) override
virtual void publish(ClientChannelId channelId, const uint8_t *buffer, size_t size)=0
void advertise(const std::vector< ClientAdvertisement > &channels) override
virtual void unsubscribe(const std::vector< SubscriptionId > &subscriptionIds)=0
virtual void sendServiceRequest(const ServiceRequest &request)=0
std::future< void > connect(const std::string &uri) override
websocketpp::lib::shared_ptr< websocketpp::lib::thread > _thread
websocketpp::client< ClientConfiguration > ClientType
virtual void getParameters(const std::vector< std::string > &parameterNames, const std::optional< std::string > &requestId)=0
void close() override
websocketpp::frame::opcode::value OpCode
virtual void connect(const std::string &uri, std::function< void(websocketpp::connection_hdl)> onOpenHandler, std::function< void(websocketpp::connection_hdl)> onCloseHandler=nullptr)=0
void connect(const std::string &uri, std::function< void(websocketpp::connection_hdl)> onOpenHandler, std::function< void(websocketpp::connection_hdl)> onCloseHandler=nullptr) override
uint32_t ClientChannelId
Definition: common.hpp:26
virtual void setBinaryMessageHandler(BinaryMessageHandler handler)=0
virtual void setTextMessageHandler(TextMessageHandler handler)=0
void publish(ClientChannelId channelId, const uint8_t *buffer, size_t size) override
void unsubscribeParameterUpdates(const std::vector< std::string > &parameterNames) override
void sendBinary(const uint8_t *data, size_t dataLength)
std::function< void(const std::string &)> TextMessageHandler
std::function< void(const uint8_t *, size_t)> BinaryMessageHandler
void unadvertise(const std::vector< ClientChannelId > &channelIds) override
void sendServiceRequest(const ServiceRequest &request) override
void getParameters(const std::vector< std::string > &parameterNames, const std::optional< std::string > &requestId=std::nullopt) override
virtual void subscribe(const std::vector< std::pair< SubscriptionId, ChannelId >> &subscriptions)=0
virtual void advertise(const std::vector< ClientAdvertisement > &channels)=0
virtual void unadvertise(const std::vector< ClientChannelId > &channelIds)=0
void unsubscribe(const std::vector< SubscriptionId > &subscriptionIds) override
void setTextMessageHandler(TextMessageHandler handler) override
BinaryMessageHandler _binaryMessageHandler
void sendText(const std::string &payload)
void setParameters(const std::vector< Parameter > &parameters, const std::optional< std::string > &requestId=std::nullopt) override
typename ClientType::message_ptr MessagePtr
ClientChannelId channelId
Definition: common.hpp:76
void messageHandler(websocketpp::connection_hdl hdl, MessagePtr msg)
virtual void unsubscribeParameterUpdates(const std::vector< std::string > &parameterNames)=0
void WriteUint32LE(uint8_t *buf, uint32_t val)


foxglove_bridge
Author(s): Foxglove
autogenerated on Mon Jul 3 2023 02:12:22