websocket_client.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <functional>
4 #include <future>
5 #include <optional>
6 #include <shared_mutex>
7 #include <utility>
8 #include <vector>
9 
10 #include <nlohmann/json.hpp>
11 #include <websocketpp/client.hpp>
12 #include <websocketpp/common/memory.hpp>
13 #include <websocketpp/common/thread.hpp>
14 
15 #include "common.hpp"
16 #include "parameter.hpp"
17 #include "serialization.hpp"
18 
19 namespace foxglove {
20 
21 inline void to_json(nlohmann::json& j, const ClientAdvertisement& p) {
22  j = nlohmann::json{{"id", p.channelId},
23  {"topic", p.topic},
24  {"encoding", p.encoding},
25  {"schemaName", p.schemaName}};
26 }
27 
28 using TextMessageHandler = std::function<void(const std::string&)>;
29 using BinaryMessageHandler = std::function<void(const uint8_t*, size_t)>;
30 using OpCode = websocketpp::frame::opcode::value;
31 
33 public:
34  virtual void connect(
35  const std::string& uri, std::function<void(websocketpp::connection_hdl)> onOpenHandler,
36  std::function<void(websocketpp::connection_hdl)> onCloseHandler = nullptr) = 0;
37  virtual std::future<void> connect(const std::string& uri) = 0;
38  virtual void close() = 0;
39 
40  virtual void subscribe(
41  const std::vector<std::pair<SubscriptionId, ChannelId>>& subscriptions) = 0;
42  virtual void unsubscribe(const std::vector<SubscriptionId>& subscriptionIds) = 0;
43  virtual void advertise(const std::vector<ClientAdvertisement>& channels) = 0;
44  virtual void unadvertise(const std::vector<ClientChannelId>& channelIds) = 0;
45  virtual void publish(ClientChannelId channelId, const uint8_t* buffer, size_t size) = 0;
46  virtual void sendServiceRequest(const ServiceRequest& request) = 0;
47  virtual void getParameters(const std::vector<std::string>& parameterNames,
48  const std::optional<std::string>& requestId) = 0;
49  virtual void setParameters(const std::vector<Parameter>& parameters,
50  const std::optional<std::string>& requestId) = 0;
51  virtual void subscribeParameterUpdates(const std::vector<std::string>& parameterNames) = 0;
52  virtual void unsubscribeParameterUpdates(const std::vector<std::string>& parameterNames) = 0;
53  virtual void fetchAsset(const std::string& name, uint32_t requestId) = 0;
54 
55  virtual void setTextMessageHandler(TextMessageHandler handler) = 0;
56  virtual void setBinaryMessageHandler(BinaryMessageHandler handler) = 0;
57 };
58 
59 template <typename ClientConfiguration>
60 class Client : public ClientInterface {
61 public:
62  using ClientType = websocketpp::client<ClientConfiguration>;
63  using MessagePtr = typename ClientType::message_ptr;
64  using ConnectionPtr = typename ClientType::connection_ptr;
65 
66  Client() {
67  _endpoint.clear_access_channels(websocketpp::log::alevel::all);
68  _endpoint.clear_error_channels(websocketpp::log::elevel::all);
69 
70  _endpoint.init_asio();
71  _endpoint.start_perpetual();
72 
73  _endpoint.set_message_handler(
74  bind(&Client::messageHandler, this, std::placeholders::_1, std::placeholders::_2));
75 
76  _thread.reset(new websocketpp::lib::thread(&ClientType::run, &_endpoint));
77  }
78 
79  virtual ~Client() {
80  close();
81  _endpoint.stop_perpetual();
82  _thread->join();
83  }
84 
85  void connect(const std::string& uri,
86  std::function<void(websocketpp::connection_hdl)> onOpenHandler,
87  std::function<void(websocketpp::connection_hdl)> onCloseHandler = nullptr) override {
88  std::unique_lock<std::shared_mutex> lock(_mutex);
89 
90  websocketpp::lib::error_code ec;
91  _con = _endpoint.get_connection(uri, ec);
92 
93  if (ec) {
94  throw std::runtime_error("Failed to get connection from URI " + uri);
95  }
96 
97  if (onOpenHandler) {
98  _con->set_open_handler(onOpenHandler);
99  }
100  if (onCloseHandler) {
101  _con->set_close_handler(onCloseHandler);
102  }
103 
104  _con->add_subprotocol(SUPPORTED_SUBPROTOCOL);
105  _endpoint.connect(_con);
106  }
107 
108  std::future<void> connect(const std::string& uri) override {
109  auto promise = std::make_shared<std::promise<void>>();
110  auto future = promise->get_future();
111 
112  connect(uri, [p = std::move(promise)](websocketpp::connection_hdl) mutable {
113  p->set_value();
114  });
115 
116  return future;
117  }
118 
119  void close() override {
120  std::unique_lock<std::shared_mutex> lock(_mutex);
121  if (!_con) {
122  return; // Already disconnected
123  }
124 
125  _endpoint.close(_con, websocketpp::close::status::going_away, "");
126  _con.reset();
127  }
128 
129  void messageHandler(websocketpp::connection_hdl hdl, MessagePtr msg) {
130  (void)hdl;
131  const OpCode op = msg->get_opcode();
132 
133  switch (op) {
134  case OpCode::TEXT: {
135  std::shared_lock<std::shared_mutex> lock(_mutex);
136  if (_textMessageHandler) {
137  _textMessageHandler(msg->get_payload());
138  }
139  } break;
140  case OpCode::BINARY: {
141  std::shared_lock<std::shared_mutex> lock(_mutex);
142  const auto& payload = msg->get_payload();
143  if (_binaryMessageHandler) {
144  _binaryMessageHandler(reinterpret_cast<const uint8_t*>(payload.data()), payload.size());
145  }
146  } break;
147  default:
148  break;
149  }
150  }
151 
152  void subscribe(const std::vector<std::pair<SubscriptionId, ChannelId>>& subscriptions) override {
153  nlohmann::json subscriptionsJson;
154  for (const auto& [subId, channelId] : subscriptions) {
155  subscriptionsJson.push_back({{"id", subId}, {"channelId", channelId}});
156  }
157 
158  const std::string payload =
159  nlohmann::json{{"op", "subscribe"}, {"subscriptions", std::move(subscriptionsJson)}}.dump();
160  sendText(payload);
161  }
162 
163  void unsubscribe(const std::vector<SubscriptionId>& subscriptionIds) override {
164  const std::string payload =
165  nlohmann::json{{"op", "unsubscribe"}, {"subscriptionIds", subscriptionIds}}.dump();
166  sendText(payload);
167  }
168 
169  void advertise(const std::vector<ClientAdvertisement>& channels) override {
170  const std::string payload = nlohmann::json{{"op", "advertise"}, {"channels", channels}}.dump();
171  sendText(payload);
172  }
173 
174  void unadvertise(const std::vector<ClientChannelId>& channelIds) override {
175  const std::string payload =
176  nlohmann::json{{"op", "unadvertise"}, {"channelIds", channelIds}}.dump();
177  sendText(payload);
178  }
179 
180  void publish(ClientChannelId channelId, const uint8_t* buffer, size_t size) override {
181  std::vector<uint8_t> payload(1 + 4 + size);
182  payload[0] = uint8_t(ClientBinaryOpcode::MESSAGE_DATA);
183  foxglove::WriteUint32LE(payload.data() + 1, channelId);
184  std::memcpy(payload.data() + 1 + 4, buffer, size);
185  sendBinary(payload.data(), payload.size());
186  }
187 
188  void sendServiceRequest(const ServiceRequest& request) override {
189  std::vector<uint8_t> payload(1 + request.size());
190  payload[0] = uint8_t(ClientBinaryOpcode::SERVICE_CALL_REQUEST);
191  request.write(payload.data() + 1);
192  sendBinary(payload.data(), payload.size());
193  }
194 
195  void getParameters(const std::vector<std::string>& parameterNames,
196  const std::optional<std::string>& requestId = std::nullopt) override {
197  nlohmann::json jsonPayload{{"op", "getParameters"}, {"parameterNames", parameterNames}};
198  if (requestId) {
199  jsonPayload["id"] = requestId.value();
200  }
201  sendText(jsonPayload.dump());
202  }
203 
204  void setParameters(const std::vector<Parameter>& parameters,
205  const std::optional<std::string>& requestId = std::nullopt) override {
206  nlohmann::json jsonPayload{{"op", "setParameters"}, {"parameters", parameters}};
207  if (requestId) {
208  jsonPayload["id"] = requestId.value();
209  }
210  sendText(jsonPayload.dump());
211  }
212 
213  void subscribeParameterUpdates(const std::vector<std::string>& parameterNames) override {
214  nlohmann::json jsonPayload{{"op", "subscribeParameterUpdates"},
215  {"parameterNames", parameterNames}};
216  sendText(jsonPayload.dump());
217  }
218 
219  void unsubscribeParameterUpdates(const std::vector<std::string>& parameterNames) override {
220  nlohmann::json jsonPayload{{"op", "unsubscribeParameterUpdates"},
221  {"parameterNames", parameterNames}};
222  sendText(jsonPayload.dump());
223  }
224 
225  void fetchAsset(const std::string& uri, uint32_t requestId) override {
226  nlohmann::json jsonPayload{{"op", "fetchAsset"}, {"uri", uri}, {"requestId", requestId}};
227  sendText(jsonPayload.dump());
228  }
229 
230  void setTextMessageHandler(TextMessageHandler handler) override {
231  std::unique_lock<std::shared_mutex> lock(_mutex);
232  _textMessageHandler = std::move(handler);
233  }
234 
236  std::unique_lock<std::shared_mutex> lock(_mutex);
237  _binaryMessageHandler = std::move(handler);
238  }
239 
240  void sendText(const std::string& payload) {
241  std::shared_lock<std::shared_mutex> lock(_mutex);
242  _endpoint.send(_con, payload, OpCode::TEXT);
243  }
244 
245  void sendBinary(const uint8_t* data, size_t dataLength) {
246  std::shared_lock<std::shared_mutex> lock(_mutex);
247  _endpoint.send(_con, data, dataLength, OpCode::BINARY);
248  }
249 
250 protected:
252  websocketpp::lib::shared_ptr<websocketpp::lib::thread> _thread;
254  std::shared_mutex _mutex;
257 };
258 
259 } // namespace foxglove
foxglove::ClientInterface::unadvertise
virtual void unadvertise(const std::vector< ClientChannelId > &channelIds)=0
foxglove::Client::_con
ConnectionPtr _con
Definition: websocket_client.hpp:253
foxglove::Client::advertise
void advertise(const std::vector< ClientAdvertisement > &channels) override
Definition: websocket_client.hpp:169
foxglove::Client::subscribeParameterUpdates
void subscribeParameterUpdates(const std::vector< std::string > &parameterNames) override
Definition: websocket_client.hpp:213
foxglove::Client
Definition: websocket_client.hpp:60
foxglove::Client::unsubscribe
void unsubscribe(const std::vector< SubscriptionId > &subscriptionIds) override
Definition: websocket_client.hpp:163
foxglove::Client::_mutex
std::shared_mutex _mutex
Definition: websocket_client.hpp:254
foxglove
Definition: base64.hpp:8
foxglove::Client::setTextMessageHandler
void setTextMessageHandler(TextMessageHandler handler) override
Definition: websocket_client.hpp:230
foxglove::ClientAdvertisement::topic
std::string topic
Definition: common.hpp:79
foxglove::ClientInterface::setBinaryMessageHandler
virtual void setBinaryMessageHandler(BinaryMessageHandler handler)=0
foxglove::ClientChannelId
uint32_t ClientChannelId
Definition: common.hpp:27
foxglove::Client::sendText
void sendText(const std::string &payload)
Definition: websocket_client.hpp:240
foxglove::ClientBinaryOpcode::SERVICE_CALL_REQUEST
@ SERVICE_CALL_REQUEST
foxglove::Client::MessagePtr
typename ClientType::message_ptr MessagePtr
Definition: websocket_client.hpp:63
foxglove::TextMessageHandler
std::function< void(const std::string &)> TextMessageHandler
Definition: websocket_client.hpp:28
common.hpp
foxglove::ClientInterface::advertise
virtual void advertise(const std::vector< ClientAdvertisement > &channels)=0
foxglove::ServiceResponse
Definition: common.hpp:130
foxglove::Client::close
void close() override
Definition: websocket_client.hpp:119
foxglove::ClientInterface::fetchAsset
virtual void fetchAsset(const std::string &name, uint32_t requestId)=0
foxglove::Client::fetchAsset
void fetchAsset(const std::string &uri, uint32_t requestId) override
Definition: websocket_client.hpp:225
foxglove::ClientInterface::connect
virtual void connect(const std::string &uri, std::function< void(websocketpp::connection_hdl)> onOpenHandler, std::function< void(websocketpp::connection_hdl)> onCloseHandler=nullptr)=0
foxglove::Client::_endpoint
ClientType _endpoint
Definition: websocket_client.hpp:251
foxglove::Client::unsubscribeParameterUpdates
void unsubscribeParameterUpdates(const std::vector< std::string > &parameterNames) override
Definition: websocket_client.hpp:219
foxglove::ServiceResponse::size
size_t size() const
Definition: common.hpp:136
foxglove::Client::sendBinary
void sendBinary(const uint8_t *data, size_t dataLength)
Definition: websocket_client.hpp:245
foxglove::Client::ClientType
websocketpp::client< ClientConfiguration > ClientType
Definition: websocket_client.hpp:62
foxglove::Client::subscribe
void subscribe(const std::vector< std::pair< SubscriptionId, ChannelId >> &subscriptions) override
Definition: websocket_client.hpp:152
foxglove::Client::unadvertise
void unadvertise(const std::vector< ClientChannelId > &channelIds) override
Definition: websocket_client.hpp:174
foxglove::ClientInterface::close
virtual void close()=0
foxglove::Client::ConnectionPtr
typename ClientType::connection_ptr ConnectionPtr
Definition: websocket_client.hpp:64
foxglove::ClientInterface::setParameters
virtual void setParameters(const std::vector< Parameter > &parameters, const std::optional< std::string > &requestId)=0
serialization.hpp
foxglove::ClientInterface
Definition: websocket_client.hpp:32
foxglove::ClientInterface::unsubscribe
virtual void unsubscribe(const std::vector< SubscriptionId > &subscriptionIds)=0
foxglove::SUPPORTED_SUBPROTOCOL
constexpr char SUPPORTED_SUBPROTOCOL[]
Definition: common.hpp:12
foxglove::ClientAdvertisement::channelId
ClientChannelId channelId
Definition: common.hpp:78
foxglove::Client::_textMessageHandler
TextMessageHandler _textMessageHandler
Definition: websocket_client.hpp:255
foxglove::Client::_thread
websocketpp::lib::shared_ptr< websocketpp::lib::thread > _thread
Definition: websocket_client.hpp:252
foxglove::ClientInterface::getParameters
virtual void getParameters(const std::vector< std::string > &parameterNames, const std::optional< std::string > &requestId)=0
foxglove::ClientAdvertisement
Definition: common.hpp:77
foxglove::json
nlohmann::json json
Definition: websocket_server.hpp:66
foxglove::Client::connect
std::future< void > connect(const std::string &uri) override
Definition: websocket_client.hpp:108
foxglove::Client::_binaryMessageHandler
BinaryMessageHandler _binaryMessageHandler
Definition: websocket_client.hpp:256
foxglove::WriteUint32LE
void WriteUint32LE(uint8_t *buf, uint32_t val)
Definition: serialization.hpp:27
foxglove::Client::sendServiceRequest
void sendServiceRequest(const ServiceRequest &request) override
Definition: websocket_client.hpp:188
foxglove::to_json
void to_json(nlohmann::json &j, const Channel &c)
Definition: serialization.cpp:6
foxglove::ClientInterface::unsubscribeParameterUpdates
virtual void unsubscribeParameterUpdates(const std::vector< std::string > &parameterNames)=0
foxglove::Client::setParameters
void setParameters(const std::vector< Parameter > &parameters, const std::optional< std::string > &requestId=std::nullopt) override
Definition: websocket_client.hpp:204
foxglove::Client::getParameters
void getParameters(const std::vector< std::string > &parameterNames, const std::optional< std::string > &requestId=std::nullopt) override
Definition: websocket_client.hpp:195
foxglove::ClientInterface::subscribeParameterUpdates
virtual void subscribeParameterUpdates(const std::vector< std::string > &parameterNames)=0
foxglove::Client::~Client
virtual ~Client()
Definition: websocket_client.hpp:79
foxglove::ClientInterface::sendServiceRequest
virtual void sendServiceRequest(const ServiceRequest &request)=0
parameter.hpp
foxglove::ClientBinaryOpcode::MESSAGE_DATA
@ MESSAGE_DATA
foxglove::ClientAdvertisement::encoding
std::string encoding
Definition: common.hpp:80
foxglove::ClientInterface::subscribe
virtual void subscribe(const std::vector< std::pair< SubscriptionId, ChannelId >> &subscriptions)=0
foxglove::ClientAdvertisement::schemaName
std::string schemaName
Definition: common.hpp:81
foxglove::Client::connect
void connect(const std::string &uri, std::function< void(websocketpp::connection_hdl)> onOpenHandler, std::function< void(websocketpp::connection_hdl)> onCloseHandler=nullptr) override
Definition: websocket_client.hpp:85
foxglove::Client::publish
void publish(ClientChannelId channelId, const uint8_t *buffer, size_t size) override
Definition: websocket_client.hpp:180
foxglove::BinaryMessageHandler
std::function< void(const uint8_t *, size_t)> BinaryMessageHandler
Definition: websocket_client.hpp:29
foxglove::ClientInterface::publish
virtual void publish(ClientChannelId channelId, const uint8_t *buffer, size_t size)=0
foxglove::Client::messageHandler
void messageHandler(websocketpp::connection_hdl hdl, MessagePtr msg)
Definition: websocket_client.hpp:129
foxglove::ClientInterface::setTextMessageHandler
virtual void setTextMessageHandler(TextMessageHandler handler)=0
foxglove::Client::Client
Client()
Definition: websocket_client.hpp:66
foxglove::ServiceResponse::write
void write(uint8_t *data) const
Definition: serialization.cpp:158
foxglove::OpCode
websocketpp::frame::opcode::value OpCode
Definition: websocket_client.hpp:30
foxglove::Client::setBinaryMessageHandler
void setBinaryMessageHandler(BinaryMessageHandler handler) override
Definition: websocket_client.hpp:235


foxglove_bridge
Author(s): Foxglove
autogenerated on Wed Mar 5 2025 03:34:31