builtin_server_impl.cpp
Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 #include "builtin_server_impl.h"
00012 
00013 #include <opc/ua/server/addons/endpoints_services.h>
00014 #include <opc/ua/server/endpoints_services.h>
00015 #include <src/server/endpoints_parameters.h>
00016 
00017 #include <iostream>
00018 
00019 using namespace OpcUa::Impl;
00020 
00021 
00022 class OpcUa::Impl::BufferedInput : public OpcUa::InputChannel
00023 {
00024 public:
00025   explicit BufferedInput(bool debug);
00026   virtual std::size_t Receive(char* data, std::size_t size);
00027   void AddBuffer(const char* buf, std::size_t size);
00028   void Stop();
00029 
00030 private:
00031   void ThrowIfStopped();
00032 
00033 private:
00034   std::vector<char> Buffer;
00035   std::atomic<bool> Running;
00036   std::mutex BufferMutex;
00037   std::condition_variable DataReady;
00038   bool Debug;
00039 };
00040 
00041 
00042 BufferedInput::BufferedInput(bool debug)
00043   : Running(true)
00044   , Debug(debug)
00045 {
00046   Buffer.reserve(4096);
00047 }
00048 
00049 std::size_t BufferedInput::Receive(char* data, std::size_t size)
00050 {
00051   if (Debug) std::clog << "Consuming " << size << " bytes of data." << std::endl;
00052 
00053   ThrowIfStopped();
00054 
00055 
00056   std::size_t totalConsumedSize = 0;
00057   while (totalConsumedSize < size)
00058   {
00059     std::unique_lock<std::mutex> event(BufferMutex);
00060     if (Buffer.empty())
00061     {
00062       if (Debug) std::clog << "Waiting data from client" << std::endl;
00063       DataReady.wait(event);
00064     }
00065     else if(!event.owns_lock())
00066     {
00067       event.lock();
00068     }
00069     if (Debug) std::clog << "Buffer contain data from client." << std::endl;
00070     ThrowIfStopped();
00071     if (Debug) std::clog << "Client sent data." << std::endl;
00072 
00073     ThrowIfStopped();
00074     if (Buffer.empty())
00075     {
00076       if (Debug) std::clog << "No data in buffer." << std::endl;
00077       continue;
00078     }
00079 
00080     const std::size_t sizeToConsume = std::min(size - totalConsumedSize, Buffer.size());
00081     if (Debug) std::clog << "Consuming " << sizeToConsume << " bytes of data." << std::endl;
00082     auto endIt = Buffer.begin() + sizeToConsume;
00083     std::copy(begin(Buffer), endIt, data + totalConsumedSize);
00084     Buffer.erase(Buffer.begin(), endIt); // TODO make behavior with round buffer to avoid this.
00085     totalConsumedSize += sizeToConsume;
00086   }
00087 
00088   return totalConsumedSize;
00089 }
00090 
00091 
00092 void BufferedInput::AddBuffer(const char* buf, std::size_t size)
00093 {
00094   ThrowIfStopped();
00095   if (Debug) std::clog << "Client want to send " << size << " bytes of data" << std::endl;
00096   std::lock_guard<std::mutex> lock(BufferMutex);
00097   ThrowIfStopped();
00098 
00099   Buffer.insert(Buffer.end(), buf, buf + size);
00100   if (Debug) std::clog << "Size of buffer " << Buffer.size() << " bytes." << std::endl;
00101   DataReady.notify_all();
00102 }
00103 
00104 void BufferedInput::Stop()
00105 {
00106   Running = false;
00107   DataReady.notify_all();
00108 }
00109 
00110 void BufferedInput::ThrowIfStopped()
00111 {
00112   if (!Running)
00113   {
00114     throw std::logic_error("Conversation through connection stopped.");
00115   }
00116 }
00117 
00118 
00119 namespace
00120 {
00121 
00122   class BufferedIO : public OpcUa::IOChannel
00123   {
00124   public:
00125     BufferedIO(const char* channelId, std::weak_ptr<InputChannel> input, std::weak_ptr<BufferedInput> output, bool debug)
00126       : Input(input)
00127       , Output(output)
00128       , Id(channelId)
00129       , Debug(debug)
00130     {
00131     }
00132 
00133     virtual std::size_t Receive(char* data, std::size_t size)
00134     {
00135       if (Debug) std::clog << Id << ": receive data." << std::endl;
00136 
00137       if (std::shared_ptr<InputChannel> input = Input.lock())
00138       {
00139         return input->Receive(data, size);
00140       }
00141       return 0;
00142     }
00143 
00144     virtual void Send(const char* message, std::size_t size)
00145     {
00146       if (Debug) std::clog << Id << ": send data." << std::endl;
00147 
00148       if (std::shared_ptr<BufferedInput> output = Output.lock())
00149       {
00150         output->AddBuffer(message, size);
00151       }
00152     }
00153 
00154     virtual void Stop()
00155     {
00156       if (std::shared_ptr<BufferedInput> output = Output.lock())
00157         output->Stop();
00158 
00159       if (std::shared_ptr<InputChannel> input = Input.lock())
00160         return input->Stop();
00161     }
00162 
00163   private:
00164     std::weak_ptr<InputChannel> Input;
00165     std::weak_ptr<BufferedInput> Output;
00166     const std::string Id;
00167     bool Debug;
00168   };
00169 
00170 
00171   void Process(std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor, std::shared_ptr<OpcUa::IOChannel> channel)
00172   {
00173     processor->Process(channel);
00174   }
00175 }  // namespace
00176 
00177 
00178 
00179 BuiltinServerAddon::BuiltinServerAddon()
00180   : Debug(false)
00181 {
00182 }
00183 
00184 OpcUa::Services::SharedPtr BuiltinServerAddon::GetServices() const
00185 {
00186   if (!ClientChannel)
00187   {
00188     throw std::logic_error("Cannot access builtin computer. No endpoints was created. You have to configure endpoints.");
00189   }
00190 
00191   OpcUa::SecureConnectionParams params;
00192   params.EndpointUrl = "opc.tcp://localhost:4841";
00193   params.SecurePolicy = "http://opcfoundation.org/UA/SecurityPolicy#None";
00194   return OpcUa::CreateBinaryClient(ClientChannel, params);
00195 }
00196 
00197 BuiltinServerAddon::~BuiltinServerAddon()
00198 {
00199   try
00200   {
00201     Stop();
00202   }
00203   catch (...)
00204   {
00205   }
00206 }
00207 
00208 void BuiltinServerAddon::Initialize(Common::AddonsManager& addons, const Common::AddonParameters& params)
00209 {
00210   for (const Common::Parameter parameter : params.Parameters)
00211   {
00212     if (parameter.Name == "debug" && !parameter.Value.empty() && parameter.Value != "0")
00213     {
00214       Debug = true;
00215     }
00216   }
00217 
00218   const std::vector<OpcUa::Server::ApplicationData> applications = OpcUa::ParseEndpointsParameters(params.Groups, Debug);
00219   for (OpcUa::Server::ApplicationData d: applications) {
00220     std::cout << "Endpoint is: " << d.Endpoints.front().EndpointUrl << std::endl;
00221   }
00222 
00223   std::vector<OpcUa::ApplicationDescription> applicationDescriptions;
00224   std::vector<OpcUa::EndpointDescription> endpointDescriptions;
00225   for (const OpcUa::Server::ApplicationData application : applications)
00226   {
00227     applicationDescriptions.push_back(application.Application);
00228     endpointDescriptions.insert(endpointDescriptions.end(), application.Endpoints.begin(), application.Endpoints.end());
00229   }
00230 
00231   OpcUa::Server::EndpointsRegistry::SharedPtr endpointsAddon = addons.GetAddon<OpcUa::Server::EndpointsRegistry>(OpcUa::Server::EndpointsRegistryAddonId);
00232   if (!endpointsAddon)
00233   {
00234     std::cerr << "Cannot save information about endpoints. Endpoints services addon didn't' registered." << std::endl;
00235     return;
00236   }
00237   endpointsAddon->AddEndpoints(endpointDescriptions);
00238   endpointsAddon->AddApplications(applicationDescriptions);
00239 
00240   OpcUa::Server::ServicesRegistry::SharedPtr internalServer = addons.GetAddon<OpcUa::Server::ServicesRegistry>(OpcUa::Server::ServicesRegistryAddonId);
00241 
00242   Protocol = OpcUa::Server::CreateOpcUaProtocol(*this, Debug);
00243   Protocol->StartEndpoints(endpointDescriptions, internalServer->GetServer());
00244 }
00245 
00246 void BuiltinServerAddon::Stop()
00247 {
00248   Protocol.reset();
00249   if (ClientInput)
00250   {
00251     ClientInput->Stop();
00252     ServerInput->Stop();
00253   }
00254 
00255   if (Thread.get())
00256   {
00257     Thread->Join();
00258     Thread.reset();
00259   }
00260 
00261   ClientInput.reset();
00262   ServerInput.reset();
00263 }
00264 
00265 void BuiltinServerAddon::Listen(const OpcUa::Server::TcpParameters&, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor)
00266 {
00267   if (Thread)
00268   {
00269     throw std::logic_error("Unable to start second thread. Builtin computer can listen only one binary connection.");
00270   }
00271 
00272   ServerInput.reset(new BufferedInput(Debug));
00273   ClientInput.reset(new BufferedInput(Debug));
00274 
00275   ClientChannel.reset(new BufferedIO("Client", ClientInput, ServerInput, Debug));
00276   ServerChannel.reset(new BufferedIO("Server", ServerInput, ClientInput, Debug));
00277 
00278   Thread.reset(new Common::Thread(std::bind(Process, processor, ServerChannel), this));
00279 }
00280 
00281 void BuiltinServerAddon::StopListen(const OpcUa::Server::TcpParameters&)
00282 {
00283   Stop();
00284 }
00285 
00286 void BuiltinServerAddon::OnSuccess()
00287 {
00288   ClientInput->Stop();
00289   if (Debug) std::clog  << "Server thread exited with success." << std::endl;
00290 }
00291 
00292 void BuiltinServerAddon::OnError(const std::exception& exc)
00293 {
00294   ClientInput->Stop();
00295   if (Debug) std::clog  << "Server thread exited with error: " << exc.what() << std::endl;
00296 }
00297 
00298 OpcUa::Server::TcpServer::UniquePtr OpcUa::Server::CreateTcpServer()
00299 {
00300   return TcpServer::UniquePtr(new BuiltinServerAddon);
00301 }
00302 


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:40