builtin_server_impl.cpp
Go to the documentation of this file.
1 
11 #include "builtin_server_impl.h"
12 
16 
17 #include <iostream>
18 
19 using namespace OpcUa::Impl;
20 
21 
23 {
24 public:
25  explicit BufferedInput(const Common::Logger::SharedPtr & logger);
26  virtual std::size_t Receive(char * data, std::size_t size);
27  void AddBuffer(const char * buf, std::size_t size);
28  void Stop();
29 
30 private:
31  void ThrowIfStopped();
32 
33 private:
34  std::vector<char> Buffer;
35  std::atomic<bool> Running;
36  std::mutex BufferMutex;
37  std::condition_variable DataReady;
38  Common::Logger::SharedPtr Logger;
39 };
40 
41 
42 BufferedInput::BufferedInput(const Common::Logger::SharedPtr & logger)
43  : Running(true)
44  , Logger(logger)
45 {
46  Buffer.reserve(4096);
47 }
48 
49 std::size_t BufferedInput::Receive(char * data, std::size_t size)
50 {
51  LOG_DEBUG(Logger, "consuming {} bytes of data", size);
52 
54 
55 
56  std::size_t totalConsumedSize = 0;
57 
58  while (totalConsumedSize < size)
59  {
60  std::unique_lock<std::mutex> event(BufferMutex);
61 
62  if (Buffer.empty())
63  {
64  LOG_DEBUG(Logger, "waiting for client data");
65 
66  DataReady.wait(event);
67  }
68 
69  else if (!event.owns_lock())
70  {
71  event.lock();
72  }
73 
74  LOG_DEBUG(Logger, "buffer contains client data");
75 
77 
78  LOG_DEBUG(Logger, "client sent data");
79 
81 
82  if (Buffer.empty())
83  {
84  LOG_DEBUG(Logger, "buffer is empty");
85  continue;
86  }
87 
88  const std::size_t sizeToConsume = std::min(size - totalConsumedSize, Buffer.size());
89 
90  LOG_DEBUG(Logger, "consuming {} bytes of data", sizeToConsume);
91 
92  auto endIt = Buffer.begin() + sizeToConsume;
93  std::copy(begin(Buffer), endIt, data + totalConsumedSize);
94  Buffer.erase(Buffer.begin(), endIt); // TODO make behavior with round buffer to avoid this.
95  totalConsumedSize += sizeToConsume;
96  }
97 
98  return totalConsumedSize;
99 }
100 
101 
102 void BufferedInput::AddBuffer(const char * buf, std::size_t size)
103 {
104  ThrowIfStopped();
105 
106  LOG_DEBUG(Logger, "client wants to send {} bytes of data", size);
107 
108  std::lock_guard<std::mutex> lock(BufferMutex);
109  ThrowIfStopped();
110 
111  Buffer.insert(Buffer.end(), buf, buf + size);
112 
113  LOG_DEBUG(Logger, "size of buffer is {} bytes", Buffer.size());
114 
115  DataReady.notify_all();
116 }
117 
119 {
120  Running = false;
121  DataReady.notify_all();
122 }
123 
125 {
126  if (!Running)
127  {
128  throw std::logic_error("conversation through connection stopped");
129  }
130 }
131 
132 
133 namespace
134 {
135 
136 class BufferedIO : public OpcUa::IOChannel
137 {
138 public:
139  BufferedIO(const char * channelId, std::weak_ptr<InputChannel> input, std::weak_ptr<BufferedInput> output, const Common::Logger::SharedPtr & logger)
140  : Input(input)
141  , Output(output)
142  , Id(channelId)
143  , Logger(logger)
144  {
145  }
146 
147  virtual std::size_t Receive(char * data, std::size_t size)
148  {
149  LOG_DEBUG(Logger, "{}: receive data", Id);
150 
151  if (std::shared_ptr<InputChannel> input = Input.lock())
152  {
153  return input->Receive(data, size);
154  }
155 
156  return 0;
157  }
158 
159  virtual void Send(const char * message, std::size_t size)
160  {
161  LOG_DEBUG(Logger, "{}: send data", Id);
162 
163  if (std::shared_ptr<BufferedInput> output = Output.lock())
164  {
165  output->AddBuffer(message, size);
166  }
167  }
168 
169  virtual void Stop()
170  {
171  if (std::shared_ptr<BufferedInput> output = Output.lock())
172  { output->Stop(); }
173 
174  if (std::shared_ptr<InputChannel> input = Input.lock())
175  { return input->Stop(); }
176  }
177 
178 private:
179  std::weak_ptr<InputChannel> Input;
180  std::weak_ptr<BufferedInput> Output;
181  const std::string Id;
182  Common::Logger::SharedPtr Logger;
183 };
184 
185 
186 void Process(std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor, std::shared_ptr<OpcUa::IOChannel> channel)
187 {
188  processor->Process(channel);
189 }
190 } // namespace
191 
192 
193 BuiltinServerAddon::BuiltinServerAddon(const Common::Logger::SharedPtr & logger)
194  : TcpServer(logger)
195 {
196 }
197 
198 OpcUa::Services::SharedPtr BuiltinServerAddon::GetServices() const
199 {
200  if (!ClientChannel)
201  {
202  throw std::logic_error("Cannot access builtin computer. No endpoints was created. You have to configure endpoints.");
203  }
204 
206  params.EndpointUrl = "opc.tcp://localhost:4841";
207  params.SecurePolicy = "http://opcfoundation.org/UA/SecurityPolicy#None";
209 }
210 
212 {
213  try
214  {
215  Stop();
216  }
217 
218  catch (...)
219  {
220  }
221 }
222 
224 {
225  Logger = addons.GetLogger();
226 
227  for (const Common::Parameter parameter : params.Parameters)
228  {
229  /*
230  if (parameter.Name == "debug" && !parameter.Value.empty() && parameter.Value != "0")
231  {
232  Debug = true;
233  }
234  */
235  }
236 
237  const std::vector<OpcUa::Server::ApplicationData> applications = OpcUa::ParseEndpointsParameters(params.Groups, Logger);
238 
239  for (OpcUa::Server::ApplicationData d : applications)
240  {
241  LOG_INFO(Logger, "endpoint is: {}", d.Endpoints.front().EndpointUrl);
242  }
243 
244  std::vector<OpcUa::ApplicationDescription> applicationDescriptions;
245  std::vector<OpcUa::EndpointDescription> endpointDescriptions;
246 
247  for (const OpcUa::Server::ApplicationData application : applications)
248  {
249  applicationDescriptions.push_back(application.Application);
250  endpointDescriptions.insert(endpointDescriptions.end(), application.Endpoints.begin(), application.Endpoints.end());
251  }
252 
253  OpcUa::Server::EndpointsRegistry::SharedPtr endpointsAddon = addons.GetAddon<OpcUa::Server::EndpointsRegistry>(OpcUa::Server::EndpointsRegistryAddonId);
254 
255  if (!endpointsAddon)
256  {
257  LOG_ERROR(Logger, "cannot store endpoints information, endpoints service addon has not been registered");
258  return;
259  }
260 
261  endpointsAddon->AddEndpoints(endpointDescriptions);
262  endpointsAddon->AddApplications(applicationDescriptions);
263 
264  OpcUa::Server::ServicesRegistry::SharedPtr internalServer = addons.GetAddon<OpcUa::Server::ServicesRegistry>(OpcUa::Server::ServicesRegistryAddonId);
265 
267  Protocol->StartEndpoints(endpointDescriptions, internalServer->GetServer());
268 }
269 
271 {
272  Protocol.reset();
273 
274  if (ClientInput)
275  {
276  ClientInput->Stop();
277  ServerInput->Stop();
278  }
279 
280  if (Thread.get())
281  {
282  Thread->Join();
283  Thread.reset();
284  }
285 
286  ClientInput.reset();
287  ServerInput.reset();
288 }
289 
290 void BuiltinServerAddon::Listen(const OpcUa::Server::TcpParameters &, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor)
291 {
292  if (Thread)
293  {
294  throw std::logic_error("unable to serve more then one binary connection");
295  }
296 
297  ServerInput.reset(new BufferedInput(Logger));
298  ClientInput.reset(new BufferedInput(Logger));
299 
300  ClientChannel.reset(new BufferedIO("Client", ClientInput, ServerInput, Logger));
301  ServerChannel.reset(new BufferedIO("Server", ServerInput, ClientInput, Logger));
302 
303  Thread.reset(new Common::Thread(std::bind(Process, processor, ServerChannel), this));
304 }
305 
307 {
308  Stop();
309 }
310 
312 {
313  ClientInput->Stop();
314 
315  LOG_DEBUG(Logger, "server thread exited successfully");
316 }
317 
318 void BuiltinServerAddon::OnError(const std::exception & exc)
319 {
320  ClientInput->Stop();
321 
322  LOG_ERROR(Logger, "server thread exited with error: {}", exc.what());
323 }
324 
325 OpcUa::Server::TcpServer::UniquePtr OpcUa::Server::CreateTcpServer(const Common::Logger::SharedPtr & logger)
326 {
327  return TcpServer::UniquePtr(new BuiltinServerAddon(logger));
328 }
329 
d
BufferedInput(const Common::Logger::SharedPtr &logger)
virtual const Logger::SharedPtr & GetLogger() const
Definition: addon_manager.h:90
std::shared_ptr< BufferedInput > ServerInput
const char EndpointsRegistryAddonId[]
void AddBuffer(const char *buf, std::size_t size)
std::unique_ptr< Common::Thread > Thread
Output
std::shared_ptr< OpcUa::IOChannel > ClientChannel
virtual std::shared_ptr< Addon > GetAddon(const AddonId &id) const =0
getting addon by id
virtual void Listen(const OpcUa::Server::TcpParameters &params, std::shared_ptr< OpcUa::Server::IncomingConnectionProcessor > processor) override
#define LOG_ERROR(__logger__,...)
Definition: common/logger.h:27
#define LOG_DEBUG(__logger__,...)
Definition: common/logger.h:24
virtual void OnError(const std::exception &exc) override
Thread exited with error.
Input
virtual void Initialize(Common::AddonsManager &addons, const Common::AddonParameters &params) override
initialize addon.
const char ServicesRegistryAddonId[]
virtual std::size_t Receive(char *data, std::size_t size)
Receive data.
virtual void OnSuccess() override
thread exited with Success.
OpcUa::Server::OpcUaProtocol::SharedPtr Protocol
std::condition_variable DataReady
std::shared_ptr< OpcUa::IOChannel > ServerChannel
#define LOG_INFO(__logger__,...)
Definition: common/logger.h:25
message
Definition: server.py:50
virtual void Stop() override
Stopping addon work. After calling this method addon should throw exception on any calls...
Services::SharedPtr CreateBinaryClient(IOChannel::SharedPtr channel, const SecureConnectionParams &params, const Common::Logger::SharedPtr &logger=nullptr)
Create server based on opc ua binary protocol.
OpcUa::Services::SharedPtr GetServices() const override
Common::Logger::SharedPtr Logger
std::vector< Server::ApplicationData > ParseEndpointsParameters(const std::vector< Common::ParametersGroup > &rootGroup, const Common::Logger::SharedPtr &logger)
std::vector< ParametersGroup > Groups
virtual void StopListen(const OpcUa::Server::TcpParameters &params) override
TcpServer::UniquePtr CreateTcpServer(const Common::Logger::SharedPtr &logger)
Definition: tcp_server.cpp:338
std::shared_ptr< BufferedInput > ClientInput
OpcUaProtocol::UniquePtr CreateOpcUaProtocol(TcpServer &tcpServer, const Common::Logger::SharedPtr &logger)
BuiltinServerAddon(const Common::Logger::SharedPtr &logger=nullptr)
Common::Logger::SharedPtr Logger
Definition: tcp_server.h:47
std::vector< Parameter > Parameters


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Tue Jan 19 2021 03:06:04