Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "builtin_server_impl.h"
00012
00013 #include <opc/ua/server/addons/endpoints_services.h>
00014 #include <opc/ua/server/endpoints_services.h>
00015 #include <src/server/endpoints_parameters.h>
00016
00017 #include <iostream>
00018
00019 using namespace OpcUa::Impl;
00020
00021
00022 class OpcUa::Impl::BufferedInput : public OpcUa::InputChannel
00023 {
00024 public:
00025 explicit BufferedInput(bool debug);
00026 virtual std::size_t Receive(char* data, std::size_t size);
00027 void AddBuffer(const char* buf, std::size_t size);
00028 void Stop();
00029
00030 private:
00031 void ThrowIfStopped();
00032
00033 private:
00034 std::vector<char> Buffer;
00035 std::atomic<bool> Running;
00036 std::mutex BufferMutex;
00037 std::condition_variable DataReady;
00038 bool Debug;
00039 };
00040
00041
00042 BufferedInput::BufferedInput(bool debug)
00043 : Running(true)
00044 , Debug(debug)
00045 {
00046 Buffer.reserve(4096);
00047 }
00048
00049 std::size_t BufferedInput::Receive(char* data, std::size_t size)
00050 {
00051 if (Debug) std::clog << "Consuming " << size << " bytes of data." << std::endl;
00052
00053 ThrowIfStopped();
00054
00055
00056 std::size_t totalConsumedSize = 0;
00057 while (totalConsumedSize < size)
00058 {
00059 std::unique_lock<std::mutex> event(BufferMutex);
00060 if (Buffer.empty())
00061 {
00062 if (Debug) std::clog << "Waiting data from client" << std::endl;
00063 DataReady.wait(event);
00064 }
00065 else if(!event.owns_lock())
00066 {
00067 event.lock();
00068 }
00069 if (Debug) std::clog << "Buffer contain data from client." << std::endl;
00070 ThrowIfStopped();
00071 if (Debug) std::clog << "Client sent data." << std::endl;
00072
00073 ThrowIfStopped();
00074 if (Buffer.empty())
00075 {
00076 if (Debug) std::clog << "No data in buffer." << std::endl;
00077 continue;
00078 }
00079
00080 const std::size_t sizeToConsume = std::min(size - totalConsumedSize, Buffer.size());
00081 if (Debug) std::clog << "Consuming " << sizeToConsume << " bytes of data." << std::endl;
00082 auto endIt = Buffer.begin() + sizeToConsume;
00083 std::copy(begin(Buffer), endIt, data + totalConsumedSize);
00084 Buffer.erase(Buffer.begin(), endIt);
00085 totalConsumedSize += sizeToConsume;
00086 }
00087
00088 return totalConsumedSize;
00089 }
00090
00091
00092 void BufferedInput::AddBuffer(const char* buf, std::size_t size)
00093 {
00094 ThrowIfStopped();
00095 if (Debug) std::clog << "Client want to send " << size << " bytes of data" << std::endl;
00096 std::lock_guard<std::mutex> lock(BufferMutex);
00097 ThrowIfStopped();
00098
00099 Buffer.insert(Buffer.end(), buf, buf + size);
00100 if (Debug) std::clog << "Size of buffer " << Buffer.size() << " bytes." << std::endl;
00101 DataReady.notify_all();
00102 }
00103
00104 void BufferedInput::Stop()
00105 {
00106 Running = false;
00107 DataReady.notify_all();
00108 }
00109
00110 void BufferedInput::ThrowIfStopped()
00111 {
00112 if (!Running)
00113 {
00114 throw std::logic_error("Conversation through connection stopped.");
00115 }
00116 }
00117
00118
00119 namespace
00120 {
00121
00122 class BufferedIO : public OpcUa::IOChannel
00123 {
00124 public:
00125 BufferedIO(const char* channelId, std::weak_ptr<InputChannel> input, std::weak_ptr<BufferedInput> output, bool debug)
00126 : Input(input)
00127 , Output(output)
00128 , Id(channelId)
00129 , Debug(debug)
00130 {
00131 }
00132
00133 virtual std::size_t Receive(char* data, std::size_t size)
00134 {
00135 if (Debug) std::clog << Id << ": receive data." << std::endl;
00136
00137 if (std::shared_ptr<InputChannel> input = Input.lock())
00138 {
00139 return input->Receive(data, size);
00140 }
00141 return 0;
00142 }
00143
00144 virtual void Send(const char* message, std::size_t size)
00145 {
00146 if (Debug) std::clog << Id << ": send data." << std::endl;
00147
00148 if (std::shared_ptr<BufferedInput> output = Output.lock())
00149 {
00150 output->AddBuffer(message, size);
00151 }
00152 }
00153
00154 virtual void Stop()
00155 {
00156 if (std::shared_ptr<BufferedInput> output = Output.lock())
00157 output->Stop();
00158
00159 if (std::shared_ptr<InputChannel> input = Input.lock())
00160 return input->Stop();
00161 }
00162
00163 private:
00164 std::weak_ptr<InputChannel> Input;
00165 std::weak_ptr<BufferedInput> Output;
00166 const std::string Id;
00167 bool Debug;
00168 };
00169
00170
00171 void Process(std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor, std::shared_ptr<OpcUa::IOChannel> channel)
00172 {
00173 processor->Process(channel);
00174 }
00175 }
00176
00177
00178
00179 BuiltinServerAddon::BuiltinServerAddon()
00180 : Debug(false)
00181 {
00182 }
00183
00184 OpcUa::Services::SharedPtr BuiltinServerAddon::GetServices() const
00185 {
00186 if (!ClientChannel)
00187 {
00188 throw std::logic_error("Cannot access builtin computer. No endpoints was created. You have to configure endpoints.");
00189 }
00190
00191 OpcUa::SecureConnectionParams params;
00192 params.EndpointUrl = "opc.tcp://localhost:4841";
00193 params.SecurePolicy = "http://opcfoundation.org/UA/SecurityPolicy#None";
00194 return OpcUa::CreateBinaryClient(ClientChannel, params);
00195 }
00196
00197 BuiltinServerAddon::~BuiltinServerAddon()
00198 {
00199 try
00200 {
00201 Stop();
00202 }
00203 catch (...)
00204 {
00205 }
00206 }
00207
00208 void BuiltinServerAddon::Initialize(Common::AddonsManager& addons, const Common::AddonParameters& params)
00209 {
00210 for (const Common::Parameter parameter : params.Parameters)
00211 {
00212 if (parameter.Name == "debug" && !parameter.Value.empty() && parameter.Value != "0")
00213 {
00214 Debug = true;
00215 }
00216 }
00217
00218 const std::vector<OpcUa::Server::ApplicationData> applications = OpcUa::ParseEndpointsParameters(params.Groups, Debug);
00219 for (OpcUa::Server::ApplicationData d: applications) {
00220 std::cout << "Endpoint is: " << d.Endpoints.front().EndpointUrl << std::endl;
00221 }
00222
00223 std::vector<OpcUa::ApplicationDescription> applicationDescriptions;
00224 std::vector<OpcUa::EndpointDescription> endpointDescriptions;
00225 for (const OpcUa::Server::ApplicationData application : applications)
00226 {
00227 applicationDescriptions.push_back(application.Application);
00228 endpointDescriptions.insert(endpointDescriptions.end(), application.Endpoints.begin(), application.Endpoints.end());
00229 }
00230
00231 OpcUa::Server::EndpointsRegistry::SharedPtr endpointsAddon = addons.GetAddon<OpcUa::Server::EndpointsRegistry>(OpcUa::Server::EndpointsRegistryAddonId);
00232 if (!endpointsAddon)
00233 {
00234 std::cerr << "Cannot save information about endpoints. Endpoints services addon didn't' registered." << std::endl;
00235 return;
00236 }
00237 endpointsAddon->AddEndpoints(endpointDescriptions);
00238 endpointsAddon->AddApplications(applicationDescriptions);
00239
00240 OpcUa::Server::ServicesRegistry::SharedPtr internalServer = addons.GetAddon<OpcUa::Server::ServicesRegistry>(OpcUa::Server::ServicesRegistryAddonId);
00241
00242 Protocol = OpcUa::Server::CreateOpcUaProtocol(*this, Debug);
00243 Protocol->StartEndpoints(endpointDescriptions, internalServer->GetServer());
00244 }
00245
00246 void BuiltinServerAddon::Stop()
00247 {
00248 Protocol.reset();
00249 if (ClientInput)
00250 {
00251 ClientInput->Stop();
00252 ServerInput->Stop();
00253 }
00254
00255 if (Thread.get())
00256 {
00257 Thread->Join();
00258 Thread.reset();
00259 }
00260
00261 ClientInput.reset();
00262 ServerInput.reset();
00263 }
00264
00265 void BuiltinServerAddon::Listen(const OpcUa::Server::TcpParameters&, std::shared_ptr<OpcUa::Server::IncomingConnectionProcessor> processor)
00266 {
00267 if (Thread)
00268 {
00269 throw std::logic_error("Unable to start second thread. Builtin computer can listen only one binary connection.");
00270 }
00271
00272 ServerInput.reset(new BufferedInput(Debug));
00273 ClientInput.reset(new BufferedInput(Debug));
00274
00275 ClientChannel.reset(new BufferedIO("Client", ClientInput, ServerInput, Debug));
00276 ServerChannel.reset(new BufferedIO("Server", ServerInput, ClientInput, Debug));
00277
00278 Thread.reset(new Common::Thread(std::bind(Process, processor, ServerChannel), this));
00279 }
00280
00281 void BuiltinServerAddon::StopListen(const OpcUa::Server::TcpParameters&)
00282 {
00283 Stop();
00284 }
00285
00286 void BuiltinServerAddon::OnSuccess()
00287 {
00288 ClientInput->Stop();
00289 if (Debug) std::clog << "Server thread exited with success." << std::endl;
00290 }
00291
00292 void BuiltinServerAddon::OnError(const std::exception& exc)
00293 {
00294 ClientInput->Stop();
00295 if (Debug) std::clog << "Server thread exited with error: " << exc.what() << std::endl;
00296 }
00297
00298 OpcUa::Server::TcpServer::UniquePtr OpcUa::Server::CreateTcpServer()
00299 {
00300 return TcpServer::UniquePtr(new BuiltinServerAddon);
00301 }
00302